diff --git a/.cproject b/.cproject
index c3137e8..fae7107 100644
--- a/.cproject
+++ b/.cproject
@@ -14,7 +14,7 @@
-
+
@@ -66,7 +66,7 @@
-
+
@@ -136,8 +136,8 @@
-
-
+
+
diff --git a/MQTTSNGateway/GatewayTester/Makefile b/MQTTSNGateway/GatewayTester/Makefile
index 7ce0f29..0ed4a27 100644
--- a/MQTTSNGateway/GatewayTester/Makefile
+++ b/MQTTSNGateway/GatewayTester/Makefile
@@ -7,9 +7,13 @@ PUBAPPL := mainPub
PRGSUB := MQTT-SNSub
SUBAPPL := mainSub
+PRGQOS := MQTT-SNPubQoS-1
+QOSAPPL := mainPubQoS-1
+
SRCDIR := samples
SRCPUB := ClientPub
SRCSUB := ClientSub
+SRCQOS := ClientPubQoS-1
SUBDIR := src
CPPSRCS := \
@@ -46,10 +50,11 @@ DEPS := $(CPPSRCS:%.cpp=$(OUTDIR)/%.d)
PROGPUB := $(OUTDIR)/$(PRGPUB)
PROGSUB := $(OUTDIR)/$(PRGSUB)
+PROGQOS := $(OUTDIR)/$(PRGQOS)
.PHONY: install clean
-all: $(PROG) $(PROGPUB) $(PROGSUB)
+all: $(PROG) $(PROGPUB) $(PROGSUB) $(PROGQOS)
@@ -64,6 +69,9 @@ $(PROGPUB): $(OBJS) $(OUTDIR)/$(SRCDIR)/$(SRCPUB)/$(PUBAPPL).o
$(PROGSUB): $(OBJS) $(OUTDIR)/$(SRCDIR)/$(SRCSUB)/$(SUBAPPL).o
$(CXX) $(LDFLAGS) -o $(PROGSUB) $(OUTDIR)/$(SRCDIR)/$(SRCSUB)/$(SUBAPPL).o $(OBJS) $(LIBS) $(LDADD)
+$(PROGQOS): $(OBJS) $(OUTDIR)/$(SRCDIR)/$(SRCQOS)/$(QOSAPPL).o
+ $(CXX) $(LDFLAGS) -o $(PROGQOS) $(OUTDIR)/$(SRCDIR)/$(SRCQOS)/$(QOSAPPL).o $(OBJS) $(LIBS) $(LDADD)
+
$(OUTDIR)/$(SUBDIR)/%.o:$(SUBDIR)/%.cpp
@if [ ! -e `dirname $@` ]; then mkdir -p `dirname $@`; fi
@@ -80,6 +88,10 @@ $(OUTDIR)/$(SRCDIR)/$(SRCPUB)/%.o:$(SRCDIR)/$(SRCPUB)%.cpp
$(OUTDIR)/$(SRCDIR)/$(SRCSUB)/%.o:$(SRCDIR)/$(SRCSUB)%.cpp
@if [ ! -e `dirname $@` ]; then mkdir -p `dirname $@`; fi
$(CXX) $(CXXFLAGS) $(CPPFLAGS) $(INCLUDES) $(DEFS) -o $@ -c -MMD -MP -MF $(@:%.o=%.d) $<
+
+$(OUTDIR)/$(SRCDIR)/$(SRCQOS)/%.o:$(SRCDIR)/$(SRCQOS)%.cpp
+ @if [ ! -e `dirname $@` ]; then mkdir -p `dirname $@`; fi
+ $(CXX) $(CXXFLAGS) $(CPPFLAGS) $(INCLUDES) $(DEFS) -o $@ -c -MMD -MP -MF $(@:%.o=%.d) $<
clean:
rm -rf $(OUTDIR)
@@ -88,4 +100,5 @@ install:
cp -pf $(PROG) ../../../
cp -pf $(PROGPUB) ../../../
cp -pf $(PROGSUB) ../../../
-
+ cp -pf $(PROGQOS) ../../../
+
\ No newline at end of file
diff --git a/MQTTSNGateway/GatewayTester/samples/ClientPubQoS-1/mainPubQoS-1.cpp b/MQTTSNGateway/GatewayTester/samples/ClientPubQoS-1/mainPubQoS-1.cpp
new file mode 100644
index 0000000..8b72f53
--- /dev/null
+++ b/MQTTSNGateway/GatewayTester/samples/ClientPubQoS-1/mainPubQoS-1.cpp
@@ -0,0 +1,161 @@
+/****************************************************************************
+ * Copyright (c) 2016, Tomoaki Yamaguchi
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ *---------------------------------------------------------------------------
+ *
+ * MQTT-SN GATEWAY TEST CLIENT
+ *
+ * Supported functions.
+ *
+ * void PUBLISH ( const char* topicName, uint8_t* payload, uint16_t len, uint8_t qos, bool retain = false );
+ *
+ * void PUBLISH ( uint16_t topicId, uint8_t* payload, uint16_t len, uint8_t qos, bool retain = false );
+ *
+ * void SUBSCRIBE ( const char* topicName, TopicCallback onPublish, uint8_t qos );
+ *
+ * void SUBSCRIBE ( uint16_t topicId, TopicCallback onPublish, uint8_t qos );
+ *
+ * void UNSUBSCRIBE ( const char* topicName );
+ *
+ * void UNSUBSCRIBE ( uint16_t topicId );
+ *
+ * void DISCONNECT ( uint16_t sleepInSecs );
+ *
+ * void CONNECT ( void );
+ *
+ * void DISPLAY( format, .....); <== instead of printf()
+ *
+ *
+ * Contributors:
+ * Tomoaki Yamaguchi - initial API and implementation
+ ***************************************************************************/
+
+#include "LMqttsnClientApp.h"
+#include "LMqttsnClient.h"
+#include "LScreen.h"
+
+using namespace std;
+using namespace linuxAsyncClient;
+extern LMqttsnClient* theClient;
+extern LScreen* theScreen;
+
+/*------------------------------------------------------
+ * UDP Configuration (theNetcon)
+ *------------------------------------------------------*/
+UDPCONF = {
+ "GatewayTestPubClient", // ClientId
+ {225,1,1,1}, // Multicast group IP
+ 1883, // Multicast group Port
+ 20001, // Local PortNo
+};
+
+/*------------------------------------------------------
+ * Client Configuration (theMqcon)
+ *------------------------------------------------------*/
+MQTTSNCONF = {
+ 300, //KeepAlive [seconds]
+ true, //Clean session
+ 300, //Sleep duration [seconds]
+ "", //WillTopic
+ "", //WillMessage
+ 0, //WillQos
+ false //WillRetain
+};
+
+/*------------------------------------------------------
+ * Define Topics
+ *------------------------------------------------------*/
+
+
+/*------------------------------------------------------
+ * Callback routines for Subscribed Topics
+ *------------------------------------------------------*/
+
+/*------------------------------------------------------
+ * A Link list of Callback routines and Topics
+ *------------------------------------------------------*/
+SUBSCRIBE_LIST = {// e.g. SUB(TopicType, topicName, TopicId, callback, QoSx),
+
+ END_OF_SUBSCRIBE_LIST
+ };
+
+/*------------------------------------------------------
+ * Test functions
+ *------------------------------------------------------*/
+
+void publishTopic1(void)
+{
+ char payload[300];
+ sprintf(payload, "publish \"ty4tw/Topic1\" \n");
+ uint8_t qos = 3;
+ PUBLISH(1,(uint8_t*)payload, strlen(payload), qos);
+}
+
+void publishTopic2(void)
+{
+ char payload[300];
+ sprintf(payload, "publish \"ty4tw/topic2\" \n");
+ uint8_t qos = 3;
+ PUBLISH(2,(uint8_t*)payload, strlen(payload), qos);
+}
+
+void publishTopic57(void)
+{
+ char payload[300];
+ sprintf(payload, "publish \"ty4tw/topic57\" \n");
+ uint8_t qos = 3;
+ PUBLISH(5,(uint8_t*)payload, strlen(payload), qos);
+}
+
+
+void disconnect(void)
+{
+ DISCONNECT(0);
+}
+
+
+/*------------------------------------------------------
+ * A List of Test functions is valid in case of
+ * line 23 of LMqttsnClientApp.h is commented out.
+ * //#define CLIENT_MODE
+ *------------------------------------------------------*/
+
+TEST_LIST = {// e.g. TEST( Label, Test),
+ TEST("Step1:Publish topic1", publishTopic1),
+ TEST("Step2:Publish topic57", publishTopic57),
+ TEST("Step3:Publish topic2", publishTopic2),
+ END_OF_TEST_LIST
+ };
+
+
+/*------------------------------------------------------
+ * List of tasks is invalid in case of line23 of
+ * LMqttsnClientApp.h is commented out.
+ * #define CLIENT_MODE
+ *------------------------------------------------------*/
+TASK_LIST = {// e.g. TASK( task, executing duration in second),
+ TASK(publishTopic1, 4), // publishTopic1() is executed every 4 seconds
+ TASK(publishTopic2, 7), // publishTopic2() is executed every 7 seconds
+ END_OF_TASK_LIST
+ };
+
+
+/*------------------------------------------------------
+ * Initialize function
+ *------------------------------------------------------*/
+void setup(void)
+{
+ SetQoSMinus1Mode(true);
+}
+
+
+/***************** END OF PROGRAM ********************/
diff --git a/MQTTSNGateway/GatewayTester/samples/mainTest.cpp b/MQTTSNGateway/GatewayTester/samples/mainTest.cpp
index 34db1c5..6075ced 100644
--- a/MQTTSNGateway/GatewayTester/samples/mainTest.cpp
+++ b/MQTTSNGateway/GatewayTester/samples/mainTest.cpp
@@ -220,7 +220,7 @@ TASK_LIST = {// e.g. TASK( task, executing duration in second),
*------------------------------------------------------*/
void setup(void)
{
- //SetForwarderMode();
+ SetForwarderMode(false);
}
diff --git a/MQTTSNGateway/GatewayTester/src/LGwProxy.cpp b/MQTTSNGateway/GatewayTester/src/LGwProxy.cpp
index 78cc726..b0e86cd 100644
--- a/MQTTSNGateway/GatewayTester/src/LGwProxy.cpp
+++ b/MQTTSNGateway/GatewayTester/src/LGwProxy.cpp
@@ -30,21 +30,16 @@ extern uint16_t getUint16(const uint8_t* pos);
extern LMqttsnClient* theClient;
extern LScreen* theScreen;
-
/*=====================================
- Class GwProxy
+ Class GwProxy
======================================*/
-static const char* packet_names[] =
-{
- "ADVERTISE", "SEARCHGW", "GWINFO", "RESERVED", "CONNECT", "CONNACK",
- "WILLTOPICREQ", "WILLTOPIC", "WILLMSGREQ", "WILLMSG", "REGISTER", "REGACK",
- "PUBLISH", "PUBACK", "PUBCOMP", "PUBREC", "PUBREL", "RESERVED",
- "SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK", "PINGREQ", "PINGRESP",
- "DISCONNECT", "RESERVED", "WILLTOPICUPD", "WILLTOPICRESP", "WILLMSGUPD",
- "WILLMSGRESP"
-};
+static const char* packet_names[] = { "ADVERTISE", "SEARCHGW", "GWINFO", "RESERVED", "CONNECT", "CONNACK",
+ "WILLTOPICREQ", "WILLTOPIC", "WILLMSGREQ", "WILLMSG", "REGISTER", "REGACK", "PUBLISH", "PUBACK", "PUBCOMP",
+ "PUBREC", "PUBREL", "RESERVED", "SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK", "PINGREQ", "PINGRESP",
+ "DISCONNECT", "RESERVED", "WILLTOPICUPD", "WILLTOPICRESP", "WILLMSGUPD", "WILLMSGRESP" };
-LGwProxy::LGwProxy(){
+LGwProxy::LGwProxy()
+{
_nextMsgId = 0;
_status = GW_LOST;
_gwId = 0;
@@ -61,13 +56,16 @@ LGwProxy::LGwProxy(){
_tWake = 0;
_initialized = 0;
_isForwarderMode = false;
+ _isQoSMinus1Mode = false;
}
-LGwProxy::~LGwProxy(){
+LGwProxy::~LGwProxy()
+{
_topicTbl.clearTopic();
}
-void LGwProxy::initialize(LUdpConfig netconf, LMqttsnConfig mqconf){
+void LGwProxy::initialize(LUdpConfig netconf, LMqttsnConfig mqconf)
+{
_network.initialize(netconf);
_clientId = netconf.clientId;
_willTopic = mqconf.willTopic;
@@ -79,78 +77,105 @@ void LGwProxy::initialize(LUdpConfig netconf, LMqttsnConfig mqconf){
_initialized = 1;
}
-void LGwProxy::connect(){
+void LGwProxy::connect()
+{
char* pos;
- while (_status != GW_CONNECTED){
+ while (_status != GW_CONNECTED)
+ {
pos = _msg;
- if (_status == GW_SEND_WILLMSG){
- *pos++ = 2 + (uint8_t)strlen(_willMsg);
- *pos++ = MQTTSN_TYPE_WILLMSG;
- strcpy(pos,_willMsg); // WILLMSG
- _status = GW_WAIT_CONNACK;
- writeGwMsg();
- }else if (_status == GW_SEND_WILLTOPIC){
- *pos++ = 3 + (uint8_t)strlen(_willTopic);
- *pos++ = MQTTSN_TYPE_WILLTOPIC;
- *pos++ = _qosWill | _retainWill;
- strcpy(pos,_willTopic); // WILLTOPIC
- _status = GW_WAIT_WILLMSGREQ;
- writeGwMsg();
- }else if (_status == GW_CONNECTING || _status == GW_DISCONNECTED || _status == GW_SLEPT ){
- uint8_t clientIdLen = uint8_t(strlen(_clientId) > 23 ? 23 : strlen(_clientId));
- *pos++ = 6 + clientIdLen;
- *pos++ = MQTTSN_TYPE_CONNECT;
- pos++;
- if (_cleanSession){
- _msg[2] = MQTTSN_FLAG_CLEAN;
- }
- *pos++ = MQTTSN_PROTOCOL_ID;
- setUint16((uint8_t*)pos, _tkeepAlive);
- pos += 2;
- strncpy(pos, _clientId, clientIdLen);
- _msg[ 6 + clientIdLen] = 0;
- _status = GW_WAIT_CONNACK;
- if ( _willMsg && _willTopic && _status != GW_SLEPT ){
- if (strlen(_willMsg) && strlen(_willTopic)){
- _msg[2] = _msg[2] | MQTTSN_FLAG_WILL; // CONNECT
- _status = GW_WAIT_WILLTOPICREQ;
- }
- }
- writeGwMsg();
- _connectRetry = MQTTSN_RETRY_COUNT;
- }else if (_status == GW_LOST){
+ if (_status == GW_LOST)
+ {
*pos++ = 3;
*pos++ = MQTTSN_TYPE_SEARCHGW;
*pos = 0; // SERCHGW
_status = GW_SEARCHING;
writeGwMsg();
-
+ }
+ else if (_status == GW_SEND_WILLMSG)
+ {
+ *pos++ = 2 + (uint8_t) strlen(_willMsg);
+ *pos++ = MQTTSN_TYPE_WILLMSG;
+ strcpy(pos, _willMsg); // WILLMSG
+ _status = GW_WAIT_CONNACK;
+ writeGwMsg();
+ }
+ else if (_status == GW_SEND_WILLTOPIC)
+ {
+ *pos++ = 3 + (uint8_t) strlen(_willTopic);
+ *pos++ = MQTTSN_TYPE_WILLTOPIC;
+ *pos++ = _qosWill | _retainWill;
+ strcpy(pos, _willTopic); // WILLTOPIC
+ _status = GW_WAIT_WILLMSGREQ;
+ writeGwMsg();
+ }
+ else if (_status == GW_CONNECTING || _status == GW_DISCONNECTED || _status == GW_SLEPT)
+ {
+ uint8_t clientIdLen = uint8_t(strlen(_clientId) > 23 ? 23 : strlen(_clientId));
+ if (_isQoSMinus1Mode)
+ {
+ _status = GW_CONNECTED;
+ }
+ else
+ {
+ *pos++ = 6 + clientIdLen;
+ *pos++ = MQTTSN_TYPE_CONNECT;
+ pos++;
+ if (_cleanSession)
+ {
+ _msg[2] = MQTTSN_FLAG_CLEAN;
+ }
+ *pos++ = MQTTSN_PROTOCOL_ID;
+ setUint16((uint8_t*) pos, _tkeepAlive);
+ pos += 2;
+ strncpy(pos, _clientId, clientIdLen);
+ _msg[6 + clientIdLen] = 0;
+ _status = GW_WAIT_CONNACK;
+ if (_willMsg && _willTopic && _status != GW_SLEPT)
+ {
+ if (strlen(_willMsg) && strlen(_willTopic))
+ {
+ _msg[2] = _msg[2] | MQTTSN_FLAG_WILL; // CONNECT
+ _status = GW_WAIT_WILLTOPICREQ;
+ }
+ }
+ writeGwMsg();
+ _connectRetry = MQTTSN_RETRY_COUNT;
+ }
}
getConnectResponce();
}
return;
}
-int LGwProxy::getConnectResponce(void){
+int LGwProxy::getConnectResponce(void)
+{
int len = readMsg();
- if (len == 0){
- if (_sendUTC + MQTTSN_TIME_RETRY < time(NULL)){
+ if (len == 0)
+ {
+ if (_sendUTC + MQTTSN_TIME_RETRY < time(NULL))
+ {
if (_msg[1] == MQTTSN_TYPE_CONNECT)
{
_connectRetry--;
}
- if (--_retryCount > 0){
- writeMsg((const uint8_t*)_msg); // Not writeGwMsg() : not to reset the counter.
+ if (--_retryCount > 0)
+ {
+ writeMsg((const uint8_t*) _msg); // Not writeGwMsg() : not to reset the counter.
_sendUTC = time(NULL);
- }else{
+ }
+ else
+ {
_sendUTC = 0;
- if ( _status > GW_SEARCHING && _connectRetry > 0){
+ if (_status > GW_SEARCHING && _connectRetry > 0)
+ {
_status = GW_CONNECTING;
- }else{
+ }
+ else
+ {
_status = GW_LOST;
_gwId = 0;
}
@@ -158,66 +183,87 @@ int LGwProxy::getConnectResponce(void){
}
}
return 0;
- }else if (_mqttsnMsg[0] == MQTTSN_TYPE_GWINFO && _status == GW_SEARCHING){
+ }
+ else if (_mqttsnMsg[0] == MQTTSN_TYPE_GWINFO && _status == GW_SEARCHING)
+ {
_network.setGwAddress();
_gwId = _mqttsnMsg[1];
_status = GW_CONNECTING;
- }else if (_mqttsnMsg[0] == MQTTSN_TYPE_WILLTOPICREQ && _status == GW_WAIT_WILLTOPICREQ){
+ }
+ else if (_mqttsnMsg[0] == MQTTSN_TYPE_WILLTOPICREQ && _status == GW_WAIT_WILLTOPICREQ)
+ {
_status = GW_SEND_WILLTOPIC;
- }else if (_mqttsnMsg[0] == MQTTSN_TYPE_WILLMSGREQ && _status == GW_WAIT_WILLMSGREQ){
+ }
+ else if (_mqttsnMsg[0] == MQTTSN_TYPE_WILLMSGREQ && _status == GW_WAIT_WILLMSGREQ)
+ {
_status = GW_SEND_WILLMSG;
- }else if (_mqttsnMsg[0] == MQTTSN_TYPE_CONNACK && _status == GW_WAIT_CONNACK){
- if (_mqttsnMsg[1] == MQTTSN_RC_ACCEPTED){
+ }
+ else if (_mqttsnMsg[0] == MQTTSN_TYPE_CONNACK && _status == GW_WAIT_CONNACK)
+ {
+ if (_mqttsnMsg[1] == MQTTSN_RC_ACCEPTED)
+ {
_status = GW_CONNECTED;
_connectRetry = MQTTSN_RETRY_COUNT;
setPingReqTimer();
- if ( _tSleep ){
+ if (_tSleep)
+ {
_tSleep = 0;
- }else{
+ }
+ else
+ {
DISPLAY("\033[0m\033[0;32m\n\n Connected to the Broker\033[0m\033[0;37m\n\n");
- if ( _cleanSession || _initialized == 1 )
+ if (_cleanSession || _initialized == 1)
{
_topicTbl.clearTopic();
_initialized = 0;
theClient->onConnect(); // SUBSCRIBEs are conducted
}
}
- }else{
+ }
+ else
+ {
_status = GW_CONNECTING;
}
}
return 1;
}
-void LGwProxy::reconnect(void){
+void LGwProxy::reconnect(void)
+{
D_MQTTLOG("...Gateway reconnect\r\n");
_status = GW_DISCONNECTED;
connect();
}
-void LGwProxy::disconnect(uint16_t secs){
+void LGwProxy::disconnect(uint16_t secs)
+{
_tSleep = secs;
_tWake = 0;
_msg[1] = MQTTSN_TYPE_DISCONNECT;
- if (secs){
+ if (secs)
+ {
_msg[0] = 4;
setUint16((uint8_t*) _msg + 2, secs);
_status = GW_SLEEPING;
- }else{
+ }
+ else
+ {
_msg[0] = 2;
_keepAliveTimer.stop();
_status = GW_DISCONNECTING;
}
_retryCount = MQTTSN_RETRY_COUNT;
- writeMsg((const uint8_t*)_msg);
+ writeMsg((const uint8_t*) _msg);
_sendUTC = time(NULL);
- while ( _status != GW_DISCONNECTED && _status != GW_SLEPT){
- if (getDisconnectResponce() < 0){
+ while (_status != GW_DISCONNECTED && _status != GW_SLEPT)
+ {
+ if (getDisconnectResponce() < 0)
+ {
_status = GW_LOST;
DISPLAY("\033[0m\033[0;31m\n\n!!!!!! DISCONNECT Error !!!!!\033[0m\033[0;37m \n\n");
return;
@@ -225,48 +271,63 @@ void LGwProxy::disconnect(uint16_t secs){
}
}
-int LGwProxy::getDisconnectResponce(void){
+int LGwProxy::getDisconnectResponce(void)
+{
int len = readMsg();
- if (len == 0){
- if (_sendUTC + MQTTSN_TIME_RETRY < time(NULL)){
- if (--_retryCount >= 0){
- writeMsg((const uint8_t*)_msg);
+ if (len == 0)
+ {
+ if (_sendUTC + MQTTSN_TIME_RETRY < time(NULL))
+ {
+ if (--_retryCount >= 0)
+ {
+ writeMsg((const uint8_t*) _msg);
_sendUTC = time(NULL);
- }else{
+ }
+ else
+ {
_status = GW_LOST;
_gwId = 0;
return -1;
}
}
return 0;
- }else if (_mqttsnMsg[0] == MQTTSN_TYPE_DISCONNECT){
- if (_status == GW_SLEEPING ){
+ }
+ else if (_mqttsnMsg[0] == MQTTSN_TYPE_DISCONNECT)
+ {
+ if (_status == GW_SLEEPING)
+ {
_status = GW_SLEPT;
uint32_t remain = _keepAliveTimer.getRemain();
theClient->setSleepMode(remain);
/* Wake up and starts from this point. */
- }else{
+ }
+ else
+ {
_status = GW_DISCONNECTED;
}
}
return 0;
}
-int LGwProxy::getMessage(void){
+int LGwProxy::getMessage(void)
+{
int len = readMsg();
- if (len < 0){
+ if (len < 0)
+ {
return len; //error
}
#ifdef DEBUG_MQTTSN
- if (len){
+ if (len)
+ {
D_MQTTLOG(" recved msgType %x\n", _mqttsnMsg[0]);
}
#endif
- if (len == 0){
+ if (len == 0)
+ {
// Check PINGREQ required
checkPingReq();
@@ -282,87 +343,115 @@ int LGwProxy::getMessage(void){
// Check Timeout of SUBSCRIBEs,
theClient->getSubscribeManager()->checkTimeout();
- }else if (_mqttsnMsg[0] == MQTTSN_TYPE_PUBLISH){
+ }
+ else if (_mqttsnMsg[0] == MQTTSN_TYPE_PUBLISH)
+ {
theClient->getPublishManager()->published(_mqttsnMsg, len);
- }else if (_mqttsnMsg[0] == MQTTSN_TYPE_PUBACK || _mqttsnMsg[0] == MQTTSN_TYPE_PUBCOMP ||
- _mqttsnMsg[0] == MQTTSN_TYPE_PUBREC || _mqttsnMsg[0] == MQTTSN_TYPE_PUBREL ){
- theClient->getPublishManager()->responce(_mqttsnMsg, (uint16_t)len);
+ }
+ else if (_mqttsnMsg[0] == MQTTSN_TYPE_PUBACK || _mqttsnMsg[0] == MQTTSN_TYPE_PUBCOMP
+ || _mqttsnMsg[0] == MQTTSN_TYPE_PUBREC || _mqttsnMsg[0] == MQTTSN_TYPE_PUBREL)
+ {
+ theClient->getPublishManager()->responce(_mqttsnMsg, (uint16_t) len);
- }else if (_mqttsnMsg[0] == MQTTSN_TYPE_SUBACK || _mqttsnMsg[0] == MQTTSN_TYPE_UNSUBACK){
+ }
+ else if (_mqttsnMsg[0] == MQTTSN_TYPE_SUBACK || _mqttsnMsg[0] == MQTTSN_TYPE_UNSUBACK)
+ {
theClient->getSubscribeManager()->responce(_mqttsnMsg);
- }else if (_mqttsnMsg[0] == MQTTSN_TYPE_REGISTER){
+ }
+ else if (_mqttsnMsg[0] == MQTTSN_TYPE_REGISTER)
+ {
_regMgr.responceRegister(_mqttsnMsg, len);
- }else if (_mqttsnMsg[0] == MQTTSN_TYPE_REGACK){
+ }
+ else if (_mqttsnMsg[0] == MQTTSN_TYPE_REGACK)
+ {
_regMgr.responceRegAck(getUint16(_mqttsnMsg + 3), getUint16(_mqttsnMsg + 1));
- }else if (_mqttsnMsg[0] == MQTTSN_TYPE_PINGRESP){
- if (_pingStatus == GW_WAIT_PINGRESP){
+ }
+ else if (_mqttsnMsg[0] == MQTTSN_TYPE_PINGRESP)
+ {
+ if (_pingStatus == GW_WAIT_PINGRESP)
+ {
_pingStatus = 0;
setPingReqTimer();
- if ( _tSleep > 0 ){
+ if (_tSleep > 0)
+ {
_tWake += _tkeepAlive;
- if ( _tWake < _tSleep ){
+ if (_tWake < _tSleep)
+ {
theClient->setSleepMode(_tkeepAlive * 1000UL);
- }else{
+ }
+ else
+ {
DISPLAY("\033[0m\033[0;32m\n\n Get back to ACTIVE.\033[0m\033[0;37m\n\n");
_tWake = 0;
connect();
}
}
}
- }else if (_mqttsnMsg[0] == MQTTSN_TYPE_DISCONNECT){
+ }
+ else if (_mqttsnMsg[0] == MQTTSN_TYPE_DISCONNECT)
+ {
_status = GW_LOST;
_gwAliveTimer.stop();
_keepAliveTimer.stop();
- }else if (_mqttsnMsg[0] == MQTTSN_TYPE_ADVERTISE){
- if (getUint16((const uint8_t*)(_mqttsnMsg + 2)) < 61){
- _tAdv = getUint16((const uint8_t*)(_mqttsnMsg + 2)) * 1500;
- }else{
- _tAdv = getUint16((const uint8_t*)(_mqttsnMsg + 2)) * 1100;
+ }
+ else if (_mqttsnMsg[0] == MQTTSN_TYPE_ADVERTISE)
+ {
+ if (getUint16((const uint8_t*) (_mqttsnMsg + 2)) < 61)
+ {
+ _tAdv = getUint16((const uint8_t*) (_mqttsnMsg + 2)) * 1500;
+ }
+ else
+ {
+ _tAdv = getUint16((const uint8_t*) (_mqttsnMsg + 2)) * 1100;
}
_gwAliveTimer.start(_tAdv);
}
return 0;
}
-
-
-
-uint16_t LGwProxy::registerTopic(char* topicName, uint16_t topicId){
+uint16_t LGwProxy::registerTopic(char* topicName, uint16_t topicId)
+{
uint16_t id = topicId;
- if (id == 0){
+ if (id == 0)
+ {
id = _topicTbl.getTopicId(topicName);
_regMgr.registerTopic(topicName);
}
return id;
}
-
-int LGwProxy::writeMsg(const uint8_t* msg){
+int LGwProxy::writeMsg(const uint8_t* msg)
+{
uint16_t len;
uint8_t pos;
- uint8_t rc = 0;
+ uint8_t rc = 0;
- if (msg[0] == 0x01){
+ if (msg[0] == 0x01)
+ {
len = getUint16(msg + 1);
pos = 2;
- }else{
+ }
+ else
+ {
len = msg[0];
pos = 1;
}
- if (msg[0] == 3 && msg[1] == MQTTSN_TYPE_SEARCHGW){
- rc = _network.broadcast(msg,len);
- }else
+ if (msg[0] == 3 && msg[1] == MQTTSN_TYPE_SEARCHGW)
{
- if ( _isForwarderMode )
+ rc = _network.broadcast(msg, len);
+ }
+ else
+ {
+ if (_isForwarderMode)
{
// create a forwarder encapsulation message WirelessNodeId is a 4bytes fake data
- uint8_t* buf = (uint8_t*)malloc(len + 7);
+ uint8_t* buf = (uint8_t*) malloc(len + 7);
buf[0] = 7;
buf[1] = MQTTSN_TYPE_ENCAPSULATED;
buf[2] = 1;
@@ -371,16 +460,19 @@ int LGwProxy::writeMsg(const uint8_t* msg){
buf[5] = 'I';
buf[6] = 'd';
memcpy(buf + 7, msg, len);
- if ( buf)
- rc = _network.unicast(buf, len + 7);
+ if (buf)
+ rc = _network.unicast(buf, len + 7);
free(buf);
DISPLAY(" Encapsulated\n ");
- }else{
- rc = _network.unicast(msg,len);
+ }
+ else
+ {
+ rc = _network.unicast(msg, len);
}
- if (rc > 0){
- if ( msg[pos] >= MQTTSN_TYPE_ADVERTISE && msg[pos] <= MQTTSN_TYPE_WILLMSGRESP )
+ if (rc > 0)
+ {
+ if (msg[pos] >= MQTTSN_TYPE_ADVERTISE && msg[pos] <= MQTTSN_TYPE_WILLMSGRESP)
{
DISPLAY(" send %s\n", packet_names[msg[pos]]);
}
@@ -389,105 +481,134 @@ int LGwProxy::writeMsg(const uint8_t* msg){
return rc;
}
-void LGwProxy::writeGwMsg(void){
+void LGwProxy::writeGwMsg(void)
+{
_retryCount = MQTTSN_RETRY_COUNT;
- writeMsg((const uint8_t*)_msg);
+ writeMsg((const uint8_t*) _msg);
_sendUTC = time(NULL);
}
-int LGwProxy::readMsg(void){
+int LGwProxy::readMsg(void)
+{
int len = 0;
uint8_t* msg = _network.getMessage(&len);
_mqttsnMsg = msg;
- if (len == 0){
+ if (len == 0)
+ {
return 0;
}
- if (_mqttsnMsg[0] == 0x01){
- int msgLen = (int) getUint16((const uint8_t*)_mqttsnMsg + 1);
- if (len != msgLen){
+ if (_mqttsnMsg[0] == 0x01)
+ {
+ int msgLen = (int) getUint16((const uint8_t*) _mqttsnMsg + 1);
+ if (len != msgLen)
+ {
_mqttsnMsg += 3;
len = msgLen - 3;
}
- }else{
+ }
+ else
+ {
_mqttsnMsg += 1;
len -= 1;
}
- if ( *_mqttsnMsg == MQTTSN_TYPE_ENCAPSULATED )
+ if (*_mqttsnMsg == MQTTSN_TYPE_ENCAPSULATED)
{
int lenEncap = len + 1;
- if (msg[lenEncap] == 0x01){
- int msgLen = (int) getUint16((const uint8_t*)(msg + lenEncap + 1));
+ if (msg[lenEncap] == 0x01)
+ {
+ int msgLen = (int) getUint16((const uint8_t*) (msg + lenEncap + 1));
msg += (lenEncap + 3);
len = msgLen - 3;
- }else{
+ }
+ else
+ {
msg += (lenEncap + 1);
len = *(msg - 1);
}
_mqttsnMsg = msg;
- DISPLAY(" recv encapslated message\n" );
+ DISPLAY(" recv encapslated message\n");
}
- if ( *_mqttsnMsg >= MQTTSN_TYPE_ADVERTISE && *_mqttsnMsg <= MQTTSN_TYPE_WILLMSGRESP )
+ if (*_mqttsnMsg >= MQTTSN_TYPE_ADVERTISE && *_mqttsnMsg <= MQTTSN_TYPE_WILLMSGRESP)
{
DISPLAY(" recv %s\n", packet_names[*_mqttsnMsg]);
}
return len;
}
-void LGwProxy::setWillTopic(const char* willTopic, uint8_t qos, bool retain){
+void LGwProxy::setWillTopic(const char* willTopic, uint8_t qos, bool retain)
+{
_willTopic = willTopic;
_retainWill = _qosWill = 0;
- if (qos == 1){
+ if (qos == 1)
+ {
_qosWill = MQTTSN_FLAG_QOS_1;
- }else if (qos == 2){
+ }
+ else if (qos == 2)
+ {
_qosWill = MQTTSN_FLAG_QOS_2;
}
- if (retain){
+ if (retain)
+ {
_retainWill = MQTTSN_FLAG_RETAIN;
}
}
-void LGwProxy::setWillMsg(const char* willMsg){
+void LGwProxy::setWillMsg(const char* willMsg)
+{
_willMsg = willMsg;
}
-
-void LGwProxy::setCleanSession(bool flg){
- if (flg){
+void LGwProxy::setCleanSession(bool flg)
+{
+ if (flg)
+ {
_cleanSession = MQTTSN_FLAG_CLEAN;
- }else{
+ }
+ else
+ {
_cleanSession = 0;
}
}
-uint16_t LGwProxy::getNextMsgId(void){
+uint16_t LGwProxy::getNextMsgId(void)
+{
_nextMsgId++;
- if (_nextMsgId == 0){
+ if (_nextMsgId == 0)
+ {
_nextMsgId = 1;
}
return _nextMsgId;
}
-void LGwProxy::checkPingReq(void){
+void LGwProxy::checkPingReq(void)
+{
uint8_t msg[2];
msg[0] = 0x02;
msg[1] = MQTTSN_TYPE_PINGREQ;
-
- if ( (_status == GW_CONNECTED || _status == GW_SLEPT) && isPingReqRequired() && _pingStatus != GW_WAIT_PINGRESP){
+
+ if ((_status == GW_CONNECTED || _status == GW_SLEPT) && isPingReqRequired() && _pingStatus != GW_WAIT_PINGRESP)
+ {
_pingStatus = GW_WAIT_PINGRESP;
_pingRetryCount = MQTTSN_RETRY_COUNT;
- writeMsg((const uint8_t*)msg);
+ writeMsg((const uint8_t*) msg);
_pingSendUTC = time(NULL);
- }else if (_pingStatus == GW_WAIT_PINGRESP){
- if (_pingSendUTC + MQTTSN_TIME_RETRY < time(NULL)){
- if (--_pingRetryCount > 0){
- writeMsg((const uint8_t*)msg);
+ }
+ else if (_pingStatus == GW_WAIT_PINGRESP)
+ {
+ if (_pingSendUTC + MQTTSN_TIME_RETRY < time(NULL))
+ {
+ if (--_pingRetryCount > 0)
+ {
+ writeMsg((const uint8_t*) msg);
_pingSendUTC = time(NULL);
- }else{
+ }
+ else
+ {
_status = GW_LOST;
_gwId = 0;
_pingStatus = 0;
@@ -498,8 +619,10 @@ void LGwProxy::checkPingReq(void){
}
}
-void LGwProxy::checkAdvertise(void){
- if ( _gwAliveTimer.isTimeUp()){
+void LGwProxy::checkAdvertise(void)
+{
+ if (_gwAliveTimer.isTimeUp())
+ {
_status = GW_LOST;
_gwId = 0;
_pingStatus = 0;
@@ -509,27 +632,37 @@ void LGwProxy::checkAdvertise(void){
}
}
-LTopicTable* LGwProxy::getTopicTable(void){
+LTopicTable* LGwProxy::getTopicTable(void)
+{
return &_topicTbl;
}
-LRegisterManager* LGwProxy::getRegisterManager(void){
+LRegisterManager* LGwProxy::getRegisterManager(void)
+{
return &_regMgr;
}
-bool LGwProxy::isPingReqRequired(void){
+bool LGwProxy::isPingReqRequired(void)
+{
return _keepAliveTimer.isTimeUp(_tkeepAlive * 1000UL);
}
-void LGwProxy::setPingReqTimer(void){
+void LGwProxy::setPingReqTimer(void)
+{
_keepAliveTimer.start(_tkeepAlive * 1000UL);
}
-const char* LGwProxy::getClientId(void) {
+const char* LGwProxy::getClientId(void)
+{
return _clientId;
}
-void LGwProxy::setForwarderMode(void)
+void LGwProxy::setForwarderMode(bool valid)
{
- _isForwarderMode = true;
+ _isForwarderMode = valid;
+}
+
+void LGwProxy::setQoSMinus1Mode(bool valid)
+{
+ _isQoSMinus1Mode = valid;
}
diff --git a/MQTTSNGateway/GatewayTester/src/LGwProxy.h b/MQTTSNGateway/GatewayTester/src/LGwProxy.h
index a34be1f..814affc 100644
--- a/MQTTSNGateway/GatewayTester/src/LGwProxy.h
+++ b/MQTTSNGateway/GatewayTester/src/LGwProxy.h
@@ -65,7 +65,8 @@ public:
void setCleanSession(bool);
void setKeepAliveDuration(uint16_t duration);
void setAdvertiseDuration(uint16_t duration);
- void setForwarderMode(void);
+ void setForwarderMode(bool valid);
+ void setQoSMinus1Mode(bool valid);
void reconnect(void);
int writeMsg(const uint8_t* msg);
void setPingReqTimer(void);
@@ -109,6 +110,7 @@ private:
uint16_t _tSleep;
uint16_t _tWake;
bool _isForwarderMode;
+ bool _isQoSMinus1Mode;
char _msg[MQTTSN_MAX_MSG_LENGTH + 1];
};
diff --git a/MQTTSNGateway/GatewayTester/src/LMqttsnClientApp.h b/MQTTSNGateway/GatewayTester/src/LMqttsnClientApp.h
index 9891921..b848e4c 100644
--- a/MQTTSNGateway/GatewayTester/src/LMqttsnClientApp.h
+++ b/MQTTSNGateway/GatewayTester/src/LMqttsnClientApp.h
@@ -27,7 +27,6 @@
======================================*/
//#define DEBUG_NW
//#define DEBUG_MQTTSN
-//#define DEBUG_OTA
/****************************************
MQTT-SN Parameters
@@ -105,7 +104,9 @@ typedef enum
#define END_OF_SUBSCRIBE_LIST {MQTTSN_TOPIC_TYPE_NORMAL,0,0,0, 0}
#define UDPCONF LUdpConfig theNetcon
#define MQTTSNCONF LMqttsnConfig theMqcon
-#define SetForwarderMode theClient->getGwProxy()->setForwarderMode
+#define SetForwarderMode(...) theClient->getGwProxy()->setForwarderMode(__VA_ARGS__)
+#define SetQoSMinus1Mode(...) theClient->getGwProxy()->setQoSMinus1Mode(__VA_ARGS__)
+
#ifdef CLIENT_MODE
#define DISPLAY(...)
#define PROMPT(...)
@@ -142,6 +143,7 @@ typedef enum
#define QoS0 0
#define QoS1 1
#define QoS2 2
+#define Q0Sm1 3
#define MQTTSN_TYPE_ADVERTISE 0x00
#define MQTTSN_TYPE_SEARCHGW 0x01
#define MQTTSN_TYPE_GWINFO 0x02
@@ -177,7 +179,7 @@ typedef enum
#define MQTTSN_FLAG_QOS_0 0x0
#define MQTTSN_FLAG_QOS_1 0x20
#define MQTTSN_FLAG_QOS_2 0x40
-#define MQTTSN_FLAG_QOS_N1 0xc0
+#define MQTTSN_FLAG_QOS_M1 0x60
#define MQTTSN_FLAG_RETAIN 0x10
#define MQTTSN_FLAG_WILL 0x08
#define MQTTSN_FLAG_CLEAN 0x04
diff --git a/MQTTSNGateway/GatewayTester/src/LPublishManager.cpp b/MQTTSNGateway/GatewayTester/src/LPublishManager.cpp
index 4e20c8d..ccdfe62 100644
--- a/MQTTSNGateway/GatewayTester/src/LPublishManager.cpp
+++ b/MQTTSNGateway/GatewayTester/src/LPublishManager.cpp
@@ -75,7 +75,7 @@ void LPublishManager::publish(const char* topicName, uint8_t* payload, uint16_t
topicType = MQTTSN_TOPIC_TYPE_NORMAL;
}
- if ( qos > 0 )
+ if ( qos > 0 && qos < 3 )
{
msgId = theClient->getGwProxy()->getNextMsgId();
}
@@ -428,6 +428,10 @@ PubElement* LPublishManager::add(const char* topicName, uint16_t topicId, uint8_
{
elm->flag |= MQTTSN_FLAG_QOS_2;
}
+ else if (qos == 3)
+ {
+ elm->flag |= MQTTSN_FLAG_QOS_M1;
+ }
if (retain)
{
elm->flag |= MQTTSN_FLAG_RETAIN;
diff --git a/MQTTSNGateway/Makefile b/MQTTSNGateway/Makefile
index cfc9283..f9a2c6f 100644
--- a/MQTTSNGateway/Makefile
+++ b/MQTTSNGateway/Makefile
@@ -11,6 +11,7 @@ CONFIG := gateway.conf
CLIENTS := clients.conf
PREDEFTOPIC := predefinedTopic.conf
FORWARDERS := forwarders.conf
+QOSM1CLIENT := qos-1clients.conf
SRCDIR := src
SUBDIR := ../MQTTSNPacket/src
@@ -39,6 +40,7 @@ $(SRCDIR)/MQTTSNGWPublishHandler.cpp \
$(SRCDIR)/MQTTSNGWSubscribeHandler.cpp \
$(SRCDIR)/MQTTSNGWEncapsulatedPacket.cpp \
$(SRCDIR)/MQTTSNGWForwarder.cpp \
+$(SRCDIR)/MQTTSNGWClientProxy.cpp \
$(SRCDIR)/$(OS)/$(SENSORNET)/SensorNetwork.cpp \
$(SRCDIR)/$(OS)/Timer.cpp \
$(SRCDIR)/$(OS)/Network.cpp \
@@ -141,6 +143,7 @@ install:
cp -pf $(CLIENTS) ../../
cp -pf $(PREDEFTOPIC) ../../
cp -pf $(FORWARDERS) ../../
+ cp -pf $(QOSM1CLIENT) ../../
exectest:
diff --git a/MQTTSNGateway/README.md b/MQTTSNGateway/README.md
index 28e149c..2144ec5 100644
--- a/MQTTSNGateway/README.md
+++ b/MQTTSNGateway/README.md
@@ -38,6 +38,9 @@ PredefinedTopicList=/path/to/your_predefinedTopic.conf
Forwarder=NO
ForwardersList=/home/tomoaki/tmp/forwarders.conf
+
+QoS-1=NO
+QoS-1ClientsList=/path/to/your_qos-1clients.conf
#RootCAfile=/path/to/your_Root_CA.crt
#RootCApath=/path/to/your_certs_directory/
@@ -69,6 +72,7 @@ Client should know the MulticastIP and MulticastPortNo to send a SEARCHGW messag
when **ClientAuthentication** is YES, see MQTTSNGWClient.cpp line53, clients file specified by ClientsList is required. This file defines connect allowed clients by ClientId and SensorNetwork Address. e.g. IP address and Port No.
When **PredefinedTopic** is YES, Pre-definedTopicID file specified by PredefinedTopicList is effective. This file defines Pre-definedTopics of the clients. In this file, ClientID,TopicName and TopicID are declared in CSV format.
When **Forwarder** is YES, Forwarder Encapsulation Message is available. Connectable Forwarders are specifed by ForwardersList file. In this file, ForwarderIds and those sensorNet addresses are declared in CSV format.
+When **QoS-1** is YES, QoS-1 PUBLISH Message is available. Clients which allow to send it, are specifed by QoS-1ClientsList file. In this file, ClientsId and those sensorNet addresses are declared in CSV format.
diff --git a/MQTTSNGateway/forwarders.conf b/MQTTSNGateway/forwarders.conf
index 3275924..cfb11dd 100644
--- a/MQTTSNGateway/forwarders.conf
+++ b/MQTTSNGateway/forwarders.conf
@@ -11,6 +11,15 @@
# http://www.eclipse.org/org/documents/edl-v10.php.
#***********************************************************************
#
-# SensorNetwork address format is defined by SensorNetAddress::setAddress(string* data) function.
+# This file declares valid MQTTSNForwarders.
+# Forwarders are defined by ForwarderId same as ClientId and those SensorNetAddress
+# in a CSV format as follow:
+#
+# ForwarderId, SensorNetAddress
#
+# where SensorNetwork address format is defined by SensorNetAddress::setAddress(string* data) function.
+#
+
Forwarder01,172.16.1.7:12002
+
+
diff --git a/MQTTSNGateway/gateway.conf b/MQTTSNGateway/gateway.conf
index 10e02a4..a888771 100644
--- a/MQTTSNGateway/gateway.conf
+++ b/MQTTSNGateway/gateway.conf
@@ -26,6 +26,9 @@ PredefinedTopicList=/path/to/your_predefinedTopic.conf
Forwarder=NO
ForwardersList=/path/to/your_forwarers.conf
+QoS-1=NO
+QoS-1ClientsList=/path/to/your_qos-1clients.conf
+
#RootCAfile=/etc/ssl/certs/ca-certificates.crt
#RootCApath=/etc/ssl/certs/
#CertsFile=/path/to/certKey.pem
diff --git a/MQTTSNGateway/predefinedTopic.conf b/MQTTSNGateway/predefinedTopic.conf
index 9b50801..2ad779a 100644
--- a/MQTTSNGateway/predefinedTopic.conf
+++ b/MQTTSNGateway/predefinedTopic.conf
@@ -11,7 +11,26 @@
# http://www.eclipse.org/org/documents/edl-v10.php.
#***********************************************************************
#
-# ClientID, TopicName, TopicID
+# pre-defined-topics are defined by this file.
+# A format of this file is in CSV as follows:
+#
+# ClientID, TopicName, TopicID
+#
+# This file is consist from two sections.
+# One for QoS-1 PUBLISH Clients, the other for another clients.
+
+#
+# pre-defined-topics for QoS-1 clients.
+# ClientIDs should be "ClientProxy"
+#
+
+ClientProxy, ty4tw/proxy/predefTopic1, 1
+ClientProxy, ty4tw/proxy/predefTopic2, 2
+ClientProxy, ty4tw/proxy/predefTopic3, 3
+
+
+#
+# pre-defined-topics for another clients
#
GatewayTestClient,ty4tw/predefinedTopic1, 1
diff --git a/MQTTSNGateway/qos-1clients.conf b/MQTTSNGateway/qos-1clients.conf
new file mode 100644
index 0000000..e21832a
--- /dev/null
+++ b/MQTTSNGateway/qos-1clients.conf
@@ -0,0 +1,27 @@
+#***********************************************************************
+# Copyright (c) 2018, Tomoaki Yamaguchi
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Eclipse Public License v1.0
+# and Eclipse Distribution License v1.0 which accompany this distribution.
+#
+# The Eclipse Public License is available at
+# http://www.eclipse.org/legal/epl-v10.html
+# and the Eclipse Distribution License is available at
+# http://www.eclipse.org/org/documents/edl-v10.php.
+#***********************************************************************
+#
+# Clients which send QoS-1 PUBLISH are defined by this file.
+#
+# Clients are defined by the ClientId and those SensorNetAddress
+# in a CSV format as follow:
+#
+# ClientId, SensorNetAddress
+#
+# where SensorNetwork address format is defined by
+# SensorNetAddress::setAddress(string* data) function.
+#
+#
+
+QoS-1_Client,172.16.1.7:12002
+
diff --git a/MQTTSNGateway/src/MQTTGWPublishHandler.cpp b/MQTTSNGateway/src/MQTTGWPublishHandler.cpp
index b29fea0..5a2b5a6 100644
--- a/MQTTSNGateway/src/MQTTGWPublishHandler.cpp
+++ b/MQTTSNGateway/src/MQTTGWPublishHandler.cpp
@@ -131,8 +131,7 @@ void MQTTGWPublishHandler::handlePublish(Client* client, MQTTGWPacket* packet)
/* create REGISTER */
MQTTSNPacket* regPacket = new MQTTSNPacket();
- MQTTSNString topicName;
- topicName.cstring = 0;
+ MQTTSNString topicName = MQTTSNString_initializer;
topicName.lenstring.len = topicId.data.long_.len;
topicName.lenstring.data = topicId.data.long_.name;
diff --git a/MQTTSNGateway/src/MQTTSNGWClient.cpp b/MQTTSNGateway/src/MQTTSNGWClient.cpp
index 0a81ef8..124ad58 100644
--- a/MQTTSNGateway/src/MQTTSNGWClient.cpp
+++ b/MQTTSNGateway/src/MQTTSNGWClient.cpp
@@ -19,12 +19,13 @@
#include "MQTTSNGWClient.h"
#include "MQTTSNGateway.h"
#include "SensorNetwork.h"
-#include "MQTTSNGWForwarder.h"
#include "Network.h"
#include
#include
#include
+#include "MQTTSNGWForwarder.h"
+
using namespace MQTTSNGW;
char* currentDateTime(void);
/*=====================================
@@ -80,11 +81,7 @@ bool ClientList::authorize(const char* fileName)
bool secure;
bool stable;
SensorNetAddress netAddr;
- MQTTSNString clientId;
-
- clientId.cstring = 0;
- clientId.lenstring.data = 0;
- clientId.lenstring.len = 0;
+ MQTTSNString clientId = MQTTSNString_initializer;
if ((fp = fopen(fileName, "r")) != 0)
{
@@ -131,13 +128,9 @@ bool ClientList::setPredefinedTopics(const char* fileName)
FILE* fp;
char buf[MAX_CLIENTID_LENGTH + 256];
size_t pos0, pos1;
- MQTTSNString clientId;
+ MQTTSNString clientId = MQTTSNString_initializer;;
bool rc = false;
- clientId.cstring = 0;
- clientId.lenstring.data = 0;
- clientId.lenstring.len = 0;
-
if ((fp = fopen(fileName, "r")) != 0)
{
while (fgets(buf, MAX_CLIENTID_LENGTH + 254, fp) != 0)
@@ -300,9 +293,8 @@ Client* ClientList::createClient(SensorNetAddress* addr, MQTTSNString* clientId,
}
else
{
- MQTTSNString dummyId;
+ MQTTSNString dummyId MQTTSNString_initializer;;
dummyId.cstring = strdup("");
- dummyId.lenstring.len = 0;
client->setClientId(dummyId);
free(dummyId.cstring);
}
@@ -367,6 +359,7 @@ Client* ClientList::createPredefinedTopic( MQTTSNString* clientId, string topicN
// create Topic & Add it
client->getTopics()->add((const char*)topicName.c_str(), topicId);
+ client->_hasPredefTopic = true;
return client;
}
@@ -413,9 +406,11 @@ Client::Client(bool secure)
_prevClient = 0;
_nextClient = 0;
_clientSleepPacketQue.setMaxSize(MAX_SAVED_PUBLISH);
+ _proxyPacketQue.setMaxSize(MAX_SAVED_PUBLISH);
_hasPredefTopic = false;
_holdPingRequest = false;
_forwarder = 0;
+ _isProxy = false;
}
Client::~Client()
@@ -485,6 +480,30 @@ int Client::setClientSleepPacket(MQTTGWPacket* packet)
return rc;
}
+MQTTSNPacket* Client::getProxyPacket(void)
+{
+ return _proxyPacketQue.getPacket();
+}
+
+void Client::deleteFirstProxyPacket()
+{
+ _proxyPacketQue.pop();
+}
+
+int Client::setProxyPacket(MQTTSNPacket* packet)
+{
+ int rc = _proxyPacketQue.post(packet);
+ if ( rc )
+ {
+ WRITELOG("%s %s is Disconnected. the packet was saved.\n", currentDateTime(), _clientId);
+ }
+ else
+ {
+ WRITELOG("%s %s is Disconnected and discard the packet.\n", currentDateTime(), _clientId);
+ }
+ return rc;
+}
+
Connect* Client::getConnectData(void)
{
return &_connectData;
@@ -572,7 +591,10 @@ void Client::updateStatus(MQTTSNPacket* packet)
case MQTTSN_PUBCOMP:
case MQTTSN_PUBREL:
case MQTTSN_PUBREC:
- _keepAliveTimer.start(_keepAliveMsec * 1.5);
+ if ( !_isProxy )
+ {
+ _keepAliveTimer.start(_keepAliveMsec * 1.5);
+ }
break;
case MQTTSN_DISCONNECT:
uint16_t duration;
@@ -641,9 +663,14 @@ void Client::disconnected(void)
_waitWillMsgFlg = false;
}
+void Client::tryConnect(void)
+{
+ _status = Cstat_TryConnecting;
+}
+
bool Client::isConnectSendable(void)
{
- if (_status == Cstat_Lost || _status == Cstat_TryConnecting)
+ if ( _status == Cstat_Lost || _status == Cstat_TryConnecting )
{
return false;
}
@@ -703,6 +730,11 @@ void Client::setTopics(Topics* topics)
_topics = topics;
}
+ClientStatus Client::getClientStatus(void)
+{
+ return _status;
+}
+
void Client::setWaitWillMsgFlg(bool flg)
{
_waitWillMsgFlg = flg;
@@ -733,6 +765,11 @@ bool Client::isAwake(void)
return (_status == Cstat_Awake);
}
+bool Client::isConnecting(void)
+{
+ return (_status == Cstat_Connecting);
+}
+
bool Client::isSecureNetwork(void)
{
return _secureNetwork;
@@ -760,10 +797,18 @@ void Client::setClientId(MQTTSNString id)
free(_clientId);
}
- /* save clientId into (char*)_clientId NULL terminated */
- _clientId = (char*)calloc(MQTTSNstrlen(id) + 1, 1);
- unsigned char* ptr = (unsigned char*)_clientId;
- writeMQTTSNString((unsigned char**)&ptr, id);
+ if ( id.cstring )
+ {
+ _clientId = (char*)calloc(strlen(id.cstring) + 1, 1);
+ memcpy(_clientId, id.cstring, strlen(id.cstring));
+ }
+ else
+ {
+ /* save clientId into (char*)_clientId NULL terminated */
+ _clientId = (char*)calloc(MQTTSNstrlen(id) + 1, 1);
+ unsigned char* ptr = (unsigned char*)_clientId;
+ writeMQTTSNString((unsigned char**)&ptr, id);
+ }
}
void Client::setWillTopic(MQTTSNString willTopic)
@@ -812,14 +857,14 @@ const char* Client::getStatus(void)
return theClientStatus[_status];
}
-Client* Client::getOTAClient(void)
+bool Client::isProxy(void)
{
- return _otaClient;
+ return _isProxy;
}
-void Client::setOTAClient(Client* cl)
+void Client::setPorxy(bool isProxy)
{
- _otaClient =cl;
+ _isProxy = isProxy;;
}
void Client::holdPingRequest(void)
diff --git a/MQTTSNGateway/src/MQTTSNGWClient.h b/MQTTSNGateway/src/MQTTSNGWClient.h
index 8900656..ff67365 100644
--- a/MQTTSNGateway/src/MQTTSNGWClient.h
+++ b/MQTTSNGateway/src/MQTTSNGWClient.h
@@ -253,6 +253,9 @@ public:
TopicIdMapelement* getWaitedSubTopicId(uint16_t msgId);
MQTTGWPacket* getClientSleepPacket(void);
void deleteFirstClientSleepPacket(void);
+
+ MQTTSNPacket* getProxyPacket(void);
+ void deleteFirstProxyPacket(void);
WaitREGACKPacketList* getWaitREGACKPacketList(void);
void eraseWaitedPubTopicId(uint16_t msgId);
@@ -261,6 +264,7 @@ public:
void clearWaitedSubTopicId(void);
int setClientSleepPacket(MQTTGWPacket*);
+ int setProxyPacket(MQTTSNPacket* packet);
void setWaitedPubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type);
void setWaitedSubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type);
@@ -271,6 +275,8 @@ public:
void connackSended(int rc);
void disconnected(void);
bool isConnectSendable(void);
+ void tryConnect(void);
+ ClientStatus getClientStatus(void);
uint16_t getNextPacketId(void);
uint8_t getNextSnMsgId(void);
@@ -286,6 +292,9 @@ public:
Forwarder* getForwarder(void);
void setForwarder(Forwarder* forwader);
+ void setPorxy(bool isProxy);
+ bool isProxy(void);
+
void setClientId(MQTTSNString id);
void setWillTopic(MQTTSNString willTopic);
void setWillMsg(MQTTSNString willmsg);
@@ -298,6 +307,7 @@ public:
bool erasable(void);
bool isDisconnect(void);
+ bool isConnecting(void);
bool isActive(void);
bool isSleep(void);
bool isAwake(void);
@@ -310,11 +320,11 @@ public:
bool isHoldPringReqest(void);
Client* getNextClient(void);
- Client* getOTAClient(void);
- void setOTAClient(Client* cl);
private:
PacketQue _clientSleepPacketQue;
+ PacketQue _proxyPacketQue;
+
WaitREGACKPacketList _waitREGACKList;
Topics* _topics;
@@ -345,6 +355,7 @@ private:
SensorNetAddress _sensorNetAddr;
Forwarder* _forwarder;
+ bool _isProxy;
bool _sessionStatus;
diff --git a/MQTTSNGateway/src/MQTTSNGWClientProxy.cpp b/MQTTSNGateway/src/MQTTSNGWClientProxy.cpp
new file mode 100644
index 0000000..d7ac795
--- /dev/null
+++ b/MQTTSNGateway/src/MQTTSNGWClientProxy.cpp
@@ -0,0 +1,252 @@
+/**************************************************************************************
+ * Copyright (c) 2018, Tomoaki Yamaguchi
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ * Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
+ **************************************************************************************/
+
+
+#include "MQTTSNGWDefines.h"
+#include "MQTTSNGWClientProxy.h"
+#include "MQTTSNGateway.h"
+#include "SensorNetwork.h"
+#include
+#include
+#include
+
+using namespace MQTTSNGW;
+
+#define RESPONSE_DURATION 900 // Secs
+
+/*
+ * Class ClientProxyElement
+ */
+
+ClientProxyElement::ClientProxyElement(void)
+ : _clientId{0}
+ , _next{0}
+{
+
+}
+
+ClientProxyElement::ClientProxyElement(SensorNetAddress* addr, string* clientId)
+ : _next{0}
+{
+ _clientId = *clientId;
+ _sensorNetAddr = *addr;
+}
+
+ClientProxyElement::~ClientProxyElement(void)
+{
+
+}
+
+/*
+ * Class ClientProxy
+ */
+
+ClientProxy:: ClientProxy(void)
+ : _head{0}
+{
+ _gateway = 0;
+ _client = 0;
+}
+
+ClientProxy:: ClientProxy(Gateway* gw)
+ : _head{0}
+{
+ _gateway = gw;
+ _client = 0;
+}
+
+
+ClientProxy::~ClientProxy(void)
+{
+ if ( _head )
+ {
+ ClientProxyElement* p = _head;
+ while ( p )
+ {
+ ClientProxyElement* next = p->_next;
+ delete p;
+ p = next;
+ }
+ }
+}
+
+void ClientProxy::setGateway(Gateway* gw)
+{
+ _gateway = gw;
+}
+
+ClientProxyElement* ClientProxy::add(SensorNetAddress* addr, string* clientId)
+{
+ ClientProxyElement* elm = new ClientProxyElement(addr, clientId);
+ if ( _head == 0 )
+ {
+ _head = elm;
+ }
+ else
+ {
+ ClientProxyElement* p = _head;
+ while ( p )
+ {
+ if ( p->_next == 0 )
+ {
+ p->_next = elm;
+ break;
+ }
+ else
+ {
+ p = p->_next;
+ }
+ }
+ }
+ return elm;
+}
+
+const char* ClientProxy::getClientId(SensorNetAddress* addr)
+{
+ ClientProxyElement* p = _head;
+ while ( p )
+ {
+ if ( p->_sensorNetAddr.isMatch(addr) )
+ {
+ return p->_clientId.c_str();
+ break;
+ }
+ p = p->_next;
+ }
+ return 0;
+}
+
+void ClientProxy::setClient(Client* client)
+{
+ _client = client;
+}
+
+Client* ClientProxy::getClient(void)
+{
+ return _client;
+}
+
+bool ClientProxy::setClientProxy(const char* fileName)
+{
+ FILE* fp;
+ char buf[MAX_CLIENTID_LENGTH + 256];
+ size_t pos;
+
+ SensorNetAddress netAddr;
+
+ if ((fp = fopen(fileName, "r")) != 0)
+ {
+ while (fgets(buf, MAX_CLIENTID_LENGTH + 254, fp) != 0)
+ {
+ if (*buf == '#')
+ {
+ continue;
+ }
+ string data = string(buf);
+ while ((pos = data.find_first_of(" \t\n")) != string::npos)
+ {
+ data.erase(pos, 1);
+ }
+ if (data.empty())
+ {
+ continue;
+ }
+
+ pos = data.find_first_of(",");
+ string id = data.substr(0, pos);
+ string addr = data.substr(pos + 1);
+
+ if (netAddr.setAddress(&addr) == 0)
+ {
+ add(&netAddr, &id);
+ }
+ else
+ {
+ WRITELOG("Invalid address %s\n", data.c_str());
+ return false;
+ }
+ }
+ fclose(fp);
+ }
+ else
+ {
+ WRITELOG("Can not open the QoS_1Client List. %s\n", fileName);
+ return false;
+ }
+ return true;
+}
+
+
+void ClientProxy::checkConnection(void)
+{
+ if ( _client->isDisconnect() || ( _client->isConnecting() && _responseTimer.isTimeup()) )
+ {
+ _client->connectSended();
+ _responseTimer.start(RESPONSE_DURATION * 1000UL);
+ MQTTSNPacket_connectData options = MQTTSNPacket_connectData_initializer;
+ options.clientID.cstring = _client->getClientId();
+ options.duration = RESPONSE_DURATION;
+
+ MQTTSNPacket* packet = new MQTTSNPacket();
+ packet->setCONNECT(&options);
+ Event* ev = new Event();
+ ev->setClientRecvEvent(_client, packet);
+ _gateway->getPacketEventQue()->post(ev);
+
+ }
+ else if ( _client->isActive() && _keepAliveTimer.isTimeup() )
+ {
+ MQTTSNPacket* packet = new MQTTSNPacket();
+ MQTTSNString clientId = MQTTSNString_initializer;
+ packet->setPINGREQ(&clientId);
+ Event* ev = new Event();
+ ev->setClientRecvEvent(_client, packet);
+ _gateway->getPacketEventQue()->post(ev);
+ resetPingTimer();
+ }
+}
+
+void ClientProxy::resetPingTimer(void)
+{
+ _keepAliveTimer.start(RESPONSE_DURATION * 1000UL);
+}
+
+void ClientProxy::send(MQTTSNPacket* packet)
+{
+ if ( packet->getType() == MQTTSN_CONNACK || packet->getType() == MQTTSN_PINGRESP )
+ {
+ resetPingTimer();
+ sendStoredPublish();
+ }
+ else if ( packet->getType() == MQTTSN_PINGRESP )
+ {
+ resetPingTimer();
+ }
+}
+
+void ClientProxy::sendStoredPublish(void)
+{
+ MQTTSNPacket* msg = 0;
+
+ while ( ( msg = _client->getProxyPacket() ) != 0 )
+ {
+ _client->deleteFirstProxyPacket(); // pop the que to delete element.
+
+ Event* ev = new Event();
+ ev->setClientRecvEvent(_client, msg);
+ _gateway->getPacketEventQue()->post(ev);
+ }
+}
diff --git a/MQTTSNGateway/src/MQTTSNGWClientProxy.h b/MQTTSNGateway/src/MQTTSNGWClientProxy.h
new file mode 100644
index 0000000..43e0acc
--- /dev/null
+++ b/MQTTSNGateway/src/MQTTSNGWClientProxy.h
@@ -0,0 +1,77 @@
+/**************************************************************************************
+ * Copyright (c) 2018, Tomoaki Yamaguchi
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ * Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
+ **************************************************************************************/
+
+#ifndef MQTTSNGATEWAY_SRC_MQTTSNGWCLIENTPROXY_H_
+#define MQTTSNGATEWAY_SRC_MQTTSNGWCLIENTPROXY_H_
+
+#include "MQTTSNGateway.h"
+#include "MQTTGWPacket.h"
+#include "MQTTSNGWClient.h"
+#include "SensorNetwork.h"
+#include "MQTTSNGWProcess.h"
+
+
+
+namespace MQTTSNGW
+{
+class Gateway;
+
+class ClientProxyElement
+{
+ friend class ClientProxy;
+public:
+ ClientProxyElement(void);
+ ClientProxyElement(SensorNetAddress* addr, string* clientId);
+ ~ClientProxyElement(void);
+private:
+ SensorNetAddress _sensorNetAddr;
+ string _clientId;
+ ClientProxyElement* _next;
+};
+
+class ClientProxy
+{
+public:
+ ClientProxy(void);
+ ClientProxy(Gateway* gw);
+ ~ClientProxy(void);
+ bool setClientProxy(const char* fileName);
+ ClientProxyElement* add(SensorNetAddress* addr, string* clientId);
+ const char* getClientId(SensorNetAddress* addr);
+ void setClient(Client*);
+ Client* getClient(void);
+ void setGateway(Gateway* gw);
+ void setKeepAlive(uint16_t secs);
+
+ void checkConnection(void);
+ void resetPingTimer(void);
+ void send(MQTTSNPacket* packet);
+
+private:
+ void sendStoredPublish(void);
+
+ Gateway* _gateway;
+ Client* _client;
+ ClientProxyElement* _head;
+ Timer _keepAliveTimer;
+ Timer _responseTimer;
+};
+
+}
+
+
+
+#endif /* MQTTSNGATEWAY_SRC_MQTTSNGWCLIENTPROXY_H_ */
diff --git a/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp b/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp
index 9b74341..c17eaf9 100644
--- a/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp
+++ b/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp
@@ -17,10 +17,11 @@
#include "MQTTSNGWClientRecvTask.h"
#include "MQTTSNGateway.h"
#include "MQTTSNPacket.h"
-#include "MQTTSNGWForwarder.h"
#include "MQTTSNGWEncapsulatedPacket.h"
#include
+#include "MQTTSNGWForwarder.h"
+
using namespace MQTTSNGW;
char* currentDateTime(void);
/*=====================================
@@ -91,31 +92,32 @@ void ClientRecvTask::run()
if ( packet->getType() == MQTTSN_SEARCHGW )
{
/* write log and post Event */
- log(0, packet);
+ log(0, packet, 0);
ev = new Event();
ev->setBrodcastEvent(packet);
_gateway->getPacketEventQue()->post(ev);
continue;
}
+
if ( packet->getType() == MQTTSN_ENCAPSULATED )
{
fwd = _gateway->getForwarderList()->getForwarder(_sensorNetwork->getSenderAddress());
if ( fwd == 0 )
{
- log(0, packet);
- WRITELOG("%s Forwarder %s is not authorized.%s\n", ERRMSG_HEADER, _sensorNetwork->getSenderAddress()->sprint(buf), ERRMSG_FOOTER);
+ log(0, packet, 0);
+ WRITELOG("%s Forwarder %s is not authenticated.%s\n", ERRMSG_HEADER, _sensorNetwork->getSenderAddress()->sprint(buf), ERRMSG_FOOTER);
delete packet;
continue;
}
else
{
- MQTTSNString fwdName;
- fwdName.lenstring.data = const_cast( fwd->getName() );
- fwdName.lenstring.len = strlen(fwdName.lenstring.data);
+ MQTTSNString fwdName = MQTTSNString_initializer;
+ fwdName.cstring = const_cast( fwd->getName() );
log(0, packet, &fwdName);
+ /* get the packet from the encapsulation message */
MQTTSNGWEncapsulatedPacket encap;
encap.desirialize(packet->getPacketData(), packet->getPacketLength());
nodeId.setId( encap.getWirelessNodeId() );
@@ -126,15 +128,35 @@ void ClientRecvTask::run()
}
else
{
- /* get client from the ClientList of Gateway by sensorNetAddress. */
- client = _gateway->getClientList()->getClient(_sensorNetwork->getSenderAddress());
+ const char* clientName = _gateway->getClientProxy()->getClientId(_sensorNetwork->getSenderAddress());
+
+ if ( clientName ) // This client is for QoS-1 PUBLISH.
+ {
+ if ( packet->isQoSMinusPUBLISH() )
+ {
+ client = _gateway->getClientProxy()->getClient(); // point to the ClientProxy
+ }
+ else
+ {
+ client = _gateway->getClientProxy()->getClient();
+ log(clientName, packet);
+ WRITELOG("%s %s %s can send only PUBLISH with QoS-1.%s\n", ERRMSG_HEADER, clientName, _sensorNetwork->getSenderAddress()->sprint(buf), ERRMSG_FOOTER);
+ delete packet;
+ continue;
+ }
+ }
+ else
+ {
+ /* get client from the ClientList of Gateway by sensorNetAddress. */
+ client = _gateway->getClientList()->getClient(_sensorNetwork->getSenderAddress());
+ }
}
if ( client )
{
/* write log and post Event */
- log(client, packet);
+ log(client, packet, 0);
ev = new Event();
ev->setClientRecvEvent(client,packet);
_gateway->getPacketEventQue()->post(ev);
@@ -196,7 +218,7 @@ void ClientRecvTask::run()
}
else
{
- log(client, packet);
+ log(client, packet, 0);
WRITELOG("%s Client(%s) is not connecting. message has been discarded.%s\n", ERRMSG_HEADER, _sensorNetwork->getSenderAddress()->sprint(buf), ERRMSG_FOOTER);
delete packet;
@@ -216,16 +238,22 @@ void ClientRecvTask::run()
void ClientRecvTask::log(Client* client, MQTTSNPacket* packet, MQTTSNString* id)
{
- char pbuf[SIZE_OF_LOG_PACKET * 3];
const char* clientId;
char cstr[MAX_CLIENTID_LENGTH + 1];
- char msgId[6];
if ( id )
{
- memset((void*)cstr, 0, id->lenstring.len + 1);
- strncpy(cstr, id->lenstring.data, id->lenstring.len) ;
- clientId = cstr;
+ if ( id->cstring )
+ {
+ strncpy(cstr, id->cstring, strlen(id->cstring) );
+ clientId = cstr;
+ }
+ else
+ {
+ memset((void*)cstr, 0, id->lenstring.len + 1);
+ strncpy(cstr, id->lenstring.data, id->lenstring.len );
+ clientId = cstr;
+ }
}
else if ( client )
{
@@ -236,40 +264,48 @@ void ClientRecvTask::log(Client* client, MQTTSNPacket* packet, MQTTSNString* id)
clientId = UNKNOWNCL;
}
- switch (packet->getType())
- {
- case MQTTSN_SEARCHGW:
- WRITELOG(FORMAT_Y_G_G_NL, currentDateTime(), packet->getName(), LEFTARROW, CLIENT, packet->print(pbuf));
- break;
- case MQTTSN_CONNECT:
- case MQTTSN_PINGREQ:
- WRITELOG(FORMAT_Y_G_G_NL, currentDateTime(), packet->getName(), LEFTARROW, clientId, packet->print(pbuf));
- break;
- case MQTTSN_DISCONNECT:
- case MQTTSN_WILLTOPICUPD:
- case MQTTSN_WILLMSGUPD:
- case MQTTSN_WILLTOPIC:
- case MQTTSN_WILLMSG:
- WRITELOG(FORMAT_Y_G_G, currentDateTime(), packet->getName(), LEFTARROW, clientId, packet->print(pbuf));
- break;
- case MQTTSN_PUBLISH:
- case MQTTSN_REGISTER:
- case MQTTSN_SUBSCRIBE:
- case MQTTSN_UNSUBSCRIBE:
- WRITELOG(FORMAT_G_MSGID_G_G_NL, currentDateTime(), packet->getName(), packet->getMsgId(msgId), LEFTARROW, clientId, packet->print(pbuf));
- break;
- case MQTTSN_REGACK:
- case MQTTSN_PUBACK:
- case MQTTSN_PUBREC:
- case MQTTSN_PUBREL:
- case MQTTSN_PUBCOMP:
- WRITELOG(FORMAT_G_MSGID_G_G, currentDateTime(), packet->getName(), packet->getMsgId(msgId), LEFTARROW, clientId, packet->print(pbuf));
- break;
- case MQTTSN_ENCAPSULATED:
- WRITELOG(FORMAT_Y_G_G, currentDateTime(), packet->getName(), LEFTARROW, clientId, packet->print(pbuf));
- break;
- default:
- WRITELOG(FORMAT_W_NL, currentDateTime(), packet->getName(), LEFTARROW, clientId, packet->print(pbuf));
- break;
- }
+ log(clientId, packet);
+}
+
+void ClientRecvTask::log(const char* clientId, MQTTSNPacket* packet)
+{
+ char pbuf[SIZE_OF_LOG_PACKET * 3];
+ char msgId[6];
+
+ switch (packet->getType())
+ {
+ case MQTTSN_SEARCHGW:
+ WRITELOG(FORMAT_Y_G_G_NL, currentDateTime(), packet->getName(), LEFTARROW, CLIENT, packet->print(pbuf));
+ break;
+ case MQTTSN_CONNECT:
+ case MQTTSN_PINGREQ:
+ WRITELOG(FORMAT_Y_G_G_NL, currentDateTime(), packet->getName(), LEFTARROW, clientId, packet->print(pbuf));
+ break;
+ case MQTTSN_DISCONNECT:
+ case MQTTSN_WILLTOPICUPD:
+ case MQTTSN_WILLMSGUPD:
+ case MQTTSN_WILLTOPIC:
+ case MQTTSN_WILLMSG:
+ WRITELOG(FORMAT_Y_G_G, currentDateTime(), packet->getName(), LEFTARROW, clientId, packet->print(pbuf));
+ break;
+ case MQTTSN_PUBLISH:
+ case MQTTSN_REGISTER:
+ case MQTTSN_SUBSCRIBE:
+ case MQTTSN_UNSUBSCRIBE:
+ WRITELOG(FORMAT_G_MSGID_G_G_NL, currentDateTime(), packet->getName(), packet->getMsgId(msgId), LEFTARROW, clientId, packet->print(pbuf));
+ break;
+ case MQTTSN_REGACK:
+ case MQTTSN_PUBACK:
+ case MQTTSN_PUBREC:
+ case MQTTSN_PUBREL:
+ case MQTTSN_PUBCOMP:
+ WRITELOG(FORMAT_G_MSGID_G_G, currentDateTime(), packet->getName(), packet->getMsgId(msgId), LEFTARROW, clientId, packet->print(pbuf));
+ break;
+ case MQTTSN_ENCAPSULATED:
+ WRITELOG(FORMAT_Y_G_G, currentDateTime(), packet->getName(), LEFTARROW, clientId, packet->print(pbuf));
+ break;
+ default:
+ WRITELOG(FORMAT_W_NL, currentDateTime(), packet->getName(), LEFTARROW, clientId, packet->print(pbuf));
+ break;
+ }
}
diff --git a/MQTTSNGateway/src/MQTTSNGWClientRecvTask.h b/MQTTSNGateway/src/MQTTSNGWClientRecvTask.h
index 295c0ee..b681a24 100644
--- a/MQTTSNGateway/src/MQTTSNGWClientRecvTask.h
+++ b/MQTTSNGateway/src/MQTTSNGWClientRecvTask.h
@@ -35,8 +35,8 @@ public:
void run();
private:
- void log(Client*, MQTTSNPacket*, MQTTSNString* id = 0);
-
+ void log(Client*, MQTTSNPacket*, MQTTSNString* id);
+ void log(const char* clientId, MQTTSNPacket* packet);
Gateway* _gateway;
SensorNetwork* _sensorNetwork;
};
diff --git a/MQTTSNGateway/src/MQTTSNGWClientSendTask.cpp b/MQTTSNGateway/src/MQTTSNGWClientSendTask.cpp
index 87de924..76fe572 100644
--- a/MQTTSNGateway/src/MQTTSNGWClientSendTask.cpp
+++ b/MQTTSNGateway/src/MQTTSNGWClientSendTask.cpp
@@ -71,6 +71,11 @@ void ClientSendTask::run()
else
{
log(client, packet);
+ if ( client->isProxy() )
+ {
+ _gateway->getClientProxy()->send(packet);
+ continue;
+ }
rc = packet->unicast(_sensorNetwork, client->getSensorNetAddress());
}
}
diff --git a/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp b/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp
index 06b8d43..43aea61 100644
--- a/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp
+++ b/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp
@@ -90,7 +90,10 @@ void MQTTSNConnectionHandler::handleConnect(Client* client, MQTTSNPacket* packet
//* clear ConnectData of Client */
Connect* connectData = client->getConnectData();
memset(connectData, 0, sizeof(Connect));
- client->disconnected();
+ if ( !client->isProxy() )
+ {
+ client->disconnected();
+ }
Topics* topics = client->getTopics();
@@ -153,7 +156,7 @@ void MQTTSNConnectionHandler::handleWilltopic(Client* client, MQTTSNPacket* pack
{
int willQos;
uint8_t willRetain;
- MQTTSNString willTopic;
+ MQTTSNString willTopic = MQTTSNString_initializer;
if ( packet->getWILLTOPIC(&willQos, &willRetain, &willTopic) == 0 )
{
@@ -187,7 +190,7 @@ void MQTTSNConnectionHandler::handleWillmsg(Client* client, MQTTSNPacket* packet
return;
}
- MQTTSNString willmsg;
+ MQTTSNString willmsg = MQTTSNString_initializer;
Connect* connectData = client->getConnectData();
if( client->isConnectSendable() )
diff --git a/MQTTSNGateway/src/MQTTSNGWDefines.h b/MQTTSNGateway/src/MQTTSNGWDefines.h
index 8f7f837..7da2109 100644
--- a/MQTTSNGateway/src/MQTTSNGWDefines.h
+++ b/MQTTSNGateway/src/MQTTSNGWDefines.h
@@ -27,6 +27,7 @@ namespace MQTTSNGW
#define CLIENT_LIST "clients.conf"
#define PREDEFINEDTOPIC_FILE "predefinedTopic.conf"
#define FORWARDER_LIST "forwarders.conf"
+#define QOS_1CLIENT_LIST "qos-1clients.conf"
/*==========================================================
* Gateway default parameters
diff --git a/MQTTSNGateway/src/MQTTSNGWForwarder.cpp b/MQTTSNGateway/src/MQTTSNGWForwarder.cpp
index 0bad175..ad1e94b 100644
--- a/MQTTSNGateway/src/MQTTSNGWForwarder.cpp
+++ b/MQTTSNGateway/src/MQTTSNGWForwarder.cpp
@@ -13,9 +13,10 @@
* Contributors:
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
**************************************************************************************/
-#include
#include "MQTTSNGWForwarder.h"
+#include
+
using namespace MQTTSNGW;
using namespace std;
@@ -155,10 +156,10 @@ Forwarder::~Forwarder(void)
{
if ( _headClient )
{
- ForwardedClient* p = _headClient;
+ ForwarderElement* p = _headClient;
while ( p )
{
- ForwardedClient* next = p->_next;
+ ForwarderElement* next = p->_next;
delete p;
p = next;
}
@@ -172,8 +173,8 @@ const char* Forwarder::getId(void)
void Forwarder::addClient(Client* client, WirelessNodeId* id)
{
- ForwardedClient* p = _headClient;
- ForwardedClient* prev = 0;
+ ForwarderElement* p = _headClient;
+ ForwarderElement* prev = 0;
client->setForwarder(this);
@@ -191,7 +192,7 @@ void Forwarder::addClient(Client* client, WirelessNodeId* id)
}
}
- ForwardedClient* fclient = new ForwardedClient();
+ ForwarderElement* fclient = new ForwarderElement();
fclient->setClient(client);
fclient->setWirelessNodeId(id);
@@ -210,7 +211,7 @@ Client* Forwarder::getClient(WirelessNodeId* id)
{
Client* cl = 0;
_mutex.lock();
- ForwardedClient* p = _headClient;
+ ForwarderElement* p = _headClient;
while ( p )
{
if ( *(p->_wirelessNodeId) == *id )
@@ -236,7 +237,7 @@ WirelessNodeId* Forwarder::getWirelessNodeId(Client* client)
{
WirelessNodeId* nodeId = 0;
_mutex.lock();
- ForwardedClient* p = _headClient;
+ ForwarderElement* p = _headClient;
while ( p )
{
if ( p->_client == client )
@@ -255,9 +256,9 @@ WirelessNodeId* Forwarder::getWirelessNodeId(Client* client)
void Forwarder::eraseClient(Client* client)
{
- ForwardedClient* prev = 0;
+ ForwarderElement* prev = 0;
_mutex.lock();
- ForwardedClient* p = _headClient;
+ ForwarderElement* p = _headClient;
while ( p )
{
@@ -291,14 +292,14 @@ SensorNetAddress* Forwarder::getSensorNetAddr(void)
* Class ForwardedClient
*/
-ForwardedClient::ForwardedClient()
+ForwarderElement::ForwarderElement()
: _client{0}
, _wirelessNodeId{0}
, _next{0}
{
}
-ForwardedClient::~ForwardedClient()
+ForwarderElement::~ForwarderElement()
{
if (_wirelessNodeId)
{
@@ -306,12 +307,12 @@ ForwardedClient::~ForwardedClient()
}
}
-void ForwardedClient::setClient(Client* client)
+void ForwarderElement::setClient(Client* client)
{
_client = client;
}
-void ForwardedClient::setWirelessNodeId(WirelessNodeId* id)
+void ForwarderElement::setWirelessNodeId(WirelessNodeId* id)
{
if ( _wirelessNodeId == 0 )
{
diff --git a/MQTTSNGateway/src/MQTTSNGWForwarder.h b/MQTTSNGateway/src/MQTTSNGWForwarder.h
index db6d099..2c58e66 100644
--- a/MQTTSNGateway/src/MQTTSNGWForwarder.h
+++ b/MQTTSNGateway/src/MQTTSNGWForwarder.h
@@ -28,18 +28,18 @@ namespace MQTTSNGW
class Client;
class WirelessNodeId;
-class ForwardedClient
+class ForwarderElement
{
friend class Forwarder;
public:
- ForwardedClient();
- ~ForwardedClient();
+ ForwarderElement();
+ ~ForwarderElement();
void setClient(Client* client);
void setWirelessNodeId(WirelessNodeId* id);
private:
Client* _client;
WirelessNodeId* _wirelessNodeId;
- ForwardedClient* _next;
+ ForwarderElement* _next;
};
@@ -62,7 +62,7 @@ public:
private:
string _forwarderName;
SensorNetAddress _sensorNetAddr;
- ForwardedClient* _headClient;
+ ForwarderElement* _headClient;
Forwarder* _next;
Mutex _mutex;
};
diff --git a/MQTTSNGateway/src/MQTTSNGWPacket.cpp b/MQTTSNGateway/src/MQTTSNGWPacket.cpp
index 74c00fb..222597f 100644
--- a/MQTTSNGateway/src/MQTTSNGWPacket.cpp
+++ b/MQTTSNGateway/src/MQTTSNGWPacket.cpp
@@ -101,6 +101,17 @@ int MQTTSNPacket::getType(void)
return _buf[p];
}
+bool MQTTSNPacket::isQoSMinusPUBLISH(void)
+{
+ if ( _bufLen == 0 )
+ {
+ return false;;
+ }
+ int value = 0;
+ int p = MQTTSNPacket_decode(_buf, _bufLen, &value);
+ return ( (_buf[p] == MQTTSN_PUBLISH) && ((_buf[p + 1] & 0x60 ) == 0x60 ));
+}
+
unsigned char* MQTTSNPacket::getPacketData(void)
{
return _buf;
@@ -119,7 +130,8 @@ const char* MQTTSNPacket::getName()
int MQTTSNPacket::setADVERTISE(uint8_t gatewayid, uint16_t duration)
{
unsigned char buf[5];
- int len = MQTTSNSerialize_advertise(buf, 5, (unsigned char) gatewayid,
+ int buflen = sizeof(buf);
+ int len = MQTTSNSerialize_advertise(buf, buflen, (unsigned char) gatewayid,
(unsigned short) duration);
return desirialize(buf, len);
}
@@ -127,44 +139,50 @@ int MQTTSNPacket::setADVERTISE(uint8_t gatewayid, uint16_t duration)
int MQTTSNPacket::setGWINFO(uint8_t gatewayId)
{
unsigned char buf[3];
- int len = MQTTSNSerialize_gwinfo(buf, 3, (unsigned char) gatewayId, 0, 0);
+ int buflen = sizeof(buf);
+ int len = MQTTSNSerialize_gwinfo(buf, buflen, (unsigned char) gatewayId, 0, 0);
return desirialize(buf, len);
}
int MQTTSNPacket::setConnect(void)
{
unsigned char buf[40];
+ int buflen = sizeof(buf);
MQTTSNPacket_connectData data;
data.clientID.cstring = (char*)"client01";
- int len = MQTTSNSerialize_connect(buf, 40, &data);
+ int len = MQTTSNSerialize_connect(buf, buflen, &data);
return desirialize(buf, len);
}
int MQTTSNPacket::setCONNACK(uint8_t returnCode)
{
unsigned char buf[3];
- int len = MQTTSNSerialize_connack(buf, 3, (int) returnCode);
+ int buflen = sizeof(buf);
+ int len = MQTTSNSerialize_connack(buf, buflen, (int) returnCode);
return desirialize(buf, len);
}
int MQTTSNPacket::setWILLTOPICREQ(void)
{
unsigned char buf[2];
- int len = MQTTSNSerialize_willtopicreq(buf, 2);
+ int buflen = sizeof(buf);
+ int len = MQTTSNSerialize_willtopicreq(buf, buflen);
return desirialize(buf, len);
}
int MQTTSNPacket::setWILLMSGREQ(void)
{
unsigned char buf[2];
- int len = MQTTSNSerialize_willmsgreq(buf, 2);
+ int buflen = sizeof(buf);
+ int len = MQTTSNSerialize_willmsgreq(buf, buflen);
return desirialize(buf, len);
}
int MQTTSNPacket::setREGISTER(uint16_t topicId, uint16_t msgId, MQTTSNString* topicName)
{
unsigned char buf[MQTTSNGW_MAX_PACKET_SIZE];
- int len = MQTTSNSerialize_register(buf, MQTTSNGW_MAX_PACKET_SIZE, (unsigned short) topicId, (unsigned short) msgId,
+ int buflen = sizeof(buf);
+ int len = MQTTSNSerialize_register(buf, buflen, (unsigned short) topicId, (unsigned short) msgId,
topicName);
return desirialize(buf, len);
}
@@ -172,7 +190,8 @@ int MQTTSNPacket::setREGISTER(uint16_t topicId, uint16_t msgId, MQTTSNString* to
int MQTTSNPacket::setREGACK(uint16_t topicId, uint16_t msgId, uint8_t returnCode)
{
unsigned char buf[7];
- int len = MQTTSNSerialize_regack(buf, 7, (unsigned short) topicId, (unsigned short) msgId,
+ int buflen = sizeof(buf);
+ int len = MQTTSNSerialize_regack(buf, buflen, (unsigned short) topicId, (unsigned short) msgId,
(unsigned char) returnCode);
return desirialize(buf, len);
}
@@ -181,7 +200,8 @@ int MQTTSNPacket::setPUBLISH(uint8_t dup, int qos, uint8_t retained, uint16_t ms
uint8_t* payload, uint16_t payloadlen)
{
unsigned char buf[MQTTSNGW_MAX_PACKET_SIZE];
- int len = MQTTSNSerialize_publish(buf, MQTTSNGW_MAX_PACKET_SIZE, (unsigned char) dup, qos, (unsigned char) retained,
+ int buflen = sizeof(buf);
+ int len = MQTTSNSerialize_publish(buf, buflen, (unsigned char) dup, qos, (unsigned char) retained,
(unsigned short) msgId, topic, (unsigned char*) payload, (int) payloadlen);
return desirialize(buf, len);
}
@@ -189,7 +209,8 @@ int MQTTSNPacket::setPUBLISH(uint8_t dup, int qos, uint8_t retained, uint16_t ms
int MQTTSNPacket::setPUBACK(uint16_t topicId, uint16_t msgId, uint8_t returnCode)
{
unsigned char buf[7];
- int len = MQTTSNSerialize_puback(buf, 7, (unsigned short) topicId, (unsigned short) msgId,
+ int buflen = sizeof(buf);
+ int len = MQTTSNSerialize_puback(buf, buflen, (unsigned short) topicId, (unsigned short) msgId,
(unsigned char) returnCode);
return desirialize(buf, len);
}
@@ -197,28 +218,32 @@ int MQTTSNPacket::setPUBACK(uint16_t topicId, uint16_t msgId, uint8_t returnCode
int MQTTSNPacket::setPUBREC(uint16_t msgId)
{
unsigned char buf[4];
- int len = MQTTSNSerialize_pubrec(buf, 4, (unsigned short) msgId);
+ int buflen = sizeof(buf);
+ int len = MQTTSNSerialize_pubrec(buf, buflen, (unsigned short) msgId);
return desirialize(buf, len);
}
int MQTTSNPacket::setPUBREL(uint16_t msgId)
{
unsigned char buf[4];
- int len = MQTTSNSerialize_pubrel(buf, 4, (unsigned short) msgId);
+ int buflen = sizeof(buf);
+ int len = MQTTSNSerialize_pubrel(buf, buflen, (unsigned short) msgId);
return desirialize(buf, len);
}
int MQTTSNPacket::setPUBCOMP(uint16_t msgId)
{
unsigned char buf[4];
- int len = MQTTSNSerialize_pubcomp(buf, 4, (unsigned short) msgId);
+ int buflen = sizeof(buf);
+ int len = MQTTSNSerialize_pubcomp(buf, buflen, (unsigned short) msgId);
return desirialize(buf, len);
}
int MQTTSNPacket::setSUBACK(int qos, uint16_t topicId, uint16_t msgId, uint8_t returnCode)
{
unsigned char buf[8];
- int len = MQTTSNSerialize_suback(buf, 8, qos, (unsigned short) topicId,
+ int buflen = sizeof(buf);
+ int len = MQTTSNSerialize_suback(buf, buflen, qos, (unsigned short) topicId,
(unsigned short) msgId, (unsigned char) returnCode);
return desirialize(buf, len);
}
@@ -226,38 +251,59 @@ int MQTTSNPacket::setSUBACK(int qos, uint16_t topicId, uint16_t msgId, uint8_t r
int MQTTSNPacket::setUNSUBACK(uint16_t msgId)
{
unsigned char buf[4];
- int len = MQTTSNSerialize_unsuback(buf, 4, (unsigned short) msgId);
+ int buflen = sizeof(buf);
+ int len = MQTTSNSerialize_unsuback(buf, buflen, (unsigned short) msgId);
return desirialize(buf, len);
}
int MQTTSNPacket::setPINGRESP(void)
{
unsigned char buf[32];
- int len = MQTTSNSerialize_pingresp(buf, 32);
+ int buflen = sizeof(buf);
+ int len = MQTTSNSerialize_pingresp(buf, buflen);
return desirialize(buf, len);
}
int MQTTSNPacket::setDISCONNECT(uint16_t duration)
{
unsigned char buf[4];
- int len = MQTTSNSerialize_disconnect(buf, 4, (int) duration);
+ int buflen = sizeof(buf);
+ int len = MQTTSNSerialize_disconnect(buf, buflen, (int) duration);
return desirialize(buf, len);
}
int MQTTSNPacket::setWILLTOPICRESP(uint8_t returnCode)
{
unsigned char buf[MQTTSNGW_MAX_PACKET_SIZE];
- int len = MQTTSNSerialize_willtopicresp(buf, MQTTSNGW_MAX_PACKET_SIZE, (int) returnCode);
+ int buflen = sizeof(buf);
+ int len = MQTTSNSerialize_willtopicresp(buf, buflen, (int) returnCode);
return desirialize(buf, len);
}
int MQTTSNPacket::setWILLMSGRESP(uint8_t returnCode)
{
unsigned char buf[MQTTSNGW_MAX_PACKET_SIZE];
- int len = MQTTSNSerialize_willmsgresp(buf, MQTTSNGW_MAX_PACKET_SIZE, (int) returnCode);
+ int buflen = sizeof(buf);
+ int len = MQTTSNSerialize_willmsgresp(buf, buflen, (int) returnCode);
return desirialize(buf, len);
}
+int MQTTSNPacket::setCONNECT(MQTTSNPacket_connectData* options)
+{
+ unsigned char buf[MQTTSNGW_MAX_PACKET_SIZE];
+ int buflen = sizeof(buf);
+ int len = MQTTSNSerialize_connect(buf, buflen, options);
+ return desirialize(buf, len);
+}
+
+int MQTTSNPacket::setPINGREQ(MQTTSNString* clientId)
+{
+ unsigned char buf[MQTTSNGW_MAX_PACKET_SIZE];
+ int buflen = sizeof(buf);
+ int len = MQTTSNSerialize_pingreq( buf, buflen, *clientId);
+ return desirialize(buf, len);
+}
+
int MQTTSNPacket::getSERCHGW(uint8_t* radius)
{
return MQTTSNDeserialize_searchgw((unsigned char*) radius, (unsigned char*) _buf, _bufLen);
diff --git a/MQTTSNGateway/src/MQTTSNGWPacket.h b/MQTTSNGateway/src/MQTTSNGWPacket.h
index 5cd7578..22e3bd7 100644
--- a/MQTTSNGateway/src/MQTTSNGWPacket.h
+++ b/MQTTSNGateway/src/MQTTSNGWPacket.h
@@ -59,6 +59,9 @@ public:
int setWILLTOPICRESP(uint8_t returnCode);
int setWILLMSGRESP(uint8_t returnCode);
+ int setCONNECT(MQTTSNPacket_connectData* options);
+ int setPINGREQ(MQTTSNString* clientId);
+
int getSERCHGW(uint8_t* radius);
int getCONNECT(MQTTSNPacket_connectData* option);
int getCONNACK(uint8_t* returnCode);
@@ -76,6 +79,8 @@ public:
int getDISCONNECT(uint16_t* duration);
int getWILLTOPICUPD(uint8_t* willQoS, uint8_t* willRetain, MQTTSNString* willTopic);
int getWILLMSGUPD(MQTTSNString* willMsg);
+
+ bool isQoSMinusPUBLISH(void);
char* getMsgId(char* buf);
char* print(char* buf);
diff --git a/MQTTSNGateway/src/MQTTSNGWPacketHandleTask.cpp b/MQTTSNGateway/src/MQTTSNGWPacketHandleTask.cpp
index 02dd9ab..e1c5e09 100644
--- a/MQTTSNGateway/src/MQTTSNGWPacketHandleTask.cpp
+++ b/MQTTSNGateway/src/MQTTSNGWPacketHandleTask.cpp
@@ -113,6 +113,9 @@ void PacketHandleTask::run()
_mqttsnConnection->sendADVERTISE();
_advertiseTimer.start(_gateway->getGWParams()->keepAlive * 1000UL);
}
+
+ /*------ Check ClientProxy to Connect or send PINGREQ ------*/
+ _gateway->getClientProxy()->checkConnection();
}
/*------ Handle SEARCHGW Message ---------*/
diff --git a/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp b/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp
index 4f0adc4..dac2040 100644
--- a/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp
+++ b/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp
@@ -43,18 +43,26 @@ void MQTTSNPublishHandler::handlePublish(Client* client, MQTTSNPacket* packet)
uint8_t* payload;
MQTTSN_topicid topicid;
int payloadlen;
- Publish pub;
+ Publish pub = {0, 0, 0, 0, 0, 0};
+
char shortTopic[2];
if ( !client->isActive() )
{
- /* Reply DISCONNECT to the client */
- Event* ev = new Event();
- MQTTSNPacket* disconnect = new MQTTSNPacket();
- disconnect->setDISCONNECT(0);
- ev->setClientSendEvent(client, disconnect);
- _gateway->getClientSendQue()->post(ev);
- return;
+ if ( client->isProxy() )
+ {
+ client->setProxyPacket(packet);
+ }
+ else
+ {
+ /* Reply DISCONNECT to the client */
+ Event* ev = new Event();
+ MQTTSNPacket* disconnect = new MQTTSNPacket();
+ disconnect->setDISCONNECT(0);
+ ev->setClientSendEvent(client, disconnect);
+ _gateway->getClientSendQue()->post(ev);
+ }
+ return;
}
if ( packet->getPUBLISH(&dup, &qos, &retained, &msgId, &topicid, &payload, &payloadlen) ==0 )
@@ -63,7 +71,7 @@ void MQTTSNPublishHandler::handlePublish(Client* client, MQTTSNPacket* packet)
}
pub.msgId = msgId;
pub.header.bits.dup = dup;
- pub.header.bits.qos = qos;
+ pub.header.bits.qos = ( qos == 3 ? 0 : qos );
pub.header.bits.retain = retained;
Topic* topic = 0;
@@ -79,7 +87,13 @@ void MQTTSNPublishHandler::handlePublish(Client* client, MQTTSNPacket* packet)
{
topic = client->getTopics()->getTopicById(&topicid);
- if( !topic && msgId && qos > 0 )
+ if( !topic && qos == 3 )
+ {
+ WRITELOG("%s Invali TopicId.%s\n", ERRMSG_HEADER, client->getClientId(), ERRMSG_FOOTER);
+ return;
+ }
+
+ if( !topic && msgId && qos > 0 && qos < 3 )
{
/* Reply PubAck with INVALID_TOPIC_ID to the client */
MQTTSNPacket* pubAck = new MQTTSNPacket();
@@ -96,7 +110,7 @@ void MQTTSNPublishHandler::handlePublish(Client* client, MQTTSNPacket* packet)
}
}
/* Save a msgId & a TopicId pare for PUBACK */
- if( msgId && qos > 0 )
+ if( msgId && qos > 0 && qos < 3)
{
client->setWaitedPubTopicId(msgId, topicid.data.id, topicid.type);
}
@@ -161,7 +175,7 @@ void MQTTSNPublishHandler::handleRegister(Client* client, MQTTSNPacket* packet)
{
uint16_t id;
uint16_t msgId;
- MQTTSNString topicName;
+ MQTTSNString topicName = MQTTSNString_initializer;;
MQTTSN_topicid topicid;
if ( client->isActive() || client->isAwake())
diff --git a/MQTTSNGateway/src/MQTTSNGateway.cpp b/MQTTSNGateway/src/MQTTSNGateway.cpp
index c6029ce..ac47ff7 100644
--- a/MQTTSNGateway/src/MQTTSNGateway.cpp
+++ b/MQTTSNGateway/src/MQTTSNGateway.cpp
@@ -30,6 +30,7 @@ Gateway::Gateway()
{
theMultiTaskProcess = this;
theProcess = this;
+ _clientProxy = new ClientProxy(this);
_params.loginId = 0;
_params.password = 0;
_params.keepAlive = 0;
@@ -48,6 +49,7 @@ Gateway::Gateway()
_params.configName = 0;
_params.predefinedTopicFileName = 0;
_params.forwarderListName = 0;
+ _params.qosMinusClientListName = 0;
_packetEventQue.setMaxSize(MAX_INFLIGHTMESSAGES * MAX_CLIENTS);
}
@@ -109,12 +111,21 @@ Gateway::~Gateway()
{
free(_params.forwarderListName);
}
+ if ( _params.qosMinusClientListName )
+ {
+ free(_params.qosMinusClientListName);
+ }
+ if ( _clientProxy )
+ {
+ delete _clientProxy;
+ }
}
void Gateway::initialize(int argc, char** argv)
{
char param[MQTTSNGW_PARAM_MAX];
string fileName;
+ bool secure = false;
MultiTaskProcess::initialize(argc, argv);
resetRingBuffer();
@@ -201,6 +212,7 @@ void Gateway::initialize(int argc, char** argv)
{
if (!strcasecmp(param, "YES"))
{
+ secure = true;
if (getParam("ClientsList", param) == 0)
{
fileName = string(param);
@@ -218,7 +230,33 @@ void Gateway::initialize(int argc, char** argv)
}
}
+ /* Set ClientProxy's Client */
+ MQTTSNString id = MQTTSNString_initializer;
+ id.cstring = const_cast(CLIENTPROXY);
+ Client* client = _clientList.createClient(0, &id, true, secure);
+ _clientProxy->setClient(client);
+ client->setPorxy(true);
+ _clientProxy->setGateway(this);
+ if (getParam("QoS-1", param) == 0 )
+ {
+ if (!strcasecmp(param, "YES") )
+ {
+ if (getParam("QoS-1ClientsList", param) == 0)
+ {
+ fileName = string(param);
+ }
+ else
+ {
+ fileName = *getConfigDirName() + string(QOS_1CLIENT_LIST);
+ }
+ if ( !_clientProxy->setClientProxy(fileName.c_str()) )
+ {
+ throw Exception("Gateway::initialize: No QoS-1ClientsList file defined by the configuration..");
+ }
+ _params.qosMinusClientListName = strdup(fileName.c_str());
+ }
+ }
if (getParam("PredefinedTopic", param) == 0 )
{
@@ -238,38 +276,31 @@ void Gateway::initialize(int argc, char** argv)
}
_params.predefinedTopicFileName = strdup(fileName.c_str());
}
- else
- {
- _params.predefinedTopicFileName = 0;
- }
}
if (getParam("Forwarder", param) == 0 )
- {
- if (!strcasecmp(param, "YES") )
- {
- if (getParam("ForwardersList", param) == 0)
- {
- fileName = string(param);
- }
- else
- {
- fileName = *getConfigDirName() + string(FORWARDER_LIST);
- }
- if ( !_forwarderList.setFowerder(fileName.c_str()) )
- {
- throw Exception("Gateway::initialize: No ForwardersList file defined by the configuration..");
- }
- _params.forwarderListName = strdup(fileName.c_str());
- }
- else
- {
- _params.forwarderListName = 0;
- }
- }
+ {
+ if (!strcasecmp(param, "YES") )
+ {
+ if (getParam("ForwardersList", param) == 0)
+ {
+ fileName = string(param);
+ }
+ else
+ {
+ fileName = *getConfigDirName() + string(FORWARDER_LIST);
+ }
+ if ( !_forwarderList.setFowerder(fileName.c_str()) )
+ {
+ throw Exception("Gateway::initialize: No ForwardersList file defined by the configuration..");
+ }
+ _params.forwarderListName = strdup(fileName.c_str());
+ }
+ }
fileName = *getConfigDirName() + *getConfigFileName();
_params.configName = strdup(fileName.c_str());
+
}
void Gateway::run(void)
@@ -295,6 +326,10 @@ void Gateway::run(void)
if ( _params.forwarderListName )
{
WRITELOG(" Forwarders: %s\n", _params.forwarderListName);
+ }
+ if ( _params.qosMinusClientListName )
+ {
+ WRITELOG(" QoS-1: %s\n", _params.qosMinusClientListName);
}
WRITELOG(" SensorN/W: %s\n", _sensorNetwork.getDescription());
WRITELOG(" Broker: %s : %s, %s\n", _params.brokerName, _params.port, _params.portSecure);
@@ -302,7 +337,7 @@ void Gateway::run(void)
WRITELOG(" RootCApath: %s\n", _params.rootCApath);
WRITELOG(" RootCAfile: %s\n", _params.rootCAfile);
WRITELOG(" CertKey: %s\n", _params.certKey);
- WRITELOG(" PrivateKey: %s\n", _params.privateKey);
+ WRITELOG(" PrivateKey: %s\n\n\n", _params.privateKey);
MultiTaskProcess::run();
@@ -364,6 +399,11 @@ GatewayParams* Gateway::getGWParams(void)
return &_params;
}
+ClientProxy* Gateway::getClientProxy(void)
+{
+ return _clientProxy;
+}
+
/*=====================================
Class EventQue
=====================================*/
diff --git a/MQTTSNGateway/src/MQTTSNGateway.h b/MQTTSNGateway/src/MQTTSNGateway.h
index d219d6f..dbe6ae4 100644
--- a/MQTTSNGateway/src/MQTTSNGateway.h
+++ b/MQTTSNGateway/src/MQTTSNGateway.h
@@ -17,10 +17,12 @@
#define MQTTSNGATEWAY_H_
#include "MQTTSNGWClient.h"
-#include "MQTTSNGWForwarder.h"
#include "MQTTSNGWProcess.h"
#include "MQTTSNPacket.h"
+#include "MQTTSNGWForwarder.h"
+#include "MQTTSNGWClientProxy.h"
+
namespace MQTTSNGW
{
/*=================================
@@ -43,6 +45,7 @@ namespace MQTTSNGW
#define CLIENT "Client"
#define CLIENTS "Clients"
#define UNKNOWNCL "Unknown Client !"
+#define CLIENTPROXY "ClientProxy"
#define LEFTARROW "<---"
#define RIGHTARROW "--->"
@@ -156,11 +159,14 @@ typedef struct
char* privateKey;
char* predefinedTopicFileName;
char* forwarderListName;
+ char* qosMinusClientListName;
}GatewayParams;
/*=====================================
Class Gateway
=====================================*/
+class ClientProxy;
+
class Gateway: public MultiTaskProcess{
public:
Gateway();
@@ -176,9 +182,11 @@ public:
SensorNetwork* getSensorNetwork(void);
LightIndicator* getLightIndicator(void);
GatewayParams* getGWParams(void);
+ ClientProxy* getClientProxy(void);
private:
ClientList _clientList;
+ ClientProxy* _clientProxy;
ForwarderList _forwarderList;
EventQue _packetEventQue;
EventQue _brokerSendQue;