diff --git a/lib/log/GwLog.h b/lib/log/GwLog.h index ed25841..379ea5e 100644 --- a/lib/log/GwLog.h +++ b/lib/log/GwLog.h @@ -16,6 +16,6 @@ class GwLog{ void logDebug(int level, const char *fmt,...); int isActive(int level){return level <= logLevel;}; }; -#define LOG_DEBUG(level,fmt,...){ if (logger->isActive(level)) logger->logDebug(level,fmt,__VA_ARGS__);} +#define LOG_DEBUG(level,...){ if (logger->isActive(level)) logger->logDebug(level,__VA_ARGS__);} #endif \ No newline at end of file diff --git a/lib/queue/GwBuffer.h b/lib/queue/GwBuffer.h new file mode 100644 index 0000000..8e994ca --- /dev/null +++ b/lib/queue/GwBuffer.h @@ -0,0 +1,151 @@ +#ifndef _GWBUFFER_H +#define _GWBUFFER_H +#include +#include +#include "GwLog.h" + +class GwBufferWriter{ + public: + virtual int write(const uint8_t *buffer,size_t len)=0; + virtual ~GwBufferWriter(){}; +}; + +class GwBuffer{ + public: + static const size_t BUFFER_SIZE=1620; // app. 20 NMEA messages + typedef enum { + OK, + ERROR, + AGAIN + } WriteStatus; + private: + uint8_t buffer[BUFFER_SIZE]; + uint8_t *writePointer=buffer; + uint8_t *readPointer=buffer; + size_t offset(uint8_t* ptr){ + return (size_t)(ptr-buffer); + } + GwLog *logger; + void lp(const char *fkt,int p=0){ + LOG_DEBUG(GwLog::DEBUG + 1,"Buffer[%s]: buf=%p,wp=%d,rp=%d,used=%d,free=%d, p=%d", + fkt,buffer,offset(writePointer),offset(readPointer),usedSpace(),freeSpace(),p + ); + } + public: + GwBuffer(GwLog *logger){ + this->logger=logger; + } + void reset(){ + writePointer=buffer; + readPointer=buffer; + lp("reset"); + } + size_t freeSpace(){ + if (readPointer < writePointer){ + size_t rt=BUFFER_SIZE-offset(writePointer)-1+offset(readPointer); + return rt; + } + if (readPointer == writePointer) return BUFFER_SIZE-1; + return readPointer-writePointer-1; + } + size_t usedSpace(){ + if (readPointer == writePointer) return 0; + if (readPointer < writePointer) return writePointer-readPointer; + return BUFFER_SIZE-offset(readPointer)-1+offset(writePointer); + } + size_t addData(const uint8_t *data,size_t len){ + lp("addDataE",len); + if (len == 0) return 0; + if (freeSpace() < len) return 0; + size_t written=0; + if (writePointer >= readPointer){ + written=BUFFER_SIZE-offset(writePointer)-1; + if (written > len) written=len; + if (written) { + memcpy(writePointer,data,written); + len-=written; + data+=written; + writePointer+=written; + if (offset(writePointer) >= (BUFFER_SIZE-1)) writePointer=buffer; + } + lp("addData1",written); + if (len <= 0) { + return written; + } + } + //now we have the write pointer before the read pointer + //and we did the length check before - so we can safely copy + memcpy(writePointer,data,len); + writePointer+=len; + lp("addData2",len); + return len+written; + } + /** + * write some data to the buffer writer + * return an error if the buffer writer returned < 0 + */ + WriteStatus fetchData(GwBufferWriter *writer, bool errorIf0 = true) + { + lp("fetchDataE"); + size_t len = usedSpace(); + if (len == 0) + return OK; + size_t written=0; + size_t plen=len; + if (writePointer < readPointer) + { + //we need to write from readPointer till end and then till writePointer-1 + plen = BUFFER_SIZE - offset(readPointer)-1; + int rt = writer->write(readPointer, plen); + lp("fetchData1",rt); + if (rt < 0){ + LOG_DEBUG(GwLog::DEBUG+1,"buffer: write returns error %d",rt); + return ERROR; + } + if (rt > plen){ + LOG_DEBUG(GwLog::DEBUG+1,"buffer: write too many bytes(1) %d",rt); + return ERROR; + } + if (rt == 0){ + LOG_DEBUG(GwLog::DEBUG+1,"buffer: write returns 0 (1)"); + return (errorIf0 ? ERROR : AGAIN); + } + readPointer += rt; + if (offset(readPointer) >= (BUFFER_SIZE-1)) + readPointer = buffer; + if (rt < plen) + return AGAIN; + if (plen >= len) + return OK; + len -=rt; + written+=rt; + //next part - readPointer should be at buffer now + } + plen=writePointer - readPointer; + if (plen == 0) return OK; + int rt = writer->write(readPointer, plen); + lp("fetchData2",rt); + if (rt < 0){ + LOG_DEBUG(GwLog::DEBUG+1,"buffer: write returns error %d",rt); + return ERROR; + } + if (rt == 0){ + LOG_DEBUG(GwLog::DEBUG+1,"buffer: write returns 0 (1)"); + return (errorIf0 ? ERROR : AGAIN); + } + if (rt > plen){ + LOG_DEBUG(GwLog::DEBUG+1,"buffer: write too many bytes(2)"); + return ERROR; + } + readPointer += rt; + if (offset(readPointer) >= (BUFFER_SIZE-1)) + readPointer = buffer; + lp("fetchData3"); + written+=rt; + if (written < len) + return AGAIN; + return OK; + } +}; + +#endif \ No newline at end of file diff --git a/lib/socketserver/GwSocketServer.cpp b/lib/socketserver/GwSocketServer.cpp index 7dacd26..dc59136 100644 --- a/lib/socketserver/GwSocketServer.cpp +++ b/lib/socketserver/GwSocketServer.cpp @@ -1,5 +1,95 @@ #include "GwSocketServer.h" #include +#include +#include "GwBuffer.h" + +class Writer : public GwBufferWriter{ + public: + wiFiClientPtr client; + bool writeError=false; + bool timeOut=false; + unsigned long writeTimeout; + unsigned long lastWrite; + bool pending; + Writer(wiFiClientPtr client, unsigned long writeTimeout=10000){ + this->client=client; + pending=false; + this->writeTimeout=writeTimeout; + } + virtual ~Writer(){} + virtual int write(const uint8_t *buffer,size_t len){ + int res = send(client->fd(), (void*) buffer, len, MSG_DONTWAIT); + if (res < 0){ + if (errno != EAGAIN){ + writeError=true; + return res; + } + res=0; + } + if (res >= len){ + pending=false; + } + else{ + if (!pending){ + lastWrite=millis(); + pending=true; + } + else{ + //we need to check if we have still not been able + //to write until timeout + if (millis() >= (lastWrite+writeTimeout)){ + timeOut=true; + } + } + } + return res; + } +}; +class GwClient{ + public: + wiFiClientPtr client; + GwBuffer *buffer; + GwLog *logger; + int overflows; + private: + Writer *writer; + public: + GwClient(wiFiClientPtr client,GwLog *logger){ + this->client=client; + this->logger=logger; + buffer=new GwBuffer(logger); + overflows=0; + writer=new Writer(client); + } + ~GwClient(){ + delete writer; + } + bool enqueue(uint8_t *data, size_t len){ + if (len == 0) return true; + size_t rt=buffer->addData(data,len); + if (rt < len){ + LOG_DEBUG(GwLog::LOG,"overflow on %s",client->remoteIP().toString().c_str()); + overflows++; + return false; + } + return true; + } + bool hasData(){ + return buffer->usedSpace() > 0; + } + GwBuffer::WriteStatus write(){ + GwBuffer::WriteStatus rt=buffer->fetchData(writer,false); + if (rt != GwBuffer::OK){ + LOG_DEBUG(GwLog::DEBUG+1,"write returns %d on %s",rt,client->remoteIP().toString().c_str()); + } + if (writer->timeOut ){ + LOG_DEBUG(GwLog::LOG,"timeout on %s",client->remoteIP().toString().c_str()); + return GwBuffer::ERROR; + } + return rt; + } +}; + GwSocketServer::GwSocketServer(const GwConfigHandler *config,GwLog *logger){ this->config=config; @@ -20,23 +110,33 @@ void GwSocketServer::loop() if (client){ logger->logString("new client connected from %s", client.remoteIP().toString().c_str()); - clients.push_back(wiFiClientPtr(new WiFiClient(client))); + fcntl(client.fd(), F_SETFL, O_NONBLOCK); + gwClientPtr newClient(new GwClient(wiFiClientPtr(new WiFiClient(client)),logger)); + clients.push_back(newClient); + } + //sending + for (auto it = clients.begin(); it != clients.end();it++){ + GwBuffer::WriteStatus rt=(*it)->write(); + if (rt == GwBuffer::ERROR){ + LOG_DEBUG(GwLog::ERROR,"write error on %s, closing",(*it)->client->remoteIP().toString().c_str()); + (*it)->client->stop(); + } } for (auto it = clients.begin(); it != clients.end();it++) { if ((*it) != NULL) { - if (!(*it)->connected()) + if (!(*it)->client->connected()) { - logger->logString("client disconnect "); - (*it)->stop(); + logger->logString("client disconnect"); + (*it)->client->stop(); clients.erase(it); } else { - while ((*it)->available()) + while ((*it)->client->available()) { - char c = (*it)->read(); + char c = (*it)->client->read(); //TODO: read data } } @@ -48,9 +148,20 @@ void GwSocketServer::loop() } } void GwSocketServer::sendToClients(const char *buf){ + int len=strlen(buf); + char buffer[len+2]; + memcpy(buffer,buf,len); + buffer[len]=0x0d; + len++; + buffer[len]=0x0a; + len++; for (auto it = clients.begin() ; it != clients.end(); it++) { - if ( (*it) != NULL && (*it)->connected() ) { - (*it)->println(buf); + if ( (*it) != NULL && (*it)->client->connected() ) { + bool rt=(*it)->enqueue((uint8_t*)buffer,len); + if (!rt){ + LOG_DEBUG(GwLog::DEBUG,"overflow in send to %s",(*it)->client->remoteIP().toString().c_str()); + } + } } } diff --git a/lib/socketserver/GwSocketServer.h b/lib/socketserver/GwSocketServer.h index 9ad946b..2b758f2 100644 --- a/lib/socketserver/GwSocketServer.h +++ b/lib/socketserver/GwSocketServer.h @@ -7,11 +7,13 @@ #include using wiFiClientPtr = std::shared_ptr; +class GwClient; +using gwClientPtr = std::shared_ptr; class GwSocketServer{ private: const GwConfigHandler *config; GwLog *logger; - std::list clients; + std::list clients; WiFiServer *server=NULL; public: GwSocketServer(const GwConfigHandler *config,GwLog *logger);