BugFix of #76 and #77

1.Return CONNACK instead of the broker when the gateway receives CONNECT
while the client is Sleep or Awake mode.

2.Define the max size of a que for PUBLISH while the client state is
Asleep mode.  Despose packets when the que is full of packets.

3.Return PUBACK or PUBREL to the broker when the client is Asleep or
Awake.


Signed-off-by: tomoaki <tomoaki@tomy-tech.com>



Signed-off-by: tomoaki <tomoaki@tomy-tech.com>
This commit is contained in:
tomoaki
2017-08-27 15:59:31 +09:00
parent 7099531e0e
commit bc731210ae
24 changed files with 303 additions and 197 deletions

View File

@@ -44,26 +44,26 @@ extern int run(void);
*
*/
/*------------------------------------------------------
* UDP Configuration
* UDP Configuration (theNetcon)
*------------------------------------------------------*/
UDPCONF = {
"GatewayTester", // ClientId
"GatewayTestClient", // ClientId
{225,1,1,1}, // Multicast group IP
1883, // Multicast group Port
20001, // Local PortNo
};
/*------------------------------------------------------
* Client Configuration
* Client Configuration (theMqcon)
*------------------------------------------------------*/
MQTTSNCONF = {
300, //KeepAlive (seconds)
true, //Clean session
0, //Sleep duration in msecs
"willTopic", //WillTopic
"willMessage", //WillMessage
0, //WillQos
false //WillRetain
60, //KeepAlive [seconds]
true, //Clean session
300, //Sleep duration [seconds]
"", //WillTopic
"", //WillMessage
0, //WillQos
false //WillRetain
};
/*------------------------------------------------------
@@ -162,6 +162,11 @@ void disconnect(void)
DISCONNECT(0);
}
void asleep(void)
{
DISCONNECT(theMqcon.sleepDuration);
}
/*------------------------------------------------------
* A List of Test functions
*------------------------------------------------------*/
@@ -175,7 +180,9 @@ TEST_LIST = {// e.g. TEST( Label, Test),
TEST("Step6:Publish topic2", publishTopic2),
TEST("Step7:subscribe again", subscribechangeCallback),
TEST("Step8:Publish topic2", publishTopic2),
TEST("Step9:Disconnect", disconnect),
TEST("Step9:Sleep ", asleep),
TEST("Step10:Publish topic1", publishTopic1),
TEST("Step11:Disconnect", disconnect),
END_OF_TEST_LIST
};

View File

@@ -55,6 +55,8 @@ LGwProxy::LGwProxy(){
_cleanSession = 0;
_pingStatus = 0;
_connectRetry = MQTTSN_RETRY_COUNT;
_tSleep = 0;
_tWake = 0;
}
LGwProxy::~LGwProxy(){
@@ -91,7 +93,7 @@ void LGwProxy::connect(){
strcpy(pos,_willTopic); // WILLTOPIC
_status = GW_WAIT_WILLMSGREQ;
writeGwMsg();
}else if (_status == GW_CONNECTING || _status == GW_DISCONNECTED){
}else if (_status == GW_CONNECTING || _status == GW_DISCONNECTED || _status == GW_SLEPT ){
uint8_t clientIdLen = uint8_t(strlen(_clientId) > 23 ? 23 : strlen(_clientId));
*pos++ = 6 + clientIdLen;
*pos++ = MQTTSN_TYPE_CONNECT;
@@ -105,7 +107,7 @@ void LGwProxy::connect(){
strncpy(pos, _clientId, clientIdLen);
_msg[ 6 + clientIdLen] = 0;
_status = GW_WAIT_CONNACK;
if (_willMsg && _willTopic){
if ( _willMsg && _willTopic && _status != GW_SLEPT ){
if (strlen(_willMsg) && strlen(_willTopic)){
_msg[2] = _msg[2] | MQTTSN_FLAG_WILL; // CONNECT
_status = GW_WAIT_WILLTOPICREQ;
@@ -163,10 +165,14 @@ int LGwProxy::getConnectResponce(void){
if (_mqttsnMsg[1] == MQTTSN_RC_ACCEPTED){
_status = GW_CONNECTED;
_connectRetry = MQTTSN_RETRY_COUNT;
_keepAliveTimer.start(_tkeepAlive * 1000);
_topicTbl.clearTopic();
DISPLAY("\033[0m\033[0;32m\n\n Connected to the Broker\033[0m\033[0;37m\n\n");
theClient->onConnect(); // SUBSCRIBEs are conducted
setPingReqTimer();
if ( _tSleep ){
_tSleep = 0;
}else{
DISPLAY("\033[0m\033[0;32m\n\n Connected to the Broker\033[0m\033[0;37m\n\n");
_topicTbl.clearTopic();
theClient->onConnect(); // SUBSCRIBEs are conducted
}
}else{
_status = GW_CONNECTING;
}
@@ -182,16 +188,18 @@ void LGwProxy::reconnect(void){
void LGwProxy::disconnect(uint16_t secs){
_tSleep = secs;
_status = GW_DISCONNECTING;
_tWake = 0;
_msg[1] = MQTTSN_TYPE_DISCONNECT;
if (secs){
_msg[0] = 4;
setUint16((uint8_t*) _msg + 2, secs);
_status = GW_SLEEPING;
}else{
_msg[0] = 2;
_keepAliveTimer.stop();
_status = GW_DISCONNECTING;
}
_retryCount = MQTTSN_RETRY_COUNT;
@@ -223,9 +231,13 @@ int LGwProxy::getDisconnectResponce(void){
}
return 0;
}else if (_mqttsnMsg[0] == MQTTSN_TYPE_DISCONNECT){
if (_tSleep){
_status = GW_SLEEPING;
_keepAliveTimer.start(_tSleep);
if (_status == GW_SLEEPING ){
_status = GW_SLEPT;
uint32_t remain = _keepAliveTimer.getRemain();
theClient->setSleepMode(remain);
/* Wake up and starts from this point. */
}else{
_status = GW_DISCONNECTED;
}
@@ -279,7 +291,18 @@ int LGwProxy::getMessage(void){
}else if (_mqttsnMsg[0] == MQTTSN_TYPE_PINGRESP){
if (_pingStatus == GW_WAIT_PINGRESP){
_pingStatus = 0;
resetPingReqTimer();
setPingReqTimer();
if ( _tSleep > 0 ){
_tWake += _tkeepAlive;
if ( _tWake < _tSleep ){
theClient->setSleepMode(_tkeepAlive * 1000UL);
}else{
DISPLAY("\033[0m\033[0;32m\n\n Get back to ACTIVE.\033[0m\033[0;37m\n\n");
_tWake = 0;
connect();
}
}
}
}else if (_mqttsnMsg[0] == MQTTSN_TYPE_DISCONNECT){
_status = GW_LOST;
@@ -408,7 +431,7 @@ void LGwProxy::checkPingReq(void){
msg[0] = 0x02;
msg[1] = MQTTSN_TYPE_PINGREQ;
if (_status == GW_CONNECTED && _keepAliveTimer.isTimeUp() && _pingStatus != GW_WAIT_PINGRESP){
if ( (_status == GW_CONNECTED || _status == GW_SLEPT) && isPingReqRequired() && _pingStatus != GW_WAIT_PINGRESP){
_pingStatus = GW_WAIT_PINGRESP;
_pingRetryCount = MQTTSN_RETRY_COUNT;
@@ -449,8 +472,12 @@ LRegisterManager* LGwProxy::getRegisterManager(void){
return &_regMgr;
}
void LGwProxy::resetPingReqTimer(void){
_keepAliveTimer.start(_tkeepAlive * 1000);
bool LGwProxy::isPingReqRequired(void){
return _keepAliveTimer.isTimeUp(_tkeepAlive * 1000UL);
}
void LGwProxy::setPingReqTimer(void){
_keepAliveTimer.start(_tkeepAlive * 1000UL);
}
const char* LGwProxy::getClientId(void) {

View File

@@ -66,7 +66,7 @@ public:
void setAdvertiseDuration(uint16_t duration);
void reconnect(void);
int writeMsg(const uint8_t* msg);
void resetPingReqTimer(void);
void setPingReqTimer(void);
uint16_t getNextMsgId();
LTopicTable* getTopicTable(void);
LRegisterManager* getRegisterManager(void);
@@ -78,6 +78,7 @@ private:
void checkAdvertise(void);
int getConnectResponce(void);
int getDisconnectResponce(void);
bool isPingReqRequired(void);
LNetwork _network;
uint8_t* _mqttsnMsg;
@@ -103,6 +104,7 @@ private:
LTimer _gwAliveTimer;
LTimer _keepAliveTimer;
uint16_t _tSleep;
uint16_t _tWake;
char _msg[MQTTSN_MAX_MSG_LENGTH + 1];
};

View File

@@ -198,12 +198,17 @@ void LMqttsnClient::run()
{
_gwProxy.connect();
_taskMgr.run();
sleep();
}
void LMqttsnClient::setSleepMode(uint32_t duration)
{
// ToDo: set WDT and sleep mode
DISPLAY("\033[0m\033[0;32m\n\n Get into SLEEP mode %u [msec].\033[0m\033[0;37m\n\n", duration);
}
void LMqttsnClient::sleep(void)
{
disconnect(_sleepDuration);
}
void LMqttsnClient::setSleepDuration(uint32_t duration)

View File

@@ -59,6 +59,7 @@ public:
void run(void);
void addTask(bool test);
void setSleepDuration(uint32_t duration);
void setSleepMode(uint32_t duration);
void sleep(void);
const char* getClientId(void);
LGwProxy* getGwProxy(void);

View File

@@ -23,7 +23,6 @@
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <errno.h>
#include <termios.h>
#include "LNetworkUdp.h"

View File

@@ -150,7 +150,7 @@ void LPublishManager::sendPublish(PubElement* elm)
memcpy(msg + org + 7, elm->payload, elm->payloadlen);
theClient->getGwProxy()->writeMsg(msg);
theClient->getGwProxy()->resetPingReqTimer();
theClient->getGwProxy()->setPingReqTimer();
if ((elm->flag & 0x60) == MQTTSN_FLAG_QOS_0)
{
DISPLAY("\033[0m\033[0;32m Topic \"%s\" was Published. \033[0m\033[0;37m\n\n", elm->topicName);

View File

@@ -23,7 +23,6 @@
#include <termios.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <stdarg.h>
#include "LScreen.h"

View File

@@ -142,7 +142,7 @@ void LSubscribeManager::send(SubElement* elm)
theClient->getGwProxy()->connect();
theClient->getGwProxy()->writeMsg(msg);
theClient->getGwProxy()->resetPingReqTimer();
theClient->getGwProxy()->setPingReqTimer();
elm->sendUTC = time(NULL);
elm->retryCount--;
}

View File

@@ -16,7 +16,6 @@
#include <stdlib.h>
#include <string.h>
#include "LMqttsnClientApp.h"
#include "LTimer.h"
@@ -63,3 +62,17 @@ void LTimer::stop(){
_millis = 0;
}
uint32_t LTimer::getRemain(void)
{
struct timeval curTime;
uint32_t secs, usecs;
if (_millis <= 0){
return 0;
}else{
gettimeofday(&curTime, 0);
secs = (curTime.tv_sec - _startTime.tv_sec) * 1000;
usecs = (curTime.tv_usec - _startTime.tv_usec) / 1000.0;
secs = _millis - (secs + usecs);
return secs;
}
}

View File

@@ -35,6 +35,7 @@ public:
bool isTimeUp(void);
void stop(void);
void changeUTC(void){};
uint32_t getRemain(void);
static void setUnixTime(uint32_t utc){};
private:
struct timeval _startTime;