From 258d534009ba6707ae9f255da9d63bf6d8f735e7 Mon Sep 17 00:00:00 2001 From: tomoaki Date: Thu, 5 Mar 2020 17:39:54 +0900 Subject: [PATCH] Update Aggregatting gateway can now receive PUBLISH from a broker Signed-off-by: tomoaki --- MQTTSNGateway/README.md | 1 - MQTTSNGateway/src/MQTTGWPublishHandler.cpp | 62 ++---- .../src/MQTTSNAggregateConnectionHandler.cpp | 11 +- MQTTSNGateway/src/MQTTSNGWAdapter.cpp | 2 +- MQTTSNGateway/src/MQTTSNGWAdapterManager.cpp | 53 +++-- MQTTSNGateway/src/MQTTSNGWAdapterManager.h | 8 +- .../src/MQTTSNGWAggregateTopicTable.cpp | 183 ++++++++++++++++-- .../src/MQTTSNGWAggregateTopicTable.h | 20 +- MQTTSNGateway/src/MQTTSNGWAggregater.cpp | 51 ++--- MQTTSNGateway/src/MQTTSNGWAggregater.h | 14 +- MQTTSNGateway/src/MQTTSNGWBrokerSendTask.cpp | 2 +- MQTTSNGateway/src/MQTTSNGWClientList.cpp | 19 +- MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp | 19 +- .../src/MQTTSNGWConnectionHandler.cpp | 1 - MQTTSNGateway/src/MQTTSNGWForwarder.cpp | 13 +- MQTTSNGateway/src/MQTTSNGWQoSm1Proxy.cpp | 22 +-- MQTTSNGateway/src/MQTTSNGWQoSm1Proxy.h | 2 +- .../src/MQTTSNGWSubscribeHandler.cpp | 23 ++- MQTTSNGateway/src/MQTTSNGWTopic.cpp | 19 ++ MQTTSNGateway/src/MQTTSNGWTopic.h | 5 + MQTTSNGateway/src/MQTTSNGateway.cpp | 43 ++-- MQTTSNGateway/src/MQTTSNGateway.h | 4 + 22 files changed, 397 insertions(+), 180 deletions(-) diff --git a/MQTTSNGateway/README.md b/MQTTSNGateway/README.md index 4765264..38823c5 100644 --- a/MQTTSNGateway/README.md +++ b/MQTTSNGateway/README.md @@ -2,7 +2,6 @@ **MQTT-SN** requires a MQTT-SN Gateway which acts as a protocol converter to convert **MQTT-SN messages to MQTT messages**. MQTT-SN client over SensorNetwork can not communicate directly with MQTT broker(TCP/IP). This Gateway can run as a transparent or aggrigating Gateway by specifying the gateway.conf. -The Aggregating Gateway can not receive PUBLISH message from the broker at this version. ### **step1. Build the gateway** ```` diff --git a/MQTTSNGateway/src/MQTTGWPublishHandler.cpp b/MQTTSNGateway/src/MQTTGWPublishHandler.cpp index 13a1b35..653d831 100644 --- a/MQTTSNGateway/src/MQTTGWPublishHandler.cpp +++ b/MQTTSNGateway/src/MQTTGWPublishHandler.cpp @@ -272,53 +272,31 @@ void MQTTGWPublishHandler::handleAggregatePublish(Client* client, MQTTGWPacket* Publish pub; packet->getPUBLISH(&pub); - // Start of temporary code - WRITELOG("%s MQTTGWPublishHandler::handleAggregatePublish Aggregater can't handle a PUBLISH from the broker at the current version.%s\n", ERRMSG_HEADER,ERRMSG_FOOTER); - if (pub.header.bits.qos == 1) + string* topicName = new string(pub.topic, pub.topiclen); // topic deletes topicName when the topic is deleted + Topic topic = Topic(topicName, MQTTSN_TOPIC_TYPE_NORMAL); + + // ToDo: need to refacter + ClientTopicElement* elm = _gateway->getAdapterManager()->getAggregater()->getClientElement(&topic); + + while ( elm != nullptr ) { - replyACK(client, &pub, PUBACK); - } - else if ( pub.header.bits.qos == 2) - { - replyACK(client, &pub, PUBREC); - } - // End of temporary code + Client* devClient = elm->getClient(); + MQTTGWPacket* msg = new MQTTGWPacket(); + *msg = *packet; - - string* topicName = new string(pub.topic, pub.topiclen); - Topic topic = Topic(topicName, MQTTSN_TOPIC_TYPE_NORMAL); // topic deletes topicName when the topic is deleted - - AggregateTopicElement* list = _gateway->getAdapterManager()->createClientList(&topic); - if ( list != nullptr ) - { - ClientTopicElement* p = list->getFirstElement(); - - while ( p ) + if ( msg->getType() == 0 ) { - Client* devClient = p->getClient(); - if ( devClient != nullptr ) - { - MQTTGWPacket* msg = new MQTTGWPacket(); - *msg = *packet; - if ( msg->getType() == 0 ) - { - WRITELOG("%s MQTTGWPublishHandler::handleAggregatePublish can't allocate memories for Packet.%s\n", ERRMSG_HEADER,ERRMSG_FOOTER); - delete msg; - break; - } - Event* ev = new Event(); - ev->setBrokerRecvEvent(devClient, msg); - _gateway->getPacketEventQue()->post(ev); - } - else - { - break; - } - - p = list->getNextElement(p); + WRITELOG("%s MQTTGWPublishHandler::handleAggregatePublish can't allocate memories for Packet.%s\n", ERRMSG_HEADER,ERRMSG_FOOTER); + delete msg; + break; } - delete list; + + Event* ev = new Event(); + ev->setBrokerRecvEvent(devClient, msg); + _gateway->getPacketEventQue()->post(ev); + + elm = elm->getNextClientElement(); } } diff --git a/MQTTSNGateway/src/MQTTSNAggregateConnectionHandler.cpp b/MQTTSNGateway/src/MQTTSNAggregateConnectionHandler.cpp index 4e84630..90672bf 100644 --- a/MQTTSNGateway/src/MQTTSNAggregateConnectionHandler.cpp +++ b/MQTTSNGateway/src/MQTTSNAggregateConnectionHandler.cpp @@ -81,7 +81,15 @@ void MQTTSNAggregateConnectionHandler::handleConnect(Client* client, MQTTSNPacke /* renew the TopicList */ if (topics) { - _gateway->getAdapterManager()->removeAggregateTopicList(topics, client); + Topic* tp = topics->getFirstTopic(); + while( tp != nullptr ) + { + if ( tp->getType() == MQTTSN_TOPIC_TYPE_NORMAL ) + { + _gateway->getAdapterManager()->getAggregater()->removeAggregateTopic(tp, client); + } + tp = topics->getNextTopic(tp); + } topics->eraseNormal(); } client->setSessionStatus(true); @@ -189,7 +197,6 @@ void MQTTSNAggregateConnectionHandler::sendStoredPublish(Client* client) while ( ( msg = client->getClientSleepPacket() ) != nullptr ) { - // ToDo: This version can't re-send PUBLISH when PUBACK is not returned. client->deleteFirstClientSleepPacket(); // pop the que to delete element. Event* ev = new Event(); diff --git a/MQTTSNGateway/src/MQTTSNGWAdapter.cpp b/MQTTSNGateway/src/MQTTSNGWAdapter.cpp index a38ba67..2fa37b3 100644 --- a/MQTTSNGateway/src/MQTTSNGWAdapter.cpp +++ b/MQTTSNGateway/src/MQTTSNGWAdapter.cpp @@ -217,7 +217,7 @@ Client* Adapter::getAdapterClient(Client* client) { if ( client->isSecureNetwork() ) { - return _client; + return _clientSecure; } else { diff --git a/MQTTSNGateway/src/MQTTSNGWAdapterManager.cpp b/MQTTSNGateway/src/MQTTSNGWAdapterManager.cpp index 25c3754..0d5fa55 100644 --- a/MQTTSNGateway/src/MQTTSNGWAdapterManager.cpp +++ b/MQTTSNGateway/src/MQTTSNGWAdapterManager.cpp @@ -40,13 +40,23 @@ AdapterManager::AdapterManager(Gateway* gw) } -void AdapterManager::initialize(void) +void AdapterManager::initialize(char* gwName, bool aggregate, bool forwarder, bool qosM1) { - _aggregater->initialize(); - _forwarders->initialize(_gateway); - _qosm1Proxy->initialize(); -} + if ( aggregate ) + { + _aggregater->initialize(gwName); + } + if ( qosM1 ) + { + _qosm1Proxy->initialize(gwName); + } + + if ( forwarder ) + { + _forwarders->initialize(_gateway); + } +} AdapterManager::~AdapterManager(void) { @@ -91,25 +101,26 @@ bool AdapterManager::isAggregatedClient(Client* client) } } -Client* AdapterManager::getClient(Client& client) +Client* AdapterManager::getClient(Client* client) { - bool secure = client.isSecureNetwork(); - Client* newClient = &client; - if ( client.isQoSm1() ) + bool secure = client->isSecureNetwork(); + Client* newClient = client; + + if ( client->isQoSm1() ) { - newClient = _qosm1Proxy->getAdapterClient(&client); + newClient = _qosm1Proxy->getAdapterClient(client); _qosm1Proxy->resetPingTimer(secure); } - else if ( client.isAggregated() ) + else if ( client->isAggregated() ) { - newClient = _aggregater->getAdapterClient(&client); + newClient = _aggregater->getAdapterClient(client); _aggregater->resetPingTimer(secure); } - else if ( client.isQoSm1Proxy() ) + else if ( client->isQoSm1Proxy() ) { _qosm1Proxy->resetPingTimer(secure); } - else if ( client.isAggregater() ) + else if ( client->isAggregater() ) { _aggregater->resetPingTimer(secure); } @@ -173,22 +184,26 @@ bool AdapterManager::isAggregaterActive(void) return _aggregater->isActive(); } -AggregateTopicElement* AdapterManager::createClientList(Topic* topic) +/* +AggregateTopicElement* AdapterManager::findTopic(Topic* topic) { - return _aggregater->createClientList(topic); + return _aggregater->findTopic(topic); } -int AdapterManager::addAggregateTopic(Topic* topic, Client* client) +AggregateTopicElement* AdapterManager::addAggregateTopic(Topic* topic, Client* client) { return _aggregater->addAggregateTopic(topic, client); } + void AdapterManager::removeAggregateTopic(Topic* topic, Client* client) { - _aggregater->removeAggregateTopic(topic, client); + //_aggregater->removeAggregateTopic(topic, client); } void AdapterManager::removeAggregateTopicList(Topics* topics, Client* client) { - _aggregater->removeAggregateTopicList(topics, client); + } +*/ + diff --git a/MQTTSNGateway/src/MQTTSNGWAdapterManager.h b/MQTTSNGateway/src/MQTTSNGWAdapterManager.h index 510d02f..58e5a03 100644 --- a/MQTTSNGateway/src/MQTTSNGWAdapterManager.h +++ b/MQTTSNGateway/src/MQTTSNGWAdapterManager.h @@ -40,21 +40,17 @@ class AdapterManager public: AdapterManager(Gateway* gw); ~AdapterManager(void); - void initialize(void); + void initialize(char* gwName, bool aggregater, bool fowarder, bool qosM1); ForwarderList* getForwarderList(void); QoSm1Proxy* getQoSm1Proxy(void); Aggregater* getAggregater(void); void checkConnection(void); bool isAggregatedClient(Client* client); - Client* getClient(Client& client); + Client* getClient(Client* client); Client* convertClient(uint16_t msgId, uint16_t* clientMsgId); int unicastToClient(Client* client, MQTTSNPacket* packet, ClientSendTask* task); bool isAggregaterActive(void); - AggregateTopicElement* createClientList(Topic* topic); - int addAggregateTopic(Topic* topic, Client* client); - void removeAggregateTopic(Topic* topic, Client* client); - void removeAggregateTopicList(Topics* topics, Client* client); private: Gateway* _gateway {nullptr}; diff --git a/MQTTSNGateway/src/MQTTSNGWAggregateTopicTable.cpp b/MQTTSNGateway/src/MQTTSNGWAggregateTopicTable.cpp index b53aea1..24dd23f 100644 --- a/MQTTSNGateway/src/MQTTSNGWAggregateTopicTable.cpp +++ b/MQTTSNGateway/src/MQTTSNGWAggregateTopicTable.cpp @@ -34,6 +34,11 @@ Client* ClientTopicElement::getClient(void) return _client; } +ClientTopicElement* ClientTopicElement::getNextClientElement(void) +{ + return _next; +} + /*===================================== Class AggregateTopicElement =====================================*/ @@ -44,6 +49,7 @@ AggregateTopicElement::AggregateTopicElement(void) AggregateTopicElement::AggregateTopicElement(Topic* topic, Client* client) { + _topic = topic; ClientTopicElement* elm = new ClientTopicElement(client); if ( elm != nullptr ) { @@ -76,7 +82,9 @@ ClientTopicElement* AggregateTopicElement::add(Client* client) { return nullptr; } + _mutex.lock(); + if ( _head == nullptr ) { _head = elm; @@ -95,7 +103,7 @@ ClientTopicElement* AggregateTopicElement::add(Client* client) else { delete elm; - elm = nullptr; + elm = p; } } _mutex.unlock(); @@ -105,7 +113,7 @@ ClientTopicElement* AggregateTopicElement::add(Client* client) ClientTopicElement* AggregateTopicElement::find(Client* client) { ClientTopicElement* p = _head; - while ( p ) + while ( p != nullptr ) { if ( p->_client == client) { @@ -116,16 +124,48 @@ ClientTopicElement* AggregateTopicElement::find(Client* client) return p; } -ClientTopicElement* AggregateTopicElement::getFirstElement(void) +ClientTopicElement* AggregateTopicElement::getFirstClientTopicElement(void) { return _head; } -ClientTopicElement* AggregateTopicElement::getNextElement(ClientTopicElement* elm) +ClientTopicElement* AggregateTopicElement::getNextClientTopicElement(ClientTopicElement* elmClient) { - return elm->_next; + return elmClient->_next; } +void AggregateTopicElement::eraseClient(Client* client) +{ + _mutex.lock(); + + ClientTopicElement* p = find(client); + if ( p != nullptr ) + { + if ( p->_prev == nullptr ) // head element + { + _head = p->_next; + if ( p->_next == nullptr ) // head & only one + { + _tail = nullptr; + } + else + { + p->_next->_prev = nullptr; // head & midle + } + } + else if ( p->_next != nullptr ) // middle + { + p->_prev->_next = p->_next; + } + else // tail + { + p->_prev->_next = nullptr; + _tail = p->_prev; + } + delete p; + } + _mutex.unlock(); +} /*===================================== Class AggregateTopicTable @@ -143,19 +183,138 @@ AggregateTopicTable::~AggregateTopicTable() AggregateTopicElement* AggregateTopicTable::add(Topic* topic, Client* client) { - //ToDo: AggregateGW - return 0; + AggregateTopicElement* elm = nullptr; + _mutex.lock(); + elm = getAggregateTopicElement(topic); + if ( elm != nullptr ) + { + if ( elm->find(client) == nullptr ) + { + elm->add(client); + } + } + else + { + Topic* newTopic = topic->duplicate(); + elm = new AggregateTopicElement(newTopic, client); + if ( _head == nullptr ) + { + _head = elm; + _tail = elm; + } + else + { + elm->_prev = _tail; + _tail->_next = elm; + _tail = elm; + } + } + _mutex.unlock(); + return elm; } -void AggregateTopicTable::remove(Topic* topic, Client* client) +void AggregateTopicTable::erase(Topic* topic, Client* client) { - //ToDo: AggregateGW + AggregateTopicElement* elm = nullptr; + + _mutex.lock(); + elm = getAggregateTopicElement(topic); + + if ( elm != nullptr ) + { + elm->eraseClient(client); + } + if ( elm->_head == nullptr ) + { + erase(elm); + } + _mutex.unlock(); + return; } -AggregateTopicElement* AggregateTopicTable::getClientList(Topic* client) +void AggregateTopicTable::erase(AggregateTopicElement* elmTopic) { - // ToDo: AggregateGW - return 0; + if ( elmTopic != nullptr ) + { + if ( elmTopic->_prev == nullptr ) // head element + { + _head = elmTopic->_next; + if ( elmTopic->_next == nullptr ) // head & only one + { + _tail = nullptr; + } + else + { + elmTopic->_next->_prev = nullptr; // head & midle + } + } + else if ( elmTopic->_next != nullptr ) // middle + { + elmTopic->_prev->_next = elmTopic->_next; + } + else // tail + { + elmTopic->_prev->_next = nullptr; + _tail = elmTopic->_prev; + } + delete elmTopic; + } } +AggregateTopicElement* AggregateTopicTable::getAggregateTopicElement(Topic* topic) +{ + AggregateTopicElement* elm = _head; + while( elm != nullptr ) + { + if ( elm->_topic->isMatch(topic->_topicName) ) + { + break; + } + elm = elm->_next; + } + return elm; +} + +ClientTopicElement* AggregateTopicTable::getClientElement(Topic* topic) +{ + AggregateTopicElement* elm = getAggregateTopicElement(topic); + if ( elm != nullptr ) + { + return elm->_head; + } + else + { + return nullptr; + } +} + +void AggregateTopicTable::print(void) +{ + AggregateTopicElement* elm = _head; + + printf("Beginning of AggregateTopicTable\n"); + while( elm != nullptr ) + { + printf("%s\n", elm->_topic->getTopicName()->c_str()); + + ClientTopicElement* clElm = elm->getFirstClientTopicElement(); + Client* client = clElm->getClient(); + + while ( client != nullptr ) + { + printf(" %s\n", client->getClientId()); + clElm = clElm->getNextClientElement(); + if ( clElm != nullptr ) + { + client = clElm->getClient(); + } + else + { + client = nullptr; + } + } + elm = elm->_next; + } + printf("End of AggregateTopicTable\n"); +} diff --git a/MQTTSNGateway/src/MQTTSNGWAggregateTopicTable.h b/MQTTSNGateway/src/MQTTSNGWAggregateTopicTable.h index 624743f..87d9e82 100644 --- a/MQTTSNGateway/src/MQTTSNGWAggregateTopicTable.h +++ b/MQTTSNGateway/src/MQTTSNGWAggregateTopicTable.h @@ -39,10 +39,16 @@ public: ~AggregateTopicTable(); AggregateTopicElement* add(Topic* topic, Client* client); - AggregateTopicElement* getClientList(Topic* client); - void remove(Topic* topic, Client* client); + AggregateTopicElement* getAggregateTopicElement(Topic* topic); + ClientTopicElement* getClientElement(Topic* topic); + void erase(Topic* topic, Client* client); void clear(void); + + void print(void); + private: + void erase(AggregateTopicElement* elmTopic); + Mutex _mutex; AggregateTopicElement* _head {nullptr}; AggregateTopicElement* _tail {nullptr}; int _cnt {0}; @@ -61,14 +67,16 @@ public: ~AggregateTopicElement(void); ClientTopicElement* add(Client* client); - ClientTopicElement* getFirstElement(void); - ClientTopicElement* getNextElement(ClientTopicElement* elm); - void erase(ClientTopicElement* elm); + ClientTopicElement* getFirstClientTopicElement(void); + ClientTopicElement* getNextClientTopicElement(ClientTopicElement* elmClient); + void eraseClient(Client* client); ClientTopicElement* find(Client* client); private: Mutex _mutex; Topic* _topic {nullptr}; + AggregateTopicElement* _next {nullptr}; + AggregateTopicElement* _prev {nullptr}; ClientTopicElement* _head {nullptr}; ClientTopicElement* _tail {nullptr}; }; @@ -83,6 +91,8 @@ class ClientTopicElement public: ClientTopicElement(Client* client); ~ClientTopicElement(void); + + ClientTopicElement* getNextClientElement(void); Client* getClient(void); private: diff --git a/MQTTSNGateway/src/MQTTSNGWAggregater.cpp b/MQTTSNGateway/src/MQTTSNGWAggregater.cpp index 32f4924..93718db 100644 --- a/MQTTSNGateway/src/MQTTSNGWAggregater.cpp +++ b/MQTTSNGateway/src/MQTTSNGWAggregater.cpp @@ -36,23 +36,12 @@ Aggregater::~Aggregater(void) } -void Aggregater::initialize(void) +void Aggregater::initialize(char* gwName) { - char param[MQTTSNGW_PARAM_MAX]; - - if (_gateway->getParam("AggregatingGateway", param) == 0 ) - { - if (!strcasecmp(param, "YES") ) - { - /* Create Aggregated Clients from clients.conf */ - _gateway->getClientList()->setClientList(AGGREGATER_TYPE); - - /* Create Aggregater Client */ - string name = string(_gateway->getGWParams()->gatewayName) + "_Aggregater"; - setup(name.c_str(), Atype_Aggregater); - _isActive = true; - } - } + /* Create Aggregater Client */ + string name = string(gwName) + string("_Aggregater"); + setup(name.c_str(), Atype_Aggregater); + _isActive = true; //testMessageIdTable(); @@ -95,26 +84,38 @@ uint16_t Aggregater::getMsgId(Client* client, uint16_t clientMsgId) return _msgIdTable.getMsgId(client, clientMsgId); } +AggregateTopicElement* Aggregater::addAggregateTopic(Topic* topic, Client* client) +{ + return _topicTable.add(topic, client); +} + + void Aggregater::removeAggregateTopic(Topic* topic, Client* client) { - // ToDo: AggregateGW this method called when the client disconnect and erase it`s Topics. this method call */ + _topicTable.erase(topic, client); } -void Aggregater::removeAggregateTopicList(Topics* topics, Client* client) +AggregateTopicElement* Aggregater::findTopic(Topic* topic) { - // ToDo: AggregateGW this method called when the client disconnect and erase it`s Topics. this method call */ + return _topicTable.getAggregateTopicElement(topic); } -int Aggregater::addAggregateTopic(Topic* topic, Client* client) +ClientTopicElement* Aggregater::getClientElement(Topic* topic) { - // ToDo: AggregateGW */ - return 0; + AggregateTopicElement* elm = findTopic(topic); + if ( elm != nullptr ) + { + return elm->getFirstClientTopicElement(); + } + else + { + return nullptr; + } } -AggregateTopicElement* Aggregater::createClientList(Topic* topic) +void Aggregater::printAggregateTopicTable(void) { - // ToDo: AggregateGW */ - return 0; + _topicTable.print(); } bool Aggregater::testMessageIdTable(void) diff --git a/MQTTSNGateway/src/MQTTSNGWAggregater.h b/MQTTSNGateway/src/MQTTSNGWAggregater.h index 9baaa15..959ff87 100644 --- a/MQTTSNGateway/src/MQTTSNGWAggregater.h +++ b/MQTTSNGateway/src/MQTTSNGWAggregater.h @@ -20,6 +20,7 @@ #include "MQTTSNGWAdapter.h" #include "MQTTSNGWMessageIdTable.h" #include "MQTTSNGWAggregateTopicTable.h" + namespace MQTTSNGW { class Gateway; @@ -40,7 +41,7 @@ public: Aggregater(Gateway* gw); ~Aggregater(void); - void initialize(void); + void initialize(char* gwName); const char* getClientId(SensorNetAddress* addr); Client* getClient(SensorNetAddress* addr); @@ -48,13 +49,18 @@ public: uint16_t addMessageIdTable(Client* client, uint16_t msgId); uint16_t getMsgId(Client* client, uint16_t clientMsgId); + ClientTopicElement* getClientElement(Topic* topic); + ClientTopicElement* getNextClientElement(ClientTopicElement* clientElement); + Client* getClient(ClientTopicElement* clientElement); + + AggregateTopicElement* findTopic(Topic* topic); + AggregateTopicElement* addAggregateTopic(Topic* topic, Client* client); - AggregateTopicElement* createClientList(Topic* topic); - int addAggregateTopic(Topic* topic, Client* client); void removeAggregateTopic(Topic* topic, Client* client); - void removeAggregateTopicList(Topics* topics, Client* client); + void removeAggregateAllTopic(Client* client); bool isActive(void); + void printAggregateTopicTable(void); bool testMessageIdTable(void); private: diff --git a/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.cpp b/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.cpp index 385ad87..4f58359 100644 --- a/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.cpp +++ b/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.cpp @@ -81,7 +81,7 @@ void BrokerSendTask::run() packet = ev->getMQTTGWPacket(); /* Check Client is managed by Adapters */ - client = adpMgr->getClient(*client); + client = adpMgr->getClient(client); if ( packet->getType() == CONNECT && client->getNetwork()->isValid() ) { diff --git a/MQTTSNGateway/src/MQTTSNGWClientList.cpp b/MQTTSNGateway/src/MQTTSNGWClientList.cpp index dfe32a2..f164996 100644 --- a/MQTTSNGateway/src/MQTTSNGWClientList.cpp +++ b/MQTTSNGateway/src/MQTTSNGWClientList.cpp @@ -61,20 +61,28 @@ void ClientList::initialize(bool aggregate) setClientList(type); _authorize = true; } + + if ( theGateway->getGWParams()->predefinedTopic ) + { + setPredefinedTopics(aggregate); + } } void ClientList::setClientList(int type) { - - if (!createList(theGateway->getClientListFileName(), type)) + if (!createList(theGateway->getGWParams()->clientListName, type)) { - throw Exception("ClientList::initialize(): No client list defined by the configuration."); + throw Exception("ClientList::setClientList No client list defined by config file."); } } void ClientList::setPredefinedTopics(bool aggrecate) { - readPredefinedList(theGateway->getPredefinedTopicFileName(), aggrecate); + if ( !readPredefinedList(theGateway->getGWParams()->predefinedTopicFileName, aggrecate) ) + { + throw Exception("ClientList::setPredefinedTopics No predefindTopi list defined by config file."); + + } } /** @@ -108,7 +116,7 @@ bool ClientList::createList(const char* fileName, int type) bool stable; bool qos_1; bool forwarder; - bool rc = true; + bool rc = false; SensorNetAddress netAddr; MQTTSNString clientId = MQTTSNString_initializer; @@ -161,6 +169,7 @@ bool ClientList::createList(const char* fileName, int type) free(clientId.cstring); } fclose(fp); + rc = true; } return rc; } diff --git a/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp b/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp index d319f47..8bc6e17 100644 --- a/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp +++ b/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp @@ -60,7 +60,7 @@ void ClientRecvTask::run() Event* ev = nullptr; AdapterManager* adpMgr = _gateway->getAdapterManager(); QoSm1Proxy* qosm1Proxy = adpMgr->getQoSm1Proxy(); - bool isAggrActive = adpMgr->isAggregaterActive(); + int clientType = adpMgr->isAggregaterActive() ? AGGREGATER_TYPE : TRANSPEARENT_TYPE; ClientList* clientList = _gateway->getClientList(); EventQue* packetEventQue = _gateway->getPacketEventQue(); @@ -133,11 +133,12 @@ void ClientRecvTask::run() { const char* clientName = qosm1Proxy->getClientId(senderAddr); - if ( clientName ) + if ( clientName != nullptr ) { + client = qosm1Proxy->getClient(); + if ( !packet->isQoSMinusPUBLISH() ) { - client = qosm1Proxy->getClient(); log(clientName, packet); WRITELOG("%s %s %s can send only PUBLISH with QoS-1.%s\n", ERRMSG_HEADER, clientName, senderAddr->sprint(buf), ERRMSG_FOOTER); delete packet; @@ -145,10 +146,14 @@ void ClientRecvTask::run() } } } - client = _gateway->getClientList()->getClient(senderAddr); + + if ( client == nullptr ) + { + client = _gateway->getClientList()->getClient(senderAddr); + } } - if ( client ) + if ( client != nullptr ) { /* write log and post Event */ log(client, packet, 0); @@ -178,7 +183,7 @@ void ClientRecvTask::run() if ( client == nullptr ) { /* create a new client */ - client = clientList->createClient(0, &data.clientID, isAggrActive); + client = clientList->createClient(0, &data.clientID, clientType); } /* Add to af forwarded client list of forwarder. */ fwd->addClient(client, &nodeId); @@ -193,7 +198,7 @@ void ClientRecvTask::run() else { /* create a new client */ - client = clientList->createClient(senderAddr, &data.clientID, isAggrActive); + client = clientList->createClient(senderAddr, &data.clientID, clientType); } } diff --git a/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp b/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp index c73b517..f47b1cc 100644 --- a/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp +++ b/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp @@ -291,7 +291,6 @@ void MQTTSNConnectionHandler::sendStoredPublish(Client* client) while ( ( msg = client->getClientSleepPacket() ) != nullptr ) { - // ToDo: This version can't re-send PUBLISH when PUBACK is not returned. client->deleteFirstClientSleepPacket(); // pop the que to delete element. Event* ev = new Event(); diff --git a/MQTTSNGateway/src/MQTTSNGWForwarder.cpp b/MQTTSNGateway/src/MQTTSNGWForwarder.cpp index 8449c85..9cba49e 100644 --- a/MQTTSNGateway/src/MQTTSNGWForwarder.cpp +++ b/MQTTSNGateway/src/MQTTSNGWForwarder.cpp @@ -47,17 +47,8 @@ ForwarderList::~ForwarderList() void ForwarderList::initialize(Gateway* gw) { - char param[MQTTSNGW_PARAM_MAX]; - string fileName; - - if (gw->getParam("Forwarder", param) == 0 ) - { - if (!strcasecmp(param, "YES") ) - { - /* Create Fowarders from clients.conf */ - gw->getClientList()->setClientList(FORWARDER_TYPE); - } - } + /* Create Fowarders from clients.conf */ + gw->getClientList()->setClientList(FORWARDER_TYPE); } diff --git a/MQTTSNGateway/src/MQTTSNGWQoSm1Proxy.cpp b/MQTTSNGateway/src/MQTTSNGWQoSm1Proxy.cpp index 7876fc0..349996f 100644 --- a/MQTTSNGateway/src/MQTTSNGWQoSm1Proxy.cpp +++ b/MQTTSNGateway/src/MQTTSNGWQoSm1Proxy.cpp @@ -38,28 +38,20 @@ QoSm1Proxy::~QoSm1Proxy(void) } -void QoSm1Proxy::initialize(void) +void QoSm1Proxy::initialize(char* gwName) { - char param[MQTTSNGW_PARAM_MAX]; - if ( _gateway->hasSecureConnection() ) { _isSecure = true; } - if (_gateway->getParam("QoS-1", param) == 0 ) - { - if (strcasecmp(param, "YES") == 0 ) - { - /* Create QoS-1 Clients from clients.conf */ - _gateway->getClientList()->setClientList(QOSM1PROXY_TYPE); + /* Create QoS-1 Clients from clients.conf */ + _gateway->getClientList()->setClientList(QOSM1PROXY_TYPE); - /* Create a client for QoS-1 proxy */ - string name = string(_gateway->getGWParams()->gatewayName) + "_QoS-1"; - setup(name.c_str(), Atype_QoSm1Proxy); - _isActive = true; - } - } + /* Create a client for QoS-1 proxy */ + string name = string(gwName) + string("_QoS-1"); + setup(name.c_str(), Atype_QoSm1Proxy); + _isActive = true; } diff --git a/MQTTSNGateway/src/MQTTSNGWQoSm1Proxy.h b/MQTTSNGateway/src/MQTTSNGWQoSm1Proxy.h index d3dfcf5..82e5f2a 100644 --- a/MQTTSNGateway/src/MQTTSNGWQoSm1Proxy.h +++ b/MQTTSNGateway/src/MQTTSNGWQoSm1Proxy.h @@ -35,7 +35,7 @@ public: QoSm1Proxy(Gateway* gw); ~QoSm1Proxy(void); - void initialize(void); + void initialize(char* GWnAME); bool isActive(void); private: diff --git a/MQTTSNGateway/src/MQTTSNGWSubscribeHandler.cpp b/MQTTSNGateway/src/MQTTSNGWSubscribeHandler.cpp index 45b2964..71694ab 100644 --- a/MQTTSNGateway/src/MQTTSNGWSubscribeHandler.cpp +++ b/MQTTSNGateway/src/MQTTSNGWSubscribeHandler.cpp @@ -203,11 +203,6 @@ void MQTTSNSubscribeHandler::handleAggregateSubscribe(Client* client, MQTTSNPack if ( subscribe != nullptr ) { - UTF8String str = subscribe->getTopic(); - string* topicName = new string(str.data, str.len); - Topic topic = Topic(topicName, MQTTSN_TOPIC_TYPE_NORMAL); - _gateway->getAdapterManager()->addAggregateTopic(&topic, client); - int msgId = 0; if ( packet->isDuplicate() ) { @@ -223,6 +218,13 @@ void MQTTSNSubscribeHandler::handleAggregateSubscribe(Client* client, MQTTSNPack WRITELOG("%s MQTTSNSubscribeHandler can't create MessageIdTableElement %s%s\n", ERRMSG_HEADER, client->getClientId(), ERRMSG_FOOTER); return; } + + UTF8String str = subscribe->getTopic(); + string* topicName = new string(str.data, str.len); // topicName is delete by topic + Topic topic = Topic(topicName, MQTTSN_TOPIC_TYPE_NORMAL); + + _gateway->getAdapterManager()->getAggregater()->addAggregateTopic(&topic, client); + subscribe->setMsgId(msgId); Event* ev = new Event(); ev->setBrokerSendEvent(client, subscribe); @@ -235,11 +237,6 @@ void MQTTSNSubscribeHandler::handleAggregateUnsubscribe(Client* client, MQTTSNPa MQTTGWPacket* unsubscribe = handleUnsubscribe(client, packet); if ( unsubscribe != nullptr ) { - UTF8String str = unsubscribe->getTopic(); - string* topicName = new string(str.data, str.len); - Topic topic = Topic(topicName, MQTTSN_TOPIC_TYPE_NORMAL); - _gateway->getAdapterManager()->removeAggregateTopic(&topic, client); - int msgId = 0; if ( packet->isDuplicate() ) { @@ -255,6 +252,12 @@ void MQTTSNSubscribeHandler::handleAggregateUnsubscribe(Client* client, MQTTSNPa WRITELOG("%s MQTTSNUnsubscribeHandler can't create MessageIdTableElement %s%s\n", ERRMSG_HEADER, client->getClientId(), ERRMSG_FOOTER); return; } + + UTF8String str = unsubscribe->getTopic(); + string* topicName = new string(str.data, str.len); // topicName is delete by topic + Topic topic = Topic(topicName, MQTTSN_TOPIC_TYPE_NORMAL); + _gateway->getAdapterManager()->getAggregater()->removeAggregateTopic(&topic, client); + unsubscribe->setMsgId(msgId); Event* ev = new Event(); ev->setBrokerSendEvent(client, unsubscribe); diff --git a/MQTTSNGateway/src/MQTTSNGWTopic.cpp b/MQTTSNGateway/src/MQTTSNGWTopic.cpp index 5be2278..8f56a4c 100644 --- a/MQTTSNGateway/src/MQTTSNGWTopic.cpp +++ b/MQTTSNGateway/src/MQTTSNGWTopic.cpp @@ -63,6 +63,15 @@ MQTTSN_topicTypes Topic::getType(void) return _type; } +Topic* Topic::duplicate(void) +{ + Topic* newTopic = new Topic(); + newTopic->_type = _type; + newTopic->_topicId = _topicId; + newTopic->_topicName = new string(_topicName->c_str()); + return newTopic; +} + bool Topic::isMatch(string* topicName) { string::size_type tlen = _topicName->size(); @@ -354,6 +363,16 @@ void Topics::eraseNormal(void) } } +Topic* Topics::getFirstTopic(void) +{ + return _first; +} + +Topic* Topics::getNextTopic(Topic* topic) +{ + return topic->_next; +} + void Topics::print(void) { Topic* topic = _first; diff --git a/MQTTSNGateway/src/MQTTSNGWTopic.h b/MQTTSNGateway/src/MQTTSNGWTopic.h index 5c3cc77..4d7b0c3 100644 --- a/MQTTSNGateway/src/MQTTSNGWTopic.h +++ b/MQTTSNGateway/src/MQTTSNGWTopic.h @@ -31,6 +31,7 @@ namespace MQTTSNGW class Topic { friend class Topics; + friend class AggregateTopicTable; public: Topic(); Topic(string* topic, MQTTSN_topicTypes type); @@ -39,7 +40,9 @@ public: uint16_t getTopicId(void); MQTTSN_topicTypes getType(void); bool isMatch(string* topicName); + Topic* duplicate(void); void print(void); + private: MQTTSN_topicTypes _type; uint16_t _topicId; @@ -59,6 +62,8 @@ public: Topic* add(const char* topicName, uint16_t id = 0); Topic* getTopicByName(const MQTTSN_topicid* topic); Topic* getTopicById(const MQTTSN_topicid* topicid); + Topic* getFirstTopic(void); + Topic* getNextTopic(Topic* topic); Topic* match(const MQTTSN_topicid* topicid); void eraseNormal(void); uint16_t getNextTopicId(); diff --git a/MQTTSNGateway/src/MQTTSNGateway.cpp b/MQTTSNGateway/src/MQTTSNGateway.cpp index bb86d14..cac3ac1 100644 --- a/MQTTSNGateway/src/MQTTSNGateway.cpp +++ b/MQTTSNGateway/src/MQTTSNGateway.cpp @@ -234,29 +234,48 @@ void Gateway::initialize(int argc, char** argv) _params.clientListName = strdup(param); } - if (getParam("PredefinedTopicList", param) == 0) + if (getParam("PredefinedTopic", param) == 0) { - _params.predefinedTopicFileName = strdup(param); + if ( !strcasecmp(param, "YES") ) + { + _params.predefinedTopic = true; + if (getParam("PredefinedTopicList", param) == 0) + { + _params.predefinedTopicFileName = strdup(param); + } + } } - if ( _params.clientListName == nullptr ) + if (getParam("AggregatingGateway", param) == 0) { - _params.clientListName = strdup(( _params.configDir + string(CLIENT_LIST) ).c_str()); + if ( !strcasecmp(param, "YES") ) + { + _params.aggregatingGw = true; + } } - if ( _params.predefinedTopicFileName == nullptr ) + if (getParam("Forwarder", param) == 0) { - _params.predefinedTopicFileName = strdup(( _params.configDir + string(PREDEFINEDTOPIC_FILE) ).c_str()); + if ( !strcasecmp(param, "YES") ) + { + _params.forwarder = true; + } } - /* ClientList and Adapters Initialize */ - _adapterManager->initialize(); + if (getParam("QoS-1", param) == 0) + { + if ( !strcasecmp(param, "YES") ) + { + _params.qosMinus1 = true; + } + } - bool aggregate = _adapterManager->isAggregaterActive(); - _clientList->initialize(aggregate); - /* Setup predefined topics */ - _clientList->setPredefinedTopics(aggregate); + /* Initialize adapters */ + _adapterManager->initialize( _params.gatewayName, _params.aggregatingGw, _params.forwarder, _params.qosMinus1); + + /* Setup ClientList and Predefined topics */ + _clientList->initialize(_params.aggregatingGw); } void Gateway::run(void) diff --git a/MQTTSNGateway/src/MQTTSNGateway.h b/MQTTSNGateway/src/MQTTSNGateway.h index 0053219..b6bed6a 100644 --- a/MQTTSNGateway/src/MQTTSNGateway.h +++ b/MQTTSNGateway/src/MQTTSNGateway.h @@ -164,6 +164,10 @@ public: char* privateKey {nullptr}; char* qosMinusClientListName {nullptr}; bool clientAuthentication {false}; + bool predefinedTopic {false}; + bool aggregatingGw {false}; + bool qosMinus1 {false}; + bool forwarder {false}; };