From 63d2824444ecc4b3906777f8f3eeb21314687655 Mon Sep 17 00:00:00 2001 From: tomoaki Date: Tue, 4 Oct 2016 11:17:46 +0900 Subject: [PATCH] BugFix: stop Process mechanism of ProcessFramework. Update: DISCONNECT procedure for NonActive clients. Signed-off-by: tomoaki --- MQTTSNGateway/src/MQTTGWConnectionHandler.cpp | 10 +- MQTTSNGateway/src/MQTTGWConnectionHandler.h | 1 + MQTTSNGateway/src/MQTTSNGWBrokerRecvTask.cpp | 23 ++- MQTTSNGateway/src/MQTTSNGWBrokerSendTask.cpp | 142 +++++++++--------- MQTTSNGateway/src/MQTTSNGWBrokerSendTask.h | 1 - MQTTSNGateway/src/MQTTSNGWClient.cpp | 3 +- MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp | 33 ++-- MQTTSNGateway/src/MQTTSNGWClientSendTask.cpp | 19 ++- .../src/MQTTSNGWConnectionHandler.cpp | 23 +-- .../src/MQTTSNGWPacketHandleTask.cpp | 17 ++- MQTTSNGateway/src/MQTTSNGWProcess.cpp | 81 ++++++---- MQTTSNGateway/src/MQTTSNGWProcess.h | 11 +- MQTTSNGateway/src/MQTTSNGateway.cpp | 81 +++++++--- MQTTSNGateway/src/MQTTSNGateway.h | 15 +- MQTTSNGateway/src/linux/Network.cpp | 18 +-- MQTTSNGateway/src/linux/Threading.cpp | 37 +++-- MQTTSNGateway/src/linux/Threading.h | 12 +- MQTTSNGateway/src/linux/udp/SensorNetwork.cpp | 34 +++-- MQTTSNGateway/src/linux/udp/SensorNetwork.h | 10 +- .../src/tests/TestProcessFramework.cpp | 1 + MQTTSNGateway/src/tests/TestTask.cpp | 5 +- 21 files changed, 338 insertions(+), 239 deletions(-) diff --git a/MQTTSNGateway/src/MQTTGWConnectionHandler.cpp b/MQTTSNGateway/src/MQTTGWConnectionHandler.cpp index d670ef9..8440142 100644 --- a/MQTTSNGateway/src/MQTTGWConnectionHandler.cpp +++ b/MQTTSNGateway/src/MQTTGWConnectionHandler.cpp @@ -95,4 +95,12 @@ void MQTTGWConnectionHandler::handlePingresp(Client* client, MQTTGWPacket* packe _gateway->getClientSendQue()->post(ev1); } - +void MQTTGWConnectionHandler::handleDisconnect(Client* client, MQTTGWPacket* packet) +{ + MQTTSNPacket* snPacket = new MQTTSNPacket(); + snPacket->setDISCONNECT(0); + client->disconnected(); + client->getNetwork()->close(); + Event* ev1 = new Event(); + ev1->setClientSendEvent(client, snPacket); +} diff --git a/MQTTSNGateway/src/MQTTGWConnectionHandler.h b/MQTTSNGateway/src/MQTTGWConnectionHandler.h index aae892e..125350d 100644 --- a/MQTTSNGateway/src/MQTTGWConnectionHandler.h +++ b/MQTTSNGateway/src/MQTTGWConnectionHandler.h @@ -30,6 +30,7 @@ public: ~MQTTGWConnectionHandler(); void handleConnack(Client* client, MQTTGWPacket* packet); void handlePingresp(Client* client, MQTTGWPacket* packet); + void handleDisconnect(Client* client, MQTTGWPacket* packet); private: Gateway* _gateway; }; diff --git a/MQTTSNGateway/src/MQTTSNGWBrokerRecvTask.cpp b/MQTTSNGateway/src/MQTTSNGWBrokerRecvTask.cpp index 0e233ed..33d2733 100644 --- a/MQTTSNGateway/src/MQTTSNGWBrokerRecvTask.cpp +++ b/MQTTSNGateway/src/MQTTSNGWBrokerRecvTask.cpp @@ -59,6 +59,10 @@ void BrokerRecvTask::run(void) while (true) { + if (CHK_SIGINT) + { + return; + } timeout.tv_sec = 0; timeout.tv_usec = 500000; // 500 msec FD_ZERO(&rset); @@ -117,10 +121,7 @@ void BrokerRecvTask::run(void) /* post a BrokerRecvEvent */ ev = new Event(); ev->setBrokerRecvEvent(client, packet); - if ( _gateway->getPacketEventQue()->post(ev) == 0 ) - { - delete ev; - } + _gateway->getPacketEventQue()->post(ev); } else { @@ -143,11 +144,17 @@ void BrokerRecvTask::run(void) WRITELOG("%s BrokerRecvTask can't create the packet %s%s\n", ERRMSG_HEADER, client->getClientId(), ERRMSG_FOOTER); } - /* disconnect the client */ - client->disconnected(); - client->getNetwork()->disconnect(); - rc = 0; delete packet; + + /* disconnect the client */ + if ( rc == -1 || rc == -2 ) + { + packet = new MQTTGWPacket(); + packet->setHeader(DISCONNECT); + ev = new Event(); + ev->setBrokerRecvEvent(client, packet); + _gateway->getPacketEventQue()->post(ev); + } } } } diff --git a/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.cpp b/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.cpp index ea589e4..bd850b1 100644 --- a/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.cpp +++ b/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.cpp @@ -60,83 +60,90 @@ void BrokerSendTask::run() Event* ev = 0; MQTTGWPacket* packet = 0; Client* client = 0; + int rc = 0; while (true) { - int rc = 0; ev = _gateway->getBrokerSendQue()->wait(); - client = ev->getClient(); - packet = ev->getMQTTGWPacket(); - /* - if ( client->getNetwork()->isValid() && !client->getNetwork()->isSecure() && packet->getType() == CONNECT ) - { - client->getNetwork()->close(); - } - */ - if ( !client->getNetwork()->isValid() ) - { - /* connect to the broker and send a packet */ - char* portNo = _gwparams->port; - const char* cert = 0; - const char* keyFile = 0; - string certFile; - string privateKeyFile; - if (client->isSecureNetwork()) + if ( ev->getEventType() == EtStop ) + { + delete ev; + return; + } + + if ( ev->getEventType() == EtBrokerSend) + { + client = ev->getClient(); + packet = ev->getMQTTGWPacket(); + + if ( !client->getNetwork()->isValid() ) { - portNo = _gwparams->portSecure; - if ( _gwparams->certDirectory ) + /* connect to the broker and send a packet */ + char* portNo = _gwparams->port; + const char* cert = 0; + const char* keyFile = 0; + string certFile; + string privateKeyFile; + + if (client->isSecureNetwork()) { - certFile = _gwparams->certDirectory; - certFile += client->getClientId(); - certFile += ".crt"; - cert = certFile.c_str(); - privateKeyFile = _gwparams->certDirectory; - privateKeyFile += client->getClientId(); - privateKeyFile += ".key"; - keyFile = privateKeyFile.c_str(); + portNo = _gwparams->portSecure; + if ( _gwparams->certDirectory ) + { + certFile = _gwparams->certDirectory; + certFile += client->getClientId(); + certFile += ".crt"; + cert = certFile.c_str(); + privateKeyFile = _gwparams->certDirectory; + privateKeyFile += client->getClientId(); + privateKeyFile += ".key"; + keyFile = privateKeyFile.c_str(); + } + rc = client->getNetwork()->connect(_gwparams->brokerName, _gwparams->portSecure, _gwparams->rootCApath, _gwparams->rootCAfile, cert, keyFile); } - rc = client->getNetwork()->connect(_gwparams->brokerName, _gwparams->portSecure, _gwparams->rootCApath, _gwparams->rootCAfile, cert, keyFile); + else + { + rc = client->getNetwork()->connect(_gwparams->brokerName, portNo); + } + + if ( !rc ) + { + /* disconnect the broker and the client */ + WRITELOG("%s BrokerSendTask can't connect to the broker. %s%s\n", + ERRMSG_HEADER, client->getClientId(), ERRMSG_FOOTER); + delete ev; + client->getNetwork()->close(); + continue; + } + } + + /* send a packet */ + _light->blueLight(true); + if ( (rc = packet->send(client->getNetwork())) > 0 ) + { + if ( packet->getType() == CONNECT ) + { + client->setWaitWillMsgFlg(false); + client->connectSended(); + } + log(client, packet); } else { - rc = client->getNetwork()->connect(_gwparams->brokerName, portNo); - } - - if ( !rc ) - { - /* disconnect the broker and the client */ - WRITELOG("%s BrokerSendTask can't connect to the broker. errno=%d %s%s\n", - ERRMSG_HEADER, rc == -1 ? errno : 0, client->getClientId(), ERRMSG_FOOTER); - delete ev; - disconnect(client); - client->getNetwork()->disconnect(); - continue; - } - } - - /* send a packet */ - _light->blueLight(true); - if ( (rc = packet->send(client->getNetwork())) > 0 ) - { - if ( packet->getType() == CONNECT ) - { - client->setWaitWillMsgFlg(false); - client->connectSended(); - } - log(client, packet); - } - else - { - WRITELOG("%s BrokerSendTask can't send a packet. errno=%d %s%s\n", - ERRMSG_HEADER, rc == -1 ? errno : 0, client->getClientId(), ERRMSG_FOOTER); WRITELOG("%s BrokerSendTask can't send a packet to the broker errno=%d %s%s\n", ERRMSG_HEADER, rc == -1 ? errno : 0, client->getClientId(), ERRMSG_FOOTER); - client->disconnected(); - client->getNetwork()->disconnect(); - } - _light->blueLight(false); + /* Disconnect the client */ + packet = new MQTTGWPacket(); + packet->setHeader(DISCONNECT); + ev = new Event(); + ev->setBrokerRecvEvent(client, packet); + _gateway->getPacketEventQue()->post(ev); + } + + _light->blueLight(false); + } delete ev; } } @@ -177,12 +184,3 @@ void BrokerSendTask::log(Client* client, MQTTGWPacket* packet) } } -void BrokerSendTask::disconnect(Client* client) -{ - MQTTSNPacket* snMsg = new MQTTSNPacket(); - snMsg->setDISCONNECT(0); - Event* ev1 = new Event(); - ev1->setClientSendEvent(client, snMsg); - _gateway->getClientSendQue()->post(ev1); - client->disconnected(); -} diff --git a/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.h b/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.h index a589a5d..f287536 100644 --- a/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.h +++ b/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.h @@ -35,7 +35,6 @@ public: void run(); private: void log(Client*, MQTTGWPacket*); - void disconnect(Client*); Gateway* _gateway; GatewayParams* _gwparams; LightIndicator* _light; diff --git a/MQTTSNGateway/src/MQTTSNGWClient.cpp b/MQTTSNGateway/src/MQTTSNGWClient.cpp index 4a8aecf..74b07ac 100644 --- a/MQTTSNGateway/src/MQTTSNGWClient.cpp +++ b/MQTTSNGateway/src/MQTTSNGWClient.cpp @@ -102,9 +102,7 @@ bool ClientList::authorize(const char* fileName) } pos = data.find_first_of(","); string id = data.substr(0, pos); - clientId.cstring = strdup(id.c_str()); - string addr = data.substr(pos + 1); if (netAddr.setAddress(&addr) == 0) @@ -117,6 +115,7 @@ bool ClientList::authorize(const char* fileName) { WRITELOG("Invalid address %s\n", data.c_str()); } + free(clientId.cstring); } fclose(fp); _authorize = true; diff --git a/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp b/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp index ff87f5b..f20783d 100644 --- a/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp +++ b/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp @@ -59,6 +59,12 @@ void ClientRecvTask::run() MQTTSNPacket* packet = new MQTTSNPacket(); int packetLen = packet->recv(_sensorNetwork); + if (CHK_SIGINT) + { + delete packet; + return; + } + if (packetLen < 2 ) { delete packet; @@ -77,10 +83,7 @@ void ClientRecvTask::run() log(0, packet); ev = new Event(); ev->setBrodcastEvent(packet); - if ( _gateway->getPacketEventQue()->post(ev) == 0 ) - { - delete ev; - } + _gateway->getPacketEventQue()->post(ev); continue; } @@ -93,10 +96,7 @@ void ClientRecvTask::run() log(client, packet); ev = new Event(); ev->setClientRecvEvent(client,packet); - if ( _gateway->getPacketEventQue()->post(ev) == 0 ) - { - delete ev; - } + _gateway->getPacketEventQue()->post(ev); } else { @@ -112,7 +112,7 @@ void ClientRecvTask::run() if (!client) { - WRITELOG("%s Can't create a Client. CONNECT message has been discarded.%s\n", ERRMSG_HEADER, ERRMSG_FOOTER); + WRITELOG("%s Client was rejected. CONNECT message has been discarded.%s\n", ERRMSG_HEADER, ERRMSG_FOOTER); delete packet; continue; } @@ -123,15 +123,20 @@ void ClientRecvTask::run() client->setClientAddress(_sensorNetwork->getSenderAddress()); ev = new Event(); ev->setClientRecvEvent(client, packet); - if ( _gateway->getPacketEventQue()->post(ev) == 0 ) - { - delete ev; - } + _gateway->getPacketEventQue()->post(ev); } else { log(client, packet); delete packet; + /* Send DISCONNECT */ + SensorNetAddress* addr = new SensorNetAddress(); + addr = _sensorNetwork->getSenderAddress(); + packet = new MQTTSNPacket(); + packet->setDISCONNECT(0); + ev = new Event(); + ev->setClientSendEvent(addr, packet); + _gateway->getClientSendQue()->post(ev); continue; } } @@ -142,7 +147,7 @@ void ClientRecvTask::log(Client* client, MQTTSNPacket* packet) { char pbuf[SIZE_OF_LOG_PACKET * 3]; char msgId[6]; - const char* clientId = client ? (const char*)client->getClientId() :"Non Active Client !" ; + const char* clientId = client ? (const char*)client->getClientId() : NONACTCLT ; switch (packet->getType()) { diff --git a/MQTTSNGateway/src/MQTTSNGWClientSendTask.cpp b/MQTTSNGateway/src/MQTTSNGWClientSendTask.cpp index b7d7bff..b245c77 100644 --- a/MQTTSNGateway/src/MQTTSNGWClientSendTask.cpp +++ b/MQTTSNGateway/src/MQTTSNGWClientSendTask.cpp @@ -45,6 +45,11 @@ void ClientSendTask::run() { Event* ev = _gateway->getClientSendQue()->wait(); + if (ev->getEventType() == EtStop) + { + delete ev; + break; + } if (ev->getEventType() == EtClientSend) { client = ev->getClient(); @@ -56,6 +61,11 @@ void ClientSendTask::run() packet = ev->getMQTTSNPacket(); packet->broadcast(_sensorNetwork); } + else if (ev->getEventType() == EtSensornetSend) + { + packet = ev->getMQTTSNPacket(); + packet->unicast(_sensorNetwork, ev->getSensorNetAddress()); + } log(client, packet); delete ev; @@ -66,6 +76,7 @@ void ClientSendTask::log(Client* client, MQTTSNPacket* packet) { char pbuf[SIZE_OF_LOG_PACKET * 3]; char msgId[6]; + const char* clientId = client ? (const char*)client->getClientId() : NONACTCLT ; switch (packet->getType()) { @@ -78,15 +89,13 @@ void ClientSendTask::log(Client* client, MQTTSNPacket* packet) case MQTTSN_DISCONNECT: case MQTTSN_WILLTOPICREQ: case MQTTSN_WILLMSGREQ: - WRITELOG(FORMAT_Y_W_G, currentDateTime(), packet->getName(), RIGHTARROW, client->getClientId(), packet->print(pbuf)); - break; case MQTTSN_WILLTOPICRESP: case MQTTSN_WILLMSGRESP: - WRITELOG(FORMAT_Y_W_G, currentDateTime(), packet->getName(), RIGHTARROW, client->getClientId(), packet->print(pbuf)); + WRITELOG(FORMAT_Y_W_G, currentDateTime(), packet->getName(), RIGHTARROW, clientId, packet->print(pbuf)); break; case MQTTSN_REGISTER: case MQTTSN_PUBLISH: - WRITELOG(FORMAT_W_MSGID_W_G, currentDateTime(), packet->getName(), packet->getMsgId(msgId), RIGHTARROW, client->getClientId(), packet->print(pbuf)); + WRITELOG(FORMAT_W_MSGID_W_G, currentDateTime(), packet->getName(), packet->getMsgId(msgId), RIGHTARROW, clientId, packet->print(pbuf)); break; case MQTTSN_REGACK: case MQTTSN_PUBACK: @@ -95,7 +104,7 @@ void ClientSendTask::log(Client* client, MQTTSNPacket* packet) case MQTTSN_PUBCOMP: case MQTTSN_SUBACK: case MQTTSN_UNSUBACK: - WRITELOG(FORMAT_W_MSGID_W_G, currentDateTime(), packet->getName(), packet->getMsgId(msgId), RIGHTARROW, client->getClientId(), packet->print(pbuf)); + WRITELOG(FORMAT_W_MSGID_W_G, currentDateTime(), packet->getName(), packet->getMsgId(msgId), RIGHTARROW, clientId, packet->print(pbuf)); break; default: break; diff --git a/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp b/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp index cae95f2..5783646 100644 --- a/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp +++ b/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp @@ -209,19 +209,22 @@ void MQTTSNConnectionHandler::handleWillmsg(Client* client, MQTTSNPacket* packet */ void MQTTSNConnectionHandler::handleDisconnect(Client* client, MQTTSNPacket* packet) { - MQTTGWPacket* mqMsg = new MQTTGWPacket(); + Event* ev = new Event(); MQTTSNPacket* snMsg = new MQTTSNPacket(); - - mqMsg->setHeader(DISCONNECT); snMsg->setDISCONNECT(0); + ev->setClientSendEvent(client, snMsg); + _gateway->getClientSendQue()->post(ev); - Event* ev1 = new Event(); - ev1->setClientSendEvent(client, snMsg); - _gateway->getClientSendQue()->post(ev1); - - ev1 = new Event(); - ev1->setBrokerSendEvent(client, mqMsg); - _gateway->getBrokerSendQue()->post(ev1); + uint16_t duration = 0; + packet->getDISCONNECT(&duration); + if ( duration == 0 ) + { + MQTTGWPacket* mqMsg = new MQTTGWPacket(); + mqMsg->setHeader(DISCONNECT); + ev = new Event(); + ev->setBrokerSendEvent(client, mqMsg); + _gateway->getBrokerSendQue()->post(ev); + } } /* diff --git a/MQTTSNGateway/src/MQTTSNGWPacketHandleTask.cpp b/MQTTSNGateway/src/MQTTSNGWPacketHandleTask.cpp index bf8a85d..41270fa 100644 --- a/MQTTSNGateway/src/MQTTSNGWPacketHandleTask.cpp +++ b/MQTTSNGateway/src/MQTTSNGWPacketHandleTask.cpp @@ -84,7 +84,6 @@ void PacketHandleTask::run() { Event* ev = 0; EventQue* eventQue = _gateway->getPacketEventQue(); - ClientList* clist = _gateway->getClientList(); Client* client = 0; MQTTSNPacket* snPacket = 0; MQTTGWPacket* brPacket = 0; @@ -95,27 +94,29 @@ void PacketHandleTask::run() while (true) { - if (theProcess->checkSignal() == SIGINT) - { - throw Exception("Terminated by CTL-C"); - } - /* wait Event */ ev = eventQue->timedwait(EVENT_QUE_TIME_OUT); + if (ev->getEventType() == EtStop) + { + delete ev; + return; + } + if (ev->getEventType() == EtTimeout) { /*------ Is Client Lost ? ---------*/ - client = clist->getClient(); + /* + client = _gateway->getClientList()->getClient(); while (client > 0) { if ( client->checkTimeover() ) { - _mqttsnConnection->handleDisconnect(client, 0); client->disconnected(); } client = client->getNextClient(); } + */ /*------ Check Keep Alive Timer & send Advertise ------*/ if (_advertiseTimer.isTimeup()) { diff --git a/MQTTSNGateway/src/MQTTSNGWProcess.cpp b/MQTTSNGateway/src/MQTTSNGWProcess.cpp index bd6d2ad..18f5df3 100644 --- a/MQTTSNGateway/src/MQTTSNGWProcess.cpp +++ b/MQTTSNGateway/src/MQTTSNGWProcess.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include "MQTTSNGWProcess.h" #include "Threading.h" @@ -118,6 +119,28 @@ void Process::initialize(int argc, char** argv) } } +void Process::putLog(const char* format, ...) +{ + _mt.lock(); + va_list arg; + va_start(arg, format); + vsprintf(_rbdata, format, arg); + va_end(arg); + if (strlen(_rbdata)) + { + if ( _log > 0 ) + { + _rb->put(_rbdata); + _rbsem->post(); + } + else + { + printf("%s", _rbdata); + } + } + _mt.unlock(); +} + int Process::getArgc() { return _argc; @@ -183,28 +206,6 @@ int Process::getParam(const char* parameter, char* value) return -2; } -void Process::putLog(const char* format, ...) -{ - _mt.lock(); - va_list arg; - va_start(arg, format); - vsprintf(_rbdata, format, arg); - va_end(arg); - if (strlen(_rbdata)) - { - if ( _log > 0 ) - { - _rb->put(_rbdata); - _rbsem->post(); - } - else - { - printf("%s", _rbdata); - } - } - _mt.unlock(); -} - const char* Process::getLog() { int len = 0; @@ -249,6 +250,7 @@ MultiTaskProcess::MultiTaskProcess() { theMultiTaskProcess = this; _threadCount = 0; + _stopCount = 0; } MultiTaskProcess::~MultiTaskProcess() @@ -275,28 +277,43 @@ void MultiTaskProcess::run(void) try { - _stopProcessEvent.wait(); - } - catch (Exception* ex) - { - for (int i = 0; i < _threadCount; i++) + while(true) { - if ( _threadList[i] ) + if (theProcess->checkSignal() == SIGINT) { - _threadList[i]->cancel();; + return; } + sleep(1); } + } + catch(Exception* ex) + { ex->writeMessage(); } + catch(...) + { + throw; + } } -Semaphore* MultiTaskProcess::getStopProcessEvent(void) +void MultiTaskProcess::waitStop(void) { - return &_stopProcessEvent; + while (_stopCount < _threadCount) + { + sleep(1); + } +} + +void MultiTaskProcess::threadStoped(void) +{ + _mutex.lock(); + _stopCount++; + _mutex.unlock(); } void MultiTaskProcess::attach(Thread* thread) { + _mutex.lock(); if (_threadCount < MQTTSNGW_MAX_TASK) { _threadList[_threadCount] = thread; @@ -304,8 +321,10 @@ void MultiTaskProcess::attach(Thread* thread) } else { + _mutex.unlock(); throw Exception("Full of Threads"); } + _mutex.unlock(); } int MultiTaskProcess::getParam(const char* parameter, char* value) diff --git a/MQTTSNGateway/src/MQTTSNGWProcess.h b/MQTTSNGateway/src/MQTTSNGWProcess.h index 3606678..7fdd52f 100644 --- a/MQTTSNGateway/src/MQTTSNGWProcess.h +++ b/MQTTSNGateway/src/MQTTSNGWProcess.h @@ -39,7 +39,7 @@ namespace MQTTSNGW * Macros ==================================*/ #define WRITELOG theProcess->putLog - +#define CHK_SIGINT (theProcess->checkSignal() == SIGINT) /*================================= Class Process ==================================*/ @@ -79,17 +79,18 @@ class MultiTaskProcess: public Process public: MultiTaskProcess(void); ~MultiTaskProcess(); - virtual void initialize(int argc, char** argv); - virtual int getParam(const char* parameter, char* value); + void initialize(int argc, char** argv); + int getParam(const char* parameter, char* value); void run(void); + void waitStop(void); + void threadStoped(void); void attach(Thread* thread); - Semaphore* getStopProcessEvent(void); private: Thread* _threadList[MQTTSNGW_MAX_TASK]; - Semaphore _stopProcessEvent; Mutex _mutex; int _threadCount; + int _stopCount; }; /*===================================== diff --git a/MQTTSNGateway/src/MQTTSNGateway.cpp b/MQTTSNGateway/src/MQTTSNGateway.cpp index 1352b6a..8e9a25d 100644 --- a/MQTTSNGateway/src/MQTTSNGateway.cpp +++ b/MQTTSNGateway/src/MQTTSNGateway.cpp @@ -42,6 +42,8 @@ Gateway::Gateway() _params.rootCApath = 0; _params.rootCAfile = 0; _params.certDirectory = 0; + _params.clientListName = 0; + _params.configName = 0; _packetEventQue.setMaxSize(MAX_INFLIGHTMESSAGES * MAX_CLIENTS); } @@ -71,11 +73,18 @@ Gateway::~Gateway() { free(_params.portSecure); } + if ( _params.certDirectory ) + { + free(_params.certDirectory); + } if ( _params.rootCApath ) { free(_params.rootCApath); } - + if ( _params.rootCAfile ) + { + free(_params.rootCAfile); + } if ( _params.clientListName ) { free(_params.clientListName); @@ -93,7 +102,6 @@ void Gateway::initialize(int argc, char** argv) MultiTaskProcess::initialize(argc, argv); resetRingBuffer(); - if (getParam("BrokerName", param) == 0) { _params.brokerName = strdup(param); @@ -215,8 +223,22 @@ void Gateway::run(void) WRITELOG(" RootCAfile: %s\n", _params.rootCAfile); WRITELOG(" ClientCerts: %s\n", _params.certDirectory); - /* Execute threads and wait StopProcessEvent from MQTTSNGWPacketHandleTask */ MultiTaskProcess::run(); + + /* stop threads */ + Event* ev = new Event(); + ev->setStop(); + _packetEventQue.post(ev); + ev = new Event(); + ev->setStop(); + _brokerSendQue.post(ev); + ev = new Event(); + ev->setStop(); + _clientSendQue.post(ev); + + /* wait until all threads stop */ + MultiTaskProcess::waitStop(); + WRITELOG("%s MQTT-SN Gateway stoped\n", currentDateTime()); _lightIndicator.allLightOff(); } @@ -266,7 +288,13 @@ EventQue::EventQue() EventQue::~EventQue() { - + _mutex.lock(); + while (_que.size() > 0) + { + delete _que.front(); + _que.pop(); + } + _mutex.unlock(); } void EventQue::setMaxSize(uint16_t maxSize) @@ -316,14 +344,18 @@ Event* EventQue::timedwait(uint16_t millsec) return ev; } -int EventQue::post(Event* ev) +void EventQue::post(Event* ev) { - int rc = 0; _mutex.lock(); - rc = _que.post(ev); - _sem.post(); + if ( _que.post(ev) ) + { + _sem.post(); + } + else + { + delete ev; + } _mutex.unlock(); - return rc; } int EventQue::size() @@ -342,20 +374,18 @@ Event::Event() { _eventType = Et_NA; _client = 0; - _mqttSNPacket = 0; - _mqttGWPacket = 0; -} - -Event::Event(EventType type) -{ - _eventType = type; - _client = 0; + _sensorNetAddr = 0; _mqttSNPacket = 0; _mqttGWPacket = 0; } Event::~Event() { + if (_sensorNetAddr) + { + delete _sensorNetAddr; + } + if (_mqttSNPacket) { delete _mqttSNPacket; @@ -405,17 +435,34 @@ void Event::setTimeout(void) _eventType = EtTimeout; } +void Event::setStop(void) +{ + _eventType = EtStop; +} + void Event::setBrodcastEvent(MQTTSNPacket* msg) { _mqttSNPacket = msg; _eventType = EtBroadcast; } +void Event::setClientSendEvent(SensorNetAddress* addr, MQTTSNPacket* msg) +{ + _eventType = EtSensornetSend; + _sensorNetAddr = addr; + _mqttSNPacket = msg; +} + Client* Event::getClient(void) { return _client; } +SensorNetAddress* Event::getSensorNetAddress(void) +{ + return _sensorNetAddr; +} + MQTTSNPacket* Event::getMQTTSNPacket() { return _mqttSNPacket; diff --git a/MQTTSNGateway/src/MQTTSNGateway.h b/MQTTSNGateway/src/MQTTSNGateway.h index ab370e3..deec969 100644 --- a/MQTTSNGateway/src/MQTTSNGateway.h +++ b/MQTTSNGateway/src/MQTTSNGateway.h @@ -31,7 +31,7 @@ namespace MQTTSNGW /*================================= * Starting prompt ==================================*/ -#define GATEWAY_VERSION " * Version: 0.8.0" +#define GATEWAY_VERSION " * Version: 0.9.0" #define PAHO_COPYRIGHT0 " * MQTT-SN Transparent Gateway" #define PAHO_COPYRIGHT1 " * Part of Project Paho in Eclipse" @@ -49,6 +49,7 @@ namespace MQTTSNGW ===========================================================*/ #define CLIENT "Client" #define CLIENTS "Clients" +#define NONACTCLT "Non Active Client !" #define LEFTARROW "<---" #define RIGHTARROW "--->" @@ -87,20 +88,20 @@ namespace MQTTSNGW ====================================*/ enum EventType{ Et_NA = 0, + EtStop, EtTimeout, EtBrokerRecv, EtBrokerSend, EtClientRecv, EtClientSend, EtBroadcast, - EtSocketAlive + EtSensornetSend }; class Event{ public: Event(); - Event(EventType); ~Event(); EventType getEventType(void); void setClientRecvEvent(Client*, MQTTSNPacket*); @@ -109,13 +110,17 @@ public: void setBrokerSendEvent(Client*, MQTTGWPacket*); void setBrodcastEvent(MQTTSNPacket*); // ADVERTISE and GWINFO void setTimeout(void); // Required by EventQue.timedwait() + void setStop(void); + void setClientSendEvent(SensorNetAddress*, MQTTSNPacket*); Client* getClient(void); + SensorNetAddress* getSensorNetAddress(void); MQTTSNPacket* getMQTTSNPacket(void); MQTTGWPacket* getMQTTGWPacket(void); private: EventType _eventType; Client* _client; + SensorNetAddress* _sensorNetAddr; MQTTSNPacket* _mqttSNPacket; MQTTGWPacket* _mqttGWPacket; }; @@ -131,8 +136,8 @@ public: Event* wait(void); Event* timedwait(uint16_t millsec); void setMaxSize(uint16_t maxSize); - int post(Event*); - int size(); + void post(Event*); + int size(); private: Que _que; diff --git a/MQTTSNGateway/src/linux/Network.cpp b/MQTTSNGateway/src/linux/Network.cpp index e7b9790..8919fa4 100644 --- a/MQTTSNGateway/src/linux/Network.cpp +++ b/MQTTSNGateway/src/linux/Network.cpp @@ -345,6 +345,11 @@ bool Network::connect(const char* host, const char* port, const char* caPath, co return false; } + if (_session) + { + rc = SSL_set_session(_ssl, _session); + } + if (!SSL_set_fd(_ssl, getSock())) { ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); @@ -352,11 +357,9 @@ bool Network::connect(const char* host, const char* port, const char* caPath, co SSL_free(_ssl); } - //SSL_set_options(_ssl, SSL_OP_NO_TICKET); - if ( cert ) { - if ( SSL_use_certificate_file(_ssl, cert, SSL_FILETYPE_PEM) <= 0 ) + if ( SSL_use_certificate_file(_ssl, cert, SSL_FILETYPE_PEM) != 1 ) { ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); WRITELOG("SSL_use_certificate_file() %s %s\n", cert, errmsg); @@ -367,7 +370,7 @@ bool Network::connect(const char* host, const char* port, const char* caPath, co } if ( prvkey ) { - if ( SSL_use_PrivateKey_file(_ssl, prvkey, SSL_FILETYPE_PEM) <= 0 ) + if ( SSL_use_PrivateKey_file(_ssl, prvkey, SSL_FILETYPE_PEM) != 1 ) { ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); WRITELOG("SSL_use_PrivateKey_file() %s %s\n", prvkey, errmsg); @@ -386,11 +389,6 @@ bool Network::connect(const char* host, const char* port, const char* caPath, co return false; } - if (_session) - { - rc = SSL_set_session(_ssl, _session); - } - if (SSL_connect(_ssl) != 1) { ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); @@ -426,7 +424,7 @@ bool Network::connect(const char* host, const char* port, const char* caPath, co if (_session == 0) { - _session = SSL_get1_session(_ssl); + //_session = SSL_get1_session(_ssl); } _numOfInstance++; _sslValid = true; diff --git a/MQTTSNGateway/src/linux/Threading.cpp b/MQTTSNGateway/src/linux/Threading.cpp index b4f462c..a245118 100644 --- a/MQTTSNGateway/src/linux/Threading.cpp +++ b/MQTTSNGateway/src/linux/Threading.cpp @@ -160,12 +160,11 @@ Semaphore::Semaphore(const char* name, unsigned int val) { throw Exception( -1, "Semaphore can't be created."); } - _name = (char*) calloc(strlen(name + 1), 1); + _name = strdup(name); if (_name == NULL) { throw Exception( -1, "Semaphore can't allocate memories."); } - _name = strdup(name); } Semaphore::~Semaphore() @@ -174,7 +173,7 @@ Semaphore::~Semaphore() { sem_close(_psem); sem_unlink(_name); - free((void*) _name); + free(_name); } else { @@ -243,12 +242,18 @@ RingBuffer::RingBuffer(const char* keyDirectory) { int fp = 0; string fileName = keyDirectory + string(MQTTSNGW_RINGBUFFER_KEY); - fp = open(fileName.c_str(), O_CREAT, 0); - close(fp); + fp = open(fileName.c_str(), O_CREAT, S_IRGRP); + if ( fp > 0 ) + { + close(fp); + } fileName = keyDirectory + string(MQTTSNGW_RB_MUTEX_KEY); - fp = open(fileName.c_str(), O_CREAT, 0); - close(fp); + fp = open(fileName.c_str(), O_CREAT, S_IRGRP); + if ( fp > 0 ) + { + close(fp); + } key_t key = ftok(MQTTSNGW_RINGBUFFER_KEY, 1); @@ -302,10 +307,6 @@ RingBuffer::~RingBuffer() { shmctl(_shmid, IPC_RMID, NULL); } - if (_pmx > 0) - { - delete _pmx; - } } else { @@ -314,6 +315,11 @@ RingBuffer::~RingBuffer() shmdt(_shmaddr); } } + + if (_pmx > 0) + { + delete _pmx; + } } void RingBuffer::put(char* data) @@ -469,14 +475,11 @@ void RingBuffer::reset() =====================================*/ Thread::Thread() { - _stopProcessEvent = theMultiTaskProcess->getStopProcessEvent(); _threadID = 0; } Thread::~Thread() { - pthread_cancel(_threadID); - pthread_join(_threadID, 0); } void* Thread::_run(void* runnable) @@ -508,10 +511,6 @@ int Thread::start(void) void Thread::stopProcess(void) { - _stopProcessEvent->post(); + theMultiTaskProcess->threadStoped(); } -void Thread::cancel(void) -{ - pthread_cancel(_threadID); -} diff --git a/MQTTSNGateway/src/linux/Threading.h b/MQTTSNGateway/src/linux/Threading.h index 2ff3a9f..91e7e05 100644 --- a/MQTTSNGateway/src/linux/Threading.h +++ b/MQTTSNGateway/src/linux/Threading.h @@ -101,17 +101,12 @@ public: virtual void EXECRUN(){} }; - #define MAGIC_WORD_FOR_THREAD \ public: void EXECRUN() \ { \ try \ { \ run(); \ - } \ - catch(Exception& ex) \ - { \ - ex.writeMessage();\ stopProcess(); \ } \ catch(...) \ @@ -132,13 +127,10 @@ public: static bool equals(pthread_t*, pthread_t*); virtual void initialize(int argc, char** argv); void stopProcess(void); - void cancel(void); + void waitStop(void); private: - pthread_t _threadID; - Semaphore* _stopProcessEvent; - static void* _run(void*); - + pthread_t _threadID; }; } diff --git a/MQTTSNGateway/src/linux/udp/SensorNetwork.cpp b/MQTTSNGateway/src/linux/udp/SensorNetwork.cpp index 93b2a1e..263d3ce 100644 --- a/MQTTSNGateway/src/linux/udp/SensorNetwork.cpp +++ b/MQTTSNGateway/src/linux/udp/SensorNetwork.cpp @@ -45,6 +45,16 @@ SensorNetAddress::~SensorNetAddress() } +uint32_t SensorNetAddress::getIpAddress(void) +{ + return _IpAddr; +} + +uint16_t SensorNetAddress::getPortNo(void) +{ + return _portNo; +} + void SensorNetAddress::setAddress(uint32_t IpAddr, uint16_t port) { _IpAddr = IpAddr; @@ -96,12 +106,10 @@ SensorNetAddress& SensorNetAddress::operator =(SensorNetAddress& addr) ============================================*/ SensorNetwork::SensorNetwork() { - } SensorNetwork::~SensorNetwork() { - } int SensorNetwork::unicast(const uint8_t* payload, uint16_t payloadLength, SensorNetAddress* sendToAddr) @@ -302,10 +310,12 @@ int UDPPort::broadcast(const uint8_t* buf, uint32_t length) int UDPPort::recv(uint8_t* buf, uint16_t len, SensorNetAddress* addr) { + struct timeval timeout; fd_set recvfds; int maxSock = 0; - int rc = 0; + timeout.tv_sec = 0; + timeout.tv_usec = 1000000; // 1 sec FD_ZERO(&recvfds); FD_SET(_sockfdUnicast, &recvfds); FD_SET(_sockfdMulticast, &recvfds); @@ -319,15 +329,17 @@ int UDPPort::recv(uint8_t* buf, uint16_t len, SensorNetAddress* addr) maxSock = _sockfdUnicast; } - select(maxSock + 1, &recvfds, 0, 0, 0); - - if (FD_ISSET(_sockfdUnicast, &recvfds)) + int rc = 0; + if ( select(maxSock + 1, &recvfds, 0, 0, &timeout) > 0 ) { - rc = recvfrom(_sockfdUnicast, buf, len, 0, addr); - } - else if (FD_ISSET(_sockfdMulticast, &recvfds)) - { - rc = recvfrom(_sockfdMulticast, buf, len, 0, &_grpAddr); + if (FD_ISSET(_sockfdUnicast, &recvfds)) + { + rc = recvfrom(_sockfdUnicast, buf, len, 0, addr); + } + else if (FD_ISSET(_sockfdMulticast, &recvfds)) + { + rc = recvfrom(_sockfdMulticast, buf, len, 0, &_grpAddr); + } } return rc; } diff --git a/MQTTSNGateway/src/linux/udp/SensorNetwork.h b/MQTTSNGateway/src/linux/udp/SensorNetwork.h index 86d4595..021c9bc 100644 --- a/MQTTSNGateway/src/linux/udp/SensorNetwork.h +++ b/MQTTSNGateway/src/linux/udp/SensorNetwork.h @@ -41,14 +41,8 @@ public: ~SensorNetAddress(); void setAddress(uint32_t IpAddr, uint16_t port); int setAddress(string* data); - uint32_t getIpAddress(void) - { - return _IpAddr; - } - uint16_t getPortNo(void) - { - return _portNo; - } + uint16_t getPortNo(void); + uint32_t getIpAddress(void); bool isMatch(SensorNetAddress* addr); SensorNetAddress& operator =(SensorNetAddress& addr); diff --git a/MQTTSNGateway/src/tests/TestProcessFramework.cpp b/MQTTSNGateway/src/tests/TestProcessFramework.cpp index 6b6fe04..627b2e0 100644 --- a/MQTTSNGateway/src/tests/TestProcessFramework.cpp +++ b/MQTTSNGateway/src/tests/TestProcessFramework.cpp @@ -115,6 +115,7 @@ void TestProcessFramework::run(void) printf("%s Timer 1sec\n", currentDateTime()); MultiTaskProcess::run(); + printf("ProcessFramework test complited.\n"); } diff --git a/MQTTSNGateway/src/tests/TestTask.cpp b/MQTTSNGateway/src/tests/TestTask.cpp index 6a95ae3..ae5cff8 100644 --- a/MQTTSNGateway/src/tests/TestTask.cpp +++ b/MQTTSNGateway/src/tests/TestTask.cpp @@ -39,9 +39,10 @@ void TestTask::run(void) { while(true) { - if (theProcess->checkSignal() == SIGINT) + if ( CHK_SIGINT) { - throw Exception("Terminated by CTL-C"); + printf("Task stopped.\n"); + return; } printf("Task is running. Enter CTRL+C\n"); sleep(1);