diff --git a/lib/queue/GwMessage.cxx b/lib/queue/GwMessage.cxx new file mode 100644 index 0000000..90a6964 --- /dev/null +++ b/lib/queue/GwMessage.cxx @@ -0,0 +1,131 @@ +#include "GwMessage.h" +GwMessage::GwMessage(String name) +{ + this->name = name; + locker = xSemaphoreCreateMutex(); + notifier = xSemaphoreCreateCounting(1, 0); + refcount = 1; + GW_MESSAGE_DEBUG("Message: %p\n", this); +} +GwMessage::~GwMessage() +{ + GW_MESSAGE_DEBUG("~Message %p\n", this); + vSemaphoreDelete(locker); + vSemaphoreDelete(notifier); +} +void GwMessage::unref() +{ + GW_MESSAGE_DEBUG("Message::unref %p\n", this); + bool mustDelete = false; + xSemaphoreTake(locker, portMAX_DELAY); + if (refcount > 0) + { + refcount--; + if (refcount == 0) + mustDelete = true; + } + xSemaphoreGive(locker); + if (mustDelete) + { + delete this; + } +} +void GwMessage::ref() +{ + xSemaphoreTake(locker, portMAX_DELAY); + refcount++; + xSemaphoreGive(locker); +} +int GwMessage::wait(int maxWaitMillis) +{ + static const int maxWait = 1000; + int maxRetries = maxWaitMillis / maxWait; + int lastWait = maxWaitMillis - maxWait * maxRetries; + for (int retries = maxRetries; retries > 0; retries--) + { + if (xSemaphoreTake(notifier, pdMS_TO_TICKS(maxWait))) + return true; + esp_task_wdt_reset(); + } + if (lastWait) + { + return xSemaphoreTake(notifier, pdMS_TO_TICKS(maxWait)); + } + return false; +} +void GwMessage::process() +{ + GW_MESSAGE_DEBUG("Message::process %p\n", this); + processImpl(); + xSemaphoreGive(notifier); +} +String GwMessage::getName() { return name; } + +void GwRequestMessage::processImpl(){ + GW_MESSAGE_DEBUG("RequestMessage processImpl(1) %p\n",this); + processRequest(); + GW_MESSAGE_DEBUG("RequestMessage processImpl(2) %p\n",this); + len=strlen(result.c_str()); + consumed=0; + handled=true; + } +GwRequestMessage::GwRequestMessage(String contentType, String name):GwMessage(name){ + this->contentType=contentType; + } +GwRequestMessage::~GwRequestMessage(){ + GW_MESSAGE_DEBUG("~RequestMessage %p\n",this) + } +int GwRequestMessage::consume(uint8_t *destination,int maxLen){ + if (!handled) return RESPONSE_TRY_AGAIN; + if (consumed >= len) return 0; + int cplen=maxLen; + if (cplen > (len-consumed)) cplen=len-consumed; + memcpy(destination,result.c_str()+consumed,cplen); + consumed+=cplen; + return cplen; + } +GwRequestQueue::GwRequestQueue(GwLog *logger,int len){ + theQueue=xQueueCreate(len,sizeof(GwMessage*)); + this->logger=logger; +} +GwRequestQueue::~GwRequestQueue(){ + vQueueDelete(theQueue); +} + +GwRequestQueue::MessageSendStatus GwRequestQueue::sendAndForget(GwMessage *msg){ + msg->ref(); //for the queue + if (!xQueueSend(theQueue, &msg, 0)) + { + LOG_DEBUG(GwLog::LOG,"unable to enqueue %s",msg->getName().c_str()); + msg->unref(); + return MSG_ERR; + } + return MSG_OK; +} +GwRequestQueue::MessageSendStatus GwRequestQueue::sendAndWait(GwMessage *msg,unsigned long waitMillis){ + msg->ref(); //for the queue + msg->ref(); //for us waiting + if (!xQueueSend(theQueue, &msg, 0)) + { + LOG_DEBUG(GwLog::LOG,"unable to enqueue %s",msg->getName().c_str()); + msg->unref(); + msg->unref(); + return MSG_ERR; + } + LOG_DEBUG(GwLog::DEBUG + 1, "wait queue for %s",msg->getName().c_str()); + if (msg->wait(waitMillis)){ + LOG_DEBUG(GwLog::DEBUG + 1, "request ok for %s",msg->getName().c_str()); + msg->unref(); + return MSG_OK; + } + LOG_DEBUG(GwLog::DEBUG, "request timeout for %s",msg->getName().c_str()); + msg->unref(); + return MSG_TIMEOUT; +} +GwMessage* GwRequestQueue::fetchMessage(unsigned long waitMillis){ + GwMessage *msg=NULL; + if (xQueueReceive(theQueue,&msg,waitMillis)){ + return msg; + } + return NULL; +} \ No newline at end of file diff --git a/lib/queue/GwMessage.h b/lib/queue/GwMessage.h index 7d500ef..027ffdb 100644 --- a/lib/queue/GwMessage.h +++ b/lib/queue/GwMessage.h @@ -2,6 +2,7 @@ #define _GWMESSAGE_H #include #include +#include "GwLog.h" #include "esp_task_wdt.h" #ifdef GW_MESSAGE_DEBUG_ENABLED @@ -13,67 +14,28 @@ /** * a message class intended to be send from one task to another */ -class Message{ +class GwMessage{ private: SemaphoreHandle_t locker; SemaphoreHandle_t notifier; int refcount=0; + String name; protected: virtual void processImpl()=0; public: - Message(){ - locker=xSemaphoreCreateMutex(); - notifier=xSemaphoreCreateCounting(1,0); - refcount=1; - GW_MESSAGE_DEBUG("Message: %p\n",this); - } - virtual ~Message(){ - GW_MESSAGE_DEBUG("~Message %p\n",this); - vSemaphoreDelete(locker); - vSemaphoreDelete(notifier); - } - void unref(){ - GW_MESSAGE_DEBUG("Message::unref %p\n",this); - bool mustDelete=false; - xSemaphoreTake(locker,portMAX_DELAY); - if (refcount > 0){ - refcount--; - if (refcount == 0) mustDelete=true; - } - xSemaphoreGive(locker); - if (mustDelete){ - delete this; - } - } - void ref(){ - xSemaphoreTake(locker,portMAX_DELAY); - refcount++; - xSemaphoreGive(locker); - } - int wait(int maxWaitMillis){ - static const int maxWait=1000; - int maxRetries=maxWaitMillis/maxWait; - int lastWait=maxWaitMillis - maxWait*maxRetries; - for (int retries=maxRetries;retries>0;retries--){ - if (xSemaphoreTake(notifier,pdMS_TO_TICKS(maxWait))) return true; - esp_task_wdt_reset(); - } - if (lastWait){ - return xSemaphoreTake(notifier,pdMS_TO_TICKS(maxWait)); - } - return false; - } - void process(){ - GW_MESSAGE_DEBUG("Message::process %p\n",this); - processImpl(); - xSemaphoreGive(notifier); - } + GwMessage(String name=F("unknown")); + virtual ~GwMessage(); + void unref(); + void ref(); + int wait(int maxWaitMillis); + void process(); + String getName(); }; /** * a class to executa an async web request that returns a string */ -class RequestMessage : public Message{ +class GwRequestMessage : public GwMessage{ protected: String result; String contentType; @@ -83,32 +45,13 @@ class RequestMessage : public Message{ bool handled=false; protected: virtual void processRequest()=0; - virtual void processImpl(){ - GW_MESSAGE_DEBUG("RequestMessage processImpl(1) %p\n",this); - processRequest(); - GW_MESSAGE_DEBUG("RequestMessage processImpl(2) %p\n",this); - len=strlen(result.c_str()); - consumed=0; - handled=true; - } + virtual void processImpl(); public: - RequestMessage(String contentType):Message(){ - this->contentType=contentType; - } - virtual ~RequestMessage(){ - GW_MESSAGE_DEBUG("~RequestMessage %p\n",this) - } + GwRequestMessage(String contentType,String name=F("web")); + virtual ~GwRequestMessage(); String getResult(){return result;} int getLen(){return len;} - int consume(uint8_t *destination,int maxLen){ - if (!handled) return RESPONSE_TRY_AGAIN; - if (consumed >= len) return 0; - int cplen=maxLen; - if (cplen > (len-consumed)) cplen=len-consumed; - memcpy(destination,result.c_str()+consumed,cplen); - consumed+=cplen; - return cplen; - } + int consume(uint8_t *destination,int maxLen); bool isHandled(){return handled;} String getContentType(){ return contentType; @@ -116,4 +59,22 @@ class RequestMessage : public Message{ }; +class GwRequestQueue{ + private: + QueueHandle_t theQueue; + GwLog *logger; + public: + typedef enum{ + MSG_OK, + MSG_ERR, + MSG_TIMEOUT + } MessageSendStatus; + GwRequestQueue(GwLog *logger,int len); + ~GwRequestQueue(); + //both send methods will increment the ref count when enqueing + MessageSendStatus sendAndForget(GwMessage *msg); + MessageSendStatus sendAndWait(GwMessage *msg,unsigned long waitMillis); + GwMessage* fetchMessage(unsigned long waitMillis); +}; + #endif \ No newline at end of file diff --git a/lib/webserver/GwWebServer.cpp b/lib/webserver/GwWebServer.cpp index e610f7e..b95c41f 100644 --- a/lib/webserver/GwWebServer.cpp +++ b/lib/webserver/GwWebServer.cpp @@ -36,9 +36,9 @@ void sendEmbeddedFile(String name,String contentType,AsyncWebServerRequest *requ } -GwWebServer::GwWebServer(GwLog* logger,int port){ +GwWebServer::GwWebServer(GwLog* logger,GwRequestQueue *queue,int port){ server=new AsyncWebServer(port); - queue=xQueueCreate(10,sizeof(Message *)); + this->queue=queue; this->logger=logger; } void GwWebServer::begin(){ @@ -61,32 +61,28 @@ GwWebServer::~GwWebServer(){ delete server; vQueueDelete(queue); } -void GwWebServer::handleAsyncWebRequest(AsyncWebServerRequest *request, RequestMessage *msg) +void GwWebServer::handleAsyncWebRequest(AsyncWebServerRequest *request, GwRequestMessage *msg) { - msg->ref(); //for the queue - if (!xQueueSend(queue, &msg, 0)) + GwRequestQueue::MessageSendStatus st=queue->sendAndWait(msg,500); + if (st == GwRequestQueue::MSG_ERR) { - Serial.println("unable to enqueue"); - msg->unref(); //queue msg->unref(); //our request->send(500, "text/plain", "queue full"); return; } - LOG_DEBUG(GwLog::DEBUG + 1, "wait queue"); - if (msg->wait(500)) + if (st == GwRequestQueue::MSG_OK) { - LOG_DEBUG(GwLog::DEBUG + 1, "request ok"); request->send(200, msg->getContentType(), msg->getResult()); msg->unref(); return; } - LOG_DEBUG(GwLog::DEBUG + 1, "switching to async"); + LOG_DEBUG(GwLog::DEBUG + 1, "switching to async for %s",msg->getName().c_str()); //msg is handed over to async handling bool finished = false; AsyncWebServerResponse *r = request->beginChunkedResponse( msg->getContentType(), [this,msg, finished](uint8_t *ptr, size_t len, size_t len2) -> size_t { - LOG_DEBUG(GwLog::DEBUG + 1, "try read"); + LOG_DEBUG(GwLog::DEBUG + 1, "try read for %s",msg->getName().c_str()); if (msg->isHandled() || msg->wait(1)) { int rt = msg->consume(ptr, len); @@ -106,7 +102,7 @@ void GwWebServer::handleAsyncWebRequest(AsyncWebServerRequest *request, RequestM } bool GwWebServer::registerMainHandler(const char *url,RequestCreator creator){ server->on(url,HTTP_GET, [this,creator,url](AsyncWebServerRequest *request){ - RequestMessage *msg=(*creator)(request); + GwRequestMessage *msg=(*creator)(request); if (!msg){ LOG_DEBUG(GwLog::DEBUG,"creator returns NULL for %s",url); request->send(404, "text/plain", "Not found"); @@ -115,14 +111,4 @@ bool GwWebServer::registerMainHandler(const char *url,RequestCreator creator){ handleAsyncWebRequest(request,msg); }); } -//to be called from the main loop -void GwWebServer::fetchMainRequest() -{ - Message *msg = NULL; - if (xQueueReceive(queue, &msg, 0)) - { - LOG_DEBUG(GwLog::DEBUG + 1, "main message"); - msg->process(); - msg->unref(); - } -} + diff --git a/lib/webserver/GwWebServer.h b/lib/webserver/GwWebServer.h index 76e457b..0afd0aa 100644 --- a/lib/webserver/GwWebServer.h +++ b/lib/webserver/GwWebServer.h @@ -6,15 +6,14 @@ class GwWebServer{ private: AsyncWebServer *server; - QueueHandle_t queue; + GwRequestQueue *queue; GwLog *logger; public: - typedef RequestMessage *(RequestCreator)(AsyncWebServerRequest *request); - GwWebServer(GwLog *logger, int port); + typedef GwRequestMessage *(RequestCreator)(AsyncWebServerRequest *request); + GwWebServer(GwLog *logger, GwRequestQueue *queue,int port); ~GwWebServer(); void begin(); bool registerMainHandler(const char *url,RequestCreator creator); - void fetchMainRequest(); //to be called from main loop - void handleAsyncWebRequest(AsyncWebServerRequest *request, RequestMessage *msg); + void handleAsyncWebRequest(AsyncWebServerRequest *request, GwRequestMessage *msg); }; #endif \ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp index ab6b4a4..8c418c7 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -80,7 +80,8 @@ const unsigned long TransmitMessages[] PROGMEM = {127489L, // Engine dynamic void HandleNMEA2000Msg(const tN2kMsg &N2kMsg); void SendNMEA0183Message(const tNMEA0183Msg &NMEA0183Msg,int id); -GwWebServer webserver(&logger,80); +GwRequestQueue mainQueue(&logger,20); +GwWebServer webserver(&logger,&mainQueue,80); // Serial port 2 config (GPIO 16) const int baudrate = 38400; @@ -127,10 +128,10 @@ void delayedRestart(){ //register the requests at the webserver that should //be processed inside the main loop //this prevents us from the need to sync all the accesses -class ResetRequest : public RequestMessage +class ResetRequest : public GwRequestMessage { public: - ResetRequest() : RequestMessage(F("application/json")){}; + ResetRequest() : GwRequestMessage(F("application/json"),F("reset")){}; protected: virtual void processRequest() @@ -141,10 +142,10 @@ protected: } }; -class StatusRequest : public RequestMessage +class StatusRequest : public GwRequestMessage { public: - StatusRequest() : RequestMessage(F("application/json")){}; + StatusRequest() : GwRequestMessage(F("application/json"),F("status")){}; protected: virtual void processRequest() @@ -161,10 +162,10 @@ protected: serializeJson(status, result); } }; -class ConfigRequest : public RequestMessage +class ConfigRequest : public GwRequestMessage { public: - ConfigRequest() : RequestMessage(F("application/json")){}; + ConfigRequest() : GwRequestMessage(F("application/json"),F("config")){}; protected: virtual void processRequest() @@ -173,10 +174,10 @@ protected: } }; -class SetConfigRequest : public RequestMessage +class SetConfigRequest : public GwRequestMessage { public: - SetConfigRequest() : RequestMessage(F("application/json")){}; + SetConfigRequest() : GwRequestMessage(F("application/json"),F("setConfig")){}; StringMap args; protected: @@ -212,10 +213,10 @@ protected: } } }; -class ResetConfigRequest : public RequestMessage +class ResetConfigRequest : public GwRequestMessage { public: - ResetConfigRequest() : RequestMessage(F("application/json")){}; + ResetConfigRequest() : GwRequestMessage(F("application/json"),F("resetConfig")){}; protected: virtual void processRequest() @@ -226,10 +227,10 @@ protected: delayedRestart(); } }; -class BoatDataRequest : public RequestMessage +class BoatDataRequest : public GwRequestMessage { public: - BoatDataRequest() : RequestMessage(F("application/json")){}; + BoatDataRequest() : GwRequestMessage(F("application/json"),F("boatData")){}; protected: virtual void processRequest() @@ -277,15 +278,15 @@ void setup() { // Start TCP server socketServer.begin(); - webserver.registerMainHandler("/api/reset", [](AsyncWebServerRequest *request)->RequestMessage *{ + webserver.registerMainHandler("/api/reset", [](AsyncWebServerRequest *request)->GwRequestMessage *{ return new ResetRequest(); }); - webserver.registerMainHandler("/api/status", [](AsyncWebServerRequest *request)->RequestMessage * + webserver.registerMainHandler("/api/status", [](AsyncWebServerRequest *request)->GwRequestMessage * { return new StatusRequest(); }); - webserver.registerMainHandler("/api/config", [](AsyncWebServerRequest *request)->RequestMessage * + webserver.registerMainHandler("/api/config", [](AsyncWebServerRequest *request)->GwRequestMessage * { return new ConfigRequest(); }); webserver.registerMainHandler("/api/setConfig", - [](AsyncWebServerRequest *request)->RequestMessage * + [](AsyncWebServerRequest *request)->GwRequestMessage * { StringMap args; for (int i = 0; i < request->args(); i++) @@ -296,9 +297,9 @@ void setup() { msg->args = args; return msg; }); - webserver.registerMainHandler("/api/resetConfig", [](AsyncWebServerRequest *request)->RequestMessage * + webserver.registerMainHandler("/api/resetConfig", [](AsyncWebServerRequest *request)->GwRequestMessage * { return new ResetConfigRequest(); }); - webserver.registerMainHandler("/api/boatData", [](AsyncWebServerRequest *request)->RequestMessage * + webserver.registerMainHandler("/api/boatData", [](AsyncWebServerRequest *request)->GwRequestMessage * { return new BoatDataRequest(); }); webserver.begin(); @@ -467,10 +468,14 @@ void loop() { } nmea0183Converter->loop(); - webserver.fetchMainRequest(); - //read channels socketServer.readMessages(&receiver); usbSerial.readMessages(&receiver); + //handle message requests + GwMessage *msg=mainQueue.fetchMessage(0); + if (msg){ + msg->process(); + msg->unref(); + } }