Merge pull request #120 from eclipse/develop

Add Pre-defined-topic
This commit is contained in:
Tomoaki Yamaguchi
2018-07-17 07:00:50 +09:00
committed by GitHub
38 changed files with 1095 additions and 751 deletions

View File

@@ -90,18 +90,21 @@ void MQTTGWPublishHandler::handlePublish(Client* client, MQTTGWPacket* packet)
}
else
{
topicId.type = MQTTSN_TOPIC_TYPE_NORMAL;
topicId.data.long_.len = pub.topiclen;
topicId.data.long_.name = pub.topic;
id = client->getTopics()->getTopicId(&topicId);
topicId.data.long_.len = pub.topiclen;
topicId.data.long_.name = pub.topic;
Topic* tp = client->getTopics()->getTopicByName(&topicId);
if ( id > 0 )
{
topicId.data.id = id;
}
if ( tp )
{
topicId.type = tp->getType();
topicId.data.long_.len = pub.topiclen;
topicId.data.long_.name = pub.topic;
topicId.data.id = tp->getTopicId();
}
else
{
/* This message might be subscribed with wild card. */
topicId.type = MQTTSN_TOPIC_TYPE_NORMAL;
Topic* topic = client->getTopics()->match(&topicId);
if (topic == 0)
{
@@ -179,11 +182,11 @@ void MQTTGWPublishHandler::handlePuback(Client* client, MQTTGWPacket* packet)
{
Ack ack;
packet->getAck(&ack);
uint16_t topicId = client->getWaitedPubTopicId((uint16_t)ack.msgId);
TopicIdMapelement* topicId = client->getWaitedPubTopicId((uint16_t)ack.msgId);
if (topicId)
{
MQTTSNPacket* mqttsnPacket = new MQTTSNPacket();
mqttsnPacket->setPUBACK(topicId, (uint16_t)ack.msgId, 0);
mqttsnPacket->setPUBACK(topicId->getTopicId(), (uint16_t)ack.msgId, 0);
client->eraseWaitedPubTopicId((uint16_t)ack.msgId);
Event* ev1 = new Event();

View File

@@ -38,12 +38,11 @@ void MQTTGWSubscribeHandler::handleSuback(Client* client, MQTTGWPacket* packet)
int qos = 0;
packet->getSUBACK(&msgId, &rc);
uint16_t topicId = client->getWaitedSubTopicId(msgId);
TopicIdMapelement* topicId = client->getWaitedSubTopicId(msgId);
if (topicId)
{
MQTTSNPacket* snPacket = new MQTTSNPacket();
client->eraseWaitedSubTopicId(msgId);
if (rc == 0x80)
{
@@ -54,10 +53,11 @@ void MQTTGWSubscribeHandler::handleSuback(Client* client, MQTTGWPacket* packet)
returnCode = MQTTSN_RC_ACCEPTED;
qos = rc;
}
snPacket->setSUBACK(qos, topicId, msgId, returnCode);
snPacket->setSUBACK(qos, topicId->getTopicId(), msgId, returnCode);
Event* evt = new Event();
evt->setClientSendEvent(client, snPacket);
_gateway->getClientSendQue()->post(evt);
client->eraseWaitedSubTopicId(msgId);
}
}

View File

@@ -15,6 +15,7 @@
* Tieto Poland Sp. z o.o. - Gateway improvements
**************************************************************************************/
#include "MQTTSNGWDefines.h"
#include "MQTTSNGWClient.h"
#include "MQTTSNGateway.h"
#include "SensorNetwork.h"
@@ -124,6 +125,51 @@ bool ClientList::authorize(const char* fileName)
return _authorize;
}
bool ClientList::setPredefinedTopics(const char* fileName)
{
FILE* fp;
char buf[MAX_CLIENTID_LENGTH + 256];
size_t pos0, pos1;
MQTTSNString clientId;
bool rc = false;
clientId.cstring = 0;
clientId.lenstring.data = 0;
clientId.lenstring.len = 0;
if ((fp = fopen(fileName, "r")) != 0)
{
while (fgets(buf, MAX_CLIENTID_LENGTH + 254, fp) != 0)
{
if (*buf == '#')
{
continue;
}
string data = string(buf);
while ((pos0 = data.find_first_of("  \t\n")) != string::npos)
{
data.erase(pos0, 1);
}
if (data.empty())
{
continue;
}
pos0 = data.find_first_of(",");
pos1 = data.find(",", pos0 + 1) ;
string id = data.substr(0, pos0);
clientId.cstring = strdup(id.c_str());
string topicName = data.substr(pos0 + 1, pos1 - pos0 -1);
uint16_t topicID = stoul(data.substr(pos1 + 1));
createPredefinedTopic( &clientId, topicName, topicID);
free(clientId.cstring);
}
fclose(fp);
rc = true;
}
return rc;
}
void ClientList::erase(Client*& client)
{
if ( !_authorize && client->erasable())
@@ -179,14 +225,21 @@ Client* ClientList::getClient(void)
return _firstClient;
}
Client* ClientList::getClient(uint8_t* clientId)
Client* ClientList::getClient(MQTTSNString* clientId)
{
_mutex.lock();
Client* client = _firstClient;
const char* clID =clientId->cstring;
if (clID == 0 )
{
clID = clientId->lenstring.data;
}
while (client != 0)
{
if (strcmp((const char*)client->getClientId(), (const char*)clientId) == 0 )
if (strncmp((const char*)client->getClientId(), clID, MQTTSNstrlen(*clientId)) == 0 )
{
_mutex.unlock();
return client;
@@ -222,7 +275,10 @@ Client* ClientList::createClient(SensorNetAddress* addr, MQTTSNString* clientId,
/* creat a new client */
client = new Client(secure);
client->setClientAddress(addr);
if ( addr )
{
client->setClientAddress(addr);
}
client->setSensorNetType(unstableLine);
if ( MQTTSNstrlen(*clientId) )
{
@@ -256,6 +312,50 @@ Client* ClientList::createClient(SensorNetAddress* addr, MQTTSNString* clientId,
return client;
}
Client* ClientList::createPredefinedTopic( MQTTSNString* clientId, string topicName, uint16_t topicId)
{
Client* client = getClient(clientId);
if ( _authorize && client == 0)
{
return 0;
}
/* anonimous clients */
if ( _clientCnt > MAX_CLIENTS )
{
return 0; // full of clients
}
if ( client == 0 )
{
/* creat a new client */
client = new Client();
client->setClientId(*clientId);
_mutex.lock();
/* add the list */
if ( _firstClient == 0 )
{
_firstClient = client;
_endClient = client;
}
else
{
_endClient->_nextClient = client;
client->_prevClient = _endClient;
_endClient = client;
}
_clientCnt++;
_mutex.unlock();
}
// create Topic & Add it
client->getTopics()->add((const char*)topicName.c_str(), topicId);
return client;
}
uint16_t ClientList::getClientCount()
{
return _clientCnt;
@@ -299,6 +399,7 @@ Client::Client(bool secure)
_prevClient = 0;
_nextClient = 0;
_clientSleepPacketQue.setMaxSize(MAX_SAVED_PUBLISH);
_hasPredefTopic = false;
}
Client::~Client()
@@ -334,16 +435,14 @@ Client::~Client()
}
}
uint16_t Client::getWaitedPubTopicId(uint16_t msgId)
TopicIdMapelement* Client::getWaitedPubTopicId(uint16_t msgId)
{
MQTTSN_topicTypes type;
return _waitedPubTopicIdMap.getTopicId(msgId, &type);
return _waitedPubTopicIdMap.getElement(msgId);
}
uint16_t Client::getWaitedSubTopicId(uint16_t msgId)
TopicIdMapelement* Client::getWaitedSubTopicId(uint16_t msgId)
{
MQTTSN_topicTypes type;
return _waitedSubTopicIdMap.getTopicId(msgId, &type);
return _waitedSubTopicIdMap.getElement(msgId);
}
MQTTGWPacket* Client::getClientSleepPacket()
@@ -426,7 +525,7 @@ void Client::setSessionStatus(bool status)
bool Client::erasable(void)
{
return _sessionStatus;
return _sessionStatus || !_hasPredefTopic;
}
void Client::updateStatus(MQTTSNPacket* packet)
@@ -702,13 +801,15 @@ void Client::setOTAClient(Client* cl)
======================================*/
Topic::Topic()
{
_type = MQTTSN_TOPIC_TYPE_NORMAL;
_topicName = 0;
_topicId = 0;
_next = 0;
}
Topic::Topic(string* topic)
Topic::Topic(string* topic, MQTTSN_topicTypes type)
{
_type = type;
_topicName = topic;
_topicId = 0;
_next = 0;
@@ -732,6 +833,11 @@ uint16_t Topic::getTopicId(void)
return _topicId;
}
MQTTSN_topicTypes Topic::getType(void)
{
return _type;
}
bool Topic::isMatch(string* topicName)
{
string::size_type tlen = _topicName->size();
@@ -824,6 +930,11 @@ bool Topic::isMatch(string* topicName)
}
}
void Topic::print(void)
{
WRITELOG("TopicName=%s ID=%d Type=%d\n", _topicName->c_str(), _topicId, _type);
}
/*=====================================
Class Topics
======================================*/
@@ -831,6 +942,7 @@ Topics::Topics()
{
_first = 0;
_nextTopicId = 0;
_cnt = 0;
}
Topics::~Topics()
@@ -844,113 +956,120 @@ Topics::~Topics()
}
}
uint16_t Topics::getTopicId(const MQTTSN_topicid* topicid)
Topic* Topics::getTopicByName(const MQTTSN_topicid* topicid)
{
if (topicid->type != MQTTSN_TOPIC_TYPE_NORMAL)
{
return 0;
}
Topic* p = _first;
char* ch = topicid->data.long_.name;
string sname = string(ch, ch + topicid->data.long_.len);
while (p)
{
if ( p->_topicName->compare(sname) == 0 )
{
return p;
}
p = p->_next;
}
return 0;
}
Topic* Topics::getTopicById(const MQTTSN_topicid* topicid)
{
Topic* p = _first;
while (p)
{
if ( (int)strlen(p->_topicName->c_str()) == topicid->data.long_.len &&
strncmp(p->_topicName->c_str(), topicid->data.long_.name, topicid->data.long_.len) == 0)
{
return p->_topicId;
}
p = p->_next;
}
return 0;
}
Topic* Topics::getTopic(uint16_t id)
{
Topic* p = _first;
while (p)
{
if (p->_topicId == id)
{
return p;
}
p = p->_next;
}
return 0;
}
Topic* Topics::getTopic(const MQTTSN_topicid* topicid)
{
Topic* p = _first;
while (p)
{
if ( (int)strlen(p->_topicName->c_str()) == topicid->data.long_.len &&
strncmp(p->_topicName->c_str(), topicid->data.long_.name, topicid->data.long_.len) == 0 )
{
return p;
}
if ( p->_type == topicid->type && p->_topicId == topicid->data.id )
{
return p;
}
p = p->_next;
}
return 0;
}
// For MQTTSN_TOPIC_TYPE_NORMAL */
Topic* Topics::add(const MQTTSN_topicid* topicid)
{
Topic* topic;
uint16_t id = 0;
if (topicid->type != MQTTSN_TOPIC_TYPE_NORMAL )
{
return 0;
}
if (topicid->type != MQTTSN_TOPIC_TYPE_NORMAL)
{
return 0;
}
id = getTopicId(topicid);
Topic* topic = getTopicByName(topicid);
if (id)
{
topic = getTopic(id);
}
else
{
string topicName = string(topicid->data.long_.name, topicid->data.long_.len);
topic = add(&topicName);
}
return topic;
if ( topic )
{
return topic;
}
string name(topicid->data.long_.name, topicid->data.long_.len);
return add(name.c_str(), 0);
}
Topic* Topics::add(const string* topicName)
Topic* Topics::add(const char* topicName, uint16_t id)
{
Topic* topic = 0;
MQTTSN_topicid topicId;
Topic* tp = _first;
if ( _cnt >= MAX_TOPIC_PAR_CLIENT )
{
return 0;
}
topic = new Topic();
topicId.data.long_.name = (char*)const_cast<char*>(topicName);
topicId.data.long_.len = strlen(topicName);
if (topic == 0)
{
return 0;
}
string* name = new string(*topicName);
topic->_topicName = name;
topic->_topicId = getNextTopicId();
if (tp == 0)
{
_first = topic;
}
Topic* topic = getTopicByName(&topicId);
while (tp)
{
if (tp->_next == 0)
{
tp->_next = topic;
break;
}
else
{
tp = tp->_next;
}
}
return topic;
if ( topic )
{
return topic;
}
topic = new Topic();
if (topic == 0)
{
return 0;
}
string* name = new string(topicName);
topic->_topicName = name;
if ( id == 0 )
{
topic->_type = MQTTSN_TOPIC_TYPE_NORMAL;
topic->_topicId = getNextTopicId();
}
else
{
topic->_type = MQTTSN_TOPIC_TYPE_PREDEFINED;
topic->_topicId = id;
}
_cnt++;
if ( _first == 0)
{
_first = topic;
}
else
{
Topic* tp = _first;
while (tp)
{
if (tp->_next == 0)
{
tp->_next = topic;
break;
}
else
{
tp = tp->_next;
}
}
}
return topic;
}
uint16_t Topics::getNextTopicId()
@@ -978,6 +1097,60 @@ Topic* Topics::match(const MQTTSN_topicid* topicid)
return 0;
}
void Topics::eraseNormal(void)
{
Topic* topic = _first;
Topic* next = 0;
Topic* prev = 0;
while (topic)
{
if ( topic->_type == MQTTSN_TOPIC_TYPE_NORMAL )
{
next = topic->_next;
if ( _first == topic )
{
_first = next;
}
if ( prev )
{
prev->_next = next;
}
delete topic;
_cnt--;
topic = next;
}
else
{
prev = topic;
topic = topic->_next;
}
}
}
void Topics::print(void)
{
Topic* topic = _first;
if (topic == 0 )
{
WRITELOG("No Topic.\n");
}
else
{
while (topic)
{
topic->print();
topic = topic->_next;
}
}
}
uint8_t Topics::getCount(void)
{
return _cnt;
}
/*=====================================
Class TopicIdMap
=====================================*/
@@ -995,6 +1168,16 @@ TopicIdMapelement::~TopicIdMapelement()
}
MQTTSN_topicTypes TopicIdMapelement::getTopicType(void)
{
return _type;
}
uint16_t TopicIdMapelement::getTopicId(void)
{
return _topicId;
}
TopicIdMap::TopicIdMap()
{
_maxInflight = MAX_INFLIGHTMESSAGES;
@@ -1015,28 +1198,27 @@ TopicIdMap::~TopicIdMap()
}
}
uint16_t TopicIdMap::getTopicId(uint16_t msgId, MQTTSN_topicTypes* type)
TopicIdMapelement* TopicIdMap::getElement(uint16_t msgId)
{
TopicIdMapelement* p = _first;
while ( p )
{
if ( p->_msgId == msgId )
{
*type = p->_type;
return p->_topicId;
return p;
}
p = p->_next;
}
return 0;
}
int TopicIdMap::add(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type)
TopicIdMapelement* TopicIdMap::add(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type)
{
if ( _cnt > _maxInflight * 2 || topicId == 0)
{
return 0;
}
if ( getTopicId(msgId, &type) > 0 )
if ( getElement(msgId) > 0 )
{
erase(msgId);
}
@@ -1058,7 +1240,7 @@ int TopicIdMap::add(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type)
_end = elm;
}
_cnt++;
return 1;
return elm;
}
void TopicIdMap::erase(uint16_t msgId)

View File

@@ -39,73 +39,73 @@ namespace MQTTSNGW
template<class T> class PacketQue
{
public:
PacketQue()
{
_que = new Que<T>;
}
PacketQue()
{
_que = new Que<T>;
}
~PacketQue()
{
clear();
delete _que;
}
~PacketQue()
{
clear();
delete _que;
}
T* getPacket()
{
T* packet;
if (_que->size() > 0)
{
_mutex.lock();
packet = _que->front();
_mutex.unlock();
return packet;
}
else
{
return 0;
}
}
T* getPacket()
{
T* packet;
if (_que->size() > 0)
{
_mutex.lock();
packet = _que->front();
_mutex.unlock();
return packet;
}
else
{
return 0;
}
}
int
post(T* packet)
{
int rc;
_mutex.lock();
rc = _que->post(packet);
_mutex.unlock();
return rc;
}
int
post(T* packet)
{
int rc;
_mutex.lock();
rc = _que->post(packet);
_mutex.unlock();
return rc;
}
void pop()
{
if (_que->size() > 0)
{
_mutex.lock();
_que->pop();
_mutex.unlock();
}
}
void pop()
{
if (_que->size() > 0)
{
_mutex.lock();
_que->pop();
_mutex.unlock();
}
}
void clear()
{
_mutex.lock();
while (_que->size() > 0)
{
delete _que->front();
_que->pop();
}
_mutex.unlock();
}
void clear()
{
_mutex.lock();
while (_que->size() > 0)
{
delete _que->front();
_que->pop();
}
_mutex.unlock();
}
void setMaxSize(int size)
{
_que->setMaxSize(size);
}
void setMaxSize(int size)
{
_que->setMaxSize(size);
}
private:
Que<T>* _que;
Mutex _mutex;
Que<T>* _que;
Mutex _mutex;
};
@@ -114,19 +114,21 @@ private:
======================================*/
class Topic
{
friend class Topics;
friend class Topics;
public:
Topic();
Topic(string* topic);
~Topic();
string* getTopicName(void);
uint16_t getTopicId(void);
bool isMatch(string* topicName);
Topic();
Topic(string* topic, MQTTSN_topicTypes type);
~Topic();
string* getTopicName(void);
uint16_t getTopicId(void);
MQTTSN_topicTypes getType(void);
bool isMatch(string* topicName);
void print(void);
private:
uint16_t _topicId;
string* _topicName;
Topic* _next;
MQTTSN_topicTypes _type;
uint16_t _topicId;
string* _topicName;
Topic* _next;
};
/*=====================================
@@ -135,20 +137,21 @@ private:
class Topics
{
public:
Topics();
~Topics();
Topic* add(const MQTTSN_topicid* topicid);
Topic* add(const string* topic);
uint16_t getTopicId(const MQTTSN_topicid* topic);
uint16_t getNextTopicId();
Topic* getTopic(uint16_t topicId);
Topic* getTopic(const MQTTSN_topicid* topicid);
Topic* match(const MQTTSN_topicid* topicid);
Topics();
~Topics();
Topic* add(const MQTTSN_topicid* topicid);
Topic* add(const char* topicName, uint16_t id = 0);
Topic* getTopicByName(const MQTTSN_topicid* topic);
Topic* getTopicById(const MQTTSN_topicid* topicid);
Topic* match(const MQTTSN_topicid* topicid);
void eraseNormal(void);
uint16_t getNextTopicId();
void print(void);
uint8_t getCount(void);
private:
uint16_t _nextTopicId;
Topic* _first;
uint16_t _nextTopicId;
Topic* _first;
uint8_t _cnt;
};
/*=====================================
@@ -156,34 +159,36 @@ private:
=====================================*/
class TopicIdMapelement
{
friend class TopicIdMap;
friend class TopicIdMap;
public:
TopicIdMapelement(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type);
~TopicIdMapelement();
TopicIdMapelement(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type);
~TopicIdMapelement();
MQTTSN_topicTypes getTopicType(void);
uint16_t getTopicId(void);
private:
uint16_t _msgId;
uint16_t _topicId;
MQTTSN_topicTypes _type;
TopicIdMapelement* _next;
TopicIdMapelement* _prev;
uint16_t _msgId;
uint16_t _topicId;
MQTTSN_topicTypes _type;
TopicIdMapelement* _next;
TopicIdMapelement* _prev;
};
class TopicIdMap
{
public:
TopicIdMap();
~TopicIdMap();
uint16_t getTopicId(uint16_t msgId, MQTTSN_topicTypes* type);
int add(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type);
void erase(uint16_t msgId);
void clear(void);
TopicIdMap();
~TopicIdMap();
TopicIdMapelement* getElement(uint16_t msgId);
TopicIdMapelement* add(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type);
void erase(uint16_t msgId);
void clear(void);
private:
uint16_t* _msgIds;
TopicIdMapelement* _first;
TopicIdMapelement* _end;
int _cnt;
int _maxInflight;
uint16_t* _msgIds;
TopicIdMapelement* _first;
TopicIdMapelement* _end;
int _cnt;
int _maxInflight;
};
/*=====================================
@@ -191,16 +196,16 @@ private:
=====================================*/
class waitREGACKPacket
{
friend class WaitREGACKPacketList;
friend class WaitREGACKPacketList;
public:
waitREGACKPacket(MQTTSNPacket* packet, uint16_t REGACKMsgId);
~waitREGACKPacket();
waitREGACKPacket(MQTTSNPacket* packet, uint16_t REGACKMsgId);
~waitREGACKPacket();
private:
uint16_t _msgId;
MQTTSNPacket* _packet;
waitREGACKPacket* _next;
waitREGACKPacket* _prev;
uint16_t _msgId;
MQTTSNPacket* _packet;
waitREGACKPacket* _next;
waitREGACKPacket* _prev;
};
/*=====================================
@@ -209,15 +214,15 @@ private:
class WaitREGACKPacketList
{
public:
WaitREGACKPacketList();
~WaitREGACKPacketList();
int setPacket(MQTTSNPacket* packet, uint16_t REGACKMsgId);
MQTTSNPacket* getPacket(uint16_t REGACKMsgId);
void erase(uint16_t REGACKMsgId);
WaitREGACKPacketList();
~WaitREGACKPacketList();
int setPacket(MQTTSNPacket* packet, uint16_t REGACKMsgId);
MQTTSNPacket* getPacket(uint16_t REGACKMsgId);
void erase(uint16_t REGACKMsgId);
private:
waitREGACKPacket* _first;
waitREGACKPacket* _end;
waitREGACKPacket* _first;
waitREGACKPacket* _end;
};
/*=====================================
@@ -227,110 +232,111 @@ private:
typedef enum
{
Cstat_Disconnected = 0, Cstat_TryConnecting, Cstat_Connecting, Cstat_Active, Cstat_Asleep, Cstat_Awake, Cstat_Lost
Cstat_Disconnected = 0, Cstat_TryConnecting, Cstat_Connecting, Cstat_Active, Cstat_Asleep, Cstat_Awake, Cstat_Lost
} ClientStatus;
class Client
{
friend class ClientList;
friend class ClientList;
public:
Client(bool secure = false);
Client(uint8_t maxInflightMessages, bool secure);
~Client();
Client(bool secure = false);
Client(uint8_t maxInflightMessages, bool secure);
~Client();
Connect* getConnectData(void);
uint16_t getWaitedPubTopicId(uint16_t msgId);
uint16_t getWaitedSubTopicId(uint16_t msgId);
MQTTGWPacket* getClientSleepPacket(void);
void deleteFirstClientSleepPacket(void);
WaitREGACKPacketList* getWaitREGACKPacketList(void);
Connect* getConnectData(void);
TopicIdMapelement* getWaitedPubTopicId(uint16_t msgId);
TopicIdMapelement* getWaitedSubTopicId(uint16_t msgId);
MQTTGWPacket* getClientSleepPacket(void);
void deleteFirstClientSleepPacket(void);
WaitREGACKPacketList* getWaitREGACKPacketList(void);
void eraseWaitedPubTopicId(uint16_t msgId);
void eraseWaitedSubTopicId(uint16_t msgId);
void clearWaitedPubTopicId(void);
void clearWaitedSubTopicId(void);
void eraseWaitedPubTopicId(uint16_t msgId);
void eraseWaitedSubTopicId(uint16_t msgId);
void clearWaitedPubTopicId(void);
void clearWaitedSubTopicId(void);
int setClientSleepPacket(MQTTGWPacket*);
void setWaitedPubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type);
void setWaitedSubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type);
int setClientSleepPacket(MQTTGWPacket*);
void setWaitedPubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type);
void setWaitedSubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type);
bool checkTimeover(void);
void updateStatus(MQTTSNPacket*);
void updateStatus(ClientStatus);
void connectSended(void);
void connackSended(int rc);
void disconnected(void);
bool isConnectSendable(void);
bool checkTimeover(void);
void updateStatus(MQTTSNPacket*);
void updateStatus(ClientStatus);
void connectSended(void);
void connackSended(int rc);
void disconnected(void);
bool isConnectSendable(void);
uint16_t getNextPacketId(void);
uint8_t getNextSnMsgId(void);
Topics* getTopics(void);
void setTopics(Topics* topics);
void setKeepAlive(MQTTSNPacket* packet);
uint16_t getNextPacketId(void);
uint8_t getNextSnMsgId(void);
Topics* getTopics(void);
void setTopics(Topics* topics);
void setKeepAlive(MQTTSNPacket* packet);
SensorNetAddress* getSensorNetAddress(void);
Network* getNetwork(void);
void setClientAddress(SensorNetAddress* sensorNetAddr);
void setSensorNetType(bool stable);
SensorNetAddress* getSensorNetAddress(void);
Network* getNetwork(void);
void setClientAddress(SensorNetAddress* sensorNetAddr);
void setSensorNetType(bool stable);
void setClientId(MQTTSNString id);
void setWillTopic(MQTTSNString willTopic);
void setWillMsg(MQTTSNString willmsg);
char* getClientId(void);
char* getWillTopic(void);
char* getWillMsg(void);
const char* getStatus(void);
void setWaitWillMsgFlg(bool);
void setSessionStatus(bool); // true: clean session
bool erasable(void);
void setClientId(MQTTSNString id);
void setWillTopic(MQTTSNString willTopic);
void setWillMsg(MQTTSNString willmsg);
char* getClientId(void);
char* getWillTopic(void);
char* getWillMsg(void);
const char* getStatus(void);
void setWaitWillMsgFlg(bool);
void setSessionStatus(bool); // true: clean session
bool erasable(void);
bool isDisconnect(void);
bool isActive(void);
bool isSleep(void);
bool isAwake(void);
bool isSecureNetwork(void);
bool isSensorNetStable(void);
bool isWaitWillMsg(void);
bool isDisconnect(void);
bool isActive(void);
bool isSleep(void);
bool isAwake(void);
bool isSecureNetwork(void);
bool isSensorNetStable(void);
bool isWaitWillMsg(void);
Client* getNextClient(void);
Client* getOTAClient(void);
void setOTAClient(Client* cl);
Client* getNextClient(void);
Client* getOTAClient(void);
void setOTAClient(Client* cl);
private:
PacketQue<MQTTGWPacket> _clientSleepPacketQue;
WaitREGACKPacketList _waitREGACKList;
PacketQue<MQTTGWPacket> _clientSleepPacketQue;
WaitREGACKPacketList _waitREGACKList;
Topics* _topics;
TopicIdMap _waitedPubTopicIdMap;
TopicIdMap _waitedSubTopicIdMap;
Topics* _topics;
TopicIdMap _waitedPubTopicIdMap;
TopicIdMap _waitedSubTopicIdMap;
Connect _connectData;
MQTTSNPacket* _connAck;
Connect _connectData;
MQTTSNPacket* _connAck;
char* _clientId;
char* _willTopic;
char* _willMsg;
char* _clientId;
char* _willTopic;
char* _willMsg;
Timer _keepAliveTimer;
uint32_t _keepAliveMsec;
Timer _keepAliveTimer;
uint32_t _keepAliveMsec;
ClientStatus _status;
bool _waitWillMsgFlg;
ClientStatus _status;
bool _waitWillMsgFlg;
uint16_t _packetId;
uint8_t _snMsgId;
uint16_t _packetId;
uint8_t _snMsgId;
Network* _network; // Broker
bool _secureNetwork; // SSL
bool _sensorNetype; // false: unstable network like a G3
SensorNetAddress _sensorNetAddr;
Network* _network; // Broker
bool _secureNetwork; // SSL
bool _sensorNetype; // false: unstable network like a G3
SensorNetAddress _sensorNetAddr;
bool _sessionStatus;
bool _sessionStatus;
bool _hasPredefTopic;
Client* _nextClient;
Client* _prevClient;
Client* _otaClient;
Client* _nextClient;
Client* _prevClient;
Client* _otaClient;
};
/*=====================================
@@ -339,23 +345,24 @@ private:
class ClientList
{
public:
ClientList();
~ClientList();
bool authorize(const char* fileName);
void erase(Client*&);
Client* getClient(SensorNetAddress* addr);
Client* getClient(uint8_t* clientId);
Client* createClient(SensorNetAddress* addr, MQTTSNString* clientId, bool unstableLine,
bool secure);
uint16_t getClientCount(void);
Client* getClient(void);
bool isAuthorized();
ClientList();
~ClientList();
bool authorize(const char* fileName);
bool setPredefinedTopics(const char* fileName);
void erase(Client*&);
Client* createClient(SensorNetAddress* addr, MQTTSNString* clientId, bool unstableLine, bool secure);
Client* getClient(SensorNetAddress* addr);
Client* getClient(MQTTSNString* clientId);
uint16_t getClientCount(void);
Client* getClient(void);
bool isAuthorized();
private:
Client* _firstClient;
Client* _endClient;
Mutex _mutex;
uint16_t _clientCnt;
bool _authorize;
Client* createPredefinedTopic( MQTTSNString* clientId, string topicName, uint16_t toipcId);
Client* _firstClient;
Client* _endClient;
Mutex _mutex;
uint16_t _clientCnt;
bool _authorize;
};
}

View File

@@ -115,9 +115,21 @@ void ClientRecvTask::run()
continue;
}
/* create a client */
client = _gateway->getClientList()->createClient(_sensorNetwork->getSenderAddress(), &data.clientID, false, false);
client = _gateway->getClientList()->getClient(&data.clientID);
if ( client )
{
/* set SensorNet Address */
client->setClientAddress(_sensorNetwork->getSenderAddress());
}
else
{
/* create a client */
client = _gateway->getClientList()->createClient(_sensorNetwork->getSenderAddress(), &data.clientID, false, false);
}
log(client, packet, &data.clientID);
if (!client)
{
WRITELOG("%s Client(%s) was rejected. CONNECT message has been discarded.%s\n", ERRMSG_HEADER, _sensorNetwork->getSenderAddress()->sprint(buf), ERRMSG_FOOTER);
@@ -126,7 +138,6 @@ void ClientRecvTask::run()
}
/* set sensorNetAddress & post Event */
client->setClientAddress(_sensorNetwork->getSenderAddress());
ev = new Event();
ev->setClientRecvEvent(client, packet);
_gateway->getPacketEventQue()->post(ev);

View File

@@ -116,10 +116,8 @@ void MQTTSNConnectionHandler::handleConnect(Client* client, MQTTSNPacket* packet
/* renew the TopicList */
if (topics)
{
delete topics;
topics->eraseNormal();;
}
topics = new Topics();
client->setTopics(topics);
client->setSessionStatus(true);
}

View File

@@ -25,6 +25,7 @@ namespace MQTTSNGW
#define CONFIG_DIRECTORY "./"
#define CONFIG_FILE "gateway.conf"
#define CLIENT_LIST "clients.conf"
#define PREDEFINEDTOPIC_FILE "predefinedTopic.conf"
/*==========================================================
* Gateway default parameters
@@ -39,6 +40,7 @@ namespace MQTTSNGW
#define MAX_CLIENTID_LENGTH (64) // Max length of clientID
#define MAX_INFLIGHTMESSAGES (10) // Number of inflight messages
#define MAX_SAVED_PUBLISH (20) // Max number of PUBLISH message for Asleep state
#define MAX_TOPIC_PAR_CLIENT (50) // Max Topic count for a client. it should be less than 256
#define MQTTSNGW_MAX_PACKET_SIZE (1024) // Max Packet size (5+2+TopicLen+PayloadLen)
#define SIZE_OF_LOG_PACKET (500) // Length of the packet log in bytes

View File

@@ -40,8 +40,8 @@ void MQTTSNPublishHandler::handlePublish(Client* client, MQTTSNPacket* packet)
int qos;
uint8_t retained;
uint16_t msgId;
MQTTSN_topicid topicid;
uint8_t* payload;
MQTTSN_topicid topicid;
int payloadlen;
Publish pub;
char shortTopic[2];
@@ -68,68 +68,6 @@ void MQTTSNPublishHandler::handlePublish(Client* client, MQTTSNPacket* packet)
Topic* topic = 0;
if ( topicid.type == MQTTSN_TOPIC_TYPE_PREDEFINED)
{
if(msgId)
{
/* Reply PubAck to the client */
MQTTSNPacket* pubAck = new MQTTSNPacket();
pubAck->setPUBACK( topicid.data.id, msgId, MQTTSN_RC_ACCEPTED);
Event* ev1 = new Event();
ev1->setClientSendEvent(client, pubAck);
_gateway->getClientSendQue()->post(ev1);
}
#ifdef OTA_CLIENTS
if ( topicid.data.id == PREDEFINEDID_OTA_REQ )
{
uint8_t clientId[MAX_CLIENTID_LENGTH + 1];
if ( payloadlen <= MAX_CLIENTID_LENGTH )
{
memcpy(clientId, payload, payloadlen);
clientId[payloadlen] = 0;
Client* cl = _gateway->getClientList()->getClient(clientId);
if ( cl )
{
WRITELOG("\033[0m\033[0;33m OTA Client : %s\033[0m\033[0;37m\n",cl->getClientId());
MQTTSNPacket* pubota = new MQTTSNPacket();
pubota->setPUBLISH(0, 0, 0, 0, topicid, 0, 0);
cl->setOTAClient(client);
Event* evt = new Event();
evt->setClientSendEvent(cl, pubota);
_gateway->getClientSendQue()->post(evt);
}
else
{
MQTTSNPacket* publish = new MQTTSNPacket();
topicid.data.id = PREDEFINEDID_OTA_NO_CLIENT;
publish->setPUBLISH(0, 0, 0, 0, topicid, clientId, (uint16_t)strlen((const char*)clientId));
Event* evt = new Event();
evt->setClientSendEvent(client, publish);
_gateway->getClientSendQue()->post(evt);
}
}
}
else if ( topicid.data.id == PREDEFINEDID_OTA_READY )
{
Client* cl = client->getOTAClient();
if ( cl )
{
WRITELOG("\033[0m\033[0;33m OTA Manager : %s\033[0m\033[0;37m\n",cl->getClientId());
MQTTSNPacket* pubota = new MQTTSNPacket();
pubota->setPUBLISH(0, 0, 0, 0, topicid, payload, payloadlen);
client->setOTAClient(0);
Event* evt = new Event();
evt->setClientSendEvent(cl, pubota);
_gateway->getClientSendQue()->post(evt);
}
}
#endif
return;
}
if( topicid.type == MQTTSN_TOPIC_TYPE_SHORT )
{
shortTopic[0] = topicid.data.short_name[0];
@@ -137,10 +75,10 @@ void MQTTSNPublishHandler::handlePublish(Client* client, MQTTSNPacket* packet)
pub.topic = shortTopic;
pub.topiclen = 2;
}
if ( topicid.type == MQTTSN_TOPIC_TYPE_NORMAL )
else
{
topic = client->getTopics()->getTopic(topicid.data.id);
topic = client->getTopics()->getTopicById(&topicid);
if( !topic && msgId && qos > 0 )
{
/* Reply PubAck with INVALID_TOPIC_ID to the client */

View File

@@ -41,160 +41,122 @@ void MQTTSNSubscribeHandler::handleSubscribe(Client* client, MQTTSNPacket* packe
uint16_t msgId;
MQTTSN_topicid topicFilter;
Topic* topic = 0;
uint16_t topicId = 0;
MQTTGWPacket* subscribe;
Event* ev1;
Event* evsuback;
if ( packet->getSUBSCRIBE(&dup, &qos, &msgId, &topicFilter) == 0 )
{
return;
}
if (topicFilter.type <= MQTTSN_TOPIC_TYPE_SHORT)
if ( msgId == 0 )
{
if (topicFilter.type == MQTTSN_TOPIC_TYPE_PREDEFINED)
{
/*----- Predefined TopicId ------*/
MQTTSNPacket* sSuback = new MQTTSNPacket();
if (msgId)
{
int rc = MQTTSN_RC_ACCEPTED;
switch (topicFilter.data.id)
{
case PREDEFINEDID_OTA_REQ: // check topicIds are defined.
case PREDEFINEDID_OTA_READY:
case PREDEFINEDID_OTA_NO_CLIENT:
break;
default:
rc = MQTTSN_RC_REJECTED_INVALID_TOPIC_ID;
}
sSuback->setSUBACK(qos, topicFilter.data.id, msgId, rc);
Event* evsuback = new Event();
evsuback->setClientSendEvent(client, sSuback);
_gateway->getClientSendQue()->post(evsuback);
}
switch (topicFilter.data.id)
{
case 1:
/*
* ToDo: write here Predefined Topic 01 Procedures.
*/
break;
case 2:
/*
* ToDo: write here Predefined Topic 02 Procedures. so on
*/
break;
default:
break;
}
}
else
{
uint16_t topicId = 0;
MQTTGWPacket* subscribe = new MQTTGWPacket();
topic = client->getTopics()->getTopic(&topicFilter);
if (topic == 0)
{
if (topicFilter.type == MQTTSN_TOPIC_TYPE_NORMAL)
{
topic = client->getTopics()->add(&topicFilter);
topicId = topic->getTopicId();
subscribe->setSUBSCRIBE((char*)topic->getTopicName()->c_str(), (uint8_t)qos, (uint16_t)msgId);
}
else if (topicFilter.type == MQTTSN_TOPIC_TYPE_SHORT)
{
char topic[3];
topic[0] = topicFilter.data.short_name[0];
topic[1] = topicFilter.data.short_name[1];
topic[2] = 0;
topicId = topicFilter.data.id;
subscribe->setSUBSCRIBE(topic, (uint8_t)qos, (uint16_t)msgId);
}
}
else
{
topicId = topic->getTopicId();
subscribe->setSUBSCRIBE((char*)topic->getTopicName()->c_str(), (uint8_t)qos, (uint16_t)msgId);
}
if ( msgId > 0 )
{
client->setWaitedSubTopicId(msgId, topicId, topicFilter.type);
}
Event* ev1 = new Event();
ev1->setBrokerSendEvent(client, subscribe);
_gateway->getBrokerSendQue()->post(ev1);
return;
}
}
else
{
/*-- Invalid TopicIdType --*/
if (msgId)
{
MQTTSNPacket* sSuback = new MQTTSNPacket();
sSuback->setSUBACK(qos, topicFilter.data.id, msgId, MQTTSN_RC_REJECTED_INVALID_TOPIC_ID);
Event* evsuback = new Event();
evsuback->setClientSendEvent(client, sSuback);
_gateway->getClientSendQue()->post(evsuback);
}
return;
}
if ( topicFilter.type == MQTTSN_TOPIC_TYPE_PREDEFINED )
{
topic = client->getTopics()->getTopicById(&topicFilter);
if ( topic )
{
topicId = topic->getTopicId();
subscribe = new MQTTGWPacket();
subscribe->setSUBSCRIBE((char*)topic->getTopicName()->c_str(), (uint8_t)qos, (uint16_t)msgId);
}
else
{
goto RespExit;
}
}
else if (topicFilter.type == MQTTSN_TOPIC_TYPE_NORMAL)
{
topic = client->getTopics()->getTopicByName(&topicFilter);
if ( topic == 0 )
{
topic = client->getTopics()->add(&topicFilter);
if ( topic == 0 )
{
WRITELOG("%s Client(%s) can't add the Topic.%s\n", ERRMSG_HEADER, client->getClientId(), ERRMSG_FOOTER);
return;
}
}
topicId = topic->getTopicId();
subscribe = new MQTTGWPacket();
subscribe->setSUBSCRIBE((char*)topic->getTopicName()->c_str(), (uint8_t)qos, (uint16_t)msgId);
}
else //MQTTSN_TOPIC_TYPE_SHORT
{
char topicstr[3];
topicstr[0] = topicFilter.data.short_name[0];
topicstr[1] = topicFilter.data.short_name[1];
topicstr[2] = 0;
topicId = 0;
subscribe = new MQTTGWPacket();
subscribe->setSUBSCRIBE(topicstr, (uint8_t)qos, (uint16_t)msgId);
}
client->setWaitedSubTopicId(msgId, topicId, topicFilter.type);
ev1 = new Event();
ev1->setBrokerSendEvent(client, subscribe);
_gateway->getBrokerSendQue()->post(ev1);
return;
RespExit:
MQTTSNPacket* sSuback = new MQTTSNPacket();
sSuback->setSUBACK(qos, topicFilter.data.id, msgId, MQTTSN_RC_NOT_SUPPORTED);
evsuback = new Event();
evsuback->setClientSendEvent(client, sSuback);
_gateway->getClientSendQue()->post(evsuback);
}
void MQTTSNSubscribeHandler::handleUnsubscribe(Client* client, MQTTSNPacket* packet)
{
uint16_t msgId;
MQTTSN_topicid topicFilter;
MQTTGWPacket* unsubscribe = 0;;
if ( packet->getUNSUBSCRIBE(&msgId, &topicFilter) == 0 )
{
return;
}
if ( topicFilter.type == MQTTSN_TOPIC_TYPE_PREDEFINED )
{
/*
* ToDo: procedures for Predefined Topic
*/
return;
}
if ( msgId == 0 )
{
return;
}
Topic* topic = client->getTopics()->getTopic(&topicFilter);
MQTTGWPacket* unsubscribe = new MQTTGWPacket();
Topic* topic = client->getTopics()->getTopicById(&topicFilter);
if (topicFilter.type == MQTTSN_TOPIC_TYPE_NORMAL)
if (topicFilter.type == MQTTSN_TOPIC_TYPE_SHORT)
{
if ( topic == 0 )
{
if (msgId)
{
MQTTSNPacket* sUnsuback = new MQTTSNPacket();
sUnsuback->setUNSUBACK(msgId);
Event* evsuback = new Event();
evsuback->setClientSendEvent(client, sUnsuback);
_gateway->getClientSendQue()->post(evsuback);
}
delete unsubscribe;
return;
}
else
{
unsubscribe->setUNSUBSCRIBE(topic->getTopicName()->c_str(), msgId);
}
}
else if (topicFilter.type == MQTTSN_TOPIC_TYPE_SHORT)
{
char shortTopic[3];
shortTopic[0] = topicFilter.data.short_name[0];
shortTopic[1] = topicFilter.data.short_name[1];
shortTopic[2] = 0;
unsubscribe->setUNSUBSCRIBE(shortTopic, msgId);
char shortTopic[3];
shortTopic[0] = topicFilter.data.short_name[0];
shortTopic[1] = topicFilter.data.short_name[1];
shortTopic[2] = 0;
unsubscribe = new MQTTGWPacket();
unsubscribe->setUNSUBSCRIBE(shortTopic, msgId);
}
else
{
delete unsubscribe;
return;
if ( topic == 0 )
{
MQTTSNPacket* sUnsuback = new MQTTSNPacket();
sUnsuback->setUNSUBACK(msgId);
Event* evsuback = new Event();
evsuback->setClientSendEvent(client, sUnsuback);
_gateway->getClientSendQue()->post(evsuback);
return;
}
else
{
unsubscribe = new MQTTGWPacket();
unsubscribe->setUNSUBSCRIBE(topic->getTopicName()->c_str(), msgId);
}
}
Event* ev1 = new Event();

View File

@@ -17,6 +17,6 @@
#ifndef MQTTSNGWVERSION_H_IN_
#define MQTTSNGWVERSION_H_IN_
#define PAHO_GATEWAY_VERSION "1.0.1"
#define PAHO_GATEWAY_VERSION "1.1.0"
#endif /* MQTTSNGWVERSION_H_IN_ */

View File

@@ -13,7 +13,7 @@
* Contributors:
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
**************************************************************************************/
#include "MQTTSNGWDefines.h"
#include "MQTTSNGateway.h"
#include "SensorNetwork.h"
#include "MQTTSNGWProcess.h"
@@ -202,13 +202,40 @@ void Gateway::initialize(int argc, char** argv)
if (!_clientList.authorize(fileName.c_str()))
{
throw Exception("Gateway::initialize: No client list defined by configuration.");
throw Exception("Gateway::initialize: No client list defined by the configuration.");
}
_params.clientListName = strdup(fileName.c_str());
}
}
fileName = *getConfigDirName() + *getConfigFileName();
_params.configName = strdup(fileName.c_str());
if (getParam("PredefinedTopic", param) == 0 )
{
if (!strcasecmp(param, "YES") )
{
if (getParam("PredefinedTopicFile", param) == 0)
{
fileName =*getConfigDirName() + string(param);
}
else
{
fileName = *getConfigDirName() + string(PREDEFINEDTOPIC_FILE);
}
if (!_clientList.setPredefinedTopics(fileName.c_str()))
{
throw Exception("Gateway::initialize: No PredefinedTopic file defined by the configuration..");
}
_params.predefinedTopicFileName = strdup(fileName.c_str());
}
else
{
_params.predefinedTopicFileName = 0;
}
}
fileName = *getConfigDirName() + *getConfigFileName();
_params.configName = strdup(fileName.c_str());
}
void Gateway::run(void)
@@ -229,6 +256,10 @@ void Gateway::run(void)
}
WRITELOG(" SensorN/W: %s\n", _sensorNetwork.getDescription());
WRITELOG(" Broker: %s : %s, %s\n", _params.brokerName, _params.port, _params.portSecure);
if ( _params.predefinedTopicFileName )
{
WRITELOG(" PreDefFile: %s\n", _params.predefinedTopicFileName);
}
WRITELOG(" RootCApath: %s\n", _params.rootCApath);
WRITELOG(" RootCAfile: %s\n", _params.rootCAfile);
WRITELOG(" CertKey: %s\n", _params.certKey);

View File

@@ -72,10 +72,10 @@ namespace MQTTSNGW
/*=====================================
Predefined TopicId for OTA
====================================*/
#define OTA_CLIENTS
#define PREDEFINEDID_OTA_REQ (0x0ff0)
#define PREDEFINEDID_OTA_READY (0x0ff1)
#define PREDEFINEDID_OTA_NO_CLIENT (0x0ff2)
//#define OTA_CLIENTS
//#define PREDEFINEDID_OTA_REQ (0x0ff0)
//#define PREDEFINEDID_OTA_READY (0x0ff1)
//#define PREDEFINEDID_OTA_NO_CLIENT (0x0ff2)
/*=====================================
Class Event
@@ -160,6 +160,7 @@ typedef struct
char* rootCAfile;
char* certKey;
char* privateKey;
char* predefinedTopicFileName;
}GatewayParams;
/*=====================================

View File

@@ -403,7 +403,7 @@ void XBee::setApiMode(uint8_t mode)
SerialPort::SerialPort()
{
_tio.c_iflag = IGNBRK | IGNPAR;
_tio.c_cflag = CS8 | CLOCAL | CRTSCTS;
_tio.c_cflag = CS8 | CLOCAL | CRTSCTS | CREAD;
_tio.c_cc[VINTR] = 0;
_tio.c_cc[VTIME] = 10; // 1 sec.
_tio.c_cc[VMIN] = 1;
@@ -464,7 +464,7 @@ bool SerialPort::recv(unsigned char* buf)
FD_SET(_fd, &rfds);
timeout.tv_sec = 0;
timeout.tv_usec = 500000; // 500ms
if ( select(1, &rfds, 0, 0, &timeout) > 0 )
if ( select(_fd + 1, &rfds, 0, 0, &timeout) > 0 )
{
if (read(_fd, buf, 1) > 0)
{

View File

@@ -87,27 +87,29 @@ void TestProcess::run(void)
printf("Timer Test completed\n\n");
/* Test Que */
printf("Test Que ");
TestQue* tque = new TestQue();
tque->test();
delete tque;
/* Test Tree23 */
printf("Test Tree23 ");
TestTree23* tree23 = new TestTree23();
tree23->test();
delete tree23;
/* Test TopicTable */
printf("Test Topic ");
TestTopics* testTopic = new TestTopics();
testTopic->test();
delete testTopic;
/* Test TopicIdMap */
printf("Test TopicIdMap ");
TestTopicIdMap* testMap = new TestTopicIdMap();
testMap->test();
delete testMap;
/* Test EventQue */
printf("Test EventQue ");
Client* client = new Client();

View File

@@ -35,7 +35,6 @@ void TestQue::test(void)
int* v = 0;
int i = 0;
printf("Test Que ");
for ( i = 0; i < 10; i++ )
{
v = new int(i);

View File

@@ -31,95 +31,163 @@ TestTopicIdMap::~TestTopicIdMap()
delete _map;
}
bool TestTopicIdMap::testGetElement(uint16_t msgid, uint16_t id, MQTTSN_topicTypes type)
{
TopicIdMapelement* elm = _map->getElement((uint16_t)msgid );
if ( elm )
{
//printf("msgid=%d id=%d type=%d\n", msgid, elm->getTopicId(), elm->getTopicType());
return elm->getTopicId() == id && elm->getTopicType() == type;
}
//printf("msgid=%d\n", msgid);
return false;
}
#define MAXID 30
void TestTopicIdMap::test(void)
{
uint16_t id[MAXID];
printf("Test TopicIdMat ");
for ( int i = 0; i < MAXID; i++ )
{
id[i] = i + 1;
}
for ( int i = 0; i < MAXID; i++ )
{
_map->add(id[i], id[i], MQTTSN_TOPIC_TYPE_NORMAL);
}
for ( int i = 0; i < MAXID; i++ )
for ( int i = 0; i < MAX_INFLIGHTMESSAGES * 2 + 1; i++ )
{
MQTTSN_topicTypes type = MQTTSN_TOPIC_TYPE_SHORT;
uint16_t topicId = _map->getTopicId((uint16_t)i, &type);
//printf("TopicId=%d msgId=%d type=%d\n", topicId, i, type);
assert((i <= MAX_INFLIGHTMESSAGES * 2 + 1 && topicId == i) || (i > MAX_INFLIGHTMESSAGES * 2 + 1 && topicId == 0));
assert(testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_NORMAL));
}
//printf("\n");
for ( int i = MAX_INFLIGHTMESSAGES * 2 + 1; i < MAXID; i++ )
{
assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_NORMAL));
}
for ( int i = 0; i < MAX_INFLIGHTMESSAGES * 2 + 1; i++ )
{
assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_PREDEFINED));
}
for ( int i = 0; i < 5; i++ )
{
_map->erase(i);
_map->erase(id[i]);
}
for ( int i = 0; i < MAXID; i++ )
{
MQTTSN_topicTypes type = MQTTSN_TOPIC_TYPE_SHORT;
uint16_t topicId = _map->getTopicId((uint16_t)i, &type);
//printf("TopicId=%d msgId=%d type=%d\n", topicId, i, type);
assert((i < 5 && topicId == 0) || (i >= 5 && topicId != 0) || (i > MAX_INFLIGHTMESSAGES * 2 + 1 && topicId == 0) );
}
_map->clear();
//printf("\n");
for ( int i = 0; i < MAXID; i++ )
{
MQTTSN_topicTypes type = MQTTSN_TOPIC_TYPE_SHORT;
uint16_t topicId = _map->getTopicId((uint16_t)i, &type);
//printf("TopicId=%d msgId=%d type=%d\n", topicId, i, type);
assert( topicId == 0 );
}
for ( int i = 0; i < MAXID; i++ )
{
_map->add(id[i], id[i], MQTTSN_TOPIC_TYPE_SHORT);
}
for ( int i = 0; i < MAXID; i++ )
{
MQTTSN_topicTypes type = MQTTSN_TOPIC_TYPE_NORMAL;
uint16_t topicId = _map->getTopicId((uint16_t)i, &type);
//printf("TopicId=%d msgId=%d type=%d\n", topicId, i, type);
assert((i <= MAX_INFLIGHTMESSAGES * 2 + 1 && topicId == i) || (i > MAX_INFLIGHTMESSAGES * 2 + 1 && topicId == 0));
}
//printf("\n");
for ( int i = 0; i < 5; i++ )
{
_map->erase(i);
}
for ( int i = 0; i < MAXID; i++ )
{
MQTTSN_topicTypes type = MQTTSN_TOPIC_TYPE_NORMAL;
uint16_t topicId = _map->getTopicId((uint16_t)i, &type);
//printf("TopicId=%d msgId=%d type=%d\n", topicId, i, type);
assert((i < 5 && topicId == 0) || (i >= 5 && topicId != 0) || (i > MAX_INFLIGHTMESSAGES * 2 + 1 && topicId == 0) );
assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_NORMAL));
}
for ( int i = 5; i < MAX_INFLIGHTMESSAGES * 2 + 1; i++ )
{
assert(testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_NORMAL));
}
for ( int i = MAX_INFLIGHTMESSAGES * 2 + 1; i < MAXID; i++ )
{
assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_NORMAL));
}
_map->clear();
//printf("\n");
for ( int i = 0; i < MAXID; i++ )
{
MQTTSN_topicTypes type = MQTTSN_TOPIC_TYPE_NORMAL;
uint16_t topicId = _map->getTopicId((uint16_t)i, &type);
//printf("TopicId=%d msgId=%d type=%d\n", topicId, i, type);
assert( topicId == 0 );
}
for ( int i = 0; i < MAXID; i++ )
{
assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_NORMAL));
}
for ( int i = 0; i < MAXID; i++ )
{
_map->add(id[i], id[i], MQTTSN_TOPIC_TYPE_SHORT);
}
for ( int i = 0; i < MAX_INFLIGHTMESSAGES * 2 + 1; i++ )
{
assert(testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_SHORT));
}
for ( int i = MAX_INFLIGHTMESSAGES * 2 + 1; i < MAXID; i++ )
{
assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_SHORT));
}
for ( int i = 0; i < MAX_INFLIGHTMESSAGES * 2 + 1; i++ )
{
assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_NORMAL));
}
for ( int i = 0; i < 5; i++ )
{
_map->erase(id[i]);
}
for ( int i = 0; i < 5; i++ )
{
assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_SHORT));
}
for ( int i = 5; i < MAX_INFLIGHTMESSAGES * 2 + 1; i++ )
{
assert(testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_SHORT));
}
for ( int i = MAX_INFLIGHTMESSAGES * 2 + 1; i < MAXID; i++ )
{
assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_SHORT));
}
_map->clear();
for ( int i = 0; i < MAXID; i++ )
{
assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_SHORT));
}
for ( int i = 0; i < MAXID; i++ )
{
_map->add(id[i], id[i], MQTTSN_TOPIC_TYPE_PREDEFINED);
}
for ( int i = 0; i < MAX_INFLIGHTMESSAGES * 2 + 1; i++ )
{
assert(testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_PREDEFINED));
}
for ( int i = MAX_INFLIGHTMESSAGES * 2 + 1; i < MAXID; i++ )
{
assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_PREDEFINED));
}
for ( int i = 0; i < MAX_INFLIGHTMESSAGES * 2 + 1; i++ )
{
assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_SHORT));
}
for ( int i = 0; i < 5; i++ )
{
_map->erase(id[i]);
}
for ( int i = 0; i < 5; i++ )
{
assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_PREDEFINED));
}
for ( int i = 5; i < MAX_INFLIGHTMESSAGES * 2 + 1; i++ )
{
assert(testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_PREDEFINED));
}
for ( int i = MAX_INFLIGHTMESSAGES * 2 + 1; i < MAXID; i++ )
{
assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_PREDEFINED));
}
_map->clear();
for ( int i = 0; i < MAXID; i++ )
{
assert(!testGetElement(id[i], id[i], MQTTSN_TOPIC_TYPE_PREDEFINED));
}
printf("[ OK ]\n");
}

View File

@@ -24,6 +24,7 @@ public:
TestTopicIdMap();
~TestTopicIdMap();
void test(void);
bool testGetElement(uint16_t msgid, uint16_t id, MQTTSN_topicTypes type);
private:
TopicIdMap* _map;

View File

@@ -38,7 +38,7 @@ bool testIsMatch(const char* topicFilter, const char* topicName)
string* filter = new string(topicFilter);
string* name = new string(topicName);
Topic topic(filter);
Topic topic(filter, MQTTSN_TOPIC_TYPE_NORMAL);
bool isMatch = topic.isMatch(name);
delete name;
@@ -46,38 +46,82 @@ bool testIsMatch(const char* topicFilter, const char* topicName)
return isMatch;
}
bool testGetTopic(const char* topicName, const char* searchedTopicName)
bool testGetTopicByName(const char* topicName, const char* searchedTopicName)
{
Topics topics;
string name(topicName);
MQTTSN_topicid topicid;
MQTTSN_topicid topicid, serchId;
topicid.type = MQTTSN_TOPIC_TYPE_NORMAL;
topicid.data.long_.len = strlen(searchedTopicName);
topicid.data.long_.name = const_cast<char*>(searchedTopicName);
topicid.data.long_.len = strlen(topicName);
topicid.data.long_.name = const_cast<char*>(topicName);
topics.add(&name);
topics.add(&topicid);
return topics.getTopic(&topicid) != 0;
serchId.type = MQTTSN_TOPIC_TYPE_NORMAL;
serchId.data.long_.len = strlen(searchedTopicName);
serchId.data.long_.name = const_cast<char*>(searchedTopicName);
return topics.getTopicByName(&serchId) != 0;
}
bool testGetTopicId(const char* topicName, const char* searchedTopicName)
bool testGetTopicById(const char* topicName, const char* searchedTopicName)
{
Topics topics;
string name(topicName);
MQTTSN_topicid topicid;
MQTTSN_topicid topicid, stopicid;
topicid.type = MQTTSN_TOPIC_TYPE_NORMAL;
topicid.data.long_.len = strlen(searchedTopicName);
topicid.data.long_.name = const_cast<char*>(searchedTopicName);
topicid.data.long_.len = strlen(topicName);
topicid.data.long_.name = const_cast<char*>(topicName);
stopicid.type = MQTTSN_TOPIC_TYPE_NORMAL;
stopicid.data.long_.len = strlen(searchedTopicName);
stopicid.data.long_.name = const_cast<char*>(searchedTopicName);
topics.add(&name);
Topic* tp = topics.add(&topicid);
Topic*stp = topics.add(&stopicid);
topicid.data.id = tp->getTopicId();
stopicid.data.id = stp->getTopicId();
return topics.getTopicId(&topicid) != 0;
stp = topics.getTopicById(&stopicid);
return stp->getTopicId() == tp->getTopicId();
}
bool testGetPredefinedTopicByName(const char* topicName, const uint16_t id, const char* searchedTopicName)
{
Topics topics;
MQTTSN_topicid topicid;
topics.add(topicName, id);
topicid.type = MQTTSN_TOPIC_TYPE_PREDEFINED;
topicid.data.long_.len = strlen(searchedTopicName);
topicid.data.long_.name = const_cast<char*>(searchedTopicName);
return topics.getTopicByName(&topicid) != 0;
}
bool testGetPredefinedTopicById(const char* topicName, const uint16_t id, uint16_t sid)
{
Topics topics;
MQTTSN_topicid topicid;
Topic* t = topics.add(topicName, id);
topicid.type = MQTTSN_TOPIC_TYPE_PREDEFINED;
topicid.data.id = sid;
Topic* tp = topics.getTopicById(&topicid);
if ( tp )
{
return tp->getTopicId() == id && strcmp(t->getTopicName()->c_str(), topicName) == 0;
}
else
{
return false;
}
}
void TestTopics::test(void)
{
printf("Test Topics ");
const int TOPIC_COUNT = 13;
MQTTSN_topicid topic[TOPIC_COUNT];
@@ -116,6 +160,18 @@ void TestTopics::test(void)
topic[12].data.long_.len = strlen(tp[12]);
topic[12].data.long_.name = tp[12];
/* Test EraseNorma() */
for ( int i = 0; i < TOPIC_COUNT; i++ )
{
MQTTSN_topicid pos = topic[i];
Topic* t = _topics->add(&pos);
//printf("Topic=%s ID=%d\n", t->getTopicName()->c_str(), t->getTopicId());
assert(t !=0);
}
_topics->eraseNormal();
assert(_topics->getCount() == 0);
/* Add Topic to Topics */
for ( int i = 0; i < TOPIC_COUNT; i++ )
{
@@ -129,26 +185,30 @@ void TestTopics::test(void)
{
string str = "Test/";
str += 0x30 + i;
Topic* t = _topics->add(&str);
Topic* t = _topics->add(str.c_str());
//printf("Topic=%s ID=%d\n", t->getTopicName()->c_str(), t->getTopicId());
assert(t !=0);
}
/* Get Topic by MQTTSN_topicid */
/* Get Topic by MQTTSN_topicid by Name*/
for ( int i = 0; i < TOPIC_COUNT; i++ )
{
Topic* t = _topics->getTopic(&topic[i]);
//printf("Topic=%s ID=%d ID=%d\n", t->getTopicName()->c_str(), t->getTopicId(),_topics->getTopicId(&topic[i]));
assert(t->getTopicId() == i + 1);
Topic* t = _topics->getTopicByName(&topic[i]);
//printf("Topic=%s ID=%d\n", t->getTopicName()->c_str(), t->getTopicId());
assert(strcmp(t->getTopicName()->c_str(), topic[i].data.long_.name) == 0 );
}
/* Get TopicId by MQTTSN_topicid */
for ( int i = 0; i < TOPIC_COUNT; i++ )
{
uint16_t id = _topics->getTopicId(&topic[i]);
//printf("ID=%d \n", id);
assert(id == i + 1);
}
/* Get Topic by MQTTSN_topicid by ID*/
for ( int i = 0; i < TOPIC_COUNT; i++ )
{
Topic* t = _topics->getTopicByName(&topic[i]);
MQTTSN_topicid stpid;
stpid.type = MQTTSN_TOPIC_TYPE_NORMAL;
stpid.data.id =t->getTopicId();
Topic* st = _topics->getTopicById(&stpid);
//printf("Topic=%s ID=%d ID=%d\n", t->getTopicName()->c_str(), t->getTopicId(), st->getTopicId());
assert(t->getTopicId() == st->getTopicId() );
}
/* Test Wildcard */
for ( int i = 0; i < 10 ; i++ )
@@ -286,13 +346,20 @@ void TestTopics::test(void)
assert(testIsMatch("/+", "/finance"));
assert(!testIsMatch("+", "/finance"));
assert(testGetTopicId("mytopic", "mytopic"));
assert(!testGetTopicId("mytopic", "mytop"));
assert(!testGetTopicId("mytopic", "mytopiclong"));
assert(testGetTopicById("mytopic", "mytopic"));
assert(!testGetTopicById("mytopic", "mytop"));
assert(!testGetTopicById("mytopic", "mytopiclong"));
assert(testGetTopic("mytopic", "mytopic"));
assert(!testGetTopic("mytopic", "mytop"));
assert(!testGetTopic("mytopic", "mytopiclong"));
assert(testGetTopicByName("mytopic", "mytopic"));
assert(!testGetTopicByName("mytopic", "mytop"));
assert(!testGetTopicByName("mytopic", "mytopiclong"));
assert(testGetPredefinedTopicByName("mypretopic", 1, "mypretopic"));
assert(!testGetPredefinedTopicByName("mypretopic", 1, "mypretop"));
assert(!testGetPredefinedTopicByName("mypretopic", 1, "mypretopiclong"));
assert(testGetPredefinedTopicById("mypretopic2", 2, 2));
assert(!testGetPredefinedTopicById("mypretopic2", 2, 1));
printf("[ OK ]\n");
}

View File

@@ -33,7 +33,6 @@ TestTree23::~TestTree23()
void TestTree23::test(void)
{
printf("Test Tree23 ");
int N = 100;
Key* r1[100];