Merge pull request #46 from ty4tw/gateway

BugFix of #45
This commit is contained in:
Ian Craggs
2017-02-24 15:38:45 +00:00
committed by GitHub
6 changed files with 87 additions and 77 deletions

View File

@@ -49,6 +49,7 @@ void MQTTGWPublishHandler::handlePublish(Client* client, MQTTGWPacket* packet)
/* create MQTTSN_topicid */ /* create MQTTSN_topicid */
MQTTSN_topicid topicId; MQTTSN_topicid topicId;
uint16_t id = 0;
if (pub.topiclen == 2) if (pub.topiclen == 2)
{ {
@@ -61,73 +62,77 @@ void MQTTGWPublishHandler::handlePublish(Client* client, MQTTGWPacket* packet)
topicId.type = MQTTSN_TOPIC_TYPE_NORMAL; topicId.type = MQTTSN_TOPIC_TYPE_NORMAL;
topicId.data.long_.len = pub.topiclen; topicId.data.long_.len = pub.topiclen;
topicId.data.long_.name = pub.topic; topicId.data.long_.name = pub.topic;
unsigned short id = client->getTopics()->getTopicId(&topicId); id = client->getTopics()->getTopicId(&topicId);
topicId.data.id = id;
}
if (topicId.data.id == 0) if ( id > 0 )
{
/* This message might be subscribed with wild card. */
Topic* topic = client->getTopics()->match(&topicId);
if (topic == 0)
{ {
WRITELOG(" Invalid Topic. PUBLISH message is canceled.\n");
if (pub.header.bits.qos == 1)
{
replyACK(client, &pub, PUBACK);
}
else if ( pub.header.bits.qos == 2 )
{
replyACK(client, &pub, PUBREC);
}
return;
}
/* add the Topic and get a TopicId */
topic = client->getTopics()->add(&topicId);
uint16_t id = topic->getTopicId();
if (id > 0)
{
/* create REGACK */
MQTTSNPacket* regPacket = new MQTTSNPacket();
MQTTSNString topicName;
topicName.lenstring.len = topicId.data.long_.len;
topicName.lenstring.data = topicId.data.long_.name;
uint16_t regackMsgId = client->getNextSnMsgId();
regPacket->setREGISTER(id, regackMsgId, &topicName);
if (client->isSleep())
{
client->setClientSleepPacket(regPacket);
WRITELOG(FORMAT_BL_NL, currentDateTime(), regPacket->getName(),
RIGHTARROW, client->getClientId(), "is sleeping. REGISTER was saved.");
}
else if (client->isActive())
{
/* send REGISTER */
Event* evrg = new Event();
evrg->setClientSendEvent(client, regPacket);
_gateway->getClientSendQue()->post(evrg);
}
/* send PUBLISH */
topicId.data.id = id; topicId.data.id = id;
snPacket->setPUBLISH((uint8_t) pub.header.bits.dup, (int) pub.header.bits.qos,
(uint8_t) pub.header.bits.retain, (uint16_t) pub.msgId, topicId, (uint8_t*) pub.payload,
pub.payloadlen);
client->getWaitREGACKPacketList()->setPacket(snPacket, regackMsgId);
} }
else else
{ {
WRITELOG("\x1b[0m\x1b[31mMQTTGWPublishHandler Can't create a Topic.\n"); /* This message might be subscribed with wild card. */
return; Topic* topic = client->getTopics()->match(&topicId);
if (topic == 0)
{
WRITELOG(" Invalid Topic. PUBLISH message is canceled.\n");
if (pub.header.bits.qos == 1)
{
replyACK(client, &pub, PUBACK);
}
else if ( pub.header.bits.qos == 2 )
{
replyACK(client, &pub, PUBREC);
}
return;
}
/* add the Topic and get a TopicId */
topic = client->getTopics()->add(&topicId);
id = topic->getTopicId();
if (id > 0)
{
/* create REGACK */
MQTTSNPacket* regPacket = new MQTTSNPacket();
MQTTSNString topicName;
topicName.lenstring.len = topicId.data.long_.len;
topicName.lenstring.data = topicId.data.long_.name;
uint16_t regackMsgId = client->getNextSnMsgId();
regPacket->setREGISTER(id, regackMsgId, &topicName);
if (client->isSleep())
{
client->setClientSleepPacket(regPacket);
WRITELOG(FORMAT_BL_NL, currentDateTime(), regPacket->getName(),
RIGHTARROW, client->getClientId(), "is sleeping. REGISTER was saved.");
}
else if (client->isActive())
{
/* send REGISTER */
Event* evrg = new Event();
evrg->setClientSendEvent(client, regPacket);
_gateway->getClientSendQue()->post(evrg);
}
/* send PUBLISH */
topicId.data.id = id;
snPacket->setPUBLISH((uint8_t) pub.header.bits.dup, (int) pub.header.bits.qos,
(uint8_t) pub.header.bits.retain, (uint16_t) pub.msgId, topicId, (uint8_t*) pub.payload,
pub.payloadlen);
client->getWaitREGACKPacketList()->setPacket(snPacket, regackMsgId);
}
else
{
WRITELOG("\x1b[0m\x1b[31mMQTTGWPublishHandler Can't create a Topic.\n");
delete snPacket;
return;
}
} }
} }
/* TopicId was aquired. */ /* TopicId was acquired. */
if (client->isSleep()) if (client->isSleep())
{ {
/* client is sleeping. save PUBLISH */ /* client is sleeping. save PUBLISH */

View File

@@ -136,7 +136,7 @@ void BrokerRecvTask::run(void)
client->getNetwork()->close(); client->getNetwork()->close();
delete packet; delete packet;
/* delete client when the client is not authorized on & session is clean */ /* delete client when the client is not authorized & session is clean */
_gateway->getClientList()->erase(client); _gateway->getClientList()->erase(client);
if ( client ) if ( client )

View File

@@ -830,7 +830,7 @@ Topics::~Topics()
} }
} }
uint16_t Topics::getTopicId(MQTTSN_topicid* topicid) uint16_t Topics::getTopicId(const MQTTSN_topicid* topicid)
{ {
if (topicid->type != MQTTSN_TOPIC_TYPE_NORMAL) if (topicid->type != MQTTSN_TOPIC_TYPE_NORMAL)
{ {
@@ -863,7 +863,7 @@ Topic* Topics::getTopic(uint16_t id)
return 0; return 0;
} }
Topic* Topics::getTopic(MQTTSN_topicid* topicid) Topic* Topics::getTopic(const MQTTSN_topicid* topicid)
{ {
Topic* p = _first; Topic* p = _first;
while (p) while (p)
@@ -877,7 +877,7 @@ Topic* Topics::getTopic(MQTTSN_topicid* topicid)
return 0; return 0;
} }
Topic* Topics::add(MQTTSN_topicid* topicid) Topic* Topics::add(const MQTTSN_topicid* topicid)
{ {
Topic* topic; Topic* topic;
uint16_t id = 0; uint16_t id = 0;
@@ -901,7 +901,7 @@ Topic* Topics::add(MQTTSN_topicid* topicid)
} }
Topic* Topics::add(string* topicName) Topic* Topics::add(const string* topicName)
{ {
Topic* topic = 0; Topic* topic = 0;
@@ -942,7 +942,7 @@ uint16_t Topics::getNextTopicId()
return ++_nextTopicId == 0xffff ? _nextTopicId += 2 : _nextTopicId; return ++_nextTopicId == 0xffff ? _nextTopicId += 2 : _nextTopicId;
} }
Topic* Topics::match(MQTTSN_topicid* topicid) Topic* Topics::match(const MQTTSN_topicid* topicid)
{ {
if (topicid->type != MQTTSN_TOPIC_TYPE_NORMAL) if (topicid->type != MQTTSN_TOPIC_TYPE_NORMAL)
{ {

View File

@@ -127,13 +127,13 @@ class Topics
public: public:
Topics(); Topics();
~Topics(); ~Topics();
Topic* add(MQTTSN_topicid* topicid); Topic* add(const MQTTSN_topicid* topicid);
Topic* add(string* topic); Topic* add(const string* topic);
uint16_t getTopicId(MQTTSN_topicid* topic); uint16_t getTopicId(const MQTTSN_topicid* topic);
uint16_t getNextTopicId(); uint16_t getNextTopicId();
Topic* getTopic(uint16_t topicId); Topic* getTopic(uint16_t topicId);
Topic* getTopic(MQTTSN_topicid* topicid); Topic* getTopic(const MQTTSN_topicid* topicid);
Topic* match(MQTTSN_topicid* topicid); Topic* match(const MQTTSN_topicid* topicid);
private: private:
uint16_t _nextTopicId; uint16_t _nextTopicId;

View File

@@ -55,14 +55,11 @@ void MQTTSNConnectionHandler::handleSearchgw(MQTTSNPacket* packet)
{ {
if (packet->getType() == MQTTSN_SEARCHGW) if (packet->getType() == MQTTSN_SEARCHGW)
{ {
//if (_gateway->getClientList()->getClientCount() < MAX_CLIENTS) MQTTSNPacket* gwinfo = new MQTTSNPacket();
//{ gwinfo->setGWINFO(_gateway->getGWParams()->gatewayId);
MQTTSNPacket* gwinfo = new MQTTSNPacket(); Event* ev1 = new Event();
gwinfo->setGWINFO(_gateway->getGWParams()->gatewayId); ev1->setBrodcastEvent(gwinfo);
Event* ev1 = new Event(); _gateway->getClientSendQue()->post(ev1);
ev1->setBrodcastEvent(gwinfo);
_gateway->getClientSendQue()->post(ev1);
//}
} }
} }

View File

@@ -290,7 +290,15 @@ bool Network::connect(const char* host, const char* port, const char* caPath, co
{ {
SSL_load_error_strings(); SSL_load_error_strings();
SSL_library_init(); SSL_library_init();
#if ( OPENSSL_VERSION_NUMBER >= 0x10100000L )
_ctx = SSL_CTX_new(TLS_client_method()); _ctx = SSL_CTX_new(TLS_client_method());
#elif ( OPENSSL_VERSION_NUMBER >= 0x10001000L )
_ctx = SSL_CTX_new(TLSv1_client_method());
#else
_ctx = SSL_CTX_new(SSLv23_client_method());
#endif
if (_ctx == 0) if (_ctx == 0)
{ {
ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg));