Merge pull request #78 from ty4tw/develop

BugFix of #76 and #77
This commit is contained in:
Tomoaki Yamaguchi
2017-08-27 16:45:56 +09:00
committed by GitHub
24 changed files with 303 additions and 197 deletions

View File

@@ -44,24 +44,24 @@ extern int run(void);
* *
*/ */
/*------------------------------------------------------ /*------------------------------------------------------
* UDP Configuration * UDP Configuration (theNetcon)
*------------------------------------------------------*/ *------------------------------------------------------*/
UDPCONF = { UDPCONF = {
"GatewayTester", // ClientId "GatewayTestClient", // ClientId
{225,1,1,1}, // Multicast group IP {225,1,1,1}, // Multicast group IP
1883, // Multicast group Port 1883, // Multicast group Port
20001, // Local PortNo 20001, // Local PortNo
}; };
/*------------------------------------------------------ /*------------------------------------------------------
* Client Configuration * Client Configuration (theMqcon)
*------------------------------------------------------*/ *------------------------------------------------------*/
MQTTSNCONF = { MQTTSNCONF = {
300, //KeepAlive (seconds) 60, //KeepAlive [seconds]
true, //Clean session true, //Clean session
0, //Sleep duration in msecs 300, //Sleep duration [seconds]
"willTopic", //WillTopic "", //WillTopic
"willMessage", //WillMessage "", //WillMessage
0, //WillQos 0, //WillQos
false //WillRetain false //WillRetain
}; };
@@ -162,6 +162,11 @@ void disconnect(void)
DISCONNECT(0); DISCONNECT(0);
} }
void asleep(void)
{
DISCONNECT(theMqcon.sleepDuration);
}
/*------------------------------------------------------ /*------------------------------------------------------
* A List of Test functions * A List of Test functions
*------------------------------------------------------*/ *------------------------------------------------------*/
@@ -175,7 +180,9 @@ TEST_LIST = {// e.g. TEST( Label, Test),
TEST("Step6:Publish topic2", publishTopic2), TEST("Step6:Publish topic2", publishTopic2),
TEST("Step7:subscribe again", subscribechangeCallback), TEST("Step7:subscribe again", subscribechangeCallback),
TEST("Step8:Publish topic2", publishTopic2), TEST("Step8:Publish topic2", publishTopic2),
TEST("Step9:Disconnect", disconnect), TEST("Step9:Sleep ", asleep),
TEST("Step10:Publish topic1", publishTopic1),
TEST("Step11:Disconnect", disconnect),
END_OF_TEST_LIST END_OF_TEST_LIST
}; };

View File

@@ -55,6 +55,8 @@ LGwProxy::LGwProxy(){
_cleanSession = 0; _cleanSession = 0;
_pingStatus = 0; _pingStatus = 0;
_connectRetry = MQTTSN_RETRY_COUNT; _connectRetry = MQTTSN_RETRY_COUNT;
_tSleep = 0;
_tWake = 0;
} }
LGwProxy::~LGwProxy(){ LGwProxy::~LGwProxy(){
@@ -91,7 +93,7 @@ void LGwProxy::connect(){
strcpy(pos,_willTopic); // WILLTOPIC strcpy(pos,_willTopic); // WILLTOPIC
_status = GW_WAIT_WILLMSGREQ; _status = GW_WAIT_WILLMSGREQ;
writeGwMsg(); writeGwMsg();
}else if (_status == GW_CONNECTING || _status == GW_DISCONNECTED){ }else if (_status == GW_CONNECTING || _status == GW_DISCONNECTED || _status == GW_SLEPT ){
uint8_t clientIdLen = uint8_t(strlen(_clientId) > 23 ? 23 : strlen(_clientId)); uint8_t clientIdLen = uint8_t(strlen(_clientId) > 23 ? 23 : strlen(_clientId));
*pos++ = 6 + clientIdLen; *pos++ = 6 + clientIdLen;
*pos++ = MQTTSN_TYPE_CONNECT; *pos++ = MQTTSN_TYPE_CONNECT;
@@ -105,7 +107,7 @@ void LGwProxy::connect(){
strncpy(pos, _clientId, clientIdLen); strncpy(pos, _clientId, clientIdLen);
_msg[ 6 + clientIdLen] = 0; _msg[ 6 + clientIdLen] = 0;
_status = GW_WAIT_CONNACK; _status = GW_WAIT_CONNACK;
if (_willMsg && _willTopic){ if ( _willMsg && _willTopic && _status != GW_SLEPT ){
if (strlen(_willMsg) && strlen(_willTopic)){ if (strlen(_willMsg) && strlen(_willTopic)){
_msg[2] = _msg[2] | MQTTSN_FLAG_WILL; // CONNECT _msg[2] = _msg[2] | MQTTSN_FLAG_WILL; // CONNECT
_status = GW_WAIT_WILLTOPICREQ; _status = GW_WAIT_WILLTOPICREQ;
@@ -163,10 +165,14 @@ int LGwProxy::getConnectResponce(void){
if (_mqttsnMsg[1] == MQTTSN_RC_ACCEPTED){ if (_mqttsnMsg[1] == MQTTSN_RC_ACCEPTED){
_status = GW_CONNECTED; _status = GW_CONNECTED;
_connectRetry = MQTTSN_RETRY_COUNT; _connectRetry = MQTTSN_RETRY_COUNT;
_keepAliveTimer.start(_tkeepAlive * 1000); setPingReqTimer();
_topicTbl.clearTopic(); if ( _tSleep ){
_tSleep = 0;
}else{
DISPLAY("\033[0m\033[0;32m\n\n Connected to the Broker\033[0m\033[0;37m\n\n"); DISPLAY("\033[0m\033[0;32m\n\n Connected to the Broker\033[0m\033[0;37m\n\n");
_topicTbl.clearTopic();
theClient->onConnect(); // SUBSCRIBEs are conducted theClient->onConnect(); // SUBSCRIBEs are conducted
}
}else{ }else{
_status = GW_CONNECTING; _status = GW_CONNECTING;
} }
@@ -182,16 +188,18 @@ void LGwProxy::reconnect(void){
void LGwProxy::disconnect(uint16_t secs){ void LGwProxy::disconnect(uint16_t secs){
_tSleep = secs; _tSleep = secs;
_status = GW_DISCONNECTING; _tWake = 0;
_msg[1] = MQTTSN_TYPE_DISCONNECT; _msg[1] = MQTTSN_TYPE_DISCONNECT;
if (secs){ if (secs){
_msg[0] = 4; _msg[0] = 4;
setUint16((uint8_t*) _msg + 2, secs); setUint16((uint8_t*) _msg + 2, secs);
_status = GW_SLEEPING;
}else{ }else{
_msg[0] = 2; _msg[0] = 2;
_keepAliveTimer.stop(); _keepAliveTimer.stop();
_status = GW_DISCONNECTING;
} }
_retryCount = MQTTSN_RETRY_COUNT; _retryCount = MQTTSN_RETRY_COUNT;
@@ -223,9 +231,13 @@ int LGwProxy::getDisconnectResponce(void){
} }
return 0; return 0;
}else if (_mqttsnMsg[0] == MQTTSN_TYPE_DISCONNECT){ }else if (_mqttsnMsg[0] == MQTTSN_TYPE_DISCONNECT){
if (_tSleep){ if (_status == GW_SLEEPING ){
_status = GW_SLEEPING; _status = GW_SLEPT;
_keepAliveTimer.start(_tSleep); uint32_t remain = _keepAliveTimer.getRemain();
theClient->setSleepMode(remain);
/* Wake up and starts from this point. */
}else{ }else{
_status = GW_DISCONNECTED; _status = GW_DISCONNECTED;
} }
@@ -279,7 +291,18 @@ int LGwProxy::getMessage(void){
}else if (_mqttsnMsg[0] == MQTTSN_TYPE_PINGRESP){ }else if (_mqttsnMsg[0] == MQTTSN_TYPE_PINGRESP){
if (_pingStatus == GW_WAIT_PINGRESP){ if (_pingStatus == GW_WAIT_PINGRESP){
_pingStatus = 0; _pingStatus = 0;
resetPingReqTimer(); setPingReqTimer();
if ( _tSleep > 0 ){
_tWake += _tkeepAlive;
if ( _tWake < _tSleep ){
theClient->setSleepMode(_tkeepAlive * 1000UL);
}else{
DISPLAY("\033[0m\033[0;32m\n\n Get back to ACTIVE.\033[0m\033[0;37m\n\n");
_tWake = 0;
connect();
}
}
} }
}else if (_mqttsnMsg[0] == MQTTSN_TYPE_DISCONNECT){ }else if (_mqttsnMsg[0] == MQTTSN_TYPE_DISCONNECT){
_status = GW_LOST; _status = GW_LOST;
@@ -408,7 +431,7 @@ void LGwProxy::checkPingReq(void){
msg[0] = 0x02; msg[0] = 0x02;
msg[1] = MQTTSN_TYPE_PINGREQ; msg[1] = MQTTSN_TYPE_PINGREQ;
if (_status == GW_CONNECTED && _keepAliveTimer.isTimeUp() && _pingStatus != GW_WAIT_PINGRESP){ if ( (_status == GW_CONNECTED || _status == GW_SLEPT) && isPingReqRequired() && _pingStatus != GW_WAIT_PINGRESP){
_pingStatus = GW_WAIT_PINGRESP; _pingStatus = GW_WAIT_PINGRESP;
_pingRetryCount = MQTTSN_RETRY_COUNT; _pingRetryCount = MQTTSN_RETRY_COUNT;
@@ -449,8 +472,12 @@ LRegisterManager* LGwProxy::getRegisterManager(void){
return &_regMgr; return &_regMgr;
} }
void LGwProxy::resetPingReqTimer(void){ bool LGwProxy::isPingReqRequired(void){
_keepAliveTimer.start(_tkeepAlive * 1000); return _keepAliveTimer.isTimeUp(_tkeepAlive * 1000UL);
}
void LGwProxy::setPingReqTimer(void){
_keepAliveTimer.start(_tkeepAlive * 1000UL);
} }
const char* LGwProxy::getClientId(void) { const char* LGwProxy::getClientId(void) {

View File

@@ -66,7 +66,7 @@ public:
void setAdvertiseDuration(uint16_t duration); void setAdvertiseDuration(uint16_t duration);
void reconnect(void); void reconnect(void);
int writeMsg(const uint8_t* msg); int writeMsg(const uint8_t* msg);
void resetPingReqTimer(void); void setPingReqTimer(void);
uint16_t getNextMsgId(); uint16_t getNextMsgId();
LTopicTable* getTopicTable(void); LTopicTable* getTopicTable(void);
LRegisterManager* getRegisterManager(void); LRegisterManager* getRegisterManager(void);
@@ -78,6 +78,7 @@ private:
void checkAdvertise(void); void checkAdvertise(void);
int getConnectResponce(void); int getConnectResponce(void);
int getDisconnectResponce(void); int getDisconnectResponce(void);
bool isPingReqRequired(void);
LNetwork _network; LNetwork _network;
uint8_t* _mqttsnMsg; uint8_t* _mqttsnMsg;
@@ -103,6 +104,7 @@ private:
LTimer _gwAliveTimer; LTimer _gwAliveTimer;
LTimer _keepAliveTimer; LTimer _keepAliveTimer;
uint16_t _tSleep; uint16_t _tSleep;
uint16_t _tWake;
char _msg[MQTTSN_MAX_MSG_LENGTH + 1]; char _msg[MQTTSN_MAX_MSG_LENGTH + 1];
}; };

View File

@@ -198,12 +198,17 @@ void LMqttsnClient::run()
{ {
_gwProxy.connect(); _gwProxy.connect();
_taskMgr.run(); _taskMgr.run();
sleep(); }
void LMqttsnClient::setSleepMode(uint32_t duration)
{
// ToDo: set WDT and sleep mode
DISPLAY("\033[0m\033[0;32m\n\n Get into SLEEP mode %u [msec].\033[0m\033[0;37m\n\n", duration);
} }
void LMqttsnClient::sleep(void) void LMqttsnClient::sleep(void)
{ {
disconnect(_sleepDuration);
} }
void LMqttsnClient::setSleepDuration(uint32_t duration) void LMqttsnClient::setSleepDuration(uint32_t duration)

View File

@@ -59,6 +59,7 @@ public:
void run(void); void run(void);
void addTask(bool test); void addTask(bool test);
void setSleepDuration(uint32_t duration); void setSleepDuration(uint32_t duration);
void setSleepMode(uint32_t duration);
void sleep(void); void sleep(void);
const char* getClientId(void); const char* getClientId(void);
LGwProxy* getGwProxy(void); LGwProxy* getGwProxy(void);

View File

@@ -23,7 +23,6 @@
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <fcntl.h> #include <fcntl.h>
#include <errno.h>
#include <termios.h> #include <termios.h>
#include "LNetworkUdp.h" #include "LNetworkUdp.h"

View File

@@ -150,7 +150,7 @@ void LPublishManager::sendPublish(PubElement* elm)
memcpy(msg + org + 7, elm->payload, elm->payloadlen); memcpy(msg + org + 7, elm->payload, elm->payloadlen);
theClient->getGwProxy()->writeMsg(msg); theClient->getGwProxy()->writeMsg(msg);
theClient->getGwProxy()->resetPingReqTimer(); theClient->getGwProxy()->setPingReqTimer();
if ((elm->flag & 0x60) == MQTTSN_FLAG_QOS_0) if ((elm->flag & 0x60) == MQTTSN_FLAG_QOS_0)
{ {
DISPLAY("\033[0m\033[0;32m Topic \"%s\" was Published. \033[0m\033[0;37m\n\n", elm->topicName); DISPLAY("\033[0m\033[0;32m Topic \"%s\" was Published. \033[0m\033[0;37m\n\n", elm->topicName);

View File

@@ -23,7 +23,6 @@
#include <termios.h> #include <termios.h>
#include <unistd.h> #include <unistd.h>
#include <fcntl.h> #include <fcntl.h>
#include <errno.h>
#include <stdarg.h> #include <stdarg.h>
#include "LScreen.h" #include "LScreen.h"

View File

@@ -142,7 +142,7 @@ void LSubscribeManager::send(SubElement* elm)
theClient->getGwProxy()->connect(); theClient->getGwProxy()->connect();
theClient->getGwProxy()->writeMsg(msg); theClient->getGwProxy()->writeMsg(msg);
theClient->getGwProxy()->resetPingReqTimer(); theClient->getGwProxy()->setPingReqTimer();
elm->sendUTC = time(NULL); elm->sendUTC = time(NULL);
elm->retryCount--; elm->retryCount--;
} }

View File

@@ -16,7 +16,6 @@
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include "LMqttsnClientApp.h" #include "LMqttsnClientApp.h"
#include "LTimer.h" #include "LTimer.h"
@@ -63,3 +62,17 @@ void LTimer::stop(){
_millis = 0; _millis = 0;
} }
uint32_t LTimer::getRemain(void)
{
struct timeval curTime;
uint32_t secs, usecs;
if (_millis <= 0){
return 0;
}else{
gettimeofday(&curTime, 0);
secs = (curTime.tv_sec - _startTime.tv_sec) * 1000;
usecs = (curTime.tv_usec - _startTime.tv_usec) / 1000.0;
secs = _millis - (secs + usecs);
return secs;
}
}

View File

@@ -35,6 +35,7 @@ public:
bool isTimeUp(void); bool isTimeUp(void);
void stop(void); void stop(void);
void changeUTC(void){}; void changeUTC(void){};
uint32_t getRemain(void);
static void setUnixTime(uint32_t utc){}; static void setUnixTime(uint32_t utc){};
private: private:
struct timeval _startTime; struct timeval _startTime;

View File

@@ -76,14 +76,6 @@ void MQTTGWConnectionHandler::handleConnack(Client* client, MQTTGWPacket* packet
ev1->setClientSendEvent(client, snPacket); ev1->setClientSendEvent(client, snPacket);
client->connackSended(rc); // update the client's status client->connackSended(rc); // update the client's status
_gateway->getClientSendQue()->post(ev1); _gateway->getClientSendQue()->post(ev1);
MQTTSNPacket* sleepPacket = 0;
while ( (sleepPacket = client->getClientSleepPacket()) )
{
Event* ev1 = new Event();
ev1->setClientSendEvent(client, sleepPacket);
_gateway->getClientSendQue()->post(ev1);
}
} }
void MQTTGWConnectionHandler::handlePingresp(Client* client, MQTTGWPacket* packet) void MQTTGWConnectionHandler::handlePingresp(Client* client, MQTTGWPacket* packet)

View File

@@ -543,3 +543,20 @@ char* MQTTGWPacket::print(char* pbuf)
return ptr; return ptr;
} }
MQTTGWPacket& MQTTGWPacket::operator =(MQTTGWPacket& packet)
{
clearData();
this->_header.byte = packet._header.byte;
this->_remainingLength = packet._remainingLength;
_data = (unsigned char*)calloc(_remainingLength, 1);
if (_data)
{
memcpy(this->_data, packet._data, _remainingLength);
}
else
{
clearData();
}
return *this;
}

View File

@@ -205,6 +205,7 @@ public:
int setUNSUBSCRIBE(const char* topics, unsigned short msgid); int setUNSUBSCRIBE(const char* topics, unsigned short msgid);
char* getMsgId(char* buf); char* getMsgId(char* buf);
char* print(char* buf); char* print(char* buf);
MQTTGWPacket& operator =(MQTTGWPacket& packet);
private: private:
void clearData(void); void clearData(void);

View File

@@ -36,9 +36,39 @@ MQTTGWPublishHandler::~MQTTGWPublishHandler()
void MQTTGWPublishHandler::handlePublish(Client* client, MQTTGWPacket* packet) void MQTTGWPublishHandler::handlePublish(Client* client, MQTTGWPacket* packet)
{ {
if ( !client->isActive() && !client->isSleep() ) if ( !client->isActive() && !client->isSleep() && !client->isAwake())
{ {
WRITELOG(" The client is neither active nor sleep %s\n", client->getStatus()); WRITELOG("%s The client is neither active nor sleep %s%s\n", ERRMSG_HEADER, client->getStatus(), ERRMSG_FOOTER);
return;
}
/* client is sleeping. save PUBLISH */
if ( client->isSleep() )
{
Publish pub;
packet->getPUBLISH(&pub);
WRITELOG(FORMAT_Y_G_G, currentDateTime(), packet->getName(),
RIGHTARROW, client->getClientId(), "is sleeping. a message was saved.");
if (pub.header.bits.qos == 1)
{
replyACK(client, &pub, PUBACK);
}
else if ( pub.header.bits.qos == 2)
{
replyACK(client, &pub, PUBREC);
}
MQTTGWPacket* msg = new MQTTGWPacket();
*msg = *packet;
if ( msg->getType() == 0 )
{
WRITELOG("%s MQTTGWPublishHandler::handlePublish can't allocate memories for Packet.%s\n", ERRMSG_HEADER,ERRMSG_FOOTER);
delete msg;
return;
}
client->setClientSleepPacket(msg);
return; return;
} }
@@ -102,19 +132,10 @@ void MQTTGWPublishHandler::handlePublish(Client* client, MQTTGWPacket* packet)
uint16_t regackMsgId = client->getNextSnMsgId(); uint16_t regackMsgId = client->getNextSnMsgId();
regPacket->setREGISTER(id, regackMsgId, &topicName); regPacket->setREGISTER(id, regackMsgId, &topicName);
if (client->isSleep())
{
client->setClientSleepPacket(regPacket);
WRITELOG(FORMAT_BL_NL, currentDateTime(), regPacket->getName(),
RIGHTARROW, client->getClientId(), "is sleeping. REGISTER was saved.");
}
else if (client->isActive())
{
/* send REGISTER */ /* send REGISTER */
Event* evrg = new Event(); Event* evrg = new Event();
evrg->setClientSendEvent(client, regPacket); evrg->setClientSendEvent(client, regPacket);
_gateway->getClientSendQue()->post(evrg); _gateway->getClientSendQue()->post(evrg);
}
/* send PUBLISH */ /* send PUBLISH */
topicId.data.id = id; topicId.data.id = id;
@@ -125,47 +146,18 @@ void MQTTGWPublishHandler::handlePublish(Client* client, MQTTGWPacket* packet)
} }
else else
{ {
WRITELOG("\x1b[0m\x1b[31mMQTTGWPublishHandler Can't create a Topic.\n"); WRITELOG("%sMQTTGWPublishHandler Can't create a Topic.%s\n", ERRMSG_HEADER,ERRMSG_FOOTER);
delete snPacket; delete snPacket;
return; return;
} }
} }
} }
/* TopicId was acquired. */
if (client->isSleep())
{
/* client is sleeping. save PUBLISH */
client->setClientSleepPacket(snPacket);
WRITELOG(FORMAT_Y_G_G, currentDateTime(), packet->getName(),
RIGHTARROW, client->getClientId(), "is sleeping. a message was saved.");
int type = 0;
if (pub.header.bits.qos == 1)
{
type = PUBACK;
}
else if ( pub.header.bits.qos == 2)
{
WRITELOG(" While Client is sleeping, QoS2 is not supported.\n");
type = PUBREC;
}
replyACK(client, &pub, type);
pub.header.bits.qos = 0;
replyACK(client, &pub, PUBACK);
pub.msgId = 0;
snPacket->setPUBLISH((uint8_t) pub.header.bits.dup, (int) pub.header.bits.qos,
(uint8_t) pub.header.bits.retain, (uint16_t) pub.msgId, topicId, (uint8_t*) pub.payload,
pub.payloadlen);
client->setClientSleepPacket(snPacket);
}
else if (client->isActive())
{
snPacket->setPUBLISH((uint8_t) pub.header.bits.dup, (int) pub.header.bits.qos, (uint8_t) pub.header.bits.retain, snPacket->setPUBLISH((uint8_t) pub.header.bits.dup, (int) pub.header.bits.qos, (uint8_t) pub.header.bits.retain,
(uint16_t) pub.msgId, topicId, (uint8_t*) pub.payload, pub.payloadlen); (uint16_t) pub.msgId, topicId, (uint8_t*) pub.payload, pub.payloadlen);
Event* ev1 = new Event(); Event* ev1 = new Event();
ev1->setClientSendEvent(client, snPacket); ev1->setClientSendEvent(client, snPacket);
_gateway->getClientSendQue()->post(ev1); _gateway->getClientSendQue()->post(ev1);
}
} }
@@ -201,6 +193,9 @@ void MQTTGWPublishHandler::handleAck(Client* client, MQTTGWPacket* packet, int t
{ {
Ack ack; Ack ack;
packet->getAck(&ack); packet->getAck(&ack);
if ( client->isActive() || client->isAwake() )
{
MQTTSNPacket* mqttsnPacket = new MQTTSNPacket(); MQTTSNPacket* mqttsnPacket = new MQTTSNPacket();
if (type == PUBREC) if (type == PUBREC)
{ {
@@ -219,4 +214,16 @@ void MQTTGWPublishHandler::handleAck(Client* client, MQTTGWPacket* packet, int t
ev1->setClientSendEvent(client, mqttsnPacket); ev1->setClientSendEvent(client, mqttsnPacket);
_gateway->getClientSendQue()->post(ev1); _gateway->getClientSendQue()->post(ev1);
} }
else if ( client->isSleep() )
{
if (type == PUBREL)
{
MQTTGWPacket* pubComp = new MQTTGWPacket();
pubComp->setAck(PUBCOMP, (uint16_t)ack.msgId);
Event* ev1 = new Event();
ev1->setBrokerSendEvent(client, pubComp);
_gateway->getBrokerSendQue()->post(ev1);
}
}
}

View File

@@ -23,7 +23,7 @@
#include <stdio.h> #include <stdio.h>
using namespace MQTTSNGW; using namespace MQTTSNGW;
char* currentDateTime(void);
/*===================================== /*=====================================
Class ClientList Class ClientList
=====================================*/ =====================================*/
@@ -185,7 +185,6 @@ Client* ClientList::getClient(uint8_t* clientId)
while (client != 0) while (client != 0)
{ {
//printf("ClientList: clientId = %s\n", client->getClientId());
if (strcmp((const char*)client->getClientId(), (const char*)clientId) == 0 ) if (strcmp((const char*)client->getClientId(), (const char*)clientId) == 0 )
{ {
_mutex.unlock(); _mutex.unlock();
@@ -259,7 +258,7 @@ bool ClientList::isAuthorized()
/*===================================== /*=====================================
Class Client Class Client
=====================================*/ =====================================*/
static const char* theClientStatus[] = { "Disconnected", "TryConnecting", "Connecting", "Active", "Awake", "Asleep", "Lost" }; static const char* theClientStatus[] = { "Disconnected", "TryConnecting", "Connecting", "Active", "Asleep", "Awake", "Lost" };
Client::Client(bool secure) Client::Client(bool secure)
{ {
@@ -288,6 +287,7 @@ Client::Client(bool secure)
_otaClient = 0; _otaClient = 0;
_prevClient = 0; _prevClient = 0;
_nextClient = 0; _nextClient = 0;
_clientSleepPacketQue.setMaxSize(MAX_SAVED_PUBLISH);
} }
Client::~Client() Client::~Client()
@@ -335,11 +335,30 @@ uint16_t Client::getWaitedSubTopicId(uint16_t msgId)
return _waitedSubTopicIdMap.getTopicId(msgId, &type); return _waitedSubTopicIdMap.getTopicId(msgId, &type);
} }
MQTTSNPacket* Client::getClientSleepPacket() MQTTGWPacket* Client::getClientSleepPacket()
{ {
return _clientSleepPacketQue.getPacket(); return _clientSleepPacketQue.getPacket();
} }
void Client::deleteFirstClientSleepPacket()
{
_clientSleepPacketQue.pop();
}
int Client::setClientSleepPacket(MQTTGWPacket* packet)
{
int rc = _clientSleepPacketQue.post(packet);
if ( rc )
{
WRITELOG("%s %s is sleeping. the packet was saved.\n", currentDateTime(), _clientId);
}
else
{
WRITELOG("%s %s is sleeping but discard the packet.\n", currentDateTime(), _clientId);
}
return rc;
}
Connect* Client::getConnectData(void) Connect* Client::getConnectData(void)
{ {
return &_connectData; return &_connectData;
@@ -374,12 +393,6 @@ void Client::setWaitedSubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicT
_waitedSubTopicIdMap.add(msgId, topicId, type); _waitedSubTopicIdMap.add(msgId, topicId, type);
} }
void Client::setClientSleepPacket(MQTTSNPacket* packet)
{
updateStatus(packet);
_clientSleepPacketQue.post(packet);
}
bool Client::checkTimeover(void) bool Client::checkTimeover(void)
{ {
return (_status == Cstat_Active && _keepAliveTimer.isTimeup()); return (_status == Cstat_Active && _keepAliveTimer.isTimeup());
@@ -426,51 +439,34 @@ void Client::updateStatus(MQTTSNPacket* packet)
_keepAliveTimer.start(_keepAliveMsec * 1.5); _keepAliveTimer.start(_keepAliveMsec * 1.5);
break; break;
case MQTTSN_DISCONNECT: case MQTTSN_DISCONNECT:
{
uint16_t duration; uint16_t duration;
packet->getDISCONNECT(&duration); packet->getDISCONNECT(&duration);
if (duration) if (duration)
{ {
_status = Cstat_Asleep; _status = Cstat_Asleep;
_keepAliveMsec = duration * 1000UL;
} }
else else
{ {
disconnected(); disconnected();
} }
}
break; break;
default: default:
break; break;
} }
} }
else if (_status == Cstat_Asleep) else if (_status == Cstat_Awake || _status == Cstat_Asleep)
{
if (packet->getType() == MQTTSN_CONNECT)
{
setKeepAlive(packet);
_status = Cstat_Connecting;
}
else if (packet->getType() == MQTTSN_PINGREQ)
{
if ( packet->getPINGREQ() > 0 )
{
_status = Cstat_Awake;
}
}
}
else if (_status == Cstat_Awake)
{ {
switch (packet->getType()) switch (packet->getType())
{ {
case MQTTSN_CONNECT: case MQTTSN_CONNECT:
_status = Cstat_Connecting; _status = Cstat_Active;
setKeepAlive(packet);
break; break;
case MQTTSN_DISCONNECT: case MQTTSN_DISCONNECT:
disconnected(); disconnected();
break; break;
case MQTTSN_PINGREQ:
_status = Cstat_Awake;
break;
case MQTTSN_PINGRESP: case MQTTSN_PINGRESP:
_status = Cstat_Asleep; _status = Cstat_Asleep;
break; break;
@@ -478,6 +474,7 @@ void Client::updateStatus(MQTTSNPacket* packet)
break; break;
} }
} }
DEBUGLOG("Client Status = %s\n", theClientStatus[_status]);
} }
void Client::updateStatus(ClientStatus stat) void Client::updateStatus(ClientStatus stat)
@@ -595,6 +592,11 @@ bool Client::isSleep(void)
return (_status == Cstat_Asleep); return (_status == Cstat_Asleep);
} }
bool Client::isAwake(void)
{
return (_status == Cstat_Awake);
}
bool Client::isSecureNetwork(void) bool Client::isSecureNetwork(void)
{ {
return _secureNetwork; return _secureNetwork;

View File

@@ -43,6 +43,7 @@ public:
_que = new Que<T>; _que = new Que<T>;
} }
~PacketQue() ~PacketQue()
{ {
clear(); clear();
@@ -65,11 +66,14 @@ public:
} }
} }
void post(T* packet) int
post(T* packet)
{ {
int rc;
_mutex.lock(); _mutex.lock();
_que->post(packet); rc = _que->post(packet);
_mutex.unlock(); _mutex.unlock();
return rc;
} }
void pop() void pop()
@@ -93,6 +97,11 @@ public:
_mutex.unlock(); _mutex.unlock();
} }
void setMaxSize(int size)
{
_que->setMaxSize(size);
}
private: private:
Que<T>* _que; Que<T>* _que;
Mutex _mutex; Mutex _mutex;
@@ -232,7 +241,8 @@ public:
Connect* getConnectData(void); Connect* getConnectData(void);
uint16_t getWaitedPubTopicId(uint16_t msgId); uint16_t getWaitedPubTopicId(uint16_t msgId);
uint16_t getWaitedSubTopicId(uint16_t msgId); uint16_t getWaitedSubTopicId(uint16_t msgId);
MQTTSNPacket* getClientSleepPacket(); MQTTGWPacket* getClientSleepPacket(void);
void deleteFirstClientSleepPacket(void);
WaitREGACKPacketList* getWaitREGACKPacketList(void); WaitREGACKPacketList* getWaitREGACKPacketList(void);
void eraseWaitedPubTopicId(uint16_t msgId); void eraseWaitedPubTopicId(uint16_t msgId);
@@ -240,7 +250,7 @@ public:
void clearWaitedPubTopicId(void); void clearWaitedPubTopicId(void);
void clearWaitedSubTopicId(void); void clearWaitedSubTopicId(void);
void setClientSleepPacket(MQTTSNPacket*); int setClientSleepPacket(MQTTGWPacket*);
void setWaitedPubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type); void setWaitedPubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type);
void setWaitedSubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type); void setWaitedSubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type);
@@ -277,6 +287,7 @@ public:
bool isDisconnect(void); bool isDisconnect(void);
bool isActive(void); bool isActive(void);
bool isSleep(void); bool isSleep(void);
bool isAwake(void);
bool isSecureNetwork(void); bool isSecureNetwork(void);
bool isSensorNetStable(void); bool isSensorNetStable(void);
bool isWaitWillMsg(void); bool isWaitWillMsg(void);
@@ -286,7 +297,7 @@ public:
void setOTAClient(Client* cl); void setOTAClient(Client* cl);
private: private:
PacketQue<MQTTSNPacket> _clientSleepPacketQue; PacketQue<MQTTGWPacket> _clientSleepPacketQue;
WaitREGACKPacketList _waitREGACKList; WaitREGACKPacketList _waitREGACKList;
Topics* _topics; Topics* _topics;

View File

@@ -74,7 +74,18 @@ void MQTTSNConnectionHandler::handleConnect(Client* client, MQTTSNPacket* packet
return; return;
} }
/* clear ConnectData of Client */ /* return CONNACK when the client is sleeping */
if ( client->isSleep() || client->isAwake() )
{
MQTTSNPacket* packet = new MQTTSNPacket();
packet->setCONNACK(MQTTSN_RC_ACCEPTED);
Event* ev = new Event();
ev->setClientSendEvent(client, packet);
_gateway->getClientSendQue()->post(ev);
return;
}
//* clear ConnectData of Client */
Connect* connectData = client->getConnectData(); Connect* connectData = client->getConnectData();
memset(connectData, 0, sizeof(Connect)); memset(connectData, 0, sizeof(Connect));
client->disconnected(); client->disconnected();
@@ -125,7 +136,6 @@ void MQTTSNConnectionHandler::handleConnect(Client* client, MQTTSNPacket* packet
} }
else else
{ {
/* CONNECT message was not qued in. /* CONNECT message was not qued in.
* create CONNECT message & send it to the broker */ * create CONNECT message & send it to the broker */
MQTTGWPacket* mqMsg = new MQTTGWPacket(); MQTTGWPacket* mqMsg = new MQTTGWPacket();
@@ -259,6 +269,21 @@ void MQTTSNConnectionHandler::handleWillmsgupd(Client* client, MQTTSNPacket* pac
*/ */
void MQTTSNConnectionHandler::handlePingreq(Client* client, MQTTSNPacket* packet) void MQTTSNConnectionHandler::handlePingreq(Client* client, MQTTSNPacket* packet)
{ {
MQTTGWPacket* msg = 0;
if ( client->isSleep() || client->isAwake() )
{
while ( ( msg = client->getClientSleepPacket() ) != 0 )
{
// ToDo: This version can't re-send PUBLISH when PUBACK is not returned.
client->deleteFirstClientSleepPacket(); // pop the que to delete element.
Event* ev = new Event();
ev->setBrokerRecvEvent(client, msg);
_gateway->getPacketEventQue()->post(ev);
}
}
/* send PINGREQ to the broker */ /* send PINGREQ to the broker */
MQTTGWPacket* pingreq = new MQTTGWPacket(); MQTTGWPacket* pingreq = new MQTTGWPacket();
pingreq->setHeader(PINGREQ); pingreq->setHeader(PINGREQ);

View File

@@ -19,6 +19,7 @@
namespace MQTTSNGW namespace MQTTSNGW
{ {
#define DEBUG
/*================================= /*=================================
* Config Parametrs * Config Parametrs
==================================*/ ==================================*/
@@ -38,6 +39,7 @@ namespace MQTTSNGW
#define MAX_CLIENTS (100) // Number of Clients can be handled. #define MAX_CLIENTS (100) // Number of Clients can be handled.
#define MAX_CLIENTID_LENGTH (64) // Max length of clientID #define MAX_CLIENTID_LENGTH (64) // Max length of clientID
#define MAX_INFLIGHTMESSAGES (10) // Number of inflight messages #define MAX_INFLIGHTMESSAGES (10) // Number of inflight messages
#define MAX_SAVED_PUBLISH (20) // Max number of PUBLISH message for Asleep state
#define MQTTSNGW_MAX_PACKET_SIZE (1024) // Max Packet size (5+2+TopicLen+PayloadLen) #define MQTTSNGW_MAX_PACKET_SIZE (1024) // Max Packet size (5+2+TopicLen+PayloadLen)
#define SIZE_OF_LOG_PACKET (500) // Length of the packet log in bytes #define SIZE_OF_LOG_PACKET (500) // Length of the packet log in bytes

View File

@@ -178,10 +178,8 @@ void MQTTSNPublishHandler::handlePuback(Client* client, MQTTSNPacket* packet)
uint16_t msgId; uint16_t msgId;
uint8_t rc; uint8_t rc;
if ( !client->isActive() ) if ( client->isActive() )
{ {
return;
}
MQTTGWPacket* pubAck = new MQTTGWPacket(); MQTTGWPacket* pubAck = new MQTTGWPacket();
if ( packet->getPUBACK(&topicId, &msgId, &rc) == 0 ) if ( packet->getPUBACK(&topicId, &msgId, &rc) == 0 )
@@ -201,15 +199,14 @@ void MQTTSNPublishHandler::handlePuback(Client* client, MQTTSNPacket* packet)
WRITELOG(" PUBACK %d : Invalid Topic ID\n", msgId); WRITELOG(" PUBACK %d : Invalid Topic ID\n", msgId);
} }
} }
}
void MQTTSNPublishHandler::handleAck(Client* client, MQTTSNPacket* packet, uint8_t packetType) void MQTTSNPublishHandler::handleAck(Client* client, MQTTSNPacket* packet, uint8_t packetType)
{ {
uint16_t msgId; uint16_t msgId;
if ( !client->isActive() ) if ( client->isActive() )
{ {
return;
}
if ( packet->getACK(&msgId) == 0 ) if ( packet->getACK(&msgId) == 0 )
{ {
return; return;
@@ -220,6 +217,7 @@ void MQTTSNPublishHandler::handleAck(Client* client, MQTTSNPacket* packet, uint8
ev1->setBrokerSendEvent(client, ackPacket); ev1->setBrokerSendEvent(client, ackPacket);
_gateway->getBrokerSendQue()->post(ev1); _gateway->getBrokerSendQue()->post(ev1);
} }
}
void MQTTSNPublishHandler::handleRegister(Client* client, MQTTSNPacket* packet) void MQTTSNPublishHandler::handleRegister(Client* client, MQTTSNPacket* packet)
{ {
@@ -229,10 +227,8 @@ void MQTTSNPublishHandler::handleRegister(Client* client, MQTTSNPacket* packet)
MQTTSN_topicid topicid; MQTTSN_topicid topicid;
if ( !client->isActive() ) if ( client->isActive() || client->isAwake())
{ {
return;
}
MQTTSNPacket* regAck = new MQTTSNPacket(); MQTTSNPacket* regAck = new MQTTSNPacket();
if ( packet->getREGISTER(&id, &msgId, &topicName) == 0 ) if ( packet->getREGISTER(&id, &msgId, &topicName) == 0 )
{ {
@@ -248,5 +244,6 @@ void MQTTSNPublishHandler::handleRegister(Client* client, MQTTSNPacket* packet)
Event* ev = new Event(); Event* ev = new Event();
ev->setClientSendEvent(client, regAck); ev->setClientSendEvent(client, regAck);
_gateway->getClientSendQue()->post(ev); _gateway->getClientSendQue()->post(ev);
}
} }

View File

@@ -16,8 +16,10 @@
#ifndef MQTTSNGATEWAY_SRC_LINUX_TIMER_H_ #ifndef MQTTSNGATEWAY_SRC_LINUX_TIMER_H_
#define MQTTSNGATEWAY_SRC_LINUX_TIMER_H_ #define MQTTSNGATEWAY_SRC_LINUX_TIMER_H_
#include <stdint.h>
#include <sys/time.h>
#include "MQTTSNGWDefines.h" #include "MQTTSNGWDefines.h"
#include <time.h>
namespace MQTTSNGW namespace MQTTSNGW
{ {
/*========================================================== /*==========================================================

View File

@@ -21,7 +21,6 @@
#include <arpa/inet.h> #include <arpa/inet.h>
#include <netdb.h> #include <netdb.h>
#include <string.h> #include <string.h>
#include <errno.h>
#include <regex> #include <regex>
#include <string> #include <string>
#include <stdlib.h> #include <stdlib.h>

View File

@@ -25,7 +25,6 @@
#include <netdb.h> #include <netdb.h>
#include <string.h> #include <string.h>
#include <iostream> #include <iostream>
#include <errno.h>
#include <regex> #include <regex>
#include <string> #include <string>
#include <stdlib.h> #include <stdlib.h>

View File

@@ -18,10 +18,8 @@
#include <unistd.h> #include <unistd.h>
#include <termios.h> #include <termios.h>
#include <fcntl.h> #include <fcntl.h>
#include <errno.h>
#include <string.h> #include <string.h>
#include <stdlib.h> #include <stdlib.h>
#include "SensorNetwork.h" #include "SensorNetwork.h"
#include "MQTTSNGWProcess.h" #include "MQTTSNGWProcess.h"