From 74a9ebaa55f9872ec8a295d6226b2ba4ed1de36b Mon Sep 17 00:00:00 2001 From: tomoaki Date: Wed, 18 Jul 2018 21:03:03 +0900 Subject: [PATCH 1/5] BugFix of #121 Signed-off-by: tomoaki --- .../samples/ClientPub/mainPub.cpp | 12 +- .../samples/ClientSub/mainSub.cpp | 23 +- .../GatewayTester/samples/mainTest.cpp | 5 +- .../GatewayTester/src/LRegisterManager.cpp | 2 +- .../GatewayTester/src/LSubscribeManager.cpp | 545 +++++++++--------- MQTTSNGateway/src/MQTTGWPublishHandler.cpp | 2 +- MQTTSNGateway/src/MQTTSNGWClient.cpp | 31 +- MQTTSNGateway/src/MQTTSNGWClient.h | 8 + .../src/MQTTSNGWConnectionHandler.cpp | 20 +- MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp | 11 + 10 files changed, 366 insertions(+), 293 deletions(-) diff --git a/MQTTSNGateway/GatewayTester/samples/ClientPub/mainPub.cpp b/MQTTSNGateway/GatewayTester/samples/ClientPub/mainPub.cpp index e9a676f..816dba7 100644 --- a/MQTTSNGateway/GatewayTester/samples/ClientPub/mainPub.cpp +++ b/MQTTSNGateway/GatewayTester/samples/ClientPub/mainPub.cpp @@ -77,7 +77,7 @@ MQTTSNCONF = { const char* topic1 = "ty4tw/topic1"; const char* topic2 = "ty4tw/topic2"; const char* topic3 = "ty4tw/topic3"; - +const char* topic57 = "ty4tw/topic5/7"; /*------------------------------------------------------ * Callback routines for Subscribed Topics @@ -112,6 +112,14 @@ void publishTopic2(void) PUBLISH(topic2,(uint8_t*)payload, strlen(payload), qos); } +void publishTopic57(void) +{ + char payload[300]; + sprintf(payload, "publish \"ty4tw/topic57\" \n"); + uint8_t qos = 0; + PUBLISH(topic2,(uint8_t*)payload, strlen(payload), qos); +} + void disconnect(void) { @@ -127,7 +135,7 @@ void disconnect(void) TEST_LIST = {// e.g. TEST( Label, Test), TEST("Step1:Publish topic1", publishTopic1), - TEST("Step2:Publish topic2", publishTopic2), + TEST("Step2:Publish topic57", publishTopic57), TEST("Step3:Publish topic2", publishTopic2), TEST("Step4:Disconnect", disconnect), END_OF_TEST_LIST diff --git a/MQTTSNGateway/GatewayTester/samples/ClientSub/mainSub.cpp b/MQTTSNGateway/GatewayTester/samples/ClientSub/mainSub.cpp index f462fe7..e016732 100644 --- a/MQTTSNGateway/GatewayTester/samples/ClientSub/mainSub.cpp +++ b/MQTTSNGateway/GatewayTester/samples/ClientSub/mainSub.cpp @@ -62,7 +62,7 @@ UDPCONF = { * Client Configuration (theMqcon) *------------------------------------------------------*/ MQTTSNCONF = { - 300, //KeepAlive [seconds] + 60, //KeepAlive [seconds] false, //Clean session 300, //Sleep duration [seconds] "", //WillTopic @@ -109,14 +109,22 @@ int on_Topic03(uint8_t* pload, uint16_t ploadlen) return 0; } +int on_Topic05(uint8_t* pload, uint16_t ploadlen) +{ + DISPLAY("\n\nNew callback recv TopicA\n"); + pload[ploadlen-1]= 0; // set null terminator + DISPLAY("Payload -->%s <--\n\n",pload); + return 0; +} + /*------------------------------------------------------ * A Link list of Callback routines and Topics *------------------------------------------------------*/ SUBSCRIBE_LIST = {// e.g. SUB(TopicType, topicName, TopicId, callback, QoSx), - // SUB(MQTTSN_TOPIC_TYPE_NORMAL, topic1, 0, on_Topic01, QoS1), - // SUB(MQTTSN_TOPIC_TYPE_NORMAL, topic2, 0, on_Topic02, QoS1), + // SUB(MQTTSN_TOPIC_TYPE_NORMAL, topic5, 0, on_Topic05, QoS1), + //SUB(MQTTSN_TOPIC_TYPE_NORMAL, topic2, 0, on_Topic02, QoS1), END_OF_SUBSCRIBE_LIST }; @@ -139,6 +147,11 @@ void subscribeTopic2(void) SUBSCRIBE(topic2, on_Topic02, qos); } +void subscribeTopic5(void) +{ + uint8_t qos = 1; + SUBSCRIBE(topic5, on_Topic05, qos); +} void disconnect(void) { @@ -162,9 +175,9 @@ void asleep(void) *------------------------------------------------------*/ TEST_LIST = {// e.g. TEST( Label, Test), - TEST("Step1:Subscribe topic1", subscribeTopic1), + TEST("Step1:Subscribe topic5", subscribeTopic5), //TEST("Step2:Subscribe topic2", subscribeTopic2), - TEST("Step2:Disconnect", disconnect), + TEST("Step2:Disconnect", asleep), END_OF_TEST_LIST }; diff --git a/MQTTSNGateway/GatewayTester/samples/mainTest.cpp b/MQTTSNGateway/GatewayTester/samples/mainTest.cpp index 000ad51..a5e001b 100644 --- a/MQTTSNGateway/GatewayTester/samples/mainTest.cpp +++ b/MQTTSNGateway/GatewayTester/samples/mainTest.cpp @@ -78,7 +78,10 @@ const char* topic1 = "ty4tw/topic1"; const char* topic2 = "ty4tw/topic2"; const char* topic3 = "ty4tw/topic3"; const char* topic4 = "ty4tw/topic4"; -const char* topic5 = "ty4tw/topic5"; +const char* topic51 = "ty4tw/topic5/1"; +const char* topic52 = "ty4tw/topic5/2"; +const char* topic53 = "ty4tw/topic5/3"; +const char* topic50 = "ty4tw/topic5/+"; /*------------------------------------------------------ diff --git a/MQTTSNGateway/GatewayTester/src/LRegisterManager.cpp b/MQTTSNGateway/GatewayTester/src/LRegisterManager.cpp index 5a924d4..b7e4197 100644 --- a/MQTTSNGateway/GatewayTester/src/LRegisterManager.cpp +++ b/MQTTSNGateway/GatewayTester/src/LRegisterManager.cpp @@ -206,7 +206,7 @@ void LRegisterManager::responceRegister(uint8_t* msg, uint16_t msglen) uint8_t regack[7]; regack[0] = 7; regack[1] = MQTTSN_TYPE_REGACK; - memcpy(regack + 2, msg + 2, 4); + memcpy(regack + 2, msg + 1, 4); LTopic* tp = theClient->getGwProxy()->getTopicTable()->match((char*) msg + 5); if (tp) diff --git a/MQTTSNGateway/GatewayTester/src/LSubscribeManager.cpp b/MQTTSNGateway/GatewayTester/src/LSubscribeManager.cpp index 9f9cf5d..5158003 100644 --- a/MQTTSNGateway/GatewayTester/src/LSubscribeManager.cpp +++ b/MQTTSNGateway/GatewayTester/src/LSubscribeManager.cpp @@ -39,119 +39,119 @@ extern LScreen* theScreen; =======================================*/ LSubscribeManager::LSubscribeManager() { - _first = 0; - _last = 0; + _first = 0; + _last = 0; } LSubscribeManager::~LSubscribeManager() { - SubElement* elm = _first; - SubElement* sav = 0; - while (elm) - { - sav = elm->next; - if (elm != 0) - { - free(elm); - } - elm = sav; - } + SubElement* elm = _first; + SubElement* sav = 0; + while (elm) + { + sav = elm->next; + if (elm != 0) + { + free(elm); + } + elm = sav; + } } void LSubscribeManager::onConnect(void) { - DISPLAY("\033[0m\033[0;32m Attempting OnConnect.....\033[0m\033[0;37m\n"); - if (_first == 0) - { - for (uint8_t i = 0; theOnPublishList[i].topic != 0; i++) - { - if ( theOnPublishList[i].type == MQTTSN_TOPIC_TYPE_PREDEFINED) - { - subscribe(theOnPublishList[i].id, theOnPublishList[i].pubCallback, theOnPublishList[i].qos); - } - else - { - subscribe(theOnPublishList[i].topic, theOnPublishList[i].pubCallback, theOnPublishList[i].qos); - } - } - } - else - { - SubElement* elm = _first; - SubElement* pelm; - do - { - pelm = elm; - if (elm->msgType == MQTTSN_TYPE_SUBSCRIBE) - { - elm->done = SUB_READY; - elm->retryCount = MQTTSN_RETRY_COUNT; - subscribe(elm->topicName, elm->callback, elm->qos); - } - elm = pelm->next; - } while (pelm->next); - } + DISPLAY("\033[0m\033[0;32m Attempting OnConnect.....\033[0m\033[0;37m\n"); + if (_first == 0) + { + for (uint8_t i = 0; theOnPublishList[i].topic != 0; i++) + { + if ( theOnPublishList[i].type == MQTTSN_TOPIC_TYPE_PREDEFINED) + { + subscribe(theOnPublishList[i].id, theOnPublishList[i].pubCallback, theOnPublishList[i].qos); + } + else + { + subscribe(theOnPublishList[i].topic, theOnPublishList[i].pubCallback, theOnPublishList[i].qos); + } + } + } + else + { + SubElement* elm = _first; + SubElement* pelm; + do + { + pelm = elm; + if (elm->msgType == MQTTSN_TYPE_SUBSCRIBE) + { + elm->done = SUB_READY; + elm->retryCount = MQTTSN_RETRY_COUNT; + subscribe(elm->topicName, elm->callback, elm->qos); + } + elm = pelm->next; + } while (pelm->next); + } - while (!theClient->getSubscribeManager()->isDone()) - { - theClient->getGwProxy()->getMessage(); - } - DISPLAY("\033[0m\033[0;32m OnConnect complete\033[0m\033[0;37m\n"); - DISPLAY("\033[0m\033[0;32m Test is Ready.\033[0m\033[0;37m\n"); + while (!theClient->getSubscribeManager()->isDone()) + { + theClient->getGwProxy()->getMessage(); + } + DISPLAY("\033[0m\033[0;32m OnConnect complete\033[0m\033[0;37m\n"); + DISPLAY("\033[0m\033[0;32m Test is Ready.\033[0m\033[0;37m\n"); } bool LSubscribeManager::isDone(void) { - SubElement* elm = _first; - SubElement* prevelm; - while (elm) - { - prevelm = elm; - if (elm->done == SUB_READY) - { - return false; - } - elm = prevelm->next; - } - return true; + SubElement* elm = _first; + SubElement* prevelm; + while (elm) + { + prevelm = elm; + if (elm->done == SUB_READY) + { + return false; + } + elm = prevelm->next; + } + return true; } void LSubscribeManager::send(SubElement* elm) { - if (elm->done == SUB_DONE) - { - return; - } - uint8_t msg[MQTTSN_MAX_MSG_LENGTH + 1]; - if (elm->topicType == MQTTSN_TOPIC_TYPE_NORMAL) - { - msg[0] = 5 + strlen(elm->topicName); - strcpy((char*) msg + 5, elm->topicName); - } - else - { - msg[0] = 7; - setUint16(msg + 5, elm->topicId); - } - msg[1] = elm->msgType; - msg[2] = elm->qos | elm->topicType; - if (elm->retryCount == MQTTSN_RETRY_COUNT) - { - elm->msgId = theClient->getGwProxy()->getNextMsgId(); - } + if (elm->done == SUB_DONE) + { + return; + } + uint8_t msg[MQTTSN_MAX_MSG_LENGTH + 1]; + if (elm->topicType == MQTTSN_TOPIC_TYPE_PREDEFINED) + { + msg[0] = 7; + setUint16(msg + 5, elm->topicId); + } + else + { + msg[0] = 5 + strlen(elm->topicName); + strcpy((char*) msg + 5, elm->topicName); + } + msg[1] = elm->msgType; + msg[2] = elm->qos | elm->topicType; + if (elm->retryCount == MQTTSN_RETRY_COUNT) + { + elm->msgId = theClient->getGwProxy()->getNextMsgId(); + } - if ((elm->retryCount < MQTTSN_RETRY_COUNT) && elm->msgType == MQTTSN_TYPE_SUBSCRIBE) - { - msg[2] = msg[2] | MQTTSN_FLAG_DUP; - } + if ((elm->retryCount < MQTTSN_RETRY_COUNT) && elm->msgType == MQTTSN_TYPE_SUBSCRIBE) + { + msg[2] = msg[2] | MQTTSN_FLAG_DUP; + } - setUint16(msg + 3, elm->msgId); + setUint16(msg + 3, elm->msgId); - theClient->getGwProxy()->connect(); - theClient->getGwProxy()->writeMsg(msg); - theClient->getGwProxy()->setPingReqTimer(); - elm->sendUTC = time(NULL); - elm->retryCount--; + theClient->getGwProxy()->connect(); + theClient->getGwProxy()->writeMsg(msg); + theClient->getGwProxy()->setPingReqTimer(); + elm->sendUTC = time(NULL); + elm->retryCount--; } void LSubscribeManager::subscribe(const char* topicName, TopicCallback onPublish, uint8_t qos) @@ -171,8 +171,8 @@ void LSubscribeManager::subscribe(const char* topicName, TopicCallback onPublish void LSubscribeManager::subscribe(uint16_t topicId, TopicCallback onPublish, uint8_t qos) { - SubElement* elm = add(MQTTSN_TYPE_SUBSCRIBE, 0, MQTTSN_TOPIC_TYPE_PREDEFINED, topicId, qos, onPublish); - send(elm); + SubElement* elm = add(MQTTSN_TYPE_SUBSCRIBE, 0, MQTTSN_TOPIC_TYPE_PREDEFINED, topicId, qos, onPublish); + send(elm); } void LSubscribeManager::unsubscribe(const char* topicName) @@ -186,220 +186,221 @@ void LSubscribeManager::unsubscribe(const char* topicName) { topicType = MQTTSN_TOPIC_TYPE_SHORT; } - SubElement* elm = add(MQTTSN_TYPE_UNSUBSCRIBE, topicName, topicType, 0, 0, 0); - send(elm); + SubElement* elm = add(MQTTSN_TYPE_UNSUBSCRIBE, topicName, topicType, 0, 0, 0); + send(elm); } void LSubscribeManager::unsubscribe( uint16_t topicId) { - SubElement* elm = add(MQTTSN_TYPE_UNSUBSCRIBE, 0, MQTTSN_TOPIC_TYPE_PREDEFINED, topicId, 0, 0); - send(elm); + SubElement* elm = add(MQTTSN_TYPE_UNSUBSCRIBE, 0, MQTTSN_TOPIC_TYPE_PREDEFINED, topicId, 0, 0); + send(elm); } void LSubscribeManager::checkTimeout(void) { - SubElement* elm = _first; + SubElement* elm = _first; - while (elm) - { - if (elm->sendUTC + MQTTSN_TIME_RETRY < time(NULL)) - { - if (elm->retryCount >= 0) - { - send(elm); - } - else - { - if ( elm->done == SUB_READY ) - { - if (elm->msgType == MQTTSN_TYPE_SUBSCRIBE) - { - DISPLAY("\033[0m\033[0;31m\n!!!!!! SUBSCRIBE Error !!!!! Topic : %s\033[0m\033[0;37m\n\n", (char*)elm->topicName); - }else{ - DISPLAY("\033[0m\033[0;31m\n!!!!!! UNSUBSCRIBE Error !!!!! Topic : %s\033[0m\033[0;37m\n\n", (char*)elm->topicName); - } - elm->done = SUB_DONE; - } - } - } - elm = elm->next; - } + while (elm) + { + if (elm->sendUTC + MQTTSN_TIME_RETRY < time(NULL)) + { + if (elm->retryCount >= 0) + { + send(elm); + } + else + { + if ( elm->done == SUB_READY ) + { + if (elm->msgType == MQTTSN_TYPE_SUBSCRIBE) + { + DISPLAY("\033[0m\033[0;31m\n!!!!!! SUBSCRIBE Error !!!!! Topic : %s\033[0m\033[0;37m\n\n", (char*)elm->topicName); + }else{ + DISPLAY("\033[0m\033[0;31m\n!!!!!! UNSUBSCRIBE Error !!!!! Topic : %s\033[0m\033[0;37m\n\n", (char*)elm->topicName); + } + elm->done = SUB_DONE; + } + } + } + elm = elm->next; + } } void LSubscribeManager::responce(const uint8_t* msg) { - if (msg[0] == MQTTSN_TYPE_SUBACK) - { - uint16_t topicId = getUint16(msg + 2); - uint16_t msgId = getUint16(msg + 4); - uint8_t rc = msg[6]; + if (msg[0] == MQTTSN_TYPE_SUBACK) + { + uint16_t topicId = getUint16(msg + 2); + uint16_t msgId = getUint16(msg + 4); + uint8_t rc = msg[6]; - SubElement* elm = getElement(msgId); - if (elm) - { - if ( rc == MQTTSN_RC_ACCEPTED ) - { - theClient->getGwProxy()->getTopicTable()->add((char*) elm->topicName, elm->topicType, topicId, elm->callback); - getElement(msgId)->done = SUB_DONE; - DISPLAY("\033[0m\033[0;32m Topic \"%s\" Id : %d was Subscribed. \033[0m\033[0;37m\n\n", getElement(msgId)->topicName, topicId); - } - else - { - DISPLAY("\033[0m\033[0;31m SUBACK Invalid messageId. %s\033[0m\033[0;37m\n\n", getElement(msgId)->topicName); - remove(elm); - } - } - } - else if (msg[0] == MQTTSN_TYPE_UNSUBACK) - { - uint16_t msgId = getUint16(msg + 1); - SubElement* elm = getElement(msgId); - if (elm) - { - //theClient->getGwProxy()->getTopicTable()->setCallback(elm->topicName, 0); - DISPLAY("\033[0m\033[0;32m Topic \"%s\" was Unsubscribed. \033[0m\033[0;37m\n\n", getElement(msgId)->topicName); - remove(elm); - } - else - { - DISPLAY("\033[0m\033[0;31m UNSUBACK Invalid messageId. \033[0m\033[0;37m\n\n"); - } - } + SubElement* elm = getElement(msgId); + if (elm) + { + if ( rc == MQTTSN_RC_ACCEPTED ) + { + theClient->getGwProxy()->getTopicTable()->add((char*) elm->topicName, elm->topicType, topicId, elm->callback); + getElement(msgId)->done = SUB_DONE; + DISPLAY("\033[0m\033[0;32m Topic \"%s\" Id : %d was Subscribed. \033[0m\033[0;37m\n\n", getElement(msgId)->topicName, topicId); + } + else + { + DISPLAY("\033[0m\033[0;31m SUBACK Invalid messageId. %s\033[0m\033[0;37m\n\n", getElement(msgId)->topicName); + remove(elm); + } + } + } + else if (msg[0] == MQTTSN_TYPE_UNSUBACK) + { + uint16_t msgId = getUint16(msg + 1); + SubElement* elm = getElement(msgId); + if (elm) + { + //theClient->getGwProxy()->getTopicTable()->setCallback(elm->topicName, 0); + DISPLAY("\033[0m\033[0;32m Topic \"%s\" was Unsubscribed. \033[0m\033[0;37m\n\n", getElement(msgId)->topicName); + remove(elm); + } + else + { + DISPLAY("\033[0m\033[0;31m UNSUBACK Invalid messageId. \033[0m\033[0;37m\n\n"); + } + } } /* SubElement operations */ SubElement* LSubscribeManager::add(uint8_t msgType, const char* topicName, MQTTSN_topicTypes topicType, uint16_t topicId, - uint8_t qos, TopicCallback callback) + uint8_t qos, TopicCallback callback) { - SubElement* elm = 0; - if (topicName ) - { - elm = getElement(topicName, msgType); - } - else - { - elm = getElement(topicId, topicType); - } - if ( elm == 0 ) - { - elm = (SubElement*) calloc(1, sizeof(SubElement)); - if (elm == 0) - { - return 0; - } - if (_last == 0) - { - _first = elm; - _last = elm; - } - else - { - elm->prev = _last; - _last->next = elm; - _last = elm; - } - } + SubElement* elm = 0; + if (topicName ) + { + elm = getElement(topicName, msgType); + } + else + { + elm = getElement(topicId, topicType); + } - elm->msgType = msgType; - elm->callback = callback; - elm->topicName = topicName; - elm->topicId = topicId; - elm->topicType = topicType; + if ( elm == 0 ) + { + elm = (SubElement*) calloc(1, sizeof(SubElement)); + if (elm == 0) + { + return 0; + } + if (_last == 0) + { + _first = elm; + _last = elm; + } + else + { + elm->prev = _last; + _last->next = elm; + _last = elm; + } + } - if (qos == 1) - { - elm->qos = MQTTSN_FLAG_QOS_1; - } - else if (qos == 2) - { - elm->qos = MQTTSN_FLAG_QOS_2; - } - else - { - elm->qos = MQTTSN_FLAG_QOS_0; - } - elm->msgId = 0; - elm->retryCount = MQTTSN_RETRY_COUNT; - elm->done = SUB_READY; - elm->sendUTC = 0; + elm->msgType = msgType; + elm->callback = callback; + elm->topicName = topicName; + elm->topicId = topicId; + elm->topicType = topicType; - return elm; + if (qos == 1) + { + elm->qos = MQTTSN_FLAG_QOS_1; + } + else if (qos == 2) + { + elm->qos = MQTTSN_FLAG_QOS_2; + } + else + { + elm->qos = MQTTSN_FLAG_QOS_0; + } + elm->msgId = 0; + elm->retryCount = MQTTSN_RETRY_COUNT; + elm->done = SUB_READY; + elm->sendUTC = 0; + + return elm; } void LSubscribeManager::remove(SubElement* elm) { - if (elm) - { - if (elm->prev == 0) - { - _first = elm->next; - if (elm->next != 0) - { - elm->next->prev = 0; - _last = elm->next; - } - free(elm); - } - else - { - if ( elm->next == 0 ) - { - _last = elm->prev; - } - elm->prev->next = elm->next; - free(elm); - } - } + if (elm) + { + if (elm->prev == 0) + { + _first = elm->next; + if (elm->next != 0) + { + elm->next->prev = 0; + _last = elm->next; + } + free(elm); + } + else + { + if ( elm->next == 0 ) + { + _last = elm->prev; + } + elm->prev->next = elm->next; + free(elm); + } + } } SubElement* LSubscribeManager::getElement(uint16_t msgId) { - SubElement* elm = _first; - while (elm) - { - if (elm->msgId == msgId) - { - return elm; - } - else - { - elm = elm->next; - } - } - return 0; + SubElement* elm = _first; + while (elm) + { + if (elm->msgId == msgId) + { + return elm; + } + else + { + elm = elm->next; + } + } + return 0; } SubElement* LSubscribeManager::getElement(const char* topicName, uint8_t msgType) { - SubElement* elm = _first; - while (elm) - { - if ( elm->msgType == msgType && strncmp(elm->topicName, topicName, strlen(topicName)) == 0 ) - { - return elm; - } - else - { - elm = elm->next; - } - } - return 0; + SubElement* elm = _first; + while (elm) + { + if ( elm->msgType == msgType && strncmp(elm->topicName, topicName, strlen(topicName)) == 0 ) + { + return elm; + } + else + { + elm = elm->next; + } + } + return 0; } SubElement* LSubscribeManager::getElement(uint16_t topicId, MQTTSN_topicTypes topicType) { - SubElement* elm = _first; - while (elm) - { - if (elm->topicId == topicId && elm->topicType == topicType) - { - return elm; - } - else - { - elm = elm->next; - } - } - return 0; + SubElement* elm = _first; + while (elm) + { + if (elm->topicId == topicId && elm->topicType == topicType) + { + return elm; + } + else + { + elm = elm->next; + } + } + return 0; } diff --git a/MQTTSNGateway/src/MQTTGWPublishHandler.cpp b/MQTTSNGateway/src/MQTTGWPublishHandler.cpp index f665cec..b29fea0 100644 --- a/MQTTSNGateway/src/MQTTGWPublishHandler.cpp +++ b/MQTTSNGateway/src/MQTTGWPublishHandler.cpp @@ -82,7 +82,7 @@ void MQTTGWPublishHandler::handlePublish(Client* client, MQTTGWPacket* packet) MQTTSN_topicid topicId; uint16_t id = 0; - if (pub.topiclen == 2) + if (pub.topiclen <= 2) { topicId.type = MQTTSN_TOPIC_TYPE_SHORT; *(topicId.data.short_name) = *pub.topic; diff --git a/MQTTSNGateway/src/MQTTSNGWClient.cpp b/MQTTSNGateway/src/MQTTSNGWClient.cpp index 65e29d5..9aa2a18 100644 --- a/MQTTSNGateway/src/MQTTSNGWClient.cpp +++ b/MQTTSNGateway/src/MQTTSNGWClient.cpp @@ -400,6 +400,7 @@ Client::Client(bool secure) _nextClient = 0; _clientSleepPacketQue.setMaxSize(MAX_SAVED_PUBLISH); _hasPredefTopic = false; + _holdPingRequest = false; } Client::~Client() @@ -796,6 +797,21 @@ void Client::setOTAClient(Client* cl) _otaClient =cl; } +void Client::holdPingRequest(void) +{ + _holdPingRequest = true; +} + +void Client::resetPingRequest(void) +{ + _holdPingRequest = false; +} + +bool Client::isHoldPringReqest(void) +{ + return _holdPingRequest; +} + /*===================================== Class Topic ======================================*/ @@ -1214,7 +1230,7 @@ TopicIdMapelement* TopicIdMap::getElement(uint16_t msgId) TopicIdMapelement* TopicIdMap::add(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type) { - if ( _cnt > _maxInflight * 2 || topicId == 0) + if ( _cnt > _maxInflight * 2 || ( topicId == 0 && type != MQTTSN_TOPIC_TYPE_SHORT ) ) { return 0; } @@ -1314,6 +1330,7 @@ WaitREGACKPacketList::WaitREGACKPacketList() { _first = 0; _end = 0; + _cnt = 0; } WaitREGACKPacketList::~WaitREGACKPacketList() @@ -1346,6 +1363,7 @@ int WaitREGACKPacketList::setPacket(MQTTSNPacket* packet, uint16_t REGACKMsgId) elm->_prev = _end; _end = elm; } + _cnt++; return 1; } @@ -1387,10 +1405,17 @@ void WaitREGACKPacketList::erase(uint16_t REGACKMsgId) { p->_next->_prev = p->_prev; } - break; - // Do not delete element. Element is deleted after sending to Client. + _cnt--; + break; + // Do not delete element. Element is deleted after sending to Client. } p = p->_next; } } +uint8_t WaitREGACKPacketList::getCount(void) +{ + return _cnt; +} + + diff --git a/MQTTSNGateway/src/MQTTSNGWClient.h b/MQTTSNGateway/src/MQTTSNGWClient.h index 9bb5ede..118239e 100644 --- a/MQTTSNGateway/src/MQTTSNGWClient.h +++ b/MQTTSNGateway/src/MQTTSNGWClient.h @@ -219,8 +219,10 @@ public: int setPacket(MQTTSNPacket* packet, uint16_t REGACKMsgId); MQTTSNPacket* getPacket(uint16_t REGACKMsgId); void erase(uint16_t REGACKMsgId); + uint8_t getCount(void); private: + uint8_t _cnt; waitREGACKPacket* _first; waitREGACKPacket* _end; }; @@ -298,6 +300,10 @@ public: bool isSensorNetStable(void); bool isWaitWillMsg(void); + void holdPingRequest(void); + void resetPingRequest(void); + bool isHoldPringReqest(void); + Client* getNextClient(void); Client* getOTAClient(void); void setOTAClient(Client* cl); @@ -317,6 +323,8 @@ private: char* _willTopic; char* _willMsg; + bool _holdPingRequest; + Timer _keepAliveTimer; uint32_t _keepAliveMsec; diff --git a/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp b/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp index 66e2d79..d9fab69 100644 --- a/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp +++ b/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp @@ -270,7 +270,7 @@ void MQTTSNConnectionHandler::handlePingreq(Client* client, MQTTSNPacket* packet { MQTTGWPacket* msg = 0; - if ( client->isSleep() || client->isAwake() ) + if ( ( client->isSleep() || client->isAwake() ) && client->getClientSleepPacket() ) { while ( ( msg = client->getClientSleepPacket() ) != 0 ) { @@ -281,12 +281,16 @@ void MQTTSNConnectionHandler::handlePingreq(Client* client, MQTTSNPacket* packet ev->setBrokerRecvEvent(client, msg); _gateway->getPacketEventQue()->post(ev); } + client->holdPingRequest(); + } + else + { + /* send PINGREQ to the broker */ + client->resetPingRequest(); + MQTTGWPacket* pingreq = new MQTTGWPacket(); + pingreq->setHeader(PINGREQ); + Event* evt = new Event(); + evt->setBrokerSendEvent(client, pingreq); + _gateway->getBrokerSendQue()->post(evt); } - - /* send PINGREQ to the broker */ - MQTTGWPacket* pingreq = new MQTTGWPacket(); - pingreq->setHeader(PINGREQ); - Event* evt = new Event(); - evt->setBrokerSendEvent(client, pingreq); - _gateway->getBrokerSendQue()->post(evt); } diff --git a/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp b/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp index f797ca0..4f0adc4 100644 --- a/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp +++ b/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp @@ -198,6 +198,7 @@ void MQTTSNPublishHandler::handleRegAck( Client* client, MQTTSNPacket* packet) } MQTTSNPacket* regAck = client->getWaitREGACKPacketList()->getPacket(msgId); + if ( regAck != 0 ) { client->getWaitREGACKPacketList()->erase(msgId); @@ -205,6 +206,16 @@ void MQTTSNPublishHandler::handleRegAck( Client* client, MQTTSNPacket* packet) ev->setClientSendEvent(client, regAck); _gateway->getClientSendQue()->post(ev); } + if (client->isHoldPringReqest() && client->getWaitREGACKPacketList()->getCount() == 0 ) + { + /* send PINGREQ to the broker */ + client->resetPingRequest(); + MQTTGWPacket* pingreq = new MQTTGWPacket(); + pingreq->setHeader(PINGREQ); + Event* evt = new Event(); + evt->setBrokerSendEvent(client, pingreq); + _gateway->getBrokerSendQue()->post(evt); + } } } From 521715e011399811572eec82959f005647f2a289 Mon Sep 17 00:00:00 2001 From: tomoaki Date: Wed, 18 Jul 2018 21:32:46 +0900 Subject: [PATCH 2/5] Bugfix of #122 Signed-off-by: tomoaki --- .../src/MQTTSNGWConnectionHandler.cpp | 27 ++++++++++++------- MQTTSNGateway/src/MQTTSNGWConnectionHandler.h | 2 ++ 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp b/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp index d9fab69..721003b 100644 --- a/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp +++ b/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp @@ -82,6 +82,8 @@ void MQTTSNConnectionHandler::handleConnect(Client* client, MQTTSNPacket* packet Event* ev = new Event(); ev->setClientSendEvent(client, packet); _gateway->getClientSendQue()->post(ev); + + sendStoredPublish(client); return; } @@ -272,15 +274,7 @@ void MQTTSNConnectionHandler::handlePingreq(Client* client, MQTTSNPacket* packet if ( ( client->isSleep() || client->isAwake() ) && client->getClientSleepPacket() ) { - while ( ( msg = client->getClientSleepPacket() ) != 0 ) - { - // ToDo: This version can't re-send PUBLISH when PUBACK is not returned. - client->deleteFirstClientSleepPacket(); // pop the que to delete element. - - Event* ev = new Event(); - ev->setBrokerRecvEvent(client, msg); - _gateway->getPacketEventQue()->post(ev); - } + sendStoredPublish(client); client->holdPingRequest(); } else @@ -294,3 +288,18 @@ void MQTTSNConnectionHandler::handlePingreq(Client* client, MQTTSNPacket* packet _gateway->getBrokerSendQue()->post(evt); } } + +void MQTTSNConnectionHandler::sendStoredPublish(Client* client) +{ + MQTTGWPacket* msg = 0; + + while ( ( msg = client->getClientSleepPacket() ) != 0 ) + { + // ToDo: This version can't re-send PUBLISH when PUBACK is not returned. + client->deleteFirstClientSleepPacket(); // pop the que to delete element. + + Event* ev = new Event(); + ev->setBrokerRecvEvent(client, msg); + _gateway->getPacketEventQue()->post(ev); + } +} diff --git a/MQTTSNGateway/src/MQTTSNGWConnectionHandler.h b/MQTTSNGateway/src/MQTTSNGWConnectionHandler.h index 044e766..adf92d4 100644 --- a/MQTTSNGateway/src/MQTTSNGWConnectionHandler.h +++ b/MQTTSNGateway/src/MQTTSNGWConnectionHandler.h @@ -37,6 +37,8 @@ public: void handleWillmsgupd(Client* client, MQTTSNPacket* packet); void handlePingreq(Client* client, MQTTSNPacket* packet); private: + void sendStoredPublish(Client* client); + char _pbuf[MQTTSNGW_MAX_PACKET_SIZE * 3]; Gateway* _gateway; }; From 797ddf43f8d5143ca74e64c34f1614ef98d3b232 Mon Sep 17 00:00:00 2001 From: tomoaki Date: Wed, 18 Jul 2018 21:40:20 +0900 Subject: [PATCH 3/5] update test program Signed-off-by: tomoaki --- MQTTSNGateway/GatewayTester/samples/ClientSub/mainSub.cpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/MQTTSNGateway/GatewayTester/samples/ClientSub/mainSub.cpp b/MQTTSNGateway/GatewayTester/samples/ClientSub/mainSub.cpp index e016732..585c8ec 100644 --- a/MQTTSNGateway/GatewayTester/samples/ClientSub/mainSub.cpp +++ b/MQTTSNGateway/GatewayTester/samples/ClientSub/mainSub.cpp @@ -63,7 +63,7 @@ UDPCONF = { *------------------------------------------------------*/ MQTTSNCONF = { 60, //KeepAlive [seconds] - false, //Clean session + true, //Clean session 300, //Sleep duration [seconds] "", //WillTopic "", //WillMessage @@ -78,7 +78,7 @@ const char* topic1 = "ty4tw/topic1"; const char* topic2 = "ty4tw/topic2"; const char* topic3 = "ty4tw/topic3"; const char* topic4 = "a"; -const char* topic5 = "#"; +const char* topic5 = "ty4tw/#"; /*------------------------------------------------------ @@ -178,6 +178,8 @@ TEST_LIST = {// e.g. TEST( Label, Test), TEST("Step1:Subscribe topic5", subscribeTopic5), //TEST("Step2:Subscribe topic2", subscribeTopic2), TEST("Step2:Disconnect", asleep), + TEST("Step3:Cconnect", connect), + TEST("Step4:Disconnect", asleep), END_OF_TEST_LIST }; @@ -202,3 +204,4 @@ void setup(void) /***************** END OF PROGRAM ********************/ + From 4d75351a06ae07d04842f0364fa88430e761f9f7 Mon Sep 17 00:00:00 2001 From: tomoaki Date: Tue, 24 Jul 2018 07:19:39 +0900 Subject: [PATCH 4/5] BugFix of #123 Signed-off-by: tomoaki --- MQTTSNGateway/src/MQTTSNGateway.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/MQTTSNGateway/src/MQTTSNGateway.cpp b/MQTTSNGateway/src/MQTTSNGateway.cpp index 23f7e78..afd0197 100644 --- a/MQTTSNGateway/src/MQTTSNGateway.cpp +++ b/MQTTSNGateway/src/MQTTSNGateway.cpp @@ -216,7 +216,7 @@ void Gateway::initialize(int argc, char** argv) { if (getParam("PredefinedTopicFile", param) == 0) { - fileName =*getConfigDirName() + string(param); + fileName = string(param); } else { From 9940aadd4becbff36aff3ec226e6c3e7e9fd3f99 Mon Sep 17 00:00:00 2001 From: tomoaki Date: Thu, 26 Jul 2018 17:42:12 +0900 Subject: [PATCH 5/5] Add The forwarder Encapsulation mesage #27 #69 Signed-off-by: tomoaki --- .cproject | 4 +- .../GatewayTester/samples/mainTest.cpp | 2 +- MQTTSNGateway/GatewayTester/src/LGwProxy.cpp | 647 ++++++++++-------- MQTTSNGateway/GatewayTester/src/LGwProxy.h | 108 +-- .../GatewayTester/src/LMqttsnClientApp.h | 4 +- .../GatewayTester/src/LNetworkUdp.cpp | 10 +- MQTTSNGateway/Makefile | 5 + MQTTSNGateway/README.md | 10 +- MQTTSNGateway/forwarders.conf | 16 + MQTTSNGateway/gateway.conf | 9 +- MQTTSNGateway/src/MQTTSNGWClient.cpp | 51 +- MQTTSNGateway/src/MQTTSNGWClient.h | 12 +- MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp | 97 ++- MQTTSNGateway/src/MQTTSNGWClientSendTask.cpp | 26 +- MQTTSNGateway/src/MQTTSNGWClientSendTask.h | 1 + .../src/MQTTSNGWConnectionHandler.cpp | 5 +- MQTTSNGateway/src/MQTTSNGWDefines.h | 3 +- .../src/MQTTSNGWEncapsulatedPacket.cpp | 179 +++++ .../src/MQTTSNGWEncapsulatedPacket.h | 65 ++ MQTTSNGateway/src/MQTTSNGWForwarder.cpp | 321 +++++++++ MQTTSNGateway/src/MQTTSNGWForwarder.h | 88 +++ MQTTSNGateway/src/MQTTSNGWVersion.h | 2 +- MQTTSNGateway/src/MQTTSNGateway.cpp | 50 +- MQTTSNGateway/src/MQTTSNGateway.h | 13 +- MQTTSNGateway/src/tests/TestProcess.cpp | 12 +- MQTTSNGateway/src/tests/mainTestProcess.cpp | 2 +- MQTTSNPacket/src/MQTTSNPacket.c | 5 + MQTTSNPacket/src/MQTTSNPacket.h | 8 +- 28 files changed, 1322 insertions(+), 433 deletions(-) create mode 100644 MQTTSNGateway/forwarders.conf create mode 100644 MQTTSNGateway/src/MQTTSNGWEncapsulatedPacket.cpp create mode 100644 MQTTSNGateway/src/MQTTSNGWEncapsulatedPacket.h create mode 100644 MQTTSNGateway/src/MQTTSNGWForwarder.cpp create mode 100644 MQTTSNGateway/src/MQTTSNGWForwarder.h diff --git a/.cproject b/.cproject index d61f694..c3137e8 100644 --- a/.cproject +++ b/.cproject @@ -14,14 +14,14 @@ - + -