diff --git a/lib/socketserver/GwSocketConnection.cpp b/lib/socketserver/GwSocketConnection.cpp new file mode 100644 index 0000000..b518b4c --- /dev/null +++ b/lib/socketserver/GwSocketConnection.cpp @@ -0,0 +1,214 @@ +#include "GwSocketConnection.h" +IPAddress GwSocketConnection::remoteIP(int fd) +{ + struct sockaddr_storage addr; + socklen_t len = sizeof addr; + getpeername(fd, (struct sockaddr *)&addr, &len); + struct sockaddr_in *s = (struct sockaddr_in *)&addr; + return IPAddress((uint32_t)(s->sin_addr.s_addr)); +} +GwSocketConnection::GwSocketConnection(GwLog *logger, int id, bool allowRead) +{ + this->logger = logger; + this->allowRead = allowRead; + String bufName = "Sock("; + bufName += String(id); + bufName += ")"; + buffer = new GwBuffer(logger, GwBuffer::TX_BUFFER_SIZE, bufName + "wr"); + if (allowRead) + { + readBuffer = new GwBuffer(logger, GwBuffer::RX_BUFFER_SIZE, bufName + "rd"); + } + overflows = 0; +} +void GwSocketConnection::setClient(int fd) +{ + this->fd = fd; + buffer->reset("new client"); + if (readBuffer) + readBuffer->reset("new client"); + overflows = 0; + pendingWrite = false; + writeError = false; + lastWrite = 0; + if (fd >= 0) + { + remoteIpAddress = remoteIP(fd).toString(); + } + else + { + remoteIpAddress = String("---"); + } +} +bool GwSocketConnection::hasClient() +{ + return fd >= 0; +} +void GwSocketConnection::stop() +{ + if (fd >= 0) + { + close(fd); + fd = -1; + } +} +GwSocketConnection::~GwSocketConnection() +{ + delete buffer; + if (readBuffer) + delete readBuffer; +} +bool GwSocketConnection::connected() +{ + if (fd >= 0) + { + uint8_t dummy; + int res = recv(fd, &dummy, 0, MSG_DONTWAIT); + // avoid unused var warning by gcc + (void)res; + // recv only sets errno if res is <= 0 + if (res <= 0) + { + switch (errno) + { + case EWOULDBLOCK: + case ENOENT: //caused by vfs + return true; + break; + case ENOTCONN: + case EPIPE: + case ECONNRESET: + case ECONNREFUSED: + case ECONNABORTED: + return false; + break; + default: + return true; + } + } + else + { + return true; + } + } + return false; +} + +bool GwSocketConnection::enqueue(uint8_t *data, size_t len) +{ + if (len == 0) + return true; + size_t rt = buffer->addData(data, len); + if (rt < len) + { + LOG_DEBUG(GwLog::LOG, "overflow on %s", remoteIpAddress.c_str()); + overflows++; + return false; + } + return true; +} +bool GwSocketConnection::hasData() +{ + return buffer->usedSpace() > 0; +} +bool GwSocketConnection::handleError(int res, bool errorIf0) +{ + if (res == 0 && errorIf0) + { + LOG_DEBUG(GwLog::LOG, "client shutdown (recv 0) on %s", remoteIpAddress.c_str()); + stop(); + return false; + } + if (res < 0) + { + if (errno != EAGAIN) + { + LOG_DEBUG(GwLog::LOG, "client read error %d on %s", errno, remoteIpAddress.c_str()); + stop(); + return false; + } + return false; + } + return true; +} +GwBuffer::WriteStatus GwSocketConnection::write() +{ + if (!hasClient()) + { + LOG_DEBUG(GwLog::LOG, "write called on empty client"); + return GwBuffer::ERROR; + } + if (!buffer->usedSpace()) + { + pendingWrite = false; + return GwBuffer::OK; + } + buffer->fetchData( + -1, [](uint8_t *buffer, size_t len, void *param) -> size_t + { + GwSocketConnection *c = (GwSocketConnection *)param; + int res = send(c->fd, (void *)buffer, len, MSG_DONTWAIT); + if (!c->handleError(res, false)) + return 0; + if (res >= len) + { + c->pendingWrite = false; + } + else + { + if (!c->pendingWrite) + { + c->lastWrite = millis(); + c->pendingWrite = true; + } + else + { + //we need to check if we have still not been able + //to write until timeout + if (millis() >= (c->lastWrite + c->writeTimeout)) + { + c->logger->logDebug(GwLog::ERROR, "Write timeout on channel %s", c->remoteIpAddress.c_str()); + c->writeError = true; + } + } + } + return res; + }, + this); + if (writeError) + { + LOG_DEBUG(GwLog::DEBUG + 1, "write error on %s", remoteIpAddress.c_str()); + return GwBuffer::ERROR; + } + + return GwBuffer::OK; +} + +bool GwSocketConnection::read() +{ + if (!allowRead) + { + size_t maxLen = 100; + char buffer[maxLen]; + int res = recv(fd, (void *)buffer, maxLen, MSG_DONTWAIT); + return handleError(res); + } + readBuffer->fillData( + -1, [](uint8_t *buffer, size_t len, void *param) -> size_t + { + GwSocketConnection *c = (GwSocketConnection *)param; + int res = recv(c->fd, (void *)buffer, len, MSG_DONTWAIT); + if (!c->handleError(res)) + return 0; + return res; + }, + this); + return true; +} +bool GwSocketConnection::messagesFromBuffer(GwMessageFetcher *writer) +{ + if (!allowRead) + return false; + return writer->handleBuffer(readBuffer); +} + diff --git a/lib/socketserver/GwSocketConnection.h b/lib/socketserver/GwSocketConnection.h new file mode 100644 index 0000000..8b693ee --- /dev/null +++ b/lib/socketserver/GwSocketConnection.h @@ -0,0 +1,36 @@ +#pragma once +#include +#include +#include "GwBuffer.h" +class GwSocketConnection +{ +public: + int fd=-1; + int overflows; + String remoteIpAddress; + +private: + unsigned long lastWrite = 0; + unsigned long writeTimeout = 10000; + bool pendingWrite = false; + bool writeError = false; + bool allowRead; + GwBuffer *buffer = NULL; + GwBuffer *readBuffer = NULL; + GwLog *logger; + +public: + static IPAddress remoteIP(int fd); + GwSocketConnection(GwLog *logger, int id, bool allowRead = false); + void setClient(int fd); + bool hasClient(); + void stop(); + ~GwSocketConnection(); + bool connected(); + bool enqueue(uint8_t *data, size_t len); + bool hasData(); + bool handleError(int res, bool errorIf0 = true); + GwBuffer::WriteStatus write(); + bool read(); + bool messagesFromBuffer(GwMessageFetcher *writer); +}; diff --git a/lib/socketserver/GwSocketServer.cpp b/lib/socketserver/GwSocketServer.cpp index a7f788f..d30e7b2 100644 --- a/lib/socketserver/GwSocketServer.cpp +++ b/lib/socketserver/GwSocketServer.cpp @@ -2,240 +2,11 @@ #include #include #include "GwBuffer.h" - -class GwClient -{ -public: - int fd=-1; - int overflows; - String remoteIpAddress; - -private: - unsigned long lastWrite = 0; - unsigned long writeTimeout = 10000; - bool pendingWrite = false; - bool writeError = false; - bool allowRead; - GwBuffer *buffer = NULL; - GwBuffer *readBuffer = NULL; - GwLog *logger; - -public: - static IPAddress remoteIP(int fd) - { - struct sockaddr_storage addr; - socklen_t len = sizeof addr; - getpeername(fd, (struct sockaddr*)&addr, &len); - struct sockaddr_in *s = (struct sockaddr_in *)&addr; - return IPAddress((uint32_t)(s->sin_addr.s_addr)); - } - GwClient(GwLog *logger, int id, bool allowRead = false) - { - this->logger = logger; - this->allowRead = allowRead; - String bufName = "Sock("; - bufName += String(id); - bufName += ")"; - buffer = new GwBuffer(logger, GwBuffer::TX_BUFFER_SIZE, bufName + "wr"); - if (allowRead) - { - readBuffer = new GwBuffer(logger, GwBuffer::RX_BUFFER_SIZE, bufName + "rd"); - } - overflows = 0; - } - void setClient(int fd) - { - this->fd = fd; - buffer->reset("new client"); - if (readBuffer) - readBuffer->reset("new client"); - overflows = 0; - pendingWrite = false; - writeError = false; - lastWrite = 0; - if (fd >= 0) - { - remoteIpAddress = remoteIP(fd).toString(); - } - else - { - remoteIpAddress = String("---"); - } - } - bool hasClient() - { - return fd >= 0; - } - void stop(){ - if (fd >= 0){ - close(fd); - fd=-1; - } - } - ~GwClient() - { - delete buffer; - if (readBuffer) - delete readBuffer; - } - bool connected() - { - if (fd >= 0) - { - uint8_t dummy; - int res = recv(fd, &dummy, 0, MSG_DONTWAIT); - // avoid unused var warning by gcc - (void)res; - // recv only sets errno if res is <= 0 - if (res <= 0) - { - switch (errno) - { - case EWOULDBLOCK: - case ENOENT: //caused by vfs - return true; - break; - case ENOTCONN: - case EPIPE: - case ECONNRESET: - case ECONNREFUSED: - case ECONNABORTED: - return false; - break; - default: - return true; - } - } - else - { - return true; - } - } - return false; - } - - bool enqueue(uint8_t *data, size_t len) - { - if (len == 0) - return true; - size_t rt = buffer->addData(data, len); - if (rt < len) - { - LOG_DEBUG(GwLog::LOG, "overflow on %s", remoteIpAddress.c_str()); - overflows++; - return false; - } - return true; - } - bool hasData() - { - return buffer->usedSpace() > 0; - } - bool handleError(int res, bool errorIf0 = true) - { - if (res == 0 && errorIf0) - { - LOG_DEBUG(GwLog::LOG, "client shutdown (recv 0) on %s", remoteIpAddress.c_str()); - stop(); - return false; - } - if (res < 0) - { - if (errno != EAGAIN) - { - LOG_DEBUG(GwLog::LOG, "client read error %d on %s", errno, remoteIpAddress.c_str()); - stop(); - return false; - } - return false; - } - return true; - } - GwBuffer::WriteStatus write() - { - if (!hasClient()) - { - LOG_DEBUG(GwLog::LOG, "write called on empty client"); - return GwBuffer::ERROR; - } - if (!buffer->usedSpace()) - { - pendingWrite = false; - return GwBuffer::OK; - } - buffer->fetchData( - -1, [](uint8_t *buffer, size_t len, void *param) -> size_t - { - GwClient *c = (GwClient *)param; - int res = send(c->fd, (void *)buffer, len, MSG_DONTWAIT); - if (!c->handleError(res, false)) - return 0; - if (res >= len) - { - c->pendingWrite = false; - } - else - { - if (!c->pendingWrite) - { - c->lastWrite = millis(); - c->pendingWrite = true; - } - else - { - //we need to check if we have still not been able - //to write until timeout - if (millis() >= (c->lastWrite + c->writeTimeout)) - { - c->logger->logDebug(GwLog::ERROR, "Write timeout on channel %s", c->remoteIpAddress.c_str()); - c->writeError = true; - } - } - } - return res; - }, - this); - if (writeError) - { - LOG_DEBUG(GwLog::DEBUG + 1, "write error on %s", remoteIpAddress.c_str()); - return GwBuffer::ERROR; - } - - return GwBuffer::OK; - } - - bool read() - { - if (!allowRead) - { - size_t maxLen = 100; - char buffer[maxLen]; - int res = recv(fd, (void *)buffer, maxLen, MSG_DONTWAIT); - return handleError(res); - } - readBuffer->fillData( - -1, [](uint8_t *buffer, size_t len, void *param) -> size_t - { - GwClient *c = (GwClient *)param; - int res = recv(c->fd, (void *)buffer, len, MSG_DONTWAIT); - if (!c->handleError(res)) - return 0; - return res; - }, - this); - return true; - } - bool messagesFromBuffer(GwMessageFetcher *writer) - { - if (!allowRead) - return false; - return writer->handleBuffer(readBuffer); - } -}; +#include "GwSocketConnection.h" class GwTcpClient { - GwClient *gwClient = NULL; + GwSocketConnection *gwClient = NULL; IPAddress remoteAddress; uint16_t port = 0; GwLog *logger; @@ -270,7 +41,7 @@ private: } public: - GwTcpClient(GwLog *logger, GwClient *gwClient) + GwTcpClient(GwLog *logger, GwSocketConnection *gwClient) { this->logger = logger; this->gwClient = gwClient; @@ -325,10 +96,10 @@ void GwSocketServer::begin() maxClients = config->getInt(config->maxClients); allowReceive = config->getBool(config->readTCP); listenerPort=config->getInt(config->serverPort); - clients = new GwClient*[maxClients]; + clients = new GwSocketConnection*[maxClients]; for (int i = 0; i < maxClients; i++) { - clients[i] = new GwClient(logger, i, allowReceive); + clients[i] = new GwSocketConnection(logger, i, allowReceive); } if (! createListener()){ listener=-1; @@ -368,7 +139,7 @@ void GwSocketServer::loop(bool handleRead, bool handleWrite) if (client >= 0) { LOG_DEBUG(GwLog::LOG, "new client connected from %s", - GwClient::remoteIP(client).toString().c_str()); + GwSocketConnection::remoteIP(client).toString().c_str()); bool canHandle = false; for (int i = 0; i < maxClients; i++) { @@ -391,7 +162,7 @@ void GwSocketServer::loop(bool handleRead, bool handleWrite) //sending for (int i = 0; i < maxClients; i++) { - GwClient *client = clients[i]; + GwSocketConnection *client = clients[i]; if (!client->hasClient()) continue; GwBuffer::WriteStatus rt = client->write(); @@ -404,7 +175,7 @@ void GwSocketServer::loop(bool handleRead, bool handleWrite) } for (int i = 0; i < maxClients; i++) { - GwClient *client = clients[i]; + GwSocketConnection *client = clients[i]; if (!client->hasClient()) continue; @@ -446,7 +217,7 @@ void GwSocketServer::sendToClients(const char *buf, int source) { if (i == sourceIndex) continue; //never send out to the source we received from - GwClient *client = clients[i]; + GwSocketConnection *client = clients[i]; if (!client->hasClient()) continue; if (client->connected()) diff --git a/lib/socketserver/GwSocketServer.h b/lib/socketserver/GwSocketServer.h index 0b9c64f..e3762e7 100644 --- a/lib/socketserver/GwSocketServer.h +++ b/lib/socketserver/GwSocketServer.h @@ -5,12 +5,12 @@ #include "GwBuffer.h" #include -class GwClient; +class GwSocketConnection; class GwSocketServer{ private: const GwConfigHandler *config; GwLog *logger; - GwClient **clients=NULL; + GwSocketConnection **clients=NULL; int listener=-1; int listenerPort=-1; bool allowReceive;