294 lines
9.0 KiB
C++
294 lines
9.0 KiB
C++
#include "GwTcpClient.h"
|
|
#include <functional>
|
|
#include <ESPmDNS.h>
|
|
#include "GwSocketHelper.h"
|
|
|
|
class ResolveArgs{
|
|
public:
|
|
String host;
|
|
uint32_t timeout;
|
|
GwTcpClient *client;
|
|
};
|
|
|
|
bool GwTcpClient::hasConfig(){
|
|
return configured;
|
|
}
|
|
bool GwTcpClient::isConnected(){
|
|
return state == C_CONNECTED;
|
|
}
|
|
void GwTcpClient::stop()
|
|
{
|
|
if (connection && connection->hasClient())
|
|
{
|
|
LOG_DEBUG(GwLog::DEBUG, "stopping tcp client");
|
|
connection->stop();
|
|
}
|
|
state = C_DISABLED;
|
|
}
|
|
void GwTcpClient::startResolving(){
|
|
LOG_DEBUG(GwLog::DEBUG,"TcpClient::resolveHost to %s:%d",
|
|
remoteAddress.c_str(),port);
|
|
state = C_INITIALIZED;
|
|
IPAddress addr;
|
|
if (! addr.fromString(remoteAddress)){
|
|
if (remoteAddress.endsWith(".local")){
|
|
//try to resolve
|
|
resolveHost(remoteAddress.substring(0,remoteAddress.length()-6));
|
|
}
|
|
else{
|
|
error="invalid ip "+remoteAddress;
|
|
LOG_DEBUG(GwLog::ERROR,"%s",error.c_str());
|
|
return;
|
|
}
|
|
}
|
|
else{
|
|
setResolved(addr,true);
|
|
startConnection();
|
|
}
|
|
}
|
|
void GwTcpClient::startConnection()
|
|
{
|
|
LOG_DEBUG(GwLog::DEBUG,"TcpClient::startConnection to %s:%d",
|
|
remoteAddress.c_str(),port);
|
|
ResolvedAddress addr=getResolved();
|
|
state = C_INITIALIZED;
|
|
connectStart=millis();
|
|
if (! addr.resolved){
|
|
error="unable to resolve "+remoteAddress;
|
|
LOG_DEBUG(GwLog::ERROR,"%s",error.c_str());
|
|
return;
|
|
}
|
|
else{
|
|
if (error.isEmpty()) error="connecting...";
|
|
}
|
|
uint32_t ip_addr = addr.address;
|
|
struct sockaddr_in serveraddr;
|
|
memset((char *) &serveraddr, 0, sizeof(serveraddr));
|
|
serveraddr.sin_family = AF_INET;
|
|
memcpy((void *)&serveraddr.sin_addr.s_addr, (const void *)(&ip_addr), 4);
|
|
serveraddr.sin_port = htons(port);
|
|
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
|
|
if (sockfd < 0) {
|
|
error="unable to create socket";
|
|
LOG_DEBUG(GwLog::ERROR,"unable to create socket: %d", errno);
|
|
return;
|
|
}
|
|
if (! GwSocketHelper::setKeepAlive(sockfd,true)){
|
|
error="unable to set keepalive, nodelay on socket";
|
|
LOG_DEBUG(GwLog::ERROR,"%s",error.c_str());
|
|
close(sockfd);
|
|
return;
|
|
}
|
|
fcntl( sockfd, F_SETFL, fcntl( sockfd, F_GETFL, 0 ) | O_NONBLOCK );
|
|
int res = connect(sockfd, (struct sockaddr*)&serveraddr, sizeof(serveraddr));
|
|
if (res < 0 ) {
|
|
if (errno != EINPROGRESS){
|
|
error=String("connect error ")+String(strerror(errno));
|
|
LOG_DEBUG(GwLog::ERROR,"connect on fd %d, errno: %d, \"%s\"", sockfd, errno, strerror(errno));
|
|
close(sockfd);
|
|
return;
|
|
}
|
|
state=C_CONNECTING;
|
|
connection->setClient(sockfd);
|
|
LOG_DEBUG(GwLog::DEBUG,"TcpClient connecting...");
|
|
}
|
|
else{
|
|
state=C_CONNECTED;
|
|
connection->setClient(sockfd);
|
|
LOG_DEBUG(GwLog::DEBUG,"TcpClient connected");
|
|
}
|
|
}
|
|
void GwTcpClient::checkConnection()
|
|
{
|
|
unsigned long now=millis();
|
|
LOG_DEBUG(GwLog::DEBUG+3,"TcpClient::checkConnection state=%d, start=%ul, now=%ul",
|
|
(int)state,connectStart,now);
|
|
if (state == C_RESOLVING){
|
|
//TODO: timeout???
|
|
return;
|
|
}
|
|
if (state == C_RESOLVED){
|
|
startConnection();
|
|
return;
|
|
}
|
|
if (! connection->hasClient()){
|
|
state = hasConfig()?C_INITIALIZED:C_DISABLED;
|
|
}
|
|
if (state == C_INITIALIZED){
|
|
if ((now - connectStart) > CON_TIMEOUT){
|
|
LOG_DEBUG(GwLog::LOG,"retry connect to %s",remoteAddress.c_str());
|
|
startResolving();
|
|
}
|
|
return;
|
|
}
|
|
if (state != C_CONNECTING){
|
|
return;
|
|
}
|
|
fd_set fdset;
|
|
struct timeval tv;
|
|
FD_ZERO(&fdset);
|
|
int sockfd=connection->fd;
|
|
FD_SET(connection->fd, &fdset);
|
|
tv.tv_sec = 0;
|
|
tv.tv_usec = 0;
|
|
int res = select(sockfd + 1, nullptr, &fdset, nullptr, &tv);
|
|
if (res < 0) {
|
|
error=String("select error ")+String(strerror(errno));
|
|
LOG_DEBUG(GwLog::ERROR,"select on fd %d, errno: %d, \"%s\"", sockfd, errno, strerror(errno));
|
|
connection->stop();
|
|
return;
|
|
} else if (res == 0) {
|
|
//still connecting
|
|
if ((now - connectStart) >= CON_TIMEOUT){
|
|
error="connect timeout";
|
|
LOG_DEBUG(GwLog::ERROR,"connect timeout to %s, retry",remoteAddress.c_str());
|
|
connection->stop();
|
|
return;
|
|
}
|
|
return;
|
|
} else {
|
|
int sockerr;
|
|
socklen_t len = (socklen_t)sizeof(int);
|
|
res = getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &sockerr, &len);
|
|
|
|
if (res < 0) {
|
|
error="getsockopt failed";
|
|
LOG_DEBUG(GwLog::ERROR,"getsockopt on fd %d, errno: %d, \"%s\"", sockfd, errno, strerror(errno));
|
|
connection->stop();
|
|
return;
|
|
}
|
|
if (sockerr != 0) {
|
|
error=String("socket error ")+String(strerror(sockerr));
|
|
LOG_DEBUG(GwLog::ERROR,"socket error on fd %d, errno: %d, \"%s\"", sockfd, sockerr, strerror(sockerr));
|
|
connection->stop();
|
|
return;
|
|
}
|
|
}
|
|
if (connection->connected()){
|
|
error="";
|
|
LOG_DEBUG(GwLog::LOG,"connected to %s",remoteAddress.c_str());
|
|
state=C_CONNECTED;
|
|
}
|
|
else{
|
|
error=String("connect error ")+String(strerror(errno));
|
|
LOG_DEBUG(GwLog::ERROR,"%s",error.c_str());
|
|
state=C_INITIALIZED;
|
|
}
|
|
}
|
|
|
|
GwTcpClient::GwTcpClient(GwLog *logger)
|
|
{
|
|
this->logger = logger;
|
|
this->connection=NULL;
|
|
locker=xSemaphoreCreateMutex();
|
|
}
|
|
GwTcpClient::~GwTcpClient(){
|
|
if (connection)
|
|
delete connection;
|
|
vSemaphoreDelete(locker);
|
|
}
|
|
void GwTcpClient::begin(int sourceId,String address, uint16_t port,bool allowRead)
|
|
{
|
|
stop();
|
|
this->sourceId=sourceId;
|
|
this->remoteAddress = address;
|
|
this->port = port;
|
|
configured=true;
|
|
state = C_INITIALIZED;
|
|
this->connection = new GwSocketConnection(logger,0,allowRead);
|
|
startResolving();
|
|
}
|
|
void GwTcpClient::loop(bool handleRead,bool handleWrite)
|
|
{
|
|
checkConnection();
|
|
if (state != C_CONNECTED){
|
|
return;
|
|
}
|
|
if (handleRead){
|
|
if (connection->hasClient()){
|
|
if (! connection->connected()){
|
|
LOG_DEBUG(GwLog::ERROR,"tcp client connection closed on %s",connection->remoteIpAddress.c_str());
|
|
connection->stop();
|
|
}
|
|
else{
|
|
connection->read();
|
|
}
|
|
}
|
|
}
|
|
if (handleWrite){
|
|
if (connection->hasClient()){
|
|
GwBuffer::WriteStatus rt = connection->write();
|
|
if (rt == GwBuffer::ERROR)
|
|
{
|
|
LOG_DEBUG(GwLog::ERROR, "write error on %s, closing", connection->remoteIpAddress.c_str());
|
|
connection->stop();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
size_t GwTcpClient::sendToClients(const char *buf,int sourceId, bool partialWrite){
|
|
if (sourceId == this->sourceId) return 0;
|
|
if (state != C_CONNECTED) return 0;
|
|
if (! connection->hasClient()) return 0;
|
|
size_t len=strlen(buf);
|
|
if (connection->enqueue((uint8_t*)buf,len)){
|
|
return len;
|
|
}
|
|
return 0;
|
|
}
|
|
void GwTcpClient::readMessages(GwMessageFetcher *writer){
|
|
if (state != C_CONNECTED) return;
|
|
if (! connection->hasClient()) return;
|
|
connection->messagesFromBuffer(writer);
|
|
}
|
|
void GwTcpClient::resolveHost(String host)
|
|
{
|
|
LOG_DEBUG(GwLog::LOG,"start resolving %s",host.c_str());
|
|
{
|
|
GWSYNCHRONIZED(&locker);
|
|
resolvedAddress.resolved = false;
|
|
}
|
|
state = C_RESOLVING;
|
|
error=String("resolving ")+host;
|
|
ResolveArgs *args=new ResolveArgs();
|
|
args->host = host;
|
|
args->timeout = 10000;
|
|
args->client = this;
|
|
if (xTaskCreate([](void *p)
|
|
{
|
|
ResolveArgs *args = (ResolveArgs *)p;
|
|
esp_ip4_addr_t addr;
|
|
addr.addr = 0;
|
|
esp_err_t err = mdns_query_a(args->host.c_str(), args->timeout, &addr);
|
|
if (err)
|
|
{
|
|
args->client->setResolved(IPAddress(), false);
|
|
}
|
|
else{
|
|
args->client->setResolved(IPAddress(addr.addr), true);
|
|
}
|
|
args->client->logger->logDebug(GwLog::DEBUG,"resolve task end");
|
|
delete args;
|
|
vTaskDelete(NULL);
|
|
},
|
|
"resolve", 4000, args, 0, NULL) != pdPASS)
|
|
{
|
|
LOG_DEBUG(GwLog::ERROR,"unable to start resolve task");
|
|
error = "unable to start resolve task";
|
|
delete args;
|
|
setResolved(IPAddress(), false);
|
|
}
|
|
}
|
|
void GwTcpClient::setResolved(IPAddress addr, bool valid){
|
|
LOG_DEBUG(GwLog::LOG,"setResolved %s, valid=%s",
|
|
addr.toString().c_str(),(valid?"true":"false"));
|
|
GWSYNCHRONIZED(&locker);
|
|
resolvedAddress.address=addr;
|
|
resolvedAddress.resolved=valid;
|
|
state=C_RESOLVED;
|
|
}
|
|
GwTcpClient::ResolvedAddress GwTcpClient::getResolved(){
|
|
GWSYNCHRONIZED(&locker);
|
|
return resolvedAddress;
|
|
} |