diff --git a/lib/socketserver/GwSocketServer.cpp b/lib/socketserver/GwSocketServer.cpp index 26521e3..1bee3a6 100644 --- a/lib/socketserver/GwSocketServer.cpp +++ b/lib/socketserver/GwSocketServer.cpp @@ -150,6 +150,7 @@ size_t GwSocketServer::sendToClients(const char *buf, int source,bool partial) { if (!clients) return 0; + bool hasSend=false; int len = strlen(buf); int sourceIndex = source - minId; for (int i = 0; i < maxClients; i++) @@ -161,10 +162,10 @@ size_t GwSocketServer::sendToClients(const char *buf, int source,bool partial) continue; if (client->connected()) { - client->enqueue((uint8_t *)buf, len); + if(client->enqueue((uint8_t *)buf, len)) hasSend=true; } } - return len; + return hasSend?len:0; } int GwSocketServer::numClients() diff --git a/lib/socketserver/GwTcpClient.cpp b/lib/socketserver/GwTcpClient.cpp index 10de8a8..5a81470 100644 --- a/lib/socketserver/GwTcpClient.cpp +++ b/lib/socketserver/GwTcpClient.cpp @@ -1,4 +1,13 @@ #include "GwTcpClient.h" +#include +#include + +class ResolveArgs{ + public: + String host; + uint32_t timeout; + GwTcpClient *client; +}; bool GwTcpClient::hasConfig(){ return configured; @@ -15,13 +24,48 @@ void GwTcpClient::stop() } state = C_DISABLED; } +void GwTcpClient::startResolving(){ + LOG_DEBUG(GwLog::DEBUG,"TcpClient::resolveHost to %s:%d", + remoteAddress.c_str(),port); + state = C_INITIALIZED; + IPAddress addr; + if (! addr.fromString(remoteAddress)){ + if (remoteAddress.endsWith(".local")){ + //try to resolve + resolveHost(remoteAddress.substring(0,remoteAddress.length()-6)); + } + else{ + error="invalid ip "+remoteAddress; + LOG_DEBUG(GwLog::ERROR,"%s",error.c_str()); + return; + } + } + else{ + setResolved(addr,true); + startConnection(); + } +} void GwTcpClient::startConnection() { LOG_DEBUG(GwLog::DEBUG,"TcpClient::startConnection to %s:%d", remoteAddress.c_str(),port); + ResolvedAddress addr=getResolved(); state = C_INITIALIZED; - error=""; connectStart=millis(); + if (! addr.resolved){ + error="unable to resolve "+remoteAddress; + LOG_DEBUG(GwLog::ERROR,"%s",error.c_str()); + return; + } + else{ + if (error.isEmpty()) error="connecting..."; + } + uint32_t ip_addr = addr.address; + struct sockaddr_in serveraddr; + memset((char *) &serveraddr, 0, sizeof(serveraddr)); + serveraddr.sin_family = AF_INET; + memcpy((void *)&serveraddr.sin_addr.s_addr, (const void *)(&ip_addr), 4); + serveraddr.sin_port = htons(port); int sockfd = socket(AF_INET, SOCK_STREAM, 0); if (sockfd < 0) { error="unable to create socket"; @@ -29,18 +73,6 @@ void GwTcpClient::startConnection() return; } fcntl( sockfd, F_SETFL, fcntl( sockfd, F_GETFL, 0 ) | O_NONBLOCK ); - IPAddress addr; - if (! addr.fromString(remoteAddress)){ - error="invalid ip "+remoteAddress; - LOG_DEBUG(GwLog::ERROR,"%s",error.c_str()); - return; - } - uint32_t ip_addr = addr; - struct sockaddr_in serveraddr; - memset((char *) &serveraddr, 0, sizeof(serveraddr)); - serveraddr.sin_family = AF_INET; - memcpy((void *)&serveraddr.sin_addr.s_addr, (const void *)(&ip_addr), 4); - serveraddr.sin_port = htons(port); int res = lwip_connect_r(sockfd, (struct sockaddr*)&serveraddr, sizeof(serveraddr)); if (res < 0 ) { if (errno != EINPROGRESS){ @@ -64,13 +96,21 @@ void GwTcpClient::checkConnection() unsigned long now=millis(); LOG_DEBUG(GwLog::DEBUG+3,"TcpClient::checkConnection state=%d, start=%ul, now=%ul", (int)state,connectStart,now); + if (state == C_RESOLVING){ + //TODO: timeout??? + return; + } + if (state == C_RESOLVED){ + startConnection(); + return; + } if (! connection->hasClient()){ state = hasConfig()?C_INITIALIZED:C_DISABLED; } if (state == C_INITIALIZED){ if ((now - connectStart) > CON_TIMEOUT){ LOG_DEBUG(GwLog::LOG,"retry connect to %s",remoteAddress.c_str()); - startConnection(); + startResolving(); } return; } @@ -98,6 +138,7 @@ void GwTcpClient::checkConnection() connection->stop(); return; } + return; } else { int sockerr; socklen_t len = (socklen_t)sizeof(int); @@ -116,18 +157,28 @@ void GwTcpClient::checkConnection() return; } } - LOG_DEBUG(GwLog::LOG,"connected to %s",remoteAddress.c_str()); - state=C_CONNECTED; + if (connection->connected()){ + error=""; + LOG_DEBUG(GwLog::LOG,"connected to %s",remoteAddress.c_str()); + state=C_CONNECTED; + } + else{ + error=String("connect error ")+String(strerror(errno)); + LOG_DEBUG(GwLog::ERROR,"%s",error.c_str()); + state=C_INITIALIZED; + } } GwTcpClient::GwTcpClient(GwLog *logger) { this->logger = logger; this->connection=NULL; + locker=xSemaphoreCreateMutex(); } GwTcpClient::~GwTcpClient(){ if (connection) delete connection; + vSemaphoreDelete(locker); } void GwTcpClient::begin(int sourceId,String address, uint16_t port,bool allowRead) { @@ -138,7 +189,7 @@ void GwTcpClient::begin(int sourceId,String address, uint16_t port,bool allowRea configured=true; state = C_INITIALIZED; this->connection = new GwSocketConnection(logger,0,allowRead); - startConnection(); + startResolving(); } void GwTcpClient::loop(bool handleRead,bool handleWrite) { @@ -149,7 +200,7 @@ void GwTcpClient::loop(bool handleRead,bool handleWrite) if (handleRead){ if (connection->hasClient()){ if (! connection->connected()){ - LOG_DEBUG(GwLog::ERROR,"connection closed on %s",connection->remoteIpAddress.c_str()); + LOG_DEBUG(GwLog::ERROR,"tcp client connection closed on %s",connection->remoteIpAddress.c_str()); connection->stop(); } else{ @@ -183,4 +234,54 @@ void GwTcpClient::readMessages(GwMessageFetcher *writer){ if (state != C_CONNECTED) return; if (! connection->hasClient()) return; connection->messagesFromBuffer(writer); +} +void GwTcpClient::resolveHost(String host) +{ + LOG_DEBUG(GwLog::LOG,"start resolving %s",host.c_str()); + { + GWSYNCHRONIZED(&locker); + resolvedAddress.resolved = false; + } + state = C_RESOLVING; + error=String("resolving ")+host; + ResolveArgs *args=new ResolveArgs(); + args->host = host; + args->timeout = 10000; + args->client = this; + if (xTaskCreate([](void *p) + { + ResolveArgs *args = (ResolveArgs *)p; + struct ip4_addr addr; + addr.addr = 0; + esp_err_t err = mdns_query_a(args->host.c_str(), args->timeout, &addr); + if (err) + { + args->client->setResolved(IPAddress(), false); + } + else{ + args->client->setResolved(IPAddress(addr.addr), true); + } + args->client->logger->logDebug(GwLog::DEBUG,"resolve task end"); + delete args; + vTaskDelete(NULL); + }, + "resolve", 4000, args, 0, NULL) != pdPASS) + { + LOG_DEBUG(GwLog::ERROR,"unable to start resolve task"); + error = "unable to start resolve task"; + delete args; + setResolved(IPAddress(), false); + } +} +void GwTcpClient::setResolved(IPAddress addr, bool valid){ + LOG_DEBUG(GwLog::LOG,"setResolved %s, valid=%s", + addr.toString().c_str(),(valid?"true":"false")); + GWSYNCHRONIZED(&locker); + resolvedAddress.address=addr; + resolvedAddress.resolved=valid; + state=C_RESOLVED; +} +GwTcpClient::ResolvedAddress GwTcpClient::getResolved(){ + GWSYNCHRONIZED(&locker); + return resolvedAddress; } \ No newline at end of file diff --git a/lib/socketserver/GwTcpClient.h b/lib/socketserver/GwTcpClient.h index d3488d1..25cc654 100644 --- a/lib/socketserver/GwTcpClient.h +++ b/lib/socketserver/GwTcpClient.h @@ -1,33 +1,48 @@ #pragma once #include "GwSocketConnection.h" #include "GwChannelInterface.h" +#include "GwSynchronized.h" class GwTcpClient : public GwChannelInterface { + class ResolvedAddress{ + public: + IPAddress address; + bool resolved=false; + }; static const unsigned long CON_TIMEOUT=10000; GwSocketConnection *connection = NULL; String remoteAddress; + ResolvedAddress resolvedAddress; uint16_t port = 0; unsigned long connectStart=0; GwLog *logger; int sourceId; bool configured=false; String error; + SemaphoreHandle_t locker; public: typedef enum { C_DISABLED = 0, C_INITIALIZED = 1, - C_CONNECTING = 2, - C_CONNECTED = 3 + C_RESOLVING = 2, + C_RESOLVED = 3, + C_CONNECTING = 4, + C_CONNECTED = 5 } State; private: + State state = C_DISABLED; void stop(); + void startResolving(); void startConnection(); void checkConnection(); bool hasConfig(); + void resolveHost(String host); + void setResolved(IPAddress addr, bool valid); + ResolvedAddress getResolved(); public: GwTcpClient(GwLog *logger);