mirror of
https://github.com/OpenTTD/OpenTTD.git
synced 2025-03-10 08:00:05 +00:00
Codechange: use std::deque of std::unique_ptr to queue packets
This commit is contained in:
parent
c77a45ed86
commit
36e1b32ccf
@ -28,7 +28,7 @@
|
|||||||
* loose some the data of the packet, so there you pass the maximum
|
* loose some the data of the packet, so there you pass the maximum
|
||||||
* size for the packet you expect from the network.
|
* size for the packet you expect from the network.
|
||||||
*/
|
*/
|
||||||
Packet::Packet(NetworkSocketHandler *cs, size_t limit, size_t initial_read_size) : next(nullptr), pos(0), limit(limit)
|
Packet::Packet(NetworkSocketHandler *cs, size_t limit, size_t initial_read_size) : pos(0), limit(limit)
|
||||||
{
|
{
|
||||||
assert(cs != nullptr);
|
assert(cs != nullptr);
|
||||||
|
|
||||||
@ -44,45 +44,20 @@ Packet::Packet(NetworkSocketHandler *cs, size_t limit, size_t initial_read_size)
|
|||||||
* the limit as it might break things if the other side is not expecting
|
* the limit as it might break things if the other side is not expecting
|
||||||
* much larger packets than what they support.
|
* much larger packets than what they support.
|
||||||
*/
|
*/
|
||||||
Packet::Packet(PacketType type, size_t limit) : next(nullptr), pos(0), limit(limit), cs(nullptr)
|
Packet::Packet(PacketType type, size_t limit) : pos(0), limit(limit), cs(nullptr)
|
||||||
{
|
{
|
||||||
/* Allocate space for the the size so we can write that in just before sending the packet. */
|
/* Allocate space for the the size so we can write that in just before sending the packet. */
|
||||||
this->Send_uint16(0);
|
this->Send_uint16(0);
|
||||||
this->Send_uint8(type);
|
this->Send_uint8(type);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Add the given Packet to the end of the queue of packets.
|
|
||||||
* @param queue The pointer to the begin of the queue.
|
|
||||||
* @param packet The packet to append to the queue.
|
|
||||||
*/
|
|
||||||
/* static */ void Packet::AddToQueue(Packet **queue, Packet *packet)
|
|
||||||
{
|
|
||||||
while (*queue != nullptr) queue = &(*queue)->next;
|
|
||||||
*queue = packet;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Pop the packet from the begin of the queue and set the
|
|
||||||
* begin of the queue to the second element in the queue.
|
|
||||||
* @param queue The pointer to the begin of the queue.
|
|
||||||
* @return The Packet that used to be a the begin of the queue.
|
|
||||||
*/
|
|
||||||
/* static */ Packet *Packet::PopFromQueue(Packet **queue)
|
|
||||||
{
|
|
||||||
Packet *p = *queue;
|
|
||||||
*queue = p->next;
|
|
||||||
p->next = nullptr;
|
|
||||||
return p;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Writes the packet size from the raw packet from packet->size
|
* Writes the packet size from the raw packet from packet->size
|
||||||
*/
|
*/
|
||||||
void Packet::PrepareToSend()
|
void Packet::PrepareToSend()
|
||||||
{
|
{
|
||||||
assert(this->cs == nullptr && this->next == nullptr);
|
assert(this->cs == nullptr);
|
||||||
|
|
||||||
this->buffer[0] = GB(this->Size(), 0, 8);
|
this->buffer[0] = GB(this->Size(), 0, 8);
|
||||||
this->buffer[1] = GB(this->Size(), 8, 8);
|
this->buffer[1] = GB(this->Size(), 8, 8);
|
||||||
@ -268,7 +243,7 @@ size_t Packet::Size() const
|
|||||||
*/
|
*/
|
||||||
bool Packet::ParsePacketSize()
|
bool Packet::ParsePacketSize()
|
||||||
{
|
{
|
||||||
assert(this->cs != nullptr && this->next == nullptr);
|
assert(this->cs != nullptr);
|
||||||
size_t size = (size_t)this->buffer[0];
|
size_t size = (size_t)this->buffer[0];
|
||||||
size += (size_t)this->buffer[1] << 8;
|
size += (size_t)this->buffer[1] << 8;
|
||||||
|
|
||||||
|
@ -41,8 +41,6 @@ typedef uint8_t PacketType; ///< Identifier for the packet
|
|||||||
*/
|
*/
|
||||||
struct Packet {
|
struct Packet {
|
||||||
private:
|
private:
|
||||||
/** The next packet. Used for queueing packets before sending. */
|
|
||||||
Packet *next;
|
|
||||||
/** The current read/write position in the packet */
|
/** The current read/write position in the packet */
|
||||||
PacketSize pos;
|
PacketSize pos;
|
||||||
/** The buffer of this packet. */
|
/** The buffer of this packet. */
|
||||||
@ -57,9 +55,6 @@ public:
|
|||||||
Packet(NetworkSocketHandler *cs, size_t limit, size_t initial_read_size = sizeof(PacketSize));
|
Packet(NetworkSocketHandler *cs, size_t limit, size_t initial_read_size = sizeof(PacketSize));
|
||||||
Packet(PacketType type, size_t limit = COMPAT_MTU);
|
Packet(PacketType type, size_t limit = COMPAT_MTU);
|
||||||
|
|
||||||
static void AddToQueue(Packet **queue, Packet *packet);
|
|
||||||
static Packet *PopFromQueue(Packet **queue);
|
|
||||||
|
|
||||||
/* Sending/writing of packets */
|
/* Sending/writing of packets */
|
||||||
void PrepareToSend();
|
void PrepareToSend();
|
||||||
|
|
||||||
|
@ -22,28 +22,15 @@
|
|||||||
*/
|
*/
|
||||||
NetworkTCPSocketHandler::NetworkTCPSocketHandler(SOCKET s) :
|
NetworkTCPSocketHandler::NetworkTCPSocketHandler(SOCKET s) :
|
||||||
NetworkSocketHandler(),
|
NetworkSocketHandler(),
|
||||||
packet_queue(nullptr), packet_recv(nullptr),
|
|
||||||
sock(s), writable(false)
|
sock(s), writable(false)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
NetworkTCPSocketHandler::~NetworkTCPSocketHandler()
|
NetworkTCPSocketHandler::~NetworkTCPSocketHandler()
|
||||||
{
|
{
|
||||||
this->EmptyPacketQueue();
|
|
||||||
this->CloseSocket();
|
this->CloseSocket();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Free all pending and partially received packets.
|
|
||||||
*/
|
|
||||||
void NetworkTCPSocketHandler::EmptyPacketQueue()
|
|
||||||
{
|
|
||||||
while (this->packet_queue != nullptr) {
|
|
||||||
delete Packet::PopFromQueue(&this->packet_queue);
|
|
||||||
}
|
|
||||||
this->packet_recv = nullptr;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Close the actual socket of the connection.
|
* Close the actual socket of the connection.
|
||||||
* Please make sure CloseConnection is called before CloseSocket, as
|
* Please make sure CloseConnection is called before CloseSocket, as
|
||||||
@ -66,7 +53,8 @@ NetworkRecvStatus NetworkTCPSocketHandler::CloseConnection([[maybe_unused]] bool
|
|||||||
this->MarkClosed();
|
this->MarkClosed();
|
||||||
this->writable = false;
|
this->writable = false;
|
||||||
|
|
||||||
this->EmptyPacketQueue();
|
this->packet_queue.clear();
|
||||||
|
this->packet_recv = nullptr;
|
||||||
|
|
||||||
return NETWORK_RECV_STATUS_OKAY;
|
return NETWORK_RECV_STATUS_OKAY;
|
||||||
}
|
}
|
||||||
@ -82,7 +70,7 @@ void NetworkTCPSocketHandler::SendPacket(Packet *packet)
|
|||||||
assert(packet != nullptr);
|
assert(packet != nullptr);
|
||||||
|
|
||||||
packet->PrepareToSend();
|
packet->PrepareToSend();
|
||||||
Packet::AddToQueue(&this->packet_queue, packet);
|
this->packet_queue.push_back(std::unique_ptr<Packet>(packet));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -97,15 +85,13 @@ void NetworkTCPSocketHandler::SendPacket(Packet *packet)
|
|||||||
*/
|
*/
|
||||||
SendPacketsState NetworkTCPSocketHandler::SendPackets(bool closing_down)
|
SendPacketsState NetworkTCPSocketHandler::SendPackets(bool closing_down)
|
||||||
{
|
{
|
||||||
ssize_t res;
|
|
||||||
Packet *p;
|
|
||||||
|
|
||||||
/* We can not write to this socket!! */
|
/* We can not write to this socket!! */
|
||||||
if (!this->writable) return SPS_NONE_SENT;
|
if (!this->writable) return SPS_NONE_SENT;
|
||||||
if (!this->IsConnected()) return SPS_CLOSED;
|
if (!this->IsConnected()) return SPS_CLOSED;
|
||||||
|
|
||||||
while ((p = this->packet_queue) != nullptr) {
|
while (!this->packet_queue.empty()) {
|
||||||
res = p->TransferOut<int>(send, this->sock, 0);
|
Packet *p = this->packet_queue.front().get();
|
||||||
|
ssize_t res = p->TransferOut<int>(send, this->sock, 0);
|
||||||
if (res == -1) {
|
if (res == -1) {
|
||||||
NetworkError err = NetworkError::GetLast();
|
NetworkError err = NetworkError::GetLast();
|
||||||
if (!err.WouldBlock()) {
|
if (!err.WouldBlock()) {
|
||||||
@ -127,7 +113,7 @@ SendPacketsState NetworkTCPSocketHandler::SendPackets(bool closing_down)
|
|||||||
/* Is this packet sent? */
|
/* Is this packet sent? */
|
||||||
if (p->RemainingBytesToTransfer() == 0) {
|
if (p->RemainingBytesToTransfer() == 0) {
|
||||||
/* Go to the next packet */
|
/* Go to the next packet */
|
||||||
delete Packet::PopFromQueue(&this->packet_queue);
|
this->packet_queue.pop_front();
|
||||||
} else {
|
} else {
|
||||||
return SPS_PARTLY_SENT;
|
return SPS_PARTLY_SENT;
|
||||||
}
|
}
|
||||||
|
@ -30,7 +30,7 @@ enum SendPacketsState {
|
|||||||
/** Base socket handler for all TCP sockets */
|
/** Base socket handler for all TCP sockets */
|
||||||
class NetworkTCPSocketHandler : public NetworkSocketHandler {
|
class NetworkTCPSocketHandler : public NetworkSocketHandler {
|
||||||
private:
|
private:
|
||||||
Packet *packet_queue; ///< Packets that are awaiting delivery
|
std::deque<std::unique_ptr<Packet>> packet_queue; ///< Packets that are awaiting delivery. Cannot be std::queue as that does not have a clear() function.
|
||||||
std::unique_ptr<Packet> packet_recv; ///< Partially received packet
|
std::unique_ptr<Packet> packet_recv; ///< Partially received packet
|
||||||
|
|
||||||
void EmptyPacketQueue();
|
void EmptyPacketQueue();
|
||||||
@ -58,7 +58,7 @@ public:
|
|||||||
* Whether there is something pending in the send queue.
|
* Whether there is something pending in the send queue.
|
||||||
* @return true when something is pending in the send queue.
|
* @return true when something is pending in the send queue.
|
||||||
*/
|
*/
|
||||||
bool HasSendQueue() { return this->packet_queue != nullptr; }
|
bool HasSendQueue() { return !this->packet_queue.empty(); }
|
||||||
|
|
||||||
NetworkTCPSocketHandler(SOCKET s = INVALID_SOCKET);
|
NetworkTCPSocketHandler(SOCKET s = INVALID_SOCKET);
|
||||||
~NetworkTCPSocketHandler();
|
~NetworkTCPSocketHandler();
|
||||||
|
@ -61,9 +61,9 @@ template SocketList TCPListenHandler<ServerNetworkGameSocketHandler, PACKET_SERV
|
|||||||
/** Writing a savegame directly to a number of packets. */
|
/** Writing a savegame directly to a number of packets. */
|
||||||
struct PacketWriter : SaveFilter {
|
struct PacketWriter : SaveFilter {
|
||||||
ServerNetworkGameSocketHandler *cs; ///< Socket we are associated with.
|
ServerNetworkGameSocketHandler *cs; ///< Socket we are associated with.
|
||||||
Packet *current; ///< The packet we're currently writing to.
|
std::unique_ptr<Packet> current; ///< The packet we're currently writing to.
|
||||||
size_t total_size; ///< Total size of the compressed savegame.
|
size_t total_size; ///< Total size of the compressed savegame.
|
||||||
Packet *packets; ///< Packet queue of the savegame; send these "slowly" to the client.
|
std::deque<std::unique_ptr<Packet>> packets; ///< Packet queue of the savegame; send these "slowly" to the client. Cannot be a std::queue as we want to push the map size packet in front of the data packets.
|
||||||
std::mutex mutex; ///< Mutex for making threaded saving safe.
|
std::mutex mutex; ///< Mutex for making threaded saving safe.
|
||||||
std::condition_variable exit_sig; ///< Signal for threaded destruction of this packet writer.
|
std::condition_variable exit_sig; ///< Signal for threaded destruction of this packet writer.
|
||||||
|
|
||||||
@ -71,7 +71,7 @@ struct PacketWriter : SaveFilter {
|
|||||||
* Create the packet writer.
|
* Create the packet writer.
|
||||||
* @param cs The socket handler we're making the packets for.
|
* @param cs The socket handler we're making the packets for.
|
||||||
*/
|
*/
|
||||||
PacketWriter(ServerNetworkGameSocketHandler *cs) : SaveFilter(nullptr), cs(cs), current(nullptr), total_size(0), packets(nullptr)
|
PacketWriter(ServerNetworkGameSocketHandler *cs) : SaveFilter(nullptr), cs(cs), total_size(0)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -84,11 +84,9 @@ struct PacketWriter : SaveFilter {
|
|||||||
|
|
||||||
/* This must all wait until the Destroy function is called. */
|
/* This must all wait until the Destroy function is called. */
|
||||||
|
|
||||||
while (this->packets != nullptr) {
|
Debug(net, 0, "Destruct!");
|
||||||
delete Packet::PopFromQueue(&this->packets);
|
this->packets.clear();
|
||||||
}
|
this->current = nullptr;
|
||||||
|
|
||||||
delete this->current;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -125,14 +123,14 @@ struct PacketWriter : SaveFilter {
|
|||||||
bool TransferToNetworkQueue(ServerNetworkGameSocketHandler *socket)
|
bool TransferToNetworkQueue(ServerNetworkGameSocketHandler *socket)
|
||||||
{
|
{
|
||||||
/* Unsafe check for the queue being empty or not. */
|
/* Unsafe check for the queue being empty or not. */
|
||||||
if (this->packets == nullptr) return false;
|
if (this->packets.empty()) return false;
|
||||||
|
|
||||||
std::lock_guard<std::mutex> lock(this->mutex);
|
std::lock_guard<std::mutex> lock(this->mutex);
|
||||||
|
|
||||||
while (this->packets != nullptr) {
|
while (!this->packets.empty()) {
|
||||||
Packet *p = Packet::PopFromQueue(&this->packets);
|
bool last_packet = this->packets.front()->GetPacketType() == PACKET_SERVER_MAP_DONE;
|
||||||
bool last_packet = p->GetPacketType() == PACKET_SERVER_MAP_DONE;
|
socket->SendPacket(this->packets.front().release());
|
||||||
socket->SendPacket(p);
|
this->packets.pop_front();
|
||||||
|
|
||||||
if (last_packet) return true;
|
if (last_packet) return true;
|
||||||
}
|
}
|
||||||
@ -140,32 +138,12 @@ struct PacketWriter : SaveFilter {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Append the current packet to the queue. */
|
|
||||||
void AppendQueue()
|
|
||||||
{
|
|
||||||
if (this->current == nullptr) return;
|
|
||||||
|
|
||||||
Packet::AddToQueue(&this->packets, this->current);
|
|
||||||
this->current = nullptr;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Prepend the current packet to the queue. */
|
|
||||||
void PrependQueue()
|
|
||||||
{
|
|
||||||
if (this->current == nullptr) return;
|
|
||||||
|
|
||||||
/* Reversed from AppendQueue so the queue gets added to the current one. */
|
|
||||||
Packet::AddToQueue(&this->current, this->packets);
|
|
||||||
this->packets = this->current;
|
|
||||||
this->current = nullptr;
|
|
||||||
}
|
|
||||||
|
|
||||||
void Write(byte *buf, size_t size) override
|
void Write(byte *buf, size_t size) override
|
||||||
{
|
{
|
||||||
/* We want to abort the saving when the socket is closed. */
|
/* We want to abort the saving when the socket is closed. */
|
||||||
if (this->cs == nullptr) SlError(STR_NETWORK_ERROR_LOSTCONNECTION);
|
if (this->cs == nullptr) SlError(STR_NETWORK_ERROR_LOSTCONNECTION);
|
||||||
|
|
||||||
if (this->current == nullptr) this->current = new Packet(PACKET_SERVER_MAP_DATA, TCP_MTU);
|
if (this->current == nullptr) this->current = std::make_unique<Packet>(PACKET_SERVER_MAP_DATA, TCP_MTU);
|
||||||
|
|
||||||
std::lock_guard<std::mutex> lock(this->mutex);
|
std::lock_guard<std::mutex> lock(this->mutex);
|
||||||
|
|
||||||
@ -175,8 +153,8 @@ struct PacketWriter : SaveFilter {
|
|||||||
buf += written;
|
buf += written;
|
||||||
|
|
||||||
if (!this->current->CanWriteToPacket(1)) {
|
if (!this->current->CanWriteToPacket(1)) {
|
||||||
this->AppendQueue();
|
this->packets.push_back(std::move(this->current));
|
||||||
if (buf != bufe) this->current = new Packet(PACKET_SERVER_MAP_DATA, TCP_MTU);
|
if (buf != bufe) this->current = std::make_unique<Packet>(PACKET_SERVER_MAP_DATA, TCP_MTU);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -191,16 +169,15 @@ struct PacketWriter : SaveFilter {
|
|||||||
std::lock_guard<std::mutex> lock(this->mutex);
|
std::lock_guard<std::mutex> lock(this->mutex);
|
||||||
|
|
||||||
/* Make sure the last packet is flushed. */
|
/* Make sure the last packet is flushed. */
|
||||||
this->AppendQueue();
|
if (this->current != nullptr) this->packets.push_back(std::move(this->current));
|
||||||
|
|
||||||
/* Add a packet stating that this is the end to the queue. */
|
/* Add a packet stating that this is the end to the queue. */
|
||||||
this->current = new Packet(PACKET_SERVER_MAP_DONE);
|
this->packets.push_back(std::make_unique<Packet>(PACKET_SERVER_MAP_DONE));
|
||||||
this->AppendQueue();
|
|
||||||
|
|
||||||
/* Fast-track the size to the client. */
|
/* Fast-track the size to the client. */
|
||||||
this->current = new Packet(PACKET_SERVER_MAP_SIZE);
|
auto p = std::make_unique<Packet>(PACKET_SERVER_MAP_SIZE);
|
||||||
this->current->Send_uint32((uint32_t)this->total_size);
|
p->Send_uint32((uint32_t)this->total_size);
|
||||||
this->PrependQueue();
|
this->packets.push_front(std::move(p));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user