From c893025cd35a32690b46cffc314e6b3a8b894891 Mon Sep 17 00:00:00 2001 From: andreas Date: Mon, 25 Oct 2021 18:31:34 +0200 Subject: [PATCH] use async web server --- lib/queue/GwBuffer.cpp | 145 ++++++++++++++++++++++++ lib/queue/GwBuffer.h | 127 ++------------------- lib/queue/GwMessage.h | 111 +++++++++++++++++++ platformio.ini | 9 +- src/main.cpp | 246 ++++++++++++++++++++++++++++------------- 5 files changed, 440 insertions(+), 198 deletions(-) create mode 100644 lib/queue/GwBuffer.cpp create mode 100644 lib/queue/GwMessage.h diff --git a/lib/queue/GwBuffer.cpp b/lib/queue/GwBuffer.cpp new file mode 100644 index 0000000..ae1bc8b --- /dev/null +++ b/lib/queue/GwBuffer.cpp @@ -0,0 +1,145 @@ +#include "GwBuffer.h" + +void GwBuffer::lp(const char *fkt, int p) +{ + LOG_DEBUG(GwLog::DEBUG + 1, "Buffer[%s]: buf=%p,wp=%d,rp=%d,used=%d,free=%d, p=%d", + fkt, buffer, offset(writePointer), offset(readPointer), usedSpace(), freeSpace(), p); +} + +GwBuffer::GwBuffer(GwLog *logger) +{ + this->logger = logger; +} +void GwBuffer::reset() +{ + writePointer = buffer; + readPointer = buffer; + lp("reset"); +} +size_t GwBuffer::freeSpace() +{ + if (readPointer < writePointer) + { + size_t rt = BUFFER_SIZE - offset(writePointer) - 1 + offset(readPointer); + return rt; + } + if (readPointer == writePointer) + return BUFFER_SIZE - 1; + return readPointer - writePointer - 1; +} +size_t GwBuffer::usedSpace() +{ + if (readPointer == writePointer) + return 0; + if (readPointer < writePointer) + return writePointer - readPointer; + return BUFFER_SIZE - offset(readPointer) - 1 + offset(writePointer); +} +size_t GwBuffer::addData(const uint8_t *data, size_t len) +{ + lp("addDataE", len); + if (len == 0) + return 0; + if (freeSpace() < len) + return 0; + size_t written = 0; + if (writePointer >= readPointer) + { + written = BUFFER_SIZE - offset(writePointer) - 1; + if (written > len) + written = len; + if (written) + { + memcpy(writePointer, data, written); + len -= written; + data += written; + writePointer += written; + if (offset(writePointer) >= (BUFFER_SIZE - 1)) + writePointer = buffer; + } + lp("addData1", written); + if (len <= 0) + { + return written; + } + } + //now we have the write pointer before the read pointer + //and we did the length check before - so we can safely copy + memcpy(writePointer, data, len); + writePointer += len; + lp("addData2", len); + return len + written; +} +/** + * write some data to the buffer writer + * return an error if the buffer writer returned < 0 + */ +GwBuffer::WriteStatus GwBuffer::fetchData(GwBufferWriter *writer, bool errorIf0 ) +{ + lp("fetchDataE"); + size_t len = usedSpace(); + if (len == 0) + return OK; + size_t written = 0; + size_t plen = len; + if (writePointer < readPointer) + { + //we need to write from readPointer till end and then till writePointer-1 + plen = BUFFER_SIZE - offset(readPointer) - 1; + int rt = writer->write(readPointer, plen); + lp("fetchData1", rt); + if (rt < 0) + { + LOG_DEBUG(GwLog::DEBUG + 1, "buffer: write returns error %d", rt); + return ERROR; + } + if (rt > plen) + { + LOG_DEBUG(GwLog::DEBUG + 1, "buffer: write too many bytes(1) %d", rt); + return ERROR; + } + if (rt == 0) + { + LOG_DEBUG(GwLog::DEBUG + 1, "buffer: write returns 0 (1)"); + return (errorIf0 ? ERROR : AGAIN); + } + readPointer += rt; + if (offset(readPointer) >= (BUFFER_SIZE - 1)) + readPointer = buffer; + if (rt < plen) + return AGAIN; + if (plen >= len) + return OK; + len -= rt; + written += rt; + //next part - readPointer should be at buffer now + } + plen = writePointer - readPointer; + if (plen == 0) + return OK; + int rt = writer->write(readPointer, plen); + lp("fetchData2", rt); + if (rt < 0) + { + LOG_DEBUG(GwLog::DEBUG + 1, "buffer: write returns error %d", rt); + return ERROR; + } + if (rt == 0) + { + LOG_DEBUG(GwLog::DEBUG + 1, "buffer: write returns 0 (1)"); + return (errorIf0 ? ERROR : AGAIN); + } + if (rt > plen) + { + LOG_DEBUG(GwLog::DEBUG + 1, "buffer: write too many bytes(2)"); + return ERROR; + } + readPointer += rt; + if (offset(readPointer) >= (BUFFER_SIZE - 1)) + readPointer = buffer; + lp("fetchData3"); + written += rt; + if (written < len) + return AGAIN; + return OK; +} \ No newline at end of file diff --git a/lib/queue/GwBuffer.h b/lib/queue/GwBuffer.h index 8e994ca..68b8909 100644 --- a/lib/queue/GwBuffer.h +++ b/lib/queue/GwBuffer.h @@ -10,6 +10,11 @@ class GwBufferWriter{ virtual ~GwBufferWriter(){}; }; +/** + * an implementation of a + * buffer to safely inserte data if it fits + * and to write out data if possible + */ class GwBuffer{ public: static const size_t BUFFER_SIZE=1620; // app. 20 NMEA messages @@ -26,126 +31,18 @@ class GwBuffer{ return (size_t)(ptr-buffer); } GwLog *logger; - void lp(const char *fkt,int p=0){ - LOG_DEBUG(GwLog::DEBUG + 1,"Buffer[%s]: buf=%p,wp=%d,rp=%d,used=%d,free=%d, p=%d", - fkt,buffer,offset(writePointer),offset(readPointer),usedSpace(),freeSpace(),p - ); - } + void lp(const char *fkt,int p=0); public: - GwBuffer(GwLog *logger){ - this->logger=logger; - } - void reset(){ - writePointer=buffer; - readPointer=buffer; - lp("reset"); - } - size_t freeSpace(){ - if (readPointer < writePointer){ - size_t rt=BUFFER_SIZE-offset(writePointer)-1+offset(readPointer); - return rt; - } - if (readPointer == writePointer) return BUFFER_SIZE-1; - return readPointer-writePointer-1; - } - size_t usedSpace(){ - if (readPointer == writePointer) return 0; - if (readPointer < writePointer) return writePointer-readPointer; - return BUFFER_SIZE-offset(readPointer)-1+offset(writePointer); - } - size_t addData(const uint8_t *data,size_t len){ - lp("addDataE",len); - if (len == 0) return 0; - if (freeSpace() < len) return 0; - size_t written=0; - if (writePointer >= readPointer){ - written=BUFFER_SIZE-offset(writePointer)-1; - if (written > len) written=len; - if (written) { - memcpy(writePointer,data,written); - len-=written; - data+=written; - writePointer+=written; - if (offset(writePointer) >= (BUFFER_SIZE-1)) writePointer=buffer; - } - lp("addData1",written); - if (len <= 0) { - return written; - } - } - //now we have the write pointer before the read pointer - //and we did the length check before - so we can safely copy - memcpy(writePointer,data,len); - writePointer+=len; - lp("addData2",len); - return len+written; - } + GwBuffer(GwLog *logger); + void reset(); + size_t freeSpace(); + size_t usedSpace(); + size_t addData(const uint8_t *data,size_t len); /** * write some data to the buffer writer * return an error if the buffer writer returned < 0 */ - WriteStatus fetchData(GwBufferWriter *writer, bool errorIf0 = true) - { - lp("fetchDataE"); - size_t len = usedSpace(); - if (len == 0) - return OK; - size_t written=0; - size_t plen=len; - if (writePointer < readPointer) - { - //we need to write from readPointer till end and then till writePointer-1 - plen = BUFFER_SIZE - offset(readPointer)-1; - int rt = writer->write(readPointer, plen); - lp("fetchData1",rt); - if (rt < 0){ - LOG_DEBUG(GwLog::DEBUG+1,"buffer: write returns error %d",rt); - return ERROR; - } - if (rt > plen){ - LOG_DEBUG(GwLog::DEBUG+1,"buffer: write too many bytes(1) %d",rt); - return ERROR; - } - if (rt == 0){ - LOG_DEBUG(GwLog::DEBUG+1,"buffer: write returns 0 (1)"); - return (errorIf0 ? ERROR : AGAIN); - } - readPointer += rt; - if (offset(readPointer) >= (BUFFER_SIZE-1)) - readPointer = buffer; - if (rt < plen) - return AGAIN; - if (plen >= len) - return OK; - len -=rt; - written+=rt; - //next part - readPointer should be at buffer now - } - plen=writePointer - readPointer; - if (plen == 0) return OK; - int rt = writer->write(readPointer, plen); - lp("fetchData2",rt); - if (rt < 0){ - LOG_DEBUG(GwLog::DEBUG+1,"buffer: write returns error %d",rt); - return ERROR; - } - if (rt == 0){ - LOG_DEBUG(GwLog::DEBUG+1,"buffer: write returns 0 (1)"); - return (errorIf0 ? ERROR : AGAIN); - } - if (rt > plen){ - LOG_DEBUG(GwLog::DEBUG+1,"buffer: write too many bytes(2)"); - return ERROR; - } - readPointer += rt; - if (offset(readPointer) >= (BUFFER_SIZE-1)) - readPointer = buffer; - lp("fetchData3"); - written+=rt; - if (written < len) - return AGAIN; - return OK; - } + WriteStatus fetchData(GwBufferWriter *writer, bool errorIf0 = true); }; #endif \ No newline at end of file diff --git a/lib/queue/GwMessage.h b/lib/queue/GwMessage.h new file mode 100644 index 0000000..3a6963e --- /dev/null +++ b/lib/queue/GwMessage.h @@ -0,0 +1,111 @@ +#ifndef _GWMESSAGE_H +#define _GWMESSAGE_H +#include +#include +#include "esp_task_wdt.h" + +#ifdef GW_MESSAGE_DEBUG_ENABLED +#define GW_MESSAGE_DEBUG(...) Serial.printf(__VA_ARGS__); +#else +#define GW_MESSAGE_DEBUG(...) +#endif + +/** + * a message class intended to be send from one task to another + */ +class Message{ + private: + SemaphoreHandle_t locker; + SemaphoreHandle_t notifier; + int refcount=0; + protected: + virtual void processImpl()=0; + public: + Message(){ + locker=xSemaphoreCreateMutex(); + notifier=xSemaphoreCreateCounting(1,0); + refcount=1; + GW_MESSAGE_DEBUG("Message: %p\n",this); + } + virtual ~Message(){ + GW_MESSAGE_DEBUG("~Message %p\n",this); + } + void unref(){ + GW_MESSAGE_DEBUG("Message::unref %p\n",this); + bool mustDelete=false; + xSemaphoreTake(locker,portMAX_DELAY); + if (refcount > 0){ + refcount--; + if (refcount == 0) mustDelete=true; + } + xSemaphoreGive(locker); + if (mustDelete){ + delete this; + } + } + void ref(){ + xSemaphoreTake(locker,portMAX_DELAY); + refcount++; + xSemaphoreGive(locker); + } + int wait(int maxWaitMillis){ + static const int maxWait=1000; + int maxRetries=maxWaitMillis/maxWait; + int lastWait=maxWaitMillis - maxWait*maxRetries; + for (int retries=maxRetries;retries>0;retries--){ + if (xSemaphoreTake(notifier,pdMS_TO_TICKS(maxWait))) return true; + esp_task_wdt_reset(); + } + if (lastWait){ + return xSemaphoreTake(notifier,pdMS_TO_TICKS(maxWait)); + } + return false; + } + void process(){ + GW_MESSAGE_DEBUG("Message::process %p\n",this); + processImpl(); + xSemaphoreGive(notifier); + } +}; + +/** + * a class to executa an async web request that returns a string + */ +class RequestMessage : public Message{ + protected: + String result; + private: + int len=0; + int consumed=0; + bool handled=false; + protected: + virtual void processRequest()=0; + virtual void processImpl(){ + GW_MESSAGE_DEBUG("RequestMessage processImpl(1)"); + processRequest(); + GW_MESSAGE_DEBUG("RequestMessage processImpl(2)"); + len=strlen(result.c_str()); + consumed=0; + handled=true; + } + public: + RequestMessage():Message(){ + } + virtual ~RequestMessage(){ + } + String getResult(){return result;} + int getLen(){return len;} + int consume(uint8_t *destination,int maxLen){ + if (!handled) return RESPONSE_TRY_AGAIN; + if (consumed >= len) return 0; + int cplen=maxLen; + if (cplen > (len-consumed)) cplen=len-consumed; + memcpy(destination,result.c_str()+consumed,cplen); + consumed+=cplen; + return cplen; + } + bool isHandled(){return handled;} + +}; + +#endif \ No newline at end of file diff --git a/platformio.ini b/platformio.ini index 3e5f840..b53b165 100644 --- a/platformio.ini +++ b/platformio.ini @@ -16,13 +16,14 @@ lib_deps = ttlappalainen/NMEA2000_esp32 @ ^1.0.3 ttlappalainen/NMEA0183 @ ^1.7.1 bblanchon/ArduinoJson@^6.18.5 -board_build.embed_files= +board_build.embed_files = web/index.html.gz -extra_scripts= extra_script.py +extra_scripts = extra_script.py [env:m5stack-atom] board = m5stack-atom lib_deps = ${env.lib_deps} -build_flags= -D BOARD_M5ATOM -upload_port=/dev/esp32 \ No newline at end of file + ottowinter/ESPAsyncWebServer-esphome@^2.0.1 +build_flags = -D BOARD_M5ATOM +upload_port = /dev/esp32 diff --git a/src/main.cpp b/src/main.cpp index 778d1cd..ad27fd6 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -12,7 +12,7 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ -#define VERSION "0.0.7" +#define VERSION "0.1.0" #include "GwHardware.h" #define LOG_SERIAL true @@ -22,10 +22,11 @@ #include #include #include -#include +#include #include #include #include +#include #include "N2kDataToNMEA0183.h" @@ -35,7 +36,9 @@ #include "GWWifi.h" #include "GwSocketServer.h" #include "GwBoatData.h" +#include "GwMessage.h" +typedef std::map StringMap; GwLog logger(LOG_SERIAL,GwLog::DEBUG); @@ -66,8 +69,7 @@ const unsigned long TransmitMessages[] PROGMEM = {127489L, // Engine dynamic void HandleNMEA2000Msg(const tN2kMsg &N2kMsg); void SendNMEA0183Message(const tNMEA0183Msg &NMEA0183Msg); - -WebServer webserver(80); +AsyncWebServer webserver(80); // Serial port 2 config (GPIO 16) const int baudrate = 38400; @@ -81,7 +83,50 @@ char buff[MAX_NMEA0183_MESSAGE_SIZE]; tNMEA0183 NMEA0183; - +QueueHandle_t queue=xQueueCreate(10,sizeof(Message *)); +void handleAsyncWebRequest(AsyncWebServerRequest *request, RequestMessage *msg, String contentType) +{ + msg->ref(); //for the queue + if (!xQueueSend(queue, &msg, 0)) + { + Serial.println("unable to enqueue"); + msg->unref(); //queue + msg->unref(); //our + request->send(500, "text/plain", "queue full"); + return; + } + logger.logDebug(GwLog::DEBUG + 1, "wait queue"); + if (msg->wait(500)) + { + logger.logDebug(GwLog::DEBUG + 1, "request ok"); + request->send(200, contentType, msg->getResult()); + msg->unref(); + return; + } + logger.logDebug(GwLog::DEBUG + 1, "switching to async"); + //msg is handed over to async handling + bool finished = false; + AsyncWebServerResponse *r = request->beginChunkedResponse( + contentType, [msg, finished](uint8_t *ptr, size_t len, size_t len2) -> size_t + { + logger.logDebug(GwLog::DEBUG + 1, "try read"); + if (msg->isHandled() || msg->wait(1)) + { + int rt = msg->consume(ptr, len); + logger.logDebug(GwLog::DEBUG + 1, "async response available, return %d\n", rt); + return rt; + } + else + return RESPONSE_TRY_AGAIN; + }, + NULL); + request->onDisconnect([msg](void) + { + logger.logDebug(GwLog::DEBUG + 1, "onDisconnect"); + msg->unref(); + }); + request->send(r); +} #define JSON_OK "{\"status\":\"OK\"}" //embedded files @@ -89,20 +134,8 @@ extern const uint8_t indexFile[] asm("_binary_web_index_html_gz_start"); extern const uint8_t indexFileEnd[] asm("_binary_web_index_html_gz_end"); extern const uint8_t indexFileLen[] asm("_binary_web_index_html_gz_size"); -void web_index() // Wenn "http:///" aufgerufen wurde -{ - webserver.sendHeader(F("Content-Encoding"), F("gzip")); - webserver.send_P(200, "text/html", (const char *)indexFile,(int)indexFileLen); //dann Index Webseite senden -} -void js_reset() // Wenn "http:///gauge.min.js" aufgerufen wurde -{ - Serial.println("Reset Button"); - ESP.restart(); -} - - -void js_status(){ +String js_status(){ int numPgns=nmea0183Converter->numPgns(); DynamicJsonDocument status(256+numPgns*50); status["numcan"]=numCan; @@ -114,62 +147,13 @@ void js_status(){ nmea0183Converter->toJson(status); String buf; serializeJson(status,buf); - webserver.send(200,F("application/json"),buf); + return buf; } -void js_config(){ - webserver.send(200,F("application/json"),config.toJson()); +void notFound(AsyncWebServerRequest *request) { + request->send(404, "text/plain", "Not found"); } -void js_boatData(){ - webserver.send(200,F("application/json"),boatData.toJson()); -} - -void web_setConfig(){ - bool ok=true; - String error; - for (int i=0;ibeginResponse_P(200,"text/html",(const uint8_t *)indexFile,(int)indexFileLen); + response->addHeader(F("Content-Encoding"), F("gzip")); + request->send(response); + }); + webserver.on("/api/reset", HTTP_GET,[](AsyncWebServerRequest *request){ + logger.logDebug(GwLog::LOG,"Reset Button"); + ESP.restart(); + }); + class StatusRequest : public RequestMessage{ + public: + StatusRequest(): RequestMessage(){}; + protected: + virtual void processRequest(){ + result=js_status(); + } + }; + webserver.on("/api/status",HTTP_GET,[](AsyncWebServerRequest *request){ + StatusRequest *msg=new StatusRequest(); + handleAsyncWebRequest(request,msg,F("application/json")); + }); + class ConfigRequest : public RequestMessage{ + public: + ConfigRequest(): RequestMessage(){}; + protected: + virtual void processRequest(){ + result=config.toJson(); + } + }; + webserver.on("/api/config",HTTP_GET,[](AsyncWebServerRequest *request){ + RequestMessage *msg=new ConfigRequest(); + handleAsyncWebRequest(request,msg,F("application/json")); + }); + + class SetConfigRequest : public RequestMessage{ + public: + SetConfigRequest(): RequestMessage(){}; + StringMap args; + protected: + virtual void processRequest(){ + bool ok=true; + String error; + for (StringMap::iterator it=args.begin();it != args.end();it++){ + bool rt=config.updateValue(it->first,it->second); + if (! rt){ + logger.logString("ERR: unable to update %s to %s",it->first.c_str(),it->second.c_str()); + ok=false; + error+=it->first; + error+="="; + error+=it->second; + error+=","; + } + } + if (ok){ + result=JSON_OK; + logger.logString("update config and restart"); + config.saveConfig(); + delay(100); + ESP.restart(); + } + else{ + DynamicJsonDocument rt(100); + rt["status"]=error; + serializeJson(rt,result); + } + } + }; + webserver.on("/api/setConfig",HTTP_GET,[](AsyncWebServerRequest *request){ + StringMap args; + for (int i=0;iargs();i++){ + args[request->argName(i)]=request->arg(i); + } + SetConfigRequest *msg=new SetConfigRequest(); + msg->args=args; + handleAsyncWebRequest(request,msg,F("application/json")); + }); + class ResetConfigRequest : public RequestMessage{ + public: + ResetConfigRequest(): RequestMessage(){}; + protected: + virtual void processRequest(){ + config.reset(true); + logger.logString("reset config, restart"); + result=JSON_OK; + delay(100); + ESP.restart(); + } + }; + webserver.on("/api/resetConfig",HTTP_GET,[](AsyncWebServerRequest *request){ + RequestMessage *msg=new ResetConfigRequest(); + handleAsyncWebRequest(request,msg,F("application/json")); + }); + class BoatDataRequest : public RequestMessage{ + public: + BoatDataRequest(): RequestMessage(){}; + protected: + virtual void processRequest(){ + result=boatData.toJson(); + } + }; + webserver.on("/api/boatData",HTTP_GET,[](AsyncWebServerRequest *request){ + RequestMessage *msg=new BoatDataRequest(); + handleAsyncWebRequest(request,msg,F("application/json")); + }); + webserver.onNotFound(notFound); webserver.begin(); Serial.println("HTTP server started"); @@ -293,7 +374,6 @@ void SendNMEA0183Message(const tNMEA0183Msg &NMEA0183Msg) { } void loop() { - webserver.handleClient(); gwWifi.loop(); socketServer.loop(); @@ -309,6 +389,14 @@ void loop() { } nmea0183Converter->loop(); + //handle messages from the async web server + Message *msg=NULL; + if (xQueueReceive(queue,&msg,0)){ + logger.logDebug(GwLog::DEBUG+1,"main message"); + msg->process(); + msg->unref(); + } + // Dummy to empty input buffer to avoid board to stuck with e.g. NMEA Reader if ( Serial.available() ) { Serial.read();