1
0
mirror of https://github.com/thooge/esp32-nmea2000-obp60.git synced 2025-12-14 22:43:07 +01:00

intermediate: introduce an abstract channel

This commit is contained in:
wellenvogel
2021-12-31 18:38:11 +01:00
parent 0acb988f31
commit 47fb805ee6
9 changed files with 351 additions and 259 deletions

View File

@@ -62,7 +62,7 @@ const unsigned long HEAP_REPORT_TIME=2000; //set to 0 to disable heap reporting
#include "GwStatistics.h"
#include "GwUpdate.h"
#include "GwTcpClient.h"
#include "GwChannelConfig.h"
#include "GwChannel.h"
//NMEA message channels
@@ -122,8 +122,6 @@ int NodeAddress; // To store last Node Address
Preferences preferences; // Nonvolatile storage on ESP32 - To store LastDeviceAddress
N2kDataToNMEA0183 *nmea0183Converter=NULL;
NMEA0183DataToN2K *toN2KConverter=NULL;
tActisenseReader *actisenseReader=NULL;
Stream *usbStream=NULL;
SemaphoreHandle_t mainLock;
@@ -133,13 +131,15 @@ GwWebServer webserver(&logger,&mainQueue,80);
GwCounter<unsigned long> countNMEA2KIn("count2Kin");
GwCounter<unsigned long> countNMEA2KOut("count2Kout");
GwChannelConfig usbChannel(&logger,"USB");
GwChannelConfig actisenseChannel(&logger,"USB");
GwChannelConfig tcpChannel(&logger,"TCPServer");
GwChannelConfig serialChannel(&logger,"SER");
GwChannelConfig tclChannel(&logger,"TCPClient");
GwChannel usbChannel(&logger,"USB",USB_CHANNEL_ID);
GwChannel tcpChannel(&logger,"TCPServer",MIN_TCP_CHANNEL_ID);
GwChannel serialChannel(&logger,"SER",SERIAL1_CHANNEL_ID);
GwChannel tclChannel(&logger,"TCPClient",TCP_CLIENT_CHANNEL_ID);
GwChannelConfig * channelFromSource(int source){
GwChannel *allChannels[]={&usbChannel,&tcpChannel,&serialChannel,&tclChannel};
const int numChannels=sizeof(allChannels)/sizeof(GwChannel*);
GwChannel * channelFromSource(int source){
if (source == USB_CHANNEL_ID) return &usbChannel;
if (source == SERIAL1_CHANNEL_ID) return &serialChannel;
if (source == TCP_CLIENT_CHANNEL_ID) return &tclChannel;
@@ -204,17 +204,6 @@ bool serCanRead=true;
GwSerial *usbSerial = new GwSerial(NULL, 0, USB_CHANNEL_ID);
GwSerial *serial1=NULL;
void sendBufferToChannels(const char * buffer, int sourceId){
if (tcpChannel.canSendOut(buffer)){
socketServer.sendToClients(buffer,sourceId);
}
if (sourceId != USB_CHANNEL_ID && usbChannel.canSendOut(buffer)){
usbSerial->sendToClients(buffer,sourceId);
}
if (serial1 && sourceId != SERIAL1_CHANNEL_ID && serialChannel.canSendOut(buffer)){
serial1->sendToClients(buffer,sourceId);
}
}
typedef enum {
N2KT_MSGIN, //from CAN
N2KT_MSGINT, //from internal source
@@ -228,18 +217,26 @@ void handleN2kMessage(const tN2kMsg &n2kMsg,N2K_MsgDirection direction)
if (direction == N2KT_MSGIN){
countNMEA2KIn.add(n2kMsg.PGN);
}
if (tcpChannel.sendSeaSmart() || tclChannel.sendSeaSmart())
{
char buf[MAX_NMEA2000_MESSAGE_SEASMART_SIZE];
if (N2kToSeasmart(n2kMsg, millis(), buf, MAX_NMEA2000_MESSAGE_SEASMART_SIZE) == 0)
return;
if (tcpChannel.sendSeaSmart()){
socketServer.sendToClients(buf, N2K_CHANNEL_ID);
char buf[MAX_NMEA2000_MESSAGE_SEASMART_SIZE];
bool messageCreated=false;
for (int i=0;i<numChannels;i++){
if (allChannels[i]->sendSeaSmart()){
if (! messageCreated){
if (N2kToSeasmart(n2kMsg, millis(), buf, MAX_NMEA2000_MESSAGE_SEASMART_SIZE) != 0) {
messageCreated=true;
}
}
if (messageCreated){
allChannels[i]->sendToClients(buf,N2K_CHANNEL_ID);
}
}
}
if (actisenseReader && direction != N2KT_MSGACT && usbStream && actisenseChannel.canSendOut(n2kMsg.PGN))
if (direction != N2KT_MSGACT)
{
n2kMsg.SendInActisenseFormat(usbStream);
for (int i=0;i<numChannels;i++){
allChannels[i]->sendActisense(n2kMsg);
}
}
if (direction != N2KT_MSGOUT){
nmea0183Converter->HandleMsg(n2kMsg);
@@ -250,14 +247,6 @@ void handleN2kMessage(const tN2kMsg &n2kMsg,N2K_MsgDirection direction)
}
};
void handleReceivedNmeaMessage(const char *buf, int sourceId){
GwChannelConfig *channel=channelFromSource(sourceId);
if (channel && ! channel->canReceive(buf)) return;
if (! channel || channel->sendToN2K()){
toN2KConverter->parseAndSend(buf,sourceId);
}
sendBufferToChannels(buf,sourceId);
}
//*****************************************************************************
void SendNMEA0183Message(const tNMEA0183Msg &NMEA0183Msg, int sourceId,bool convert=false) {
@@ -272,7 +261,9 @@ void SendNMEA0183Message(const tNMEA0183Msg &NMEA0183Msg, int sourceId,bool conv
buf[len]=0x0d;
buf[len+1]=0x0a;
buf[len+2]=0;
sendBufferToChannels(buf,sourceId);
for (int i=0;i< numChannels;i++){
allChannels[i]->sendToClients(buf,sourceId);
}
}
class GwSerialLog : public GwLogWriter{
@@ -431,7 +422,6 @@ protected:
usbChannel.getJsonSize()+
tcpChannel.getJsonSize()+
serialChannel.getJsonSize()+
actisenseChannel.getJsonSize()+
tclChannel.getJsonSize()
);
status["version"] = VERSION;
@@ -449,7 +439,6 @@ protected:
countNMEA2KIn.toJson(status);
countNMEA2KOut.toJson(status);
usbChannel.toJson(status);
actisenseChannel.toJson(status);
serialChannel.toJson(status);
tcpChannel.toJson(status);
tclChannel.toJson(status);
@@ -715,33 +704,28 @@ void setup() {
if (serial1){
int rt=serial1->setup(config.getInt(config.serialBaud,115200),serialrx,serialtx);
logger.logDebug(GwLog::LOG,"starting serial returns %d",rt);
serialChannel.setImpl(serial1);
}
#endif
usbChannel.setImpl(usbSerial);
MDNS.begin(config.getConfigItem(config.systemName)->asCString());
gwWifi.setup();
// Start TCP server
socketServer.begin();
tcpChannel.setImpl(&socketServer);
logger.flush();
usbChannel.begin(!config.getBool(config.usbActisense),
usbChannel.begin(true,
config.getBool(config.sendUsb),
config.getBool(config.receiveUsb),
config.getString(config.usbReadFilter),
config.getString(config.usbWriteFilter),
false,
config.getBool(config.usbToN2k));
logger.logDebug(GwLog::LOG,"%s",usbChannel.toString().c_str());
actisenseChannel.begin(
config.getBool(config.usbToN2k),
config.getBool(config.usbActisense),
config.getBool(config.usbActSend),
true,
"",
"",
false,
true
);
logger.logDebug(GwLog::LOG,"ACT:%s",actisenseChannel.toString().c_str());
config.getBool(config.usbActSend)
);
logger.logDebug(GwLog::LOG,"%s",usbChannel.toString().c_str());
tcpChannel.begin(
true,
config.getBool(config.sendTCP),
@@ -749,7 +733,9 @@ void setup() {
config.getString(config.tcpReadFilter),
config.getString(config.tcpWriteFilter),
config.getBool(config.sendSeasmart),
config.getBool(config.tcpToN2k)
config.getBool(config.tcpToN2k),
false,
false
);
logger.logDebug(GwLog::LOG,"%s",tcpChannel.toString().c_str());
serialChannel.begin(
@@ -759,7 +745,9 @@ void setup() {
config.getString(config.serialReadF),
config.getString(config.serialWriteF),
false,
config.getBool(config.serialToN2k)
config.getBool(config.serialToN2k),
false,
false
);
logger.logDebug(GwLog::LOG,"%s",serialChannel.toString().c_str());
tclChannel.begin(
@@ -769,7 +757,9 @@ void setup() {
config.getString(config.tclReadFilter),
config.getString(config.tclReadFilter),
config.getBool(config.tclSeasmart),
config.getBool(config.tclToN2k)
config.getBool(config.tclToN2k),
false,
false
);
logger.logDebug(GwLog::LOG,"%s",tclChannel.toString().c_str());
logger.flush();
@@ -890,15 +880,6 @@ void setup() {
}
NMEA2000.ExtendTransmitMessages(pgns);
NMEA2000.ExtendReceiveMessages(nmea0183Converter->handledPgns());
if (config.getBool(config.usbActisense)){
actisenseReader=new tActisenseReader();
usbStream=usbSerial->getStream(false);
actisenseReader->SetReadStream(usbStream);
actisenseReader->SetMsgHandler([](const tN2kMsg &msg){
actisenseChannel.canReceive(msg.PGN); //count only
handleN2kMessage(msg,N2KT_MSGACT);
});
}
NMEA2000.SetMsgHandler([](const tN2kMsg &n2kMsg){
handleN2kMessage(n2kMsg,N2KT_MSGIN);
});
@@ -918,49 +899,12 @@ void setup() {
}
//*****************************************************************************
void handleSendAndRead(bool handleRead){
socketServer.loop(handleRead);
usbSerial->loop(handleRead);
if (serial1) serial1->loop(handleRead);
for (int i=0;i<numChannels;i++){
allChannels[i]->loop(handleRead,true);
}
}
class NMEAMessageReceiver : public GwMessageFetcher{
static const int bufferSize=GwBuffer::RX_BUFFER_SIZE+4;
uint8_t buffer[bufferSize];
uint8_t *writePointer=buffer;
public:
virtual bool handleBuffer(GwBuffer *gwbuffer){
size_t len=fetchMessageToBuffer(gwbuffer,buffer,bufferSize-4,'\n');
writePointer=buffer+len;
if (writePointer == buffer) return false;
uint8_t *p;
for (p=writePointer-1;p>=buffer && *p <= 0x20;p--){
*p=0;
}
if (p > buffer){
p++;
*p=0x0d;
p++;
*p=0x0a;
p++;
*p=0;
}
for (p=buffer; *p != 0 && p < writePointer && *p <= 0x20;p++){}
//very simple NMEA check
if (*p != '!' && *p != '$'){
logger.logDebug(GwLog::DEBUG,"unknown line [%d] - ignore: %s",id,(const char *)p);
}
else{
logger.logDebug(GwLog::DEBUG,"NMEA[%d]: %s",id,(const char *)p);
handleReceivedNmeaMessage((const char *)p,id);
//trigger sending to empty buffers
handleSendAndRead(false);
}
writePointer=buffer;
return true;
}
};
TimeMonitor monitor(20,0.2);
NMEAMessageReceiver receiver;
unsigned long lastHeapReport=0;
void loop() {
monitor.reset();
@@ -981,20 +925,18 @@ void loop() {
}
}
monitor.setTime(3);
//read sockets
socketServer.loop(true,false);
for (int i=0;i<numChannels;i++){
allChannels[i]->loop(true,false);
}
//reads
monitor.setTime(4);
//write sockets
socketServer.loop(false,true);
for (int i=0;i<numChannels;i++){
allChannels[i]->loop(false,true);
}
//writes
monitor.setTime(5);
usbSerial->loop(true);
monitor.setTime(6);
if (serial1) serial1->loop(true);
monitor.setTime(7);
handleSendAndRead(true);
monitor.setTime(8);
NMEA2000.ParseMessages();
monitor.setTime(9);
monitor.setTime(6);
int SourceAddress = NMEA2000.GetN2kSource();
if (SourceAddress != NodeAddress) { // Save potentially changed Source Address to NVS memory
@@ -1005,21 +947,27 @@ void loop() {
logger.logDebug(GwLog::LOG,"Address Change: New Address=%d\n", SourceAddress);
}
nmea0183Converter->loop();
monitor.setTime(10);
monitor.setTime(7);
//read channels
if (tcpChannel.shouldRead()) socketServer.readMessages(&receiver);
monitor.setTime(11);
receiver.id=USB_CHANNEL_ID;
if (usbChannel.shouldRead()) usbSerial->readMessages(&receiver);
monitor.setTime(12);
receiver.id=SERIAL1_CHANNEL_ID;
if (serial1 && serialChannel.shouldRead() ) serial1->readMessages(&receiver);
monitor.setTime(13);
if (actisenseReader){
actisenseReader->ParseMessages();
for (int i=0;i<numChannels;i++){
allChannels[i]->readMessages([&](const char * buffer, int sourceId){
for (int j=0;j<numChannels;j++){
allChannels[j]->sendToClients(buffer,sourceId);
allChannels[j]->loop(false,true);
}
if (allChannels[i]->sendToN2K()){
toN2KConverter->parseAndSend(buffer, sourceId);
}
});
}
monitor.setTime(14);
monitor.setTime(8);
for (int i=0;i<numChannels;i++){
allChannels[i]->parseActisense([](const tN2kMsg &msg,int source){
handleN2kMessage(msg,N2KT_MSGACT);
});
}
monitor.setTime(9);
//handle message requests
GwMessage *msg=mainQueue.fetchMessage(0);
@@ -1027,5 +975,5 @@ void loop() {
msg->process();
msg->unref();
}
monitor.setTime(15);
monitor.setTime(10);
}