1
0
mirror of https://github.com/thooge/esp32-nmea2000-obp60.git synced 2025-12-13 05:53:06 +01:00

intermediate: udp reader

This commit is contained in:
andreas
2024-11-08 21:00:25 +01:00
parent b0d5e27b5a
commit 82f5e17987
10 changed files with 406 additions and 24 deletions

View File

@@ -96,6 +96,7 @@ class GwApi{
unsigned long tcpSerRx=0;
unsigned long tcpSerTx=0;
unsigned long udpwTx=0;
unsigned long udprRx=0;
int tcpClients=0;
unsigned long tcpClRx=0;
unsigned long tcpClTx=0;

View File

@@ -7,6 +7,7 @@
#include "GwSerial.h"
#include "GwTcpClient.h"
#include "GwUdpWriter.h"
#include "GwUdpReader.h"
class SerInit{
public:
int serial=-1;
@@ -260,8 +261,27 @@ static ChannelParam channelParameters[]={
.maxId=-1,
.rxstatus=0,
.txstatus=offsetof(GwApi::Status,GwApi::Status::udpwTx)
},
{
.id=UDPR_CHANNEL_ID,
.baud="",
.receive=GwConfigDefinitions::udprEnabled,
.send="",
.direction="",
.toN2K=GwConfigDefinitions::udprToN2k,
.readF=GwConfigDefinitions::udprReadFilter,
.writeF="",
.preventLog="",
.readAct="",
.writeAct="",
.sendSeasmart="",
.name="UDPReader",
.maxId=-1,
.rxstatus=offsetof(GwApi::Status,GwApi::Status::udprRx),
.txstatus=0
}
};
template<typename T>
@@ -451,6 +471,12 @@ void GwChannelList::begin(bool fallbackSerial){
writer->begin();
addChannel(createChannel(logger,config,UDPW_CHANNEL_ID,writer));
}
//udp reader
if (config->getBool(GwConfigDefinitions::udprEnabled)){
GwUdpReader *reader=new GwUdpReader(config,logger,UDPR_CHANNEL_ID);
reader->begin();
addChannel(createChannel(logger,config,UDPR_CHANNEL_ID,reader));
}
logger->flush();
}
String GwChannelList::getMode(int id){

View File

@@ -19,6 +19,7 @@
#define TCP_CLIENT_CHANNEL_ID 4
#define MIN_TCP_CHANNEL_ID 5
#define UDPW_CHANNEL_ID 20
#define UDPR_CHANNEL_ID 21
#define MIN_USER_TASK 200
class GwSocketServer;

View File

@@ -21,7 +21,7 @@ GwBuffer::~GwBuffer(){
}
void GwBuffer::reset(String reason)
{
LOG_DEBUG(GwLog::LOG,"reseting buffer %s, reason %s",this->name.c_str(),reason.c_str());
if (! reason.isEmpty())LOG_DEBUG(GwLog::LOG,"reseting buffer %s, reason %s",this->name.c_str(),reason.c_str());
writePointer = buffer;
readPointer = buffer;
lp("reset");
@@ -33,6 +33,16 @@ size_t GwBuffer::freeSpace()
}
return readPointer - writePointer - 1;
}
size_t GwBuffer::continousSpace() const{
if (readPointer <= writePointer){
return bufferSize-offset(writePointer);
}
return readPointer-writePointer-1;
}
void GwBuffer::moveWp(size_t offset){
if (offset > continousSpace()) return;
writePointer+=offset;
}
size_t GwBuffer::usedSpace()
{
if (readPointer <= writePointer)

View File

@@ -33,7 +33,7 @@ class GwBuffer{
uint8_t *buffer;
uint8_t *writePointer;
uint8_t *readPointer;
size_t offset(uint8_t* ptr){
size_t offset(uint8_t* ptr) const{
return (size_t)(ptr-buffer);
}
GwLog *logger;
@@ -54,6 +54,9 @@ class GwBuffer{
* find the first occurance of x in the buffer, -1 if not found
*/
int findChar(char x);
uint8_t *getWp(){return writePointer;}
size_t continousSpace() const; //free space from wp
void moveWp(size_t offset); //move the wp forward
};
#endif

View File

@@ -22,4 +22,7 @@ class GwSocketHelper{
if (inet_pton(AF_INET,addr.c_str(),&iaddr) != 1) return false;
return IN_MULTICAST(ntohl(iaddr.s_addr));
}
static bool equals(const in_addr &left, const in_addr &right){
return left.s_addr == right.s_addr;
}
};

View File

@@ -0,0 +1,158 @@
#include "GwUdpReader.h"
#include <ESPmDNS.h>
#include <errno.h>
#include "GwBuffer.h"
#include "GwSocketConnection.h"
#include "GwSocketHelper.h"
#include "GWWifi.h"
GwUdpReader::GwUdpReader(const GwConfigHandler *config, GwLog *logger, int minId)
{
this->config = config;
this->logger = logger;
this->minId = minId;
port=config->getInt(GwConfigDefinitions::udprPort);
buffer= new GwBuffer(logger,GwBuffer::RX_BUFFER_SIZE,"udprd");
}
void GwUdpReader::createAndBind(){
if (fd >= 0){
::close(fd);
}
if (currentStationIp.isEmpty() && (type == T_STA || type == T_MCSTA)) return;
fd=socket(AF_INET,SOCK_DGRAM,IPPROTO_IP);
if (fd < 0){
LOG_ERROR("UDPR: unable to create udp socket: %d",errno);
return;
}
int enable = 1;
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int));
if (type == T_STA)
{
if (inet_pton(AF_INET, currentStationIp.c_str(), &listenA.sin_addr) != 1)
{
LOG_ERROR("UDPR: invalid station ip address %s", currentStationIp.c_str());
close(fd);
fd = -1;
return;
}
}
if (bind(fd,(struct sockaddr *)&listenA,sizeof(listenA)) < 0){
LOG_ERROR("UDPR: unable to bind: %d",errno);
close(fd);
fd=-1;
return;
}
LOG_INFO("UDPR: socket created and bound");
if (type != T_MCALL && type != T_MCAP && type != T_MCSTA) {
return;
}
struct ip_mreq mc;
String mcAddr=config->getString(GwConfigDefinitions::udprMC);
if (inet_pton(AF_INET,mcAddr.c_str(),&mc.imr_multiaddr) != 1){
LOG_ERROR("UDPR: invalid multicast addr %s",mcAddr.c_str());
::close(fd);
fd=-1;
return;
}
if (type == T_MCALL || type == T_MCAP){
mc.imr_interface=apAddr;
int res=setsockopt(fd,IPPROTO_IP,IP_ADD_MEMBERSHIP,&mc,sizeof(mc));
if (res != 0){
LOG_ERROR("UDPR: unable to add MC membership for AP:%d",errno);
}
else{
LOG_INFO("UDPR: membership for %s for AP",mcAddr.c_str());
}
}
if (!currentStationIp.isEmpty() && (type == T_MCALL || type == T_MCSTA))
{
mc.imr_interface = staAddr;
int res = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mc, sizeof(mc));
if (res != 0)
{
LOG_ERROR("UDPR: unable to add MC membership for STA:%d", errno);
}
else{
LOG_INFO("UDPR: membership for %s for STA %s",mcAddr.c_str(),currentStationIp.c_str());
}
}
}
void GwUdpReader::begin()
{
if (type != T_UNKNOWN) return; //already started
type=(UType)(config->getInt(GwConfigDefinitions::udprType));
LOG_INFO("UDPR begin, mode=%d",(int)type);
port=config->getInt(GwConfigDefinitions::udprPort);
listenA.sin_family=AF_INET;
listenA.sin_port=htons(port);
if (type != T_STA){
listenA.sin_addr.s_addr=htonl(INADDR_ANY);
}
String ap=WiFi.softAPIP().toString();
if (inet_pton(AF_INET, ap.c_str(), &apAddr) != 1)
{
LOG_ERROR("UDPR: invalid ap ip address %s", ap.c_str());
return;
}
String sta;
if (WiFi.isConnected()) sta=WiFi.localIP().toString();
setStationAdd(sta);
createAndBind();
}
bool GwUdpReader::setStationAdd(const String &sta){
if (sta == currentStationIp) return false;
currentStationIp=sta;
if (inet_pton(AF_INET, currentStationIp.c_str(), &staAddr) != 1){
LOG_ERROR("UDPR: invalid station ip address %s", currentStationIp.c_str());
return false;
}
LOG_INFO("UDPR: new station IP %s",currentStationIp.c_str());
return true;
}
void GwUdpReader::loop(bool handleRead, bool handleWrite)
{
if (handleRead){
if (type == T_STA || type == T_MCALL || type == T_MCSTA){
//only change anything if we considered the station IP
String nextStationIp;
if (WiFi.isConnected()){
String nextStationIp=WiFi.localIP().toString();
}
if (setStationAdd(nextStationIp)){
LOG_INFO("UDPR: wifi client IP changed, restart");
createAndBind();
}
}
}
}
void GwUdpReader::readMessages(GwMessageFetcher *writer)
{
if (fd < 0) return;
//we expect one NMEA message in one UDP packet
buffer->reset();
struct sockaddr_in from;
socklen_t fromLen=sizeof(from);
ssize_t res=recvfrom(fd,buffer->getWp(),buffer->continousSpace(),MSG_DONTWAIT,
(struct sockaddr*)&from,&fromLen);
if (res <= 0) return;
if (GwSocketHelper::equals(from.sin_addr,apAddr)) return;
if (!currentStationIp.isEmpty() && (GwSocketHelper::equals(from.sin_addr,staAddr))) return;
buffer->moveWp(res);
LOG_DEBUG(GwLog::DEBUG,"UDPR: received %d bytes",res);
writer->handleBuffer(buffer);
}
size_t GwUdpReader::sendToClients(const char *buf, int source,bool partial)
{
return 0;
}
GwUdpReader::~GwUdpReader()
{
}

View File

@@ -0,0 +1,45 @@
#ifndef _GWUDPREADER_H
#define _GWUDPREADER_H
#include "GWConfig.h"
#include "GwLog.h"
#include "GwBuffer.h"
#include "GwChannelInterface.h"
#include <memory>
#include <sys/socket.h>
#include <arpa/inet.h>
class GwUdpReader: public GwChannelInterface{
public:
using UType=enum{
T_ALL=0,
T_AP=1,
T_STA=2,
T_MCALL=4,
T_MCAP=5,
T_MCSTA=6,
T_UNKNOWN=-1
};
private:
const GwConfigHandler *config;
GwLog *logger;
int minId;
int port;
int fd=-1;
struct sockaddr_in listenA;
String listenIp;
String currentStationIp;
struct in_addr apAddr;
struct in_addr staAddr;
UType type=T_UNKNOWN;
void createAndBind();
bool setStationAdd(const String &sta);
GwBuffer *buffer=nullptr;
public:
GwUdpReader(const GwConfigHandler *config,GwLog *logger,int minId);
~GwUdpReader();
void begin();
virtual void loop(bool handleRead=true,bool handleWrite=true);
virtual size_t sendToClients(const char *buf,int sourceId, bool partialWrite=false);
virtual void readMessages(GwMessageFetcher *writer);
};
#endif