From 47fb805ee6cc2d14881af8182c4a33f03f7eb438 Mon Sep 17 00:00:00 2001 From: wellenvogel Date: Fri, 31 Dec 2021 18:38:11 +0100 Subject: [PATCH] intermediate: introduce an abstract channel --- lib/channel/GwChannel.cpp | 213 ++++++++++++++++++ .../{GwChannelConfig.h => GwChannel.h} | 33 ++- lib/channel/GwChannelConfig.cpp | 104 --------- lib/channel/GwChannelInterface.h | 9 + lib/serial/GwSerial.cpp | 10 +- lib/serial/GwSerial.h | 11 +- lib/socketserver/GwSocketServer.cpp | 15 +- lib/socketserver/GwSocketServer.h | 9 +- src/main.cpp | 206 +++++++---------- 9 files changed, 351 insertions(+), 259 deletions(-) create mode 100644 lib/channel/GwChannel.cpp rename lib/channel/{GwChannelConfig.h => GwChannel.h} (54%) delete mode 100644 lib/channel/GwChannelConfig.cpp create mode 100644 lib/channel/GwChannelInterface.h diff --git a/lib/channel/GwChannel.cpp b/lib/channel/GwChannel.cpp new file mode 100644 index 0000000..bee3772 --- /dev/null +++ b/lib/channel/GwChannel.cpp @@ -0,0 +1,213 @@ +#include "GwChannel.h" +#include + + +class GwChannelMessageReceiver : public GwMessageFetcher{ + static const int bufferSize=GwBuffer::RX_BUFFER_SIZE+4; + uint8_t buffer[bufferSize]; + uint8_t *writePointer=buffer; + GwLog *logger; + GwChannel *channel; + GwChannel::NMEA0183Handler handler; + public: + GwChannelMessageReceiver(GwLog *logger,GwChannel *channel){ + this->logger=logger; + this->channel=channel; + } + void setHandler(GwChannel::NMEA0183Handler handler){ + this->handler=handler; + } + virtual bool handleBuffer(GwBuffer *gwbuffer){ + size_t len=fetchMessageToBuffer(gwbuffer,buffer,bufferSize-4,'\n'); + writePointer=buffer+len; + if (writePointer == buffer) return false; + uint8_t *p; + for (p=writePointer-1;p>=buffer && *p <= 0x20;p--){ + *p=0; + } + if (p > buffer){ + p++; + *p=0x0d; + p++; + *p=0x0a; + p++; + *p=0; + } + for (p=buffer; *p != 0 && p < writePointer && *p <= 0x20;p++){} + //very simple NMEA check + if (*p != '!' && *p != '$'){ + LOG_DEBUG(GwLog::DEBUG,"unknown line [%d] - ignore: %s",id,(const char *)p); + } + else{ + LOG_DEBUG(GwLog::DEBUG,"NMEA[%d]: %s",id,(const char *)p); + if (channel->canReceive((const char *)p)){ + handler((const char *)p,id); + } + } + writePointer=buffer; + return true; + } +}; + + +GwChannel::GwChannel(GwLog *logger, + String name, + int sourceId){ + this->logger = logger; + this->name=name; + this->sourceId=sourceId; + this->countIn=new GwCounter(String("count")+name+String("in")); + this->countOut=new GwCounter(String("count")+name+String("out")); + this->impl=NULL; + this->receiver=new GwChannelMessageReceiver(logger,this); + this->actisenseReader=NULL; +} +void GwChannel::begin( + bool enabled, + bool nmeaOut, + bool nmeaIn, + String readFilter, + String writeFilter, + bool seaSmartOut, + bool toN2k, + bool readActisense, + bool writeActisense) +{ + this->enabled = enabled; + this->NMEAout = nmeaOut; + this->NMEAin = nmeaIn; + this->readFilter=readFilter.isEmpty()? + NULL: + new GwNmeaFilter(readFilter); + this->writeFilter=writeFilter.isEmpty()? + NULL: + new GwNmeaFilter(writeFilter); + this->seaSmartOut=seaSmartOut; + this->toN2k=toN2k; + this->readActisense=readActisense; + this->writeActisense=writeActisense; + if (readActisense|| writeActisense){ + channelStream=impl->getStream(false); + if (! channelStream) { + this->readActisense=false; + this->writeActisense=false; + LOG_DEBUG(GwLog::ERROR,"unable to read actisnse on %s",name.c_str()); + } + else{ + if (readActisense){ + this->actisenseReader= new tActisenseReader(); + actisenseReader->SetReadStream(channelStream); + } + } + } +} +void GwChannel::setImpl(GwChannelInterface *impl){ + this->impl=impl; +} +void GwChannel::updateCounter(const char *msg, bool out) +{ + char key[6]; + if (msg[0] == '$') + { + strncpy(key, &msg[3], 3); + key[3] = 0; + } + else if (msg[0] == '!') + { + strncpy(key, &msg[1], 5); + key[5] = 0; + } + else{ + return; + } + if (out){ + countOut->add(key); + } + else{ + countIn->add(key); + } +} +bool GwChannel::canSendOut(unsigned long pgn){ + if (! enabled) return false; + if (! NMEAout) return false; + countOut->add(String(pgn)); + return true; +} +bool GwChannel::canReceive(unsigned long pgn){ + if (!enabled) return false; + if (!NMEAin) return false; + countIn->add(String(pgn)); + return true; +} + +bool GwChannel::canSendOut(const char *buffer){ + if (! enabled) return false; + if (! NMEAout) return false; + if (writeFilter && ! writeFilter->canPass(buffer)) return false; + updateCounter(buffer,true); + return true; +} + +bool GwChannel::canReceive(const char *buffer){ + if (! enabled) return false; + if (! NMEAin) return false; + if (readFilter && ! readFilter->canPass(buffer)) return false; + updateCounter(buffer,false); + return true; +} + +int GwChannel::getJsonSize(){ + if (! enabled) return 0; + int rt=2; + if (countIn) rt+=countIn->getJsonSize(); + if (countOut) rt+=countOut->getJsonSize(); + return rt; +} +void GwChannel::toJson(GwJsonDocument &doc){ + if (! enabled) return; + if (countOut) countOut->toJson(doc); + if (countIn) countIn->toJson(doc); +} +String GwChannel::toString(){ + String rt="CH:"+name; + rt+=enabled?"[ena]":"[dis]"; + rt+=NMEAin?"in,":""; + rt+=NMEAout?"out,":""; + rt+=String("RF:") + (readFilter?readFilter->toString():"[]"); + rt+=String("WF:") + (writeFilter?writeFilter->toString():"[]"); + rt+=String(",")+ (toN2k?"n2k":""); + rt+=String(",")+ (seaSmartOut?"SM":""); + return rt; +} +void GwChannel::loop(bool handleRead, bool handleWrite){ + if (! enabled || ! impl) return; + impl->loop(handleRead,handleWrite); +} +void GwChannel::readMessages(GwChannel::NMEA0183Handler handler){ + if (! enabled || ! impl) return; + if (readActisense || ! NMEAin) return; + receiver->id=sourceId; + receiver->setHandler(handler); + impl->readMessages(receiver); +} +void GwChannel::sendToClients(const char *buffer, int sourceId){ + if (! impl) return; + if (canSendOut(buffer)){ + impl->sendToClients(buffer,sourceId); + } +} +void GwChannel::parseActisense(N2kHandler handler){ + if (!enabled || ! impl || ! readActisense || ! actisenseReader) return; + tN2kMsg N2kMsg; + + while (actisenseReader->GetMessageFromStream(N2kMsg)) { + canReceive(N2kMsg.PGN); + handler(N2kMsg,sourceId); + } +} + +void GwChannel::sendActisense(const tN2kMsg &msg){ + if (!enabled || ! impl || ! writeActisense || ! channelStream) return; + canSendOut(msg.PGN); + msg.SendInActisenseFormat(channelStream); +} diff --git a/lib/channel/GwChannelConfig.h b/lib/channel/GwChannel.h similarity index 54% rename from lib/channel/GwChannelConfig.h rename to lib/channel/GwChannel.h index 1cacf95..940f599 100644 --- a/lib/channel/GwChannelConfig.h +++ b/lib/channel/GwChannel.h @@ -1,11 +1,16 @@ #pragma once +#include "GwChannelInterface.h" #include "GwConfigItem.h" #include "GwLog.h" #include "GWConfig.h" #include "GwCounter.h" #include "GwJsonDocument.h" +#include +#include -class GwChannelConfig{ +class GwChannelMessageReceiver; +class tActisenseReader; +class GwChannel{ bool enabled=false; bool NMEAout=false; bool NMEAin=false; @@ -13,15 +18,23 @@ class GwChannelConfig{ GwNmeaFilter* writeFilter=NULL; bool seaSmartOut=false; bool toN2k=false; + bool readActisense=false; + bool writeActisense=false; GwLog *logger; String name; GwCounter *countIn=NULL; GwCounter *countOut=NULL; + GwChannelInterface *impl; + int sourceId=0; + GwChannelMessageReceiver *receiver=NULL; + tActisenseReader *actisenseReader=NULL; + Stream *channelStream=NULL; void updateCounter(const char *msg, bool out); public: - GwChannelConfig( + GwChannel( GwLog *logger, - String name); + String name, + int sourceId); void begin( bool enabled, bool nmeaOut, @@ -29,9 +42,13 @@ class GwChannelConfig{ String readFilter, String writeFilter, bool seaSmartOut, - bool toN2k + bool toN2k, + bool readActisense=false, + bool writeActisense=false ); + void setImpl(GwChannelInterface *impl); + void enable(bool enabled){ this->enabled=enabled; } @@ -46,5 +63,13 @@ class GwChannelConfig{ int getJsonSize(); void toJson(GwJsonDocument &doc); String toString(); + + void loop(bool handleRead, bool handleWrite); + typedef std::function NMEA0183Handler; + void readMessages(NMEA0183Handler handler); + void sendToClients(const char *buffer, int sourceId); + typedef std::function N2kHandler ; + void parseActisense(N2kHandler handler); + void sendActisense(const tN2kMsg &msg); }; diff --git a/lib/channel/GwChannelConfig.cpp b/lib/channel/GwChannelConfig.cpp deleted file mode 100644 index 245df25..0000000 --- a/lib/channel/GwChannelConfig.cpp +++ /dev/null @@ -1,104 +0,0 @@ -#include "GwChannelConfig.h" - -GwChannelConfig::GwChannelConfig(GwLog *logger,String name){ - this->logger = logger; - this->name=name; - this->countIn=new GwCounter(String("count")+name+String("in")); - this->countOut=new GwCounter(String("count")+name+String("out")); -} -void GwChannelConfig::begin( - bool enabled, - bool nmeaOut, - bool nmeaIn, - String readFilter, - String writeFilter, - bool seaSmartOut, - bool toN2k) -{ - this->enabled = enabled; - this->NMEAout = nmeaOut; - this->NMEAin = nmeaIn; - this->readFilter=readFilter.isEmpty()? - NULL: - new GwNmeaFilter(readFilter); - this->writeFilter=writeFilter.isEmpty()? - NULL: - new GwNmeaFilter(writeFilter); - this->seaSmartOut=seaSmartOut; - this->toN2k=toN2k; -} -void GwChannelConfig::updateCounter(const char *msg, bool out) -{ - char key[6]; - if (msg[0] == '$') - { - strncpy(key, &msg[3], 3); - key[3] = 0; - } - else if (msg[0] == '!') - { - strncpy(key, &msg[1], 5); - key[5] = 0; - } - else{ - return; - } - if (out){ - countOut->add(key); - } - else{ - countIn->add(key); - } -} -bool GwChannelConfig::canSendOut(unsigned long pgn){ - if (! enabled) return false; - if (! NMEAout) return false; - countOut->add(String(pgn)); - return true; -} -bool GwChannelConfig::canReceive(unsigned long pgn){ - if (!enabled) return false; - if (!NMEAin) return false; - countIn->add(String(pgn)); - return true; -} - -bool GwChannelConfig::canSendOut(const char *buffer){ - if (! enabled) return false; - if (! NMEAout) return false; - if (writeFilter && ! writeFilter->canPass(buffer)) return false; - updateCounter(buffer,true); - return true; -} - -bool GwChannelConfig::canReceive(const char *buffer){ - if (! enabled) return false; - if (! NMEAin) return false; - if (readFilter && ! readFilter->canPass(buffer)) return false; - updateCounter(buffer,false); - return true; -} - -int GwChannelConfig::getJsonSize(){ - if (! enabled) return 0; - int rt=2; - if (countIn) rt+=countIn->getJsonSize(); - if (countOut) rt+=countOut->getJsonSize(); - return rt; -} -void GwChannelConfig::toJson(GwJsonDocument &doc){ - if (! enabled) return; - if (countOut) countOut->toJson(doc); - if (countIn) countIn->toJson(doc); -} -String GwChannelConfig::toString(){ - String rt="CH:"+name; - rt+=enabled?"[ena]":"[dis]"; - rt+=NMEAin?"in,":""; - rt+=NMEAout?"out,":""; - rt+=String("RF:") + (readFilter?readFilter->toString():"[]"); - rt+=String("WF:") + (writeFilter?writeFilter->toString():"[]"); - rt+=String(",")+ (toN2k?"n2k":""); - rt+=String(",")+ (seaSmartOut?"SM":""); - return rt; -} diff --git a/lib/channel/GwChannelInterface.h b/lib/channel/GwChannelInterface.h new file mode 100644 index 0000000..f9b076c --- /dev/null +++ b/lib/channel/GwChannelInterface.h @@ -0,0 +1,9 @@ +#pragma once +#include "GwBuffer.h" +class GwChannelInterface{ + public: + virtual void loop(bool handleRead,bool handleWrite)=0; + virtual void readMessages(GwMessageFetcher *writer)=0; + virtual size_t sendToClients(const char *buffer, int sourceId, bool partial=false)=0; + virtual Stream * getStream(bool partialWrites){ return NULL;} +}; \ No newline at end of file diff --git a/lib/serial/GwSerial.cpp b/lib/serial/GwSerial.cpp index 1539bb3..6f35856 100644 --- a/lib/serial/GwSerial.cpp +++ b/lib/serial/GwSerial.cpp @@ -96,7 +96,7 @@ size_t GwSerial::sendToClients(const char *buf,int sourceId,bool partial){ } return enqueued; } -void GwSerial::loop(bool handleRead){ +void GwSerial::loop(bool handleRead,bool handleWrite){ write(); if (! isInitialized()) return; if (! handleRead) return; @@ -116,10 +116,10 @@ void GwSerial::loop(bool handleRead){ serial->readBytes(buffer,available); } } -bool GwSerial::readMessages(GwMessageFetcher *writer){ - if (! isInitialized()) return false; - if (! allowRead) return false; - return writer->handleBuffer(readBuffer); +void GwSerial::readMessages(GwMessageFetcher *writer){ + if (! isInitialized()) return; + if (! allowRead) return; + writer->handleBuffer(readBuffer); } void GwSerial::flush(){ diff --git a/lib/serial/GwSerial.h b/lib/serial/GwSerial.h index 1fd23d6..b3880fc 100644 --- a/lib/serial/GwSerial.h +++ b/lib/serial/GwSerial.h @@ -3,8 +3,9 @@ #include "HardwareSerial.h" #include "GwLog.h" #include "GwBuffer.h" +#include "GwChannelInterface.h" class GwSerialStream; -class GwSerial{ +class GwSerial : public GwChannelInterface{ private: GwBuffer *buffer; GwBuffer *readBuffer=NULL; @@ -23,11 +24,11 @@ class GwSerial{ ~GwSerial(); int setup(int baud,int rxpin,int txpin); bool isInitialized(); - size_t sendToClients(const char *buf,int sourceId,bool partial=false); - void loop(bool handleRead=true); - bool readMessages(GwMessageFetcher *writer); + virtual size_t sendToClients(const char *buf,int sourceId,bool partial=false); + virtual void loop(bool handleRead=true,bool handleWrite=true); + virtual void readMessages(GwMessageFetcher *writer); void flush(); - Stream *getStream(bool partialWrites); + virtual Stream *getStream(bool partialWrites); friend GwSerialStream; }; #endif \ No newline at end of file diff --git a/lib/socketserver/GwSocketServer.cpp b/lib/socketserver/GwSocketServer.cpp index c53a5ee..26521e3 100644 --- a/lib/socketserver/GwSocketServer.cpp +++ b/lib/socketserver/GwSocketServer.cpp @@ -133,25 +133,23 @@ void GwSocketServer::loop(bool handleRead, bool handleWrite) } } -bool GwSocketServer::readMessages(GwMessageFetcher *writer) +void GwSocketServer::readMessages(GwMessageFetcher *writer) { if (!allowReceive || !clients) - return false; - bool hasMessages = false; + return; for (int i = 0; i < maxClients; i++) { writer->id = minId + i; if (!clients[i]->hasClient()) continue; - if (clients[i]->messagesFromBuffer(writer)) - hasMessages = true; + clients[i]->messagesFromBuffer(writer); } - return hasMessages; + return; } -void GwSocketServer::sendToClients(const char *buf, int source) +size_t GwSocketServer::sendToClients(const char *buf, int source,bool partial) { if (!clients) - return; + return 0; int len = strlen(buf); int sourceIndex = source - minId; for (int i = 0; i < maxClients; i++) @@ -166,6 +164,7 @@ void GwSocketServer::sendToClients(const char *buf, int source) client->enqueue((uint8_t *)buf, len); } } + return len; } int GwSocketServer::numClients() diff --git a/lib/socketserver/GwSocketServer.h b/lib/socketserver/GwSocketServer.h index e3762e7..248fc95 100644 --- a/lib/socketserver/GwSocketServer.h +++ b/lib/socketserver/GwSocketServer.h @@ -3,10 +3,11 @@ #include "GWConfig.h" #include "GwLog.h" #include "GwBuffer.h" +#include "GwChannelInterface.h" #include class GwSocketConnection; -class GwSocketServer{ +class GwSocketServer: public GwChannelInterface{ private: const GwConfigHandler *config; GwLog *logger; @@ -22,9 +23,9 @@ class GwSocketServer{ GwSocketServer(const GwConfigHandler *config,GwLog *logger,int minId); ~GwSocketServer(); void begin(); - void loop(bool handleRead=true,bool handleWrite=true); - void sendToClients(const char *buf,int sourceId); + virtual void loop(bool handleRead=true,bool handleWrite=true); + virtual size_t sendToClients(const char *buf,int sourceId, bool partialWrite=false); int numClients(); - bool readMessages(GwMessageFetcher *writer); + virtual void readMessages(GwMessageFetcher *writer); }; #endif \ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp index 505028f..6075ff6 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -62,7 +62,7 @@ const unsigned long HEAP_REPORT_TIME=2000; //set to 0 to disable heap reporting #include "GwStatistics.h" #include "GwUpdate.h" #include "GwTcpClient.h" -#include "GwChannelConfig.h" +#include "GwChannel.h" //NMEA message channels @@ -122,8 +122,6 @@ int NodeAddress; // To store last Node Address Preferences preferences; // Nonvolatile storage on ESP32 - To store LastDeviceAddress N2kDataToNMEA0183 *nmea0183Converter=NULL; NMEA0183DataToN2K *toN2KConverter=NULL; -tActisenseReader *actisenseReader=NULL; -Stream *usbStream=NULL; SemaphoreHandle_t mainLock; @@ -133,13 +131,15 @@ GwWebServer webserver(&logger,&mainQueue,80); GwCounter countNMEA2KIn("count2Kin"); GwCounter countNMEA2KOut("count2Kout"); -GwChannelConfig usbChannel(&logger,"USB"); -GwChannelConfig actisenseChannel(&logger,"USB"); -GwChannelConfig tcpChannel(&logger,"TCPServer"); -GwChannelConfig serialChannel(&logger,"SER"); -GwChannelConfig tclChannel(&logger,"TCPClient"); +GwChannel usbChannel(&logger,"USB",USB_CHANNEL_ID); +GwChannel tcpChannel(&logger,"TCPServer",MIN_TCP_CHANNEL_ID); +GwChannel serialChannel(&logger,"SER",SERIAL1_CHANNEL_ID); +GwChannel tclChannel(&logger,"TCPClient",TCP_CLIENT_CHANNEL_ID); -GwChannelConfig * channelFromSource(int source){ +GwChannel *allChannels[]={&usbChannel,&tcpChannel,&serialChannel,&tclChannel}; +const int numChannels=sizeof(allChannels)/sizeof(GwChannel*); + +GwChannel * channelFromSource(int source){ if (source == USB_CHANNEL_ID) return &usbChannel; if (source == SERIAL1_CHANNEL_ID) return &serialChannel; if (source == TCP_CLIENT_CHANNEL_ID) return &tclChannel; @@ -204,17 +204,6 @@ bool serCanRead=true; GwSerial *usbSerial = new GwSerial(NULL, 0, USB_CHANNEL_ID); GwSerial *serial1=NULL; -void sendBufferToChannels(const char * buffer, int sourceId){ - if (tcpChannel.canSendOut(buffer)){ - socketServer.sendToClients(buffer,sourceId); - } - if (sourceId != USB_CHANNEL_ID && usbChannel.canSendOut(buffer)){ - usbSerial->sendToClients(buffer,sourceId); - } - if (serial1 && sourceId != SERIAL1_CHANNEL_ID && serialChannel.canSendOut(buffer)){ - serial1->sendToClients(buffer,sourceId); - } -} typedef enum { N2KT_MSGIN, //from CAN N2KT_MSGINT, //from internal source @@ -228,18 +217,26 @@ void handleN2kMessage(const tN2kMsg &n2kMsg,N2K_MsgDirection direction) if (direction == N2KT_MSGIN){ countNMEA2KIn.add(n2kMsg.PGN); } - if (tcpChannel.sendSeaSmart() || tclChannel.sendSeaSmart()) - { - char buf[MAX_NMEA2000_MESSAGE_SEASMART_SIZE]; - if (N2kToSeasmart(n2kMsg, millis(), buf, MAX_NMEA2000_MESSAGE_SEASMART_SIZE) == 0) - return; - if (tcpChannel.sendSeaSmart()){ - socketServer.sendToClients(buf, N2K_CHANNEL_ID); + char buf[MAX_NMEA2000_MESSAGE_SEASMART_SIZE]; + bool messageCreated=false; + for (int i=0;isendSeaSmart()){ + if (! messageCreated){ + if (N2kToSeasmart(n2kMsg, millis(), buf, MAX_NMEA2000_MESSAGE_SEASMART_SIZE) != 0) { + messageCreated=true; + } + } + if (messageCreated){ + allChannels[i]->sendToClients(buf,N2K_CHANNEL_ID); + } } } - if (actisenseReader && direction != N2KT_MSGACT && usbStream && actisenseChannel.canSendOut(n2kMsg.PGN)) + + if (direction != N2KT_MSGACT) { - n2kMsg.SendInActisenseFormat(usbStream); + for (int i=0;isendActisense(n2kMsg); + } } if (direction != N2KT_MSGOUT){ nmea0183Converter->HandleMsg(n2kMsg); @@ -250,14 +247,6 @@ void handleN2kMessage(const tN2kMsg &n2kMsg,N2K_MsgDirection direction) } }; -void handleReceivedNmeaMessage(const char *buf, int sourceId){ - GwChannelConfig *channel=channelFromSource(sourceId); - if (channel && ! channel->canReceive(buf)) return; - if (! channel || channel->sendToN2K()){ - toN2KConverter->parseAndSend(buf,sourceId); - } - sendBufferToChannels(buf,sourceId); -} //***************************************************************************** void SendNMEA0183Message(const tNMEA0183Msg &NMEA0183Msg, int sourceId,bool convert=false) { @@ -272,7 +261,9 @@ void SendNMEA0183Message(const tNMEA0183Msg &NMEA0183Msg, int sourceId,bool conv buf[len]=0x0d; buf[len+1]=0x0a; buf[len+2]=0; - sendBufferToChannels(buf,sourceId); + for (int i=0;i< numChannels;i++){ + allChannels[i]->sendToClients(buf,sourceId); + } } class GwSerialLog : public GwLogWriter{ @@ -431,7 +422,6 @@ protected: usbChannel.getJsonSize()+ tcpChannel.getJsonSize()+ serialChannel.getJsonSize()+ - actisenseChannel.getJsonSize()+ tclChannel.getJsonSize() ); status["version"] = VERSION; @@ -449,7 +439,6 @@ protected: countNMEA2KIn.toJson(status); countNMEA2KOut.toJson(status); usbChannel.toJson(status); - actisenseChannel.toJson(status); serialChannel.toJson(status); tcpChannel.toJson(status); tclChannel.toJson(status); @@ -715,33 +704,28 @@ void setup() { if (serial1){ int rt=serial1->setup(config.getInt(config.serialBaud,115200),serialrx,serialtx); logger.logDebug(GwLog::LOG,"starting serial returns %d",rt); + serialChannel.setImpl(serial1); } #endif - + usbChannel.setImpl(usbSerial); MDNS.begin(config.getConfigItem(config.systemName)->asCString()); gwWifi.setup(); // Start TCP server socketServer.begin(); + tcpChannel.setImpl(&socketServer); logger.flush(); - usbChannel.begin(!config.getBool(config.usbActisense), + usbChannel.begin(true, config.getBool(config.sendUsb), config.getBool(config.receiveUsb), config.getString(config.usbReadFilter), config.getString(config.usbWriteFilter), false, - config.getBool(config.usbToN2k)); - logger.logDebug(GwLog::LOG,"%s",usbChannel.toString().c_str()); - actisenseChannel.begin( + config.getBool(config.usbToN2k), config.getBool(config.usbActisense), - config.getBool(config.usbActSend), - true, - "", - "", - false, - true - ); - logger.logDebug(GwLog::LOG,"ACT:%s",actisenseChannel.toString().c_str()); + config.getBool(config.usbActSend) + ); + logger.logDebug(GwLog::LOG,"%s",usbChannel.toString().c_str()); tcpChannel.begin( true, config.getBool(config.sendTCP), @@ -749,7 +733,9 @@ void setup() { config.getString(config.tcpReadFilter), config.getString(config.tcpWriteFilter), config.getBool(config.sendSeasmart), - config.getBool(config.tcpToN2k) + config.getBool(config.tcpToN2k), + false, + false ); logger.logDebug(GwLog::LOG,"%s",tcpChannel.toString().c_str()); serialChannel.begin( @@ -759,7 +745,9 @@ void setup() { config.getString(config.serialReadF), config.getString(config.serialWriteF), false, - config.getBool(config.serialToN2k) + config.getBool(config.serialToN2k), + false, + false ); logger.logDebug(GwLog::LOG,"%s",serialChannel.toString().c_str()); tclChannel.begin( @@ -769,7 +757,9 @@ void setup() { config.getString(config.tclReadFilter), config.getString(config.tclReadFilter), config.getBool(config.tclSeasmart), - config.getBool(config.tclToN2k) + config.getBool(config.tclToN2k), + false, + false ); logger.logDebug(GwLog::LOG,"%s",tclChannel.toString().c_str()); logger.flush(); @@ -890,15 +880,6 @@ void setup() { } NMEA2000.ExtendTransmitMessages(pgns); NMEA2000.ExtendReceiveMessages(nmea0183Converter->handledPgns()); - if (config.getBool(config.usbActisense)){ - actisenseReader=new tActisenseReader(); - usbStream=usbSerial->getStream(false); - actisenseReader->SetReadStream(usbStream); - actisenseReader->SetMsgHandler([](const tN2kMsg &msg){ - actisenseChannel.canReceive(msg.PGN); //count only - handleN2kMessage(msg,N2KT_MSGACT); - }); - } NMEA2000.SetMsgHandler([](const tN2kMsg &n2kMsg){ handleN2kMessage(n2kMsg,N2KT_MSGIN); }); @@ -918,49 +899,12 @@ void setup() { } //***************************************************************************** void handleSendAndRead(bool handleRead){ - socketServer.loop(handleRead); - usbSerial->loop(handleRead); - if (serial1) serial1->loop(handleRead); + for (int i=0;iloop(handleRead,true); + } } -class NMEAMessageReceiver : public GwMessageFetcher{ - static const int bufferSize=GwBuffer::RX_BUFFER_SIZE+4; - uint8_t buffer[bufferSize]; - uint8_t *writePointer=buffer; - public: - virtual bool handleBuffer(GwBuffer *gwbuffer){ - size_t len=fetchMessageToBuffer(gwbuffer,buffer,bufferSize-4,'\n'); - writePointer=buffer+len; - if (writePointer == buffer) return false; - uint8_t *p; - for (p=writePointer-1;p>=buffer && *p <= 0x20;p--){ - *p=0; - } - if (p > buffer){ - p++; - *p=0x0d; - p++; - *p=0x0a; - p++; - *p=0; - } - for (p=buffer; *p != 0 && p < writePointer && *p <= 0x20;p++){} - //very simple NMEA check - if (*p != '!' && *p != '$'){ - logger.logDebug(GwLog::DEBUG,"unknown line [%d] - ignore: %s",id,(const char *)p); - } - else{ - logger.logDebug(GwLog::DEBUG,"NMEA[%d]: %s",id,(const char *)p); - handleReceivedNmeaMessage((const char *)p,id); - //trigger sending to empty buffers - handleSendAndRead(false); - } - writePointer=buffer; - return true; - } -}; TimeMonitor monitor(20,0.2); -NMEAMessageReceiver receiver; unsigned long lastHeapReport=0; void loop() { monitor.reset(); @@ -981,20 +925,18 @@ void loop() { } } monitor.setTime(3); - //read sockets - socketServer.loop(true,false); + for (int i=0;iloop(true,false); + } + //reads monitor.setTime(4); - //write sockets - socketServer.loop(false,true); + for (int i=0;iloop(false,true); + } + //writes monitor.setTime(5); - usbSerial->loop(true); - monitor.setTime(6); - if (serial1) serial1->loop(true); - monitor.setTime(7); - handleSendAndRead(true); - monitor.setTime(8); NMEA2000.ParseMessages(); - monitor.setTime(9); + monitor.setTime(6); int SourceAddress = NMEA2000.GetN2kSource(); if (SourceAddress != NodeAddress) { // Save potentially changed Source Address to NVS memory @@ -1005,21 +947,27 @@ void loop() { logger.logDebug(GwLog::LOG,"Address Change: New Address=%d\n", SourceAddress); } nmea0183Converter->loop(); - monitor.setTime(10); + monitor.setTime(7); //read channels - if (tcpChannel.shouldRead()) socketServer.readMessages(&receiver); - monitor.setTime(11); - receiver.id=USB_CHANNEL_ID; - if (usbChannel.shouldRead()) usbSerial->readMessages(&receiver); - monitor.setTime(12); - receiver.id=SERIAL1_CHANNEL_ID; - if (serial1 && serialChannel.shouldRead() ) serial1->readMessages(&receiver); - monitor.setTime(13); - if (actisenseReader){ - actisenseReader->ParseMessages(); + for (int i=0;ireadMessages([&](const char * buffer, int sourceId){ + for (int j=0;jsendToClients(buffer,sourceId); + allChannels[j]->loop(false,true); + } + if (allChannels[i]->sendToN2K()){ + toN2KConverter->parseAndSend(buffer, sourceId); + } + }); } - monitor.setTime(14); + monitor.setTime(8); + for (int i=0;iparseActisense([](const tN2kMsg &msg,int source){ + handleN2kMessage(msg,N2KT_MSGACT); + }); + } + monitor.setTime(9); //handle message requests GwMessage *msg=mainQueue.fetchMessage(0); @@ -1027,5 +975,5 @@ void loop() { msg->process(); msg->unref(); } - monitor.setTime(15); + monitor.setTime(10); }