From cd1fefad5211f2fff4555fe8e6703fbcb22d5319 Mon Sep 17 00:00:00 2001 From: wellenvogel Date: Thu, 2 Dec 2021 17:10:50 +0100 Subject: [PATCH] restructure buffer handling, better buffer logging --- lib/queue/GwBuffer.cpp | 104 +++++------------- lib/queue/GwBuffer.h | 16 +-- lib/serial/GwSerial.cpp | 20 ++-- lib/socketserver/GwSocketServer.cpp | 158 +++++++++++++--------------- src/main.cpp | 2 +- 5 files changed, 115 insertions(+), 185 deletions(-) diff --git a/lib/queue/GwBuffer.cpp b/lib/queue/GwBuffer.cpp index bfbc91b..8e12934 100644 --- a/lib/queue/GwBuffer.cpp +++ b/lib/queue/GwBuffer.cpp @@ -2,25 +2,26 @@ void GwBuffer::lp(const char *fkt, int p) { - LOG_DEBUG(GwLog::DEBUG+2 , "Buffer[%s]: buf=%p,wp=%d,rp=%d,used=%d,free=%d, p=%d", - fkt, buffer, offset(writePointer), offset(readPointer), usedSpace(), freeSpace(), p); + LOG_DEBUG(GwLog::DEBUG+2 , "Buffer[%s:%s]: buf=%p,wp=%d,rp=%d,used=%d,free=%d, p=%d", + name.c_str(),fkt, buffer, offset(writePointer), offset(readPointer), usedSpace(), freeSpace(), p); } -GwBuffer::GwBuffer(GwLog *logger,size_t bufferSize) +GwBuffer::GwBuffer(GwLog *logger,size_t bufferSize,String name) { - LOG_DEBUG(GwLog::DEBUG,"creating new buffer %p of size %d",this,(int)bufferSize); + LOG_DEBUG(GwLog::DEBUG,"creating new buffer %p=%s of size %d",this,name.c_str(),(int)bufferSize); this->logger = logger; this->bufferSize=bufferSize; this->buffer=new uint8_t[bufferSize]; writePointer = buffer; readPointer = buffer; + this->name=name; } GwBuffer::~GwBuffer(){ delete buffer; } void GwBuffer::reset(String reason) { - LOG_DEBUG(GwLog::LOG,"reseting buffer %p, reason %s",this,reason.c_str()); + LOG_DEBUG(GwLog::LOG,"reseting buffer %s, reason %s",this->name.c_str(),reason.c_str()); writePointer = buffer; readPointer = buffer; lp("reset"); @@ -44,7 +45,7 @@ int GwBuffer::read(){ readPointer++; if (offset(readPointer) >= bufferSize) readPointer -= bufferSize; - lp("read"); + lp("read",rt); return rt; } int GwBuffer::peek(){ @@ -56,8 +57,10 @@ size_t GwBuffer::addData(const uint8_t *data, size_t len, bool addPartial) lp("addDataE", len); if (len == 0) return 0; - if (freeSpace() < len && !addPartial) + if (freeSpace() < len && !addPartial){ + lp("addDataR0",0); return 0; + } size_t written = 0; for (int i=0;i<2;i++){ size_t currentFree=freeSpace(); @@ -77,85 +80,41 @@ size_t GwBuffer::addData(const uint8_t *data, size_t len, bool addPartial) } lp("addData1", toWrite); } - lp("addData2", written); + lp("addDataR", written); return written; } -/** - * write some data to the buffer writer - * return an error if the buffer writer returned < 0 - */ -GwBuffer::WriteStatus GwBuffer::fetchData(GwBufferWriter *writer, int maxLen,bool errorIf0 ) -{ - lp("fetchDataE",maxLen); - size_t len = usedSpace(); - if (maxLen > 0 && len > maxLen) len=maxLen; - if (len == 0){ - lp("fetchData0",maxLen); - writer->done(); - return OK; - } - size_t written = 0; - for (int i=0;i<2;i++){ - size_t currentUsed=usedSpace(); - size_t toWrite=len-written; - if (toWrite > currentUsed) toWrite=currentUsed; - if (toWrite > (bufferSize - offset(readPointer))) { - toWrite=bufferSize - offset(readPointer); - } - lp("fetchData1", toWrite); - if (toWrite > 0) - { - int rt = writer->write(readPointer, toWrite); - lp("fetchData2", rt); - if (rt < 0) - { - LOG_DEBUG(GwLog::DEBUG + 1, "buffer: write returns error %d", rt); - writer->done(); - return ERROR; - } - if (rt > toWrite) - { - LOG_DEBUG(GwLog::DEBUG + 1, "buffer: write too many bytes(1) %d", rt); - writer->done(); - return ERROR; - } - readPointer += rt; - if (offset(readPointer) >= bufferSize) - readPointer -= bufferSize; - written += rt; - if (rt == 0) break; //no need to try again - } - } - writer->done(); - if (written == 0){ - return (errorIf0 ? ERROR : AGAIN); - } - return (written == len)?OK:AGAIN; -} + size_t GwBuffer::fetchData(int maxLen, GwBufferHandleFunction handler, void *param){ - if (usedSpace() < 1) return 0; + lp("fetchE",maxLen); + if (usedSpace() < 1) { + lp("fetchR0",0); + return 0; + } size_t len=0; if (writePointer > readPointer){ len=writePointer-readPointer; } else{ - len=bufferSize-offset(readPointer)-1; + len=bufferSize-offset(readPointer); } if (maxLen >= 0 && maxLen < len) len=maxLen; size_t handled=handler(readPointer,len,param); if (handled > len) handled=len; readPointer+=handled; if (offset(readPointer) >= bufferSize ) readPointer-=bufferSize; + lp("fetchR",handled); return handled; } size_t GwBuffer::fillData(int maxLen, GwBufferHandleFunction handler, void *param) { + lp("fillDataE",maxLen); if (freeSpace() < 1) return 0; size_t len = 0; - if (writePointer > readPointer) + if (writePointer >= readPointer) { - len = bufferSize - offset(writePointer) - 1; + len = bufferSize - offset(writePointer); + if (len > 0 && offset(readPointer) == 0) len--; } else { @@ -163,12 +122,14 @@ size_t GwBuffer::fillData(int maxLen, GwBufferHandleFunction handler, void *para } if (maxLen >= 0 && maxLen < len) len = maxLen; + if (len == 0) return 0; size_t handled = handler(writePointer, len,param); if (handled > len) handled = len; writePointer += handled; if (offset(writePointer) >= bufferSize) writePointer -= bufferSize; + lp("fillDataR",handled); return handled; } @@ -184,22 +145,10 @@ int GwBuffer::findChar(char x){ } of++; } - lp("findChar2"); + lp("findChar2",-1); return -1; } -GwBuffer::WriteStatus GwBuffer::fetchMessage(GwBufferWriter *writer,char delimiter,bool emptyIfFull){ - int pos=findChar(delimiter); - if (pos < 0) { - if (!freeSpace() && emptyIfFull){ - LOG_DEBUG(GwLog::LOG,"line to long, reset, buffer=%p",buffer); - reset(); - return ERROR; - } - return AGAIN; - } - return fetchData(writer,pos+1,true); -} size_t GwMessageFetcher::fetchMessageToBuffer(GwBuffer *gwbuffer,uint8_t *buffer, size_t bufferLen,char delimiter){ int offset=gwbuffer->findChar(delimiter); @@ -228,5 +177,6 @@ size_t GwMessageFetcher::fetchMessageToBuffer(GwBuffer *gwbuffer,uint8_t *buffer } fetched+=rd; } + buffer[fetched]=0; return fetched; } \ No newline at end of file diff --git a/lib/queue/GwBuffer.h b/lib/queue/GwBuffer.h index d8d51f8..22022c9 100644 --- a/lib/queue/GwBuffer.h +++ b/lib/queue/GwBuffer.h @@ -5,13 +5,6 @@ #include "GwLog.h" class GwBuffer; -class GwBufferWriter{ - public: - int id=0; //can be set be users - virtual int write(const uint8_t *buffer,size_t len)=0; - virtual void done(){} - virtual ~GwBufferWriter(){}; -}; class GwMessageFetcher{ public: @@ -44,9 +37,10 @@ class GwBuffer{ return (size_t)(ptr-buffer); } GwLog *logger; + String name; void lp(const char *fkt,int p=0); public: - GwBuffer(GwLog *logger,size_t bufferSize); + GwBuffer(GwLog *logger,size_t bufferSize,String name); ~GwBuffer(); void reset(String reason=""); size_t freeSpace(); @@ -56,16 +50,10 @@ class GwBuffer{ int read(); int peek(); size_t fetchData(int maxLen,GwBufferHandleFunction handler, void *param); - /** - * write some data to the buffer writer - * return an error if the buffer writer returned < 0 - */ - WriteStatus fetchData(GwBufferWriter *writer, int maxLen=-1,bool errorIf0 = true); /** * find the first occurance of x in the buffer, -1 if not found */ int findChar(char x); - WriteStatus fetchMessage(GwBufferWriter *writer,char delimiter,bool emptyIfFull=true); }; #endif \ No newline at end of file diff --git a/lib/serial/GwSerial.cpp b/lib/serial/GwSerial.cpp index 474148e..1539bb3 100644 --- a/lib/serial/GwSerial.cpp +++ b/lib/serial/GwSerial.cpp @@ -42,17 +42,19 @@ class GwSerialStream: public Stream{ GwSerial::GwSerial(GwLog *logger, int num, int id,bool allowRead) { - LOG_DEBUG(GwLog::DEBUG,"creating GwSerial %p port %d",this,(int)num); + LOG_DEBUG(GwLog::DEBUG,"creating GwSerial %p port %d for %d",this,(int)num,id); this->id=id; this->logger = logger; this->num = num; - this->buffer = new GwBuffer(logger,GwBuffer::TX_BUFFER_SIZE); + String bufName="Ser("; + bufName+=String(id); + bufName+=")"; + this->buffer = new GwBuffer(logger,GwBuffer::TX_BUFFER_SIZE,bufName+"wr"); this->allowRead=allowRead; if (allowRead){ - this->readBuffer=new GwBuffer(logger, GwBuffer::RX_BUFFER_SIZE); + this->readBuffer=new GwBuffer(logger, GwBuffer::RX_BUFFER_SIZE,bufName+"rd"); } this->serial=new HardwareSerial(num); - } GwSerial::~GwSerial() { @@ -63,7 +65,7 @@ GwSerial::~GwSerial() int GwSerial::setup(int baud, int rxpin, int txpin) { serial->begin(baud,SERIAL_8N1,rxpin,txpin); - buffer->reset(); + buffer->reset(F("init")); initialized = true; return 0; } @@ -79,7 +81,9 @@ GwBuffer::WriteStatus GwSerial::write(){ size_t rt=buffer->fetchData(numWrite,[](uint8_t *buffer,size_t len, void *p){ return ((GwSerial *)p)->serial->write(buffer,len); },this); - LOG_DEBUG(GwLog::DEBUG+1,"Serial %p write %d",this,rt); + if (rt != 0){ + LOG_DEBUG(GwLog::DEBUG+1,"Serial %d write %d",id,rt); + } return buffer->usedSpace()?GwBuffer::AGAIN:GwBuffer::OK; } size_t GwSerial::sendToClients(const char *buf,int sourceId,bool partial){ @@ -102,7 +106,9 @@ void GwSerial::loop(bool handleRead){ size_t rd=readBuffer->fillData(available,[](uint8_t *buffer, size_t len, void *p)->size_t{ return ((GwSerial *)p)->serial->readBytes(buffer,len); },this); - this->logger->logDebug(GwLog::DEBUG+2,"GwSerial read %d bytes",rd); + if (rd != 0){ + LOG_DEBUG(GwLog::DEBUG+2,"GwSerial %d read %d bytes",id,rd); + } } else{ uint8_t buffer[10]; diff --git a/lib/socketserver/GwSocketServer.cpp b/lib/socketserver/GwSocketServer.cpp index c8ca51f..957113c 100644 --- a/lib/socketserver/GwSocketServer.cpp +++ b/lib/socketserver/GwSocketServer.cpp @@ -3,88 +3,46 @@ #include #include "GwBuffer.h" -class Writer : public GwBufferWriter{ - public: - wiFiClientPtr client; - bool writeError=false; - bool timeOut=false; - unsigned long writeTimeout; - unsigned long lastWrite; - bool pending; - Writer(wiFiClientPtr client, unsigned long writeTimeout=10000){ - this->client=client; - pending=false; - this->writeTimeout=writeTimeout; - } - virtual ~Writer(){} - virtual int write(const uint8_t *buffer,size_t len){ - int res = send(client->fd(), (void*) buffer, len, MSG_DONTWAIT); - if (res < 0){ - if (errno != EAGAIN){ - writeError=true; - return res; - } - res=0; - } - if (res >= len){ - pending=false; - } - else{ - if (!pending){ - lastWrite=millis(); - pending=true; - } - else{ - //we need to check if we have still not been able - //to write until timeout - if (millis() >= (lastWrite+writeTimeout)){ - timeOut=true; - } - } - } - return res; - } -}; class GwClient{ public: wiFiClientPtr client; int overflows; String remoteIp; private: - Writer *writer=NULL; + unsigned long lastWrite=0; + unsigned long writeTimeout=10000; + bool pendingWrite=false; + bool writeError=false; bool allowRead; GwBuffer *buffer=NULL; GwBuffer *readBuffer=NULL; GwLog *logger; public: - GwClient(wiFiClientPtr client,GwLog *logger, bool allowRead=false){ + GwClient(wiFiClientPtr client,GwLog *logger,int id, bool allowRead=false){ this->client=client; this->logger=logger; this->allowRead=allowRead; - buffer=new GwBuffer(logger,GwBuffer::TX_BUFFER_SIZE); + String bufName="Sock("; + bufName+=String(id); + bufName+=")"; + buffer=new GwBuffer(logger,GwBuffer::TX_BUFFER_SIZE,bufName+"wr"); if (allowRead){ - readBuffer=new GwBuffer(logger,GwBuffer::RX_BUFFER_SIZE); + readBuffer=new GwBuffer(logger,GwBuffer::RX_BUFFER_SIZE,bufName+"rd"); } overflows=0; if (client != NULL){ - writer=new Writer(client); - LOG_DEBUG(GwLog::DEBUG,"creating SocketWriter %p",writer); remoteIp=client->remoteIP().toString(); } } void setClient(wiFiClientPtr client){ this->client=client; - buffer->reset(); - if (readBuffer) readBuffer->reset(); + buffer->reset("new client"); + if (readBuffer) readBuffer->reset("new client"); overflows=0; - if (writer) { - LOG_DEBUG(GwLog::DEBUG,"deleting SocketWriter %p",writer); - delete writer; - } - writer=NULL; + pendingWrite=false; + writeError=false; + lastWrite=0; if (client){ - writer=new Writer(client); - LOG_DEBUG(GwLog::DEBUG,"creating SocketWriter %p",writer); remoteIp=client->remoteIP().toString(); } else{ @@ -95,7 +53,6 @@ class GwClient{ return client != NULL; } ~GwClient(){ - delete writer; delete buffer; if (readBuffer) delete readBuffer; } @@ -112,29 +69,8 @@ class GwClient{ bool hasData(){ return buffer->usedSpace() > 0; } - GwBuffer::WriteStatus write(){ - if (! writer) { - LOG_DEBUG(GwLog::LOG,"write called on empty client"); - return GwBuffer::ERROR; - } - GwBuffer::WriteStatus rt=buffer->fetchData(writer,-1,false); - if (rt != GwBuffer::OK){ - LOG_DEBUG(GwLog::DEBUG+1,"write returns %d on %s",rt,remoteIp.c_str()); - } - if (writer->timeOut ){ - LOG_DEBUG(GwLog::LOG,"timeout on %s",remoteIp.c_str()); - return GwBuffer::ERROR; - } - return rt; - } - bool read(){ - size_t maxLen=allowRead?readBuffer->freeSpace():100; - if (!maxLen) { - return false; - } - char buffer[maxLen]; - int res = recv(client->fd(), (void*) buffer, maxLen, MSG_DONTWAIT); - if (res == 0){ + bool handleError(int res,bool errorIf0=true){ + if (res == 0 && errorIf0){ LOG_DEBUG(GwLog::LOG,"client shutdown (recv 0) on %s",remoteIp.c_str()); client->stop(); return false; @@ -147,11 +83,61 @@ class GwClient{ } return false; } - if (! allowRead) return true; - size_t stored=readBuffer->addData((uint8_t*)buffer,res); - if (stored != res){ - LOG_DEBUG(GwLog::LOG,"internal read error buffer overflow (w=%d,c=%d) on %s",res,(int)stored,remoteIp.c_str()); + return true; + } + GwBuffer::WriteStatus write(){ + if (! hasClient()) { + LOG_DEBUG(GwLog::LOG,"write called on empty client"); + return GwBuffer::ERROR; } + if (! buffer->usedSpace()){ + pendingWrite=false; + return GwBuffer::OK; + } + buffer->fetchData(-1,[](uint8_t *buffer, size_t len, void *param)->size_t{ + GwClient *c=(GwClient*)param; + int res = send(c->client->fd(), (void*) buffer, len, MSG_DONTWAIT); + if (! c->handleError(res,false)) return 0; + if (res >= len){ + c->pendingWrite=false; + } + else{ + if (!c->pendingWrite){ + c->lastWrite=millis(); + c->pendingWrite=true; + } + else{ + //we need to check if we have still not been able + //to write until timeout + if (millis() >= (c->lastWrite+c->writeTimeout)){ + c->logger->logDebug(GwLog::ERROR,"Write timeout on channel %s",c->remoteIp.c_str()); + c->writeError=true; + } + } + } + return res; + },this); + if (writeError){ + LOG_DEBUG(GwLog::DEBUG+1,"write error on %s",remoteIp.c_str()); + return GwBuffer::ERROR; + } + + return GwBuffer::OK; + } + + bool read(){ + if (! allowRead){ + size_t maxLen=100; + char buffer[maxLen]; + int res = recv(client->fd(), (void*) buffer, maxLen, MSG_DONTWAIT); + return handleError(res); + } + readBuffer->fillData(-1,[](uint8_t *buffer, size_t len, void *param)->size_t{ + GwClient *c=(GwClient*)param; + int res = recv(c->client->fd(), (void*) buffer, len, MSG_DONTWAIT); + if (! c->handleError(res)) return 0; + return res; + },this); return true; } bool messagesFromBuffer(GwMessageFetcher *writer){ @@ -171,7 +157,7 @@ GwSocketServer::GwSocketServer(const GwConfigHandler *config,GwLog *logger,int m void GwSocketServer::begin(){ clients=new gwClientPtr[maxClients]; for (int i=0;igetInt(config->serverPort),maxClients); server->begin(); diff --git a/src/main.cpp b/src/main.cpp index c46b88f..95acd32 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -28,7 +28,7 @@ #endif // #define GW_MESSAGE_DEBUG_ENABLED -// #define FALLBACK_SERIAL +//#define FALLBACK_SERIAL const unsigned long HEAP_REPORT_TIME=2000; //set to 0 to disable heap reporting #include #include "GwHardware.h"