From 0f799cff7f28fe9476ceaa448d6c1fc7f5b750c4 Mon Sep 17 00:00:00 2001 From: tomoaki Date: Tue, 30 Aug 2016 12:04:50 +0900 Subject: [PATCH 1/5] Update: set Max EventQue size to avoid Buffer over flow Signed-off-by: tomoaki --- .cproject | 43 ++++++++++---------- MQTTSNGateway/src/MQTTSNGWBrokerRecvTask.cpp | 5 ++- MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp | 15 +++++-- MQTTSNGateway/src/MQTTSNGateway.cpp | 15 +++++-- MQTTSNGateway/src/MQTTSNGateway.h | 5 ++- 5 files changed, 52 insertions(+), 31 deletions(-) diff --git a/.cproject b/.cproject index e99b880..7a8dff5 100644 --- a/.cproject +++ b/.cproject @@ -5,28 +5,29 @@ + - - + - - @@ -46,7 +47,7 @@ - - + @@ -72,37 +73,37 @@ + - - + - - - @@ -128,7 +129,7 @@ - + diff --git a/MQTTSNGateway/src/MQTTSNGWBrokerRecvTask.cpp b/MQTTSNGateway/src/MQTTSNGWBrokerRecvTask.cpp index 67ee8d0..cf00553 100644 --- a/MQTTSNGateway/src/MQTTSNGWBrokerRecvTask.cpp +++ b/MQTTSNGateway/src/MQTTSNGWBrokerRecvTask.cpp @@ -117,7 +117,10 @@ void BrokerRecvTask::run(void) /* post a BrokerRecvEvent */ ev = new Event(); ev->setBrokerRecvEvent(client, packet); - _gateway->getPacketEventQue()->post(ev); + if ( _gateway->getPacketEventQue()->post(ev) == 1 ) + { + delete ev; + } } else { diff --git a/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp b/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp index 980e2fd..01937a5 100644 --- a/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp +++ b/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp @@ -76,7 +76,10 @@ void ClientRecvTask::run() log(0, packet); ev = new Event(); ev->setBrodcastEvent(packet); - _gateway->getPacketEventQue()->post(ev); + if ( _gateway->getPacketEventQue()->post(ev) == 1 ) + { + delete ev; + } continue; } @@ -89,7 +92,10 @@ void ClientRecvTask::run() log(client, packet); ev = new Event(); ev->setClientRecvEvent(client,packet); - _gateway->getPacketEventQue()->post(ev); + if ( _gateway->getPacketEventQue()->post(ev) == 1 ) + { + delete ev; + } } else { @@ -116,7 +122,10 @@ void ClientRecvTask::run() client->setClientAddress(_sensorNetwork->getSenderAddress()); ev = new Event(); ev->setClientRecvEvent(client, packet); - _gateway->getPacketEventQue()->post(ev); + if ( _gateway->getPacketEventQue()->post(ev) == 1 ) + { + delete ev; + } } else { diff --git a/MQTTSNGateway/src/MQTTSNGateway.cpp b/MQTTSNGateway/src/MQTTSNGateway.cpp index 0590d77..e18a77b 100644 --- a/MQTTSNGateway/src/MQTTSNGateway.cpp +++ b/MQTTSNGateway/src/MQTTSNGateway.cpp @@ -15,8 +15,8 @@ **************************************************************************************/ #include "MQTTSNGateway.h" -#include "MQTTSNGWProcess.h" #include "SensorNetwork.h" +#include "MQTTSNGWProcess.h" using namespace MQTTSNGW; @@ -31,6 +31,7 @@ Gateway::Gateway() theProcess = this; _params.loginId = 0; _params.password = 0; + _packetEventQue.setMaxSize(DEFAULT_INFLIGHTMESSAGE * DEFAULT_MAX_CLIENTS); } Gateway::~Gateway() @@ -197,7 +198,7 @@ GatewayParams* Gateway::getGWParams(void) =====================================*/ EventQue::EventQue() { - + _maxSize = 0; } EventQue::~EventQue() @@ -205,6 +206,11 @@ EventQue::~EventQue() } +void EventQue::setMaxSize(uint16_t maxSize) +{ + _maxSize = maxSize; +} + Event* EventQue::wait(void) { Event* ev; @@ -249,14 +255,15 @@ Event* EventQue::timedwait(uint16_t millsec) int EventQue::post(Event* ev) { - if ( ev ) + if ( ev && ( _maxSize == 0 || size() < _maxSize ) ) { _mutex.lock(); _que.post(ev); _sem.post(); _mutex.unlock(); + return 0; } - return 0; + return 1; } int EventQue::size() diff --git a/MQTTSNGateway/src/MQTTSNGateway.h b/MQTTSNGateway/src/MQTTSNGateway.h index 6ded583..3f7d540 100644 --- a/MQTTSNGateway/src/MQTTSNGateway.h +++ b/MQTTSNGateway/src/MQTTSNGateway.h @@ -16,10 +16,9 @@ #ifndef MQTTSNGATEWAY_H_ #define MQTTSNGATEWAY_H_ -#include "MQTTSNGWProcess.h" #include "MQTTSNGWClient.h" +#include "MQTTSNGWProcess.h" #include "MQTTSNPacket.h" -#include "MQTTGWPacket.h" namespace MQTTSNGW { @@ -133,6 +132,7 @@ public: ~EventQue(); Event* wait(void); Event* timedwait(uint16_t millsec); + void setMaxSize(uint16_t maxSize); int post(Event*); int size(); @@ -140,6 +140,7 @@ private: Que _que; Mutex _mutex; Semaphore _sem; + uint16_t _maxSize; }; /* From 039e063c8b1114956a427764188e4535bdeaed82 Mon Sep 17 00:00:00 2001 From: tomoaki Date: Thu, 1 Sep 2016 06:44:58 +0900 Subject: [PATCH 2/5] Update: change xbee class method name to same as class udp Signed-off-by: tomoaki --- .../src/linux/xbee/SensorNetwork.cpp | 23 ++++++++++++------- MQTTSNGateway/src/linux/xbee/SensorNetwork.h | 4 ++-- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/MQTTSNGateway/src/linux/xbee/SensorNetwork.cpp b/MQTTSNGateway/src/linux/xbee/SensorNetwork.cpp index 5996401..91754e4 100644 --- a/MQTTSNGateway/src/linux/xbee/SensorNetwork.cpp +++ b/MQTTSNGateway/src/linux/xbee/SensorNetwork.cpp @@ -14,15 +14,17 @@ * Tomoaki Yamaguchi - initial API and implementation **************************************************************************************/ -#include "SensorNetwork.h" -#include "MQTTSNGWProcess.h" -#include "Threading.h" #include #include #include #include #include #include +#include + +#include "SensorNetwork.h" +#include "MQTTSNGWProcess.h" +#include "Threading.h" using namespace std; using namespace MQTTSNGW; @@ -32,8 +34,8 @@ using namespace MQTTSNGW; ============================================*/ SensorNetAddress::SensorNetAddress() { - memset(_address64, 0, sizeof(_address64)); - memset(_address16, 0, sizeof(_address16)); + memset(_address64, 0, 8); + memset(_address16, 0, 2); } SensorNetAddress::~SensorNetAddress() @@ -110,19 +112,24 @@ int SensorNetwork::initialize(void) char param[MQTTSNGW_PARAM_MAX]; uint16_t baudrate = 9600; - if (theProcess->getParam("Baudrate", param) == 0) + if (theProcess->getParam("XBee Baudrate", param) == 0) { baudrate = (uint16_t)atoi(param); } + _description = "Baudrate "; + sprintf(param ,"%d", baudrate); + _description += param; theProcess->getParam("SerialDevice", param); + _description = "SerialDevice "; + _description += param; return XBee::open(param, baudrate); } -const char* SensorNetwork::getType(void) +const char* SensorNetwork::getDescription(void) { - return "XBee"; + return _description.c_str(); } /*=========================================== diff --git a/MQTTSNGateway/src/linux/xbee/SensorNetwork.h b/MQTTSNGateway/src/linux/xbee/SensorNetwork.h index f4a18ee..9b3abdd 100644 --- a/MQTTSNGateway/src/linux/xbee/SensorNetwork.h +++ b/MQTTSNGateway/src/linux/xbee/SensorNetwork.h @@ -124,8 +124,7 @@ public: int broadcast(const uint8_t* payload, uint16_t payloadLength); int read(uint8_t* buf, uint16_t bufLen); int initialize(void); - const char* getType(void); - + const char* getDescription(void); SensorNetAddress* getSenderAddress(void) { return &_clientAddr; @@ -134,6 +133,7 @@ public: private: SensorNetAddress _clientAddr; // Sender's address. not gateway's one. + string _description; }; } From 2537dd76dd6a91d08162e925834f5d4af1626b91 Mon Sep 17 00:00:00 2001 From: tomoaki Date: Fri, 2 Sep 2016 10:24:58 +0900 Subject: [PATCH 3/5] Update: indivisual client assigns TLS connection by clients.conf file. BugFix: TLS certificate required connection error Signed-off-by: tomoaki --- MQTTSNGateway/README.md | 15 +- MQTTSNGateway/gateway.conf | 12 +- MQTTSNGateway/src/MQTTGWPacket.cpp | 1 + MQTTSNGateway/src/MQTTSNGWBrokerRecvTask.cpp | 4 +- MQTTSNGateway/src/MQTTSNGWBrokerSendTask.cpp | 85 +++- MQTTSNGateway/src/MQTTSNGWBrokerSendTask.h | 7 +- MQTTSNGateway/src/MQTTSNGWClient.cpp | 11 +- MQTTSNGateway/src/MQTTSNGWClient.h | 2 +- MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp | 5 +- MQTTSNGateway/src/MQTTSNGWClientSendTask.cpp | 2 +- .../src/MQTTSNGWConnectionHandler.cpp | 3 +- MQTTSNGateway/src/MQTTSNGWDefines.h | 8 +- MQTTSNGateway/src/MQTTSNGWPacket.cpp | 2 +- .../src/MQTTSNGWPacketHandleTask.cpp | 1 + MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp | 2 +- MQTTSNGateway/src/MQTTSNGateway.cpp | 18 +- MQTTSNGateway/src/MQTTSNGateway.h | 5 +- MQTTSNGateway/src/linux/Network.cpp | 384 +++++++++++------- MQTTSNGateway/src/linux/Network.h | 21 +- MQTTSNGateway/src/mainGateway.cpp | 3 - 20 files changed, 352 insertions(+), 239 deletions(-) diff --git a/MQTTSNGateway/README.md b/MQTTSNGateway/README.md index 86f9f56..3c7b8bb 100644 --- a/MQTTSNGateway/README.md +++ b/MQTTSNGateway/README.md @@ -28,11 +28,14 @@ $ ./MQTT-SNGateway [-f Config file name] BrokerName=iot.eclipse.org BrokerPortNo=1883 -SecureConnection=NO -#BrokerPortNo=8883 -#SecureConnection=YES +BrokerSecurePortNo=8883 + ClientAuthentication=NO -ClientList=clients.conf +#ClientsList=/path/to/your_clients.conf + +RootCAfile=/path/to/your_Root_CA.crt +RootCApath=/path/to/your_certs_directory/ + GatewayID=1 GatewayName=PahoGateway-01 KeepAlive=900 @@ -49,12 +52,12 @@ Baudrate=38400 SerialDevice=/dev/ttyUSB0 ``` -**BrokerName** to specify a domain name of the Broker, and **BrokerPortNo** is a port No of the Broker. If the Broker have to connected via TLS, set BrokerPortNo=8883 and **SecureConnection=YES**. +**BrokerName** to specify a domain name of the Broker, and **BrokerPortNo** is a port No of the Broker. **BrokerSecurePortNo** is for TLS connection. **MulticastIP** and **MulticastPortNo** is a multicast address for ADVERTISE, GWSEARCH and GWINFO messages. Gateway is waiting GWSEARCH multicast message and when receiving it send GWINFO message via Broadcast address. Clients can get the gateway address (Gateway IP address and **GatewayPortNo**) from GWINFO message by means of std::recvfrom(), Client should know the BroadcastIP and PortNo to send a SEARCHGW message. **GatewayId** is defined by GWSEARCH message. **KeepAlive** is a duration of ADVERTISE message in seconds. -when **ClientAuthentication** is YES, see MQTTSNGWClient.cpp line53, clients file specified by ClientList is required. This file defines connect allowed clients by ClientId, IPaddress and PortNo. +when **ClientAuthentication** is YES, see MQTTSNGWClient.cpp line53, clients file specified by ClientsList is required. This file defines connect allowed clients by ClientId, IPaddress and PortNo. diff --git a/MQTTSNGateway/gateway.conf b/MQTTSNGateway/gateway.conf index 80a0858..89dd921 100644 --- a/MQTTSNGateway/gateway.conf +++ b/MQTTSNGateway/gateway.conf @@ -16,14 +16,18 @@ BrokerName=iot.eclipse.org BrokerPortNo=1883 BrokerSecurePortNo=8883 -#SecureConnection=YES + ClientAuthentication=NO -ClientList=clients.conf +#ClientsList=/path/to/your_clients.conf + +RootCAfile=/usr/share/ca-certificates/ISRG_Root_X1.crt +RootCApath=/etc/ssl/certs/ + GatewayID=1 GatewayName=PahoGateway-01 KeepAlive=900 -#LoginID= -#Password= +#LoginID=your_ID +#Password=your_Password # UDP GatewayPortNo=10000 diff --git a/MQTTSNGateway/src/MQTTGWPacket.cpp b/MQTTSNGateway/src/MQTTGWPacket.cpp index 1c10522..74a885f 100644 --- a/MQTTSNGateway/src/MQTTGWPacket.cpp +++ b/MQTTSNGateway/src/MQTTGWPacket.cpp @@ -17,6 +17,7 @@ #include "MQTTGWPacket.h" #include +#include using namespace MQTTSNGW; diff --git a/MQTTSNGateway/src/MQTTSNGWBrokerRecvTask.cpp b/MQTTSNGateway/src/MQTTSNGWBrokerRecvTask.cpp index cf00553..967c63a 100644 --- a/MQTTSNGateway/src/MQTTSNGWBrokerRecvTask.cpp +++ b/MQTTSNGateway/src/MQTTSNGWBrokerRecvTask.cpp @@ -89,7 +89,6 @@ void BrokerRecvTask::run(void) { /* Check sockets is ready to read */ int activity = select(maxSock + 1, &rset, 0, 0, &timeout); - if (activity > 0) { client = _gateway->getClientList()->getClient(); @@ -110,6 +109,7 @@ void BrokerRecvTask::run(void) if ( rc > 0 ) { if ( log(client, packet) == -1 ) + { continue; } @@ -169,7 +169,7 @@ void BrokerRecvTask::run(void) */ int BrokerRecvTask::log(Client* client, MQTTGWPacket* packet) { - char pbuf[(SIZEOF_LOG_PACKET + 5 )* 3]; + char pbuf[(SIZE_OF_LOG_PACKET + 5 )* 3]; char msgId[6]; int rc = 0; diff --git a/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.cpp b/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.cpp index 252be70..0c44ee7 100644 --- a/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.cpp +++ b/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.cpp @@ -19,6 +19,7 @@ #include "MQTTSNGateway.h" #include "MQTTSNGWClient.h" #include "MQTTGWPacket.h" +#include using namespace std; using namespace MQTTSNGW; @@ -34,8 +35,11 @@ BrokerSendTask::BrokerSendTask(Gateway* gateway) _gateway = gateway; _gateway->attach((Thread*)this); _host = 0; - _service = 0; - _serviceSecure = 0; + _port = 0; + _portSecure = 0; + _certDirectory = 0; + _rootCAfile = 0; + _light = 0; } @@ -45,13 +49,25 @@ BrokerSendTask::~BrokerSendTask() { free(_host); } - if (_service) + if (_port) { - free(_service); + free(_port); } - if (_serviceSecure) + if (_portSecure) { - free(_serviceSecure); + free(_portSecure); + } + if (_certDirectory) + { + free(_certDirectory); + } + if (_rootCApath) + { + free(_rootCApath); + } + if (_rootCAfile) + { + free(_rootCAfile); } } @@ -68,12 +84,26 @@ void BrokerSendTask::initialize(int argc, char** argv) } if (_gateway->getParam("BrokerPortNo", param) == 0) { - _service = strdup(param); + _port = strdup(param); } if (_gateway->getParam("BrokerSecurePortNo", param) == 0) - { - _serviceSecure = strdup(param); - } + { + _portSecure = strdup(param); + } + + if (_gateway->getParam("CertsDirectory", param) == 0) + { + _certDirectory = strdup(param); + } + if (_gateway->getParam("RootCApath", param) == 0) + { + _rootCApath = strdup(param); + } + if (_gateway->getParam("RootCAfile", param) == 0) + { + _rootCAfile = strdup(param); + } + _light = _gateway->getLightIndicator(); } @@ -93,7 +123,7 @@ void BrokerSendTask::run() client = ev->getClient(); packet = ev->getMQTTGWPacket(); - if ( client->getNetwork()->isValid() && packet->getType() == CONNECT ) + if ( client->getNetwork()->isValid() && !client->getNetwork()->isSecure() && packet->getType() == CONNECT ) { client->getNetwork()->close(); } @@ -101,16 +131,37 @@ void BrokerSendTask::run() if ( !client->getNetwork()->isValid() ) { /* connect to the broker and send a packet */ - char* service = _service; + char* portNo = _port; + const char* cert = 0; + const char* keyFile = 0; + string certFile; + string privateKeyFile; + if (client->isSecureNetwork()) { - service = _serviceSecure; + portNo = _portSecure; + if ( _certDirectory ) + { + certFile = _certDirectory; + certFile += client->getClientId(); + certFile += ".crt"; + cert = certFile.c_str(); + privateKeyFile = _certDirectory; + privateKeyFile += client->getClientId(); + privateKeyFile += ".key"; + keyFile = privateKeyFile.c_str(); + } + rc = client->getNetwork()->connect(_host, portNo, _rootCApath, _rootCAfile, cert, keyFile); + } + else + { + rc = client->getNetwork()->connect(_host, portNo); } - if ( !client->getNetwork()->connect(_host, service) ) + if ( !rc ) { - /* disconnect the broker and chage the client's status */ - WRITELOG("%s BrokerSendTask can't open the socket. errno=%d %s%s\n", + /* disconnect the broker and change the client's status */ + WRITELOG("%s BrokerSendTask can't connect to the broker. errno=%d %s%s\n", ERRMSG_HEADER, rc == -1 ? errno : 0, client->getClientId(), ERRMSG_FOOTER); client->disconnected(); client->getNetwork()->disconnect(); @@ -151,7 +202,7 @@ void BrokerSendTask::run() */ void BrokerSendTask::log(Client* client, MQTTGWPacket* packet) { - char pbuf[(SIZEOF_LOG_PACKET + 5 )* 3]; + char pbuf[(SIZE_OF_LOG_PACKET + 5 )* 3]; char msgId[6]; switch (packet->getType()) diff --git a/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.h b/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.h index f663f7d..81b0a7a 100644 --- a/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.h +++ b/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.h @@ -39,8 +39,11 @@ private: Gateway* _gateway; char* _host; - char* _service; - char* _serviceSecure; + char* _port; + char* _portSecure; + char* _rootCApath; + char* _rootCAfile; + char* _certDirectory; LightIndicator* _light; }; diff --git a/MQTTSNGateway/src/MQTTSNGWClient.cpp b/MQTTSNGateway/src/MQTTSNGWClient.cpp index 6ff6725..430498b 100644 --- a/MQTTSNGateway/src/MQTTSNGWClient.cpp +++ b/MQTTSNGateway/src/MQTTSNGWClient.cpp @@ -206,7 +206,7 @@ Client* ClientList::createClient(SensorNetAddress* addr, MQTTSNString* clientId, } /* anonimous clients */ - if ( _clientCnt > DEFAULT_MAX_CLIENTS ) + if ( _clientCnt > MAX_CLIENTS ) { return 0; // full of clients } @@ -951,14 +951,7 @@ TopicIdMapelement::~TopicIdMapelement() TopicIdMap::TopicIdMap() { - char param[MQTTSNGW_PARAM_MAX]; - - _maxInflight = DEFAULT_INFLIGHTMESSAGE; - if ( theProcess->getParam("MaxInflightMsg", param) == 0 ) - { - _maxInflight = atoi(param); - } - + _maxInflight = MAX_INFLIGHTMESSAGES; _msgIds = 0; _first = 0; _end = 0; diff --git a/MQTTSNGateway/src/MQTTSNGWClient.h b/MQTTSNGateway/src/MQTTSNGWClient.h index d4081b4..0ba2b03 100644 --- a/MQTTSNGateway/src/MQTTSNGWClient.h +++ b/MQTTSNGateway/src/MQTTSNGWClient.h @@ -312,7 +312,7 @@ private: uint8_t _snMsgId; Network* _network; // Broker - bool _secureNetwork; // SSL + bool _secureNetwork; // SSL bool _sensorNetype; // false: unstable network like a G3 SensorNetAddress _sensorNetAddr; diff --git a/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp b/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp index 01937a5..a8db6d3 100644 --- a/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp +++ b/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp @@ -16,6 +16,7 @@ #include "MQTTSNGWClientRecvTask.h" #include "MQTTSNGateway.h" +#include char* currentDateTime(void); /*===================================== Class ClientRecvTask @@ -107,7 +108,7 @@ void ClientRecvTask::run() packet->getCONNECT(&data); /* create a client */ - client = _gateway->getClientList()->createClient(_sensorNetwork->getSenderAddress(), &data.clientID, false, _gateway->getGWParams()->secureConnection); + client = _gateway->getClientList()->createClient(_sensorNetwork->getSenderAddress(), &data.clientID, false, false); //_gateway->getGWParams()->secureConnection); if (!client) { @@ -139,7 +140,7 @@ void ClientRecvTask::run() void ClientRecvTask::log(Client* client, MQTTSNPacket* packet) { - char pbuf[SIZEOF_LOG_PACKET * 3]; + char pbuf[SIZE_OF_LOG_PACKET * 3]; char msgId[6]; const char* clientId = client ? (const char*)client->getClientId() :"Non Active Client !" ; diff --git a/MQTTSNGateway/src/MQTTSNGWClientSendTask.cpp b/MQTTSNGateway/src/MQTTSNGWClientSendTask.cpp index 9cdf43f..b7d7bff 100644 --- a/MQTTSNGateway/src/MQTTSNGWClientSendTask.cpp +++ b/MQTTSNGateway/src/MQTTSNGWClientSendTask.cpp @@ -64,7 +64,7 @@ void ClientSendTask::run() void ClientSendTask::log(Client* client, MQTTSNPacket* packet) { - char pbuf[SIZEOF_LOG_PACKET * 3]; + char pbuf[SIZE_OF_LOG_PACKET * 3]; char msgId[6]; switch (packet->getType()) diff --git a/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp b/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp index 980f170..cae95f2 100644 --- a/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp +++ b/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp @@ -18,6 +18,7 @@ #include "MQTTSNGateway.h" #include "MQTTSNGWPacket.h" #include "MQTTGWPacket.h" +#include using namespace std; using namespace MQTTSNGW; @@ -54,7 +55,7 @@ void MQTTSNConnectionHandler::handleSearchgw(MQTTSNPacket* packet) { if (packet->getType() == MQTTSN_SEARCHGW) { - if (_gateway->getClientList()->getClientCount() < DEFAULT_MAX_CLIENTS) + if (_gateway->getClientList()->getClientCount() < MAX_CLIENTS) { MQTTSNPacket* gwinfo = new MQTTSNPacket(); gwinfo->setGWINFO(_gateway->getGWParams()->gatewayId); diff --git a/MQTTSNGateway/src/MQTTSNGWDefines.h b/MQTTSNGateway/src/MQTTSNGWDefines.h index 2c897fc..839708e 100644 --- a/MQTTSNGateway/src/MQTTSNGWDefines.h +++ b/MQTTSNGateway/src/MQTTSNGWDefines.h @@ -27,13 +27,13 @@ namespace MQTTSNGW //#define DEBUG_NWSTACK // print out SensorNetwork log /*================================= - * Parametrs + * MQTT-SN Parametrs ==================================*/ +#define MAX_CLIENTS (100) // Number of Clients can be handled. #define MAX_CLIENTID_LENGTH (64) // Max length of clientID +#define MAX_INFLIGHTMESSAGES (10) // Number of inflight messages #define MQTTSNGW_MAX_PACKET_SIZE (1024) // Max Packet size (5+2+TopicLen+PayloadLen) -#define SIZEOF_LOG_PACKET (500) // Length of the packet log in bytes - -#define MQTTSNGW_TLS_CA_DIR "/etc/ssl/certs" +#define SIZE_OF_LOG_PACKET (500) // Length of the packet log in bytes /*================================= * Data Type diff --git a/MQTTSNGateway/src/MQTTSNGWPacket.cpp b/MQTTSNGateway/src/MQTTSNGWPacket.cpp index 1dc9324..23ec9f9 100644 --- a/MQTTSNGateway/src/MQTTSNGWPacket.cpp +++ b/MQTTSNGateway/src/MQTTSNGWPacket.cpp @@ -354,7 +354,7 @@ char* MQTTSNPacket::print(char* pbuf) int value = 0; int i = MQTTSNPacket_decode(_buf, _bufLen, &value); - int size = _bufLen > SIZEOF_LOG_PACKET ? SIZEOF_LOG_PACKET : _bufLen; + int size = _bufLen > SIZE_OF_LOG_PACKET ? SIZE_OF_LOG_PACKET : _bufLen; for (; i < size; i++) { diff --git a/MQTTSNGateway/src/MQTTSNGWPacketHandleTask.cpp b/MQTTSNGateway/src/MQTTSNGWPacketHandleTask.cpp index c24659b..77dc86b 100644 --- a/MQTTSNGateway/src/MQTTSNGWPacketHandleTask.cpp +++ b/MQTTSNGateway/src/MQTTSNGWPacketHandleTask.cpp @@ -26,6 +26,7 @@ #include "MQTTSNGWConnectionHandler.h" #include "MQTTSNGWPublishHandler.h" #include "MQTTSNGWSubscribeHandler.h" +#include using namespace std; using namespace MQTTSNGW; diff --git a/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp b/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp index 68bd47a..6c4bff3 100644 --- a/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp +++ b/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp @@ -19,7 +19,7 @@ #include "MQTTGWPacket.h" #include "MQTTSNGateway.h" #include "MQTTSNGWClient.h" - +#include using namespace std; using namespace MQTTSNGW; diff --git a/MQTTSNGateway/src/MQTTSNGateway.cpp b/MQTTSNGateway/src/MQTTSNGateway.cpp index e18a77b..8b6e72c 100644 --- a/MQTTSNGateway/src/MQTTSNGateway.cpp +++ b/MQTTSNGateway/src/MQTTSNGateway.cpp @@ -17,7 +17,7 @@ #include "MQTTSNGateway.h" #include "SensorNetwork.h" #include "MQTTSNGWProcess.h" - +#include using namespace MQTTSNGW; char* currentDateTime(void); @@ -31,7 +31,7 @@ Gateway::Gateway() theProcess = this; _params.loginId = 0; _params.password = 0; - _packetEventQue.setMaxSize(DEFAULT_INFLIGHTMESSAGE * DEFAULT_MAX_CLIENTS); + _packetEventQue.setMaxSize(MAX_INFLIGHTMESSAGES * MAX_CLIENTS); } Gateway::~Gateway() @@ -107,7 +107,7 @@ void Gateway::initialize(int argc, char** argv) string fileName; if (!strcasecmp(param, "YES")) { - if (getParam("ClientList", param) == 0) + if (getParam("ClientsList", param) == 0) { fileName = string(param); } @@ -118,20 +118,10 @@ void Gateway::initialize(int argc, char** argv) if (!_clientList.authorize(fileName.c_str())) { - throw Exception("Gateway::initialize: No client list which defined by configuration."); + throw Exception("Gateway::initialize: No client list defined by configuration."); } } } - - if (getParam("SecureConnection", param) == 0) - { - _params.secureConnection = !strcasecmp(param, "YES"); - } - else - { - _params.secureConnection = false; - } - } void Gateway::run(void) diff --git a/MQTTSNGateway/src/MQTTSNGateway.h b/MQTTSNGateway/src/MQTTSNGateway.h index 3f7d540..cc60367 100644 --- a/MQTTSNGateway/src/MQTTSNGateway.h +++ b/MQTTSNGateway/src/MQTTSNGateway.h @@ -26,14 +26,12 @@ namespace MQTTSNGW * Gateway default parameters ===========================================================*/ #define DEFAULT_KEEP_ALIVE_TIME (900) // 900 secs = 15 mins -#define DEFAULT_MAX_CLIENTS (100) // Number of Clients can be handled. #define DEFAULT_MQTT_VERSION (4) // Defualt MQTT version -#define DEFAULT_INFLIGHTMESSAGE (10) // Number of inflight messages /*================================= * Starting prompt ==================================*/ -#define GATEWAY_VERSION " * Version: 0.3.3" +#define GATEWAY_VERSION " * Version: 0.4.0" #define PAHO_COPYRIGHT0 " * MQTT-SN Transparent Gateway" #define PAHO_COPYRIGHT1 " * Part of Project Paho in Eclipse" @@ -155,7 +153,6 @@ typedef struct uint8_t mqttVersion; uint16_t maxInflightMsgs; uint8_t* gatewayName; - bool secureConnection; }GatewayParams; /*===================================== diff --git a/MQTTSNGateway/src/linux/Network.cpp b/MQTTSNGateway/src/linux/Network.cpp index 0ba5661..6d43da7 100644 --- a/MQTTSNGateway/src/linux/Network.cpp +++ b/MQTTSNGateway/src/linux/Network.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include "Network.h" #include "MQTTSNGWDefines.h" @@ -232,54 +233,27 @@ int TCPStack::getSock() =======================================*/ int Network::_numOfInstance = 0; SSL_CTX* Network::_ctx = 0; -SSL_SESSION* Network::_session = 0; Network::Network(bool secure) : TCPStack() { - char error[256]; - if (secure) - { - _numOfInstance++; - if (_ctx == 0) - { - SSL_load_error_strings(); - SSL_library_init(); - _ctx = SSL_CTX_new(TLSv1_2_client_method()); - if (_ctx == 0) - { - ERR_error_string_n(ERR_get_error(), error, sizeof(error)); - WRITELOG("SSL_CTX_new() %s\n", error); - throw Exception( ERR_get_error(), "Network can't create SSL context."); - } - if (!SSL_CTX_load_verify_locations(_ctx, 0, MQTTSNGW_TLS_CA_DIR)) - { - ERR_error_string_n(ERR_get_error(), error, sizeof(error)); - WRITELOG("SSL_CTX_load_verify_locations() %s\n", error); - throw Exception( ERR_get_error(), "Network can't load CA_LIST."); - } - } - } _ssl = 0; - _disconReq = false; _secureFlg = secure; _busy = false; + _session = 0; + _sslValid = false; } Network::~Network() { - if (_secureFlg) - { - _numOfInstance--; - } if (_ssl) { SSL_free(_ssl); + _numOfInstance--; } - if (_session && _numOfInstance == 0) + if (_session ) { SSL_SESSION_free(_session); - _session = 0; } if (_ctx && _numOfInstance == 0) { @@ -289,78 +263,172 @@ Network::~Network() } } -bool Network::connect(const char* host, const char* service) +bool Network::connect(const char* host, const char* port) +{ + if (_secureFlg) + { + return false; + } + + if (getSock() == 0) + { + if (!TCPStack::connect(host, port)) + { + return false; + } + } + return true; +} + +bool Network::connect(const char* host, const char* port, const char* caPath, const char* caFile, const char* cert, const char* prvkey) { char errmsg[256]; - int rc = 0; char peer_CN[256]; - SSL_SESSION* sess = 0; - X509* peer; + int rc = 0; - if (isValid()) - { - return false; - } - if (!TCPStack::connect(host, service)) - { - return false; - } if (!_secureFlg) { - return true; + WRITELOG("TLS is not required.\n"); + return false; } - SSL* ssl = SSL_new(_ctx); - if (ssl == 0) + if (_ctx == 0) + { + SSL_load_error_strings(); + SSL_library_init(); + _ctx = SSL_CTX_new(TLS_client_method()); + if (_ctx == 0) + { + ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); + WRITELOG("SSL_CTX_new() %s\n", errmsg); + return false; + } + + if (!SSL_CTX_load_verify_locations(_ctx, caFile, caPath)) + { + ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); + WRITELOG("SSL_CTX_load_verify_locations() %s\n", errmsg); + return false; + } + } + + if (!_sslValid) + { + if ( !TCPStack::connect(host, port) ) + { + return false; + } + /* + if ( _ssl ) + { + if (!SSL_set_fd(_ssl, getSock())) + { + ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); + WRITELOG("SSL_set_fd() %s\n", errmsg); + SSL_free(_ssl); + } + else + { + _sslValid = true; + return true; + } + } + */ + } + + _ssl = SSL_new(_ctx); + if (_ssl == 0) { ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); WRITELOG("SSL_new() %s\n", errmsg); return false; } - rc = SSL_set_fd(ssl, TCPStack::getSock()); - if (rc == 0) + if (!SSL_set_fd(_ssl, getSock())) { - SSL_free(ssl); + ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); + WRITELOG("SSL_set_fd() %s\n", errmsg); + SSL_free(_ssl); + } + + SSL_set_options(_ssl, SSL_OP_NO_TICKET); + + if ( cert ) + { + if ( SSL_use_certificate_file(_ssl, cert, SSL_FILETYPE_PEM) <= 0 ) + { + ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); + WRITELOG("SSL_use_certificate_file() %s %s\n", cert, errmsg); + SSL_free(_ssl); + _ssl = 0; + return false; + } + } + if ( prvkey ) + { + if ( SSL_use_PrivateKey_file(_ssl, prvkey, SSL_FILETYPE_PEM) <= 0 ) + { + ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); + WRITELOG("SSL_use_PrivateKey_file() %s %s\n", prvkey, errmsg); + SSL_free(_ssl); + _ssl = 0; + return false; + } + } + + if (!SSL_set_fd(_ssl, TCPStack::getSock())) + { + ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); + WRITELOG("SSL_set_fd() %s\n", errmsg); + SSL_free(_ssl); + _ssl = 0; return false; } if (_session) { - rc = SSL_set_session(ssl, sess); + rc = SSL_set_session(_ssl, _session); } - else - { - rc = SSL_connect(ssl); - } - if (rc != 1) + + if (SSL_connect(_ssl) != 1) { ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); WRITELOG("SSL_connect() %s\n", errmsg); - SSL_free(ssl); + SSL_free(_ssl); + _ssl = 0; return false; } - if (SSL_get_verify_result(ssl) != X509_V_OK) + if ( (rc = SSL_get_verify_result(_ssl)) != X509_V_OK) { - WRITELOG("SSL_get_verify_result() error: Certificate doesn't verify.\n"); - SSL_free(ssl); + WRITELOG("SSL_get_verify_result() error: %s.\n", X509_verify_cert_error_string(rc)); + SSL_free(_ssl); + _ssl = 0; return false; } - peer = SSL_get_peer_certificate(ssl); + X509* peer = SSL_get_peer_certificate(_ssl); X509_NAME_get_text_by_NID(X509_get_subject_name(peer), NID_commonName, peer_CN, 256); - if (strcasecmp(peer_CN, host)) + char* pos = peer_CN; + if ( *pos == '*') { - WRITELOG("SSL_get_peer_certificate() error: Broker dosen't much host name.\n"); - SSL_free(ssl); + while (*host++ != '.'); + pos += 2; + } + if ( strcmp(host, pos)) + { + WRITELOG("SSL_get_peer_certificate() error: Broker %s dosen't match the host name %s\n", peer_CN, host); + SSL_free(_ssl); + _ssl = 0; return false; } + if (_session == 0) { - _session = sess; + _session = SSL_get1_session(_ssl); } - _ssl = ssl; + _numOfInstance++; + _sslValid = true; return true; } @@ -372,7 +440,11 @@ int Network::send(const uint8_t* buf, uint16_t length) bool writeBlockedOnRead = false; int bpos = 0; - if (_secureFlg) + if (!_secureFlg) + { + return TCPStack::send(buf, length); + } + else { _mutex.lock(); _busy = true; @@ -387,7 +459,7 @@ int Network::send(const uint8_t* buf, uint16_t length) int activity = select(getSock() + 1, &rset, &wset, 0, 0); if (activity > 0) { - if (FD_ISSET(getSock(), &wset) || (writeBlockedOnRead && FD_ISSET(getSock(), &rset))) + if (FD_ISSET(getSock(), &wset) || (writeBlockedOnRead && FD_ISSET(getSock(), &rset))) { writeBlockedOnRead = false; @@ -421,10 +493,6 @@ int Network::send(const uint8_t* buf, uint16_t length) } } } - else - { - return TCPStack::send(buf, length); - } } int Network::recv(uint8_t* buf, uint16_t len) @@ -438,92 +506,107 @@ int Network::recv(uint8_t* buf, uint16_t len) fd_set rset; fd_set wset; - if (_secureFlg) + if (!_secureFlg) { - if (_busy) + return TCPStack::recv(buf, len); + } + + if (_busy) + { + return 0; + } + _mutex.lock(); + _busy = true; + + loop: do + { + readBlockedOnWrite = false; + readBlocked = false; + + rlen = SSL_read(_ssl, buf + bpos, len - bpos); + + switch (SSL_get_error(_ssl, rlen)) { - return 0; + case SSL_ERROR_NONE: + _busy = false; + _mutex.unlock(); + return rlen + bpos; + break; + case SSL_ERROR_ZERO_RETURN: + SSL_shutdown(_ssl); + _ssl = 0; + TCPStack::close(); + _busy = false; + _mutex.unlock(); + return -1; + break; + case SSL_ERROR_WANT_READ: + readBlocked = true; + break; + case SSL_ERROR_WANT_WRITE: + readBlockedOnWrite = true; + break; + default: + ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); + WRITELOG("Network::recv() %s\n", errmsg); + _busy = false; + _mutex.unlock(); + return -1; } - _mutex.lock(); - _busy = true; + } while (SSL_pending(_ssl) && !readBlocked); - loop: do + bpos += rlen; + while (true) + { + FD_ZERO(&rset); + FD_ZERO(&wset); + FD_SET(getSock(), &rset); + FD_SET(getSock(), &wset); + + int activity = select(getSock() + 1, &rset, &wset, 0, 0); + if (activity > 0) { - readBlockedOnWrite = false; - readBlocked = false; - - rlen = SSL_read(_ssl, buf + bpos, len - bpos); - - switch (SSL_get_error(_ssl, rlen)) + if ((FD_ISSET(getSock(),&rset) && !writeBlockedOnRead) + || (readBlockedOnWrite && FD_ISSET(getSock(), &wset))) { - case SSL_ERROR_NONE: - _busy = false; - _mutex.unlock(); - return rlen + bpos; - break; - case SSL_ERROR_ZERO_RETURN: - SSL_shutdown(_ssl); - _ssl = 0; - TCPStack::close(); - _busy = false; - _mutex.unlock(); - return -1; - break; - case SSL_ERROR_WANT_READ: - readBlocked = true; - break; - case SSL_ERROR_WANT_WRITE: - readBlockedOnWrite = true; - break; - default: - ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); - WRITELOG("TLSStack::recv() default %s\n", errmsg); - _busy = false; - _mutex.unlock(); - return -1; + goto loop; } - } while (SSL_pending(_ssl) && !readBlocked); - - bpos += rlen; - while (true) + } + else { - FD_ZERO(&rset); - FD_ZERO(&wset); - FD_SET(getSock(), &rset); - FD_SET(getSock(), &wset); - - int activity = select(getSock() + 1, &rset, &wset, 0, 0); - if (activity > 0) - { - if ((FD_ISSET(getSock(),&rset) && !writeBlockedOnRead) - || (readBlockedOnWrite && FD_ISSET(getSock(), &wset))) - { - goto loop; - } - } - else - { - ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); - WRITELOG("TLSStack::recv() select %s\n", errmsg); - _busy = false; - _mutex.unlock(); - return -1; - } + ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); + WRITELOG("TLSStack::recv() select %s\n", errmsg); + _busy = false; + _mutex.unlock(); + return -1; } } - return TCPStack::recv(buf, len); +} + +void Network::close(void) +{ + if (_secureFlg) + { + _sslValid = false; + SSL_free(_ssl); + _ssl = 0; + } + TCPStack::close(); } bool Network::isValid() { - if (!_secureFlg) + if (_secureFlg) + { + if (_sslValid && !_busy) + { + return true; + } + } + else { return TCPStack::isValid(); } - if (_ssl) - { - return true; - } return false; } @@ -533,12 +616,11 @@ void Network::disconnect() { SSL_shutdown(_ssl); _ssl = 0; - TCPStack::close(); - } - else - { - TCPStack::close(); } + _sslValid = false; + _busy = false; + TCPStack::close(); + } int Network::getSock() @@ -546,18 +628,6 @@ int Network::getSock() return TCPStack::getSock(); } -SSL* Network::getSSL() -{ - if (_secureFlg) - { - return _ssl; - } - else - { - return 0; - } -} - bool Network::isSecure() { return _secureFlg; diff --git a/MQTTSNGateway/src/linux/Network.h b/MQTTSNGateway/src/linux/Network.h index d8ed45b..c5bfaed 100644 --- a/MQTTSNGateway/src/linux/Network.h +++ b/MQTTSNGateway/src/linux/Network.h @@ -71,26 +71,27 @@ public: Network(bool secure); virtual ~Network(); - bool connect(const char* host, const char* service); - void disconnect(); - int send(const uint8_t* buf, uint16_t length); - int recv(uint8_t* buf, uint16_t len); + bool connect(const char* host, const char* port, const char* caPath, const char* caFile, const char* sert, const char* prvkey); + bool connect(const char* host, const char* port); + void disconnect(void); + void close(void); + int send(const uint8_t* buf, uint16_t length); + int recv(uint8_t* buf, uint16_t len); - bool isValid(); - bool isSecure(); - int getSock(); - SSL* getSSL(); + bool isValid(void); + bool isSecure(void); + int getSock(void); private: static SSL_CTX* _ctx; static int _numOfInstance; - static SSL_SESSION* _session; + SSL_SESSION* _session; SSL* _ssl; bool _secureFlg; - bool _disconReq; Mutex _mutex; bool _busy; + bool _sslValid; }; #endif /* NETWORK_H_ */ diff --git a/MQTTSNGateway/src/mainGateway.cpp b/MQTTSNGateway/src/mainGateway.cpp index 43f8988..9ffc74a 100644 --- a/MQTTSNGateway/src/mainGateway.cpp +++ b/MQTTSNGateway/src/mainGateway.cpp @@ -24,9 +24,6 @@ using namespace MQTTSNGW; /* * Gateway Process - * - * Certificate file "/etc/ssl/certs" - * This is defined in MQTTSNGWDefines.h */ Gateway* gateway = new Gateway(); PacketHandleTask* t0 = new PacketHandleTask(gateway); From e3dd9fa01addd71703a4dad7d88d3016e58f75e0 Mon Sep 17 00:00:00 2001 From: tomoaki Date: Sun, 25 Sep 2016 17:06:05 +0900 Subject: [PATCH 4/5] BugFix: Termination by ctl + C Update: API mode is configurable Signed-off-by: tomoaki --- MQTTSNGateway/gateway.conf | 7 ++-- MQTTSNGateway/src/MQTTSNGWProcess.cpp | 27 ++++++++-------- MQTTSNGateway/src/MQTTSNGWProcess.h | 4 +-- MQTTSNGateway/src/MQTTSNGateway.h | 2 +- MQTTSNGateway/src/linux/Threading.cpp | 4 +-- MQTTSNGateway/src/linux/Threading.h | 2 +- MQTTSNGateway/src/linux/udp/SensorNetwork.cpp | 5 +++ MQTTSNGateway/src/linux/udp/SensorNetwork.h | 6 +--- .../src/linux/xbee/SensorNetwork.cpp | 32 +++++++++++++++---- MQTTSNGateway/src/linux/xbee/SensorNetwork.h | 9 ++---- 10 files changed, 60 insertions(+), 38 deletions(-) diff --git a/MQTTSNGateway/gateway.conf b/MQTTSNGateway/gateway.conf index 89dd921..3438767 100644 --- a/MQTTSNGateway/gateway.conf +++ b/MQTTSNGateway/gateway.conf @@ -20,8 +20,9 @@ BrokerSecurePortNo=8883 ClientAuthentication=NO #ClientsList=/path/to/your_clients.conf -RootCAfile=/usr/share/ca-certificates/ISRG_Root_X1.crt -RootCApath=/etc/ssl/certs/ +RootCAfile=/etc/ssl/certs/ca-certificates.crt +#RootCApath=/etc/ssl/certs/ +#CertsDirectory=/usr/share/GW/IoTcerts/ GatewayID=1 GatewayName=PahoGateway-01 @@ -37,3 +38,5 @@ MulticastPortNo=1883 # XBee Baudrate=38400 SerialDevice=/dev/ttyUSB0 +ApiMode=2 + diff --git a/MQTTSNGateway/src/MQTTSNGWProcess.cpp b/MQTTSNGateway/src/MQTTSNGWProcess.cpp index 1afec5c..ce5085e 100644 --- a/MQTTSNGateway/src/MQTTSNGWProcess.cpp +++ b/MQTTSNGateway/src/MQTTSNGWProcess.cpp @@ -91,12 +91,12 @@ void Process::initialize(int argc, char** argv) size_t pos = 0; if ( (pos = config.find_last_of("/")) == string::npos ) { - _configFile = config; + _configFile = optarg; } else { + _configFile = config.substr(pos + 1, config.size() - pos - 1);; _configDir = config.substr(0, pos + 1); - _configFile = config.substr(pos + 1, config.size() - pos - 1); } } } @@ -121,11 +121,11 @@ int Process::getParam(const char* parameter, char* value) FILE *fp; int i = 0, j = 0; - string config = _configDir + _configFile; + string configPath = _configDir + _configFile; - if ((fp = fopen(config.c_str(), "r")) == NULL) + if ((fp = fopen(configPath.c_str(), "r")) == NULL) { - WRITELOG("No config file:[%s]\n", config.c_str()); + WRITELOG("No config file:[%s]\n", configPath.c_str()); return -1; } @@ -232,13 +232,7 @@ MultiTaskProcess::MultiTaskProcess() MultiTaskProcess::~MultiTaskProcess() { - for (int i = 0; i < _threadCount; i++) - { - if ( _threadList[i] ) - { - delete _threadList[i]; - } - } + } void MultiTaskProcess::initialize(int argc, char** argv) @@ -264,6 +258,13 @@ void MultiTaskProcess::run(void) } catch (Exception* ex) { + for (int i = 0; i < _threadCount; i++) + { + if ( _threadList[i] ) + { + _threadList[i]->cancel();; + } + } ex->writeMessage(); } } @@ -363,7 +364,7 @@ void Exception::writeMessage() { if (getExceptionNo() == 0 ) { - WRITELOG("%s : %s\n", currentDateTime(), what()); + WRITELOG("%s %s\n", currentDateTime(), what()); } else { diff --git a/MQTTSNGateway/src/MQTTSNGWProcess.h b/MQTTSNGateway/src/MQTTSNGWProcess.h index 666ef3b..d79f829 100644 --- a/MQTTSNGateway/src/MQTTSNGWProcess.h +++ b/MQTTSNGateway/src/MQTTSNGWProcess.h @@ -71,8 +71,8 @@ public: private: int _argc; char** _argv; - string _configDir; - string _configFile; + string _configDir; + string _configFile; RingBuffer* _rb; Semaphore* _rbsem; Mutex _mt; diff --git a/MQTTSNGateway/src/MQTTSNGateway.h b/MQTTSNGateway/src/MQTTSNGateway.h index cc60367..b9c1710 100644 --- a/MQTTSNGateway/src/MQTTSNGateway.h +++ b/MQTTSNGateway/src/MQTTSNGateway.h @@ -31,7 +31,7 @@ namespace MQTTSNGW /*================================= * Starting prompt ==================================*/ -#define GATEWAY_VERSION " * Version: 0.4.0" +#define GATEWAY_VERSION " * Version: 0.5.0" #define PAHO_COPYRIGHT0 " * MQTT-SN Transparent Gateway" #define PAHO_COPYRIGHT1 " * Part of Project Paho in Eclipse" diff --git a/MQTTSNGateway/src/linux/Threading.cpp b/MQTTSNGateway/src/linux/Threading.cpp index 660ba39..25a7d8e 100644 --- a/MQTTSNGateway/src/linux/Threading.cpp +++ b/MQTTSNGateway/src/linux/Threading.cpp @@ -511,7 +511,7 @@ void Thread::stopProcess(void) _stopProcessEvent->post(); } -void Thread::testThreadCancel(void) +void Thread::cancel(void) { - pthread_testcancel(); + pthread_cancel(_threadID); } diff --git a/MQTTSNGateway/src/linux/Threading.h b/MQTTSNGateway/src/linux/Threading.h index 1bdc6ba..8c74784 100644 --- a/MQTTSNGateway/src/linux/Threading.h +++ b/MQTTSNGateway/src/linux/Threading.h @@ -131,7 +131,7 @@ public: static bool equals(pthread_t*, pthread_t*); virtual void initialize(int argc, char** argv); void stopProcess(void); - void testThreadCancel(void); + void cancel(void); private: pthread_t _threadID; Semaphore* _stopProcessEvent; diff --git a/MQTTSNGateway/src/linux/udp/SensorNetwork.cpp b/MQTTSNGateway/src/linux/udp/SensorNetwork.cpp index 85808c5..9c54d4e 100644 --- a/MQTTSNGateway/src/linux/udp/SensorNetwork.cpp +++ b/MQTTSNGateway/src/linux/udp/SensorNetwork.cpp @@ -153,6 +153,11 @@ const char* SensorNetwork::getDescription(void) return _description.c_str(); } +SensorNetAddress* SensorNetwork::getSenderAddress(void) +{ + return &_clientAddr; +} + /*========================================= Class udpStack =========================================*/ diff --git a/MQTTSNGateway/src/linux/udp/SensorNetwork.h b/MQTTSNGateway/src/linux/udp/SensorNetwork.h index 691a4de..86d4595 100644 --- a/MQTTSNGateway/src/linux/udp/SensorNetwork.h +++ b/MQTTSNGateway/src/linux/udp/SensorNetwork.h @@ -99,11 +99,7 @@ public: int read(uint8_t* buf, uint16_t bufLen); int initialize(void); const char* getDescription(void); - SensorNetAddress* getSenderAddress(void) - { - return &_clientAddr; - } - + SensorNetAddress* getSenderAddress(void); private: SensorNetAddress _clientAddr; // Sender's address. not gateway's one. diff --git a/MQTTSNGateway/src/linux/xbee/SensorNetwork.cpp b/MQTTSNGateway/src/linux/xbee/SensorNetwork.cpp index 91754e4..1d37654 100644 --- a/MQTTSNGateway/src/linux/xbee/SensorNetwork.cpp +++ b/MQTTSNGateway/src/linux/xbee/SensorNetwork.cpp @@ -24,7 +24,6 @@ #include "SensorNetwork.h" #include "MQTTSNGWProcess.h" -#include "Threading.h" using namespace std; using namespace MQTTSNGW; @@ -111,17 +110,27 @@ int SensorNetwork::initialize(void) { char param[MQTTSNGW_PARAM_MAX]; uint16_t baudrate = 9600; + uint8_t apimode = 2; - if (theProcess->getParam("XBee Baudrate", param) == 0) + if (theProcess->getParam("ApiMode", param) == 0) + { + apimode = (uint8_t)atoi(param); + } + setApiMode(apimode); + _description = "API mode "; + sprintf(param, "%d", apimode); + _description += param; + + if (theProcess->getParam("Baudrate", param) == 0) { baudrate = (uint16_t)atoi(param); } - _description = "Baudrate "; + _description += ", Baudrate "; sprintf(param ,"%d", baudrate); _description += param; theProcess->getParam("SerialDevice", param); - _description = "SerialDevice "; + _description += ", SerialDevice "; _description += param; return XBee::open(param, baudrate); @@ -132,6 +141,11 @@ const char* SensorNetwork::getDescription(void) return _description.c_str(); } +SensorNetAddress* SensorNetwork::getSenderAddress(void) +{ + return &_clientAddr; +} + /*=========================================== Class XBee ============================================*/ @@ -140,6 +154,7 @@ XBee::XBee(){ _respCd = 0; _dataLen = 0; _frameId = 0; + _apiMode = 2; } XBee::~XBee(){ @@ -340,7 +355,7 @@ int XBee::send(const uint8_t* payload, uint8_t pLen, SensorNetAddress* addr){ void XBee::send(uint8_t c) { - if(c == START_BYTE || c == ESCAPE || c == XON || c == XOFF){ + if(_apiMode == 2 && (c == START_BYTE || c == ESCAPE || c == XON || c == XOFF)){ _serialPort->send(ESCAPE); _serialPort->send(c ^ 0x20); }else{ @@ -352,7 +367,7 @@ int XBee::recv(uint8_t* buf) { if (_serialPort->recv(buf) ) { - if ( *buf == ESCAPE) + if ( *buf == ESCAPE && _apiMode == 2 ) { _serialPort->recv(buf); *buf = 0x20 ^ *buf; @@ -362,6 +377,11 @@ int XBee::recv(uint8_t* buf) return -1; } +void XBee::setApiMode(uint8_t mode) +{ + _apiMode = mode; +} + /*========================================= Class SerialPort =========================================*/ diff --git a/MQTTSNGateway/src/linux/xbee/SensorNetwork.h b/MQTTSNGateway/src/linux/xbee/SensorNetwork.h index 9b3abdd..4de0932 100644 --- a/MQTTSNGateway/src/linux/xbee/SensorNetwork.h +++ b/MQTTSNGateway/src/linux/xbee/SensorNetwork.h @@ -94,6 +94,7 @@ public: int unicast(const uint8_t* buf, uint16_t length, SensorNetAddress* sendToAddr); int broadcast(const uint8_t* buf, uint16_t length); int recv(uint8_t* buf, uint16_t len, SensorNetAddress* addr); + void setApiMode(uint8_t mode); private: int readApiFrame(uint8_t* recvData); @@ -105,10 +106,10 @@ private: Mutex _meutex; SerialPort* _serialPort; uint8_t _frameId; - uint8_t _respCd; uint8_t _respId; uint8_t _dataLen; + uint8_t _apiMode; }; /*=========================================== @@ -125,11 +126,7 @@ public: int read(uint8_t* buf, uint16_t bufLen); int initialize(void); const char* getDescription(void); - SensorNetAddress* getSenderAddress(void) - { - return &_clientAddr; - } - + SensorNetAddress* getSenderAddress(void); private: SensorNetAddress _clientAddr; // Sender's address. not gateway's one. From 76f58a60a3ee5fb5249b39709b03511544632992 Mon Sep 17 00:00:00 2001 From: tomoaki Date: Sat, 1 Oct 2016 08:47:10 +0900 Subject: [PATCH 5/5] BugFix: Que template Update: Add ProcessFramework test and change Makefile for it. Signed-off-by: tomoaki --- MQTTSNGateway/README.md | 10 +- MQTTSNGateway/gateway.conf | 2 +- MQTTSNGateway/src/MQTTSNGWBrokerRecvTask.cpp | 2 +- MQTTSNGateway/src/MQTTSNGWBrokerRecvTask.h | 1 - MQTTSNGateway/src/MQTTSNGWBrokerSendTask.h | 1 - MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp | 6 +- MQTTSNGateway/src/MQTTSNGWProcess.h | 39 +++-- MQTTSNGateway/src/MQTTSNGateway.cpp | 19 +-- MQTTSNGateway/src/MQTTSNGateway.h | 3 +- MQTTSNGateway/src/linux/Threading.cpp | 2 +- MQTTSNGateway/src/linux/Threading.h | 1 + .../src/linux/xbee/SensorNetwork.cpp | 1 + MQTTSNGateway/src/mainGateway.cpp | 19 ++- .../src/tests/TestProcessFramework.cpp | 151 ++++++++++++++++++ .../src/tests/TestProcessFramework.h | 50 ++++++ MQTTSNGateway/src/tests/TestTask.cpp | 49 ++++++ MQTTSNGateway/src/tests/TestTask.h | 42 +++++ .../src/tests/mainTestProcessFramework.cpp | 31 ++++ Makefile | 43 +++-- 19 files changed, 415 insertions(+), 57 deletions(-) create mode 100644 MQTTSNGateway/src/tests/TestProcessFramework.cpp create mode 100644 MQTTSNGateway/src/tests/TestProcessFramework.h create mode 100644 MQTTSNGateway/src/tests/TestTask.cpp create mode 100644 MQTTSNGateway/src/tests/TestTask.h create mode 100644 MQTTSNGateway/src/tests/mainTestProcessFramework.cpp diff --git a/MQTTSNGateway/README.md b/MQTTSNGateway/README.md index 3c7b8bb..67f5e57 100644 --- a/MQTTSNGateway/README.md +++ b/MQTTSNGateway/README.md @@ -33,14 +33,15 @@ BrokerSecurePortNo=8883 ClientAuthentication=NO #ClientsList=/path/to/your_clients.conf -RootCAfile=/path/to/your_Root_CA.crt -RootCApath=/path/to/your_certs_directory/ +#RootCAfile=/path/to/your_Root_CA.crt +#RootCApath=/path/to/your_certs_directory/ +#CertsDirectory=/path/to/your_client_certs_directory/ GatewayID=1 GatewayName=PahoGateway-01 KeepAlive=900 -#LoginID= -#Password= +#LoginID=your_ID +#Password=your_Password # UDP GatewayPortNo=10000 @@ -50,6 +51,7 @@ MulticastPortNo=1883 # XBee Baudrate=38400 SerialDevice=/dev/ttyUSB0 +ApiMode=2 ``` **BrokerName** to specify a domain name of the Broker, and **BrokerPortNo** is a port No of the Broker. **BrokerSecurePortNo** is for TLS connection. diff --git a/MQTTSNGateway/gateway.conf b/MQTTSNGateway/gateway.conf index 3438767..6d020c6 100644 --- a/MQTTSNGateway/gateway.conf +++ b/MQTTSNGateway/gateway.conf @@ -20,7 +20,7 @@ BrokerSecurePortNo=8883 ClientAuthentication=NO #ClientsList=/path/to/your_clients.conf -RootCAfile=/etc/ssl/certs/ca-certificates.crt +#RootCAfile=/etc/ssl/certs/ca-certificates.crt #RootCApath=/etc/ssl/certs/ #CertsDirectory=/usr/share/GW/IoTcerts/ diff --git a/MQTTSNGateway/src/MQTTSNGWBrokerRecvTask.cpp b/MQTTSNGateway/src/MQTTSNGWBrokerRecvTask.cpp index 967c63a..11a2d3e 100644 --- a/MQTTSNGateway/src/MQTTSNGWBrokerRecvTask.cpp +++ b/MQTTSNGateway/src/MQTTSNGWBrokerRecvTask.cpp @@ -117,7 +117,7 @@ void BrokerRecvTask::run(void) /* post a BrokerRecvEvent */ ev = new Event(); ev->setBrokerRecvEvent(client, packet); - if ( _gateway->getPacketEventQue()->post(ev) == 1 ) + if ( _gateway->getPacketEventQue()->post(ev) == 0 ) { delete ev; } diff --git a/MQTTSNGateway/src/MQTTSNGWBrokerRecvTask.h b/MQTTSNGateway/src/MQTTSNGWBrokerRecvTask.h index a5dd9a8..4177015 100644 --- a/MQTTSNGateway/src/MQTTSNGWBrokerRecvTask.h +++ b/MQTTSNGateway/src/MQTTSNGWBrokerRecvTask.h @@ -22,7 +22,6 @@ namespace MQTTSNGW { -#define ERRNO_APL_03 13 // Task Initialize Error /*===================================== Class BrokerRecvTask =====================================*/ diff --git a/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.h b/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.h index 81b0a7a..7ce73ea 100644 --- a/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.h +++ b/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.h @@ -22,7 +22,6 @@ namespace MQTTSNGW { -#define ERRNO_APL_04 14 // Task Initialize Error /*===================================== Class BrokerSendTask =====================================*/ diff --git a/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp b/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp index a8db6d3..ff87f5b 100644 --- a/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp +++ b/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp @@ -77,7 +77,7 @@ void ClientRecvTask::run() log(0, packet); ev = new Event(); ev->setBrodcastEvent(packet); - if ( _gateway->getPacketEventQue()->post(ev) == 1 ) + if ( _gateway->getPacketEventQue()->post(ev) == 0 ) { delete ev; } @@ -93,7 +93,7 @@ void ClientRecvTask::run() log(client, packet); ev = new Event(); ev->setClientRecvEvent(client,packet); - if ( _gateway->getPacketEventQue()->post(ev) == 1 ) + if ( _gateway->getPacketEventQue()->post(ev) == 0 ) { delete ev; } @@ -123,7 +123,7 @@ void ClientRecvTask::run() client->setClientAddress(_sensorNetwork->getSenderAddress()); ev = new Event(); ev->setClientRecvEvent(client, packet); - if ( _gateway->getPacketEventQue()->post(ev) == 1 ) + if ( _gateway->getPacketEventQue()->post(ev) == 0 ) { delete ev; } diff --git a/MQTTSNGateway/src/MQTTSNGWProcess.h b/MQTTSNGateway/src/MQTTSNGWProcess.h index d79f829..c1b9110 100644 --- a/MQTTSNGateway/src/MQTTSNGWProcess.h +++ b/MQTTSNGateway/src/MQTTSNGWProcess.h @@ -19,6 +19,7 @@ #include #include +#include #include "MQTTSNGWDefines.h" #include "Threading.h" @@ -164,6 +165,7 @@ public: _head = 0; _tail = 0; _cnt = 0; + _maxSize = 0; } ~Que() @@ -213,28 +215,33 @@ public: int post(T* t) { - QueElement* elm = new QueElement(t); - if ( _head ) + if ( t && ( _maxSize == 0 || size() < _maxSize )) { - if ( _tail == _head ) + QueElement* elm = new QueElement(t); + if ( _head ) { + if ( _tail == _head ) + { + elm->_prev = _tail; + _tail = elm; + _head->_next = elm; + } + else + { + _tail->_next = elm; elm->_prev = _tail; _tail = elm; + } } else { - _tail->_next = elm; - elm->_prev = _tail; - _tail = elm; + _head = elm; + _tail = elm; } + _cnt++; + return _cnt; } - else - { - _head = elm; - _tail = elm; - } - _cnt++; - return _cnt; + return 0; } int size(void) @@ -242,8 +249,14 @@ public: return _cnt; } + void setMaxSize(int maxSize) + { + _maxSize = maxSize; + } + private: int _cnt; + int _maxSize; QueElement* _head; QueElement* _tail; }; diff --git a/MQTTSNGateway/src/MQTTSNGateway.cpp b/MQTTSNGateway/src/MQTTSNGateway.cpp index 8b6e72c..fa90643 100644 --- a/MQTTSNGateway/src/MQTTSNGateway.cpp +++ b/MQTTSNGateway/src/MQTTSNGateway.cpp @@ -188,7 +188,7 @@ GatewayParams* Gateway::getGWParams(void) =====================================*/ EventQue::EventQue() { - _maxSize = 0; + } EventQue::~EventQue() @@ -198,7 +198,7 @@ EventQue::~EventQue() void EventQue::setMaxSize(uint16_t maxSize) { - _maxSize = maxSize; + _que.setMaxSize((int)maxSize); } Event* EventQue::wait(void) @@ -245,15 +245,12 @@ Event* EventQue::timedwait(uint16_t millsec) int EventQue::post(Event* ev) { - if ( ev && ( _maxSize == 0 || size() < _maxSize ) ) - { - _mutex.lock(); - _que.post(ev); - _sem.post(); - _mutex.unlock(); - return 0; - } - return 1; + int rc = 0; + _mutex.lock(); + rc = _que.post(ev); + _sem.post(); + _mutex.unlock(); + return rc; } int EventQue::size() diff --git a/MQTTSNGateway/src/MQTTSNGateway.h b/MQTTSNGateway/src/MQTTSNGateway.h index b9c1710..f3e2b4d 100644 --- a/MQTTSNGateway/src/MQTTSNGateway.h +++ b/MQTTSNGateway/src/MQTTSNGateway.h @@ -31,7 +31,7 @@ namespace MQTTSNGW /*================================= * Starting prompt ==================================*/ -#define GATEWAY_VERSION " * Version: 0.5.0" +#define GATEWAY_VERSION " * Version: 0.6.0" #define PAHO_COPYRIGHT0 " * MQTT-SN Transparent Gateway" #define PAHO_COPYRIGHT1 " * Part of Project Paho in Eclipse" @@ -138,7 +138,6 @@ private: Que _que; Mutex _mutex; Semaphore _sem; - uint16_t _maxSize; }; /* diff --git a/MQTTSNGateway/src/linux/Threading.cpp b/MQTTSNGateway/src/linux/Threading.cpp index 25a7d8e..b6eeb5c 100644 --- a/MQTTSNGateway/src/linux/Threading.cpp +++ b/MQTTSNGateway/src/linux/Threading.cpp @@ -502,7 +502,7 @@ bool Thread::equals(pthread_t *t1, pthread_t *t2) int Thread::start(void) { - Runnable *runnable = this; + Runnable* runnable = this; return pthread_create(&_threadID, 0, _run, runnable); } diff --git a/MQTTSNGateway/src/linux/Threading.h b/MQTTSNGateway/src/linux/Threading.h index 8c74784..185ce69 100644 --- a/MQTTSNGateway/src/linux/Threading.h +++ b/MQTTSNGateway/src/linux/Threading.h @@ -19,6 +19,7 @@ #include #include +#include "MQTTSNGWDefines.h" namespace MQTTSNGW { diff --git a/MQTTSNGateway/src/linux/xbee/SensorNetwork.cpp b/MQTTSNGateway/src/linux/xbee/SensorNetwork.cpp index 1d37654..75e90a0 100644 --- a/MQTTSNGateway/src/linux/xbee/SensorNetwork.cpp +++ b/MQTTSNGateway/src/linux/xbee/SensorNetwork.cpp @@ -152,6 +152,7 @@ SensorNetAddress* SensorNetwork::getSenderAddress(void) XBee::XBee(){ _serialPort = new SerialPort(); _respCd = 0; + _respId = 0; _dataLen = 0; _frameId = 0; _apiMode = 2; diff --git a/MQTTSNGateway/src/mainGateway.cpp b/MQTTSNGateway/src/mainGateway.cpp index 9ffc74a..227f9c8 100644 --- a/MQTTSNGateway/src/mainGateway.cpp +++ b/MQTTSNGateway/src/mainGateway.cpp @@ -25,18 +25,17 @@ using namespace MQTTSNGW; /* * Gateway Process */ -Gateway* gateway = new Gateway(); -PacketHandleTask* t0 = new PacketHandleTask(gateway); -ClientRecvTask* t1 = new ClientRecvTask(gateway); -ClientSendTask* t2 = new ClientSendTask(gateway); -BrokerRecvTask* t3 = new BrokerRecvTask(gateway); -BrokerSendTask* t4 = new BrokerSendTask(gateway); +Gateway* gw = new Gateway(); +PacketHandleTask task1(gw); +ClientRecvTask task2(gw); +ClientSendTask task3(gw); +BrokerRecvTask task4(gw); +BrokerSendTask task5(gw); int main(int argc, char** argv) { - gateway->initialize(argc, argv); - gateway->run(); - delete gateway; + gw->initialize(argc, argv); + gw->run(); + delete gw; return 0; } - diff --git a/MQTTSNGateway/src/tests/TestProcessFramework.cpp b/MQTTSNGateway/src/tests/TestProcessFramework.cpp new file mode 100644 index 0000000..6b6fe04 --- /dev/null +++ b/MQTTSNGateway/src/tests/TestProcessFramework.cpp @@ -0,0 +1,151 @@ +/************************************************************************************** + * Copyright (c) 2016, Tomoaki Yamaguchi + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Tomoaki Yamaguchi - initial API and implementation + **************************************************************************************/ + +#include +#include +#include "TestProcessFramework.h" +#include "MQTTSNGWProcess.h" +#include "Timer.h" + +using namespace std; +using namespace MQTTSNGW; + +#define ARGV "./testPFW" +#define CONFDIR "./" +#define CONF "gateway.conf" + +const char* currentDateTime(void); + +TestProcessFramework::TestProcessFramework() +{ + theMultiTaskProcess = this; + theProcess = this; +} + +TestProcessFramework::~TestProcessFramework() +{ + +} + +void TestProcessFramework::initialize(int argc, char** argv) +{ + MultiTaskProcess::initialize(argc, argv); + assert(0 == strcmp(CONFDIR, getConfigDirName()->c_str())); + assert(0 == strcmp(CONF, getConfigFileName()->c_str())); + resetRingBuffer(); +} + +void TestProcessFramework::run(void) +{ + char value[256]; + int* v = 0; + int i = 0; + Timer tm; + TestQue que; + + assert(1 == getArgc() || 3 == getArgc() ); + assert(0 == strcmp(ARGV, *getArgv())); + getParam("BrokerName", value); + assert(0 == strcmp("iot.eclipse.org", value)); + + for ( i = 0; i < 1000; i++) + { + putLog("Test RingBuffer %d ", 1234567890); + } + putLog("\n\nRingBuffer Test complieted. Enter CTRL+C\n"); + + for ( i = 0; i < 10; i++ ) + { + v = new int(i); + que.post(v); + } + assert( 10 == que.size()); + + for ( i = 0; i < 10; i++ ) + { + assert(i == *que.front()); + int* p = que.front(); + if ( p ) + { + assert(i == *p); + que.pop(); + delete p; + } + } + assert(0 == que.front()); + assert(0 == que.size()); + + que.setMaxSize(5); + for ( i = 0; i < 10; i++ ) + { + v = new int(i); + que.post(v); + assert( 5 >= que.size()); + } + for ( i = 0; i < 10; i++ ) + { + int* p = que.front(); + if ( p ) + { + que.pop(); + delete p; + } + } + + printf("%s Timer start\n", currentDateTime()); + tm.start(1000); + while (!tm.isTimeup()); + printf("%s Timer 1sec\n", currentDateTime()); + + tm.start(); + while (!tm.isTimeup(1000)); + printf("%s Timer 1sec\n", currentDateTime()); + + MultiTaskProcess::run(); + printf("ProcessFramework test complited.\n"); +} + +TestQue::TestQue() +{ + +} + +TestQue::~TestQue() +{ + +} + +int* TestQue::front(void) +{ + return _que.front(); +} +void TestQue::pop(void) +{ + _que.pop(); +} +int TestQue::size(void) +{ + return _que.size(); +} +void TestQue::setMaxSize(int maxsize) +{ + _que.setMaxSize(maxsize); +} + +void TestQue::post(int* val) +{ + _que.post(val); +} diff --git a/MQTTSNGateway/src/tests/TestProcessFramework.h b/MQTTSNGateway/src/tests/TestProcessFramework.h new file mode 100644 index 0000000..88617f1 --- /dev/null +++ b/MQTTSNGateway/src/tests/TestProcessFramework.h @@ -0,0 +1,50 @@ +/************************************************************************************** + * Copyright (c) 2016, Tomoaki Yamaguchi + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Tomoaki Yamaguchi - initial API and implementation + **************************************************************************************/ +#ifndef TESTPROCESSFRAMEWORK_H_ +#define TESTPROCESSFRAMEWORK_H_ + +#include "MQTTSNGWProcess.h" + + +namespace MQTTSNGW +{ +class TestProcessFramework: public MultiTaskProcess{ +public: + TestProcessFramework(); + ~TestProcessFramework(); + virtual void initialize(int argc, char** argv); + void run(void); + +private: + +}; + +class TestQue +{ +public: + TestQue(); + ~TestQue(); + void post(int*); + int* front(void); + void pop(void); + int size(void); + void setMaxSize(int maxsize); +private: + Que _que; +}; +} + +#endif /* TESTPROCESSFRAMEWORK_H_ */ diff --git a/MQTTSNGateway/src/tests/TestTask.cpp b/MQTTSNGateway/src/tests/TestTask.cpp new file mode 100644 index 0000000..cf1667e --- /dev/null +++ b/MQTTSNGateway/src/tests/TestTask.cpp @@ -0,0 +1,49 @@ +/************************************************************************************** + * Copyright (c) 2016, Tomoaki Yamaguchi + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Tomoaki Yamaguchi - initial API and implementation + **************************************************************************************/ +#include +#include "TestTask.h" +#include "Threading.h" + +using namespace std; +using namespace MQTTSNGW; + +TestTask::TestTask(TestProcessFramework* proc) +{ + proc->attach((Thread*)this); +} + +TestTask::~TestTask() +{ + +} + +void TestTask::initialize(int argc, char** argv) +{ + printf("Task initialize complite.\n"); +} + +void TestTask::run(void) +{ + while(true) + { + printf("Task is running. Enter CTRL+C \n"); + if (theProcess->checkSignal() == SIGINT) + { + throw Exception("Terminated by CTL-C"); + } + sleep(1); + } +} diff --git a/MQTTSNGateway/src/tests/TestTask.h b/MQTTSNGateway/src/tests/TestTask.h new file mode 100644 index 0000000..c79db00 --- /dev/null +++ b/MQTTSNGateway/src/tests/TestTask.h @@ -0,0 +1,42 @@ +/************************************************************************************** + * Copyright (c) 2016, Tomoaki Yamaguchi + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Tomoaki Yamaguchi - initial API and implementation + **************************************************************************************/ +#ifndef TESTTASK_H_ +#define TESTTASK_H_ + +#include "Threading.h" +#include "TestProcessFramework.h" + +namespace MQTTSNGW +{ + +class TestTask: public Thread +{ +MAGIC_WORD_FOR_THREAD; + ; +public: + TestTask(TestProcessFramework* proc); + ~TestTask(); + void initialize(int argc, char** argv); + void run(void); + +private: + +}; + +} + + +#endif /* TESTTASK_H_ */ diff --git a/MQTTSNGateway/src/tests/mainTestProcessFramework.cpp b/MQTTSNGateway/src/tests/mainTestProcessFramework.cpp new file mode 100644 index 0000000..9c23e04 --- /dev/null +++ b/MQTTSNGateway/src/tests/mainTestProcessFramework.cpp @@ -0,0 +1,31 @@ +/************************************************************************************** + * Copyright (c) 2016, Tomoaki Yamaguchi + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Tomoaki Yamaguchi - initial API and implementation + **************************************************************************************/ +#include "TestProcessFramework.h" +#include "TestTask.h" + +using namespace MQTTSNGW; + +TestProcessFramework* proc = new TestProcessFramework(); +TestTask* task = new TestTask(proc); + +int main(int argc, char** argv) +{ + proc->initialize(argc, argv); + proc->run(); + delete proc; + return 0; +} + diff --git a/Makefile b/Makefile index 6f33f36..8fee46a 100644 --- a/Makefile +++ b/Makefile @@ -4,6 +4,9 @@ APPL := mainGateway LPROGNAME := MQTT-SNLogmonitor LAPPL := mainLogmonitor +TESTPROGNAME := testPFW +TESTAPPL := mainTestProcessFramework + CONFIG := MQTTSNGateway/gateway.conf CLIENTS := MQTTSNGateway/clients.conf @@ -12,6 +15,7 @@ SUBDIR := MQTTSNPacket/src OS := linux SENSORNET := udp +TEST := tests CPPSRCS := \ $(SRCDIR)/MQTTGWConnectionHandler.cpp \ @@ -34,7 +38,10 @@ $(SRCDIR)/MQTTSNGWSubscribeHandler.cpp \ $(SRCDIR)/$(OS)/$(SENSORNET)/SensorNetwork.cpp \ $(SRCDIR)/$(OS)/Timer.cpp \ $(SRCDIR)/$(OS)/Network.cpp \ -$(SRCDIR)/$(OS)/Threading.cpp +$(SRCDIR)/$(OS)/Threading.cpp \ +$(SRCDIR)/$(TEST)/TestProcessFramework.cpp \ +$(SRCDIR)/$(TEST)/TestTask.cpp + CSRCS := $(SUBDIR)/MQTTSNConnectClient.c \ $(SUBDIR)/MQTTSNConnectServer.c \ @@ -51,13 +58,14 @@ $(SUBDIR)/MQTTSNUnsubscribeServer.c CXX := g++ CPPFLAGS += -INCLUDES += -IMQTTSNGateway/src \ --IMQTTSNGateway/src/$(OS) \ --IMQTTSNGateway/src/$(OS)/$(SENSORNET) \ --IMQTTSNPacket/src +INCLUDES += -I$(SRCDIR) \ +-I$(SRCDIR)/$(OS) \ +-I$(SRCDIR)/$(OS)/$(SENSORNET) \ +-I$(SUBDIR) \ +-I$(SRCDIR)/$(TEST) DEFS := -LIBS += +LIBS += -L/usr/local/lib LDFLAGS := CXXFLAGS := -Wall -O3 -std=c++11 LDADD := -lpthread -lssl -lcrypto @@ -65,17 +73,22 @@ OUTDIR := Build PROG := $(OUTDIR)/$(PROGNAME) LPROG := $(OUTDIR)/$(LPROGNAME) +TPROG := $(OUTDIR)/$(TESTPROGNAME) + OBJS := $(CPPSRCS:%.cpp=$(OUTDIR)/%.o) OBJS += $(CSRCS:%.c=$(OUTDIR)/%.o) DEPS := $(CPPSRCS:%.cpp=$(OUTDIR)/%.d) DEPS += $(CSRCS:%.c=$(OUTDIR)/%.d) -.PHONY: install clean +.PHONY: install clean exectest -all: $(PROG) $(LPROG) +all: $(PROG) $(LPROG) $(TPROG) monitor: $(LPROG) +test: $(TPROG) $(LPROG) exectest + + -include $(DEPS) $(PROG): $(OBJS) $(OUTDIR)/$(SRCDIR)/$(APPL).o @@ -84,6 +97,10 @@ $(PROG): $(OBJS) $(OUTDIR)/$(SRCDIR)/$(APPL).o $(LPROG): $(OBJS) $(OUTDIR)/$(SRCDIR)/$(LAPPL).o $(CXX) $(LDFLAGS) -o $@ $^ $(LIBS) $(LDADD) +$(TPROG): $(OBJS) $(OUTDIR)/$(SRCDIR)/$(TEST)/$(TESTAPPL).o + $(CXX) $(LDFLAGS) -o $@ $^ $(LIBS) $(LDADD) + + $(OUTDIR)/$(SRCDIR)/%.o:$(SRCDIR)/%.cpp @if [ ! -e `dirname $@` ]; then mkdir -p `dirname $@`; fi $(CXX) $(CXXFLAGS) $(CPPFLAGS) $(INCLUDES) $(DEFS) -o $@ -c -MMD -MP -MF $(@:%.o=%.d) $< @@ -92,6 +109,10 @@ $(OUTDIR)/$(SRCDIR)/$(APPL).o:$(SRCDIR)/$(APPL).cpp @if [ ! -e `dirname $@` ]; then mkdir -p `dirname $@`; fi $(CXX) $(CXXFLAGS) $(CPPFLAGS) $(INCLUDES) $(DEFS) -o $@ -c -MMD -MP -MF $(@:%.o=%.d) $< +$(OUTDIR)/$(SRCDIR)/$(TEST)/$(TESTAPPL).o:$(SRCDIR)/$(TEST)/$(TESTAPPL).cpp + @if [ ! -e `dirname $@` ]; then mkdir -p `dirname $@`; fi + $(CXX) $(CXXFLAGS) $(CPPFLAGS) $(INCLUDES) $(DEFS) -o $@ -c -MMD -MP -MF $(@:%.o=%.d) $< + $(OUTDIR)/$(SRCDIR)/$(LAPPL).o:$(SRCDIR)/$(LAPPL).cpp @if [ ! -e `dirname $@` ]; then mkdir -p `dirname $@`; fi $(CXX) $(CXXFLAGS) $(CPPFLAGS) $(INCLUDES) $(DEFS) -o $@ -c -MMD -MP -MF $(@:%.o=%.d) $< @@ -108,5 +129,9 @@ install: cp -pf $(LPROG) ../ cp -pf $(CONFIG) ../ cp -pf $(CLIENTS) ../ - + +exectest: + cp -pf $(CONFIG) $(OUTDIR) + cd $(OUTDIR); ./$(TESTPROGNAME) -f ./gateway.conf + \ No newline at end of file