mirror of
https://github.com/eclipse/paho.mqtt-sn.embedded-c.git
synced 2025-12-16 08:56:51 +01:00
Merge pull request #125 from eclipse/develop
Forwarder Encapsulation are supported.
This commit is contained in:
@@ -82,7 +82,7 @@ void MQTTGWPublishHandler::handlePublish(Client* client, MQTTGWPacket* packet)
|
||||
MQTTSN_topicid topicId;
|
||||
uint16_t id = 0;
|
||||
|
||||
if (pub.topiclen == 2)
|
||||
if (pub.topiclen <= 2)
|
||||
{
|
||||
topicId.type = MQTTSN_TOPIC_TYPE_SHORT;
|
||||
*(topicId.data.short_name) = *pub.topic;
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
#include "MQTTSNGWClient.h"
|
||||
#include "MQTTSNGateway.h"
|
||||
#include "SensorNetwork.h"
|
||||
#include "MQTTSNGWForwarder.h"
|
||||
#include "Network.h"
|
||||
#include <string>
|
||||
#include <string.h>
|
||||
@@ -75,7 +76,7 @@ bool ClientList::authorize(const char* fileName)
|
||||
{
|
||||
FILE* fp;
|
||||
char buf[MAX_CLIENTID_LENGTH + 256];
|
||||
size_t pos;;
|
||||
size_t pos;
|
||||
bool secure;
|
||||
bool stable;
|
||||
SensorNetAddress netAddr;
|
||||
@@ -167,6 +168,11 @@ bool ClientList::setPredefinedTopics(const char* fileName)
|
||||
fclose(fp);
|
||||
rc = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
WRITELOG("Can not open the Predefined Topic List. %s\n", fileName);
|
||||
return false;
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
@@ -196,6 +202,11 @@ void ClientList::erase(Client*& client)
|
||||
_endClient = prev;
|
||||
}
|
||||
_clientCnt--;
|
||||
Forwarder* fwd = client->getForwarder();
|
||||
if ( fwd )
|
||||
{
|
||||
fwd->eraseClient(client);
|
||||
}
|
||||
delete client;
|
||||
client = 0;
|
||||
_mutex.unlock();
|
||||
@@ -204,19 +215,22 @@ void ClientList::erase(Client*& client)
|
||||
|
||||
Client* ClientList::getClient(SensorNetAddress* addr)
|
||||
{
|
||||
_mutex.lock();
|
||||
Client* client = _firstClient;
|
||||
if ( addr )
|
||||
{
|
||||
_mutex.lock();
|
||||
Client* client = _firstClient;
|
||||
|
||||
while (client != 0)
|
||||
{
|
||||
if (client->getSensorNetAddress()->isMatch(addr) )
|
||||
{
|
||||
_mutex.unlock();
|
||||
return client;
|
||||
}
|
||||
client = client->_nextClient;
|
||||
}
|
||||
_mutex.unlock();
|
||||
while (client != 0)
|
||||
{
|
||||
if (client->getSensorNetAddress()->isMatch(addr) )
|
||||
{
|
||||
_mutex.unlock();
|
||||
return client;
|
||||
}
|
||||
client = client->_nextClient;
|
||||
}
|
||||
_mutex.unlock();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -400,6 +414,8 @@ Client::Client(bool secure)
|
||||
_nextClient = 0;
|
||||
_clientSleepPacketQue.setMaxSize(MAX_SAVED_PUBLISH);
|
||||
_hasPredefTopic = false;
|
||||
_holdPingRequest = false;
|
||||
_forwarder = 0;
|
||||
}
|
||||
|
||||
Client::~Client()
|
||||
@@ -518,6 +534,16 @@ void Client::setKeepAlive(MQTTSNPacket* packet)
|
||||
}
|
||||
}
|
||||
|
||||
void Client::setForwarder(Forwarder* forwarder)
|
||||
{
|
||||
_forwarder = forwarder;
|
||||
}
|
||||
|
||||
Forwarder* Client::getForwarder(void)
|
||||
{
|
||||
return _forwarder;
|
||||
}
|
||||
|
||||
void Client::setSessionStatus(bool status)
|
||||
{
|
||||
_sessionStatus = status;
|
||||
@@ -796,6 +822,21 @@ void Client::setOTAClient(Client* cl)
|
||||
_otaClient =cl;
|
||||
}
|
||||
|
||||
void Client::holdPingRequest(void)
|
||||
{
|
||||
_holdPingRequest = true;
|
||||
}
|
||||
|
||||
void Client::resetPingRequest(void)
|
||||
{
|
||||
_holdPingRequest = false;
|
||||
}
|
||||
|
||||
bool Client::isHoldPringReqest(void)
|
||||
{
|
||||
return _holdPingRequest;
|
||||
}
|
||||
|
||||
/*=====================================
|
||||
Class Topic
|
||||
======================================*/
|
||||
@@ -1214,7 +1255,7 @@ TopicIdMapelement* TopicIdMap::getElement(uint16_t msgId)
|
||||
|
||||
TopicIdMapelement* TopicIdMap::add(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type)
|
||||
{
|
||||
if ( _cnt > _maxInflight * 2 || topicId == 0)
|
||||
if ( _cnt > _maxInflight * 2 || ( topicId == 0 && type != MQTTSN_TOPIC_TYPE_SHORT ) )
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
@@ -1314,6 +1355,7 @@ WaitREGACKPacketList::WaitREGACKPacketList()
|
||||
{
|
||||
_first = 0;
|
||||
_end = 0;
|
||||
_cnt = 0;
|
||||
}
|
||||
|
||||
WaitREGACKPacketList::~WaitREGACKPacketList()
|
||||
@@ -1346,6 +1388,7 @@ int WaitREGACKPacketList::setPacket(MQTTSNPacket* packet, uint16_t REGACKMsgId)
|
||||
elm->_prev = _end;
|
||||
_end = elm;
|
||||
}
|
||||
_cnt++;
|
||||
return 1;
|
||||
}
|
||||
|
||||
@@ -1387,10 +1430,17 @@ void WaitREGACKPacketList::erase(uint16_t REGACKMsgId)
|
||||
{
|
||||
p->_next->_prev = p->_prev;
|
||||
}
|
||||
break;
|
||||
// Do not delete element. Element is deleted after sending to Client.
|
||||
_cnt--;
|
||||
break;
|
||||
// Do not delete element. Element is deleted after sending to Client.
|
||||
}
|
||||
p = p->_next;
|
||||
}
|
||||
}
|
||||
|
||||
uint8_t WaitREGACKPacketList::getCount(void)
|
||||
{
|
||||
return _cnt;
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -26,6 +26,8 @@
|
||||
#include "Network.h"
|
||||
#include "SensorNetwork.h"
|
||||
#include "MQTTSNPacket.h"
|
||||
#include "MQTTSNGWEncapsulatedPacket.h"
|
||||
#include "MQTTSNGWForwarder.h"
|
||||
|
||||
namespace MQTTSNGW
|
||||
{
|
||||
@@ -219,8 +221,10 @@ public:
|
||||
int setPacket(MQTTSNPacket* packet, uint16_t REGACKMsgId);
|
||||
MQTTSNPacket* getPacket(uint16_t REGACKMsgId);
|
||||
void erase(uint16_t REGACKMsgId);
|
||||
uint8_t getCount(void);
|
||||
|
||||
private:
|
||||
uint8_t _cnt;
|
||||
waitREGACKPacket* _first;
|
||||
waitREGACKPacket* _end;
|
||||
};
|
||||
@@ -228,13 +232,13 @@ private:
|
||||
/*=====================================
|
||||
Class Client
|
||||
=====================================*/
|
||||
#define MQTTSN_CLIENTID_LENGTH 23
|
||||
|
||||
typedef enum
|
||||
{
|
||||
Cstat_Disconnected = 0, Cstat_TryConnecting, Cstat_Connecting, Cstat_Active, Cstat_Asleep, Cstat_Awake, Cstat_Lost
|
||||
} ClientStatus;
|
||||
|
||||
class Forwarder;
|
||||
|
||||
class Client
|
||||
{
|
||||
@@ -279,6 +283,9 @@ public:
|
||||
void setClientAddress(SensorNetAddress* sensorNetAddr);
|
||||
void setSensorNetType(bool stable);
|
||||
|
||||
Forwarder* getForwarder(void);
|
||||
void setForwarder(Forwarder* forwader);
|
||||
|
||||
void setClientId(MQTTSNString id);
|
||||
void setWillTopic(MQTTSNString willTopic);
|
||||
void setWillMsg(MQTTSNString willmsg);
|
||||
@@ -298,6 +305,10 @@ public:
|
||||
bool isSensorNetStable(void);
|
||||
bool isWaitWillMsg(void);
|
||||
|
||||
void holdPingRequest(void);
|
||||
void resetPingRequest(void);
|
||||
bool isHoldPringReqest(void);
|
||||
|
||||
Client* getNextClient(void);
|
||||
Client* getOTAClient(void);
|
||||
void setOTAClient(Client* cl);
|
||||
@@ -317,6 +328,8 @@ private:
|
||||
char* _willTopic;
|
||||
char* _willMsg;
|
||||
|
||||
bool _holdPingRequest;
|
||||
|
||||
Timer _keepAliveTimer;
|
||||
uint32_t _keepAliveMsec;
|
||||
|
||||
@@ -331,6 +344,9 @@ private:
|
||||
bool _sensorNetype; // false: unstable network like a G3
|
||||
SensorNetAddress _sensorNetAddr;
|
||||
|
||||
Forwarder* _forwarder;
|
||||
|
||||
|
||||
bool _sessionStatus;
|
||||
bool _hasPredefTopic;
|
||||
|
||||
@@ -365,5 +381,7 @@ private:
|
||||
bool _authorize;
|
||||
};
|
||||
|
||||
|
||||
|
||||
}
|
||||
#endif /* MQTTSNGWCLIENT_H_ */
|
||||
|
||||
@@ -16,7 +16,12 @@
|
||||
|
||||
#include "MQTTSNGWClientRecvTask.h"
|
||||
#include "MQTTSNGateway.h"
|
||||
#include <string.h>
|
||||
#include "MQTTSNPacket.h"
|
||||
#include "MQTTSNGWForwarder.h"
|
||||
#include "MQTTSNGWEncapsulatedPacket.h"
|
||||
#include <cstring>
|
||||
|
||||
using namespace MQTTSNGW;
|
||||
char* currentDateTime(void);
|
||||
/*=====================================
|
||||
Class ClientRecvTask
|
||||
@@ -55,8 +60,12 @@ void ClientRecvTask::run()
|
||||
Client* client = 0;
|
||||
char buf[128];
|
||||
|
||||
|
||||
while (true)
|
||||
{
|
||||
Forwarder* fwd = 0;
|
||||
WirelessNodeId nodeId;
|
||||
|
||||
MQTTSNPacket* packet = new MQTTSNPacket();
|
||||
int packetLen = packet->recv(_sensorNetwork);
|
||||
|
||||
@@ -89,8 +98,38 @@ void ClientRecvTask::run()
|
||||
continue;
|
||||
}
|
||||
|
||||
/* get client from the ClientList of Gateway by sensorNetAddress. */
|
||||
client = _gateway->getClientList()->getClient(_sensorNetwork->getSenderAddress());
|
||||
if ( packet->getType() == MQTTSN_ENCAPSULATED )
|
||||
{
|
||||
fwd = _gateway->getForwarderList()->getForwarder(_sensorNetwork->getSenderAddress());
|
||||
|
||||
if ( fwd == 0 )
|
||||
{
|
||||
log(0, packet);
|
||||
WRITELOG("%s Forwarder %s is not authorized.%s\n", ERRMSG_HEADER, _sensorNetwork->getSenderAddress()->sprint(buf), ERRMSG_FOOTER);
|
||||
delete packet;
|
||||
continue;
|
||||
}
|
||||
else
|
||||
{
|
||||
MQTTSNString fwdName;
|
||||
fwdName.lenstring.data = const_cast<char *>( fwd->getName() );
|
||||
fwdName.lenstring.len = strlen(fwdName.lenstring.data);
|
||||
log(0, packet, &fwdName);
|
||||
|
||||
MQTTSNGWEncapsulatedPacket encap;
|
||||
encap.desirialize(packet->getPacketData(), packet->getPacketLength());
|
||||
nodeId.setId( encap.getWirelessNodeId() );
|
||||
client = fwd->getClient(&nodeId);
|
||||
delete packet;
|
||||
packet = encap.getMQTTSNPacket();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/* get client from the ClientList of Gateway by sensorNetAddress. */
|
||||
client = _gateway->getClientList()->getClient(_sensorNetwork->getSenderAddress());
|
||||
}
|
||||
|
||||
|
||||
if ( client )
|
||||
{
|
||||
@@ -117,27 +156,40 @@ void ClientRecvTask::run()
|
||||
|
||||
client = _gateway->getClientList()->getClient(&data.clientID);
|
||||
|
||||
if ( client )
|
||||
if ( fwd )
|
||||
{
|
||||
/* set SensorNet Address */
|
||||
client->setClientAddress(_sensorNetwork->getSenderAddress());
|
||||
if ( client == 0 )
|
||||
{
|
||||
/* create a new client */
|
||||
client = _gateway->getClientList()->createClient(0, &data.clientID, false, false);
|
||||
}
|
||||
/* Add to af forwarded client list of forwarder. */
|
||||
fwd->addClient(client, &nodeId);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* create a client */
|
||||
client = _gateway->getClientList()->createClient(_sensorNetwork->getSenderAddress(), &data.clientID, false, false);
|
||||
if ( client )
|
||||
{
|
||||
/* Client exists. Set SensorNet Address of it. */
|
||||
client->setClientAddress(_sensorNetwork->getSenderAddress());
|
||||
}
|
||||
else
|
||||
{
|
||||
/* create a new client */
|
||||
client = _gateway->getClientList()->createClient(_sensorNetwork->getSenderAddress(), &data.clientID, false, false);
|
||||
}
|
||||
}
|
||||
|
||||
log(client, packet, &data.clientID);
|
||||
|
||||
if (!client)
|
||||
{
|
||||
WRITELOG("%s Client(%s) was rejected. CONNECT message has been discarded.%s\n", ERRMSG_HEADER, _sensorNetwork->getSenderAddress()->sprint(buf), ERRMSG_FOOTER);
|
||||
WRITELOG("%s Client(%s) was rejected. CONNECT message has been discarded.%s\n", ERRMSG_HEADER, _sensorNetwork->getSenderAddress()->sprint(buf), ERRMSG_FOOTER);
|
||||
delete packet;
|
||||
continue;
|
||||
}
|
||||
|
||||
/* set sensorNetAddress & post Event */
|
||||
/* post Client RecvEvent */
|
||||
ev = new Event();
|
||||
ev->setClientRecvEvent(client, packet);
|
||||
_gateway->getPacketEventQue()->post(ev);
|
||||
@@ -145,16 +197,18 @@ void ClientRecvTask::run()
|
||||
else
|
||||
{
|
||||
log(client, packet);
|
||||
delete packet;
|
||||
/* Send DISCONNECT */
|
||||
SensorNetAddress* addr = new SensorNetAddress();
|
||||
*addr = (*_sensorNetwork->getSenderAddress());
|
||||
packet = new MQTTSNPacket();
|
||||
packet->setDISCONNECT(0);
|
||||
ev = new Event();
|
||||
ev->setClientSendEvent(addr, packet);
|
||||
_gateway->getClientSendQue()->post(ev);
|
||||
continue;
|
||||
WRITELOG("%s Client(%s) is not connecting. message has been discarded.%s\n", ERRMSG_HEADER, _sensorNetwork->getSenderAddress()->sprint(buf), ERRMSG_FOOTER);
|
||||
delete packet;
|
||||
|
||||
/* Send DISCONNECT */
|
||||
if ( fwd == 0 )
|
||||
{
|
||||
packet = new MQTTSNPacket();
|
||||
packet->setDISCONNECT(0);
|
||||
ev = new Event();
|
||||
ev->setClientSendEvent(_sensorNetwork->getSenderAddress(), packet);
|
||||
_gateway->getClientSendQue()->post(ev);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -211,6 +265,9 @@ void ClientRecvTask::log(Client* client, MQTTSNPacket* packet, MQTTSNString* id)
|
||||
case MQTTSN_PUBCOMP:
|
||||
WRITELOG(FORMAT_G_MSGID_G_G, currentDateTime(), packet->getName(), packet->getMsgId(msgId), LEFTARROW, clientId, packet->print(pbuf));
|
||||
break;
|
||||
case MQTTSN_ENCAPSULATED:
|
||||
WRITELOG(FORMAT_Y_G_G, currentDateTime(), packet->getName(), LEFTARROW, clientId, packet->print(pbuf));
|
||||
break;
|
||||
default:
|
||||
WRITELOG(FORMAT_W_NL, currentDateTime(), packet->getName(), LEFTARROW, clientId, packet->print(pbuf));
|
||||
break;
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
#include "MQTTSNGWPacket.h"
|
||||
#include "MQTTSNGWPacket.h"
|
||||
#include "MQTTSNGateway.h"
|
||||
#include "MQTTSNGWEncapsulatedPacket.h"
|
||||
|
||||
using namespace MQTTSNGW;
|
||||
using namespace std;
|
||||
@@ -56,8 +57,22 @@ void ClientSendTask::run()
|
||||
{
|
||||
client = ev->getClient();
|
||||
packet = ev->getMQTTSNPacket();
|
||||
log(client, packet);
|
||||
rc = packet->unicast(_sensorNetwork, client->getSensorNetAddress());
|
||||
Forwarder* fwd = client->getForwarder();
|
||||
|
||||
if ( fwd )
|
||||
{
|
||||
MQTTSNGWEncapsulatedPacket encap(packet);
|
||||
WirelessNodeId* wnId = fwd->getWirelessNodeId(client);
|
||||
encap.setWirelessNodeId(wnId);
|
||||
log(fwd, &encap);
|
||||
log(client, packet);
|
||||
rc = encap.unicast(_sensorNetwork,fwd->getSensorNetAddr());
|
||||
}
|
||||
else
|
||||
{
|
||||
log(client, packet);
|
||||
rc = packet->unicast(_sensorNetwork, client->getSensorNetAddress());
|
||||
}
|
||||
}
|
||||
else if (ev->getEventType() == EtBroadcast)
|
||||
{
|
||||
@@ -119,3 +134,10 @@ void ClientSendTask::log(Client* client, MQTTSNPacket* packet)
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
void ClientSendTask::log(Forwarder* forwarder, MQTTSNGWEncapsulatedPacket* packet)
|
||||
{
|
||||
char pbuf[SIZE_OF_LOG_PACKET * 3];
|
||||
const char* forwarderId = forwarder->getId();
|
||||
WRITELOG(FORMAT_Y_W_G, currentDateTime(), packet->getName(), RIGHTARROW, forwarderId, packet->print(pbuf));
|
||||
}
|
||||
|
||||
@@ -35,6 +35,7 @@ public:
|
||||
|
||||
private:
|
||||
void log(Client*, MQTTSNPacket*);
|
||||
void log(Forwarder*, MQTTSNGWEncapsulatedPacket*);
|
||||
|
||||
Gateway* _gateway;
|
||||
SensorNetwork* _sensorNetwork;
|
||||
|
||||
@@ -82,6 +82,8 @@ void MQTTSNConnectionHandler::handleConnect(Client* client, MQTTSNPacket* packet
|
||||
Event* ev = new Event();
|
||||
ev->setClientSendEvent(client, packet);
|
||||
_gateway->getClientSendQue()->post(ev);
|
||||
|
||||
sendStoredPublish(client);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -216,7 +218,6 @@ void MQTTSNConnectionHandler::handleWillmsg(Client* client, MQTTSNPacket* packet
|
||||
void MQTTSNConnectionHandler::handleDisconnect(Client* client, MQTTSNPacket* packet)
|
||||
{
|
||||
uint16_t duration = 0;
|
||||
Event* ev = new Event();
|
||||
|
||||
if ( packet->getDISCONNECT(&duration) != 0 )
|
||||
{
|
||||
@@ -224,7 +225,7 @@ void MQTTSNConnectionHandler::handleDisconnect(Client* client, MQTTSNPacket* pac
|
||||
{
|
||||
MQTTGWPacket* mqMsg = new MQTTGWPacket();
|
||||
mqMsg->setHeader(DISCONNECT);
|
||||
ev = new Event();
|
||||
Event* ev = new Event();
|
||||
ev->setBrokerSendEvent(client, mqMsg);
|
||||
_gateway->getBrokerSendQue()->post(ev);
|
||||
}
|
||||
@@ -268,25 +269,34 @@ void MQTTSNConnectionHandler::handleWillmsgupd(Client* client, MQTTSNPacket* pac
|
||||
*/
|
||||
void MQTTSNConnectionHandler::handlePingreq(Client* client, MQTTSNPacket* packet)
|
||||
{
|
||||
MQTTGWPacket* msg = 0;
|
||||
|
||||
if ( client->isSleep() || client->isAwake() )
|
||||
if ( ( client->isSleep() || client->isAwake() ) && client->getClientSleepPacket() )
|
||||
{
|
||||
while ( ( msg = client->getClientSleepPacket() ) != 0 )
|
||||
{
|
||||
// ToDo: This version can't re-send PUBLISH when PUBACK is not returned.
|
||||
client->deleteFirstClientSleepPacket(); // pop the que to delete element.
|
||||
|
||||
Event* ev = new Event();
|
||||
ev->setBrokerRecvEvent(client, msg);
|
||||
_gateway->getPacketEventQue()->post(ev);
|
||||
}
|
||||
sendStoredPublish(client);
|
||||
client->holdPingRequest();
|
||||
}
|
||||
else
|
||||
{
|
||||
/* send PINGREQ to the broker */
|
||||
client->resetPingRequest();
|
||||
MQTTGWPacket* pingreq = new MQTTGWPacket();
|
||||
pingreq->setHeader(PINGREQ);
|
||||
Event* evt = new Event();
|
||||
evt->setBrokerSendEvent(client, pingreq);
|
||||
_gateway->getBrokerSendQue()->post(evt);
|
||||
}
|
||||
|
||||
/* send PINGREQ to the broker */
|
||||
MQTTGWPacket* pingreq = new MQTTGWPacket();
|
||||
pingreq->setHeader(PINGREQ);
|
||||
Event* evt = new Event();
|
||||
evt->setBrokerSendEvent(client, pingreq);
|
||||
_gateway->getBrokerSendQue()->post(evt);
|
||||
}
|
||||
|
||||
void MQTTSNConnectionHandler::sendStoredPublish(Client* client)
|
||||
{
|
||||
MQTTGWPacket* msg = 0;
|
||||
|
||||
while ( ( msg = client->getClientSleepPacket() ) != 0 )
|
||||
{
|
||||
// ToDo: This version can't re-send PUBLISH when PUBACK is not returned.
|
||||
client->deleteFirstClientSleepPacket(); // pop the que to delete element.
|
||||
|
||||
Event* ev = new Event();
|
||||
ev->setBrokerRecvEvent(client, msg);
|
||||
_gateway->getPacketEventQue()->post(ev);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -37,6 +37,8 @@ public:
|
||||
void handleWillmsgupd(Client* client, MQTTSNPacket* packet);
|
||||
void handlePingreq(Client* client, MQTTSNPacket* packet);
|
||||
private:
|
||||
void sendStoredPublish(Client* client);
|
||||
|
||||
char _pbuf[MQTTSNGW_MAX_PACKET_SIZE * 3];
|
||||
Gateway* _gateway;
|
||||
};
|
||||
|
||||
@@ -26,6 +26,7 @@ namespace MQTTSNGW
|
||||
#define CONFIG_FILE "gateway.conf"
|
||||
#define CLIENT_LIST "clients.conf"
|
||||
#define PREDEFINEDTOPIC_FILE "predefinedTopic.conf"
|
||||
#define FORWARDER_LIST "forwarders.conf"
|
||||
|
||||
/*==========================================================
|
||||
* Gateway default parameters
|
||||
@@ -41,7 +42,7 @@ namespace MQTTSNGW
|
||||
#define MAX_INFLIGHTMESSAGES (10) // Number of inflight messages
|
||||
#define MAX_SAVED_PUBLISH (20) // Max number of PUBLISH message for Asleep state
|
||||
#define MAX_TOPIC_PAR_CLIENT (50) // Max Topic count for a client. it should be less than 256
|
||||
#define MQTTSNGW_MAX_PACKET_SIZE (1024) // Max Packet size (5+2+TopicLen+PayloadLen)
|
||||
#define MQTTSNGW_MAX_PACKET_SIZE (1024) // Max Packet size (5+2+TopicLen+PayloadLen + Foward Encapsulation)
|
||||
#define SIZE_OF_LOG_PACKET (500) // Length of the packet log in bytes
|
||||
|
||||
/*=================================
|
||||
|
||||
179
MQTTSNGateway/src/MQTTSNGWEncapsulatedPacket.cpp
Normal file
179
MQTTSNGateway/src/MQTTSNGWEncapsulatedPacket.cpp
Normal file
@@ -0,0 +1,179 @@
|
||||
/**************************************************************************************
|
||||
* Copyright (c) 2018, Tomoaki Yamaguchi
|
||||
*
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v1.0
|
||||
* and Eclipse Distribution License v1.0 which accompany this distribution.
|
||||
*
|
||||
* The Eclipse Public License is available at
|
||||
* http://www.eclipse.org/legal/epl-v10.html
|
||||
* and the Eclipse Distribution License is available at
|
||||
* http://www.eclipse.org/org/documents/edl-v10.php.
|
||||
*
|
||||
* Contributors:
|
||||
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
|
||||
**************************************************************************************/
|
||||
#include "MQTTSNGWPacket.h"
|
||||
#include "MQTTSNGWEncapsulatedPacket.h"
|
||||
#include "MQTTSNPacket.h"
|
||||
#include <string.h>
|
||||
|
||||
using namespace MQTTSNGW;
|
||||
using namespace std;
|
||||
|
||||
WirelessNodeId::WirelessNodeId()
|
||||
:
|
||||
_len{0},
|
||||
_nodeId{0}
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
WirelessNodeId::~WirelessNodeId()
|
||||
{
|
||||
if ( _nodeId )
|
||||
{
|
||||
free(_nodeId);
|
||||
}
|
||||
}
|
||||
|
||||
void WirelessNodeId::setId(uint8_t* id, uint8_t len)
|
||||
{
|
||||
if ( _nodeId )
|
||||
{
|
||||
free(_nodeId);
|
||||
}
|
||||
uint8_t* buf = (uint8_t*)malloc(len);
|
||||
if ( buf )
|
||||
{
|
||||
memcpy(buf, id, len);
|
||||
_len = len;
|
||||
_nodeId = buf;
|
||||
}
|
||||
else
|
||||
{
|
||||
_nodeId = 0;
|
||||
_len = 0;
|
||||
}
|
||||
}
|
||||
|
||||
void WirelessNodeId::setId(WirelessNodeId* id)
|
||||
{
|
||||
setId(id->_nodeId, id->_len);
|
||||
}
|
||||
|
||||
bool WirelessNodeId::operator ==(WirelessNodeId& id)
|
||||
{
|
||||
if ( _len == id._len )
|
||||
{
|
||||
return memcmp(_nodeId, id._nodeId, _len) == 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Class MQTTSNGWEncapsulatedPacket
|
||||
*/
|
||||
MQTTSNGWEncapsulatedPacket::MQTTSNGWEncapsulatedPacket()
|
||||
: _mqttsn{0},
|
||||
_ctrl{0}
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
MQTTSNGWEncapsulatedPacket::MQTTSNGWEncapsulatedPacket(MQTTSNPacket* packet)
|
||||
: _mqttsn{packet},
|
||||
_ctrl{0}
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
MQTTSNGWEncapsulatedPacket::~MQTTSNGWEncapsulatedPacket()
|
||||
{
|
||||
/* Do not delete the MQTTSNPacket. MQTTSNPacket is deleted by delete Event */
|
||||
}
|
||||
|
||||
int MQTTSNGWEncapsulatedPacket::unicast(SensorNetwork* network, SensorNetAddress* sendTo)
|
||||
{
|
||||
uint8_t buf[MQTTSNGW_MAX_PACKET_SIZE];
|
||||
int len = serialize(buf);
|
||||
return network->unicast(buf, len, sendTo);
|
||||
}
|
||||
|
||||
int MQTTSNGWEncapsulatedPacket::serialize(uint8_t* buf)
|
||||
{
|
||||
int len = 0;
|
||||
buf[0] = _id._len + 3;
|
||||
buf[1] = MQTTSN_ENCAPSULATED;
|
||||
buf[2] = _ctrl;
|
||||
memcpy( buf + 3, _id._nodeId, _id._len);
|
||||
if ( _mqttsn )
|
||||
{
|
||||
len = _mqttsn->getPacketLength();
|
||||
memcpy(buf + buf[0], _mqttsn->getPacketData(), len);
|
||||
}
|
||||
return buf[0] + len;
|
||||
}
|
||||
|
||||
int MQTTSNGWEncapsulatedPacket::desirialize(unsigned char* buf, unsigned short len)
|
||||
{
|
||||
if ( _mqttsn )
|
||||
{
|
||||
delete _mqttsn;
|
||||
_mqttsn = 0;
|
||||
}
|
||||
|
||||
_ctrl = buf[2];
|
||||
_id.setId(buf + 3, buf[0] - 3);
|
||||
|
||||
_mqttsn = new MQTTSNPacket;
|
||||
_mqttsn->desirialize(buf + buf[0], len - buf[0]);
|
||||
return buf[0];
|
||||
}
|
||||
|
||||
int MQTTSNGWEncapsulatedPacket::getType(void)
|
||||
{
|
||||
return MQTTSN_ENCAPSULATED;
|
||||
}
|
||||
|
||||
const char* MQTTSNGWEncapsulatedPacket::getName()
|
||||
{
|
||||
return MQTTSNPacket_name(MQTTSN_ENCAPSULATED);
|
||||
}
|
||||
|
||||
MQTTSNPacket* MQTTSNGWEncapsulatedPacket::getMQTTSNPacket(void)
|
||||
{
|
||||
return _mqttsn;
|
||||
}
|
||||
|
||||
WirelessNodeId* MQTTSNGWEncapsulatedPacket::getWirelessNodeId(void)
|
||||
{
|
||||
return &_id;
|
||||
}
|
||||
|
||||
void MQTTSNGWEncapsulatedPacket::setWirelessNodeId(WirelessNodeId* id)
|
||||
{
|
||||
_id.setId(id);
|
||||
}
|
||||
|
||||
char* MQTTSNGWEncapsulatedPacket::print(char* pbuf)
|
||||
{
|
||||
char* ptr = pbuf;
|
||||
char** pptr = &pbuf;
|
||||
|
||||
uint8_t buf[MQTTSNGW_MAX_PACKET_SIZE];
|
||||
int len = serialize(buf);
|
||||
int size = len > SIZE_OF_LOG_PACKET ? SIZE_OF_LOG_PACKET : len;
|
||||
|
||||
for (int i = 1; i < size; i++)
|
||||
{
|
||||
sprintf(*pptr, " %02X", *(buf + i));
|
||||
*pptr += 3;
|
||||
}
|
||||
**pptr = 0;
|
||||
return ptr;
|
||||
}
|
||||
|
||||
65
MQTTSNGateway/src/MQTTSNGWEncapsulatedPacket.h
Normal file
65
MQTTSNGateway/src/MQTTSNGWEncapsulatedPacket.h
Normal file
@@ -0,0 +1,65 @@
|
||||
/**************************************************************************************
|
||||
* Copyright (c) 2018, Tomoaki Yamaguchi
|
||||
*
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v1.0
|
||||
* and Eclipse Distribution License v1.0 which accompany this distribution.
|
||||
*
|
||||
* The Eclipse Public License is available at
|
||||
* http://www.eclipse.org/legal/epl-v10.html
|
||||
* and the Eclipse Distribution License is available at
|
||||
* http://www.eclipse.org/org/documents/edl-v10.php.
|
||||
*
|
||||
* Contributors:
|
||||
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
|
||||
**************************************************************************************/
|
||||
|
||||
#ifndef MQTTSNGATEWAY_SRC_MQTTSNGWENCAPSULATEDPACKET_H_
|
||||
#define MQTTSNGATEWAY_SRC_MQTTSNGWENCAPSULATEDPACKET_H_
|
||||
|
||||
namespace MQTTSNGW
|
||||
{
|
||||
|
||||
class WirelessNodeId
|
||||
{
|
||||
friend class MQTTSNGWEncapsulatedPacket;
|
||||
public:
|
||||
WirelessNodeId();
|
||||
~WirelessNodeId();
|
||||
void setId(uint8_t* id, uint8_t len);
|
||||
void setId(WirelessNodeId* id);
|
||||
bool operator ==(WirelessNodeId& id);
|
||||
private:
|
||||
uint8_t _len;
|
||||
uint8_t* _nodeId;
|
||||
};
|
||||
|
||||
class MQTTSNGWEncapsulatedPacket
|
||||
{
|
||||
public:
|
||||
MQTTSNGWEncapsulatedPacket();
|
||||
MQTTSNGWEncapsulatedPacket(MQTTSNPacket* packet);
|
||||
~MQTTSNGWEncapsulatedPacket();
|
||||
int unicast(SensorNetwork* network, SensorNetAddress* sendTo);
|
||||
int serialize(uint8_t* buf);
|
||||
int desirialize(unsigned char* buf, unsigned short len);
|
||||
int getType(void);
|
||||
unsigned char* getPacketData(void);
|
||||
int getPacketLength(void);
|
||||
const char* getName();
|
||||
MQTTSNPacket* getMQTTSNPacket(void);
|
||||
void setWirelessNodeId(WirelessNodeId* id);
|
||||
WirelessNodeId* getWirelessNodeId(void);
|
||||
char* print(char* buf);
|
||||
|
||||
private:
|
||||
MQTTSNPacket* _mqttsn;
|
||||
WirelessNodeId _id;
|
||||
uint8_t _ctrl;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
#endif /* MQTTSNGATEWAY_SRC_MQTTSNGWENCAPSULATEDPACKET_H_ */
|
||||
321
MQTTSNGateway/src/MQTTSNGWForwarder.cpp
Normal file
321
MQTTSNGateway/src/MQTTSNGWForwarder.cpp
Normal file
@@ -0,0 +1,321 @@
|
||||
/**************************************************************************************
|
||||
* Copyright (c) 2018, Tomoaki Yamaguchi
|
||||
*
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v1.0
|
||||
* and Eclipse Distribution License v1.0 which accompany this distribution.
|
||||
*
|
||||
* The Eclipse Public License is available at
|
||||
* http://www.eclipse.org/legal/epl-v10.html
|
||||
* and the Eclipse Distribution License is available at
|
||||
* http://www.eclipse.org/org/documents/edl-v10.php.
|
||||
*
|
||||
* Contributors:
|
||||
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
|
||||
**************************************************************************************/
|
||||
#include <string.h>
|
||||
#include "MQTTSNGWForwarder.h"
|
||||
|
||||
using namespace MQTTSNGW;
|
||||
using namespace std;
|
||||
|
||||
/*
|
||||
* Class ForwarderList
|
||||
*/
|
||||
|
||||
ForwarderList::ForwarderList()
|
||||
{
|
||||
_head = 0;
|
||||
}
|
||||
|
||||
ForwarderList::~ForwarderList()
|
||||
{
|
||||
if ( _head )
|
||||
{
|
||||
Forwarder* p = _head;
|
||||
while ( p )
|
||||
{
|
||||
Forwarder* next = p->_next;
|
||||
delete p;
|
||||
p = next;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Forwarder* ForwarderList::getForwarder(SensorNetAddress* addr)
|
||||
{
|
||||
Forwarder* p = _head;
|
||||
while ( p )
|
||||
{
|
||||
if ( p->_sensorNetAddr.isMatch(addr) )
|
||||
{
|
||||
break;
|
||||
}
|
||||
p = p->_next;
|
||||
}
|
||||
return p;
|
||||
}
|
||||
|
||||
bool ForwarderList::setFowerder(const char* fileName)
|
||||
{
|
||||
FILE* fp;
|
||||
char buf[MAX_CLIENTID_LENGTH + 256];
|
||||
size_t pos;
|
||||
|
||||
SensorNetAddress netAddr;
|
||||
|
||||
if ((fp = fopen(fileName, "r")) != 0)
|
||||
{
|
||||
while (fgets(buf, MAX_CLIENTID_LENGTH + 254, fp) != 0)
|
||||
{
|
||||
if (*buf == '#')
|
||||
{
|
||||
continue;
|
||||
}
|
||||
string data = string(buf);
|
||||
while ((pos = data.find_first_of(" \t\n")) != string::npos)
|
||||
{
|
||||
data.erase(pos, 1);
|
||||
}
|
||||
if (data.empty())
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
pos = data.find_first_of(",");
|
||||
string id = data.substr(0, pos);
|
||||
string addr = data.substr(pos + 1);
|
||||
|
||||
if (netAddr.setAddress(&addr) == 0)
|
||||
{
|
||||
addForwarder(&netAddr, &id);
|
||||
}
|
||||
else
|
||||
{
|
||||
WRITELOG("Invalid address %s\n", data.c_str());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
fclose(fp);
|
||||
}
|
||||
else
|
||||
{
|
||||
WRITELOG("Can not open the forwarders List. %s\n", fileName);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
Forwarder* ForwarderList::addForwarder(SensorNetAddress* addr, string* forwarderId)
|
||||
{
|
||||
Forwarder* fdr = new Forwarder(addr, forwarderId);
|
||||
if ( _head == 0 )
|
||||
{
|
||||
_head = fdr;
|
||||
}
|
||||
else
|
||||
{
|
||||
Forwarder* p = _head;
|
||||
while ( p )
|
||||
{
|
||||
if ( p->_next == 0 )
|
||||
{
|
||||
p->_next = fdr;
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
p = p->_next;
|
||||
}
|
||||
}
|
||||
}
|
||||
return fdr;
|
||||
}
|
||||
|
||||
Forwarder::Forwarder()
|
||||
{
|
||||
_headClient = 0;
|
||||
_next = 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* Class Forwarder
|
||||
*/
|
||||
|
||||
Forwarder::Forwarder(SensorNetAddress* addr, string* forwarderId)
|
||||
{
|
||||
_forwarderName = *forwarderId;
|
||||
_sensorNetAddr = *addr;
|
||||
_headClient = 0;
|
||||
_next = 0;
|
||||
}
|
||||
|
||||
Forwarder::~Forwarder(void)
|
||||
{
|
||||
if ( _headClient )
|
||||
{
|
||||
ForwardedClient* p = _headClient;
|
||||
while ( p )
|
||||
{
|
||||
ForwardedClient* next = p->_next;
|
||||
delete p;
|
||||
p = next;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const char* Forwarder::getId(void)
|
||||
{
|
||||
return _forwarderName.c_str();
|
||||
}
|
||||
|
||||
void Forwarder::addClient(Client* client, WirelessNodeId* id)
|
||||
{
|
||||
ForwardedClient* p = _headClient;
|
||||
ForwardedClient* prev = 0;
|
||||
|
||||
client->setForwarder(this);
|
||||
|
||||
if ( p != 0 )
|
||||
{
|
||||
while ( p )
|
||||
{
|
||||
if ( p->_client == client )
|
||||
{
|
||||
client->setForwarder(this);
|
||||
return;
|
||||
}
|
||||
prev = p;
|
||||
p = p->_next;
|
||||
}
|
||||
}
|
||||
|
||||
ForwardedClient* fclient = new ForwardedClient();
|
||||
|
||||
fclient->setClient(client);
|
||||
fclient->setWirelessNodeId(id);
|
||||
|
||||
if ( prev )
|
||||
{
|
||||
prev->_next = fclient;
|
||||
}
|
||||
else
|
||||
{
|
||||
_headClient = fclient;
|
||||
}
|
||||
}
|
||||
|
||||
Client* Forwarder::getClient(WirelessNodeId* id)
|
||||
{
|
||||
Client* cl = 0;
|
||||
_mutex.lock();
|
||||
ForwardedClient* p = _headClient;
|
||||
while ( p )
|
||||
{
|
||||
if ( *(p->_wirelessNodeId) == *id )
|
||||
{
|
||||
cl = p->_client;
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
p = p->_next;
|
||||
}
|
||||
}
|
||||
_mutex.unlock();
|
||||
return cl;
|
||||
}
|
||||
|
||||
const char* Forwarder::getName(void)
|
||||
{
|
||||
return _forwarderName.c_str();
|
||||
}
|
||||
|
||||
WirelessNodeId* Forwarder::getWirelessNodeId(Client* client)
|
||||
{
|
||||
WirelessNodeId* nodeId = 0;
|
||||
_mutex.lock();
|
||||
ForwardedClient* p = _headClient;
|
||||
while ( p )
|
||||
{
|
||||
if ( p->_client == client )
|
||||
{
|
||||
nodeId = p->_wirelessNodeId;
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
p = p->_next;
|
||||
}
|
||||
}
|
||||
_mutex.unlock();
|
||||
return nodeId;
|
||||
}
|
||||
|
||||
void Forwarder::eraseClient(Client* client)
|
||||
{
|
||||
ForwardedClient* prev = 0;
|
||||
_mutex.lock();
|
||||
ForwardedClient* p = _headClient;
|
||||
|
||||
while ( p )
|
||||
{
|
||||
if ( p->_client == client )
|
||||
{
|
||||
if ( prev )
|
||||
{
|
||||
prev->_next = p->_next;
|
||||
}
|
||||
else
|
||||
{
|
||||
_headClient = p->_next;
|
||||
}
|
||||
delete p;
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
p = p->_next;
|
||||
}
|
||||
}
|
||||
_mutex.unlock();
|
||||
}
|
||||
|
||||
SensorNetAddress* Forwarder::getSensorNetAddr(void)
|
||||
{
|
||||
return &_sensorNetAddr;
|
||||
}
|
||||
|
||||
/*
|
||||
* Class ForwardedClient
|
||||
*/
|
||||
|
||||
ForwardedClient::ForwardedClient()
|
||||
: _client{0}
|
||||
, _wirelessNodeId{0}
|
||||
, _next{0}
|
||||
{
|
||||
}
|
||||
|
||||
ForwardedClient::~ForwardedClient()
|
||||
{
|
||||
if (_wirelessNodeId)
|
||||
{
|
||||
delete _wirelessNodeId;
|
||||
}
|
||||
}
|
||||
|
||||
void ForwardedClient::setClient(Client* client)
|
||||
{
|
||||
_client = client;
|
||||
}
|
||||
|
||||
void ForwardedClient::setWirelessNodeId(WirelessNodeId* id)
|
||||
{
|
||||
if ( _wirelessNodeId == 0 )
|
||||
{
|
||||
_wirelessNodeId = new WirelessNodeId();
|
||||
}
|
||||
_wirelessNodeId->setId(id);
|
||||
}
|
||||
88
MQTTSNGateway/src/MQTTSNGWForwarder.h
Normal file
88
MQTTSNGateway/src/MQTTSNGWForwarder.h
Normal file
@@ -0,0 +1,88 @@
|
||||
/**************************************************************************************
|
||||
* Copyright (c) 2018, Tomoaki Yamaguchi
|
||||
*
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v1.0
|
||||
* and Eclipse Distribution License v1.0 which accompany this distribution.
|
||||
*
|
||||
* The Eclipse Public License is available at
|
||||
* http://www.eclipse.org/legal/epl-v10.html
|
||||
* and the Eclipse Distribution License is available at
|
||||
* http://www.eclipse.org/org/documents/edl-v10.php.
|
||||
*
|
||||
* Contributors:
|
||||
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
|
||||
**************************************************************************************/
|
||||
|
||||
#ifndef MQTTSNGATEWAY_SRC_MQTTSNGWFORWARDER_H_
|
||||
#define MQTTSNGATEWAY_SRC_MQTTSNGWFORWARDER_H_
|
||||
|
||||
#include "MQTTSNGWClient.h"
|
||||
#include "MQTTSNGWEncapsulatedPacket.h"
|
||||
#include "SensorNetwork.h"
|
||||
|
||||
|
||||
namespace MQTTSNGW
|
||||
{
|
||||
|
||||
class Client;
|
||||
class WirelessNodeId;
|
||||
|
||||
class ForwardedClient
|
||||
{
|
||||
friend class Forwarder;
|
||||
public:
|
||||
ForwardedClient();
|
||||
~ForwardedClient();
|
||||
void setClient(Client* client);
|
||||
void setWirelessNodeId(WirelessNodeId* id);
|
||||
private:
|
||||
Client* _client;
|
||||
WirelessNodeId* _wirelessNodeId;
|
||||
ForwardedClient* _next;
|
||||
};
|
||||
|
||||
|
||||
class Forwarder
|
||||
{
|
||||
friend class ForwarderList;
|
||||
public:
|
||||
Forwarder();
|
||||
Forwarder(SensorNetAddress* addr, string* forwarderId);
|
||||
~Forwarder();
|
||||
|
||||
const char* getId(void);
|
||||
void addClient(Client* client, WirelessNodeId* id);
|
||||
Client* getClient(WirelessNodeId* id);
|
||||
WirelessNodeId* getWirelessNodeId(Client* client);
|
||||
void eraseClient(Client* client);
|
||||
SensorNetAddress* getSensorNetAddr(void);
|
||||
const char* getName(void);
|
||||
|
||||
private:
|
||||
string _forwarderName;
|
||||
SensorNetAddress _sensorNetAddr;
|
||||
ForwardedClient* _headClient;
|
||||
Forwarder* _next;
|
||||
Mutex _mutex;
|
||||
};
|
||||
|
||||
class ForwarderList
|
||||
{
|
||||
public:
|
||||
ForwarderList();
|
||||
~ForwarderList();
|
||||
|
||||
Forwarder* getForwarder(SensorNetAddress* addr);
|
||||
bool setFowerder(const char* fileName);
|
||||
Forwarder* addForwarder(SensorNetAddress* addr, string* forwarderId);
|
||||
|
||||
private:
|
||||
Forwarder* _head;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
#endif /* MQTTSNGATEWAY_SRC_MQTTSNGWFORWARDER_H_ */
|
||||
@@ -198,6 +198,7 @@ void MQTTSNPublishHandler::handleRegAck( Client* client, MQTTSNPacket* packet)
|
||||
}
|
||||
|
||||
MQTTSNPacket* regAck = client->getWaitREGACKPacketList()->getPacket(msgId);
|
||||
|
||||
if ( regAck != 0 )
|
||||
{
|
||||
client->getWaitREGACKPacketList()->erase(msgId);
|
||||
@@ -205,6 +206,16 @@ void MQTTSNPublishHandler::handleRegAck( Client* client, MQTTSNPacket* packet)
|
||||
ev->setClientSendEvent(client, regAck);
|
||||
_gateway->getClientSendQue()->post(ev);
|
||||
}
|
||||
if (client->isHoldPringReqest() && client->getWaitREGACKPacketList()->getCount() == 0 )
|
||||
{
|
||||
/* send PINGREQ to the broker */
|
||||
client->resetPingRequest();
|
||||
MQTTGWPacket* pingreq = new MQTTGWPacket();
|
||||
pingreq->setHeader(PINGREQ);
|
||||
Event* evt = new Event();
|
||||
evt->setBrokerSendEvent(client, pingreq);
|
||||
_gateway->getBrokerSendQue()->post(evt);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -17,6 +17,6 @@
|
||||
#ifndef MQTTSNGWVERSION_H_IN_
|
||||
#define MQTTSNGWVERSION_H_IN_
|
||||
|
||||
#define PAHO_GATEWAY_VERSION "1.1.0"
|
||||
#define PAHO_GATEWAY_VERSION "1.2.0"
|
||||
|
||||
#endif /* MQTTSNGWVERSION_H_IN_ */
|
||||
|
||||
@@ -46,6 +46,8 @@ Gateway::Gateway()
|
||||
_params.privateKey = 0;
|
||||
_params.clientListName = 0;
|
||||
_params.configName = 0;
|
||||
_params.predefinedTopicFileName = 0;
|
||||
_params.forwarderListName = 0;
|
||||
_packetEventQue.setMaxSize(MAX_INFLIGHTMESSAGES * MAX_CLIENTS);
|
||||
}
|
||||
|
||||
@@ -99,6 +101,14 @@ Gateway::~Gateway()
|
||||
{
|
||||
free(_params.configName);
|
||||
}
|
||||
if ( _params.predefinedTopicFileName )
|
||||
{
|
||||
free(_params.predefinedTopicFileName);
|
||||
}
|
||||
if ( _params.forwarderListName )
|
||||
{
|
||||
free(_params.forwarderListName);
|
||||
}
|
||||
}
|
||||
|
||||
void Gateway::initialize(int argc, char** argv)
|
||||
@@ -214,9 +224,9 @@ void Gateway::initialize(int argc, char** argv)
|
||||
{
|
||||
if (!strcasecmp(param, "YES") )
|
||||
{
|
||||
if (getParam("PredefinedTopicFile", param) == 0)
|
||||
if (getParam("PredefinedTopicList", param) == 0)
|
||||
{
|
||||
fileName =*getConfigDirName() + string(param);
|
||||
fileName = string(param);
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -234,6 +244,30 @@ void Gateway::initialize(int argc, char** argv)
|
||||
}
|
||||
}
|
||||
|
||||
if (getParam("Forwarder", param) == 0 )
|
||||
{
|
||||
if (!strcasecmp(param, "YES") )
|
||||
{
|
||||
if (getParam("ForwardersList", param) == 0)
|
||||
{
|
||||
fileName = string(param);
|
||||
}
|
||||
else
|
||||
{
|
||||
fileName = *getConfigDirName() + string(FORWARDER_LIST);
|
||||
}
|
||||
if ( !_forwarderList.setFowerder(fileName.c_str()) )
|
||||
{
|
||||
throw Exception("Gateway::initialize: No ForwardersList file defined by the configuration..");
|
||||
}
|
||||
_params.forwarderListName = strdup(fileName.c_str());
|
||||
}
|
||||
else
|
||||
{
|
||||
_params.forwarderListName = 0;
|
||||
}
|
||||
}
|
||||
|
||||
fileName = *getConfigDirName() + *getConfigFileName();
|
||||
_params.configName = strdup(fileName.c_str());
|
||||
}
|
||||
@@ -254,12 +288,17 @@ void Gateway::run(void)
|
||||
{
|
||||
WRITELOG(" ClientList: %s\n", _params.clientListName);
|
||||
}
|
||||
WRITELOG(" SensorN/W: %s\n", _sensorNetwork.getDescription());
|
||||
WRITELOG(" Broker: %s : %s, %s\n", _params.brokerName, _params.port, _params.portSecure);
|
||||
if ( _params.predefinedTopicFileName )
|
||||
{
|
||||
WRITELOG(" PreDefFile: %s\n", _params.predefinedTopicFileName);
|
||||
}
|
||||
if ( _params.forwarderListName )
|
||||
{
|
||||
WRITELOG(" Forwarders: %s\n", _params.forwarderListName);
|
||||
}
|
||||
WRITELOG(" SensorN/W: %s\n", _sensorNetwork.getDescription());
|
||||
WRITELOG(" Broker: %s : %s, %s\n", _params.brokerName, _params.port, _params.portSecure);
|
||||
|
||||
WRITELOG(" RootCApath: %s\n", _params.rootCApath);
|
||||
WRITELOG(" RootCAfile: %s\n", _params.rootCAfile);
|
||||
WRITELOG(" CertKey: %s\n", _params.certKey);
|
||||
@@ -305,6 +344,11 @@ ClientList* Gateway::getClientList()
|
||||
return &_clientList;
|
||||
}
|
||||
|
||||
ForwarderList* Gateway::getForwarderList(void)
|
||||
{
|
||||
return &_forwarderList;
|
||||
}
|
||||
|
||||
SensorNetwork* Gateway::getSensorNetwork()
|
||||
{
|
||||
return &_sensorNetwork;
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
#define MQTTSNGATEWAY_H_
|
||||
|
||||
#include "MQTTSNGWClient.h"
|
||||
#include "MQTTSNGWForwarder.h"
|
||||
#include "MQTTSNGWProcess.h"
|
||||
#include "MQTTSNPacket.h"
|
||||
|
||||
@@ -42,6 +43,7 @@ namespace MQTTSNGW
|
||||
#define CLIENT "Client"
|
||||
#define CLIENTS "Clients"
|
||||
#define UNKNOWNCL "Unknown Client !"
|
||||
|
||||
#define LEFTARROW "<---"
|
||||
#define RIGHTARROW "--->"
|
||||
#define LEFTARROWB "<==="
|
||||
@@ -69,14 +71,6 @@ namespace MQTTSNGW
|
||||
#define ERRMSG_HEADER "\033[0m\033[0;31mError:"
|
||||
#define ERRMSG_FOOTER "\033[0m\033[0;37m"
|
||||
|
||||
/*=====================================
|
||||
Predefined TopicId for OTA
|
||||
====================================*/
|
||||
//#define OTA_CLIENTS
|
||||
//#define PREDEFINEDID_OTA_REQ (0x0ff0)
|
||||
//#define PREDEFINEDID_OTA_READY (0x0ff1)
|
||||
//#define PREDEFINEDID_OTA_NO_CLIENT (0x0ff2)
|
||||
|
||||
/*=====================================
|
||||
Class Event
|
||||
====================================*/
|
||||
@@ -161,6 +155,7 @@ typedef struct
|
||||
char* certKey;
|
||||
char* privateKey;
|
||||
char* predefinedTopicFileName;
|
||||
char* forwarderListName;
|
||||
}GatewayParams;
|
||||
|
||||
/*=====================================
|
||||
@@ -177,12 +172,14 @@ public:
|
||||
EventQue* getClientSendQue(void);
|
||||
EventQue* getBrokerSendQue(void);
|
||||
ClientList* getClientList(void);
|
||||
ForwarderList* getForwarderList(void);
|
||||
SensorNetwork* getSensorNetwork(void);
|
||||
LightIndicator* getLightIndicator(void);
|
||||
GatewayParams* getGWParams(void);
|
||||
|
||||
private:
|
||||
ClientList _clientList;
|
||||
ForwarderList _forwarderList;
|
||||
EventQue _packetEventQue;
|
||||
EventQue _brokerSendQue;
|
||||
EventQue _clientSendQue;
|
||||
|
||||
@@ -93,10 +93,10 @@ void TestProcess::run(void)
|
||||
delete tque;
|
||||
|
||||
/* Test Tree23 */
|
||||
printf("Test Tree23 ");
|
||||
TestTree23* tree23 = new TestTree23();
|
||||
tree23->test();
|
||||
delete tree23;
|
||||
//printf("Test Tree23 ");
|
||||
//TestTree23* tree23 = new TestTree23();
|
||||
//tree23->test();
|
||||
//delete tree23;
|
||||
|
||||
/* Test TopicTable */
|
||||
printf("Test Topic ");
|
||||
@@ -111,6 +111,7 @@ void TestProcess::run(void)
|
||||
delete testMap;
|
||||
|
||||
/* Test EventQue */
|
||||
/*
|
||||
printf("Test EventQue ");
|
||||
Client* client = new Client();
|
||||
_evQue.setMaxSize(EVENT_CNT);
|
||||
@@ -122,6 +123,7 @@ void TestProcess::run(void)
|
||||
ev->setClientSendEvent(client, packet);
|
||||
_evQue.post(ev);
|
||||
}
|
||||
|
||||
delete client;
|
||||
*/
|
||||
//MultiTaskProcess::run();
|
||||
}
|
||||
|
||||
@@ -19,7 +19,7 @@
|
||||
using namespace MQTTSNGW;
|
||||
|
||||
TestProcess* test = new TestProcess();
|
||||
TestTask* task = new TestTask(test);
|
||||
//TestTask* task = new TestTask(test);
|
||||
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user