mirror of
https://github.com/eclipse/paho.mqtt-sn.embedded-c.git
synced 2025-12-13 15:36:51 +01:00
Update: DISCONNECT (Issue #31) and others
Signed-off-by: tomoaki <tomoaki@tomy-tech.com>
This commit is contained in:
@@ -52,6 +52,9 @@
|
|||||||
<listOptionValue builtIn="false" srcPrefixMapping="" srcRootPath="" value="ssl"/>
|
<listOptionValue builtIn="false" srcPrefixMapping="" srcRootPath="" value="ssl"/>
|
||||||
<listOptionValue builtIn="false" srcPrefixMapping="" srcRootPath="" value="crypto"/>
|
<listOptionValue builtIn="false" srcPrefixMapping="" srcRootPath="" value="crypto"/>
|
||||||
</option>
|
</option>
|
||||||
|
<option id="gnu.cpp.link.option.paths.170974409" name="Library search path (-L)" superClass="gnu.cpp.link.option.paths" useByScannerDiscovery="false" valueType="libPaths">
|
||||||
|
<listOptionValue builtIn="false" value="/usr/local/lib"/>
|
||||||
|
</option>
|
||||||
<inputType id="cdt.managedbuild.tool.gnu.cpp.linker.input.98211496" superClass="cdt.managedbuild.tool.gnu.cpp.linker.input">
|
<inputType id="cdt.managedbuild.tool.gnu.cpp.linker.input.98211496" superClass="cdt.managedbuild.tool.gnu.cpp.linker.input">
|
||||||
<additionalInput kind="additionalinputdependency" paths="$(USER_OBJS)"/>
|
<additionalInput kind="additionalinputdependency" paths="$(USER_OBJS)"/>
|
||||||
<additionalInput kind="additionalinput" paths="$(LIBS)"/>
|
<additionalInput kind="additionalinput" paths="$(LIBS)"/>
|
||||||
@@ -63,7 +66,7 @@
|
|||||||
</toolChain>
|
</toolChain>
|
||||||
</folderInfo>
|
</folderInfo>
|
||||||
<sourceEntries>
|
<sourceEntries>
|
||||||
<entry excluding="MQTTSNGateway/GatewayTester/samples/mainOTA.cpp|MQTTSNGateway/GatewayTester/samples/mainTest.cpp|MQTTSNGateway/src/linux/xbee|MQTTSNGateway/src/mainLogmonitor.cpp|MQTTSNPacket/test|MQTTSNPacket/samples" flags="VALUE_WORKSPACE_PATH|RESOLVED" kind="sourcePath" name=""/>
|
<entry excluding="MQTTSNGateway/src/tests/mainTestProcessFramework.cpp|MQTTSNGateway/GatewayTester/samples/mainOTA.cpp|MQTTSNGateway/GatewayTester/samples/mainTest.cpp|MQTTSNGateway/src/linux/xbee|MQTTSNGateway/src/mainLogmonitor.cpp|MQTTSNPacket/test|MQTTSNPacket/samples" flags="VALUE_WORKSPACE_PATH|RESOLVED" kind="sourcePath" name=""/>
|
||||||
</sourceEntries>
|
</sourceEntries>
|
||||||
</configuration>
|
</configuration>
|
||||||
</storageModule>
|
</storageModule>
|
||||||
@@ -130,7 +133,7 @@
|
|||||||
</folderInfo>
|
</folderInfo>
|
||||||
<sourceEntries>
|
<sourceEntries>
|
||||||
<entry excluding="MQTTSNGateway/GatewayTester/samples/mainOTA.cpp|MQTTSNGateway/GatewayTester/samples/mainTest.cpp|MQTTSNPacket/src|MQTTSNGateway/src|MQTTSNGateway/src/linux|MQTTSNGateway/src/linux/udp|MQTTSNGateway/src/mainLogmonitor.cpp|MQTTSNGateway/src/linux/xbee|MQTTSNPacket/test|MQTTSNPacket/samples" flags="VALUE_WORKSPACE_PATH|RESOLVED" kind="sourcePath" name=""/>
|
<entry excluding="MQTTSNGateway/GatewayTester/samples/mainOTA.cpp|MQTTSNGateway/GatewayTester/samples/mainTest.cpp|MQTTSNPacket/src|MQTTSNGateway/src|MQTTSNGateway/src/linux|MQTTSNGateway/src/linux/udp|MQTTSNGateway/src/mainLogmonitor.cpp|MQTTSNGateway/src/linux/xbee|MQTTSNPacket/test|MQTTSNPacket/samples" flags="VALUE_WORKSPACE_PATH|RESOLVED" kind="sourcePath" name=""/>
|
||||||
<entry excluding="mainLogmonitor.cpp|linux|linux/udp" flags="VALUE_WORKSPACE_PATH" kind="sourcePath" name="MQTTSNGateway/src"/>
|
<entry excluding="tests/mainTestProcessFramework.cpp|mainLogmonitor.cpp|linux|linux/udp" flags="VALUE_WORKSPACE_PATH" kind="sourcePath" name="MQTTSNGateway/src"/>
|
||||||
<entry excluding="xbee" flags="VALUE_WORKSPACE_PATH" kind="sourcePath" name="MQTTSNGateway/src/linux"/>
|
<entry excluding="xbee" flags="VALUE_WORKSPACE_PATH" kind="sourcePath" name="MQTTSNGateway/src/linux"/>
|
||||||
<entry flags="VALUE_WORKSPACE_PATH" kind="sourcePath" name="MQTTSNPacket/src"/>
|
<entry flags="VALUE_WORKSPACE_PATH" kind="sourcePath" name="MQTTSNPacket/src"/>
|
||||||
</sourceEntries>
|
</sourceEntries>
|
||||||
|
|||||||
@@ -40,3 +40,6 @@ Baudrate=38400
|
|||||||
SerialDevice=/dev/ttyUSB0
|
SerialDevice=/dev/ttyUSB0
|
||||||
ApiMode=2
|
ApiMode=2
|
||||||
|
|
||||||
|
# LOG
|
||||||
|
ShearedMemory=NO;
|
||||||
|
|
||||||
|
|||||||
@@ -95,12 +95,4 @@ void MQTTGWConnectionHandler::handlePingresp(Client* client, MQTTGWPacket* packe
|
|||||||
_gateway->getClientSendQue()->post(ev1);
|
_gateway->getClientSendQue()->post(ev1);
|
||||||
}
|
}
|
||||||
|
|
||||||
void MQTTGWConnectionHandler::handleDisconnect(Client* client, MQTTGWPacket* packet)
|
|
||||||
{
|
|
||||||
MQTTSNPacket* snPacket = new MQTTSNPacket();
|
|
||||||
snPacket->setDISCONNECT(0);
|
|
||||||
client->disconnected();
|
|
||||||
Event* ev1 = new Event();
|
|
||||||
ev1->setClientSendEvent(client, snPacket);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|||||||
@@ -30,13 +30,9 @@ public:
|
|||||||
~MQTTGWConnectionHandler();
|
~MQTTGWConnectionHandler();
|
||||||
void handleConnack(Client* client, MQTTGWPacket* packet);
|
void handleConnack(Client* client, MQTTGWPacket* packet);
|
||||||
void handlePingresp(Client* client, MQTTGWPacket* packet);
|
void handlePingresp(Client* client, MQTTGWPacket* packet);
|
||||||
void handleDisconnect(Client* client, MQTTGWPacket* packet);
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
Gateway* _gateway;
|
Gateway* _gateway;
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
#endif /* MQTTGWCONNECTIONHANDLER_H_ */
|
#endif /* MQTTGWCONNECTIONHANDLER_H_ */
|
||||||
|
|||||||
@@ -176,7 +176,6 @@ int BrokerRecvTask::log(Client* client, MQTTGWPacket* packet)
|
|||||||
switch (packet->getType())
|
switch (packet->getType())
|
||||||
{
|
{
|
||||||
case CONNACK:
|
case CONNACK:
|
||||||
case DISCONNECT:
|
|
||||||
WRITELOG(FORMAT_Y_Y_W, currentDateTime(), packet->getName(), LEFTARROW, client->getClientId(), packet->print(pbuf));
|
WRITELOG(FORMAT_Y_Y_W, currentDateTime(), packet->getName(), LEFTARROW, client->getClientId(), packet->print(pbuf));
|
||||||
break;
|
break;
|
||||||
case PUBLISH:
|
case PUBLISH:
|
||||||
|
|||||||
@@ -34,41 +34,13 @@ BrokerSendTask::BrokerSendTask(Gateway* gateway)
|
|||||||
{
|
{
|
||||||
_gateway = gateway;
|
_gateway = gateway;
|
||||||
_gateway->attach((Thread*)this);
|
_gateway->attach((Thread*)this);
|
||||||
_host = 0;
|
_gwparams = 0;
|
||||||
_port = 0;
|
|
||||||
_portSecure = 0;
|
|
||||||
_certDirectory = 0;
|
|
||||||
_rootCAfile = 0;
|
|
||||||
|
|
||||||
_light = 0;
|
_light = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
BrokerSendTask::~BrokerSendTask()
|
BrokerSendTask::~BrokerSendTask()
|
||||||
{
|
{
|
||||||
if (_host)
|
|
||||||
{
|
|
||||||
free(_host);
|
|
||||||
}
|
|
||||||
if (_port)
|
|
||||||
{
|
|
||||||
free(_port);
|
|
||||||
}
|
|
||||||
if (_portSecure)
|
|
||||||
{
|
|
||||||
free(_portSecure);
|
|
||||||
}
|
|
||||||
if (_certDirectory)
|
|
||||||
{
|
|
||||||
free(_certDirectory);
|
|
||||||
}
|
|
||||||
if (_rootCApath)
|
|
||||||
{
|
|
||||||
free(_rootCApath);
|
|
||||||
}
|
|
||||||
if (_rootCAfile)
|
|
||||||
{
|
|
||||||
free(_rootCAfile);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -76,34 +48,7 @@ BrokerSendTask::~BrokerSendTask()
|
|||||||
*/
|
*/
|
||||||
void BrokerSendTask::initialize(int argc, char** argv)
|
void BrokerSendTask::initialize(int argc, char** argv)
|
||||||
{
|
{
|
||||||
char param[MQTTSNGW_PARAM_MAX];
|
_gwparams = _gateway->getGWParams();
|
||||||
|
|
||||||
if (_gateway->getParam("BrokerName", param) == 0)
|
|
||||||
{
|
|
||||||
_host = strdup(param);
|
|
||||||
}
|
|
||||||
if (_gateway->getParam("BrokerPortNo", param) == 0)
|
|
||||||
{
|
|
||||||
_port = strdup(param);
|
|
||||||
}
|
|
||||||
if (_gateway->getParam("BrokerSecurePortNo", param) == 0)
|
|
||||||
{
|
|
||||||
_portSecure = strdup(param);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (_gateway->getParam("CertsDirectory", param) == 0)
|
|
||||||
{
|
|
||||||
_certDirectory = strdup(param);
|
|
||||||
}
|
|
||||||
if (_gateway->getParam("RootCApath", param) == 0)
|
|
||||||
{
|
|
||||||
_rootCApath = strdup(param);
|
|
||||||
}
|
|
||||||
if (_gateway->getParam("RootCAfile", param) == 0)
|
|
||||||
{
|
|
||||||
_rootCAfile = strdup(param);
|
|
||||||
}
|
|
||||||
|
|
||||||
_light = _gateway->getLightIndicator();
|
_light = _gateway->getLightIndicator();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -122,16 +67,16 @@ void BrokerSendTask::run()
|
|||||||
ev = _gateway->getBrokerSendQue()->wait();
|
ev = _gateway->getBrokerSendQue()->wait();
|
||||||
client = ev->getClient();
|
client = ev->getClient();
|
||||||
packet = ev->getMQTTGWPacket();
|
packet = ev->getMQTTGWPacket();
|
||||||
|
/*
|
||||||
if ( client->getNetwork()->isValid() && !client->getNetwork()->isSecure() && packet->getType() == CONNECT )
|
if ( client->getNetwork()->isValid() && !client->getNetwork()->isSecure() && packet->getType() == CONNECT )
|
||||||
{
|
{
|
||||||
client->getNetwork()->close();
|
client->getNetwork()->close();
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
if ( !client->getNetwork()->isValid() )
|
if ( !client->getNetwork()->isValid() )
|
||||||
{
|
{
|
||||||
/* connect to the broker and send a packet */
|
/* connect to the broker and send a packet */
|
||||||
char* portNo = _port;
|
char* portNo = _gwparams->port;
|
||||||
const char* cert = 0;
|
const char* cert = 0;
|
||||||
const char* keyFile = 0;
|
const char* keyFile = 0;
|
||||||
string certFile;
|
string certFile;
|
||||||
@@ -139,33 +84,33 @@ void BrokerSendTask::run()
|
|||||||
|
|
||||||
if (client->isSecureNetwork())
|
if (client->isSecureNetwork())
|
||||||
{
|
{
|
||||||
portNo = _portSecure;
|
portNo = _gwparams->portSecure;
|
||||||
if ( _certDirectory )
|
if ( _gwparams->certDirectory )
|
||||||
{
|
{
|
||||||
certFile = _certDirectory;
|
certFile = _gwparams->certDirectory;
|
||||||
certFile += client->getClientId();
|
certFile += client->getClientId();
|
||||||
certFile += ".crt";
|
certFile += ".crt";
|
||||||
cert = certFile.c_str();
|
cert = certFile.c_str();
|
||||||
privateKeyFile = _certDirectory;
|
privateKeyFile = _gwparams->certDirectory;
|
||||||
privateKeyFile += client->getClientId();
|
privateKeyFile += client->getClientId();
|
||||||
privateKeyFile += ".key";
|
privateKeyFile += ".key";
|
||||||
keyFile = privateKeyFile.c_str();
|
keyFile = privateKeyFile.c_str();
|
||||||
}
|
}
|
||||||
rc = client->getNetwork()->connect(_host, portNo, _rootCApath, _rootCAfile, cert, keyFile);
|
rc = client->getNetwork()->connect(_gwparams->brokerName, _gwparams->portSecure, _gwparams->rootCApath, _gwparams->rootCAfile, cert, keyFile);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
rc = client->getNetwork()->connect(_host, portNo);
|
rc = client->getNetwork()->connect(_gwparams->brokerName, portNo);
|
||||||
}
|
}
|
||||||
|
|
||||||
if ( !rc )
|
if ( !rc )
|
||||||
{
|
{
|
||||||
/* disconnect the broker and change the client's status */
|
/* disconnect the broker and the client */
|
||||||
WRITELOG("%s BrokerSendTask can't connect to the broker. errno=%d %s%s\n",
|
WRITELOG("%s BrokerSendTask can't connect to the broker. errno=%d %s%s\n",
|
||||||
ERRMSG_HEADER, rc == -1 ? errno : 0, client->getClientId(), ERRMSG_FOOTER);
|
ERRMSG_HEADER, rc == -1 ? errno : 0, client->getClientId(), ERRMSG_FOOTER);
|
||||||
client->disconnected();
|
|
||||||
client->getNetwork()->disconnect();
|
|
||||||
delete ev;
|
delete ev;
|
||||||
|
disconnect(client);
|
||||||
|
client->getNetwork()->disconnect();
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -231,3 +176,13 @@ void BrokerSendTask::log(Client* client, MQTTGWPacket* packet)
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void BrokerSendTask::disconnect(Client* client)
|
||||||
|
{
|
||||||
|
MQTTSNPacket* snMsg = new MQTTSNPacket();
|
||||||
|
snMsg->setDISCONNECT(0);
|
||||||
|
Event* ev1 = new Event();
|
||||||
|
ev1->setClientSendEvent(client, snMsg);
|
||||||
|
_gateway->getClientSendQue()->post(ev1);
|
||||||
|
client->disconnected();
|
||||||
|
}
|
||||||
|
|||||||
@@ -35,14 +35,9 @@ public:
|
|||||||
void run();
|
void run();
|
||||||
private:
|
private:
|
||||||
void log(Client*, MQTTGWPacket*);
|
void log(Client*, MQTTGWPacket*);
|
||||||
|
void disconnect(Client*);
|
||||||
Gateway* _gateway;
|
Gateway* _gateway;
|
||||||
char* _host;
|
GatewayParams* _gwparams;
|
||||||
char* _port;
|
|
||||||
char* _portSecure;
|
|
||||||
char* _rootCApath;
|
|
||||||
char* _rootCAfile;
|
|
||||||
char* _certDirectory;
|
|
||||||
LightIndicator* _light;
|
LightIndicator* _light;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -375,13 +375,9 @@ void Client::setClientSleepPacket(MQTTSNPacket* packet)
|
|||||||
_clientSleepPacketQue.post(packet);
|
_clientSleepPacketQue.post(packet);
|
||||||
}
|
}
|
||||||
|
|
||||||
void Client::checkTimeover(void)
|
bool Client::checkTimeover(void)
|
||||||
{
|
{
|
||||||
if (_status == Cstat_Active && _keepAliveTimer.isTimeup())
|
return (_status == Cstat_Active && _keepAliveTimer.isTimeup());
|
||||||
{
|
|
||||||
_status = Cstat_Lost;
|
|
||||||
_network->disconnect();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void Client::setKeepAlive(MQTTSNPacket* packet)
|
void Client::setKeepAlive(MQTTSNPacket* packet)
|
||||||
|
|||||||
@@ -248,7 +248,7 @@ public:
|
|||||||
void setWaitedPubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type);
|
void setWaitedPubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type);
|
||||||
void setWaitedSubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type);
|
void setWaitedSubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type);
|
||||||
|
|
||||||
void checkTimeover(void);
|
bool checkTimeover(void);
|
||||||
void updateStatus(MQTTSNPacket*);
|
void updateStatus(MQTTSNPacket*);
|
||||||
void updateStatus(ClientStatus);
|
void updateStatus(ClientStatus);
|
||||||
void connectSended(void);
|
void connectSended(void);
|
||||||
|
|||||||
@@ -20,11 +20,11 @@
|
|||||||
namespace MQTTSNGW
|
namespace MQTTSNGW
|
||||||
{
|
{
|
||||||
/*=================================
|
/*=================================
|
||||||
* Log controls
|
* Config Parametrs
|
||||||
==================================*/
|
==================================*/
|
||||||
//#define DEBUG // print out log for debug
|
#define CONFIG_DIRECTORY "./"
|
||||||
//#define RINGBUFFER // print out Packets log into shared memory
|
#define CONFIG_FILE "gateway.conf"
|
||||||
//#define DEBUG_NWSTACK // print out SensorNetwork log
|
#define CLIENT_LIST "clients.conf"
|
||||||
|
|
||||||
/*=================================
|
/*=================================
|
||||||
* MQTT-SN Parametrs
|
* MQTT-SN Parametrs
|
||||||
@@ -43,11 +43,13 @@ typedef unsigned short uint16_t;
|
|||||||
typedef unsigned int uint32_t;
|
typedef unsigned int uint32_t;
|
||||||
|
|
||||||
/*=================================
|
/*=================================
|
||||||
* Macros
|
* Log controls
|
||||||
==================================*/
|
==================================*/
|
||||||
|
//#define DEBUG // print out log for debug
|
||||||
|
//#define DEBUG_NWSTACK // print out SensorNetwork log
|
||||||
|
|
||||||
#ifdef DEBUG
|
#ifdef DEBUG
|
||||||
#define DEBUGLOG(...) printf(__VA_ARGS__)
|
#define DEBUGLOG(...) printf(__VA_ARGS__)
|
||||||
#undef RINGBUFFER
|
|
||||||
#else
|
#else
|
||||||
#define DEBUGLOG(...)
|
#define DEBUGLOG(...)
|
||||||
#endif
|
#endif
|
||||||
|
|||||||
@@ -109,7 +109,11 @@ void PacketHandleTask::run()
|
|||||||
client = clist->getClient();
|
client = clist->getClient();
|
||||||
while (client > 0)
|
while (client > 0)
|
||||||
{
|
{
|
||||||
client->checkTimeover();
|
if ( client->checkTimeover() )
|
||||||
|
{
|
||||||
|
_mqttsnConnection->handleDisconnect(client, 0);
|
||||||
|
client->disconnected();
|
||||||
|
}
|
||||||
client = client->getNextClient();
|
client = client->getNextClient();
|
||||||
}
|
}
|
||||||
/*------ Check Keep Alive Timer & send Advertise ------*/
|
/*------ Check Keep Alive Timer & send Advertise ------*/
|
||||||
@@ -200,9 +204,6 @@ void PacketHandleTask::run()
|
|||||||
case CONNACK:
|
case CONNACK:
|
||||||
_mqttConnection->handleConnack(client, brPacket);
|
_mqttConnection->handleConnack(client, brPacket);
|
||||||
break;
|
break;
|
||||||
case DISCONNECT:
|
|
||||||
_mqttConnection->handleDisconnect(client, brPacket);
|
|
||||||
break;
|
|
||||||
case PINGRESP:
|
case PINGRESP:
|
||||||
_mqttConnection->handlePingresp(client, brPacket);
|
_mqttConnection->handlePingresp(client, brPacket);
|
||||||
break;
|
break;
|
||||||
|
|||||||
@@ -53,8 +53,9 @@ Process::Process()
|
|||||||
{
|
{
|
||||||
_argc = 0;
|
_argc = 0;
|
||||||
_argv = 0;
|
_argv = 0;
|
||||||
_configDir = MQTTSNGW_CONFIG_DIRECTORY;
|
_configDir = CONFIG_DIRECTORY;
|
||||||
_configFile = MQTTSNGW_CONFIG_FILE;
|
_configFile = CONFIG_FILE;
|
||||||
|
_log = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
Process::~Process()
|
Process::~Process()
|
||||||
@@ -76,6 +77,7 @@ void Process::run()
|
|||||||
|
|
||||||
void Process::initialize(int argc, char** argv)
|
void Process::initialize(int argc, char** argv)
|
||||||
{
|
{
|
||||||
|
char param[MQTTSNGW_PARAM_MAX];
|
||||||
_argc = argc;
|
_argc = argc;
|
||||||
_argv = argv;
|
_argv = argv;
|
||||||
signal(SIGINT, signalHandler);
|
signal(SIGINT, signalHandler);
|
||||||
@@ -102,6 +104,18 @@ void Process::initialize(int argc, char** argv)
|
|||||||
}
|
}
|
||||||
_rbsem = new Semaphore(MQTTSNGW_RB_SEMAPHOR_NAME, 0);
|
_rbsem = new Semaphore(MQTTSNGW_RB_SEMAPHOR_NAME, 0);
|
||||||
_rb = new RingBuffer(_configDir.c_str());
|
_rb = new RingBuffer(_configDir.c_str());
|
||||||
|
|
||||||
|
if (getParam("ShearedMemory", param) == 0)
|
||||||
|
{
|
||||||
|
if (!strcasecmp(param, "YES"))
|
||||||
|
{
|
||||||
|
_log = 1;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
_log = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int Process::getArgc()
|
int Process::getArgc()
|
||||||
@@ -178,8 +192,15 @@ void Process::putLog(const char* format, ...)
|
|||||||
va_end(arg);
|
va_end(arg);
|
||||||
if (strlen(_rbdata))
|
if (strlen(_rbdata))
|
||||||
{
|
{
|
||||||
_rb->put(_rbdata);
|
if ( _log > 0 )
|
||||||
_rbsem->post();
|
{
|
||||||
|
_rb->put(_rbdata);
|
||||||
|
_rbsem->post();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
printf("%s", _rbdata);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
_mt.unlock();
|
_mt.unlock();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -31,11 +31,6 @@ namespace MQTTSNGW
|
|||||||
/*=================================
|
/*=================================
|
||||||
* Parameters
|
* Parameters
|
||||||
==================================*/
|
==================================*/
|
||||||
#define MQTTSNGW_CONFIG_DIRECTORY "./"
|
|
||||||
|
|
||||||
#define MQTTSNGW_CONFIG_FILE "gateway.conf"
|
|
||||||
#define MQTTSNGW_CLIENT_LIST "clients.conf"
|
|
||||||
|
|
||||||
#define MQTTSNGW_MAX_TASK 10 // number of Tasks
|
#define MQTTSNGW_MAX_TASK 10 // number of Tasks
|
||||||
#define PROCESS_LOG_BUFFER_SIZE 16384 // Ring buffer size for Logs
|
#define PROCESS_LOG_BUFFER_SIZE 16384 // Ring buffer size for Logs
|
||||||
#define MQTTSNGW_PARAM_MAX 128 // Max length of config records.
|
#define MQTTSNGW_PARAM_MAX 128 // Max length of config records.
|
||||||
@@ -43,12 +38,7 @@ namespace MQTTSNGW
|
|||||||
/*=================================
|
/*=================================
|
||||||
* Macros
|
* Macros
|
||||||
==================================*/
|
==================================*/
|
||||||
#ifdef RINGBUFFER
|
|
||||||
#define WRITELOG theProcess->putLog
|
#define WRITELOG theProcess->putLog
|
||||||
#else
|
|
||||||
#define WRITELOG printf
|
|
||||||
#endif
|
|
||||||
|
|
||||||
|
|
||||||
/*=================================
|
/*=================================
|
||||||
Class Process
|
Class Process
|
||||||
@@ -77,6 +67,7 @@ private:
|
|||||||
RingBuffer* _rb;
|
RingBuffer* _rb;
|
||||||
Semaphore* _rbsem;
|
Semaphore* _rbsem;
|
||||||
Mutex _mt;
|
Mutex _mt;
|
||||||
|
int _log;
|
||||||
char _rbdata[PROCESS_LOG_BUFFER_SIZE + 1];
|
char _rbdata[PROCESS_LOG_BUFFER_SIZE + 1];
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -31,6 +31,17 @@ Gateway::Gateway()
|
|||||||
theProcess = this;
|
theProcess = this;
|
||||||
_params.loginId = 0;
|
_params.loginId = 0;
|
||||||
_params.password = 0;
|
_params.password = 0;
|
||||||
|
_params.keepAlive = 0;
|
||||||
|
_params.gatewayId = 0;
|
||||||
|
_params.mqttVersion = 0;
|
||||||
|
_params.maxInflightMsgs = 0;
|
||||||
|
_params.gatewayName = 0;
|
||||||
|
_params.brokerName = 0;
|
||||||
|
_params.port = 0;
|
||||||
|
_params.portSecure = 0;
|
||||||
|
_params.rootCApath = 0;
|
||||||
|
_params.rootCAfile = 0;
|
||||||
|
_params.certDirectory = 0;
|
||||||
_packetEventQue.setMaxSize(MAX_INFLIGHTMESSAGES * MAX_CLIENTS);
|
_packetEventQue.setMaxSize(MAX_INFLIGHTMESSAGES * MAX_CLIENTS);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -44,15 +55,71 @@ Gateway::~Gateway()
|
|||||||
{
|
{
|
||||||
free(_params.password);
|
free(_params.password);
|
||||||
}
|
}
|
||||||
|
if ( _params.gatewayName )
|
||||||
|
{
|
||||||
|
free(_params.gatewayName);
|
||||||
|
}
|
||||||
|
if ( _params.brokerName )
|
||||||
|
{
|
||||||
|
free(_params.brokerName);
|
||||||
|
}
|
||||||
|
if ( _params.port )
|
||||||
|
{
|
||||||
|
free(_params.port);
|
||||||
|
}
|
||||||
|
if ( _params.portSecure )
|
||||||
|
{
|
||||||
|
free(_params.portSecure);
|
||||||
|
}
|
||||||
|
if ( _params.rootCApath )
|
||||||
|
{
|
||||||
|
free(_params.rootCApath);
|
||||||
|
}
|
||||||
|
|
||||||
|
if ( _params.clientListName )
|
||||||
|
{
|
||||||
|
free(_params.clientListName);
|
||||||
|
}
|
||||||
|
if ( _params.configName )
|
||||||
|
{
|
||||||
|
free(_params.configName);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void Gateway::initialize(int argc, char** argv)
|
void Gateway::initialize(int argc, char** argv)
|
||||||
{
|
{
|
||||||
char param[MQTTSNGW_PARAM_MAX];
|
char param[MQTTSNGW_PARAM_MAX];
|
||||||
|
string fileName;
|
||||||
MultiTaskProcess::initialize(argc, argv);
|
MultiTaskProcess::initialize(argc, argv);
|
||||||
resetRingBuffer();
|
resetRingBuffer();
|
||||||
|
|
||||||
_params.gatewayId = 0;
|
|
||||||
|
if (getParam("BrokerName", param) == 0)
|
||||||
|
{
|
||||||
|
_params.brokerName = strdup(param);
|
||||||
|
}
|
||||||
|
if (getParam("BrokerPortNo", param) == 0)
|
||||||
|
{
|
||||||
|
_params.port = strdup(param);
|
||||||
|
}
|
||||||
|
if (getParam("BrokerSecurePortNo", param) == 0)
|
||||||
|
{
|
||||||
|
_params.portSecure = strdup(param);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (getParam("CertsDirectory", param) == 0)
|
||||||
|
{
|
||||||
|
_params.certDirectory = strdup(param);
|
||||||
|
}
|
||||||
|
if (getParam("RootCApath", param) == 0)
|
||||||
|
{
|
||||||
|
_params.rootCApath = strdup(param);
|
||||||
|
}
|
||||||
|
if (getParam("RootCAfile", param) == 0)
|
||||||
|
{
|
||||||
|
_params.rootCAfile = strdup(param);
|
||||||
|
}
|
||||||
|
|
||||||
if (getParam("GatewayID", param) == 0)
|
if (getParam("GatewayID", param) == 0)
|
||||||
{
|
{
|
||||||
_params.gatewayId = atoi(param);
|
_params.gatewayId = atoi(param);
|
||||||
@@ -65,7 +132,7 @@ void Gateway::initialize(int argc, char** argv)
|
|||||||
|
|
||||||
if (getParam("GatewayName", param) == 0)
|
if (getParam("GatewayName", param) == 0)
|
||||||
{
|
{
|
||||||
_params.gatewayName = (uint8_t*) strdup(param);
|
_params.gatewayName = strdup(param);
|
||||||
}
|
}
|
||||||
|
|
||||||
_params.mqttVersion = DEFAULT_MQTT_VERSION;
|
_params.mqttVersion = DEFAULT_MQTT_VERSION;
|
||||||
@@ -94,17 +161,16 @@ void Gateway::initialize(int argc, char** argv)
|
|||||||
|
|
||||||
if (getParam("LoginID", param) == 0)
|
if (getParam("LoginID", param) == 0)
|
||||||
{
|
{
|
||||||
_params.loginId = (uint8_t*) strdup(param);
|
_params.loginId = strdup(param);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (getParam("Password", param) == 0)
|
if (getParam("Password", param) == 0)
|
||||||
{
|
{
|
||||||
_params.password = (uint8_t*) strdup(param);
|
_params.password = strdup(param);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (getParam("ClientAuthentication", param) == 0)
|
if (getParam("ClientAuthentication", param) == 0)
|
||||||
{
|
{
|
||||||
string fileName;
|
|
||||||
if (!strcasecmp(param, "YES"))
|
if (!strcasecmp(param, "YES"))
|
||||||
{
|
{
|
||||||
if (getParam("ClientsList", param) == 0)
|
if (getParam("ClientsList", param) == 0)
|
||||||
@@ -113,15 +179,18 @@ void Gateway::initialize(int argc, char** argv)
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
fileName = *getConfigDirName() + string(MQTTSNGW_CLIENT_LIST);
|
fileName = *getConfigDirName() + string(CLIENT_LIST);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!_clientList.authorize(fileName.c_str()))
|
if (!_clientList.authorize(fileName.c_str()))
|
||||||
{
|
{
|
||||||
throw Exception("Gateway::initialize: No client list defined by configuration.");
|
throw Exception("Gateway::initialize: No client list defined by configuration.");
|
||||||
}
|
}
|
||||||
|
_params.clientListName = strdup(fileName.c_str());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
fileName = *getConfigDirName() + *getConfigFileName();
|
||||||
|
_params.configName = strdup(fileName.c_str());
|
||||||
}
|
}
|
||||||
|
|
||||||
void Gateway::run(void)
|
void Gateway::run(void)
|
||||||
@@ -134,15 +203,19 @@ void Gateway::run(void)
|
|||||||
WRITELOG(" *\n%s\n", PAHO_COPYRIGHT3);
|
WRITELOG(" *\n%s\n", PAHO_COPYRIGHT3);
|
||||||
WRITELOG("%s\n", GATEWAY_VERSION);
|
WRITELOG("%s\n", GATEWAY_VERSION);
|
||||||
WRITELOG("%s\n", PAHO_COPYRIGHT4);
|
WRITELOG("%s\n", PAHO_COPYRIGHT4);
|
||||||
WRITELOG("\n%s %s has been started.\n listening on, %s\n", currentDateTime(), _params.gatewayName, _sensorNetwork.getDescription());
|
WRITELOG("\n%s %s has been started.\n\n", currentDateTime(), _params.gatewayName);
|
||||||
|
WRITELOG(" ConfigFile: %s\n", _params.configName);
|
||||||
if ( getClientList()->isAuthorized() )
|
if ( getClientList()->isAuthorized() )
|
||||||
{
|
{
|
||||||
WRITELOG("\nClient authentication is required by the configuration settings.\n\n");
|
WRITELOG(" ClientList: %s\n", _params.clientListName);
|
||||||
}
|
}
|
||||||
|
WRITELOG(" SensorN/W: %s\n", _sensorNetwork.getDescription());
|
||||||
|
WRITELOG(" Broker: %s : %s, %s\n", _params.brokerName, _params.port, _params.portSecure);
|
||||||
|
WRITELOG(" RootCApath: %s\n", _params.rootCApath);
|
||||||
|
WRITELOG(" RootCAfile: %s\n", _params.rootCAfile);
|
||||||
|
WRITELOG(" ClientCerts: %s\n", _params.certDirectory);
|
||||||
|
|
||||||
/* execute threads & wait StopProcessEvent MQTTSNGWPacketHandleTask posts by CTL-C */
|
/* Execute threads and wait StopProcessEvent from MQTTSNGWPacketHandleTask */
|
||||||
|
|
||||||
MultiTaskProcess::run();
|
MultiTaskProcess::run();
|
||||||
WRITELOG("%s MQTT-SN Gateway stoped\n", currentDateTime());
|
WRITELOG("%s MQTT-SN Gateway stoped\n", currentDateTime());
|
||||||
_lightIndicator.allLightOff();
|
_lightIndicator.allLightOff();
|
||||||
|
|||||||
@@ -31,7 +31,7 @@ namespace MQTTSNGW
|
|||||||
/*=================================
|
/*=================================
|
||||||
* Starting prompt
|
* Starting prompt
|
||||||
==================================*/
|
==================================*/
|
||||||
#define GATEWAY_VERSION " * Version: 0.6.0"
|
#define GATEWAY_VERSION " * Version: 0.8.0"
|
||||||
|
|
||||||
#define PAHO_COPYRIGHT0 " * MQTT-SN Transparent Gateway"
|
#define PAHO_COPYRIGHT0 " * MQTT-SN Transparent Gateway"
|
||||||
#define PAHO_COPYRIGHT1 " * Part of Project Paho in Eclipse"
|
#define PAHO_COPYRIGHT1 " * Part of Project Paho in Eclipse"
|
||||||
@@ -145,13 +145,21 @@ private:
|
|||||||
*/
|
*/
|
||||||
typedef struct
|
typedef struct
|
||||||
{
|
{
|
||||||
uint8_t* loginId;
|
char* configName;
|
||||||
uint8_t* password;
|
char* clientListName;
|
||||||
|
char* loginId;
|
||||||
|
char* password;
|
||||||
uint16_t keepAlive;
|
uint16_t keepAlive;
|
||||||
uint8_t gatewayId;
|
uint8_t gatewayId;
|
||||||
uint8_t mqttVersion;
|
uint8_t mqttVersion;
|
||||||
uint16_t maxInflightMsgs;
|
uint16_t maxInflightMsgs;
|
||||||
uint8_t* gatewayName;
|
char* gatewayName;
|
||||||
|
char* brokerName;
|
||||||
|
char* port;
|
||||||
|
char* portSecure;
|
||||||
|
char* rootCApath;
|
||||||
|
char* rootCAfile;
|
||||||
|
char* certDirectory;
|
||||||
}GatewayParams;
|
}GatewayParams;
|
||||||
|
|
||||||
/*=====================================
|
/*=====================================
|
||||||
|
|||||||
@@ -248,6 +248,7 @@ Network::~Network()
|
|||||||
{
|
{
|
||||||
if (_ssl)
|
if (_ssl)
|
||||||
{
|
{
|
||||||
|
SSL_shutdown(_ssl);
|
||||||
SSL_free(_ssl);
|
SSL_free(_ssl);
|
||||||
_numOfInstance--;
|
_numOfInstance--;
|
||||||
}
|
}
|
||||||
@@ -351,7 +352,7 @@ bool Network::connect(const char* host, const char* port, const char* caPath, co
|
|||||||
SSL_free(_ssl);
|
SSL_free(_ssl);
|
||||||
}
|
}
|
||||||
|
|
||||||
SSL_set_options(_ssl, SSL_OP_NO_TICKET);
|
//SSL_set_options(_ssl, SSL_OP_NO_TICKET);
|
||||||
|
|
||||||
if ( cert )
|
if ( cert )
|
||||||
{
|
{
|
||||||
@@ -535,6 +536,7 @@ int Network::recv(uint8_t* buf, uint16_t len)
|
|||||||
case SSL_ERROR_ZERO_RETURN:
|
case SSL_ERROR_ZERO_RETURN:
|
||||||
SSL_shutdown(_ssl);
|
SSL_shutdown(_ssl);
|
||||||
_ssl = 0;
|
_ssl = 0;
|
||||||
|
_numOfInstance--;
|
||||||
TCPStack::close();
|
TCPStack::close();
|
||||||
_busy = false;
|
_busy = false;
|
||||||
_mutex.unlock();
|
_mutex.unlock();
|
||||||
|
|||||||
@@ -236,7 +236,7 @@ void Semaphore::timedwait(uint16_t millsec)
|
|||||||
=========================================*/
|
=========================================*/
|
||||||
RingBuffer::RingBuffer()
|
RingBuffer::RingBuffer()
|
||||||
{
|
{
|
||||||
RingBuffer(MQTTSNGW_CONFIG_DIRECTORY);
|
RingBuffer(MQTTSNGW_KEY_DIRECTORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
RingBuffer::RingBuffer(const char* keyDirectory)
|
RingBuffer::RingBuffer(const char* keyDirectory)
|
||||||
|
|||||||
@@ -23,7 +23,7 @@
|
|||||||
|
|
||||||
namespace MQTTSNGW
|
namespace MQTTSNGW
|
||||||
{
|
{
|
||||||
|
#define MQTTSNGW_KEY_DIRECTORY "./"
|
||||||
#define MQTTSNGW_RINGBUFFER_KEY "ringbuffer.key"
|
#define MQTTSNGW_RINGBUFFER_KEY "ringbuffer.key"
|
||||||
#define MQTTSNGW_RB_MUTEX_KEY "rbmutex.key"
|
#define MQTTSNGW_RB_MUTEX_KEY "rbmutex.key"
|
||||||
#define MQTTSNGW_RB_SEMAPHOR_NAME "/rbsemaphor"
|
#define MQTTSNGW_RB_SEMAPHOR_NAME "/rbsemaphor"
|
||||||
|
|||||||
@@ -180,24 +180,30 @@ void LightIndicator::init()
|
|||||||
pinMode(LIGHT_INDICATOR_BLUE);
|
pinMode(LIGHT_INDICATOR_BLUE);
|
||||||
}
|
}
|
||||||
|
|
||||||
void LightIndicator::lit(int gpioNo, const char* onoff)
|
int LightIndicator::lit(int gpioNo, const char* onoff)
|
||||||
{
|
{
|
||||||
|
int rc = 0;
|
||||||
if( _gpio[gpioNo] )
|
if( _gpio[gpioNo] )
|
||||||
{
|
{
|
||||||
write(_gpio[gpioNo], onoff, 1);
|
rc = write(_gpio[gpioNo], onoff, 1);
|
||||||
}
|
}
|
||||||
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
void LightIndicator::pinMode(int gpioNo)
|
void LightIndicator::pinMode(int gpioNo)
|
||||||
{
|
{
|
||||||
int fd = open("/sys/class/gpio/export", O_WRONLY);
|
int rc = 0;
|
||||||
|
int fd = rc; // eliminate unused warnning of compiler
|
||||||
|
|
||||||
|
fd = open("/sys/class/gpio/export", O_WRONLY);
|
||||||
if ( fd < 0 )
|
if ( fd < 0 )
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
char no[4];
|
char no[4];
|
||||||
|
|
||||||
sprintf(no,"%d", gpioNo);
|
sprintf(no,"%d", gpioNo);
|
||||||
write(fd, no, strlen(no));
|
rc = write(fd, no, strlen(no));
|
||||||
close(fd);
|
close(fd);
|
||||||
|
|
||||||
char fileName[64];
|
char fileName[64];
|
||||||
@@ -208,9 +214,8 @@ void LightIndicator::pinMode(int gpioNo)
|
|||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
write(fd,"out", 3);
|
rc = write(fd,"out", 3);
|
||||||
close(fd);
|
close(fd);
|
||||||
|
|
||||||
sprintf( fileName, "/sys/class/gpio/gpio%d/value", gpioNo);
|
sprintf( fileName, "/sys/class/gpio/gpio%d/value", gpioNo);
|
||||||
fd = open(fileName, O_WRONLY);
|
fd = open(fileName, O_WRONLY);
|
||||||
if ( fd > 0 )
|
if ( fd > 0 )
|
||||||
|
|||||||
@@ -61,7 +61,7 @@ public:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
void init();
|
void init();
|
||||||
void lit(int gpioNo, const char* onoff);
|
int lit(int gpioNo, const char* onoff);
|
||||||
void pinMode(int gpioNo);
|
void pinMode(int gpioNo);
|
||||||
bool _greenStatus;
|
bool _greenStatus;
|
||||||
int _gpio[MAX_GPIO + 1];
|
int _gpio[MAX_GPIO + 1];
|
||||||
|
|||||||
@@ -141,7 +141,7 @@ int SensorNetwork::initialize(void)
|
|||||||
if (theProcess->getParam("GatewayPortNo", param) == 0)
|
if (theProcess->getParam("GatewayPortNo", param) == 0)
|
||||||
{
|
{
|
||||||
unicastPortNo = atoi(param);
|
unicastPortNo = atoi(param);
|
||||||
_description += " and Gateway Port ";
|
_description += " Gateway Port ";
|
||||||
_description += param;
|
_description += param;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -39,11 +39,11 @@ void TestTask::run(void)
|
|||||||
{
|
{
|
||||||
while(true)
|
while(true)
|
||||||
{
|
{
|
||||||
printf("Task is running. Enter CTRL+C \n");
|
|
||||||
if (theProcess->checkSignal() == SIGINT)
|
if (theProcess->checkSignal() == SIGINT)
|
||||||
{
|
{
|
||||||
throw Exception("Terminated by CTL-C");
|
throw Exception("Terminated by CTL-C");
|
||||||
}
|
}
|
||||||
|
printf("Task is running. Enter CTRL+C\n");
|
||||||
sleep(1);
|
sleep(1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user