refactor message queue

This commit is contained in:
andreas 2021-10-30 19:11:21 +02:00
parent bb80c5f44f
commit bb7f2085a4
5 changed files with 204 additions and 122 deletions

131
lib/queue/GwMessage.cxx Normal file
View File

@ -0,0 +1,131 @@
#include "GwMessage.h"
GwMessage::GwMessage(String name)
{
this->name = name;
locker = xSemaphoreCreateMutex();
notifier = xSemaphoreCreateCounting(1, 0);
refcount = 1;
GW_MESSAGE_DEBUG("Message: %p\n", this);
}
GwMessage::~GwMessage()
{
GW_MESSAGE_DEBUG("~Message %p\n", this);
vSemaphoreDelete(locker);
vSemaphoreDelete(notifier);
}
void GwMessage::unref()
{
GW_MESSAGE_DEBUG("Message::unref %p\n", this);
bool mustDelete = false;
xSemaphoreTake(locker, portMAX_DELAY);
if (refcount > 0)
{
refcount--;
if (refcount == 0)
mustDelete = true;
}
xSemaphoreGive(locker);
if (mustDelete)
{
delete this;
}
}
void GwMessage::ref()
{
xSemaphoreTake(locker, portMAX_DELAY);
refcount++;
xSemaphoreGive(locker);
}
int GwMessage::wait(int maxWaitMillis)
{
static const int maxWait = 1000;
int maxRetries = maxWaitMillis / maxWait;
int lastWait = maxWaitMillis - maxWait * maxRetries;
for (int retries = maxRetries; retries > 0; retries--)
{
if (xSemaphoreTake(notifier, pdMS_TO_TICKS(maxWait)))
return true;
esp_task_wdt_reset();
}
if (lastWait)
{
return xSemaphoreTake(notifier, pdMS_TO_TICKS(maxWait));
}
return false;
}
void GwMessage::process()
{
GW_MESSAGE_DEBUG("Message::process %p\n", this);
processImpl();
xSemaphoreGive(notifier);
}
String GwMessage::getName() { return name; }
void GwRequestMessage::processImpl(){
GW_MESSAGE_DEBUG("RequestMessage processImpl(1) %p\n",this);
processRequest();
GW_MESSAGE_DEBUG("RequestMessage processImpl(2) %p\n",this);
len=strlen(result.c_str());
consumed=0;
handled=true;
}
GwRequestMessage::GwRequestMessage(String contentType, String name):GwMessage(name){
this->contentType=contentType;
}
GwRequestMessage::~GwRequestMessage(){
GW_MESSAGE_DEBUG("~RequestMessage %p\n",this)
}
int GwRequestMessage::consume(uint8_t *destination,int maxLen){
if (!handled) return RESPONSE_TRY_AGAIN;
if (consumed >= len) return 0;
int cplen=maxLen;
if (cplen > (len-consumed)) cplen=len-consumed;
memcpy(destination,result.c_str()+consumed,cplen);
consumed+=cplen;
return cplen;
}
GwRequestQueue::GwRequestQueue(GwLog *logger,int len){
theQueue=xQueueCreate(len,sizeof(GwMessage*));
this->logger=logger;
}
GwRequestQueue::~GwRequestQueue(){
vQueueDelete(theQueue);
}
GwRequestQueue::MessageSendStatus GwRequestQueue::sendAndForget(GwMessage *msg){
msg->ref(); //for the queue
if (!xQueueSend(theQueue, &msg, 0))
{
LOG_DEBUG(GwLog::LOG,"unable to enqueue %s",msg->getName().c_str());
msg->unref();
return MSG_ERR;
}
return MSG_OK;
}
GwRequestQueue::MessageSendStatus GwRequestQueue::sendAndWait(GwMessage *msg,unsigned long waitMillis){
msg->ref(); //for the queue
msg->ref(); //for us waiting
if (!xQueueSend(theQueue, &msg, 0))
{
LOG_DEBUG(GwLog::LOG,"unable to enqueue %s",msg->getName().c_str());
msg->unref();
msg->unref();
return MSG_ERR;
}
LOG_DEBUG(GwLog::DEBUG + 1, "wait queue for %s",msg->getName().c_str());
if (msg->wait(waitMillis)){
LOG_DEBUG(GwLog::DEBUG + 1, "request ok for %s",msg->getName().c_str());
msg->unref();
return MSG_OK;
}
LOG_DEBUG(GwLog::DEBUG, "request timeout for %s",msg->getName().c_str());
msg->unref();
return MSG_TIMEOUT;
}
GwMessage* GwRequestQueue::fetchMessage(unsigned long waitMillis){
GwMessage *msg=NULL;
if (xQueueReceive(theQueue,&msg,waitMillis)){
return msg;
}
return NULL;
}

View File

@ -2,6 +2,7 @@
#define _GWMESSAGE_H
#include <Arduino.h>
#include <ESPAsyncWebServer.h>
#include "GwLog.h"
#include "esp_task_wdt.h"
#ifdef GW_MESSAGE_DEBUG_ENABLED
@ -13,67 +14,28 @@
/**
* a message class intended to be send from one task to another
*/
class Message{
class GwMessage{
private:
SemaphoreHandle_t locker;
SemaphoreHandle_t notifier;
int refcount=0;
String name;
protected:
virtual void processImpl()=0;
public:
Message(){
locker=xSemaphoreCreateMutex();
notifier=xSemaphoreCreateCounting(1,0);
refcount=1;
GW_MESSAGE_DEBUG("Message: %p\n",this);
}
virtual ~Message(){
GW_MESSAGE_DEBUG("~Message %p\n",this);
vSemaphoreDelete(locker);
vSemaphoreDelete(notifier);
}
void unref(){
GW_MESSAGE_DEBUG("Message::unref %p\n",this);
bool mustDelete=false;
xSemaphoreTake(locker,portMAX_DELAY);
if (refcount > 0){
refcount--;
if (refcount == 0) mustDelete=true;
}
xSemaphoreGive(locker);
if (mustDelete){
delete this;
}
}
void ref(){
xSemaphoreTake(locker,portMAX_DELAY);
refcount++;
xSemaphoreGive(locker);
}
int wait(int maxWaitMillis){
static const int maxWait=1000;
int maxRetries=maxWaitMillis/maxWait;
int lastWait=maxWaitMillis - maxWait*maxRetries;
for (int retries=maxRetries;retries>0;retries--){
if (xSemaphoreTake(notifier,pdMS_TO_TICKS(maxWait))) return true;
esp_task_wdt_reset();
}
if (lastWait){
return xSemaphoreTake(notifier,pdMS_TO_TICKS(maxWait));
}
return false;
}
void process(){
GW_MESSAGE_DEBUG("Message::process %p\n",this);
processImpl();
xSemaphoreGive(notifier);
}
GwMessage(String name=F("unknown"));
virtual ~GwMessage();
void unref();
void ref();
int wait(int maxWaitMillis);
void process();
String getName();
};
/**
* a class to executa an async web request that returns a string
*/
class RequestMessage : public Message{
class GwRequestMessage : public GwMessage{
protected:
String result;
String contentType;
@ -83,32 +45,13 @@ class RequestMessage : public Message{
bool handled=false;
protected:
virtual void processRequest()=0;
virtual void processImpl(){
GW_MESSAGE_DEBUG("RequestMessage processImpl(1) %p\n",this);
processRequest();
GW_MESSAGE_DEBUG("RequestMessage processImpl(2) %p\n",this);
len=strlen(result.c_str());
consumed=0;
handled=true;
}
virtual void processImpl();
public:
RequestMessage(String contentType):Message(){
this->contentType=contentType;
}
virtual ~RequestMessage(){
GW_MESSAGE_DEBUG("~RequestMessage %p\n",this)
}
GwRequestMessage(String contentType,String name=F("web"));
virtual ~GwRequestMessage();
String getResult(){return result;}
int getLen(){return len;}
int consume(uint8_t *destination,int maxLen){
if (!handled) return RESPONSE_TRY_AGAIN;
if (consumed >= len) return 0;
int cplen=maxLen;
if (cplen > (len-consumed)) cplen=len-consumed;
memcpy(destination,result.c_str()+consumed,cplen);
consumed+=cplen;
return cplen;
}
int consume(uint8_t *destination,int maxLen);
bool isHandled(){return handled;}
String getContentType(){
return contentType;
@ -116,4 +59,22 @@ class RequestMessage : public Message{
};
class GwRequestQueue{
private:
QueueHandle_t theQueue;
GwLog *logger;
public:
typedef enum{
MSG_OK,
MSG_ERR,
MSG_TIMEOUT
} MessageSendStatus;
GwRequestQueue(GwLog *logger,int len);
~GwRequestQueue();
//both send methods will increment the ref count when enqueing
MessageSendStatus sendAndForget(GwMessage *msg);
MessageSendStatus sendAndWait(GwMessage *msg,unsigned long waitMillis);
GwMessage* fetchMessage(unsigned long waitMillis);
};
#endif

View File

@ -36,9 +36,9 @@ void sendEmbeddedFile(String name,String contentType,AsyncWebServerRequest *requ
}
GwWebServer::GwWebServer(GwLog* logger,int port){
GwWebServer::GwWebServer(GwLog* logger,GwRequestQueue *queue,int port){
server=new AsyncWebServer(port);
queue=xQueueCreate(10,sizeof(Message *));
this->queue=queue;
this->logger=logger;
}
void GwWebServer::begin(){
@ -61,32 +61,28 @@ GwWebServer::~GwWebServer(){
delete server;
vQueueDelete(queue);
}
void GwWebServer::handleAsyncWebRequest(AsyncWebServerRequest *request, RequestMessage *msg)
void GwWebServer::handleAsyncWebRequest(AsyncWebServerRequest *request, GwRequestMessage *msg)
{
msg->ref(); //for the queue
if (!xQueueSend(queue, &msg, 0))
GwRequestQueue::MessageSendStatus st=queue->sendAndWait(msg,500);
if (st == GwRequestQueue::MSG_ERR)
{
Serial.println("unable to enqueue");
msg->unref(); //queue
msg->unref(); //our
request->send(500, "text/plain", "queue full");
return;
}
LOG_DEBUG(GwLog::DEBUG + 1, "wait queue");
if (msg->wait(500))
if (st == GwRequestQueue::MSG_OK)
{
LOG_DEBUG(GwLog::DEBUG + 1, "request ok");
request->send(200, msg->getContentType(), msg->getResult());
msg->unref();
return;
}
LOG_DEBUG(GwLog::DEBUG + 1, "switching to async");
LOG_DEBUG(GwLog::DEBUG + 1, "switching to async for %s",msg->getName().c_str());
//msg is handed over to async handling
bool finished = false;
AsyncWebServerResponse *r = request->beginChunkedResponse(
msg->getContentType(), [this,msg, finished](uint8_t *ptr, size_t len, size_t len2) -> size_t
{
LOG_DEBUG(GwLog::DEBUG + 1, "try read");
LOG_DEBUG(GwLog::DEBUG + 1, "try read for %s",msg->getName().c_str());
if (msg->isHandled() || msg->wait(1))
{
int rt = msg->consume(ptr, len);
@ -106,7 +102,7 @@ void GwWebServer::handleAsyncWebRequest(AsyncWebServerRequest *request, RequestM
}
bool GwWebServer::registerMainHandler(const char *url,RequestCreator creator){
server->on(url,HTTP_GET, [this,creator,url](AsyncWebServerRequest *request){
RequestMessage *msg=(*creator)(request);
GwRequestMessage *msg=(*creator)(request);
if (!msg){
LOG_DEBUG(GwLog::DEBUG,"creator returns NULL for %s",url);
request->send(404, "text/plain", "Not found");
@ -115,14 +111,4 @@ bool GwWebServer::registerMainHandler(const char *url,RequestCreator creator){
handleAsyncWebRequest(request,msg);
});
}
//to be called from the main loop
void GwWebServer::fetchMainRequest()
{
Message *msg = NULL;
if (xQueueReceive(queue, &msg, 0))
{
LOG_DEBUG(GwLog::DEBUG + 1, "main message");
msg->process();
msg->unref();
}
}

View File

@ -6,15 +6,14 @@
class GwWebServer{
private:
AsyncWebServer *server;
QueueHandle_t queue;
GwRequestQueue *queue;
GwLog *logger;
public:
typedef RequestMessage *(RequestCreator)(AsyncWebServerRequest *request);
GwWebServer(GwLog *logger, int port);
typedef GwRequestMessage *(RequestCreator)(AsyncWebServerRequest *request);
GwWebServer(GwLog *logger, GwRequestQueue *queue,int port);
~GwWebServer();
void begin();
bool registerMainHandler(const char *url,RequestCreator creator);
void fetchMainRequest(); //to be called from main loop
void handleAsyncWebRequest(AsyncWebServerRequest *request, RequestMessage *msg);
void handleAsyncWebRequest(AsyncWebServerRequest *request, GwRequestMessage *msg);
};
#endif

View File

@ -80,7 +80,8 @@ const unsigned long TransmitMessages[] PROGMEM = {127489L, // Engine dynamic
void HandleNMEA2000Msg(const tN2kMsg &N2kMsg);
void SendNMEA0183Message(const tNMEA0183Msg &NMEA0183Msg,int id);
GwWebServer webserver(&logger,80);
GwRequestQueue mainQueue(&logger,20);
GwWebServer webserver(&logger,&mainQueue,80);
// Serial port 2 config (GPIO 16)
const int baudrate = 38400;
@ -127,10 +128,10 @@ void delayedRestart(){
//register the requests at the webserver that should
//be processed inside the main loop
//this prevents us from the need to sync all the accesses
class ResetRequest : public RequestMessage
class ResetRequest : public GwRequestMessage
{
public:
ResetRequest() : RequestMessage(F("application/json")){};
ResetRequest() : GwRequestMessage(F("application/json"),F("reset")){};
protected:
virtual void processRequest()
@ -141,10 +142,10 @@ protected:
}
};
class StatusRequest : public RequestMessage
class StatusRequest : public GwRequestMessage
{
public:
StatusRequest() : RequestMessage(F("application/json")){};
StatusRequest() : GwRequestMessage(F("application/json"),F("status")){};
protected:
virtual void processRequest()
@ -161,10 +162,10 @@ protected:
serializeJson(status, result);
}
};
class ConfigRequest : public RequestMessage
class ConfigRequest : public GwRequestMessage
{
public:
ConfigRequest() : RequestMessage(F("application/json")){};
ConfigRequest() : GwRequestMessage(F("application/json"),F("config")){};
protected:
virtual void processRequest()
@ -173,10 +174,10 @@ protected:
}
};
class SetConfigRequest : public RequestMessage
class SetConfigRequest : public GwRequestMessage
{
public:
SetConfigRequest() : RequestMessage(F("application/json")){};
SetConfigRequest() : GwRequestMessage(F("application/json"),F("setConfig")){};
StringMap args;
protected:
@ -212,10 +213,10 @@ protected:
}
}
};
class ResetConfigRequest : public RequestMessage
class ResetConfigRequest : public GwRequestMessage
{
public:
ResetConfigRequest() : RequestMessage(F("application/json")){};
ResetConfigRequest() : GwRequestMessage(F("application/json"),F("resetConfig")){};
protected:
virtual void processRequest()
@ -226,10 +227,10 @@ protected:
delayedRestart();
}
};
class BoatDataRequest : public RequestMessage
class BoatDataRequest : public GwRequestMessage
{
public:
BoatDataRequest() : RequestMessage(F("application/json")){};
BoatDataRequest() : GwRequestMessage(F("application/json"),F("boatData")){};
protected:
virtual void processRequest()
@ -277,15 +278,15 @@ void setup() {
// Start TCP server
socketServer.begin();
webserver.registerMainHandler("/api/reset", [](AsyncWebServerRequest *request)->RequestMessage *{
webserver.registerMainHandler("/api/reset", [](AsyncWebServerRequest *request)->GwRequestMessage *{
return new ResetRequest();
});
webserver.registerMainHandler("/api/status", [](AsyncWebServerRequest *request)->RequestMessage *
webserver.registerMainHandler("/api/status", [](AsyncWebServerRequest *request)->GwRequestMessage *
{ return new StatusRequest(); });
webserver.registerMainHandler("/api/config", [](AsyncWebServerRequest *request)->RequestMessage *
webserver.registerMainHandler("/api/config", [](AsyncWebServerRequest *request)->GwRequestMessage *
{ return new ConfigRequest(); });
webserver.registerMainHandler("/api/setConfig",
[](AsyncWebServerRequest *request)->RequestMessage *
[](AsyncWebServerRequest *request)->GwRequestMessage *
{
StringMap args;
for (int i = 0; i < request->args(); i++)
@ -296,9 +297,9 @@ void setup() {
msg->args = args;
return msg;
});
webserver.registerMainHandler("/api/resetConfig", [](AsyncWebServerRequest *request)->RequestMessage *
webserver.registerMainHandler("/api/resetConfig", [](AsyncWebServerRequest *request)->GwRequestMessage *
{ return new ResetConfigRequest(); });
webserver.registerMainHandler("/api/boatData", [](AsyncWebServerRequest *request)->RequestMessage *
webserver.registerMainHandler("/api/boatData", [](AsyncWebServerRequest *request)->GwRequestMessage *
{ return new BoatDataRequest(); });
webserver.begin();
@ -467,10 +468,14 @@ void loop() {
}
nmea0183Converter->loop();
webserver.fetchMainRequest();
//read channels
socketServer.readMessages(&receiver);
usbSerial.readMessages(&receiver);
//handle message requests
GwMessage *msg=mainQueue.fetchMessage(0);
if (msg){
msg->process();
msg->unref();
}
}