From bb7f2085a44cd6b60c13c5a3729d1e31531e5b6c Mon Sep 17 00:00:00 2001
From: andreas <andreas@wellenvogel.de>
Date: Sat, 30 Oct 2021 19:11:21 +0200
Subject: [PATCH] refactor message queue

---
 lib/queue/GwMessage.cxx       | 131 ++++++++++++++++++++++++++++++++++
 lib/queue/GwMessage.h         | 105 +++++++++------------------
 lib/webserver/GwWebServer.cpp |  34 +++------
 lib/webserver/GwWebServer.h   |   9 ++-
 src/main.cpp                  |  47 ++++++------
 5 files changed, 204 insertions(+), 122 deletions(-)
 create mode 100644 lib/queue/GwMessage.cxx

diff --git a/lib/queue/GwMessage.cxx b/lib/queue/GwMessage.cxx
new file mode 100644
index 0000000..90a6964
--- /dev/null
+++ b/lib/queue/GwMessage.cxx
@@ -0,0 +1,131 @@
+#include "GwMessage.h"
+GwMessage::GwMessage(String name)
+{
+    this->name = name;
+    locker = xSemaphoreCreateMutex();
+    notifier = xSemaphoreCreateCounting(1, 0);
+    refcount = 1;
+    GW_MESSAGE_DEBUG("Message: %p\n", this);
+}
+GwMessage::~GwMessage()
+{
+    GW_MESSAGE_DEBUG("~Message %p\n", this);
+    vSemaphoreDelete(locker);
+    vSemaphoreDelete(notifier);
+}
+void GwMessage::unref()
+{
+    GW_MESSAGE_DEBUG("Message::unref %p\n", this);
+    bool mustDelete = false;
+    xSemaphoreTake(locker, portMAX_DELAY);
+    if (refcount > 0)
+    {
+        refcount--;
+        if (refcount == 0)
+            mustDelete = true;
+    }
+    xSemaphoreGive(locker);
+    if (mustDelete)
+    {
+        delete this;
+    }
+}
+void GwMessage::ref()
+{
+    xSemaphoreTake(locker, portMAX_DELAY);
+    refcount++;
+    xSemaphoreGive(locker);
+}
+int GwMessage::wait(int maxWaitMillis)
+{
+    static const int maxWait = 1000;
+    int maxRetries = maxWaitMillis / maxWait;
+    int lastWait = maxWaitMillis - maxWait * maxRetries;
+    for (int retries = maxRetries; retries > 0; retries--)
+    {
+        if (xSemaphoreTake(notifier, pdMS_TO_TICKS(maxWait)))
+            return true;
+        esp_task_wdt_reset();
+    }
+    if (lastWait)
+    {
+        return xSemaphoreTake(notifier, pdMS_TO_TICKS(maxWait));
+    }
+    return false;
+}
+void GwMessage::process()
+{
+    GW_MESSAGE_DEBUG("Message::process %p\n", this);
+    processImpl();
+    xSemaphoreGive(notifier);
+}
+String GwMessage::getName() { return name; }
+
+void GwRequestMessage::processImpl(){
+      GW_MESSAGE_DEBUG("RequestMessage processImpl(1) %p\n",this);
+      processRequest();
+      GW_MESSAGE_DEBUG("RequestMessage processImpl(2) %p\n",this);
+      len=strlen(result.c_str());
+      consumed=0;
+      handled=true;
+    }
+GwRequestMessage::GwRequestMessage(String contentType, String name):GwMessage(name){
+      this->contentType=contentType;
+    }
+GwRequestMessage::~GwRequestMessage(){
+      GW_MESSAGE_DEBUG("~RequestMessage %p\n",this)
+    }
+int GwRequestMessage::consume(uint8_t *destination,int maxLen){
+      if (!handled) return RESPONSE_TRY_AGAIN;
+      if (consumed >= len) return 0;
+      int cplen=maxLen;
+      if (cplen > (len-consumed)) cplen=len-consumed;
+      memcpy(destination,result.c_str()+consumed,cplen);
+      consumed+=cplen;
+      return cplen; 
+    }
+GwRequestQueue::GwRequestQueue(GwLog *logger,int len){
+    theQueue=xQueueCreate(len,sizeof(GwMessage*));
+    this->logger=logger;
+}
+GwRequestQueue::~GwRequestQueue(){
+    vQueueDelete(theQueue);
+}
+
+GwRequestQueue::MessageSendStatus GwRequestQueue::sendAndForget(GwMessage *msg){
+    msg->ref(); //for the queue
+    if (!xQueueSend(theQueue, &msg, 0))
+    {
+        LOG_DEBUG(GwLog::LOG,"unable to enqueue %s",msg->getName().c_str());
+        msg->unref();
+        return MSG_ERR;
+    }
+    return MSG_OK;
+}
+GwRequestQueue::MessageSendStatus GwRequestQueue::sendAndWait(GwMessage *msg,unsigned long waitMillis){
+    msg->ref(); //for the queue
+    msg->ref(); //for us waiting
+    if (!xQueueSend(theQueue, &msg, 0))
+    {
+        LOG_DEBUG(GwLog::LOG,"unable to enqueue %s",msg->getName().c_str());
+        msg->unref();
+        msg->unref();
+        return MSG_ERR;
+    }
+    LOG_DEBUG(GwLog::DEBUG + 1, "wait queue for %s",msg->getName().c_str());
+    if (msg->wait(waitMillis)){
+       LOG_DEBUG(GwLog::DEBUG + 1, "request ok for %s",msg->getName().c_str()); 
+       msg->unref();
+       return MSG_OK;
+    }
+    LOG_DEBUG(GwLog::DEBUG, "request timeout for %s",msg->getName().c_str()); 
+    msg->unref();
+    return MSG_TIMEOUT;
+}
+GwMessage* GwRequestQueue::fetchMessage(unsigned long waitMillis){
+    GwMessage *msg=NULL;
+    if (xQueueReceive(theQueue,&msg,waitMillis)){
+        return msg;
+    }
+    return NULL;
+}
\ No newline at end of file
diff --git a/lib/queue/GwMessage.h b/lib/queue/GwMessage.h
index 7d500ef..027ffdb 100644
--- a/lib/queue/GwMessage.h
+++ b/lib/queue/GwMessage.h
@@ -2,6 +2,7 @@
 #define _GWMESSAGE_H
 #include <Arduino.h>
 #include <ESPAsyncWebServer.h>
+#include "GwLog.h"
 #include "esp_task_wdt.h"
 
 #ifdef GW_MESSAGE_DEBUG_ENABLED
@@ -13,67 +14,28 @@
 /**
  * a message class intended to be send from one task to another
  */
-class Message{
+class GwMessage{
   private:
     SemaphoreHandle_t locker;
     SemaphoreHandle_t notifier;
     int refcount=0;
+    String name;
   protected:
     virtual void processImpl()=0;
   public:
-    Message(){
-      locker=xSemaphoreCreateMutex();
-      notifier=xSemaphoreCreateCounting(1,0);
-      refcount=1;
-      GW_MESSAGE_DEBUG("Message: %p\n",this);
-    }
-    virtual ~Message(){
-      GW_MESSAGE_DEBUG("~Message %p\n",this);
-      vSemaphoreDelete(locker);
-      vSemaphoreDelete(notifier);
-    }
-    void unref(){
-      GW_MESSAGE_DEBUG("Message::unref %p\n",this);
-      bool mustDelete=false;
-      xSemaphoreTake(locker,portMAX_DELAY);
-      if (refcount > 0){
-        refcount--;
-        if (refcount == 0) mustDelete=true;
-      }
-      xSemaphoreGive(locker);
-      if (mustDelete){
-        delete this;
-      }
-    }
-    void ref(){
-      xSemaphoreTake(locker,portMAX_DELAY);
-      refcount++;
-      xSemaphoreGive(locker);
-    }
-    int wait(int maxWaitMillis){
-      static const int maxWait=1000;
-      int maxRetries=maxWaitMillis/maxWait;
-      int lastWait=maxWaitMillis - maxWait*maxRetries;
-      for (int retries=maxRetries;retries>0;retries--){
-        if (xSemaphoreTake(notifier,pdMS_TO_TICKS(maxWait))) return true;
-        esp_task_wdt_reset();
-      }
-      if (lastWait){
-        return xSemaphoreTake(notifier,pdMS_TO_TICKS(maxWait));
-      }
-      return false;
-    }
-    void process(){
-      GW_MESSAGE_DEBUG("Message::process %p\n",this);
-      processImpl();
-      xSemaphoreGive(notifier);
-    }  
+    GwMessage(String name=F("unknown"));
+    virtual ~GwMessage();
+    void unref();
+    void ref();
+    int wait(int maxWaitMillis);
+    void process();
+    String getName();  
 };
 
 /**
  * a class to executa an async web request that returns a string
  */
-class RequestMessage : public Message{
+class GwRequestMessage : public GwMessage{
   protected:
     String result;
     String contentType;
@@ -83,32 +45,13 @@ class RequestMessage : public Message{
     bool handled=false;
   protected:
     virtual void processRequest()=0;  
-    virtual void processImpl(){
-      GW_MESSAGE_DEBUG("RequestMessage processImpl(1) %p\n",this);
-      processRequest();
-      GW_MESSAGE_DEBUG("RequestMessage processImpl(2) %p\n",this);
-      len=strlen(result.c_str());
-      consumed=0;
-      handled=true;
-    }
+    virtual void processImpl();
   public:
-    RequestMessage(String contentType):Message(){
-      this->contentType=contentType;
-    }
-    virtual ~RequestMessage(){
-      GW_MESSAGE_DEBUG("~RequestMessage %p\n",this)
-    }
+    GwRequestMessage(String contentType,String name=F("web"));
+    virtual ~GwRequestMessage();
     String getResult(){return result;}
     int getLen(){return len;}
-    int consume(uint8_t *destination,int maxLen){
-      if (!handled) return RESPONSE_TRY_AGAIN;
-      if (consumed >= len) return 0;
-      int cplen=maxLen;
-      if (cplen > (len-consumed)) cplen=len-consumed;
-      memcpy(destination,result.c_str()+consumed,cplen);
-      consumed+=cplen;
-      return cplen; 
-    }
+    int consume(uint8_t *destination,int maxLen);
     bool isHandled(){return handled;}
     String getContentType(){
       return contentType;
@@ -116,4 +59,22 @@ class RequestMessage : public Message{
 
 };
 
+class GwRequestQueue{
+  private:
+    QueueHandle_t theQueue;
+    GwLog *logger;
+  public:
+    typedef enum{
+      MSG_OK,
+      MSG_ERR,
+      MSG_TIMEOUT
+    } MessageSendStatus;
+    GwRequestQueue(GwLog *logger,int len);  
+    ~GwRequestQueue();
+    //both send methods will increment the ref count when enqueing
+    MessageSendStatus sendAndForget(GwMessage *msg);
+    MessageSendStatus sendAndWait(GwMessage *msg,unsigned long waitMillis);
+    GwMessage* fetchMessage(unsigned long waitMillis);
+};
+
 #endif
\ No newline at end of file
diff --git a/lib/webserver/GwWebServer.cpp b/lib/webserver/GwWebServer.cpp
index e610f7e..b95c41f 100644
--- a/lib/webserver/GwWebServer.cpp
+++ b/lib/webserver/GwWebServer.cpp
@@ -36,9 +36,9 @@ void sendEmbeddedFile(String name,String contentType,AsyncWebServerRequest *requ
   }
 
 
-GwWebServer::GwWebServer(GwLog* logger,int port){
+GwWebServer::GwWebServer(GwLog* logger,GwRequestQueue *queue,int port){
     server=new AsyncWebServer(port);
-    queue=xQueueCreate(10,sizeof(Message *));
+    this->queue=queue;
     this->logger=logger;
 }
 void GwWebServer::begin(){
@@ -61,32 +61,28 @@ GwWebServer::~GwWebServer(){
     delete server;
     vQueueDelete(queue);
 }
-void GwWebServer::handleAsyncWebRequest(AsyncWebServerRequest *request, RequestMessage *msg)
+void GwWebServer::handleAsyncWebRequest(AsyncWebServerRequest *request, GwRequestMessage *msg)
 {
-  msg->ref(); //for the queue
-  if (!xQueueSend(queue, &msg, 0))
+  GwRequestQueue::MessageSendStatus st=queue->sendAndWait(msg,500);      
+  if (st == GwRequestQueue::MSG_ERR)
   {
-    Serial.println("unable to enqueue");
-    msg->unref(); //queue
     msg->unref(); //our
     request->send(500, "text/plain", "queue full");
     return;
   }
-  LOG_DEBUG(GwLog::DEBUG + 1, "wait queue");
-  if (msg->wait(500))
+  if (st == GwRequestQueue::MSG_OK)
   {
-    LOG_DEBUG(GwLog::DEBUG + 1, "request ok");
     request->send(200, msg->getContentType(), msg->getResult());
     msg->unref();
     return;
   }
-  LOG_DEBUG(GwLog::DEBUG + 1, "switching to async");
+  LOG_DEBUG(GwLog::DEBUG + 1, "switching to async for %s",msg->getName().c_str());
   //msg is handed over to async handling
   bool finished = false;
   AsyncWebServerResponse *r = request->beginChunkedResponse(
       msg->getContentType(), [this,msg, finished](uint8_t *ptr, size_t len, size_t len2) -> size_t
       {
-        LOG_DEBUG(GwLog::DEBUG + 1, "try read");
+        LOG_DEBUG(GwLog::DEBUG + 1, "try read for %s",msg->getName().c_str());
         if (msg->isHandled() || msg->wait(1))
         {
           int rt = msg->consume(ptr, len);
@@ -106,7 +102,7 @@ void GwWebServer::handleAsyncWebRequest(AsyncWebServerRequest *request, RequestM
 }
 bool GwWebServer::registerMainHandler(const char *url,RequestCreator creator){
     server->on(url,HTTP_GET, [this,creator,url](AsyncWebServerRequest *request){
-        RequestMessage *msg=(*creator)(request);
+        GwRequestMessage *msg=(*creator)(request);
         if (!msg){
             LOG_DEBUG(GwLog::DEBUG,"creator returns NULL for %s",url);
             request->send(404, "text/plain", "Not found");
@@ -115,14 +111,4 @@ bool GwWebServer::registerMainHandler(const char *url,RequestCreator creator){
         handleAsyncWebRequest(request,msg);
     });
 }
-//to be called from the main loop
-void GwWebServer::fetchMainRequest()
-{
-    Message *msg = NULL;
-    if (xQueueReceive(queue, &msg, 0))
-    {
-        LOG_DEBUG(GwLog::DEBUG + 1, "main message");
-        msg->process();
-        msg->unref();
-    }
-}
+
diff --git a/lib/webserver/GwWebServer.h b/lib/webserver/GwWebServer.h
index 76e457b..0afd0aa 100644
--- a/lib/webserver/GwWebServer.h
+++ b/lib/webserver/GwWebServer.h
@@ -6,15 +6,14 @@
 class GwWebServer{
     private:
         AsyncWebServer *server;
-        QueueHandle_t queue;
+        GwRequestQueue *queue;
         GwLog *logger;
     public:
-        typedef RequestMessage *(RequestCreator)(AsyncWebServerRequest *request);
-        GwWebServer(GwLog *logger, int port);
+        typedef GwRequestMessage *(RequestCreator)(AsyncWebServerRequest *request);
+        GwWebServer(GwLog *logger, GwRequestQueue *queue,int port);
         ~GwWebServer();
         void begin();
         bool registerMainHandler(const char *url,RequestCreator creator);
-        void fetchMainRequest(); //to be called from main loop
-        void handleAsyncWebRequest(AsyncWebServerRequest *request, RequestMessage *msg);
+        void handleAsyncWebRequest(AsyncWebServerRequest *request, GwRequestMessage *msg);
 };
 #endif
\ No newline at end of file
diff --git a/src/main.cpp b/src/main.cpp
index ab6b4a4..8c418c7 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -80,7 +80,8 @@ const unsigned long TransmitMessages[] PROGMEM = {127489L, // Engine dynamic
 void HandleNMEA2000Msg(const tN2kMsg &N2kMsg);
 void SendNMEA0183Message(const tNMEA0183Msg &NMEA0183Msg,int id);
 
-GwWebServer webserver(&logger,80);
+GwRequestQueue mainQueue(&logger,20);
+GwWebServer webserver(&logger,&mainQueue,80);
 
 // Serial port 2 config (GPIO 16)
 const int baudrate = 38400;
@@ -127,10 +128,10 @@ void delayedRestart(){
 //register the requests at the webserver that should
 //be processed inside the main loop
 //this prevents us from the need to sync all the accesses
-class ResetRequest : public RequestMessage
+class ResetRequest : public GwRequestMessage
 {
 public:
-  ResetRequest() : RequestMessage(F("application/json")){};
+  ResetRequest() : GwRequestMessage(F("application/json"),F("reset")){};
 
 protected:
   virtual void processRequest()
@@ -141,10 +142,10 @@ protected:
   }
 };
 
-class StatusRequest : public RequestMessage
+class StatusRequest : public GwRequestMessage
 {
 public:
-  StatusRequest() : RequestMessage(F("application/json")){};
+  StatusRequest() : GwRequestMessage(F("application/json"),F("status")){};
 
 protected:
   virtual void processRequest()
@@ -161,10 +162,10 @@ protected:
     serializeJson(status, result);
   }
 };
-class ConfigRequest : public RequestMessage
+class ConfigRequest : public GwRequestMessage
 {
 public:
-  ConfigRequest() : RequestMessage(F("application/json")){};
+  ConfigRequest() : GwRequestMessage(F("application/json"),F("config")){};
 
 protected:
   virtual void processRequest()
@@ -173,10 +174,10 @@ protected:
   }
 };
 
-class SetConfigRequest : public RequestMessage
+class SetConfigRequest : public GwRequestMessage
 {
 public:
-  SetConfigRequest() : RequestMessage(F("application/json")){};
+  SetConfigRequest() : GwRequestMessage(F("application/json"),F("setConfig")){};
   StringMap args;
 
 protected:
@@ -212,10 +213,10 @@ protected:
     }
   }
 };
-class ResetConfigRequest : public RequestMessage
+class ResetConfigRequest : public GwRequestMessage
 {
 public:
-  ResetConfigRequest() : RequestMessage(F("application/json")){};
+  ResetConfigRequest() : GwRequestMessage(F("application/json"),F("resetConfig")){};
 
 protected:
   virtual void processRequest()
@@ -226,10 +227,10 @@ protected:
     delayedRestart();
   }
 };
-class BoatDataRequest : public RequestMessage
+class BoatDataRequest : public GwRequestMessage
 {
 public:
-  BoatDataRequest() : RequestMessage(F("application/json")){};
+  BoatDataRequest() : GwRequestMessage(F("application/json"),F("boatData")){};
 
 protected:
   virtual void processRequest()
@@ -277,15 +278,15 @@ void setup() {
   // Start TCP server
   socketServer.begin();
 
-  webserver.registerMainHandler("/api/reset", [](AsyncWebServerRequest *request)->RequestMessage *{
+  webserver.registerMainHandler("/api/reset", [](AsyncWebServerRequest *request)->GwRequestMessage *{
     return new ResetRequest();
   });
-  webserver.registerMainHandler("/api/status", [](AsyncWebServerRequest *request)->RequestMessage *
+  webserver.registerMainHandler("/api/status", [](AsyncWebServerRequest *request)->GwRequestMessage *
                               { return new StatusRequest(); });
-  webserver.registerMainHandler("/api/config", [](AsyncWebServerRequest *request)->RequestMessage *
+  webserver.registerMainHandler("/api/config", [](AsyncWebServerRequest *request)->GwRequestMessage *
                               { return new ConfigRequest(); });
   webserver.registerMainHandler("/api/setConfig",
-                              [](AsyncWebServerRequest *request)->RequestMessage *
+                              [](AsyncWebServerRequest *request)->GwRequestMessage *
                               {
                                 StringMap args;
                                 for (int i = 0; i < request->args(); i++)
@@ -296,9 +297,9 @@ void setup() {
                                 msg->args = args;
                                 return msg;
                               });
-  webserver.registerMainHandler("/api/resetConfig", [](AsyncWebServerRequest *request)->RequestMessage *
+  webserver.registerMainHandler("/api/resetConfig", [](AsyncWebServerRequest *request)->GwRequestMessage *
                               { return new ResetConfigRequest(); });
-  webserver.registerMainHandler("/api/boatData", [](AsyncWebServerRequest *request)->RequestMessage *
+  webserver.registerMainHandler("/api/boatData", [](AsyncWebServerRequest *request)->GwRequestMessage *
                               { return new BoatDataRequest(); });
 
   webserver.begin();
@@ -467,10 +468,14 @@ void loop() {
   }
   nmea0183Converter->loop();
 
-  webserver.fetchMainRequest();
-
   //read channels
   socketServer.readMessages(&receiver);
   usbSerial.readMessages(&receiver);
 
+  //handle message requests
+  GwMessage *msg=mainQueue.fetchMessage(0);
+  if (msg){
+    msg->process();
+    msg->unref();
+  }
 }