esp32-nmea2000-obp60/lib/queue/GwMessage.cxx

131 lines
3.7 KiB
C++

#include "GwMessage.h"
// Creates a message labelled `name`.
// The reference count starts at 1, `locker` is a mutex (taken around
// refcount changes in ref()/unref()) and `notifier` is a counting
// semaphore with maximum 1 and initial count 0.
GwMessage::GwMessage(String name)
{
    refcount = 1;
    notifier = xSemaphoreCreateCounting(1, 0);
    locker = xSemaphoreCreateMutex();
    this->name = name;
    GW_MESSAGE_DEBUG("Message: %p\n", this);
}
// Releases the two semaphores created in the constructor.
GwMessage::~GwMessage()
{
    GW_MESSAGE_DEBUG("~Message %p\n", this);

    // mutex used around refcount changes
    vSemaphoreDelete(locker);
    // completion semaphore used by wait()/process()
    vSemaphoreDelete(notifier);
}
// Drops one reference. When the count goes from 1 to 0 the object deletes
// itself, so the caller must not touch the message after this call unless
// it still holds another reference.
// A count that is already 0 is left untouched and nothing is deleted.
void GwMessage::unref()
{
    GW_MESSAGE_DEBUG("Message::unref %p\n", this);

    xSemaphoreTake(locker, portMAX_DELAY);
    // Short-circuit: only decrement when there is something to release.
    const bool lastReference = (refcount > 0) && (--refcount == 0);
    xSemaphoreGive(locker);

    if (!lastReference)
    {
        return;
    }
    // The mutex has been given back above, before the destructor deletes it.
    delete this;
}
// Adds one reference; the increment is done while holding `locker`.
void GwMessage::ref()
{
    xSemaphoreTake(locker, portMAX_DELAY);
    ++refcount;
    xSemaphoreGive(locker);
}
// Blocks until the `notifier` semaphore can be taken (process() gives it
// after processImpl() has run) or until the timeout expires.
//
// The wait is split into slices of at most one second so that
// esp_task_wdt_reset() can be called between slices.
//
// maxWaitMillis: overall timeout in milliseconds. Values <= 0 return 0
//                immediately without trying to take the semaphore.
// returns:       non-zero if the semaphore was taken, 0 on timeout.
int GwMessage::wait(int maxWaitMillis)
{
    static const int maxWait = 1000;
    if (maxWaitMillis <= 0)
    {
        // Nothing to wait for; also keeps a negative value from being
        // converted into a huge unsigned tick count below.
        return false;
    }
    const int maxRetries = maxWaitMillis / maxWait;
    const int lastWait = maxWaitMillis % maxWait;
    for (int retries = maxRetries; retries > 0; retries--)
    {
        if (xSemaphoreTake(notifier, pdMS_TO_TICKS(maxWait)))
            return true;
        esp_task_wdt_reset();
    }
    if (lastWait > 0)
    {
        // Wait only for the remainder, not for another full slice, so the
        // total blocking time does not exceed maxWaitMillis.
        return xSemaphoreTake(notifier, pdMS_TO_TICKS(lastWait));
    }
    return false;
}
// Runs processImpl() and afterwards gives the `notifier` semaphore that
// wait() tries to take.
void GwMessage::process()
{
    GW_MESSAGE_DEBUG("Message::process %p\n", this);

    processImpl();

    // Signal completion only after the implementation has returned.
    xSemaphoreGive(notifier);
}
// Returns a copy of the name given to the constructor.
String GwMessage::getName()
{
    return name;
}
// Runs processRequest() and then prepares the bookkeeping used by consume():
// `len` is the strlen() of `result` (so it stops at the first NUL byte),
// `consumed` restarts at 0 and `handled` is set.
void GwRequestMessage::processImpl()
{
    GW_MESSAGE_DEBUG("RequestMessage processImpl(1) %p\n",this);
    processRequest();
    GW_MESSAGE_DEBUG("RequestMessage processImpl(2) %p\n",this);

    const char *payload = result.c_str();
    len = strlen(payload);
    consumed = 0;
    // Set last: consume() checks this flag before it reads len/consumed.
    handled = true;
}
// Creates a request message: `name` is forwarded to the GwMessage base,
// `contentType` is stored in the member of the same name.
GwRequestMessage::GwRequestMessage(String contentType, String name)
    : GwMessage(name)
{
    this->contentType = contentType;
}
// Only emits a debug trace; the destructor releases nothing itself.
GwRequestMessage::~GwRequestMessage()
{
    // Terminated with ';' like every other GW_MESSAGE_DEBUG use in this
    // file, so the statement is well-formed whatever the macro expands to.
    GW_MESSAGE_DEBUG("~RequestMessage %p\n",this);
}
// Copies the next piece of `result` into `destination`.
//
// destination: buffer that receives the bytes
// maxLen:      maximum number of bytes to copy in this call
// returns:     RESPONSE_TRY_AGAIN while the message has not been handled
//              yet, 0 once everything has been consumed, otherwise the
//              number of bytes copied by this call.
int GwRequestMessage::consume(uint8_t *destination,int maxLen)
{
    if (!handled)
    {
        return RESPONSE_TRY_AGAIN;
    }
    if (consumed >= len)
    {
        return 0;
    }
    // Never hand out more than what is left of the result.
    int chunk = maxLen;
    if (chunk > (len - consumed))
    {
        chunk = len - consumed;
    }
    const char *nextByte = result.c_str() + consumed;
    memcpy(destination, nextByte, chunk);
    consumed += chunk;
    return chunk;
}
// Creates a queue with room for `len` entries; each entry is a GwMessage
// pointer. `logger` is stored for the log calls in this class.
GwRequestQueue::GwRequestQueue(GwLog *logger,int len)
{
    this->logger = logger;
    theQueue = xQueueCreate(len, sizeof(GwMessage*));
}
// Deletes the underlying queue. Message pointers still stored in it are
// not unref'ed here.
GwRequestQueue::~GwRequestQueue()
{
    vQueueDelete(theQueue);
}
// Enqueues `msg` without waiting for it to be processed.
// One reference is added on behalf of the queue; it is dropped again right
// away if the queue does not accept the message (zero-tick send timeout).
// returns: MSG_OK when enqueued, MSG_ERR otherwise.
GwRequestQueue::MessageSendStatus GwRequestQueue::sendAndForget(GwMessage *msg)
{
    msg->ref(); //for the queue
    if (xQueueSend(theQueue, &msg, 0))
    {
        return MSG_OK;
    }
    LOG_DEBUG(GwLog::LOG,"unable to enqueue %s",msg->getName().c_str());
    msg->unref();
    return MSG_ERR;
}
// Enqueues `msg` and blocks in msg->wait() for up to `waitMillis`.
// Two references are added: one for the queue and one held by this function
// while it waits. Both are dropped if the enqueue fails; otherwise only the
// waiting reference is dropped here.
// returns: MSG_ERR if the queue did not accept the message, MSG_OK if the
//          wait succeeded, MSG_TIMEOUT otherwise.
GwRequestQueue::MessageSendStatus GwRequestQueue::sendAndWait(GwMessage *msg,unsigned long waitMillis)
{
    msg->ref(); //for the queue
    msg->ref(); //for us waiting
    if (!xQueueSend(theQueue, &msg, 0))
    {
        LOG_DEBUG(GwLog::LOG,"unable to enqueue %s",msg->getName().c_str());
        msg->unref();
        msg->unref();
        return MSG_ERR;
    }

    LOG_DEBUG(GwLog::DEBUG + 1, "wait queue for %s",msg->getName().c_str());
    MessageSendStatus outcome;
    if (msg->wait(waitMillis))
    {
        LOG_DEBUG(GwLog::DEBUG + 1, "request ok for %s",msg->getName().c_str());
        outcome = MSG_OK;
    }
    else
    {
        LOG_DEBUG(GwLog::DEBUG, "request timeout for %s",msg->getName().c_str());
        outcome = MSG_TIMEOUT;
    }
    // Release the waiting reference only after the last use of msg above.
    msg->unref();
    return outcome;
}
// Takes the next message pointer from the queue.
// waitMillis: handed to xQueueReceive unchanged as its wait argument.
//             NOTE(review): xQueueReceive expects ticks, so this is only a
//             millisecond value with a 1 kHz tick rate - confirm.
// returns:    the received pointer, or NULL when nothing was received.
GwMessage* GwRequestQueue::fetchMessage(unsigned long waitMillis)
{
    GwMessage *received = NULL;
    if (!xQueueReceive(theQueue, &received, waitMillis))
    {
        return NULL;
    }
    return received;
}