Update Aggregatting gateway can now receive PUBLISH from a broker

Signed-off-by: tomoaki <tomoaki@tomy-tech.com>
This commit is contained in:
tomoaki
2020-03-05 17:39:54 +09:00
parent 53be14f76e
commit 258d534009
22 changed files with 397 additions and 180 deletions

View File

@@ -2,7 +2,6 @@
**MQTT-SN** requires a MQTT-SN Gateway which acts as a protocol converter to convert **MQTT-SN messages to MQTT messages**. MQTT-SN client over SensorNetwork can not communicate directly with MQTT broker(TCP/IP). **MQTT-SN** requires a MQTT-SN Gateway which acts as a protocol converter to convert **MQTT-SN messages to MQTT messages**. MQTT-SN client over SensorNetwork can not communicate directly with MQTT broker(TCP/IP).
This Gateway can run as a transparent or aggrigating Gateway by specifying the gateway.conf. This Gateway can run as a transparent or aggrigating Gateway by specifying the gateway.conf.
The Aggregating Gateway can not receive PUBLISH message from the broker at this version.
### **step1. Build the gateway** ### **step1. Build the gateway**
```` ````

View File

@@ -272,53 +272,31 @@ void MQTTGWPublishHandler::handleAggregatePublish(Client* client, MQTTGWPacket*
Publish pub; Publish pub;
packet->getPUBLISH(&pub); packet->getPUBLISH(&pub);
// Start of temporary code
WRITELOG("%s MQTTGWPublishHandler::handleAggregatePublish Aggregater can't handle a PUBLISH from the broker at the current version.%s\n", ERRMSG_HEADER,ERRMSG_FOOTER);
if (pub.header.bits.qos == 1) string* topicName = new string(pub.topic, pub.topiclen); // topic deletes topicName when the topic is deleted
Topic topic = Topic(topicName, MQTTSN_TOPIC_TYPE_NORMAL);
// ToDo: need to refacter
ClientTopicElement* elm = _gateway->getAdapterManager()->getAggregater()->getClientElement(&topic);
while ( elm != nullptr )
{ {
replyACK(client, &pub, PUBACK); Client* devClient = elm->getClient();
} MQTTGWPacket* msg = new MQTTGWPacket();
else if ( pub.header.bits.qos == 2) *msg = *packet;
{
replyACK(client, &pub, PUBREC);
}
// End of temporary code
if ( msg->getType() == 0 )
string* topicName = new string(pub.topic, pub.topiclen);
Topic topic = Topic(topicName, MQTTSN_TOPIC_TYPE_NORMAL); // topic deletes topicName when the topic is deleted
AggregateTopicElement* list = _gateway->getAdapterManager()->createClientList(&topic);
if ( list != nullptr )
{
ClientTopicElement* p = list->getFirstElement();
while ( p )
{ {
Client* devClient = p->getClient(); WRITELOG("%s MQTTGWPublishHandler::handleAggregatePublish can't allocate memories for Packet.%s\n", ERRMSG_HEADER,ERRMSG_FOOTER);
if ( devClient != nullptr ) delete msg;
{ break;
MQTTGWPacket* msg = new MQTTGWPacket();
*msg = *packet;
if ( msg->getType() == 0 )
{
WRITELOG("%s MQTTGWPublishHandler::handleAggregatePublish can't allocate memories for Packet.%s\n", ERRMSG_HEADER,ERRMSG_FOOTER);
delete msg;
break;
}
Event* ev = new Event();
ev->setBrokerRecvEvent(devClient, msg);
_gateway->getPacketEventQue()->post(ev);
}
else
{
break;
}
p = list->getNextElement(p);
} }
delete list;
Event* ev = new Event();
ev->setBrokerRecvEvent(devClient, msg);
_gateway->getPacketEventQue()->post(ev);
elm = elm->getNextClientElement();
} }
} }

View File

@@ -81,7 +81,15 @@ void MQTTSNAggregateConnectionHandler::handleConnect(Client* client, MQTTSNPacke
/* renew the TopicList */ /* renew the TopicList */
if (topics) if (topics)
{ {
_gateway->getAdapterManager()->removeAggregateTopicList(topics, client); Topic* tp = topics->getFirstTopic();
while( tp != nullptr )
{
if ( tp->getType() == MQTTSN_TOPIC_TYPE_NORMAL )
{
_gateway->getAdapterManager()->getAggregater()->removeAggregateTopic(tp, client);
}
tp = topics->getNextTopic(tp);
}
topics->eraseNormal(); topics->eraseNormal();
} }
client->setSessionStatus(true); client->setSessionStatus(true);
@@ -189,7 +197,6 @@ void MQTTSNAggregateConnectionHandler::sendStoredPublish(Client* client)
while ( ( msg = client->getClientSleepPacket() ) != nullptr ) while ( ( msg = client->getClientSleepPacket() ) != nullptr )
{ {
// ToDo: This version can't re-send PUBLISH when PUBACK is not returned.
client->deleteFirstClientSleepPacket(); // pop the que to delete element. client->deleteFirstClientSleepPacket(); // pop the que to delete element.
Event* ev = new Event(); Event* ev = new Event();

View File

@@ -217,7 +217,7 @@ Client* Adapter::getAdapterClient(Client* client)
{ {
if ( client->isSecureNetwork() ) if ( client->isSecureNetwork() )
{ {
return _client; return _clientSecure;
} }
else else
{ {

View File

@@ -40,13 +40,23 @@ AdapterManager::AdapterManager(Gateway* gw)
} }
void AdapterManager::initialize(void) void AdapterManager::initialize(char* gwName, bool aggregate, bool forwarder, bool qosM1)
{ {
_aggregater->initialize(); if ( aggregate )
_forwarders->initialize(_gateway); {
_qosm1Proxy->initialize(); _aggregater->initialize(gwName);
} }
if ( qosM1 )
{
_qosm1Proxy->initialize(gwName);
}
if ( forwarder )
{
_forwarders->initialize(_gateway);
}
}
AdapterManager::~AdapterManager(void) AdapterManager::~AdapterManager(void)
{ {
@@ -91,25 +101,26 @@ bool AdapterManager::isAggregatedClient(Client* client)
} }
} }
Client* AdapterManager::getClient(Client& client) Client* AdapterManager::getClient(Client* client)
{ {
bool secure = client.isSecureNetwork(); bool secure = client->isSecureNetwork();
Client* newClient = &client; Client* newClient = client;
if ( client.isQoSm1() )
if ( client->isQoSm1() )
{ {
newClient = _qosm1Proxy->getAdapterClient(&client); newClient = _qosm1Proxy->getAdapterClient(client);
_qosm1Proxy->resetPingTimer(secure); _qosm1Proxy->resetPingTimer(secure);
} }
else if ( client.isAggregated() ) else if ( client->isAggregated() )
{ {
newClient = _aggregater->getAdapterClient(&client); newClient = _aggregater->getAdapterClient(client);
_aggregater->resetPingTimer(secure); _aggregater->resetPingTimer(secure);
} }
else if ( client.isQoSm1Proxy() ) else if ( client->isQoSm1Proxy() )
{ {
_qosm1Proxy->resetPingTimer(secure); _qosm1Proxy->resetPingTimer(secure);
} }
else if ( client.isAggregater() ) else if ( client->isAggregater() )
{ {
_aggregater->resetPingTimer(secure); _aggregater->resetPingTimer(secure);
} }
@@ -173,22 +184,26 @@ bool AdapterManager::isAggregaterActive(void)
return _aggregater->isActive(); return _aggregater->isActive();
} }
AggregateTopicElement* AdapterManager::createClientList(Topic* topic) /*
AggregateTopicElement* AdapterManager::findTopic(Topic* topic)
{ {
return _aggregater->createClientList(topic); return _aggregater->findTopic(topic);
} }
int AdapterManager::addAggregateTopic(Topic* topic, Client* client) AggregateTopicElement* AdapterManager::addAggregateTopic(Topic* topic, Client* client)
{ {
return _aggregater->addAggregateTopic(topic, client); return _aggregater->addAggregateTopic(topic, client);
} }
void AdapterManager::removeAggregateTopic(Topic* topic, Client* client) void AdapterManager::removeAggregateTopic(Topic* topic, Client* client)
{ {
_aggregater->removeAggregateTopic(topic, client); //_aggregater->removeAggregateTopic(topic, client);
} }
void AdapterManager::removeAggregateTopicList(Topics* topics, Client* client) void AdapterManager::removeAggregateTopicList(Topics* topics, Client* client)
{ {
_aggregater->removeAggregateTopicList(topics, client);
} }
*/

View File

@@ -40,21 +40,17 @@ class AdapterManager
public: public:
AdapterManager(Gateway* gw); AdapterManager(Gateway* gw);
~AdapterManager(void); ~AdapterManager(void);
void initialize(void); void initialize(char* gwName, bool aggregater, bool fowarder, bool qosM1);
ForwarderList* getForwarderList(void); ForwarderList* getForwarderList(void);
QoSm1Proxy* getQoSm1Proxy(void); QoSm1Proxy* getQoSm1Proxy(void);
Aggregater* getAggregater(void); Aggregater* getAggregater(void);
void checkConnection(void); void checkConnection(void);
bool isAggregatedClient(Client* client); bool isAggregatedClient(Client* client);
Client* getClient(Client& client); Client* getClient(Client* client);
Client* convertClient(uint16_t msgId, uint16_t* clientMsgId); Client* convertClient(uint16_t msgId, uint16_t* clientMsgId);
int unicastToClient(Client* client, MQTTSNPacket* packet, ClientSendTask* task); int unicastToClient(Client* client, MQTTSNPacket* packet, ClientSendTask* task);
bool isAggregaterActive(void); bool isAggregaterActive(void);
AggregateTopicElement* createClientList(Topic* topic);
int addAggregateTopic(Topic* topic, Client* client);
void removeAggregateTopic(Topic* topic, Client* client);
void removeAggregateTopicList(Topics* topics, Client* client);
private: private:
Gateway* _gateway {nullptr}; Gateway* _gateway {nullptr};

View File

@@ -34,6 +34,11 @@ Client* ClientTopicElement::getClient(void)
return _client; return _client;
} }
ClientTopicElement* ClientTopicElement::getNextClientElement(void)
{
return _next;
}
/*===================================== /*=====================================
Class AggregateTopicElement Class AggregateTopicElement
=====================================*/ =====================================*/
@@ -44,6 +49,7 @@ AggregateTopicElement::AggregateTopicElement(void)
AggregateTopicElement::AggregateTopicElement(Topic* topic, Client* client) AggregateTopicElement::AggregateTopicElement(Topic* topic, Client* client)
{ {
_topic = topic;
ClientTopicElement* elm = new ClientTopicElement(client); ClientTopicElement* elm = new ClientTopicElement(client);
if ( elm != nullptr ) if ( elm != nullptr )
{ {
@@ -76,7 +82,9 @@ ClientTopicElement* AggregateTopicElement::add(Client* client)
{ {
return nullptr; return nullptr;
} }
_mutex.lock(); _mutex.lock();
if ( _head == nullptr ) if ( _head == nullptr )
{ {
_head = elm; _head = elm;
@@ -95,7 +103,7 @@ ClientTopicElement* AggregateTopicElement::add(Client* client)
else else
{ {
delete elm; delete elm;
elm = nullptr; elm = p;
} }
} }
_mutex.unlock(); _mutex.unlock();
@@ -105,7 +113,7 @@ ClientTopicElement* AggregateTopicElement::add(Client* client)
ClientTopicElement* AggregateTopicElement::find(Client* client) ClientTopicElement* AggregateTopicElement::find(Client* client)
{ {
ClientTopicElement* p = _head; ClientTopicElement* p = _head;
while ( p ) while ( p != nullptr )
{ {
if ( p->_client == client) if ( p->_client == client)
{ {
@@ -116,16 +124,48 @@ ClientTopicElement* AggregateTopicElement::find(Client* client)
return p; return p;
} }
ClientTopicElement* AggregateTopicElement::getFirstElement(void) ClientTopicElement* AggregateTopicElement::getFirstClientTopicElement(void)
{ {
return _head; return _head;
} }
ClientTopicElement* AggregateTopicElement::getNextElement(ClientTopicElement* elm) ClientTopicElement* AggregateTopicElement::getNextClientTopicElement(ClientTopicElement* elmClient)
{ {
return elm->_next; return elmClient->_next;
} }
void AggregateTopicElement::eraseClient(Client* client)
{
_mutex.lock();
ClientTopicElement* p = find(client);
if ( p != nullptr )
{
if ( p->_prev == nullptr ) // head element
{
_head = p->_next;
if ( p->_next == nullptr ) // head & only one
{
_tail = nullptr;
}
else
{
p->_next->_prev = nullptr; // head & midle
}
}
else if ( p->_next != nullptr ) // middle
{
p->_prev->_next = p->_next;
}
else // tail
{
p->_prev->_next = nullptr;
_tail = p->_prev;
}
delete p;
}
_mutex.unlock();
}
/*===================================== /*=====================================
Class AggregateTopicTable Class AggregateTopicTable
@@ -143,19 +183,138 @@ AggregateTopicTable::~AggregateTopicTable()
AggregateTopicElement* AggregateTopicTable::add(Topic* topic, Client* client) AggregateTopicElement* AggregateTopicTable::add(Topic* topic, Client* client)
{ {
//ToDo: AggregateGW AggregateTopicElement* elm = nullptr;
return 0; _mutex.lock();
elm = getAggregateTopicElement(topic);
if ( elm != nullptr )
{
if ( elm->find(client) == nullptr )
{
elm->add(client);
}
}
else
{
Topic* newTopic = topic->duplicate();
elm = new AggregateTopicElement(newTopic, client);
if ( _head == nullptr )
{
_head = elm;
_tail = elm;
}
else
{
elm->_prev = _tail;
_tail->_next = elm;
_tail = elm;
}
}
_mutex.unlock();
return elm;
} }
void AggregateTopicTable::remove(Topic* topic, Client* client) void AggregateTopicTable::erase(Topic* topic, Client* client)
{ {
//ToDo: AggregateGW AggregateTopicElement* elm = nullptr;
_mutex.lock();
elm = getAggregateTopicElement(topic);
if ( elm != nullptr )
{
elm->eraseClient(client);
}
if ( elm->_head == nullptr )
{
erase(elm);
}
_mutex.unlock();
return;
} }
AggregateTopicElement* AggregateTopicTable::getClientList(Topic* client) void AggregateTopicTable::erase(AggregateTopicElement* elmTopic)
{ {
// ToDo: AggregateGW if ( elmTopic != nullptr )
return 0; {
if ( elmTopic->_prev == nullptr ) // head element
{
_head = elmTopic->_next;
if ( elmTopic->_next == nullptr ) // head & only one
{
_tail = nullptr;
}
else
{
elmTopic->_next->_prev = nullptr; // head & midle
}
}
else if ( elmTopic->_next != nullptr ) // middle
{
elmTopic->_prev->_next = elmTopic->_next;
}
else // tail
{
elmTopic->_prev->_next = nullptr;
_tail = elmTopic->_prev;
}
delete elmTopic;
}
} }
AggregateTopicElement* AggregateTopicTable::getAggregateTopicElement(Topic* topic)
{
AggregateTopicElement* elm = _head;
while( elm != nullptr )
{
if ( elm->_topic->isMatch(topic->_topicName) )
{
break;
}
elm = elm->_next;
}
return elm;
}
ClientTopicElement* AggregateTopicTable::getClientElement(Topic* topic)
{
AggregateTopicElement* elm = getAggregateTopicElement(topic);
if ( elm != nullptr )
{
return elm->_head;
}
else
{
return nullptr;
}
}
void AggregateTopicTable::print(void)
{
AggregateTopicElement* elm = _head;
printf("Beginning of AggregateTopicTable\n");
while( elm != nullptr )
{
printf("%s\n", elm->_topic->getTopicName()->c_str());
ClientTopicElement* clElm = elm->getFirstClientTopicElement();
Client* client = clElm->getClient();
while ( client != nullptr )
{
printf(" %s\n", client->getClientId());
clElm = clElm->getNextClientElement();
if ( clElm != nullptr )
{
client = clElm->getClient();
}
else
{
client = nullptr;
}
}
elm = elm->_next;
}
printf("End of AggregateTopicTable\n");
}

View File

@@ -39,10 +39,16 @@ public:
~AggregateTopicTable(); ~AggregateTopicTable();
AggregateTopicElement* add(Topic* topic, Client* client); AggregateTopicElement* add(Topic* topic, Client* client);
AggregateTopicElement* getClientList(Topic* client); AggregateTopicElement* getAggregateTopicElement(Topic* topic);
void remove(Topic* topic, Client* client); ClientTopicElement* getClientElement(Topic* topic);
void erase(Topic* topic, Client* client);
void clear(void); void clear(void);
void print(void);
private: private:
void erase(AggregateTopicElement* elmTopic);
Mutex _mutex;
AggregateTopicElement* _head {nullptr}; AggregateTopicElement* _head {nullptr};
AggregateTopicElement* _tail {nullptr}; AggregateTopicElement* _tail {nullptr};
int _cnt {0}; int _cnt {0};
@@ -61,14 +67,16 @@ public:
~AggregateTopicElement(void); ~AggregateTopicElement(void);
ClientTopicElement* add(Client* client); ClientTopicElement* add(Client* client);
ClientTopicElement* getFirstElement(void); ClientTopicElement* getFirstClientTopicElement(void);
ClientTopicElement* getNextElement(ClientTopicElement* elm); ClientTopicElement* getNextClientTopicElement(ClientTopicElement* elmClient);
void erase(ClientTopicElement* elm); void eraseClient(Client* client);
ClientTopicElement* find(Client* client); ClientTopicElement* find(Client* client);
private: private:
Mutex _mutex; Mutex _mutex;
Topic* _topic {nullptr}; Topic* _topic {nullptr};
AggregateTopicElement* _next {nullptr};
AggregateTopicElement* _prev {nullptr};
ClientTopicElement* _head {nullptr}; ClientTopicElement* _head {nullptr};
ClientTopicElement* _tail {nullptr}; ClientTopicElement* _tail {nullptr};
}; };
@@ -83,6 +91,8 @@ class ClientTopicElement
public: public:
ClientTopicElement(Client* client); ClientTopicElement(Client* client);
~ClientTopicElement(void); ~ClientTopicElement(void);
ClientTopicElement* getNextClientElement(void);
Client* getClient(void); Client* getClient(void);
private: private:

View File

@@ -36,23 +36,12 @@ Aggregater::~Aggregater(void)
} }
void Aggregater::initialize(void) void Aggregater::initialize(char* gwName)
{ {
char param[MQTTSNGW_PARAM_MAX]; /* Create Aggregater Client */
string name = string(gwName) + string("_Aggregater");
if (_gateway->getParam("AggregatingGateway", param) == 0 ) setup(name.c_str(), Atype_Aggregater);
{ _isActive = true;
if (!strcasecmp(param, "YES") )
{
/* Create Aggregated Clients from clients.conf */
_gateway->getClientList()->setClientList(AGGREGATER_TYPE);
/* Create Aggregater Client */
string name = string(_gateway->getGWParams()->gatewayName) + "_Aggregater";
setup(name.c_str(), Atype_Aggregater);
_isActive = true;
}
}
//testMessageIdTable(); //testMessageIdTable();
@@ -95,26 +84,38 @@ uint16_t Aggregater::getMsgId(Client* client, uint16_t clientMsgId)
return _msgIdTable.getMsgId(client, clientMsgId); return _msgIdTable.getMsgId(client, clientMsgId);
} }
AggregateTopicElement* Aggregater::addAggregateTopic(Topic* topic, Client* client)
{
return _topicTable.add(topic, client);
}
void Aggregater::removeAggregateTopic(Topic* topic, Client* client) void Aggregater::removeAggregateTopic(Topic* topic, Client* client)
{ {
// ToDo: AggregateGW this method called when the client disconnect and erase it`s Topics. this method call */ _topicTable.erase(topic, client);
} }
void Aggregater::removeAggregateTopicList(Topics* topics, Client* client) AggregateTopicElement* Aggregater::findTopic(Topic* topic)
{ {
// ToDo: AggregateGW this method called when the client disconnect and erase it`s Topics. this method call */ return _topicTable.getAggregateTopicElement(topic);
} }
int Aggregater::addAggregateTopic(Topic* topic, Client* client) ClientTopicElement* Aggregater::getClientElement(Topic* topic)
{ {
// ToDo: AggregateGW */ AggregateTopicElement* elm = findTopic(topic);
return 0; if ( elm != nullptr )
{
return elm->getFirstClientTopicElement();
}
else
{
return nullptr;
}
} }
AggregateTopicElement* Aggregater::createClientList(Topic* topic) void Aggregater::printAggregateTopicTable(void)
{ {
// ToDo: AggregateGW */ _topicTable.print();
return 0;
} }
bool Aggregater::testMessageIdTable(void) bool Aggregater::testMessageIdTable(void)

View File

@@ -20,6 +20,7 @@
#include "MQTTSNGWAdapter.h" #include "MQTTSNGWAdapter.h"
#include "MQTTSNGWMessageIdTable.h" #include "MQTTSNGWMessageIdTable.h"
#include "MQTTSNGWAggregateTopicTable.h" #include "MQTTSNGWAggregateTopicTable.h"
namespace MQTTSNGW namespace MQTTSNGW
{ {
class Gateway; class Gateway;
@@ -40,7 +41,7 @@ public:
Aggregater(Gateway* gw); Aggregater(Gateway* gw);
~Aggregater(void); ~Aggregater(void);
void initialize(void); void initialize(char* gwName);
const char* getClientId(SensorNetAddress* addr); const char* getClientId(SensorNetAddress* addr);
Client* getClient(SensorNetAddress* addr); Client* getClient(SensorNetAddress* addr);
@@ -48,13 +49,18 @@ public:
uint16_t addMessageIdTable(Client* client, uint16_t msgId); uint16_t addMessageIdTable(Client* client, uint16_t msgId);
uint16_t getMsgId(Client* client, uint16_t clientMsgId); uint16_t getMsgId(Client* client, uint16_t clientMsgId);
ClientTopicElement* getClientElement(Topic* topic);
ClientTopicElement* getNextClientElement(ClientTopicElement* clientElement);
Client* getClient(ClientTopicElement* clientElement);
AggregateTopicElement* findTopic(Topic* topic);
AggregateTopicElement* addAggregateTopic(Topic* topic, Client* client);
AggregateTopicElement* createClientList(Topic* topic);
int addAggregateTopic(Topic* topic, Client* client);
void removeAggregateTopic(Topic* topic, Client* client); void removeAggregateTopic(Topic* topic, Client* client);
void removeAggregateTopicList(Topics* topics, Client* client); void removeAggregateAllTopic(Client* client);
bool isActive(void); bool isActive(void);
void printAggregateTopicTable(void);
bool testMessageIdTable(void); bool testMessageIdTable(void);
private: private:

View File

@@ -81,7 +81,7 @@ void BrokerSendTask::run()
packet = ev->getMQTTGWPacket(); packet = ev->getMQTTGWPacket();
/* Check Client is managed by Adapters */ /* Check Client is managed by Adapters */
client = adpMgr->getClient(*client); client = adpMgr->getClient(client);
if ( packet->getType() == CONNECT && client->getNetwork()->isValid() ) if ( packet->getType() == CONNECT && client->getNetwork()->isValid() )
{ {

View File

@@ -61,20 +61,28 @@ void ClientList::initialize(bool aggregate)
setClientList(type); setClientList(type);
_authorize = true; _authorize = true;
} }
if ( theGateway->getGWParams()->predefinedTopic )
{
setPredefinedTopics(aggregate);
}
} }
void ClientList::setClientList(int type) void ClientList::setClientList(int type)
{ {
if (!createList(theGateway->getGWParams()->clientListName, type))
if (!createList(theGateway->getClientListFileName(), type))
{ {
throw Exception("ClientList::initialize(): No client list defined by the configuration."); throw Exception("ClientList::setClientList No client list defined by config file.");
} }
} }
void ClientList::setPredefinedTopics(bool aggrecate) void ClientList::setPredefinedTopics(bool aggrecate)
{ {
readPredefinedList(theGateway->getPredefinedTopicFileName(), aggrecate); if ( !readPredefinedList(theGateway->getGWParams()->predefinedTopicFileName, aggrecate) )
{
throw Exception("ClientList::setPredefinedTopics No predefindTopi list defined by config file.");
}
} }
/** /**
@@ -108,7 +116,7 @@ bool ClientList::createList(const char* fileName, int type)
bool stable; bool stable;
bool qos_1; bool qos_1;
bool forwarder; bool forwarder;
bool rc = true; bool rc = false;
SensorNetAddress netAddr; SensorNetAddress netAddr;
MQTTSNString clientId = MQTTSNString_initializer; MQTTSNString clientId = MQTTSNString_initializer;
@@ -161,6 +169,7 @@ bool ClientList::createList(const char* fileName, int type)
free(clientId.cstring); free(clientId.cstring);
} }
fclose(fp); fclose(fp);
rc = true;
} }
return rc; return rc;
} }

View File

@@ -60,7 +60,7 @@ void ClientRecvTask::run()
Event* ev = nullptr; Event* ev = nullptr;
AdapterManager* adpMgr = _gateway->getAdapterManager(); AdapterManager* adpMgr = _gateway->getAdapterManager();
QoSm1Proxy* qosm1Proxy = adpMgr->getQoSm1Proxy(); QoSm1Proxy* qosm1Proxy = adpMgr->getQoSm1Proxy();
bool isAggrActive = adpMgr->isAggregaterActive(); int clientType = adpMgr->isAggregaterActive() ? AGGREGATER_TYPE : TRANSPEARENT_TYPE;
ClientList* clientList = _gateway->getClientList(); ClientList* clientList = _gateway->getClientList();
EventQue* packetEventQue = _gateway->getPacketEventQue(); EventQue* packetEventQue = _gateway->getPacketEventQue();
@@ -133,11 +133,12 @@ void ClientRecvTask::run()
{ {
const char* clientName = qosm1Proxy->getClientId(senderAddr); const char* clientName = qosm1Proxy->getClientId(senderAddr);
if ( clientName ) if ( clientName != nullptr )
{ {
client = qosm1Proxy->getClient();
if ( !packet->isQoSMinusPUBLISH() ) if ( !packet->isQoSMinusPUBLISH() )
{ {
client = qosm1Proxy->getClient();
log(clientName, packet); log(clientName, packet);
WRITELOG("%s %s %s can send only PUBLISH with QoS-1.%s\n", ERRMSG_HEADER, clientName, senderAddr->sprint(buf), ERRMSG_FOOTER); WRITELOG("%s %s %s can send only PUBLISH with QoS-1.%s\n", ERRMSG_HEADER, clientName, senderAddr->sprint(buf), ERRMSG_FOOTER);
delete packet; delete packet;
@@ -145,10 +146,14 @@ void ClientRecvTask::run()
} }
} }
} }
client = _gateway->getClientList()->getClient(senderAddr);
if ( client == nullptr )
{
client = _gateway->getClientList()->getClient(senderAddr);
}
} }
if ( client ) if ( client != nullptr )
{ {
/* write log and post Event */ /* write log and post Event */
log(client, packet, 0); log(client, packet, 0);
@@ -178,7 +183,7 @@ void ClientRecvTask::run()
if ( client == nullptr ) if ( client == nullptr )
{ {
/* create a new client */ /* create a new client */
client = clientList->createClient(0, &data.clientID, isAggrActive); client = clientList->createClient(0, &data.clientID, clientType);
} }
/* Add to af forwarded client list of forwarder. */ /* Add to af forwarded client list of forwarder. */
fwd->addClient(client, &nodeId); fwd->addClient(client, &nodeId);
@@ -193,7 +198,7 @@ void ClientRecvTask::run()
else else
{ {
/* create a new client */ /* create a new client */
client = clientList->createClient(senderAddr, &data.clientID, isAggrActive); client = clientList->createClient(senderAddr, &data.clientID, clientType);
} }
} }

View File

@@ -291,7 +291,6 @@ void MQTTSNConnectionHandler::sendStoredPublish(Client* client)
while ( ( msg = client->getClientSleepPacket() ) != nullptr ) while ( ( msg = client->getClientSleepPacket() ) != nullptr )
{ {
// ToDo: This version can't re-send PUBLISH when PUBACK is not returned.
client->deleteFirstClientSleepPacket(); // pop the que to delete element. client->deleteFirstClientSleepPacket(); // pop the que to delete element.
Event* ev = new Event(); Event* ev = new Event();

View File

@@ -47,17 +47,8 @@ ForwarderList::~ForwarderList()
void ForwarderList::initialize(Gateway* gw) void ForwarderList::initialize(Gateway* gw)
{ {
char param[MQTTSNGW_PARAM_MAX]; /* Create Fowarders from clients.conf */
string fileName; gw->getClientList()->setClientList(FORWARDER_TYPE);
if (gw->getParam("Forwarder", param) == 0 )
{
if (!strcasecmp(param, "YES") )
{
/* Create Fowarders from clients.conf */
gw->getClientList()->setClientList(FORWARDER_TYPE);
}
}
} }

View File

@@ -38,28 +38,20 @@ QoSm1Proxy::~QoSm1Proxy(void)
} }
void QoSm1Proxy::initialize(void) void QoSm1Proxy::initialize(char* gwName)
{ {
char param[MQTTSNGW_PARAM_MAX];
if ( _gateway->hasSecureConnection() ) if ( _gateway->hasSecureConnection() )
{ {
_isSecure = true; _isSecure = true;
} }
if (_gateway->getParam("QoS-1", param) == 0 ) /* Create QoS-1 Clients from clients.conf */
{ _gateway->getClientList()->setClientList(QOSM1PROXY_TYPE);
if (strcasecmp(param, "YES") == 0 )
{
/* Create QoS-1 Clients from clients.conf */
_gateway->getClientList()->setClientList(QOSM1PROXY_TYPE);
/* Create a client for QoS-1 proxy */ /* Create a client for QoS-1 proxy */
string name = string(_gateway->getGWParams()->gatewayName) + "_QoS-1"; string name = string(gwName) + string("_QoS-1");
setup(name.c_str(), Atype_QoSm1Proxy); setup(name.c_str(), Atype_QoSm1Proxy);
_isActive = true; _isActive = true;
}
}
} }

View File

@@ -35,7 +35,7 @@ public:
QoSm1Proxy(Gateway* gw); QoSm1Proxy(Gateway* gw);
~QoSm1Proxy(void); ~QoSm1Proxy(void);
void initialize(void); void initialize(char* GWnAME);
bool isActive(void); bool isActive(void);
private: private:

View File

@@ -203,11 +203,6 @@ void MQTTSNSubscribeHandler::handleAggregateSubscribe(Client* client, MQTTSNPack
if ( subscribe != nullptr ) if ( subscribe != nullptr )
{ {
UTF8String str = subscribe->getTopic();
string* topicName = new string(str.data, str.len);
Topic topic = Topic(topicName, MQTTSN_TOPIC_TYPE_NORMAL);
_gateway->getAdapterManager()->addAggregateTopic(&topic, client);
int msgId = 0; int msgId = 0;
if ( packet->isDuplicate() ) if ( packet->isDuplicate() )
{ {
@@ -223,6 +218,13 @@ void MQTTSNSubscribeHandler::handleAggregateSubscribe(Client* client, MQTTSNPack
WRITELOG("%s MQTTSNSubscribeHandler can't create MessageIdTableElement %s%s\n", ERRMSG_HEADER, client->getClientId(), ERRMSG_FOOTER); WRITELOG("%s MQTTSNSubscribeHandler can't create MessageIdTableElement %s%s\n", ERRMSG_HEADER, client->getClientId(), ERRMSG_FOOTER);
return; return;
} }
UTF8String str = subscribe->getTopic();
string* topicName = new string(str.data, str.len); // topicName is delete by topic
Topic topic = Topic(topicName, MQTTSN_TOPIC_TYPE_NORMAL);
_gateway->getAdapterManager()->getAggregater()->addAggregateTopic(&topic, client);
subscribe->setMsgId(msgId); subscribe->setMsgId(msgId);
Event* ev = new Event(); Event* ev = new Event();
ev->setBrokerSendEvent(client, subscribe); ev->setBrokerSendEvent(client, subscribe);
@@ -235,11 +237,6 @@ void MQTTSNSubscribeHandler::handleAggregateUnsubscribe(Client* client, MQTTSNPa
MQTTGWPacket* unsubscribe = handleUnsubscribe(client, packet); MQTTGWPacket* unsubscribe = handleUnsubscribe(client, packet);
if ( unsubscribe != nullptr ) if ( unsubscribe != nullptr )
{ {
UTF8String str = unsubscribe->getTopic();
string* topicName = new string(str.data, str.len);
Topic topic = Topic(topicName, MQTTSN_TOPIC_TYPE_NORMAL);
_gateway->getAdapterManager()->removeAggregateTopic(&topic, client);
int msgId = 0; int msgId = 0;
if ( packet->isDuplicate() ) if ( packet->isDuplicate() )
{ {
@@ -255,6 +252,12 @@ void MQTTSNSubscribeHandler::handleAggregateUnsubscribe(Client* client, MQTTSNPa
WRITELOG("%s MQTTSNUnsubscribeHandler can't create MessageIdTableElement %s%s\n", ERRMSG_HEADER, client->getClientId(), ERRMSG_FOOTER); WRITELOG("%s MQTTSNUnsubscribeHandler can't create MessageIdTableElement %s%s\n", ERRMSG_HEADER, client->getClientId(), ERRMSG_FOOTER);
return; return;
} }
UTF8String str = unsubscribe->getTopic();
string* topicName = new string(str.data, str.len); // topicName is delete by topic
Topic topic = Topic(topicName, MQTTSN_TOPIC_TYPE_NORMAL);
_gateway->getAdapterManager()->getAggregater()->removeAggregateTopic(&topic, client);
unsubscribe->setMsgId(msgId); unsubscribe->setMsgId(msgId);
Event* ev = new Event(); Event* ev = new Event();
ev->setBrokerSendEvent(client, unsubscribe); ev->setBrokerSendEvent(client, unsubscribe);

View File

@@ -63,6 +63,15 @@ MQTTSN_topicTypes Topic::getType(void)
return _type; return _type;
} }
Topic* Topic::duplicate(void)
{
Topic* newTopic = new Topic();
newTopic->_type = _type;
newTopic->_topicId = _topicId;
newTopic->_topicName = new string(_topicName->c_str());
return newTopic;
}
bool Topic::isMatch(string* topicName) bool Topic::isMatch(string* topicName)
{ {
string::size_type tlen = _topicName->size(); string::size_type tlen = _topicName->size();
@@ -354,6 +363,16 @@ void Topics::eraseNormal(void)
} }
} }
Topic* Topics::getFirstTopic(void)
{
return _first;
}
Topic* Topics::getNextTopic(Topic* topic)
{
return topic->_next;
}
void Topics::print(void) void Topics::print(void)
{ {
Topic* topic = _first; Topic* topic = _first;

View File

@@ -31,6 +31,7 @@ namespace MQTTSNGW
class Topic class Topic
{ {
friend class Topics; friend class Topics;
friend class AggregateTopicTable;
public: public:
Topic(); Topic();
Topic(string* topic, MQTTSN_topicTypes type); Topic(string* topic, MQTTSN_topicTypes type);
@@ -39,7 +40,9 @@ public:
uint16_t getTopicId(void); uint16_t getTopicId(void);
MQTTSN_topicTypes getType(void); MQTTSN_topicTypes getType(void);
bool isMatch(string* topicName); bool isMatch(string* topicName);
Topic* duplicate(void);
void print(void); void print(void);
private: private:
MQTTSN_topicTypes _type; MQTTSN_topicTypes _type;
uint16_t _topicId; uint16_t _topicId;
@@ -59,6 +62,8 @@ public:
Topic* add(const char* topicName, uint16_t id = 0); Topic* add(const char* topicName, uint16_t id = 0);
Topic* getTopicByName(const MQTTSN_topicid* topic); Topic* getTopicByName(const MQTTSN_topicid* topic);
Topic* getTopicById(const MQTTSN_topicid* topicid); Topic* getTopicById(const MQTTSN_topicid* topicid);
Topic* getFirstTopic(void);
Topic* getNextTopic(Topic* topic);
Topic* match(const MQTTSN_topicid* topicid); Topic* match(const MQTTSN_topicid* topicid);
void eraseNormal(void); void eraseNormal(void);
uint16_t getNextTopicId(); uint16_t getNextTopicId();

View File

@@ -234,29 +234,48 @@ void Gateway::initialize(int argc, char** argv)
_params.clientListName = strdup(param); _params.clientListName = strdup(param);
} }
if (getParam("PredefinedTopicList", param) == 0) if (getParam("PredefinedTopic", param) == 0)
{ {
_params.predefinedTopicFileName = strdup(param); if ( !strcasecmp(param, "YES") )
{
_params.predefinedTopic = true;
if (getParam("PredefinedTopicList", param) == 0)
{
_params.predefinedTopicFileName = strdup(param);
}
}
} }
if ( _params.clientListName == nullptr ) if (getParam("AggregatingGateway", param) == 0)
{ {
_params.clientListName = strdup(( _params.configDir + string(CLIENT_LIST) ).c_str()); if ( !strcasecmp(param, "YES") )
{
_params.aggregatingGw = true;
}
} }
if ( _params.predefinedTopicFileName == nullptr ) if (getParam("Forwarder", param) == 0)
{ {
_params.predefinedTopicFileName = strdup(( _params.configDir + string(PREDEFINEDTOPIC_FILE) ).c_str()); if ( !strcasecmp(param, "YES") )
{
_params.forwarder = true;
}
} }
/* ClientList and Adapters Initialize */ if (getParam("QoS-1", param) == 0)
_adapterManager->initialize(); {
if ( !strcasecmp(param, "YES") )
{
_params.qosMinus1 = true;
}
}
bool aggregate = _adapterManager->isAggregaterActive();
_clientList->initialize(aggregate);
/* Setup predefined topics */ /* Initialize adapters */
_clientList->setPredefinedTopics(aggregate); _adapterManager->initialize( _params.gatewayName, _params.aggregatingGw, _params.forwarder, _params.qosMinus1);
/* Setup ClientList and Predefined topics */
_clientList->initialize(_params.aggregatingGw);
} }
void Gateway::run(void) void Gateway::run(void)

View File

@@ -164,6 +164,10 @@ public:
char* privateKey {nullptr}; char* privateKey {nullptr};
char* qosMinusClientListName {nullptr}; char* qosMinusClientListName {nullptr};
bool clientAuthentication {false}; bool clientAuthentication {false};
bool predefinedTopic {false};
bool aggregatingGw {false};
bool qosMinus1 {false};
bool forwarder {false};
}; };