From d21e4978641ad42e72ee95d0a92d493b05b8eb7e Mon Sep 17 00:00:00 2001 From: wellenvogel Date: Sun, 2 Jan 2022 14:43:37 +0100 Subject: [PATCH] intermediate, untested: move channel handling out of main --- lib/channel/GwChannel.cpp | 14 +- lib/channel/GwChannel.h | 8 +- lib/channel/GwChannelList.cpp | 189 +++++++++++++++ lib/channel/GwChannelList.h | 43 ++++ lib/nmea0183ton2k/NMEA0183AIStoNMEA2000.h | 4 +- lib/nmea0183ton2k/NMEA0183DataToN2K.cpp | 81 ++++--- lib/nmea0183ton2k/NMEA0183DataToN2K.h | 2 +- lib/socketserver/GwTcpClient.cpp | 44 ++-- lib/socketserver/GwTcpClient.h | 15 +- src/main.cpp | 283 ++++------------------ 10 files changed, 381 insertions(+), 302 deletions(-) create mode 100644 lib/channel/GwChannelList.cpp create mode 100644 lib/channel/GwChannelList.h diff --git a/lib/channel/GwChannel.cpp b/lib/channel/GwChannel.cpp index bee3772..e51a1eb 100644 --- a/lib/channel/GwChannel.cpp +++ b/lib/channel/GwChannel.cpp @@ -52,10 +52,12 @@ class GwChannelMessageReceiver : public GwMessageFetcher{ GwChannel::GwChannel(GwLog *logger, String name, - int sourceId){ + int sourceId, + int maxSourceId){ this->logger = logger; this->name=name; this->sourceId=sourceId; + this->maxSourceId=sourceId; this->countIn=new GwCounter(String("count")+name+String("in")); this->countOut=new GwCounter(String("count")+name+String("out")); this->impl=NULL; @@ -206,8 +208,16 @@ void GwChannel::parseActisense(N2kHandler handler){ } } -void GwChannel::sendActisense(const tN2kMsg &msg){ +void GwChannel::sendActisense(const tN2kMsg &msg, int sourceId){ if (!enabled || ! impl || ! writeActisense || ! channelStream) return; + //currently actisense only for channels with a single source id + //so we can check it here + if (isOwnSource(sourceId)) return; canSendOut(msg.PGN); msg.SendInActisenseFormat(channelStream); } + +bool GwChannel::isOwnSource(int id){ + if (maxSourceId < 0) return id == sourceId; + else return (id >= sourceId && id <= maxSourceId); +} diff --git a/lib/channel/GwChannel.h b/lib/channel/GwChannel.h index 940f599..16a1423 100644 --- a/lib/channel/GwChannel.h +++ b/lib/channel/GwChannel.h @@ -26,6 +26,7 @@ class GwChannel{ GwCounter *countOut=NULL; GwChannelInterface *impl; int sourceId=0; + int maxSourceId=-1; GwChannelMessageReceiver *receiver=NULL; tActisenseReader *actisenseReader=NULL; Stream *channelStream=NULL; @@ -34,7 +35,8 @@ class GwChannel{ GwChannel( GwLog *logger, String name, - int sourceId); + int sourceId, + int maxSourceId=-1); void begin( bool enabled, bool nmeaOut, @@ -48,7 +50,7 @@ class GwChannel{ ); void setImpl(GwChannelInterface *impl); - + bool isOwnSource(int id); void enable(bool enabled){ this->enabled=enabled; } @@ -70,6 +72,6 @@ class GwChannel{ void sendToClients(const char *buffer, int sourceId); typedef std::function N2kHandler ; void parseActisense(N2kHandler handler); - void sendActisense(const tN2kMsg &msg); + void sendActisense(const tN2kMsg &msg, int sourceId); }; diff --git a/lib/channel/GwChannelList.cpp b/lib/channel/GwChannelList.cpp new file mode 100644 index 0000000..45a2392 --- /dev/null +++ b/lib/channel/GwChannelList.cpp @@ -0,0 +1,189 @@ +#include "GwChannelList.h" +#include "GwApi.h" +#include "GwHardware.h" +#include "GwSocketServer.h" +#include "GwSerial.h" +#include "GwTcpClient.h" +class GwSerialLog : public GwLogWriter{ + static const size_t bufferSize=4096; + char *logBuffer=NULL; + int wp=0; + GwSerial *writer; + public: + GwSerialLog(GwSerial *writer){ + this->writer=writer; + logBuffer=new char[bufferSize]; + wp=0; + } + virtual ~GwSerialLog(){} + virtual void write(const char *data){ + int len=strlen(data); + if ((wp+len) >= (bufferSize-1)) return; + strncpy(logBuffer+wp,data,len); + wp+=len; + logBuffer[wp]=0; + } + virtual void flush(){ + size_t handled=0; + while (handled < wp){ + writer->flush(); + size_t rt=writer->sendToClients(logBuffer+handled,-1,true); + handled+=rt; + } + wp=0; + logBuffer[0]=0; + } + +}; + + +GwChannelList::GwChannelList(GwLog *logger, GwConfigHandler *config){ + this->logger=logger; + this->config=config; +} +void GwChannelList::allChannels(ChannelAction action){ + for (auto it=theChannels.begin();it != theChannels.end();it++){ + action(*it); + } +} +void GwChannelList::begin(bool fallbackSerial){ + LOG_DEBUG(GwLog::DEBUG,"GwChannelList::begin"); + GwChannel *channel=NULL; + //usb + if (! fallbackSerial){ + GwSerial *usb=new GwSerial(NULL,0,USB_CHANNEL_ID); + usb->setup(config->getInt(config->usbBaud),3,1); + logger->setWriter(new GwSerialLog(usb)); + logger->prefix="GWSERIAL:"; + channel=new GwChannel(logger,"USB",USB_CHANNEL_ID); + channel->begin(true, + config->getBool(config->sendUsb), + config->getBool(config->receiveUsb), + config->getString(config->usbReadFilter), + config->getString(config->usbWriteFilter), + false, + config->getBool(config->usbToN2k), + config->getBool(config->usbActisense), + config->getBool(config->usbActSend) + ); + LOG_DEBUG(GwLog::LOG,"%s",channel->toString().c_str()); + } + //TCP server + sockets=new GwSocketServer(config,logger,MIN_TCP_CHANNEL_ID); + sockets->begin(); + channel=new GwChannel(logger,"TCP",MIN_TCP_CHANNEL_ID,MIN_TCP_CHANNEL_ID+10); + channel->setImpl(sockets); + channel->begin( + true, + config->getBool(config->sendTCP), + config->getBool(config->readTCP), + config->getString(config->tcpReadFilter), + config->getString(config->tcpWriteFilter), + config->getBool(config->sendSeasmart), + config->getBool(config->tcpToN2k), + false, + false + ); + LOG_DEBUG(GwLog::LOG,"%s",channel->toString().c_str()); + theChannels.push_back(channel); + + //serial 1 + bool serCanRead=false; + bool serCanWrite=false; + int serialrx=-1; + int serialtx=-1; + #ifdef GWSERIAL_MODE + #ifdef GWSERIAL_TX + serialtx=GWSERIAL_TX; + #endif + #ifdef GWSERIAL_RX + serialrx=GWSERIAL_RX; + #endif + if (serialrx != -1 && serialtx != -1){ + serialMode=GWSERIAL_MODE; + } + #endif + //the serial direction is from the config (only valid for mode UNI) + String serialDirection=config->getString(config->serialDirection); + //we only consider the direction if mode is UNI + if (serialMode != String("UNI")){ + serialDirection=String(""); + //if mode is UNI it depends on the selection + serCanRead=config->getBool(config->receiveSerial); + serCanWrite=config->getBool(config->sendSerial); + } + if (serialDirection == "receive" || serialDirection == "off" || serialMode == "RX") serCanWrite=false; + if (serialDirection == "send" || serialDirection == "off" || serialMode == "TX") serCanRead=false; + LOG_DEBUG(GwLog::DEBUG,"serial set up: mode=%s,direction=%s,rx=%d,tx=%d", + serialMode.c_str(),serialDirection.c_str(),serialrx,serialtx + ); + if (serialtx != -1 || serialrx != -1 ){ + LOG_DEBUG(GwLog::LOG,"creating serial interface rx=%d, tx=%d",serialrx,serialtx); + GwSerial *serial=new GwSerial(logger,1,SERIAL1_CHANNEL_ID,serCanRead); + int rt=serial->setup(config->getInt(config->serialBaud,115200),serialrx,serialtx); + LOG_DEBUG(GwLog::LOG,"starting serial returns %d",rt); + channel=new GwChannel(logger,"SER",SERIAL1_CHANNEL_ID); + channel->setImpl(serial); + channel->begin( + serCanRead || serCanWrite, + serCanWrite, + serCanRead, + config->getString(config->serialReadF), + config->getString(config->serialWriteF), + false, + config->getBool(config->serialToN2k), + false, + false + ); + LOG_DEBUG(GwLog::LOG,"%s",channel->toString().c_str()); + theChannels.push_back(channel); + } + + //tcp client + channel=new GwChannel(logger,"TCPClient",TCP_CLIENT_CHANNEL_ID); + channel->begin( + config->getBool(config->tclEnabled), + config->getBool(config->sendTCL), + config->getBool(config->readTCL), + config->getString(config->tclReadFilter), + config->getString(config->tclReadFilter), + config->getBool(config->tclSeasmart), + config->getBool(config->tclToN2k), + false, + false + ); + if (channel->isEnabled()){ + client=new GwTcpClient(logger); + client->begin(TCP_CLIENT_CHANNEL_ID, + config->getString(config->remoteAddress), + config->getInt(config->remotePort), + channel->shouldRead() + ); + channel->setImpl(client); + } + LOG_DEBUG(GwLog::LOG,"%s",channel->toString().c_str()); + logger->flush(); +} +int GwChannelList::getJsonSize(){ + int rt=0; + allChannels([&](GwChannel *c){ + rt+=c->getJsonSize(); + }); + return rt+20; +} +void GwChannelList::toJson(GwJsonDocument &doc){ + if (sockets) doc["numClients"]=sockets->numClients(); + if (client){ + doc["clientCon"]=client->isConnected(); + doc["clientErr"]=client->getError(); + } + allChannels([&](GwChannel *c){ + c->toJson(doc); + }); +} +GwChannel *GwChannelList::getChannelById(int sourceId){ + for (auto it=theChannels.begin();it != theChannels.end();it++){ + if ((*it)->isOwnSource(sourceId)) return *it; + } + return NULL; +} \ No newline at end of file diff --git a/lib/channel/GwChannelList.h b/lib/channel/GwChannelList.h new file mode 100644 index 0000000..26329de --- /dev/null +++ b/lib/channel/GwChannelList.h @@ -0,0 +1,43 @@ +#pragma once +#include +#include +#include +#include "GwChannel.h" +#include "GwLog.h" +#include "GWConfig.h" +#include "GwJsonDocument.h" + +//NMEA message channels +#define N2K_CHANNEL_ID 0 +#define USB_CHANNEL_ID 1 +#define SERIAL1_CHANNEL_ID 2 +#define TCP_CLIENT_CHANNEL_ID 3 +#define MIN_TCP_CHANNEL_ID 4 + +#define MIN_USER_TASK 200 +class GwSocketServer; +class GwTcpClient; +class GwChannelList{ + private: + GwLog *logger; + GwConfigHandler *config; + typedef std::vector ChannelList; + ChannelList theChannels; + + GwSocketServer *sockets; + GwTcpClient *client; + String serialMode=F("NONE"); + public: + GwChannelList(GwLog *logger, GwConfigHandler *config); + typedef std::function ChannelAction; + void allChannels(ChannelAction action); + //initialize + void begin(bool fallbackSerial=false); + //status + int getJsonSize(); + void toJson(GwJsonDocument &doc); + //single channel + GwChannel *getChannelById(int sourceId); + + +}; diff --git a/lib/nmea0183ton2k/NMEA0183AIStoNMEA2000.h b/lib/nmea0183ton2k/NMEA0183AIStoNMEA2000.h index 4c4b440..a45ad2d 100644 --- a/lib/nmea0183ton2k/NMEA0183AIStoNMEA2000.h +++ b/lib/nmea0183ton2k/NMEA0183AIStoNMEA2000.h @@ -29,11 +29,13 @@ uint16_t DaysSince1970 = 0; class MyAisDecoder : public AIS::AisDecoder { + public: + int sourceId=-1; private: NMEA0183DataToN2K::N2kSender sender; GwLog *logger; void send(const tN2kMsg &msg){ - (*sender)(msg); + (*sender)(msg,sourceId); } AIS::DefaultSentenceParser parser; public: diff --git a/lib/nmea0183ton2k/NMEA0183DataToN2K.cpp b/lib/nmea0183ton2k/NMEA0183DataToN2K.cpp index e3c6585..bc0986f 100644 --- a/lib/nmea0183ton2k/NMEA0183DataToN2K.cpp +++ b/lib/nmea0183ton2k/NMEA0183DataToN2K.cpp @@ -97,26 +97,26 @@ private: waypointMap[wpName]=newWp; return newWp.id; } - bool send(tN2kMsg &msg,String key,unsigned long minDiff){ + bool send(tN2kMsg &msg,String key,unsigned long minDiff,int sourceId){ unsigned long now=millis(); unsigned long pgn=msg.PGN; if (key == "") key=String(msg.PGN); auto it=lastSends.find(key); if (it == lastSends.end()){ lastSends[key]=now; - sender(msg); + sender(msg,sourceId); return true; } if ((it->second + minDiff) <= now){ lastSends[key]=now; - sender(msg); + sender(msg,sourceId); return true; } LOG_DEBUG(GwLog::DEBUG+1,"skipped n2k message %d",msg.PGN); return false; } - bool send(tN2kMsg &msg, String key=""){ - send(msg,key,minSendInterval); + bool send(tN2kMsg &msg, int sourceId,String key=""){ + return send(msg,key,minSendInterval,sourceId); } bool updateDouble(GwBoatItem *target,double v, int sourceId){ if (v != NMEA0183DoubleNA){ @@ -255,7 +255,7 @@ private: (tN2kFluidType)(current.selector()), fields[0], fields[1]); - send(n2kMsg, buildN2KKey(n2kMsg, current.mapping)); + send(n2kMsg,msg.sourceId, buildN2KKey(n2kMsg, current.mapping)); } break; case XDRBAT: @@ -263,7 +263,7 @@ private: { SetN2kPGN127508(n2kMsg, current.mapping.instanceId, fields[0], fields[1], fields[2]); - send(n2kMsg, buildN2KKey(n2kMsg, current.mapping)); + send(n2kMsg,msg.sourceId, buildN2KKey(n2kMsg, current.mapping)); } break; case XDRTEMP: @@ -271,7 +271,7 @@ private: SetN2kPGN130312(n2kMsg,1,current.mapping.instanceId, (tN2kTempSource)(current.selector()), fields[0],fields[1]); - send(n2kMsg,buildN2KKey(n2kMsg,current.mapping)); + send(n2kMsg,msg.sourceId,buildN2KKey(n2kMsg,current.mapping)); } break; case XDRHUMIDITY: @@ -281,7 +281,7 @@ private: fields[0], fields[1] ); - send(n2kMsg,buildN2KKey(n2kMsg,current.mapping)); + send(n2kMsg,msg.sourceId,buildN2KKey(n2kMsg,current.mapping)); } break; case XDRPRESSURE: @@ -289,7 +289,7 @@ private: SetN2kPGN130314(n2kMsg,1,current.mapping.instanceId, (tN2kPressureSource)(current.selector()), fields[0]); - send(n2kMsg,buildN2KKey(n2kMsg,current.mapping)); + send(n2kMsg,msg.sourceId,buildN2KKey(n2kMsg,current.mapping)); } break; case XDRENGINE: @@ -301,14 +301,14 @@ private: fields[0], fields[1], fields[2], fields[3], fields[4], fields[5], fields[6], fields[7], fromDouble(fields[8]), fromDouble(fields[9]), tN2kEngineDiscreteStatus1(), tN2kEngineDiscreteStatus2()); - send(n2kMsg, buildN2KKey(n2kMsg, current.mapping)); + send(n2kMsg,msg.sourceId, buildN2KKey(n2kMsg, current.mapping)); } } else{ if (fillFieldList(current, fields, 13,10)){ SetN2kPGN127488(n2kMsg,current.mapping.instanceId, fields[10],fields[11],fromDouble(fields[12])); - send(n2kMsg, buildN2KKey(n2kMsg, current.mapping)); + send(n2kMsg,msg.sourceId, buildN2KKey(n2kMsg, current.mapping)); } } break; @@ -334,7 +334,7 @@ private: mode=xteMode(*modeChar); } SetN2kXTE(n2kMsg,1,mode,false,rmb.xte); - send(n2kMsg); + send(n2kMsg,msg.sourceId); } uint8_t destinationId=getWaypointId(rmb.destID); uint8_t sourceId=getWaypointId(rmb.originID); @@ -357,10 +357,10 @@ private: rmb.longitude, rmb.vmg ); - send(n2kMsg); + send(n2kMsg,msg.sourceId); SetN2kPGN129285(n2kMsg,sourceId,1,1,true,true,"default"); AppendN2kPGN129285(n2kMsg,destinationId,rmb.destID,rmb.latitude,rmb.longitude); - send(n2kMsg); + send(n2kMsg,msg.sourceId); } } void convertRMC(const SNMEA0183Msg &msg) @@ -382,25 +382,26 @@ private: { SetN2kSystemTime(n2kMsg, 1, GpsDate, GpsTime); - send(n2kMsg); + send(n2kMsg,msg.sourceId); } if (UD(Latitude) && UD(Longitude)){ SetN2kLatLonRapid(n2kMsg,Latitude,Longitude); - send(n2kMsg); + send(n2kMsg,msg.sourceId); } if (UD(COG) && UD(SOG)){ SetN2kCOGSOGRapid(n2kMsg,1,N2khr_true,COG,SOG); - send(n2kMsg); + send(n2kMsg,msg.sourceId); } if (UD(Variation)){ SetN2kMagneticVariation(n2kMsg,1,N2kmagvar_Calc, getUint32(boatData->GpsDate), Variation); - send(n2kMsg); + send(n2kMsg,msg.sourceId); } } void convertAIVDX(const SNMEA0183Msg &msg){ + aisDecoder->sourceId=msg.sourceId; aisDecoder->handleMessage(msg.line); } void convertMWV(const SNMEA0183Msg &msg){ @@ -434,7 +435,7 @@ private: } if (shouldSend){ SetN2kWindSpeed(n2kMsg,1,WindSpeed,WindAngle,n2kRef); - send(n2kMsg,String(n2kMsg.PGN)+String((int)n2kRef)); + send(n2kMsg,msg.sourceId,String(n2kMsg.PGN)+String((int)n2kRef)); } } void convertVWR(const SNMEA0183Msg &msg) @@ -475,7 +476,7 @@ private: if (shouldSend) { SetN2kWindSpeed(n2kMsg, 1, WindSpeed, WindAngle, N2kWind_Apparent); - send(n2kMsg,String(n2kMsg.PGN)+String((int)N2kWind_Apparent)); + send(n2kMsg,msg.sourceId,String(n2kMsg.PGN)+String((int)N2kWind_Apparent)); } } @@ -519,11 +520,11 @@ private: if (shouldSend) { SetN2kWindSpeed(n2kMsg, 1, WindSpeed, WindAngle, N2kWind_True_North); - send(n2kMsg,String(n2kMsg.PGN)+String((int)N2kWind_True_North)); + send(n2kMsg,msg.sourceId,String(n2kMsg.PGN)+String((int)N2kWind_True_North)); } if (WindAngleMagnetic != NMEA0183DoubleNA && shouldSend){ SetN2kWindSpeed(n2kMsg, 1, WindSpeed, WindAngleMagnetic, N2kWind_Magnetic); - send(n2kMsg,String(n2kMsg.PGN)+String((int)N2kWind_Magnetic)); + send(n2kMsg,msg.sourceId,String(n2kMsg.PGN)+String((int)N2kWind_Magnetic)); } } @@ -540,7 +541,7 @@ private: boatData->Variation->getDataWithDefault(N2kDoubleNA), boatData->Deviation->getDataWithDefault(N2kDoubleNA) ); - send(n2kMsg); + send(n2kMsg,msg.sourceId); } void convertHDT(const SNMEA0183Msg &msg){ @@ -553,7 +554,7 @@ private: if (! UD(Heading)) return; tN2kMsg n2kMsg; SetN2kTrueHeading(n2kMsg,1,Heading); - send(n2kMsg); + send(n2kMsg,msg.sourceId); } void convertHDG(const SNMEA0183Msg &msg){ double MagneticHeading=NMEA0183DoubleNA; @@ -584,7 +585,7 @@ private: UD(Deviation); tN2kMsg n2kMsg; SetN2kMagneticHeading(n2kMsg,1,MagneticHeading,Deviation,Variation); - send(n2kMsg); + send(n2kMsg,msg.sourceId); } void convertDPT(const SNMEA0183Msg &msg){ @@ -612,7 +613,7 @@ private: if (! boatData->DepthTransducer->update(DepthBelowTransducer)) return; tN2kMsg n2kMsg; SetN2kWaterDepth(n2kMsg,1,DepthBelowTransducer,Offset); - send(n2kMsg,String(n2kMsg.PGN)+String((Offset != N2kDoubleNA)?1:0)); + send(n2kMsg,msg.sourceId,String(n2kMsg.PGN)+String((Offset != N2kDoubleNA)?1:0)); } typedef enum { DBS, @@ -647,7 +648,7 @@ private: if (! boatData->DepthTransducer->update(Depth,msg.sourceId)) return; tN2kMsg n2kMsg; SetN2kWaterDepth(n2kMsg,1,Depth,N2kDoubleNA); - send(n2kMsg,String(n2kMsg.PGN)+String(0)); + send(n2kMsg,msg.sourceId,String(n2kMsg.PGN)+String(0)); return; } //we can only send if we have a valid depth beloww tranducer @@ -667,7 +668,7 @@ private: } tN2kMsg n2kMsg; SetN2kWaterDepth(n2kMsg,1,Depth,offset); - send(n2kMsg,String(n2kMsg.PGN)+String((offset != N2kDoubleNA)?1:0)); + send(n2kMsg,msg.sourceId,(n2kMsg.PGN)+String((offset != N2kDoubleNA)?1:0)); } } } @@ -694,7 +695,7 @@ private: tN2kMsg n2kMsg; if (! UD(RudderPosition)) return; SetN2kRudder(n2kMsg,RudderPosition); - send(n2kMsg); + send(n2kMsg,msg.sourceId); } } @@ -711,7 +712,7 @@ private: if (MagneticHeading == NMEA0183DoubleNA) MagneticHeading=N2kDoubleNA; tN2kMsg n2kMsg; SetN2kBoatSpeed(n2kMsg,1,STW); - send(n2kMsg); + send(n2kMsg,msg.sourceId); } void convertVTG(const SNMEA0183Msg &msg){ @@ -727,7 +728,7 @@ private: tN2kMsg n2kMsg; //TODO: maybe use MCOG if no COG? SetN2kCOGSOGRapid(n2kMsg,1,N2khr_true,COG,SOG); - send(n2kMsg); + send(n2kMsg,msg.sourceId); } void convertZDA(const SNMEA0183Msg &msg){ time_t DateTime; @@ -751,10 +752,10 @@ private: tN2kMsg n2kMsg; if (timezoneValid){ SetN2kLocalOffset(n2kMsg,DaysSince1970,GpsTime,Timezone); - send(n2kMsg); + send(n2kMsg,msg.sourceId); } SetN2kSystemTime(n2kMsg,1,DaysSince1970,GpsTime); - send(n2kMsg); + send(n2kMsg,msg.sourceId); } void convertGGA(const SNMEA0183Msg &msg){ double GPSTime=NMEA0183DoubleNA; @@ -788,7 +789,7 @@ private: SatelliteCount, HDOP, boatData->PDOP->getDataWithDefault(N2kDoubleNA), 0, 0, N2kGNSSt_GPS, DGPSReferenceStationID, DGPSAge); - send(n2kMsg); + send(n2kMsg,msg.sourceId); } void convertGSA(const SNMEA0183Msg &msg){ if (msg.FieldCount() < 17) @@ -819,7 +820,7 @@ private: if (!updateDouble(boatData->VDOP,VDOP,msg.sourceId)) return; } SetN2kGNSSDOPData(n2kMsg,1,rmode,mode,HDOP,VDOP,N2kDoubleNA); - send(n2kMsg); + send(n2kMsg,msg.sourceId); } void convertGSV(const SNMEA0183Msg &msg){ if (msg.FieldCount() < 7){ @@ -867,7 +868,7 @@ private: } } if (hasInfos){ - send(n2kMsg); + send(n2kMsg,msg.sourceId); } } @@ -885,7 +886,7 @@ private: if (! updateDouble(boatData->GpsTime,GLL.GPSTime,msg.sourceId)) return; tN2kMsg n2kMsg; SetN2kLatLonRapid(n2kMsg,GLL.latitude,GLL.longitude); - send(n2kMsg); + send(n2kMsg,msg.sourceId); } void convertROT(const SNMEA0183Msg &msg){ @@ -897,7 +898,7 @@ private: if (! updateDouble(boatData->ROT,ROT,msg.sourceId)) return; tN2kMsg n2kMsg; SetN2kRateOfTurn(n2kMsg,1,ROT); - send(n2kMsg); + send(n2kMsg,msg.sourceId); } void convertXTE(const SNMEA0183Msg &msg){ if (msg.FieldCount() < 6){ @@ -916,7 +917,7 @@ private: tN2kMsg n2kMsg; tN2kXTEMode mode=xteMode(msg.Field(5)[0]); SetN2kXTE(n2kMsg,1,mode,false,xte); - send(n2kMsg); + send(n2kMsg,msg.sourceId); } //shortcut for lambda converters diff --git a/lib/nmea0183ton2k/NMEA0183DataToN2K.h b/lib/nmea0183ton2k/NMEA0183DataToN2K.h index 59ace4f..d56b80a 100644 --- a/lib/nmea0183ton2k/NMEA0183DataToN2K.h +++ b/lib/nmea0183ton2k/NMEA0183DataToN2K.h @@ -7,7 +7,7 @@ class NMEA0183DataToN2K{ public: - typedef bool (*N2kSender)(const tN2kMsg &msg); + typedef bool (*N2kSender)(const tN2kMsg &msg,int sourceId); protected: GwLog * logger; GwBoatData *boatData; diff --git a/lib/socketserver/GwTcpClient.cpp b/lib/socketserver/GwTcpClient.cpp index 44c2d1e..b5d819f 100644 --- a/lib/socketserver/GwTcpClient.cpp +++ b/lib/socketserver/GwTcpClient.cpp @@ -19,15 +19,22 @@ void GwTcpClient::startConnection() { //TODO state = C_INITIALIZED; + error=""; connectStart=millis(); int sockfd = socket(AF_INET, SOCK_STREAM, 0); if (sockfd < 0) { + error="unable to create socket"; LOG_DEBUG(GwLog::ERROR,"unable to create socket: %d", errno); return; } fcntl( sockfd, F_SETFL, fcntl( sockfd, F_GETFL, 0 ) | O_NONBLOCK ); - - uint32_t ip_addr = this->remoteAddress; + IPAddress addr; + if (! addr.fromString(remoteAddress)){ + error="invalid ip "+remoteAddress; + LOG_DEBUG(GwLog::ERROR,"%s",error.c_str()); + return; + } + uint32_t ip_addr = addr; struct sockaddr_in serveraddr; memset((char *) &serveraddr, 0, sizeof(serveraddr)); serveraddr.sin_family = AF_INET; @@ -36,6 +43,7 @@ void GwTcpClient::startConnection() int res = lwip_connect_r(sockfd, (struct sockaddr*)&serveraddr, sizeof(serveraddr)); if (res < 0 ) { if (errno != EINPROGRESS){ + error=String("connect error ")+String(strerror(errno)); LOG_DEBUG(GwLog::ERROR,"connect on fd %d, errno: %d, \"%s\"", sockfd, errno, strerror(errno)); close(sockfd); return; @@ -56,7 +64,7 @@ void GwTcpClient::checkConnection() } if (state == C_INITIALIZED){ if ((now - connectStart) > CON_TIMEOUT){ - LOG_DEBUG(GwLog::LOG,"retry connect to %s",remoteAddress.toString().c_str()); + LOG_DEBUG(GwLog::LOG,"retry connect to %s",remoteAddress.c_str()); startConnection(); } return; @@ -73,13 +81,15 @@ void GwTcpClient::checkConnection() tv.tv_usec = 0; int res = select(sockfd + 1, nullptr, &fdset, nullptr, &tv); if (res < 0) { + error=String("select error ")+String(strerror(errno)); LOG_DEBUG(GwLog::ERROR,"select on fd %d, errno: %d, \"%s\"", sockfd, errno, strerror(errno)); connection->stop(); return; } else if (res == 0) { //still connecting if ((now - connectStart) >= CON_TIMEOUT){ - LOG_DEBUG(GwLog::ERROR,"connect timeout to %s, retry",remoteAddress.toString().c_str()); + error="connect timeout"; + LOG_DEBUG(GwLog::ERROR,"connect timeout to %s, retry",remoteAddress.c_str()); connection->stop(); return; } @@ -89,17 +99,19 @@ void GwTcpClient::checkConnection() res = getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &sockerr, &len); if (res < 0) { + error="getsockopt failed"; LOG_DEBUG(GwLog::ERROR,"getsockopt on fd %d, errno: %d, \"%s\"", sockfd, errno, strerror(errno)); connection->stop(); return; } if (sockerr != 0) { + error=String("socket error ")+String(strerror(sockerr)); LOG_DEBUG(GwLog::ERROR,"socket error on fd %d, errno: %d, \"%s\"", sockfd, sockerr, strerror(sockerr)); connection->stop(); return; } } - LOG_DEBUG(GwLog::LOG,"connected to %s",remoteAddress.toString().c_str()); + LOG_DEBUG(GwLog::LOG,"connected to %s",remoteAddress.c_str()); state=C_CONNECTED; } @@ -111,7 +123,7 @@ GwTcpClient::~GwTcpClient(){ if (connection) delete connection; } -void GwTcpClient::begin(int sourceId,IPAddress address, uint16_t port,bool allowRead) +void GwTcpClient::begin(int sourceId,String address, uint16_t port,bool allowRead) { stop(); this->sourceId=sourceId; @@ -151,14 +163,18 @@ void GwTcpClient::loop(bool handleRead,bool handleWrite) } } -void GwTcpClient::sendToClients(const char *buf,int sourceId){ - if (sourceId == this->sourceId) return; +size_t GwTcpClient::sendToClients(const char *buf,int sourceId, bool partialWrite){ + if (sourceId == this->sourceId) return 0; + if (state != C_CONNECTED) return 0; + if (! connection->hasClient()) return 0; + size_t len=strlen(buf); + if (connection->enqueue((uint8_t*)buf,len)){ + return len; + } + return 0; +} +void GwTcpClient::readMessages(GwMessageFetcher *writer){ if (state != C_CONNECTED) return; if (! connection->hasClient()) return; - connection->enqueue((uint8_t*)buf,strlen(buf)); -} -bool GwTcpClient::readMessages(GwMessageFetcher *writer){ - if (state != C_CONNECTED) return false; - if (! connection->hasClient()) return false; - return connection->messagesFromBuffer(writer); + connection->messagesFromBuffer(writer); } \ No newline at end of file diff --git a/lib/socketserver/GwTcpClient.h b/lib/socketserver/GwTcpClient.h index 36f017c..702baaf 100644 --- a/lib/socketserver/GwTcpClient.h +++ b/lib/socketserver/GwTcpClient.h @@ -1,15 +1,17 @@ #pragma once #include "GwSocketConnection.h" -class GwTcpClient +#include "GwChannelInterface.h" +class GwTcpClient : public GwChannelInterface { static const unsigned long CON_TIMEOUT=10; GwSocketConnection *connection = NULL; - IPAddress remoteAddress; + String remoteAddress; uint16_t port = 0; unsigned long connectStart=0; GwLog *logger; int sourceId; bool configured=false; + String error; public: typedef enum @@ -30,9 +32,10 @@ private: public: GwTcpClient(GwLog *logger); ~GwTcpClient(); - void begin(int sourceId,IPAddress address, uint16_t port,bool allowRead); - void loop(bool handleRead=true,bool handleWrite=true); - void sendToClients(const char *buf,int sourceId); - bool readMessages(GwMessageFetcher *writer); + void begin(int sourceId,String address, uint16_t port,bool allowRead); + virtual void loop(bool handleRead=true,bool handleWrite=true); + virtual size_t sendToClients(const char *buf,int sourceId, bool partialWrite=false); + virtual void readMessages(GwMessageFetcher *writer); bool isConnected(); + String getError(){return error;} }; \ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp index 6075ff6..7a35061 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -63,16 +63,9 @@ const unsigned long HEAP_REPORT_TIME=2000; //set to 0 to disable heap reporting #include "GwUpdate.h" #include "GwTcpClient.h" #include "GwChannel.h" +#include "GwChannelList.h" -//NMEA message channels -#define N2K_CHANNEL_ID 0 -#define USB_CHANNEL_ID 1 -#define SERIAL1_CHANNEL_ID 2 -#define TCP_CLIENT_CHANNEL_ID 3 -#define MIN_TCP_CHANNEL_ID 4 - -#define MIN_USER_TASK 200 #define MAX_NMEA2000_MESSAGE_SEASMART_SIZE 500 #define MAX_NMEA0183_MESSAGE_SIZE 150 // For AIS @@ -112,7 +105,7 @@ bool fixedApPass=false; bool fixedApPass=true; #endif GwWifi gwWifi(&config,&logger,fixedApPass); -GwSocketServer socketServer(&config,&logger,MIN_TCP_CHANNEL_ID); +GwChannelList channels(&logger,&config); GwBoatData boatData(&logger); GwXDRMappings xdrMappings(&logger,&config); @@ -131,23 +124,6 @@ GwWebServer webserver(&logger,&mainQueue,80); GwCounter countNMEA2KIn("count2Kin"); GwCounter countNMEA2KOut("count2Kout"); -GwChannel usbChannel(&logger,"USB",USB_CHANNEL_ID); -GwChannel tcpChannel(&logger,"TCPServer",MIN_TCP_CHANNEL_ID); -GwChannel serialChannel(&logger,"SER",SERIAL1_CHANNEL_ID); -GwChannel tclChannel(&logger,"TCPClient",TCP_CLIENT_CHANNEL_ID); - -GwChannel *allChannels[]={&usbChannel,&tcpChannel,&serialChannel,&tclChannel}; -const int numChannels=sizeof(allChannels)/sizeof(GwChannel*); - -GwChannel * channelFromSource(int source){ - if (source == USB_CHANNEL_ID) return &usbChannel; - if (source == SERIAL1_CHANNEL_ID) return &serialChannel; - if (source == TCP_CLIENT_CHANNEL_ID) return &tclChannel; - if (source >= MIN_TCP_CHANNEL_ID && source < MIN_USER_TASK) return &tcpChannel; - return NULL; -} - - unsigned long saltBase=esp_random(); char hv(uint8_t nibble){ @@ -190,58 +166,38 @@ bool checkPass(String hash){ } GwUpdate updater(&logger,&webserver,&checkPass); - - - -//configs that we need in main - - GwConfigInterface *systemName=config.getConfigItem(config.systemName,true); -bool serCanWrite=true; -bool serCanRead=true; -GwSerial *usbSerial = new GwSerial(NULL, 0, USB_CHANNEL_ID); -GwSerial *serial1=NULL; - -typedef enum { - N2KT_MSGIN, //from CAN - N2KT_MSGINT, //from internal source - N2KT_MSGOUT, //from converter - N2KT_MSGACT //from actisense -} N2K_MsgDirection; -void handleN2kMessage(const tN2kMsg &n2kMsg,N2K_MsgDirection direction) +void handleN2kMessage(const tN2kMsg &n2kMsg,int sourceId, bool isConverted=false) { logger.logDebug(GwLog::DEBUG + 1, "N2K: pgn %d, dir %d", - n2kMsg.PGN,(int)direction); - if (direction == N2KT_MSGIN){ + n2kMsg.PGN,sourceId); + if (sourceId == N2K_CHANNEL_ID){ countNMEA2KIn.add(n2kMsg.PGN); } char buf[MAX_NMEA2000_MESSAGE_SEASMART_SIZE]; bool messageCreated=false; - for (int i=0;isendSeaSmart()){ + channels.allChannels([&](GwChannel *c){ + if (c->sendSeaSmart()){ if (! messageCreated){ if (N2kToSeasmart(n2kMsg, millis(), buf, MAX_NMEA2000_MESSAGE_SEASMART_SIZE) != 0) { messageCreated=true; } } if (messageCreated){ - allChannels[i]->sendToClients(buf,N2K_CHANNEL_ID); + c->sendToClients(buf,sourceId); } } - } + }); - if (direction != N2KT_MSGACT) - { - for (int i=0;isendActisense(n2kMsg); - } - } - if (direction != N2KT_MSGOUT){ + channels.allChannels([&](GwChannel *c){ + c->sendActisense(n2kMsg,sourceId); + }); + if (! isConverted){ nmea0183Converter->HandleMsg(n2kMsg); } - if (direction != N2KT_MSGIN){ + if (sourceId != N2K_CHANNEL_ID){ countNMEA2KOut.add(n2kMsg.PGN); NMEA2000.SendMsg(n2kMsg); } @@ -261,44 +217,11 @@ void SendNMEA0183Message(const tNMEA0183Msg &NMEA0183Msg, int sourceId,bool conv buf[len]=0x0d; buf[len+1]=0x0a; buf[len+2]=0; - for (int i=0;i< numChannels;i++){ - allChannels[i]->sendToClients(buf,sourceId); - } + channels.allChannels([&](GwChannel *c){ + c->sendToClients(buf,sourceId); + }); } -class GwSerialLog : public GwLogWriter{ - static const size_t bufferSize=4096; - char *logBuffer=NULL; - int wp=0; - public: - GwSerialLog(){ - logBuffer=new char[bufferSize]; - wp=0; - } - virtual ~GwSerialLog(){} - virtual void write(const char *data){ - int len=strlen(data); - if ((wp+len) >= (bufferSize-1)) return; - strncpy(logBuffer+wp,data,len); - wp+=len; - logBuffer[wp]=0; - } - virtual void flush(){ - size_t handled=0; - while (handled < wp){ - usbSerial->flush(); - size_t rt=usbSerial->sendToClients(logBuffer+handled,-1,true); - handled+=rt; - } - wp=0; - logBuffer[0]=0; - } - -}; - -GwSerialLog logWriter; - - class ApiImpl : public GwApi { private: @@ -315,7 +238,7 @@ public: } virtual void sendN2kMessage(const tN2kMsg &msg,bool convert) { - handleN2kMessage(msg,convert?N2KT_MSGINT:N2KT_MSGOUT); + handleN2kMessage(msg,sourceId,!convert); } virtual void sendNMEA0183Message(const tNMEA0183Msg &msg, int sourceId,bool convert) @@ -419,15 +342,11 @@ protected: GwJsonDocument status(256 + countNMEA2KIn.getJsonSize()+ countNMEA2KOut.getJsonSize() + - usbChannel.getJsonSize()+ - tcpChannel.getJsonSize()+ - serialChannel.getJsonSize()+ - tclChannel.getJsonSize() + channels.getJsonSize() ); status["version"] = VERSION; status["wifiConnected"] = gwWifi.clientConnected(); status["clientIP"] = WiFi.localIP().toString(); - status["numClients"] = socketServer.numClients(); status["apIp"] = gwWifi.apIP(); size_t bsize=2*sizeof(unsigned long)+1; unsigned long base=saltBase + ( millis()/1000UL & ~0x7UL); @@ -438,10 +357,7 @@ protected: //nmea0183Converter->toJson(status); countNMEA2KIn.toJson(status); countNMEA2KOut.toJson(status); - usbChannel.toJson(status); - serialChannel.toJson(status); - tcpChannel.toJson(status); - tclChannel.toJson(status); + channels.toJson(status); serializeJson(status, result); } }; @@ -648,122 +564,19 @@ void setup() { uint8_t chipid[6]; uint32_t id = 0; config.loadConfig(); - // Init USB serial port - GwConfigInterface *usbBaud=config.getConfigItem(config.usbBaud,false); - int baud=115200; - if (usbBaud){ - baud=usbBaud->asInt(); - } + bool fallbackSerial=false; #ifdef FALLBACK_SERIAL - int st=-1; -#else - int st=usbSerial->setup(baud,3,1); //TODO: PIN defines -#endif - if (st < 0){ + fallbackSerial=true; //falling back to old style serial for logging Serial.begin(baud); Serial.printf("fallback serial enabled, error was %d\n",st); logger.prefix="FALLBACK:"; - } - else{ - logger.prefix="GWSERIAL:"; - logger.setWriter(&logWriter); - logger.logDebug(GwLog::LOG,"created GwSerial for USB port"); - } - logger.logDebug(GwLog::LOG,"config: %s", config.toString().c_str()); +#endif userCodeHandler.startInitTasks(MIN_USER_TASK); - #ifdef GWSERIAL_MODE - int serialrx=-1; - int serialtx=-1; - #ifdef GWSERIAL_TX - serialtx=GWSERIAL_TX; - #endif - #ifdef GWSERIAL_RX - serialrx=GWSERIAL_RX; - #endif - //the mode is a compile time preselection from hardware.h - String serialMode(F(GWSERIAL_MODE)); - //the serial direction is from the config (only valid for mode UNI) - String serialDirection=config.getString(config.serialDirection); - //we only consider the direction if mode is UNI - if (serialMode != String("UNI")){ - serialDirection=String(""); - //if mode is UNI it depends on the selection - serCanRead=config.getBool(config.receiveSerial); - serCanWrite=config.getBool(config.sendSerial); - } - if (serialDirection == "receive" || serialDirection == "off" || serialMode == "RX") serCanWrite=false; - if (serialDirection == "send" || serialDirection == "off" || serialMode == "TX") serCanRead=false; - logger.logDebug(GwLog::DEBUG,"serial set up: mode=%s,direction=%s,rx=%d,tx=%d", - serialMode.c_str(),serialDirection.c_str(),serialrx,serialtx - ); - if (serialtx != -1 || serialrx != -1){ - logger.logDebug(GwLog::LOG,"creating serial interface rx=%d, tx=%d",serialrx,serialtx); - serial1=new GwSerial(&logger,1,SERIAL1_CHANNEL_ID,serCanRead); - } - if (serial1){ - int rt=serial1->setup(config.getInt(config.serialBaud,115200),serialrx,serialtx); - logger.logDebug(GwLog::LOG,"starting serial returns %d",rt); - serialChannel.setImpl(serial1); - } - #endif - usbChannel.setImpl(usbSerial); + channels.begin(fallbackSerial); MDNS.begin(config.getConfigItem(config.systemName)->asCString()); gwWifi.setup(); - - // Start TCP server - socketServer.begin(); - tcpChannel.setImpl(&socketServer); logger.flush(); - usbChannel.begin(true, - config.getBool(config.sendUsb), - config.getBool(config.receiveUsb), - config.getString(config.usbReadFilter), - config.getString(config.usbWriteFilter), - false, - config.getBool(config.usbToN2k), - config.getBool(config.usbActisense), - config.getBool(config.usbActSend) - ); - logger.logDebug(GwLog::LOG,"%s",usbChannel.toString().c_str()); - tcpChannel.begin( - true, - config.getBool(config.sendTCP), - config.getBool(config.readTCP), - config.getString(config.tcpReadFilter), - config.getString(config.tcpWriteFilter), - config.getBool(config.sendSeasmart), - config.getBool(config.tcpToN2k), - false, - false - ); - logger.logDebug(GwLog::LOG,"%s",tcpChannel.toString().c_str()); - serialChannel.begin( - serCanRead || serCanWrite, - serCanWrite, - serCanRead, - config.getString(config.serialReadF), - config.getString(config.serialWriteF), - false, - config.getBool(config.serialToN2k), - false, - false - ); - logger.logDebug(GwLog::LOG,"%s",serialChannel.toString().c_str()); - tclChannel.begin( - config.getBool(config.tclEnabled), - config.getBool(config.sendTCL), - config.getBool(config.readTCL), - config.getString(config.tclReadFilter), - config.getString(config.tclReadFilter), - config.getBool(config.tclSeasmart), - config.getBool(config.tclToN2k), - false, - false - ); - logger.logDebug(GwLog::LOG,"%s",tclChannel.toString().c_str()); - logger.flush(); - webserver.registerMainHandler("/api/reset", [](AsyncWebServerRequest *request)->GwRequestMessage *{ return new ResetRequest(request->arg("_hash")); }); @@ -823,9 +636,9 @@ void setup() { config.getInt(config.minXdrInterval,100) ); - toN2KConverter= NMEA0183DataToN2K::create(&logger,&boatData,[](const tN2kMsg &msg)->bool{ + toN2KConverter= NMEA0183DataToN2K::create(&logger,&boatData,[](const tN2kMsg &msg, int sourceId)->bool{ logger.logDebug(GwLog::DEBUG+2,"send N2K %ld",msg.PGN); - handleN2kMessage(msg,N2KT_MSGOUT); + handleN2kMessage(msg,sourceId,true); return true; }, &xdrMappings, @@ -881,7 +694,7 @@ void setup() { NMEA2000.ExtendTransmitMessages(pgns); NMEA2000.ExtendReceiveMessages(nmea0183Converter->handledPgns()); NMEA2000.SetMsgHandler([](const tN2kMsg &n2kMsg){ - handleN2kMessage(n2kMsg,N2KT_MSGIN); + handleN2kMessage(n2kMsg,N2K_CHANNEL_ID); }); NMEA2000.Open(); logger.logDebug(GwLog::LOG,"starting addon tasks"); @@ -899,9 +712,9 @@ void setup() { } //***************************************************************************** void handleSendAndRead(bool handleRead){ - for (int i=0;iloop(handleRead,true); - } + channels.allChannels([&](GwChannel *c){ + c->loop(handleRead,true); + }); } TimeMonitor monitor(20,0.2); @@ -925,14 +738,14 @@ void loop() { } } monitor.setTime(3); - for (int i=0;iloop(true,false); - } + channels.allChannels([](GwChannel *c){ + c->loop(true,false); + }); //reads monitor.setTime(4); - for (int i=0;iloop(false,true); - } + channels.allChannels([](GwChannel *c){ + c->loop(false,true); + }); //writes monitor.setTime(5); NMEA2000.ParseMessages(); @@ -950,23 +763,23 @@ void loop() { monitor.setTime(7); //read channels - for (int i=0;ireadMessages([&](const char * buffer, int sourceId){ - for (int j=0;jsendToClients(buffer,sourceId); - allChannels[j]->loop(false,true); - } - if (allChannels[i]->sendToN2K()){ + channels.allChannels([](GwChannel *c){ + c->readMessages([&](const char * buffer, int sourceId){ + channels.allChannels([&](GwChannel *oc){ + oc->sendToClients(buffer,sourceId); + oc->loop(false,true); + }); + if (c->sendToN2K()){ toN2KConverter->parseAndSend(buffer, sourceId); } }); - } + }); monitor.setTime(8); - for (int i=0;iparseActisense([](const tN2kMsg &msg,int source){ - handleN2kMessage(msg,N2KT_MSGACT); + channels.allChannels([](GwChannel *c){ + c->parseActisense([](const tN2kMsg &msg,int source){ + handleN2kMessage(msg,source); }); - } + }); monitor.setTime(9); //handle message requests