externalize socket connection

This commit is contained in:
wellenvogel 2021-12-28 18:44:01 +01:00
parent cd9cbd11d0
commit 31798e3bec
4 changed files with 261 additions and 240 deletions

View File

@ -0,0 +1,214 @@
#include "GwSocketConnection.h"
IPAddress GwSocketConnection::remoteIP(int fd)
{
struct sockaddr_storage addr;
socklen_t len = sizeof addr;
getpeername(fd, (struct sockaddr *)&addr, &len);
struct sockaddr_in *s = (struct sockaddr_in *)&addr;
return IPAddress((uint32_t)(s->sin_addr.s_addr));
}
GwSocketConnection::GwSocketConnection(GwLog *logger, int id, bool allowRead)
{
this->logger = logger;
this->allowRead = allowRead;
String bufName = "Sock(";
bufName += String(id);
bufName += ")";
buffer = new GwBuffer(logger, GwBuffer::TX_BUFFER_SIZE, bufName + "wr");
if (allowRead)
{
readBuffer = new GwBuffer(logger, GwBuffer::RX_BUFFER_SIZE, bufName + "rd");
}
overflows = 0;
}
void GwSocketConnection::setClient(int fd)
{
this->fd = fd;
buffer->reset("new client");
if (readBuffer)
readBuffer->reset("new client");
overflows = 0;
pendingWrite = false;
writeError = false;
lastWrite = 0;
if (fd >= 0)
{
remoteIpAddress = remoteIP(fd).toString();
}
else
{
remoteIpAddress = String("---");
}
}
bool GwSocketConnection::hasClient()
{
return fd >= 0;
}
void GwSocketConnection::stop()
{
if (fd >= 0)
{
close(fd);
fd = -1;
}
}
GwSocketConnection::~GwSocketConnection()
{
delete buffer;
if (readBuffer)
delete readBuffer;
}
bool GwSocketConnection::connected()
{
if (fd >= 0)
{
uint8_t dummy;
int res = recv(fd, &dummy, 0, MSG_DONTWAIT);
// avoid unused var warning by gcc
(void)res;
// recv only sets errno if res is <= 0
if (res <= 0)
{
switch (errno)
{
case EWOULDBLOCK:
case ENOENT: //caused by vfs
return true;
break;
case ENOTCONN:
case EPIPE:
case ECONNRESET:
case ECONNREFUSED:
case ECONNABORTED:
return false;
break;
default:
return true;
}
}
else
{
return true;
}
}
return false;
}
bool GwSocketConnection::enqueue(uint8_t *data, size_t len)
{
if (len == 0)
return true;
size_t rt = buffer->addData(data, len);
if (rt < len)
{
LOG_DEBUG(GwLog::LOG, "overflow on %s", remoteIpAddress.c_str());
overflows++;
return false;
}
return true;
}
bool GwSocketConnection::hasData()
{
return buffer->usedSpace() > 0;
}
bool GwSocketConnection::handleError(int res, bool errorIf0)
{
if (res == 0 && errorIf0)
{
LOG_DEBUG(GwLog::LOG, "client shutdown (recv 0) on %s", remoteIpAddress.c_str());
stop();
return false;
}
if (res < 0)
{
if (errno != EAGAIN)
{
LOG_DEBUG(GwLog::LOG, "client read error %d on %s", errno, remoteIpAddress.c_str());
stop();
return false;
}
return false;
}
return true;
}
GwBuffer::WriteStatus GwSocketConnection::write()
{
if (!hasClient())
{
LOG_DEBUG(GwLog::LOG, "write called on empty client");
return GwBuffer::ERROR;
}
if (!buffer->usedSpace())
{
pendingWrite = false;
return GwBuffer::OK;
}
buffer->fetchData(
-1, [](uint8_t *buffer, size_t len, void *param) -> size_t
{
GwSocketConnection *c = (GwSocketConnection *)param;
int res = send(c->fd, (void *)buffer, len, MSG_DONTWAIT);
if (!c->handleError(res, false))
return 0;
if (res >= len)
{
c->pendingWrite = false;
}
else
{
if (!c->pendingWrite)
{
c->lastWrite = millis();
c->pendingWrite = true;
}
else
{
//we need to check if we have still not been able
//to write until timeout
if (millis() >= (c->lastWrite + c->writeTimeout))
{
c->logger->logDebug(GwLog::ERROR, "Write timeout on channel %s", c->remoteIpAddress.c_str());
c->writeError = true;
}
}
}
return res;
},
this);
if (writeError)
{
LOG_DEBUG(GwLog::DEBUG + 1, "write error on %s", remoteIpAddress.c_str());
return GwBuffer::ERROR;
}
return GwBuffer::OK;
}
bool GwSocketConnection::read()
{
if (!allowRead)
{
size_t maxLen = 100;
char buffer[maxLen];
int res = recv(fd, (void *)buffer, maxLen, MSG_DONTWAIT);
return handleError(res);
}
readBuffer->fillData(
-1, [](uint8_t *buffer, size_t len, void *param) -> size_t
{
GwSocketConnection *c = (GwSocketConnection *)param;
int res = recv(c->fd, (void *)buffer, len, MSG_DONTWAIT);
if (!c->handleError(res))
return 0;
return res;
},
this);
return true;
}
bool GwSocketConnection::messagesFromBuffer(GwMessageFetcher *writer)
{
if (!allowRead)
return false;
return writer->handleBuffer(readBuffer);
}

View File

@ -0,0 +1,36 @@
#pragma once
#include <Arduino.h>
#include <lwip/sockets.h>
#include "GwBuffer.h"
class GwSocketConnection
{
public:
int fd=-1;
int overflows;
String remoteIpAddress;
private:
unsigned long lastWrite = 0;
unsigned long writeTimeout = 10000;
bool pendingWrite = false;
bool writeError = false;
bool allowRead;
GwBuffer *buffer = NULL;
GwBuffer *readBuffer = NULL;
GwLog *logger;
public:
static IPAddress remoteIP(int fd);
GwSocketConnection(GwLog *logger, int id, bool allowRead = false);
void setClient(int fd);
bool hasClient();
void stop();
~GwSocketConnection();
bool connected();
bool enqueue(uint8_t *data, size_t len);
bool hasData();
bool handleError(int res, bool errorIf0 = true);
GwBuffer::WriteStatus write();
bool read();
bool messagesFromBuffer(GwMessageFetcher *writer);
};

View File

@ -2,240 +2,11 @@
#include <ESPmDNS.h>
#include <lwip/sockets.h>
#include "GwBuffer.h"
class GwClient
{
public:
int fd=-1;
int overflows;
String remoteIpAddress;
private:
unsigned long lastWrite = 0;
unsigned long writeTimeout = 10000;
bool pendingWrite = false;
bool writeError = false;
bool allowRead;
GwBuffer *buffer = NULL;
GwBuffer *readBuffer = NULL;
GwLog *logger;
public:
static IPAddress remoteIP(int fd)
{
struct sockaddr_storage addr;
socklen_t len = sizeof addr;
getpeername(fd, (struct sockaddr*)&addr, &len);
struct sockaddr_in *s = (struct sockaddr_in *)&addr;
return IPAddress((uint32_t)(s->sin_addr.s_addr));
}
GwClient(GwLog *logger, int id, bool allowRead = false)
{
this->logger = logger;
this->allowRead = allowRead;
String bufName = "Sock(";
bufName += String(id);
bufName += ")";
buffer = new GwBuffer(logger, GwBuffer::TX_BUFFER_SIZE, bufName + "wr");
if (allowRead)
{
readBuffer = new GwBuffer(logger, GwBuffer::RX_BUFFER_SIZE, bufName + "rd");
}
overflows = 0;
}
void setClient(int fd)
{
this->fd = fd;
buffer->reset("new client");
if (readBuffer)
readBuffer->reset("new client");
overflows = 0;
pendingWrite = false;
writeError = false;
lastWrite = 0;
if (fd >= 0)
{
remoteIpAddress = remoteIP(fd).toString();
}
else
{
remoteIpAddress = String("---");
}
}
bool hasClient()
{
return fd >= 0;
}
void stop(){
if (fd >= 0){
close(fd);
fd=-1;
}
}
~GwClient()
{
delete buffer;
if (readBuffer)
delete readBuffer;
}
bool connected()
{
if (fd >= 0)
{
uint8_t dummy;
int res = recv(fd, &dummy, 0, MSG_DONTWAIT);
// avoid unused var warning by gcc
(void)res;
// recv only sets errno if res is <= 0
if (res <= 0)
{
switch (errno)
{
case EWOULDBLOCK:
case ENOENT: //caused by vfs
return true;
break;
case ENOTCONN:
case EPIPE:
case ECONNRESET:
case ECONNREFUSED:
case ECONNABORTED:
return false;
break;
default:
return true;
}
}
else
{
return true;
}
}
return false;
}
bool enqueue(uint8_t *data, size_t len)
{
if (len == 0)
return true;
size_t rt = buffer->addData(data, len);
if (rt < len)
{
LOG_DEBUG(GwLog::LOG, "overflow on %s", remoteIpAddress.c_str());
overflows++;
return false;
}
return true;
}
bool hasData()
{
return buffer->usedSpace() > 0;
}
bool handleError(int res, bool errorIf0 = true)
{
if (res == 0 && errorIf0)
{
LOG_DEBUG(GwLog::LOG, "client shutdown (recv 0) on %s", remoteIpAddress.c_str());
stop();
return false;
}
if (res < 0)
{
if (errno != EAGAIN)
{
LOG_DEBUG(GwLog::LOG, "client read error %d on %s", errno, remoteIpAddress.c_str());
stop();
return false;
}
return false;
}
return true;
}
GwBuffer::WriteStatus write()
{
if (!hasClient())
{
LOG_DEBUG(GwLog::LOG, "write called on empty client");
return GwBuffer::ERROR;
}
if (!buffer->usedSpace())
{
pendingWrite = false;
return GwBuffer::OK;
}
buffer->fetchData(
-1, [](uint8_t *buffer, size_t len, void *param) -> size_t
{
GwClient *c = (GwClient *)param;
int res = send(c->fd, (void *)buffer, len, MSG_DONTWAIT);
if (!c->handleError(res, false))
return 0;
if (res >= len)
{
c->pendingWrite = false;
}
else
{
if (!c->pendingWrite)
{
c->lastWrite = millis();
c->pendingWrite = true;
}
else
{
//we need to check if we have still not been able
//to write until timeout
if (millis() >= (c->lastWrite + c->writeTimeout))
{
c->logger->logDebug(GwLog::ERROR, "Write timeout on channel %s", c->remoteIpAddress.c_str());
c->writeError = true;
}
}
}
return res;
},
this);
if (writeError)
{
LOG_DEBUG(GwLog::DEBUG + 1, "write error on %s", remoteIpAddress.c_str());
return GwBuffer::ERROR;
}
return GwBuffer::OK;
}
bool read()
{
if (!allowRead)
{
size_t maxLen = 100;
char buffer[maxLen];
int res = recv(fd, (void *)buffer, maxLen, MSG_DONTWAIT);
return handleError(res);
}
readBuffer->fillData(
-1, [](uint8_t *buffer, size_t len, void *param) -> size_t
{
GwClient *c = (GwClient *)param;
int res = recv(c->fd, (void *)buffer, len, MSG_DONTWAIT);
if (!c->handleError(res))
return 0;
return res;
},
this);
return true;
}
bool messagesFromBuffer(GwMessageFetcher *writer)
{
if (!allowRead)
return false;
return writer->handleBuffer(readBuffer);
}
};
#include "GwSocketConnection.h"
class GwTcpClient
{
GwClient *gwClient = NULL;
GwSocketConnection *gwClient = NULL;
IPAddress remoteAddress;
uint16_t port = 0;
GwLog *logger;
@ -270,7 +41,7 @@ private:
}
public:
GwTcpClient(GwLog *logger, GwClient *gwClient)
GwTcpClient(GwLog *logger, GwSocketConnection *gwClient)
{
this->logger = logger;
this->gwClient = gwClient;
@ -325,10 +96,10 @@ void GwSocketServer::begin()
maxClients = config->getInt(config->maxClients);
allowReceive = config->getBool(config->readTCP);
listenerPort=config->getInt(config->serverPort);
clients = new GwClient*[maxClients];
clients = new GwSocketConnection*[maxClients];
for (int i = 0; i < maxClients; i++)
{
clients[i] = new GwClient(logger, i, allowReceive);
clients[i] = new GwSocketConnection(logger, i, allowReceive);
}
if (! createListener()){
listener=-1;
@ -368,7 +139,7 @@ void GwSocketServer::loop(bool handleRead, bool handleWrite)
if (client >= 0)
{
LOG_DEBUG(GwLog::LOG, "new client connected from %s",
GwClient::remoteIP(client).toString().c_str());
GwSocketConnection::remoteIP(client).toString().c_str());
bool canHandle = false;
for (int i = 0; i < maxClients; i++)
{
@ -391,7 +162,7 @@ void GwSocketServer::loop(bool handleRead, bool handleWrite)
//sending
for (int i = 0; i < maxClients; i++)
{
GwClient *client = clients[i];
GwSocketConnection *client = clients[i];
if (!client->hasClient())
continue;
GwBuffer::WriteStatus rt = client->write();
@ -404,7 +175,7 @@ void GwSocketServer::loop(bool handleRead, bool handleWrite)
}
for (int i = 0; i < maxClients; i++)
{
GwClient *client = clients[i];
GwSocketConnection *client = clients[i];
if (!client->hasClient())
continue;
@ -446,7 +217,7 @@ void GwSocketServer::sendToClients(const char *buf, int source)
{
if (i == sourceIndex)
continue; //never send out to the source we received from
GwClient *client = clients[i];
GwSocketConnection *client = clients[i];
if (!client->hasClient())
continue;
if (client->connected())

View File

@ -5,12 +5,12 @@
#include "GwBuffer.h"
#include <memory>
class GwClient;
class GwSocketConnection;
class GwSocketServer{
private:
const GwConfigHandler *config;
GwLog *logger;
GwClient **clients=NULL;
GwSocketConnection **clients=NULL;
int listener=-1;
int listenerPort=-1;
bool allowReceive;