Signed-off-by: tomoaki <tomoaki@tomy-tech.com>
This commit is contained in:
tomoaki
2020-10-12 17:00:09 +09:00
parent 48169d48fa
commit d91de457f3
2 changed files with 28 additions and 14 deletions

View File

@@ -1,5 +1,5 @@
/************************************************************************************** /**************************************************************************************
* Copyright (c) 2016, Tomoaki Yamaguchi * Copyright (c) 2016, 2020 Tomoaki Yamaguchi and others
* *
* All rights reserved. This program and the accompanying materials * All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0 * are made available under the terms of the Eclipse Public License v1.0
@@ -104,10 +104,14 @@ void MQTTSNConnectionHandler::handleConnect(Client* client, MQTTSNPacket* packet
connectData->keepAliveTimer = data.duration; connectData->keepAliveTimer = data.duration;
connectData->flags.bits.will = data.willFlag; connectData->flags.bits.will = data.willFlag;
if ((const char*) _gateway->getGWParams()->loginId != nullptr && (const char*) _gateway->getGWParams()->password != 0) if ((const char*) _gateway->getGWParams()->loginId != nullptr)
{
connectData->flags.bits.username = 1;
}
if ((const char*) _gateway->getGWParams()->password != 0)
{ {
connectData->flags.bits.password = 1; connectData->flags.bits.password = 1;
connectData->flags.bits.username = 1;
} }
client->setSessionStatus(false); client->setSessionStatus(false);
@@ -275,14 +279,18 @@ void MQTTSNConnectionHandler::handlePingreq(Client* client, MQTTSNPacket* packet
if ( ( client->isSleep() || client->isAwake() ) && client->getClientSleepPacket() ) if ( ( client->isSleep() || client->isAwake() ) && client->getClientSleepPacket() )
{ {
sendStoredPublish(client); sendStoredPublish(client);
client->holdPingRequest();
}
else
{
/* send PINGREQ to the broker */
client->resetPingRequest();
MQTTGWPacket* pingreq = new MQTTGWPacket();
pingreq->setHeader(PINGREQ);
Event* evt = new Event();
evt->setBrokerSendEvent(client, pingreq);
_gateway->getBrokerSendQue()->post(evt);
} }
/* send PINGREQ to the broker */
MQTTGWPacket* pingreq = new MQTTGWPacket();
pingreq->setHeader(PINGREQ);
Event* evt = new Event();
evt->setBrokerSendEvent(client, pingreq);
_gateway->getBrokerSendQue()->post(evt);
} }
void MQTTSNConnectionHandler::sendStoredPublish(Client* client) void MQTTSNConnectionHandler::sendStoredPublish(Client* client)
@@ -291,6 +299,7 @@ void MQTTSNConnectionHandler::sendStoredPublish(Client* client)
while ( ( msg = client->getClientSleepPacket() ) != nullptr ) while ( ( msg = client->getClientSleepPacket() ) != nullptr )
{ {
// ToDo: This version can't re-send PUBLISH when PUBACK is not returned.
client->deleteFirstClientSleepPacket(); // pop the que to delete element. client->deleteFirstClientSleepPacket(); // pop the que to delete element.
Event* ev = new Event(); Event* ev = new Event();

View File

@@ -145,11 +145,7 @@ bool TCPStack::accept(TCPStack& new_socket)
int TCPStack::send(const uint8_t* buf, int length) int TCPStack::send(const uint8_t* buf, int length)
{ {
#ifdef __APPLE__
return ::send(_sockfd, buf, length, SO_NOSIGPIPE);
#else
return ::send(_sockfd, buf, length, MSG_NOSIGNAL); return ::send(_sockfd, buf, length, MSG_NOSIGNAL);
#endif
} }
int TCPStack::recv(uint8_t* buf, int len) int TCPStack::recv(uint8_t* buf, int len)
@@ -545,6 +541,15 @@ loop:
case SSL_ERROR_WANT_WRITE: case SSL_ERROR_WANT_WRITE:
readBlockedOnWrite = true; readBlockedOnWrite = true;
break; break;
case SSL_ERROR_SYSCALL:
SSL_free(_ssl);
_ssl = 0;
_numOfInstance--;
//TCPStack::close();
_busy = false;
_mutex.unlock();
return -1;
break;
default: default:
ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg));
WRITELOG("Network::recv() %s\n", errmsg); WRITELOG("Network::recv() %s\n", errmsg);