intermediate: tcp read

This commit is contained in:
andreas 2021-10-26 21:54:28 +02:00
parent 7e8e7aa35e
commit e378f6e56b
4 changed files with 35 additions and 17 deletions

View File

@ -12,6 +12,8 @@ GwBuffer::GwBuffer(GwLog *logger,size_t bufferSize, bool rotate)
this->rotate=rotate; this->rotate=rotate;
this->bufferSize=bufferSize; this->bufferSize=bufferSize;
this->buffer=new uint8_t[bufferSize]; this->buffer=new uint8_t[bufferSize];
writePointer = buffer;
readPointer = buffer;
} }
GwBuffer::~GwBuffer(){ GwBuffer::~GwBuffer(){
delete buffer; delete buffer;
@ -42,7 +44,7 @@ size_t GwBuffer::usedSpace()
return 0; return 0;
if (readPointer < writePointer) if (readPointer < writePointer)
return writePointer - readPointer; return writePointer - readPointer;
return bufferSize - offset(readPointer) - 1 + offset(writePointer); return bufferSize - offset(readPointer) + offset(writePointer);
} }
size_t GwBuffer::addData(const uint8_t *data, size_t len) size_t GwBuffer::addData(const uint8_t *data, size_t len)
{ {
@ -54,15 +56,11 @@ size_t GwBuffer::addData(const uint8_t *data, size_t len)
//we only fill in a message if it fit's completely //we only fill in a message if it fit's completely
return 0; return 0;
size_t written = 0; size_t written = 0;
bool canRotate=rotate && offset(readPointer) > 0;
if (writePointer >= readPointer) if (writePointer >= readPointer)
{ {
written = bufferSize - offset(writePointer) - 1; written = bufferSize - offset(writePointer) - 1;
if (written > 0 && ! canRotate){ bool canRotate=rotate && offset(readPointer) > 0;
//if we cannot rotate we are not allowed to write to the last byte if (canRotate) written++; //we can also fill the last byte
//as otherwise we would not be able to distinguish between full and empty
written--;
}
if (written > len) if (written > len)
written = len; written = len;
if (written) if (written)
@ -71,7 +69,7 @@ size_t GwBuffer::addData(const uint8_t *data, size_t len)
len -= written; len -= written;
data += written; data += written;
writePointer += written; writePointer += written;
if (offset(writePointer) >= (bufferSize - 1) && canRotate) if (offset(writePointer) > (bufferSize - 1))
writePointer = buffer; writePointer = buffer;
} }
lp("addData1", written); lp("addData1", written);
@ -79,10 +77,11 @@ size_t GwBuffer::addData(const uint8_t *data, size_t len)
{ {
return written; return written;
} }
if (! rotate) return written;
} }
if (! canRotate) return written;
//now we have the write pointer before the read pointer //now we have the write pointer before the read pointer
int maxLen=readPointer-writePointer-1; int maxLen=readPointer-writePointer-1;
if (maxLen <= 0) return written;
if (len < maxLen) maxLen=len; if (len < maxLen) maxLen=len;
memcpy(writePointer, data, maxLen); memcpy(writePointer, data, maxLen);
writePointer += maxLen; writePointer += maxLen;
@ -124,7 +123,7 @@ GwBuffer::WriteStatus GwBuffer::fetchData(GwBufferWriter *writer, int maxLen,boo
return (errorIf0 ? ERROR : AGAIN); return (errorIf0 ? ERROR : AGAIN);
} }
readPointer += rt; readPointer += rt;
if (offset(readPointer) >= (bufferSize - 1)) if (offset(readPointer) > (bufferSize - 1))
readPointer = buffer; readPointer = buffer;
if (rt < plen) if (rt < plen)
return AGAIN; return AGAIN;
@ -155,7 +154,7 @@ GwBuffer::WriteStatus GwBuffer::fetchData(GwBufferWriter *writer, int maxLen,boo
return ERROR; return ERROR;
} }
readPointer += rt; readPointer += rt;
if (offset(readPointer) >= (bufferSize - 1)) if (offset(readPointer) > (bufferSize - 1))
readPointer = buffer; readPointer = buffer;
lp("fetchData3"); lp("fetchData3");
written += rt; written += rt;
@ -165,6 +164,7 @@ GwBuffer::WriteStatus GwBuffer::fetchData(GwBufferWriter *writer, int maxLen,boo
} }
int GwBuffer::findChar(char x){ int GwBuffer::findChar(char x){
lp("findChar",x);
int offset=0; int offset=0;
uint8_t *p; uint8_t *p;
for (p=readPointer; p != writePointer && p < (buffer+bufferSize);p++){ for (p=readPointer; p != writePointer && p < (buffer+bufferSize);p++){
@ -187,11 +187,28 @@ GwBuffer::WriteStatus GwBuffer::fetchMessage(GwBufferWriter *writer,char delimit
int pos=findChar(delimiter); int pos=findChar(delimiter);
if (pos < 0) { if (pos < 0) {
if (!freeSpace() && emptyIfFull){ if (!freeSpace() && emptyIfFull){
LOG_DEBUG(GwLog::LOG,"line to long, reset"); LOG_DEBUG(GwLog::LOG,"line to long, reset, buffer=%p",buffer);
reset(); reset();
return ERROR; return ERROR;
} }
return AGAIN; return AGAIN;
} }
if (! rotate){
//in a non rotating buffer we discard the found message
//and copy the remain to the start
int len=pos+1;
int wr=writer->write(readPointer,len);
//in any case discard the data now
int remain=usedSpace()-len;
if (remain > 0){
memcpy(buffer,readPointer+len,remain);
readPointer=buffer;
writePointer=buffer+remain;
}
else{
reset();
}
return (wr == len)?OK:ERROR;
}
return fetchData(writer,pos+1,true); return fetchData(writer,pos+1,true);
} }

View File

@ -28,8 +28,8 @@ class GwBuffer{
size_t bufferSize; size_t bufferSize;
bool rotate; bool rotate;
uint8_t *buffer; uint8_t *buffer;
uint8_t *writePointer=buffer; uint8_t *writePointer;
uint8_t *readPointer=buffer; uint8_t *readPointer;
size_t offset(uint8_t* ptr){ size_t offset(uint8_t* ptr){
return (size_t)(ptr-buffer); return (size_t)(ptr-buffer);
} }

View File

@ -65,7 +65,7 @@ class GwClient{
this->allowRead=allowRead; this->allowRead=allowRead;
buffer=new GwBuffer(logger,WRITE_BUFFER_SIZE); buffer=new GwBuffer(logger,WRITE_BUFFER_SIZE);
if (allowRead){ if (allowRead){
readBuffer=new GwBuffer(logger,READ_BUFFER_SIZE); readBuffer=new GwBuffer(logger,READ_BUFFER_SIZE,false);
} }
overflows=0; overflows=0;
if (client != NULL){ if (client != NULL){
@ -147,7 +147,7 @@ class GwClient{
if (! allowRead) return true; if (! allowRead) return true;
size_t stored=readBuffer->addData((uint8_t*)buffer,res); size_t stored=readBuffer->addData((uint8_t*)buffer,res);
if (stored != res){ if (stored != res){
LOG_DEBUG(GwLog::LOG,"internal read error buffer overflow on %s",remoteIp.c_str()); LOG_DEBUG(GwLog::LOG,"internal read error buffer overflow (w=%d,c=%d) on %s",res,(int)stored,remoteIp.c_str());
} }
return true; return true;
} }

View File

@ -186,6 +186,7 @@ void setup() {
baud=usbBaud->asInt(); baud=usbBaud->asInt();
} }
int st=usbSerial.setup(baud,3,1); //TODO: PIN defines int st=usbSerial.setup(baud,3,1); //TODO: PIN defines
//int st=-1;
if (st < 0){ if (st < 0){
//falling back to old style serial for logging //falling back to old style serial for logging
Serial.begin(baud); Serial.begin(baud);
@ -421,7 +422,7 @@ void loop() {
socketServer.loop(); socketServer.loop();
if (usbSerial.write() == GwBuffer::ERROR){ if (usbSerial.write() == GwBuffer::ERROR){
logger.logDebug(GwLog::DEBUG,"overflow in USB serial"); //logger.logDebug(GwLog::DEBUG,"overflow in USB serial");
} }
NMEA2000.ParseMessages(); NMEA2000.ParseMessages();