mirror of
https://github.com/eclipse/paho.mqtt-sn.embedded-c.git
synced 2025-12-13 23:46:51 +01:00
Forwerders are declared by the ClientList file.
Signed-off-by: tomoaki <tomoaki@tomy-tech.com>
This commit is contained in:
@@ -10,7 +10,6 @@ TESTAPPL := mainTestProcess
|
|||||||
CONFIG := gateway.conf
|
CONFIG := gateway.conf
|
||||||
CLIENTS := clients.conf
|
CLIENTS := clients.conf
|
||||||
PREDEFTOPIC := predefinedTopic.conf
|
PREDEFTOPIC := predefinedTopic.conf
|
||||||
FORWARDERS := forwarders.conf
|
|
||||||
|
|
||||||
SRCDIR := src
|
SRCDIR := src
|
||||||
SUBDIR := ../MQTTSNPacket/src
|
SUBDIR := ../MQTTSNPacket/src
|
||||||
@@ -149,7 +148,6 @@ install:
|
|||||||
cp -pf $(CONFIG) ../../
|
cp -pf $(CONFIG) ../../
|
||||||
cp -pf $(CLIENTS) ../../
|
cp -pf $(CLIENTS) ../../
|
||||||
cp -pf $(PREDEFTOPIC) ../../
|
cp -pf $(PREDEFTOPIC) ../../
|
||||||
cp -pf $(FORWARDERS) ../../
|
|
||||||
|
|
||||||
|
|
||||||
exectest:
|
exectest:
|
||||||
|
|||||||
@@ -36,18 +36,16 @@ BrokerSecurePortNo=8883
|
|||||||
# All clients must be specified by the ClientList File
|
# All clients must be specified by the ClientList File
|
||||||
#
|
#
|
||||||
|
|
||||||
AggregateGateway=NO
|
|
||||||
ClientAuthentication=NO
|
|
||||||
|
|
||||||
#ClientsList=/path/to/your_clients.conf
|
#ClientsList=/path/to/your_clients.conf
|
||||||
|
|
||||||
|
ClientAuthentication=NO
|
||||||
|
AggregateGateway=NO
|
||||||
|
Forwarder=NO
|
||||||
QoS-1=NO
|
QoS-1=NO
|
||||||
OoS-1ProxyName=Proxy007
|
OoS-1ProxyName=Proxy007
|
||||||
|
|
||||||
#PredefinedTopicList=/path/to/your_predefinedTopic.conf
|
#PredefinedTopicList=/path/to/your_predefinedTopic.conf
|
||||||
|
|
||||||
Forwarder=NO
|
|
||||||
#ForwardersList=/path/to/your_forwarers.conf
|
|
||||||
|
|
||||||
#RootCAfile=/etc/ssl/certs/ca-certificates.crt
|
#RootCAfile=/etc/ssl/certs/ca-certificates.crt
|
||||||
#RootCApath=/etc/ssl/certs/
|
#RootCApath=/etc/ssl/certs/
|
||||||
@@ -84,9 +82,7 @@ Client should know the MulticastIP and MulticastPortNo to send a SEARCHGW messag
|
|||||||
when **AggregateGateway** or **ClientAuthentication** is **YES**, All clients which connect to the gateway must be declared by a **ClientsList** file.
|
when **AggregateGateway** or **ClientAuthentication** is **YES**, All clients which connect to the gateway must be declared by a **ClientsList** file.
|
||||||
Format of the file is ClientId and SensorNetwork Address. e.g. IP address and Port No etc, in CSV. more detail see clients.conf
|
Format of the file is ClientId and SensorNetwork Address. e.g. IP address and Port No etc, in CSV. more detail see clients.conf
|
||||||
When **PredefinedTopic** is **YES**, **Pre-definedTopicId**s specified by **PredefinedTopicList** are effective. This file defines Pre-definedTopics of the clients. In this file, ClientID,TopicName and TopicID are declared in CSV format.
|
When **PredefinedTopic** is **YES**, **Pre-definedTopicId**s specified by **PredefinedTopicList** are effective. This file defines Pre-definedTopics of the clients. In this file, ClientID,TopicName and TopicID are declared in CSV format.
|
||||||
When **Forwarder** is **YE**S, Forwarder Encapsulation Message is available. Connectable Forwarders are specifed by **ForwardersList** file. In this file, ForwarderIds and those sensorNet addresses are declared in CSV format.
|
When **Forwarder** is **YE**S, Forwarder Encapsulation Message is available. Connectable Forwarders must be declared by a **ClientsList** file.
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
### ** How to monitor the gateway from remote. **
|
### ** How to monitor the gateway from remote. **
|
||||||
|
|||||||
@@ -15,25 +15,33 @@
|
|||||||
# Lines bigning with # are comment line.
|
# Lines bigning with # are comment line.
|
||||||
# ClientId, SensorNetAddress, "unstableLine", "secureConnection"
|
# ClientId, SensorNetAddress, "unstableLine", "secureConnection"
|
||||||
# in case of UDP, SensorNetAddress format is portNo@IPAddress.
|
# in case of UDP, SensorNetAddress format is portNo@IPAddress.
|
||||||
# if the SensorNetwork is not stable, write unstableLine.
|
# if the SensorNetwork is not stable, write "unstableLine".
|
||||||
# if Broker's Connection is SSL, write secureConnection.
|
# if Broker's Connection is SSL, write "secureConnection".
|
||||||
# if the client send PUBLISH QoS-1, QoS-1 is required.
|
# if the client is a forwarder, "forwarder" is required.
|
||||||
|
# if the client send PUBLISH QoS-1, "QoS-1" is required.
|
||||||
#
|
#
|
||||||
# Ex:
|
# Ex:
|
||||||
# #Client List
|
# #Client List
|
||||||
# ClientId1,11200@192.168.10.10
|
# ClientId1,11200@192.168.10.10
|
||||||
# ClientID2,35000@192.168.50.200,unstableLine
|
# ClientID2,35000@192.168.50.200,unstableLine
|
||||||
# ClientID3,40000@192.168.200.50,secureConnection
|
# ClientID3,40000@192.168.200.50,secureConnection
|
||||||
# ClientID4,41000@192.168.200.51,unstableLine,secureConnection
|
# ClientID4,41000@192.168.200.52,unstableLine,secureConnection
|
||||||
# ClientID5,41000@192.168.200.51,unstableLine,secureConnection,QoS-1
|
# ClientID5,41000@192.168.200.53,unstableLine,secureConnection,QoS-1
|
||||||
|
# ClientID6,41000@192.168.200.54,unstableLine,secureConnection,forwarder
|
||||||
#
|
#
|
||||||
# SensorNetwork address format is defined by SensorNetAddress::setAddress(string* data) function.
|
# SensorNetwork address format is defined by SensorNetAddress::setAddress(string* data) function.
|
||||||
#
|
#
|
||||||
|
GatewayTester, 172.16.1.11:20020
|
||||||
|
ClientPUB,172.16.1.11:2010
|
||||||
|
Client01,172.16.1.11:12001
|
||||||
Client02,172.16.1.11:12002
|
Client02,172.16.1.11:12002
|
||||||
Client03,172.16.1.11:13003
|
Client03,172.16.1.11:13003
|
||||||
Client01,172.16.1.11:12001
|
|
||||||
|
|
||||||
QoS-1_Client01,172.16.1.11:20001,QoS-1
|
QoS-1_Client01,172.16.1.11:20001,QoS-1
|
||||||
QoS-1_Client02,172.16.1.11:20002,QoS-1
|
QoS-1_Client02,172.16.1.11:20002,QoS-1
|
||||||
QoS-1_Client03,172.16.1.11:20003,QoS-1
|
QoS-1_Client03,172.16.1.11:20003,QoS-1
|
||||||
|
|
||||||
|
Forwarder01,172.16.1.11:22002,forwarder
|
||||||
|
Forwarder02,172.16.1.11:22003,forwarder
|
||||||
|
Forwarder03,172.16.1.11:22004,forwarder
|
||||||
|
|
||||||
|
|||||||
@@ -1,25 +0,0 @@
|
|||||||
#***********************************************************************
|
|
||||||
# Copyright (c) 2018, Tomoaki Yamaguchi
|
|
||||||
#
|
|
||||||
# All rights reserved. This program and the accompanying materials
|
|
||||||
# are made available under the terms of the Eclipse Public License v1.0
|
|
||||||
# and Eclipse Distribution License v1.0 which accompany this distribution.
|
|
||||||
#
|
|
||||||
# The Eclipse Public License is available at
|
|
||||||
# http://www.eclipse.org/legal/epl-v10.html
|
|
||||||
# and the Eclipse Distribution License is available at
|
|
||||||
# http://www.eclipse.org/org/documents/edl-v10.php.
|
|
||||||
#***********************************************************************
|
|
||||||
#
|
|
||||||
# This file declares valid MQTTSNForwarders.
|
|
||||||
# Forwarders are defined by ForwarderId same as ClientId and those SensorNetAddress
|
|
||||||
# in a CSV format as follow:
|
|
||||||
#
|
|
||||||
# ForwarderId, SensorNetAddress
|
|
||||||
#
|
|
||||||
# where SensorNetwork address format is defined by SensorNetAddress::setAddress(string* data) function.
|
|
||||||
#
|
|
||||||
|
|
||||||
Forwarder01,172.16.1.7:12002
|
|
||||||
|
|
||||||
|
|
||||||
@@ -91,72 +91,6 @@ bool AdapterManager::isAggregatedClient(Client* client)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
Client* AdapterManager::getClient(MQTTSNPacket* packet, ClientRecvTask* task)
|
|
||||||
{
|
|
||||||
char buf[128];
|
|
||||||
WirelessNodeId nodeId;
|
|
||||||
SensorNetAddress* senderAddr = _gateway->getSensorNetwork()->getSenderAddress();
|
|
||||||
|
|
||||||
Client* client = nullptr;
|
|
||||||
|
|
||||||
if ( packet->getType() == MQTTSN_ENCAPSULATED )
|
|
||||||
{
|
|
||||||
Forwarder* fwd = getForwarderList()->getForwarder(senderAddr);
|
|
||||||
|
|
||||||
if ( fwd == nullptr )
|
|
||||||
{
|
|
||||||
task->log(0, packet, 0);
|
|
||||||
WRITELOG("%s Forwarder %s is not authenticated.%s\n", ERRMSG_HEADER, senderAddr->sprint(buf), ERRMSG_FOOTER);
|
|
||||||
delete packet;
|
|
||||||
return client;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
MQTTSNString fwdName = MQTTSNString_initializer;
|
|
||||||
fwdName.cstring = const_cast<char *>( fwd->getName() );
|
|
||||||
task->log(0, packet, &fwdName);
|
|
||||||
|
|
||||||
/* get the packet from the encapsulation message */
|
|
||||||
MQTTSNGWEncapsulatedPacket encap;
|
|
||||||
encap.desirialize(packet->getPacketData(), packet->getPacketLength());
|
|
||||||
nodeId.setId( encap.getWirelessNodeId() );
|
|
||||||
client = fwd->getClient(&nodeId);
|
|
||||||
delete packet;
|
|
||||||
packet = encap.getMQTTSNPacket();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
/* Check the client belonging to QoS-1Proxy ? */
|
|
||||||
|
|
||||||
if ( _qosm1Proxy->isActive() )
|
|
||||||
{
|
|
||||||
/* get ClientId not Client which can send QoS-1 PUBLISH */
|
|
||||||
const char* clientName = _qosm1Proxy->getClientId(senderAddr);
|
|
||||||
|
|
||||||
if ( clientName )
|
|
||||||
{
|
|
||||||
if ( !packet->isQoSMinusPUBLISH() )
|
|
||||||
{
|
|
||||||
client = _qosm1Proxy->getClient();
|
|
||||||
task->log(clientName, packet);
|
|
||||||
WRITELOG("%s %s %s can send only PUBLISH with QoS-1.%s\n", ERRMSG_HEADER, clientName, senderAddr->sprint(buf), ERRMSG_FOOTER);
|
|
||||||
delete packet;
|
|
||||||
return client;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if ( client == nullptr )
|
|
||||||
{
|
|
||||||
/* get client from the ClientList of Gateway by sensorNetAddress. */
|
|
||||||
client = _gateway->getClientList()->getClient(senderAddr);
|
|
||||||
}
|
|
||||||
return client;
|
|
||||||
}
|
|
||||||
|
|
||||||
Client* AdapterManager::getClient(Client& client)
|
Client* AdapterManager::getClient(Client& client)
|
||||||
{
|
{
|
||||||
bool secure = client.isSecureNetwork();
|
bool secure = client.isSecureNetwork();
|
||||||
|
|||||||
@@ -26,6 +26,7 @@ class Client;
|
|||||||
class QoSm1Proxy;
|
class QoSm1Proxy;
|
||||||
class Aggregater;
|
class Aggregater;
|
||||||
class ForwarderList;
|
class ForwarderList;
|
||||||
|
class Forwarder;
|
||||||
class MQTTSNPacket;
|
class MQTTSNPacket;
|
||||||
class MQTTSNGWPacket;
|
class MQTTSNGWPacket;
|
||||||
class ClientRecvTask;
|
class ClientRecvTask;
|
||||||
@@ -46,7 +47,6 @@ public:
|
|||||||
void checkConnection(void);
|
void checkConnection(void);
|
||||||
|
|
||||||
bool isAggregatedClient(Client* client);
|
bool isAggregatedClient(Client* client);
|
||||||
Client* getClient(MQTTSNPacket* packet, ClientRecvTask* task);
|
|
||||||
Client* getClient(Client& client);
|
Client* getClient(Client& client);
|
||||||
Client* convertClient(uint16_t msgId, uint16_t* clientMsgId);
|
Client* convertClient(uint16_t msgId, uint16_t* clientMsgId);
|
||||||
int unicastToClient(Client* client, MQTTSNPacket* packet, ClientSendTask* task);
|
int unicastToClient(Client* client, MQTTSNPacket* packet, ClientSendTask* task);
|
||||||
|
|||||||
@@ -45,7 +45,7 @@ void Aggregater::initialize(void)
|
|||||||
if (!strcasecmp(param, "YES") )
|
if (!strcasecmp(param, "YES") )
|
||||||
{
|
{
|
||||||
/* Create Aggregated Clients */
|
/* Create Aggregated Clients */
|
||||||
_gateway->getClientList()->setClientList(_gateway, AGGREGATER_TYPE);
|
_gateway->getClientList()->setClientList(AGGREGATER_TYPE);
|
||||||
setup((const char*)(_gateway->getGWParams()->gatewayName), Atype_Aggregater);
|
setup((const char*)(_gateway->getGWParams()->gatewayName), Atype_Aggregater);
|
||||||
_isActive = true;
|
_isActive = true;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -15,9 +15,12 @@
|
|||||||
* Tieto Poland Sp. z o.o. - Gateway improvements
|
* Tieto Poland Sp. z o.o. - Gateway improvements
|
||||||
**************************************************************************************/
|
**************************************************************************************/
|
||||||
#include "MQTTSNGWClientList.h"
|
#include "MQTTSNGWClientList.h"
|
||||||
|
#include "MQTTSNGateway.h"
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
|
||||||
|
using namespace MQTTSNGW;
|
||||||
|
extern Gateway* theGateway;
|
||||||
/*=====================================
|
/*=====================================
|
||||||
Class ClientList
|
Class ClientList
|
||||||
=====================================*/
|
=====================================*/
|
||||||
@@ -44,26 +47,26 @@ ClientList::~ClientList()
|
|||||||
_mutex.unlock();
|
_mutex.unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClientList::initialize(Gateway* gw, bool aggregate)
|
void ClientList::initialize(bool aggregate)
|
||||||
{
|
{
|
||||||
if (gw->getGWParams()->clientAuthentication )
|
if (theGateway->getGWParams()->clientAuthentication )
|
||||||
{
|
{
|
||||||
int type = TRANSPEARENT_TYPE;
|
int type = TRANSPEARENT_TYPE;
|
||||||
if ( aggregate )
|
if ( aggregate )
|
||||||
{
|
{
|
||||||
type = AGGREGATER_TYPE;
|
type = AGGREGATER_TYPE;
|
||||||
}
|
}
|
||||||
setClientList(gw, type);
|
setClientList(type);
|
||||||
_authorize = true;
|
_authorize = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClientList::setClientList(Gateway* gw, int type)
|
void ClientList::setClientList(int type)
|
||||||
{
|
{
|
||||||
char param[MQTTSNGW_PARAM_MAX];
|
char param[MQTTSNGW_PARAM_MAX];
|
||||||
string fileName;
|
string fileName;
|
||||||
GatewayParams* params = gw->getGWParams();
|
GatewayParams* params = theGateway->getGWParams();
|
||||||
if (gw->getParam("ClientsList", param) == 0)
|
if (theGateway->getParam("ClientsList", param) == 0)
|
||||||
{
|
{
|
||||||
fileName = string(param);
|
fileName = string(param);
|
||||||
}
|
}
|
||||||
@@ -79,14 +82,14 @@ void ClientList::setClientList(Gateway* gw, int type)
|
|||||||
params->clientListName = strdup(fileName.c_str());
|
params->clientListName = strdup(fileName.c_str());
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClientList::setPredefinedTopics(Gateway* gw, bool aggrecate)
|
void ClientList::setPredefinedTopics(bool aggrecate)
|
||||||
{
|
{
|
||||||
char param[MQTTSNGW_PARAM_MAX];
|
char param[MQTTSNGW_PARAM_MAX];
|
||||||
|
|
||||||
string fileName;
|
string fileName;
|
||||||
GatewayParams* params = gw->getGWParams();
|
GatewayParams* params = theGateway->getGWParams();
|
||||||
|
|
||||||
if (gw->getParam("PredefinedTopicList", param) == 0)
|
if (theGateway->getParam("PredefinedTopicList", param) == 0)
|
||||||
{
|
{
|
||||||
fileName = string(param);
|
fileName = string(param);
|
||||||
}
|
}
|
||||||
@@ -131,6 +134,7 @@ bool ClientList::createList(const char* fileName, int type)
|
|||||||
bool secure;
|
bool secure;
|
||||||
bool stable;
|
bool stable;
|
||||||
bool qos_1;
|
bool qos_1;
|
||||||
|
bool forwarder;
|
||||||
bool rc = true;
|
bool rc = true;
|
||||||
SensorNetAddress netAddr;
|
SensorNetAddress netAddr;
|
||||||
MQTTSNString clientId = MQTTSNString_initializer;
|
MQTTSNString clientId = MQTTSNString_initializer;
|
||||||
@@ -160,15 +164,20 @@ bool ClientList::createList(const char* fileName, int type)
|
|||||||
if (netAddr.setAddress(&addr) == 0)
|
if (netAddr.setAddress(&addr) == 0)
|
||||||
{
|
{
|
||||||
qos_1 = (data.find("QoS-1") != string::npos);
|
qos_1 = (data.find("QoS-1") != string::npos);
|
||||||
|
forwarder = (data.find("forwarder") != string::npos);
|
||||||
secure = (data.find("secureConnection") != string::npos);
|
secure = (data.find("secureConnection") != string::npos);
|
||||||
stable = !(data.find("unstableLine") != string::npos);
|
stable = !(data.find("unstableLine") != string::npos);
|
||||||
if ( (qos_1 && type == QOSM1PROXY_TYPE) || (!qos_1 && type == AGGREGATER_TYPE) )
|
if ( (qos_1 && type == QOSM1PROXY_TYPE) || (!qos_1 && type == AGGREGATER_TYPE) )
|
||||||
{
|
{
|
||||||
createClient(&netAddr, &clientId, stable, secure, type);
|
createClient(&netAddr, &clientId, stable, secure, type);
|
||||||
}
|
}
|
||||||
else
|
else if ( forwarder && type == FORWARDER_TYPE)
|
||||||
{
|
{
|
||||||
createClient(&netAddr, &clientId, stable, secure, TRANSPEARENT_TYPE);
|
theGateway->getAdapterManager()->getForwarderList()->addForwarder(&netAddr, &clientId);
|
||||||
|
}
|
||||||
|
else if (type == TRANSPEARENT_TYPE )
|
||||||
|
{
|
||||||
|
createClient(&netAddr, &clientId, stable, secure, type);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
|||||||
@@ -26,6 +26,7 @@ namespace MQTTSNGW
|
|||||||
#define TRANSPEARENT_TYPE 0
|
#define TRANSPEARENT_TYPE 0
|
||||||
#define QOSM1PROXY_TYPE 1
|
#define QOSM1PROXY_TYPE 1
|
||||||
#define AGGREGATER_TYPE 2
|
#define AGGREGATER_TYPE 2
|
||||||
|
#define FORWARDER_TYPE 3
|
||||||
|
|
||||||
class Client;
|
class Client;
|
||||||
|
|
||||||
@@ -38,9 +39,9 @@ public:
|
|||||||
ClientList();
|
ClientList();
|
||||||
~ClientList();
|
~ClientList();
|
||||||
|
|
||||||
void initialize(Gateway* gw, bool aggregate);
|
void initialize(bool aggregate);
|
||||||
void setClientList(Gateway* gw, int type);
|
void setClientList(int type);
|
||||||
void setPredefinedTopics(Gateway* gw, bool aggregate);
|
void setPredefinedTopics(bool aggregate);
|
||||||
void erase(Client*&);
|
void erase(Client*&);
|
||||||
Client* createClient(SensorNetAddress* addr, MQTTSNString* clientId,int type);
|
Client* createClient(SensorNetAddress* addr, MQTTSNString* clientId,int type);
|
||||||
Client* createClient(SensorNetAddress* addr, MQTTSNString* clientId, bool unstableLine, bool secure, int type);
|
Client* createClient(SensorNetAddress* addr, MQTTSNString* clientId, bool unstableLine, bool secure, int type);
|
||||||
@@ -54,6 +55,7 @@ public:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
bool readPredefinedList(const char* fileName, bool _aggregate);
|
bool readPredefinedList(const char* fileName, bool _aggregate);
|
||||||
|
Gateway* _gateway {nullptr};
|
||||||
Client* createPredefinedTopic( MQTTSNString* clientId, string topicName, uint16_t toipcId, bool _aggregate);
|
Client* createPredefinedTopic( MQTTSNString* clientId, string topicName, uint16_t toipcId, bool _aggregate);
|
||||||
Client* _firstClient;
|
Client* _firstClient;
|
||||||
Client* _endClient;
|
Client* _endClient;
|
||||||
|
|||||||
@@ -60,6 +60,7 @@ void ClientRecvTask::run()
|
|||||||
Event* ev = nullptr;
|
Event* ev = nullptr;
|
||||||
Client* client = nullptr;
|
Client* client = nullptr;
|
||||||
AdapterManager* adpMgr = _gateway->getAdapterManager();
|
AdapterManager* adpMgr = _gateway->getAdapterManager();
|
||||||
|
QoSm1Proxy* qosm1Proxy = adpMgr->getQoSm1Proxy();
|
||||||
bool isAggrActive = adpMgr->isAggregaterActive();
|
bool isAggrActive = adpMgr->isAggregaterActive();
|
||||||
ClientList* clientList = _gateway->getClientList();
|
ClientList* clientList = _gateway->getClientList();
|
||||||
EventQue* packetEventQue = _gateway->getPacketEventQue();
|
EventQue* packetEventQue = _gateway->getPacketEventQue();
|
||||||
@@ -103,7 +104,60 @@ void ClientRecvTask::run()
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
client = adpMgr->getClient(packet, this);
|
|
||||||
|
Client* client = nullptr;
|
||||||
|
SensorNetAddress* senderAddr = _gateway->getSensorNetwork()->getSenderAddress();
|
||||||
|
|
||||||
|
if ( packet->getType() == MQTTSN_ENCAPSULATED )
|
||||||
|
{
|
||||||
|
fwd = _gateway->getAdapterManager()->getForwarderList()->getForwarder(senderAddr);
|
||||||
|
|
||||||
|
if ( fwd != nullptr )
|
||||||
|
{
|
||||||
|
MQTTSNString fwdName = MQTTSNString_initializer;
|
||||||
|
fwdName.cstring = const_cast<char *>( fwd->getName() );
|
||||||
|
log(0, packet, &fwdName);
|
||||||
|
|
||||||
|
/* get the packet from the encapsulation message */
|
||||||
|
MQTTSNGWEncapsulatedPacket encap;
|
||||||
|
encap.desirialize(packet->getPacketData(), packet->getPacketLength());
|
||||||
|
nodeId.setId( encap.getWirelessNodeId() );
|
||||||
|
client = fwd->getClient(&nodeId);
|
||||||
|
packet = encap.getMQTTSNPacket();
|
||||||
|
|
||||||
|
if ( client == nullptr )
|
||||||
|
{
|
||||||
|
/* get client of Forwarder or QoSm1Proxy */
|
||||||
|
client = _gateway->getClientList()->getClient(senderAddr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/* Check the client belonging to QoS-1Proxy ? */
|
||||||
|
|
||||||
|
if ( qosm1Proxy->isActive() )
|
||||||
|
{
|
||||||
|
/* get ClientId not Client which can send QoS-1 PUBLISH */
|
||||||
|
const char* clientName = qosm1Proxy->getClientId(senderAddr);
|
||||||
|
|
||||||
|
if ( clientName )
|
||||||
|
{
|
||||||
|
if ( !packet->isQoSMinusPUBLISH() )
|
||||||
|
{
|
||||||
|
client = qosm1Proxy->getClient();
|
||||||
|
log(clientName, packet);
|
||||||
|
WRITELOG("%s %s %s can send only PUBLISH with QoS-1.%s\n", ERRMSG_HEADER, clientName, senderAddr->sprint(buf), ERRMSG_FOOTER);
|
||||||
|
delete packet;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
client = _gateway->getClientList()->getClient(senderAddr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if ( client )
|
if ( client )
|
||||||
{
|
{
|
||||||
@@ -123,7 +177,7 @@ void ClientRecvTask::run()
|
|||||||
if ( !packet->getCONNECT(&data) )
|
if ( !packet->getCONNECT(&data) )
|
||||||
{
|
{
|
||||||
log(0, packet, &data.clientID);
|
log(0, packet, &data.clientID);
|
||||||
WRITELOG("%s CONNECT message form %s is incorrect.%s\n", ERRMSG_HEADER, _sensorNetwork->getSenderAddress()->sprint(buf), ERRMSG_FOOTER);
|
WRITELOG("%s CONNECT message form %s is incorrect.%s\n", ERRMSG_HEADER, senderAddr->sprint(buf), ERRMSG_FOOTER);
|
||||||
delete packet;
|
delete packet;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@@ -145,12 +199,12 @@ void ClientRecvTask::run()
|
|||||||
if ( client )
|
if ( client )
|
||||||
{
|
{
|
||||||
/* Client exists. Set SensorNet Address of it. */
|
/* Client exists. Set SensorNet Address of it. */
|
||||||
client->setClientAddress(_sensorNetwork->getSenderAddress());
|
client->setClientAddress(senderAddr);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
/* create a new client */
|
/* create a new client */
|
||||||
client = clientList->createClient(_sensorNetwork->getSenderAddress(), &data.clientID, isAggrActive);
|
client = clientList->createClient(senderAddr, &data.clientID, isAggrActive);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -158,7 +212,7 @@ void ClientRecvTask::run()
|
|||||||
|
|
||||||
if (!client)
|
if (!client)
|
||||||
{
|
{
|
||||||
WRITELOG("%s Client(%s) was rejected. CONNECT message has been discarded.%s\n", ERRMSG_HEADER, _sensorNetwork->getSenderAddress()->sprint(buf), ERRMSG_FOOTER);
|
WRITELOG("%s Client(%s) was rejected. CONNECT message has been discarded.%s\n", ERRMSG_HEADER, senderAddr->sprint(buf), ERRMSG_FOOTER);
|
||||||
delete packet;
|
delete packet;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@@ -171,7 +225,14 @@ void ClientRecvTask::run()
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
log(client, packet, 0);
|
log(client, packet, 0);
|
||||||
WRITELOG("%s Client(%s) is not connecting. message has been discarded.%s\n", ERRMSG_HEADER, _sensorNetwork->getSenderAddress()->sprint(buf), ERRMSG_FOOTER);
|
if ( packet->getType() == MQTTSN_ENCAPSULATED )
|
||||||
|
{
|
||||||
|
WRITELOG("%s Forwarder(%s) is not declared by ClientList file. message has been discarded.%s\n", ERRMSG_HEADER, _sensorNetwork->getSenderAddress()->sprint(buf), ERRMSG_FOOTER);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
WRITELOG("%s Client(%s) is not connecting. message has been discarded.%s\n", ERRMSG_HEADER, senderAddr->sprint(buf), ERRMSG_FOOTER);
|
||||||
|
}
|
||||||
delete packet;
|
delete packet;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -14,6 +14,7 @@
|
|||||||
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
|
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
|
||||||
**************************************************************************************/
|
**************************************************************************************/
|
||||||
#include "MQTTSNGWForwarder.h"
|
#include "MQTTSNGWForwarder.h"
|
||||||
|
#include "SensorNetwork.h"
|
||||||
|
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
|
|
||||||
@@ -48,25 +49,12 @@ void ForwarderList::initialize(Gateway* gw)
|
|||||||
{
|
{
|
||||||
char param[MQTTSNGW_PARAM_MAX];
|
char param[MQTTSNGW_PARAM_MAX];
|
||||||
string fileName;
|
string fileName;
|
||||||
GatewayParams* params = gw->getGWParams();
|
|
||||||
|
|
||||||
if (gw->getParam("Forwarder", param) == 0 )
|
if (gw->getParam("Forwarder", param) == 0 )
|
||||||
{
|
{
|
||||||
if (!strcasecmp(param, "YES") )
|
if (!strcasecmp(param, "YES") )
|
||||||
{
|
{
|
||||||
if (gw->getParam("ForwardersList", param) == 0)
|
gw->getClientList()->setClientList(FORWARDER_TYPE);
|
||||||
{
|
|
||||||
fileName = string(param);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
fileName = params->configDir + string(FORWARDER_LIST);
|
|
||||||
}
|
|
||||||
if ( !setFowerder(fileName.c_str()) )
|
|
||||||
{
|
|
||||||
throw Exception("ForwarderList::initialize: No ForwardersList file defined by the configuration..");
|
|
||||||
}
|
|
||||||
params->forwarderListName = strdup(fileName.c_str());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -86,58 +74,7 @@ Forwarder* ForwarderList::getForwarder(SensorNetAddress* addr)
|
|||||||
return p;
|
return p;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ForwarderList::setFowerder(const char* fileName)
|
Forwarder* ForwarderList::addForwarder(SensorNetAddress* addr, MQTTSNString* forwarderId)
|
||||||
{
|
|
||||||
FILE* fp;
|
|
||||||
char buf[MAX_CLIENTID_LENGTH + 256];
|
|
||||||
size_t pos;
|
|
||||||
|
|
||||||
SensorNetAddress netAddr;
|
|
||||||
|
|
||||||
if ((fp = fopen(fileName, "r")) != 0)
|
|
||||||
{
|
|
||||||
while (fgets(buf, MAX_CLIENTID_LENGTH + 254, fp) != 0)
|
|
||||||
{
|
|
||||||
if (*buf == '#')
|
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
string data = string(buf);
|
|
||||||
while ((pos = data.find_first_of(" \t\n")) != string::npos)
|
|
||||||
{
|
|
||||||
data.erase(pos, 1);
|
|
||||||
}
|
|
||||||
if (data.empty())
|
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
pos = data.find_first_of(",");
|
|
||||||
string id = data.substr(0, pos);
|
|
||||||
string addr = data.substr(pos + 1);
|
|
||||||
|
|
||||||
if (netAddr.setAddress(&addr) == 0)
|
|
||||||
{
|
|
||||||
addForwarder(&netAddr, &id);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
WRITELOG("Invalid address %s\n", data.c_str());
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
fclose(fp);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
WRITELOG("Can not open the forwarders List. %s\n", fileName);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
Forwarder* ForwarderList::addForwarder(SensorNetAddress* addr, string* forwarderId)
|
|
||||||
{
|
{
|
||||||
Forwarder* fdr = new Forwarder(addr, forwarderId);
|
Forwarder* fdr = new Forwarder(addr, forwarderId);
|
||||||
if ( _head == nullptr )
|
if ( _head == nullptr )
|
||||||
@@ -173,9 +110,9 @@ Forwarder::Forwarder()
|
|||||||
Class ForwarderList
|
Class ForwarderList
|
||||||
=====================================*/
|
=====================================*/
|
||||||
|
|
||||||
Forwarder::Forwarder(SensorNetAddress* addr, string* forwarderId)
|
Forwarder::Forwarder(SensorNetAddress* addr, MQTTSNString* forwarderId)
|
||||||
{
|
{
|
||||||
_forwarderName = *forwarderId;
|
_forwarderName = string(forwarderId->cstring);
|
||||||
_sensorNetAddr = *addr;
|
_sensorNetAddr = *addr;
|
||||||
_headClient = nullptr;
|
_headClient = nullptr;
|
||||||
_next = nullptr;
|
_next = nullptr;
|
||||||
|
|||||||
@@ -55,7 +55,7 @@ class Forwarder
|
|||||||
friend class ForwarderList;
|
friend class ForwarderList;
|
||||||
public:
|
public:
|
||||||
Forwarder(void);
|
Forwarder(void);
|
||||||
Forwarder(SensorNetAddress* addr, string* forwarderId);
|
Forwarder(SensorNetAddress* addr, MQTTSNString* forwarderId);
|
||||||
~Forwarder();
|
~Forwarder();
|
||||||
|
|
||||||
void initialize(void);
|
void initialize(void);
|
||||||
@@ -86,8 +86,7 @@ public:
|
|||||||
|
|
||||||
void initialize(Gateway* gw);
|
void initialize(Gateway* gw);
|
||||||
Forwarder* getForwarder(SensorNetAddress* addr);
|
Forwarder* getForwarder(SensorNetAddress* addr);
|
||||||
bool setFowerder(const char* fileName);
|
Forwarder* addForwarder(SensorNetAddress* addr, MQTTSNString* forwarderId);
|
||||||
Forwarder* addForwarder(SensorNetAddress* addr, string* forwarderId);
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
Forwarder* _head;
|
Forwarder* _head;
|
||||||
|
|||||||
@@ -52,7 +52,7 @@ void QoSm1Proxy::initialize(void)
|
|||||||
if (strcasecmp(param, "YES") == 0 )
|
if (strcasecmp(param, "YES") == 0 )
|
||||||
{
|
{
|
||||||
/* Create QoS-1 Clients */
|
/* Create QoS-1 Clients */
|
||||||
_gateway->getClientList()->setClientList(_gateway, QOSM1PROXY_TYPE);
|
_gateway->getClientList()->setClientList(QOSM1PROXY_TYPE);
|
||||||
|
|
||||||
/* set ClientId of this Proxy */
|
/* set ClientId of this Proxy */
|
||||||
const char* name = CLIENTPROXY;
|
const char* name = CLIENTPROXY;
|
||||||
|
|||||||
@@ -28,6 +28,8 @@ char* currentDateTime(void);
|
|||||||
/*=====================================
|
/*=====================================
|
||||||
Class Gateway
|
Class Gateway
|
||||||
=====================================*/
|
=====================================*/
|
||||||
|
MQTTSNGW::Gateway* theGateway = nullptr;
|
||||||
|
|
||||||
Gateway::Gateway(void)
|
Gateway::Gateway(void)
|
||||||
{
|
{
|
||||||
theMultiTaskProcess = this;
|
theMultiTaskProcess = this;
|
||||||
@@ -87,14 +89,7 @@ Gateway::~Gateway()
|
|||||||
{
|
{
|
||||||
free(_params.configName);
|
free(_params.configName);
|
||||||
}
|
}
|
||||||
if ( _params.predefinedTopicFileName )
|
|
||||||
{
|
|
||||||
free(_params.predefinedTopicFileName);
|
|
||||||
}
|
|
||||||
if ( _params.forwarderListName )
|
|
||||||
{
|
|
||||||
free(_params.forwarderListName);
|
|
||||||
}
|
|
||||||
if ( _params.qosMinusClientListName )
|
if ( _params.qosMinusClientListName )
|
||||||
{
|
{
|
||||||
free(_params.qosMinusClientListName);
|
free(_params.qosMinusClientListName);
|
||||||
@@ -119,6 +114,7 @@ void Gateway::initialize(int argc, char** argv)
|
|||||||
{
|
{
|
||||||
char param[MQTTSNGW_PARAM_MAX];
|
char param[MQTTSNGW_PARAM_MAX];
|
||||||
string fileName;
|
string fileName;
|
||||||
|
theGateway = this;
|
||||||
|
|
||||||
MultiTaskProcess::initialize(argc, argv);
|
MultiTaskProcess::initialize(argc, argv);
|
||||||
resetRingBuffer();
|
resetRingBuffer();
|
||||||
@@ -222,10 +218,10 @@ void Gateway::initialize(int argc, char** argv)
|
|||||||
_adapterManager->initialize();
|
_adapterManager->initialize();
|
||||||
|
|
||||||
bool aggregate = _adapterManager->isAggregaterActive();
|
bool aggregate = _adapterManager->isAggregaterActive();
|
||||||
_clientList->initialize(this, aggregate);
|
_clientList->initialize(aggregate);
|
||||||
|
|
||||||
/* Setup predefined topics */
|
/* Setup predefined topics */
|
||||||
_clientList->setPredefinedTopics(this, aggregate);
|
_clientList->setPredefinedTopics(aggregate);
|
||||||
}
|
}
|
||||||
|
|
||||||
void Gateway::run(void)
|
void Gateway::run(void)
|
||||||
@@ -242,7 +238,7 @@ void Gateway::run(void)
|
|||||||
WRITELOG("\n%s %s has been started.\n\n", currentDateTime(), _params.gatewayName);
|
WRITELOG("\n%s %s has been started.\n\n", currentDateTime(), _params.gatewayName);
|
||||||
WRITELOG(" ConfigFile: %s\n", _params.configName);
|
WRITELOG(" ConfigFile: %s\n", _params.configName);
|
||||||
|
|
||||||
if ( getClientList()->isAuthorized() )
|
if ( _params.clientListName )
|
||||||
{
|
{
|
||||||
WRITELOG(" ClientList: %s\n", _params.clientListName);
|
WRITELOG(" ClientList: %s\n", _params.clientListName);
|
||||||
}
|
}
|
||||||
@@ -252,16 +248,6 @@ void Gateway::run(void)
|
|||||||
WRITELOG(" PreDefFile: %s\n", _params.predefinedTopicFileName);
|
WRITELOG(" PreDefFile: %s\n", _params.predefinedTopicFileName);
|
||||||
}
|
}
|
||||||
|
|
||||||
if ( _params.forwarderListName )
|
|
||||||
{
|
|
||||||
WRITELOG(" Forwarders: %s\n", _params.forwarderListName);
|
|
||||||
}
|
|
||||||
|
|
||||||
if ( _params.qosMinusClientListName )
|
|
||||||
{
|
|
||||||
WRITELOG(" QoS-1File: %s\n", _params.qosMinusClientListName);
|
|
||||||
}
|
|
||||||
|
|
||||||
WRITELOG(" SensorN/W: %s\n", _sensorNetwork.getDescription());
|
WRITELOG(" SensorN/W: %s\n", _sensorNetwork.getDescription());
|
||||||
WRITELOG(" Broker: %s : %s, %s\n", _params.brokerName, _params.port, _params.portSecure);
|
WRITELOG(" Broker: %s : %s, %s\n", _params.brokerName, _params.port, _params.portSecure);
|
||||||
WRITELOG(" RootCApath: %s\n", _params.rootCApath);
|
WRITELOG(" RootCApath: %s\n", _params.rootCApath);
|
||||||
|
|||||||
@@ -164,7 +164,6 @@ public:
|
|||||||
char* certKey {nullptr};
|
char* certKey {nullptr};
|
||||||
char* privateKey {nullptr};
|
char* privateKey {nullptr};
|
||||||
char* predefinedTopicFileName {nullptr};
|
char* predefinedTopicFileName {nullptr};
|
||||||
char* forwarderListName {nullptr};
|
|
||||||
char* qosMinusClientListName {nullptr};
|
char* qosMinusClientListName {nullptr};
|
||||||
bool clientAuthentication {false};
|
bool clientAuthentication {false};
|
||||||
};
|
};
|
||||||
|
|||||||
Reference in New Issue
Block a user