From bc731210aeeb7c080c3428a881fdeddaa0f1dc8b Mon Sep 17 00:00:00 2001 From: tomoaki Date: Sun, 27 Aug 2017 15:59:31 +0900 Subject: [PATCH] BugFix of #76 and #77 1.Return CONNACK instead of the broker when the gateway receives CONNECT while the client is Sleep or Awake mode. 2.Define the max size of a que for PUBLISH while the client state is Asleep mode. Despose packets when the que is full of packets. 3.Return PUBACK or PUBREL to the broker when the client is Asleep or Awake. Signed-off-by: tomoaki Signed-off-by: tomoaki --- .../GatewayTester/samples/mainTest.cpp | 29 ++-- MQTTSNGateway/GatewayTester/src/LGwProxy.cpp | 57 +++++-- MQTTSNGateway/GatewayTester/src/LGwProxy.h | 4 +- .../GatewayTester/src/LMqttsnClient.cpp | 9 +- .../GatewayTester/src/LMqttsnClient.h | 1 + .../GatewayTester/src/LNetworkUdp.cpp | 1 - .../GatewayTester/src/LPublishManager.cpp | 2 +- MQTTSNGateway/GatewayTester/src/LScreen.cpp | 1 - .../GatewayTester/src/LSubscribeManager.cpp | 2 +- MQTTSNGateway/GatewayTester/src/LTimer.cpp | 15 +- MQTTSNGateway/GatewayTester/src/LTimer.h | 1 + MQTTSNGateway/src/MQTTGWConnectionHandler.cpp | 8 - MQTTSNGateway/src/MQTTGWPacket.cpp | 17 +++ MQTTSNGateway/src/MQTTGWPacket.h | 1 + MQTTSNGateway/src/MQTTGWPublishHandler.cpp | 139 +++++++++--------- MQTTSNGateway/src/MQTTSNGWClient.cpp | 66 +++++---- MQTTSNGateway/src/MQTTSNGWClient.h | 21 ++- .../src/MQTTSNGWConnectionHandler.cpp | 29 +++- MQTTSNGateway/src/MQTTSNGWDefines.h | 2 + MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp | 87 ++++++----- MQTTSNGateway/src/linux/Timer.h | 4 +- MQTTSNGateway/src/linux/udp/SensorNetwork.cpp | 1 - .../src/linux/udp6/SensorNetwork.cpp | 1 - .../src/linux/xbee/SensorNetwork.cpp | 2 - 24 files changed, 303 insertions(+), 197 deletions(-) diff --git a/MQTTSNGateway/GatewayTester/samples/mainTest.cpp b/MQTTSNGateway/GatewayTester/samples/mainTest.cpp index 38e82dd..cbd7cd8 100644 --- a/MQTTSNGateway/GatewayTester/samples/mainTest.cpp +++ b/MQTTSNGateway/GatewayTester/samples/mainTest.cpp @@ -44,26 +44,26 @@ extern int run(void); * */ /*------------------------------------------------------ - * UDP Configuration + * UDP Configuration (theNetcon) *------------------------------------------------------*/ UDPCONF = { - "GatewayTester", // ClientId + "GatewayTestClient", // ClientId {225,1,1,1}, // Multicast group IP 1883, // Multicast group Port 20001, // Local PortNo }; /*------------------------------------------------------ - * Client Configuration + * Client Configuration (theMqcon) *------------------------------------------------------*/ MQTTSNCONF = { - 300, //KeepAlive (seconds) - true, //Clean session - 0, //Sleep duration in msecs - "willTopic", //WillTopic - "willMessage", //WillMessage - 0, //WillQos - false //WillRetain + 60, //KeepAlive [seconds] + true, //Clean session + 300, //Sleep duration [seconds] + "", //WillTopic + "", //WillMessage + 0, //WillQos + false //WillRetain }; /*------------------------------------------------------ @@ -162,6 +162,11 @@ void disconnect(void) DISCONNECT(0); } +void asleep(void) +{ + DISCONNECT(theMqcon.sleepDuration); +} + /*------------------------------------------------------ * A List of Test functions *------------------------------------------------------*/ @@ -175,7 +180,9 @@ TEST_LIST = {// e.g. TEST( Label, Test), TEST("Step6:Publish topic2", publishTopic2), TEST("Step7:subscribe again", subscribechangeCallback), TEST("Step8:Publish topic2", publishTopic2), - TEST("Step9:Disconnect", disconnect), + TEST("Step9:Sleep ", asleep), + TEST("Step10:Publish topic1", publishTopic1), + TEST("Step11:Disconnect", disconnect), END_OF_TEST_LIST }; diff --git a/MQTTSNGateway/GatewayTester/src/LGwProxy.cpp b/MQTTSNGateway/GatewayTester/src/LGwProxy.cpp index 47cc88a..7f5add1 100644 --- a/MQTTSNGateway/GatewayTester/src/LGwProxy.cpp +++ b/MQTTSNGateway/GatewayTester/src/LGwProxy.cpp @@ -55,6 +55,8 @@ LGwProxy::LGwProxy(){ _cleanSession = 0; _pingStatus = 0; _connectRetry = MQTTSN_RETRY_COUNT; + _tSleep = 0; + _tWake = 0; } LGwProxy::~LGwProxy(){ @@ -91,7 +93,7 @@ void LGwProxy::connect(){ strcpy(pos,_willTopic); // WILLTOPIC _status = GW_WAIT_WILLMSGREQ; writeGwMsg(); - }else if (_status == GW_CONNECTING || _status == GW_DISCONNECTED){ + }else if (_status == GW_CONNECTING || _status == GW_DISCONNECTED || _status == GW_SLEPT ){ uint8_t clientIdLen = uint8_t(strlen(_clientId) > 23 ? 23 : strlen(_clientId)); *pos++ = 6 + clientIdLen; *pos++ = MQTTSN_TYPE_CONNECT; @@ -105,7 +107,7 @@ void LGwProxy::connect(){ strncpy(pos, _clientId, clientIdLen); _msg[ 6 + clientIdLen] = 0; _status = GW_WAIT_CONNACK; - if (_willMsg && _willTopic){ + if ( _willMsg && _willTopic && _status != GW_SLEPT ){ if (strlen(_willMsg) && strlen(_willTopic)){ _msg[2] = _msg[2] | MQTTSN_FLAG_WILL; // CONNECT _status = GW_WAIT_WILLTOPICREQ; @@ -163,10 +165,14 @@ int LGwProxy::getConnectResponce(void){ if (_mqttsnMsg[1] == MQTTSN_RC_ACCEPTED){ _status = GW_CONNECTED; _connectRetry = MQTTSN_RETRY_COUNT; - _keepAliveTimer.start(_tkeepAlive * 1000); - _topicTbl.clearTopic(); - DISPLAY("\033[0m\033[0;32m\n\n Connected to the Broker\033[0m\033[0;37m\n\n"); - theClient->onConnect(); // SUBSCRIBEs are conducted + setPingReqTimer(); + if ( _tSleep ){ + _tSleep = 0; + }else{ + DISPLAY("\033[0m\033[0;32m\n\n Connected to the Broker\033[0m\033[0;37m\n\n"); + _topicTbl.clearTopic(); + theClient->onConnect(); // SUBSCRIBEs are conducted + } }else{ _status = GW_CONNECTING; } @@ -182,16 +188,18 @@ void LGwProxy::reconnect(void){ void LGwProxy::disconnect(uint16_t secs){ _tSleep = secs; - _status = GW_DISCONNECTING; - + _tWake = 0; + _msg[1] = MQTTSN_TYPE_DISCONNECT; if (secs){ _msg[0] = 4; setUint16((uint8_t*) _msg + 2, secs); + _status = GW_SLEEPING; }else{ _msg[0] = 2; _keepAliveTimer.stop(); + _status = GW_DISCONNECTING; } _retryCount = MQTTSN_RETRY_COUNT; @@ -223,9 +231,13 @@ int LGwProxy::getDisconnectResponce(void){ } return 0; }else if (_mqttsnMsg[0] == MQTTSN_TYPE_DISCONNECT){ - if (_tSleep){ - _status = GW_SLEEPING; - _keepAliveTimer.start(_tSleep); + if (_status == GW_SLEEPING ){ + _status = GW_SLEPT; + uint32_t remain = _keepAliveTimer.getRemain(); + theClient->setSleepMode(remain); + + /* Wake up and starts from this point. */ + }else{ _status = GW_DISCONNECTED; } @@ -279,7 +291,18 @@ int LGwProxy::getMessage(void){ }else if (_mqttsnMsg[0] == MQTTSN_TYPE_PINGRESP){ if (_pingStatus == GW_WAIT_PINGRESP){ _pingStatus = 0; - resetPingReqTimer(); + setPingReqTimer(); + + if ( _tSleep > 0 ){ + _tWake += _tkeepAlive; + if ( _tWake < _tSleep ){ + theClient->setSleepMode(_tkeepAlive * 1000UL); + }else{ + DISPLAY("\033[0m\033[0;32m\n\n Get back to ACTIVE.\033[0m\033[0;37m\n\n"); + _tWake = 0; + connect(); + } + } } }else if (_mqttsnMsg[0] == MQTTSN_TYPE_DISCONNECT){ _status = GW_LOST; @@ -408,7 +431,7 @@ void LGwProxy::checkPingReq(void){ msg[0] = 0x02; msg[1] = MQTTSN_TYPE_PINGREQ; - if (_status == GW_CONNECTED && _keepAliveTimer.isTimeUp() && _pingStatus != GW_WAIT_PINGRESP){ + if ( (_status == GW_CONNECTED || _status == GW_SLEPT) && isPingReqRequired() && _pingStatus != GW_WAIT_PINGRESP){ _pingStatus = GW_WAIT_PINGRESP; _pingRetryCount = MQTTSN_RETRY_COUNT; @@ -449,8 +472,12 @@ LRegisterManager* LGwProxy::getRegisterManager(void){ return &_regMgr; } -void LGwProxy::resetPingReqTimer(void){ - _keepAliveTimer.start(_tkeepAlive * 1000); +bool LGwProxy::isPingReqRequired(void){ + return _keepAliveTimer.isTimeUp(_tkeepAlive * 1000UL); +} + +void LGwProxy::setPingReqTimer(void){ + _keepAliveTimer.start(_tkeepAlive * 1000UL); } const char* LGwProxy::getClientId(void) { diff --git a/MQTTSNGateway/GatewayTester/src/LGwProxy.h b/MQTTSNGateway/GatewayTester/src/LGwProxy.h index acbd37a..41d6078 100644 --- a/MQTTSNGateway/GatewayTester/src/LGwProxy.h +++ b/MQTTSNGateway/GatewayTester/src/LGwProxy.h @@ -66,7 +66,7 @@ public: void setAdvertiseDuration(uint16_t duration); void reconnect(void); int writeMsg(const uint8_t* msg); - void resetPingReqTimer(void); + void setPingReqTimer(void); uint16_t getNextMsgId(); LTopicTable* getTopicTable(void); LRegisterManager* getRegisterManager(void); @@ -78,6 +78,7 @@ private: void checkAdvertise(void); int getConnectResponce(void); int getDisconnectResponce(void); + bool isPingReqRequired(void); LNetwork _network; uint8_t* _mqttsnMsg; @@ -103,6 +104,7 @@ private: LTimer _gwAliveTimer; LTimer _keepAliveTimer; uint16_t _tSleep; + uint16_t _tWake; char _msg[MQTTSN_MAX_MSG_LENGTH + 1]; }; diff --git a/MQTTSNGateway/GatewayTester/src/LMqttsnClient.cpp b/MQTTSNGateway/GatewayTester/src/LMqttsnClient.cpp index 1779930..857af17 100644 --- a/MQTTSNGateway/GatewayTester/src/LMqttsnClient.cpp +++ b/MQTTSNGateway/GatewayTester/src/LMqttsnClient.cpp @@ -198,12 +198,17 @@ void LMqttsnClient::run() { _gwProxy.connect(); _taskMgr.run(); - sleep(); +} + +void LMqttsnClient::setSleepMode(uint32_t duration) +{ + // ToDo: set WDT and sleep mode + DISPLAY("\033[0m\033[0;32m\n\n Get into SLEEP mode %u [msec].\033[0m\033[0;37m\n\n", duration); } void LMqttsnClient::sleep(void) { - + disconnect(_sleepDuration); } void LMqttsnClient::setSleepDuration(uint32_t duration) diff --git a/MQTTSNGateway/GatewayTester/src/LMqttsnClient.h b/MQTTSNGateway/GatewayTester/src/LMqttsnClient.h index 8bdd70e..cf93048 100644 --- a/MQTTSNGateway/GatewayTester/src/LMqttsnClient.h +++ b/MQTTSNGateway/GatewayTester/src/LMqttsnClient.h @@ -59,6 +59,7 @@ public: void run(void); void addTask(bool test); void setSleepDuration(uint32_t duration); + void setSleepMode(uint32_t duration); void sleep(void); const char* getClientId(void); LGwProxy* getGwProxy(void); diff --git a/MQTTSNGateway/GatewayTester/src/LNetworkUdp.cpp b/MQTTSNGateway/GatewayTester/src/LNetworkUdp.cpp index 9fc8a1f..b61cf84 100644 --- a/MQTTSNGateway/GatewayTester/src/LNetworkUdp.cpp +++ b/MQTTSNGateway/GatewayTester/src/LNetworkUdp.cpp @@ -23,7 +23,6 @@ #include #include #include -#include #include #include "LNetworkUdp.h" diff --git a/MQTTSNGateway/GatewayTester/src/LPublishManager.cpp b/MQTTSNGateway/GatewayTester/src/LPublishManager.cpp index 94d0886..d59dc02 100644 --- a/MQTTSNGateway/GatewayTester/src/LPublishManager.cpp +++ b/MQTTSNGateway/GatewayTester/src/LPublishManager.cpp @@ -150,7 +150,7 @@ void LPublishManager::sendPublish(PubElement* elm) memcpy(msg + org + 7, elm->payload, elm->payloadlen); theClient->getGwProxy()->writeMsg(msg); - theClient->getGwProxy()->resetPingReqTimer(); + theClient->getGwProxy()->setPingReqTimer(); if ((elm->flag & 0x60) == MQTTSN_FLAG_QOS_0) { DISPLAY("\033[0m\033[0;32m Topic \"%s\" was Published. \033[0m\033[0;37m\n\n", elm->topicName); diff --git a/MQTTSNGateway/GatewayTester/src/LScreen.cpp b/MQTTSNGateway/GatewayTester/src/LScreen.cpp index f5a3fb8..5336a78 100644 --- a/MQTTSNGateway/GatewayTester/src/LScreen.cpp +++ b/MQTTSNGateway/GatewayTester/src/LScreen.cpp @@ -23,7 +23,6 @@ #include #include #include -#include #include #include "LScreen.h" diff --git a/MQTTSNGateway/GatewayTester/src/LSubscribeManager.cpp b/MQTTSNGateway/GatewayTester/src/LSubscribeManager.cpp index d0c13bf..4424155 100644 --- a/MQTTSNGateway/GatewayTester/src/LSubscribeManager.cpp +++ b/MQTTSNGateway/GatewayTester/src/LSubscribeManager.cpp @@ -142,7 +142,7 @@ void LSubscribeManager::send(SubElement* elm) theClient->getGwProxy()->connect(); theClient->getGwProxy()->writeMsg(msg); - theClient->getGwProxy()->resetPingReqTimer(); + theClient->getGwProxy()->setPingReqTimer(); elm->sendUTC = time(NULL); elm->retryCount--; } diff --git a/MQTTSNGateway/GatewayTester/src/LTimer.cpp b/MQTTSNGateway/GatewayTester/src/LTimer.cpp index dbc28d3..781693b 100644 --- a/MQTTSNGateway/GatewayTester/src/LTimer.cpp +++ b/MQTTSNGateway/GatewayTester/src/LTimer.cpp @@ -16,7 +16,6 @@ #include #include - #include "LMqttsnClientApp.h" #include "LTimer.h" @@ -63,3 +62,17 @@ void LTimer::stop(){ _millis = 0; } +uint32_t LTimer::getRemain(void) +{ + struct timeval curTime; + uint32_t secs, usecs; + if (_millis <= 0){ + return 0; + }else{ + gettimeofday(&curTime, 0); + secs = (curTime.tv_sec - _startTime.tv_sec) * 1000; + usecs = (curTime.tv_usec - _startTime.tv_usec) / 1000.0; + secs = _millis - (secs + usecs); + return secs; + } +} diff --git a/MQTTSNGateway/GatewayTester/src/LTimer.h b/MQTTSNGateway/GatewayTester/src/LTimer.h index 27d6b5e..cb44cc0 100644 --- a/MQTTSNGateway/GatewayTester/src/LTimer.h +++ b/MQTTSNGateway/GatewayTester/src/LTimer.h @@ -35,6 +35,7 @@ public: bool isTimeUp(void); void stop(void); void changeUTC(void){}; + uint32_t getRemain(void); static void setUnixTime(uint32_t utc){}; private: struct timeval _startTime; diff --git a/MQTTSNGateway/src/MQTTGWConnectionHandler.cpp b/MQTTSNGateway/src/MQTTGWConnectionHandler.cpp index 8440142..dd351ae 100644 --- a/MQTTSNGateway/src/MQTTGWConnectionHandler.cpp +++ b/MQTTSNGateway/src/MQTTGWConnectionHandler.cpp @@ -76,14 +76,6 @@ void MQTTGWConnectionHandler::handleConnack(Client* client, MQTTGWPacket* packet ev1->setClientSendEvent(client, snPacket); client->connackSended(rc); // update the client's status _gateway->getClientSendQue()->post(ev1); - - MQTTSNPacket* sleepPacket = 0; - while ( (sleepPacket = client->getClientSleepPacket()) ) - { - Event* ev1 = new Event(); - ev1->setClientSendEvent(client, sleepPacket); - _gateway->getClientSendQue()->post(ev1); - } } void MQTTGWConnectionHandler::handlePingresp(Client* client, MQTTGWPacket* packet) diff --git a/MQTTSNGateway/src/MQTTGWPacket.cpp b/MQTTSNGateway/src/MQTTGWPacket.cpp index 0f87572..8bba53f 100644 --- a/MQTTSNGateway/src/MQTTGWPacket.cpp +++ b/MQTTSNGateway/src/MQTTGWPacket.cpp @@ -543,3 +543,20 @@ char* MQTTGWPacket::print(char* pbuf) return ptr; } +MQTTGWPacket& MQTTGWPacket::operator =(MQTTGWPacket& packet) +{ + clearData(); + this->_header.byte = packet._header.byte; + this->_remainingLength = packet._remainingLength; + _data = (unsigned char*)calloc(_remainingLength, 1); + if (_data) + { + memcpy(this->_data, packet._data, _remainingLength); + } + else + { + clearData(); + } + return *this; +} + diff --git a/MQTTSNGateway/src/MQTTGWPacket.h b/MQTTSNGateway/src/MQTTGWPacket.h index ecba6bb..458dd4d 100644 --- a/MQTTSNGateway/src/MQTTGWPacket.h +++ b/MQTTSNGateway/src/MQTTGWPacket.h @@ -205,6 +205,7 @@ public: int setUNSUBSCRIBE(const char* topics, unsigned short msgid); char* getMsgId(char* buf); char* print(char* buf); + MQTTGWPacket& operator =(MQTTGWPacket& packet); private: void clearData(void); diff --git a/MQTTSNGateway/src/MQTTGWPublishHandler.cpp b/MQTTSNGateway/src/MQTTGWPublishHandler.cpp index 67d3df7..cfd6772 100644 --- a/MQTTSNGateway/src/MQTTGWPublishHandler.cpp +++ b/MQTTSNGateway/src/MQTTGWPublishHandler.cpp @@ -36,9 +36,39 @@ MQTTGWPublishHandler::~MQTTGWPublishHandler() void MQTTGWPublishHandler::handlePublish(Client* client, MQTTGWPacket* packet) { - if ( !client->isActive() && !client->isSleep() ) + if ( !client->isActive() && !client->isSleep() && !client->isAwake()) { - WRITELOG(" The client is neither active nor sleep %s\n", client->getStatus()); + WRITELOG("%s The client is neither active nor sleep %s%s\n", ERRMSG_HEADER, client->getStatus(), ERRMSG_FOOTER); + return; + } + + /* client is sleeping. save PUBLISH */ + if ( client->isSleep() ) + { + Publish pub; + packet->getPUBLISH(&pub); + + WRITELOG(FORMAT_Y_G_G, currentDateTime(), packet->getName(), + RIGHTARROW, client->getClientId(), "is sleeping. a message was saved."); + + if (pub.header.bits.qos == 1) + { + replyACK(client, &pub, PUBACK); + } + else if ( pub.header.bits.qos == 2) + { + replyACK(client, &pub, PUBREC); + } + + MQTTGWPacket* msg = new MQTTGWPacket(); + *msg = *packet; + if ( msg->getType() == 0 ) + { + WRITELOG("%s MQTTGWPublishHandler::handlePublish can't allocate memories for Packet.%s\n", ERRMSG_HEADER,ERRMSG_FOOTER); + delete msg; + return; + } + client->setClientSleepPacket(msg); return; } @@ -102,19 +132,10 @@ void MQTTGWPublishHandler::handlePublish(Client* client, MQTTGWPacket* packet) uint16_t regackMsgId = client->getNextSnMsgId(); regPacket->setREGISTER(id, regackMsgId, &topicName); - if (client->isSleep()) - { - client->setClientSleepPacket(regPacket); - WRITELOG(FORMAT_BL_NL, currentDateTime(), regPacket->getName(), - RIGHTARROW, client->getClientId(), "is sleeping. REGISTER was saved."); - } - else if (client->isActive()) - { - /* send REGISTER */ - Event* evrg = new Event(); - evrg->setClientSendEvent(client, regPacket); - _gateway->getClientSendQue()->post(evrg); - } + /* send REGISTER */ + Event* evrg = new Event(); + evrg->setClientSendEvent(client, regPacket); + _gateway->getClientSendQue()->post(evrg); /* send PUBLISH */ topicId.data.id = id; @@ -125,47 +146,18 @@ void MQTTGWPublishHandler::handlePublish(Client* client, MQTTGWPacket* packet) } else { - WRITELOG("\x1b[0m\x1b[31mMQTTGWPublishHandler Can't create a Topic.\n"); + WRITELOG("%sMQTTGWPublishHandler Can't create a Topic.%s\n", ERRMSG_HEADER,ERRMSG_FOOTER); delete snPacket; return; } } } - /* TopicId was acquired. */ - if (client->isSleep()) - { - /* client is sleeping. save PUBLISH */ - client->setClientSleepPacket(snPacket); - WRITELOG(FORMAT_Y_G_G, currentDateTime(), packet->getName(), - RIGHTARROW, client->getClientId(), "is sleeping. a message was saved."); - int type = 0; - if (pub.header.bits.qos == 1) - { - type = PUBACK; - } - else if ( pub.header.bits.qos == 2) - { - WRITELOG(" While Client is sleeping, QoS2 is not supported.\n"); - type = PUBREC; - } - replyACK(client, &pub, type); - pub.header.bits.qos = 0; - replyACK(client, &pub, PUBACK); - pub.msgId = 0; - snPacket->setPUBLISH((uint8_t) pub.header.bits.dup, (int) pub.header.bits.qos, - (uint8_t) pub.header.bits.retain, (uint16_t) pub.msgId, topicId, (uint8_t*) pub.payload, - pub.payloadlen); - client->setClientSleepPacket(snPacket); - } - else if (client->isActive()) - { - snPacket->setPUBLISH((uint8_t) pub.header.bits.dup, (int) pub.header.bits.qos, (uint8_t) pub.header.bits.retain, - (uint16_t) pub.msgId, topicId, (uint8_t*) pub.payload, pub.payloadlen); - Event* ev1 = new Event(); - ev1->setClientSendEvent(client, snPacket); - _gateway->getClientSendQue()->post(ev1); - } + snPacket->setPUBLISH((uint8_t) pub.header.bits.dup, (int) pub.header.bits.qos, (uint8_t) pub.header.bits.retain, + (uint16_t) pub.msgId, topicId, (uint8_t*) pub.payload, pub.payloadlen); + Event* ev1 = new Event(); + ev1->setClientSendEvent(client, snPacket); + _gateway->getClientSendQue()->post(ev1); } @@ -201,22 +193,37 @@ void MQTTGWPublishHandler::handleAck(Client* client, MQTTGWPacket* packet, int t { Ack ack; packet->getAck(&ack); - MQTTSNPacket* mqttsnPacket = new MQTTSNPacket(); - if (type == PUBREC) - { - mqttsnPacket->setPUBREC((uint16_t) ack.msgId); - } - else if (type == PUBREL) - { - mqttsnPacket->setPUBREL((uint16_t) ack.msgId); - } - else if (type == PUBCOMP) - { - mqttsnPacket->setPUBCOMP((uint16_t) ack.msgId); - } - Event* ev1 = new Event(); - ev1->setClientSendEvent(client, mqttsnPacket); - _gateway->getClientSendQue()->post(ev1); + if ( client->isActive() || client->isAwake() ) + { + MQTTSNPacket* mqttsnPacket = new MQTTSNPacket(); + if (type == PUBREC) + { + mqttsnPacket->setPUBREC((uint16_t) ack.msgId); + } + else if (type == PUBREL) + { + mqttsnPacket->setPUBREL((uint16_t) ack.msgId); + } + else if (type == PUBCOMP) + { + mqttsnPacket->setPUBCOMP((uint16_t) ack.msgId); + } + + Event* ev1 = new Event(); + ev1->setClientSendEvent(client, mqttsnPacket); + _gateway->getClientSendQue()->post(ev1); + } + else if ( client->isSleep() ) + { + if (type == PUBREL) + { + MQTTGWPacket* pubComp = new MQTTGWPacket(); + pubComp->setAck(PUBCOMP, (uint16_t)ack.msgId); + Event* ev1 = new Event(); + ev1->setBrokerSendEvent(client, pubComp); + _gateway->getBrokerSendQue()->post(ev1); + } + } } diff --git a/MQTTSNGateway/src/MQTTSNGWClient.cpp b/MQTTSNGateway/src/MQTTSNGWClient.cpp index 93bee22..d31e3f0 100644 --- a/MQTTSNGateway/src/MQTTSNGWClient.cpp +++ b/MQTTSNGateway/src/MQTTSNGWClient.cpp @@ -23,7 +23,7 @@ #include using namespace MQTTSNGW; - +char* currentDateTime(void); /*===================================== Class ClientList =====================================*/ @@ -185,7 +185,6 @@ Client* ClientList::getClient(uint8_t* clientId) while (client != 0) { - //printf("ClientList: clientId = %s\n", client->getClientId()); if (strcmp((const char*)client->getClientId(), (const char*)clientId) == 0 ) { _mutex.unlock(); @@ -259,7 +258,7 @@ bool ClientList::isAuthorized() /*===================================== Class Client =====================================*/ -static const char* theClientStatus[] = { "Disconnected", "TryConnecting", "Connecting", "Active", "Awake", "Asleep", "Lost" }; +static const char* theClientStatus[] = { "Disconnected", "TryConnecting", "Connecting", "Active", "Asleep", "Awake", "Lost" }; Client::Client(bool secure) { @@ -288,6 +287,7 @@ Client::Client(bool secure) _otaClient = 0; _prevClient = 0; _nextClient = 0; + _clientSleepPacketQue.setMaxSize(MAX_SAVED_PUBLISH); } Client::~Client() @@ -335,11 +335,30 @@ uint16_t Client::getWaitedSubTopicId(uint16_t msgId) return _waitedSubTopicIdMap.getTopicId(msgId, &type); } -MQTTSNPacket* Client::getClientSleepPacket() +MQTTGWPacket* Client::getClientSleepPacket() { return _clientSleepPacketQue.getPacket(); } +void Client::deleteFirstClientSleepPacket() +{ + _clientSleepPacketQue.pop(); +} + +int Client::setClientSleepPacket(MQTTGWPacket* packet) +{ + int rc = _clientSleepPacketQue.post(packet); + if ( rc ) + { + WRITELOG("%s %s is sleeping. the packet was saved.\n", currentDateTime(), _clientId); + } + else + { + WRITELOG("%s %s is sleeping but discard the packet.\n", currentDateTime(), _clientId); + } + return rc; +} + Connect* Client::getConnectData(void) { return &_connectData; @@ -374,12 +393,6 @@ void Client::setWaitedSubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicT _waitedSubTopicIdMap.add(msgId, topicId, type); } -void Client::setClientSleepPacket(MQTTSNPacket* packet) -{ - updateStatus(packet); - _clientSleepPacketQue.post(packet); -} - bool Client::checkTimeover(void) { return (_status == Cstat_Active && _keepAliveTimer.isTimeup()); @@ -426,51 +439,34 @@ void Client::updateStatus(MQTTSNPacket* packet) _keepAliveTimer.start(_keepAliveMsec * 1.5); break; case MQTTSN_DISCONNECT: - { uint16_t duration; packet->getDISCONNECT(&duration); if (duration) { _status = Cstat_Asleep; - _keepAliveMsec = duration * 1000UL; } else { disconnected(); } - } break; default: break; } - } - else if (_status == Cstat_Asleep) - { - if (packet->getType() == MQTTSN_CONNECT) - { - setKeepAlive(packet); - _status = Cstat_Connecting; - } - else if (packet->getType() == MQTTSN_PINGREQ) - { - if ( packet->getPINGREQ() > 0 ) - { - _status = Cstat_Awake; - } - } - } - else if (_status == Cstat_Awake) + else if (_status == Cstat_Awake || _status == Cstat_Asleep) { switch (packet->getType()) { case MQTTSN_CONNECT: - _status = Cstat_Connecting; - setKeepAlive(packet); + _status = Cstat_Active; break; case MQTTSN_DISCONNECT: disconnected(); break; + case MQTTSN_PINGREQ: + _status = Cstat_Awake; + break; case MQTTSN_PINGRESP: _status = Cstat_Asleep; break; @@ -478,6 +474,7 @@ void Client::updateStatus(MQTTSNPacket* packet) break; } } + DEBUGLOG("Client Status = %s\n", theClientStatus[_status]); } void Client::updateStatus(ClientStatus stat) @@ -595,6 +592,11 @@ bool Client::isSleep(void) return (_status == Cstat_Asleep); } +bool Client::isAwake(void) +{ + return (_status == Cstat_Awake); +} + bool Client::isSecureNetwork(void) { return _secureNetwork; diff --git a/MQTTSNGateway/src/MQTTSNGWClient.h b/MQTTSNGateway/src/MQTTSNGWClient.h index 76c5c11..690f98b 100644 --- a/MQTTSNGateway/src/MQTTSNGWClient.h +++ b/MQTTSNGateway/src/MQTTSNGWClient.h @@ -43,6 +43,7 @@ public: _que = new Que; } + ~PacketQue() { clear(); @@ -65,11 +66,14 @@ public: } } - void post(T* packet) + int + post(T* packet) { + int rc; _mutex.lock(); - _que->post(packet); + rc = _que->post(packet); _mutex.unlock(); + return rc; } void pop() @@ -93,6 +97,11 @@ public: _mutex.unlock(); } + void setMaxSize(int size) + { + _que->setMaxSize(size); + } + private: Que* _que; Mutex _mutex; @@ -232,7 +241,8 @@ public: Connect* getConnectData(void); uint16_t getWaitedPubTopicId(uint16_t msgId); uint16_t getWaitedSubTopicId(uint16_t msgId); - MQTTSNPacket* getClientSleepPacket(); + MQTTGWPacket* getClientSleepPacket(void); + void deleteFirstClientSleepPacket(void); WaitREGACKPacketList* getWaitREGACKPacketList(void); void eraseWaitedPubTopicId(uint16_t msgId); @@ -240,7 +250,7 @@ public: void clearWaitedPubTopicId(void); void clearWaitedSubTopicId(void); - void setClientSleepPacket(MQTTSNPacket*); + int setClientSleepPacket(MQTTGWPacket*); void setWaitedPubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type); void setWaitedSubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type); @@ -277,6 +287,7 @@ public: bool isDisconnect(void); bool isActive(void); bool isSleep(void); + bool isAwake(void); bool isSecureNetwork(void); bool isSensorNetStable(void); bool isWaitWillMsg(void); @@ -286,7 +297,7 @@ public: void setOTAClient(Client* cl); private: - PacketQue _clientSleepPacketQue; + PacketQue _clientSleepPacketQue; WaitREGACKPacketList _waitREGACKList; Topics* _topics; diff --git a/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp b/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp index 9512ad5..9cf5ccc 100644 --- a/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp +++ b/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp @@ -74,7 +74,18 @@ void MQTTSNConnectionHandler::handleConnect(Client* client, MQTTSNPacket* packet return; } - /* clear ConnectData of Client */ + /* return CONNACK when the client is sleeping */ + if ( client->isSleep() || client->isAwake() ) + { + MQTTSNPacket* packet = new MQTTSNPacket(); + packet->setCONNACK(MQTTSN_RC_ACCEPTED); + Event* ev = new Event(); + ev->setClientSendEvent(client, packet); + _gateway->getClientSendQue()->post(ev); + return; + } + + //* clear ConnectData of Client */ Connect* connectData = client->getConnectData(); memset(connectData, 0, sizeof(Connect)); client->disconnected(); @@ -125,7 +136,6 @@ void MQTTSNConnectionHandler::handleConnect(Client* client, MQTTSNPacket* packet } else { - /* CONNECT message was not qued in. * create CONNECT message & send it to the broker */ MQTTGWPacket* mqMsg = new MQTTGWPacket(); @@ -259,6 +269,21 @@ void MQTTSNConnectionHandler::handleWillmsgupd(Client* client, MQTTSNPacket* pac */ void MQTTSNConnectionHandler::handlePingreq(Client* client, MQTTSNPacket* packet) { + MQTTGWPacket* msg = 0; + + if ( client->isSleep() || client->isAwake() ) + { + while ( ( msg = client->getClientSleepPacket() ) != 0 ) + { + // ToDo: This version can't re-send PUBLISH when PUBACK is not returned. + client->deleteFirstClientSleepPacket(); // pop the que to delete element. + + Event* ev = new Event(); + ev->setBrokerRecvEvent(client, msg); + _gateway->getPacketEventQue()->post(ev); + } + } + /* send PINGREQ to the broker */ MQTTGWPacket* pingreq = new MQTTGWPacket(); pingreq->setHeader(PINGREQ); diff --git a/MQTTSNGateway/src/MQTTSNGWDefines.h b/MQTTSNGateway/src/MQTTSNGWDefines.h index 96f73f6..a21f6de 100644 --- a/MQTTSNGateway/src/MQTTSNGWDefines.h +++ b/MQTTSNGateway/src/MQTTSNGWDefines.h @@ -19,6 +19,7 @@ namespace MQTTSNGW { +#define DEBUG /*================================= * Config Parametrs ==================================*/ @@ -38,6 +39,7 @@ namespace MQTTSNGW #define MAX_CLIENTS (100) // Number of Clients can be handled. #define MAX_CLIENTID_LENGTH (64) // Max length of clientID #define MAX_INFLIGHTMESSAGES (10) // Number of inflight messages +#define MAX_SAVED_PUBLISH (20) // Max number of PUBLISH message for Asleep state #define MQTTSNGW_MAX_PACKET_SIZE (1024) // Max Packet size (5+2+TopicLen+PayloadLen) #define SIZE_OF_LOG_PACKET (500) // Length of the packet log in bytes diff --git a/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp b/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp index b4e091e..52dff3e 100644 --- a/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp +++ b/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp @@ -178,27 +178,26 @@ void MQTTSNPublishHandler::handlePuback(Client* client, MQTTSNPacket* packet) uint16_t msgId; uint8_t rc; - if ( !client->isActive() ) + if ( client->isActive() ) { - return; - } - MQTTGWPacket* pubAck = new MQTTGWPacket(); + MQTTGWPacket* pubAck = new MQTTGWPacket(); - if ( packet->getPUBACK(&topicId, &msgId, &rc) == 0 ) - { - return; - } + if ( packet->getPUBACK(&topicId, &msgId, &rc) == 0 ) + { + return; + } - if ( rc == MQTTSN_RC_ACCEPTED) - { - pubAck->setAck(PUBACK, msgId); - Event* ev1 = new Event(); - ev1->setBrokerSendEvent(client, pubAck); - _gateway->getBrokerSendQue()->post(ev1); - } - else if ( rc == MQTTSN_RC_REJECTED_INVALID_TOPIC_ID) - { - WRITELOG(" PUBACK %d : Invalid Topic ID\n", msgId); + if ( rc == MQTTSN_RC_ACCEPTED) + { + pubAck->setAck(PUBACK, msgId); + Event* ev1 = new Event(); + ev1->setBrokerSendEvent(client, pubAck); + _gateway->getBrokerSendQue()->post(ev1); + } + else if ( rc == MQTTSN_RC_REJECTED_INVALID_TOPIC_ID) + { + WRITELOG(" PUBACK %d : Invalid Topic ID\n", msgId); + } } } @@ -206,19 +205,18 @@ void MQTTSNPublishHandler::handleAck(Client* client, MQTTSNPacket* packet, uint8 { uint16_t msgId; - if ( !client->isActive() ) + if ( client->isActive() ) { - return; + if ( packet->getACK(&msgId) == 0 ) + { + return; + } + MQTTGWPacket* ackPacket = new MQTTGWPacket(); + ackPacket->setAck(packetType, msgId); + Event* ev1 = new Event(); + ev1->setBrokerSendEvent(client, ackPacket); + _gateway->getBrokerSendQue()->post(ev1); } - if ( packet->getACK(&msgId) == 0 ) - { - return; - } - MQTTGWPacket* ackPacket = new MQTTGWPacket(); - ackPacket->setAck(packetType, msgId); - Event* ev1 = new Event(); - ev1->setBrokerSendEvent(client, ackPacket); - _gateway->getBrokerSendQue()->post(ev1); } void MQTTSNPublishHandler::handleRegister(Client* client, MQTTSNPacket* packet) @@ -229,24 +227,23 @@ void MQTTSNPublishHandler::handleRegister(Client* client, MQTTSNPacket* packet) MQTTSN_topicid topicid; - if ( !client->isActive() ) + if ( client->isActive() || client->isAwake()) { - return; - } - MQTTSNPacket* regAck = new MQTTSNPacket(); - if ( packet->getREGISTER(&id, &msgId, &topicName) == 0 ) - { - return; - } + MQTTSNPacket* regAck = new MQTTSNPacket(); + if ( packet->getREGISTER(&id, &msgId, &topicName) == 0 ) + { + return; + } - topicid.type = MQTTSN_TOPIC_TYPE_NORMAL; - topicid.data.long_.len = topicName.lenstring.len; - topicid.data.long_.name = topicName.lenstring.data; + topicid.type = MQTTSN_TOPIC_TYPE_NORMAL; + topicid.data.long_.len = topicName.lenstring.len; + topicid.data.long_.name = topicName.lenstring.data; - id = client->getTopics()->add(&topicid)->getTopicId(); - regAck->setREGACK(id, msgId, MQTTSN_RC_ACCEPTED); - Event* ev = new Event(); - ev->setClientSendEvent(client, regAck); - _gateway->getClientSendQue()->post(ev); + id = client->getTopics()->add(&topicid)->getTopicId(); + regAck->setREGACK(id, msgId, MQTTSN_RC_ACCEPTED); + Event* ev = new Event(); + ev->setClientSendEvent(client, regAck); + _gateway->getClientSendQue()->post(ev); + } } diff --git a/MQTTSNGateway/src/linux/Timer.h b/MQTTSNGateway/src/linux/Timer.h index 43885b1..7935f4f 100644 --- a/MQTTSNGateway/src/linux/Timer.h +++ b/MQTTSNGateway/src/linux/Timer.h @@ -16,8 +16,10 @@ #ifndef MQTTSNGATEWAY_SRC_LINUX_TIMER_H_ #define MQTTSNGATEWAY_SRC_LINUX_TIMER_H_ +#include +#include #include "MQTTSNGWDefines.h" -#include + namespace MQTTSNGW { /*========================================================== diff --git a/MQTTSNGateway/src/linux/udp/SensorNetwork.cpp b/MQTTSNGateway/src/linux/udp/SensorNetwork.cpp index 18eace4..6f08481 100644 --- a/MQTTSNGateway/src/linux/udp/SensorNetwork.cpp +++ b/MQTTSNGateway/src/linux/udp/SensorNetwork.cpp @@ -21,7 +21,6 @@ #include #include #include -#include #include #include #include diff --git a/MQTTSNGateway/src/linux/udp6/SensorNetwork.cpp b/MQTTSNGateway/src/linux/udp6/SensorNetwork.cpp index 07f5ad3..3ff93fd 100644 --- a/MQTTSNGateway/src/linux/udp6/SensorNetwork.cpp +++ b/MQTTSNGateway/src/linux/udp6/SensorNetwork.cpp @@ -25,7 +25,6 @@ #include #include #include -#include #include #include #include diff --git a/MQTTSNGateway/src/linux/xbee/SensorNetwork.cpp b/MQTTSNGateway/src/linux/xbee/SensorNetwork.cpp index 1a806a7..65b9946 100644 --- a/MQTTSNGateway/src/linux/xbee/SensorNetwork.cpp +++ b/MQTTSNGateway/src/linux/xbee/SensorNetwork.cpp @@ -18,10 +18,8 @@ #include #include #include -#include #include #include - #include "SensorNetwork.h" #include "MQTTSNGWProcess.h"