use async send for tcp connections
This commit is contained in:
parent
2057b70cb7
commit
2b1eda27d4
|
@ -16,6 +16,6 @@ class GwLog{
|
||||||
void logDebug(int level, const char *fmt,...);
|
void logDebug(int level, const char *fmt,...);
|
||||||
int isActive(int level){return level <= logLevel;};
|
int isActive(int level){return level <= logLevel;};
|
||||||
};
|
};
|
||||||
#define LOG_DEBUG(level,fmt,...){ if (logger->isActive(level)) logger->logDebug(level,fmt,__VA_ARGS__);}
|
#define LOG_DEBUG(level,...){ if (logger->isActive(level)) logger->logDebug(level,__VA_ARGS__);}
|
||||||
|
|
||||||
#endif
|
#endif
|
|
@ -0,0 +1,151 @@
|
||||||
|
#ifndef _GWBUFFER_H
|
||||||
|
#define _GWBUFFER_H
|
||||||
|
#include <string.h>
|
||||||
|
#include <stdint.h>
|
||||||
|
#include "GwLog.h"
|
||||||
|
|
||||||
|
class GwBufferWriter{
|
||||||
|
public:
|
||||||
|
virtual int write(const uint8_t *buffer,size_t len)=0;
|
||||||
|
virtual ~GwBufferWriter(){};
|
||||||
|
};
|
||||||
|
|
||||||
|
class GwBuffer{
|
||||||
|
public:
|
||||||
|
static const size_t BUFFER_SIZE=1620; // app. 20 NMEA messages
|
||||||
|
typedef enum {
|
||||||
|
OK,
|
||||||
|
ERROR,
|
||||||
|
AGAIN
|
||||||
|
} WriteStatus;
|
||||||
|
private:
|
||||||
|
uint8_t buffer[BUFFER_SIZE];
|
||||||
|
uint8_t *writePointer=buffer;
|
||||||
|
uint8_t *readPointer=buffer;
|
||||||
|
size_t offset(uint8_t* ptr){
|
||||||
|
return (size_t)(ptr-buffer);
|
||||||
|
}
|
||||||
|
GwLog *logger;
|
||||||
|
void lp(const char *fkt,int p=0){
|
||||||
|
LOG_DEBUG(GwLog::DEBUG + 1,"Buffer[%s]: buf=%p,wp=%d,rp=%d,used=%d,free=%d, p=%d",
|
||||||
|
fkt,buffer,offset(writePointer),offset(readPointer),usedSpace(),freeSpace(),p
|
||||||
|
);
|
||||||
|
}
|
||||||
|
public:
|
||||||
|
GwBuffer(GwLog *logger){
|
||||||
|
this->logger=logger;
|
||||||
|
}
|
||||||
|
void reset(){
|
||||||
|
writePointer=buffer;
|
||||||
|
readPointer=buffer;
|
||||||
|
lp("reset");
|
||||||
|
}
|
||||||
|
size_t freeSpace(){
|
||||||
|
if (readPointer < writePointer){
|
||||||
|
size_t rt=BUFFER_SIZE-offset(writePointer)-1+offset(readPointer);
|
||||||
|
return rt;
|
||||||
|
}
|
||||||
|
if (readPointer == writePointer) return BUFFER_SIZE-1;
|
||||||
|
return readPointer-writePointer-1;
|
||||||
|
}
|
||||||
|
size_t usedSpace(){
|
||||||
|
if (readPointer == writePointer) return 0;
|
||||||
|
if (readPointer < writePointer) return writePointer-readPointer;
|
||||||
|
return BUFFER_SIZE-offset(readPointer)-1+offset(writePointer);
|
||||||
|
}
|
||||||
|
size_t addData(const uint8_t *data,size_t len){
|
||||||
|
lp("addDataE",len);
|
||||||
|
if (len == 0) return 0;
|
||||||
|
if (freeSpace() < len) return 0;
|
||||||
|
size_t written=0;
|
||||||
|
if (writePointer >= readPointer){
|
||||||
|
written=BUFFER_SIZE-offset(writePointer)-1;
|
||||||
|
if (written > len) written=len;
|
||||||
|
if (written) {
|
||||||
|
memcpy(writePointer,data,written);
|
||||||
|
len-=written;
|
||||||
|
data+=written;
|
||||||
|
writePointer+=written;
|
||||||
|
if (offset(writePointer) >= (BUFFER_SIZE-1)) writePointer=buffer;
|
||||||
|
}
|
||||||
|
lp("addData1",written);
|
||||||
|
if (len <= 0) {
|
||||||
|
return written;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//now we have the write pointer before the read pointer
|
||||||
|
//and we did the length check before - so we can safely copy
|
||||||
|
memcpy(writePointer,data,len);
|
||||||
|
writePointer+=len;
|
||||||
|
lp("addData2",len);
|
||||||
|
return len+written;
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* write some data to the buffer writer
|
||||||
|
* return an error if the buffer writer returned < 0
|
||||||
|
*/
|
||||||
|
WriteStatus fetchData(GwBufferWriter *writer, bool errorIf0 = true)
|
||||||
|
{
|
||||||
|
lp("fetchDataE");
|
||||||
|
size_t len = usedSpace();
|
||||||
|
if (len == 0)
|
||||||
|
return OK;
|
||||||
|
size_t written=0;
|
||||||
|
size_t plen=len;
|
||||||
|
if (writePointer < readPointer)
|
||||||
|
{
|
||||||
|
//we need to write from readPointer till end and then till writePointer-1
|
||||||
|
plen = BUFFER_SIZE - offset(readPointer)-1;
|
||||||
|
int rt = writer->write(readPointer, plen);
|
||||||
|
lp("fetchData1",rt);
|
||||||
|
if (rt < 0){
|
||||||
|
LOG_DEBUG(GwLog::DEBUG+1,"buffer: write returns error %d",rt);
|
||||||
|
return ERROR;
|
||||||
|
}
|
||||||
|
if (rt > plen){
|
||||||
|
LOG_DEBUG(GwLog::DEBUG+1,"buffer: write too many bytes(1) %d",rt);
|
||||||
|
return ERROR;
|
||||||
|
}
|
||||||
|
if (rt == 0){
|
||||||
|
LOG_DEBUG(GwLog::DEBUG+1,"buffer: write returns 0 (1)");
|
||||||
|
return (errorIf0 ? ERROR : AGAIN);
|
||||||
|
}
|
||||||
|
readPointer += rt;
|
||||||
|
if (offset(readPointer) >= (BUFFER_SIZE-1))
|
||||||
|
readPointer = buffer;
|
||||||
|
if (rt < plen)
|
||||||
|
return AGAIN;
|
||||||
|
if (plen >= len)
|
||||||
|
return OK;
|
||||||
|
len -=rt;
|
||||||
|
written+=rt;
|
||||||
|
//next part - readPointer should be at buffer now
|
||||||
|
}
|
||||||
|
plen=writePointer - readPointer;
|
||||||
|
if (plen == 0) return OK;
|
||||||
|
int rt = writer->write(readPointer, plen);
|
||||||
|
lp("fetchData2",rt);
|
||||||
|
if (rt < 0){
|
||||||
|
LOG_DEBUG(GwLog::DEBUG+1,"buffer: write returns error %d",rt);
|
||||||
|
return ERROR;
|
||||||
|
}
|
||||||
|
if (rt == 0){
|
||||||
|
LOG_DEBUG(GwLog::DEBUG+1,"buffer: write returns 0 (1)");
|
||||||
|
return (errorIf0 ? ERROR : AGAIN);
|
||||||
|
}
|
||||||
|
if (rt > plen){
|
||||||
|
LOG_DEBUG(GwLog::DEBUG+1,"buffer: write too many bytes(2)");
|
||||||
|
return ERROR;
|
||||||
|
}
|
||||||
|
readPointer += rt;
|
||||||
|
if (offset(readPointer) >= (BUFFER_SIZE-1))
|
||||||
|
readPointer = buffer;
|
||||||
|
lp("fetchData3");
|
||||||
|
written+=rt;
|
||||||
|
if (written < len)
|
||||||
|
return AGAIN;
|
||||||
|
return OK;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
#endif
|
|
@ -1,5 +1,95 @@
|
||||||
#include "GwSocketServer.h"
|
#include "GwSocketServer.h"
|
||||||
#include <ESPmDNS.h>
|
#include <ESPmDNS.h>
|
||||||
|
#include <lwip/sockets.h>
|
||||||
|
#include "GwBuffer.h"
|
||||||
|
|
||||||
|
class Writer : public GwBufferWriter{
|
||||||
|
public:
|
||||||
|
wiFiClientPtr client;
|
||||||
|
bool writeError=false;
|
||||||
|
bool timeOut=false;
|
||||||
|
unsigned long writeTimeout;
|
||||||
|
unsigned long lastWrite;
|
||||||
|
bool pending;
|
||||||
|
Writer(wiFiClientPtr client, unsigned long writeTimeout=10000){
|
||||||
|
this->client=client;
|
||||||
|
pending=false;
|
||||||
|
this->writeTimeout=writeTimeout;
|
||||||
|
}
|
||||||
|
virtual ~Writer(){}
|
||||||
|
virtual int write(const uint8_t *buffer,size_t len){
|
||||||
|
int res = send(client->fd(), (void*) buffer, len, MSG_DONTWAIT);
|
||||||
|
if (res < 0){
|
||||||
|
if (errno != EAGAIN){
|
||||||
|
writeError=true;
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
res=0;
|
||||||
|
}
|
||||||
|
if (res >= len){
|
||||||
|
pending=false;
|
||||||
|
}
|
||||||
|
else{
|
||||||
|
if (!pending){
|
||||||
|
lastWrite=millis();
|
||||||
|
pending=true;
|
||||||
|
}
|
||||||
|
else{
|
||||||
|
//we need to check if we have still not been able
|
||||||
|
//to write until timeout
|
||||||
|
if (millis() >= (lastWrite+writeTimeout)){
|
||||||
|
timeOut=true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
class GwClient{
|
||||||
|
public:
|
||||||
|
wiFiClientPtr client;
|
||||||
|
GwBuffer *buffer;
|
||||||
|
GwLog *logger;
|
||||||
|
int overflows;
|
||||||
|
private:
|
||||||
|
Writer *writer;
|
||||||
|
public:
|
||||||
|
GwClient(wiFiClientPtr client,GwLog *logger){
|
||||||
|
this->client=client;
|
||||||
|
this->logger=logger;
|
||||||
|
buffer=new GwBuffer(logger);
|
||||||
|
overflows=0;
|
||||||
|
writer=new Writer(client);
|
||||||
|
}
|
||||||
|
~GwClient(){
|
||||||
|
delete writer;
|
||||||
|
}
|
||||||
|
bool enqueue(uint8_t *data, size_t len){
|
||||||
|
if (len == 0) return true;
|
||||||
|
size_t rt=buffer->addData(data,len);
|
||||||
|
if (rt < len){
|
||||||
|
LOG_DEBUG(GwLog::LOG,"overflow on %s",client->remoteIP().toString().c_str());
|
||||||
|
overflows++;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
bool hasData(){
|
||||||
|
return buffer->usedSpace() > 0;
|
||||||
|
}
|
||||||
|
GwBuffer::WriteStatus write(){
|
||||||
|
GwBuffer::WriteStatus rt=buffer->fetchData(writer,false);
|
||||||
|
if (rt != GwBuffer::OK){
|
||||||
|
LOG_DEBUG(GwLog::DEBUG+1,"write returns %d on %s",rt,client->remoteIP().toString().c_str());
|
||||||
|
}
|
||||||
|
if (writer->timeOut ){
|
||||||
|
LOG_DEBUG(GwLog::LOG,"timeout on %s",client->remoteIP().toString().c_str());
|
||||||
|
return GwBuffer::ERROR;
|
||||||
|
}
|
||||||
|
return rt;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
GwSocketServer::GwSocketServer(const GwConfigHandler *config,GwLog *logger){
|
GwSocketServer::GwSocketServer(const GwConfigHandler *config,GwLog *logger){
|
||||||
this->config=config;
|
this->config=config;
|
||||||
|
@ -20,23 +110,33 @@ void GwSocketServer::loop()
|
||||||
if (client){
|
if (client){
|
||||||
logger->logString("new client connected from %s",
|
logger->logString("new client connected from %s",
|
||||||
client.remoteIP().toString().c_str());
|
client.remoteIP().toString().c_str());
|
||||||
clients.push_back(wiFiClientPtr(new WiFiClient(client)));
|
fcntl(client.fd(), F_SETFL, O_NONBLOCK);
|
||||||
|
gwClientPtr newClient(new GwClient(wiFiClientPtr(new WiFiClient(client)),logger));
|
||||||
|
clients.push_back(newClient);
|
||||||
|
}
|
||||||
|
//sending
|
||||||
|
for (auto it = clients.begin(); it != clients.end();it++){
|
||||||
|
GwBuffer::WriteStatus rt=(*it)->write();
|
||||||
|
if (rt == GwBuffer::ERROR){
|
||||||
|
LOG_DEBUG(GwLog::ERROR,"write error on %s, closing",(*it)->client->remoteIP().toString().c_str());
|
||||||
|
(*it)->client->stop();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
for (auto it = clients.begin(); it != clients.end();it++)
|
for (auto it = clients.begin(); it != clients.end();it++)
|
||||||
{
|
{
|
||||||
if ((*it) != NULL)
|
if ((*it) != NULL)
|
||||||
{
|
{
|
||||||
if (!(*it)->connected())
|
if (!(*it)->client->connected())
|
||||||
{
|
{
|
||||||
logger->logString("client disconnect ");
|
logger->logString("client disconnect");
|
||||||
(*it)->stop();
|
(*it)->client->stop();
|
||||||
clients.erase(it);
|
clients.erase(it);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
while ((*it)->available())
|
while ((*it)->client->available())
|
||||||
{
|
{
|
||||||
char c = (*it)->read();
|
char c = (*it)->client->read();
|
||||||
//TODO: read data
|
//TODO: read data
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -48,9 +148,20 @@ void GwSocketServer::loop()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
void GwSocketServer::sendToClients(const char *buf){
|
void GwSocketServer::sendToClients(const char *buf){
|
||||||
|
int len=strlen(buf);
|
||||||
|
char buffer[len+2];
|
||||||
|
memcpy(buffer,buf,len);
|
||||||
|
buffer[len]=0x0d;
|
||||||
|
len++;
|
||||||
|
buffer[len]=0x0a;
|
||||||
|
len++;
|
||||||
for (auto it = clients.begin() ; it != clients.end(); it++) {
|
for (auto it = clients.begin() ; it != clients.end(); it++) {
|
||||||
if ( (*it) != NULL && (*it)->connected() ) {
|
if ( (*it) != NULL && (*it)->client->connected() ) {
|
||||||
(*it)->println(buf);
|
bool rt=(*it)->enqueue((uint8_t*)buffer,len);
|
||||||
|
if (!rt){
|
||||||
|
LOG_DEBUG(GwLog::DEBUG,"overflow in send to %s",(*it)->client->remoteIP().toString().c_str());
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,11 +7,13 @@
|
||||||
#include <WiFi.h>
|
#include <WiFi.h>
|
||||||
|
|
||||||
using wiFiClientPtr = std::shared_ptr<WiFiClient>;
|
using wiFiClientPtr = std::shared_ptr<WiFiClient>;
|
||||||
|
class GwClient;
|
||||||
|
using gwClientPtr = std::shared_ptr<GwClient>;
|
||||||
class GwSocketServer{
|
class GwSocketServer{
|
||||||
private:
|
private:
|
||||||
const GwConfigHandler *config;
|
const GwConfigHandler *config;
|
||||||
GwLog *logger;
|
GwLog *logger;
|
||||||
std::list<wiFiClientPtr> clients;
|
std::list<gwClientPtr> clients;
|
||||||
WiFiServer *server=NULL;
|
WiFiServer *server=NULL;
|
||||||
public:
|
public:
|
||||||
GwSocketServer(const GwConfigHandler *config,GwLog *logger);
|
GwSocketServer(const GwConfigHandler *config,GwLog *logger);
|
||||||
|
|
Loading…
Reference in New Issue