tcp client resolve mdns

This commit is contained in:
wellenvogel 2022-01-03 13:37:35 +01:00
parent f0a4cfcc53
commit 469b801e04
3 changed files with 139 additions and 22 deletions

View File

@ -150,6 +150,7 @@ size_t GwSocketServer::sendToClients(const char *buf, int source,bool partial)
{ {
if (!clients) if (!clients)
return 0; return 0;
bool hasSend=false;
int len = strlen(buf); int len = strlen(buf);
int sourceIndex = source - minId; int sourceIndex = source - minId;
for (int i = 0; i < maxClients; i++) for (int i = 0; i < maxClients; i++)
@ -161,10 +162,10 @@ size_t GwSocketServer::sendToClients(const char *buf, int source,bool partial)
continue; continue;
if (client->connected()) if (client->connected())
{ {
client->enqueue((uint8_t *)buf, len); if(client->enqueue((uint8_t *)buf, len)) hasSend=true;
} }
} }
return len; return hasSend?len:0;
} }
int GwSocketServer::numClients() int GwSocketServer::numClients()

View File

@ -1,4 +1,13 @@
#include "GwTcpClient.h" #include "GwTcpClient.h"
#include <functional>
#include <ESPmDNS.h>
class ResolveArgs{
public:
String host;
uint32_t timeout;
GwTcpClient *client;
};
bool GwTcpClient::hasConfig(){ bool GwTcpClient::hasConfig(){
return configured; return configured;
@ -15,13 +24,48 @@ void GwTcpClient::stop()
} }
state = C_DISABLED; state = C_DISABLED;
} }
void GwTcpClient::startResolving(){
LOG_DEBUG(GwLog::DEBUG,"TcpClient::resolveHost to %s:%d",
remoteAddress.c_str(),port);
state = C_INITIALIZED;
IPAddress addr;
if (! addr.fromString(remoteAddress)){
if (remoteAddress.endsWith(".local")){
//try to resolve
resolveHost(remoteAddress.substring(0,remoteAddress.length()-6));
}
else{
error="invalid ip "+remoteAddress;
LOG_DEBUG(GwLog::ERROR,"%s",error.c_str());
return;
}
}
else{
setResolved(addr,true);
startConnection();
}
}
void GwTcpClient::startConnection() void GwTcpClient::startConnection()
{ {
LOG_DEBUG(GwLog::DEBUG,"TcpClient::startConnection to %s:%d", LOG_DEBUG(GwLog::DEBUG,"TcpClient::startConnection to %s:%d",
remoteAddress.c_str(),port); remoteAddress.c_str(),port);
ResolvedAddress addr=getResolved();
state = C_INITIALIZED; state = C_INITIALIZED;
error="";
connectStart=millis(); connectStart=millis();
if (! addr.resolved){
error="unable to resolve "+remoteAddress;
LOG_DEBUG(GwLog::ERROR,"%s",error.c_str());
return;
}
else{
if (error.isEmpty()) error="connecting...";
}
uint32_t ip_addr = addr.address;
struct sockaddr_in serveraddr;
memset((char *) &serveraddr, 0, sizeof(serveraddr));
serveraddr.sin_family = AF_INET;
memcpy((void *)&serveraddr.sin_addr.s_addr, (const void *)(&ip_addr), 4);
serveraddr.sin_port = htons(port);
int sockfd = socket(AF_INET, SOCK_STREAM, 0); int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) { if (sockfd < 0) {
error="unable to create socket"; error="unable to create socket";
@ -29,18 +73,6 @@ void GwTcpClient::startConnection()
return; return;
} }
fcntl( sockfd, F_SETFL, fcntl( sockfd, F_GETFL, 0 ) | O_NONBLOCK ); fcntl( sockfd, F_SETFL, fcntl( sockfd, F_GETFL, 0 ) | O_NONBLOCK );
IPAddress addr;
if (! addr.fromString(remoteAddress)){
error="invalid ip "+remoteAddress;
LOG_DEBUG(GwLog::ERROR,"%s",error.c_str());
return;
}
uint32_t ip_addr = addr;
struct sockaddr_in serveraddr;
memset((char *) &serveraddr, 0, sizeof(serveraddr));
serveraddr.sin_family = AF_INET;
memcpy((void *)&serveraddr.sin_addr.s_addr, (const void *)(&ip_addr), 4);
serveraddr.sin_port = htons(port);
int res = lwip_connect_r(sockfd, (struct sockaddr*)&serveraddr, sizeof(serveraddr)); int res = lwip_connect_r(sockfd, (struct sockaddr*)&serveraddr, sizeof(serveraddr));
if (res < 0 ) { if (res < 0 ) {
if (errno != EINPROGRESS){ if (errno != EINPROGRESS){
@ -64,13 +96,21 @@ void GwTcpClient::checkConnection()
unsigned long now=millis(); unsigned long now=millis();
LOG_DEBUG(GwLog::DEBUG+3,"TcpClient::checkConnection state=%d, start=%ul, now=%ul", LOG_DEBUG(GwLog::DEBUG+3,"TcpClient::checkConnection state=%d, start=%ul, now=%ul",
(int)state,connectStart,now); (int)state,connectStart,now);
if (state == C_RESOLVING){
//TODO: timeout???
return;
}
if (state == C_RESOLVED){
startConnection();
return;
}
if (! connection->hasClient()){ if (! connection->hasClient()){
state = hasConfig()?C_INITIALIZED:C_DISABLED; state = hasConfig()?C_INITIALIZED:C_DISABLED;
} }
if (state == C_INITIALIZED){ if (state == C_INITIALIZED){
if ((now - connectStart) > CON_TIMEOUT){ if ((now - connectStart) > CON_TIMEOUT){
LOG_DEBUG(GwLog::LOG,"retry connect to %s",remoteAddress.c_str()); LOG_DEBUG(GwLog::LOG,"retry connect to %s",remoteAddress.c_str());
startConnection(); startResolving();
} }
return; return;
} }
@ -98,6 +138,7 @@ void GwTcpClient::checkConnection()
connection->stop(); connection->stop();
return; return;
} }
return;
} else { } else {
int sockerr; int sockerr;
socklen_t len = (socklen_t)sizeof(int); socklen_t len = (socklen_t)sizeof(int);
@ -116,18 +157,28 @@ void GwTcpClient::checkConnection()
return; return;
} }
} }
LOG_DEBUG(GwLog::LOG,"connected to %s",remoteAddress.c_str()); if (connection->connected()){
state=C_CONNECTED; error="";
LOG_DEBUG(GwLog::LOG,"connected to %s",remoteAddress.c_str());
state=C_CONNECTED;
}
else{
error=String("connect error ")+String(strerror(errno));
LOG_DEBUG(GwLog::ERROR,"%s",error.c_str());
state=C_INITIALIZED;
}
} }
GwTcpClient::GwTcpClient(GwLog *logger) GwTcpClient::GwTcpClient(GwLog *logger)
{ {
this->logger = logger; this->logger = logger;
this->connection=NULL; this->connection=NULL;
locker=xSemaphoreCreateMutex();
} }
GwTcpClient::~GwTcpClient(){ GwTcpClient::~GwTcpClient(){
if (connection) if (connection)
delete connection; delete connection;
vSemaphoreDelete(locker);
} }
void GwTcpClient::begin(int sourceId,String address, uint16_t port,bool allowRead) void GwTcpClient::begin(int sourceId,String address, uint16_t port,bool allowRead)
{ {
@ -138,7 +189,7 @@ void GwTcpClient::begin(int sourceId,String address, uint16_t port,bool allowRea
configured=true; configured=true;
state = C_INITIALIZED; state = C_INITIALIZED;
this->connection = new GwSocketConnection(logger,0,allowRead); this->connection = new GwSocketConnection(logger,0,allowRead);
startConnection(); startResolving();
} }
void GwTcpClient::loop(bool handleRead,bool handleWrite) void GwTcpClient::loop(bool handleRead,bool handleWrite)
{ {
@ -149,7 +200,7 @@ void GwTcpClient::loop(bool handleRead,bool handleWrite)
if (handleRead){ if (handleRead){
if (connection->hasClient()){ if (connection->hasClient()){
if (! connection->connected()){ if (! connection->connected()){
LOG_DEBUG(GwLog::ERROR,"connection closed on %s",connection->remoteIpAddress.c_str()); LOG_DEBUG(GwLog::ERROR,"tcp client connection closed on %s",connection->remoteIpAddress.c_str());
connection->stop(); connection->stop();
} }
else{ else{
@ -184,3 +235,53 @@ void GwTcpClient::readMessages(GwMessageFetcher *writer){
if (! connection->hasClient()) return; if (! connection->hasClient()) return;
connection->messagesFromBuffer(writer); connection->messagesFromBuffer(writer);
} }
void GwTcpClient::resolveHost(String host)
{
LOG_DEBUG(GwLog::LOG,"start resolving %s",host.c_str());
{
GWSYNCHRONIZED(&locker);
resolvedAddress.resolved = false;
}
state = C_RESOLVING;
error=String("resolving ")+host;
ResolveArgs *args=new ResolveArgs();
args->host = host;
args->timeout = 10000;
args->client = this;
if (xTaskCreate([](void *p)
{
ResolveArgs *args = (ResolveArgs *)p;
struct ip4_addr addr;
addr.addr = 0;
esp_err_t err = mdns_query_a(args->host.c_str(), args->timeout, &addr);
if (err)
{
args->client->setResolved(IPAddress(), false);
}
else{
args->client->setResolved(IPAddress(addr.addr), true);
}
args->client->logger->logDebug(GwLog::DEBUG,"resolve task end");
delete args;
vTaskDelete(NULL);
},
"resolve", 4000, args, 0, NULL) != pdPASS)
{
LOG_DEBUG(GwLog::ERROR,"unable to start resolve task");
error = "unable to start resolve task";
delete args;
setResolved(IPAddress(), false);
}
}
void GwTcpClient::setResolved(IPAddress addr, bool valid){
LOG_DEBUG(GwLog::LOG,"setResolved %s, valid=%s",
addr.toString().c_str(),(valid?"true":"false"));
GWSYNCHRONIZED(&locker);
resolvedAddress.address=addr;
resolvedAddress.resolved=valid;
state=C_RESOLVED;
}
GwTcpClient::ResolvedAddress GwTcpClient::getResolved(){
GWSYNCHRONIZED(&locker);
return resolvedAddress;
}

View File

@ -1,33 +1,48 @@
#pragma once #pragma once
#include "GwSocketConnection.h" #include "GwSocketConnection.h"
#include "GwChannelInterface.h" #include "GwChannelInterface.h"
#include "GwSynchronized.h"
class GwTcpClient : public GwChannelInterface class GwTcpClient : public GwChannelInterface
{ {
class ResolvedAddress{
public:
IPAddress address;
bool resolved=false;
};
static const unsigned long CON_TIMEOUT=10000; static const unsigned long CON_TIMEOUT=10000;
GwSocketConnection *connection = NULL; GwSocketConnection *connection = NULL;
String remoteAddress; String remoteAddress;
ResolvedAddress resolvedAddress;
uint16_t port = 0; uint16_t port = 0;
unsigned long connectStart=0; unsigned long connectStart=0;
GwLog *logger; GwLog *logger;
int sourceId; int sourceId;
bool configured=false; bool configured=false;
String error; String error;
SemaphoreHandle_t locker;
public: public:
typedef enum typedef enum
{ {
C_DISABLED = 0, C_DISABLED = 0,
C_INITIALIZED = 1, C_INITIALIZED = 1,
C_CONNECTING = 2, C_RESOLVING = 2,
C_CONNECTED = 3 C_RESOLVED = 3,
C_CONNECTING = 4,
C_CONNECTED = 5
} State; } State;
private: private:
State state = C_DISABLED; State state = C_DISABLED;
void stop(); void stop();
void startResolving();
void startConnection(); void startConnection();
void checkConnection(); void checkConnection();
bool hasConfig(); bool hasConfig();
void resolveHost(String host);
void setResolved(IPAddress addr, bool valid);
ResolvedAddress getResolved();
public: public:
GwTcpClient(GwLog *logger); GwTcpClient(GwLog *logger);