diff --git a/MQTTSNGateway/README.md b/MQTTSNGateway/README.md index d549065..4f2b3c4 100644 --- a/MQTTSNGateway/README.md +++ b/MQTTSNGateway/README.md @@ -1,7 +1,8 @@ # MQTT-SN Transparent / Aggrigating Gateway **MQTT-SN** requires a MQTT-SN Gateway which acts as a protocol converter to convert **MQTT-SN messages to MQTT messages**. MQTT-SN client over SensorNetwork can not communicate directly with MQTT broker(TCP/IP). -This Gateway can run as a transparent or aggrigating Gateway by specifying the gateway.conf. +This Gateway can run as a transparent or aggrigating Gateway by specifying the gateway.conf. +The Transparent Gateway can not receive PUBLISH message from the broker at this version. ### **step1. Build the gateway** ```` @@ -37,7 +38,7 @@ $ ./MQTT-SNGateway [-f Config file name] # config file of MQTT-SN Gateway # -BrokerName=iot.eclipse.org +BrokerName=mqtt.eclipse.org BrokerPortNo=1883 BrokerSecurePortNo=8883 diff --git a/MQTTSNGateway/src/MQTTGWPublishHandler.cpp b/MQTTSNGateway/src/MQTTGWPublishHandler.cpp index ce5625d..13a1b35 100644 --- a/MQTTSNGateway/src/MQTTGWPublishHandler.cpp +++ b/MQTTSNGateway/src/MQTTGWPublishHandler.cpp @@ -272,8 +272,8 @@ void MQTTGWPublishHandler::handleAggregatePublish(Client* client, MQTTGWPacket* Publish pub; packet->getPUBLISH(&pub); - WRITELOG(FORMAT_Y_G_G, currentDateTime(), packet->getName(), - RIGHTARROW, client->getClientId(), "is sleeping. a message was saved."); + // Start of temporary code + WRITELOG("%s MQTTGWPublishHandler::handleAggregatePublish Aggregater can't handle a PUBLISH from the broker at the current version.%s\n", ERRMSG_HEADER,ERRMSG_FOOTER); if (pub.header.bits.qos == 1) { @@ -283,11 +283,12 @@ void MQTTGWPublishHandler::handleAggregatePublish(Client* client, MQTTGWPacket* { replyACK(client, &pub, PUBREC); } - + // End of temporary code string* topicName = new string(pub.topic, pub.topiclen); - Topic topic = Topic(topicName, MQTTSN_TOPIC_TYPE_NORMAL); + Topic topic = Topic(topicName, MQTTSN_TOPIC_TYPE_NORMAL); // topic deletes topicName when the topic is deleted + AggregateTopicElement* list = _gateway->getAdapterManager()->createClientList(&topic); if ( list != nullptr ) { diff --git a/MQTTSNGateway/src/MQTTSNGWAdapter.cpp b/MQTTSNGateway/src/MQTTSNGWAdapter.cpp index 73cbc29..a38ba67 100644 --- a/MQTTSNGateway/src/MQTTSNGWAdapter.cpp +++ b/MQTTSNGateway/src/MQTTSNGWAdapter.cpp @@ -126,6 +126,11 @@ bool Adapter::isSecure(SensorNetAddress* addr) } } +bool Adapter::isSecure(void) +{ + return _isSecure; +} + void Adapter::setClient(Client* client, bool secure) { if ( secure ) @@ -241,10 +246,10 @@ void Proxy::checkConnection(Client* client) if ( client->isDisconnect() || ( client->isConnecting() && _responseTimer.isTimeup()) ) { client->connectSended(); - _responseTimer.start(QOSM1_PROXY_RESPONSE_DURATION * 1000UL); + _responseTimer.start(PROXY_RESPONSE_DURATION * 1000UL); MQTTSNPacket_connectData options = MQTTSNPacket_connectData_initializer; options.clientID.cstring = client->getClientId(); - options.duration = QOSM1_PROXY_KEEPALIVE_DURATION; + options.duration = PROXY_KEEPALIVE_DURATION; MQTTSNPacket* packet = new MQTTSNPacket(); packet->setCONNECT(&options); @@ -260,10 +265,10 @@ void Proxy::checkConnection(Client* client) Event* ev = new Event(); ev->setClientRecvEvent(client, packet); _gateway->getPacketEventQue()->post(ev); - _responseTimer.start(QOSM1_PROXY_RESPONSE_DURATION * 1000UL); + _responseTimer.start(PROXY_RESPONSE_DURATION * 1000UL); _isWaitingResp = true; - if ( ++_retryCnt > QOSM1_PROXY_MAX_RETRY_CNT ) + if ( ++_retryCnt > PROXY_MAX_RETRY_CNT ) { client->disconnected(); } @@ -274,7 +279,7 @@ void Proxy::checkConnection(Client* client) void Proxy::resetPingTimer(void) { - _keepAliveTimer.start(QOSM1_PROXY_KEEPALIVE_DURATION * 1000UL); + _keepAliveTimer.start(PROXY_KEEPALIVE_DURATION * 1000UL); } void Proxy::recv(MQTTSNPacket* packet, Client* client) diff --git a/MQTTSNGateway/src/MQTTSNGWAdapter.h b/MQTTSNGateway/src/MQTTSNGWAdapter.h index 8ea4b45..f126bf1 100644 --- a/MQTTSNGateway/src/MQTTSNGWAdapter.h +++ b/MQTTSNGateway/src/MQTTSNGWAdapter.h @@ -56,6 +56,7 @@ public: void send(MQTTSNPacket* packet, Client* client); bool isActive(void); bool isSecure(SensorNetAddress* addr); + bool isSecure(void); void savePacket(Client* client, MQTTSNPacket* packet); private: diff --git a/MQTTSNGateway/src/MQTTSNGWAdapterManager.cpp b/MQTTSNGateway/src/MQTTSNGWAdapterManager.cpp index 935d58d..512be6d 100644 --- a/MQTTSNGateway/src/MQTTSNGWAdapterManager.cpp +++ b/MQTTSNGateway/src/MQTTSNGWAdapterManager.cpp @@ -101,12 +101,18 @@ Client* AdapterManager::getClient(Client& client) _qosm1Proxy->resetPingTimer(secure); } else if ( client.isAggregated() ) - { newClient = _aggregater->getAdapterClient(&client); _aggregater->resetPingTimer(secure); } - + else if ( client.isQoSm1Proxy() ) + { + _qosm1Proxy->resetPingTimer(secure); + } + else if ( client.isAggregater() ) + { + _aggregater->resetPingTimer(secure); + } return newClient; } diff --git a/MQTTSNGateway/src/MQTTSNGWAggregater.cpp b/MQTTSNGateway/src/MQTTSNGWAggregater.cpp index b74ec86..32f4924 100644 --- a/MQTTSNGateway/src/MQTTSNGWAggregater.cpp +++ b/MQTTSNGateway/src/MQTTSNGWAggregater.cpp @@ -44,10 +44,11 @@ void Aggregater::initialize(void) { if (!strcasecmp(param, "YES") ) { - /* Create Aggregated Clients */ + /* Create Aggregated Clients from clients.conf */ _gateway->getClientList()->setClientList(AGGREGATER_TYPE); - string name = _gateway->getGWParams()->gatewayName; + /* Create Aggregater Client */ + string name = string(_gateway->getGWParams()->gatewayName) + "_Aggregater"; setup(name.c_str(), Atype_Aggregater); _isActive = true; } @@ -64,6 +65,7 @@ bool Aggregater::isActive(void) uint16_t Aggregater::msgId(void) { + // Only SecureClient generates msgId to avoid duplication of msgId. Client does not generate it. return Adapter::getSecureClient()->getNextPacketId(); } diff --git a/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.cpp b/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.cpp index b886c9a..0821d23 100644 --- a/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.cpp +++ b/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.cpp @@ -127,7 +127,10 @@ void BrokerSendTask::run() { WRITELOG("%s BrokerSendTask: %s can't send a packet to the broker. errno=%d %s %s\n", ERRMSG_HEADER, client->getClientId(), rc == -1 ? errno : 0, strerror(errno), ERRMSG_FOOTER); - client->getNetwork()->close(); + if ( errno != EBADF ) + { + client->getNetwork()->close(); + } /* Disconnect the client */ packet = new MQTTGWPacket(); diff --git a/MQTTSNGateway/src/MQTTSNGWClientList.cpp b/MQTTSNGateway/src/MQTTSNGWClientList.cpp index 524b3b6..776c23e 100644 --- a/MQTTSNGateway/src/MQTTSNGWClientList.cpp +++ b/MQTTSNGateway/src/MQTTSNGWClientList.cpp @@ -118,7 +118,7 @@ void ClientList::setPredefinedTopics(bool aggrecate) * File format is: * Lines bigning with # are comment line. * ClientId, SensorNetAddress, "unstableLine", "secureConnection" - * in case of UDP, SensorNetAddress format is portNo@IPAddress. + * in case of UDP, SensorNetAddress format is IPAddress:portNo. * if the SensorNetwork is not stable, write unstableLine. * if BrokerConnection is SSL, write secureConnection. * if the client send PUBLISH QoS-1, QoS-1 is required. @@ -380,7 +380,7 @@ Client* ClientList::createClient(SensorNetAddress* addr, MQTTSNString* clientId, } else { - MQTTSNString dummyId MQTTSNString_initializer;; + MQTTSNString dummyId MQTTSNString_initializer; dummyId.cstring = strdup(""); client->setClientId(dummyId); free(dummyId.cstring); diff --git a/MQTTSNGateway/src/MQTTSNGWDefines.h b/MQTTSNGateway/src/MQTTSNGWDefines.h index 0cc22b6..d73827d 100644 --- a/MQTTSNGateway/src/MQTTSNGWDefines.h +++ b/MQTTSNGateway/src/MQTTSNGWDefines.h @@ -42,13 +42,14 @@ namespace MQTTSNGW #define MAX_INFLIGHTMESSAGES (10) // Number of inflight messages #define MAX_MESSAGEID_TABLE_SIZE (500) // Number of MessageIdTable size #define MAX_SAVED_PUBLISH (20) // Max number of PUBLISH message for Asleep state -#define MAX_TOPIC_PAR_CLIENT (50) // Max Topic count for a client. it should be less than 256 +#define MAX_TOPIC_PAR_CLIENT (50) // Max Topic count for a client. it should be less than 256 #define MQTTSNGW_MAX_PACKET_SIZE (1024) // Max Packet size (5+2+TopicLen+PayloadLen + Foward Encapsulation) #define SIZE_OF_LOG_PACKET (500) // Length of the packet log in bytes -#define QOSM1_PROXY_KEEPALIVE_DURATION 900 // Secs -#define QOSM1_PROXY_RESPONSE_DURATION 10 // Secs -#define QOSM1_PROXY_MAX_RETRY_CNT 3 +#define PROXY_KEEPALIVE_DURATION (900) // Seconds +#define PROXY_RESPONSE_DURATION (10) // Seconds +#define PROXY_MAX_RETRY_CNT (3) + /*================================= * Data Type ==================================*/ diff --git a/MQTTSNGateway/src/MQTTSNGWForwarder.cpp b/MQTTSNGateway/src/MQTTSNGWForwarder.cpp index 5fa2a49..8449c85 100644 --- a/MQTTSNGateway/src/MQTTSNGWForwarder.cpp +++ b/MQTTSNGateway/src/MQTTSNGWForwarder.cpp @@ -54,6 +54,7 @@ void ForwarderList::initialize(Gateway* gw) { if (!strcasecmp(param, "YES") ) { + /* Create Fowarders from clients.conf */ gw->getClientList()->setClientList(FORWARDER_TYPE); } } diff --git a/MQTTSNGateway/src/MQTTSNGWPacketHandleTask.cpp b/MQTTSNGateway/src/MQTTSNGWPacketHandleTask.cpp index 47f1eae..a10749f 100644 --- a/MQTTSNGateway/src/MQTTSNGWPacketHandleTask.cpp +++ b/MQTTSNGateway/src/MQTTSNGWPacketHandleTask.cpp @@ -148,7 +148,7 @@ void PacketHandleTask::run() if ( adpMgr->isAggregatedClient(client) ) { - aggregatePacketHandler(client, snPacket); + aggregatePacketHandler(client, snPacket); // client is converted to Aggregater by BrokerSendTask } else { diff --git a/MQTTSNGateway/src/MQTTSNGWQoSm1Proxy.cpp b/MQTTSNGateway/src/MQTTSNGWQoSm1Proxy.cpp index 50ff56c..7876fc0 100644 --- a/MQTTSNGateway/src/MQTTSNGWQoSm1Proxy.cpp +++ b/MQTTSNGateway/src/MQTTSNGWQoSm1Proxy.cpp @@ -51,11 +51,11 @@ void QoSm1Proxy::initialize(void) { if (strcasecmp(param, "YES") == 0 ) { - /* Create QoS-1 Clients */ + /* Create QoS-1 Clients from clients.conf */ _gateway->getClientList()->setClientList(QOSM1PROXY_TYPE); - /* initialize Adapter */ - string name = string(_gateway->getGWParams()->gatewayName) + "QoS-1"; + /* Create a client for QoS-1 proxy */ + string name = string(_gateway->getGWParams()->gatewayName) + "_QoS-1"; setup(name.c_str(), Atype_QoSm1Proxy); _isActive = true; } diff --git a/MQTTSNGateway/src/MQTTSNGWSubscribeHandler.cpp b/MQTTSNGateway/src/MQTTSNGWSubscribeHandler.cpp index dceb864..45b2964 100644 --- a/MQTTSNGateway/src/MQTTSNGWSubscribeHandler.cpp +++ b/MQTTSNGateway/src/MQTTSNGWSubscribeHandler.cpp @@ -223,7 +223,6 @@ void MQTTSNSubscribeHandler::handleAggregateSubscribe(Client* client, MQTTSNPack WRITELOG("%s MQTTSNSubscribeHandler can't create MessageIdTableElement %s%s\n", ERRMSG_HEADER, client->getClientId(), ERRMSG_FOOTER); return; } -WRITELOG("msgId=%d\n",msgId); subscribe->setMsgId(msgId); Event* ev = new Event(); ev->setBrokerSendEvent(client, subscribe);