From bb993aed5bd48274a4b40282b95b02f969211b8a Mon Sep 17 00:00:00 2001 From: tomoaki Date: Sun, 1 Jul 2018 18:18:38 +0900 Subject: [PATCH] Update: Add Pre-defined-Topic Signed-off-by: tomoaki --- .gitignore | 5 + .../samples/ClientPub/mainPub.cpp | 15 +- .../samples/ClientSub/mainSub.cpp | 27 +- .../GatewayTester/samples/mainTest.cpp | 43 +- .../GatewayTester/src/LMqttsnClient.cpp | 9 +- .../GatewayTester/src/LMqttsnClient.h | 5 +- .../GatewayTester/src/LMqttsnClientApp.h | 25 +- .../GatewayTester/src/LPublishManager.cpp | 21 +- .../GatewayTester/src/LPublishManager.h | 1 - .../GatewayTester/src/LRegisterManager.cpp | 8 +- .../GatewayTester/src/LSubscribeManager.cpp | 65 ++- .../GatewayTester/src/LSubscribeManager.h | 10 +- .../GatewayTester/src/LTopicTable.cpp | 16 +- MQTTSNGateway/GatewayTester/src/LTopicTable.h | 16 +- MQTTSNGateway/Makefile | 2 + MQTTSNGateway/README.md | 4 +- MQTTSNGateway/gateway.conf | 3 + MQTTSNGateway/predefinedTopic.conf | 19 + MQTTSNGateway/src/MQTTGWPublishHandler.cpp | 23 +- MQTTSNGateway/src/MQTTGWSubscribeHandler.cpp | 6 +- MQTTSNGateway/src/MQTTSNGWClient.cpp | 386 +++++++++++----- MQTTSNGateway/src/MQTTSNGWClient.h | 425 +++++++++--------- MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp | 17 +- .../src/MQTTSNGWConnectionHandler.cpp | 4 +- MQTTSNGateway/src/MQTTSNGWDefines.h | 2 + MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp | 70 +-- .../src/MQTTSNGWSubscribeHandler.cpp | 218 ++++----- MQTTSNGateway/src/MQTTSNGWVersion.h | 2 +- MQTTSNGateway/src/MQTTSNGateway.cpp | 39 +- MQTTSNGateway/src/MQTTSNGateway.h | 9 +- MQTTSNGateway/src/tests/TestProcess.cpp | 6 +- MQTTSNGateway/src/tests/TestQue.cpp | 1 - MQTTSNGateway/src/tests/TestTopicIdMap.cpp | 194 +++++--- MQTTSNGateway/src/tests/TestTopicIdMap.h | 1 + MQTTSNGateway/src/tests/TestTopics.cpp | 137 ++++-- MQTTSNGateway/src/tests/TestTree23.cpp | 1 - MQTTSNPacket/src/MQTTSNPacket.c | 4 +- 37 files changed, 1090 insertions(+), 749 deletions(-) create mode 100644 MQTTSNGateway/predefinedTopic.conf diff --git a/.gitignore b/.gitignore index 60dcbfc..ee251ce 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,8 @@ *.pyc /doc/MQTTSNClient/ /doc/MQTTSNPacket/ +/rbmutex.key +/ringbuffer.key +/Release/ +/Debug/ +/core diff --git a/MQTTSNGateway/GatewayTester/samples/ClientPub/mainPub.cpp b/MQTTSNGateway/GatewayTester/samples/ClientPub/mainPub.cpp index b7e9227..e9a676f 100644 --- a/MQTTSNGateway/GatewayTester/samples/ClientPub/mainPub.cpp +++ b/MQTTSNGateway/GatewayTester/samples/ClientPub/mainPub.cpp @@ -16,16 +16,17 @@ * * Supported functions. * - * void PUBLISH ( const char* topicName, uint8_t* payload, - * uint16_t len, uint8_t qos, bool retain = false ); + * void PUBLISH ( const char* topicName, uint8_t* payload, uint16_t len, uint8_t qos, bool retain = false ); * - * void PUBLISH ( uint16_t topicId, uint8_t* payload, - * uint16_t len, uint8_t qos, bool retain = false ); + * void PUBLISH ( uint16_t topicId, uint8_t* payload, uint16_t len, uint8_t qos, bool retain = false ); * - * void SUBSCRIBE ( const char* topicName, TopicCallback onPublish, - * uint8_t qos ); + * void SUBSCRIBE ( const char* topicName, TopicCallback onPublish, uint8_t qos ); * - * void UNSUBSCRIBE( const char* topicName ); + * void SUBSCRIBE ( uint16_t topicId, TopicCallback onPublish, uint8_t qos ); + * + * void UNSUBSCRIBE ( const char* topicName ); + * + * void UNSUBSCRIBE ( uint16_t topicId ); * * void DISCONNECT ( uint16_t sleepInSecs ); * diff --git a/MQTTSNGateway/GatewayTester/samples/ClientSub/mainSub.cpp b/MQTTSNGateway/GatewayTester/samples/ClientSub/mainSub.cpp index e61f4eb..f462fe7 100644 --- a/MQTTSNGateway/GatewayTester/samples/ClientSub/mainSub.cpp +++ b/MQTTSNGateway/GatewayTester/samples/ClientSub/mainSub.cpp @@ -16,16 +16,17 @@ * * Supported functions. * - * void PUBLISH ( const char* topicName, uint8_t* payload, - * uint16_t len, uint8_t qos, bool retain = false ); + * void PUBLISH ( const char* topicName, uint8_t* payload, uint16_t len, uint8_t qos, bool retain = false ); * - * void PUBLISH ( uint16_t topicId, uint8_t* payload, - * uint16_t len, uint8_t qos, bool retain = false ); + * void PUBLISH ( uint16_t topicId, uint8_t* payload, uint16_t len, uint8_t qos, bool retain = false ); * - * void SUBSCRIBE ( const char* topicName, TopicCallback onPublish, - * uint8_t qos ); + * void SUBSCRIBE ( const char* topicName, TopicCallback onPublish, uint8_t qos ); * - * void UNSUBSCRIBE( const char* topicName ); + * void SUBSCRIBE ( uint16_t topicId, TopicCallback onPublish, uint8_t qos ); + * + * void UNSUBSCRIBE ( const char* topicName ); + * + * void UNSUBSCRIBE ( uint16_t topicId ); * * void DISCONNECT ( uint16_t sleepInSecs ); * @@ -112,11 +113,13 @@ int on_Topic03(uint8_t* pload, uint16_t ploadlen) * A Link list of Callback routines and Topics *------------------------------------------------------*/ -SUBSCRIBE_LIST = {// e.g. SUB(topic, callback, QoS), - //SUB(topic1, on_Topic01, 1), - //SUB(topic4, on_Topic03, 1), - END_OF_SUBSCRIBE_LIST - }; +SUBSCRIBE_LIST = {// e.g. SUB(TopicType, topicName, TopicId, callback, QoSx), + + // SUB(MQTTSN_TOPIC_TYPE_NORMAL, topic1, 0, on_Topic01, QoS1), + // SUB(MQTTSN_TOPIC_TYPE_NORMAL, topic2, 0, on_Topic02, QoS1), + END_OF_SUBSCRIBE_LIST + }; + /*------------------------------------------------------ diff --git a/MQTTSNGateway/GatewayTester/samples/mainTest.cpp b/MQTTSNGateway/GatewayTester/samples/mainTest.cpp index fa999f1..000ad51 100644 --- a/MQTTSNGateway/GatewayTester/samples/mainTest.cpp +++ b/MQTTSNGateway/GatewayTester/samples/mainTest.cpp @@ -16,16 +16,17 @@ * * Supported functions. * - * void PUBLISH ( const char* topicName, uint8_t* payload, - * uint16_t len, uint8_t qos, bool retain = false ); + * void PUBLISH ( const char* topicName, uint8_t* payload, uint16_t len, uint8_t qos, bool retain = false ); * - * void PUBLISH ( uint16_t topicId, uint8_t* payload, - * uint16_t len, uint8_t qos, bool retain = false ); + * void PUBLISH ( uint16_t topicId, uint8_t* payload, uint16_t len, uint8_t qos, bool retain = false ); * - * void SUBSCRIBE ( const char* topicName, TopicCallback onPublish, - * uint8_t qos ); + * void SUBSCRIBE ( const char* topicName, TopicCallback onPublish, uint8_t qos ); * - * void UNSUBSCRIBE( const char* topicName ); + * void SUBSCRIBE ( uint16_t topicId, TopicCallback onPublish, uint8_t qos ); + * + * void UNSUBSCRIBE ( const char* topicName ); + * + * void UNSUBSCRIBE ( uint16_t topicId ); * * void DISCONNECT ( uint16_t sleepInSecs ); * @@ -76,6 +77,8 @@ MQTTSNCONF = { const char* topic1 = "ty4tw/topic1"; const char* topic2 = "ty4tw/topic2"; const char* topic3 = "ty4tw/topic3"; +const char* topic4 = "ty4tw/topic4"; +const char* topic5 = "ty4tw/topic5"; /*------------------------------------------------------ @@ -100,7 +103,7 @@ int on_Topic02(uint8_t* pload, uint16_t ploadlen) int on_Topic03(uint8_t* pload, uint16_t ploadlen) { - DISPLAY("\n\nNew callback recv Topic2\n"); + DISPLAY("\n\nNew callback recv Topic3\n"); pload[ploadlen-1]= 0; // set null terminator DISPLAY("Payload -->%s <--\n\n",pload); return 0; @@ -110,8 +113,9 @@ int on_Topic03(uint8_t* pload, uint16_t ploadlen) * A Link list of Callback routines and Topics *------------------------------------------------------*/ -SUBSCRIBE_LIST = {// e.g. SUB(topic, callback, QoS), - SUB(topic1, on_Topic01, 1), +SUBSCRIBE_LIST = {// e.g. SUB(TopicType, topicName, TopicId, callback, QoSx), + SUB(MQTTSN_TOPIC_TYPE_NORMAL, topic1, 0, on_Topic01, QoS1), + SUB(MQTTSN_TOPIC_TYPE_NORMAL, topic2, 0, on_Topic02, QoS1), END_OF_SUBSCRIBE_LIST }; @@ -119,29 +123,32 @@ SUBSCRIBE_LIST = {// e.g. SUB(topic, callback, QoS), /*------------------------------------------------------ * Test functions *------------------------------------------------------*/ +void subscribePredefTopic1(void) +{ + SUBSCRIBE(1, on_Topic03, QoS1); +} void publishTopic1(void) { char payload[300]; sprintf(payload, "publish \"ty4tw/Topic1\" \n"); - uint8_t qos = 0; - PUBLISH(topic1,(uint8_t*)payload, strlen(payload), qos); + PUBLISH(topic1,(uint8_t*)payload, strlen(payload), QoS0); } void subscribeTopic2(void) { - uint8_t qos = 1; - SUBSCRIBE(topic2, on_Topic02, qos); + SUBSCRIBE(10, on_Topic02, QoS1); } void publishTopic2(void) { char payload[300]; sprintf(payload, "publish \"ty4tw/topic2\" \n"); - uint8_t qos = 0; - PUBLISH(topic2,(uint8_t*)payload, strlen(payload), qos); + PUBLISH(topic2,(uint8_t*)payload, strlen(payload), QoS1); } + + void unsubscribe(void) { UNSUBSCRIBE(topic2); @@ -149,8 +156,7 @@ void unsubscribe(void) void subscribechangeCallback(void) { - uint8_t qos = 1; - SUBSCRIBE(topic2, on_Topic03, qos); + SUBSCRIBE(topic2, on_Topic02, QoS1); } void test3(void) @@ -178,6 +184,7 @@ void asleep(void) *------------------------------------------------------*/ TEST_LIST = {// e.g. TEST( Label, Test), + TEST("Step0:Subscribe predef topic1", subscribePredefTopic1), TEST("Step1:Publish topic1", publishTopic1), TEST("Step2:Publish topic2", publishTopic2), TEST("Step3:Subscribe topic2", subscribeTopic2), diff --git a/MQTTSNGateway/GatewayTester/src/LMqttsnClient.cpp b/MQTTSNGateway/GatewayTester/src/LMqttsnClient.cpp index 286fe74..b6759dc 100644 --- a/MQTTSNGateway/GatewayTester/src/LMqttsnClient.cpp +++ b/MQTTSNGateway/GatewayTester/src/LMqttsnClient.cpp @@ -183,9 +183,9 @@ void LMqttsnClient::subscribe(const char* topicName, TopicCallback onPublish, ui _subMgr.subscribe(topicName, onPublish, qos); } -void LMqttsnClient::subscribe(uint16_t topicId, TopicCallback onPublish, uint8_t qos, uint8_t topicType) +void LMqttsnClient::subscribe(uint16_t topicId, TopicCallback onPublish, uint8_t qos) { - _subMgr.subscribe(topicId, onPublish, qos, topicType); + _subMgr.subscribe(topicId, onPublish, qos); } void LMqttsnClient::unsubscribe(const char* topicName) @@ -193,6 +193,11 @@ void LMqttsnClient::unsubscribe(const char* topicName) _subMgr.unsubscribe(topicName); } +void LMqttsnClient::unsubscribe(const uint16_t topicId) +{ + _subMgr.unsubscribe(topicId); +} + void LMqttsnClient::disconnect(uint16_t sleepInSecs) { _gwProxy.disconnect(sleepInSecs); diff --git a/MQTTSNGateway/GatewayTester/src/LMqttsnClient.h b/MQTTSNGateway/GatewayTester/src/LMqttsnClient.h index 9c6c424..de60310 100644 --- a/MQTTSNGateway/GatewayTester/src/LMqttsnClient.h +++ b/MQTTSNGateway/GatewayTester/src/LMqttsnClient.h @@ -34,7 +34,9 @@ namespace linuxAsyncClient { struct OnPublishList { + MQTTSN_topicTypes type; const char* topic; + uint16_t id; int (*pubCallback)(uint8_t* payload, uint16_t payloadlen); uint8_t qos; }; @@ -52,8 +54,9 @@ public: void publish(uint16_t topicId, Payload* payload, uint8_t qos, bool retain = false); void publish(uint16_t topicId, uint8_t* payload, uint16_t len, uint8_t qos, bool retain = false); void subscribe(const char* topicName, TopicCallback onPublish, uint8_t qos); - void subscribe(uint16_t topicId, TopicCallback onPublish, uint8_t qos, uint8_t topicType); + void subscribe(uint16_t topicId, TopicCallback onPublish, uint8_t qos); void unsubscribe(const char* topicName); + void unsubscribe(const uint16_t topicId); void disconnect(uint16_t sleepInSecs); void initialize(LUdpConfig netconf, LMqttsnConfig mqconf); void run(void); diff --git a/MQTTSNGateway/GatewayTester/src/LMqttsnClientApp.h b/MQTTSNGateway/GatewayTester/src/LMqttsnClientApp.h index 691d330..2d41807 100644 --- a/MQTTSNGateway/GatewayTester/src/LMqttsnClientApp.h +++ b/MQTTSNGateway/GatewayTester/src/LMqttsnClientApp.h @@ -73,6 +73,15 @@ struct LUdpConfig{ uint16_t uPortNo; }; + +typedef enum +{ + MQTTSN_TOPIC_TYPE_NORMAL, + MQTTSN_TOPIC_TYPE_PREDEFINED, + MQTTSN_TOPIC_TYPE_SHORT +} MQTTSN_topicTypes; + + /*====================================== MACROs for Application =======================================*/ @@ -93,7 +102,7 @@ struct LUdpConfig{ #define END_OF_TEST_LIST {0, 0, 0} #define SUBSCRIBE_LIST OnPublishList theOnPublishList[] #define SUB(...) {__VA_ARGS__} -#define END_OF_SUBSCRIBE_LIST {0,0,0} +#define END_OF_SUBSCRIBE_LIST {MQTTSN_TOPIC_TYPE_NORMAL,0,0,0, 0} #define UDPCONF LUdpConfig theNetcon #define MQTTSNCONF LMqttsnConfig theMqcon #ifdef CLIENT_MODE @@ -129,6 +138,9 @@ struct LUdpConfig{ /*====================================== MQTT-SN Defines ========================================*/ +#define QoS0 0 +#define QoS1 1 +#define QoS2 2 #define MQTTSN_TYPE_ADVERTISE 0x00 #define MQTTSN_TYPE_SEARCHGW 0x01 #define MQTTSN_TYPE_GWINFO 0x02 @@ -157,10 +169,7 @@ struct LUdpConfig{ #define MQTTSN_TYPE_WILLMSGUPD 0x1C #define MQTTSN_TYPE_WILLMSGRESP 0x1D -#define MQTTSN_TOPIC_TYPE_NORMAL 0x00 -#define MQTTSN_TOPIC_TYPE_PREDEFINED 0x01 -#define MQTTSN_TOPIC_TYPE_SHORT 0x02 -#define MQTTSN_TOPIC_TYPE 0x03 +#define MQTTSN_TOPIC_TYPE 0x03 #define MQTTSN_FLAG_DUP 0x80 #define MQTTSN_FLAG_QOS_0 0x0 @@ -179,14 +188,10 @@ struct LUdpConfig{ #define MQTTSN_RC_REJECTED_INVALID_TOPIC_ID 0x02 #define MQTTSN_RC_REJECTED_NOT_SUPPORTED 0x03 -#define PREDEFINEDID_OTA_REQ (0x0ff0) -#define PREDEFINEDID_OTA_READY (0x0ff1) -#define PREDEFINEDID_OTA_NO_CLIENT (0x0ff2) - /*================================= * Starting prompt ==================================*/ -#define TESTER_VERSION " * Version: 1.0.0" +#define TESTER_VERSION " * Version: 2.0.0" #define PAHO_COPYRIGHT0 " * MQTT-SN Gateway Tester" #define PAHO_COPYRIGHT1 " * Part of Project Paho in Eclipse" diff --git a/MQTTSNGateway/GatewayTester/src/LPublishManager.cpp b/MQTTSNGateway/GatewayTester/src/LPublishManager.cpp index 8032b61..4e20c8d 100644 --- a/MQTTSNGateway/GatewayTester/src/LPublishManager.cpp +++ b/MQTTSNGateway/GatewayTester/src/LPublishManager.cpp @@ -66,25 +66,22 @@ void LPublishManager::publish(const char* topicName, Payload* payload, uint8_t q publish(topicName, payload->getRowData(), payload->getLen(), qos, retain); } - void LPublishManager::publish(const char* topicName, uint8_t* payload, uint16_t len, uint8_t qos, bool retain) -{ - uint8_t topicType = MQTTSN_TOPIC_TYPE_NORMAL; - if ( strlen(topicName) < 2 ) - { - topicType = MQTTSN_TOPIC_TYPE_SHORT; - } - publish(topicName, payload, len, qos, topicType, retain); -} - -void LPublishManager::publish(const char* topicName, uint8_t* payload, uint16_t len, uint8_t qos, uint8_t topicType, bool retain) { uint16_t msgId = 0; + uint8_t topicType = MQTTSN_TOPIC_TYPE_SHORT; + if ( strlen(topicName) > 2 ) + { + topicType = MQTTSN_TOPIC_TYPE_NORMAL; + } + if ( qos > 0 ) { msgId = theClient->getGwProxy()->getNextMsgId(); } + PubElement* elm = add(topicName, 0, payload, len, qos, retain, msgId, topicType); + if (elm->status == TOPICID_IS_READY) { sendPublish(elm); @@ -286,7 +283,7 @@ void LPublishManager::published(uint8_t* msg, uint16_t msglen) } _publishedFlg = NEG_TASK_INDEX; - theClient->getTopicTable()->execCallback(topicId, msg + 6, msglen - 6, msg[1] & 0x03); + theClient->getTopicTable()->execCallback(topicId, msg + 6, msglen - 6, (MQTTSN_topicTypes)(msg[1] & MQTTSN_TOPIC_TYPE)); _publishedFlg = SAVE_TASK_INDEX; } diff --git a/MQTTSNGateway/GatewayTester/src/LPublishManager.h b/MQTTSNGateway/GatewayTester/src/LPublishManager.h index 231f601..6085218 100644 --- a/MQTTSNGateway/GatewayTester/src/LPublishManager.h +++ b/MQTTSNGateway/GatewayTester/src/LPublishManager.h @@ -62,7 +62,6 @@ public: ~LPublishManager(); void publish(const char* topicName, Payload* payload, uint8_t qos, bool retain = false); void publish(const char* topicName, uint8_t* payload, uint16_t len, uint8_t qos, bool retain = false); - void publish(const char* topicName, uint8_t* payload, uint16_t len, uint8_t qos, uint8_t topicType, bool retain = false); void publish(uint16_t topicId, Payload* payload, uint8_t qos, bool retain = false); void publish(uint16_t topicId, uint8_t* payload, uint16_t len, uint8_t qos, bool retain = false); void responce(const uint8_t* msg, uint16_t msglen); diff --git a/MQTTSNGateway/GatewayTester/src/LRegisterManager.cpp b/MQTTSNGateway/GatewayTester/src/LRegisterManager.cpp index 5309e61..5a924d4 100644 --- a/MQTTSNGateway/GatewayTester/src/LRegisterManager.cpp +++ b/MQTTSNGateway/GatewayTester/src/LRegisterManager.cpp @@ -190,13 +190,13 @@ void LRegisterManager::registerTopic(char* topicName) void LRegisterManager::responceRegAck(uint16_t msgId, uint16_t topicId) { const char* topicName = getTopic(msgId); + MQTTSN_topicTypes type = MQTTSN_TOPIC_TYPE_NORMAL; if (topicName) { - uint8_t topicType = strlen((char*) topicName) > 2 ? MQTTSN_TOPIC_TYPE_NORMAL : MQTTSN_TOPIC_TYPE_SHORT; - theClient->getGwProxy()->getTopicTable()->setTopicId((char*) topicName, topicId, topicType); // Add Topic to TopicTable + theClient->getGwProxy()->getTopicTable()->setTopicId((char*) topicName, topicId, type); // Add Topic to TopicTable RegQueElement* elm = getElement(msgId); remove(elm); - theClient->getPublishManager()->sendSuspend((char*) topicName, topicId, topicType); + theClient->getPublishManager()->sendSuspend((char*) topicName, topicId, type); } } @@ -213,7 +213,7 @@ void LRegisterManager::responceRegister(uint8_t* msg, uint16_t msglen) { TopicCallback callback = tp->getCallback(); void* topicName = calloc(strlen((char*) msg + 5) + 1, sizeof(char)); - theClient->getGwProxy()->getTopicTable()->add((char*) topicName, 0, MQTTSN_TOPIC_TYPE_NORMAL, callback, 1); + theClient->getGwProxy()->getTopicTable()->add((char*) topicName, MQTTSN_TOPIC_TYPE_NORMAL, 0, callback, 1); regack[6] = MQTTSN_RC_ACCEPTED; } else diff --git a/MQTTSNGateway/GatewayTester/src/LSubscribeManager.cpp b/MQTTSNGateway/GatewayTester/src/LSubscribeManager.cpp index 4424155..9f9cf5d 100644 --- a/MQTTSNGateway/GatewayTester/src/LSubscribeManager.cpp +++ b/MQTTSNGateway/GatewayTester/src/LSubscribeManager.cpp @@ -65,7 +65,14 @@ void LSubscribeManager::onConnect(void) { for (uint8_t i = 0; theOnPublishList[i].topic != 0; i++) { - subscribe(theOnPublishList[i].topic, theOnPublishList[i].pubCallback, theOnPublishList[i].qos); + if ( theOnPublishList[i].type == MQTTSN_TOPIC_TYPE_PREDEFINED) + { + subscribe(theOnPublishList[i].id, theOnPublishList[i].pubCallback, theOnPublishList[i].qos); + } + else + { + subscribe(theOnPublishList[i].topic, theOnPublishList[i].pubCallback, theOnPublishList[i].qos); + } } } else @@ -149,30 +156,43 @@ void LSubscribeManager::send(SubElement* elm) void LSubscribeManager::subscribe(const char* topicName, TopicCallback onPublish, uint8_t qos) { - uint8_t topicType = MQTTSN_TOPIC_TYPE_NORMAL; - if ( strlen(topicName) <= 2) - { - topicType = MQTTSN_TOPIC_TYPE_SHORT; - } - SubElement* elm = add(MQTTSN_TYPE_SUBSCRIBE, topicName, 0, topicType, qos, onPublish); - send(elm); + MQTTSN_topicTypes topicType; + if ( strlen(topicName) > 2 ) + { + topicType = MQTTSN_TOPIC_TYPE_NORMAL; + } + else + { + topicType = MQTTSN_TOPIC_TYPE_SHORT; + } + SubElement* elm = add(MQTTSN_TYPE_SUBSCRIBE, topicName, topicType, 0, qos, onPublish); + send(elm); } -void LSubscribeManager::subscribe(uint16_t topicId, TopicCallback onPublish, uint8_t qos, uint8_t topicType) +void LSubscribeManager::subscribe(uint16_t topicId, TopicCallback onPublish, uint8_t qos) { - SubElement* elm = add(MQTTSN_TYPE_SUBSCRIBE, 0, topicId, topicType, qos, onPublish); + SubElement* elm = add(MQTTSN_TYPE_SUBSCRIBE, 0, MQTTSN_TOPIC_TYPE_PREDEFINED, topicId, qos, onPublish); send(elm); } void LSubscribeManager::unsubscribe(const char* topicName) { - SubElement* elm = add(MQTTSN_TYPE_UNSUBSCRIBE, topicName, 0, MQTTSN_TOPIC_TYPE_NORMAL, 0, 0); + MQTTSN_topicTypes topicType; + if ( strlen(topicName) > 2 ) + { + topicType = MQTTSN_TOPIC_TYPE_NORMAL; + } + else + { + topicType = MQTTSN_TOPIC_TYPE_SHORT; + } + SubElement* elm = add(MQTTSN_TYPE_UNSUBSCRIBE, topicName, topicType, 0, 0, 0); send(elm); } -void LSubscribeManager::unsubscribe(uint16_t topicId, uint8_t topicType) +void LSubscribeManager::unsubscribe( uint16_t topicId) { - SubElement* elm = add(MQTTSN_TYPE_UNSUBSCRIBE, 0, topicId, topicType, 0, 0); + SubElement* elm = add(MQTTSN_TYPE_UNSUBSCRIBE, 0, MQTTSN_TOPIC_TYPE_PREDEFINED, topicId, 0, 0); send(elm); } @@ -214,20 +234,19 @@ void LSubscribeManager::responce(const uint8_t* msg) uint16_t msgId = getUint16(msg + 4); uint8_t rc = msg[6]; - LTopicTable* tt = theClient->getGwProxy()->getTopicTable(); SubElement* elm = getElement(msgId); if (elm) { if ( rc == MQTTSN_RC_ACCEPTED ) { - tt->add((char*) elm->topicName, topicId, elm->topicType, elm->callback); + theClient->getGwProxy()->getTopicTable()->add((char*) elm->topicName, elm->topicType, topicId, elm->callback); getElement(msgId)->done = SUB_DONE; DISPLAY("\033[0m\033[0;32m Topic \"%s\" Id : %d was Subscribed. \033[0m\033[0;37m\n\n", getElement(msgId)->topicName, topicId); } else { - remove(elm); DISPLAY("\033[0m\033[0;31m SUBACK Invalid messageId. %s\033[0m\033[0;37m\n\n", getElement(msgId)->topicName); + remove(elm); } } } @@ -237,22 +256,20 @@ void LSubscribeManager::responce(const uint8_t* msg) SubElement* elm = getElement(msgId); if (elm) { - LTopicTable* tt = theClient->getGwProxy()->getTopicTable(); - tt->setCallback(elm->topicName, 0); + //theClient->getGwProxy()->getTopicTable()->setCallback(elm->topicName, 0); DISPLAY("\033[0m\033[0;32m Topic \"%s\" was Unsubscribed. \033[0m\033[0;37m\n\n", getElement(msgId)->topicName); - remove(getElement(msgId)); + remove(elm); } else { - DISPLAY("\033[0m\033[0;31m UNSUBACK Invalid messageId. %s\033[0m\033[0;37m\n\n", getElement(msgId)->topicName); - remove(getElement(msgId)); + DISPLAY("\033[0m\033[0;31m UNSUBACK Invalid messageId. \033[0m\033[0;37m\n\n"); } } } /* SubElement operations */ -SubElement* LSubscribeManager::add(uint8_t msgType, const char* topicName, uint16_t topicId, uint8_t topicType, +SubElement* LSubscribeManager::add(uint8_t msgType, const char* topicName, MQTTSN_topicTypes topicType, uint16_t topicId, uint8_t qos, TopicCallback callback) { SubElement* elm = 0; @@ -358,7 +375,7 @@ SubElement* LSubscribeManager::getElement(const char* topicName, uint8_t msgType SubElement* elm = _first; while (elm) { - if (strcmp(elm->topicName, topicName) == 0 && elm->msgType == msgType) + if ( elm->msgType == msgType && strncmp(elm->topicName, topicName, strlen(topicName)) == 0 ) { return elm; } @@ -370,7 +387,7 @@ SubElement* LSubscribeManager::getElement(const char* topicName, uint8_t msgType return 0; } -SubElement* LSubscribeManager::getElement(uint16_t topicId, uint8_t topicType) +SubElement* LSubscribeManager::getElement(uint16_t topicId, MQTTSN_topicTypes topicType) { SubElement* elm = _first; while (elm) diff --git a/MQTTSNGateway/GatewayTester/src/LSubscribeManager.h b/MQTTSNGateway/GatewayTester/src/LSubscribeManager.h index be6eae3..1db3617 100644 --- a/MQTTSNGateway/GatewayTester/src/LSubscribeManager.h +++ b/MQTTSNGateway/GatewayTester/src/LSubscribeManager.h @@ -37,7 +37,7 @@ typedef struct SubElement{ time_t sendUTC; uint16_t topicId; uint8_t msgType; - uint8_t topicType; + MQTTSN_topicTypes topicType; uint8_t qos; int retryCount; @@ -56,9 +56,9 @@ public: ~LSubscribeManager(); void onConnect(void); void subscribe(const char* topicName, TopicCallback onPublish, uint8_t qos); - void subscribe(uint16_t topicId, TopicCallback onPublish, uint8_t qos, uint8_t topicType); + void subscribe(uint16_t topicId, TopicCallback onPublish, uint8_t qos); void unsubscribe(const char* topicName); - void unsubscribe(uint16_t topicId, uint8_t topicType); + void unsubscribe(uint16_t topicId); void responce(const uint8_t* msg); void checkTimeout(void); bool isDone(void); @@ -66,9 +66,9 @@ private: void send(SubElement* elm); SubElement* getFirstElement(void); SubElement* getElement(uint16_t msgId); - SubElement* getElement(uint16_t topicId, uint8_t topicType); + SubElement* getElement(uint16_t topicId, MQTTSN_topicTypes topicType); SubElement* getElement(const char* topicName, uint8_t msgType); - SubElement* add(uint8_t msgType, const char* topicName, uint16_t topicId, uint8_t topicType, uint8_t qos, TopicCallback callback); + SubElement* add(uint8_t msgType, const char* topicName, MQTTSN_topicTypes topicType, uint16_t topicId, uint8_t qos, TopicCallback callback); void remove(SubElement* elm); SubElement* _first; SubElement* _last; diff --git a/MQTTSNGateway/GatewayTester/src/LTopicTable.cpp b/MQTTSNGateway/GatewayTester/src/LTopicTable.cpp index 4454ba0..d1e191b 100644 --- a/MQTTSNGateway/GatewayTester/src/LTopicTable.cpp +++ b/MQTTSNGateway/GatewayTester/src/LTopicTable.cpp @@ -131,7 +131,7 @@ LTopic* LTopicTable::getTopic(const char* topic){ return 0; } -LTopic* LTopicTable::getTopic(uint16_t topicId, uint8_t topicType){ +LTopic* LTopicTable::getTopic(uint16_t topicId, MQTTSN_topicTypes topicType){ LTopic* p = _first; while(p){ if (p->_topicId == topicId && p->_topicType == topicType){ @@ -156,12 +156,12 @@ char* LTopicTable::getTopicName(LTopic* topic){ } -void LTopicTable::setTopicId(const char* topic, uint16_t id, uint8_t type){ +void LTopicTable::setTopicId(const char* topic, uint16_t id, MQTTSN_topicTypes type){ LTopic* tp = getTopic(topic); if (tp){ tp->_topicId = id; }else{ - add(topic, id, type, 0); + add(topic, type, id, 0); } } @@ -176,7 +176,7 @@ bool LTopicTable::setCallback(const char* topic, TopicCallback callback){ } -bool LTopicTable::setCallback(uint16_t topicId, uint8_t topicType, TopicCallback callback){ +bool LTopicTable::setCallback(uint16_t topicId, MQTTSN_topicTypes topicType, TopicCallback callback){ LTopic* p = getTopic(topicId, topicType); if (p){ p->_callback = callback; @@ -186,7 +186,7 @@ bool LTopicTable::setCallback(uint16_t topicId, uint8_t topicType, TopicCallback } -int LTopicTable::execCallback(uint16_t topicId, uint8_t* payload, uint16_t payloadlen, uint8_t topicType){ +int LTopicTable::execCallback(uint16_t topicId, uint8_t* payload, uint16_t payloadlen, MQTTSN_topicTypes topicType){ LTopic* p = getTopic(topicId, topicType); if (p){; return p->execCallback(payload, payloadlen); @@ -195,7 +195,7 @@ int LTopicTable::execCallback(uint16_t topicId, uint8_t* payload, uint16_t payl } -LTopic* LTopicTable::add(const char* topicName, uint16_t id, uint8_t type, TopicCallback callback, uint8_t alocFlg) +LTopic* LTopicTable::add(const char* topicName, MQTTSN_topicTypes type, uint16_t id, TopicCallback callback, uint8_t alocFlg) { LTopic* elm; @@ -234,9 +234,9 @@ exit: return elm; } -void LTopicTable::remove(uint16_t topicId) +void LTopicTable::remove(uint16_t topicId, MQTTSN_topicTypes type) { - LTopic* elm = getTopic(topicId); + LTopic* elm = getTopic(topicId, type); if (elm){ if (elm->_prev == 0) diff --git a/MQTTSNGateway/GatewayTester/src/LTopicTable.h b/MQTTSNGateway/GatewayTester/src/LTopicTable.h index 9736c6e..981613e 100644 --- a/MQTTSNGateway/GatewayTester/src/LTopicTable.h +++ b/MQTTSNGateway/GatewayTester/src/LTopicTable.h @@ -42,7 +42,7 @@ public: TopicCallback getCallback(void); private: uint16_t _topicId; - uint8_t _topicType; + MQTTSN_topicTypes _topicType; char* _topicStr; TopicCallback _callback; uint8_t _malocFlg; @@ -60,16 +60,16 @@ public: uint16_t getTopicId(const char* topic); char* getTopicName(LTopic* topic); LTopic* getTopic(const char* topic); - LTopic* getTopic(uint16_t topicId, uint8_t topicType = MQTTSN_TOPIC_TYPE_NORMAL); - void setTopicId(const char* topic, uint16_t id, uint8_t topicType); + LTopic* getTopic(uint16_t topicId, MQTTSN_topicTypes topicType); + void setTopicId(const char* topic, uint16_t id, MQTTSN_topicTypes topicType); bool setCallback(const char* topic, TopicCallback callback); - bool setCallback(uint16_t topicId, uint8_t type, TopicCallback callback); - int execCallback(uint16_t topicId, uint8_t* payload, uint16_t payloadlen, uint8_t topicType = MQTTSN_TOPIC_TYPE_NORMAL); - LTopic* add(const char* topic, uint16_t id = 0, uint8_t type = MQTTSN_TOPIC_TYPE_NORMAL, TopicCallback callback = 0, uint8_t alocFlg = 0); - LTopic* add(uint16_t topicId, uint16_t id, uint8_t type, TopicCallback callback, uint8_t alocFlg); + bool setCallback(uint16_t topicId, MQTTSN_topicTypes type, TopicCallback callback); + int execCallback(uint16_t topicId, uint8_t* payload, uint16_t payloadlen, MQTTSN_topicTypes topicType); + LTopic* add(const char* topic, MQTTSN_topicTypes type, uint16_t id = 0, TopicCallback callback = 0, uint8_t alocFlg = 0); + //LTopic* add(uint16_t topicId, uint16_t id, MQTTSN_topicTypes type, TopicCallback callback, uint8_t alocFlg); LTopic* match(const char* topic); void clearTopic(void); - void remove(uint16_t topicId); + void remove(uint16_t topicId, MQTTSN_topicTypes type); private: LTopic* _first; diff --git a/MQTTSNGateway/Makefile b/MQTTSNGateway/Makefile index fa5ad97..9a66f0c 100644 --- a/MQTTSNGateway/Makefile +++ b/MQTTSNGateway/Makefile @@ -9,6 +9,7 @@ TESTAPPL := mainTestProcess CONFIG := gateway.conf CLIENTS := clients.conf +PREDEFTOPIC := predefinedTopic.conf SRCDIR := src SUBDIR := ../MQTTSNPacket/src @@ -135,6 +136,7 @@ install: cp -pf $(LPROG) ../../ cp -pf $(CONFIG) ../../ cp -pf $(CLIENTS) ../../ + cp -pf $(PREDEFTOPIC) ../../ exectest: ./$(OUTDIR)/$(TESTPROGNAME) -f ./gateway.conf diff --git a/MQTTSNGateway/README.md b/MQTTSNGateway/README.md index 122527c..73afcad 100644 --- a/MQTTSNGateway/README.md +++ b/MQTTSNGateway/README.md @@ -60,7 +60,9 @@ ApiMode=2 Client should know the MulticastIP and MulticastPortNo to send a SEARCHGW message. **GatewayId** is used by GWINFO message. **KeepAlive** is a duration of ADVERTISE message in seconds. -when **ClientAuthentication** is YES, see MQTTSNGWClient.cpp line53, clients file specified by ClientsList is required. This file defines connect allowed clients by ClientId and SensorNetwork Address. e.g. IP address and Port No. +when **ClientAuthentication** is YES, see MQTTSNGWClient.cpp line53, clients file specified by ClientsList is required. This file defines connect allowed clients by ClientId and SensorNetwork Address. e.g. IP address and Port No. +When **PredefinedTopic** is YES, Pre-definedTopicID file specified by PredefinedTopicFile is effective. This file defines Pre-definedTopics of the clients. In this file, ClientID,TopicName and TopicID are declared in CSV format. + diff --git a/MQTTSNGateway/gateway.conf b/MQTTSNGateway/gateway.conf index 46016a4..d92a12b 100644 --- a/MQTTSNGateway/gateway.conf +++ b/MQTTSNGateway/gateway.conf @@ -20,6 +20,9 @@ BrokerSecurePortNo=8883 ClientAuthentication=NO #ClientsList=/path/to/your_clients.conf +PredefinedTopic=NO +#PredefinedTopicFile=/path/to/your_predefinedTopic.conf + #RootCAfile=/etc/ssl/certs/ca-certificates.crt #RootCApath=/etc/ssl/certs/ #CertsFile=/path/to/certKey.pem diff --git a/MQTTSNGateway/predefinedTopic.conf b/MQTTSNGateway/predefinedTopic.conf new file mode 100644 index 0000000..9b50801 --- /dev/null +++ b/MQTTSNGateway/predefinedTopic.conf @@ -0,0 +1,19 @@ +#*********************************************************************** +# Copyright (c) 2017, Tomoaki Yamaguchi +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Eclipse Public License v1.0 +# and Eclipse Distribution License v1.0 which accompany this distribution. +# +# The Eclipse Public License is available at +# http://www.eclipse.org/legal/epl-v10.html +# and the Eclipse Distribution License is available at +# http://www.eclipse.org/org/documents/edl-v10.php. +#*********************************************************************** +# +# ClientID, TopicName, TopicID +# + +GatewayTestClient,ty4tw/predefinedTopic1, 1 +GatewayTestClient,ty4tw/predefinedTopic2, 2 +GatewayTestClient,ty4tw/predefinedTopic3, 3 \ No newline at end of file diff --git a/MQTTSNGateway/src/MQTTGWPublishHandler.cpp b/MQTTSNGateway/src/MQTTGWPublishHandler.cpp index 12ad28e..f665cec 100644 --- a/MQTTSNGateway/src/MQTTGWPublishHandler.cpp +++ b/MQTTSNGateway/src/MQTTGWPublishHandler.cpp @@ -90,18 +90,21 @@ void MQTTGWPublishHandler::handlePublish(Client* client, MQTTGWPacket* packet) } else { - topicId.type = MQTTSN_TOPIC_TYPE_NORMAL; - topicId.data.long_.len = pub.topiclen; - topicId.data.long_.name = pub.topic; - id = client->getTopics()->getTopicId(&topicId); + topicId.data.long_.len = pub.topiclen; + topicId.data.long_.name = pub.topic; + Topic* tp = client->getTopics()->getTopicByName(&topicId); - if ( id > 0 ) - { - topicId.data.id = id; - } + if ( tp ) + { + topicId.type = tp->getType(); + topicId.data.long_.len = pub.topiclen; + topicId.data.long_.name = pub.topic; + topicId.data.id = tp->getTopicId(); + } else { /* This message might be subscribed with wild card. */ + topicId.type = MQTTSN_TOPIC_TYPE_NORMAL; Topic* topic = client->getTopics()->match(&topicId); if (topic == 0) { @@ -179,11 +182,11 @@ void MQTTGWPublishHandler::handlePuback(Client* client, MQTTGWPacket* packet) { Ack ack; packet->getAck(&ack); - uint16_t topicId = client->getWaitedPubTopicId((uint16_t)ack.msgId); + TopicIdMapelement* topicId = client->getWaitedPubTopicId((uint16_t)ack.msgId); if (topicId) { MQTTSNPacket* mqttsnPacket = new MQTTSNPacket(); - mqttsnPacket->setPUBACK(topicId, (uint16_t)ack.msgId, 0); + mqttsnPacket->setPUBACK(topicId->getTopicId(), (uint16_t)ack.msgId, 0); client->eraseWaitedPubTopicId((uint16_t)ack.msgId); Event* ev1 = new Event(); diff --git a/MQTTSNGateway/src/MQTTGWSubscribeHandler.cpp b/MQTTSNGateway/src/MQTTGWSubscribeHandler.cpp index 1278cff..a8264ac 100644 --- a/MQTTSNGateway/src/MQTTGWSubscribeHandler.cpp +++ b/MQTTSNGateway/src/MQTTGWSubscribeHandler.cpp @@ -38,12 +38,11 @@ void MQTTGWSubscribeHandler::handleSuback(Client* client, MQTTGWPacket* packet) int qos = 0; packet->getSUBACK(&msgId, &rc); - uint16_t topicId = client->getWaitedSubTopicId(msgId); + TopicIdMapelement* topicId = client->getWaitedSubTopicId(msgId); if (topicId) { MQTTSNPacket* snPacket = new MQTTSNPacket(); - client->eraseWaitedSubTopicId(msgId); if (rc == 0x80) { @@ -54,10 +53,11 @@ void MQTTGWSubscribeHandler::handleSuback(Client* client, MQTTGWPacket* packet) returnCode = MQTTSN_RC_ACCEPTED; qos = rc; } - snPacket->setSUBACK(qos, topicId, msgId, returnCode); + snPacket->setSUBACK(qos, topicId->getTopicId(), msgId, returnCode); Event* evt = new Event(); evt->setClientSendEvent(client, snPacket); _gateway->getClientSendQue()->post(evt); + client->eraseWaitedSubTopicId(msgId); } } diff --git a/MQTTSNGateway/src/MQTTSNGWClient.cpp b/MQTTSNGateway/src/MQTTSNGWClient.cpp index f9da50e..65e29d5 100644 --- a/MQTTSNGateway/src/MQTTSNGWClient.cpp +++ b/MQTTSNGateway/src/MQTTSNGWClient.cpp @@ -15,6 +15,7 @@ * Tieto Poland Sp. z o.o. - Gateway improvements **************************************************************************************/ +#include "MQTTSNGWDefines.h" #include "MQTTSNGWClient.h" #include "MQTTSNGateway.h" #include "SensorNetwork.h" @@ -124,6 +125,51 @@ bool ClientList::authorize(const char* fileName) return _authorize; } +bool ClientList::setPredefinedTopics(const char* fileName) +{ + FILE* fp; + char buf[MAX_CLIENTID_LENGTH + 256]; + size_t pos0, pos1; + MQTTSNString clientId; + bool rc = false; + + clientId.cstring = 0; + clientId.lenstring.data = 0; + clientId.lenstring.len = 0; + + if ((fp = fopen(fileName, "r")) != 0) + { + while (fgets(buf, MAX_CLIENTID_LENGTH + 254, fp) != 0) + { + if (*buf == '#') + { + continue; + } + string data = string(buf); + while ((pos0 = data.find_first_of("  \t\n")) != string::npos) + { + data.erase(pos0, 1); + } + if (data.empty()) + { + continue; + } + + pos0 = data.find_first_of(","); + pos1 = data.find(",", pos0 + 1) ; + string id = data.substr(0, pos0); + clientId.cstring = strdup(id.c_str()); + string topicName = data.substr(pos0 + 1, pos1 - pos0 -1); + uint16_t topicID = stoul(data.substr(pos1 + 1)); + createPredefinedTopic( &clientId, topicName, topicID); + free(clientId.cstring); + } + fclose(fp); + rc = true; + } + return rc; +} + void ClientList::erase(Client*& client) { if ( !_authorize && client->erasable()) @@ -179,14 +225,21 @@ Client* ClientList::getClient(void) return _firstClient; } -Client* ClientList::getClient(uint8_t* clientId) + +Client* ClientList::getClient(MQTTSNString* clientId) { _mutex.lock(); Client* client = _firstClient; + const char* clID =clientId->cstring; + + if (clID == 0 ) + { + clID = clientId->lenstring.data; + } while (client != 0) { - if (strcmp((const char*)client->getClientId(), (const char*)clientId) == 0 ) + if (strncmp((const char*)client->getClientId(), clID, MQTTSNstrlen(*clientId)) == 0 ) { _mutex.unlock(); return client; @@ -222,7 +275,10 @@ Client* ClientList::createClient(SensorNetAddress* addr, MQTTSNString* clientId, /* creat a new client */ client = new Client(secure); - client->setClientAddress(addr); + if ( addr ) + { + client->setClientAddress(addr); + } client->setSensorNetType(unstableLine); if ( MQTTSNstrlen(*clientId) ) { @@ -256,6 +312,50 @@ Client* ClientList::createClient(SensorNetAddress* addr, MQTTSNString* clientId, return client; } +Client* ClientList::createPredefinedTopic( MQTTSNString* clientId, string topicName, uint16_t topicId) +{ + Client* client = getClient(clientId); + + if ( _authorize && client == 0) + { + return 0; + } + + /* anonimous clients */ + if ( _clientCnt > MAX_CLIENTS ) + { + return 0; // full of clients + } + + if ( client == 0 ) + { + /* creat a new client */ + client = new Client(); + client->setClientId(*clientId); + + _mutex.lock(); + + /* add the list */ + if ( _firstClient == 0 ) + { + _firstClient = client; + _endClient = client; + } + else + { + _endClient->_nextClient = client; + client->_prevClient = _endClient; + _endClient = client; + } + _clientCnt++; + _mutex.unlock(); + } + + // create Topic & Add it + client->getTopics()->add((const char*)topicName.c_str(), topicId); + return client; +} + uint16_t ClientList::getClientCount() { return _clientCnt; @@ -299,6 +399,7 @@ Client::Client(bool secure) _prevClient = 0; _nextClient = 0; _clientSleepPacketQue.setMaxSize(MAX_SAVED_PUBLISH); + _hasPredefTopic = false; } Client::~Client() @@ -334,16 +435,14 @@ Client::~Client() } } -uint16_t Client::getWaitedPubTopicId(uint16_t msgId) +TopicIdMapelement* Client::getWaitedPubTopicId(uint16_t msgId) { - MQTTSN_topicTypes type; - return _waitedPubTopicIdMap.getTopicId(msgId, &type); + return _waitedPubTopicIdMap.getElement(msgId); } -uint16_t Client::getWaitedSubTopicId(uint16_t msgId) +TopicIdMapelement* Client::getWaitedSubTopicId(uint16_t msgId) { - MQTTSN_topicTypes type; - return _waitedSubTopicIdMap.getTopicId(msgId, &type); + return _waitedSubTopicIdMap.getElement(msgId); } MQTTGWPacket* Client::getClientSleepPacket() @@ -426,7 +525,7 @@ void Client::setSessionStatus(bool status) bool Client::erasable(void) { - return _sessionStatus; + return _sessionStatus || !_hasPredefTopic; } void Client::updateStatus(MQTTSNPacket* packet) @@ -702,13 +801,15 @@ void Client::setOTAClient(Client* cl) ======================================*/ Topic::Topic() { + _type = MQTTSN_TOPIC_TYPE_NORMAL; _topicName = 0; _topicId = 0; _next = 0; } -Topic::Topic(string* topic) +Topic::Topic(string* topic, MQTTSN_topicTypes type) { + _type = type; _topicName = topic; _topicId = 0; _next = 0; @@ -732,6 +833,11 @@ uint16_t Topic::getTopicId(void) return _topicId; } +MQTTSN_topicTypes Topic::getType(void) +{ + return _type; +} + bool Topic::isMatch(string* topicName) { string::size_type tlen = _topicName->size(); @@ -824,6 +930,11 @@ bool Topic::isMatch(string* topicName) } } +void Topic::print(void) +{ + WRITELOG("TopicName=%s ID=%d Type=%d\n", _topicName->c_str(), _topicId, _type); +} + /*===================================== Class Topics ======================================*/ @@ -831,6 +942,7 @@ Topics::Topics() { _first = 0; _nextTopicId = 0; + _cnt = 0; } Topics::~Topics() @@ -844,113 +956,120 @@ Topics::~Topics() } } -uint16_t Topics::getTopicId(const MQTTSN_topicid* topicid) +Topic* Topics::getTopicByName(const MQTTSN_topicid* topicid) { - if (topicid->type != MQTTSN_TOPIC_TYPE_NORMAL) - { - return 0; - } + Topic* p = _first; + char* ch = topicid->data.long_.name; + string sname = string(ch, ch + topicid->data.long_.len); + while (p) + { + if ( p->_topicName->compare(sname) == 0 ) + { + return p; + } + p = p->_next; + } + return 0; +} + +Topic* Topics::getTopicById(const MQTTSN_topicid* topicid) +{ Topic* p = _first; + while (p) { - if ( (int)strlen(p->_topicName->c_str()) == topicid->data.long_.len && - strncmp(p->_topicName->c_str(), topicid->data.long_.name, topicid->data.long_.len) == 0) - { - return p->_topicId; - } - p = p->_next; - } - return 0; -} - -Topic* Topics::getTopic(uint16_t id) -{ - Topic* p = _first; - while (p) - { - if (p->_topicId == id) - { - return p; - } - p = p->_next; - } - return 0; -} - -Topic* Topics::getTopic(const MQTTSN_topicid* topicid) -{ - Topic* p = _first; - while (p) - { - if ( (int)strlen(p->_topicName->c_str()) == topicid->data.long_.len && - strncmp(p->_topicName->c_str(), topicid->data.long_.name, topicid->data.long_.len) == 0 ) - { - return p; - } + if ( p->_type == topicid->type && p->_topicId == topicid->data.id ) + { + return p; + } p = p->_next; } return 0; } +// For MQTTSN_TOPIC_TYPE_NORMAL */ Topic* Topics::add(const MQTTSN_topicid* topicid) { - Topic* topic; - uint16_t id = 0; + if (topicid->type != MQTTSN_TOPIC_TYPE_NORMAL ) + { + return 0; + } - if (topicid->type != MQTTSN_TOPIC_TYPE_NORMAL) - { - return 0; - } - id = getTopicId(topicid); + Topic* topic = getTopicByName(topicid); - if (id) - { - topic = getTopic(id); - } - else - { - string topicName = string(topicid->data.long_.name, topicid->data.long_.len); - topic = add(&topicName); - } - return topic; + if ( topic ) + { + return topic; + } + string name(topicid->data.long_.name, topicid->data.long_.len); + return add(name.c_str(), 0); } - -Topic* Topics::add(const string* topicName) +Topic* Topics::add(const char* topicName, uint16_t id) { - Topic* topic = 0; + MQTTSN_topicid topicId; - Topic* tp = _first; + if ( _cnt >= MAX_TOPIC_PAR_CLIENT ) + { + return 0; + } - topic = new Topic(); + topicId.data.long_.name = (char*)const_cast(topicName); + topicId.data.long_.len = strlen(topicName); - if (topic == 0) - { - return 0; - } - string* name = new string(*topicName); - topic->_topicName = name; - topic->_topicId = getNextTopicId(); - if (tp == 0) - { - _first = topic; - } + Topic* topic = getTopicByName(&topicId); - while (tp) - { - if (tp->_next == 0) - { - tp->_next = topic; - break; - } - else - { - tp = tp->_next; - } - } - return topic; + if ( topic ) + { + return topic; + } + + topic = new Topic(); + + if (topic == 0) + { + return 0; + } + + string* name = new string(topicName); + topic->_topicName = name; + + if ( id == 0 ) + { + topic->_type = MQTTSN_TOPIC_TYPE_NORMAL; + topic->_topicId = getNextTopicId(); + } + else + { + topic->_type = MQTTSN_TOPIC_TYPE_PREDEFINED; + topic->_topicId = id; + } + + _cnt++; + + if ( _first == 0) + { + _first = topic; + } + else + { + Topic* tp = _first; + while (tp) + { + if (tp->_next == 0) + { + tp->_next = topic; + break; + } + else + { + tp = tp->_next; + } + } + } + return topic; } uint16_t Topics::getNextTopicId() @@ -978,6 +1097,60 @@ Topic* Topics::match(const MQTTSN_topicid* topicid) return 0; } + +void Topics::eraseNormal(void) +{ + Topic* topic = _first; + Topic* next = 0; + Topic* prev = 0; + + while (topic) + { + if ( topic->_type == MQTTSN_TOPIC_TYPE_NORMAL ) + { + next = topic->_next; + if ( _first == topic ) + { + _first = next; + } + if ( prev ) + { + prev->_next = next; + } + delete topic; + _cnt--; + topic = next; + } + else + { + prev = topic; + topic = topic->_next; + } + } +} + +void Topics::print(void) +{ + Topic* topic = _first; + if (topic == 0 ) + { + WRITELOG("No Topic.\n"); + } + else + { + while (topic) + { + topic->print(); + topic = topic->_next; + } + } +} + +uint8_t Topics::getCount(void) +{ + return _cnt; +} + /*===================================== Class TopicIdMap =====================================*/ @@ -995,6 +1168,16 @@ TopicIdMapelement::~TopicIdMapelement() } +MQTTSN_topicTypes TopicIdMapelement::getTopicType(void) +{ + return _type; +} + +uint16_t TopicIdMapelement::getTopicId(void) +{ + return _topicId; +} + TopicIdMap::TopicIdMap() { _maxInflight = MAX_INFLIGHTMESSAGES; @@ -1015,28 +1198,27 @@ TopicIdMap::~TopicIdMap() } } -uint16_t TopicIdMap::getTopicId(uint16_t msgId, MQTTSN_topicTypes* type) +TopicIdMapelement* TopicIdMap::getElement(uint16_t msgId) { TopicIdMapelement* p = _first; while ( p ) { if ( p->_msgId == msgId ) { - *type = p->_type; - return p->_topicId; + return p; } p = p->_next; } return 0; } -int TopicIdMap::add(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type) +TopicIdMapelement* TopicIdMap::add(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type) { if ( _cnt > _maxInflight * 2 || topicId == 0) { return 0; } - if ( getTopicId(msgId, &type) > 0 ) + if ( getElement(msgId) > 0 ) { erase(msgId); } @@ -1058,7 +1240,7 @@ int TopicIdMap::add(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type) _end = elm; } _cnt++; - return 1; + return elm; } void TopicIdMap::erase(uint16_t msgId) diff --git a/MQTTSNGateway/src/MQTTSNGWClient.h b/MQTTSNGateway/src/MQTTSNGWClient.h index 3b52fa1..9bb5ede 100644 --- a/MQTTSNGateway/src/MQTTSNGWClient.h +++ b/MQTTSNGateway/src/MQTTSNGWClient.h @@ -39,73 +39,73 @@ namespace MQTTSNGW template class PacketQue { public: - PacketQue() - { - _que = new Que; - } + PacketQue() + { + _que = new Que; + } - ~PacketQue() - { - clear(); - delete _que; - } + ~PacketQue() + { + clear(); + delete _que; + } - T* getPacket() - { - T* packet; - if (_que->size() > 0) - { - _mutex.lock(); - packet = _que->front(); - _mutex.unlock(); - return packet; - } - else - { - return 0; - } - } + T* getPacket() + { + T* packet; + if (_que->size() > 0) + { + _mutex.lock(); + packet = _que->front(); + _mutex.unlock(); + return packet; + } + else + { + return 0; + } + } - int - post(T* packet) - { - int rc; - _mutex.lock(); - rc = _que->post(packet); - _mutex.unlock(); - return rc; - } + int + post(T* packet) + { + int rc; + _mutex.lock(); + rc = _que->post(packet); + _mutex.unlock(); + return rc; + } - void pop() - { - if (_que->size() > 0) - { - _mutex.lock(); - _que->pop(); - _mutex.unlock(); - } - } + void pop() + { + if (_que->size() > 0) + { + _mutex.lock(); + _que->pop(); + _mutex.unlock(); + } + } - void clear() - { - _mutex.lock(); - while (_que->size() > 0) - { - delete _que->front(); - _que->pop(); - } - _mutex.unlock(); - } + void clear() + { + _mutex.lock(); + while (_que->size() > 0) + { + delete _que->front(); + _que->pop(); + } + _mutex.unlock(); + } - void setMaxSize(int size) - { - _que->setMaxSize(size); - } + void setMaxSize(int size) + { + _que->setMaxSize(size); + } private: - Que* _que; - Mutex _mutex; + Que* _que; + Mutex _mutex; }; @@ -114,19 +114,21 @@ private: ======================================*/ class Topic { - friend class Topics; + friend class Topics; public: - Topic(); - Topic(string* topic); - ~Topic(); - string* getTopicName(void); - uint16_t getTopicId(void); - bool isMatch(string* topicName); - + Topic(); + Topic(string* topic, MQTTSN_topicTypes type); + ~Topic(); + string* getTopicName(void); + uint16_t getTopicId(void); + MQTTSN_topicTypes getType(void); + bool isMatch(string* topicName); + void print(void); private: - uint16_t _topicId; - string* _topicName; - Topic* _next; + MQTTSN_topicTypes _type; + uint16_t _topicId; + string* _topicName; + Topic* _next; }; /*===================================== @@ -135,20 +137,21 @@ private: class Topics { public: - Topics(); - ~Topics(); - Topic* add(const MQTTSN_topicid* topicid); - Topic* add(const string* topic); - uint16_t getTopicId(const MQTTSN_topicid* topic); - uint16_t getNextTopicId(); - Topic* getTopic(uint16_t topicId); - Topic* getTopic(const MQTTSN_topicid* topicid); - Topic* match(const MQTTSN_topicid* topicid); - + Topics(); + ~Topics(); + Topic* add(const MQTTSN_topicid* topicid); + Topic* add(const char* topicName, uint16_t id = 0); + Topic* getTopicByName(const MQTTSN_topicid* topic); + Topic* getTopicById(const MQTTSN_topicid* topicid); + Topic* match(const MQTTSN_topicid* topicid); + void eraseNormal(void); + uint16_t getNextTopicId(); + void print(void); + uint8_t getCount(void); private: - uint16_t _nextTopicId; - Topic* _first; - + uint16_t _nextTopicId; + Topic* _first; + uint8_t _cnt; }; /*===================================== @@ -156,34 +159,36 @@ private: =====================================*/ class TopicIdMapelement { - friend class TopicIdMap; + friend class TopicIdMap; public: - TopicIdMapelement(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type); - ~TopicIdMapelement(); + TopicIdMapelement(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type); + ~TopicIdMapelement(); + MQTTSN_topicTypes getTopicType(void); + uint16_t getTopicId(void); private: - uint16_t _msgId; - uint16_t _topicId; - MQTTSN_topicTypes _type; - TopicIdMapelement* _next; - TopicIdMapelement* _prev; + uint16_t _msgId; + uint16_t _topicId; + MQTTSN_topicTypes _type; + TopicIdMapelement* _next; + TopicIdMapelement* _prev; }; class TopicIdMap { public: - TopicIdMap(); - ~TopicIdMap(); - uint16_t getTopicId(uint16_t msgId, MQTTSN_topicTypes* type); - int add(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type); - void erase(uint16_t msgId); - void clear(void); + TopicIdMap(); + ~TopicIdMap(); + TopicIdMapelement* getElement(uint16_t msgId); + TopicIdMapelement* add(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type); + void erase(uint16_t msgId); + void clear(void); private: - uint16_t* _msgIds; - TopicIdMapelement* _first; - TopicIdMapelement* _end; - int _cnt; - int _maxInflight; + uint16_t* _msgIds; + TopicIdMapelement* _first; + TopicIdMapelement* _end; + int _cnt; + int _maxInflight; }; /*===================================== @@ -191,16 +196,16 @@ private: =====================================*/ class waitREGACKPacket { - friend class WaitREGACKPacketList; + friend class WaitREGACKPacketList; public: - waitREGACKPacket(MQTTSNPacket* packet, uint16_t REGACKMsgId); - ~waitREGACKPacket(); + waitREGACKPacket(MQTTSNPacket* packet, uint16_t REGACKMsgId); + ~waitREGACKPacket(); private: - uint16_t _msgId; - MQTTSNPacket* _packet; - waitREGACKPacket* _next; - waitREGACKPacket* _prev; + uint16_t _msgId; + MQTTSNPacket* _packet; + waitREGACKPacket* _next; + waitREGACKPacket* _prev; }; /*===================================== @@ -209,15 +214,15 @@ private: class WaitREGACKPacketList { public: - WaitREGACKPacketList(); - ~WaitREGACKPacketList(); - int setPacket(MQTTSNPacket* packet, uint16_t REGACKMsgId); - MQTTSNPacket* getPacket(uint16_t REGACKMsgId); - void erase(uint16_t REGACKMsgId); + WaitREGACKPacketList(); + ~WaitREGACKPacketList(); + int setPacket(MQTTSNPacket* packet, uint16_t REGACKMsgId); + MQTTSNPacket* getPacket(uint16_t REGACKMsgId); + void erase(uint16_t REGACKMsgId); private: - waitREGACKPacket* _first; - waitREGACKPacket* _end; + waitREGACKPacket* _first; + waitREGACKPacket* _end; }; /*===================================== @@ -227,110 +232,111 @@ private: typedef enum { - Cstat_Disconnected = 0, Cstat_TryConnecting, Cstat_Connecting, Cstat_Active, Cstat_Asleep, Cstat_Awake, Cstat_Lost + Cstat_Disconnected = 0, Cstat_TryConnecting, Cstat_Connecting, Cstat_Active, Cstat_Asleep, Cstat_Awake, Cstat_Lost } ClientStatus; class Client { - friend class ClientList; + friend class ClientList; public: - Client(bool secure = false); - Client(uint8_t maxInflightMessages, bool secure); - ~Client(); + Client(bool secure = false); + Client(uint8_t maxInflightMessages, bool secure); + ~Client(); - Connect* getConnectData(void); - uint16_t getWaitedPubTopicId(uint16_t msgId); - uint16_t getWaitedSubTopicId(uint16_t msgId); - MQTTGWPacket* getClientSleepPacket(void); - void deleteFirstClientSleepPacket(void); - WaitREGACKPacketList* getWaitREGACKPacketList(void); + Connect* getConnectData(void); + TopicIdMapelement* getWaitedPubTopicId(uint16_t msgId); + TopicIdMapelement* getWaitedSubTopicId(uint16_t msgId); + MQTTGWPacket* getClientSleepPacket(void); + void deleteFirstClientSleepPacket(void); + WaitREGACKPacketList* getWaitREGACKPacketList(void); - void eraseWaitedPubTopicId(uint16_t msgId); - void eraseWaitedSubTopicId(uint16_t msgId); - void clearWaitedPubTopicId(void); - void clearWaitedSubTopicId(void); + void eraseWaitedPubTopicId(uint16_t msgId); + void eraseWaitedSubTopicId(uint16_t msgId); + void clearWaitedPubTopicId(void); + void clearWaitedSubTopicId(void); - int setClientSleepPacket(MQTTGWPacket*); - void setWaitedPubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type); - void setWaitedSubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type); + int setClientSleepPacket(MQTTGWPacket*); + void setWaitedPubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type); + void setWaitedSubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type); - bool checkTimeover(void); - void updateStatus(MQTTSNPacket*); - void updateStatus(ClientStatus); - void connectSended(void); - void connackSended(int rc); - void disconnected(void); - bool isConnectSendable(void); + bool checkTimeover(void); + void updateStatus(MQTTSNPacket*); + void updateStatus(ClientStatus); + void connectSended(void); + void connackSended(int rc); + void disconnected(void); + bool isConnectSendable(void); - uint16_t getNextPacketId(void); - uint8_t getNextSnMsgId(void); - Topics* getTopics(void); - void setTopics(Topics* topics); - void setKeepAlive(MQTTSNPacket* packet); + uint16_t getNextPacketId(void); + uint8_t getNextSnMsgId(void); + Topics* getTopics(void); + void setTopics(Topics* topics); + void setKeepAlive(MQTTSNPacket* packet); - SensorNetAddress* getSensorNetAddress(void); - Network* getNetwork(void); - void setClientAddress(SensorNetAddress* sensorNetAddr); - void setSensorNetType(bool stable); + SensorNetAddress* getSensorNetAddress(void); + Network* getNetwork(void); + void setClientAddress(SensorNetAddress* sensorNetAddr); + void setSensorNetType(bool stable); - void setClientId(MQTTSNString id); - void setWillTopic(MQTTSNString willTopic); - void setWillMsg(MQTTSNString willmsg); - char* getClientId(void); - char* getWillTopic(void); - char* getWillMsg(void); - const char* getStatus(void); - void setWaitWillMsgFlg(bool); - void setSessionStatus(bool); // true: clean session - bool erasable(void); + void setClientId(MQTTSNString id); + void setWillTopic(MQTTSNString willTopic); + void setWillMsg(MQTTSNString willmsg); + char* getClientId(void); + char* getWillTopic(void); + char* getWillMsg(void); + const char* getStatus(void); + void setWaitWillMsgFlg(bool); + void setSessionStatus(bool); // true: clean session + bool erasable(void); - bool isDisconnect(void); - bool isActive(void); - bool isSleep(void); - bool isAwake(void); - bool isSecureNetwork(void); - bool isSensorNetStable(void); - bool isWaitWillMsg(void); + bool isDisconnect(void); + bool isActive(void); + bool isSleep(void); + bool isAwake(void); + bool isSecureNetwork(void); + bool isSensorNetStable(void); + bool isWaitWillMsg(void); - Client* getNextClient(void); - Client* getOTAClient(void); - void setOTAClient(Client* cl); + Client* getNextClient(void); + Client* getOTAClient(void); + void setOTAClient(Client* cl); private: - PacketQue _clientSleepPacketQue; - WaitREGACKPacketList _waitREGACKList; + PacketQue _clientSleepPacketQue; + WaitREGACKPacketList _waitREGACKList; - Topics* _topics; - TopicIdMap _waitedPubTopicIdMap; - TopicIdMap _waitedSubTopicIdMap; + Topics* _topics; + TopicIdMap _waitedPubTopicIdMap; + TopicIdMap _waitedSubTopicIdMap; - Connect _connectData; - MQTTSNPacket* _connAck; + Connect _connectData; + MQTTSNPacket* _connAck; - char* _clientId; - char* _willTopic; - char* _willMsg; + char* _clientId; + char* _willTopic; + char* _willMsg; - Timer _keepAliveTimer; - uint32_t _keepAliveMsec; + Timer _keepAliveTimer; + uint32_t _keepAliveMsec; - ClientStatus _status; - bool _waitWillMsgFlg; + ClientStatus _status; + bool _waitWillMsgFlg; - uint16_t _packetId; - uint8_t _snMsgId; + uint16_t _packetId; + uint8_t _snMsgId; - Network* _network; // Broker - bool _secureNetwork; // SSL - bool _sensorNetype; // false: unstable network like a G3 - SensorNetAddress _sensorNetAddr; + Network* _network; // Broker + bool _secureNetwork; // SSL + bool _sensorNetype; // false: unstable network like a G3 + SensorNetAddress _sensorNetAddr; - bool _sessionStatus; + bool _sessionStatus; + bool _hasPredefTopic; - Client* _nextClient; - Client* _prevClient; - Client* _otaClient; + Client* _nextClient; + Client* _prevClient; + Client* _otaClient; }; /*===================================== @@ -339,23 +345,24 @@ private: class ClientList { public: - ClientList(); - ~ClientList(); - bool authorize(const char* fileName); - void erase(Client*&); - Client* getClient(SensorNetAddress* addr); - Client* getClient(uint8_t* clientId); - Client* createClient(SensorNetAddress* addr, MQTTSNString* clientId, bool unstableLine, - bool secure); - uint16_t getClientCount(void); - Client* getClient(void); - bool isAuthorized(); + ClientList(); + ~ClientList(); + bool authorize(const char* fileName); + bool setPredefinedTopics(const char* fileName); + void erase(Client*&); + Client* createClient(SensorNetAddress* addr, MQTTSNString* clientId, bool unstableLine, bool secure); + Client* getClient(SensorNetAddress* addr); + Client* getClient(MQTTSNString* clientId); + uint16_t getClientCount(void); + Client* getClient(void); + bool isAuthorized(); private: - Client* _firstClient; - Client* _endClient; - Mutex _mutex; - uint16_t _clientCnt; - bool _authorize; + Client* createPredefinedTopic( MQTTSNString* clientId, string topicName, uint16_t toipcId); + Client* _firstClient; + Client* _endClient; + Mutex _mutex; + uint16_t _clientCnt; + bool _authorize; }; } diff --git a/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp b/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp index 0fcf65a..8da9453 100644 --- a/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp +++ b/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp @@ -115,9 +115,21 @@ void ClientRecvTask::run() continue; } - /* create a client */ - client = _gateway->getClientList()->createClient(_sensorNetwork->getSenderAddress(), &data.clientID, false, false); + client = _gateway->getClientList()->getClient(&data.clientID); + + if ( client ) + { + /* set SensorNet Address */ + client->setClientAddress(_sensorNetwork->getSenderAddress()); + } + else + { + /* create a client */ + client = _gateway->getClientList()->createClient(_sensorNetwork->getSenderAddress(), &data.clientID, false, false); + } + log(client, packet, &data.clientID); + if (!client) { WRITELOG("%s Client(%s) was rejected. CONNECT message has been discarded.%s\n", ERRMSG_HEADER, _sensorNetwork->getSenderAddress()->sprint(buf), ERRMSG_FOOTER); @@ -126,7 +138,6 @@ void ClientRecvTask::run() } /* set sensorNetAddress & post Event */ - client->setClientAddress(_sensorNetwork->getSenderAddress()); ev = new Event(); ev->setClientRecvEvent(client, packet); _gateway->getPacketEventQue()->post(ev); diff --git a/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp b/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp index 47cca8f..66e2d79 100644 --- a/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp +++ b/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp @@ -116,10 +116,8 @@ void MQTTSNConnectionHandler::handleConnect(Client* client, MQTTSNPacket* packet /* renew the TopicList */ if (topics) { - delete topics; + topics->eraseNormal();; } - topics = new Topics(); - client->setTopics(topics); client->setSessionStatus(true); } diff --git a/MQTTSNGateway/src/MQTTSNGWDefines.h b/MQTTSNGateway/src/MQTTSNGWDefines.h index a40c8f4..db30862 100644 --- a/MQTTSNGateway/src/MQTTSNGWDefines.h +++ b/MQTTSNGateway/src/MQTTSNGWDefines.h @@ -25,6 +25,7 @@ namespace MQTTSNGW #define CONFIG_DIRECTORY "./" #define CONFIG_FILE "gateway.conf" #define CLIENT_LIST "clients.conf" +#define PREDEFINEDTOPIC_FILE "predefinedTopic.conf" /*========================================================== * Gateway default parameters @@ -39,6 +40,7 @@ namespace MQTTSNGW #define MAX_CLIENTID_LENGTH (64) // Max length of clientID #define MAX_INFLIGHTMESSAGES (10) // Number of inflight messages #define MAX_SAVED_PUBLISH (20) // Max number of PUBLISH message for Asleep state +#define MAX_TOPIC_PAR_CLIENT (50) // Max Topic count for a client. it should be less than 256 #define MQTTSNGW_MAX_PACKET_SIZE (1024) // Max Packet size (5+2+TopicLen+PayloadLen) #define SIZE_OF_LOG_PACKET (500) // Length of the packet log in bytes diff --git a/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp b/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp index 24b077d..f797ca0 100644 --- a/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp +++ b/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp @@ -40,8 +40,8 @@ void MQTTSNPublishHandler::handlePublish(Client* client, MQTTSNPacket* packet) int qos; uint8_t retained; uint16_t msgId; - MQTTSN_topicid topicid; uint8_t* payload; + MQTTSN_topicid topicid; int payloadlen; Publish pub; char shortTopic[2]; @@ -68,68 +68,6 @@ void MQTTSNPublishHandler::handlePublish(Client* client, MQTTSNPacket* packet) Topic* topic = 0; - if ( topicid.type == MQTTSN_TOPIC_TYPE_PREDEFINED) - { - if(msgId) - { - /* Reply PubAck to the client */ - MQTTSNPacket* pubAck = new MQTTSNPacket(); - pubAck->setPUBACK( topicid.data.id, msgId, MQTTSN_RC_ACCEPTED); - Event* ev1 = new Event(); - ev1->setClientSendEvent(client, pubAck); - _gateway->getClientSendQue()->post(ev1); - } - -#ifdef OTA_CLIENTS - if ( topicid.data.id == PREDEFINEDID_OTA_REQ ) - { - uint8_t clientId[MAX_CLIENTID_LENGTH + 1]; - - if ( payloadlen <= MAX_CLIENTID_LENGTH ) - { - memcpy(clientId, payload, payloadlen); - clientId[payloadlen] = 0; - Client* cl = _gateway->getClientList()->getClient(clientId); - - if ( cl ) - { - WRITELOG("\033[0m\033[0;33m OTA Client : %s\033[0m\033[0;37m\n",cl->getClientId()); - MQTTSNPacket* pubota = new MQTTSNPacket(); - pubota->setPUBLISH(0, 0, 0, 0, topicid, 0, 0); - cl->setOTAClient(client); - Event* evt = new Event(); - evt->setClientSendEvent(cl, pubota); - _gateway->getClientSendQue()->post(evt); - } - else - { - MQTTSNPacket* publish = new MQTTSNPacket(); - topicid.data.id = PREDEFINEDID_OTA_NO_CLIENT; - publish->setPUBLISH(0, 0, 0, 0, topicid, clientId, (uint16_t)strlen((const char*)clientId)); - Event* evt = new Event(); - evt->setClientSendEvent(client, publish); - _gateway->getClientSendQue()->post(evt); - } - } - } - else if ( topicid.data.id == PREDEFINEDID_OTA_READY ) - { - Client* cl = client->getOTAClient(); - if ( cl ) - { - WRITELOG("\033[0m\033[0;33m OTA Manager : %s\033[0m\033[0;37m\n",cl->getClientId()); - MQTTSNPacket* pubota = new MQTTSNPacket(); - pubota->setPUBLISH(0, 0, 0, 0, topicid, payload, payloadlen); - client->setOTAClient(0); - Event* evt = new Event(); - evt->setClientSendEvent(cl, pubota); - _gateway->getClientSendQue()->post(evt); - } - } -#endif - return; - } - if( topicid.type == MQTTSN_TOPIC_TYPE_SHORT ) { shortTopic[0] = topicid.data.short_name[0]; @@ -137,10 +75,10 @@ void MQTTSNPublishHandler::handlePublish(Client* client, MQTTSNPacket* packet) pub.topic = shortTopic; pub.topiclen = 2; } - - if ( topicid.type == MQTTSN_TOPIC_TYPE_NORMAL ) + else { - topic = client->getTopics()->getTopic(topicid.data.id); + topic = client->getTopics()->getTopicById(&topicid); + if( !topic && msgId && qos > 0 ) { /* Reply PubAck with INVALID_TOPIC_ID to the client */ diff --git a/MQTTSNGateway/src/MQTTSNGWSubscribeHandler.cpp b/MQTTSNGateway/src/MQTTSNGWSubscribeHandler.cpp index 5d4aab8..fd192a5 100644 --- a/MQTTSNGateway/src/MQTTSNGWSubscribeHandler.cpp +++ b/MQTTSNGateway/src/MQTTSNGWSubscribeHandler.cpp @@ -41,160 +41,122 @@ void MQTTSNSubscribeHandler::handleSubscribe(Client* client, MQTTSNPacket* packe uint16_t msgId; MQTTSN_topicid topicFilter; Topic* topic = 0; + uint16_t topicId = 0; + MQTTGWPacket* subscribe; + Event* ev1; + Event* evsuback; if ( packet->getSUBSCRIBE(&dup, &qos, &msgId, &topicFilter) == 0 ) { return; } - if (topicFilter.type <= MQTTSN_TOPIC_TYPE_SHORT) + if ( msgId == 0 ) { - if (topicFilter.type == MQTTSN_TOPIC_TYPE_PREDEFINED) - { - /*----- Predefined TopicId ------*/ - MQTTSNPacket* sSuback = new MQTTSNPacket(); - - if (msgId) - { - int rc = MQTTSN_RC_ACCEPTED; - - switch (topicFilter.data.id) - { - case PREDEFINEDID_OTA_REQ: // check topicIds are defined. - case PREDEFINEDID_OTA_READY: - case PREDEFINEDID_OTA_NO_CLIENT: - break; - default: - rc = MQTTSN_RC_REJECTED_INVALID_TOPIC_ID; - } - sSuback->setSUBACK(qos, topicFilter.data.id, msgId, rc); - Event* evsuback = new Event(); - evsuback->setClientSendEvent(client, sSuback); - _gateway->getClientSendQue()->post(evsuback); - } - switch (topicFilter.data.id) - { - case 1: - /* - * ToDo: write here Predefined Topic 01 Procedures. - */ - break; - case 2: - /* - * ToDo: write here Predefined Topic 02 Procedures. so on - */ - break; - default: - break; - } - } - else - { - uint16_t topicId = 0; - MQTTGWPacket* subscribe = new MQTTGWPacket(); - topic = client->getTopics()->getTopic(&topicFilter); - if (topic == 0) - { - if (topicFilter.type == MQTTSN_TOPIC_TYPE_NORMAL) - { - topic = client->getTopics()->add(&topicFilter); - topicId = topic->getTopicId(); - subscribe->setSUBSCRIBE((char*)topic->getTopicName()->c_str(), (uint8_t)qos, (uint16_t)msgId); - } - else if (topicFilter.type == MQTTSN_TOPIC_TYPE_SHORT) - { - char topic[3]; - topic[0] = topicFilter.data.short_name[0]; - topic[1] = topicFilter.data.short_name[1]; - topic[2] = 0; - topicId = topicFilter.data.id; - subscribe->setSUBSCRIBE(topic, (uint8_t)qos, (uint16_t)msgId); - } - } - else - { - topicId = topic->getTopicId(); - subscribe->setSUBSCRIBE((char*)topic->getTopicName()->c_str(), (uint8_t)qos, (uint16_t)msgId); - } - - if ( msgId > 0 ) - { - client->setWaitedSubTopicId(msgId, topicId, topicFilter.type); - } - - Event* ev1 = new Event(); - ev1->setBrokerSendEvent(client, subscribe); - _gateway->getBrokerSendQue()->post(ev1); - return; - } - } - else - { - /*-- Invalid TopicIdType --*/ - if (msgId) - { - MQTTSNPacket* sSuback = new MQTTSNPacket(); - sSuback->setSUBACK(qos, topicFilter.data.id, msgId, MQTTSN_RC_REJECTED_INVALID_TOPIC_ID); - Event* evsuback = new Event(); - evsuback->setClientSendEvent(client, sSuback); - _gateway->getClientSendQue()->post(evsuback); - } + return; } + + if ( topicFilter.type == MQTTSN_TOPIC_TYPE_PREDEFINED ) + { + topic = client->getTopics()->getTopicById(&topicFilter); + if ( topic ) + { + topicId = topic->getTopicId(); + subscribe = new MQTTGWPacket(); + subscribe->setSUBSCRIBE((char*)topic->getTopicName()->c_str(), (uint8_t)qos, (uint16_t)msgId); + } + else + { + goto RespExit; + } + } + else if (topicFilter.type == MQTTSN_TOPIC_TYPE_NORMAL) + { + topic = client->getTopics()->getTopicByName(&topicFilter); + if ( topic == 0 ) + { + topic = client->getTopics()->add(&topicFilter); + if ( topic == 0 ) + { + WRITELOG("%s Client(%s) can't add the Topic.%s\n", ERRMSG_HEADER, client->getClientId(), ERRMSG_FOOTER); + return; + } + } + topicId = topic->getTopicId(); + subscribe = new MQTTGWPacket(); + + subscribe->setSUBSCRIBE((char*)topic->getTopicName()->c_str(), (uint8_t)qos, (uint16_t)msgId); + } + else //MQTTSN_TOPIC_TYPE_SHORT + { + char topicstr[3]; + topicstr[0] = topicFilter.data.short_name[0]; + topicstr[1] = topicFilter.data.short_name[1]; + topicstr[2] = 0; + topicId = 0; + subscribe = new MQTTGWPacket(); + subscribe->setSUBSCRIBE(topicstr, (uint8_t)qos, (uint16_t)msgId); + } + + client->setWaitedSubTopicId(msgId, topicId, topicFilter.type); + + ev1 = new Event(); + ev1->setBrokerSendEvent(client, subscribe); + _gateway->getBrokerSendQue()->post(ev1); + return; + + +RespExit: + MQTTSNPacket* sSuback = new MQTTSNPacket(); + sSuback->setSUBACK(qos, topicFilter.data.id, msgId, MQTTSN_RC_NOT_SUPPORTED); + evsuback = new Event(); + evsuback->setClientSendEvent(client, sSuback); + _gateway->getClientSendQue()->post(evsuback); } void MQTTSNSubscribeHandler::handleUnsubscribe(Client* client, MQTTSNPacket* packet) { uint16_t msgId; MQTTSN_topicid topicFilter; + MQTTGWPacket* unsubscribe = 0;; if ( packet->getUNSUBSCRIBE(&msgId, &topicFilter) == 0 ) { return; } - if ( topicFilter.type == MQTTSN_TOPIC_TYPE_PREDEFINED ) - { - /* - * ToDo: procedures for Predefined Topic - */ - return; - } + if ( msgId == 0 ) + { + return; + } - Topic* topic = client->getTopics()->getTopic(&topicFilter); - MQTTGWPacket* unsubscribe = new MQTTGWPacket(); + Topic* topic = client->getTopics()->getTopicById(&topicFilter); - if (topicFilter.type == MQTTSN_TOPIC_TYPE_NORMAL) + if (topicFilter.type == MQTTSN_TOPIC_TYPE_SHORT) { - if ( topic == 0 ) - { - if (msgId) - { - MQTTSNPacket* sUnsuback = new MQTTSNPacket(); - sUnsuback->setUNSUBACK(msgId); - Event* evsuback = new Event(); - evsuback->setClientSendEvent(client, sUnsuback); - _gateway->getClientSendQue()->post(evsuback); - } - delete unsubscribe; - return; - } - else - { - unsubscribe->setUNSUBSCRIBE(topic->getTopicName()->c_str(), msgId); - } - } - else if (topicFilter.type == MQTTSN_TOPIC_TYPE_SHORT) - { - char shortTopic[3]; - shortTopic[0] = topicFilter.data.short_name[0]; - shortTopic[1] = topicFilter.data.short_name[1]; - shortTopic[2] = 0; - unsubscribe->setUNSUBSCRIBE(shortTopic, msgId); + char shortTopic[3]; + shortTopic[0] = topicFilter.data.short_name[0]; + shortTopic[1] = topicFilter.data.short_name[1]; + shortTopic[2] = 0; + unsubscribe = new MQTTGWPacket(); + unsubscribe->setUNSUBSCRIBE(shortTopic, msgId); } else { - delete unsubscribe; - return; + if ( topic == 0 ) + { + MQTTSNPacket* sUnsuback = new MQTTSNPacket(); + sUnsuback->setUNSUBACK(msgId); + Event* evsuback = new Event(); + evsuback->setClientSendEvent(client, sUnsuback); + _gateway->getClientSendQue()->post(evsuback); + return; + } + else + { + unsubscribe = new MQTTGWPacket(); + unsubscribe->setUNSUBSCRIBE(topic->getTopicName()->c_str(), msgId); + } } Event* ev1 = new Event(); diff --git a/MQTTSNGateway/src/MQTTSNGWVersion.h b/MQTTSNGateway/src/MQTTSNGWVersion.h index 6506300..90f4eab 100644 --- a/MQTTSNGateway/src/MQTTSNGWVersion.h +++ b/MQTTSNGateway/src/MQTTSNGWVersion.h @@ -17,6 +17,6 @@ #ifndef MQTTSNGWVERSION_H_IN_ #define MQTTSNGWVERSION_H_IN_ -#define PAHO_GATEWAY_VERSION "1.0.1" +#define PAHO_GATEWAY_VERSION "1.1.0" #endif /* MQTTSNGWVERSION_H_IN_ */ diff --git a/MQTTSNGateway/src/MQTTSNGateway.cpp b/MQTTSNGateway/src/MQTTSNGateway.cpp index a67545e..23f7e78 100644 --- a/MQTTSNGateway/src/MQTTSNGateway.cpp +++ b/MQTTSNGateway/src/MQTTSNGateway.cpp @@ -13,7 +13,7 @@ * Contributors: * Tomoaki Yamaguchi - initial API and implementation and/or initial documentation **************************************************************************************/ - +#include "MQTTSNGWDefines.h" #include "MQTTSNGateway.h" #include "SensorNetwork.h" #include "MQTTSNGWProcess.h" @@ -202,13 +202,40 @@ void Gateway::initialize(int argc, char** argv) if (!_clientList.authorize(fileName.c_str())) { - throw Exception("Gateway::initialize: No client list defined by configuration."); + throw Exception("Gateway::initialize: No client list defined by the configuration."); } _params.clientListName = strdup(fileName.c_str()); } } - fileName = *getConfigDirName() + *getConfigFileName(); - _params.configName = strdup(fileName.c_str()); + + + + if (getParam("PredefinedTopic", param) == 0 ) + { + if (!strcasecmp(param, "YES") ) + { + if (getParam("PredefinedTopicFile", param) == 0) + { + fileName =*getConfigDirName() + string(param); + } + else + { + fileName = *getConfigDirName() + string(PREDEFINEDTOPIC_FILE); + } + if (!_clientList.setPredefinedTopics(fileName.c_str())) + { + throw Exception("Gateway::initialize: No PredefinedTopic file defined by the configuration.."); + } + _params.predefinedTopicFileName = strdup(fileName.c_str()); + } + else + { + _params.predefinedTopicFileName = 0; + } + } + + fileName = *getConfigDirName() + *getConfigFileName(); + _params.configName = strdup(fileName.c_str()); } void Gateway::run(void) @@ -229,6 +256,10 @@ void Gateway::run(void) } WRITELOG(" SensorN/W: %s\n", _sensorNetwork.getDescription()); WRITELOG(" Broker: %s : %s, %s\n", _params.brokerName, _params.port, _params.portSecure); + if ( _params.predefinedTopicFileName ) + { + WRITELOG(" PreDefFile: %s\n", _params.predefinedTopicFileName); + } WRITELOG(" RootCApath: %s\n", _params.rootCApath); WRITELOG(" RootCAfile: %s\n", _params.rootCAfile); WRITELOG(" CertKey: %s\n", _params.certKey); diff --git a/MQTTSNGateway/src/MQTTSNGateway.h b/MQTTSNGateway/src/MQTTSNGateway.h index ec53825..3b8ab53 100644 --- a/MQTTSNGateway/src/MQTTSNGateway.h +++ b/MQTTSNGateway/src/MQTTSNGateway.h @@ -72,10 +72,10 @@ namespace MQTTSNGW /*===================================== Predefined TopicId for OTA ====================================*/ -#define OTA_CLIENTS -#define PREDEFINEDID_OTA_REQ (0x0ff0) -#define PREDEFINEDID_OTA_READY (0x0ff1) -#define PREDEFINEDID_OTA_NO_CLIENT (0x0ff2) +//#define OTA_CLIENTS +//#define PREDEFINEDID_OTA_REQ (0x0ff0) +//#define PREDEFINEDID_OTA_READY (0x0ff1) +//#define PREDEFINEDID_OTA_NO_CLIENT (0x0ff2) /*===================================== Class Event @@ -160,6 +160,7 @@ typedef struct char* rootCAfile; char* certKey; char* privateKey; + char* predefinedTopicFileName; }GatewayParams; /*===================================== diff --git a/MQTTSNGateway/src/tests/TestProcess.cpp b/MQTTSNGateway/src/tests/TestProcess.cpp index 5d338cc..6ba7283 100644 --- a/MQTTSNGateway/src/tests/TestProcess.cpp +++ b/MQTTSNGateway/src/tests/TestProcess.cpp @@ -87,27 +87,29 @@ void TestProcess::run(void) printf("Timer Test completed\n\n"); /* Test Que */ + printf("Test Que "); TestQue* tque = new TestQue(); tque->test(); delete tque; /* Test Tree23 */ + printf("Test Tree23 "); TestTree23* tree23 = new TestTree23(); tree23->test(); delete tree23; /* Test TopicTable */ + printf("Test Topic "); TestTopics* testTopic = new TestTopics(); testTopic->test(); delete testTopic; /* Test TopicIdMap */ + printf("Test TopicIdMap "); TestTopicIdMap* testMap = new TestTopicIdMap(); testMap->test(); delete testMap; - - /* Test EventQue */ printf("Test EventQue "); Client* client = new Client(); diff --git a/MQTTSNGateway/src/tests/TestQue.cpp b/MQTTSNGateway/src/tests/TestQue.cpp index ce9b210..92d9f83 100644 --- a/MQTTSNGateway/src/tests/TestQue.cpp +++ b/MQTTSNGateway/src/tests/TestQue.cpp @@ -35,7 +35,6 @@ void TestQue::test(void) int* v = 0; int i = 0; - printf("Test Que "); for ( i = 0; i < 10; i++ ) { v = new int(i); diff --git a/MQTTSNGateway/src/tests/TestTopicIdMap.cpp b/MQTTSNGateway/src/tests/TestTopicIdMap.cpp index 58a6ade..dea9b47 100644 --- a/MQTTSNGateway/src/tests/TestTopicIdMap.cpp +++ b/MQTTSNGateway/src/tests/TestTopicIdMap.cpp @@ -31,95 +31,163 @@ TestTopicIdMap::~TestTopicIdMap() delete _map; } + +bool TestTopicIdMap::testGetElement(uint16_t msgid, uint16_t id, MQTTSN_topicTypes type) +{ + TopicIdMapelement* elm = _map->getElement((uint16_t)msgid ); + if ( elm ) + { + //printf("msgid=%d id=%d type=%d\n", msgid, elm->getTopicId(), elm->getTopicType()); + return elm->getTopicId() == id && elm->getTopicType() == type; + } + //printf("msgid=%d\n", msgid); + return false; +} + #define MAXID 30 void TestTopicIdMap::test(void) { uint16_t id[MAXID]; - printf("Test TopicIdMat "); for ( int i = 0; i < MAXID; i++ ) { id[i] = i + 1; - } - - for ( int i = 0; i < MAXID; i++ ) - { _map->add(id[i], id[i], MQTTSN_TOPIC_TYPE_NORMAL); } - for ( int i = 0; i < MAXID; i++ ) + for ( int i = 0; i < MAX_INFLIGHTMESSAGES * 2 + 1; i++ ) { - MQTTSN_topicTypes type = MQTTSN_TOPIC_TYPE_SHORT; - uint16_t topicId = _map->getTopicId((uint16_t)i, &type); - //printf("TopicId=%d msgId=%d type=%d\n", topicId, i, type); - assert((i <= MAX_INFLIGHTMESSAGES * 2 + 1 && topicId == i) || (i > MAX_INFLIGHTMESSAGES * 2 + 1 && topicId == 0)); + assert(testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_NORMAL)); } - //printf("\n"); + for ( int i = MAX_INFLIGHTMESSAGES * 2 + 1; i < MAXID; i++ ) + { + assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_NORMAL)); + } + + for ( int i = 0; i < MAX_INFLIGHTMESSAGES * 2 + 1; i++ ) + { + assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_PREDEFINED)); + } for ( int i = 0; i < 5; i++ ) { - _map->erase(i); + _map->erase(id[i]); } - for ( int i = 0; i < MAXID; i++ ) - { - MQTTSN_topicTypes type = MQTTSN_TOPIC_TYPE_SHORT; - uint16_t topicId = _map->getTopicId((uint16_t)i, &type); - //printf("TopicId=%d msgId=%d type=%d\n", topicId, i, type); - assert((i < 5 && topicId == 0) || (i >= 5 && topicId != 0) || (i > MAX_INFLIGHTMESSAGES * 2 + 1 && topicId == 0) ); - } - - - _map->clear(); - //printf("\n"); - - for ( int i = 0; i < MAXID; i++ ) - { - MQTTSN_topicTypes type = MQTTSN_TOPIC_TYPE_SHORT; - uint16_t topicId = _map->getTopicId((uint16_t)i, &type); - //printf("TopicId=%d msgId=%d type=%d\n", topicId, i, type); - assert( topicId == 0 ); - } - - for ( int i = 0; i < MAXID; i++ ) - { - _map->add(id[i], id[i], MQTTSN_TOPIC_TYPE_SHORT); - } - - for ( int i = 0; i < MAXID; i++ ) - { - MQTTSN_topicTypes type = MQTTSN_TOPIC_TYPE_NORMAL; - uint16_t topicId = _map->getTopicId((uint16_t)i, &type); - //printf("TopicId=%d msgId=%d type=%d\n", topicId, i, type); - assert((i <= MAX_INFLIGHTMESSAGES * 2 + 1 && topicId == i) || (i > MAX_INFLIGHTMESSAGES * 2 + 1 && topicId == 0)); - } - - //printf("\n"); - for ( int i = 0; i < 5; i++ ) { - _map->erase(i); - } - for ( int i = 0; i < MAXID; i++ ) - { - MQTTSN_topicTypes type = MQTTSN_TOPIC_TYPE_NORMAL; - uint16_t topicId = _map->getTopicId((uint16_t)i, &type); - //printf("TopicId=%d msgId=%d type=%d\n", topicId, i, type); - assert((i < 5 && topicId == 0) || (i >= 5 && topicId != 0) || (i > MAX_INFLIGHTMESSAGES * 2 + 1 && topicId == 0) ); + assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_NORMAL)); } + for ( int i = 5; i < MAX_INFLIGHTMESSAGES * 2 + 1; i++ ) + { + assert(testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_NORMAL)); + } + + for ( int i = MAX_INFLIGHTMESSAGES * 2 + 1; i < MAXID; i++ ) + { + assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_NORMAL)); + } _map->clear(); - //printf("\n"); - for ( int i = 0; i < MAXID; i++ ) - { - MQTTSN_topicTypes type = MQTTSN_TOPIC_TYPE_NORMAL; - uint16_t topicId = _map->getTopicId((uint16_t)i, &type); - //printf("TopicId=%d msgId=%d type=%d\n", topicId, i, type); - assert( topicId == 0 ); - } + for ( int i = 0; i < MAXID; i++ ) + { + assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_NORMAL)); + } + + for ( int i = 0; i < MAXID; i++ ) + { + _map->add(id[i], id[i], MQTTSN_TOPIC_TYPE_SHORT); + } + + for ( int i = 0; i < MAX_INFLIGHTMESSAGES * 2 + 1; i++ ) + { + assert(testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_SHORT)); + } + + for ( int i = MAX_INFLIGHTMESSAGES * 2 + 1; i < MAXID; i++ ) + { + assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_SHORT)); + } + + for ( int i = 0; i < MAX_INFLIGHTMESSAGES * 2 + 1; i++ ) + { + assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_NORMAL)); + } + + for ( int i = 0; i < 5; i++ ) + { + _map->erase(id[i]); + } + for ( int i = 0; i < 5; i++ ) + { + assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_SHORT)); + } + + for ( int i = 5; i < MAX_INFLIGHTMESSAGES * 2 + 1; i++ ) + { + assert(testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_SHORT)); + } + + for ( int i = MAX_INFLIGHTMESSAGES * 2 + 1; i < MAXID; i++ ) + { + assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_SHORT)); + } + + _map->clear(); + + for ( int i = 0; i < MAXID; i++ ) + { + assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_SHORT)); + } + + for ( int i = 0; i < MAXID; i++ ) + { + _map->add(id[i], id[i], MQTTSN_TOPIC_TYPE_PREDEFINED); + } + + for ( int i = 0; i < MAX_INFLIGHTMESSAGES * 2 + 1; i++ ) + { + assert(testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_PREDEFINED)); + } + + for ( int i = MAX_INFLIGHTMESSAGES * 2 + 1; i < MAXID; i++ ) + { + assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_PREDEFINED)); + } + + for ( int i = 0; i < MAX_INFLIGHTMESSAGES * 2 + 1; i++ ) + { + assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_SHORT)); + } + + for ( int i = 0; i < 5; i++ ) + { + _map->erase(id[i]); + } + for ( int i = 0; i < 5; i++ ) + { + assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_PREDEFINED)); + } + + for ( int i = 5; i < MAX_INFLIGHTMESSAGES * 2 + 1; i++ ) + { + assert(testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_PREDEFINED)); + } + + for ( int i = MAX_INFLIGHTMESSAGES * 2 + 1; i < MAXID; i++ ) + { + assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_PREDEFINED)); + } + + _map->clear(); + + for ( int i = 0; i < MAXID; i++ ) + { + assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_PREDEFINED)); + } printf("[ OK ]\n"); } diff --git a/MQTTSNGateway/src/tests/TestTopicIdMap.h b/MQTTSNGateway/src/tests/TestTopicIdMap.h index e70909a..3edceb1 100644 --- a/MQTTSNGateway/src/tests/TestTopicIdMap.h +++ b/MQTTSNGateway/src/tests/TestTopicIdMap.h @@ -24,6 +24,7 @@ public: TestTopicIdMap(); ~TestTopicIdMap(); void test(void); + bool testGetElement(uint16_t msgid, uint16_t id, MQTTSN_topicTypes type); private: TopicIdMap* _map; diff --git a/MQTTSNGateway/src/tests/TestTopics.cpp b/MQTTSNGateway/src/tests/TestTopics.cpp index 12f228f..404013e 100644 --- a/MQTTSNGateway/src/tests/TestTopics.cpp +++ b/MQTTSNGateway/src/tests/TestTopics.cpp @@ -38,7 +38,7 @@ bool testIsMatch(const char* topicFilter, const char* topicName) string* filter = new string(topicFilter); string* name = new string(topicName); - Topic topic(filter); + Topic topic(filter, MQTTSN_TOPIC_TYPE_NORMAL); bool isMatch = topic.isMatch(name); delete name; @@ -46,38 +46,82 @@ bool testIsMatch(const char* topicFilter, const char* topicName) return isMatch; } -bool testGetTopic(const char* topicName, const char* searchedTopicName) +bool testGetTopicByName(const char* topicName, const char* searchedTopicName) { Topics topics; - string name(topicName); - MQTTSN_topicid topicid; + MQTTSN_topicid topicid, serchId; topicid.type = MQTTSN_TOPIC_TYPE_NORMAL; - topicid.data.long_.len = strlen(searchedTopicName); - topicid.data.long_.name = const_cast(searchedTopicName); + topicid.data.long_.len = strlen(topicName); + topicid.data.long_.name = const_cast(topicName); - topics.add(&name); + topics.add(&topicid); - return topics.getTopic(&topicid) != 0; + serchId.type = MQTTSN_TOPIC_TYPE_NORMAL; + serchId.data.long_.len = strlen(searchedTopicName); + serchId.data.long_.name = const_cast(searchedTopicName); + + return topics.getTopicByName(&serchId) != 0; } -bool testGetTopicId(const char* topicName, const char* searchedTopicName) +bool testGetTopicById(const char* topicName, const char* searchedTopicName) { Topics topics; - string name(topicName); - MQTTSN_topicid topicid; + MQTTSN_topicid topicid, stopicid; topicid.type = MQTTSN_TOPIC_TYPE_NORMAL; - topicid.data.long_.len = strlen(searchedTopicName); - topicid.data.long_.name = const_cast(searchedTopicName); + topicid.data.long_.len = strlen(topicName); + topicid.data.long_.name = const_cast(topicName); + stopicid.type = MQTTSN_TOPIC_TYPE_NORMAL; + stopicid.data.long_.len = strlen(searchedTopicName); + stopicid.data.long_.name = const_cast(searchedTopicName); - topics.add(&name); + Topic* tp = topics.add(&topicid); + Topic*stp = topics.add(&stopicid); + topicid.data.id = tp->getTopicId(); + stopicid.data.id = stp->getTopicId(); - return topics.getTopicId(&topicid) != 0; + stp = topics.getTopicById(&stopicid); + + return stp->getTopicId() == tp->getTopicId(); +} + +bool testGetPredefinedTopicByName(const char* topicName, const uint16_t id, const char* searchedTopicName) +{ + Topics topics; + MQTTSN_topicid topicid; + + topics.add(topicName, id); + + topicid.type = MQTTSN_TOPIC_TYPE_PREDEFINED; + topicid.data.long_.len = strlen(searchedTopicName); + topicid.data.long_.name = const_cast(searchedTopicName); + + return topics.getTopicByName(&topicid) != 0; +} + +bool testGetPredefinedTopicById(const char* topicName, const uint16_t id, uint16_t sid) +{ + Topics topics; + MQTTSN_topicid topicid; + + Topic* t = topics.add(topicName, id); + + topicid.type = MQTTSN_TOPIC_TYPE_PREDEFINED; + topicid.data.id = sid; + + Topic* tp = topics.getTopicById(&topicid); + + if ( tp ) + { + return tp->getTopicId() == id && strcmp(t->getTopicName()->c_str(), topicName) == 0; + } + else + { + return false; + } } void TestTopics::test(void) { - printf("Test Topics "); - const int TOPIC_COUNT = 13; MQTTSN_topicid topic[TOPIC_COUNT]; @@ -116,6 +160,18 @@ void TestTopics::test(void) topic[12].data.long_.len = strlen(tp[12]); topic[12].data.long_.name = tp[12]; + + /* Test EraseNorma() */ + for ( int i = 0; i < TOPIC_COUNT; i++ ) + { + MQTTSN_topicid pos = topic[i]; + Topic* t = _topics->add(&pos); + //printf("Topic=%s ID=%d\n", t->getTopicName()->c_str(), t->getTopicId()); + assert(t !=0); + } + _topics->eraseNormal(); + assert(_topics->getCount() == 0); + /* Add Topic to Topics */ for ( int i = 0; i < TOPIC_COUNT; i++ ) { @@ -129,26 +185,30 @@ void TestTopics::test(void) { string str = "Test/"; str += 0x30 + i; - Topic* t = _topics->add(&str); + Topic* t = _topics->add(str.c_str()); //printf("Topic=%s ID=%d\n", t->getTopicName()->c_str(), t->getTopicId()); assert(t !=0); } - /* Get Topic by MQTTSN_topicid */ + /* Get Topic by MQTTSN_topicid by Name*/ for ( int i = 0; i < TOPIC_COUNT; i++ ) { - Topic* t = _topics->getTopic(&topic[i]); - //printf("Topic=%s ID=%d ID=%d\n", t->getTopicName()->c_str(), t->getTopicId(),_topics->getTopicId(&topic[i])); - assert(t->getTopicId() == i + 1); + Topic* t = _topics->getTopicByName(&topic[i]); + //printf("Topic=%s ID=%d\n", t->getTopicName()->c_str(), t->getTopicId()); + assert(strcmp(t->getTopicName()->c_str(), topic[i].data.long_.name) == 0 ); } - /* Get TopicId by MQTTSN_topicid */ - for ( int i = 0; i < TOPIC_COUNT; i++ ) - { - uint16_t id = _topics->getTopicId(&topic[i]); - //printf("ID=%d \n", id); - assert(id == i + 1); - } + /* Get Topic by MQTTSN_topicid by ID*/ + for ( int i = 0; i < TOPIC_COUNT; i++ ) + { + Topic* t = _topics->getTopicByName(&topic[i]); + MQTTSN_topicid stpid; + stpid.type = MQTTSN_TOPIC_TYPE_NORMAL; + stpid.data.id =t->getTopicId(); + Topic* st = _topics->getTopicById(&stpid); + //printf("Topic=%s ID=%d ID=%d\n", t->getTopicName()->c_str(), t->getTopicId(), st->getTopicId()); + assert(t->getTopicId() == st->getTopicId() ); + } /* Test Wildcard */ for ( int i = 0; i < 10 ; i++ ) @@ -286,13 +346,20 @@ void TestTopics::test(void) assert(testIsMatch("/+", "/finance")); assert(!testIsMatch("+", "/finance")); - assert(testGetTopicId("mytopic", "mytopic")); - assert(!testGetTopicId("mytopic", "mytop")); - assert(!testGetTopicId("mytopic", "mytopiclong")); + assert(testGetTopicById("mytopic", "mytopic")); + assert(!testGetTopicById("mytopic", "mytop")); + assert(!testGetTopicById("mytopic", "mytopiclong")); - assert(testGetTopic("mytopic", "mytopic")); - assert(!testGetTopic("mytopic", "mytop")); - assert(!testGetTopic("mytopic", "mytopiclong")); + assert(testGetTopicByName("mytopic", "mytopic")); + assert(!testGetTopicByName("mytopic", "mytop")); + assert(!testGetTopicByName("mytopic", "mytopiclong")); + + assert(testGetPredefinedTopicByName("mypretopic", 1, "mypretopic")); + assert(!testGetPredefinedTopicByName("mypretopic", 1, "mypretop")); + assert(!testGetPredefinedTopicByName("mypretopic", 1, "mypretopiclong")); + + assert(testGetPredefinedTopicById("mypretopic2", 2, 2)); + assert(!testGetPredefinedTopicById("mypretopic2", 2, 1)); printf("[ OK ]\n"); } diff --git a/MQTTSNGateway/src/tests/TestTree23.cpp b/MQTTSNGateway/src/tests/TestTree23.cpp index d455eec..a4245fe 100644 --- a/MQTTSNGateway/src/tests/TestTree23.cpp +++ b/MQTTSNGateway/src/tests/TestTree23.cpp @@ -33,7 +33,6 @@ TestTree23::~TestTree23() void TestTree23::test(void) { - printf("Test Tree23 "); int N = 100; Key* r1[100]; diff --git a/MQTTSNPacket/src/MQTTSNPacket.c b/MQTTSNPacket/src/MQTTSNPacket.c index e6e8d8f..092eae0 100644 --- a/MQTTSNPacket/src/MQTTSNPacket.c +++ b/MQTTSNPacket/src/MQTTSNPacket.c @@ -213,8 +213,10 @@ int readMQTTSNString(MQTTSNString* MQTTSNString, unsigned char** pptr, unsigned *pptr += MQTTSNString->lenstring.len; } else + { MQTTSNString->lenstring.data = NULL; - MQTTSNString->cstring = NULL; + MQTTSNString->cstring = NULL; + } rc = 1; FUNC_EXIT_RC(rc); return rc;