diff --git a/MQTTSNGateway/README.md b/MQTTSNGateway/README.md index 9444a3a..3bae83e 100644 --- a/MQTTSNGateway/README.md +++ b/MQTTSNGateway/README.md @@ -31,16 +31,17 @@ BrokerPortNo=1883 BrokerSecurePortNo=8883 ClientAuthentication=NO -ClientsList=/path/to/your_clients.conf +#ClientsList=/path/to/your_clients.conf PredefinedTopic=NO -PredefinedTopicList=/path/to/your_predefinedTopic.conf +#PredefinedTopicList=/path/to/your_predefinedTopic.conf Forwarder=NO -ForwardersList=/home/tomoaki/tmp/forwarders.conf +#ForwardersList=/home/tomoaki/tmp/forwarders.conf QoS-1=NO -QoS-1ClientsList=/path/to/your_qos-1clients.conf +QoS-1ProxyName=Proxy007 +#QoS-1ClientsList=/path/to/your_qos-1clients.conf #RootCAfile=/path/to/your_Root_CA.crt #RootCApath=/path/to/your_certs_directory/ diff --git a/MQTTSNGateway/gateway.conf b/MQTTSNGateway/gateway.conf index d3f5fe6..277f7ae 100644 --- a/MQTTSNGateway/gateway.conf +++ b/MQTTSNGateway/gateway.conf @@ -41,6 +41,7 @@ KeepAlive=900 #LoginID=your_ID #Password=your_Password + # UDP GatewayPortNo=10000 MulticastIP=225.1.1.1 diff --git a/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp b/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp index a996051..d5397b4 100644 --- a/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp +++ b/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp @@ -128,6 +128,8 @@ void ClientRecvTask::run() } else { + client = 0; + /* when QoSm1Proxy is available, select QoS-1 PUBLISH message */ QoSm1Proxy* pxy = _gateway->getQoSm1Proxy(); if ( pxy ) @@ -152,7 +154,8 @@ void ClientRecvTask::run() } } } - else + + if ( client == 0 ) { /* get client from the ClientList of Gateway by sensorNetAddress. */ client = _gateway->getClientList()->getClient(_sensorNetwork->getSenderAddress()); diff --git a/MQTTSNGateway/src/MQTTSNGWDefines.h b/MQTTSNGateway/src/MQTTSNGWDefines.h index 7da2109..c15ae45 100644 --- a/MQTTSNGateway/src/MQTTSNGWDefines.h +++ b/MQTTSNGateway/src/MQTTSNGWDefines.h @@ -46,6 +46,9 @@ namespace MQTTSNGW #define MQTTSNGW_MAX_PACKET_SIZE (1024) // Max Packet size (5+2+TopicLen+PayloadLen + Foward Encapsulation) #define SIZE_OF_LOG_PACKET (500) // Length of the packet log in bytes +#define QOSM1_PROXY_KEEPALIVE_DURATION 900 // Secs +#define QOSM1_PROXY_RESPONSE_DURATION 10 // Secs +#define QOSM1_PROXY_MAX_RETRY_CNT 3 /*================================= * Data Type ==================================*/ diff --git a/MQTTSNGateway/src/MQTTSNGWQoS-1Proxy.cpp b/MQTTSNGateway/src/MQTTSNGWQoS-1Proxy.cpp index 5858655..67e98ce 100644 --- a/MQTTSNGateway/src/MQTTSNGWQoS-1Proxy.cpp +++ b/MQTTSNGateway/src/MQTTSNGWQoS-1Proxy.cpp @@ -24,10 +24,11 @@ #include #include -using namespace MQTTSNGW; +#define QOSM1_PROXY_KEEPALIVE_DURATION 900 // Secs +#define QOSM1_PROXY_RESPONSE_DURATION 10 // Secs +#define QOSM1_PROXY_MAX_RETRY_CNT 3 -#define KEEPALIVE_DURATION 900 // Secs -#define RESPONSE_DURATION 10 // Secs +using namespace MQTTSNGW; /* * Class ClientProxyElement @@ -58,6 +59,8 @@ QoSm1ProxyElement::~QoSm1ProxyElement(void) QoSm1Proxy:: QoSm1Proxy(void) : _head{0} + , _isWaitingResp{false} + , _retryCnt{0} { _gateway = 0; _client = 0; @@ -65,6 +68,8 @@ QoSm1Proxy:: QoSm1Proxy(void) QoSm1Proxy:: QoSm1Proxy(Gateway* gw) : _head{0} +, _isWaitingResp{false} +, _retryCnt{0} { _gateway = gw; _client = 0; @@ -197,19 +202,18 @@ void QoSm1Proxy::checkConnection(void) if ( _client->isDisconnect() || ( _client->isConnecting() && _responseTimer.isTimeup()) ) { _client->connectSended(); - _responseTimer.start(RESPONSE_DURATION * 1000UL); + _responseTimer.start(QOSM1_PROXY_RESPONSE_DURATION * 1000UL); MQTTSNPacket_connectData options = MQTTSNPacket_connectData_initializer; options.clientID.cstring = _client->getClientId(); - options.duration = KEEPALIVE_DURATION; + options.duration = QOSM1_PROXY_KEEPALIVE_DURATION; MQTTSNPacket* packet = new MQTTSNPacket(); packet->setCONNECT(&options); Event* ev = new Event(); ev->setClientRecvEvent(_client, packet); _gateway->getPacketEventQue()->post(ev); - } - else if ( _client->isActive() && _keepAliveTimer.isTimeup() ) + else if ( (_client->isActive() && _keepAliveTimer.isTimeup() ) || (_isWaitingResp && _responseTimer.isTimeup() ) ) { MQTTSNPacket* packet = new MQTTSNPacket(); MQTTSNString clientId = MQTTSNString_initializer; @@ -217,13 +221,20 @@ void QoSm1Proxy::checkConnection(void) Event* ev = new Event(); ev->setClientRecvEvent(_client, packet); _gateway->getPacketEventQue()->post(ev); + _responseTimer.start(QOSM1_PROXY_RESPONSE_DURATION * 1000UL); + _isWaitingResp = true; + + if ( ++_retryCnt > QOSM1_PROXY_MAX_RETRY_CNT ) + { + _client->disconnected(); + } resetPingTimer(); } } void QoSm1Proxy::resetPingTimer(void) { - _keepAliveTimer.start(KEEPALIVE_DURATION * 1000UL); + _keepAliveTimer.start(QOSM1_PROXY_KEEPALIVE_DURATION * 1000UL); } void QoSm1Proxy::send(MQTTSNPacket* packet) @@ -231,10 +242,16 @@ void QoSm1Proxy::send(MQTTSNPacket* packet) if ( packet->getType() == MQTTSN_CONNACK ) { resetPingTimer(); + _responseTimer.stop(); + _retryCnt = 0; sendStoredPublish(); } else if ( packet->getType() == MQTTSN_PINGRESP ) { + _responseTimer.stop(); + _isWaitingResp = false; + _retryCnt = 0; + resetPingTimer(); } else if ( packet->getType() == MQTTSN_DISCONNECT ) diff --git a/MQTTSNGateway/src/MQTTSNGWQoS-1Proxy.h b/MQTTSNGateway/src/MQTTSNGWQoS-1Proxy.h index c5db5e4..9960668 100644 --- a/MQTTSNGateway/src/MQTTSNGWQoS-1Proxy.h +++ b/MQTTSNGateway/src/MQTTSNGWQoS-1Proxy.h @@ -68,6 +68,8 @@ private: QoSm1ProxyElement* _head; Timer _keepAliveTimer; Timer _responseTimer; + bool _isWaitingResp; + int _retryCnt; }; }