Merge pull request #103 from tieto/develop

Gateway improvements
This commit is contained in:
Tomoaki Yamaguchi
2018-02-24 09:25:50 +09:00
committed by GitHub
6 changed files with 28 additions and 10 deletions

View File

@@ -12,6 +12,7 @@
* *
* Contributors: * Contributors:
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation * Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
* Tieto Poland Sp. z o.o. - Gateway improvements
**************************************************************************************/ **************************************************************************************/
#include "MQTTGWPublishHandler.h" #include "MQTTGWPublishHandler.h"
@@ -113,6 +114,8 @@ void MQTTGWPublishHandler::handlePublish(Client* client, MQTTGWPacket* packet)
{ {
replyACK(client, &pub, PUBREC); replyACK(client, &pub, PUBREC);
} }
delete snPacket;
return; return;
} }
@@ -126,6 +129,7 @@ void MQTTGWPublishHandler::handlePublish(Client* client, MQTTGWPacket* packet)
MQTTSNPacket* regPacket = new MQTTSNPacket(); MQTTSNPacket* regPacket = new MQTTSNPacket();
MQTTSNString topicName; MQTTSNString topicName;
topicName.cstring = 0;
topicName.lenstring.len = topicId.data.long_.len; topicName.lenstring.len = topicId.data.long_.len;
topicName.lenstring.data = topicId.data.long_.name; topicName.lenstring.data = topicId.data.long_.name;

View File

@@ -12,6 +12,7 @@
* *
* Contributors: * Contributors:
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation * Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
* Tieto Poland Sp. z o.o. - Gateway improvements
**************************************************************************************/ **************************************************************************************/
#include "MQTTSNGWClient.h" #include "MQTTSNGWClient.h"
@@ -123,7 +124,7 @@ bool ClientList::authorize(const char* fileName)
return _authorize; return _authorize;
} }
void ClientList::erase(Client* client) void ClientList::erase(Client*& client)
{ {
if ( !_authorize && client->erasable()) if ( !_authorize && client->erasable())
{ {
@@ -236,6 +237,8 @@ Client* ClientList::createClient(SensorNetAddress* addr, MQTTSNString* clientId,
free(dummyId.cstring); free(dummyId.cstring);
} }
_mutex.lock();
/* add the list */ /* add the list */
if ( _firstClient == 0 ) if ( _firstClient == 0 )
{ {
@@ -1152,8 +1155,12 @@ int WaitREGACKPacketList::setPacket(MQTTSNPacket* packet, uint16_t REGACKMsgId)
_first = elm; _first = elm;
_end = elm; _end = elm;
} }
elm->_prev = _end; else
_end->_next = elm; {
_end->_next = elm;
elm->_prev = _end;
_end = elm;
}
return 1; return 1;
} }

View File

@@ -12,6 +12,7 @@
* *
* Contributors: * Contributors:
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation * Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
* Tieto Poland Sp. z o.o. - Gateway improvements
**************************************************************************************/ **************************************************************************************/
#ifndef MQTTSNGWCLIENT_H_ #ifndef MQTTSNGWCLIENT_H_
@@ -341,7 +342,7 @@ public:
ClientList(); ClientList();
~ClientList(); ~ClientList();
bool authorize(const char* fileName); bool authorize(const char* fileName);
void erase(Client*); void erase(Client*&);
Client* getClient(SensorNetAddress* addr); Client* getClient(SensorNetAddress* addr);
Client* getClient(uint8_t* clientId); Client* getClient(uint8_t* clientId);
Client* createClient(SensorNetAddress* addr, MQTTSNString* clientId, bool unstableLine, Client* createClient(SensorNetAddress* addr, MQTTSNString* clientId, bool unstableLine,

View File

@@ -12,6 +12,7 @@
* *
* Contributors: * Contributors:
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation * Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
* Tieto Poland Sp. z o.o. - Gateway improvements
**************************************************************************************/ **************************************************************************************/
#include "MQTTSNGWDefines.h" #include "MQTTSNGWDefines.h"
@@ -155,6 +156,7 @@ void PacketHandleTask::run()
case MQTTSN_PUBACK: case MQTTSN_PUBACK:
_mqttsnPublish->handlePuback(client, snPacket); _mqttsnPublish->handlePuback(client, snPacket);
break; break;
case MQTTSN_PUBREC:
_mqttsnPublish->handleAck(client, snPacket, PUBREC); _mqttsnPublish->handleAck(client, snPacket, PUBREC);
break; break;
case MQTTSN_PUBREL: case MQTTSN_PUBREL:

View File

@@ -12,6 +12,7 @@
* *
* Contributors: * Contributors:
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation * Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
* Tieto Poland Sp. z o.o. - Gateway improvements
**************************************************************************************/ **************************************************************************************/
#include "MQTTSNGWPublishHandler.h" #include "MQTTSNGWPublishHandler.h"
@@ -180,8 +181,6 @@ void MQTTSNPublishHandler::handlePuback(Client* client, MQTTSNPacket* packet)
if ( client->isActive() ) if ( client->isActive() )
{ {
MQTTGWPacket* pubAck = new MQTTGWPacket();
if ( packet->getPUBACK(&topicId, &msgId, &rc) == 0 ) if ( packet->getPUBACK(&topicId, &msgId, &rc) == 0 )
{ {
return; return;
@@ -189,6 +188,7 @@ void MQTTSNPublishHandler::handlePuback(Client* client, MQTTSNPacket* packet)
if ( rc == MQTTSN_RC_ACCEPTED) if ( rc == MQTTSN_RC_ACCEPTED)
{ {
MQTTGWPacket* pubAck = new MQTTGWPacket();
pubAck->setAck(PUBACK, msgId); pubAck->setAck(PUBACK, msgId);
Event* ev1 = new Event(); Event* ev1 = new Event();
ev1->setBrokerSendEvent(client, pubAck); ev1->setBrokerSendEvent(client, pubAck);
@@ -226,10 +226,8 @@ void MQTTSNPublishHandler::handleRegister(Client* client, MQTTSNPacket* packet)
MQTTSNString topicName; MQTTSNString topicName;
MQTTSN_topicid topicid; MQTTSN_topicid topicid;
if ( client->isActive() || client->isAwake()) if ( client->isActive() || client->isAwake())
{ {
MQTTSNPacket* regAck = new MQTTSNPacket();
if ( packet->getREGISTER(&id, &msgId, &topicName) == 0 ) if ( packet->getREGISTER(&id, &msgId, &topicName) == 0 )
{ {
return; return;
@@ -240,6 +238,8 @@ void MQTTSNPublishHandler::handleRegister(Client* client, MQTTSNPacket* packet)
topicid.data.long_.name = topicName.lenstring.data; topicid.data.long_.name = topicName.lenstring.data;
id = client->getTopics()->add(&topicid)->getTopicId(); id = client->getTopics()->add(&topicid)->getTopicId();
MQTTSNPacket* regAck = new MQTTSNPacket();
regAck->setREGACK(id, msgId, MQTTSN_RC_ACCEPTED); regAck->setREGACK(id, msgId, MQTTSN_RC_ACCEPTED);
Event* ev = new Event(); Event* ev = new Event();
ev->setClientSendEvent(client, regAck); ev->setClientSendEvent(client, regAck);

View File

@@ -12,6 +12,7 @@
* *
* Contributors: * Contributors:
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation * Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
* Tieto Poland Sp. z o.o. - Gateway improvements
**************************************************************************************/ **************************************************************************************/
#include "MQTTSNGWSubscribeHandler.h" #include "MQTTSNGWSubscribeHandler.h"
@@ -89,6 +90,7 @@ void MQTTSNSubscribeHandler::handleSubscribe(Client* client, MQTTSNPacket* packe
} }
else else
{ {
uint16_t topicId = 0;
MQTTGWPacket* subscribe = new MQTTGWPacket(); MQTTGWPacket* subscribe = new MQTTGWPacket();
topic = client->getTopics()->getTopic(&topicFilter); topic = client->getTopics()->getTopic(&topicFilter);
if (topic == 0) if (topic == 0)
@@ -96,6 +98,7 @@ void MQTTSNSubscribeHandler::handleSubscribe(Client* client, MQTTSNPacket* packe
if (topicFilter.type == MQTTSN_TOPIC_TYPE_NORMAL) if (topicFilter.type == MQTTSN_TOPIC_TYPE_NORMAL)
{ {
topic = client->getTopics()->add(&topicFilter); topic = client->getTopics()->add(&topicFilter);
topicId = topic->getTopicId();
subscribe->setSUBSCRIBE((char*)topic->getTopicName()->c_str(), (uint8_t)qos, (uint16_t)msgId); subscribe->setSUBSCRIBE((char*)topic->getTopicName()->c_str(), (uint8_t)qos, (uint16_t)msgId);
} }
else if (topicFilter.type == MQTTSN_TOPIC_TYPE_SHORT) else if (topicFilter.type == MQTTSN_TOPIC_TYPE_SHORT)
@@ -104,17 +107,19 @@ void MQTTSNSubscribeHandler::handleSubscribe(Client* client, MQTTSNPacket* packe
topic[0] = topicFilter.data.short_name[0]; topic[0] = topicFilter.data.short_name[0];
topic[1] = topicFilter.data.short_name[1]; topic[1] = topicFilter.data.short_name[1];
topic[2] = 0; topic[2] = 0;
topicId = topicFilter.data.id;
subscribe->setSUBSCRIBE(topic, (uint8_t)qos, (uint16_t)msgId); subscribe->setSUBSCRIBE(topic, (uint8_t)qos, (uint16_t)msgId);
} }
} }
else else
{ {
topicId = topic->getTopicId();
subscribe->setSUBSCRIBE((char*)topic->getTopicName()->c_str(), (uint8_t)qos, (uint16_t)msgId); subscribe->setSUBSCRIBE((char*)topic->getTopicName()->c_str(), (uint8_t)qos, (uint16_t)msgId);
} }
if ( msgId > 0 ) if ( msgId > 0 )
{ {
client->setWaitedSubTopicId(msgId, topic->getTopicId(), topicFilter.type); client->setWaitedSubTopicId(msgId, topicId, topicFilter.type);
} }
Event* ev1 = new Event(); Event* ev1 = new Event();
@@ -180,7 +185,6 @@ void MQTTSNSubscribeHandler::handleUnsubscribe(Client* client, MQTTSNPacket* pac
} }
else if (topicFilter.type == MQTTSN_TOPIC_TYPE_SHORT) else if (topicFilter.type == MQTTSN_TOPIC_TYPE_SHORT)
{ {
MQTTGWPacket* unsubscribe = new MQTTGWPacket();
char shortTopic[3]; char shortTopic[3];
shortTopic[0] = topicFilter.data.short_name[0]; shortTopic[0] = topicFilter.data.short_name[0];
shortTopic[1] = topicFilter.data.short_name[1]; shortTopic[1] = topicFilter.data.short_name[1];