mirror of
https://github.com/OpenTTD/OpenTTD.git
synced 2025-03-10 08:00:05 +00:00
Codechange: use std::vector for the incoming command queue
This commit is contained in:
parent
b3aa8a9c35
commit
cb588d8d3f
@ -131,24 +131,12 @@ enum PacketGameType : uint8_t {
|
|||||||
/** Packet that wraps a command */
|
/** Packet that wraps a command */
|
||||||
struct CommandPacket;
|
struct CommandPacket;
|
||||||
|
|
||||||
/** A queue of CommandPackets. */
|
/**
|
||||||
class CommandQueue {
|
* A "queue" of CommandPackets.
|
||||||
CommandPacket *first; ///< The first packet in the queue.
|
* Not a std::queue because, when paused, some commands remain on the queue.
|
||||||
CommandPacket *last; ///< The last packet in the queue; only valid when first != nullptr.
|
* In other words, you do not always pop the first element from this queue.
|
||||||
uint count; ///< The number of items in the queue.
|
*/
|
||||||
|
using CommandQueue = std::vector<CommandPacket>;
|
||||||
public:
|
|
||||||
/** Initialise the command queue. */
|
|
||||||
CommandQueue() : first(nullptr), last(nullptr), count(0) {}
|
|
||||||
/** Clear the command queue. */
|
|
||||||
~CommandQueue() { this->Free(); }
|
|
||||||
void Append(CommandPacket *p);
|
|
||||||
CommandPacket *Pop(bool ignore_paused = false);
|
|
||||||
CommandPacket *Peek(bool ignore_paused = false);
|
|
||||||
void Free();
|
|
||||||
/** Get the number of items in the queue. */
|
|
||||||
uint Count() const { return this->count; }
|
|
||||||
};
|
|
||||||
|
|
||||||
/** Base socket handler for all TCP sockets */
|
/** Base socket handler for all TCP sockets */
|
||||||
class NetworkGameSocketHandler : public NetworkTCPSocketHandler {
|
class NetworkGameSocketHandler : public NetworkTCPSocketHandler {
|
||||||
|
@ -972,7 +972,7 @@ NetworkRecvStatus ClientNetworkGameSocketHandler::Receive_SERVER_COMMAND(Packet
|
|||||||
return NETWORK_RECV_STATUS_MALFORMED_PACKET;
|
return NETWORK_RECV_STATUS_MALFORMED_PACKET;
|
||||||
}
|
}
|
||||||
|
|
||||||
this->incoming_queue.Append(&cp);
|
this->incoming_queue.push_back(cp);
|
||||||
|
|
||||||
return NETWORK_RECV_STATUS_OKAY;
|
return NETWORK_RECV_STATUS_OKAY;
|
||||||
}
|
}
|
||||||
|
@ -165,76 +165,6 @@ static constexpr auto _cmd_dispatch = MakeDispatchTable(std::make_integer_sequen
|
|||||||
# pragma GCC diagnostic pop
|
# pragma GCC diagnostic pop
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Append a CommandPacket at the end of the queue.
|
|
||||||
* @param p The packet to append to the queue.
|
|
||||||
* @note A new instance of the CommandPacket will be made.
|
|
||||||
*/
|
|
||||||
void CommandQueue::Append(CommandPacket *p)
|
|
||||||
{
|
|
||||||
CommandPacket *add = new CommandPacket();
|
|
||||||
*add = *p;
|
|
||||||
add->next = nullptr;
|
|
||||||
if (this->first == nullptr) {
|
|
||||||
this->first = add;
|
|
||||||
} else {
|
|
||||||
this->last->next = add;
|
|
||||||
}
|
|
||||||
this->last = add;
|
|
||||||
this->count++;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Return the first item in the queue and remove it from the queue.
|
|
||||||
* @param ignore_paused Whether to ignore commands that may not be executed while paused.
|
|
||||||
* @return the first item in the queue.
|
|
||||||
*/
|
|
||||||
CommandPacket *CommandQueue::Pop(bool ignore_paused)
|
|
||||||
{
|
|
||||||
CommandPacket **prev = &this->first;
|
|
||||||
CommandPacket *ret = this->first;
|
|
||||||
CommandPacket *prev_item = nullptr;
|
|
||||||
if (ignore_paused && _pause_mode != PM_UNPAUSED) {
|
|
||||||
while (ret != nullptr && !IsCommandAllowedWhilePaused(ret->cmd)) {
|
|
||||||
prev_item = ret;
|
|
||||||
prev = &ret->next;
|
|
||||||
ret = ret->next;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (ret != nullptr) {
|
|
||||||
if (ret == this->last) this->last = prev_item;
|
|
||||||
*prev = ret->next;
|
|
||||||
this->count--;
|
|
||||||
}
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Return the first item in the queue, but don't remove it.
|
|
||||||
* @param ignore_paused Whether to ignore commands that may not be executed while paused.
|
|
||||||
* @return the first item in the queue.
|
|
||||||
*/
|
|
||||||
CommandPacket *CommandQueue::Peek(bool ignore_paused)
|
|
||||||
{
|
|
||||||
if (!ignore_paused || _pause_mode == PM_UNPAUSED) return this->first;
|
|
||||||
|
|
||||||
for (CommandPacket *p = this->first; p != nullptr; p = p->next) {
|
|
||||||
if (IsCommandAllowedWhilePaused(p->cmd)) return p;
|
|
||||||
}
|
|
||||||
return nullptr;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Free everything that is in the queue. */
|
|
||||||
void CommandQueue::Free()
|
|
||||||
{
|
|
||||||
CommandPacket *cp;
|
|
||||||
while ((cp = this->Pop()) != nullptr) {
|
|
||||||
delete cp;
|
|
||||||
}
|
|
||||||
assert(this->count == 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Local queue of packets waiting for handling. */
|
/** Local queue of packets waiting for handling. */
|
||||||
static CommandQueue _local_wait_queue;
|
static CommandQueue _local_wait_queue;
|
||||||
/** Local queue of packets waiting for execution. */
|
/** Local queue of packets waiting for execution. */
|
||||||
@ -282,7 +212,7 @@ void NetworkSendCommand(Commands cmd, StringID err_message, CommandCallback *cal
|
|||||||
c.frame = _frame_counter_max + 1;
|
c.frame = _frame_counter_max + 1;
|
||||||
c.my_cmd = true;
|
c.my_cmd = true;
|
||||||
|
|
||||||
_local_wait_queue.Append(&c);
|
_local_wait_queue.push_back(c);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -303,8 +233,8 @@ void NetworkSendCommand(Commands cmd, StringID err_message, CommandCallback *cal
|
|||||||
*/
|
*/
|
||||||
void NetworkSyncCommandQueue(NetworkClientSocket *cs)
|
void NetworkSyncCommandQueue(NetworkClientSocket *cs)
|
||||||
{
|
{
|
||||||
for (CommandPacket *p = _local_execution_queue.Peek(); p != nullptr; p = p->next) {
|
for (auto &p : _local_execution_queue) {
|
||||||
CommandPacket &c = cs->outgoing_queue.emplace_back(*p);
|
CommandPacket &c = cs->outgoing_queue.emplace_back(p);
|
||||||
c.callback = nullptr;
|
c.callback = nullptr;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -318,8 +248,8 @@ void NetworkExecuteLocalCommandQueue()
|
|||||||
|
|
||||||
CommandQueue &queue = (_network_server ? _local_execution_queue : ClientNetworkGameSocketHandler::my_client->incoming_queue);
|
CommandQueue &queue = (_network_server ? _local_execution_queue : ClientNetworkGameSocketHandler::my_client->incoming_queue);
|
||||||
|
|
||||||
CommandPacket *cp;
|
auto cp = queue.begin();
|
||||||
while ((cp = queue.Peek()) != nullptr) {
|
for (; cp != queue.end(); cp++) {
|
||||||
/* The queue is always in order, which means
|
/* The queue is always in order, which means
|
||||||
* that the first element will be executed first. */
|
* that the first element will be executed first. */
|
||||||
if (_frame_counter < cp->frame) break;
|
if (_frame_counter < cp->frame) break;
|
||||||
@ -335,11 +265,9 @@ void NetworkExecuteLocalCommandQueue()
|
|||||||
size_t cb_index = FindCallbackIndex(cp->callback);
|
size_t cb_index = FindCallbackIndex(cp->callback);
|
||||||
assert(cb_index < _callback_tuple_size);
|
assert(cb_index < _callback_tuple_size);
|
||||||
assert(_cmd_dispatch[cp->cmd].Unpack[cb_index] != nullptr);
|
assert(_cmd_dispatch[cp->cmd].Unpack[cb_index] != nullptr);
|
||||||
_cmd_dispatch[cp->cmd].Unpack[cb_index](cp);
|
_cmd_dispatch[cp->cmd].Unpack[cb_index](&*cp);
|
||||||
|
|
||||||
queue.Pop();
|
|
||||||
delete cp;
|
|
||||||
}
|
}
|
||||||
|
queue.erase(queue.begin(), cp);
|
||||||
|
|
||||||
/* Local company may have changed, so we should not restore the old value */
|
/* Local company may have changed, so we should not restore the old value */
|
||||||
_current_company = _local_company;
|
_current_company = _local_company;
|
||||||
@ -350,8 +278,8 @@ void NetworkExecuteLocalCommandQueue()
|
|||||||
*/
|
*/
|
||||||
void NetworkFreeLocalCommandQueue()
|
void NetworkFreeLocalCommandQueue()
|
||||||
{
|
{
|
||||||
_local_wait_queue.Free();
|
_local_wait_queue.clear();
|
||||||
_local_execution_queue.Free();
|
_local_execution_queue.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -376,7 +304,7 @@ static void DistributeCommandPacket(CommandPacket &cp, const NetworkClientSocket
|
|||||||
|
|
||||||
cp.callback = (nullptr != owner) ? nullptr : callback;
|
cp.callback = (nullptr != owner) ? nullptr : callback;
|
||||||
cp.my_cmd = (nullptr == owner);
|
cp.my_cmd = (nullptr == owner);
|
||||||
_local_execution_queue.Append(&cp);
|
_local_execution_queue.push_back(cp);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -384,7 +312,7 @@ static void DistributeCommandPacket(CommandPacket &cp, const NetworkClientSocket
|
|||||||
* @param queue The queue of commands that has to be distributed.
|
* @param queue The queue of commands that has to be distributed.
|
||||||
* @param owner The client that owns the commands,
|
* @param owner The client that owns the commands,
|
||||||
*/
|
*/
|
||||||
static void DistributeQueue(CommandQueue *queue, const NetworkClientSocket *owner)
|
static void DistributeQueue(CommandQueue &queue, const NetworkClientSocket *owner)
|
||||||
{
|
{
|
||||||
#ifdef DEBUG_DUMP_COMMANDS
|
#ifdef DEBUG_DUMP_COMMANDS
|
||||||
/* When replaying we do not want this limitation. */
|
/* When replaying we do not want this limitation. */
|
||||||
@ -397,11 +325,20 @@ static void DistributeQueue(CommandQueue *queue, const NetworkClientSocket *owne
|
|||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
CommandPacket *cp;
|
/* Not technically the most performant way, but consider clients rarely click more than once per tick. */
|
||||||
while (--to_go >= 0 && (cp = queue->Pop(true)) != nullptr) {
|
for (auto cp = queue.begin(); cp != queue.end(); /* removing some items */) {
|
||||||
|
/* Limit the number of commands per client per tick. */
|
||||||
|
if (--to_go < 0) break;
|
||||||
|
|
||||||
|
/* Do not distribute commands when paused and the command is not allowed while paused. */
|
||||||
|
if (_pause_mode != PM_UNPAUSED && !IsCommandAllowedWhilePaused(cp->cmd)) {
|
||||||
|
++cp;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
DistributeCommandPacket(*cp, owner);
|
DistributeCommandPacket(*cp, owner);
|
||||||
NetworkAdminCmdLogging(owner, cp);
|
NetworkAdminCmdLogging(owner, &*cp);
|
||||||
delete cp;
|
cp = queue.erase(cp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -409,11 +346,11 @@ static void DistributeQueue(CommandQueue *queue, const NetworkClientSocket *owne
|
|||||||
void NetworkDistributeCommands()
|
void NetworkDistributeCommands()
|
||||||
{
|
{
|
||||||
/* First send the server's commands. */
|
/* First send the server's commands. */
|
||||||
DistributeQueue(&_local_wait_queue, nullptr);
|
DistributeQueue(_local_wait_queue, nullptr);
|
||||||
|
|
||||||
/* Then send the queues of the others. */
|
/* Then send the queues of the others. */
|
||||||
for (NetworkClientSocket *cs : NetworkClientSocket::Iterate()) {
|
for (NetworkClientSocket *cs : NetworkClientSocket::Iterate()) {
|
||||||
DistributeQueue(&cs->incoming_queue, cs);
|
DistributeQueue(cs->incoming_queue, cs);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -107,9 +107,7 @@ void UpdateNetworkGameWindow();
|
|||||||
* Everything we need to know about a command to be able to execute it.
|
* Everything we need to know about a command to be able to execute it.
|
||||||
*/
|
*/
|
||||||
struct CommandPacket {
|
struct CommandPacket {
|
||||||
/** Make sure the pointer is nullptr. */
|
CommandPacket() : company(INVALID_COMPANY), frame(0), my_cmd(false) {}
|
||||||
CommandPacket() : next(nullptr), company(INVALID_COMPANY), frame(0), my_cmd(false) {}
|
|
||||||
CommandPacket *next; ///< the next command packet (if in queue)
|
|
||||||
CompanyID company; ///< company that is executing the command
|
CompanyID company; ///< company that is executing the command
|
||||||
uint32_t frame; ///< the frame in which this packet is executed
|
uint32_t frame; ///< the frame in which this packet is executed
|
||||||
bool my_cmd; ///< did the command originate from "me"
|
bool my_cmd; ///< did the command originate from "me"
|
||||||
|
@ -1060,7 +1060,7 @@ NetworkRecvStatus ServerNetworkGameSocketHandler::Receive_CLIENT_COMMAND(Packet
|
|||||||
return this->SendError(NETWORK_ERROR_NOT_EXPECTED);
|
return this->SendError(NETWORK_ERROR_NOT_EXPECTED);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this->incoming_queue.Count() >= _settings_client.network.max_commands_in_queue) {
|
if (this->incoming_queue.size() >= _settings_client.network.max_commands_in_queue) {
|
||||||
return this->SendError(NETWORK_ERROR_TOO_MANY_COMMANDS);
|
return this->SendError(NETWORK_ERROR_TOO_MANY_COMMANDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1115,7 +1115,7 @@ NetworkRecvStatus ServerNetworkGameSocketHandler::Receive_CLIENT_COMMAND(Packet
|
|||||||
|
|
||||||
if (GetCommandFlags(cp.cmd) & CMD_CLIENT_ID) NetworkReplaceCommandClientId(cp, this->client_id);
|
if (GetCommandFlags(cp.cmd) & CMD_CLIENT_ID) NetworkReplaceCommandClientId(cp, this->client_id);
|
||||||
|
|
||||||
this->incoming_queue.Append(&cp);
|
this->incoming_queue.push_back(cp);
|
||||||
return NETWORK_RECV_STATUS_OKAY;
|
return NETWORK_RECV_STATUS_OKAY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -66,7 +66,7 @@ public:
|
|||||||
byte last_token; ///< The last random token we did send to verify the client is listening
|
byte last_token; ///< The last random token we did send to verify the client is listening
|
||||||
uint32_t last_token_frame; ///< The last frame we received the right token
|
uint32_t last_token_frame; ///< The last frame we received the right token
|
||||||
ClientStatus status; ///< Status of this client
|
ClientStatus status; ///< Status of this client
|
||||||
std::vector<CommandPacket> outgoing_queue; ///< The command-queue awaiting delivery; conceptually more a bucket to gather commands in, after which the whole bucket is sent to the client.
|
CommandQueue outgoing_queue; ///< The command-queue awaiting delivery; conceptually more a bucket to gather commands in, after which the whole bucket is sent to the client.
|
||||||
size_t receive_limit; ///< Amount of bytes that we can receive at this moment
|
size_t receive_limit; ///< Amount of bytes that we can receive at this moment
|
||||||
|
|
||||||
std::shared_ptr<struct PacketWriter> savegame; ///< Writer used to write the savegame.
|
std::shared_ptr<struct PacketWriter> savegame; ///< Writer used to write the savegame.
|
||||||
|
Loading…
Reference in New Issue
Block a user