BugFix Adapters

PINGREQ and PINGRESP
Add Error message when the Aggregate GW receives PUBLISH from the
broker.

Signed-off-by: tomoaki <tomoaki@tomy-tech.com>
This commit is contained in:
tomoaki
2020-02-07 18:21:48 +09:00
parent 6e3c53ec83
commit 1715d5d47c
13 changed files with 47 additions and 27 deletions

View File

@@ -1,7 +1,8 @@
# MQTT-SN Transparent / Aggrigating Gateway # MQTT-SN Transparent / Aggrigating Gateway
**MQTT-SN** requires a MQTT-SN Gateway which acts as a protocol converter to convert **MQTT-SN messages to MQTT messages**. MQTT-SN client over SensorNetwork can not communicate directly with MQTT broker(TCP/IP). **MQTT-SN** requires a MQTT-SN Gateway which acts as a protocol converter to convert **MQTT-SN messages to MQTT messages**. MQTT-SN client over SensorNetwork can not communicate directly with MQTT broker(TCP/IP).
This Gateway can run as a transparent or aggrigating Gateway by specifying the gateway.conf. This Gateway can run as a transparent or aggrigating Gateway by specifying the gateway.conf.
The Transparent Gateway can not receive PUBLISH message from the broker at this version.
### **step1. Build the gateway** ### **step1. Build the gateway**
```` ````
@@ -37,7 +38,7 @@ $ ./MQTT-SNGateway [-f Config file name]
# config file of MQTT-SN Gateway # config file of MQTT-SN Gateway
# #
BrokerName=iot.eclipse.org BrokerName=mqtt.eclipse.org
BrokerPortNo=1883 BrokerPortNo=1883
BrokerSecurePortNo=8883 BrokerSecurePortNo=8883

View File

@@ -272,8 +272,8 @@ void MQTTGWPublishHandler::handleAggregatePublish(Client* client, MQTTGWPacket*
Publish pub; Publish pub;
packet->getPUBLISH(&pub); packet->getPUBLISH(&pub);
WRITELOG(FORMAT_Y_G_G, currentDateTime(), packet->getName(), // Start of temporary code
RIGHTARROW, client->getClientId(), "is sleeping. a message was saved."); WRITELOG("%s MQTTGWPublishHandler::handleAggregatePublish Aggregater can't handle a PUBLISH from the broker at the current version.%s\n", ERRMSG_HEADER,ERRMSG_FOOTER);
if (pub.header.bits.qos == 1) if (pub.header.bits.qos == 1)
{ {
@@ -283,11 +283,12 @@ void MQTTGWPublishHandler::handleAggregatePublish(Client* client, MQTTGWPacket*
{ {
replyACK(client, &pub, PUBREC); replyACK(client, &pub, PUBREC);
} }
// End of temporary code
string* topicName = new string(pub.topic, pub.topiclen); string* topicName = new string(pub.topic, pub.topiclen);
Topic topic = Topic(topicName, MQTTSN_TOPIC_TYPE_NORMAL); Topic topic = Topic(topicName, MQTTSN_TOPIC_TYPE_NORMAL); // topic deletes topicName when the topic is deleted
AggregateTopicElement* list = _gateway->getAdapterManager()->createClientList(&topic); AggregateTopicElement* list = _gateway->getAdapterManager()->createClientList(&topic);
if ( list != nullptr ) if ( list != nullptr )
{ {

View File

@@ -126,6 +126,11 @@ bool Adapter::isSecure(SensorNetAddress* addr)
} }
} }
bool Adapter::isSecure(void)
{
return _isSecure;
}
void Adapter::setClient(Client* client, bool secure) void Adapter::setClient(Client* client, bool secure)
{ {
if ( secure ) if ( secure )
@@ -241,10 +246,10 @@ void Proxy::checkConnection(Client* client)
if ( client->isDisconnect() || ( client->isConnecting() && _responseTimer.isTimeup()) ) if ( client->isDisconnect() || ( client->isConnecting() && _responseTimer.isTimeup()) )
{ {
client->connectSended(); client->connectSended();
_responseTimer.start(QOSM1_PROXY_RESPONSE_DURATION * 1000UL); _responseTimer.start(PROXY_RESPONSE_DURATION * 1000UL);
MQTTSNPacket_connectData options = MQTTSNPacket_connectData_initializer; MQTTSNPacket_connectData options = MQTTSNPacket_connectData_initializer;
options.clientID.cstring = client->getClientId(); options.clientID.cstring = client->getClientId();
options.duration = QOSM1_PROXY_KEEPALIVE_DURATION; options.duration = PROXY_KEEPALIVE_DURATION;
MQTTSNPacket* packet = new MQTTSNPacket(); MQTTSNPacket* packet = new MQTTSNPacket();
packet->setCONNECT(&options); packet->setCONNECT(&options);
@@ -260,10 +265,10 @@ void Proxy::checkConnection(Client* client)
Event* ev = new Event(); Event* ev = new Event();
ev->setClientRecvEvent(client, packet); ev->setClientRecvEvent(client, packet);
_gateway->getPacketEventQue()->post(ev); _gateway->getPacketEventQue()->post(ev);
_responseTimer.start(QOSM1_PROXY_RESPONSE_DURATION * 1000UL); _responseTimer.start(PROXY_RESPONSE_DURATION * 1000UL);
_isWaitingResp = true; _isWaitingResp = true;
if ( ++_retryCnt > QOSM1_PROXY_MAX_RETRY_CNT ) if ( ++_retryCnt > PROXY_MAX_RETRY_CNT )
{ {
client->disconnected(); client->disconnected();
} }
@@ -274,7 +279,7 @@ void Proxy::checkConnection(Client* client)
void Proxy::resetPingTimer(void) void Proxy::resetPingTimer(void)
{ {
_keepAliveTimer.start(QOSM1_PROXY_KEEPALIVE_DURATION * 1000UL); _keepAliveTimer.start(PROXY_KEEPALIVE_DURATION * 1000UL);
} }
void Proxy::recv(MQTTSNPacket* packet, Client* client) void Proxy::recv(MQTTSNPacket* packet, Client* client)

View File

@@ -56,6 +56,7 @@ public:
void send(MQTTSNPacket* packet, Client* client); void send(MQTTSNPacket* packet, Client* client);
bool isActive(void); bool isActive(void);
bool isSecure(SensorNetAddress* addr); bool isSecure(SensorNetAddress* addr);
bool isSecure(void);
void savePacket(Client* client, MQTTSNPacket* packet); void savePacket(Client* client, MQTTSNPacket* packet);
private: private:

View File

@@ -101,12 +101,18 @@ Client* AdapterManager::getClient(Client& client)
_qosm1Proxy->resetPingTimer(secure); _qosm1Proxy->resetPingTimer(secure);
} }
else if ( client.isAggregated() ) else if ( client.isAggregated() )
{ {
newClient = _aggregater->getAdapterClient(&client); newClient = _aggregater->getAdapterClient(&client);
_aggregater->resetPingTimer(secure); _aggregater->resetPingTimer(secure);
} }
else if ( client.isQoSm1Proxy() )
{
_qosm1Proxy->resetPingTimer(secure);
}
else if ( client.isAggregater() )
{
_aggregater->resetPingTimer(secure);
}
return newClient; return newClient;
} }

View File

@@ -44,10 +44,11 @@ void Aggregater::initialize(void)
{ {
if (!strcasecmp(param, "YES") ) if (!strcasecmp(param, "YES") )
{ {
/* Create Aggregated Clients */ /* Create Aggregated Clients from clients.conf */
_gateway->getClientList()->setClientList(AGGREGATER_TYPE); _gateway->getClientList()->setClientList(AGGREGATER_TYPE);
string name = _gateway->getGWParams()->gatewayName; /* Create Aggregater Client */
string name = string(_gateway->getGWParams()->gatewayName) + "_Aggregater";
setup(name.c_str(), Atype_Aggregater); setup(name.c_str(), Atype_Aggregater);
_isActive = true; _isActive = true;
} }
@@ -64,6 +65,7 @@ bool Aggregater::isActive(void)
uint16_t Aggregater::msgId(void) uint16_t Aggregater::msgId(void)
{ {
// Only SecureClient generates msgId to avoid duplication of msgId. Client does not generate it.
return Adapter::getSecureClient()->getNextPacketId(); return Adapter::getSecureClient()->getNextPacketId();
} }

View File

@@ -127,7 +127,10 @@ void BrokerSendTask::run()
{ {
WRITELOG("%s BrokerSendTask: %s can't send a packet to the broker. errno=%d %s %s\n", WRITELOG("%s BrokerSendTask: %s can't send a packet to the broker. errno=%d %s %s\n",
ERRMSG_HEADER, client->getClientId(), rc == -1 ? errno : 0, strerror(errno), ERRMSG_FOOTER); ERRMSG_HEADER, client->getClientId(), rc == -1 ? errno : 0, strerror(errno), ERRMSG_FOOTER);
client->getNetwork()->close(); if ( errno != EBADF )
{
client->getNetwork()->close();
}
/* Disconnect the client */ /* Disconnect the client */
packet = new MQTTGWPacket(); packet = new MQTTGWPacket();

View File

@@ -118,7 +118,7 @@ void ClientList::setPredefinedTopics(bool aggrecate)
* File format is: * File format is:
* Lines bigning with # are comment line. * Lines bigning with # are comment line.
* ClientId, SensorNetAddress, "unstableLine", "secureConnection" * ClientId, SensorNetAddress, "unstableLine", "secureConnection"
* in case of UDP, SensorNetAddress format is portNo@IPAddress. * in case of UDP, SensorNetAddress format is IPAddress:portNo.
* if the SensorNetwork is not stable, write unstableLine. * if the SensorNetwork is not stable, write unstableLine.
* if BrokerConnection is SSL, write secureConnection. * if BrokerConnection is SSL, write secureConnection.
* if the client send PUBLISH QoS-1, QoS-1 is required. * if the client send PUBLISH QoS-1, QoS-1 is required.
@@ -380,7 +380,7 @@ Client* ClientList::createClient(SensorNetAddress* addr, MQTTSNString* clientId,
} }
else else
{ {
MQTTSNString dummyId MQTTSNString_initializer;; MQTTSNString dummyId MQTTSNString_initializer;
dummyId.cstring = strdup(""); dummyId.cstring = strdup("");
client->setClientId(dummyId); client->setClientId(dummyId);
free(dummyId.cstring); free(dummyId.cstring);

View File

@@ -42,13 +42,14 @@ namespace MQTTSNGW
#define MAX_INFLIGHTMESSAGES (10) // Number of inflight messages #define MAX_INFLIGHTMESSAGES (10) // Number of inflight messages
#define MAX_MESSAGEID_TABLE_SIZE (500) // Number of MessageIdTable size #define MAX_MESSAGEID_TABLE_SIZE (500) // Number of MessageIdTable size
#define MAX_SAVED_PUBLISH (20) // Max number of PUBLISH message for Asleep state #define MAX_SAVED_PUBLISH (20) // Max number of PUBLISH message for Asleep state
#define MAX_TOPIC_PAR_CLIENT (50) // Max Topic count for a client. it should be less than 256 #define MAX_TOPIC_PAR_CLIENT (50) // Max Topic count for a client. it should be less than 256
#define MQTTSNGW_MAX_PACKET_SIZE (1024) // Max Packet size (5+2+TopicLen+PayloadLen + Foward Encapsulation) #define MQTTSNGW_MAX_PACKET_SIZE (1024) // Max Packet size (5+2+TopicLen+PayloadLen + Foward Encapsulation)
#define SIZE_OF_LOG_PACKET (500) // Length of the packet log in bytes #define SIZE_OF_LOG_PACKET (500) // Length of the packet log in bytes
#define QOSM1_PROXY_KEEPALIVE_DURATION 900 // Secs #define PROXY_KEEPALIVE_DURATION (900) // Secs
#define QOSM1_PROXY_RESPONSE_DURATION 10 // Secs #define PROXY_RESPONSE_DURATION (10) // Secs
#define QOSM1_PROXY_MAX_RETRY_CNT 3 #define PROXY_MAX_RETRY_CNT (3)
/*================================= /*=================================
* Data Type * Data Type
==================================*/ ==================================*/

View File

@@ -54,6 +54,7 @@ void ForwarderList::initialize(Gateway* gw)
{ {
if (!strcasecmp(param, "YES") ) if (!strcasecmp(param, "YES") )
{ {
/* Create Fowarders from clients.conf */
gw->getClientList()->setClientList(FORWARDER_TYPE); gw->getClientList()->setClientList(FORWARDER_TYPE);
} }
} }

View File

@@ -148,7 +148,7 @@ void PacketHandleTask::run()
if ( adpMgr->isAggregatedClient(client) ) if ( adpMgr->isAggregatedClient(client) )
{ {
aggregatePacketHandler(client, snPacket); aggregatePacketHandler(client, snPacket); // client is converted to Aggregater by BrokerSendTask
} }
else else
{ {

View File

@@ -51,11 +51,11 @@ void QoSm1Proxy::initialize(void)
{ {
if (strcasecmp(param, "YES") == 0 ) if (strcasecmp(param, "YES") == 0 )
{ {
/* Create QoS-1 Clients */ /* Create QoS-1 Clients from clients.conf */
_gateway->getClientList()->setClientList(QOSM1PROXY_TYPE); _gateway->getClientList()->setClientList(QOSM1PROXY_TYPE);
/* initialize Adapter */ /* Create a client for QoS-1 proxy */
string name = string(_gateway->getGWParams()->gatewayName) + "QoS-1"; string name = string(_gateway->getGWParams()->gatewayName) + "_QoS-1";
setup(name.c_str(), Atype_QoSm1Proxy); setup(name.c_str(), Atype_QoSm1Proxy);
_isActive = true; _isActive = true;
} }

View File

@@ -223,7 +223,6 @@ void MQTTSNSubscribeHandler::handleAggregateSubscribe(Client* client, MQTTSNPack
WRITELOG("%s MQTTSNSubscribeHandler can't create MessageIdTableElement %s%s\n", ERRMSG_HEADER, client->getClientId(), ERRMSG_FOOTER); WRITELOG("%s MQTTSNSubscribeHandler can't create MessageIdTableElement %s%s\n", ERRMSG_HEADER, client->getClientId(), ERRMSG_FOOTER);
return; return;
} }
WRITELOG("msgId=%d\n",msgId);
subscribe->setMsgId(msgId); subscribe->setMsgId(msgId);
Event* ev = new Event(); Event* ev = new Event();
ev->setBrokerSendEvent(client, subscribe); ev->setBrokerSendEvent(client, subscribe);