From f079211ea71019a15f692f2c403fde08c04bf44b Mon Sep 17 00:00:00 2001 From: tomoaki Date: Sat, 15 May 2021 18:32:05 +0900 Subject: [PATCH] upgrade and bugfix for a test Signed-off-by: tomoaki --- .cproject | 4 +- .../GatewayTester/samples/mainTest.cpp | 138 +++-- MQTTSNGateway/GatewayTester/src/LGwProxy.cpp | 30 +- MQTTSNGateway/GatewayTester/src/LGwProxy.h | 5 + .../GatewayTester/src/LMqttsnClient.cpp | 19 +- .../GatewayTester/src/LMqttsnClient.h | 2 + .../GatewayTester/src/LMqttsnClientApp.h | 4 + .../GatewayTester/src/LPublishManager.cpp | 11 +- .../GatewayTester/src/LPublishManager.h | 4 +- .../GatewayTester/src/LSubscribeManager.cpp | 1 + MQTTSNGateway/src/MQTTSNGWBrokerRecvTask.cpp | 28 +- MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp | 532 +++++++++--------- MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp | 4 +- 13 files changed, 448 insertions(+), 334 deletions(-) diff --git a/.cproject b/.cproject index 6c81531..409af95 100644 --- a/.cproject +++ b/.cproject @@ -133,7 +133,7 @@ - + @@ -273,7 +273,7 @@ - + diff --git a/MQTTSNGateway/GatewayTester/samples/mainTest.cpp b/MQTTSNGateway/GatewayTester/samples/mainTest.cpp index 934a419..8aaa641 100644 --- a/MQTTSNGateway/GatewayTester/samples/mainTest.cpp +++ b/MQTTSNGateway/GatewayTester/samples/mainTest.cpp @@ -51,25 +51,25 @@ extern LScreen* theScreen; /*------------------------------------------------------ * UDP Configuration (theNetcon) *------------------------------------------------------*/ -UDPCONF = { - "GatewayTestClient", // ClientId - {225,1,1,1}, // Multicast group IP - 1883, // Multicast group Port - 20020, // Local PortNo -}; +UDPCONF = +{ "GatewayTestClient", // ClientId + { 225, 1, 1, 1 }, // Multicast group IP + 1883, // Multicast group Port + 20020, // Local PortNo + }; /*------------------------------------------------------ * Client Configuration (theMqcon) *------------------------------------------------------*/ -MQTTSNCONF = { - 60, //KeepAlive [seconds] - true, //Clean session - 300, //Sleep duration [seconds] - "", //WillTopic - "", //WillMessage - 0, //WillQos - false //WillRetain -}; +MQTTSNCONF = +{ 60, //KeepAlive [seconds] + true, //Clean session + 300, //Sleep duration [seconds] + "", //WillTopic + "", //WillMessage + 0, //WillQos + false //WillRetain + }; /*------------------------------------------------------ * Define Topics @@ -83,32 +83,31 @@ const char* topic52 = "ty4tw/topic5/2"; const char* topic53 = "ty4tw/topic5/3"; const char* topic50 = "ty4tw/topic5/+"; - /*------------------------------------------------------ * Callback routines for Subscribed Topics *------------------------------------------------------*/ int on_Topic01(uint8_t* pload, uint16_t ploadlen) { DISPLAY("\n\nTopic1 recv.\n"); - char c = pload[ploadlen-1]; - pload[ploadlen-1]= 0; // set null terminator - DISPLAY("Payload -->%s%c<--\n\n",pload, c); + char c = pload[ploadlen - 1]; + pload[ploadlen - 1] = 0; // set null terminator + DISPLAY("Payload -->%s%c<--\n\n", pload, c); return 0; } int on_Topic02(uint8_t* pload, uint16_t ploadlen) { DISPLAY("\n\nTopic2 recv.\n"); - pload[ploadlen-1]= 0; // set null terminator - DISPLAY("Payload -->%s <--\n\n",pload); + pload[ploadlen - 1] = 0; // set null terminator + DISPLAY("Payload -->%s <--\n\n", pload); return 0; } int on_Topic03(uint8_t* pload, uint16_t ploadlen) { DISPLAY("\n\nNew callback recv Topic3\n"); - pload[ploadlen-1]= 0; // set null terminator - DISPLAY("Payload -->%s <--\n\n",pload); + pload[ploadlen - 1] = 0; // set null terminator + DISPLAY("Payload -->%s <--\n\n", pload); return 0; } @@ -116,26 +115,26 @@ int on_Topic03(uint8_t* pload, uint16_t ploadlen) * A Link list of Callback routines and Topics *------------------------------------------------------*/ -SUBSCRIBE_LIST = {// e.g. SUB(TopicType, topicName, TopicId, callback, QoSx), - SUB(MQTTSN_TOPIC_TYPE_NORMAL, topic1, 0, on_Topic01, QoS1), - SUB(MQTTSN_TOPIC_TYPE_NORMAL, topic2, 0, on_Topic02, QoS1), - END_OF_SUBSCRIBE_LIST - }; - +SUBSCRIBE_LIST = +{ // e.g. SUB(TopicType, topicName, TopicId, callback, QoSx), + SUB(MQTTSN_TOPIC_TYPE_NORMAL, topic1, 0, on_Topic01, QoS1), + SUB(MQTTSN_TOPIC_TYPE_NORMAL, topic2, 0, on_Topic02, QoS1), + END_OF_SUBSCRIBE_LIST +}; /*------------------------------------------------------ * Test functions *------------------------------------------------------*/ void subscribePredefTopic1(void) { - SUBSCRIBE(1, on_Topic03, QoS1); + SUBSCRIBE(1, on_Topic03, QoS1); } void publishTopic1(void) { char payload[300]; sprintf(payload, "publish \"ty4tw/Topic1\" \n"); - PUBLISH(topic1,(uint8_t*)payload, strlen(payload), QoS0); + PUBLISH(topic1, (uint8_t* )payload, strlen(payload), QoS0); } void subscribeTopic10(void) @@ -147,11 +146,9 @@ void publishTopic2(void) { char payload[300]; sprintf(payload, "publish \"ty4tw/topic2\" \n"); - PUBLISH(topic2,(uint8_t*)payload, strlen(payload), QoS1); + PUBLISH(topic2, (uint8_t* )payload, strlen(payload), QoS1); } - - void unsubscribe(void) { UNSUBSCRIBE(topic2); @@ -167,7 +164,7 @@ void test3(void) char payload[300]; sprintf(payload, "TEST3 "); uint8_t qos = 0; - PUBLISH(topic2,(uint8_t*)payload, strlen(payload), qos); + PUBLISH(topic2, (uint8_t* )payload, strlen(payload), qos); } void disconnect(void) @@ -180,48 +177,71 @@ void asleep(void) DISCONNECT(theMqcon.sleepDuration); } +void onconnect(void) +{ + ONCONNECT(); +} + +void connect(void) +{ + CONNECT(); +} + + +void DisableAutoPingreq(void) +{ + SetAutoPingReqMode(false); +} + /*------------------------------------------------------ * A List of Test functions is valid in case of * line 23 of LMqttsnClientApp.h is commented out. * //#define CLIENT_MODE *------------------------------------------------------*/ -TEST_LIST = {// e.g. TEST( Label, Test), - TEST("Step0:Subscribe predef topic1", subscribePredefTopic1), - TEST("Step1:Publish topic1", publishTopic1), - TEST("Step2:Publish topic2", publishTopic2), - TEST("Step3:Subscribe PreDefined topic10. ID is not defined.", subscribeTopic10), - TEST("Step4:Publish topic2", publishTopic2), - TEST("Step5:Unsubscribe topic2", unsubscribe), - TEST("Step6:Publish topic2", publishTopic2), - TEST("Step7:subscribe again", subscribechangeCallback), - TEST("Step8:Publish topic2", publishTopic2), - TEST("Step9:Sleep ", asleep), - TEST("Step10:Publish topic1", publishTopic1), - TEST("Step11:Disconnect", disconnect), - END_OF_TEST_LIST - }; - +TEST_LIST = +{ // e.g. TEST( Label, Test), + TEST("Step0:Connect", connect), + TEST("Step1:Subscribe list", onconnect), + TEST("Step2:Subscribe predef topic1", subscribePredefTopic1), + TEST("Step3:Publish topic1", publishTopic1), + TEST("Step4:Publish topic2", publishTopic2), + TEST("Step5:Subscribe PreDefined topic10. ID is not defined.", subscribeTopic10), + TEST("Step6:Publish topic2", publishTopic2), + TEST("Step7:Unsubscribe topic2", unsubscribe), + TEST("Step8:Publish topic2", publishTopic2), + TEST("Step9:subscribe again", subscribechangeCallback), + TEST("Step10:Publish topic2", publishTopic2), + TEST("Step11:Sleep ", asleep), + TEST("Step12:Publish topic1", publishTopic1), + TEST("Step13:Disconnect", disconnect), + TEST("Step14:Publish topic2", publishTopic1), + TEST("Step15:Connect", connect), + TEST("Step16:Publish topic2", publishTopic2), + TEST("Step17:Auto Pingreq mode off", DisableAutoPingreq), + TEST("Step18:Publish topic2", publishTopic1), + TEST("Step19:Disconnect", disconnect), + END_OF_TEST_LIST +}; /*------------------------------------------------------ * List of tasks is valid in case of line23 of * LMqttsnClientApp.h is uncommented. * #define CLIENT_MODE *------------------------------------------------------*/ -TASK_LIST = {// e.g. TASK( task, executing duration in second), - TASK(publishTopic1, 4), // publishTopic1() is executed every 4 seconds - TASK(publishTopic2, 7), // publishTopic2() is executed every 7 seconds - END_OF_TASK_LIST - }; - +TASK_LIST = +{ // e.g. TASK( task, executing duration in second), + TASK(publishTopic1, 4),// publishTopic1() is executed every 4 seconds + TASK(publishTopic2, 7),// publishTopic2() is executed every 7 seconds + END_OF_TASK_LIST +}; /*------------------------------------------------------ * Initialize function *------------------------------------------------------*/ void setup(void) { - SetForwarderMode(false); + SetForwarderMode(false); } - /***************** END OF PROGRAM ********************/ diff --git a/MQTTSNGateway/GatewayTester/src/LGwProxy.cpp b/MQTTSNGateway/GatewayTester/src/LGwProxy.cpp index cdacac1..3b5c89d 100644 --- a/MQTTSNGateway/GatewayTester/src/LGwProxy.cpp +++ b/MQTTSNGateway/GatewayTester/src/LGwProxy.cpp @@ -57,6 +57,8 @@ LGwProxy::LGwProxy() _initialized = 0; _isForwarderMode = false; _isQoSMinus1Mode = false; + _isPingReqMode = true; + _isAutoConnectMode = true; } LGwProxy::~LGwProxy() @@ -231,9 +233,12 @@ int LGwProxy::getConnectResponce(void) void LGwProxy::reconnect(void) { - D_MQTTLOG("...Gateway reconnect\r\n"); - _status = GW_DISCONNECTED; - connect(); + if (_isAutoConnectMode) + { + D_MQTTLOG("...Gateway reconnect\r\n"); + _status = GW_DISCONNECTED; + connect(); + } } void LGwProxy::disconnect(uint16_t secs) @@ -395,7 +400,7 @@ int LGwProxy::getMessage(void) } else if (_mqttsnMsg[0] == MQTTSN_TYPE_DISCONNECT) { - _status = GW_LOST; + _status = GW_DISCONNECTED; _gwAliveTimer.stop(); _keepAliveTimer.stop(); } @@ -586,7 +591,7 @@ uint16_t LGwProxy::getNextMsgId(void) void LGwProxy::checkPingReq(void) { - if ( _isQoSMinus1Mode ) + if (_isQoSMinus1Mode || _isPingReqMode == false) { return; } @@ -671,3 +676,18 @@ void LGwProxy::setQoSMinus1Mode(bool valid) { _isQoSMinus1Mode = valid; } + +void LGwProxy::setPingReqMode(bool valid) +{ + _isPingReqMode = valid; +} + +void LGwProxy::setAutoConnectMode(bool valid) +{ + _isAutoConnectMode = valid; +} + +uint8_t LGwProxy::getStatus(void) +{ + return _status; +} diff --git a/MQTTSNGateway/GatewayTester/src/LGwProxy.h b/MQTTSNGateway/GatewayTester/src/LGwProxy.h index 814affc..4a3b8fb 100644 --- a/MQTTSNGateway/GatewayTester/src/LGwProxy.h +++ b/MQTTSNGateway/GatewayTester/src/LGwProxy.h @@ -67,6 +67,8 @@ public: void setAdvertiseDuration(uint16_t duration); void setForwarderMode(bool valid); void setQoSMinus1Mode(bool valid); + void setPingReqMode(bool valid); + void setAutoConnectMode(bool valid); void reconnect(void); int writeMsg(const uint8_t* msg); void setPingReqTimer(void); @@ -74,6 +76,7 @@ public: LTopicTable* getTopicTable(void); LRegisterManager* getRegisterManager(void); const char* getClientId(void); + uint8_t getStatus(void); private: int readMsg(void); void writeGwMsg(void); @@ -111,6 +114,8 @@ private: uint16_t _tWake; bool _isForwarderMode; bool _isQoSMinus1Mode; + bool _isPingReqMode; + bool _isAutoConnectMode; char _msg[MQTTSN_MAX_MSG_LENGTH + 1]; }; diff --git a/MQTTSNGateway/GatewayTester/src/LMqttsnClient.cpp b/MQTTSNGateway/GatewayTester/src/LMqttsnClient.cpp index b6759dc..2982060 100644 --- a/MQTTSNGateway/GatewayTester/src/LMqttsnClient.cpp +++ b/MQTTSNGateway/GatewayTester/src/LMqttsnClient.cpp @@ -77,6 +77,8 @@ int main(int argc, char** argv) break; } } + theClient->setAutoConnectMode(false); + theClient->getPublishManager()->setAutoConnectMode(false); #endif setup(); @@ -98,7 +100,7 @@ int main(int argc, char** argv) ======================================*/ LMqttsnClient::LMqttsnClient() { - + _isAutoConnect = true; } LMqttsnClient::~LMqttsnClient() @@ -205,10 +207,20 @@ void LMqttsnClient::disconnect(uint16_t sleepInSecs) void LMqttsnClient::run() { - _gwProxy.connect(); + if (_isAutoConnect) + { + _gwProxy.connect(); + } _taskMgr.run(); } +void LMqttsnClient::setAutoConnectMode(uint8_t flg) +{ + _isAutoConnect = flg; + _pubMgr.setAutoConnectMode(flg); + _gwProxy.setAutoConnectMode(flg); +} + void LMqttsnClient::setSleepMode(uint32_t duration) { // ToDo: set WDT and sleep mode @@ -227,7 +239,10 @@ void LMqttsnClient::setSleepDuration(uint32_t duration) void LMqttsnClient::onConnect(void) { + if (_isAutoConnect) + { _subMgr.onConnect(); + } } const char* LMqttsnClient::getClientId(void) diff --git a/MQTTSNGateway/GatewayTester/src/LMqttsnClient.h b/MQTTSNGateway/GatewayTester/src/LMqttsnClient.h index de60310..ce152ef 100644 --- a/MQTTSNGateway/GatewayTester/src/LMqttsnClient.h +++ b/MQTTSNGateway/GatewayTester/src/LMqttsnClient.h @@ -63,6 +63,7 @@ public: void addTask(bool test); void setSleepDuration(uint32_t duration); void setSleepMode(uint32_t duration); + void setAutoConnectMode(uint8_t flg); void sleep(void); const char* getClientId(void); uint16_t getTopicId(const char* topicName); @@ -78,6 +79,7 @@ private: LSubscribeManager _subMgr; LGwProxy _gwProxy; uint32_t _sleepDuration; + uint8_t _isAutoConnect; }; diff --git a/MQTTSNGateway/GatewayTester/src/LMqttsnClientApp.h b/MQTTSNGateway/GatewayTester/src/LMqttsnClientApp.h index b848e4c..947c5d5 100644 --- a/MQTTSNGateway/GatewayTester/src/LMqttsnClientApp.h +++ b/MQTTSNGateway/GatewayTester/src/LMqttsnClientApp.h @@ -92,6 +92,7 @@ typedef enum #define SUBSCRIBE(...) theClient->subscribe(__VA_ARGS__) #define UNSUBSCRIBE(...) theClient->unsubscribe(__VA_ARGS__) #define DISCONNECT(...) theClient->disconnect(__VA_ARGS__) +#define ONCONNECT() theClient->getSubscribeManager()->onConnect() #define TASK_LIST TaskList theTaskList[] #define TASK(...) {__VA_ARGS__, 0, 0} @@ -104,8 +105,11 @@ typedef enum #define END_OF_SUBSCRIBE_LIST {MQTTSN_TOPIC_TYPE_NORMAL,0,0,0, 0} #define UDPCONF LUdpConfig theNetcon #define MQTTSNCONF LMqttsnConfig theMqcon + #define SetForwarderMode(...) theClient->getGwProxy()->setForwarderMode(__VA_ARGS__) #define SetQoSMinus1Mode(...) theClient->getGwProxy()->setQoSMinus1Mode(__VA_ARGS__) +#define SetAutoConnectMode(...) theClient->setAutoConnectMode(__VA_ARGS__) +#define SetAutoPingReqMode(...) theClient->getGwProxy()->setPingReqMode(__VA_ARGS__) #ifdef CLIENT_MODE #define DISPLAY(...) diff --git a/MQTTSNGateway/GatewayTester/src/LPublishManager.cpp b/MQTTSNGateway/GatewayTester/src/LPublishManager.cpp index 78a84db..3632e0b 100644 --- a/MQTTSNGateway/GatewayTester/src/LPublishManager.cpp +++ b/MQTTSNGateway/GatewayTester/src/LPublishManager.cpp @@ -44,6 +44,7 @@ LPublishManager::LPublishManager() _last = 0; _elmCnt = 0; _publishedFlg = SAVE_TASK_INDEX; + _autoConnectFlg = false; } LPublishManager::~LPublishManager() @@ -115,7 +116,10 @@ void LPublishManager::sendPublish(PubElement* elm) return; } - theClient->getGwProxy()->connect(); + if (_autoConnectFlg) + { + theClient->getGwProxy()->connect(); + } uint8_t msg[MQTTSN_MAX_MSG_LENGTH + 1]; uint8_t org = 0; @@ -310,6 +314,11 @@ void LPublishManager::checkTimeout(void) } } +void LPublishManager::setAutoConnectMode(bool flg) +{ + _autoConnectFlg = flg; +} + PubElement* LPublishManager::getElement(uint16_t msgId) { PubElement* elm = _first; diff --git a/MQTTSNGateway/GatewayTester/src/LPublishManager.h b/MQTTSNGateway/GatewayTester/src/LPublishManager.h index 6085218..6520733 100644 --- a/MQTTSNGateway/GatewayTester/src/LPublishManager.h +++ b/MQTTSNGateway/GatewayTester/src/LPublishManager.h @@ -70,6 +70,7 @@ public: void sendSuspend(const char* topicName, uint16_t topicId, uint8_t topicType); bool isDone(void); bool isMaxFlight(void); + void setAutoConnectMode(bool); private: PubElement* getElement(uint16_t msgId); PubElement* getElement(const char* topicName); @@ -84,7 +85,8 @@ private: PubElement* _last; uint8_t _elmCnt; uint8_t _publishedFlg; + uint8_t _autoConnectFlg; }; - + } /* tomyAsyncClient */ #endif /* PUBLISHMANAGER_H_ */ diff --git a/MQTTSNGateway/GatewayTester/src/LSubscribeManager.cpp b/MQTTSNGateway/GatewayTester/src/LSubscribeManager.cpp index 5158003..03a4bbf 100644 --- a/MQTTSNGateway/GatewayTester/src/LSubscribeManager.cpp +++ b/MQTTSNGateway/GatewayTester/src/LSubscribeManager.cpp @@ -122,6 +122,7 @@ void LSubscribeManager::send(SubElement* elm) { return; } + uint8_t msg[MQTTSN_MAX_MSG_LENGTH + 1]; if (elm->topicType == MQTTSN_TOPIC_TYPE_PREDEFINED) { diff --git a/MQTTSNGateway/src/MQTTSNGWBrokerRecvTask.cpp b/MQTTSNGateway/src/MQTTSNGWBrokerRecvTask.cpp index 467cd8a..ed3fa70 100644 --- a/MQTTSNGateway/src/MQTTSNGWBrokerRecvTask.cpp +++ b/MQTTSNGateway/src/MQTTSNGWBrokerRecvTask.cpp @@ -54,7 +54,7 @@ void BrokerRecvTask::run(void) { struct timeval timeout; MQTTGWPacket* packet = nullptr; - int rc; + int rc; Event* ev = nullptr; fd_set rset; fd_set wset; @@ -100,6 +100,7 @@ void BrokerRecvTask::run(void) { /* Check sockets is ready to read */ int activity = select(maxSock + 1, &rset, 0, 0, &timeout); + if (activity > 0) { client = _gateway->getClientList()->getClient(0); @@ -134,17 +135,26 @@ void BrokerRecvTask::run(void) { if (rc == 0) // Disconnected { + WRITELOG( + "%s BrokerRecvTask %s is disconnected by the broker.%s\n", + ERRMSG_HEADER, + client->getClientId(), + ERRMSG_FOOTER); + client->getNetwork()->close(); + client->disconnected(); + + /* client->getNetwork()->close(); delete packet; - /* delete client when the client is not authorized & session is clean */ - _gateway->getClientList()->erase(client); if (client) { client = client->getNextClient(); } continue; + */ + } else if (rc == -1) { @@ -165,7 +175,7 @@ void BrokerRecvTask::run(void) else if (rc == -3) { WRITELOG( - "%s BrokerRecvTask can't get memories for the packet %s%s\n", + "%s BrokerRecvTask can't allocate memories for the packet %s%s\n", ERRMSG_HEADER, client->getClientId(), ERRMSG_FOOTER); @@ -173,22 +183,26 @@ void BrokerRecvTask::run(void) delete packet; - if ((rc == -1 || rc == -2) + if ((rc == -1 || rc == -2) && (client->isActive() || client->isSleep() || client->isAwake())) { /* disconnect the client */ + /* packet = new MQTTGWPacket(); packet->setHeader(DISCONNECT); ev = new Event(); ev->setBrokerRecvEvent(client, packet); _gateway->getPacketEventQue()->post(ev); + */ + client->getNetwork()->close(); + client->disconnected(); } } } - } - nextClient: client = client->getNextClient(); + } + nextClient: client = client->getNextClient(); } } } diff --git a/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp b/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp index 779def3..c399696 100644 --- a/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp +++ b/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp @@ -29,10 +29,10 @@ char* currentDateTime(void); =====================================*/ ClientRecvTask::ClientRecvTask(Gateway* gateway) { - _gateway = gateway; - _gateway->attach((Thread*) this); - _sensorNetwork = _gateway->getSensorNetwork(); - setTaskName("ClientRecvTask"); + _gateway = gateway; + _gateway->attach((Thread*) this); + _sensorNetwork = _gateway->getSensorNetwork(); + setTaskName("ClientRecvTask"); } ClientRecvTask::~ClientRecvTask() @@ -45,7 +45,7 @@ ClientRecvTask::~ClientRecvTask() */ void ClientRecvTask::initialize(int argc, char** argv) { - _sensorNetwork->initialize(); + _sensorNetwork->initialize(); } /* @@ -55,287 +55,307 @@ void ClientRecvTask::initialize(int argc, char** argv) */ void ClientRecvTask::run() { - Event* ev = nullptr; - AdapterManager* adpMgr = _gateway->getAdapterManager(); - QoSm1Proxy* qosm1Proxy = adpMgr->getQoSm1Proxy(); - int clientType = - adpMgr->isAggregaterActive() ? AGGREGATER_TYPE : TRANSPEARENT_TYPE; - ClientList* clientList = _gateway->getClientList(); - EventQue* packetEventQue = _gateway->getPacketEventQue(); + Event* ev = nullptr; + AdapterManager* adpMgr = _gateway->getAdapterManager(); + QoSm1Proxy* qosm1Proxy = adpMgr->getQoSm1Proxy(); + int clientType = + adpMgr->isAggregaterActive() ? AGGREGATER_TYPE : TRANSPEARENT_TYPE; + ClientList* clientList = _gateway->getClientList(); + EventQue* packetEventQue = _gateway->getPacketEventQue(); + EventQue* clientsendQue = _gateway->getClientSendQue(); - char buf[128]; + char buf[128]; - while (true) - { - Client* client = nullptr; - Forwarder* fwd = nullptr; - WirelessNodeId nodeId; + while (true) + { + Client* client = nullptr; + Forwarder* fwd = nullptr; + WirelessNodeId nodeId; - MQTTSNPacket* packet = new MQTTSNPacket(); - int packetLen = packet->recv(_sensorNetwork); + MQTTSNPacket* packet = new MQTTSNPacket(); + int packetLen = packet->recv(_sensorNetwork); - if (CHK_SIGINT) - { - WRITELOG("%s %s stopped.\n", currentDateTime(), getTaskName()); - delete packet; - return; - } + if (CHK_SIGINT) + { + WRITELOG("%s %s stopped.\n", currentDateTime(), getTaskName()); + delete packet; + return; + } - if (packetLen < 2) - { - delete packet; - continue; - } + if (packetLen < 2) + { + delete packet; + continue; + } - if (packet->getType() <= MQTTSN_ADVERTISE - || packet->getType() == MQTTSN_GWINFO) - { - delete packet; - continue; - } + if (packet->getType() <= MQTTSN_ADVERTISE + || packet->getType() == MQTTSN_GWINFO) + { + delete packet; + continue; + } - if (packet->getType() == MQTTSN_SEARCHGW) - { - /* write log and post Event */ - log(0, packet, 0); - ev = new Event(); - ev->setBrodcastEvent(packet); - packetEventQue->post(ev); - continue; - } + if (packet->getType() == MQTTSN_SEARCHGW) + { + /* write log and post Event */ + log(0, packet, 0); + ev = new Event(); + ev->setBrodcastEvent(packet); + packetEventQue->post(ev); + continue; + } - SensorNetAddress* senderAddr = - _gateway->getSensorNetwork()->getSenderAddress(); + SensorNetAddress* senderAddr = + _gateway->getSensorNetwork()->getSenderAddress(); - if (packet->getType() == MQTTSN_ENCAPSULATED) - { - fwd = - _gateway->getAdapterManager()->getForwarderList()->getForwarder( - senderAddr); + if (packet->getType() == MQTTSN_ENCAPSULATED) + { + fwd = + _gateway->getAdapterManager()->getForwarderList()->getForwarder( + senderAddr); - if (fwd != nullptr) - { - MQTTSNString fwdName = MQTTSNString_initializer; - fwdName.cstring = const_cast(fwd->getName()); - log(0, packet, &fwdName); + if (fwd != nullptr) + { + MQTTSNString fwdName = MQTTSNString_initializer; + fwdName.cstring = const_cast(fwd->getName()); + log(0, packet, &fwdName); - /* get the packet from the encapsulation message */ - MQTTSNGWEncapsulatedPacket encap; - encap.desirialize(packet->getPacketData(), - packet->getPacketLength()); - nodeId.setId(encap.getWirelessNodeId()); - client = fwd->getClient(&nodeId); - packet = encap.getMQTTSNPacket(); - } - } - else - { - /* Check the client belonging to QoS-1Proxy ? */ + /* get the packet from the encapsulation message */ + MQTTSNGWEncapsulatedPacket encap; + encap.desirialize(packet->getPacketData(), + packet->getPacketLength()); + nodeId.setId(encap.getWirelessNodeId()); + client = fwd->getClient(&nodeId); + packet = encap.getMQTTSNPacket(); + } + } + else + { + /* Check the client belonging to QoS-1Proxy ? */ - if (qosm1Proxy->isActive()) - { - const char* clientName = qosm1Proxy->getClientId(senderAddr); + if (qosm1Proxy->isActive()) + { + const char* clientName = qosm1Proxy->getClientId(senderAddr); - if (clientName != nullptr) - { - client = qosm1Proxy->getClient(); + if (clientName != nullptr) + { + client = qosm1Proxy->getClient(); - if (!packet->isQoSMinusPUBLISH()) - { - log(clientName, packet); - WRITELOG( - "%s %s %s can send only PUBLISH with QoS-1.%s\n", - ERRMSG_HEADER, clientName, - senderAddr->sprint(buf), ERRMSG_FOOTER); - delete packet; - continue; - } - } - } + if (!packet->isQoSMinusPUBLISH()) + { + log(clientName, packet); + WRITELOG( + "%s %s %s can send only PUBLISH with QoS-1.%s\n", + ERRMSG_HEADER, clientName, + senderAddr->sprint(buf), ERRMSG_FOOTER); + delete packet; + continue; + } + } + } - if (client == nullptr) - { - client = _gateway->getClientList()->getClient(senderAddr); - } - } + if (client == nullptr) + { + client = _gateway->getClientList()->getClient(senderAddr); + } + } - if (client != nullptr) - { - /* write log and post Event */ - log(client, packet, 0); - ev = new Event(); - ev->setClientRecvEvent(client, packet); - packetEventQue->post(ev); - } - else - { - /* new client */ - if (packet->getType() == MQTTSN_CONNECT) - { - MQTTSNPacket_connectData data; - memset(&data, 0, sizeof(MQTTSNPacket_connectData)); - if (!packet->getCONNECT(&data)) - { - log(0, packet, &data.clientID); - WRITELOG("%s CONNECT message form %s is incorrect.%s\n", - ERRMSG_HEADER, senderAddr->sprint(buf), - ERRMSG_FOOTER); - delete packet; - continue; - } + if (client != nullptr) + { + log(client, packet, 0); - client = clientList->getClient(&data.clientID); + if (client->isDisconnect() && packet->getType() != MQTTSN_CONNECT) + { + WRITELOG( + "%s MQTTSNGWClientRecvTask %s is not connecting.%s\n", + ERRMSG_HEADER, + client->getClientId(), ERRMSG_FOOTER); - if (fwd != nullptr) - { - if (client == nullptr) - { - /* create a new client */ - client = clientList->createClient(0, &data.clientID, - clientType); - } - /* Add to a forwarded client list of forwarder. */ - fwd->addClient(client, &nodeId); - } - else - { - if (client) - { - /* Authentication is not required */ - if (_gateway->getGWParams()->clientAuthentication - == false) - { - client->setClientAddress(senderAddr); - } - } - else - { - /* create a new client */ - client = clientList->createClient(senderAddr, - &data.clientID, clientType); - } - } + /* send DISCONNECT to the client, if it is not connected */ + MQTTSNPacket* snPacket = new MQTTSNPacket(); + snPacket->setDISCONNECT(0); + ev = new Event(); + ev->setClientSendEvent(client, snPacket); + clientsendQue->post(ev); + delete packet; + continue; + } + else + { + ev = new Event(); + ev->setClientRecvEvent(client, packet); + packetEventQue->post(ev); + } + } + else + { + /* new client */ + if (packet->getType() == MQTTSN_CONNECT) + { + MQTTSNPacket_connectData data; + memset(&data, 0, sizeof(MQTTSNPacket_connectData)); + if (!packet->getCONNECT(&data)) + { + log(0, packet, &data.clientID); + WRITELOG("%s CONNECT message form %s is incorrect.%s\n", + ERRMSG_HEADER, senderAddr->sprint(buf), + ERRMSG_FOOTER); + delete packet; + continue; + } - log(client, packet, &data.clientID); + client = clientList->getClient(&data.clientID); - if (client == nullptr) - { - WRITELOG( - "%s Client(%s) was rejected. CONNECT message has been discarded.%s\n", - ERRMSG_HEADER, senderAddr->sprint(buf), - ERRMSG_FOOTER); - delete packet; - continue; - } + if (fwd != nullptr) + { + if (client == nullptr) + { + /* create a new client */ + client = clientList->createClient(0, &data.clientID, + clientType); + } + /* Add to a forwarded client list of forwarder. */ + fwd->addClient(client, &nodeId); + } + else + { + if (client) + { + /* Authentication is not required */ + if (_gateway->getGWParams()->clientAuthentication + == false) + { + client->setClientAddress(senderAddr); + } + } + else + { + /* create a new client */ + client = clientList->createClient(senderAddr, + &data.clientID, clientType); + } + } - /* post Client RecvEvent */ - ev = new Event(); - ev->setClientRecvEvent(client, packet); - packetEventQue->post(ev); - } - else - { - log(client, packet, 0); - if (packet->getType() == MQTTSN_ENCAPSULATED) - { - WRITELOG( - "%s MQTTSNGWClientRecvTask Forwarder(%s) is not declared by ClientList file. message has been discarded.%s\n", - ERRMSG_HEADER, - _sensorNetwork->getSenderAddress()->sprint(buf), - ERRMSG_FOOTER); - } - else - { - WRITELOG( - "%s MQTTSNGWClientRecvTask Client(%s) is not connecting. message has been discarded.%s\n", - ERRMSG_HEADER, senderAddr->sprint(buf), - ERRMSG_FOOTER); - } - delete packet; - } - } - } + log(client, packet, &data.clientID); + + if (client == nullptr) + { + WRITELOG( + "%s Client(%s) was rejected. CONNECT message has been discarded.%s\n", + ERRMSG_HEADER, senderAddr->sprint(buf), + ERRMSG_FOOTER); + delete packet; + continue; + } + + /* post Client RecvEvent */ + ev = new Event(); + ev->setClientRecvEvent(client, packet); + packetEventQue->post(ev); + } + else + { + log(client, packet, 0); + if (packet->getType() == MQTTSN_ENCAPSULATED) + { + WRITELOG( + "%s MQTTSNGWClientRecvTask Forwarder(%s) is not declared by ClientList file. message has been discarded.%s\n", + ERRMSG_HEADER, + _sensorNetwork->getSenderAddress()->sprint(buf), + ERRMSG_FOOTER); + } + else + { + WRITELOG( + "%s MQTTSNGWClientRecvTask Client(%s) is not connecting. message has been discarded.%s\n", + ERRMSG_HEADER, + senderAddr->sprint(buf), ERRMSG_FOOTER); + } + delete packet; + } + } + } } void ClientRecvTask::log(Client* client, MQTTSNPacket* packet, MQTTSNString* id) { - const char* clientId; - char cstr[MAX_CLIENTID_LENGTH + 1]; + const char* clientId; + char cstr[MAX_CLIENTID_LENGTH + 1]; - if (id) - { - if (id->cstring) - { - strncpy(cstr, id->cstring, strlen(id->cstring)); - clientId = cstr; - } - else - { - memset((void*) cstr, 0, id->lenstring.len + 1); - strncpy(cstr, id->lenstring.data, id->lenstring.len); - clientId = cstr; - } - } - else if (client) - { - clientId = client->getClientId(); - } - else - { - clientId = UNKNOWNCL; - } + if (id) + { + if (id->cstring) + { + strncpy(cstr, id->cstring, strlen(id->cstring)); + clientId = cstr; + } + else + { + memset((void*) cstr, 0, id->lenstring.len + 1); + strncpy(cstr, id->lenstring.data, id->lenstring.len); + clientId = cstr; + } + } + else if (client) + { + clientId = client->getClientId(); + } + else + { + clientId = UNKNOWNCL; + } - log(clientId, packet); + log(clientId, packet); } void ClientRecvTask::log(const char* clientId, MQTTSNPacket* packet) { - char pbuf[ SIZE_OF_LOG_PACKET * 3 + 1]; - char msgId[6]; + char pbuf[ SIZE_OF_LOG_PACKET * 3 + 1]; + char msgId[6]; - switch (packet->getType()) - { - case MQTTSN_SEARCHGW: - WRITELOG(FORMAT_Y_G_G_NL, currentDateTime(), packet->getName(), - LEFTARROW, CLIENT, packet->print(pbuf)); - break; - case MQTTSN_CONNECT: - case MQTTSN_PINGREQ: - WRITELOG(FORMAT_Y_G_G_NL, currentDateTime(), packet->getName(), - LEFTARROW, clientId, packet->print(pbuf)); - break; - case MQTTSN_DISCONNECT: - case MQTTSN_WILLTOPICUPD: - case MQTTSN_WILLMSGUPD: - case MQTTSN_WILLTOPIC: - case MQTTSN_WILLMSG: - WRITELOG(FORMAT_Y_G_G, currentDateTime(), packet->getName(), LEFTARROW, - clientId, packet->print(pbuf)); - break; - case MQTTSN_PUBLISH: - case MQTTSN_REGISTER: - case MQTTSN_SUBSCRIBE: - case MQTTSN_UNSUBSCRIBE: - WRITELOG(FORMAT_G_MSGID_G_G_NL, currentDateTime(), packet->getName(), - packet->getMsgId(msgId), LEFTARROW, clientId, - packet->print(pbuf)); - break; - case MQTTSN_REGACK: - case MQTTSN_PUBACK: - case MQTTSN_PUBREC: - case MQTTSN_PUBREL: - case MQTTSN_PUBCOMP: - WRITELOG(FORMAT_G_MSGID_G_G, currentDateTime(), packet->getName(), - packet->getMsgId(msgId), LEFTARROW, clientId, - packet->print(pbuf)); - break; - case MQTTSN_ENCAPSULATED: - WRITELOG(FORMAT_Y_G_G, currentDateTime(), packet->getName(), LEFTARROW, - clientId, packet->print(pbuf)); - break; - default: - WRITELOG(FORMAT_W_NL, currentDateTime(), packet->getName(), LEFTARROW, - clientId, packet->print(pbuf)); - break; - } + switch (packet->getType()) + { + case MQTTSN_SEARCHGW: + WRITELOG(FORMAT_Y_G_G_NL, currentDateTime(), packet->getName(), + LEFTARROW, CLIENT, packet->print(pbuf)); + break; + case MQTTSN_CONNECT: + case MQTTSN_PINGREQ: + WRITELOG(FORMAT_Y_G_G_NL, currentDateTime(), packet->getName(), + LEFTARROW, clientId, packet->print(pbuf)); + break; + case MQTTSN_DISCONNECT: + case MQTTSN_WILLTOPICUPD: + case MQTTSN_WILLMSGUPD: + case MQTTSN_WILLTOPIC: + case MQTTSN_WILLMSG: + WRITELOG(FORMAT_Y_G_G, currentDateTime(), packet->getName(), LEFTARROW, + clientId, packet->print(pbuf)); + break; + case MQTTSN_PUBLISH: + case MQTTSN_REGISTER: + case MQTTSN_SUBSCRIBE: + case MQTTSN_UNSUBSCRIBE: + WRITELOG(FORMAT_G_MSGID_G_G_NL, currentDateTime(), packet->getName(), + packet->getMsgId(msgId), LEFTARROW, clientId, + packet->print(pbuf)); + break; + case MQTTSN_REGACK: + case MQTTSN_PUBACK: + case MQTTSN_PUBREC: + case MQTTSN_PUBREL: + case MQTTSN_PUBCOMP: + WRITELOG(FORMAT_G_MSGID_G_G, currentDateTime(), packet->getName(), + packet->getMsgId(msgId), LEFTARROW, clientId, + packet->print(pbuf)); + break; + case MQTTSN_ENCAPSULATED: + WRITELOG(FORMAT_Y_G_G, currentDateTime(), packet->getName(), LEFTARROW, + clientId, packet->print(pbuf)); + break; + default: + WRITELOG(FORMAT_W_NL, currentDateTime(), packet->getName(), LEFTARROW, + clientId, packet->print(pbuf)); + break; + } } diff --git a/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp b/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp index 25e0628..7b007e2 100644 --- a/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp +++ b/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp @@ -121,12 +121,14 @@ MQTTGWPacket* MQTTSNPublishHandler::handlePublish(Client* client, { pub.topic = (char*) topic->getTopicName()->data(); pub.topiclen = topic->getTopicName()->length(); + topicid.data.long_.name = pub.topic; + topicid.data.long_.len = pub.topiclen; } } /* Save a msgId & a TopicId pare for PUBACK */ if (msgId && qos > 0 && qos < 3) { - client->setWaitedPubTopicId(msgId, topicid.data.id, &topicid); + client->setWaitedPubTopicId(msgId, topicid.data.id, &topicid); } pub.payload = (char*) payload;