Update: Add Pre-defined-Topic

Signed-off-by: tomoaki <tomoaki@tomy-tech.com>
This commit is contained in:
tomoaki
2018-07-01 18:18:38 +09:00
parent 9ae5c72125
commit bb993aed5b
37 changed files with 1090 additions and 749 deletions

View File

@@ -16,16 +16,17 @@
*
* Supported functions.
*
* void PUBLISH ( const char* topicName, uint8_t* payload,
* uint16_t len, uint8_t qos, bool retain = false );
* void PUBLISH ( const char* topicName, uint8_t* payload, uint16_t len, uint8_t qos, bool retain = false );
*
* void PUBLISH ( uint16_t topicId, uint8_t* payload,
* uint16_t len, uint8_t qos, bool retain = false );
* void PUBLISH ( uint16_t topicId, uint8_t* payload, uint16_t len, uint8_t qos, bool retain = false );
*
* void SUBSCRIBE ( const char* topicName, TopicCallback onPublish,
* uint8_t qos );
* void SUBSCRIBE ( const char* topicName, TopicCallback onPublish, uint8_t qos );
*
* void UNSUBSCRIBE( const char* topicName );
* void SUBSCRIBE ( uint16_t topicId, TopicCallback onPublish, uint8_t qos );
*
* void UNSUBSCRIBE ( const char* topicName );
*
* void UNSUBSCRIBE ( uint16_t topicId );
*
* void DISCONNECT ( uint16_t sleepInSecs );
*

View File

@@ -16,16 +16,17 @@
*
* Supported functions.
*
* void PUBLISH ( const char* topicName, uint8_t* payload,
* uint16_t len, uint8_t qos, bool retain = false );
* void PUBLISH ( const char* topicName, uint8_t* payload, uint16_t len, uint8_t qos, bool retain = false );
*
* void PUBLISH ( uint16_t topicId, uint8_t* payload,
* uint16_t len, uint8_t qos, bool retain = false );
* void PUBLISH ( uint16_t topicId, uint8_t* payload, uint16_t len, uint8_t qos, bool retain = false );
*
* void SUBSCRIBE ( const char* topicName, TopicCallback onPublish,
* uint8_t qos );
* void SUBSCRIBE ( const char* topicName, TopicCallback onPublish, uint8_t qos );
*
* void UNSUBSCRIBE( const char* topicName );
* void SUBSCRIBE ( uint16_t topicId, TopicCallback onPublish, uint8_t qos );
*
* void UNSUBSCRIBE ( const char* topicName );
*
* void UNSUBSCRIBE ( uint16_t topicId );
*
* void DISCONNECT ( uint16_t sleepInSecs );
*
@@ -112,11 +113,13 @@ int on_Topic03(uint8_t* pload, uint16_t ploadlen)
* A Link list of Callback routines and Topics
*------------------------------------------------------*/
SUBSCRIBE_LIST = {// e.g. SUB(topic, callback, QoS),
//SUB(topic1, on_Topic01, 1),
//SUB(topic4, on_Topic03, 1),
END_OF_SUBSCRIBE_LIST
};
SUBSCRIBE_LIST = {// e.g. SUB(TopicType, topicName, TopicId, callback, QoSx),
// SUB(MQTTSN_TOPIC_TYPE_NORMAL, topic1, 0, on_Topic01, QoS1),
// SUB(MQTTSN_TOPIC_TYPE_NORMAL, topic2, 0, on_Topic02, QoS1),
END_OF_SUBSCRIBE_LIST
};
/*------------------------------------------------------

View File

@@ -16,16 +16,17 @@
*
* Supported functions.
*
* void PUBLISH ( const char* topicName, uint8_t* payload,
* uint16_t len, uint8_t qos, bool retain = false );
* void PUBLISH ( const char* topicName, uint8_t* payload, uint16_t len, uint8_t qos, bool retain = false );
*
* void PUBLISH ( uint16_t topicId, uint8_t* payload,
* uint16_t len, uint8_t qos, bool retain = false );
* void PUBLISH ( uint16_t topicId, uint8_t* payload, uint16_t len, uint8_t qos, bool retain = false );
*
* void SUBSCRIBE ( const char* topicName, TopicCallback onPublish,
* uint8_t qos );
* void SUBSCRIBE ( const char* topicName, TopicCallback onPublish, uint8_t qos );
*
* void UNSUBSCRIBE( const char* topicName );
* void SUBSCRIBE ( uint16_t topicId, TopicCallback onPublish, uint8_t qos );
*
* void UNSUBSCRIBE ( const char* topicName );
*
* void UNSUBSCRIBE ( uint16_t topicId );
*
* void DISCONNECT ( uint16_t sleepInSecs );
*
@@ -76,6 +77,8 @@ MQTTSNCONF = {
const char* topic1 = "ty4tw/topic1";
const char* topic2 = "ty4tw/topic2";
const char* topic3 = "ty4tw/topic3";
const char* topic4 = "ty4tw/topic4";
const char* topic5 = "ty4tw/topic5";
/*------------------------------------------------------
@@ -100,7 +103,7 @@ int on_Topic02(uint8_t* pload, uint16_t ploadlen)
int on_Topic03(uint8_t* pload, uint16_t ploadlen)
{
DISPLAY("\n\nNew callback recv Topic2\n");
DISPLAY("\n\nNew callback recv Topic3\n");
pload[ploadlen-1]= 0; // set null terminator
DISPLAY("Payload -->%s <--\n\n",pload);
return 0;
@@ -110,8 +113,9 @@ int on_Topic03(uint8_t* pload, uint16_t ploadlen)
* A Link list of Callback routines and Topics
*------------------------------------------------------*/
SUBSCRIBE_LIST = {// e.g. SUB(topic, callback, QoS),
SUB(topic1, on_Topic01, 1),
SUBSCRIBE_LIST = {// e.g. SUB(TopicType, topicName, TopicId, callback, QoSx),
SUB(MQTTSN_TOPIC_TYPE_NORMAL, topic1, 0, on_Topic01, QoS1),
SUB(MQTTSN_TOPIC_TYPE_NORMAL, topic2, 0, on_Topic02, QoS1),
END_OF_SUBSCRIBE_LIST
};
@@ -119,29 +123,32 @@ SUBSCRIBE_LIST = {// e.g. SUB(topic, callback, QoS),
/*------------------------------------------------------
* Test functions
*------------------------------------------------------*/
void subscribePredefTopic1(void)
{
SUBSCRIBE(1, on_Topic03, QoS1);
}
void publishTopic1(void)
{
char payload[300];
sprintf(payload, "publish \"ty4tw/Topic1\" \n");
uint8_t qos = 0;
PUBLISH(topic1,(uint8_t*)payload, strlen(payload), qos);
PUBLISH(topic1,(uint8_t*)payload, strlen(payload), QoS0);
}
void subscribeTopic2(void)
{
uint8_t qos = 1;
SUBSCRIBE(topic2, on_Topic02, qos);
SUBSCRIBE(10, on_Topic02, QoS1);
}
void publishTopic2(void)
{
char payload[300];
sprintf(payload, "publish \"ty4tw/topic2\" \n");
uint8_t qos = 0;
PUBLISH(topic2,(uint8_t*)payload, strlen(payload), qos);
PUBLISH(topic2,(uint8_t*)payload, strlen(payload), QoS1);
}
void unsubscribe(void)
{
UNSUBSCRIBE(topic2);
@@ -149,8 +156,7 @@ void unsubscribe(void)
void subscribechangeCallback(void)
{
uint8_t qos = 1;
SUBSCRIBE(topic2, on_Topic03, qos);
SUBSCRIBE(topic2, on_Topic02, QoS1);
}
void test3(void)
@@ -178,6 +184,7 @@ void asleep(void)
*------------------------------------------------------*/
TEST_LIST = {// e.g. TEST( Label, Test),
TEST("Step0:Subscribe predef topic1", subscribePredefTopic1),
TEST("Step1:Publish topic1", publishTopic1),
TEST("Step2:Publish topic2", publishTopic2),
TEST("Step3:Subscribe topic2", subscribeTopic2),

View File

@@ -183,9 +183,9 @@ void LMqttsnClient::subscribe(const char* topicName, TopicCallback onPublish, ui
_subMgr.subscribe(topicName, onPublish, qos);
}
void LMqttsnClient::subscribe(uint16_t topicId, TopicCallback onPublish, uint8_t qos, uint8_t topicType)
void LMqttsnClient::subscribe(uint16_t topicId, TopicCallback onPublish, uint8_t qos)
{
_subMgr.subscribe(topicId, onPublish, qos, topicType);
_subMgr.subscribe(topicId, onPublish, qos);
}
void LMqttsnClient::unsubscribe(const char* topicName)
@@ -193,6 +193,11 @@ void LMqttsnClient::unsubscribe(const char* topicName)
_subMgr.unsubscribe(topicName);
}
void LMqttsnClient::unsubscribe(const uint16_t topicId)
{
_subMgr.unsubscribe(topicId);
}
void LMqttsnClient::disconnect(uint16_t sleepInSecs)
{
_gwProxy.disconnect(sleepInSecs);

View File

@@ -34,7 +34,9 @@ namespace linuxAsyncClient {
struct OnPublishList
{
MQTTSN_topicTypes type;
const char* topic;
uint16_t id;
int (*pubCallback)(uint8_t* payload, uint16_t payloadlen);
uint8_t qos;
};
@@ -52,8 +54,9 @@ public:
void publish(uint16_t topicId, Payload* payload, uint8_t qos, bool retain = false);
void publish(uint16_t topicId, uint8_t* payload, uint16_t len, uint8_t qos, bool retain = false);
void subscribe(const char* topicName, TopicCallback onPublish, uint8_t qos);
void subscribe(uint16_t topicId, TopicCallback onPublish, uint8_t qos, uint8_t topicType);
void subscribe(uint16_t topicId, TopicCallback onPublish, uint8_t qos);
void unsubscribe(const char* topicName);
void unsubscribe(const uint16_t topicId);
void disconnect(uint16_t sleepInSecs);
void initialize(LUdpConfig netconf, LMqttsnConfig mqconf);
void run(void);

View File

@@ -73,6 +73,15 @@ struct LUdpConfig{
uint16_t uPortNo;
};
typedef enum
{
MQTTSN_TOPIC_TYPE_NORMAL,
MQTTSN_TOPIC_TYPE_PREDEFINED,
MQTTSN_TOPIC_TYPE_SHORT
} MQTTSN_topicTypes;
/*======================================
MACROs for Application
=======================================*/
@@ -93,7 +102,7 @@ struct LUdpConfig{
#define END_OF_TEST_LIST {0, 0, 0}
#define SUBSCRIBE_LIST OnPublishList theOnPublishList[]
#define SUB(...) {__VA_ARGS__}
#define END_OF_SUBSCRIBE_LIST {0,0,0}
#define END_OF_SUBSCRIBE_LIST {MQTTSN_TOPIC_TYPE_NORMAL,0,0,0, 0}
#define UDPCONF LUdpConfig theNetcon
#define MQTTSNCONF LMqttsnConfig theMqcon
#ifdef CLIENT_MODE
@@ -129,6 +138,9 @@ struct LUdpConfig{
/*======================================
MQTT-SN Defines
========================================*/
#define QoS0 0
#define QoS1 1
#define QoS2 2
#define MQTTSN_TYPE_ADVERTISE 0x00
#define MQTTSN_TYPE_SEARCHGW 0x01
#define MQTTSN_TYPE_GWINFO 0x02
@@ -157,10 +169,7 @@ struct LUdpConfig{
#define MQTTSN_TYPE_WILLMSGUPD 0x1C
#define MQTTSN_TYPE_WILLMSGRESP 0x1D
#define MQTTSN_TOPIC_TYPE_NORMAL 0x00
#define MQTTSN_TOPIC_TYPE_PREDEFINED 0x01
#define MQTTSN_TOPIC_TYPE_SHORT 0x02
#define MQTTSN_TOPIC_TYPE 0x03
#define MQTTSN_TOPIC_TYPE 0x03
#define MQTTSN_FLAG_DUP 0x80
#define MQTTSN_FLAG_QOS_0 0x0
@@ -179,14 +188,10 @@ struct LUdpConfig{
#define MQTTSN_RC_REJECTED_INVALID_TOPIC_ID 0x02
#define MQTTSN_RC_REJECTED_NOT_SUPPORTED 0x03
#define PREDEFINEDID_OTA_REQ (0x0ff0)
#define PREDEFINEDID_OTA_READY (0x0ff1)
#define PREDEFINEDID_OTA_NO_CLIENT (0x0ff2)
/*=================================
* Starting prompt
==================================*/
#define TESTER_VERSION " * Version: 1.0.0"
#define TESTER_VERSION " * Version: 2.0.0"
#define PAHO_COPYRIGHT0 " * MQTT-SN Gateway Tester"
#define PAHO_COPYRIGHT1 " * Part of Project Paho in Eclipse"

View File

@@ -66,25 +66,22 @@ void LPublishManager::publish(const char* topicName, Payload* payload, uint8_t q
publish(topicName, payload->getRowData(), payload->getLen(), qos, retain);
}
void LPublishManager::publish(const char* topicName, uint8_t* payload, uint16_t len, uint8_t qos, bool retain)
{
uint8_t topicType = MQTTSN_TOPIC_TYPE_NORMAL;
if ( strlen(topicName) < 2 )
{
topicType = MQTTSN_TOPIC_TYPE_SHORT;
}
publish(topicName, payload, len, qos, topicType, retain);
}
void LPublishManager::publish(const char* topicName, uint8_t* payload, uint16_t len, uint8_t qos, uint8_t topicType, bool retain)
{
uint16_t msgId = 0;
uint8_t topicType = MQTTSN_TOPIC_TYPE_SHORT;
if ( strlen(topicName) > 2 )
{
topicType = MQTTSN_TOPIC_TYPE_NORMAL;
}
if ( qos > 0 )
{
msgId = theClient->getGwProxy()->getNextMsgId();
}
PubElement* elm = add(topicName, 0, payload, len, qos, retain, msgId, topicType);
if (elm->status == TOPICID_IS_READY)
{
sendPublish(elm);
@@ -286,7 +283,7 @@ void LPublishManager::published(uint8_t* msg, uint16_t msglen)
}
_publishedFlg = NEG_TASK_INDEX;
theClient->getTopicTable()->execCallback(topicId, msg + 6, msglen - 6, msg[1] & 0x03);
theClient->getTopicTable()->execCallback(topicId, msg + 6, msglen - 6, (MQTTSN_topicTypes)(msg[1] & MQTTSN_TOPIC_TYPE));
_publishedFlg = SAVE_TASK_INDEX;
}

View File

@@ -62,7 +62,6 @@ public:
~LPublishManager();
void publish(const char* topicName, Payload* payload, uint8_t qos, bool retain = false);
void publish(const char* topicName, uint8_t* payload, uint16_t len, uint8_t qos, bool retain = false);
void publish(const char* topicName, uint8_t* payload, uint16_t len, uint8_t qos, uint8_t topicType, bool retain = false);
void publish(uint16_t topicId, Payload* payload, uint8_t qos, bool retain = false);
void publish(uint16_t topicId, uint8_t* payload, uint16_t len, uint8_t qos, bool retain = false);
void responce(const uint8_t* msg, uint16_t msglen);

View File

@@ -190,13 +190,13 @@ void LRegisterManager::registerTopic(char* topicName)
void LRegisterManager::responceRegAck(uint16_t msgId, uint16_t topicId)
{
const char* topicName = getTopic(msgId);
MQTTSN_topicTypes type = MQTTSN_TOPIC_TYPE_NORMAL;
if (topicName)
{
uint8_t topicType = strlen((char*) topicName) > 2 ? MQTTSN_TOPIC_TYPE_NORMAL : MQTTSN_TOPIC_TYPE_SHORT;
theClient->getGwProxy()->getTopicTable()->setTopicId((char*) topicName, topicId, topicType); // Add Topic to TopicTable
theClient->getGwProxy()->getTopicTable()->setTopicId((char*) topicName, topicId, type); // Add Topic to TopicTable
RegQueElement* elm = getElement(msgId);
remove(elm);
theClient->getPublishManager()->sendSuspend((char*) topicName, topicId, topicType);
theClient->getPublishManager()->sendSuspend((char*) topicName, topicId, type);
}
}
@@ -213,7 +213,7 @@ void LRegisterManager::responceRegister(uint8_t* msg, uint16_t msglen)
{
TopicCallback callback = tp->getCallback();
void* topicName = calloc(strlen((char*) msg + 5) + 1, sizeof(char));
theClient->getGwProxy()->getTopicTable()->add((char*) topicName, 0, MQTTSN_TOPIC_TYPE_NORMAL, callback, 1);
theClient->getGwProxy()->getTopicTable()->add((char*) topicName, MQTTSN_TOPIC_TYPE_NORMAL, 0, callback, 1);
regack[6] = MQTTSN_RC_ACCEPTED;
}
else

View File

@@ -65,7 +65,14 @@ void LSubscribeManager::onConnect(void)
{
for (uint8_t i = 0; theOnPublishList[i].topic != 0; i++)
{
subscribe(theOnPublishList[i].topic, theOnPublishList[i].pubCallback, theOnPublishList[i].qos);
if ( theOnPublishList[i].type == MQTTSN_TOPIC_TYPE_PREDEFINED)
{
subscribe(theOnPublishList[i].id, theOnPublishList[i].pubCallback, theOnPublishList[i].qos);
}
else
{
subscribe(theOnPublishList[i].topic, theOnPublishList[i].pubCallback, theOnPublishList[i].qos);
}
}
}
else
@@ -149,30 +156,43 @@ void LSubscribeManager::send(SubElement* elm)
void LSubscribeManager::subscribe(const char* topicName, TopicCallback onPublish, uint8_t qos)
{
uint8_t topicType = MQTTSN_TOPIC_TYPE_NORMAL;
if ( strlen(topicName) <= 2)
{
topicType = MQTTSN_TOPIC_TYPE_SHORT;
}
SubElement* elm = add(MQTTSN_TYPE_SUBSCRIBE, topicName, 0, topicType, qos, onPublish);
send(elm);
MQTTSN_topicTypes topicType;
if ( strlen(topicName) > 2 )
{
topicType = MQTTSN_TOPIC_TYPE_NORMAL;
}
else
{
topicType = MQTTSN_TOPIC_TYPE_SHORT;
}
SubElement* elm = add(MQTTSN_TYPE_SUBSCRIBE, topicName, topicType, 0, qos, onPublish);
send(elm);
}
void LSubscribeManager::subscribe(uint16_t topicId, TopicCallback onPublish, uint8_t qos, uint8_t topicType)
void LSubscribeManager::subscribe(uint16_t topicId, TopicCallback onPublish, uint8_t qos)
{
SubElement* elm = add(MQTTSN_TYPE_SUBSCRIBE, 0, topicId, topicType, qos, onPublish);
SubElement* elm = add(MQTTSN_TYPE_SUBSCRIBE, 0, MQTTSN_TOPIC_TYPE_PREDEFINED, topicId, qos, onPublish);
send(elm);
}
void LSubscribeManager::unsubscribe(const char* topicName)
{
SubElement* elm = add(MQTTSN_TYPE_UNSUBSCRIBE, topicName, 0, MQTTSN_TOPIC_TYPE_NORMAL, 0, 0);
MQTTSN_topicTypes topicType;
if ( strlen(topicName) > 2 )
{
topicType = MQTTSN_TOPIC_TYPE_NORMAL;
}
else
{
topicType = MQTTSN_TOPIC_TYPE_SHORT;
}
SubElement* elm = add(MQTTSN_TYPE_UNSUBSCRIBE, topicName, topicType, 0, 0, 0);
send(elm);
}
void LSubscribeManager::unsubscribe(uint16_t topicId, uint8_t topicType)
void LSubscribeManager::unsubscribe( uint16_t topicId)
{
SubElement* elm = add(MQTTSN_TYPE_UNSUBSCRIBE, 0, topicId, topicType, 0, 0);
SubElement* elm = add(MQTTSN_TYPE_UNSUBSCRIBE, 0, MQTTSN_TOPIC_TYPE_PREDEFINED, topicId, 0, 0);
send(elm);
}
@@ -214,20 +234,19 @@ void LSubscribeManager::responce(const uint8_t* msg)
uint16_t msgId = getUint16(msg + 4);
uint8_t rc = msg[6];
LTopicTable* tt = theClient->getGwProxy()->getTopicTable();
SubElement* elm = getElement(msgId);
if (elm)
{
if ( rc == MQTTSN_RC_ACCEPTED )
{
tt->add((char*) elm->topicName, topicId, elm->topicType, elm->callback);
theClient->getGwProxy()->getTopicTable()->add((char*) elm->topicName, elm->topicType, topicId, elm->callback);
getElement(msgId)->done = SUB_DONE;
DISPLAY("\033[0m\033[0;32m Topic \"%s\" Id : %d was Subscribed. \033[0m\033[0;37m\n\n", getElement(msgId)->topicName, topicId);
}
else
{
remove(elm);
DISPLAY("\033[0m\033[0;31m SUBACK Invalid messageId. %s\033[0m\033[0;37m\n\n", getElement(msgId)->topicName);
remove(elm);
}
}
}
@@ -237,22 +256,20 @@ void LSubscribeManager::responce(const uint8_t* msg)
SubElement* elm = getElement(msgId);
if (elm)
{
LTopicTable* tt = theClient->getGwProxy()->getTopicTable();
tt->setCallback(elm->topicName, 0);
//theClient->getGwProxy()->getTopicTable()->setCallback(elm->topicName, 0);
DISPLAY("\033[0m\033[0;32m Topic \"%s\" was Unsubscribed. \033[0m\033[0;37m\n\n", getElement(msgId)->topicName);
remove(getElement(msgId));
remove(elm);
}
else
{
DISPLAY("\033[0m\033[0;31m UNSUBACK Invalid messageId. %s\033[0m\033[0;37m\n\n", getElement(msgId)->topicName);
remove(getElement(msgId));
DISPLAY("\033[0m\033[0;31m UNSUBACK Invalid messageId. \033[0m\033[0;37m\n\n");
}
}
}
/* SubElement operations */
SubElement* LSubscribeManager::add(uint8_t msgType, const char* topicName, uint16_t topicId, uint8_t topicType,
SubElement* LSubscribeManager::add(uint8_t msgType, const char* topicName, MQTTSN_topicTypes topicType, uint16_t topicId,
uint8_t qos, TopicCallback callback)
{
SubElement* elm = 0;
@@ -358,7 +375,7 @@ SubElement* LSubscribeManager::getElement(const char* topicName, uint8_t msgType
SubElement* elm = _first;
while (elm)
{
if (strcmp(elm->topicName, topicName) == 0 && elm->msgType == msgType)
if ( elm->msgType == msgType && strncmp(elm->topicName, topicName, strlen(topicName)) == 0 )
{
return elm;
}
@@ -370,7 +387,7 @@ SubElement* LSubscribeManager::getElement(const char* topicName, uint8_t msgType
return 0;
}
SubElement* LSubscribeManager::getElement(uint16_t topicId, uint8_t topicType)
SubElement* LSubscribeManager::getElement(uint16_t topicId, MQTTSN_topicTypes topicType)
{
SubElement* elm = _first;
while (elm)

View File

@@ -37,7 +37,7 @@ typedef struct SubElement{
time_t sendUTC;
uint16_t topicId;
uint8_t msgType;
uint8_t topicType;
MQTTSN_topicTypes topicType;
uint8_t qos;
int retryCount;
@@ -56,9 +56,9 @@ public:
~LSubscribeManager();
void onConnect(void);
void subscribe(const char* topicName, TopicCallback onPublish, uint8_t qos);
void subscribe(uint16_t topicId, TopicCallback onPublish, uint8_t qos, uint8_t topicType);
void subscribe(uint16_t topicId, TopicCallback onPublish, uint8_t qos);
void unsubscribe(const char* topicName);
void unsubscribe(uint16_t topicId, uint8_t topicType);
void unsubscribe(uint16_t topicId);
void responce(const uint8_t* msg);
void checkTimeout(void);
bool isDone(void);
@@ -66,9 +66,9 @@ private:
void send(SubElement* elm);
SubElement* getFirstElement(void);
SubElement* getElement(uint16_t msgId);
SubElement* getElement(uint16_t topicId, uint8_t topicType);
SubElement* getElement(uint16_t topicId, MQTTSN_topicTypes topicType);
SubElement* getElement(const char* topicName, uint8_t msgType);
SubElement* add(uint8_t msgType, const char* topicName, uint16_t topicId, uint8_t topicType, uint8_t qos, TopicCallback callback);
SubElement* add(uint8_t msgType, const char* topicName, MQTTSN_topicTypes topicType, uint16_t topicId, uint8_t qos, TopicCallback callback);
void remove(SubElement* elm);
SubElement* _first;
SubElement* _last;

View File

@@ -131,7 +131,7 @@ LTopic* LTopicTable::getTopic(const char* topic){
return 0;
}
LTopic* LTopicTable::getTopic(uint16_t topicId, uint8_t topicType){
LTopic* LTopicTable::getTopic(uint16_t topicId, MQTTSN_topicTypes topicType){
LTopic* p = _first;
while(p){
if (p->_topicId == topicId && p->_topicType == topicType){
@@ -156,12 +156,12 @@ char* LTopicTable::getTopicName(LTopic* topic){
}
void LTopicTable::setTopicId(const char* topic, uint16_t id, uint8_t type){
void LTopicTable::setTopicId(const char* topic, uint16_t id, MQTTSN_topicTypes type){
LTopic* tp = getTopic(topic);
if (tp){
tp->_topicId = id;
}else{
add(topic, id, type, 0);
add(topic, type, id, 0);
}
}
@@ -176,7 +176,7 @@ bool LTopicTable::setCallback(const char* topic, TopicCallback callback){
}
bool LTopicTable::setCallback(uint16_t topicId, uint8_t topicType, TopicCallback callback){
bool LTopicTable::setCallback(uint16_t topicId, MQTTSN_topicTypes topicType, TopicCallback callback){
LTopic* p = getTopic(topicId, topicType);
if (p){
p->_callback = callback;
@@ -186,7 +186,7 @@ bool LTopicTable::setCallback(uint16_t topicId, uint8_t topicType, TopicCallback
}
int LTopicTable::execCallback(uint16_t topicId, uint8_t* payload, uint16_t payloadlen, uint8_t topicType){
int LTopicTable::execCallback(uint16_t topicId, uint8_t* payload, uint16_t payloadlen, MQTTSN_topicTypes topicType){
LTopic* p = getTopic(topicId, topicType);
if (p){;
return p->execCallback(payload, payloadlen);
@@ -195,7 +195,7 @@ int LTopicTable::execCallback(uint16_t topicId, uint8_t* payload, uint16_t payl
}
LTopic* LTopicTable::add(const char* topicName, uint16_t id, uint8_t type, TopicCallback callback, uint8_t alocFlg)
LTopic* LTopicTable::add(const char* topicName, MQTTSN_topicTypes type, uint16_t id, TopicCallback callback, uint8_t alocFlg)
{
LTopic* elm;
@@ -234,9 +234,9 @@ exit:
return elm;
}
void LTopicTable::remove(uint16_t topicId)
void LTopicTable::remove(uint16_t topicId, MQTTSN_topicTypes type)
{
LTopic* elm = getTopic(topicId);
LTopic* elm = getTopic(topicId, type);
if (elm){
if (elm->_prev == 0)

View File

@@ -42,7 +42,7 @@ public:
TopicCallback getCallback(void);
private:
uint16_t _topicId;
uint8_t _topicType;
MQTTSN_topicTypes _topicType;
char* _topicStr;
TopicCallback _callback;
uint8_t _malocFlg;
@@ -60,16 +60,16 @@ public:
uint16_t getTopicId(const char* topic);
char* getTopicName(LTopic* topic);
LTopic* getTopic(const char* topic);
LTopic* getTopic(uint16_t topicId, uint8_t topicType = MQTTSN_TOPIC_TYPE_NORMAL);
void setTopicId(const char* topic, uint16_t id, uint8_t topicType);
LTopic* getTopic(uint16_t topicId, MQTTSN_topicTypes topicType);
void setTopicId(const char* topic, uint16_t id, MQTTSN_topicTypes topicType);
bool setCallback(const char* topic, TopicCallback callback);
bool setCallback(uint16_t topicId, uint8_t type, TopicCallback callback);
int execCallback(uint16_t topicId, uint8_t* payload, uint16_t payloadlen, uint8_t topicType = MQTTSN_TOPIC_TYPE_NORMAL);
LTopic* add(const char* topic, uint16_t id = 0, uint8_t type = MQTTSN_TOPIC_TYPE_NORMAL, TopicCallback callback = 0, uint8_t alocFlg = 0);
LTopic* add(uint16_t topicId, uint16_t id, uint8_t type, TopicCallback callback, uint8_t alocFlg);
bool setCallback(uint16_t topicId, MQTTSN_topicTypes type, TopicCallback callback);
int execCallback(uint16_t topicId, uint8_t* payload, uint16_t payloadlen, MQTTSN_topicTypes topicType);
LTopic* add(const char* topic, MQTTSN_topicTypes type, uint16_t id = 0, TopicCallback callback = 0, uint8_t alocFlg = 0);
//LTopic* add(uint16_t topicId, uint16_t id, MQTTSN_topicTypes type, TopicCallback callback, uint8_t alocFlg);
LTopic* match(const char* topic);
void clearTopic(void);
void remove(uint16_t topicId);
void remove(uint16_t topicId, MQTTSN_topicTypes type);
private:
LTopic* _first;

View File

@@ -9,6 +9,7 @@ TESTAPPL := mainTestProcess
CONFIG := gateway.conf
CLIENTS := clients.conf
PREDEFTOPIC := predefinedTopic.conf
SRCDIR := src
SUBDIR := ../MQTTSNPacket/src
@@ -135,6 +136,7 @@ install:
cp -pf $(LPROG) ../../
cp -pf $(CONFIG) ../../
cp -pf $(CLIENTS) ../../
cp -pf $(PREDEFTOPIC) ../../
exectest:
./$(OUTDIR)/$(TESTPROGNAME) -f ./gateway.conf

View File

@@ -60,7 +60,9 @@ ApiMode=2
Client should know the MulticastIP and MulticastPortNo to send a SEARCHGW message.
**GatewayId** is used by GWINFO message.
**KeepAlive** is a duration of ADVERTISE message in seconds.
when **ClientAuthentication** is YES, see MQTTSNGWClient.cpp line53, clients file specified by ClientsList is required. This file defines connect allowed clients by ClientId and SensorNetwork Address. e.g. IP address and Port No.
when **ClientAuthentication** is YES, see MQTTSNGWClient.cpp line53, clients file specified by ClientsList is required. This file defines connect allowed clients by ClientId and SensorNetwork Address. e.g. IP address and Port No.
When **PredefinedTopic** is YES, Pre-definedTopicID file specified by PredefinedTopicFile is effective. This file defines Pre-definedTopics of the clients. In this file, ClientID,TopicName and TopicID are declared in CSV format.

View File

@@ -20,6 +20,9 @@ BrokerSecurePortNo=8883
ClientAuthentication=NO
#ClientsList=/path/to/your_clients.conf
PredefinedTopic=NO
#PredefinedTopicFile=/path/to/your_predefinedTopic.conf
#RootCAfile=/etc/ssl/certs/ca-certificates.crt
#RootCApath=/etc/ssl/certs/
#CertsFile=/path/to/certKey.pem

View File

@@ -0,0 +1,19 @@
#***********************************************************************
# Copyright (c) 2017, Tomoaki Yamaguchi
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Eclipse Public License v1.0
# and Eclipse Distribution License v1.0 which accompany this distribution.
#
# The Eclipse Public License is available at
# http://www.eclipse.org/legal/epl-v10.html
# and the Eclipse Distribution License is available at
# http://www.eclipse.org/org/documents/edl-v10.php.
#***********************************************************************
#
# ClientID, TopicName, TopicID
#
GatewayTestClient,ty4tw/predefinedTopic1, 1
GatewayTestClient,ty4tw/predefinedTopic2, 2
GatewayTestClient,ty4tw/predefinedTopic3, 3

View File

@@ -90,18 +90,21 @@ void MQTTGWPublishHandler::handlePublish(Client* client, MQTTGWPacket* packet)
}
else
{
topicId.type = MQTTSN_TOPIC_TYPE_NORMAL;
topicId.data.long_.len = pub.topiclen;
topicId.data.long_.name = pub.topic;
id = client->getTopics()->getTopicId(&topicId);
topicId.data.long_.len = pub.topiclen;
topicId.data.long_.name = pub.topic;
Topic* tp = client->getTopics()->getTopicByName(&topicId);
if ( id > 0 )
{
topicId.data.id = id;
}
if ( tp )
{
topicId.type = tp->getType();
topicId.data.long_.len = pub.topiclen;
topicId.data.long_.name = pub.topic;
topicId.data.id = tp->getTopicId();
}
else
{
/* This message might be subscribed with wild card. */
topicId.type = MQTTSN_TOPIC_TYPE_NORMAL;
Topic* topic = client->getTopics()->match(&topicId);
if (topic == 0)
{
@@ -179,11 +182,11 @@ void MQTTGWPublishHandler::handlePuback(Client* client, MQTTGWPacket* packet)
{
Ack ack;
packet->getAck(&ack);
uint16_t topicId = client->getWaitedPubTopicId((uint16_t)ack.msgId);
TopicIdMapelement* topicId = client->getWaitedPubTopicId((uint16_t)ack.msgId);
if (topicId)
{
MQTTSNPacket* mqttsnPacket = new MQTTSNPacket();
mqttsnPacket->setPUBACK(topicId, (uint16_t)ack.msgId, 0);
mqttsnPacket->setPUBACK(topicId->getTopicId(), (uint16_t)ack.msgId, 0);
client->eraseWaitedPubTopicId((uint16_t)ack.msgId);
Event* ev1 = new Event();

View File

@@ -38,12 +38,11 @@ void MQTTGWSubscribeHandler::handleSuback(Client* client, MQTTGWPacket* packet)
int qos = 0;
packet->getSUBACK(&msgId, &rc);
uint16_t topicId = client->getWaitedSubTopicId(msgId);
TopicIdMapelement* topicId = client->getWaitedSubTopicId(msgId);
if (topicId)
{
MQTTSNPacket* snPacket = new MQTTSNPacket();
client->eraseWaitedSubTopicId(msgId);
if (rc == 0x80)
{
@@ -54,10 +53,11 @@ void MQTTGWSubscribeHandler::handleSuback(Client* client, MQTTGWPacket* packet)
returnCode = MQTTSN_RC_ACCEPTED;
qos = rc;
}
snPacket->setSUBACK(qos, topicId, msgId, returnCode);
snPacket->setSUBACK(qos, topicId->getTopicId(), msgId, returnCode);
Event* evt = new Event();
evt->setClientSendEvent(client, snPacket);
_gateway->getClientSendQue()->post(evt);
client->eraseWaitedSubTopicId(msgId);
}
}

View File

@@ -15,6 +15,7 @@
* Tieto Poland Sp. z o.o. - Gateway improvements
**************************************************************************************/
#include "MQTTSNGWDefines.h"
#include "MQTTSNGWClient.h"
#include "MQTTSNGateway.h"
#include "SensorNetwork.h"
@@ -124,6 +125,51 @@ bool ClientList::authorize(const char* fileName)
return _authorize;
}
bool ClientList::setPredefinedTopics(const char* fileName)
{
FILE* fp;
char buf[MAX_CLIENTID_LENGTH + 256];
size_t pos0, pos1;
MQTTSNString clientId;
bool rc = false;
clientId.cstring = 0;
clientId.lenstring.data = 0;
clientId.lenstring.len = 0;
if ((fp = fopen(fileName, "r")) != 0)
{
while (fgets(buf, MAX_CLIENTID_LENGTH + 254, fp) != 0)
{
if (*buf == '#')
{
continue;
}
string data = string(buf);
while ((pos0 = data.find_first_of("  \t\n")) != string::npos)
{
data.erase(pos0, 1);
}
if (data.empty())
{
continue;
}
pos0 = data.find_first_of(",");
pos1 = data.find(",", pos0 + 1) ;
string id = data.substr(0, pos0);
clientId.cstring = strdup(id.c_str());
string topicName = data.substr(pos0 + 1, pos1 - pos0 -1);
uint16_t topicID = stoul(data.substr(pos1 + 1));
createPredefinedTopic( &clientId, topicName, topicID);
free(clientId.cstring);
}
fclose(fp);
rc = true;
}
return rc;
}
void ClientList::erase(Client*& client)
{
if ( !_authorize && client->erasable())
@@ -179,14 +225,21 @@ Client* ClientList::getClient(void)
return _firstClient;
}
Client* ClientList::getClient(uint8_t* clientId)
Client* ClientList::getClient(MQTTSNString* clientId)
{
_mutex.lock();
Client* client = _firstClient;
const char* clID =clientId->cstring;
if (clID == 0 )
{
clID = clientId->lenstring.data;
}
while (client != 0)
{
if (strcmp((const char*)client->getClientId(), (const char*)clientId) == 0 )
if (strncmp((const char*)client->getClientId(), clID, MQTTSNstrlen(*clientId)) == 0 )
{
_mutex.unlock();
return client;
@@ -222,7 +275,10 @@ Client* ClientList::createClient(SensorNetAddress* addr, MQTTSNString* clientId,
/* creat a new client */
client = new Client(secure);
client->setClientAddress(addr);
if ( addr )
{
client->setClientAddress(addr);
}
client->setSensorNetType(unstableLine);
if ( MQTTSNstrlen(*clientId) )
{
@@ -256,6 +312,50 @@ Client* ClientList::createClient(SensorNetAddress* addr, MQTTSNString* clientId,
return client;
}
Client* ClientList::createPredefinedTopic( MQTTSNString* clientId, string topicName, uint16_t topicId)
{
Client* client = getClient(clientId);
if ( _authorize && client == 0)
{
return 0;
}
/* anonimous clients */
if ( _clientCnt > MAX_CLIENTS )
{
return 0; // full of clients
}
if ( client == 0 )
{
/* creat a new client */
client = new Client();
client->setClientId(*clientId);
_mutex.lock();
/* add the list */
if ( _firstClient == 0 )
{
_firstClient = client;
_endClient = client;
}
else
{
_endClient->_nextClient = client;
client->_prevClient = _endClient;
_endClient = client;
}
_clientCnt++;
_mutex.unlock();
}
// create Topic & Add it
client->getTopics()->add((const char*)topicName.c_str(), topicId);
return client;
}
uint16_t ClientList::getClientCount()
{
return _clientCnt;
@@ -299,6 +399,7 @@ Client::Client(bool secure)
_prevClient = 0;
_nextClient = 0;
_clientSleepPacketQue.setMaxSize(MAX_SAVED_PUBLISH);
_hasPredefTopic = false;
}
Client::~Client()
@@ -334,16 +435,14 @@ Client::~Client()
}
}
uint16_t Client::getWaitedPubTopicId(uint16_t msgId)
TopicIdMapelement* Client::getWaitedPubTopicId(uint16_t msgId)
{
MQTTSN_topicTypes type;
return _waitedPubTopicIdMap.getTopicId(msgId, &type);
return _waitedPubTopicIdMap.getElement(msgId);
}
uint16_t Client::getWaitedSubTopicId(uint16_t msgId)
TopicIdMapelement* Client::getWaitedSubTopicId(uint16_t msgId)
{
MQTTSN_topicTypes type;
return _waitedSubTopicIdMap.getTopicId(msgId, &type);
return _waitedSubTopicIdMap.getElement(msgId);
}
MQTTGWPacket* Client::getClientSleepPacket()
@@ -426,7 +525,7 @@ void Client::setSessionStatus(bool status)
bool Client::erasable(void)
{
return _sessionStatus;
return _sessionStatus || !_hasPredefTopic;
}
void Client::updateStatus(MQTTSNPacket* packet)
@@ -702,13 +801,15 @@ void Client::setOTAClient(Client* cl)
======================================*/
Topic::Topic()
{
_type = MQTTSN_TOPIC_TYPE_NORMAL;
_topicName = 0;
_topicId = 0;
_next = 0;
}
Topic::Topic(string* topic)
Topic::Topic(string* topic, MQTTSN_topicTypes type)
{
_type = type;
_topicName = topic;
_topicId = 0;
_next = 0;
@@ -732,6 +833,11 @@ uint16_t Topic::getTopicId(void)
return _topicId;
}
MQTTSN_topicTypes Topic::getType(void)
{
return _type;
}
bool Topic::isMatch(string* topicName)
{
string::size_type tlen = _topicName->size();
@@ -824,6 +930,11 @@ bool Topic::isMatch(string* topicName)
}
}
void Topic::print(void)
{
WRITELOG("TopicName=%s ID=%d Type=%d\n", _topicName->c_str(), _topicId, _type);
}
/*=====================================
Class Topics
======================================*/
@@ -831,6 +942,7 @@ Topics::Topics()
{
_first = 0;
_nextTopicId = 0;
_cnt = 0;
}
Topics::~Topics()
@@ -844,113 +956,120 @@ Topics::~Topics()
}
}
uint16_t Topics::getTopicId(const MQTTSN_topicid* topicid)
Topic* Topics::getTopicByName(const MQTTSN_topicid* topicid)
{
if (topicid->type != MQTTSN_TOPIC_TYPE_NORMAL)
{
return 0;
}
Topic* p = _first;
char* ch = topicid->data.long_.name;
string sname = string(ch, ch + topicid->data.long_.len);
while (p)
{
if ( p->_topicName->compare(sname) == 0 )
{
return p;
}
p = p->_next;
}
return 0;
}
Topic* Topics::getTopicById(const MQTTSN_topicid* topicid)
{
Topic* p = _first;
while (p)
{
if ( (int)strlen(p->_topicName->c_str()) == topicid->data.long_.len &&
strncmp(p->_topicName->c_str(), topicid->data.long_.name, topicid->data.long_.len) == 0)
{
return p->_topicId;
}
p = p->_next;
}
return 0;
}
Topic* Topics::getTopic(uint16_t id)
{
Topic* p = _first;
while (p)
{
if (p->_topicId == id)
{
return p;
}
p = p->_next;
}
return 0;
}
Topic* Topics::getTopic(const MQTTSN_topicid* topicid)
{
Topic* p = _first;
while (p)
{
if ( (int)strlen(p->_topicName->c_str()) == topicid->data.long_.len &&
strncmp(p->_topicName->c_str(), topicid->data.long_.name, topicid->data.long_.len) == 0 )
{
return p;
}
if ( p->_type == topicid->type && p->_topicId == topicid->data.id )
{
return p;
}
p = p->_next;
}
return 0;
}
// For MQTTSN_TOPIC_TYPE_NORMAL */
Topic* Topics::add(const MQTTSN_topicid* topicid)
{
Topic* topic;
uint16_t id = 0;
if (topicid->type != MQTTSN_TOPIC_TYPE_NORMAL )
{
return 0;
}
if (topicid->type != MQTTSN_TOPIC_TYPE_NORMAL)
{
return 0;
}
id = getTopicId(topicid);
Topic* topic = getTopicByName(topicid);
if (id)
{
topic = getTopic(id);
}
else
{
string topicName = string(topicid->data.long_.name, topicid->data.long_.len);
topic = add(&topicName);
}
return topic;
if ( topic )
{
return topic;
}
string name(topicid->data.long_.name, topicid->data.long_.len);
return add(name.c_str(), 0);
}
Topic* Topics::add(const string* topicName)
Topic* Topics::add(const char* topicName, uint16_t id)
{
Topic* topic = 0;
MQTTSN_topicid topicId;
Topic* tp = _first;
if ( _cnt >= MAX_TOPIC_PAR_CLIENT )
{
return 0;
}
topic = new Topic();
topicId.data.long_.name = (char*)const_cast<char*>(topicName);
topicId.data.long_.len = strlen(topicName);
if (topic == 0)
{
return 0;
}
string* name = new string(*topicName);
topic->_topicName = name;
topic->_topicId = getNextTopicId();
if (tp == 0)
{
_first = topic;
}
Topic* topic = getTopicByName(&topicId);
while (tp)
{
if (tp->_next == 0)
{
tp->_next = topic;
break;
}
else
{
tp = tp->_next;
}
}
return topic;
if ( topic )
{
return topic;
}
topic = new Topic();
if (topic == 0)
{
return 0;
}
string* name = new string(topicName);
topic->_topicName = name;
if ( id == 0 )
{
topic->_type = MQTTSN_TOPIC_TYPE_NORMAL;
topic->_topicId = getNextTopicId();
}
else
{
topic->_type = MQTTSN_TOPIC_TYPE_PREDEFINED;
topic->_topicId = id;
}
_cnt++;
if ( _first == 0)
{
_first = topic;
}
else
{
Topic* tp = _first;
while (tp)
{
if (tp->_next == 0)
{
tp->_next = topic;
break;
}
else
{
tp = tp->_next;
}
}
}
return topic;
}
uint16_t Topics::getNextTopicId()
@@ -978,6 +1097,60 @@ Topic* Topics::match(const MQTTSN_topicid* topicid)
return 0;
}
void Topics::eraseNormal(void)
{
Topic* topic = _first;
Topic* next = 0;
Topic* prev = 0;
while (topic)
{
if ( topic->_type == MQTTSN_TOPIC_TYPE_NORMAL )
{
next = topic->_next;
if ( _first == topic )
{
_first = next;
}
if ( prev )
{
prev->_next = next;
}
delete topic;
_cnt--;
topic = next;
}
else
{
prev = topic;
topic = topic->_next;
}
}
}
void Topics::print(void)
{
Topic* topic = _first;
if (topic == 0 )
{
WRITELOG("No Topic.\n");
}
else
{
while (topic)
{
topic->print();
topic = topic->_next;
}
}
}
uint8_t Topics::getCount(void)
{
return _cnt;
}
/*=====================================
Class TopicIdMap
=====================================*/
@@ -995,6 +1168,16 @@ TopicIdMapelement::~TopicIdMapelement()
}
MQTTSN_topicTypes TopicIdMapelement::getTopicType(void)
{
return _type;
}
uint16_t TopicIdMapelement::getTopicId(void)
{
return _topicId;
}
TopicIdMap::TopicIdMap()
{
_maxInflight = MAX_INFLIGHTMESSAGES;
@@ -1015,28 +1198,27 @@ TopicIdMap::~TopicIdMap()
}
}
uint16_t TopicIdMap::getTopicId(uint16_t msgId, MQTTSN_topicTypes* type)
TopicIdMapelement* TopicIdMap::getElement(uint16_t msgId)
{
TopicIdMapelement* p = _first;
while ( p )
{
if ( p->_msgId == msgId )
{
*type = p->_type;
return p->_topicId;
return p;
}
p = p->_next;
}
return 0;
}
int TopicIdMap::add(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type)
TopicIdMapelement* TopicIdMap::add(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type)
{
if ( _cnt > _maxInflight * 2 || topicId == 0)
{
return 0;
}
if ( getTopicId(msgId, &type) > 0 )
if ( getElement(msgId) > 0 )
{
erase(msgId);
}
@@ -1058,7 +1240,7 @@ int TopicIdMap::add(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type)
_end = elm;
}
_cnt++;
return 1;
return elm;
}
void TopicIdMap::erase(uint16_t msgId)

View File

@@ -39,73 +39,73 @@ namespace MQTTSNGW
template<class T> class PacketQue
{
public:
PacketQue()
{
_que = new Que<T>;
}
PacketQue()
{
_que = new Que<T>;
}
~PacketQue()
{
clear();
delete _que;
}
~PacketQue()
{
clear();
delete _que;
}
T* getPacket()
{
T* packet;
if (_que->size() > 0)
{
_mutex.lock();
packet = _que->front();
_mutex.unlock();
return packet;
}
else
{
return 0;
}
}
T* getPacket()
{
T* packet;
if (_que->size() > 0)
{
_mutex.lock();
packet = _que->front();
_mutex.unlock();
return packet;
}
else
{
return 0;
}
}
int
post(T* packet)
{
int rc;
_mutex.lock();
rc = _que->post(packet);
_mutex.unlock();
return rc;
}
int
post(T* packet)
{
int rc;
_mutex.lock();
rc = _que->post(packet);
_mutex.unlock();
return rc;
}
void pop()
{
if (_que->size() > 0)
{
_mutex.lock();
_que->pop();
_mutex.unlock();
}
}
void pop()
{
if (_que->size() > 0)
{
_mutex.lock();
_que->pop();
_mutex.unlock();
}
}
void clear()
{
_mutex.lock();
while (_que->size() > 0)
{
delete _que->front();
_que->pop();
}
_mutex.unlock();
}
void clear()
{
_mutex.lock();
while (_que->size() > 0)
{
delete _que->front();
_que->pop();
}
_mutex.unlock();
}
void setMaxSize(int size)
{
_que->setMaxSize(size);
}
void setMaxSize(int size)
{
_que->setMaxSize(size);
}
private:
Que<T>* _que;
Mutex _mutex;
Que<T>* _que;
Mutex _mutex;
};
@@ -114,19 +114,21 @@ private:
======================================*/
class Topic
{
friend class Topics;
friend class Topics;
public:
Topic();
Topic(string* topic);
~Topic();
string* getTopicName(void);
uint16_t getTopicId(void);
bool isMatch(string* topicName);
Topic();
Topic(string* topic, MQTTSN_topicTypes type);
~Topic();
string* getTopicName(void);
uint16_t getTopicId(void);
MQTTSN_topicTypes getType(void);
bool isMatch(string* topicName);
void print(void);
private:
uint16_t _topicId;
string* _topicName;
Topic* _next;
MQTTSN_topicTypes _type;
uint16_t _topicId;
string* _topicName;
Topic* _next;
};
/*=====================================
@@ -135,20 +137,21 @@ private:
class Topics
{
public:
Topics();
~Topics();
Topic* add(const MQTTSN_topicid* topicid);
Topic* add(const string* topic);
uint16_t getTopicId(const MQTTSN_topicid* topic);
uint16_t getNextTopicId();
Topic* getTopic(uint16_t topicId);
Topic* getTopic(const MQTTSN_topicid* topicid);
Topic* match(const MQTTSN_topicid* topicid);
Topics();
~Topics();
Topic* add(const MQTTSN_topicid* topicid);
Topic* add(const char* topicName, uint16_t id = 0);
Topic* getTopicByName(const MQTTSN_topicid* topic);
Topic* getTopicById(const MQTTSN_topicid* topicid);
Topic* match(const MQTTSN_topicid* topicid);
void eraseNormal(void);
uint16_t getNextTopicId();
void print(void);
uint8_t getCount(void);
private:
uint16_t _nextTopicId;
Topic* _first;
uint16_t _nextTopicId;
Topic* _first;
uint8_t _cnt;
};
/*=====================================
@@ -156,34 +159,36 @@ private:
=====================================*/
class TopicIdMapelement
{
friend class TopicIdMap;
friend class TopicIdMap;
public:
TopicIdMapelement(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type);
~TopicIdMapelement();
TopicIdMapelement(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type);
~TopicIdMapelement();
MQTTSN_topicTypes getTopicType(void);
uint16_t getTopicId(void);
private:
uint16_t _msgId;
uint16_t _topicId;
MQTTSN_topicTypes _type;
TopicIdMapelement* _next;
TopicIdMapelement* _prev;
uint16_t _msgId;
uint16_t _topicId;
MQTTSN_topicTypes _type;
TopicIdMapelement* _next;
TopicIdMapelement* _prev;
};
class TopicIdMap
{
public:
TopicIdMap();
~TopicIdMap();
uint16_t getTopicId(uint16_t msgId, MQTTSN_topicTypes* type);
int add(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type);
void erase(uint16_t msgId);
void clear(void);
TopicIdMap();
~TopicIdMap();
TopicIdMapelement* getElement(uint16_t msgId);
TopicIdMapelement* add(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type);
void erase(uint16_t msgId);
void clear(void);
private:
uint16_t* _msgIds;
TopicIdMapelement* _first;
TopicIdMapelement* _end;
int _cnt;
int _maxInflight;
uint16_t* _msgIds;
TopicIdMapelement* _first;
TopicIdMapelement* _end;
int _cnt;
int _maxInflight;
};
/*=====================================
@@ -191,16 +196,16 @@ private:
=====================================*/
class waitREGACKPacket
{
friend class WaitREGACKPacketList;
friend class WaitREGACKPacketList;
public:
waitREGACKPacket(MQTTSNPacket* packet, uint16_t REGACKMsgId);
~waitREGACKPacket();
waitREGACKPacket(MQTTSNPacket* packet, uint16_t REGACKMsgId);
~waitREGACKPacket();
private:
uint16_t _msgId;
MQTTSNPacket* _packet;
waitREGACKPacket* _next;
waitREGACKPacket* _prev;
uint16_t _msgId;
MQTTSNPacket* _packet;
waitREGACKPacket* _next;
waitREGACKPacket* _prev;
};
/*=====================================
@@ -209,15 +214,15 @@ private:
class WaitREGACKPacketList
{
public:
WaitREGACKPacketList();
~WaitREGACKPacketList();
int setPacket(MQTTSNPacket* packet, uint16_t REGACKMsgId);
MQTTSNPacket* getPacket(uint16_t REGACKMsgId);
void erase(uint16_t REGACKMsgId);
WaitREGACKPacketList();
~WaitREGACKPacketList();
int setPacket(MQTTSNPacket* packet, uint16_t REGACKMsgId);
MQTTSNPacket* getPacket(uint16_t REGACKMsgId);
void erase(uint16_t REGACKMsgId);
private:
waitREGACKPacket* _first;
waitREGACKPacket* _end;
waitREGACKPacket* _first;
waitREGACKPacket* _end;
};
/*=====================================
@@ -227,110 +232,111 @@ private:
typedef enum
{
Cstat_Disconnected = 0, Cstat_TryConnecting, Cstat_Connecting, Cstat_Active, Cstat_Asleep, Cstat_Awake, Cstat_Lost
Cstat_Disconnected = 0, Cstat_TryConnecting, Cstat_Connecting, Cstat_Active, Cstat_Asleep, Cstat_Awake, Cstat_Lost
} ClientStatus;
class Client
{
friend class ClientList;
friend class ClientList;
public:
Client(bool secure = false);
Client(uint8_t maxInflightMessages, bool secure);
~Client();
Client(bool secure = false);
Client(uint8_t maxInflightMessages, bool secure);
~Client();
Connect* getConnectData(void);
uint16_t getWaitedPubTopicId(uint16_t msgId);
uint16_t getWaitedSubTopicId(uint16_t msgId);
MQTTGWPacket* getClientSleepPacket(void);
void deleteFirstClientSleepPacket(void);
WaitREGACKPacketList* getWaitREGACKPacketList(void);
Connect* getConnectData(void);
TopicIdMapelement* getWaitedPubTopicId(uint16_t msgId);
TopicIdMapelement* getWaitedSubTopicId(uint16_t msgId);
MQTTGWPacket* getClientSleepPacket(void);
void deleteFirstClientSleepPacket(void);
WaitREGACKPacketList* getWaitREGACKPacketList(void);
void eraseWaitedPubTopicId(uint16_t msgId);
void eraseWaitedSubTopicId(uint16_t msgId);
void clearWaitedPubTopicId(void);
void clearWaitedSubTopicId(void);
void eraseWaitedPubTopicId(uint16_t msgId);
void eraseWaitedSubTopicId(uint16_t msgId);
void clearWaitedPubTopicId(void);
void clearWaitedSubTopicId(void);
int setClientSleepPacket(MQTTGWPacket*);
void setWaitedPubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type);
void setWaitedSubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type);
int setClientSleepPacket(MQTTGWPacket*);
void setWaitedPubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type);
void setWaitedSubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type);
bool checkTimeover(void);
void updateStatus(MQTTSNPacket*);
void updateStatus(ClientStatus);
void connectSended(void);
void connackSended(int rc);
void disconnected(void);
bool isConnectSendable(void);
bool checkTimeover(void);
void updateStatus(MQTTSNPacket*);
void updateStatus(ClientStatus);
void connectSended(void);
void connackSended(int rc);
void disconnected(void);
bool isConnectSendable(void);
uint16_t getNextPacketId(void);
uint8_t getNextSnMsgId(void);
Topics* getTopics(void);
void setTopics(Topics* topics);
void setKeepAlive(MQTTSNPacket* packet);
uint16_t getNextPacketId(void);
uint8_t getNextSnMsgId(void);
Topics* getTopics(void);
void setTopics(Topics* topics);
void setKeepAlive(MQTTSNPacket* packet);
SensorNetAddress* getSensorNetAddress(void);
Network* getNetwork(void);
void setClientAddress(SensorNetAddress* sensorNetAddr);
void setSensorNetType(bool stable);
SensorNetAddress* getSensorNetAddress(void);
Network* getNetwork(void);
void setClientAddress(SensorNetAddress* sensorNetAddr);
void setSensorNetType(bool stable);
void setClientId(MQTTSNString id);
void setWillTopic(MQTTSNString willTopic);
void setWillMsg(MQTTSNString willmsg);
char* getClientId(void);
char* getWillTopic(void);
char* getWillMsg(void);
const char* getStatus(void);
void setWaitWillMsgFlg(bool);
void setSessionStatus(bool); // true: clean session
bool erasable(void);
void setClientId(MQTTSNString id);
void setWillTopic(MQTTSNString willTopic);
void setWillMsg(MQTTSNString willmsg);
char* getClientId(void);
char* getWillTopic(void);
char* getWillMsg(void);
const char* getStatus(void);
void setWaitWillMsgFlg(bool);
void setSessionStatus(bool); // true: clean session
bool erasable(void);
bool isDisconnect(void);
bool isActive(void);
bool isSleep(void);
bool isAwake(void);
bool isSecureNetwork(void);
bool isSensorNetStable(void);
bool isWaitWillMsg(void);
bool isDisconnect(void);
bool isActive(void);
bool isSleep(void);
bool isAwake(void);
bool isSecureNetwork(void);
bool isSensorNetStable(void);
bool isWaitWillMsg(void);
Client* getNextClient(void);
Client* getOTAClient(void);
void setOTAClient(Client* cl);
Client* getNextClient(void);
Client* getOTAClient(void);
void setOTAClient(Client* cl);
private:
PacketQue<MQTTGWPacket> _clientSleepPacketQue;
WaitREGACKPacketList _waitREGACKList;
PacketQue<MQTTGWPacket> _clientSleepPacketQue;
WaitREGACKPacketList _waitREGACKList;
Topics* _topics;
TopicIdMap _waitedPubTopicIdMap;
TopicIdMap _waitedSubTopicIdMap;
Topics* _topics;
TopicIdMap _waitedPubTopicIdMap;
TopicIdMap _waitedSubTopicIdMap;
Connect _connectData;
MQTTSNPacket* _connAck;
Connect _connectData;
MQTTSNPacket* _connAck;
char* _clientId;
char* _willTopic;
char* _willMsg;
char* _clientId;
char* _willTopic;
char* _willMsg;
Timer _keepAliveTimer;
uint32_t _keepAliveMsec;
Timer _keepAliveTimer;
uint32_t _keepAliveMsec;
ClientStatus _status;
bool _waitWillMsgFlg;
ClientStatus _status;
bool _waitWillMsgFlg;
uint16_t _packetId;
uint8_t _snMsgId;
uint16_t _packetId;
uint8_t _snMsgId;
Network* _network; // Broker
bool _secureNetwork; // SSL
bool _sensorNetype; // false: unstable network like a G3
SensorNetAddress _sensorNetAddr;
Network* _network; // Broker
bool _secureNetwork; // SSL
bool _sensorNetype; // false: unstable network like a G3
SensorNetAddress _sensorNetAddr;
bool _sessionStatus;
bool _sessionStatus;
bool _hasPredefTopic;
Client* _nextClient;
Client* _prevClient;
Client* _otaClient;
Client* _nextClient;
Client* _prevClient;
Client* _otaClient;
};
/*=====================================
@@ -339,23 +345,24 @@ private:
class ClientList
{
public:
ClientList();
~ClientList();
bool authorize(const char* fileName);
void erase(Client*&);
Client* getClient(SensorNetAddress* addr);
Client* getClient(uint8_t* clientId);
Client* createClient(SensorNetAddress* addr, MQTTSNString* clientId, bool unstableLine,
bool secure);
uint16_t getClientCount(void);
Client* getClient(void);
bool isAuthorized();
ClientList();
~ClientList();
bool authorize(const char* fileName);
bool setPredefinedTopics(const char* fileName);
void erase(Client*&);
Client* createClient(SensorNetAddress* addr, MQTTSNString* clientId, bool unstableLine, bool secure);
Client* getClient(SensorNetAddress* addr);
Client* getClient(MQTTSNString* clientId);
uint16_t getClientCount(void);
Client* getClient(void);
bool isAuthorized();
private:
Client* _firstClient;
Client* _endClient;
Mutex _mutex;
uint16_t _clientCnt;
bool _authorize;
Client* createPredefinedTopic( MQTTSNString* clientId, string topicName, uint16_t toipcId);
Client* _firstClient;
Client* _endClient;
Mutex _mutex;
uint16_t _clientCnt;
bool _authorize;
};
}

View File

@@ -115,9 +115,21 @@ void ClientRecvTask::run()
continue;
}
/* create a client */
client = _gateway->getClientList()->createClient(_sensorNetwork->getSenderAddress(), &data.clientID, false, false);
client = _gateway->getClientList()->getClient(&data.clientID);
if ( client )
{
/* set SensorNet Address */
client->setClientAddress(_sensorNetwork->getSenderAddress());
}
else
{
/* create a client */
client = _gateway->getClientList()->createClient(_sensorNetwork->getSenderAddress(), &data.clientID, false, false);
}
log(client, packet, &data.clientID);
if (!client)
{
WRITELOG("%s Client(%s) was rejected. CONNECT message has been discarded.%s\n", ERRMSG_HEADER, _sensorNetwork->getSenderAddress()->sprint(buf), ERRMSG_FOOTER);
@@ -126,7 +138,6 @@ void ClientRecvTask::run()
}
/* set sensorNetAddress & post Event */
client->setClientAddress(_sensorNetwork->getSenderAddress());
ev = new Event();
ev->setClientRecvEvent(client, packet);
_gateway->getPacketEventQue()->post(ev);

View File

@@ -116,10 +116,8 @@ void MQTTSNConnectionHandler::handleConnect(Client* client, MQTTSNPacket* packet
/* renew the TopicList */
if (topics)
{
delete topics;
topics->eraseNormal();;
}
topics = new Topics();
client->setTopics(topics);
client->setSessionStatus(true);
}

View File

@@ -25,6 +25,7 @@ namespace MQTTSNGW
#define CONFIG_DIRECTORY "./"
#define CONFIG_FILE "gateway.conf"
#define CLIENT_LIST "clients.conf"
#define PREDEFINEDTOPIC_FILE "predefinedTopic.conf"
/*==========================================================
* Gateway default parameters
@@ -39,6 +40,7 @@ namespace MQTTSNGW
#define MAX_CLIENTID_LENGTH (64) // Max length of clientID
#define MAX_INFLIGHTMESSAGES (10) // Number of inflight messages
#define MAX_SAVED_PUBLISH (20) // Max number of PUBLISH message for Asleep state
#define MAX_TOPIC_PAR_CLIENT (50) // Max Topic count for a client. it should be less than 256
#define MQTTSNGW_MAX_PACKET_SIZE (1024) // Max Packet size (5+2+TopicLen+PayloadLen)
#define SIZE_OF_LOG_PACKET (500) // Length of the packet log in bytes

View File

@@ -40,8 +40,8 @@ void MQTTSNPublishHandler::handlePublish(Client* client, MQTTSNPacket* packet)
int qos;
uint8_t retained;
uint16_t msgId;
MQTTSN_topicid topicid;
uint8_t* payload;
MQTTSN_topicid topicid;
int payloadlen;
Publish pub;
char shortTopic[2];
@@ -68,68 +68,6 @@ void MQTTSNPublishHandler::handlePublish(Client* client, MQTTSNPacket* packet)
Topic* topic = 0;
if ( topicid.type == MQTTSN_TOPIC_TYPE_PREDEFINED)
{
if(msgId)
{
/* Reply PubAck to the client */
MQTTSNPacket* pubAck = new MQTTSNPacket();
pubAck->setPUBACK( topicid.data.id, msgId, MQTTSN_RC_ACCEPTED);
Event* ev1 = new Event();
ev1->setClientSendEvent(client, pubAck);
_gateway->getClientSendQue()->post(ev1);
}
#ifdef OTA_CLIENTS
if ( topicid.data.id == PREDEFINEDID_OTA_REQ )
{
uint8_t clientId[MAX_CLIENTID_LENGTH + 1];
if ( payloadlen <= MAX_CLIENTID_LENGTH )
{
memcpy(clientId, payload, payloadlen);
clientId[payloadlen] = 0;
Client* cl = _gateway->getClientList()->getClient(clientId);
if ( cl )
{
WRITELOG("\033[0m\033[0;33m OTA Client : %s\033[0m\033[0;37m\n",cl->getClientId());
MQTTSNPacket* pubota = new MQTTSNPacket();
pubota->setPUBLISH(0, 0, 0, 0, topicid, 0, 0);
cl->setOTAClient(client);
Event* evt = new Event();
evt->setClientSendEvent(cl, pubota);
_gateway->getClientSendQue()->post(evt);
}
else
{
MQTTSNPacket* publish = new MQTTSNPacket();
topicid.data.id = PREDEFINEDID_OTA_NO_CLIENT;
publish->setPUBLISH(0, 0, 0, 0, topicid, clientId, (uint16_t)strlen((const char*)clientId));
Event* evt = new Event();
evt->setClientSendEvent(client, publish);
_gateway->getClientSendQue()->post(evt);
}
}
}
else if ( topicid.data.id == PREDEFINEDID_OTA_READY )
{
Client* cl = client->getOTAClient();
if ( cl )
{
WRITELOG("\033[0m\033[0;33m OTA Manager : %s\033[0m\033[0;37m\n",cl->getClientId());
MQTTSNPacket* pubota = new MQTTSNPacket();
pubota->setPUBLISH(0, 0, 0, 0, topicid, payload, payloadlen);
client->setOTAClient(0);
Event* evt = new Event();
evt->setClientSendEvent(cl, pubota);
_gateway->getClientSendQue()->post(evt);
}
}
#endif
return;
}
if( topicid.type == MQTTSN_TOPIC_TYPE_SHORT )
{
shortTopic[0] = topicid.data.short_name[0];
@@ -137,10 +75,10 @@ void MQTTSNPublishHandler::handlePublish(Client* client, MQTTSNPacket* packet)
pub.topic = shortTopic;
pub.topiclen = 2;
}
if ( topicid.type == MQTTSN_TOPIC_TYPE_NORMAL )
else
{
topic = client->getTopics()->getTopic(topicid.data.id);
topic = client->getTopics()->getTopicById(&topicid);
if( !topic && msgId && qos > 0 )
{
/* Reply PubAck with INVALID_TOPIC_ID to the client */

View File

@@ -41,160 +41,122 @@ void MQTTSNSubscribeHandler::handleSubscribe(Client* client, MQTTSNPacket* packe
uint16_t msgId;
MQTTSN_topicid topicFilter;
Topic* topic = 0;
uint16_t topicId = 0;
MQTTGWPacket* subscribe;
Event* ev1;
Event* evsuback;
if ( packet->getSUBSCRIBE(&dup, &qos, &msgId, &topicFilter) == 0 )
{
return;
}
if (topicFilter.type <= MQTTSN_TOPIC_TYPE_SHORT)
if ( msgId == 0 )
{
if (topicFilter.type == MQTTSN_TOPIC_TYPE_PREDEFINED)
{
/*----- Predefined TopicId ------*/
MQTTSNPacket* sSuback = new MQTTSNPacket();
if (msgId)
{
int rc = MQTTSN_RC_ACCEPTED;
switch (topicFilter.data.id)
{
case PREDEFINEDID_OTA_REQ: // check topicIds are defined.
case PREDEFINEDID_OTA_READY:
case PREDEFINEDID_OTA_NO_CLIENT:
break;
default:
rc = MQTTSN_RC_REJECTED_INVALID_TOPIC_ID;
}
sSuback->setSUBACK(qos, topicFilter.data.id, msgId, rc);
Event* evsuback = new Event();
evsuback->setClientSendEvent(client, sSuback);
_gateway->getClientSendQue()->post(evsuback);
}
switch (topicFilter.data.id)
{
case 1:
/*
* ToDo: write here Predefined Topic 01 Procedures.
*/
break;
case 2:
/*
* ToDo: write here Predefined Topic 02 Procedures. so on
*/
break;
default:
break;
}
}
else
{
uint16_t topicId = 0;
MQTTGWPacket* subscribe = new MQTTGWPacket();
topic = client->getTopics()->getTopic(&topicFilter);
if (topic == 0)
{
if (topicFilter.type == MQTTSN_TOPIC_TYPE_NORMAL)
{
topic = client->getTopics()->add(&topicFilter);
topicId = topic->getTopicId();
subscribe->setSUBSCRIBE((char*)topic->getTopicName()->c_str(), (uint8_t)qos, (uint16_t)msgId);
}
else if (topicFilter.type == MQTTSN_TOPIC_TYPE_SHORT)
{
char topic[3];
topic[0] = topicFilter.data.short_name[0];
topic[1] = topicFilter.data.short_name[1];
topic[2] = 0;
topicId = topicFilter.data.id;
subscribe->setSUBSCRIBE(topic, (uint8_t)qos, (uint16_t)msgId);
}
}
else
{
topicId = topic->getTopicId();
subscribe->setSUBSCRIBE((char*)topic->getTopicName()->c_str(), (uint8_t)qos, (uint16_t)msgId);
}
if ( msgId > 0 )
{
client->setWaitedSubTopicId(msgId, topicId, topicFilter.type);
}
Event* ev1 = new Event();
ev1->setBrokerSendEvent(client, subscribe);
_gateway->getBrokerSendQue()->post(ev1);
return;
}
}
else
{
/*-- Invalid TopicIdType --*/
if (msgId)
{
MQTTSNPacket* sSuback = new MQTTSNPacket();
sSuback->setSUBACK(qos, topicFilter.data.id, msgId, MQTTSN_RC_REJECTED_INVALID_TOPIC_ID);
Event* evsuback = new Event();
evsuback->setClientSendEvent(client, sSuback);
_gateway->getClientSendQue()->post(evsuback);
}
return;
}
if ( topicFilter.type == MQTTSN_TOPIC_TYPE_PREDEFINED )
{
topic = client->getTopics()->getTopicById(&topicFilter);
if ( topic )
{
topicId = topic->getTopicId();
subscribe = new MQTTGWPacket();
subscribe->setSUBSCRIBE((char*)topic->getTopicName()->c_str(), (uint8_t)qos, (uint16_t)msgId);
}
else
{
goto RespExit;
}
}
else if (topicFilter.type == MQTTSN_TOPIC_TYPE_NORMAL)
{
topic = client->getTopics()->getTopicByName(&topicFilter);
if ( topic == 0 )
{
topic = client->getTopics()->add(&topicFilter);
if ( topic == 0 )
{
WRITELOG("%s Client(%s) can't add the Topic.%s\n", ERRMSG_HEADER, client->getClientId(), ERRMSG_FOOTER);
return;
}
}
topicId = topic->getTopicId();
subscribe = new MQTTGWPacket();
subscribe->setSUBSCRIBE((char*)topic->getTopicName()->c_str(), (uint8_t)qos, (uint16_t)msgId);
}
else //MQTTSN_TOPIC_TYPE_SHORT
{
char topicstr[3];
topicstr[0] = topicFilter.data.short_name[0];
topicstr[1] = topicFilter.data.short_name[1];
topicstr[2] = 0;
topicId = 0;
subscribe = new MQTTGWPacket();
subscribe->setSUBSCRIBE(topicstr, (uint8_t)qos, (uint16_t)msgId);
}
client->setWaitedSubTopicId(msgId, topicId, topicFilter.type);
ev1 = new Event();
ev1->setBrokerSendEvent(client, subscribe);
_gateway->getBrokerSendQue()->post(ev1);
return;
RespExit:
MQTTSNPacket* sSuback = new MQTTSNPacket();
sSuback->setSUBACK(qos, topicFilter.data.id, msgId, MQTTSN_RC_NOT_SUPPORTED);
evsuback = new Event();
evsuback->setClientSendEvent(client, sSuback);
_gateway->getClientSendQue()->post(evsuback);
}
void MQTTSNSubscribeHandler::handleUnsubscribe(Client* client, MQTTSNPacket* packet)
{
uint16_t msgId;
MQTTSN_topicid topicFilter;
MQTTGWPacket* unsubscribe = 0;;
if ( packet->getUNSUBSCRIBE(&msgId, &topicFilter) == 0 )
{
return;
}
if ( topicFilter.type == MQTTSN_TOPIC_TYPE_PREDEFINED )
{
/*
* ToDo: procedures for Predefined Topic
*/
return;
}
if ( msgId == 0 )
{
return;
}
Topic* topic = client->getTopics()->getTopic(&topicFilter);
MQTTGWPacket* unsubscribe = new MQTTGWPacket();
Topic* topic = client->getTopics()->getTopicById(&topicFilter);
if (topicFilter.type == MQTTSN_TOPIC_TYPE_NORMAL)
if (topicFilter.type == MQTTSN_TOPIC_TYPE_SHORT)
{
if ( topic == 0 )
{
if (msgId)
{
MQTTSNPacket* sUnsuback = new MQTTSNPacket();
sUnsuback->setUNSUBACK(msgId);
Event* evsuback = new Event();
evsuback->setClientSendEvent(client, sUnsuback);
_gateway->getClientSendQue()->post(evsuback);
}
delete unsubscribe;
return;
}
else
{
unsubscribe->setUNSUBSCRIBE(topic->getTopicName()->c_str(), msgId);
}
}
else if (topicFilter.type == MQTTSN_TOPIC_TYPE_SHORT)
{
char shortTopic[3];
shortTopic[0] = topicFilter.data.short_name[0];
shortTopic[1] = topicFilter.data.short_name[1];
shortTopic[2] = 0;
unsubscribe->setUNSUBSCRIBE(shortTopic, msgId);
char shortTopic[3];
shortTopic[0] = topicFilter.data.short_name[0];
shortTopic[1] = topicFilter.data.short_name[1];
shortTopic[2] = 0;
unsubscribe = new MQTTGWPacket();
unsubscribe->setUNSUBSCRIBE(shortTopic, msgId);
}
else
{
delete unsubscribe;
return;
if ( topic == 0 )
{
MQTTSNPacket* sUnsuback = new MQTTSNPacket();
sUnsuback->setUNSUBACK(msgId);
Event* evsuback = new Event();
evsuback->setClientSendEvent(client, sUnsuback);
_gateway->getClientSendQue()->post(evsuback);
return;
}
else
{
unsubscribe = new MQTTGWPacket();
unsubscribe->setUNSUBSCRIBE(topic->getTopicName()->c_str(), msgId);
}
}
Event* ev1 = new Event();

View File

@@ -17,6 +17,6 @@
#ifndef MQTTSNGWVERSION_H_IN_
#define MQTTSNGWVERSION_H_IN_
#define PAHO_GATEWAY_VERSION "1.0.1"
#define PAHO_GATEWAY_VERSION "1.1.0"
#endif /* MQTTSNGWVERSION_H_IN_ */

View File

@@ -13,7 +13,7 @@
* Contributors:
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
**************************************************************************************/
#include "MQTTSNGWDefines.h"
#include "MQTTSNGateway.h"
#include "SensorNetwork.h"
#include "MQTTSNGWProcess.h"
@@ -202,13 +202,40 @@ void Gateway::initialize(int argc, char** argv)
if (!_clientList.authorize(fileName.c_str()))
{
throw Exception("Gateway::initialize: No client list defined by configuration.");
throw Exception("Gateway::initialize: No client list defined by the configuration.");
}
_params.clientListName = strdup(fileName.c_str());
}
}
fileName = *getConfigDirName() + *getConfigFileName();
_params.configName = strdup(fileName.c_str());
if (getParam("PredefinedTopic", param) == 0 )
{
if (!strcasecmp(param, "YES") )
{
if (getParam("PredefinedTopicFile", param) == 0)
{
fileName =*getConfigDirName() + string(param);
}
else
{
fileName = *getConfigDirName() + string(PREDEFINEDTOPIC_FILE);
}
if (!_clientList.setPredefinedTopics(fileName.c_str()))
{
throw Exception("Gateway::initialize: No PredefinedTopic file defined by the configuration..");
}
_params.predefinedTopicFileName = strdup(fileName.c_str());
}
else
{
_params.predefinedTopicFileName = 0;
}
}
fileName = *getConfigDirName() + *getConfigFileName();
_params.configName = strdup(fileName.c_str());
}
void Gateway::run(void)
@@ -229,6 +256,10 @@ void Gateway::run(void)
}
WRITELOG(" SensorN/W: %s\n", _sensorNetwork.getDescription());
WRITELOG(" Broker: %s : %s, %s\n", _params.brokerName, _params.port, _params.portSecure);
if ( _params.predefinedTopicFileName )
{
WRITELOG(" PreDefFile: %s\n", _params.predefinedTopicFileName);
}
WRITELOG(" RootCApath: %s\n", _params.rootCApath);
WRITELOG(" RootCAfile: %s\n", _params.rootCAfile);
WRITELOG(" CertKey: %s\n", _params.certKey);

View File

@@ -72,10 +72,10 @@ namespace MQTTSNGW
/*=====================================
Predefined TopicId for OTA
====================================*/
#define OTA_CLIENTS
#define PREDEFINEDID_OTA_REQ (0x0ff0)
#define PREDEFINEDID_OTA_READY (0x0ff1)
#define PREDEFINEDID_OTA_NO_CLIENT (0x0ff2)
//#define OTA_CLIENTS
//#define PREDEFINEDID_OTA_REQ (0x0ff0)
//#define PREDEFINEDID_OTA_READY (0x0ff1)
//#define PREDEFINEDID_OTA_NO_CLIENT (0x0ff2)
/*=====================================
Class Event
@@ -160,6 +160,7 @@ typedef struct
char* rootCAfile;
char* certKey;
char* privateKey;
char* predefinedTopicFileName;
}GatewayParams;
/*=====================================

View File

@@ -87,27 +87,29 @@ void TestProcess::run(void)
printf("Timer Test completed\n\n");
/* Test Que */
printf("Test Que ");
TestQue* tque = new TestQue();
tque->test();
delete tque;
/* Test Tree23 */
printf("Test Tree23 ");
TestTree23* tree23 = new TestTree23();
tree23->test();
delete tree23;
/* Test TopicTable */
printf("Test Topic ");
TestTopics* testTopic = new TestTopics();
testTopic->test();
delete testTopic;
/* Test TopicIdMap */
printf("Test TopicIdMap ");
TestTopicIdMap* testMap = new TestTopicIdMap();
testMap->test();
delete testMap;
/* Test EventQue */
printf("Test EventQue ");
Client* client = new Client();

View File

@@ -35,7 +35,6 @@ void TestQue::test(void)
int* v = 0;
int i = 0;
printf("Test Que ");
for ( i = 0; i < 10; i++ )
{
v = new int(i);

View File

@@ -31,95 +31,163 @@ TestTopicIdMap::~TestTopicIdMap()
delete _map;
}
bool TestTopicIdMap::testGetElement(uint16_t msgid, uint16_t id, MQTTSN_topicTypes type)
{
TopicIdMapelement* elm = _map->getElement((uint16_t)msgid );
if ( elm )
{
//printf("msgid=%d id=%d type=%d\n", msgid, elm->getTopicId(), elm->getTopicType());
return elm->getTopicId() == id && elm->getTopicType() == type;
}
//printf("msgid=%d\n", msgid);
return false;
}
#define MAXID 30
void TestTopicIdMap::test(void)
{
uint16_t id[MAXID];
printf("Test TopicIdMat ");
for ( int i = 0; i < MAXID; i++ )
{
id[i] = i + 1;
}
for ( int i = 0; i < MAXID; i++ )
{
_map->add(id[i], id[i], MQTTSN_TOPIC_TYPE_NORMAL);
}
for ( int i = 0; i < MAXID; i++ )
for ( int i = 0; i < MAX_INFLIGHTMESSAGES * 2 + 1; i++ )
{
MQTTSN_topicTypes type = MQTTSN_TOPIC_TYPE_SHORT;
uint16_t topicId = _map->getTopicId((uint16_t)i, &type);
//printf("TopicId=%d msgId=%d type=%d\n", topicId, i, type);
assert((i <= MAX_INFLIGHTMESSAGES * 2 + 1 && topicId == i) || (i > MAX_INFLIGHTMESSAGES * 2 + 1 && topicId == 0));
assert(testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_NORMAL));
}
//printf("\n");
for ( int i = MAX_INFLIGHTMESSAGES * 2 + 1; i < MAXID; i++ )
{
assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_NORMAL));
}
for ( int i = 0; i < MAX_INFLIGHTMESSAGES * 2 + 1; i++ )
{
assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_PREDEFINED));
}
for ( int i = 0; i < 5; i++ )
{
_map->erase(i);
_map->erase(id[i]);
}
for ( int i = 0; i < MAXID; i++ )
{
MQTTSN_topicTypes type = MQTTSN_TOPIC_TYPE_SHORT;
uint16_t topicId = _map->getTopicId((uint16_t)i, &type);
//printf("TopicId=%d msgId=%d type=%d\n", topicId, i, type);
assert((i < 5 && topicId == 0) || (i >= 5 && topicId != 0) || (i > MAX_INFLIGHTMESSAGES * 2 + 1 && topicId == 0) );
}
_map->clear();
//printf("\n");
for ( int i = 0; i < MAXID; i++ )
{
MQTTSN_topicTypes type = MQTTSN_TOPIC_TYPE_SHORT;
uint16_t topicId = _map->getTopicId((uint16_t)i, &type);
//printf("TopicId=%d msgId=%d type=%d\n", topicId, i, type);
assert( topicId == 0 );
}
for ( int i = 0; i < MAXID; i++ )
{
_map->add(id[i], id[i], MQTTSN_TOPIC_TYPE_SHORT);
}
for ( int i = 0; i < MAXID; i++ )
{
MQTTSN_topicTypes type = MQTTSN_TOPIC_TYPE_NORMAL;
uint16_t topicId = _map->getTopicId((uint16_t)i, &type);
//printf("TopicId=%d msgId=%d type=%d\n", topicId, i, type);
assert((i <= MAX_INFLIGHTMESSAGES * 2 + 1 && topicId == i) || (i > MAX_INFLIGHTMESSAGES * 2 + 1 && topicId == 0));
}
//printf("\n");
for ( int i = 0; i < 5; i++ )
{
_map->erase(i);
}
for ( int i = 0; i < MAXID; i++ )
{
MQTTSN_topicTypes type = MQTTSN_TOPIC_TYPE_NORMAL;
uint16_t topicId = _map->getTopicId((uint16_t)i, &type);
//printf("TopicId=%d msgId=%d type=%d\n", topicId, i, type);
assert((i < 5 && topicId == 0) || (i >= 5 && topicId != 0) || (i > MAX_INFLIGHTMESSAGES * 2 + 1 && topicId == 0) );
assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_NORMAL));
}
for ( int i = 5; i < MAX_INFLIGHTMESSAGES * 2 + 1; i++ )
{
assert(testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_NORMAL));
}
for ( int i = MAX_INFLIGHTMESSAGES * 2 + 1; i < MAXID; i++ )
{
assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_NORMAL));
}
_map->clear();
//printf("\n");
for ( int i = 0; i < MAXID; i++ )
{
MQTTSN_topicTypes type = MQTTSN_TOPIC_TYPE_NORMAL;
uint16_t topicId = _map->getTopicId((uint16_t)i, &type);
//printf("TopicId=%d msgId=%d type=%d\n", topicId, i, type);
assert( topicId == 0 );
}
for ( int i = 0; i < MAXID; i++ )
{
assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_NORMAL));
}
for ( int i = 0; i < MAXID; i++ )
{
_map->add(id[i], id[i], MQTTSN_TOPIC_TYPE_SHORT);
}
for ( int i = 0; i < MAX_INFLIGHTMESSAGES * 2 + 1; i++ )
{
assert(testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_SHORT));
}
for ( int i = MAX_INFLIGHTMESSAGES * 2 + 1; i < MAXID; i++ )
{
assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_SHORT));
}
for ( int i = 0; i < MAX_INFLIGHTMESSAGES * 2 + 1; i++ )
{
assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_NORMAL));
}
for ( int i = 0; i < 5; i++ )
{
_map->erase(id[i]);
}
for ( int i = 0; i < 5; i++ )
{
assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_SHORT));
}
for ( int i = 5; i < MAX_INFLIGHTMESSAGES * 2 + 1; i++ )
{
assert(testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_SHORT));
}
for ( int i = MAX_INFLIGHTMESSAGES * 2 + 1; i < MAXID; i++ )
{
assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_SHORT));
}
_map->clear();
for ( int i = 0; i < MAXID; i++ )
{
assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_SHORT));
}
for ( int i = 0; i < MAXID; i++ )
{
_map->add(id[i], id[i], MQTTSN_TOPIC_TYPE_PREDEFINED);
}
for ( int i = 0; i < MAX_INFLIGHTMESSAGES * 2 + 1; i++ )
{
assert(testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_PREDEFINED));
}
for ( int i = MAX_INFLIGHTMESSAGES * 2 + 1; i < MAXID; i++ )
{
assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_PREDEFINED));
}
for ( int i = 0; i < MAX_INFLIGHTMESSAGES * 2 + 1; i++ )
{
assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_SHORT));
}
for ( int i = 0; i < 5; i++ )
{
_map->erase(id[i]);
}
for ( int i = 0; i < 5; i++ )
{
assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_PREDEFINED));
}
for ( int i = 5; i < MAX_INFLIGHTMESSAGES * 2 + 1; i++ )
{
assert(testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_PREDEFINED));
}
for ( int i = MAX_INFLIGHTMESSAGES * 2 + 1; i < MAXID; i++ )
{
assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_PREDEFINED));
}
_map->clear();
for ( int i = 0; i < MAXID; i++ )
{
assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_PREDEFINED));
}
printf("[ OK ]\n");
}

View File

@@ -24,6 +24,7 @@ public:
TestTopicIdMap();
~TestTopicIdMap();
void test(void);
bool testGetElement(uint16_t msgid, uint16_t id, MQTTSN_topicTypes type);
private:
TopicIdMap* _map;

View File

@@ -38,7 +38,7 @@ bool testIsMatch(const char* topicFilter, const char* topicName)
string* filter = new string(topicFilter);
string* name = new string(topicName);
Topic topic(filter);
Topic topic(filter, MQTTSN_TOPIC_TYPE_NORMAL);
bool isMatch = topic.isMatch(name);
delete name;
@@ -46,38 +46,82 @@ bool testIsMatch(const char* topicFilter, const char* topicName)
return isMatch;
}
bool testGetTopic(const char* topicName, const char* searchedTopicName)
bool testGetTopicByName(const char* topicName, const char* searchedTopicName)
{
Topics topics;
string name(topicName);
MQTTSN_topicid topicid;
MQTTSN_topicid topicid, serchId;
topicid.type = MQTTSN_TOPIC_TYPE_NORMAL;
topicid.data.long_.len = strlen(searchedTopicName);
topicid.data.long_.name = const_cast<char*>(searchedTopicName);
topicid.data.long_.len = strlen(topicName);
topicid.data.long_.name = const_cast<char*>(topicName);
topics.add(&name);
topics.add(&topicid);
return topics.getTopic(&topicid) != 0;
serchId.type = MQTTSN_TOPIC_TYPE_NORMAL;
serchId.data.long_.len = strlen(searchedTopicName);
serchId.data.long_.name = const_cast<char*>(searchedTopicName);
return topics.getTopicByName(&serchId) != 0;
}
bool testGetTopicId(const char* topicName, const char* searchedTopicName)
bool testGetTopicById(const char* topicName, const char* searchedTopicName)
{
Topics topics;
string name(topicName);
MQTTSN_topicid topicid;
MQTTSN_topicid topicid, stopicid;
topicid.type = MQTTSN_TOPIC_TYPE_NORMAL;
topicid.data.long_.len = strlen(searchedTopicName);
topicid.data.long_.name = const_cast<char*>(searchedTopicName);
topicid.data.long_.len = strlen(topicName);
topicid.data.long_.name = const_cast<char*>(topicName);
stopicid.type = MQTTSN_TOPIC_TYPE_NORMAL;
stopicid.data.long_.len = strlen(searchedTopicName);
stopicid.data.long_.name = const_cast<char*>(searchedTopicName);
topics.add(&name);
Topic* tp = topics.add(&topicid);
Topic*stp = topics.add(&stopicid);
topicid.data.id = tp->getTopicId();
stopicid.data.id = stp->getTopicId();
return topics.getTopicId(&topicid) != 0;
stp = topics.getTopicById(&stopicid);
return stp->getTopicId() == tp->getTopicId();
}
bool testGetPredefinedTopicByName(const char* topicName, const uint16_t id, const char* searchedTopicName)
{
Topics topics;
MQTTSN_topicid topicid;
topics.add(topicName, id);
topicid.type = MQTTSN_TOPIC_TYPE_PREDEFINED;
topicid.data.long_.len = strlen(searchedTopicName);
topicid.data.long_.name = const_cast<char*>(searchedTopicName);
return topics.getTopicByName(&topicid) != 0;
}
bool testGetPredefinedTopicById(const char* topicName, const uint16_t id, uint16_t sid)
{
Topics topics;
MQTTSN_topicid topicid;
Topic* t = topics.add(topicName, id);
topicid.type = MQTTSN_TOPIC_TYPE_PREDEFINED;
topicid.data.id = sid;
Topic* tp = topics.getTopicById(&topicid);
if ( tp )
{
return tp->getTopicId() == id && strcmp(t->getTopicName()->c_str(), topicName) == 0;
}
else
{
return false;
}
}
void TestTopics::test(void)
{
printf("Test Topics ");
const int TOPIC_COUNT = 13;
MQTTSN_topicid topic[TOPIC_COUNT];
@@ -116,6 +160,18 @@ void TestTopics::test(void)
topic[12].data.long_.len = strlen(tp[12]);
topic[12].data.long_.name = tp[12];
/* Test EraseNorma() */
for ( int i = 0; i < TOPIC_COUNT; i++ )
{
MQTTSN_topicid pos = topic[i];
Topic* t = _topics->add(&pos);
//printf("Topic=%s ID=%d\n", t->getTopicName()->c_str(), t->getTopicId());
assert(t !=0);
}
_topics->eraseNormal();
assert(_topics->getCount() == 0);
/* Add Topic to Topics */
for ( int i = 0; i < TOPIC_COUNT; i++ )
{
@@ -129,26 +185,30 @@ void TestTopics::test(void)
{
string str = "Test/";
str += 0x30 + i;
Topic* t = _topics->add(&str);
Topic* t = _topics->add(str.c_str());
//printf("Topic=%s ID=%d\n", t->getTopicName()->c_str(), t->getTopicId());
assert(t !=0);
}
/* Get Topic by MQTTSN_topicid */
/* Get Topic by MQTTSN_topicid by Name*/
for ( int i = 0; i < TOPIC_COUNT; i++ )
{
Topic* t = _topics->getTopic(&topic[i]);
//printf("Topic=%s ID=%d ID=%d\n", t->getTopicName()->c_str(), t->getTopicId(),_topics->getTopicId(&topic[i]));
assert(t->getTopicId() == i + 1);
Topic* t = _topics->getTopicByName(&topic[i]);
//printf("Topic=%s ID=%d\n", t->getTopicName()->c_str(), t->getTopicId());
assert(strcmp(t->getTopicName()->c_str(), topic[i].data.long_.name) == 0 );
}
/* Get TopicId by MQTTSN_topicid */
for ( int i = 0; i < TOPIC_COUNT; i++ )
{
uint16_t id = _topics->getTopicId(&topic[i]);
//printf("ID=%d \n", id);
assert(id == i + 1);
}
/* Get Topic by MQTTSN_topicid by ID*/
for ( int i = 0; i < TOPIC_COUNT; i++ )
{
Topic* t = _topics->getTopicByName(&topic[i]);
MQTTSN_topicid stpid;
stpid.type = MQTTSN_TOPIC_TYPE_NORMAL;
stpid.data.id =t->getTopicId();
Topic* st = _topics->getTopicById(&stpid);
//printf("Topic=%s ID=%d ID=%d\n", t->getTopicName()->c_str(), t->getTopicId(), st->getTopicId());
assert(t->getTopicId() == st->getTopicId() );
}
/* Test Wildcard */
for ( int i = 0; i < 10 ; i++ )
@@ -286,13 +346,20 @@ void TestTopics::test(void)
assert(testIsMatch("/+", "/finance"));
assert(!testIsMatch("+", "/finance"));
assert(testGetTopicId("mytopic", "mytopic"));
assert(!testGetTopicId("mytopic", "mytop"));
assert(!testGetTopicId("mytopic", "mytopiclong"));
assert(testGetTopicById("mytopic", "mytopic"));
assert(!testGetTopicById("mytopic", "mytop"));
assert(!testGetTopicById("mytopic", "mytopiclong"));
assert(testGetTopic("mytopic", "mytopic"));
assert(!testGetTopic("mytopic", "mytop"));
assert(!testGetTopic("mytopic", "mytopiclong"));
assert(testGetTopicByName("mytopic", "mytopic"));
assert(!testGetTopicByName("mytopic", "mytop"));
assert(!testGetTopicByName("mytopic", "mytopiclong"));
assert(testGetPredefinedTopicByName("mypretopic", 1, "mypretopic"));
assert(!testGetPredefinedTopicByName("mypretopic", 1, "mypretop"));
assert(!testGetPredefinedTopicByName("mypretopic", 1, "mypretopiclong"));
assert(testGetPredefinedTopicById("mypretopic2", 2, 2));
assert(!testGetPredefinedTopicById("mypretopic2", 2, 1));
printf("[ OK ]\n");
}

View File

@@ -33,7 +33,6 @@ TestTree23::~TestTree23()
void TestTree23::test(void)
{
printf("Test Tree23 ");
int N = 100;
Key* r1[100];