diff --git a/.cproject b/.cproject index 7a8dff5..3466b14 100644 --- a/.cproject +++ b/.cproject @@ -52,6 +52,9 @@ + @@ -63,7 +66,7 @@ - + @@ -130,7 +133,7 @@ - + diff --git a/MQTTSNGateway/gateway.conf b/MQTTSNGateway/gateway.conf index 6d020c6..46016a4 100644 --- a/MQTTSNGateway/gateway.conf +++ b/MQTTSNGateway/gateway.conf @@ -22,7 +22,8 @@ ClientAuthentication=NO #RootCAfile=/etc/ssl/certs/ca-certificates.crt #RootCApath=/etc/ssl/certs/ -#CertsDirectory=/usr/share/GW/IoTcerts/ +#CertsFile=/path/to/certKey.pem +#PrivateKey=/path/to/privateKey.pem GatewayID=1 GatewayName=PahoGateway-01 @@ -40,3 +41,6 @@ Baudrate=38400 SerialDevice=/dev/ttyUSB0 ApiMode=2 +# LOG +ShearedMemory=NO; + diff --git a/MQTTSNGateway/src/MQTTGWConnectionHandler.cpp b/MQTTSNGateway/src/MQTTGWConnectionHandler.cpp index 88263da..8440142 100644 --- a/MQTTSNGateway/src/MQTTGWConnectionHandler.cpp +++ b/MQTTSNGateway/src/MQTTGWConnectionHandler.cpp @@ -100,7 +100,7 @@ void MQTTGWConnectionHandler::handleDisconnect(Client* client, MQTTGWPacket* pac MQTTSNPacket* snPacket = new MQTTSNPacket(); snPacket->setDISCONNECT(0); client->disconnected(); + client->getNetwork()->close(); Event* ev1 = new Event(); ev1->setClientSendEvent(client, snPacket); } - diff --git a/MQTTSNGateway/src/MQTTGWConnectionHandler.h b/MQTTSNGateway/src/MQTTGWConnectionHandler.h index 8d8af3c..125350d 100644 --- a/MQTTSNGateway/src/MQTTGWConnectionHandler.h +++ b/MQTTSNGateway/src/MQTTGWConnectionHandler.h @@ -31,12 +31,9 @@ public: void handleConnack(Client* client, MQTTGWPacket* packet); void handlePingresp(Client* client, MQTTGWPacket* packet); void handleDisconnect(Client* client, MQTTGWPacket* packet); - private: Gateway* _gateway; }; } - - #endif /* MQTTGWCONNECTIONHANDLER_H_ */ diff --git a/MQTTSNGateway/src/MQTTGWPacket.cpp b/MQTTSNGateway/src/MQTTGWPacket.cpp index 74a885f..a969ce4 100644 --- a/MQTTSNGateway/src/MQTTGWPacket.cpp +++ b/MQTTSNGateway/src/MQTTGWPacket.cpp @@ -184,9 +184,10 @@ int MQTTGWPacket::recv(Network* network) unsigned char c; /* read First Byte of Packet */ - if (network->recv((unsigned char*)&_header.byte, 1) == -1) + int rc = network->recv((unsigned char*)&_header.byte, 1); + if ( rc <= 0) { - return -1; + return rc; } /* read RemainingLength */ do diff --git a/MQTTSNGateway/src/MQTTSNGWBrokerRecvTask.cpp b/MQTTSNGateway/src/MQTTSNGWBrokerRecvTask.cpp index 11a2d3e..39b8eb4 100644 --- a/MQTTSNGateway/src/MQTTSNGWBrokerRecvTask.cpp +++ b/MQTTSNGateway/src/MQTTSNGWBrokerRecvTask.cpp @@ -59,6 +59,12 @@ void BrokerRecvTask::run(void) while (true) { + _light->blueLight(false); + if (CHK_SIGINT) + { + WRITELOG("%s BrokerRecvTask stopped.\n", currentDateTime()); + return; + } timeout.tv_sec = 0; timeout.tv_usec = 500000; // 500 msec FD_ZERO(&rset); @@ -68,7 +74,6 @@ void BrokerRecvTask::run(void) /* Prepare sockets list to read */ Client* client = _gateway->getClientList()->getClient(); - _light->blueLight(false); while (client > 0) { @@ -111,26 +116,18 @@ void BrokerRecvTask::run(void) if ( log(client, packet) == -1 ) { - continue; + delete packet; + goto nextClient; } /* post a BrokerRecvEvent */ ev = new Event(); ev->setBrokerRecvEvent(client, packet); - if ( _gateway->getPacketEventQue()->post(ev) == 0 ) - { - delete ev; - } + _gateway->getPacketEventQue()->post(ev); } else { - _light->blueLight(false); - if ( rc == 0 ) - { - delete packet; - continue; - } - else if (rc == -1) + if (rc == -1) { WRITELOG("%s BrokerRecvTask can't receive a packet from the broker errno=%d %s%s\n", ERRMSG_HEADER, errno, client->getClientId(), ERRMSG_FOOTER); } @@ -140,27 +137,28 @@ void BrokerRecvTask::run(void) } else if ( rc == -3 ) { - WRITELOG("%s BrokerRecvTask can't create the packet %s%s\n", ERRMSG_HEADER, client->getClientId(), ERRMSG_FOOTER); + WRITELOG("%s BrokerRecvTask can't get memories for the packet %s%s\n", ERRMSG_HEADER, client->getClientId(), ERRMSG_FOOTER); } - /* disconnect the client */ - client->disconnected(); - client->getNetwork()->disconnect(); - rc = 0; delete packet; + + if ( (rc == -1 || rc == -2) && client->isActive() ) + { + /* disconnect the client */ + packet = new MQTTGWPacket(); + packet->setHeader(DISCONNECT); + ev = new Event(); + ev->setBrokerRecvEvent(client, packet); + _gateway->getPacketEventQue()->post(ev); + } } } } + nextClient: client = client->getNextClient(); } - _light->blueLight(false); } } - else - { - _light->greenLight(false); - } - maxSock = 0; } } @@ -176,7 +174,6 @@ int BrokerRecvTask::log(Client* client, MQTTGWPacket* packet) switch (packet->getType()) { case CONNACK: - case DISCONNECT: WRITELOG(FORMAT_Y_Y_W, currentDateTime(), packet->getName(), LEFTARROW, client->getClientId(), packet->print(pbuf)); break; case PUBLISH: @@ -196,6 +193,7 @@ int BrokerRecvTask::log(Client* client, MQTTGWPacket* packet) WRITELOG(FORMAT_Y_Y_W, currentDateTime(), packet->getName(), LEFTARROW, client->getClientId(), packet->print(pbuf)); break; default: + WRITELOG("Type=%x\n", packet->getType()); rc = -1; break; } diff --git a/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.cpp b/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.cpp index 0c44ee7..50bab94 100644 --- a/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.cpp +++ b/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.cpp @@ -34,41 +34,13 @@ BrokerSendTask::BrokerSendTask(Gateway* gateway) { _gateway = gateway; _gateway->attach((Thread*)this); - _host = 0; - _port = 0; - _portSecure = 0; - _certDirectory = 0; - _rootCAfile = 0; - + _gwparams = 0; _light = 0; } BrokerSendTask::~BrokerSendTask() { - if (_host) - { - free(_host); - } - if (_port) - { - free(_port); - } - if (_portSecure) - { - free(_portSecure); - } - if (_certDirectory) - { - free(_certDirectory); - } - if (_rootCApath) - { - free(_rootCApath); - } - if (_rootCAfile) - { - free(_rootCAfile); - } + } /** @@ -76,34 +48,7 @@ BrokerSendTask::~BrokerSendTask() */ void BrokerSendTask::initialize(int argc, char** argv) { - char param[MQTTSNGW_PARAM_MAX]; - - if (_gateway->getParam("BrokerName", param) == 0) - { - _host = strdup(param); - } - if (_gateway->getParam("BrokerPortNo", param) == 0) - { - _port = strdup(param); - } - if (_gateway->getParam("BrokerSecurePortNo", param) == 0) - { - _portSecure = strdup(param); - } - - if (_gateway->getParam("CertsDirectory", param) == 0) - { - _certDirectory = strdup(param); - } - if (_gateway->getParam("RootCApath", param) == 0) - { - _rootCApath = strdup(param); - } - if (_gateway->getParam("RootCAfile", param) == 0) - { - _rootCAfile = strdup(param); - } - + _gwparams = _gateway->getGWParams(); _light = _gateway->getLightIndicator(); } @@ -115,83 +60,81 @@ void BrokerSendTask::run() Event* ev = 0; MQTTGWPacket* packet = 0; Client* client = 0; + int rc = 0; while (true) { - int rc = 0; ev = _gateway->getBrokerSendQue()->wait(); - client = ev->getClient(); - packet = ev->getMQTTGWPacket(); - if ( client->getNetwork()->isValid() && !client->getNetwork()->isSecure() && packet->getType() == CONNECT ) + if ( ev->getEventType() == EtStop ) { - client->getNetwork()->close(); + WRITELOG("%s BrokerSendTask stopped.\n", currentDateTime()); + delete ev; + return; } - if ( !client->getNetwork()->isValid() ) + if ( ev->getEventType() == EtBrokerSend) { - /* connect to the broker and send a packet */ - char* portNo = _port; - const char* cert = 0; - const char* keyFile = 0; - string certFile; - string privateKeyFile; + client = ev->getClient(); + packet = ev->getMQTTGWPacket(); - if (client->isSecureNetwork()) + if ( packet->getType() == CONNECT && client->getNetwork()->isValid() ) { - portNo = _portSecure; - if ( _certDirectory ) + client->getNetwork()->close(); + } + + if ( !client->getNetwork()->isValid() ) + { + /* connect to the broker and send a packet */ + + if (client->isSecureNetwork()) { - certFile = _certDirectory; - certFile += client->getClientId(); - certFile += ".crt"; - cert = certFile.c_str(); - privateKeyFile = _certDirectory; - privateKeyFile += client->getClientId(); - privateKeyFile += ".key"; - keyFile = privateKeyFile.c_str(); + rc = client->getNetwork()->connect(_gwparams->brokerName, _gwparams->portSecure, _gwparams->rootCApath, + _gwparams->rootCAfile, _gwparams->certKey, _gwparams->privateKey); } - rc = client->getNetwork()->connect(_host, portNo, _rootCApath, _rootCAfile, cert, keyFile); + else + { + rc = client->getNetwork()->connect(_gwparams->brokerName, _gwparams->port); + } + + if ( !rc ) + { + /* disconnect the broker and the client */ + WRITELOG("%s BrokerSendTask can't connect to the broker. %s%s\n", + ERRMSG_HEADER, client->getClientId(), ERRMSG_FOOTER); + delete ev; + client->getNetwork()->close(); + continue; + } + } + + /* send a packet */ + _light->blueLight(true); + if ( (rc = packet->send(client->getNetwork())) > 0 ) + { + if ( packet->getType() == CONNECT ) + { + client->setWaitWillMsgFlg(false); + client->connectSended(); + } + log(client, packet); } else { - rc = client->getNetwork()->connect(_host, portNo); - } - - if ( !rc ) - { - /* disconnect the broker and change the client's status */ - WRITELOG("%s BrokerSendTask can't connect to the broker. errno=%d %s%s\n", - ERRMSG_HEADER, rc == -1 ? errno : 0, client->getClientId(), ERRMSG_FOOTER); - client->disconnected(); - client->getNetwork()->disconnect(); - delete ev; - continue; - } - } - - /* send a packet */ - _light->blueLight(true); - if ( (rc = packet->send(client->getNetwork())) > 0 ) - { - if ( packet->getType() == CONNECT ) - { - client->setWaitWillMsgFlg(false); - client->connectSended(); - } - log(client, packet); - } - else - { - WRITELOG("%s BrokerSendTask can't send a packet. errno=%d %s%s\n", - ERRMSG_HEADER, rc == -1 ? errno : 0, client->getClientId(), ERRMSG_FOOTER); WRITELOG("%s BrokerSendTask can't send a packet to the broker errno=%d %s%s\n", ERRMSG_HEADER, rc == -1 ? errno : 0, client->getClientId(), ERRMSG_FOOTER); - client->disconnected(); - client->getNetwork()->disconnect(); - } + client->getNetwork()->close(); - _light->blueLight(false); + /* Disconnect the client */ + packet = new MQTTGWPacket(); + packet->setHeader(DISCONNECT); + Event* ev1 = new Event(); + ev1->setBrokerRecvEvent(client, packet); + _gateway->getPacketEventQue()->post(ev1); + } + + _light->blueLight(false); + } delete ev; } } @@ -231,3 +174,4 @@ void BrokerSendTask::log(Client* client, MQTTGWPacket* packet) break; } } + diff --git a/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.h b/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.h index 7ce73ea..f287536 100644 --- a/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.h +++ b/MQTTSNGateway/src/MQTTSNGWBrokerSendTask.h @@ -35,14 +35,8 @@ public: void run(); private: void log(Client*, MQTTGWPacket*); - Gateway* _gateway; - char* _host; - char* _port; - char* _portSecure; - char* _rootCApath; - char* _rootCAfile; - char* _certDirectory; + GatewayParams* _gwparams; LightIndicator* _light; }; diff --git a/MQTTSNGateway/src/MQTTSNGWClient.cpp b/MQTTSNGateway/src/MQTTSNGWClient.cpp index 430498b..74b07ac 100644 --- a/MQTTSNGateway/src/MQTTSNGWClient.cpp +++ b/MQTTSNGateway/src/MQTTSNGWClient.cpp @@ -102,9 +102,7 @@ bool ClientList::authorize(const char* fileName) } pos = data.find_first_of(","); string id = data.substr(0, pos); - clientId.cstring = strdup(id.c_str()); - string addr = data.substr(pos + 1); if (netAddr.setAddress(&addr) == 0) @@ -117,6 +115,7 @@ bool ClientList::authorize(const char* fileName) { WRITELOG("Invalid address %s\n", data.c_str()); } + free(clientId.cstring); } fclose(fp); _authorize = true; @@ -375,13 +374,9 @@ void Client::setClientSleepPacket(MQTTSNPacket* packet) _clientSleepPacketQue.post(packet); } -void Client::checkTimeover(void) +bool Client::checkTimeover(void) { - if (_status == Cstat_Active && _keepAliveTimer.isTimeup()) - { - _status = Cstat_Lost; - _network->disconnect(); - } + return (_status == Cstat_Active && _keepAliveTimer.isTimeup()); } void Client::setKeepAlive(MQTTSNPacket* packet) diff --git a/MQTTSNGateway/src/MQTTSNGWClient.h b/MQTTSNGateway/src/MQTTSNGWClient.h index 0ba2b03..d210fc3 100644 --- a/MQTTSNGateway/src/MQTTSNGWClient.h +++ b/MQTTSNGateway/src/MQTTSNGWClient.h @@ -248,7 +248,7 @@ public: void setWaitedPubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type); void setWaitedSubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type); - void checkTimeover(void); + bool checkTimeover(void); void updateStatus(MQTTSNPacket*); void updateStatus(ClientStatus); void connectSended(void); diff --git a/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp b/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp index ff87f5b..a98102e 100644 --- a/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp +++ b/MQTTSNGateway/src/MQTTSNGWClientRecvTask.cpp @@ -59,6 +59,13 @@ void ClientRecvTask::run() MQTTSNPacket* packet = new MQTTSNPacket(); int packetLen = packet->recv(_sensorNetwork); + if (CHK_SIGINT) + { + WRITELOG("%s ClientRecvTask stopped.\n", currentDateTime()); + delete packet; + return; + } + if (packetLen < 2 ) { delete packet; @@ -77,10 +84,7 @@ void ClientRecvTask::run() log(0, packet); ev = new Event(); ev->setBrodcastEvent(packet); - if ( _gateway->getPacketEventQue()->post(ev) == 0 ) - { - delete ev; - } + _gateway->getPacketEventQue()->post(ev); continue; } @@ -93,10 +97,7 @@ void ClientRecvTask::run() log(client, packet); ev = new Event(); ev->setClientRecvEvent(client,packet); - if ( _gateway->getPacketEventQue()->post(ev) == 0 ) - { - delete ev; - } + _gateway->getPacketEventQue()->post(ev); } else { @@ -108,41 +109,60 @@ void ClientRecvTask::run() packet->getCONNECT(&data); /* create a client */ - client = _gateway->getClientList()->createClient(_sensorNetwork->getSenderAddress(), &data.clientID, false, false); //_gateway->getGWParams()->secureConnection); - + client = _gateway->getClientList()->createClient(_sensorNetwork->getSenderAddress(), &data.clientID, false, false); + log(client, packet, &data.clientID); if (!client) { - WRITELOG("%s Can't create a Client. CONNECT message has been discarded.%s\n", ERRMSG_HEADER, ERRMSG_FOOTER); + WRITELOG("%s Client was rejected. CONNECT message has been discarded.%s\n", ERRMSG_HEADER, ERRMSG_FOOTER); delete packet; continue; } - log(client, packet); - /* set sensorNetAddress & post Event */ client->setClientAddress(_sensorNetwork->getSenderAddress()); ev = new Event(); ev->setClientRecvEvent(client, packet); - if ( _gateway->getPacketEventQue()->post(ev) == 0 ) - { - delete ev; - } + _gateway->getPacketEventQue()->post(ev); } else { log(client, packet); delete packet; + /* Send DISCONNECT */ + SensorNetAddress* addr = new SensorNetAddress(); + *addr = (*_sensorNetwork->getSenderAddress()); + packet = new MQTTSNPacket(); + packet->setDISCONNECT(0); + ev = new Event(); + ev->setClientSendEvent(addr, packet); + _gateway->getClientSendQue()->post(ev); continue; } } } } -void ClientRecvTask::log(Client* client, MQTTSNPacket* packet) +void ClientRecvTask::log(Client* client, MQTTSNPacket* packet, MQTTSNString* id) { char pbuf[SIZE_OF_LOG_PACKET * 3]; + const char* clientId; + char cstr[MAX_CLIENTID_LENGTH + 1]; char msgId[6]; - const char* clientId = client ? (const char*)client->getClientId() :"Non Active Client !" ; + + if ( id ) + { + memset((void*)cstr, 0, id->lenstring.len); + strncpy(cstr, id->lenstring.data, id->lenstring.len) ; + clientId = cstr; + } + else if ( client ) + { + clientId = client->getClientId(); + } + else + { + clientId = UNKNOWNCL; + } switch (packet->getType()) { diff --git a/MQTTSNGateway/src/MQTTSNGWClientRecvTask.h b/MQTTSNGateway/src/MQTTSNGWClientRecvTask.h index 4af4f0d..295c0ee 100644 --- a/MQTTSNGateway/src/MQTTSNGWClientRecvTask.h +++ b/MQTTSNGateway/src/MQTTSNGWClientRecvTask.h @@ -35,7 +35,7 @@ public: void run(); private: - void log(Client*, MQTTSNPacket*); + void log(Client*, MQTTSNPacket*, MQTTSNString* id = 0); Gateway* _gateway; SensorNetwork* _sensorNetwork; diff --git a/MQTTSNGateway/src/MQTTSNGWClientSendTask.cpp b/MQTTSNGateway/src/MQTTSNGWClientSendTask.cpp index b7d7bff..1ba6398 100644 --- a/MQTTSNGateway/src/MQTTSNGWClientSendTask.cpp +++ b/MQTTSNGateway/src/MQTTSNGWClientSendTask.cpp @@ -45,6 +45,12 @@ void ClientSendTask::run() { Event* ev = _gateway->getClientSendQue()->wait(); + if (ev->getEventType() == EtStop) + { + WRITELOG("%s ClientSendTask stopped.\n", currentDateTime()); + delete ev; + break; + } if (ev->getEventType() == EtClientSend) { client = ev->getClient(); @@ -56,6 +62,11 @@ void ClientSendTask::run() packet = ev->getMQTTSNPacket(); packet->broadcast(_sensorNetwork); } + else if (ev->getEventType() == EtSensornetSend) + { + packet = ev->getMQTTSNPacket(); + packet->unicast(_sensorNetwork, ev->getSensorNetAddress()); + } log(client, packet); delete ev; @@ -66,6 +77,7 @@ void ClientSendTask::log(Client* client, MQTTSNPacket* packet) { char pbuf[SIZE_OF_LOG_PACKET * 3]; char msgId[6]; + const char* clientId = client ? (const char*)client->getClientId() : UNKNOWNCL ; switch (packet->getType()) { @@ -78,15 +90,13 @@ void ClientSendTask::log(Client* client, MQTTSNPacket* packet) case MQTTSN_DISCONNECT: case MQTTSN_WILLTOPICREQ: case MQTTSN_WILLMSGREQ: - WRITELOG(FORMAT_Y_W_G, currentDateTime(), packet->getName(), RIGHTARROW, client->getClientId(), packet->print(pbuf)); - break; case MQTTSN_WILLTOPICRESP: case MQTTSN_WILLMSGRESP: - WRITELOG(FORMAT_Y_W_G, currentDateTime(), packet->getName(), RIGHTARROW, client->getClientId(), packet->print(pbuf)); + WRITELOG(FORMAT_Y_W_G, currentDateTime(), packet->getName(), RIGHTARROW, clientId, packet->print(pbuf)); break; case MQTTSN_REGISTER: case MQTTSN_PUBLISH: - WRITELOG(FORMAT_W_MSGID_W_G, currentDateTime(), packet->getName(), packet->getMsgId(msgId), RIGHTARROW, client->getClientId(), packet->print(pbuf)); + WRITELOG(FORMAT_W_MSGID_W_G, currentDateTime(), packet->getName(), packet->getMsgId(msgId), RIGHTARROW, clientId, packet->print(pbuf)); break; case MQTTSN_REGACK: case MQTTSN_PUBACK: @@ -95,7 +105,7 @@ void ClientSendTask::log(Client* client, MQTTSNPacket* packet) case MQTTSN_PUBCOMP: case MQTTSN_SUBACK: case MQTTSN_UNSUBACK: - WRITELOG(FORMAT_W_MSGID_W_G, currentDateTime(), packet->getName(), packet->getMsgId(msgId), RIGHTARROW, client->getClientId(), packet->print(pbuf)); + WRITELOG(FORMAT_W_MSGID_W_G, currentDateTime(), packet->getName(), packet->getMsgId(msgId), RIGHTARROW, clientId, packet->print(pbuf)); break; default: break; diff --git a/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp b/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp index cae95f2..5783646 100644 --- a/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp +++ b/MQTTSNGateway/src/MQTTSNGWConnectionHandler.cpp @@ -209,19 +209,22 @@ void MQTTSNConnectionHandler::handleWillmsg(Client* client, MQTTSNPacket* packet */ void MQTTSNConnectionHandler::handleDisconnect(Client* client, MQTTSNPacket* packet) { - MQTTGWPacket* mqMsg = new MQTTGWPacket(); + Event* ev = new Event(); MQTTSNPacket* snMsg = new MQTTSNPacket(); - - mqMsg->setHeader(DISCONNECT); snMsg->setDISCONNECT(0); + ev->setClientSendEvent(client, snMsg); + _gateway->getClientSendQue()->post(ev); - Event* ev1 = new Event(); - ev1->setClientSendEvent(client, snMsg); - _gateway->getClientSendQue()->post(ev1); - - ev1 = new Event(); - ev1->setBrokerSendEvent(client, mqMsg); - _gateway->getBrokerSendQue()->post(ev1); + uint16_t duration = 0; + packet->getDISCONNECT(&duration); + if ( duration == 0 ) + { + MQTTGWPacket* mqMsg = new MQTTGWPacket(); + mqMsg->setHeader(DISCONNECT); + ev = new Event(); + ev->setBrokerSendEvent(client, mqMsg); + _gateway->getBrokerSendQue()->post(ev); + } } /* diff --git a/MQTTSNGateway/src/MQTTSNGWDefines.h b/MQTTSNGateway/src/MQTTSNGWDefines.h index 839708e..96f73f6 100644 --- a/MQTTSNGateway/src/MQTTSNGWDefines.h +++ b/MQTTSNGateway/src/MQTTSNGWDefines.h @@ -20,11 +20,17 @@ namespace MQTTSNGW { /*================================= - * Log controls + * Config Parametrs ==================================*/ -//#define DEBUG // print out log for debug -//#define RINGBUFFER // print out Packets log into shared memory -//#define DEBUG_NWSTACK // print out SensorNetwork log +#define CONFIG_DIRECTORY "./" +#define CONFIG_FILE "gateway.conf" +#define CLIENT_LIST "clients.conf" + +/*========================================================== + * Gateway default parameters + ===========================================================*/ +#define DEFAULT_KEEP_ALIVE_TIME (900) // 900 secs = 15 mins +#define DEFAULT_MQTT_VERSION (4) // Defualt MQTT version /*================================= * MQTT-SN Parametrs @@ -43,11 +49,13 @@ typedef unsigned short uint16_t; typedef unsigned int uint32_t; /*================================= - * Macros + * Log controls ==================================*/ +//#define DEBUG // print out log for debug +//#define DEBUG_NWSTACK // print out SensorNetwork log + #ifdef DEBUG #define DEBUGLOG(...) printf(__VA_ARGS__) -#undef RINGBUFFER #else #define DEBUGLOG(...) #endif diff --git a/MQTTSNGateway/src/MQTTSNGWPacketHandleTask.cpp b/MQTTSNGateway/src/MQTTSNGWPacketHandleTask.cpp index 77dc86b..5149a85 100644 --- a/MQTTSNGateway/src/MQTTSNGWPacketHandleTask.cpp +++ b/MQTTSNGateway/src/MQTTSNGWPacketHandleTask.cpp @@ -32,7 +32,7 @@ using namespace std; using namespace MQTTSNGW; #define EVENT_QUE_TIME_OUT 2000 // 2000 msecs - +char* currentDateTime(void); /*===================================== Class PacketHandleTask =====================================*/ @@ -84,7 +84,6 @@ void PacketHandleTask::run() { Event* ev = 0; EventQue* eventQue = _gateway->getPacketEventQue(); - ClientList* clist = _gateway->getClientList(); Client* client = 0; MQTTSNPacket* snPacket = 0; MQTTGWPacket* brPacket = 0; @@ -95,23 +94,18 @@ void PacketHandleTask::run() while (true) { - if (theProcess->checkSignal() == SIGINT) - { - throw Exception("Terminated by CTL-C"); - } - /* wait Event */ ev = eventQue->timedwait(EVENT_QUE_TIME_OUT); + if (ev->getEventType() == EtStop) + { + WRITELOG("%s PacketHandleTask stopped.\n", currentDateTime()); + delete ev; + return; + } + if (ev->getEventType() == EtTimeout) { - /*------ Is Client Lost ? ---------*/ - client = clist->getClient(); - while (client > 0) - { - client->checkTimeover(); - client = client->getNextClient(); - } /*------ Check Keep Alive Timer & send Advertise ------*/ if (_advertiseTimer.isTimeup()) { @@ -200,9 +194,6 @@ void PacketHandleTask::run() case CONNACK: _mqttConnection->handleConnack(client, brPacket); break; - case DISCONNECT: - _mqttConnection->handleDisconnect(client, brPacket); - break; case PINGRESP: _mqttConnection->handlePingresp(client, brPacket); break; diff --git a/MQTTSNGateway/src/MQTTSNGWProcess.cpp b/MQTTSNGateway/src/MQTTSNGWProcess.cpp index ce5085e..18f5df3 100644 --- a/MQTTSNGateway/src/MQTTSNGWProcess.cpp +++ b/MQTTSNGateway/src/MQTTSNGWProcess.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include "MQTTSNGWProcess.h" #include "Threading.h" @@ -53,8 +54,9 @@ Process::Process() { _argc = 0; _argv = 0; - _configDir = MQTTSNGW_CONFIG_DIRECTORY; - _configFile = MQTTSNGW_CONFIG_FILE; + _configDir = CONFIG_DIRECTORY; + _configFile = CONFIG_FILE; + _log = 0; } Process::~Process() @@ -76,6 +78,7 @@ void Process::run() void Process::initialize(int argc, char** argv) { + char param[MQTTSNGW_PARAM_MAX]; _argc = argc; _argv = argv; signal(SIGINT, signalHandler); @@ -102,6 +105,40 @@ void Process::initialize(int argc, char** argv) } _rbsem = new Semaphore(MQTTSNGW_RB_SEMAPHOR_NAME, 0); _rb = new RingBuffer(_configDir.c_str()); + + if (getParam("ShearedMemory", param) == 0) + { + if (!strcasecmp(param, "YES")) + { + _log = 1; + } + else + { + _log = 0; + } + } +} + +void Process::putLog(const char* format, ...) +{ + _mt.lock(); + va_list arg; + va_start(arg, format); + vsprintf(_rbdata, format, arg); + va_end(arg); + if (strlen(_rbdata)) + { + if ( _log > 0 ) + { + _rb->put(_rbdata); + _rbsem->post(); + } + else + { + printf("%s", _rbdata); + } + } + _mt.unlock(); } int Process::getArgc() @@ -169,21 +206,6 @@ int Process::getParam(const char* parameter, char* value) return -2; } -void Process::putLog(const char* format, ...) -{ - _mt.lock(); - va_list arg; - va_start(arg, format); - vsprintf(_rbdata, format, arg); - va_end(arg); - if (strlen(_rbdata)) - { - _rb->put(_rbdata); - _rbsem->post(); - } - _mt.unlock(); -} - const char* Process::getLog() { int len = 0; @@ -228,6 +250,7 @@ MultiTaskProcess::MultiTaskProcess() { theMultiTaskProcess = this; _threadCount = 0; + _stopCount = 0; } MultiTaskProcess::~MultiTaskProcess() @@ -254,28 +277,43 @@ void MultiTaskProcess::run(void) try { - _stopProcessEvent.wait(); - } - catch (Exception* ex) - { - for (int i = 0; i < _threadCount; i++) + while(true) { - if ( _threadList[i] ) + if (theProcess->checkSignal() == SIGINT) { - _threadList[i]->cancel();; + return; } + sleep(1); } + } + catch(Exception* ex) + { ex->writeMessage(); } + catch(...) + { + throw; + } } -Semaphore* MultiTaskProcess::getStopProcessEvent(void) +void MultiTaskProcess::waitStop(void) { - return &_stopProcessEvent; + while (_stopCount < _threadCount) + { + sleep(1); + } +} + +void MultiTaskProcess::threadStoped(void) +{ + _mutex.lock(); + _stopCount++; + _mutex.unlock(); } void MultiTaskProcess::attach(Thread* thread) { + _mutex.lock(); if (_threadCount < MQTTSNGW_MAX_TASK) { _threadList[_threadCount] = thread; @@ -283,8 +321,10 @@ void MultiTaskProcess::attach(Thread* thread) } else { + _mutex.unlock(); throw Exception("Full of Threads"); } + _mutex.unlock(); } int MultiTaskProcess::getParam(const char* parameter, char* value) diff --git a/MQTTSNGateway/src/MQTTSNGWProcess.h b/MQTTSNGateway/src/MQTTSNGWProcess.h index c1b9110..7fdd52f 100644 --- a/MQTTSNGateway/src/MQTTSNGWProcess.h +++ b/MQTTSNGateway/src/MQTTSNGWProcess.h @@ -31,11 +31,6 @@ namespace MQTTSNGW /*================================= * Parameters ==================================*/ -#define MQTTSNGW_CONFIG_DIRECTORY "./" - -#define MQTTSNGW_CONFIG_FILE "gateway.conf" -#define MQTTSNGW_CLIENT_LIST "clients.conf" - #define MQTTSNGW_MAX_TASK 10 // number of Tasks #define PROCESS_LOG_BUFFER_SIZE 16384 // Ring buffer size for Logs #define MQTTSNGW_PARAM_MAX 128 // Max length of config records. @@ -43,13 +38,8 @@ namespace MQTTSNGW /*================================= * Macros ==================================*/ -#ifdef RINGBUFFER #define WRITELOG theProcess->putLog -#else -#define WRITELOG printf -#endif - - +#define CHK_SIGINT (theProcess->checkSignal() == SIGINT) /*================================= Class Process ==================================*/ @@ -77,6 +67,7 @@ private: RingBuffer* _rb; Semaphore* _rbsem; Mutex _mt; + int _log; char _rbdata[PROCESS_LOG_BUFFER_SIZE + 1]; }; @@ -88,17 +79,18 @@ class MultiTaskProcess: public Process public: MultiTaskProcess(void); ~MultiTaskProcess(); - virtual void initialize(int argc, char** argv); - virtual int getParam(const char* parameter, char* value); + void initialize(int argc, char** argv); + int getParam(const char* parameter, char* value); void run(void); + void waitStop(void); + void threadStoped(void); void attach(Thread* thread); - Semaphore* getStopProcessEvent(void); private: Thread* _threadList[MQTTSNGW_MAX_TASK]; - Semaphore _stopProcessEvent; Mutex _mutex; int _threadCount; + int _stopCount; }; /*===================================== diff --git a/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp b/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp index 6c4bff3..e23bcae 100644 --- a/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp +++ b/MQTTSNGateway/src/MQTTSNGWPublishHandler.cpp @@ -48,7 +48,6 @@ void MQTTSNPublishHandler::handlePublish(Client* client, MQTTSNPacket* packet) if ( !client->isActive() ) { /* Reply DISCONNECT to the client */ - WRITELOG(" The client is not active. status = %s\n", client->getStatus()); Event* ev = new Event(); MQTTSNPacket* disconnect = new MQTTSNPacket(); disconnect->setDISCONNECT(0); diff --git a/MQTTSNGateway/src/MQTTSNGateway.cpp b/MQTTSNGateway/src/MQTTSNGateway.cpp index fa90643..fbed95c 100644 --- a/MQTTSNGateway/src/MQTTSNGateway.cpp +++ b/MQTTSNGateway/src/MQTTSNGateway.cpp @@ -31,6 +31,20 @@ Gateway::Gateway() theProcess = this; _params.loginId = 0; _params.password = 0; + _params.keepAlive = 0; + _params.gatewayId = 0; + _params.mqttVersion = 0; + _params.maxInflightMsgs = 0; + _params.gatewayName = 0; + _params.brokerName = 0; + _params.port = 0; + _params.portSecure = 0; + _params.rootCApath = 0; + _params.rootCAfile = 0; + _params.certKey = 0; + _params.privateKey = 0; + _params.clientListName = 0; + _params.configName = 0; _packetEventQue.setMaxSize(MAX_INFLIGHTMESSAGES * MAX_CLIENTS); } @@ -44,15 +58,85 @@ Gateway::~Gateway() { free(_params.password); } + if ( _params.gatewayName ) + { + free(_params.gatewayName); + } + if ( _params.brokerName ) + { + free(_params.brokerName); + } + if ( _params.port ) + { + free(_params.port); + } + if ( _params.portSecure ) + { + free(_params.portSecure); + } + if ( _params.certKey ) + { + free(_params.certKey); + } + if ( _params.privateKey ) + { + free(_params.privateKey); + } + if ( _params.rootCApath ) + { + free(_params.rootCApath); + } + if ( _params.rootCAfile ) + { + free(_params.rootCAfile); + } + if ( _params.clientListName ) + { + free(_params.clientListName); + } + if ( _params.configName ) + { + free(_params.configName); + } } void Gateway::initialize(int argc, char** argv) { char param[MQTTSNGW_PARAM_MAX]; + string fileName; MultiTaskProcess::initialize(argc, argv); resetRingBuffer(); - _params.gatewayId = 0; + if (getParam("BrokerName", param) == 0) + { + _params.brokerName = strdup(param); + } + if (getParam("BrokerPortNo", param) == 0) + { + _params.port = strdup(param); + } + if (getParam("BrokerSecurePortNo", param) == 0) + { + _params.portSecure = strdup(param); + } + + if (getParam("CertKey", param) == 0) + { + _params.certKey = strdup(param); + } + if (getParam("PrivateKey", param) == 0) + { + _params.privateKey = strdup(param); + } + if (getParam("RootCApath", param) == 0) + { + _params.rootCApath = strdup(param); + } + if (getParam("RootCAfile", param) == 0) + { + _params.rootCAfile = strdup(param); + } + if (getParam("GatewayID", param) == 0) { _params.gatewayId = atoi(param); @@ -65,7 +149,7 @@ void Gateway::initialize(int argc, char** argv) if (getParam("GatewayName", param) == 0) { - _params.gatewayName = (uint8_t*) strdup(param); + _params.gatewayName = strdup(param); } _params.mqttVersion = DEFAULT_MQTT_VERSION; @@ -94,17 +178,16 @@ void Gateway::initialize(int argc, char** argv) if (getParam("LoginID", param) == 0) { - _params.loginId = (uint8_t*) strdup(param); + _params.loginId = strdup(param); } if (getParam("Password", param) == 0) { - _params.password = (uint8_t*) strdup(param); + _params.password = strdup(param); } if (getParam("ClientAuthentication", param) == 0) { - string fileName; if (!strcasecmp(param, "YES")) { if (getParam("ClientsList", param) == 0) @@ -113,15 +196,18 @@ void Gateway::initialize(int argc, char** argv) } else { - fileName = *getConfigDirName() + string(MQTTSNGW_CLIENT_LIST); + fileName = *getConfigDirName() + string(CLIENT_LIST); } if (!_clientList.authorize(fileName.c_str())) { throw Exception("Gateway::initialize: No client list defined by configuration."); } + _params.clientListName = strdup(fileName.c_str()); } } + fileName = *getConfigDirName() + *getConfigFileName(); + _params.configName = strdup(fileName.c_str()); } void Gateway::run(void) @@ -134,17 +220,36 @@ void Gateway::run(void) WRITELOG(" *\n%s\n", PAHO_COPYRIGHT3); WRITELOG("%s\n", GATEWAY_VERSION); WRITELOG("%s\n", PAHO_COPYRIGHT4); - WRITELOG("\n%s %s has been started.\n listening on, %s\n", currentDateTime(), _params.gatewayName, _sensorNetwork.getDescription()); - + WRITELOG("\n%s %s has been started.\n\n", currentDateTime(), _params.gatewayName); + WRITELOG(" ConfigFile: %s\n", _params.configName); if ( getClientList()->isAuthorized() ) { - WRITELOG("\nClient authentication is required by the configuration settings.\n\n"); + WRITELOG(" ClientList: %s\n", _params.clientListName); } - - /* execute threads & wait StopProcessEvent MQTTSNGWPacketHandleTask posts by CTL-C */ + WRITELOG(" SensorN/W: %s\n", _sensorNetwork.getDescription()); + WRITELOG(" Broker: %s : %s, %s\n", _params.brokerName, _params.port, _params.portSecure); + WRITELOG(" RootCApath: %s\n", _params.rootCApath); + WRITELOG(" RootCAfile: %s\n", _params.rootCAfile); + WRITELOG(" CertKey: %s\n", _params.certKey); + WRITELOG(" PrivateKey: %s\n", _params.privateKey); MultiTaskProcess::run(); - WRITELOG("%s MQTT-SN Gateway stoped\n", currentDateTime()); + + /* stop threads */ + Event* ev = new Event(); + ev->setStop(); + _packetEventQue.post(ev); + ev = new Event(); + ev->setStop(); + _brokerSendQue.post(ev); + ev = new Event(); + ev->setStop(); + _clientSendQue.post(ev); + + /* wait until all threads stop */ + MultiTaskProcess::waitStop(); + + WRITELOG("\n%s MQTT-SN Gateway stoped\n\n", currentDateTime()); _lightIndicator.allLightOff(); } @@ -193,7 +298,13 @@ EventQue::EventQue() EventQue::~EventQue() { - + _mutex.lock(); + while (_que.size() > 0) + { + delete _que.front(); + _que.pop(); + } + _mutex.unlock(); } void EventQue::setMaxSize(uint16_t maxSize) @@ -243,14 +354,18 @@ Event* EventQue::timedwait(uint16_t millsec) return ev; } -int EventQue::post(Event* ev) +void EventQue::post(Event* ev) { - int rc = 0; _mutex.lock(); - rc = _que.post(ev); - _sem.post(); + if ( _que.post(ev) ) + { + _sem.post(); + } + else + { + delete ev; + } _mutex.unlock(); - return rc; } int EventQue::size() @@ -269,20 +384,18 @@ Event::Event() { _eventType = Et_NA; _client = 0; - _mqttSNPacket = 0; - _mqttGWPacket = 0; -} - -Event::Event(EventType type) -{ - _eventType = type; - _client = 0; + _sensorNetAddr = 0; _mqttSNPacket = 0; _mqttGWPacket = 0; } Event::~Event() { + if (_sensorNetAddr) + { + delete _sensorNetAddr; + } + if (_mqttSNPacket) { delete _mqttSNPacket; @@ -332,17 +445,34 @@ void Event::setTimeout(void) _eventType = EtTimeout; } +void Event::setStop(void) +{ + _eventType = EtStop; +} + void Event::setBrodcastEvent(MQTTSNPacket* msg) { _mqttSNPacket = msg; _eventType = EtBroadcast; } +void Event::setClientSendEvent(SensorNetAddress* addr, MQTTSNPacket* msg) +{ + _eventType = EtSensornetSend; + _sensorNetAddr = addr; + _mqttSNPacket = msg; +} + Client* Event::getClient(void) { return _client; } +SensorNetAddress* Event::getSensorNetAddress(void) +{ + return _sensorNetAddr; +} + MQTTSNPacket* Event::getMQTTSNPacket() { return _mqttSNPacket; diff --git a/MQTTSNGateway/src/MQTTSNGateway.h b/MQTTSNGateway/src/MQTTSNGateway.h index f3e2b4d..04b6dcf 100644 --- a/MQTTSNGateway/src/MQTTSNGateway.h +++ b/MQTTSNGateway/src/MQTTSNGateway.h @@ -22,16 +22,10 @@ namespace MQTTSNGW { -/*========================================================== - * Gateway default parameters - ===========================================================*/ -#define DEFAULT_KEEP_ALIVE_TIME (900) // 900 secs = 15 mins -#define DEFAULT_MQTT_VERSION (4) // Defualt MQTT version - /*================================= * Starting prompt ==================================*/ -#define GATEWAY_VERSION " * Version: 0.6.0" +#define GATEWAY_VERSION " * Version: 0.9.1" #define PAHO_COPYRIGHT0 " * MQTT-SN Transparent Gateway" #define PAHO_COPYRIGHT1 " * Part of Project Paho in Eclipse" @@ -49,6 +43,7 @@ namespace MQTTSNGW ===========================================================*/ #define CLIENT "Client" #define CLIENTS "Clients" +#define UNKNOWNCL "Unknown Client !" #define LEFTARROW "<---" #define RIGHTARROW "--->" @@ -87,20 +82,20 @@ namespace MQTTSNGW ====================================*/ enum EventType{ Et_NA = 0, + EtStop, EtTimeout, EtBrokerRecv, EtBrokerSend, EtClientRecv, EtClientSend, EtBroadcast, - EtSocketAlive + EtSensornetSend }; class Event{ public: Event(); - Event(EventType); ~Event(); EventType getEventType(void); void setClientRecvEvent(Client*, MQTTSNPacket*); @@ -109,13 +104,17 @@ public: void setBrokerSendEvent(Client*, MQTTGWPacket*); void setBrodcastEvent(MQTTSNPacket*); // ADVERTISE and GWINFO void setTimeout(void); // Required by EventQue.timedwait() + void setStop(void); + void setClientSendEvent(SensorNetAddress*, MQTTSNPacket*); Client* getClient(void); + SensorNetAddress* getSensorNetAddress(void); MQTTSNPacket* getMQTTSNPacket(void); MQTTGWPacket* getMQTTGWPacket(void); private: EventType _eventType; Client* _client; + SensorNetAddress* _sensorNetAddr; MQTTSNPacket* _mqttSNPacket; MQTTGWPacket* _mqttGWPacket; }; @@ -131,8 +130,8 @@ public: Event* wait(void); Event* timedwait(uint16_t millsec); void setMaxSize(uint16_t maxSize); - int post(Event*); - int size(); + void post(Event*); + int size(); private: Que _que; @@ -145,13 +144,22 @@ private: */ typedef struct { - uint8_t* loginId; - uint8_t* password; + char* configName; + char* clientListName; + char* loginId; + char* password; uint16_t keepAlive; uint8_t gatewayId; uint8_t mqttVersion; uint16_t maxInflightMsgs; - uint8_t* gatewayName; + char* gatewayName; + char* brokerName; + char* port; + char* portSecure; + char* rootCApath; + char* rootCAfile; + char* certKey; + char* privateKey; }GatewayParams; /*===================================== diff --git a/MQTTSNGateway/src/linux/Network.cpp b/MQTTSNGateway/src/linux/Network.cpp index 6d43da7..9af3697 100644 --- a/MQTTSNGateway/src/linux/Network.cpp +++ b/MQTTSNGateway/src/linux/Network.cpp @@ -158,7 +158,7 @@ bool TCPStack::connect(const char* host, const char* service) { if (isValid()) { - return false; + return true; } addrinfo hints; memset(&hints, 0, sizeof(addrinfo)); @@ -192,7 +192,7 @@ bool TCPStack::connect(const char* host, const char* service) if (::connect(sockfd, _addrinfo->ai_addr, _addrinfo->ai_addrlen) < 0) { - //perror("TCPStack connect"); + DEBUGLOG("Can not connect the socket. Check the PortNo! \n"); ::close(sockfd); return false; } @@ -233,6 +233,7 @@ int TCPStack::getSock() =======================================*/ int Network::_numOfInstance = 0; SSL_CTX* Network::_ctx = 0; +SSL_SESSION* Network::_session = 0; Network::Network(bool secure) : TCPStack() @@ -240,196 +241,168 @@ Network::Network(bool secure) : _ssl = 0; _secureFlg = secure; _busy = false; - _session = 0; _sslValid = false; } Network::~Network() { - if (_ssl) - { - SSL_free(_ssl); - _numOfInstance--; - } - if (_session ) - { - SSL_SESSION_free(_session); - } - if (_ctx && _numOfInstance == 0) - { - SSL_CTX_free(_ctx); - _ctx = 0; - ERR_free_strings(); - } + close(); } bool Network::connect(const char* host, const char* port) { + bool rc = false; + _mutex.lock(); if (_secureFlg) { - return false; + goto exit; } if (getSock() == 0) { if (!TCPStack::connect(host, port)) { - return false; + goto exit; } } - return true; + rc = true; +exit: + _mutex.unlock(); + return rc; } -bool Network::connect(const char* host, const char* port, const char* caPath, const char* caFile, const char* cert, const char* prvkey) +bool Network::connect(const char* host, const char* port, const char* caPath, const char* caFile, const char* certkey, const char* prvkey) { char errmsg[256]; char peer_CN[256]; - int rc = 0; + bool rc; - if (!_secureFlg) + _mutex.lock(); + try { - WRITELOG("TLS is not required.\n"); - return false; - } + if (!_secureFlg) + { + WRITELOG("TLS is not required.\n"); + throw; + } - if (_ctx == 0) - { - SSL_load_error_strings(); - SSL_library_init(); - _ctx = SSL_CTX_new(TLS_client_method()); if (_ctx == 0) { - ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); - WRITELOG("SSL_CTX_new() %s\n", errmsg); - return false; - } - - if (!SSL_CTX_load_verify_locations(_ctx, caFile, caPath)) - { - ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); - WRITELOG("SSL_CTX_load_verify_locations() %s\n", errmsg); - return false; - } - } - - if (!_sslValid) - { - if ( !TCPStack::connect(host, port) ) - { - return false; - } - /* - if ( _ssl ) - { - if (!SSL_set_fd(_ssl, getSock())) + SSL_load_error_strings(); + SSL_library_init(); + _ctx = SSL_CTX_new(TLS_client_method()); + if (_ctx == 0) { ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); - WRITELOG("SSL_set_fd() %s\n", errmsg); - SSL_free(_ssl); + WRITELOG("SSL_CTX_new() %s\n", errmsg); + throw; } - else + + if (!SSL_CTX_load_verify_locations(_ctx, caFile, caPath)) { - _sslValid = true; - return true; + ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); + WRITELOG("SSL_CTX_load_verify_locations() %s\n", errmsg); + throw; + } + + if ( certkey ) + { + if ( SSL_CTX_use_certificate_file(_ctx, certkey, SSL_FILETYPE_PEM) != 1 ) + { + ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); + WRITELOG("SSL_CTX_use_certificate_file() %s %s\n", certkey, errmsg); + throw; + } + } + if ( prvkey ) + { + if ( SSL_CTX_use_PrivateKey_file(_ctx, prvkey, SSL_FILETYPE_PEM) != 1 ) + { + ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); + WRITELOG("SSL_use_PrivateKey_file() %s %s\n", prvkey, errmsg); + throw; + } } } - */ - } - _ssl = SSL_new(_ctx); - if (_ssl == 0) - { - ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); - WRITELOG("SSL_new() %s\n", errmsg); - return false; - } + if (! TCPStack::isValid()) + { + if ( !TCPStack::connect(host, port) ) + { + throw; + } + } - if (!SSL_set_fd(_ssl, getSock())) - { - ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); - WRITELOG("SSL_set_fd() %s\n", errmsg); - SSL_free(_ssl); - } - - SSL_set_options(_ssl, SSL_OP_NO_TICKET); - - if ( cert ) - { - if ( SSL_use_certificate_file(_ssl, cert, SSL_FILETYPE_PEM) <= 0 ) + _ssl = SSL_new(_ctx); + if (_ssl == 0) { ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); - WRITELOG("SSL_use_certificate_file() %s %s\n", cert, errmsg); - SSL_free(_ssl); - _ssl = 0; - return false; + WRITELOG("SSL_new() %s\n", errmsg); + throw; } - } - if ( prvkey ) - { - if ( SSL_use_PrivateKey_file(_ssl, prvkey, SSL_FILETYPE_PEM) <= 0 ) + + if (!SSL_set_fd(_ssl, TCPStack::getSock())) { ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); - WRITELOG("SSL_use_PrivateKey_file() %s %s\n", prvkey, errmsg); + WRITELOG("SSL_set_fd() %s\n", errmsg); SSL_free(_ssl); _ssl = 0; - return false; + throw; } - } - if (!SSL_set_fd(_ssl, TCPStack::getSock())) - { - ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); - WRITELOG("SSL_set_fd() %s\n", errmsg); - SSL_free(_ssl); - _ssl = 0; - return false; - } + if (_session) + { + SSL_set_session(_ssl, _session); + } - if (_session) - { - rc = SSL_set_session(_ssl, _session); - } + if (SSL_connect(_ssl) != 1) + { + ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); + WRITELOG("SSL_connect() %s\n", errmsg); + SSL_free(_ssl); + _ssl = 0; + throw; + } - if (SSL_connect(_ssl) != 1) - { - ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); - WRITELOG("SSL_connect() %s\n", errmsg); - SSL_free(_ssl); - _ssl = 0; - return false; - } + int result; + if ( (result = SSL_get_verify_result(_ssl)) != X509_V_OK) + { + WRITELOG("SSL_get_verify_result() error: %s.\n", X509_verify_cert_error_string(result)); + SSL_free(_ssl); + _ssl = 0; + throw; + } - if ( (rc = SSL_get_verify_result(_ssl)) != X509_V_OK) - { - WRITELOG("SSL_get_verify_result() error: %s.\n", X509_verify_cert_error_string(rc)); - SSL_free(_ssl); - _ssl = 0; - return false; - } + X509* peer = SSL_get_peer_certificate(_ssl); + X509_NAME_get_text_by_NID(X509_get_subject_name(peer), NID_commonName, peer_CN, 256); + char* pos = peer_CN; + if ( *pos == '*') + { + while (*host++ != '.'); + pos += 2; + } + if ( strcmp(host, pos)) + { + WRITELOG("SSL_get_peer_certificate() error: Broker %s dosen't match the host name %s\n", peer_CN, host); + SSL_free(_ssl); + _ssl = 0; + throw; + } - X509* peer = SSL_get_peer_certificate(_ssl); - X509_NAME_get_text_by_NID(X509_get_subject_name(peer), NID_commonName, peer_CN, 256); - char* pos = peer_CN; - if ( *pos == '*') - { - while (*host++ != '.'); - pos += 2; + if (_session == 0) + { + _session = SSL_get1_session(_ssl); + } + _numOfInstance++; + _sslValid = true; + rc = true; } - if ( strcmp(host, pos)) + catch (...) { - WRITELOG("SSL_get_peer_certificate() error: Broker %s dosen't match the host name %s\n", peer_CN, host); - SSL_free(_ssl); - _ssl = 0; - return false; + rc = false; } - - if (_session == 0) - { - _session = SSL_get1_session(_ssl); - } - _numOfInstance++; - _sslValid = true; - return true; + _mutex.unlock(); + return rc; } int Network::send(const uint8_t* buf, uint16_t length) @@ -447,6 +420,12 @@ int Network::send(const uint8_t* buf, uint16_t length) else { _mutex.lock(); + + if ( !_ssl ) + { + _mutex.unlock(); + return -1; + } _busy = true; while (true) @@ -516,8 +495,14 @@ int Network::recv(uint8_t* buf, uint16_t len) return 0; } _mutex.lock(); - _busy = true; + if ( !_ssl ) + { + _mutex.unlock(); + return 0; + } + + _busy = true; loop: do { readBlockedOnWrite = false; @@ -535,7 +520,8 @@ int Network::recv(uint8_t* buf, uint16_t len) case SSL_ERROR_ZERO_RETURN: SSL_shutdown(_ssl); _ssl = 0; - TCPStack::close(); + _numOfInstance--; + //TCPStack::close(); _busy = false; _mutex.unlock(); return -1; @@ -585,44 +571,53 @@ int Network::recv(uint8_t* buf, uint16_t len) void Network::close(void) { + _mutex.lock(); if (_secureFlg) { - _sslValid = false; - SSL_free(_ssl); - _ssl = 0; + if (_ssl) + { + SSL_shutdown(_ssl); + SSL_free(_ssl); + _numOfInstance--; + _ssl = 0; + _sslValid = false; + _busy = false; + } + if (_session && _numOfInstance == 0) + { + SSL_SESSION_free(_session); + _session = 0; + } + if (_ctx && _numOfInstance == 0) + { + SSL_CTX_free(_ctx); + _ctx = 0; + ERR_free_strings(); + } } TCPStack::close(); + _mutex.unlock(); } bool Network::isValid() { - if (_secureFlg) + if ( TCPStack::isValid() ) { - if (_sslValid && !_busy) + if (_secureFlg) + { + if (_sslValid && !_busy) + { + return true; + } + } + else { return true; } } - else - { - return TCPStack::isValid(); - } return false; } -void Network::disconnect() -{ - if (_ssl) - { - SSL_shutdown(_ssl); - _ssl = 0; - } - _sslValid = false; - _busy = false; - TCPStack::close(); - -} - int Network::getSock() { return TCPStack::getSock(); diff --git a/MQTTSNGateway/src/linux/Network.h b/MQTTSNGateway/src/linux/Network.h index c5bfaed..f5e15bd 100644 --- a/MQTTSNGateway/src/linux/Network.h +++ b/MQTTSNGateway/src/linux/Network.h @@ -71,9 +71,8 @@ public: Network(bool secure); virtual ~Network(); - bool connect(const char* host, const char* port, const char* caPath, const char* caFile, const char* sert, const char* prvkey); + bool connect(const char* host, const char* port, const char* caPath, const char* caFile, const char* cert, const char* prvkey); bool connect(const char* host, const char* port); - void disconnect(void); void close(void); int send(const uint8_t* buf, uint16_t length); int recv(uint8_t* buf, uint16_t len); @@ -84,9 +83,8 @@ public: private: static SSL_CTX* _ctx; + static SSL_SESSION* _session; static int _numOfInstance; - - SSL_SESSION* _session; SSL* _ssl; bool _secureFlg; Mutex _mutex; diff --git a/MQTTSNGateway/src/linux/Threading.cpp b/MQTTSNGateway/src/linux/Threading.cpp index b6eeb5c..a245118 100644 --- a/MQTTSNGateway/src/linux/Threading.cpp +++ b/MQTTSNGateway/src/linux/Threading.cpp @@ -160,12 +160,11 @@ Semaphore::Semaphore(const char* name, unsigned int val) { throw Exception( -1, "Semaphore can't be created."); } - _name = (char*) calloc(strlen(name + 1), 1); + _name = strdup(name); if (_name == NULL) { throw Exception( -1, "Semaphore can't allocate memories."); } - _name = strdup(name); } Semaphore::~Semaphore() @@ -174,7 +173,7 @@ Semaphore::~Semaphore() { sem_close(_psem); sem_unlink(_name); - free((void*) _name); + free(_name); } else { @@ -236,19 +235,25 @@ void Semaphore::timedwait(uint16_t millsec) =========================================*/ RingBuffer::RingBuffer() { - RingBuffer(MQTTSNGW_CONFIG_DIRECTORY); + RingBuffer(MQTTSNGW_KEY_DIRECTORY); } RingBuffer::RingBuffer(const char* keyDirectory) { int fp = 0; string fileName = keyDirectory + string(MQTTSNGW_RINGBUFFER_KEY); - fp = open(fileName.c_str(), O_CREAT, 0); - close(fp); + fp = open(fileName.c_str(), O_CREAT, S_IRGRP); + if ( fp > 0 ) + { + close(fp); + } fileName = keyDirectory + string(MQTTSNGW_RB_MUTEX_KEY); - fp = open(fileName.c_str(), O_CREAT, 0); - close(fp); + fp = open(fileName.c_str(), O_CREAT, S_IRGRP); + if ( fp > 0 ) + { + close(fp); + } key_t key = ftok(MQTTSNGW_RINGBUFFER_KEY, 1); @@ -302,10 +307,6 @@ RingBuffer::~RingBuffer() { shmctl(_shmid, IPC_RMID, NULL); } - if (_pmx > 0) - { - delete _pmx; - } } else { @@ -314,6 +315,11 @@ RingBuffer::~RingBuffer() shmdt(_shmaddr); } } + + if (_pmx > 0) + { + delete _pmx; + } } void RingBuffer::put(char* data) @@ -469,14 +475,11 @@ void RingBuffer::reset() =====================================*/ Thread::Thread() { - _stopProcessEvent = theMultiTaskProcess->getStopProcessEvent(); _threadID = 0; } Thread::~Thread() { - pthread_cancel(_threadID); - pthread_join(_threadID, 0); } void* Thread::_run(void* runnable) @@ -508,10 +511,6 @@ int Thread::start(void) void Thread::stopProcess(void) { - _stopProcessEvent->post(); + theMultiTaskProcess->threadStoped(); } -void Thread::cancel(void) -{ - pthread_cancel(_threadID); -} diff --git a/MQTTSNGateway/src/linux/Threading.h b/MQTTSNGateway/src/linux/Threading.h index 185ce69..91e7e05 100644 --- a/MQTTSNGateway/src/linux/Threading.h +++ b/MQTTSNGateway/src/linux/Threading.h @@ -23,7 +23,7 @@ namespace MQTTSNGW { - +#define MQTTSNGW_KEY_DIRECTORY "./" #define MQTTSNGW_RINGBUFFER_KEY "ringbuffer.key" #define MQTTSNGW_RB_MUTEX_KEY "rbmutex.key" #define MQTTSNGW_RB_SEMAPHOR_NAME "/rbsemaphor" @@ -101,17 +101,12 @@ public: virtual void EXECRUN(){} }; - #define MAGIC_WORD_FOR_THREAD \ public: void EXECRUN() \ { \ try \ { \ run(); \ - } \ - catch(Exception& ex) \ - { \ - ex.writeMessage();\ stopProcess(); \ } \ catch(...) \ @@ -132,13 +127,10 @@ public: static bool equals(pthread_t*, pthread_t*); virtual void initialize(int argc, char** argv); void stopProcess(void); - void cancel(void); + void waitStop(void); private: - pthread_t _threadID; - Semaphore* _stopProcessEvent; - static void* _run(void*); - + pthread_t _threadID; }; } diff --git a/MQTTSNGateway/src/linux/Timer.cpp b/MQTTSNGateway/src/linux/Timer.cpp index e0af9ed..487b50b 100644 --- a/MQTTSNGateway/src/linux/Timer.cpp +++ b/MQTTSNGateway/src/linux/Timer.cpp @@ -180,24 +180,30 @@ void LightIndicator::init() pinMode(LIGHT_INDICATOR_BLUE); } -void LightIndicator::lit(int gpioNo, const char* onoff) +int LightIndicator::lit(int gpioNo, const char* onoff) { + int rc = 0; if( _gpio[gpioNo] ) { - write(_gpio[gpioNo], onoff, 1); + rc = write(_gpio[gpioNo], onoff, 1); } + return rc; } void LightIndicator::pinMode(int gpioNo) { - int fd = open("/sys/class/gpio/export", O_WRONLY); + int rc = 0; + int fd = rc; // eliminate unused warnning of compiler + + fd = open("/sys/class/gpio/export", O_WRONLY); if ( fd < 0 ) { return; } char no[4]; + sprintf(no,"%d", gpioNo); - write(fd, no, strlen(no)); + rc = write(fd, no, strlen(no)); close(fd); char fileName[64]; @@ -208,9 +214,8 @@ void LightIndicator::pinMode(int gpioNo) { return; } - write(fd,"out", 3); + rc = write(fd,"out", 3); close(fd); - sprintf( fileName, "/sys/class/gpio/gpio%d/value", gpioNo); fd = open(fileName, O_WRONLY); if ( fd > 0 ) diff --git a/MQTTSNGateway/src/linux/Timer.h b/MQTTSNGateway/src/linux/Timer.h index 21862e9..43885b1 100644 --- a/MQTTSNGateway/src/linux/Timer.h +++ b/MQTTSNGateway/src/linux/Timer.h @@ -61,7 +61,7 @@ public: private: void init(); - void lit(int gpioNo, const char* onoff); + int lit(int gpioNo, const char* onoff); void pinMode(int gpioNo); bool _greenStatus; int _gpio[MAX_GPIO + 1]; diff --git a/MQTTSNGateway/src/linux/udp/SensorNetwork.cpp b/MQTTSNGateway/src/linux/udp/SensorNetwork.cpp index 9c54d4e..263d3ce 100644 --- a/MQTTSNGateway/src/linux/udp/SensorNetwork.cpp +++ b/MQTTSNGateway/src/linux/udp/SensorNetwork.cpp @@ -45,6 +45,16 @@ SensorNetAddress::~SensorNetAddress() } +uint32_t SensorNetAddress::getIpAddress(void) +{ + return _IpAddr; +} + +uint16_t SensorNetAddress::getPortNo(void) +{ + return _portNo; +} + void SensorNetAddress::setAddress(uint32_t IpAddr, uint16_t port) { _IpAddr = IpAddr; @@ -96,12 +106,10 @@ SensorNetAddress& SensorNetAddress::operator =(SensorNetAddress& addr) ============================================*/ SensorNetwork::SensorNetwork() { - } SensorNetwork::~SensorNetwork() { - } int SensorNetwork::unicast(const uint8_t* payload, uint16_t payloadLength, SensorNetAddress* sendToAddr) @@ -141,7 +149,7 @@ int SensorNetwork::initialize(void) if (theProcess->getParam("GatewayPortNo", param) == 0) { unicastPortNo = atoi(param); - _description += " and Gateway Port "; + _description += " Gateway Port "; _description += param; } @@ -302,10 +310,12 @@ int UDPPort::broadcast(const uint8_t* buf, uint32_t length) int UDPPort::recv(uint8_t* buf, uint16_t len, SensorNetAddress* addr) { + struct timeval timeout; fd_set recvfds; int maxSock = 0; - int rc = 0; + timeout.tv_sec = 0; + timeout.tv_usec = 1000000; // 1 sec FD_ZERO(&recvfds); FD_SET(_sockfdUnicast, &recvfds); FD_SET(_sockfdMulticast, &recvfds); @@ -319,15 +329,17 @@ int UDPPort::recv(uint8_t* buf, uint16_t len, SensorNetAddress* addr) maxSock = _sockfdUnicast; } - select(maxSock + 1, &recvfds, 0, 0, 0); - - if (FD_ISSET(_sockfdUnicast, &recvfds)) + int rc = 0; + if ( select(maxSock + 1, &recvfds, 0, 0, &timeout) > 0 ) { - rc = recvfrom(_sockfdUnicast, buf, len, 0, addr); - } - else if (FD_ISSET(_sockfdMulticast, &recvfds)) - { - rc = recvfrom(_sockfdMulticast, buf, len, 0, &_grpAddr); + if (FD_ISSET(_sockfdUnicast, &recvfds)) + { + rc = recvfrom(_sockfdUnicast, buf, len, 0, addr); + } + else if (FD_ISSET(_sockfdMulticast, &recvfds)) + { + rc = recvfrom(_sockfdMulticast, buf, len, 0, &_grpAddr); + } } return rc; } diff --git a/MQTTSNGateway/src/linux/udp/SensorNetwork.h b/MQTTSNGateway/src/linux/udp/SensorNetwork.h index 86d4595..021c9bc 100644 --- a/MQTTSNGateway/src/linux/udp/SensorNetwork.h +++ b/MQTTSNGateway/src/linux/udp/SensorNetwork.h @@ -41,14 +41,8 @@ public: ~SensorNetAddress(); void setAddress(uint32_t IpAddr, uint16_t port); int setAddress(string* data); - uint32_t getIpAddress(void) - { - return _IpAddr; - } - uint16_t getPortNo(void) - { - return _portNo; - } + uint16_t getPortNo(void); + uint32_t getIpAddress(void); bool isMatch(SensorNetAddress* addr); SensorNetAddress& operator =(SensorNetAddress& addr); diff --git a/MQTTSNGateway/src/tests/TestProcessFramework.cpp b/MQTTSNGateway/src/tests/TestProcessFramework.cpp index 6b6fe04..627b2e0 100644 --- a/MQTTSNGateway/src/tests/TestProcessFramework.cpp +++ b/MQTTSNGateway/src/tests/TestProcessFramework.cpp @@ -115,6 +115,7 @@ void TestProcessFramework::run(void) printf("%s Timer 1sec\n", currentDateTime()); MultiTaskProcess::run(); + printf("ProcessFramework test complited.\n"); } diff --git a/MQTTSNGateway/src/tests/TestTask.cpp b/MQTTSNGateway/src/tests/TestTask.cpp index cf1667e..ae5cff8 100644 --- a/MQTTSNGateway/src/tests/TestTask.cpp +++ b/MQTTSNGateway/src/tests/TestTask.cpp @@ -39,11 +39,12 @@ void TestTask::run(void) { while(true) { - printf("Task is running. Enter CTRL+C \n"); - if (theProcess->checkSignal() == SIGINT) + if ( CHK_SIGINT) { - throw Exception("Terminated by CTL-C"); + printf("Task stopped.\n"); + return; } + printf("Task is running. Enter CTRL+C\n"); sleep(1); } }