diff --git a/MQTTSNGateway/GatewayTester/samples/mainTest.cpp b/MQTTSNGateway/GatewayTester/samples/mainTest.cpp index 38e82dd..cbd7cd8 100644 --- a/MQTTSNGateway/GatewayTester/samples/mainTest.cpp +++ b/MQTTSNGateway/GatewayTester/samples/mainTest.cpp @@ -44,26 +44,26 @@ extern int run(void); * */ /*------------------------------------------------------ - * UDP Configuration + * UDP Configuration (theNetcon) *------------------------------------------------------*/ UDPCONF = { - "GatewayTester", // ClientId + "GatewayTestClient", // ClientId {225,1,1,1}, // Multicast group IP 1883, // Multicast group Port 20001, // Local PortNo }; /*------------------------------------------------------ - * Client Configuration + * Client Configuration (theMqcon) *------------------------------------------------------*/ MQTTSNCONF = { - 300, //KeepAlive (seconds) - true, //Clean session - 0, //Sleep duration in msecs - "willTopic", //WillTopic - "willMessage", //WillMessage - 0, //WillQos - false //WillRetain + 60, //KeepAlive [seconds] + true, //Clean session + 300, //Sleep duration [seconds] + "", //WillTopic + "", //WillMessage + 0, //WillQos + false //WillRetain }; /*------------------------------------------------------ @@ -162,6 +162,11 @@ void disconnect(void) DISCONNECT(0); } +void asleep(void) +{ + DISCONNECT(theMqcon.sleepDuration); +} + /*------------------------------------------------------ * A List of Test functions *------------------------------------------------------*/ @@ -175,7 +180,9 @@ TEST_LIST = {// e.g. TEST( Label, Test), TEST("Step6:Publish topic2", publishTopic2), TEST("Step7:subscribe again", subscribechangeCallback), TEST("Step8:Publish topic2", publishTopic2), - TEST("Step9:Disconnect", disconnect), + TEST("Step9:Sleep ", asleep), + TEST("Step10:Publish topic1", publishTopic1), + TEST("Step11:Disconnect", disconnect), END_OF_TEST_LIST }; diff --git a/MQTTSNGateway/GatewayTester/src/LGwProxy.cpp b/MQTTSNGateway/GatewayTester/src/LGwProxy.cpp index 47cc88a..7f5add1 100644 --- a/MQTTSNGateway/GatewayTester/src/LGwProxy.cpp +++ b/MQTTSNGateway/GatewayTester/src/LGwProxy.cpp @@ -55,6 +55,8 @@ LGwProxy::LGwProxy(){ _cleanSession = 0; _pingStatus = 0; _connectRetry = MQTTSN_RETRY_COUNT; + _tSleep = 0; + _tWake = 0; } LGwProxy::~LGwProxy(){ @@ -91,7 +93,7 @@ void LGwProxy::connect(){ strcpy(pos,_willTopic); // WILLTOPIC _status = GW_WAIT_WILLMSGREQ; writeGwMsg(); - }else if (_status == GW_CONNECTING || _status == GW_DISCONNECTED){ + }else if (_status == GW_CONNECTING || _status == GW_DISCONNECTED || _status == GW_SLEPT ){ uint8_t clientIdLen = uint8_t(strlen(_clientId) > 23 ? 23 : strlen(_clientId)); *pos++ = 6 + clientIdLen; *pos++ = MQTTSN_TYPE_CONNECT; @@ -105,7 +107,7 @@ void LGwProxy::connect(){ strncpy(pos, _clientId, clientIdLen); _msg[ 6 + clientIdLen] = 0; _status = GW_WAIT_CONNACK; - if (_willMsg && _willTopic){ + if ( _willMsg && _willTopic && _status != GW_SLEPT ){ if (strlen(_willMsg) && strlen(_willTopic)){ _msg[2] = _msg[2] | MQTTSN_FLAG_WILL; // CONNECT _status = GW_WAIT_WILLTOPICREQ; @@ -163,10 +165,14 @@ int LGwProxy::getConnectResponce(void){ if (_mqttsnMsg[1] == MQTTSN_RC_ACCEPTED){ _status = GW_CONNECTED; _connectRetry = MQTTSN_RETRY_COUNT; - _keepAliveTimer.start(_tkeepAlive * 1000); - _topicTbl.clearTopic(); - DISPLAY("\033[0m\033[0;32m\n\n Connected to the Broker\033[0m\033[0;37m\n\n"); - theClient->onConnect(); // SUBSCRIBEs are conducted + setPingReqTimer(); + if ( _tSleep ){ + _tSleep = 0; + }else{ + DISPLAY("\033[0m\033[0;32m\n\n Connected to the Broker\033[0m\033[0;37m\n\n"); + _topicTbl.clearTopic(); + theClient->onConnect(); // SUBSCRIBEs are conducted + } }else{ _status = GW_CONNECTING; } @@ -182,16 +188,18 @@ void LGwProxy::reconnect(void){ void LGwProxy::disconnect(uint16_t secs){ _tSleep = secs; - _status = GW_DISCONNECTING; - + _tWake = 0; + _msg[1] = MQTTSN_TYPE_DISCONNECT; if (secs){ _msg[0] = 4; setUint16((uint8_t*) _msg + 2, secs); + _status = GW_SLEEPING; }else{ _msg[0] = 2; _keepAliveTimer.stop(); + _status = GW_DISCONNECTING; } _retryCount = MQTTSN_RETRY_COUNT; @@ -223,9 +231,13 @@ int LGwProxy::getDisconnectResponce(void){ } return 0; }else if (_mqttsnMsg[0] == MQTTSN_TYPE_DISCONNECT){ - if (_tSleep){ - _status = GW_SLEEPING; - _keepAliveTimer.start(_tSleep); + if (_status == GW_SLEEPING ){ + _status = GW_SLEPT; + uint32_t remain = _keepAliveTimer.getRemain(); + theClient->setSleepMode(remain); + + /* Wake up and starts from this point. */ + }else{ _status = GW_DISCONNECTED; } @@ -279,7 +291,18 @@ int LGwProxy::getMessage(void){ }else if (_mqttsnMsg[0] == MQTTSN_TYPE_PINGRESP){ if (_pingStatus == GW_WAIT_PINGRESP){ _pingStatus = 0; - resetPingReqTimer(); + setPingReqTimer(); + + if ( _tSleep > 0 ){ + _tWake += _tkeepAlive; + if ( _tWake < _tSleep ){ + theClient->setSleepMode(_tkeepAlive * 1000UL); + }else{ + DISPLAY("\033[0m\033[0;32m\n\n Get back to ACTIVE.\033[0m\033[0;37m\n\n"); + _tWake = 0; + connect(); + } + } } }else if (_mqttsnMsg[0] == MQTTSN_TYPE_DISCONNECT){ _status = GW_LOST; @@ -408,7 +431,7 @@ void LGwProxy::checkPingReq(void){ msg[0] = 0x02; msg[1] = MQTTSN_TYPE_PINGREQ; - if (_status == GW_CONNECTED && _keepAliveTimer.isTimeUp() && _pingStatus != GW_WAIT_PINGRESP){ + if ( (_status == GW_CONNECTED || _status == GW_SLEPT) && isPingReqRequired() && _pingStatus != GW_WAIT_PINGRESP){ _pingStatus = GW_WAIT_PINGRESP; _pingRetryCount = MQTTSN_RETRY_COUNT; @@ -449,8 +472,12 @@ LRegisterManager* LGwProxy::getRegisterManager(void){ return &_regMgr; } -void LGwProxy::resetPingReqTimer(void){ - _keepAliveTimer.start(_tkeepAlive * 1000); +bool LGwProxy::isPingReqRequired(void){ + return _keepAliveTimer.isTimeUp(_tkeepAlive * 1000UL); +} + +void LGwProxy::setPingReqTimer(void){ + _keepAliveTimer.start(_tkeepAlive * 1000UL); } const char* LGwProxy::getClientId(void) { diff --git a/MQTTSNGateway/GatewayTester/src/LGwProxy.h b/MQTTSNGateway/GatewayTester/src/LGwProxy.h index acbd37a..41d6078 100644 --- a/MQTTSNGateway/GatewayTester/src/LGwProxy.h +++ b/MQTTSNGateway/GatewayTester/src/LGwProxy.h @@ -66,7 +66,7 @@ public: void setAdvertiseDuration(uint16_t duration); void reconnect(void); int writeMsg(const uint8_t* msg); - void resetPingReqTimer(void); + void setPingReqTimer(void); uint16_t getNextMsgId(); LTopicTable* getTopicTable(void); LRegisterManager* getRegisterManager(void); @@ -78,6 +78,7 @@ private: void checkAdvertise(void); int getConnectResponce(void); int getDisconnectResponce(void); + bool isPingReqRequired(void); LNetwork _network; uint8_t* _mqttsnMsg; @@ -103,6 +104,7 @@ private: LTimer _gwAliveTimer; LTimer _keepAliveTimer; uint16_t _tSleep; + uint16_t _tWake; char _msg[MQTTSN_MAX_MSG_LENGTH + 1]; }; diff --git a/MQTTSNGateway/GatewayTester/src/LMqttsnClient.cpp b/MQTTSNGateway/GatewayTester/src/LMqttsnClient.cpp index 1779930..857af17 100644 --- a/MQTTSNGateway/GatewayTester/src/LMqttsnClient.cpp +++ b/MQTTSNGateway/GatewayTester/src/LMqttsnClient.cpp @@ -198,12 +198,17 @@ void LMqttsnClient::run() { _gwProxy.connect(); _taskMgr.run(); - sleep(); +} + +void LMqttsnClient::setSleepMode(uint32_t duration) +{ + // ToDo: set WDT and sleep mode + DISPLAY("\033[0m\033[0;32m\n\n Get into SLEEP mode %u [msec].\033[0m\033[0;37m\n\n", duration); } void LMqttsnClient::sleep(void) { - + disconnect(_sleepDuration); } void LMqttsnClient::setSleepDuration(uint32_t duration) diff --git a/MQTTSNGateway/GatewayTester/src/LMqttsnClient.h b/MQTTSNGateway/GatewayTester/src/LMqttsnClient.h index 8bdd70e..cf93048 100644 --- a/MQTTSNGateway/GatewayTester/src/LMqttsnClient.h +++ b/MQTTSNGateway/GatewayTester/src/LMqttsnClient.h @@ -59,6 +59,7 @@ public: void run(void); void addTask(bool test); void setSleepDuration(uint32_t duration); + void setSleepMode(uint32_t duration); void sleep(void); const char* getClientId(void); LGwProxy* getGwProxy(void); diff --git a/MQTTSNGateway/GatewayTester/src/LNetworkUdp.cpp b/MQTTSNGateway/GatewayTester/src/LNetworkUdp.cpp index 9fc8a1f..b61cf84 100644 --- a/MQTTSNGateway/GatewayTester/src/LNetworkUdp.cpp +++ b/MQTTSNGateway/GatewayTester/src/LNetworkUdp.cpp @@ -23,7 +23,6 @@ #include #include #include -#include #include #include "LNetworkUdp.h" diff --git a/MQTTSNGateway/GatewayTester/src/LPublishManager.cpp b/MQTTSNGateway/GatewayTester/src/LPublishManager.cpp index 94d0886..d59dc02 100644 --- a/MQTTSNGateway/GatewayTester/src/LPublishManager.cpp +++ b/MQTTSNGateway/GatewayTester/src/LPublishManager.cpp @@ -150,7 +150,7 @@ void LPublishManager::sendPublish(PubElement* elm) memcpy(msg + org + 7, elm->payload, elm->payloadlen); theClient->getGwProxy()->writeMsg(msg); - theClient->getGwProxy()->resetPingReqTimer(); + theClient->getGwProxy()->setPingReqTimer(); if ((elm->flag & 0x60) == MQTTSN_FLAG_QOS_0) { DISPLAY("\033[0m\033[0;32m Topic \"%s\" was Published. \033[0m\033[0;37m\n\n", elm->topicName); diff --git a/MQTTSNGateway/GatewayTester/src/LScreen.cpp b/MQTTSNGateway/GatewayTester/src/LScreen.cpp index f5a3fb8..5336a78 100644 --- a/MQTTSNGateway/GatewayTester/src/LScreen.cpp +++ b/MQTTSNGateway/GatewayTester/src/LScreen.cpp @@ -23,7 +23,6 @@ #include #include #include -#include #include #include "LScreen.h" diff --git a/MQTTSNGateway/GatewayTester/src/LSubscribeManager.cpp b/MQTTSNGateway/GatewayTester/src/LSubscribeManager.cpp index d0c13bf..4424155 100644 --- a/MQTTSNGateway/GatewayTester/src/LSubscribeManager.cpp +++ b/MQTTSNGateway/GatewayTester/src/LSubscribeManager.cpp @@ -142,7 +142,7 @@ void LSubscribeManager::send(SubElement* elm) theClient->getGwProxy()->connect(); theClient->getGwProxy()->writeMsg(msg); - theClient->getGwProxy()->resetPingReqTimer(); + theClient->getGwProxy()->setPingReqTimer(); elm->sendUTC = time(NULL); elm->retryCount--; } diff --git a/MQTTSNGateway/GatewayTester/src/LTimer.cpp b/MQTTSNGateway/GatewayTester/src/LTimer.cpp index dbc28d3..781693b 100644 --- a/MQTTSNGateway/GatewayTester/src/LTimer.cpp +++ b/MQTTSNGateway/GatewayTester/src/LTimer.cpp @@ -16,7 +16,6 @@ #include #include - #include "LMqttsnClientApp.h" #include "LTimer.h" @@ -63,3 +62,17 @@ void LTimer::stop(){ _millis = 0; } +uint32_t LTimer::getRemain(void) +{ + struct timeval curTime; + uint32_t secs, usecs; + if (_millis <= 0){ + return 0; + }else{ + gettimeofday(&curTime, 0); + secs = (curTime.tv_sec - _startTime.tv_sec) * 1000; + usecs = (curTime.tv_usec - _startTime.tv_usec) / 1000.0; + secs = _millis - (secs + usecs); + return secs; + } +} diff --git a/MQTTSNGateway/GatewayTester/src/LTimer.h b/MQTTSNGateway/GatewayTester/src/LTimer.h index 27d6b5e..cb44cc0 100644 --- a/MQTTSNGateway/GatewayTester/src/LTimer.h +++ b/MQTTSNGateway/GatewayTester/src/LTimer.h @@ -35,6 +35,7 @@ public: bool isTimeUp(void); void stop(void); void changeUTC(void){}; + uint32_t getRemain(void); static void setUnixTime(uint32_t utc){}; private: struct timeval _startTime; diff --git a/MQTTSNGateway/src/MQTTGWConnectionHandler.cpp b/MQTTSNGateway/src/MQTTGWConnectionHandler.cpp index 8440142..dd351ae 100644 --- a/MQTTSNGateway/src/MQTTGWConnectionHandler.cpp +++ b/MQTTSNGateway/src/MQTTGWConnectionHandler.cpp @@ -76,14 +76,6 @@ void MQTTGWConnectionHandler::handleConnack(Client* client, MQTTGWPacket* packet ev1->setClientSendEvent(client, snPacket); client->connackSended(rc); // update the client's status _gateway->getClientSendQue()->post(ev1); - - MQTTSNPacket* sleepPacket = 0; - while ( (sleepPacket = client->getClientSleepPacket()) ) - { - Event* ev1 = new Event(); - ev1->setClientSendEvent(client, sleepPacket); - _gateway->getClientSendQue()->post(ev1); - } } void MQTTGWConnectionHandler::handlePingresp(Client* client, MQTTGWPacket* packet) diff --git a/MQTTSNGateway/src/MQTTGWPacket.cpp b/MQTTSNGateway/src/MQTTGWPacket.cpp index 0f87572..8bba53f 100644 --- a/MQTTSNGateway/src/MQTTGWPacket.cpp +++ b/MQTTSNGateway/src/MQTTGWPacket.cpp @@ -543,3 +543,20 @@ char* MQTTGWPacket::print(char* pbuf) return ptr; } +MQTTGWPacket& MQTTGWPacket::operator =(MQTTGWPacket& packet) +{ + clearData(); + this->_header.byte = packet._header.byte; + this->_remainingLength = packet._remainingLength; + _data = (unsigned char*)calloc(_remainingLength, 1); + if (_data) + { + memcpy(this->_data, packet._data, _remainingLength); + } + else + { + clearData(); + } + return *this; +} + diff --git a/MQTTSNGateway/src/MQTTGWPacket.h b/MQTTSNGateway/src/MQTTGWPacket.h index ecba6bb..458dd4d 100644 --- a/MQTTSNGateway/src/MQTTGWPacket.h +++ b/MQTTSNGateway/src/MQTTGWPacket.h @@ -205,6 +205,7 @@ public: int setUNSUBSCRIBE(const char* topics, unsigned short msgid); char* getMsgId(char* buf); char* print(char* buf); + MQTTGWPacket& operator =(MQTTGWPacket& packet); private: void clearData(void); diff --git a/MQTTSNGateway/src/MQTTGWPublishHandler.cpp b/MQTTSNGateway/src/MQTTGWPublishHandler.cpp index 67d3df7..cfd6772 100644 --- a/MQTTSNGateway/src/MQTTGWPublishHandler.cpp +++ b/MQTTSNGateway/src/MQTTGWPublishHandler.cpp @@ -36,9 +36,39 @@ MQTTGWPublishHandler::~MQTTGWPublishHandler() void MQTTGWPublishHandler::handlePublish(Client* client, MQTTGWPacket* packet) { - if ( !client->isActive() && !client->isSleep() ) + if ( !client->isActive() && !client->isSleep() && !client->isAwake()) { - WRITELOG(" The client is neither active nor sleep %s\n", client->getStatus()); + WRITELOG("%s The client is neither active nor sleep %s%s\n", ERRMSG_HEADER, client->getStatus(), ERRMSG_FOOTER); + return; + } + + /* client is sleeping. save PUBLISH */ + if ( client->isSleep() ) + { + Publish pub; + packet->getPUBLISH(&pub); + + WRITELOG(FORMAT_Y_G_G, currentDateTime(), packet->getName(), + RIGHTARROW, client->getClientId(), "is sleeping. a message was saved."); + + if (pub.header.bits.qos == 1) + { + replyACK(client, &pub, PUBACK); + } + else if ( pub.header.bits.qos == 2) + { + replyACK(client, &pub, PUBREC); + } + + MQTTGWPacket* msg = new MQTTGWPacket(); + *msg = *packet; + if ( msg->getType() == 0 ) + { + WRITELOG("%s MQTTGWPublishHandler::handlePublish can't allocate memories for Packet.%s\n", ERRMSG_HEADER,ERRMSG_FOOTER); + delete msg; + return; + } + client->setClientSleepPacket(msg); return; } @@ -102,19 +132,10 @@ void MQTTGWPublishHandler::handlePublish(Client* client, MQTTGWPacket* packet) uint16_t regackMsgId = client->getNextSnMsgId(); regPacket->setREGISTER(id, regackMsgId, &topicName); - if (client->isSleep()) - { - client->setClientSleepPacket(regPacket); - WRITELOG(FORMAT_BL_NL, currentDateTime(), regPacket->getName(), - RIGHTARROW, client->getClientId(), "is sleeping. REGISTER was saved."); - } - else if (client->isActive()) - { - /* send REGISTER */ - Event* evrg = new Event(); - evrg->setClientSendEvent(client, regPacket); - _gateway->getClientSendQue()->post(evrg); - } + /* send REGISTER */ + Event* evrg = new Event(); + evrg->setClientSendEvent(client, regPacket); + _gateway->getClientSendQue()->post(evrg); /* send PUBLISH */ topicId.data.id = id; @@ -125,47 +146,18 @@ void MQTTGWPublishHandler::handlePublish(Client* client, MQTTGWPacket* packet) } else { - WRITELOG("\x1b[0m\x1b[31mMQTTGWPublishHandler Can't create a Topic.\n"); + WRITELOG("%sMQTTGWPublishHandler Can't create a Topic.%s\n", ERRMSG_HEADER,ERRMSG_FOOTER); delete snPacket; return; } } } - /* TopicId was acquired. */ - if (client->isSleep()) - { - /* client is sleeping. save PUBLISH */ - client->setClientSleepPacket(snPacket); - WRITELOG(FORMAT_Y_G_G, currentDateTime(), packet->getName(), - RIGHTARROW, client->getClientId(), "is sleeping. a message was saved."); - int type = 0; - if (pub.header.bits.qos == 1) - { - type = PUBACK; - } - else if ( pub.header.bits.qos == 2) - { - WRITELOG(" While Client is sleeping, QoS2 is not supported.\n"); - type = PUBREC; - } - replyACK(client, &pub, type); - pub.header.bits.qos = 0; - replyACK(client, &pub, PUBACK); - pub.msgId = 0; - snPacket->setPUBLISH((uint8_t) pub.header.bits.dup, (int) pub.header.bits.qos, - (uint8_t) pub.header.bits.retain, (uint16_t) pub.msgId, topicId, (uint8_t*) pub.payload, - pub.payloadlen); - client->setClientSleepPacket(snPacket); - } - else if (client->isActive()) - { - snPacket->setPUBLISH((uint8_t) pub.header.bits.dup, (int) pub.header.bits.qos, (uint8_t) pub.header.bits.retain, - (uint16_t) pub.msgId, topicId, (uint8_t*) pub.payload, pub.payloadlen); - Event* ev1 = new Event(); - ev1->setClientSendEvent(client, snPacket); - _gateway->getClientSendQue()->post(ev1); - } + snPacket->setPUBLISH((uint8_t) pub.header.bits.dup, (int) pub.header.bits.qos, (uint8_t) pub.header.bits.retain, + (uint16_t) pub.msgId, topicId, (uint8_t*) pub.payload, pub.payloadlen); + Event* ev1 = new Event(); + ev1->setClientSendEvent(client, snPacket); + _gateway->getClientSendQue()->post(ev1); } @@ -201,22 +193,37 @@ void MQTTGWPublishHandler::handleAck(Client* client, MQTTGWPacket* packet, int t { Ack ack; packet->getAck(&ack); - MQTTSNPacket* mqttsnPacket = new MQTTSNPacket(); - if (type == PUBREC) - { - mqttsnPacket->setPUBREC((uint16_t) ack.msgId); - } - else if (type == PUBREL) - { - mqttsnPacket->setPUBREL((uint16_t) ack.msgId); - } - else if (type == PUBCOMP) - { - mqttsnPacket->setPUBCOMP((uint16_t) ack.msgId); - } - Event* ev1 = new Event(); - ev1->setClientSendEvent(client, mqttsnPacket); - _gateway->getClientSendQue()->post(ev1); + if ( client->isActive() || client->isAwake() ) + { + MQTTSNPacket* mqttsnPacket = new MQTTSNPacket(); + if (type == PUBREC) + { + mqttsnPacket->setPUBREC((uint16_t) ack.msgId); + } + else if (type == PUBREL) + { + mqttsnPacket->setPUBREL((uint16_t) ack.msgId); + } + else if (type == PUBCOMP) + { + mqttsnPacket->setPUBCOMP((uint16_t) ack.msgId); + } + + Event* ev1 = new Event(); + ev1->setClientSendEvent(client, mqttsnPacket); + _gateway->getClientSendQue()->post(ev1); + } + else if ( client->isSleep() ) + { + if (type == PUBREL) + { + MQTTGWPacket* pubComp = new MQTTGWPacket(); + pubComp->setAck(PUBCOMP, (uint16_t)ack.msgId); + Event* ev1 = new Event(); + ev1->setBrokerSendEvent(client, pubComp); + _gateway->getBrokerSendQue()->post(ev1); + } + } } diff --git a/MQTTSNGateway/src/MQTTSNGWClient.cpp b/MQTTSNGateway/src/MQTTSNGWClient.cpp index 93bee22..d31e3f0 100644 --- a/MQTTSNGateway/src/MQTTSNGWClient.cpp +++ b/MQTTSNGateway/src/MQTTSNGWClient.cpp @@ -23,7 +23,7 @@ #include using namespace MQTTSNGW; - +char* currentDateTime(void); /*===================================== Class ClientList =====================================*/ @@ -185,7 +185,6 @@ Client* ClientList::getClient(uint8_t* clientId) while (client != 0) { - //printf("ClientList: clientId = %s\n", client->getClientId()); if (strcmp((const char*)client->getClientId(), (const char*)clientId) == 0 ) { _mutex.unlock(); @@ -259,7 +258,7 @@ bool ClientList::isAuthorized() /*===================================== Class Client =====================================*/ -static const char* theClientStatus[] = { "Disconnected", "TryConnecting", "Connecting", "Active", "Awake", "Asleep", "Lost" }; +static const char* theClientStatus[] = { "Disconnected", "TryConnecting", "Connecting", "Active", "Asleep", "Awake", "Lost" }; Client::Client(bool secure) { @@ -288,6 +287,7 @@ Client::Client(bool secure) _otaClient = 0; _prevClient = 0; _nextClient = 0; + _clientSleepPacketQue.setMaxSize(MAX_SAVED_PUBLISH); } Client::~Client() @@ -335,11 +335,30 @@ uint16_t Client::getWaitedSubTopicId(uint16_t msgId) return _waitedSubTopicIdMap.getTopicId(msgId, &type); } -MQTTSNPacket* Client::getClientSleepPacket() +MQTTGWPacket* Client::getClientSleepPacket() { return _clientSleepPacketQue.getPacket(); } +void Client::deleteFirstClientSleepPacket() +{ + _clientSleepPacketQue.pop(); +} + +int Client::setClientSleepPacket(MQTTGWPacket* packet) +{ + int rc = _clientSleepPacketQue.post(packet); + if ( rc ) + { + WRITELOG("%s %s is sleeping. the packet was saved.\n", currentDateTime(), _clientId); + } + else + { + WRITELOG("%s %s is sleeping but discard the packet.\n", currentDateTime(), _clientId); + } + return rc; +} + Connect* Client::getConnectData(void) { return &_connectData; @@ -374,12 +393,6 @@ void Client::setWaitedSubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicT _waitedSubTopicIdMap.add(msgId, topicId, type); } -void Client::setClientSleepPacket(MQTTSNPacket* packet) -{ - updateStatus(packet); - _clientSleepPacketQue.post(packet); -} - bool Client::checkTimeover(void) { return (_status == Cstat_Active && _keepAliveTimer.isTimeup()); @@ -426,51 +439,34 @@ void Client::updateStatus(MQTTSNPacket* packet) _keepAliveTimer.start(_keepAliveMsec * 1.5); break; case MQTTSN_DISCONNECT: - { uint16_t duration; packet->getDISCONNECT(&duration); if (duration) { _status = Cstat_Asleep; - _keepAliveMsec = duration * 1000UL; } else { disconnected(); } - } break; default: break; } - } - else if (_status == Cstat_Asleep) - { - if (packet->getType() == MQTTSN_CONNECT) - { - setKeepAlive(packet); - _status = Cstat_Connecting; - } - else if (packet->getType() == MQTTSN_PINGREQ) - { - if ( packet->getPINGREQ() > 0 ) - { - _status = Cstat_Awake; - } - } - } - else if (_status == Cstat_Awake) + else if (_status == Cstat_Awake || _status == Cstat_Asleep) { switch (packet->getType()) { case MQTTSN_CONNECT: - _status = Cstat_Connecting; - setKeepAlive(packet); + _status = Cstat_Active; break; case MQTTSN_DISCONNECT: disconnected(); break; + case MQTTSN_PINGREQ: + _status = Cstat_Awake; + break; case MQTTSN_PINGRESP: _status = Cstat_Asleep; break; @@ -478,6 +474,7 @@ void Client::updateStatus(MQTTSNPacket* packet) break; } } + DEBUGLOG("Client Status = %s\n", theClientStatus[_status]); } void Client::updateStatus(ClientStatus stat) @@ -595,6 +592,11 @@ bool Client::isSleep(void) return (_status == Cstat_Asleep); } +bool Client::isAwake(void) +{ + return (_status == Cstat_Awake); +} + bool Client::isSecureNetwork(void) { return _secureNetwork; diff --git a/MQTTSNGateway/src/MQTTSNGWClient.h b/MQTTSNGateway/src/MQTTSNGWClient.h index 76c5c11..690f98b 100644 --- a/MQTTSNGateway/src/MQTTSNGWClient.h +++ b/MQTTSNGateway/src/MQTTSNGWClient.h @@ -43,6 +43,7 @@ public: _que = new Que; } + ~PacketQue() { clear(); @@ -65,11 +66,14 @@ public: } } - void post(T* packet) + int + post(T* packet) { + int rc; _mutex.lock(); - _que->post(packet); + rc = _que->post(packet); _mutex.unlock(); + return rc; } void pop() @@ -93,6 +97,11 @@ public: _mutex.unlock(); } + void setMaxSize(int size) + { + _que->setMaxSize(size); + } + private: Que* _que; Mutex _mutex; @@ -232,7 +241,8 @@ public: Connect* getConnectData(void); uint16_t getWaitedPubTopicId(uint16_t msgId); uint16_t getWaitedSubTopicId(uint16_t msgId); - MQTTSNPacket* getClientSleepPacket(); + MQTTGWPacket* getClientSleepPacket(void); + void deleteFirstClientSleepPacket(void); WaitREGACKPacketList* getWaitREGACKPacketList(void); void eraseWaitedPubTopicId(uint16_t msgId); @@ -240,7 +250,7 @@ public: void clearWaitedPubTopicId(void); void clearWaitedSubTopicId(void); - void setClientSleepPacket(MQTTSNPacket*); + int setClientSleepPacket(MQTTGWPacket*); void setWaitedPubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type); void setWaitedSubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type); @@ -277,6 +287,7 @@ public: bool isDisconnect(void); bool isActive(void); bool isSleep(void); + bool isAwake(void); bool isSecureNetwork(void); bool isSensorNetStable(void); bool isWaitWillMsg(void); @@ -286,7 +297,7 @@ public: void setOTAClient(Client* cl); private: - PacketQue _clientSleepPacketQue; + PacketQue _clientSleepPacketQue; WaitREGACKPacketList _waitREGACKList; Topics* _topics; diff --git a/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp b/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp index 9512ad5..9cf5ccc 100644 --- a/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp +++ b/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp @@ -74,7 +74,18 @@ void MQTTSNConnectionHandler::handleConnect(Client* client, MQTTSNPacket* packet return; } - /* clear ConnectData of Client */ + /* return CONNACK when the client is sleeping */ + if ( client->isSleep() || client->isAwake() ) + { + MQTTSNPacket* packet = new MQTTSNPacket(); + packet->setCONNACK(MQTTSN_RC_ACCEPTED); + Event* ev = new Event(); + ev->setClientSendEvent(client, packet); + _gateway->getClientSendQue()->post(ev); + return; + } + + //* clear ConnectData of Client */ Connect* connectData = client->getConnectData(); memset(connectData, 0, sizeof(Connect)); client->disconnected(); @@ -125,7 +136,6 @@ void MQTTSNConnectionHandler::handleConnect(Client* client, MQTTSNPacket* packet } else { - /* CONNECT message was not qued in. * create CONNECT message & send it to the broker */ MQTTGWPacket* mqMsg = new MQTTGWPacket(); @@ -259,6 +269,21 @@ void MQTTSNConnectionHandler::handleWillmsgupd(Client* client, MQTTSNPacket* pac */ void MQTTSNConnectionHandler::handlePingreq(Client* client, MQTTSNPacket* packet) { + MQTTGWPacket* msg = 0; + + if ( client->isSleep() || client->isAwake() ) + { + while ( ( msg = client->getClientSleepPacket() ) != 0 ) + { + // ToDo: This version can't re-send PUBLISH when PUBACK is not returned. + client->deleteFirstClientSleepPacket(); // pop the que to delete element. + + Event* ev = new Event(); + ev->setBrokerRecvEvent(client, msg); + _gateway->getPacketEventQue()->post(ev); + } + } + /* send PINGREQ to the broker */ MQTTGWPacket* pingreq = new MQTTGWPacket(); pingreq->setHeader(PINGREQ); diff --git a/MQTTSNGateway/src/MQTTSNGWDefines.h b/MQTTSNGateway/src/MQTTSNGWDefines.h index 96f73f6..a21f6de 100644 --- a/MQTTSNGateway/src/MQTTSNGWDefines.h +++ b/MQTTSNGateway/src/MQTTSNGWDefines.h @@ -19,6 +19,7 @@ namespace MQTTSNGW { +#define DEBUG /*================================= * Config Parametrs ==================================*/ @@ -38,6 +39,7 @@ namespace MQTTSNGW #define MAX_CLIENTS (100) // Number of Clients can be handled. #define MAX_CLIENTID_LENGTH (64) // Max length of clientID #define MAX_INFLIGHTMESSAGES (10) // Number of inflight messages +#define MAX_SAVED_PUBLISH (20) // Max number of PUBLISH message for Asleep state #define MQTTSNGW_MAX_PACKET_SIZE (1024) // Max Packet size (5+2+TopicLen+PayloadLen) #define SIZE_OF_LOG_PACKET (500) // Length of the packet log in bytes diff --git a/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp b/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp index b4e091e..52dff3e 100644 --- a/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp +++ b/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp @@ -178,27 +178,26 @@ void MQTTSNPublishHandler::handlePuback(Client* client, MQTTSNPacket* packet) uint16_t msgId; uint8_t rc; - if ( !client->isActive() ) + if ( client->isActive() ) { - return; - } - MQTTGWPacket* pubAck = new MQTTGWPacket(); + MQTTGWPacket* pubAck = new MQTTGWPacket(); - if ( packet->getPUBACK(&topicId, &msgId, &rc) == 0 ) - { - return; - } + if ( packet->getPUBACK(&topicId, &msgId, &rc) == 0 ) + { + return; + } - if ( rc == MQTTSN_RC_ACCEPTED) - { - pubAck->setAck(PUBACK, msgId); - Event* ev1 = new Event(); - ev1->setBrokerSendEvent(client, pubAck); - _gateway->getBrokerSendQue()->post(ev1); - } - else if ( rc == MQTTSN_RC_REJECTED_INVALID_TOPIC_ID) - { - WRITELOG(" PUBACK %d : Invalid Topic ID\n", msgId); + if ( rc == MQTTSN_RC_ACCEPTED) + { + pubAck->setAck(PUBACK, msgId); + Event* ev1 = new Event(); + ev1->setBrokerSendEvent(client, pubAck); + _gateway->getBrokerSendQue()->post(ev1); + } + else if ( rc == MQTTSN_RC_REJECTED_INVALID_TOPIC_ID) + { + WRITELOG(" PUBACK %d : Invalid Topic ID\n", msgId); + } } } @@ -206,19 +205,18 @@ void MQTTSNPublishHandler::handleAck(Client* client, MQTTSNPacket* packet, uint8 { uint16_t msgId; - if ( !client->isActive() ) + if ( client->isActive() ) { - return; + if ( packet->getACK(&msgId) == 0 ) + { + return; + } + MQTTGWPacket* ackPacket = new MQTTGWPacket(); + ackPacket->setAck(packetType, msgId); + Event* ev1 = new Event(); + ev1->setBrokerSendEvent(client, ackPacket); + _gateway->getBrokerSendQue()->post(ev1); } - if ( packet->getACK(&msgId) == 0 ) - { - return; - } - MQTTGWPacket* ackPacket = new MQTTGWPacket(); - ackPacket->setAck(packetType, msgId); - Event* ev1 = new Event(); - ev1->setBrokerSendEvent(client, ackPacket); - _gateway->getBrokerSendQue()->post(ev1); } void MQTTSNPublishHandler::handleRegister(Client* client, MQTTSNPacket* packet) @@ -229,24 +227,23 @@ void MQTTSNPublishHandler::handleRegister(Client* client, MQTTSNPacket* packet) MQTTSN_topicid topicid; - if ( !client->isActive() ) + if ( client->isActive() || client->isAwake()) { - return; - } - MQTTSNPacket* regAck = new MQTTSNPacket(); - if ( packet->getREGISTER(&id, &msgId, &topicName) == 0 ) - { - return; - } + MQTTSNPacket* regAck = new MQTTSNPacket(); + if ( packet->getREGISTER(&id, &msgId, &topicName) == 0 ) + { + return; + } - topicid.type = MQTTSN_TOPIC_TYPE_NORMAL; - topicid.data.long_.len = topicName.lenstring.len; - topicid.data.long_.name = topicName.lenstring.data; + topicid.type = MQTTSN_TOPIC_TYPE_NORMAL; + topicid.data.long_.len = topicName.lenstring.len; + topicid.data.long_.name = topicName.lenstring.data; - id = client->getTopics()->add(&topicid)->getTopicId(); - regAck->setREGACK(id, msgId, MQTTSN_RC_ACCEPTED); - Event* ev = new Event(); - ev->setClientSendEvent(client, regAck); - _gateway->getClientSendQue()->post(ev); + id = client->getTopics()->add(&topicid)->getTopicId(); + regAck->setREGACK(id, msgId, MQTTSN_RC_ACCEPTED); + Event* ev = new Event(); + ev->setClientSendEvent(client, regAck); + _gateway->getClientSendQue()->post(ev); + } } diff --git a/MQTTSNGateway/src/linux/Timer.h b/MQTTSNGateway/src/linux/Timer.h index 43885b1..7935f4f 100644 --- a/MQTTSNGateway/src/linux/Timer.h +++ b/MQTTSNGateway/src/linux/Timer.h @@ -16,8 +16,10 @@ #ifndef MQTTSNGATEWAY_SRC_LINUX_TIMER_H_ #define MQTTSNGATEWAY_SRC_LINUX_TIMER_H_ +#include +#include #include "MQTTSNGWDefines.h" -#include + namespace MQTTSNGW { /*========================================================== diff --git a/MQTTSNGateway/src/linux/udp/SensorNetwork.cpp b/MQTTSNGateway/src/linux/udp/SensorNetwork.cpp index 18eace4..6f08481 100644 --- a/MQTTSNGateway/src/linux/udp/SensorNetwork.cpp +++ b/MQTTSNGateway/src/linux/udp/SensorNetwork.cpp @@ -21,7 +21,6 @@ #include #include #include -#include #include #include #include diff --git a/MQTTSNGateway/src/linux/udp6/SensorNetwork.cpp b/MQTTSNGateway/src/linux/udp6/SensorNetwork.cpp index 07f5ad3..3ff93fd 100644 --- a/MQTTSNGateway/src/linux/udp6/SensorNetwork.cpp +++ b/MQTTSNGateway/src/linux/udp6/SensorNetwork.cpp @@ -25,7 +25,6 @@ #include #include #include -#include #include #include #include diff --git a/MQTTSNGateway/src/linux/xbee/SensorNetwork.cpp b/MQTTSNGateway/src/linux/xbee/SensorNetwork.cpp index 1a806a7..65b9946 100644 --- a/MQTTSNGateway/src/linux/xbee/SensorNetwork.cpp +++ b/MQTTSNGateway/src/linux/xbee/SensorNetwork.cpp @@ -18,10 +18,8 @@ #include #include #include -#include #include #include - #include "SensorNetwork.h" #include "MQTTSNGWProcess.h"