diff --git a/.cproject b/.cproject
index 62eba26..bf962d5 100644
--- a/.cproject
+++ b/.cproject
@@ -53,17 +53,17 @@
@@ -133,7 +133,7 @@
-
+
@@ -193,15 +193,15 @@
@@ -273,11 +273,9 @@
-
+
-
-
-
+
@@ -319,17 +317,19 @@
+
+
-
+
-
+
@@ -348,7 +348,5 @@
-
-
diff --git a/.settings/language.settings.xml b/.settings/language.settings.xml
index 7d5c34b..e5b4485 100644
--- a/.settings/language.settings.xml
+++ b/.settings/language.settings.xml
@@ -11,7 +11,7 @@
-
+
@@ -33,7 +33,7 @@
-
+
diff --git a/MQTTSNGateway/GatewayTester/src/LPublishManager.cpp b/MQTTSNGateway/GatewayTester/src/LPublishManager.cpp
index 918cddb..78a84db 100644
--- a/MQTTSNGateway/GatewayTester/src/LPublishManager.cpp
+++ b/MQTTSNGateway/GatewayTester/src/LPublishManager.cpp
@@ -119,7 +119,7 @@ void LPublishManager::sendPublish(PubElement* elm)
uint8_t msg[MQTTSN_MAX_MSG_LENGTH + 1];
uint8_t org = 0;
- if (elm->payloadlen > 128)
+ if (elm->payloadlen > 248)
{
msg[0] = 0x01;
setUint16(msg + 1, elm->payloadlen + 9);
diff --git a/MQTTSNGateway/GatewayTester/src/LTaskManager.cpp b/MQTTSNGateway/GatewayTester/src/LTaskManager.cpp
index f38182d..1ec298b 100644
--- a/MQTTSNGateway/GatewayTester/src/LTaskManager.cpp
+++ b/MQTTSNGateway/GatewayTester/src/LTaskManager.cpp
@@ -56,12 +56,14 @@ void LTaskManager::run(void){
int i = 0;
char c = 0;
bool cancelFlg = false;
+ TestList test = {0};
+ TaskList task = {0};
if ( !theClientMode )
{
theClient->getGwProxy()->getMessage();
- for (i = 0; _tests[i].testTask > 0; i++)
+ for (i = 0; _tests[i].testTask > test.testTask; i++)
{
PROMPT("Execute \"%s\" ? ( y/n ) : ", _tests[i].testLabel);
while (true)
@@ -118,7 +120,7 @@ void LTaskManager::run(void){
while (true)
{
theClient->getGwProxy()->getMessage();
- for (_index = 0; _tasks[_index].callback > 0; _index++)
+ for (_index = 0; _tasks[_index].callback > task.callback; _index++)
{
if ((_tasks[_index].prevTime + _tasks[_index].interval <= time(NULL)) &&
_tasks[_index].count == 0)
diff --git a/MQTTSNGateway/GatewayTester/src/LTimer.h b/MQTTSNGateway/GatewayTester/src/LTimer.h
index 9c666cf..cb44cc0 100644
--- a/MQTTSNGateway/GatewayTester/src/LTimer.h
+++ b/MQTTSNGateway/GatewayTester/src/LTimer.h
@@ -17,7 +17,7 @@
#ifndef TIMER_H_
#define TIMER_H_
-#include
+#include
#include "LMqttsnClientApp.h"
diff --git a/MQTTSNGateway/Makefile b/MQTTSNGateway/Makefile
index 1fd2233..7e096e9 100644
--- a/MQTTSNGateway/Makefile
+++ b/MQTTSNGateway/Makefile
@@ -74,7 +74,6 @@ $(SUBDIR)/MQTTSNSubscribeServer.c \
$(SUBDIR)/MQTTSNUnsubscribeClient.c \
$(SUBDIR)/MQTTSNUnsubscribeServer.c
-CXX := g++
CPPFLAGS +=
INCLUDE :=
@@ -82,14 +81,20 @@ INCLUDES += $(INCLUDE) -I$(SRCDIR) \
-I$(SRCDIR)/$(OS) \
-I$(SRCDIR)/$(OS)/$(SENSORNET) \
-I$(SUBDIR) \
--I$(SRCDIR)/$(TEST)
+-I$(SRCDIR)/$(TEST) \
+-I/usr/local/opt/openssl/include/
+# preprocessor defines
DEFS :=
+
+CXX := g++
+
LIB :=
-LIBS += $(LIB) -L/usr/local/lib
-LDFLAGS :=
+LIBS += $(LIB) -L/usr/local/lib -L/usr/local/opt/openssl/lib/
+
+LDFLAGS :=
CXXFLAGS := -Wall -O3 -std=c++11
-LDADD := -lpthread -lssl -lcrypto -lrt
+LDADD := -lpthread -lssl -lcrypto
OUTDIR := Build
PROG := $(OUTDIR)/$(PROGNAME)
diff --git a/MQTTSNGateway/README.md b/MQTTSNGateway/README.md
index d549065..c7a791c 100644
--- a/MQTTSNGateway/README.md
+++ b/MQTTSNGateway/README.md
@@ -1,18 +1,18 @@
-# MQTT-SN Transparent / Aggrigating Gateway
+# MQTT-SN Transparent / Aggregating Gateway
**MQTT-SN** requires a MQTT-SN Gateway which acts as a protocol converter to convert **MQTT-SN messages to MQTT messages**. MQTT-SN client over SensorNetwork can not communicate directly with MQTT broker(TCP/IP).
-This Gateway can run as a transparent or aggrigating Gateway by specifying the gateway.conf.
+This Gateway can run as a transparent or aggregating Gateway by specifying the gateway.conf.
### **step1. Build the gateway**
````
-$ git clone -b experiment https://github.com/eclipse/paho.mqtt-sn.embedded-c
+$ git clone -b develop https://github.com/eclipse/paho.mqtt-sn.embedded-c
$ cd paho.mqtt-sn.embedded-c/MQTTSNGateway
-$ make [SENSORNET={udp6|xbee}]
+$ make [SENSORNET={udp6|xbee|loralink}]
$ make install
$ make clean
````
By default, a gateway for UDP is built.
-In order to create a gateway for UDP6 or XBee, SENSORNET argument is required.
+In order to create a gateway for UDP6, XBee or LoRaLink, SENSORNET argument is required.
MQTT-SNGateway, MQTT-SNLogmonitor and *.conf files are copied into ../ directory.
If you want to install the gateway into specific directories, enter a command line as follows:
@@ -24,10 +24,18 @@ $ make install INSTALL_DIR=/path/to/your_directory CONFIG_DIR=/path/to/your_dire
### **step2. Execute the Gateway.**
````
-$ cd ../
+$ cd ../../
$ ./MQTT-SNGateway [-f Config file name]
````
-
+If you get the error message as follows:
+````
+what(): RingBuffer can't create a shared memory.
+Aborted (core dumped)
+````
+You have to start using sudo command only once for the first time.
+````
+$ sudo ./MQTT-SNGateway [-f Config file name]
+````
### **How to Change the configuration of the gateway**
**../gateway.conf** Contents are follows:
@@ -37,7 +45,7 @@ $ ./MQTT-SNGateway [-f Config file name]
# config file of MQTT-SN Gateway
#
-BrokerName=iot.eclipse.org
+BrokerName=mqtt.eclipse.org
BrokerPortNo=1883
BrokerSecurePortNo=8883
@@ -72,12 +80,25 @@ KeepAlive=900
GatewayPortNo=10000
MulticastIP=225.1.1.1
MulticastPortNo=1883
+MulticastTTL=1
+
+# UDP6
+GatewayUDP6Bind=FFFF:FFFE::1
+GatewayUDP6Port=10000
+GatewayUDP6Broadcast=FF02::1
+GatewayUDP6If=wpan0
+GatewayUDP6Hops=1
# XBee
Baudrate=38400
SerialDevice=/dev/ttyUSB0
ApiMode=2
+#LoRaLink
+BaudrateLoRaLink=115200
+DeviceRxLoRaLink=/dev/ttyLoRaLinkRx
+DeviceTxLoRaLink=/dev/ttyLoRaLinkTx
+
# LOG
ShearedMemory=NO;
diff --git a/MQTTSNGateway/gateway.conf b/MQTTSNGateway/gateway.conf
index d01903a..73f110e 100644
--- a/MQTTSNGateway/gateway.conf
+++ b/MQTTSNGateway/gateway.conf
@@ -49,12 +49,14 @@ KeepAlive=900
GatewayPortNo=10000
MulticastIP=225.1.1.1
MulticastPortNo=1883
+MulticastTTL=1
# UDP6
GatewayUDP6Bind=FFFF:FFFE::1
GatewayUDP6Port=10000
GatewayUDP6Broadcast=FF02::1
GatewayUDP6If=wpan0
+GatewayUDP6Hops=1
# XBee
Baudrate=38400
diff --git a/MQTTSNGateway/src/MQTTGWPacket.h b/MQTTSNGateway/src/MQTTGWPacket.h
index 3a4829e..fb2ad86 100644
--- a/MQTTSNGateway/src/MQTTGWPacket.h
+++ b/MQTTSNGateway/src/MQTTGWPacket.h
@@ -116,7 +116,7 @@ typedef struct
unsigned char version; /**< MQTT version number */
} Connect;
-#define MQTTPacket_Connect_Initializer {{0}, 0, nullptr, nullptr, nullptr, nullptr, 0, 0}
+#define MQTTPacket_Connect_Initializer {{0}, {0}, nullptr, nullptr, nullptr, nullptr, 0, 0}
#define MQTTPacket_willOptions_initializer { {'M', 'Q', 'T', 'W'}, 0, {NULL, {0, NULL}}, {NULL, {0, NULL}}, 0, 0 }
#define MQTTPacket_connectData_initializer { {'M', 'Q', 'T', 'C'}, 0, 4, {NULL, {0, NULL}}, 60, 1, 0, \
MQTTPacket_willOptions_initializer, {NULL, {0, NULL}}, {NULL, {0, NULL}} }
diff --git a/MQTTSNGateway/src/MQTTGWPublishHandler.cpp b/MQTTSNGateway/src/MQTTGWPublishHandler.cpp
index ce5625d..e3abc79 100644
--- a/MQTTSNGateway/src/MQTTGWPublishHandler.cpp
+++ b/MQTTSNGateway/src/MQTTGWPublishHandler.cpp
@@ -272,52 +272,31 @@ void MQTTGWPublishHandler::handleAggregatePublish(Client* client, MQTTGWPacket*
Publish pub;
packet->getPUBLISH(&pub);
- WRITELOG(FORMAT_Y_G_G, currentDateTime(), packet->getName(),
- RIGHTARROW, client->getClientId(), "is sleeping. a message was saved.");
- if (pub.header.bits.qos == 1)
- {
- replyACK(client, &pub, PUBACK);
- }
- else if ( pub.header.bits.qos == 2)
- {
- replyACK(client, &pub, PUBREC);
- }
-
-
-
- string* topicName = new string(pub.topic, pub.topiclen);
+ string* topicName = new string(pub.topic, pub.topiclen); // topic deletes topicName when the topic is deleted
Topic topic = Topic(topicName, MQTTSN_TOPIC_TYPE_NORMAL);
- AggregateTopicElement* list = _gateway->getAdapterManager()->createClientList(&topic);
- if ( list != nullptr )
+
+ // ToDo: need to refactor
+ ClientTopicElement* elm = _gateway->getAdapterManager()->getAggregater()->getClientElement(&topic);
+
+ while ( elm != nullptr )
{
- ClientTopicElement* p = list->getFirstElement();
+ Client* devClient = elm->getClient();
+ MQTTGWPacket* msg = new MQTTGWPacket();
+ *msg = *packet;
- while ( p )
+ if ( msg->getType() == 0 )
{
- Client* devClient = p->getClient();
- if ( devClient != nullptr )
- {
- MQTTGWPacket* msg = new MQTTGWPacket();
- *msg = *packet;
- if ( msg->getType() == 0 )
- {
- WRITELOG("%s MQTTGWPublishHandler::handleAggregatePublish can't allocate memories for Packet.%s\n", ERRMSG_HEADER,ERRMSG_FOOTER);
- delete msg;
- break;
- }
- Event* ev = new Event();
- ev->setBrokerRecvEvent(devClient, msg);
- _gateway->getPacketEventQue()->post(ev);
- }
- else
- {
- break;
- }
-
- p = list->getNextElement(p);
+ WRITELOG("%s MQTTGWPublishHandler::handleAggregatePublish can't allocate memories for Packet.%s\n", ERRMSG_HEADER,ERRMSG_FOOTER);
+ delete msg;
+ break;
}
- delete list;
+
+ Event* ev = new Event();
+ ev->setBrokerRecvEvent(devClient, msg);
+ _gateway->getPacketEventQue()->post(ev);
+
+ elm = elm->getNextClientElement();
}
}
diff --git a/MQTTSNGateway/src/MQTTSNAggregateConnectionHandler.cpp b/MQTTSNGateway/src/MQTTSNAggregateConnectionHandler.cpp
index dded6df..90672bf 100644
--- a/MQTTSNGateway/src/MQTTSNAggregateConnectionHandler.cpp
+++ b/MQTTSNGateway/src/MQTTSNAggregateConnectionHandler.cpp
@@ -81,7 +81,15 @@ void MQTTSNAggregateConnectionHandler::handleConnect(Client* client, MQTTSNPacke
/* renew the TopicList */
if (topics)
{
- _gateway->getAdapterManager()->removeAggregateTopicList(topics, client);
+ Topic* tp = topics->getFirstTopic();
+ while( tp != nullptr )
+ {
+ if ( tp->getType() == MQTTSN_TOPIC_TYPE_NORMAL )
+ {
+ _gateway->getAdapterManager()->getAggregater()->removeAggregateTopic(tp, client);
+ }
+ tp = topics->getNextTopic(tp);
+ }
topics->eraseNormal();
}
client->setSessionStatus(true);
@@ -170,19 +178,17 @@ void MQTTSNAggregateConnectionHandler::handlePingreq(Client* client, MQTTSNPacke
sendStoredPublish(client);
client->holdPingRequest();
}
- else
- {
- /* create and send PINGRESP to the PacketHandler */
- client->resetPingRequest();
- MQTTGWPacket* pingresp = new MQTTGWPacket();
+ /* create and send PINGRESP to the PacketHandler */
+ client->resetPingRequest();
- pingresp->setHeader(PINGRESP);
+ MQTTGWPacket* pingresp = new MQTTGWPacket();
- Event* evt = new Event();
- evt->setBrokerRecvEvent(client, pingresp);
- _gateway->getPacketEventQue()->post(evt);
- }
+ pingresp->setHeader(PINGRESP);
+
+ Event* evt = new Event();
+ evt->setBrokerRecvEvent(client, pingresp);
+ _gateway->getPacketEventQue()->post(evt);
}
void MQTTSNAggregateConnectionHandler::sendStoredPublish(Client* client)
@@ -191,7 +197,6 @@ void MQTTSNAggregateConnectionHandler::sendStoredPublish(Client* client)
while ( ( msg = client->getClientSleepPacket() ) != nullptr )
{
- // ToDo: This version can't re-send PUBLISH when PUBACK is not returned.
client->deleteFirstClientSleepPacket(); // pop the que to delete element.
Event* ev = new Event();
diff --git a/MQTTSNGateway/src/MQTTSNAggregateConnectionHandler.h b/MQTTSNGateway/src/MQTTSNAggregateConnectionHandler.h
index 46bf4f0..53282b3 100644
--- a/MQTTSNGateway/src/MQTTSNAggregateConnectionHandler.h
+++ b/MQTTSNGateway/src/MQTTSNAggregateConnectionHandler.h
@@ -39,7 +39,6 @@ public:
private:
void sendStoredPublish(Client* client);
- char _pbuf[MQTTSNGW_MAX_PACKET_SIZE * 3];
Gateway* _gateway;
};
diff --git a/MQTTSNGateway/src/MQTTSNGWAdapter.cpp b/MQTTSNGateway/src/MQTTSNGWAdapter.cpp
index 73cbc29..2fa37b3 100644
--- a/MQTTSNGateway/src/MQTTSNGWAdapter.cpp
+++ b/MQTTSNGateway/src/MQTTSNGWAdapter.cpp
@@ -126,6 +126,11 @@ bool Adapter::isSecure(SensorNetAddress* addr)
}
}
+bool Adapter::isSecure(void)
+{
+ return _isSecure;
+}
+
void Adapter::setClient(Client* client, bool secure)
{
if ( secure )
@@ -212,7 +217,7 @@ Client* Adapter::getAdapterClient(Client* client)
{
if ( client->isSecureNetwork() )
{
- return _client;
+ return _clientSecure;
}
else
{
@@ -241,10 +246,10 @@ void Proxy::checkConnection(Client* client)
if ( client->isDisconnect() || ( client->isConnecting() && _responseTimer.isTimeup()) )
{
client->connectSended();
- _responseTimer.start(QOSM1_PROXY_RESPONSE_DURATION * 1000UL);
+ _responseTimer.start(PROXY_RESPONSE_DURATION * 1000UL);
MQTTSNPacket_connectData options = MQTTSNPacket_connectData_initializer;
options.clientID.cstring = client->getClientId();
- options.duration = QOSM1_PROXY_KEEPALIVE_DURATION;
+ options.duration = PROXY_KEEPALIVE_DURATION;
MQTTSNPacket* packet = new MQTTSNPacket();
packet->setCONNECT(&options);
@@ -260,10 +265,10 @@ void Proxy::checkConnection(Client* client)
Event* ev = new Event();
ev->setClientRecvEvent(client, packet);
_gateway->getPacketEventQue()->post(ev);
- _responseTimer.start(QOSM1_PROXY_RESPONSE_DURATION * 1000UL);
+ _responseTimer.start(PROXY_RESPONSE_DURATION * 1000UL);
_isWaitingResp = true;
- if ( ++_retryCnt > QOSM1_PROXY_MAX_RETRY_CNT )
+ if ( ++_retryCnt > PROXY_MAX_RETRY_CNT )
{
client->disconnected();
}
@@ -274,7 +279,7 @@ void Proxy::checkConnection(Client* client)
void Proxy::resetPingTimer(void)
{
- _keepAliveTimer.start(QOSM1_PROXY_KEEPALIVE_DURATION * 1000UL);
+ _keepAliveTimer.start(PROXY_KEEPALIVE_DURATION * 1000UL);
}
void Proxy::recv(MQTTSNPacket* packet, Client* client)
diff --git a/MQTTSNGateway/src/MQTTSNGWAdapter.h b/MQTTSNGateway/src/MQTTSNGWAdapter.h
index 8ea4b45..f126bf1 100644
--- a/MQTTSNGateway/src/MQTTSNGWAdapter.h
+++ b/MQTTSNGateway/src/MQTTSNGWAdapter.h
@@ -56,6 +56,7 @@ public:
void send(MQTTSNPacket* packet, Client* client);
bool isActive(void);
bool isSecure(SensorNetAddress* addr);
+ bool isSecure(void);
void savePacket(Client* client, MQTTSNPacket* packet);
private:
diff --git a/MQTTSNGateway/src/MQTTSNGWAdapterManager.cpp b/MQTTSNGateway/src/MQTTSNGWAdapterManager.cpp
index 935d58d..0d5fa55 100644
--- a/MQTTSNGateway/src/MQTTSNGWAdapterManager.cpp
+++ b/MQTTSNGateway/src/MQTTSNGWAdapterManager.cpp
@@ -40,13 +40,23 @@ AdapterManager::AdapterManager(Gateway* gw)
}
-void AdapterManager::initialize(void)
+void AdapterManager::initialize(char* gwName, bool aggregate, bool forwarder, bool qosM1)
{
- _aggregater->initialize();
- _forwarders->initialize(_gateway);
- _qosm1Proxy->initialize();
-}
+ if ( aggregate )
+ {
+ _aggregater->initialize(gwName);
+ }
+ if ( qosM1 )
+ {
+ _qosm1Proxy->initialize(gwName);
+ }
+
+ if ( forwarder )
+ {
+ _forwarders->initialize(_gateway);
+ }
+}
AdapterManager::~AdapterManager(void)
{
@@ -91,22 +101,29 @@ bool AdapterManager::isAggregatedClient(Client* client)
}
}
-Client* AdapterManager::getClient(Client& client)
+Client* AdapterManager::getClient(Client* client)
{
- bool secure = client.isSecureNetwork();
- Client* newClient = &client;
- if ( client.isQoSm1() )
+ bool secure = client->isSecureNetwork();
+ Client* newClient = client;
+
+ if ( client->isQoSm1() )
{
- newClient = _qosm1Proxy->getAdapterClient(&client);
+ newClient = _qosm1Proxy->getAdapterClient(client);
_qosm1Proxy->resetPingTimer(secure);
}
- else if ( client.isAggregated() )
-
+ else if ( client->isAggregated() )
+ {
+ newClient = _aggregater->getAdapterClient(client);
+ _aggregater->resetPingTimer(secure);
+ }
+ else if ( client->isQoSm1Proxy() )
+ {
+ _qosm1Proxy->resetPingTimer(secure);
+ }
+ else if ( client->isAggregater() )
{
- newClient = _aggregater->getAdapterClient(&client);
_aggregater->resetPingTimer(secure);
}
-
return newClient;
}
@@ -121,8 +138,8 @@ int AdapterManager::unicastToClient(Client* client, MQTTSNPacket* packet, Client
MQTTSNGWEncapsulatedPacket encap(packet);
WirelessNodeId* wnId = fwd->getWirelessNodeId(client);
encap.setWirelessNodeId(wnId);
- WRITELOG(FORMAT_Y_W_G, currentDateTime(), encap.getName(), RIGHTARROW, fwd->getId(), encap.print(pbuf));
task->log(client, packet);
+ WRITELOG(FORMAT_Y_W_G, currentDateTime(), encap.getName(), RIGHTARROW, fwd->getId(), encap.print(pbuf));
rc = encap.unicast(_gateway->getSensorNetwork(),fwd->getSensorNetAddr());
}
else
@@ -167,22 +184,26 @@ bool AdapterManager::isAggregaterActive(void)
return _aggregater->isActive();
}
-AggregateTopicElement* AdapterManager::createClientList(Topic* topic)
+/*
+AggregateTopicElement* AdapterManager::findTopic(Topic* topic)
{
- return _aggregater->createClientList(topic);
+ return _aggregater->findTopic(topic);
}
-int AdapterManager::addAggregateTopic(Topic* topic, Client* client)
+AggregateTopicElement* AdapterManager::addAggregateTopic(Topic* topic, Client* client)
{
return _aggregater->addAggregateTopic(topic, client);
}
+
void AdapterManager::removeAggregateTopic(Topic* topic, Client* client)
{
- _aggregater->removeAggregateTopic(topic, client);
+ //_aggregater->removeAggregateTopic(topic, client);
}
void AdapterManager::removeAggregateTopicList(Topics* topics, Client* client)
{
- _aggregater->removeAggregateTopicList(topics, client);
+
}
+*/
+
diff --git a/MQTTSNGateway/src/MQTTSNGWAdapterManager.h b/MQTTSNGateway/src/MQTTSNGWAdapterManager.h
index 510d02f..58e5a03 100644
--- a/MQTTSNGateway/src/MQTTSNGWAdapterManager.h
+++ b/MQTTSNGateway/src/MQTTSNGWAdapterManager.h
@@ -40,21 +40,17 @@ class AdapterManager
public:
AdapterManager(Gateway* gw);
~AdapterManager(void);
- void initialize(void);
+ void initialize(char* gwName, bool aggregater, bool fowarder, bool qosM1);
ForwarderList* getForwarderList(void);
QoSm1Proxy* getQoSm1Proxy(void);
Aggregater* getAggregater(void);
void checkConnection(void);
bool isAggregatedClient(Client* client);
- Client* getClient(Client& client);
+ Client* getClient(Client* client);
Client* convertClient(uint16_t msgId, uint16_t* clientMsgId);
int unicastToClient(Client* client, MQTTSNPacket* packet, ClientSendTask* task);
bool isAggregaterActive(void);
- AggregateTopicElement* createClientList(Topic* topic);
- int addAggregateTopic(Topic* topic, Client* client);
- void removeAggregateTopic(Topic* topic, Client* client);
- void removeAggregateTopicList(Topics* topics, Client* client);
private:
Gateway* _gateway {nullptr};
diff --git a/MQTTSNGateway/src/MQTTSNGWAggregateTopicTable.cpp b/MQTTSNGateway/src/MQTTSNGWAggregateTopicTable.cpp
index b53aea1..24dd23f 100644
--- a/MQTTSNGateway/src/MQTTSNGWAggregateTopicTable.cpp
+++ b/MQTTSNGateway/src/MQTTSNGWAggregateTopicTable.cpp
@@ -34,6 +34,11 @@ Client* ClientTopicElement::getClient(void)
return _client;
}
+ClientTopicElement* ClientTopicElement::getNextClientElement(void)
+{
+ return _next;
+}
+
/*=====================================
Class AggregateTopicElement
=====================================*/
@@ -44,6 +49,7 @@ AggregateTopicElement::AggregateTopicElement(void)
AggregateTopicElement::AggregateTopicElement(Topic* topic, Client* client)
{
+ _topic = topic;
ClientTopicElement* elm = new ClientTopicElement(client);
if ( elm != nullptr )
{
@@ -76,7 +82,9 @@ ClientTopicElement* AggregateTopicElement::add(Client* client)
{
return nullptr;
}
+
_mutex.lock();
+
if ( _head == nullptr )
{
_head = elm;
@@ -95,7 +103,7 @@ ClientTopicElement* AggregateTopicElement::add(Client* client)
else
{
delete elm;
- elm = nullptr;
+ elm = p;
}
}
_mutex.unlock();
@@ -105,7 +113,7 @@ ClientTopicElement* AggregateTopicElement::add(Client* client)
ClientTopicElement* AggregateTopicElement::find(Client* client)
{
ClientTopicElement* p = _head;
- while ( p )
+ while ( p != nullptr )
{
if ( p->_client == client)
{
@@ -116,16 +124,48 @@ ClientTopicElement* AggregateTopicElement::find(Client* client)
return p;
}
-ClientTopicElement* AggregateTopicElement::getFirstElement(void)
+ClientTopicElement* AggregateTopicElement::getFirstClientTopicElement(void)
{
return _head;
}
-ClientTopicElement* AggregateTopicElement::getNextElement(ClientTopicElement* elm)
+ClientTopicElement* AggregateTopicElement::getNextClientTopicElement(ClientTopicElement* elmClient)
{
- return elm->_next;
+ return elmClient->_next;
}
+void AggregateTopicElement::eraseClient(Client* client)
+{
+ _mutex.lock();
+
+ ClientTopicElement* p = find(client);
+ if ( p != nullptr )
+ {
+ if ( p->_prev == nullptr ) // head element
+ {
+ _head = p->_next;
+ if ( p->_next == nullptr ) // head & only one
+ {
+ _tail = nullptr;
+ }
+ else
+ {
+ p->_next->_prev = nullptr; // head & midle
+ }
+ }
+ else if ( p->_next != nullptr ) // middle
+ {
+ p->_prev->_next = p->_next;
+ }
+ else // tail
+ {
+ p->_prev->_next = nullptr;
+ _tail = p->_prev;
+ }
+ delete p;
+ }
+ _mutex.unlock();
+}
/*=====================================
Class AggregateTopicTable
@@ -143,19 +183,138 @@ AggregateTopicTable::~AggregateTopicTable()
AggregateTopicElement* AggregateTopicTable::add(Topic* topic, Client* client)
{
- //ToDo: AggregateGW
- return 0;
+ AggregateTopicElement* elm = nullptr;
+ _mutex.lock();
+ elm = getAggregateTopicElement(topic);
+ if ( elm != nullptr )
+ {
+ if ( elm->find(client) == nullptr )
+ {
+ elm->add(client);
+ }
+ }
+ else
+ {
+ Topic* newTopic = topic->duplicate();
+ elm = new AggregateTopicElement(newTopic, client);
+ if ( _head == nullptr )
+ {
+ _head = elm;
+ _tail = elm;
+ }
+ else
+ {
+ elm->_prev = _tail;
+ _tail->_next = elm;
+ _tail = elm;
+ }
+ }
+ _mutex.unlock();
+ return elm;
}
-void AggregateTopicTable::remove(Topic* topic, Client* client)
+void AggregateTopicTable::erase(Topic* topic, Client* client)
{
- //ToDo: AggregateGW
+ AggregateTopicElement* elm = nullptr;
+
+ _mutex.lock();
+ elm = getAggregateTopicElement(topic);
+
+ if ( elm != nullptr )
+ {
+ elm->eraseClient(client);
+ }
+ if ( elm->_head == nullptr )
+ {
+ erase(elm);
+ }
+ _mutex.unlock();
+ return;
}
-AggregateTopicElement* AggregateTopicTable::getClientList(Topic* client)
+void AggregateTopicTable::erase(AggregateTopicElement* elmTopic)
{
- // ToDo: AggregateGW
- return 0;
+ if ( elmTopic != nullptr )
+ {
+ if ( elmTopic->_prev == nullptr ) // head element
+ {
+ _head = elmTopic->_next;
+ if ( elmTopic->_next == nullptr ) // head & only one
+ {
+ _tail = nullptr;
+ }
+ else
+ {
+ elmTopic->_next->_prev = nullptr; // head & midle
+ }
+ }
+ else if ( elmTopic->_next != nullptr ) // middle
+ {
+ elmTopic->_prev->_next = elmTopic->_next;
+ }
+ else // tail
+ {
+ elmTopic->_prev->_next = nullptr;
+ _tail = elmTopic->_prev;
+ }
+ delete elmTopic;
+ }
}
+AggregateTopicElement* AggregateTopicTable::getAggregateTopicElement(Topic* topic)
+{
+ AggregateTopicElement* elm = _head;
+ while( elm != nullptr )
+ {
+ if ( elm->_topic->isMatch(topic->_topicName) )
+ {
+ break;
+ }
+ elm = elm->_next;
+ }
+ return elm;
+}
+
+ClientTopicElement* AggregateTopicTable::getClientElement(Topic* topic)
+{
+ AggregateTopicElement* elm = getAggregateTopicElement(topic);
+ if ( elm != nullptr )
+ {
+ return elm->_head;
+ }
+ else
+ {
+ return nullptr;
+ }
+}
+
+void AggregateTopicTable::print(void)
+{
+ AggregateTopicElement* elm = _head;
+
+ printf("Beginning of AggregateTopicTable\n");
+ while( elm != nullptr )
+ {
+ printf("%s\n", elm->_topic->getTopicName()->c_str());
+
+ ClientTopicElement* clElm = elm->getFirstClientTopicElement();
+ Client* client = clElm->getClient();
+
+ while ( client != nullptr )
+ {
+ printf(" %s\n", client->getClientId());
+ clElm = clElm->getNextClientElement();
+ if ( clElm != nullptr )
+ {
+ client = clElm->getClient();
+ }
+ else
+ {
+ client = nullptr;
+ }
+ }
+ elm = elm->_next;
+ }
+ printf("End of AggregateTopicTable\n");
+}
diff --git a/MQTTSNGateway/src/MQTTSNGWAggregateTopicTable.h b/MQTTSNGateway/src/MQTTSNGWAggregateTopicTable.h
index 624743f..87d9e82 100644
--- a/MQTTSNGateway/src/MQTTSNGWAggregateTopicTable.h
+++ b/MQTTSNGateway/src/MQTTSNGWAggregateTopicTable.h
@@ -39,10 +39,16 @@ public:
~AggregateTopicTable();
AggregateTopicElement* add(Topic* topic, Client* client);
- AggregateTopicElement* getClientList(Topic* client);
- void remove(Topic* topic, Client* client);
+ AggregateTopicElement* getAggregateTopicElement(Topic* topic);
+ ClientTopicElement* getClientElement(Topic* topic);
+ void erase(Topic* topic, Client* client);
void clear(void);
+
+ void print(void);
+
private:
+ void erase(AggregateTopicElement* elmTopic);
+ Mutex _mutex;
AggregateTopicElement* _head {nullptr};
AggregateTopicElement* _tail {nullptr};
int _cnt {0};
@@ -61,14 +67,16 @@ public:
~AggregateTopicElement(void);
ClientTopicElement* add(Client* client);
- ClientTopicElement* getFirstElement(void);
- ClientTopicElement* getNextElement(ClientTopicElement* elm);
- void erase(ClientTopicElement* elm);
+ ClientTopicElement* getFirstClientTopicElement(void);
+ ClientTopicElement* getNextClientTopicElement(ClientTopicElement* elmClient);
+ void eraseClient(Client* client);
ClientTopicElement* find(Client* client);
private:
Mutex _mutex;
Topic* _topic {nullptr};
+ AggregateTopicElement* _next {nullptr};
+ AggregateTopicElement* _prev {nullptr};
ClientTopicElement* _head {nullptr};
ClientTopicElement* _tail {nullptr};
};
@@ -83,6 +91,8 @@ class ClientTopicElement
public:
ClientTopicElement(Client* client);
~ClientTopicElement(void);
+
+ ClientTopicElement* getNextClientElement(void);
Client* getClient(void);
private:
diff --git a/MQTTSNGateway/src/MQTTSNGWAggregater.cpp b/MQTTSNGateway/src/MQTTSNGWAggregater.cpp
index b74ec86..93718db 100644
--- a/MQTTSNGateway/src/MQTTSNGWAggregater.cpp
+++ b/MQTTSNGateway/src/MQTTSNGWAggregater.cpp
@@ -36,22 +36,12 @@ Aggregater::~Aggregater(void)
}
-void Aggregater::initialize(void)
+void Aggregater::initialize(char* gwName)
{
- char param[MQTTSNGW_PARAM_MAX];
-
- if (_gateway->getParam("AggregatingGateway", param) == 0 )
- {
- if (!strcasecmp(param, "YES") )
- {
- /* Create Aggregated Clients */
- _gateway->getClientList()->setClientList(AGGREGATER_TYPE);
-
- string name = _gateway->getGWParams()->gatewayName;
- setup(name.c_str(), Atype_Aggregater);
- _isActive = true;
- }
- }
+ /* Create Aggregater Client */
+ string name = string(gwName) + string("_Aggregater");
+ setup(name.c_str(), Atype_Aggregater);
+ _isActive = true;
//testMessageIdTable();
@@ -64,6 +54,7 @@ bool Aggregater::isActive(void)
uint16_t Aggregater::msgId(void)
{
+ // Only SecureClient generates msgId to avoid duplication of msgId. Client does not generate it.
return Adapter::getSecureClient()->getNextPacketId();
}
@@ -93,26 +84,38 @@ uint16_t Aggregater::getMsgId(Client* client, uint16_t clientMsgId)
return _msgIdTable.getMsgId(client, clientMsgId);
}
+AggregateTopicElement* Aggregater::addAggregateTopic(Topic* topic, Client* client)
+{
+ return _topicTable.add(topic, client);
+}
+
+
void Aggregater::removeAggregateTopic(Topic* topic, Client* client)
{
- // ToDo: AggregateGW this method called when the client disconnect and erase it`s Topics. this method call */
+ _topicTable.erase(topic, client);
}
-void Aggregater::removeAggregateTopicList(Topics* topics, Client* client)
+AggregateTopicElement* Aggregater::findTopic(Topic* topic)
{
- // ToDo: AggregateGW this method called when the client disconnect and erase it`s Topics. this method call */
+ return _topicTable.getAggregateTopicElement(topic);
}
-int Aggregater::addAggregateTopic(Topic* topic, Client* client)
+ClientTopicElement* Aggregater::getClientElement(Topic* topic)
{
- // ToDo: AggregateGW */
- return 0;
+ AggregateTopicElement* elm = findTopic(topic);
+ if ( elm != nullptr )
+ {
+ return elm->getFirstClientTopicElement();
+ }
+ else
+ {
+ return nullptr;
+ }
}
-AggregateTopicElement* Aggregater::createClientList(Topic* topic)
+void Aggregater::printAggregateTopicTable(void)
{
- // ToDo: AggregateGW */
- return 0;
+ _topicTable.print();
}
bool Aggregater::testMessageIdTable(void)
diff --git a/MQTTSNGateway/src/MQTTSNGWAggregater.h b/MQTTSNGateway/src/MQTTSNGWAggregater.h
index 9baaa15..959ff87 100644
--- a/MQTTSNGateway/src/MQTTSNGWAggregater.h
+++ b/MQTTSNGateway/src/MQTTSNGWAggregater.h
@@ -20,6 +20,7 @@
#include "MQTTSNGWAdapter.h"
#include "MQTTSNGWMessageIdTable.h"
#include "MQTTSNGWAggregateTopicTable.h"
+
namespace MQTTSNGW
{
class Gateway;
@@ -40,7 +41,7 @@ public:
Aggregater(Gateway* gw);
~Aggregater(void);
- void initialize(void);
+ void initialize(char* gwName);
const char* getClientId(SensorNetAddress* addr);
Client* getClient(SensorNetAddress* addr);
@@ -48,13 +49,18 @@ public:
uint16_t addMessageIdTable(Client* client, uint16_t msgId);
uint16_t getMsgId(Client* client, uint16_t clientMsgId);
+ ClientTopicElement* getClientElement(Topic* topic);
+ ClientTopicElement* getNextClientElement(ClientTopicElement* clientElement);
+ Client* getClient(ClientTopicElement* clientElement);
+
+ AggregateTopicElement* findTopic(Topic* topic);
+ AggregateTopicElement* addAggregateTopic(Topic* topic, Client* client);
- AggregateTopicElement* createClientList(Topic* topic);
- int addAggregateTopic(Topic* topic, Client* client);
void removeAggregateTopic(Topic* topic, Client* client);
- void removeAggregateTopicList(Topics* topics, Client* client);
+ void removeAggregateAllTopic(Client* client);
bool isActive(void);
+ void printAggregateTopicTable(void);
bool testMessageIdTable(void);
private:
diff --git a/MQTTSNGateway/src/MQTTSNGWBrokerRecvTask.cpp b/MQTTSNGateway/src/MQTTSNGWBrokerRecvTask.cpp
index 0f180c9..99b0ce5 100644
--- a/MQTTSNGateway/src/MQTTSNGWBrokerRecvTask.cpp
+++ b/MQTTSNGateway/src/MQTTSNGWBrokerRecvTask.cpp
@@ -64,7 +64,7 @@ void BrokerRecvTask::run(void)
_light->blueLight(false);
if (CHK_SIGINT)
{
- WRITELOG("%s BrokerRecvTask stopped.\n", currentDateTime());
+ WRITELOG("\n%s BrokerRecvTask stopped.", currentDateTime());
return;
}
timeout.tv_sec = 0;
@@ -161,7 +161,7 @@ void BrokerRecvTask::run(void)
delete packet;
- if ( (rc == -1 || rc == -2) && client->isActive() )
+ if ( (rc == -1 || rc == -2) && ( client->isActive() || client->isSleep() || client->isAwake() ))
{
/* disconnect the client */
packet = new MQTTGWPacket();
diff --git a/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.cpp b/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.cpp
index b886c9a..ff58a8d 100644
--- a/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.cpp
+++ b/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.cpp
@@ -41,7 +41,7 @@ BrokerSendTask::BrokerSendTask(Gateway* gateway)
BrokerSendTask::~BrokerSendTask()
{
-
+// WRITELOG("BrokerSendTask is deleted normally.\r\n");
}
/**
@@ -70,7 +70,7 @@ void BrokerSendTask::run()
if ( ev->getEventType() == EtStop )
{
- WRITELOG("%s BrokerSendTask stopped.\n", currentDateTime());
+ WRITELOG("\n%s BrokerSendTask stopped.", currentDateTime());
delete ev;
return;
}
@@ -81,7 +81,7 @@ void BrokerSendTask::run()
packet = ev->getMQTTGWPacket();
/* Check Client is managed by Adapters */
- client = adpMgr->getClient(*client);
+ client = adpMgr->getClient(client);
if ( packet->getType() == CONNECT && client->getNetwork()->isValid() )
{
@@ -121,13 +121,21 @@ void BrokerSendTask::run()
{
client->connectSended();
}
+ else if ( packet->getType() == DISCONNECT )
+ {
+ client->getNetwork()->close();
+ client->disconnected();
+ }
log(client, packet);
}
else
{
WRITELOG("%s BrokerSendTask: %s can't send a packet to the broker. errno=%d %s %s\n",
ERRMSG_HEADER, client->getClientId(), rc == -1 ? errno : 0, strerror(errno), ERRMSG_FOOTER);
- client->getNetwork()->close();
+ if ( errno != EBADF )
+ {
+ client->getNetwork()->close();
+ }
/* Disconnect the client */
packet = new MQTTGWPacket();
diff --git a/MQTTSNGateway/src/MQTTSNGWClient.cpp b/MQTTSNGateway/src/MQTTSNGWClient.cpp
index 11c5520..746f1af 100644
--- a/MQTTSNGateway/src/MQTTSNGWClient.cpp
+++ b/MQTTSNGateway/src/MQTTSNGWClient.cpp
@@ -572,7 +572,7 @@ void Client::resetPingRequest(void)
_holdPingRequest = false;
}
-bool Client::isHoldPringReqest(void)
+bool Client::isHoldPingReqest(void)
{
return _holdPingRequest;
}
diff --git a/MQTTSNGateway/src/MQTTSNGWClient.h b/MQTTSNGateway/src/MQTTSNGWClient.h
index 5529ed0..5071710 100644
--- a/MQTTSNGateway/src/MQTTSNGWClient.h
+++ b/MQTTSNGateway/src/MQTTSNGWClient.h
@@ -252,7 +252,7 @@ public:
void holdPingRequest(void);
void resetPingRequest(void);
- bool isHoldPringReqest(void);
+ bool isHoldPingReqest(void);
Client* getNextClient(void);
diff --git a/MQTTSNGateway/src/MQTTSNGWClientList.cpp b/MQTTSNGateway/src/MQTTSNGWClientList.cpp
index 5452765..f164996 100644
--- a/MQTTSNGateway/src/MQTTSNGWClientList.cpp
+++ b/MQTTSNGateway/src/MQTTSNGWClientList.cpp
@@ -61,52 +61,27 @@ void ClientList::initialize(bool aggregate)
setClientList(type);
_authorize = true;
}
+
+ if ( theGateway->getGWParams()->predefinedTopic )
+ {
+ setPredefinedTopics(aggregate);
+ }
}
void ClientList::setClientList(int type)
{
- char param[MQTTSNGW_PARAM_MAX];
- string fileName;
- GatewayParams* params = theGateway->getGWParams();
- if (theGateway->getParam("ClientsList", param) == 0)
+ if (!createList(theGateway->getGWParams()->clientListName, type))
{
- fileName = string(param);
- }
- else
- {
- fileName = params->configDir + string(CLIENT_LIST);
- }
-
- if (!createList(fileName.c_str(), type))
- {
- throw Exception("ClientList::initialize(): No client list defined by the configuration.");
- }
-
- if ( params->clientListName == nullptr )
- {
- params->clientListName = strdup(fileName.c_str());
+ throw Exception("ClientList::setClientList No client list defined by config file.");
}
}
void ClientList::setPredefinedTopics(bool aggrecate)
{
- char param[MQTTSNGW_PARAM_MAX];
-
- string fileName;
- GatewayParams* params = theGateway->getGWParams();
-
- if (theGateway->getParam("PredefinedTopicList", param) == 0)
+ if ( !readPredefinedList(theGateway->getGWParams()->predefinedTopicFileName, aggrecate) )
{
- fileName = string(param);
- }
- else
- {
- fileName = params->configDir + string(PREDEFINEDTOPIC_FILE);
- }
+ throw Exception("ClientList::setPredefinedTopics No predefindTopi list defined by config file.");
- if ( readPredefinedList(fileName.c_str(), aggrecate) )
- {
- params->predefinedTopicFileName = strdup(fileName.c_str());
}
}
@@ -118,18 +93,18 @@ void ClientList::setPredefinedTopics(bool aggrecate)
* File format is:
* Lines bigning with # are comment line.
* ClientId, SensorNetAddress, "unstableLine", "secureConnection"
- * in case of UDP, SensorNetAddress format is portNo@IPAddress.
+ * in case of UDP, SensorNetAddress format is IPAddress:portNo.
* if the SensorNetwork is not stable, write unstableLine.
* if BrokerConnection is SSL, write secureConnection.
* if the client send PUBLISH QoS-1, QoS-1 is required.
*
* Ex:
* #Client List
- * ClientId1,11200@192.168.10.10
- * ClientID2,35000@192.168.50.200,unstableLine
- * ClientID3,40000@192.168.200.50,secureConnection
- * ClientID4,41000@192.168.200.51,unstableLine,secureConnection
- * ClientID5,41000@192.168.200.51,unstableLine,secureConnection,QoS-1
+ * ClientId1,192.168.10.10:11200
+ * ClientID2,192.168.50.200:35000,unstableLine
+ * ClientID3,192.168.200.50:40000,secureConnection
+ * ClientID4,192.168.200.51:41000,unstableLine,secureConnection
+ * ClientID5,192.168.200.51:41000,unstableLine,secureConnection,QoS-1
*/
bool ClientList::createList(const char* fileName, int type)
@@ -141,7 +116,7 @@ bool ClientList::createList(const char* fileName, int type)
bool stable;
bool qos_1;
bool forwarder;
- bool rc = true;
+ bool rc = false;
SensorNetAddress netAddr;
MQTTSNString clientId = MQTTSNString_initializer;
@@ -194,6 +169,7 @@ bool ClientList::createList(const char* fileName, int type)
free(clientId.cstring);
}
fclose(fp);
+ rc = true;
}
return rc;
}
@@ -380,7 +356,7 @@ Client* ClientList::createClient(SensorNetAddress* addr, MQTTSNString* clientId,
}
else
{
- MQTTSNString dummyId MQTTSNString_initializer;;
+ MQTTSNString dummyId MQTTSNString_initializer;
dummyId.cstring = strdup("");
client->setClientId(dummyId);
free(dummyId.cstring);
@@ -416,10 +392,16 @@ Client* ClientList::createClient(SensorNetAddress* addr, MQTTSNString* clientId,
Client* ClientList::createPredefinedTopic( MQTTSNString* clientId, string topicName, uint16_t topicId, bool aggregate)
{
+ if ( topicId == 0 )
+ {
+ WRITELOG("Invalid TopicId. Predefined Topic %s, TopicId is 0. \n", topicName.c_str());
+ return nullptr;
+ }
+
if ( strcmp(clientId->cstring, common_topic) == 0 )
{
theGateway->getTopics()->add((const char*)topicName.c_str(), topicId);
- return 0;
+ return nullptr;
}
else
{
@@ -427,7 +409,7 @@ Client* ClientList::createPredefinedTopic( MQTTSNString* clientId, string topicN
if ( _authorize && client == nullptr )
{
- return 0;
+ return nullptr;
}
/* anonimous clients */
diff --git a/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp b/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp
index 5957cae..03f3415 100644
--- a/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp
+++ b/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp
@@ -60,7 +60,7 @@ void ClientRecvTask::run()
Event* ev = nullptr;
AdapterManager* adpMgr = _gateway->getAdapterManager();
QoSm1Proxy* qosm1Proxy = adpMgr->getQoSm1Proxy();
- bool isAggrActive = adpMgr->isAggregaterActive();
+ int clientType = adpMgr->isAggregaterActive() ? AGGREGATER_TYPE : TRANSPEARENT_TYPE;
ClientList* clientList = _gateway->getClientList();
EventQue* packetEventQue = _gateway->getPacketEventQue();
@@ -77,7 +77,7 @@ void ClientRecvTask::run()
if (CHK_SIGINT)
{
- WRITELOG("%s ClientRecvTask stopped.\n", currentDateTime());
+ WRITELOG("\n%s ClientRecvTask stopped.", currentDateTime());
delete packet;
return;
}
@@ -133,11 +133,12 @@ void ClientRecvTask::run()
{
const char* clientName = qosm1Proxy->getClientId(senderAddr);
- if ( clientName )
+ if ( clientName != nullptr )
{
+ client = qosm1Proxy->getClient();
+
if ( !packet->isQoSMinusPUBLISH() )
{
- client = qosm1Proxy->getClient();
log(clientName, packet);
WRITELOG("%s %s %s can send only PUBLISH with QoS-1.%s\n", ERRMSG_HEADER, clientName, senderAddr->sprint(buf), ERRMSG_FOOTER);
delete packet;
@@ -145,11 +146,14 @@ void ClientRecvTask::run()
}
}
}
+
+ if ( client == nullptr )
+ {
+ client = _gateway->getClientList()->getClient(senderAddr);
+ }
}
- client = _gateway->getClientList()->getClient(senderAddr);
-
- if ( client )
+ if ( client != nullptr )
{
/* write log and post Event */
log(client, packet, 0);
@@ -174,33 +178,36 @@ void ClientRecvTask::run()
client = clientList->getClient(&data.clientID);
- if ( fwd )
+ if ( fwd != nullptr )
{
if ( client == nullptr )
{
/* create a new client */
- client = clientList->createClient(0, &data.clientID, isAggrActive);
+ client = clientList->createClient(0, &data.clientID, clientType);
}
- /* Add to af forwarded client list of forwarder. */
+ /* Add to a forwarded client list of forwarder. */
fwd->addClient(client, &nodeId);
}
else
{
if ( client )
{
- /* Client exists. Set SensorNet Address of it. */
- client->setClientAddress(senderAddr);
+ /* Authentication is not required */
+ if ( _gateway->getGWParams()->clientAuthentication == false)
+ {
+ client->setClientAddress(senderAddr);
+ }
}
else
{
/* create a new client */
- client = clientList->createClient(senderAddr, &data.clientID, isAggrActive);
+ client = clientList->createClient(senderAddr, &data.clientID, clientType);
}
}
log(client, packet, &data.clientID);
- if (!client)
+ if ( client == nullptr )
{
WRITELOG("%s Client(%s) was rejected. CONNECT message has been discarded.%s\n", ERRMSG_HEADER, senderAddr->sprint(buf), ERRMSG_FOOTER);
delete packet;
@@ -217,11 +224,11 @@ void ClientRecvTask::run()
log(client, packet, 0);
if ( packet->getType() == MQTTSN_ENCAPSULATED )
{
- WRITELOG("%s Forwarder(%s) is not declared by ClientList file. message has been discarded.%s\n", ERRMSG_HEADER, _sensorNetwork->getSenderAddress()->sprint(buf), ERRMSG_FOOTER);
+ WRITELOG("%s MQTTSNGWClientRecvTask Forwarder(%s) is not declared by ClientList file. message has been discarded.%s\n", ERRMSG_HEADER, _sensorNetwork->getSenderAddress()->sprint(buf), ERRMSG_FOOTER);
}
else
{
- WRITELOG("%s Client(%s) is not connecting. message has been discarded.%s\n", ERRMSG_HEADER, senderAddr->sprint(buf), ERRMSG_FOOTER);
+ WRITELOG("%s MQTTSNGWClientRecvTask Client(%s) is not connecting. message has been discarded.%s\n", ERRMSG_HEADER, senderAddr->sprint(buf), ERRMSG_FOOTER);
}
delete packet;
}
diff --git a/MQTTSNGateway/src/MQTTSNGWClientSendTask.cpp b/MQTTSNGateway/src/MQTTSNGWClientSendTask.cpp
index eaa6e68..b0af063 100644
--- a/MQTTSNGateway/src/MQTTSNGWClientSendTask.cpp
+++ b/MQTTSNGateway/src/MQTTSNGWClientSendTask.cpp
@@ -35,7 +35,7 @@ ClientSendTask::ClientSendTask(Gateway* gateway)
ClientSendTask::~ClientSendTask()
{
-
+// WRITELOG("ClientSendTask is deleted normally.\r\n");
}
void ClientSendTask::run()
@@ -49,35 +49,44 @@ void ClientSendTask::run()
{
Event* ev = _gateway->getClientSendQue()->wait();
- if (ev->getEventType() == EtStop)
+ if (ev->getEventType() == EtStop || _gateway->IsStopping() )
{
- WRITELOG("%s ClientSendTask stopped.\n", currentDateTime());
+ WRITELOG("\n%s ClientSendTask stopped.", currentDateTime());
delete ev;
break;
}
- if (ev->getEventType() == EtClientSend)
- {
- client = ev->getClient();
- packet = ev->getMQTTSNPacket();
- rc = adpMgr->unicastToClient(client, packet, this);
- }
- else if (ev->getEventType() == EtBroadcast)
- {
- packet = ev->getMQTTSNPacket();
- log(client, packet);
- rc = packet->broadcast(_sensorNetwork);
- }
- else if (ev->getEventType() == EtSensornetSend)
- {
- packet = ev->getMQTTSNPacket();
- log(client, packet);
- rc = packet->unicast(_sensorNetwork, ev->getSensorNetAddress());
- }
- if ( rc < 0 )
+ if (ev->getEventType() == EtBroadcast)
{
- WRITELOG("%s ClientSendTask can't send a packet to the client %s. Error=%d%s\n",
- ERRMSG_HEADER, (client ? (const char*)client->getClientId() : UNKNOWNCL ), errno, ERRMSG_FOOTER);
+ packet = ev->getMQTTSNPacket();
+ log(client, packet);
+
+ if ( packet->broadcast(_sensorNetwork) < 0 )
+ {
+ WRITELOG("%s ClientSendTask can't multicast a packet Error=%d%s\n",
+ ERRMSG_HEADER, errno, ERRMSG_FOOTER);
+ }
+ }
+ else
+ {
+ if (ev->getEventType() == EtClientSend)
+ {
+ client = ev->getClient();
+ packet = ev->getMQTTSNPacket();
+ rc = adpMgr->unicastToClient(client, packet, this);
+ }
+ else if (ev->getEventType() == EtSensornetSend)
+ {
+ packet = ev->getMQTTSNPacket();
+ log(client, packet);
+ rc = packet->unicast(_sensorNetwork, ev->getSensorNetAddress());
+ }
+
+ if ( rc < 0 )
+ {
+ WRITELOG("%s ClientSendTask can't send a packet to the client %s. Error=%d%s\n",
+ ERRMSG_HEADER, (client ? (const char*)client->getClientId() : UNKNOWNCL ), errno, ERRMSG_FOOTER);
+ }
}
delete ev;
}
diff --git a/MQTTSNGateway/src/MQTTSNGWDefines.h b/MQTTSNGateway/src/MQTTSNGWDefines.h
index 0cc22b6..d73827d 100644
--- a/MQTTSNGateway/src/MQTTSNGWDefines.h
+++ b/MQTTSNGateway/src/MQTTSNGWDefines.h
@@ -42,13 +42,14 @@ namespace MQTTSNGW
#define MAX_INFLIGHTMESSAGES (10) // Number of inflight messages
#define MAX_MESSAGEID_TABLE_SIZE (500) // Number of MessageIdTable size
#define MAX_SAVED_PUBLISH (20) // Max number of PUBLISH message for Asleep state
-#define MAX_TOPIC_PAR_CLIENT (50) // Max Topic count for a client. it should be less than 256
+#define MAX_TOPIC_PAR_CLIENT (50) // Max Topic count for a client. it should be less than 256
#define MQTTSNGW_MAX_PACKET_SIZE (1024) // Max Packet size (5+2+TopicLen+PayloadLen + Foward Encapsulation)
#define SIZE_OF_LOG_PACKET (500) // Length of the packet log in bytes
-#define QOSM1_PROXY_KEEPALIVE_DURATION 900 // Secs
-#define QOSM1_PROXY_RESPONSE_DURATION 10 // Secs
-#define QOSM1_PROXY_MAX_RETRY_CNT 3
+#define PROXY_KEEPALIVE_DURATION (900) // Seconds
+#define PROXY_RESPONSE_DURATION (10) // Seconds
+#define PROXY_MAX_RETRY_CNT (3)
+
/*=================================
* Data Type
==================================*/
diff --git a/MQTTSNGateway/src/MQTTSNGWForwarder.cpp b/MQTTSNGateway/src/MQTTSNGWForwarder.cpp
index 5fa2a49..71b2e83 100644
--- a/MQTTSNGateway/src/MQTTSNGWForwarder.cpp
+++ b/MQTTSNGateway/src/MQTTSNGWForwarder.cpp
@@ -47,16 +47,8 @@ ForwarderList::~ForwarderList()
void ForwarderList::initialize(Gateway* gw)
{
- char param[MQTTSNGW_PARAM_MAX];
- string fileName;
-
- if (gw->getParam("Forwarder", param) == 0 )
- {
- if (!strcasecmp(param, "YES") )
- {
- gw->getClientList()->setClientList(FORWARDER_TYPE);
- }
- }
+ /* Create Fowarders from clients.conf */
+ gw->getClientList()->setClientList(FORWARDER_TYPE);
}
@@ -151,6 +143,7 @@ void Forwarder::addClient(Client* client, WirelessNodeId* id)
if ( p->_client == client )
{
client->setForwarder(this);
+ p->setWirelessNodeId(id);
return;
}
prev = p;
diff --git a/MQTTSNGateway/src/MQTTSNGWPacketHandleTask.cpp b/MQTTSNGateway/src/MQTTSNGWPacketHandleTask.cpp
index 47f1eae..f195d76 100644
--- a/MQTTSNGateway/src/MQTTSNGWPacketHandleTask.cpp
+++ b/MQTTSNGateway/src/MQTTSNGWPacketHandleTask.cpp
@@ -113,7 +113,7 @@ void PacketHandleTask::run()
if (ev->getEventType() == EtStop)
{
- WRITELOG("%s PacketHandleTask stopped.\n", currentDateTime());
+ WRITELOG("\n%s PacketHandleTask stopped.", currentDateTime());
delete ev;
return;
}
@@ -148,7 +148,7 @@ void PacketHandleTask::run()
if ( adpMgr->isAggregatedClient(client) )
{
- aggregatePacketHandler(client, snPacket);
+ aggregatePacketHandler(client, snPacket); // client is converted to Aggregater by BrokerSendTask
}
else
{
@@ -198,6 +198,9 @@ void PacketHandleTask::aggregatePacketHandler(Client*client, MQTTSNPacket* packe
case MQTTSN_DISCONNECT:
_mqttsnAggrConnection->handleDisconnect(client, packet);
break;
+ case MQTTSN_WILLTOPICUPD:
+ _mqttsnConnection->handleWilltopicupd(client, packet);
+ break;
case MQTTSN_WILLMSGUPD:
_mqttsnConnection->handleWillmsgupd(client, packet);
break;
@@ -289,6 +292,9 @@ void PacketHandleTask::transparentPacketHandler(Client*client, MQTTSNPacket* pac
case MQTTSN_DISCONNECT:
_mqttsnConnection->handleDisconnect(client, packet);
break;
+ case MQTTSN_WILLTOPICUPD:
+ _mqttsnConnection->handleWilltopicupd(client, packet);
+ break;
case MQTTSN_WILLMSGUPD:
_mqttsnConnection->handleWillmsgupd(client, packet);
break;
@@ -359,6 +365,9 @@ void PacketHandleTask::transparentPacketHandler(Client*client, MQTTGWPacket* pac
case UNSUBACK:
_mqttSubscribe->handleUnsuback(client, packet);
break;
+ case DISCONNECT:
+ client->disconnected(); // Just change Client's status to "Disconnected"
+ break;
default:
break;
}
diff --git a/MQTTSNGateway/src/MQTTSNGWProcess.cpp b/MQTTSNGateway/src/MQTTSNGWProcess.cpp
index 2abe3a5..90accea 100644
--- a/MQTTSNGateway/src/MQTTSNGWProcess.cpp
+++ b/MQTTSNGateway/src/MQTTSNGWProcess.cpp
@@ -103,7 +103,7 @@ void Process::initialize(int argc, char** argv)
}
}
}
- _rbsem = new Semaphore(MQTTSNGW_RB_SEMAPHOR_NAME, 0);
+ _rbsem = new NamedSemaphore(MQTTSNGW_RB_SEMAPHOR_NAME, 0);
_rb = new RingBuffer(_configDir.c_str());
if (getParam("ShearedMemory", param) == 0)
@@ -306,7 +306,7 @@ void MultiTaskProcess::waitStop(void)
}
}
-void MultiTaskProcess::threadStoped(void)
+void MultiTaskProcess::threadStopped(void)
{
_mutex.lock();
_stopCount++;
diff --git a/MQTTSNGateway/src/MQTTSNGWProcess.h b/MQTTSNGateway/src/MQTTSNGWProcess.h
index 1fb02a9..42ff2a2 100644
--- a/MQTTSNGateway/src/MQTTSNGWProcess.h
+++ b/MQTTSNGateway/src/MQTTSNGWProcess.h
@@ -66,7 +66,7 @@ private:
string _configDir;
string _configFile;
RingBuffer* _rb;
- Semaphore* _rbsem;
+ NamedSemaphore* _rbsem;
Mutex _mt;
int _log;
char _rbdata[PROCESS_LOG_BUFFER_SIZE + 1];
@@ -84,7 +84,7 @@ public:
int getParam(const char* parameter, char* value);
void run(void);
void waitStop(void);
- void threadStoped(void);
+ void threadStopped(void);
void attach(Thread* thread);
private:
diff --git a/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp b/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp
index 3162f04..5ef8d5b 100644
--- a/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp
+++ b/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp
@@ -94,6 +94,12 @@ MQTTGWPacket* MQTTSNPublishHandler::handlePublish(Client* client, MQTTSNPacket*
return nullptr;
}
+ if ( ( qos == 0 || qos == 3 ) && msgId > 0 )
+ {
+ WRITELOG("%s Invalid MsgId.%s %s\n", ERRMSG_HEADER, client->getClientId(), ERRMSG_FOOTER);
+ return nullptr;
+ }
+
if( !topic && msgId && qos > 0 && qos < 3 )
{
/* Reply PubAck with INVALID_TOPIC_ID to the client */
@@ -233,7 +239,7 @@ void MQTTSNPublishHandler::handleRegAck( Client* client, MQTTSNPacket* packet)
ev->setClientSendEvent(client, regAck);
_gateway->getClientSendQue()->post(ev);
}
- if (client->isHoldPringReqest() && client->getWaitREGACKPacketList()->getCount() == 0 )
+ if (client->isHoldPingReqest() && client->getWaitREGACKPacketList()->getCount() == 0 )
{
/* send PINGREQ to the broker */
client->resetPingRequest();
diff --git a/MQTTSNGateway/src/MQTTSNGWQoSm1Proxy.cpp b/MQTTSNGateway/src/MQTTSNGWQoSm1Proxy.cpp
index 50ff56c..349996f 100644
--- a/MQTTSNGateway/src/MQTTSNGWQoSm1Proxy.cpp
+++ b/MQTTSNGateway/src/MQTTSNGWQoSm1Proxy.cpp
@@ -38,28 +38,20 @@ QoSm1Proxy::~QoSm1Proxy(void)
}
-void QoSm1Proxy::initialize(void)
+void QoSm1Proxy::initialize(char* gwName)
{
- char param[MQTTSNGW_PARAM_MAX];
-
if ( _gateway->hasSecureConnection() )
{
_isSecure = true;
}
- if (_gateway->getParam("QoS-1", param) == 0 )
- {
- if (strcasecmp(param, "YES") == 0 )
- {
- /* Create QoS-1 Clients */
- _gateway->getClientList()->setClientList(QOSM1PROXY_TYPE);
+ /* Create QoS-1 Clients from clients.conf */
+ _gateway->getClientList()->setClientList(QOSM1PROXY_TYPE);
- /* initialize Adapter */
- string name = string(_gateway->getGWParams()->gatewayName) + "QoS-1";
- setup(name.c_str(), Atype_QoSm1Proxy);
- _isActive = true;
- }
- }
+ /* Create a client for QoS-1 proxy */
+ string name = string(gwName) + string("_QoS-1");
+ setup(name.c_str(), Atype_QoSm1Proxy);
+ _isActive = true;
}
diff --git a/MQTTSNGateway/src/MQTTSNGWQoSm1Proxy.h b/MQTTSNGateway/src/MQTTSNGWQoSm1Proxy.h
index d3dfcf5..82e5f2a 100644
--- a/MQTTSNGateway/src/MQTTSNGWQoSm1Proxy.h
+++ b/MQTTSNGateway/src/MQTTSNGWQoSm1Proxy.h
@@ -35,7 +35,7 @@ public:
QoSm1Proxy(Gateway* gw);
~QoSm1Proxy(void);
- void initialize(void);
+ void initialize(char* GWnAME);
bool isActive(void);
private:
diff --git a/MQTTSNGateway/src/MQTTSNGWSubscribeHandler.cpp b/MQTTSNGateway/src/MQTTSNGWSubscribeHandler.cpp
index dceb864..a08bf49 100644
--- a/MQTTSNGateway/src/MQTTSNGWSubscribeHandler.cpp
+++ b/MQTTSNGateway/src/MQTTSNGWSubscribeHandler.cpp
@@ -100,7 +100,8 @@ MQTTGWPacket* MQTTSNSubscribeHandler::handleSubscribe(Client* client, MQTTSNPack
topicstr[0] = topicFilter.data.short_name[0];
topicstr[1] = topicFilter.data.short_name[1];
topicstr[2] = 0;
- topicId = 0;
+ topicId = topicFilter.data.short_name[0] << 8;
+ topicId |= topicFilter.data.short_name[1];
subscribe = new MQTTGWPacket();
subscribe->setSUBSCRIBE(topicstr, (uint8_t)qos, (uint16_t)msgId);
}
@@ -203,11 +204,6 @@ void MQTTSNSubscribeHandler::handleAggregateSubscribe(Client* client, MQTTSNPack
if ( subscribe != nullptr )
{
- UTF8String str = subscribe->getTopic();
- string* topicName = new string(str.data, str.len);
- Topic topic = Topic(topicName, MQTTSN_TOPIC_TYPE_NORMAL);
- _gateway->getAdapterManager()->addAggregateTopic(&topic, client);
-
int msgId = 0;
if ( packet->isDuplicate() )
{
@@ -223,7 +219,13 @@ void MQTTSNSubscribeHandler::handleAggregateSubscribe(Client* client, MQTTSNPack
WRITELOG("%s MQTTSNSubscribeHandler can't create MessageIdTableElement %s%s\n", ERRMSG_HEADER, client->getClientId(), ERRMSG_FOOTER);
return;
}
-WRITELOG("msgId=%d\n",msgId);
+
+ UTF8String str = subscribe->getTopic();
+ string* topicName = new string(str.data, str.len); // topicName is delete by topic
+ Topic topic = Topic(topicName, MQTTSN_TOPIC_TYPE_NORMAL);
+
+ _gateway->getAdapterManager()->getAggregater()->addAggregateTopic(&topic, client);
+
subscribe->setMsgId(msgId);
Event* ev = new Event();
ev->setBrokerSendEvent(client, subscribe);
@@ -236,11 +238,6 @@ void MQTTSNSubscribeHandler::handleAggregateUnsubscribe(Client* client, MQTTSNPa
MQTTGWPacket* unsubscribe = handleUnsubscribe(client, packet);
if ( unsubscribe != nullptr )
{
- UTF8String str = unsubscribe->getTopic();
- string* topicName = new string(str.data, str.len);
- Topic topic = Topic(topicName, MQTTSN_TOPIC_TYPE_NORMAL);
- _gateway->getAdapterManager()->removeAggregateTopic(&topic, client);
-
int msgId = 0;
if ( packet->isDuplicate() )
{
@@ -256,6 +253,12 @@ void MQTTSNSubscribeHandler::handleAggregateUnsubscribe(Client* client, MQTTSNPa
WRITELOG("%s MQTTSNUnsubscribeHandler can't create MessageIdTableElement %s%s\n", ERRMSG_HEADER, client->getClientId(), ERRMSG_FOOTER);
return;
}
+
+ UTF8String str = unsubscribe->getTopic();
+ string* topicName = new string(str.data, str.len); // topicName is delete by topic
+ Topic topic = Topic(topicName, MQTTSN_TOPIC_TYPE_NORMAL);
+ _gateway->getAdapterManager()->getAggregater()->removeAggregateTopic(&topic, client);
+
unsubscribe->setMsgId(msgId);
Event* ev = new Event();
ev->setBrokerSendEvent(client, unsubscribe);
diff --git a/MQTTSNGateway/src/MQTTSNGWTopic.cpp b/MQTTSNGateway/src/MQTTSNGWTopic.cpp
index 5be2278..8f56a4c 100644
--- a/MQTTSNGateway/src/MQTTSNGWTopic.cpp
+++ b/MQTTSNGateway/src/MQTTSNGWTopic.cpp
@@ -63,6 +63,15 @@ MQTTSN_topicTypes Topic::getType(void)
return _type;
}
+Topic* Topic::duplicate(void)
+{
+ Topic* newTopic = new Topic();
+ newTopic->_type = _type;
+ newTopic->_topicId = _topicId;
+ newTopic->_topicName = new string(_topicName->c_str());
+ return newTopic;
+}
+
bool Topic::isMatch(string* topicName)
{
string::size_type tlen = _topicName->size();
@@ -354,6 +363,16 @@ void Topics::eraseNormal(void)
}
}
+Topic* Topics::getFirstTopic(void)
+{
+ return _first;
+}
+
+Topic* Topics::getNextTopic(Topic* topic)
+{
+ return topic->_next;
+}
+
void Topics::print(void)
{
Topic* topic = _first;
diff --git a/MQTTSNGateway/src/MQTTSNGWTopic.h b/MQTTSNGateway/src/MQTTSNGWTopic.h
index 5c3cc77..4d7b0c3 100644
--- a/MQTTSNGateway/src/MQTTSNGWTopic.h
+++ b/MQTTSNGateway/src/MQTTSNGWTopic.h
@@ -31,6 +31,7 @@ namespace MQTTSNGW
class Topic
{
friend class Topics;
+ friend class AggregateTopicTable;
public:
Topic();
Topic(string* topic, MQTTSN_topicTypes type);
@@ -39,7 +40,9 @@ public:
uint16_t getTopicId(void);
MQTTSN_topicTypes getType(void);
bool isMatch(string* topicName);
+ Topic* duplicate(void);
void print(void);
+
private:
MQTTSN_topicTypes _type;
uint16_t _topicId;
@@ -59,6 +62,8 @@ public:
Topic* add(const char* topicName, uint16_t id = 0);
Topic* getTopicByName(const MQTTSN_topicid* topic);
Topic* getTopicById(const MQTTSN_topicid* topicid);
+ Topic* getFirstTopic(void);
+ Topic* getNextTopic(Topic* topic);
Topic* match(const MQTTSN_topicid* topicid);
void eraseNormal(void);
uint16_t getNextTopicId();
diff --git a/MQTTSNGateway/src/MQTTSNGWVersion.h b/MQTTSNGateway/src/MQTTSNGWVersion.h
index ff9d6e1..251e5af 100644
--- a/MQTTSNGateway/src/MQTTSNGWVersion.h
+++ b/MQTTSNGateway/src/MQTTSNGWVersion.h
@@ -17,6 +17,6 @@
#ifndef MQTTSNGWVERSION_H_IN_
#define MQTTSNGWVERSION_H_IN_
-#define PAHO_GATEWAY_VERSION "1.3.1"
+#define PAHO_GATEWAY_VERSION "1.4.0"
#endif /* MQTTSNGWVERSION_H_IN_ */
diff --git a/MQTTSNGateway/src/MQTTSNGateway.cpp b/MQTTSNGateway/src/MQTTSNGateway.cpp
index c5894ce..6ed0c04 100644
--- a/MQTTSNGateway/src/MQTTSNGateway.cpp
+++ b/MQTTSNGateway/src/MQTTSNGateway.cpp
@@ -38,6 +38,7 @@ Gateway::Gateway(void)
_clientList = new ClientList();
_adapterManager = new AdapterManager(this);
_topics = new Topics();
+ _stopFlg = false;
}
Gateway::~Gateway()
@@ -86,6 +87,10 @@ Gateway::~Gateway()
{
free(_params.clientListName);
}
+ if ( _params.predefinedTopicFileName )
+ {
+ free( _params.predefinedTopicFileName);
+ }
if ( _params.configName )
{
free(_params.configName);
@@ -109,6 +114,7 @@ Gateway::~Gateway()
{
delete _topics;
}
+// WRITELOG("Gateway is deleted normally.\r\n");
}
int Gateway::getParam(const char* parameter, char* value)
@@ -116,6 +122,16 @@ int Gateway::getParam(const char* parameter, char* value)
return MultiTaskProcess::getParam(parameter, value);
}
+char* Gateway::getClientListFileName(void)
+{
+ return _params.clientListName;
+}
+
+char* Gateway::getPredefinedTopicFileName(void)
+{
+ return _params.predefinedTopicFileName;
+}
+
void Gateway::initialize(int argc, char** argv)
{
char param[MQTTSNGW_PARAM_MAX];
@@ -215,14 +231,53 @@ void Gateway::initialize(int argc, char** argv)
}
}
- /* ClientList and Adapters Initialize */
- _adapterManager->initialize();
+ if (getParam("ClientsList", param) == 0)
+ {
+ _params.clientListName = strdup(param);
+ }
- bool aggregate = _adapterManager->isAggregaterActive();
- _clientList->initialize(aggregate);
+ if (getParam("PredefinedTopic", param) == 0)
+ {
+ if ( !strcasecmp(param, "YES") )
+ {
+ _params.predefinedTopic = true;
+ if (getParam("PredefinedTopicList", param) == 0)
+ {
+ _params.predefinedTopicFileName = strdup(param);
+ }
+ }
+ }
- /* Setup predefined topics */
- _clientList->setPredefinedTopics(aggregate);
+ if (getParam("AggregatingGateway", param) == 0)
+ {
+ if ( !strcasecmp(param, "YES") )
+ {
+ _params.aggregatingGw = true;
+ }
+ }
+
+ if (getParam("Forwarder", param) == 0)
+ {
+ if ( !strcasecmp(param, "YES") )
+ {
+ _params.forwarder = true;
+ }
+ }
+
+ if (getParam("QoS-1", param) == 0)
+ {
+ if ( !strcasecmp(param, "YES") )
+ {
+ _params.qosMinus1 = true;
+ }
+ }
+
+
+ /* Initialize adapters */
+ _adapterManager->initialize( _params.gatewayName, _params.aggregatingGw, _params.forwarder, _params.qosMinus1);
+
+ /* Setup ClientList and Predefined topics */
+ _clientList->initialize(_params.aggregatingGw);
}
void Gateway::run(void)
@@ -256,10 +311,13 @@ void Gateway::run(void)
WRITELOG(" CertKey: %s\n", _params.certKey);
WRITELOG(" PrivateKey: %s\n\n\n", _params.privateKey);
+ _stopFlg = false;
/* Run Tasks until CTRL+C entred */
MultiTaskProcess::run();
+ _stopFlg = true;
+
/* stop Tasks */
Event* ev = new Event();
ev->setStop();
@@ -274,10 +332,15 @@ void Gateway::run(void)
/* wait until all Task stop */
MultiTaskProcess::waitStop();
- WRITELOG("\n%s MQTT-SN Gateway stoped\n\n", currentDateTime());
+ WRITELOG("\n\n%s MQTT-SN Gateway stopped.\n\n", currentDateTime());
_lightIndicator.allLightOff();
}
+bool Gateway::IsStopping(void)
+{
+ return _stopFlg;
+}
+
EventQue* Gateway::getPacketEventQue()
{
return &_packetEventQue;
diff --git a/MQTTSNGateway/src/MQTTSNGateway.h b/MQTTSNGateway/src/MQTTSNGateway.h
index d9b4857..86c7952 100644
--- a/MQTTSNGateway/src/MQTTSNGateway.h
+++ b/MQTTSNGateway/src/MQTTSNGateway.h
@@ -26,7 +26,7 @@ namespace MQTTSNGW
/*=================================
* Starting prompt
==================================*/
-#define PAHO_COPYRIGHT0 " * MQTT-SN Transparent Gateway"
+#define PAHO_COPYRIGHT0 " * MQTT-SN Gateway"
#define PAHO_COPYRIGHT1 " * Part of Project Paho in Eclipse"
#define PAHO_COPYRIGHT2 " * (http://git.eclipse.org/c/paho/org.eclipse.paho.mqtt-sn.embedded-c.git/)"
#define PAHO_COPYRIGHT3 " * Author : Tomoaki YAMAGUCHI"
@@ -160,10 +160,14 @@ public:
char* rootCApath {nullptr};
char* rootCAfile {nullptr};
char* certKey {nullptr};
- char* privateKey {nullptr};
char* predefinedTopicFileName {nullptr};
+ char* privateKey {nullptr};
char* qosMinusClientListName {nullptr};
bool clientAuthentication {false};
+ bool predefinedTopic {false};
+ bool aggregatingGw {false};
+ bool qosMinus1 {false};
+ bool forwarder {false};
};
@@ -190,8 +194,12 @@ public:
GatewayParams* getGWParams(void);
AdapterManager* getAdapterManager(void);
int getParam(const char* parameter, char* value);
+ char* getClientListFileName(void);
+ char* getPredefinedTopicFileName(void);
+
bool hasSecureConnection(void);
Topics* getTopics(void);
+ bool IsStopping(void);
private:
GatewayParams _params;
@@ -203,6 +211,7 @@ private:
SensorNetwork _sensorNetwork;
AdapterManager* _adapterManager {nullptr};
Topics* _topics;
+ bool _stopFlg;
};
}
diff --git a/MQTTSNGateway/src/linux/Network.cpp b/MQTTSNGateway/src/linux/Network.cpp
index 5c5e88d..842e121 100644
--- a/MQTTSNGateway/src/linux/Network.cpp
+++ b/MQTTSNGateway/src/linux/Network.cpp
@@ -145,7 +145,11 @@ bool TCPStack::accept(TCPStack& new_socket)
int TCPStack::send(const uint8_t* buf, int length)
{
+#ifdef __APPLE__
+ return ::send(_sockfd, buf, length, SO_NOSIGPIPE);
+#else
return ::send(_sockfd, buf, length, MSG_NOSIGNAL);
+#endif
}
int TCPStack::recv(uint8_t* buf, int len)
diff --git a/MQTTSNGateway/src/linux/Threading.cpp b/MQTTSNGateway/src/linux/Threading.cpp
index b0b3ac4..7c452e7 100644
--- a/MQTTSNGateway/src/linux/Threading.cpp
+++ b/MQTTSNGateway/src/linux/Threading.cpp
@@ -21,27 +21,40 @@
#include
#include
#include
-#include
#include
#include
#include
#include
+#include
using namespace std;
using namespace MQTTSNGW;
-#if defined(OSX)
-int sem_timedwait(sem_type sem, const struct timespec *timeout)
+#ifdef __APPLE__
+
+int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout)
{
- int rc = -1;
- int64_t tout = timeout->tv_sec * 1000L + tv_nsec * 1000000L
- rc = (int)dispatch_semaphore_wait(sem, dispatch_time(DISPATCH_TIME_NOW, tout));
- if (rc != 0)
+ while (true)
{
- rc = ETIMEDOUT;
+ // try to lock the semaphore
+ int result = sem_trywait(sem);
+ if (result != -1 || errno != EAGAIN)
+ return result;
+
+ // spin lock
+ sched_yield();
+
+ // check if timeout reached
+ struct timespec ts;
+ clock_gettime(CLOCK_REALTIME, &ts);
+ if (ts.tv_sec > abs_timeout->tv_sec
+ || (ts.tv_sec == abs_timeout->tv_sec && abs_timeout->tv_nsec >= ts.tv_nsec))
+ {
+ return ETIMEDOUT;
+ }
}
- return rc;
}
+
#endif
/*=====================================
@@ -69,7 +82,7 @@ Mutex::Mutex(const char* fileName)
throw Exception( -1, "Mutex can't create a shared memory.");
}
_pmutex = (pthread_mutex_t*) shmat(_shmid, NULL, 0);
- if (_pmutex < 0)
+ if (_pmutex == (void*) -1)
{
throw Exception( -1, "Mutex can't attach shared memory.");
}
@@ -153,21 +166,61 @@ void Mutex::unlock(void)
Class Semaphore
=====================================*/
-Semaphore::Semaphore()
-{
- sem_init(&_sem, 0, 0);
- _name = 0;
- _psem = 0;
-}
-
Semaphore::Semaphore(unsigned int val)
{
+#ifdef __APPLE__
+ _sem = dispatch_semaphore_create(val);
+#else
sem_init(&_sem, 0, val);
- _name = 0;
- _psem = 0;
+#endif
}
-Semaphore::Semaphore(const char* name, unsigned int val)
+Semaphore::~Semaphore()
+{
+#ifdef __APPLE__
+ dispatch_release(_sem);
+#else
+ sem_destroy(&_sem);
+#endif
+}
+
+void Semaphore::post(void)
+{
+#ifdef __APPLE__
+ dispatch_semaphore_signal(_sem);
+#else
+ sem_post(&_sem);
+#endif
+}
+
+void Semaphore::wait(void)
+{
+#ifdef __APPLE__
+ dispatch_semaphore_wait(_sem, DISPATCH_TIME_FOREVER);
+#else
+ sem_wait(&_sem);
+#endif
+}
+
+void Semaphore::timedwait(uint16_t millsec)
+{
+#ifdef __APPLE__
+ dispatch_semaphore_wait(_sem, dispatch_time(DISPATCH_TIME_NOW, int64_t(millsec) * 1000000));
+#else
+ struct timespec ts;
+ clock_gettime(CLOCK_REALTIME, &ts);
+ int nsec = ts.tv_nsec + (millsec % 1000) * 1000000;
+ ts.tv_nsec = nsec % 1000000000;
+ ts.tv_sec += millsec / 1000 + nsec / 1000000000;
+ sem_timedwait(&_sem, &ts);
+#endif
+}
+
+/*=====================================
+ Class NamedSemaphore
+ =====================================*/
+
+NamedSemaphore::NamedSemaphore(const char* name, unsigned int val)
{
_psem = sem_open(name, O_CREAT, 0666, val);
if (_psem == SEM_FAILED)
@@ -181,67 +234,30 @@ Semaphore::Semaphore(const char* name, unsigned int val)
}
}
-Semaphore::~Semaphore()
+NamedSemaphore::~NamedSemaphore()
{
- if (_name)
- {
- sem_close(_psem);
- sem_unlink(_name);
- free(_name);
- }
- else
- {
- sem_destroy(&_sem);
- }
+ sem_close(_psem);
+ sem_unlink(_name);
+ free(_name);
}
-void Semaphore::post(void)
+void NamedSemaphore::post(void)
{
- int val = 0;
- if (_psem)
- {
- sem_getvalue(_psem, &val);
- if (val <= 0)
- {
- sem_post(_psem);
- }
- }
- else
- {
- sem_getvalue(&_sem, &val);
- if (val <= 0)
- {
- sem_post(&_sem);
- }
- }
+ sem_post(_psem);
}
-void Semaphore::wait(void)
+void NamedSemaphore::wait(void)
{
- if (_psem)
- {
- sem_wait(_psem);
- }
- else
- {
- sem_wait(&_sem);
- }
+ sem_wait(_psem);
}
-void Semaphore::timedwait(uint16_t millsec)
+void NamedSemaphore::timedwait(uint16_t millsec)
{
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += millsec / 1000;
ts.tv_nsec = (millsec % 1000) * 1000000;
- if (_psem)
- {
- sem_timedwait(_psem, &ts);
- }
- else
- {
- sem_timedwait(&_sem, &ts);
- }
+ sem_timedwait(_psem, &ts);
}
/*=========================================
@@ -274,7 +290,7 @@ RingBuffer::RingBuffer(const char* keyDirectory)
if ((_shmid = shmget(key, PROCESS_LOG_BUFFER_SIZE,
IPC_CREAT | IPC_EXCL | 0666)) >= 0)
{
- if ((_shmaddr = (uint16_t*) shmat(_shmid, NULL, 0)) > 0)
+ if ((_shmaddr = (uint16_t*) shmat(_shmid, NULL, 0)) != (void*) -1)
{
_length = (uint16_t*) _shmaddr;
_start = (uint16_t*) _length + sizeof(uint16_t*);
@@ -290,9 +306,9 @@ RingBuffer::RingBuffer(const char* keyDirectory)
throw Exception(-1, "RingBuffer can't attach shared memory.");
}
}
- else if ((_shmid = shmget(key, PROCESS_LOG_BUFFER_SIZE, IPC_CREAT | 0666)) >= 0)
+ else if ((_shmid = shmget(key, PROCESS_LOG_BUFFER_SIZE, IPC_CREAT | 0666)) != -1)
{
- if ((_shmaddr = (uint16_t*) shmat(_shmid, NULL, 0)) > 0)
+ if ((_shmaddr = (uint16_t*) shmat(_shmid, NULL, 0)) != (void*) -1)
{
_length = (uint16_t*) _shmaddr;
_start = (uint16_t*) _length + sizeof(uint16_t*);
@@ -330,7 +346,7 @@ RingBuffer::~RingBuffer()
}
}
- if (_pmx > 0)
+ if (_pmx != NULL)
{
delete _pmx;
}
@@ -525,7 +541,7 @@ int Thread::start(void)
void Thread::stopProcess(void)
{
- theMultiTaskProcess->threadStoped();
+ theMultiTaskProcess->threadStopped();
}
void Thread::stop(void)
diff --git a/MQTTSNGateway/src/linux/Threading.h b/MQTTSNGateway/src/linux/Threading.h
index df58f1c..795ee5a 100644
--- a/MQTTSNGateway/src/linux/Threading.h
+++ b/MQTTSNGateway/src/linux/Threading.h
@@ -19,6 +19,9 @@
#include
#include
+#ifdef __APPLE__
+#include
+#endif
#include "MQTTSNGWDefines.h"
namespace MQTTSNGW
@@ -52,17 +55,34 @@ private:
class Semaphore
{
public:
- Semaphore();
- Semaphore(unsigned int val);
- Semaphore(const char* name, unsigned int val);
+ Semaphore(unsigned int val = 0);
~Semaphore();
void post(void);
void wait(void);
void timedwait(uint16_t millsec);
+private:
+#ifdef __APPLE__
+ dispatch_semaphore_t _sem;
+#else
+ sem_t _sem;
+#endif
+};
+
+/*=====================================
+ Class NamedSemaphore
+ ====================================*/
+class NamedSemaphore
+{
+public:
+ NamedSemaphore(const char* name, unsigned int val);
+ ~NamedSemaphore();
+ void post(void);
+ void wait(void);
+ void timedwait(uint16_t millsec);
+
private:
sem_t* _psem;
- sem_t _sem;
char* _name;
};
diff --git a/MQTTSNGateway/src/linux/loralink/SensorNetwork.cpp b/MQTTSNGateway/src/linux/loralink/SensorNetwork.cpp
new file mode 100644
index 0000000..854854c
--- /dev/null
+++ b/MQTTSNGateway/src/linux/loralink/SensorNetwork.cpp
@@ -0,0 +1,559 @@
+/**************************************************************************************
+ * Copyright (c) 2016, Tomoaki Yamaguchi
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ * Tomoaki Yamaguchi - initial API and implementation
+ **************************************************************************************/
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include "SensorNetwork.h"
+#include "MQTTSNGWProcess.h"
+
+using namespace std;
+using namespace MQTTSNGW;
+
+#define LORA_PHY_MAXPAYLOAD 256
+#define LORALINK_MAX_API_LEN ( LORA_PHY_MAXPAYLOAD + 5 ) // PayloadType[1] + Rssi[2] + Snr[2]
+
+#define LORALINK_ACK 0x10
+#define LORALINK_NO_FREE_CH 0x20
+#define LORALINK_TX_TIMEOUT 0x40
+
+#define LORALINK_TIMEOUT_ACK 10000 // 10 secs
+
+/*===========================================
+ Class SensorNetAddreess
+ ============================================*/
+SensorNetAddress::SensorNetAddress()
+{
+ _devAddr = 0;
+}
+
+SensorNetAddress::~SensorNetAddress()
+{
+
+}
+
+void SensorNetAddress::setAddress( uint8_t devAddr)
+{
+ _devAddr = devAddr;
+}
+
+
+int SensorNetAddress::setAddress(string* address)
+{
+ _devAddr = atoi(address->c_str());
+
+ if ( _devAddr == 0 )
+ {
+ return -1;
+ }
+ return 0;
+}
+
+void SensorNetAddress::setBroadcastAddress(void)
+{
+ _devAddr = BROADCAST_DEVADDR;
+}
+
+bool SensorNetAddress::isMatch(SensorNetAddress* addr)
+{
+ return _devAddr == addr->_devAddr;
+}
+
+SensorNetAddress& SensorNetAddress::operator =(SensorNetAddress& addr)
+{
+ _devAddr = addr._devAddr;
+ return *this;
+}
+
+char* SensorNetAddress::sprint(char* buf)
+{
+ sprintf( buf, "%d", _devAddr);
+ return buf;
+}
+
+/*===========================================
+ Class SensorNetwork
+ ============================================*/
+SensorNetwork::SensorNetwork()
+{
+
+}
+
+SensorNetwork::~SensorNetwork()
+{
+
+}
+
+int SensorNetwork::unicast(const uint8_t* payload, uint16_t payloadLength, SensorNetAddress* sendToAddr)
+{
+ return LoRaLink::unicast(payload, payloadLength, sendToAddr);
+}
+
+int SensorNetwork::broadcast(const uint8_t* payload, uint16_t payloadLength)
+{
+ return LoRaLink::broadcast(payload, payloadLength);
+}
+
+int SensorNetwork::read(uint8_t* buf, uint16_t bufLen)
+{
+ return LoRaLink::recv(buf, bufLen, &_clientAddr);
+}
+
+int SensorNetwork::initialize(void)
+{
+ char param[MQTTSNGW_PARAM_MAX];
+ uint32_t baudrate = 115200;
+
+ if (theProcess->getParam("BaudrateLoRaLink", param) == 0)
+ {
+ baudrate = (uint32_t)atoi(param);
+ }
+ _description += "LoRaLink, Baudrate ";
+ sprintf(param ,"%d", baudrate);
+ _description += param;
+
+ theProcess->getParam("DeviceRxLoRaLink", param);
+ _description += ", SerialRx ";
+ _description += param;
+ if ( LoRaLink::open(LORALINK_MODEM_RX, param, baudrate) < 0 )
+ {
+ return -1;
+ }
+
+ theProcess->getParam("DeviceTxLoRaLink", param);
+ _description += ", SerialTx ";
+ _description += param;
+ return LoRaLink::open(LORALINK_MODEM_TX, param, baudrate);
+}
+
+const char* SensorNetwork::getDescription(void)
+{
+ return _description.c_str();
+}
+
+SensorNetAddress* SensorNetwork::getSenderAddress(void)
+{
+ return &_clientAddr;
+}
+
+/*===========================================
+ Class LoRaLink
+ ============================================*/
+LoRaLink::LoRaLink(){
+ _serialPortRx = new SerialPort();
+ _serialPortTx = new SerialPort();
+ _respCd = 0;
+}
+
+LoRaLink::~LoRaLink(){
+ if ( _serialPortRx )
+ {
+ delete _serialPortRx;
+ }
+ if ( _serialPortTx )
+ {
+ delete _serialPortTx;
+ }
+}
+
+int LoRaLink::open(LoRaLinkModemType_t type, char* device, int baudrate)
+{
+ int rate = B9600;
+
+ switch (baudrate)
+ {
+ case 9600:
+ rate = B9600;
+ break;
+ case 19200:
+ rate = B19200;
+ break;
+ case 38400:
+ rate = B38400;
+ break;
+ case 57600:
+ rate = B57600;
+ break;
+ case 115200:
+ rate = B115200;
+ break;
+ default:
+ return -1;
+ }
+
+ int rc = 0;
+ if ( type == LORALINK_MODEM_RX )
+ {
+ if ( (rc = _serialPortRx->open(device, rate, false, 1, O_RDWR | O_NOCTTY)) < 0 )
+ return rc;
+ }
+ else
+ {
+ rc = _serialPortTx->open(device, rate, false, 1, O_RDWR | O_NOCTTY);
+ }
+ return rc;
+}
+
+int LoRaLink::broadcast(const uint8_t* payload, uint16_t payloadLen){
+ SensorNetAddress addr;
+ addr.setBroadcastAddress();
+ return send(MQTT_SN, payload, (uint8_t) payloadLen, &addr);
+}
+
+int LoRaLink:: unicast(const uint8_t* payload, uint16_t payloadLen, SensorNetAddress* addr){
+ return send(MQTT_SN, payload, (uint8_t) payloadLen, addr);
+}
+
+int LoRaLink::recv(uint8_t* buf, uint16_t bufLen, SensorNetAddress* clientAddr)
+{
+ while ( true )
+ {
+ if ( ( readApiFrame( &_loRaLinkApi, &_loRaLinkPara) == true ) && (_loRaLinkPara.Available == true) && ( _loRaLinkPara.Error == false ) )
+ {
+ clientAddr->_devAddr = _loRaLinkApi.SourceAddr;
+
+ bufLen = _loRaLinkApi.PayloadLen;
+
+ memcpy( buf, _loRaLinkApi.Payload, bufLen );
+
+ switch ( (int) _loRaLinkApi.PayloadType )
+ {
+ case API_RSP_ACK:
+ _respCd = LORALINK_ACK;
+ break;
+
+ case API_RSP_NFC:
+ _respCd = LORALINK_NO_FREE_CH;
+ break;
+
+ case API_RSP_TOT:
+ _respCd = LORALINK_TX_TIMEOUT;
+ break;
+
+
+ case MQTT_SN:
+ memcpy( buf, _loRaLinkApi.Payload, bufLen );
+ return bufLen;
+
+ default:
+ return 0;
+ }
+ _sem.post();
+ return bufLen;
+
+ }
+ else
+ {
+ return 0;
+ }
+ }
+}
+
+
+
+bool LoRaLink::readApiFrame(LoRaLinkFrame_t* api, LoRaLinkReadParameters_t* para)
+{
+ uint8_t byte = 0;
+ uint16_t val = 0;
+
+ while ( recv(&byte) == 0 )
+ {
+ if ( byte == START_BYTE )
+ {
+ para->apipos= 1;
+ para->Error = true;
+ para->Available = false;
+ continue;
+ }
+
+ if ( para->apipos > 0 && byte == ESCAPE )
+ {
+ if( recv(&byte ) == 0 )
+ {
+ byte ^= PAD; // decode
+ }
+ else
+ {
+ para->Escape = true;
+ continue;
+ }
+ }
+
+ if( para->Escape == true )
+ {
+ byte ^= PAD;
+ para->Escape = false;
+ }
+
+ switch ( para->apipos )
+ {
+ case 0:
+ break;
+
+ case 1:
+ val = (uint16_t)byte;
+ api->PayloadLen = val << 8;
+ break;
+
+ case 2:
+ api->PayloadLen += byte;
+ break;
+
+ case 3:
+ api->SourceAddr = byte;
+ para->checksum = byte;
+ break;
+
+ case 4:
+ val = (uint16_t)byte;
+ api->Rssi = val << 8;
+ para->checksum += byte;
+ break;
+
+ case 5:
+ api->Rssi += byte;
+ para->checksum += byte;
+ break;
+
+ case 6:
+ val = (uint16_t)byte;
+ api->Snr = val << 8;
+ para->checksum += byte;
+ break;
+
+ case 7:
+ api->Snr += byte;
+ para->checksum += byte;
+ break;
+
+ case 8:
+ api->PayloadType = byte;
+ para->checksum += byte;
+ break;
+
+ default:
+ if ( para->apipos >= api->PayloadLen + 2 ) // FRM_DEL + CRC = 2
+ {
+ para->Error = ( (0xff - para->checksum) != byte );
+ para->Available = true;
+ api->PayloadLen -= 7; // 7 = SrcAddr[1] + Rssi[2] + Snr[2] + PlType[1] + Crc[1]
+ para->apipos = 0;
+ para->checksum = 0;
+ return true;
+ }
+ else
+ {
+ para->checksum += byte;
+ api->Payload[para->apipos - 9] = byte;
+ }
+ break;
+ }
+
+ para->apipos++;
+ }
+ return false;
+}
+
+int LoRaLink::send(LoRaLinkPayloadType_t type, const uint8_t* payload, uint16_t pLen, SensorNetAddress* addr)
+{
+ D_NWSTACK("\r\n===> Send: ");
+ uint8_t buf[2] = { 0 };
+ uint8_t chks = 0;
+ uint16_t len = pLen + 3; // 3 = DestAddr[1] + PayloadType[1] + Crc[1]
+ _respCd = 0;
+
+ _serialPortTx->send(START_BYTE);
+
+ buf[0] = (len >> 8) & 0xff;
+ buf[1] = len & 0xff;
+ send(buf[0]);
+ send(buf[1]);
+
+ send( addr->_devAddr );
+ chks = addr->_devAddr;
+
+
+
+ send(type);
+ chks += type;
+
+ D_NWSTACK("\r\n Payload: ");
+
+ for ( uint8_t i = 0; i < pLen; i++ ){
+ send(payload[i]); // Payload
+ chks += payload[i];
+ }
+
+ chks = 0xff - chks;
+ D_NWSTACK(" checksum ");
+ send(chks);
+ D_NWSTACK("\r\n");
+
+ /* wait ACK */
+ _sem.timedwait(LORALINK_TIMEOUT_ACK);
+
+ if ( _respCd == LORALINK_NO_FREE_CH )
+ {
+ D_NWSTACK(" Channel isn't free\r\n");
+ return -1;
+ }
+ else if ( _respCd != LORALINK_ACK )
+ {
+ D_NWSTACK(" Not Acknowleged\r\n");
+ return -1;
+ }
+ return (int)pLen;
+}
+
+void LoRaLink::send(uint8_t c)
+{
+ if( (c == START_BYTE || c == ESCAPE || c == XON || c == XOFF)){
+ _serialPortTx->send(ESCAPE);
+ _serialPortTx->send(c ^ PAD);
+ }else{
+ _serialPortTx->send(c);
+ }
+}
+
+int LoRaLink::recv(uint8_t* buf)
+{
+ struct timeval timeout;
+ int maxfd;
+ int fd;
+ fd_set rfds;
+
+ timeout.tv_sec = 1;
+ timeout.tv_usec = 0; // 500ms
+ FD_ZERO(&rfds);
+ FD_SET(_serialPortRx->_fd, &rfds);
+ FD_SET(_serialPortTx->_fd, &rfds);
+ if ( _serialPortRx->_fd > _serialPortTx->_fd )
+ {
+ maxfd = _serialPortRx->_fd;
+ }
+ else
+ {
+ maxfd = _serialPortTx->_fd;
+ }
+
+ if ( select(maxfd + 1, &rfds, 0, 0, &timeout) > 0 )
+ {
+ if ( FD_ISSET(_serialPortRx->_fd, &rfds) )
+ {
+ fd = _serialPortRx->_fd;
+ }
+ else
+ {
+ fd = _serialPortTx->_fd;
+ }
+
+ if ( read(fd, buf, 1) == 1 )
+ {
+ /*
+ if ( *buf == ESCAPE )
+ {
+ D_NWSTACK( " %02x",buf[0] );
+ if ( read(fd, buf, 1) == 1 )
+ {
+ *buf = PAD ^ *buf;
+ }
+ else
+ {
+ return -1;
+ }
+
+ }
+ */
+ D_NWSTACK( " %02x",buf[0] );
+ return 0;
+ }
+ }
+ return -1;
+}
+
+/*=========================================
+ Class SerialPort
+ =========================================*/
+SerialPort::SerialPort()
+{
+ _tio.c_iflag = IGNBRK | IGNPAR;
+ _tio.c_cflag = CS8 | CLOCAL | CREAD;
+ _tio.c_cc[VINTR] = 0;
+ _tio.c_cc[VTIME] = 10; // 1 sec.
+ _tio.c_cc[VMIN] = 1;
+ _fd = 0;
+}
+
+SerialPort::~SerialPort()
+{
+ if (_fd)
+ {
+ ::close(_fd);
+ }
+}
+
+int SerialPort::open(char* devName, unsigned int baudrate, bool parity,
+ unsigned int stopbit, unsigned int flg)
+{
+ _fd = ::open(devName, flg);
+ if (_fd < 0)
+ {
+ return _fd;
+ }
+
+ if (parity)
+ {
+ _tio.c_cflag = _tio.c_cflag | PARENB;
+ }
+ if (stopbit == 2)
+ {
+ _tio.c_cflag = _tio.c_cflag | CSTOPB;
+ }
+
+ if (cfsetspeed(&_tio, baudrate) < 0)
+ {
+ return errno;
+ }
+ return tcsetattr(_fd, TCSANOW, &_tio);
+}
+
+bool SerialPort::send(unsigned char b)
+{
+ if (write(_fd, &b, 1) <= 0)
+ {
+ return false;
+ }
+ else
+ {
+ D_NWSTACK( " %02x", b);
+ return true;
+ }
+}
+
+
+
+void SerialPort::flush(void)
+{
+ tcsetattr(_fd, TCSAFLUSH, &_tio);
+}
+
diff --git a/MQTTSNGateway/src/linux/loralink/SensorNetwork.h b/MQTTSNGateway/src/linux/loralink/SensorNetwork.h
new file mode 100644
index 0000000..0565ff1
--- /dev/null
+++ b/MQTTSNGateway/src/linux/loralink/SensorNetwork.h
@@ -0,0 +1,187 @@
+/**************************************************************************************
+ * Copyright (c) 2016, Tomoaki Yamaguchi
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ * Tomoaki Yamaguchi - initial API and implementation
+ **************************************************************************************/
+#ifndef SENSORNETWORKX_H_
+#define SENSORNETWORKX_H_
+
+#include "MQTTSNGWDefines.h"
+#include "MQTTSNGWProcess.h"
+#include
+#include
+
+using namespace std;
+
+namespace MQTTSNGW
+{
+//#define DEBUG_NWSTACK
+
+#ifdef DEBUG_NWSTACK
+ #define D_NWSTACK(...) printf(__VA_ARGS__); fflush(stdout)
+#else
+ #define D_NWSTACK(...)
+#endif
+
+
+#define XMIT_STATUS_TIME_OVER 5000
+
+#define START_BYTE 0x7e
+#define ESCAPE 0x7d
+#define XON 0x11
+#define XOFF 0x13
+#define PAD 0x20
+
+#define BROADCAST_DEVADDR 0xFF
+
+#define LORA_PHY_MAXPAYLOAD 256
+
+/*!
+ * LoRaLink Modem Type
+ */
+typedef enum
+{
+ LORALINK_MODEM_TX,
+ LORALINK_MODEM_RX,
+}LoRaLinkModemType_t;
+
+/*!
+ * LoRaLink Serialized API
+ */
+typedef struct
+{
+ uint16_t PanId;
+ uint8_t DestinationAddr;
+ uint8_t SourceAddr;
+ uint16_t Rssi;
+ uint16_t Snr;
+ uint8_t PayloadType;
+ uint8_t Payload[LORA_PHY_MAXPAYLOAD];
+ uint16_t PayloadLen;
+}LoRaLinkFrame_t;
+
+typedef struct
+{
+ bool Available;
+ bool Error;
+ bool Escape;
+ uint16_t apipos;
+ uint8_t checksum;
+} LoRaLinkReadParameters_t;
+
+typedef enum
+{
+ MQTT_SN = 0x40,
+ API_RSP_ACK = 0x80,
+ API_RSP_NFC,
+ API_RSP_TOT,
+ API_REQ_UTC,
+ API_RSP_UTC,
+ API_CHG_TASK_PARAM,
+ API_REQ_RESET,
+
+}LoRaLinkPayloadType_t;
+
+/*===========================================
+ Class SerialPort
+ ============================================*/
+class SerialPort
+{
+ friend class LoRaLink;
+public:
+ SerialPort();
+ ~SerialPort();
+ int open(char* devName, unsigned int baudrate, bool parity, unsigned int stopbit, unsigned int flg);
+ bool send(unsigned char b);
+ void flush();
+
+private:
+ int _fd; // file descriptor
+ struct termios _tio;
+};
+
+/*===========================================
+ Class SensorNetAddreess
+ ============================================*/
+class SensorNetAddress
+{
+ friend class LoRaLink;
+public:
+ SensorNetAddress();
+ ~SensorNetAddress();
+ void setAddress( uint8_t devAddr);
+ int setAddress(string* data);
+ void setBroadcastAddress(void);
+ bool isMatch(SensorNetAddress* addr);
+ SensorNetAddress& operator =(SensorNetAddress& addr);
+ char* sprint(char*);
+private:
+ uint8_t _devAddr;
+// uint8_t _destAddr;
+};
+
+/*========================================
+ Class LoRaLink
+ =======================================*/
+class LoRaLink
+{
+public:
+ LoRaLink();
+ ~LoRaLink();
+
+ int open(LoRaLinkModemType_t type, char* device, int boudrate );
+ void close(void);
+ int unicast(const uint8_t* buf, uint16_t length, SensorNetAddress* sendToAddr);
+ int broadcast(const uint8_t* buf, uint16_t length);
+ int recv(uint8_t* buf, uint16_t len, SensorNetAddress* addr);
+ void setApiMode(uint8_t mode);
+
+private:
+ bool readApiFrame(LoRaLinkFrame_t* api, LoRaLinkReadParameters_t* para);
+ int recv(uint8_t* buf);
+ int send(LoRaLinkPayloadType_t type, const uint8_t* payload, uint16_t pLen, SensorNetAddress* addr);
+ void send(uint8_t b);
+
+ Semaphore _sem;
+ Mutex _meutex;
+ uint8_t _respCd;
+ SerialPort* _serialPortRx;
+ SerialPort* _serialPortTx;
+ LoRaLinkFrame_t _loRaLinkApi;
+ LoRaLinkReadParameters_t _loRaLinkPara;
+};
+
+/*===========================================
+ Class SensorNetwork
+ ============================================*/
+class SensorNetwork: public LoRaLink
+{
+public:
+ SensorNetwork();
+ ~SensorNetwork();
+
+ int unicast(const uint8_t* payload, uint16_t payloadLength, SensorNetAddress* sendto);
+ int broadcast(const uint8_t* payload, uint16_t payloadLength);
+ int read(uint8_t* buf, uint16_t bufLen);
+ int initialize(void);
+ const char* getDescription(void);
+ SensorNetAddress* getSenderAddress(void);
+
+private:
+ SensorNetAddress _clientAddr; // Sender's address. not gateway's one.
+ string _description;
+};
+
+}
+
+#endif /* SENSORNETWORKX_H_ */
diff --git a/MQTTSNGateway/src/linux/udp/SensorNetwork.cpp b/MQTTSNGateway/src/linux/udp/SensorNetwork.cpp
index 6f08481..1b57ff5 100644
--- a/MQTTSNGateway/src/linux/udp/SensorNetwork.cpp
+++ b/MQTTSNGateway/src/linux/udp/SensorNetwork.cpp
@@ -185,7 +185,7 @@ int SensorNetwork::initialize(void)
uint16_t multicastPortNo = 0;
uint16_t unicastPortNo = 0;
string ip;
-
+ unsigned int ttl = 1;
/*
* theProcess->getParam( ) copies
* a text specified by "Key" into param[] from the Gateway.conf
@@ -216,9 +216,15 @@ int SensorNetwork::initialize(void)
_description += " Gateway Port ";
_description += param;
}
+ if (theProcess->getParam("MulticastTTL", param) == 0)
+ {
+ ttl = atoi(param);
+ _description += " TTL: ";
+ _description += param;
+ }
/* Prepare UDP sockets */
- return UDPPort::open(ip.c_str(), multicastPortNo, unicastPortNo);
+ return UDPPort::open(ip.c_str(), multicastPortNo, unicastPortNo, ttl);
}
const char* SensorNetwork::getDescription(void)
@@ -261,7 +267,7 @@ void UDPPort::close(void)
}
}
-int UDPPort::open(const char* ipAddress, uint16_t multiPortNo, uint16_t uniPortNo)
+int UDPPort::open(const char* ipAddress, uint16_t multiPortNo, uint16_t uniPortNo, unsigned int ttl)
{
char loopch = 0;
const int reuse = 1;
@@ -275,6 +281,7 @@ int UDPPort::open(const char* ipAddress, uint16_t multiPortNo, uint16_t uniPortN
uint32_t ip = inet_addr(ipAddress);
_grpAddr.setAddress(ip, htons(multiPortNo));
_clientAddr.setAddress(ip, htons(uniPortNo));
+ _ttl = ttl;
/*------ Create unicast socket --------*/
_sockfdUnicast = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
@@ -342,6 +349,13 @@ int UDPPort::open(const char* ipAddress, uint16_t multiPortNo, uint16_t uniPortN
return -1;
}
+ if (setsockopt(_sockfdMulticast, IPPROTO_IP, IP_MULTICAST_TTL, &ttl,sizeof(ttl)) < 0)
+ {
+ D_NWSTACK("error Multicast IP_ADD_MEMBERSHIP in UDPPort::open\n");
+ close();
+ return -1;
+ }
+
if (setsockopt(_sockfdUnicast, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0)
{
D_NWSTACK("error Unicast IP_ADD_MEMBERSHIP in UDPPort::open\n");
@@ -378,8 +392,8 @@ int UDPPort::recv(uint8_t* buf, uint16_t len, SensorNetAddress* addr)
fd_set recvfds;
int maxSock = 0;
- timeout.tv_sec = 0;
- timeout.tv_usec = 1000000; // 1 sec
+ timeout.tv_sec = 1;
+ timeout.tv_usec = 0; // 1 sec
FD_ZERO(&recvfds);
FD_SET(_sockfdUnicast, &recvfds);
FD_SET(_sockfdMulticast, &recvfds);
diff --git a/MQTTSNGateway/src/linux/udp/SensorNetwork.h b/MQTTSNGateway/src/linux/udp/SensorNetwork.h
index 07967f8..735a3c5 100644
--- a/MQTTSNGateway/src/linux/udp/SensorNetwork.h
+++ b/MQTTSNGateway/src/linux/udp/SensorNetwork.h
@@ -60,7 +60,7 @@ public:
UDPPort();
virtual ~UDPPort();
- int open(const char* ipAddress, uint16_t multiPortNo, uint16_t uniPortNo);
+ int open(const char* ipAddress, uint16_t multiPortNo, uint16_t uniPortNo, unsigned int hops);
void close(void);
int unicast(const uint8_t* buf, uint32_t length, SensorNetAddress* sendToAddr);
int broadcast(const uint8_t* buf, uint32_t length);
@@ -76,7 +76,7 @@ private:
SensorNetAddress _grpAddr;
SensorNetAddress _clientAddr;
bool _disconReq;
-
+ unsigned int _ttl;
};
/*===========================================
diff --git a/MQTTSNGateway/src/linux/udp6/SensorNetwork.cpp b/MQTTSNGateway/src/linux/udp6/SensorNetwork.cpp
index e31e67b..915aef8 100644
--- a/MQTTSNGateway/src/linux/udp6/SensorNetwork.cpp
+++ b/MQTTSNGateway/src/linux/udp6/SensorNetwork.cpp
@@ -122,11 +122,8 @@ char* SensorNetAddress::getAddress(void)
bool SensorNetAddress::isMatch(SensorNetAddress* addr)
{
- return ((this->_portNo == addr->_portNo) && \
- (this->_IpAddr.sin6_addr.s6_addr32[0] == addr->_IpAddr.sin6_addr.s6_addr32[0]) && \
- (this->_IpAddr.sin6_addr.s6_addr32[1] == addr->_IpAddr.sin6_addr.s6_addr32[1]) && \
- (this->_IpAddr.sin6_addr.s6_addr32[2] == addr->_IpAddr.sin6_addr.s6_addr32[2]) && \
- (this->_IpAddr.sin6_addr.s6_addr32[3] == addr->_IpAddr.sin6_addr.s6_addr32[3]));
+ return (this->_portNo == addr->_portNo) && \
+ (memcmp(this->_IpAddr.sin6_addr.s6_addr, addr->_IpAddr.sin6_addr.s6_addr, sizeof(this->_IpAddr.sin6_addr.s6_addr)) == 0);
}
SensorNetAddress& SensorNetAddress::operator =(SensorNetAddress& addr)
@@ -179,6 +176,7 @@ int SensorNetwork::initialize(void)
string ip;
string broadcast;
string interface;
+ unsigned int hops = 1;
if (theProcess->getParam("GatewayUDP6Bind", param) == 0)
{
@@ -204,8 +202,14 @@ int SensorNetwork::initialize(void)
_description += " Interface: ";
_description += param;
}
+ if (theProcess->getParam("GatewayUDP6Hops", param) == 0)
+ {
+ hops = atoi(param);
+ _description += " Hops: ";
+ _description += param;
+ }
- return UDPPort6::open(ip.c_str(), unicastPortNo, broadcast.c_str(), interface.c_str());
+ return UDPPort6::open(ip.c_str(), unicastPortNo, broadcast.c_str(), interface.c_str(), hops);
}
const char* SensorNetwork::getDescription(void)
@@ -248,7 +252,7 @@ void UDPPort6::close(void)
}
}
-int UDPPort6::open(const char* ipAddress, uint16_t uniPortNo, const char* broadcastAddr, const char* interfaceName)
+int UDPPort6::open(const char* ipAddress, uint16_t uniPortNo, const char* broadcastAddr, const char* interfaceName, unsigned int hops)
{
struct addrinfo hints, *res;
int errnu;
@@ -295,8 +299,15 @@ int UDPPort6::open(const char* ipAddress, uint16_t uniPortNo, const char* broadc
WRITELOG("UDP6::open - limit IPv6: %s",strerror(errnu));
return errnu;
}
+ errnu = setsockopt(_sockfdMulticast, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, &hops,sizeof(hops));
+ if(errnu <0)
+ {
+ WRITELOG("UDP6::open - limit HOPS: %s",strerror(errnu));
+ return errnu;
+ }
_uniPortNo = uniPortNo;
+ _hops = hops;
freeaddrinfo(res);
//init the structs for getaddrinfo
@@ -320,8 +331,13 @@ int UDPPort6::open(const char* ipAddress, uint16_t uniPortNo, const char* broadc
//if given, set a given device name to bind to
if(strlen(interfaceName) > 0)
{
+#ifdef __APPLE__
+ int idx = if_nametoindex(interfaceName);
+ setsockopt(_sockfdUnicast, IPPROTO_IP, IP_BOUND_IF, &idx, sizeof(idx));
+#else
//socket option: bind to a given interface name
setsockopt(_sockfdUnicast, SOL_SOCKET, SO_BINDTODEVICE, interfaceName, strlen(interfaceName));
+#endif
}
//socket option: reuse address
@@ -371,7 +387,7 @@ int UDPPort6::unicast(const uint8_t* buf, uint32_t length, SensorNetAddress* add
strcpy(destStr, addr->getAddress());
strcat(destStr,"%");
strcat(destStr,_interfaceName);
- if(IN6_IS_ADDR_LINKLOCAL(addr->getAddress()))
+ if(IN6_IS_ADDR_LINKLOCAL(&addr->getIpAddress()->sin6_addr))
{
getaddrinfo(destStr, portStr.c_str(), &hints, &res);
}
@@ -412,8 +428,8 @@ int UDPPort6::broadcast(const uint8_t* buf, uint32_t length)
strcpy(destStr, _grpAddr.getAddress());
strcat(destStr,"%");
strcat(destStr,_interfaceName);
- if(IN6_IS_ADDR_MC_NODELOCAL(_grpAddr.getAddress()) ||
- IN6_IS_ADDR_MC_LINKLOCAL(_grpAddr.getAddress()))
+ if(IN6_IS_ADDR_MC_NODELOCAL(&_grpAddr.getIpAddress()->sin6_addr) ||
+ IN6_IS_ADDR_MC_LINKLOCAL(&_grpAddr.getIpAddress()->sin6_addr))
{
err = getaddrinfo(destStr, std::to_string(_uniPortNo).c_str(), &hint, &info );
}
@@ -440,14 +456,13 @@ int UDPPort6::broadcast(const uint8_t* buf, uint32_t length)
return 0;
}
-//TODO: test if this is working properly (GW works, but this function is not completely tested)
int UDPPort6::recv(uint8_t* buf, uint16_t len, SensorNetAddress* addr)
{
struct timeval timeout;
fd_set recvfds;
- timeout.tv_sec = 0;
- timeout.tv_usec = 1000000; // 1 sec
+ timeout.tv_sec = 1;
+ timeout.tv_usec = 0; // 1 sec
FD_ZERO(&recvfds);
FD_SET(_sockfdUnicast, &recvfds);
@@ -462,7 +477,6 @@ int UDPPort6::recv(uint8_t* buf, uint16_t len, SensorNetAddress* addr)
return rc;
}
-//TODO: test if this is working properly (GW works, but this function is not completely tested)
int UDPPort6::recvfrom(int sockfd, uint8_t* buf, uint16_t len, uint8_t flags, SensorNetAddress* addr)
{
sockaddr_in6 sender;
diff --git a/MQTTSNGateway/src/linux/udp6/SensorNetwork.h b/MQTTSNGateway/src/linux/udp6/SensorNetwork.h
index 62c297f..0e3ab7e 100644
--- a/MQTTSNGateway/src/linux/udp6/SensorNetwork.h
+++ b/MQTTSNGateway/src/linux/udp6/SensorNetwork.h
@@ -66,7 +66,7 @@ public:
UDPPort6();
virtual ~UDPPort6();
- int open(const char* ipAddress, uint16_t uniPortNo, const char* broadcastAddr, const char* interfaceName);
+ int open(const char* ipAddress, uint16_t uniPortNo, const char* broadcastAddr, const char* interfaceName, unsigned int hops);
void close(void);
int unicast(const uint8_t* buf, uint32_t length, SensorNetAddress* sendToAddr);
int broadcast(const uint8_t* buf, uint32_t length);
@@ -84,7 +84,7 @@ private:
SensorNetAddress _clientAddr;
uint16_t _uniPortNo;
bool _disconReq;
-
+ unsigned int _hops;
};
/*===========================================
diff --git a/MQTTSNGateway/src/mainGateway.cpp b/MQTTSNGateway/src/mainGateway.cpp
index e196d96..4c6657c 100644
--- a/MQTTSNGateway/src/mainGateway.cpp
+++ b/MQTTSNGateway/src/mainGateway.cpp
@@ -34,12 +34,17 @@ BrokerSendTask task5(&gateway);
int main(int argc, char** argv)
{
- try {
- gateway.initialize(argc, argv);
- gateway.run();
- } catch (const std::exception &ex) {
- WRITELOG("\nEclipse Paho MQTT-SN Gateway exception: %s\n", ex.what());
- WRITELOG("MQTT-SNGateway [-f Config file name]\n");
- }
+ gateway.initialize(argc, argv);
+ gateway.run();
+ try
+ {
+ gateway.initialize(argc, argv);
+ gateway.run();
+ }
+ catch (const std::exception &ex)
+ {
+ WRITELOG("\nEclipse Paho MQTT-SN Gateway exception: %s\n", ex.what());
+ WRITELOG("MQTT-SNGateway [-f Config file name]\n");
+ }
return 0;
}
diff --git a/MQTTSNGateway/src/tests/TestTopics.cpp b/MQTTSNGateway/src/tests/TestTopics.cpp
index 404013e..52c968b 100644
--- a/MQTTSNGateway/src/tests/TestTopics.cpp
+++ b/MQTTSNGateway/src/tests/TestTopics.cpp
@@ -214,7 +214,7 @@ void TestTopics::test(void)
for ( int i = 0; i < 10 ; i++ )
{
MQTTSN_topicid tp1;
- char tp0[10];
+ char tp0[20];
sprintf(tp0, "Topic/%d/%d", i, i);
tp1.type = MQTTSN_TOPIC_TYPE_NORMAL;
tp1.data.long_.len = strlen(tp0);
@@ -237,7 +237,7 @@ void TestTopics::test(void)
for ( int i = 0; i < 10 ; i++ )
{
MQTTSN_topicid tp1;
- char tp0[10];
+ char tp0[20];
sprintf(tp0, "Topic/%d", i);
tp1.type = MQTTSN_TOPIC_TYPE_NORMAL;
tp1.data.long_.len = strlen(tp0);
@@ -261,7 +261,7 @@ void TestTopics::test(void)
for ( int i = 0; i < 10 ; i++ )
{
MQTTSN_topicid tpid1;
- char tp0[10];
+ char tp0[20];
sprintf(tp0, "TOPIC/%d/%d", i, i);
tpid1.type = MQTTSN_TOPIC_TYPE_NORMAL;
tpid1.data.long_.len = strlen(tp0);
diff --git a/MQTTSNPacket/src/MQTTSNPacket.c b/MQTTSNPacket/src/MQTTSNPacket.c
index a788b78..86c1e90 100644
--- a/MQTTSNPacket/src/MQTTSNPacket.c
+++ b/MQTTSNPacket/src/MQTTSNPacket.c
@@ -53,7 +53,7 @@ const char* MQTTSNPacket_name(int code)
*/
int MQTTSNPacket_len(int length)
{
- return (length > 255) ? length + 3 : length + 1;
+ return (length >= 255) ? length + 3 : length + 1;
}
/**
@@ -67,7 +67,7 @@ int MQTTSNPacket_encode(unsigned char* buf, int length)
int rc = 0;
FUNC_ENTRY;
- if (length > 255)
+ if (length >= 255)
{
writeChar(&buf, 0x01);
writeInt(&buf, length);
@@ -83,7 +83,8 @@ int MQTTSNPacket_encode(unsigned char* buf, int length)
/**
* Obtains the MQTT-SN packet length from received data
- * @param getcharfn pointer to function to read the next character from the data source
+ * @param buf the buffer that contains the MQTT-SN packet
+ * @param buflen the length in bytes of the supplied buffer
* @param value the decoded length returned
* @return the number of bytes read from the socket
*/
diff --git a/MQTTSNPacket/src/MQTTSNSearchClient.c b/MQTTSNPacket/src/MQTTSNSearchClient.c
index c55e134..af93c19 100644
--- a/MQTTSNPacket/src/MQTTSNSearchClient.c
+++ b/MQTTSNPacket/src/MQTTSNSearchClient.c
@@ -116,7 +116,7 @@ int MQTTSNDeserialize_gwinfo(unsigned char* gatewayid, unsigned short* gatewayad
*gatewayid = readChar(&curdata);
*gatewayaddress_len = enddata - curdata;
- *gatewayaddress = (gatewayaddress_len > 0) ? curdata : NULL;
+ *gatewayaddress = (*gatewayaddress_len > 0) ? curdata : NULL;
rc = 1;
exit: