From 74a9ebaa55f9872ec8a295d6226b2ba4ed1de36b Mon Sep 17 00:00:00 2001 From: tomoaki Date: Wed, 18 Jul 2018 21:03:03 +0900 Subject: [PATCH] BugFix of #121 Signed-off-by: tomoaki --- .../samples/ClientPub/mainPub.cpp | 12 +- .../samples/ClientSub/mainSub.cpp | 23 +- .../GatewayTester/samples/mainTest.cpp | 5 +- .../GatewayTester/src/LRegisterManager.cpp | 2 +- .../GatewayTester/src/LSubscribeManager.cpp | 545 +++++++++--------- MQTTSNGateway/src/MQTTGWPublishHandler.cpp | 2 +- MQTTSNGateway/src/MQTTSNGWClient.cpp | 31 +- MQTTSNGateway/src/MQTTSNGWClient.h | 8 + .../src/MQTTSNGWConnectionHandler.cpp | 20 +- MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp | 11 + 10 files changed, 366 insertions(+), 293 deletions(-) diff --git a/MQTTSNGateway/GatewayTester/samples/ClientPub/mainPub.cpp b/MQTTSNGateway/GatewayTester/samples/ClientPub/mainPub.cpp index e9a676f..816dba7 100644 --- a/MQTTSNGateway/GatewayTester/samples/ClientPub/mainPub.cpp +++ b/MQTTSNGateway/GatewayTester/samples/ClientPub/mainPub.cpp @@ -77,7 +77,7 @@ MQTTSNCONF = { const char* topic1 = "ty4tw/topic1"; const char* topic2 = "ty4tw/topic2"; const char* topic3 = "ty4tw/topic3"; - +const char* topic57 = "ty4tw/topic5/7"; /*------------------------------------------------------ * Callback routines for Subscribed Topics @@ -112,6 +112,14 @@ void publishTopic2(void) PUBLISH(topic2,(uint8_t*)payload, strlen(payload), qos); } +void publishTopic57(void) +{ + char payload[300]; + sprintf(payload, "publish \"ty4tw/topic57\" \n"); + uint8_t qos = 0; + PUBLISH(topic2,(uint8_t*)payload, strlen(payload), qos); +} + void disconnect(void) { @@ -127,7 +135,7 @@ void disconnect(void) TEST_LIST = {// e.g. TEST( Label, Test), TEST("Step1:Publish topic1", publishTopic1), - TEST("Step2:Publish topic2", publishTopic2), + TEST("Step2:Publish topic57", publishTopic57), TEST("Step3:Publish topic2", publishTopic2), TEST("Step4:Disconnect", disconnect), END_OF_TEST_LIST diff --git a/MQTTSNGateway/GatewayTester/samples/ClientSub/mainSub.cpp b/MQTTSNGateway/GatewayTester/samples/ClientSub/mainSub.cpp index f462fe7..e016732 100644 --- a/MQTTSNGateway/GatewayTester/samples/ClientSub/mainSub.cpp +++ b/MQTTSNGateway/GatewayTester/samples/ClientSub/mainSub.cpp @@ -62,7 +62,7 @@ UDPCONF = { * Client Configuration (theMqcon) *------------------------------------------------------*/ MQTTSNCONF = { - 300, //KeepAlive [seconds] + 60, //KeepAlive [seconds] false, //Clean session 300, //Sleep duration [seconds] "", //WillTopic @@ -109,14 +109,22 @@ int on_Topic03(uint8_t* pload, uint16_t ploadlen) return 0; } +int on_Topic05(uint8_t* pload, uint16_t ploadlen) +{ + DISPLAY("\n\nNew callback recv TopicA\n"); + pload[ploadlen-1]= 0; // set null terminator + DISPLAY("Payload -->%s <--\n\n",pload); + return 0; +} + /*------------------------------------------------------ * A Link list of Callback routines and Topics *------------------------------------------------------*/ SUBSCRIBE_LIST = {// e.g. SUB(TopicType, topicName, TopicId, callback, QoSx), - // SUB(MQTTSN_TOPIC_TYPE_NORMAL, topic1, 0, on_Topic01, QoS1), - // SUB(MQTTSN_TOPIC_TYPE_NORMAL, topic2, 0, on_Topic02, QoS1), + // SUB(MQTTSN_TOPIC_TYPE_NORMAL, topic5, 0, on_Topic05, QoS1), + //SUB(MQTTSN_TOPIC_TYPE_NORMAL, topic2, 0, on_Topic02, QoS1), END_OF_SUBSCRIBE_LIST }; @@ -139,6 +147,11 @@ void subscribeTopic2(void) SUBSCRIBE(topic2, on_Topic02, qos); } +void subscribeTopic5(void) +{ + uint8_t qos = 1; + SUBSCRIBE(topic5, on_Topic05, qos); +} void disconnect(void) { @@ -162,9 +175,9 @@ void asleep(void) *------------------------------------------------------*/ TEST_LIST = {// e.g. TEST( Label, Test), - TEST("Step1:Subscribe topic1", subscribeTopic1), + TEST("Step1:Subscribe topic5", subscribeTopic5), //TEST("Step2:Subscribe topic2", subscribeTopic2), - TEST("Step2:Disconnect", disconnect), + TEST("Step2:Disconnect", asleep), END_OF_TEST_LIST }; diff --git a/MQTTSNGateway/GatewayTester/samples/mainTest.cpp b/MQTTSNGateway/GatewayTester/samples/mainTest.cpp index 000ad51..a5e001b 100644 --- a/MQTTSNGateway/GatewayTester/samples/mainTest.cpp +++ b/MQTTSNGateway/GatewayTester/samples/mainTest.cpp @@ -78,7 +78,10 @@ const char* topic1 = "ty4tw/topic1"; const char* topic2 = "ty4tw/topic2"; const char* topic3 = "ty4tw/topic3"; const char* topic4 = "ty4tw/topic4"; -const char* topic5 = "ty4tw/topic5"; +const char* topic51 = "ty4tw/topic5/1"; +const char* topic52 = "ty4tw/topic5/2"; +const char* topic53 = "ty4tw/topic5/3"; +const char* topic50 = "ty4tw/topic5/+"; /*------------------------------------------------------ diff --git a/MQTTSNGateway/GatewayTester/src/LRegisterManager.cpp b/MQTTSNGateway/GatewayTester/src/LRegisterManager.cpp index 5a924d4..b7e4197 100644 --- a/MQTTSNGateway/GatewayTester/src/LRegisterManager.cpp +++ b/MQTTSNGateway/GatewayTester/src/LRegisterManager.cpp @@ -206,7 +206,7 @@ void LRegisterManager::responceRegister(uint8_t* msg, uint16_t msglen) uint8_t regack[7]; regack[0] = 7; regack[1] = MQTTSN_TYPE_REGACK; - memcpy(regack + 2, msg + 2, 4); + memcpy(regack + 2, msg + 1, 4); LTopic* tp = theClient->getGwProxy()->getTopicTable()->match((char*) msg + 5); if (tp) diff --git a/MQTTSNGateway/GatewayTester/src/LSubscribeManager.cpp b/MQTTSNGateway/GatewayTester/src/LSubscribeManager.cpp index 9f9cf5d..5158003 100644 --- a/MQTTSNGateway/GatewayTester/src/LSubscribeManager.cpp +++ b/MQTTSNGateway/GatewayTester/src/LSubscribeManager.cpp @@ -39,119 +39,119 @@ extern LScreen* theScreen; =======================================*/ LSubscribeManager::LSubscribeManager() { - _first = 0; - _last = 0; + _first = 0; + _last = 0; } LSubscribeManager::~LSubscribeManager() { - SubElement* elm = _first; - SubElement* sav = 0; - while (elm) - { - sav = elm->next; - if (elm != 0) - { - free(elm); - } - elm = sav; - } + SubElement* elm = _first; + SubElement* sav = 0; + while (elm) + { + sav = elm->next; + if (elm != 0) + { + free(elm); + } + elm = sav; + } } void LSubscribeManager::onConnect(void) { - DISPLAY("\033[0m\033[0;32m Attempting OnConnect.....\033[0m\033[0;37m\n"); - if (_first == 0) - { - for (uint8_t i = 0; theOnPublishList[i].topic != 0; i++) - { - if ( theOnPublishList[i].type == MQTTSN_TOPIC_TYPE_PREDEFINED) - { - subscribe(theOnPublishList[i].id, theOnPublishList[i].pubCallback, theOnPublishList[i].qos); - } - else - { - subscribe(theOnPublishList[i].topic, theOnPublishList[i].pubCallback, theOnPublishList[i].qos); - } - } - } - else - { - SubElement* elm = _first; - SubElement* pelm; - do - { - pelm = elm; - if (elm->msgType == MQTTSN_TYPE_SUBSCRIBE) - { - elm->done = SUB_READY; - elm->retryCount = MQTTSN_RETRY_COUNT; - subscribe(elm->topicName, elm->callback, elm->qos); - } - elm = pelm->next; - } while (pelm->next); - } + DISPLAY("\033[0m\033[0;32m Attempting OnConnect.....\033[0m\033[0;37m\n"); + if (_first == 0) + { + for (uint8_t i = 0; theOnPublishList[i].topic != 0; i++) + { + if ( theOnPublishList[i].type == MQTTSN_TOPIC_TYPE_PREDEFINED) + { + subscribe(theOnPublishList[i].id, theOnPublishList[i].pubCallback, theOnPublishList[i].qos); + } + else + { + subscribe(theOnPublishList[i].topic, theOnPublishList[i].pubCallback, theOnPublishList[i].qos); + } + } + } + else + { + SubElement* elm = _first; + SubElement* pelm; + do + { + pelm = elm; + if (elm->msgType == MQTTSN_TYPE_SUBSCRIBE) + { + elm->done = SUB_READY; + elm->retryCount = MQTTSN_RETRY_COUNT; + subscribe(elm->topicName, elm->callback, elm->qos); + } + elm = pelm->next; + } while (pelm->next); + } - while (!theClient->getSubscribeManager()->isDone()) - { - theClient->getGwProxy()->getMessage(); - } - DISPLAY("\033[0m\033[0;32m OnConnect complete\033[0m\033[0;37m\n"); - DISPLAY("\033[0m\033[0;32m Test is Ready.\033[0m\033[0;37m\n"); + while (!theClient->getSubscribeManager()->isDone()) + { + theClient->getGwProxy()->getMessage(); + } + DISPLAY("\033[0m\033[0;32m OnConnect complete\033[0m\033[0;37m\n"); + DISPLAY("\033[0m\033[0;32m Test is Ready.\033[0m\033[0;37m\n"); } bool LSubscribeManager::isDone(void) { - SubElement* elm = _first; - SubElement* prevelm; - while (elm) - { - prevelm = elm; - if (elm->done == SUB_READY) - { - return false; - } - elm = prevelm->next; - } - return true; + SubElement* elm = _first; + SubElement* prevelm; + while (elm) + { + prevelm = elm; + if (elm->done == SUB_READY) + { + return false; + } + elm = prevelm->next; + } + return true; } void LSubscribeManager::send(SubElement* elm) { - if (elm->done == SUB_DONE) - { - return; - } - uint8_t msg[MQTTSN_MAX_MSG_LENGTH + 1]; - if (elm->topicType == MQTTSN_TOPIC_TYPE_NORMAL) - { - msg[0] = 5 + strlen(elm->topicName); - strcpy((char*) msg + 5, elm->topicName); - } - else - { - msg[0] = 7; - setUint16(msg + 5, elm->topicId); - } - msg[1] = elm->msgType; - msg[2] = elm->qos | elm->topicType; - if (elm->retryCount == MQTTSN_RETRY_COUNT) - { - elm->msgId = theClient->getGwProxy()->getNextMsgId(); - } + if (elm->done == SUB_DONE) + { + return; + } + uint8_t msg[MQTTSN_MAX_MSG_LENGTH + 1]; + if (elm->topicType == MQTTSN_TOPIC_TYPE_PREDEFINED) + { + msg[0] = 7; + setUint16(msg + 5, elm->topicId); + } + else + { + msg[0] = 5 + strlen(elm->topicName); + strcpy((char*) msg + 5, elm->topicName); + } + msg[1] = elm->msgType; + msg[2] = elm->qos | elm->topicType; + if (elm->retryCount == MQTTSN_RETRY_COUNT) + { + elm->msgId = theClient->getGwProxy()->getNextMsgId(); + } - if ((elm->retryCount < MQTTSN_RETRY_COUNT) && elm->msgType == MQTTSN_TYPE_SUBSCRIBE) - { - msg[2] = msg[2] | MQTTSN_FLAG_DUP; - } + if ((elm->retryCount < MQTTSN_RETRY_COUNT) && elm->msgType == MQTTSN_TYPE_SUBSCRIBE) + { + msg[2] = msg[2] | MQTTSN_FLAG_DUP; + } - setUint16(msg + 3, elm->msgId); + setUint16(msg + 3, elm->msgId); - theClient->getGwProxy()->connect(); - theClient->getGwProxy()->writeMsg(msg); - theClient->getGwProxy()->setPingReqTimer(); - elm->sendUTC = time(NULL); - elm->retryCount--; + theClient->getGwProxy()->connect(); + theClient->getGwProxy()->writeMsg(msg); + theClient->getGwProxy()->setPingReqTimer(); + elm->sendUTC = time(NULL); + elm->retryCount--; } void LSubscribeManager::subscribe(const char* topicName, TopicCallback onPublish, uint8_t qos) @@ -171,8 +171,8 @@ void LSubscribeManager::subscribe(const char* topicName, TopicCallback onPublish void LSubscribeManager::subscribe(uint16_t topicId, TopicCallback onPublish, uint8_t qos) { - SubElement* elm = add(MQTTSN_TYPE_SUBSCRIBE, 0, MQTTSN_TOPIC_TYPE_PREDEFINED, topicId, qos, onPublish); - send(elm); + SubElement* elm = add(MQTTSN_TYPE_SUBSCRIBE, 0, MQTTSN_TOPIC_TYPE_PREDEFINED, topicId, qos, onPublish); + send(elm); } void LSubscribeManager::unsubscribe(const char* topicName) @@ -186,220 +186,221 @@ void LSubscribeManager::unsubscribe(const char* topicName) { topicType = MQTTSN_TOPIC_TYPE_SHORT; } - SubElement* elm = add(MQTTSN_TYPE_UNSUBSCRIBE, topicName, topicType, 0, 0, 0); - send(elm); + SubElement* elm = add(MQTTSN_TYPE_UNSUBSCRIBE, topicName, topicType, 0, 0, 0); + send(elm); } void LSubscribeManager::unsubscribe( uint16_t topicId) { - SubElement* elm = add(MQTTSN_TYPE_UNSUBSCRIBE, 0, MQTTSN_TOPIC_TYPE_PREDEFINED, topicId, 0, 0); - send(elm); + SubElement* elm = add(MQTTSN_TYPE_UNSUBSCRIBE, 0, MQTTSN_TOPIC_TYPE_PREDEFINED, topicId, 0, 0); + send(elm); } void LSubscribeManager::checkTimeout(void) { - SubElement* elm = _first; + SubElement* elm = _first; - while (elm) - { - if (elm->sendUTC + MQTTSN_TIME_RETRY < time(NULL)) - { - if (elm->retryCount >= 0) - { - send(elm); - } - else - { - if ( elm->done == SUB_READY ) - { - if (elm->msgType == MQTTSN_TYPE_SUBSCRIBE) - { - DISPLAY("\033[0m\033[0;31m\n!!!!!! SUBSCRIBE Error !!!!! Topic : %s\033[0m\033[0;37m\n\n", (char*)elm->topicName); - }else{ - DISPLAY("\033[0m\033[0;31m\n!!!!!! UNSUBSCRIBE Error !!!!! Topic : %s\033[0m\033[0;37m\n\n", (char*)elm->topicName); - } - elm->done = SUB_DONE; - } - } - } - elm = elm->next; - } + while (elm) + { + if (elm->sendUTC + MQTTSN_TIME_RETRY < time(NULL)) + { + if (elm->retryCount >= 0) + { + send(elm); + } + else + { + if ( elm->done == SUB_READY ) + { + if (elm->msgType == MQTTSN_TYPE_SUBSCRIBE) + { + DISPLAY("\033[0m\033[0;31m\n!!!!!! SUBSCRIBE Error !!!!! Topic : %s\033[0m\033[0;37m\n\n", (char*)elm->topicName); + }else{ + DISPLAY("\033[0m\033[0;31m\n!!!!!! UNSUBSCRIBE Error !!!!! Topic : %s\033[0m\033[0;37m\n\n", (char*)elm->topicName); + } + elm->done = SUB_DONE; + } + } + } + elm = elm->next; + } } void LSubscribeManager::responce(const uint8_t* msg) { - if (msg[0] == MQTTSN_TYPE_SUBACK) - { - uint16_t topicId = getUint16(msg + 2); - uint16_t msgId = getUint16(msg + 4); - uint8_t rc = msg[6]; + if (msg[0] == MQTTSN_TYPE_SUBACK) + { + uint16_t topicId = getUint16(msg + 2); + uint16_t msgId = getUint16(msg + 4); + uint8_t rc = msg[6]; - SubElement* elm = getElement(msgId); - if (elm) - { - if ( rc == MQTTSN_RC_ACCEPTED ) - { - theClient->getGwProxy()->getTopicTable()->add((char*) elm->topicName, elm->topicType, topicId, elm->callback); - getElement(msgId)->done = SUB_DONE; - DISPLAY("\033[0m\033[0;32m Topic \"%s\" Id : %d was Subscribed. \033[0m\033[0;37m\n\n", getElement(msgId)->topicName, topicId); - } - else - { - DISPLAY("\033[0m\033[0;31m SUBACK Invalid messageId. %s\033[0m\033[0;37m\n\n", getElement(msgId)->topicName); - remove(elm); - } - } - } - else if (msg[0] == MQTTSN_TYPE_UNSUBACK) - { - uint16_t msgId = getUint16(msg + 1); - SubElement* elm = getElement(msgId); - if (elm) - { - //theClient->getGwProxy()->getTopicTable()->setCallback(elm->topicName, 0); - DISPLAY("\033[0m\033[0;32m Topic \"%s\" was Unsubscribed. \033[0m\033[0;37m\n\n", getElement(msgId)->topicName); - remove(elm); - } - else - { - DISPLAY("\033[0m\033[0;31m UNSUBACK Invalid messageId. \033[0m\033[0;37m\n\n"); - } - } + SubElement* elm = getElement(msgId); + if (elm) + { + if ( rc == MQTTSN_RC_ACCEPTED ) + { + theClient->getGwProxy()->getTopicTable()->add((char*) elm->topicName, elm->topicType, topicId, elm->callback); + getElement(msgId)->done = SUB_DONE; + DISPLAY("\033[0m\033[0;32m Topic \"%s\" Id : %d was Subscribed. \033[0m\033[0;37m\n\n", getElement(msgId)->topicName, topicId); + } + else + { + DISPLAY("\033[0m\033[0;31m SUBACK Invalid messageId. %s\033[0m\033[0;37m\n\n", getElement(msgId)->topicName); + remove(elm); + } + } + } + else if (msg[0] == MQTTSN_TYPE_UNSUBACK) + { + uint16_t msgId = getUint16(msg + 1); + SubElement* elm = getElement(msgId); + if (elm) + { + //theClient->getGwProxy()->getTopicTable()->setCallback(elm->topicName, 0); + DISPLAY("\033[0m\033[0;32m Topic \"%s\" was Unsubscribed. \033[0m\033[0;37m\n\n", getElement(msgId)->topicName); + remove(elm); + } + else + { + DISPLAY("\033[0m\033[0;31m UNSUBACK Invalid messageId. \033[0m\033[0;37m\n\n"); + } + } } /* SubElement operations */ SubElement* LSubscribeManager::add(uint8_t msgType, const char* topicName, MQTTSN_topicTypes topicType, uint16_t topicId, - uint8_t qos, TopicCallback callback) + uint8_t qos, TopicCallback callback) { - SubElement* elm = 0; - if (topicName ) - { - elm = getElement(topicName, msgType); - } - else - { - elm = getElement(topicId, topicType); - } - if ( elm == 0 ) - { - elm = (SubElement*) calloc(1, sizeof(SubElement)); - if (elm == 0) - { - return 0; - } - if (_last == 0) - { - _first = elm; - _last = elm; - } - else - { - elm->prev = _last; - _last->next = elm; - _last = elm; - } - } + SubElement* elm = 0; + if (topicName ) + { + elm = getElement(topicName, msgType); + } + else + { + elm = getElement(topicId, topicType); + } - elm->msgType = msgType; - elm->callback = callback; - elm->topicName = topicName; - elm->topicId = topicId; - elm->topicType = topicType; + if ( elm == 0 ) + { + elm = (SubElement*) calloc(1, sizeof(SubElement)); + if (elm == 0) + { + return 0; + } + if (_last == 0) + { + _first = elm; + _last = elm; + } + else + { + elm->prev = _last; + _last->next = elm; + _last = elm; + } + } - if (qos == 1) - { - elm->qos = MQTTSN_FLAG_QOS_1; - } - else if (qos == 2) - { - elm->qos = MQTTSN_FLAG_QOS_2; - } - else - { - elm->qos = MQTTSN_FLAG_QOS_0; - } - elm->msgId = 0; - elm->retryCount = MQTTSN_RETRY_COUNT; - elm->done = SUB_READY; - elm->sendUTC = 0; + elm->msgType = msgType; + elm->callback = callback; + elm->topicName = topicName; + elm->topicId = topicId; + elm->topicType = topicType; - return elm; + if (qos == 1) + { + elm->qos = MQTTSN_FLAG_QOS_1; + } + else if (qos == 2) + { + elm->qos = MQTTSN_FLAG_QOS_2; + } + else + { + elm->qos = MQTTSN_FLAG_QOS_0; + } + elm->msgId = 0; + elm->retryCount = MQTTSN_RETRY_COUNT; + elm->done = SUB_READY; + elm->sendUTC = 0; + + return elm; } void LSubscribeManager::remove(SubElement* elm) { - if (elm) - { - if (elm->prev == 0) - { - _first = elm->next; - if (elm->next != 0) - { - elm->next->prev = 0; - _last = elm->next; - } - free(elm); - } - else - { - if ( elm->next == 0 ) - { - _last = elm->prev; - } - elm->prev->next = elm->next; - free(elm); - } - } + if (elm) + { + if (elm->prev == 0) + { + _first = elm->next; + if (elm->next != 0) + { + elm->next->prev = 0; + _last = elm->next; + } + free(elm); + } + else + { + if ( elm->next == 0 ) + { + _last = elm->prev; + } + elm->prev->next = elm->next; + free(elm); + } + } } SubElement* LSubscribeManager::getElement(uint16_t msgId) { - SubElement* elm = _first; - while (elm) - { - if (elm->msgId == msgId) - { - return elm; - } - else - { - elm = elm->next; - } - } - return 0; + SubElement* elm = _first; + while (elm) + { + if (elm->msgId == msgId) + { + return elm; + } + else + { + elm = elm->next; + } + } + return 0; } SubElement* LSubscribeManager::getElement(const char* topicName, uint8_t msgType) { - SubElement* elm = _first; - while (elm) - { - if ( elm->msgType == msgType && strncmp(elm->topicName, topicName, strlen(topicName)) == 0 ) - { - return elm; - } - else - { - elm = elm->next; - } - } - return 0; + SubElement* elm = _first; + while (elm) + { + if ( elm->msgType == msgType && strncmp(elm->topicName, topicName, strlen(topicName)) == 0 ) + { + return elm; + } + else + { + elm = elm->next; + } + } + return 0; } SubElement* LSubscribeManager::getElement(uint16_t topicId, MQTTSN_topicTypes topicType) { - SubElement* elm = _first; - while (elm) - { - if (elm->topicId == topicId && elm->topicType == topicType) - { - return elm; - } - else - { - elm = elm->next; - } - } - return 0; + SubElement* elm = _first; + while (elm) + { + if (elm->topicId == topicId && elm->topicType == topicType) + { + return elm; + } + else + { + elm = elm->next; + } + } + return 0; } diff --git a/MQTTSNGateway/src/MQTTGWPublishHandler.cpp b/MQTTSNGateway/src/MQTTGWPublishHandler.cpp index f665cec..b29fea0 100644 --- a/MQTTSNGateway/src/MQTTGWPublishHandler.cpp +++ b/MQTTSNGateway/src/MQTTGWPublishHandler.cpp @@ -82,7 +82,7 @@ void MQTTGWPublishHandler::handlePublish(Client* client, MQTTGWPacket* packet) MQTTSN_topicid topicId; uint16_t id = 0; - if (pub.topiclen == 2) + if (pub.topiclen <= 2) { topicId.type = MQTTSN_TOPIC_TYPE_SHORT; *(topicId.data.short_name) = *pub.topic; diff --git a/MQTTSNGateway/src/MQTTSNGWClient.cpp b/MQTTSNGateway/src/MQTTSNGWClient.cpp index 65e29d5..9aa2a18 100644 --- a/MQTTSNGateway/src/MQTTSNGWClient.cpp +++ b/MQTTSNGateway/src/MQTTSNGWClient.cpp @@ -400,6 +400,7 @@ Client::Client(bool secure) _nextClient = 0; _clientSleepPacketQue.setMaxSize(MAX_SAVED_PUBLISH); _hasPredefTopic = false; + _holdPingRequest = false; } Client::~Client() @@ -796,6 +797,21 @@ void Client::setOTAClient(Client* cl) _otaClient =cl; } +void Client::holdPingRequest(void) +{ + _holdPingRequest = true; +} + +void Client::resetPingRequest(void) +{ + _holdPingRequest = false; +} + +bool Client::isHoldPringReqest(void) +{ + return _holdPingRequest; +} + /*===================================== Class Topic ======================================*/ @@ -1214,7 +1230,7 @@ TopicIdMapelement* TopicIdMap::getElement(uint16_t msgId) TopicIdMapelement* TopicIdMap::add(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type) { - if ( _cnt > _maxInflight * 2 || topicId == 0) + if ( _cnt > _maxInflight * 2 || ( topicId == 0 && type != MQTTSN_TOPIC_TYPE_SHORT ) ) { return 0; } @@ -1314,6 +1330,7 @@ WaitREGACKPacketList::WaitREGACKPacketList() { _first = 0; _end = 0; + _cnt = 0; } WaitREGACKPacketList::~WaitREGACKPacketList() @@ -1346,6 +1363,7 @@ int WaitREGACKPacketList::setPacket(MQTTSNPacket* packet, uint16_t REGACKMsgId) elm->_prev = _end; _end = elm; } + _cnt++; return 1; } @@ -1387,10 +1405,17 @@ void WaitREGACKPacketList::erase(uint16_t REGACKMsgId) { p->_next->_prev = p->_prev; } - break; - // Do not delete element. Element is deleted after sending to Client. + _cnt--; + break; + // Do not delete element. Element is deleted after sending to Client. } p = p->_next; } } +uint8_t WaitREGACKPacketList::getCount(void) +{ + return _cnt; +} + + diff --git a/MQTTSNGateway/src/MQTTSNGWClient.h b/MQTTSNGateway/src/MQTTSNGWClient.h index 9bb5ede..118239e 100644 --- a/MQTTSNGateway/src/MQTTSNGWClient.h +++ b/MQTTSNGateway/src/MQTTSNGWClient.h @@ -219,8 +219,10 @@ public: int setPacket(MQTTSNPacket* packet, uint16_t REGACKMsgId); MQTTSNPacket* getPacket(uint16_t REGACKMsgId); void erase(uint16_t REGACKMsgId); + uint8_t getCount(void); private: + uint8_t _cnt; waitREGACKPacket* _first; waitREGACKPacket* _end; }; @@ -298,6 +300,10 @@ public: bool isSensorNetStable(void); bool isWaitWillMsg(void); + void holdPingRequest(void); + void resetPingRequest(void); + bool isHoldPringReqest(void); + Client* getNextClient(void); Client* getOTAClient(void); void setOTAClient(Client* cl); @@ -317,6 +323,8 @@ private: char* _willTopic; char* _willMsg; + bool _holdPingRequest; + Timer _keepAliveTimer; uint32_t _keepAliveMsec; diff --git a/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp b/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp index 66e2d79..d9fab69 100644 --- a/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp +++ b/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp @@ -270,7 +270,7 @@ void MQTTSNConnectionHandler::handlePingreq(Client* client, MQTTSNPacket* packet { MQTTGWPacket* msg = 0; - if ( client->isSleep() || client->isAwake() ) + if ( ( client->isSleep() || client->isAwake() ) && client->getClientSleepPacket() ) { while ( ( msg = client->getClientSleepPacket() ) != 0 ) { @@ -281,12 +281,16 @@ void MQTTSNConnectionHandler::handlePingreq(Client* client, MQTTSNPacket* packet ev->setBrokerRecvEvent(client, msg); _gateway->getPacketEventQue()->post(ev); } + client->holdPingRequest(); + } + else + { + /* send PINGREQ to the broker */ + client->resetPingRequest(); + MQTTGWPacket* pingreq = new MQTTGWPacket(); + pingreq->setHeader(PINGREQ); + Event* evt = new Event(); + evt->setBrokerSendEvent(client, pingreq); + _gateway->getBrokerSendQue()->post(evt); } - - /* send PINGREQ to the broker */ - MQTTGWPacket* pingreq = new MQTTGWPacket(); - pingreq->setHeader(PINGREQ); - Event* evt = new Event(); - evt->setBrokerSendEvent(client, pingreq); - _gateway->getBrokerSendQue()->post(evt); } diff --git a/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp b/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp index f797ca0..4f0adc4 100644 --- a/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp +++ b/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp @@ -198,6 +198,7 @@ void MQTTSNPublishHandler::handleRegAck( Client* client, MQTTSNPacket* packet) } MQTTSNPacket* regAck = client->getWaitREGACKPacketList()->getPacket(msgId); + if ( regAck != 0 ) { client->getWaitREGACKPacketList()->erase(msgId); @@ -205,6 +206,16 @@ void MQTTSNPublishHandler::handleRegAck( Client* client, MQTTSNPacket* packet) ev->setClientSendEvent(client, regAck); _gateway->getClientSendQue()->post(ev); } + if (client->isHoldPringReqest() && client->getWaitREGACKPacketList()->getCount() == 0 ) + { + /* send PINGREQ to the broker */ + client->resetPingRequest(); + MQTTGWPacket* pingreq = new MQTTGWPacket(); + pingreq->setHeader(PINGREQ); + Event* evt = new Event(); + evt->setBrokerSendEvent(client, pingreq); + _gateway->getBrokerSendQue()->post(evt); + } } }