intermediate: reading on tcp

This commit is contained in:
andreas 2021-10-26 20:28:21 +02:00
parent 60eabb05ab
commit 933ea0d0aa
8 changed files with 187 additions and 44 deletions

View File

@ -66,6 +66,7 @@ class GwConfigHandler{
const String serverPort=F("serverPort"); const String serverPort=F("serverPort");
const String maxClients=F("maxClients"); const String maxClients=F("maxClients");
const String sendTCP=F("sendTCP"); const String sendTCP=F("sendTCP");
const String readTCP=F("receiveTCP");
const String sendSeasmart=F("sendSeasmart"); const String sendSeasmart=F("sendSeasmart");
const String usbBaud=F("usbBaud"); const String usbBaud=F("usbBaud");
const String systemName=F("systemName"); const String systemName=F("systemName");
@ -84,7 +85,7 @@ class GwConfigHandler{
GwConfigItem * findConfig(const String name, bool dummy=false); GwConfigItem * findConfig(const String name, bool dummy=false);
GwConfigInterface * getConfigItem(const String name, bool dummy=false) const; GwConfigInterface * getConfigItem(const String name, bool dummy=false) const;
private: private:
GwConfigItem* configs[12]={ GwConfigItem* configs[13]={
new GwConfigItem(sendUsb,"true"), new GwConfigItem(sendUsb,"true"),
new GwConfigItem (receiveUsb,"false"), new GwConfigItem (receiveUsb,"false"),
new GwConfigItem (wifiClient,"false"), new GwConfigItem (wifiClient,"false"),
@ -93,13 +94,14 @@ class GwConfigHandler{
new GwConfigItem (serverPort,"2222"), new GwConfigItem (serverPort,"2222"),
new GwConfigItem (maxClients, "10"), new GwConfigItem (maxClients, "10"),
new GwConfigItem (sendTCP,"true"), new GwConfigItem (sendTCP,"true"),
new GwConfigItem (readTCP,"true"),
new GwConfigItem (sendSeasmart,"false"), new GwConfigItem (sendSeasmart,"false"),
new GwConfigItem (usbBaud,"115200"), new GwConfigItem (usbBaud,"115200"),
new GwConfigItem (systemName,"ESP32NMEA2K"), new GwConfigItem (systemName,"ESP32NMEA2K"),
new GwConfigItem (stopApTime,"0") new GwConfigItem (stopApTime,"0")
}; };
int getNumConfig() const{ int getNumConfig() const{
return 12; return 13;
} }
}; };
#endif #endif

View File

@ -6,9 +6,15 @@ void GwBuffer::lp(const char *fkt, int p)
fkt, buffer, offset(writePointer), offset(readPointer), usedSpace(), freeSpace(), p); fkt, buffer, offset(writePointer), offset(readPointer), usedSpace(), freeSpace(), p);
} }
GwBuffer::GwBuffer(GwLog *logger) GwBuffer::GwBuffer(GwLog *logger,size_t bufferSize, bool rotate)
{ {
this->logger = logger; this->logger = logger;
this->rotate=rotate;
this->bufferSize=bufferSize;
this->buffer=new uint8_t[bufferSize];
}
GwBuffer::~GwBuffer(){
delete buffer;
} }
void GwBuffer::reset() void GwBuffer::reset()
{ {
@ -18,13 +24,16 @@ void GwBuffer::reset()
} }
size_t GwBuffer::freeSpace() size_t GwBuffer::freeSpace()
{ {
if (! rotate){
return bufferSize-offset(writePointer)-1;
}
if (readPointer < writePointer) if (readPointer < writePointer)
{ {
size_t rt = BUFFER_SIZE - offset(writePointer) - 1 + offset(readPointer); size_t rt = bufferSize - offset(writePointer) - 1 + offset(readPointer);
return rt; return rt;
} }
if (readPointer == writePointer) if (readPointer == writePointer)
return BUFFER_SIZE - 1; return bufferSize - 1;
return readPointer - writePointer - 1; return readPointer - writePointer - 1;
} }
size_t GwBuffer::usedSpace() size_t GwBuffer::usedSpace()
@ -33,19 +42,27 @@ size_t GwBuffer::usedSpace()
return 0; return 0;
if (readPointer < writePointer) if (readPointer < writePointer)
return writePointer - readPointer; return writePointer - readPointer;
return BUFFER_SIZE - offset(readPointer) - 1 + offset(writePointer); return bufferSize - offset(readPointer) - 1 + offset(writePointer);
} }
size_t GwBuffer::addData(const uint8_t *data, size_t len) size_t GwBuffer::addData(const uint8_t *data, size_t len)
{ {
lp("addDataE", len); lp("addDataE", len);
if (len == 0) if (len == 0)
return 0; return 0;
if (freeSpace() < len) if (freeSpace() < len && rotate)
//in rotating mode (send buffer)
//we only fill in a message if it fit's completely
return 0; return 0;
size_t written = 0; size_t written = 0;
bool canRotate=rotate && offset(readPointer) > 0;
if (writePointer >= readPointer) if (writePointer >= readPointer)
{ {
written = BUFFER_SIZE - offset(writePointer) - 1; written = bufferSize - offset(writePointer) - 1;
if (written > 0 && ! canRotate){
//if we cannot rotate we are not allowed to write to the last byte
//as otherwise we would not be able to distinguish between full and empty
written--;
}
if (written > len) if (written > len)
written = len; written = len;
if (written) if (written)
@ -54,7 +71,7 @@ size_t GwBuffer::addData(const uint8_t *data, size_t len)
len -= written; len -= written;
data += written; data += written;
writePointer += written; writePointer += written;
if (offset(writePointer) >= (BUFFER_SIZE - 1)) if (offset(writePointer) >= (bufferSize - 1) && canRotate)
writePointer = buffer; writePointer = buffer;
} }
lp("addData1", written); lp("addData1", written);
@ -63,21 +80,24 @@ size_t GwBuffer::addData(const uint8_t *data, size_t len)
return written; return written;
} }
} }
if (! canRotate) return written;
//now we have the write pointer before the read pointer //now we have the write pointer before the read pointer
//and we did the length check before - so we can safely copy int maxLen=readPointer-writePointer-1;
memcpy(writePointer, data, len); if (len < maxLen) maxLen=len;
writePointer += len; memcpy(writePointer, data, maxLen);
lp("addData2", len); writePointer += maxLen;
return len + written; lp("addData2", maxLen);
return maxLen + written;
} }
/** /**
* write some data to the buffer writer * write some data to the buffer writer
* return an error if the buffer writer returned < 0 * return an error if the buffer writer returned < 0
*/ */
GwBuffer::WriteStatus GwBuffer::fetchData(GwBufferWriter *writer, bool errorIf0 ) GwBuffer::WriteStatus GwBuffer::fetchData(GwBufferWriter *writer, int maxLen,bool errorIf0 )
{ {
lp("fetchDataE"); lp("fetchDataE");
size_t len = usedSpace(); size_t len = usedSpace();
if (maxLen > 0 && len > maxLen) len=maxLen;
if (len == 0) if (len == 0)
return OK; return OK;
size_t written = 0; size_t written = 0;
@ -85,7 +105,7 @@ GwBuffer::WriteStatus GwBuffer::fetchData(GwBufferWriter *writer, bool errorIf0
if (writePointer < readPointer) if (writePointer < readPointer)
{ {
//we need to write from readPointer till end and then till writePointer-1 //we need to write from readPointer till end and then till writePointer-1
plen = BUFFER_SIZE - offset(readPointer) - 1; plen = bufferSize - offset(readPointer) - 1;
int rt = writer->write(readPointer, plen); int rt = writer->write(readPointer, plen);
lp("fetchData1", rt); lp("fetchData1", rt);
if (rt < 0) if (rt < 0)
@ -104,7 +124,7 @@ GwBuffer::WriteStatus GwBuffer::fetchData(GwBufferWriter *writer, bool errorIf0
return (errorIf0 ? ERROR : AGAIN); return (errorIf0 ? ERROR : AGAIN);
} }
readPointer += rt; readPointer += rt;
if (offset(readPointer) >= (BUFFER_SIZE - 1)) if (offset(readPointer) >= (bufferSize - 1))
readPointer = buffer; readPointer = buffer;
if (rt < plen) if (rt < plen)
return AGAIN; return AGAIN;
@ -135,11 +155,43 @@ GwBuffer::WriteStatus GwBuffer::fetchData(GwBufferWriter *writer, bool errorIf0
return ERROR; return ERROR;
} }
readPointer += rt; readPointer += rt;
if (offset(readPointer) >= (BUFFER_SIZE - 1)) if (offset(readPointer) >= (bufferSize - 1))
readPointer = buffer; readPointer = buffer;
lp("fetchData3"); lp("fetchData3");
written += rt; written += rt;
if (written < len) if (written < len)
return AGAIN; return AGAIN;
return OK; return OK;
}
int GwBuffer::findChar(char x){
int offset=0;
uint8_t *p;
for (p=readPointer; p != writePointer && p < (buffer+bufferSize);p++){
if (*p == x) return offset;
offset++;
}
if (p >= (buffer+bufferSize)){
//we reached the end of the buffer without "hitting" the write pointer
//so we can start from the beginning if rotating...
if (! rotate) return -1;
for (p=buffer;p < writePointer && p < (buffer+bufferSize);p++){
if (*p == x) return offset;
offset++;
}
}
return -1;
}
GwBuffer::WriteStatus GwBuffer::fetchMessage(GwBufferWriter *writer,char delimiter,bool emptyIfFull){
int pos=findChar(delimiter);
if (pos < 0) {
if (!freeSpace() && emptyIfFull){
LOG_DEBUG(GwLog::LOG,"line to long, reset");
reset();
return ERROR;
}
return AGAIN;
}
return fetchData(writer,pos+1,true);
} }

View File

@ -6,6 +6,7 @@
class GwBufferWriter{ class GwBufferWriter{
public: public:
int id=0; //can be set be users
virtual int write(const uint8_t *buffer,size_t len)=0; virtual int write(const uint8_t *buffer,size_t len)=0;
virtual ~GwBufferWriter(){}; virtual ~GwBufferWriter(){};
}; };
@ -24,7 +25,9 @@ class GwBuffer{
AGAIN AGAIN
} WriteStatus; } WriteStatus;
private: private:
uint8_t buffer[BUFFER_SIZE]; size_t bufferSize;
bool rotate;
uint8_t *buffer;
uint8_t *writePointer=buffer; uint8_t *writePointer=buffer;
uint8_t *readPointer=buffer; uint8_t *readPointer=buffer;
size_t offset(uint8_t* ptr){ size_t offset(uint8_t* ptr){
@ -32,8 +35,13 @@ class GwBuffer{
} }
GwLog *logger; GwLog *logger;
void lp(const char *fkt,int p=0); void lp(const char *fkt,int p=0);
/**
* find the first occurance of x in the buffer, -1 if not found
*/
int findChar(char x);
public: public:
GwBuffer(GwLog *logger); GwBuffer(GwLog *logger,size_t bufferSize,bool rotate=true);
~GwBuffer();
void reset(); void reset();
size_t freeSpace(); size_t freeSpace();
size_t usedSpace(); size_t usedSpace();
@ -42,7 +50,9 @@ class GwBuffer{
* write some data to the buffer writer * write some data to the buffer writer
* return an error if the buffer writer returned < 0 * return an error if the buffer writer returned < 0
*/ */
WriteStatus fetchData(GwBufferWriter *writer, bool errorIf0 = true); WriteStatus fetchData(GwBufferWriter *writer, int maxLen=-1,bool errorIf0 = true);
WriteStatus fetchMessage(GwBufferWriter *writer,char delimiter,bool emptyIfFull=true);
}; };
#endif #endif

View File

@ -17,7 +17,7 @@ GwSerial::GwSerial(GwLog *logger, uart_port_t num)
{ {
this->logger = logger; this->logger = logger;
this->num = num; this->num = num;
this->buffer = new GwBuffer(logger); this->buffer = new GwBuffer(logger,1600);
this->writer = new SerialWriter(num); this->writer = new SerialWriter(num);
} }
GwSerial::~GwSerial() GwSerial::~GwSerial()

View File

@ -7,6 +7,7 @@ class SerialWriter;
class GwSerial{ class GwSerial{
private: private:
GwBuffer *buffer; GwBuffer *buffer;
GwBuffer *readBuffer;
GwLog *logger; GwLog *logger;
SerialWriter *writer; SerialWriter *writer;
uart_port_t num; uart_port_t num;

View File

@ -3,6 +3,8 @@
#include <lwip/sockets.h> #include <lwip/sockets.h>
#include "GwBuffer.h" #include "GwBuffer.h"
#define WRITE_BUFFER_SIZE 1600
#define READ_BUFFER_SIZE 200
class Writer : public GwBufferWriter{ class Writer : public GwBufferWriter{
public: public:
wiFiClientPtr client; wiFiClientPtr client;
@ -48,17 +50,23 @@ class Writer : public GwBufferWriter{
class GwClient{ class GwClient{
public: public:
wiFiClientPtr client; wiFiClientPtr client;
GwBuffer *buffer;
GwLog *logger;
int overflows; int overflows;
String remoteIp; String remoteIp;
private: private:
Writer *writer=NULL; Writer *writer=NULL;
bool allowRead;
GwBuffer *buffer=NULL;
GwBuffer *readBuffer=NULL;
GwLog *logger;
public: public:
GwClient(wiFiClientPtr client,GwLog *logger){ GwClient(wiFiClientPtr client,GwLog *logger, bool allowRead=false){
this->client=client; this->client=client;
this->logger=logger; this->logger=logger;
buffer=new GwBuffer(logger); this->allowRead=allowRead;
buffer=new GwBuffer(logger,WRITE_BUFFER_SIZE);
if (allowRead){
readBuffer=new GwBuffer(logger,READ_BUFFER_SIZE);
}
overflows=0; overflows=0;
if (client != NULL){ if (client != NULL){
writer=new Writer(client); writer=new Writer(client);
@ -68,6 +76,7 @@ class GwClient{
void setClient(wiFiClientPtr client){ void setClient(wiFiClientPtr client){
this->client=client; this->client=client;
buffer->reset(); buffer->reset();
if (readBuffer) readBuffer->reset();
overflows=0; overflows=0;
if (writer) delete writer; if (writer) delete writer;
writer=NULL; writer=NULL;
@ -84,6 +93,8 @@ class GwClient{
} }
~GwClient(){ ~GwClient(){
delete writer; delete writer;
delete buffer;
if (readBuffer) delete readBuffer;
} }
bool enqueue(uint8_t *data, size_t len){ bool enqueue(uint8_t *data, size_t len){
if (len == 0) return true; if (len == 0) return true;
@ -113,16 +124,49 @@ class GwClient{
} }
return rt; return rt;
} }
bool read(){
size_t maxLen=allowRead?readBuffer->freeSpace():100;
if (!maxLen) {
return false;
}
char buffer[maxLen];
int res = recv(client->fd(), (void*) buffer, maxLen, MSG_DONTWAIT);
if (res == 0){
LOG_DEBUG(GwLog::LOG,"client shutdown (recv 0) on %s",remoteIp.c_str());
client->stop();
return false;
}
if (res < 0){
if (errno != EAGAIN){
LOG_DEBUG(GwLog::LOG,"client read error %d on %s",errno,remoteIp.c_str());
client->stop();
return false;
}
return false;
}
if (! allowRead) return true;
size_t stored=readBuffer->addData((uint8_t*)buffer,res);
if (stored != res){
LOG_DEBUG(GwLog::LOG,"internal read error buffer overflow on %s",remoteIp.c_str());
}
return true;
}
bool messagesFromBuffer(GwBufferWriter *writer){
if (! allowRead) return false;
return readBuffer->fetchMessage(writer,'\n',true) == GwBuffer::OK;
}
}; };
GwSocketServer::GwSocketServer(const GwConfigHandler *config,GwLog *logger){ GwSocketServer::GwSocketServer(const GwConfigHandler *config,GwLog *logger,int minId){
this->config=config; this->config=config;
this->logger=logger; this->logger=logger;
this->minId=minId;
maxClients=config->getInt(config->maxClients); maxClients=config->getInt(config->maxClients);
allowReceive=config->getBool(config->readTCP);
clients=new gwClientPtr[maxClients]; clients=new gwClientPtr[maxClients];
for (int i=0;i<maxClients;i++){ for (int i=0;i<maxClients;i++){
clients[i]=gwClientPtr(new GwClient(wiFiClientPtr(NULL),logger)); clients[i]=gwClientPtr(new GwClient(wiFiClientPtr(NULL),logger,allowReceive));
} }
} }
void GwSocketServer::begin(){ void GwSocketServer::begin(){
@ -186,15 +230,22 @@ void GwSocketServer::loop()
} }
else else
{ {
while (client->client->available()) client->read();
{
char c = client->client->read();
//TODO: read data
}
} }
} }
} }
void GwSocketServer::sendToClients(const char *buf){
bool GwSocketServer::readMessages(GwBufferWriter *writer){
if (! allowReceive) return false;
bool hasMessages=false;
for (int i = 0; i < maxClients; i++){
writer->id=minId+i;
if (!clients[i]->hasClient()) continue;
if (clients[i]->messagesFromBuffer(writer)) hasMessages=true;
}
return hasMessages;
}
void GwSocketServer::sendToClients(const char *buf,int source){
int len=strlen(buf); int len=strlen(buf);
char buffer[len+2]; char buffer[len+2];
memcpy(buffer,buf,len); memcpy(buffer,buf,len);
@ -202,8 +253,10 @@ void GwSocketServer::sendToClients(const char *buf){
len++; len++;
buffer[len]=0x0a; buffer[len]=0x0a;
len++; len++;
int sourceIndex=source-minId;
for (int i = 0; i < maxClients; i++) for (int i = 0; i < maxClients; i++)
{ {
if (i == sourceIndex)continue; //never send out to the source we received from
gwClientPtr client = clients[i]; gwClientPtr client = clients[i];
if (! client->hasClient()) continue; if (! client->hasClient()) continue;
if ( client->client->connected() ) { if ( client->client->connected() ) {

View File

@ -2,6 +2,7 @@
#define _GWSOCKETSERVER_H #define _GWSOCKETSERVER_H
#include "GWConfig.h" #include "GWConfig.h"
#include "GwLog.h" #include "GwLog.h"
#include "GwBuffer.h"
#include <memory> #include <memory>
#include <WiFi.h> #include <WiFi.h>
@ -14,13 +15,16 @@ class GwSocketServer{
GwLog *logger; GwLog *logger;
gwClientPtr *clients; gwClientPtr *clients;
WiFiServer *server=NULL; WiFiServer *server=NULL;
bool allowReceive;
int maxClients; int maxClients;
int minId;
public: public:
GwSocketServer(const GwConfigHandler *config,GwLog *logger); GwSocketServer(const GwConfigHandler *config,GwLog *logger,int minId);
~GwSocketServer(); ~GwSocketServer();
void begin(); void begin();
void loop(); void loop();
void sendToClients(const char *buf); void sendToClients(const char *buf,int sourceId);
int numClients(); int numClients();
bool readMessages(GwBufferWriter *writer);
}; };
#endif #endif

View File

@ -37,13 +37,20 @@
#include "GwMessage.h" #include "GwMessage.h"
#include "GwSerial.h" #include "GwSerial.h"
//NMEA message channels
#define N2K_CHANNEL_ID 0
#define USB_CHANNEL_ID 1
#define SERIAL1_CHANNEL_ID 2
#define MIN_TCP_CHANNEL_ID 3
typedef std::map<String,String> StringMap; typedef std::map<String,String> StringMap;
GwLog logger(GwLog::DEBUG,NULL); GwLog logger(GwLog::DEBUG,NULL);
GwConfigHandler config(&logger); GwConfigHandler config(&logger);
GwWifi gwWifi(&config,&logger); GwWifi gwWifi(&config,&logger);
GwSocketServer socketServer(&config,&logger); GwSocketServer socketServer(&config,&logger,MIN_TCP_CHANNEL_ID);
GwBoatData boatData(&logger); GwBoatData boatData(&logger);
@ -374,7 +381,7 @@ void HandleNMEA2000Msg(const tN2kMsg &N2kMsg) {
char buf[MAX_NMEA2000_MESSAGE_SEASMART_SIZE]; char buf[MAX_NMEA2000_MESSAGE_SEASMART_SIZE];
if ( N2kToSeasmart(N2kMsg, millis(), buf, MAX_NMEA2000_MESSAGE_SEASMART_SIZE) == 0 ) return; if ( N2kToSeasmart(N2kMsg, millis(), buf, MAX_NMEA2000_MESSAGE_SEASMART_SIZE) == 0 ) return;
socketServer.sendToClients(buf); socketServer.sendToClients(buf,N2K_CHANNEL_ID);
} }
@ -385,7 +392,7 @@ void SendNMEA0183Message(const tNMEA0183Msg &NMEA0183Msg) {
char buf[MAX_NMEA0183_MESSAGE_SIZE]; char buf[MAX_NMEA0183_MESSAGE_SIZE];
if ( !NMEA0183Msg.GetMessage(buf, MAX_NMEA0183_MESSAGE_SIZE) ) return; if ( !NMEA0183Msg.GetMessage(buf, MAX_NMEA0183_MESSAGE_SIZE) ) return;
if (sendTCP->asBoolean()){ if (sendTCP->asBoolean()){
socketServer.sendToClients(buf); socketServer.sendToClients(buf,N2K_CHANNEL_ID);
} }
if (sendUsb->asBoolean()){ if (sendUsb->asBoolean()){
int len=strlen(buf); int len=strlen(buf);
@ -399,10 +406,23 @@ void SendNMEA0183Message(const tNMEA0183Msg &NMEA0183Msg) {
} }
} }
class NMEAMessageReceiver : public GwBufferWriter{
public:
virtual int write(const uint8_t *buffer,size_t len){
char nbuf[len+1];
memcpy(nbuf,buffer,len);
nbuf[len]=0;
logger.logDebug(GwLog::DEBUG,"NMEA[%d]: %s",id,nbuf);
return len;
}
};
void loop() { void loop() {
gwWifi.loop(); gwWifi.loop();
socketServer.loop(); socketServer.loop();
if (usbSerial.write() == GwBuffer::ERROR){
logger.logDebug(GwLog::DEBUG,"overflow in USB serial");
}
NMEA2000.ParseMessages(); NMEA2000.ParseMessages();
int SourceAddress = NMEA2000.GetN2kSource(); int SourceAddress = NMEA2000.GetN2kSource();
@ -411,7 +431,7 @@ void loop() {
preferences.begin("nvs", false); preferences.begin("nvs", false);
preferences.putInt("LastNodeAddress", SourceAddress); preferences.putInt("LastNodeAddress", SourceAddress);
preferences.end(); preferences.end();
Serial.printf("Address Change: New Address=%d\n", SourceAddress); logger.logDebug(GwLog::LOG,"Address Change: New Address=%d\n", SourceAddress);
} }
nmea0183Converter->loop(); nmea0183Converter->loop();
@ -422,9 +442,10 @@ void loop() {
msg->process(); msg->process();
msg->unref(); msg->unref();
} }
if (usbSerial.write() == GwBuffer::ERROR){ NMEAMessageReceiver receiver;
logger.logDebug(GwLog::DEBUG,"overflow in USB serial"); socketServer.readMessages(&receiver);
} //read channels
usbSerial.read(); usbSerial.read();
} }