Merge pull request #32 from ty4tw/gateway

Update: DISCONNECT Issue #31 and others
This commit is contained in:
Ian Craggs
2016-10-16 21:02:09 +01:00
committed by GitHub
31 changed files with 668 additions and 534 deletions

View File

@@ -52,6 +52,9 @@
<listOptionValue builtIn="false" srcPrefixMapping="" srcRootPath="" value="ssl"/> <listOptionValue builtIn="false" srcPrefixMapping="" srcRootPath="" value="ssl"/>
<listOptionValue builtIn="false" srcPrefixMapping="" srcRootPath="" value="crypto"/> <listOptionValue builtIn="false" srcPrefixMapping="" srcRootPath="" value="crypto"/>
</option> </option>
<option id="gnu.cpp.link.option.paths.170974409" name="Library search path (-L)" superClass="gnu.cpp.link.option.paths" useByScannerDiscovery="false" valueType="libPaths">
<listOptionValue builtIn="false" value="/usr/local/lib"/>
</option>
<inputType id="cdt.managedbuild.tool.gnu.cpp.linker.input.98211496" superClass="cdt.managedbuild.tool.gnu.cpp.linker.input"> <inputType id="cdt.managedbuild.tool.gnu.cpp.linker.input.98211496" superClass="cdt.managedbuild.tool.gnu.cpp.linker.input">
<additionalInput kind="additionalinputdependency" paths="$(USER_OBJS)"/> <additionalInput kind="additionalinputdependency" paths="$(USER_OBJS)"/>
<additionalInput kind="additionalinput" paths="$(LIBS)"/> <additionalInput kind="additionalinput" paths="$(LIBS)"/>
@@ -63,7 +66,7 @@
</toolChain> </toolChain>
</folderInfo> </folderInfo>
<sourceEntries> <sourceEntries>
<entry excluding="MQTTSNGateway/GatewayTester/samples/mainOTA.cpp|MQTTSNGateway/GatewayTester/samples/mainTest.cpp|MQTTSNGateway/src/linux/xbee|MQTTSNGateway/src/mainLogmonitor.cpp|MQTTSNPacket/test|MQTTSNPacket/samples" flags="VALUE_WORKSPACE_PATH|RESOLVED" kind="sourcePath" name=""/> <entry excluding="MQTTSNGateway/src/tests/mainTestProcessFramework.cpp|MQTTSNGateway/GatewayTester/samples/mainOTA.cpp|MQTTSNGateway/GatewayTester/samples/mainTest.cpp|MQTTSNGateway/src/linux/xbee|MQTTSNGateway/src/mainLogmonitor.cpp|MQTTSNPacket/test|MQTTSNPacket/samples" flags="VALUE_WORKSPACE_PATH|RESOLVED" kind="sourcePath" name=""/>
</sourceEntries> </sourceEntries>
</configuration> </configuration>
</storageModule> </storageModule>
@@ -130,7 +133,7 @@
</folderInfo> </folderInfo>
<sourceEntries> <sourceEntries>
<entry excluding="MQTTSNGateway/GatewayTester/samples/mainOTA.cpp|MQTTSNGateway/GatewayTester/samples/mainTest.cpp|MQTTSNPacket/src|MQTTSNGateway/src|MQTTSNGateway/src/linux|MQTTSNGateway/src/linux/udp|MQTTSNGateway/src/mainLogmonitor.cpp|MQTTSNGateway/src/linux/xbee|MQTTSNPacket/test|MQTTSNPacket/samples" flags="VALUE_WORKSPACE_PATH|RESOLVED" kind="sourcePath" name=""/> <entry excluding="MQTTSNGateway/GatewayTester/samples/mainOTA.cpp|MQTTSNGateway/GatewayTester/samples/mainTest.cpp|MQTTSNPacket/src|MQTTSNGateway/src|MQTTSNGateway/src/linux|MQTTSNGateway/src/linux/udp|MQTTSNGateway/src/mainLogmonitor.cpp|MQTTSNGateway/src/linux/xbee|MQTTSNPacket/test|MQTTSNPacket/samples" flags="VALUE_WORKSPACE_PATH|RESOLVED" kind="sourcePath" name=""/>
<entry excluding="mainLogmonitor.cpp|linux|linux/udp" flags="VALUE_WORKSPACE_PATH" kind="sourcePath" name="MQTTSNGateway/src"/> <entry excluding="tests/mainTestProcessFramework.cpp|mainLogmonitor.cpp|linux|linux/udp" flags="VALUE_WORKSPACE_PATH" kind="sourcePath" name="MQTTSNGateway/src"/>
<entry excluding="xbee" flags="VALUE_WORKSPACE_PATH" kind="sourcePath" name="MQTTSNGateway/src/linux"/> <entry excluding="xbee" flags="VALUE_WORKSPACE_PATH" kind="sourcePath" name="MQTTSNGateway/src/linux"/>
<entry flags="VALUE_WORKSPACE_PATH" kind="sourcePath" name="MQTTSNPacket/src"/> <entry flags="VALUE_WORKSPACE_PATH" kind="sourcePath" name="MQTTSNPacket/src"/>
</sourceEntries> </sourceEntries>

View File

@@ -22,7 +22,8 @@ ClientAuthentication=NO
#RootCAfile=/etc/ssl/certs/ca-certificates.crt #RootCAfile=/etc/ssl/certs/ca-certificates.crt
#RootCApath=/etc/ssl/certs/ #RootCApath=/etc/ssl/certs/
#CertsDirectory=/usr/share/GW/IoTcerts/ #CertsFile=/path/to/certKey.pem
#PrivateKey=/path/to/privateKey.pem
GatewayID=1 GatewayID=1
GatewayName=PahoGateway-01 GatewayName=PahoGateway-01
@@ -40,3 +41,6 @@ Baudrate=38400
SerialDevice=/dev/ttyUSB0 SerialDevice=/dev/ttyUSB0
ApiMode=2 ApiMode=2
# LOG
ShearedMemory=NO;

View File

@@ -100,7 +100,7 @@ void MQTTGWConnectionHandler::handleDisconnect(Client* client, MQTTGWPacket* pac
MQTTSNPacket* snPacket = new MQTTSNPacket(); MQTTSNPacket* snPacket = new MQTTSNPacket();
snPacket->setDISCONNECT(0); snPacket->setDISCONNECT(0);
client->disconnected(); client->disconnected();
client->getNetwork()->close();
Event* ev1 = new Event(); Event* ev1 = new Event();
ev1->setClientSendEvent(client, snPacket); ev1->setClientSendEvent(client, snPacket);
} }

View File

@@ -31,12 +31,9 @@ public:
void handleConnack(Client* client, MQTTGWPacket* packet); void handleConnack(Client* client, MQTTGWPacket* packet);
void handlePingresp(Client* client, MQTTGWPacket* packet); void handlePingresp(Client* client, MQTTGWPacket* packet);
void handleDisconnect(Client* client, MQTTGWPacket* packet); void handleDisconnect(Client* client, MQTTGWPacket* packet);
private: private:
Gateway* _gateway; Gateway* _gateway;
}; };
} }
#endif /* MQTTGWCONNECTIONHANDLER_H_ */ #endif /* MQTTGWCONNECTIONHANDLER_H_ */

View File

@@ -184,9 +184,10 @@ int MQTTGWPacket::recv(Network* network)
unsigned char c; unsigned char c;
/* read First Byte of Packet */ /* read First Byte of Packet */
if (network->recv((unsigned char*)&_header.byte, 1) == -1) int rc = network->recv((unsigned char*)&_header.byte, 1);
if ( rc <= 0)
{ {
return -1; return rc;
} }
/* read RemainingLength */ /* read RemainingLength */
do do

View File

@@ -59,6 +59,12 @@ void BrokerRecvTask::run(void)
while (true) while (true)
{ {
_light->blueLight(false);
if (CHK_SIGINT)
{
WRITELOG("%s BrokerRecvTask stopped.\n", currentDateTime());
return;
}
timeout.tv_sec = 0; timeout.tv_sec = 0;
timeout.tv_usec = 500000; // 500 msec timeout.tv_usec = 500000; // 500 msec
FD_ZERO(&rset); FD_ZERO(&rset);
@@ -68,7 +74,6 @@ void BrokerRecvTask::run(void)
/* Prepare sockets list to read */ /* Prepare sockets list to read */
Client* client = _gateway->getClientList()->getClient(); Client* client = _gateway->getClientList()->getClient();
_light->blueLight(false);
while (client > 0) while (client > 0)
{ {
@@ -111,26 +116,18 @@ void BrokerRecvTask::run(void)
if ( log(client, packet) == -1 ) if ( log(client, packet) == -1 )
{ {
continue; delete packet;
goto nextClient;
} }
/* post a BrokerRecvEvent */ /* post a BrokerRecvEvent */
ev = new Event(); ev = new Event();
ev->setBrokerRecvEvent(client, packet); ev->setBrokerRecvEvent(client, packet);
if ( _gateway->getPacketEventQue()->post(ev) == 0 ) _gateway->getPacketEventQue()->post(ev);
{
delete ev;
}
} }
else else
{ {
_light->blueLight(false); if (rc == -1)
if ( rc == 0 )
{
delete packet;
continue;
}
else if (rc == -1)
{ {
WRITELOG("%s BrokerRecvTask can't receive a packet from the broker errno=%d %s%s\n", ERRMSG_HEADER, errno, client->getClientId(), ERRMSG_FOOTER); WRITELOG("%s BrokerRecvTask can't receive a packet from the broker errno=%d %s%s\n", ERRMSG_HEADER, errno, client->getClientId(), ERRMSG_FOOTER);
} }
@@ -140,27 +137,28 @@ void BrokerRecvTask::run(void)
} }
else if ( rc == -3 ) else if ( rc == -3 )
{ {
WRITELOG("%s BrokerRecvTask can't create the packet %s%s\n", ERRMSG_HEADER, client->getClientId(), ERRMSG_FOOTER); WRITELOG("%s BrokerRecvTask can't get memories for the packet %s%s\n", ERRMSG_HEADER, client->getClientId(), ERRMSG_FOOTER);
} }
/* disconnect the client */
client->disconnected();
client->getNetwork()->disconnect();
rc = 0;
delete packet; delete packet;
if ( (rc == -1 || rc == -2) && client->isActive() )
{
/* disconnect the client */
packet = new MQTTGWPacket();
packet->setHeader(DISCONNECT);
ev = new Event();
ev->setBrokerRecvEvent(client, packet);
_gateway->getPacketEventQue()->post(ev);
}
} }
} }
} }
nextClient:
client = client->getNextClient(); client = client->getNextClient();
} }
_light->blueLight(false);
} }
} }
else
{
_light->greenLight(false);
}
maxSock = 0;
} }
} }
@@ -176,7 +174,6 @@ int BrokerRecvTask::log(Client* client, MQTTGWPacket* packet)
switch (packet->getType()) switch (packet->getType())
{ {
case CONNACK: case CONNACK:
case DISCONNECT:
WRITELOG(FORMAT_Y_Y_W, currentDateTime(), packet->getName(), LEFTARROW, client->getClientId(), packet->print(pbuf)); WRITELOG(FORMAT_Y_Y_W, currentDateTime(), packet->getName(), LEFTARROW, client->getClientId(), packet->print(pbuf));
break; break;
case PUBLISH: case PUBLISH:
@@ -196,6 +193,7 @@ int BrokerRecvTask::log(Client* client, MQTTGWPacket* packet)
WRITELOG(FORMAT_Y_Y_W, currentDateTime(), packet->getName(), LEFTARROW, client->getClientId(), packet->print(pbuf)); WRITELOG(FORMAT_Y_Y_W, currentDateTime(), packet->getName(), LEFTARROW, client->getClientId(), packet->print(pbuf));
break; break;
default: default:
WRITELOG("Type=%x\n", packet->getType());
rc = -1; rc = -1;
break; break;
} }

View File

@@ -34,41 +34,13 @@ BrokerSendTask::BrokerSendTask(Gateway* gateway)
{ {
_gateway = gateway; _gateway = gateway;
_gateway->attach((Thread*)this); _gateway->attach((Thread*)this);
_host = 0; _gwparams = 0;
_port = 0;
_portSecure = 0;
_certDirectory = 0;
_rootCAfile = 0;
_light = 0; _light = 0;
} }
BrokerSendTask::~BrokerSendTask() BrokerSendTask::~BrokerSendTask()
{ {
if (_host)
{
free(_host);
}
if (_port)
{
free(_port);
}
if (_portSecure)
{
free(_portSecure);
}
if (_certDirectory)
{
free(_certDirectory);
}
if (_rootCApath)
{
free(_rootCApath);
}
if (_rootCAfile)
{
free(_rootCAfile);
}
} }
/** /**
@@ -76,34 +48,7 @@ BrokerSendTask::~BrokerSendTask()
*/ */
void BrokerSendTask::initialize(int argc, char** argv) void BrokerSendTask::initialize(int argc, char** argv)
{ {
char param[MQTTSNGW_PARAM_MAX]; _gwparams = _gateway->getGWParams();
if (_gateway->getParam("BrokerName", param) == 0)
{
_host = strdup(param);
}
if (_gateway->getParam("BrokerPortNo", param) == 0)
{
_port = strdup(param);
}
if (_gateway->getParam("BrokerSecurePortNo", param) == 0)
{
_portSecure = strdup(param);
}
if (_gateway->getParam("CertsDirectory", param) == 0)
{
_certDirectory = strdup(param);
}
if (_gateway->getParam("RootCApath", param) == 0)
{
_rootCApath = strdup(param);
}
if (_gateway->getParam("RootCAfile", param) == 0)
{
_rootCAfile = strdup(param);
}
_light = _gateway->getLightIndicator(); _light = _gateway->getLightIndicator();
} }
@@ -115,83 +60,81 @@ void BrokerSendTask::run()
Event* ev = 0; Event* ev = 0;
MQTTGWPacket* packet = 0; MQTTGWPacket* packet = 0;
Client* client = 0; Client* client = 0;
int rc = 0;
while (true) while (true)
{ {
int rc = 0;
ev = _gateway->getBrokerSendQue()->wait(); ev = _gateway->getBrokerSendQue()->wait();
client = ev->getClient();
packet = ev->getMQTTGWPacket();
if ( client->getNetwork()->isValid() && !client->getNetwork()->isSecure() && packet->getType() == CONNECT ) if ( ev->getEventType() == EtStop )
{ {
client->getNetwork()->close(); WRITELOG("%s BrokerSendTask stopped.\n", currentDateTime());
delete ev;
return;
} }
if ( !client->getNetwork()->isValid() ) if ( ev->getEventType() == EtBrokerSend)
{ {
/* connect to the broker and send a packet */ client = ev->getClient();
char* portNo = _port; packet = ev->getMQTTGWPacket();
const char* cert = 0;
const char* keyFile = 0;
string certFile;
string privateKeyFile;
if (client->isSecureNetwork()) if ( packet->getType() == CONNECT && client->getNetwork()->isValid() )
{ {
portNo = _portSecure; client->getNetwork()->close();
if ( _certDirectory ) }
if ( !client->getNetwork()->isValid() )
{
/* connect to the broker and send a packet */
if (client->isSecureNetwork())
{ {
certFile = _certDirectory; rc = client->getNetwork()->connect(_gwparams->brokerName, _gwparams->portSecure, _gwparams->rootCApath,
certFile += client->getClientId(); _gwparams->rootCAfile, _gwparams->certKey, _gwparams->privateKey);
certFile += ".crt";
cert = certFile.c_str();
privateKeyFile = _certDirectory;
privateKeyFile += client->getClientId();
privateKeyFile += ".key";
keyFile = privateKeyFile.c_str();
} }
rc = client->getNetwork()->connect(_host, portNo, _rootCApath, _rootCAfile, cert, keyFile); else
{
rc = client->getNetwork()->connect(_gwparams->brokerName, _gwparams->port);
}
if ( !rc )
{
/* disconnect the broker and the client */
WRITELOG("%s BrokerSendTask can't connect to the broker. %s%s\n",
ERRMSG_HEADER, client->getClientId(), ERRMSG_FOOTER);
delete ev;
client->getNetwork()->close();
continue;
}
}
/* send a packet */
_light->blueLight(true);
if ( (rc = packet->send(client->getNetwork())) > 0 )
{
if ( packet->getType() == CONNECT )
{
client->setWaitWillMsgFlg(false);
client->connectSended();
}
log(client, packet);
} }
else else
{ {
rc = client->getNetwork()->connect(_host, portNo);
}
if ( !rc )
{
/* disconnect the broker and change the client's status */
WRITELOG("%s BrokerSendTask can't connect to the broker. errno=%d %s%s\n",
ERRMSG_HEADER, rc == -1 ? errno : 0, client->getClientId(), ERRMSG_FOOTER);
client->disconnected();
client->getNetwork()->disconnect();
delete ev;
continue;
}
}
/* send a packet */
_light->blueLight(true);
if ( (rc = packet->send(client->getNetwork())) > 0 )
{
if ( packet->getType() == CONNECT )
{
client->setWaitWillMsgFlg(false);
client->connectSended();
}
log(client, packet);
}
else
{
WRITELOG("%s BrokerSendTask can't send a packet. errno=%d %s%s\n",
ERRMSG_HEADER, rc == -1 ? errno : 0, client->getClientId(), ERRMSG_FOOTER);
WRITELOG("%s BrokerSendTask can't send a packet to the broker errno=%d %s%s\n", WRITELOG("%s BrokerSendTask can't send a packet to the broker errno=%d %s%s\n",
ERRMSG_HEADER, rc == -1 ? errno : 0, client->getClientId(), ERRMSG_FOOTER); ERRMSG_HEADER, rc == -1 ? errno : 0, client->getClientId(), ERRMSG_FOOTER);
client->disconnected(); client->getNetwork()->close();
client->getNetwork()->disconnect();
}
_light->blueLight(false); /* Disconnect the client */
packet = new MQTTGWPacket();
packet->setHeader(DISCONNECT);
Event* ev1 = new Event();
ev1->setBrokerRecvEvent(client, packet);
_gateway->getPacketEventQue()->post(ev1);
}
_light->blueLight(false);
}
delete ev; delete ev;
} }
} }
@@ -231,3 +174,4 @@ void BrokerSendTask::log(Client* client, MQTTGWPacket* packet)
break; break;
} }
} }

View File

@@ -35,14 +35,8 @@ public:
void run(); void run();
private: private:
void log(Client*, MQTTGWPacket*); void log(Client*, MQTTGWPacket*);
Gateway* _gateway; Gateway* _gateway;
char* _host; GatewayParams* _gwparams;
char* _port;
char* _portSecure;
char* _rootCApath;
char* _rootCAfile;
char* _certDirectory;
LightIndicator* _light; LightIndicator* _light;
}; };

View File

@@ -102,9 +102,7 @@ bool ClientList::authorize(const char* fileName)
} }
pos = data.find_first_of(","); pos = data.find_first_of(",");
string id = data.substr(0, pos); string id = data.substr(0, pos);
clientId.cstring = strdup(id.c_str()); clientId.cstring = strdup(id.c_str());
string addr = data.substr(pos + 1); string addr = data.substr(pos + 1);
if (netAddr.setAddress(&addr) == 0) if (netAddr.setAddress(&addr) == 0)
@@ -117,6 +115,7 @@ bool ClientList::authorize(const char* fileName)
{ {
WRITELOG("Invalid address %s\n", data.c_str()); WRITELOG("Invalid address %s\n", data.c_str());
} }
free(clientId.cstring);
} }
fclose(fp); fclose(fp);
_authorize = true; _authorize = true;
@@ -375,13 +374,9 @@ void Client::setClientSleepPacket(MQTTSNPacket* packet)
_clientSleepPacketQue.post(packet); _clientSleepPacketQue.post(packet);
} }
void Client::checkTimeover(void) bool Client::checkTimeover(void)
{ {
if (_status == Cstat_Active && _keepAliveTimer.isTimeup()) return (_status == Cstat_Active && _keepAliveTimer.isTimeup());
{
_status = Cstat_Lost;
_network->disconnect();
}
} }
void Client::setKeepAlive(MQTTSNPacket* packet) void Client::setKeepAlive(MQTTSNPacket* packet)

View File

@@ -248,7 +248,7 @@ public:
void setWaitedPubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type); void setWaitedPubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type);
void setWaitedSubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type); void setWaitedSubTopicId(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type);
void checkTimeover(void); bool checkTimeover(void);
void updateStatus(MQTTSNPacket*); void updateStatus(MQTTSNPacket*);
void updateStatus(ClientStatus); void updateStatus(ClientStatus);
void connectSended(void); void connectSended(void);

View File

@@ -59,6 +59,13 @@ void ClientRecvTask::run()
MQTTSNPacket* packet = new MQTTSNPacket(); MQTTSNPacket* packet = new MQTTSNPacket();
int packetLen = packet->recv(_sensorNetwork); int packetLen = packet->recv(_sensorNetwork);
if (CHK_SIGINT)
{
WRITELOG("%s ClientRecvTask stopped.\n", currentDateTime());
delete packet;
return;
}
if (packetLen < 2 ) if (packetLen < 2 )
{ {
delete packet; delete packet;
@@ -77,10 +84,7 @@ void ClientRecvTask::run()
log(0, packet); log(0, packet);
ev = new Event(); ev = new Event();
ev->setBrodcastEvent(packet); ev->setBrodcastEvent(packet);
if ( _gateway->getPacketEventQue()->post(ev) == 0 ) _gateway->getPacketEventQue()->post(ev);
{
delete ev;
}
continue; continue;
} }
@@ -93,10 +97,7 @@ void ClientRecvTask::run()
log(client, packet); log(client, packet);
ev = new Event(); ev = new Event();
ev->setClientRecvEvent(client,packet); ev->setClientRecvEvent(client,packet);
if ( _gateway->getPacketEventQue()->post(ev) == 0 ) _gateway->getPacketEventQue()->post(ev);
{
delete ev;
}
} }
else else
{ {
@@ -108,41 +109,60 @@ void ClientRecvTask::run()
packet->getCONNECT(&data); packet->getCONNECT(&data);
/* create a client */ /* create a client */
client = _gateway->getClientList()->createClient(_sensorNetwork->getSenderAddress(), &data.clientID, false, false); //_gateway->getGWParams()->secureConnection); client = _gateway->getClientList()->createClient(_sensorNetwork->getSenderAddress(), &data.clientID, false, false);
log(client, packet, &data.clientID);
if (!client) if (!client)
{ {
WRITELOG("%s Can't create a Client. CONNECT message has been discarded.%s\n", ERRMSG_HEADER, ERRMSG_FOOTER); WRITELOG("%s Client was rejected. CONNECT message has been discarded.%s\n", ERRMSG_HEADER, ERRMSG_FOOTER);
delete packet; delete packet;
continue; continue;
} }
log(client, packet);
/* set sensorNetAddress & post Event */ /* set sensorNetAddress & post Event */
client->setClientAddress(_sensorNetwork->getSenderAddress()); client->setClientAddress(_sensorNetwork->getSenderAddress());
ev = new Event(); ev = new Event();
ev->setClientRecvEvent(client, packet); ev->setClientRecvEvent(client, packet);
if ( _gateway->getPacketEventQue()->post(ev) == 0 ) _gateway->getPacketEventQue()->post(ev);
{
delete ev;
}
} }
else else
{ {
log(client, packet); log(client, packet);
delete packet; delete packet;
/* Send DISCONNECT */
SensorNetAddress* addr = new SensorNetAddress();
*addr = (*_sensorNetwork->getSenderAddress());
packet = new MQTTSNPacket();
packet->setDISCONNECT(0);
ev = new Event();
ev->setClientSendEvent(addr, packet);
_gateway->getClientSendQue()->post(ev);
continue; continue;
} }
} }
} }
} }
void ClientRecvTask::log(Client* client, MQTTSNPacket* packet) void ClientRecvTask::log(Client* client, MQTTSNPacket* packet, MQTTSNString* id)
{ {
char pbuf[SIZE_OF_LOG_PACKET * 3]; char pbuf[SIZE_OF_LOG_PACKET * 3];
const char* clientId;
char cstr[MAX_CLIENTID_LENGTH + 1];
char msgId[6]; char msgId[6];
const char* clientId = client ? (const char*)client->getClientId() :"Non Active Client !" ;
if ( id )
{
memset((void*)cstr, 0, id->lenstring.len);
strncpy(cstr, id->lenstring.data, id->lenstring.len) ;
clientId = cstr;
}
else if ( client )
{
clientId = client->getClientId();
}
else
{
clientId = UNKNOWNCL;
}
switch (packet->getType()) switch (packet->getType())
{ {

View File

@@ -35,7 +35,7 @@ public:
void run(); void run();
private: private:
void log(Client*, MQTTSNPacket*); void log(Client*, MQTTSNPacket*, MQTTSNString* id = 0);
Gateway* _gateway; Gateway* _gateway;
SensorNetwork* _sensorNetwork; SensorNetwork* _sensorNetwork;

View File

@@ -45,6 +45,12 @@ void ClientSendTask::run()
{ {
Event* ev = _gateway->getClientSendQue()->wait(); Event* ev = _gateway->getClientSendQue()->wait();
if (ev->getEventType() == EtStop)
{
WRITELOG("%s ClientSendTask stopped.\n", currentDateTime());
delete ev;
break;
}
if (ev->getEventType() == EtClientSend) if (ev->getEventType() == EtClientSend)
{ {
client = ev->getClient(); client = ev->getClient();
@@ -56,6 +62,11 @@ void ClientSendTask::run()
packet = ev->getMQTTSNPacket(); packet = ev->getMQTTSNPacket();
packet->broadcast(_sensorNetwork); packet->broadcast(_sensorNetwork);
} }
else if (ev->getEventType() == EtSensornetSend)
{
packet = ev->getMQTTSNPacket();
packet->unicast(_sensorNetwork, ev->getSensorNetAddress());
}
log(client, packet); log(client, packet);
delete ev; delete ev;
@@ -66,6 +77,7 @@ void ClientSendTask::log(Client* client, MQTTSNPacket* packet)
{ {
char pbuf[SIZE_OF_LOG_PACKET * 3]; char pbuf[SIZE_OF_LOG_PACKET * 3];
char msgId[6]; char msgId[6];
const char* clientId = client ? (const char*)client->getClientId() : UNKNOWNCL ;
switch (packet->getType()) switch (packet->getType())
{ {
@@ -78,15 +90,13 @@ void ClientSendTask::log(Client* client, MQTTSNPacket* packet)
case MQTTSN_DISCONNECT: case MQTTSN_DISCONNECT:
case MQTTSN_WILLTOPICREQ: case MQTTSN_WILLTOPICREQ:
case MQTTSN_WILLMSGREQ: case MQTTSN_WILLMSGREQ:
WRITELOG(FORMAT_Y_W_G, currentDateTime(), packet->getName(), RIGHTARROW, client->getClientId(), packet->print(pbuf));
break;
case MQTTSN_WILLTOPICRESP: case MQTTSN_WILLTOPICRESP:
case MQTTSN_WILLMSGRESP: case MQTTSN_WILLMSGRESP:
WRITELOG(FORMAT_Y_W_G, currentDateTime(), packet->getName(), RIGHTARROW, client->getClientId(), packet->print(pbuf)); WRITELOG(FORMAT_Y_W_G, currentDateTime(), packet->getName(), RIGHTARROW, clientId, packet->print(pbuf));
break; break;
case MQTTSN_REGISTER: case MQTTSN_REGISTER:
case MQTTSN_PUBLISH: case MQTTSN_PUBLISH:
WRITELOG(FORMAT_W_MSGID_W_G, currentDateTime(), packet->getName(), packet->getMsgId(msgId), RIGHTARROW, client->getClientId(), packet->print(pbuf)); WRITELOG(FORMAT_W_MSGID_W_G, currentDateTime(), packet->getName(), packet->getMsgId(msgId), RIGHTARROW, clientId, packet->print(pbuf));
break; break;
case MQTTSN_REGACK: case MQTTSN_REGACK:
case MQTTSN_PUBACK: case MQTTSN_PUBACK:
@@ -95,7 +105,7 @@ void ClientSendTask::log(Client* client, MQTTSNPacket* packet)
case MQTTSN_PUBCOMP: case MQTTSN_PUBCOMP:
case MQTTSN_SUBACK: case MQTTSN_SUBACK:
case MQTTSN_UNSUBACK: case MQTTSN_UNSUBACK:
WRITELOG(FORMAT_W_MSGID_W_G, currentDateTime(), packet->getName(), packet->getMsgId(msgId), RIGHTARROW, client->getClientId(), packet->print(pbuf)); WRITELOG(FORMAT_W_MSGID_W_G, currentDateTime(), packet->getName(), packet->getMsgId(msgId), RIGHTARROW, clientId, packet->print(pbuf));
break; break;
default: default:
break; break;

View File

@@ -209,19 +209,22 @@ void MQTTSNConnectionHandler::handleWillmsg(Client* client, MQTTSNPacket* packet
*/ */
void MQTTSNConnectionHandler::handleDisconnect(Client* client, MQTTSNPacket* packet) void MQTTSNConnectionHandler::handleDisconnect(Client* client, MQTTSNPacket* packet)
{ {
MQTTGWPacket* mqMsg = new MQTTGWPacket(); Event* ev = new Event();
MQTTSNPacket* snMsg = new MQTTSNPacket(); MQTTSNPacket* snMsg = new MQTTSNPacket();
mqMsg->setHeader(DISCONNECT);
snMsg->setDISCONNECT(0); snMsg->setDISCONNECT(0);
ev->setClientSendEvent(client, snMsg);
_gateway->getClientSendQue()->post(ev);
Event* ev1 = new Event(); uint16_t duration = 0;
ev1->setClientSendEvent(client, snMsg); packet->getDISCONNECT(&duration);
_gateway->getClientSendQue()->post(ev1); if ( duration == 0 )
{
ev1 = new Event(); MQTTGWPacket* mqMsg = new MQTTGWPacket();
ev1->setBrokerSendEvent(client, mqMsg); mqMsg->setHeader(DISCONNECT);
_gateway->getBrokerSendQue()->post(ev1); ev = new Event();
ev->setBrokerSendEvent(client, mqMsg);
_gateway->getBrokerSendQue()->post(ev);
}
} }
/* /*

View File

@@ -20,11 +20,17 @@
namespace MQTTSNGW namespace MQTTSNGW
{ {
/*================================= /*=================================
* Log controls * Config Parametrs
==================================*/ ==================================*/
//#define DEBUG // print out log for debug #define CONFIG_DIRECTORY "./"
//#define RINGBUFFER // print out Packets log into shared memory #define CONFIG_FILE "gateway.conf"
//#define DEBUG_NWSTACK // print out SensorNetwork log #define CLIENT_LIST "clients.conf"
/*==========================================================
* Gateway default parameters
===========================================================*/
#define DEFAULT_KEEP_ALIVE_TIME (900) // 900 secs = 15 mins
#define DEFAULT_MQTT_VERSION (4) // Defualt MQTT version
/*================================= /*=================================
* MQTT-SN Parametrs * MQTT-SN Parametrs
@@ -43,11 +49,13 @@ typedef unsigned short uint16_t;
typedef unsigned int uint32_t; typedef unsigned int uint32_t;
/*================================= /*=================================
* Macros * Log controls
==================================*/ ==================================*/
//#define DEBUG // print out log for debug
//#define DEBUG_NWSTACK // print out SensorNetwork log
#ifdef DEBUG #ifdef DEBUG
#define DEBUGLOG(...) printf(__VA_ARGS__) #define DEBUGLOG(...) printf(__VA_ARGS__)
#undef RINGBUFFER
#else #else
#define DEBUGLOG(...) #define DEBUGLOG(...)
#endif #endif

View File

@@ -32,7 +32,7 @@ using namespace std;
using namespace MQTTSNGW; using namespace MQTTSNGW;
#define EVENT_QUE_TIME_OUT 2000 // 2000 msecs #define EVENT_QUE_TIME_OUT 2000 // 2000 msecs
char* currentDateTime(void);
/*===================================== /*=====================================
Class PacketHandleTask Class PacketHandleTask
=====================================*/ =====================================*/
@@ -84,7 +84,6 @@ void PacketHandleTask::run()
{ {
Event* ev = 0; Event* ev = 0;
EventQue* eventQue = _gateway->getPacketEventQue(); EventQue* eventQue = _gateway->getPacketEventQue();
ClientList* clist = _gateway->getClientList();
Client* client = 0; Client* client = 0;
MQTTSNPacket* snPacket = 0; MQTTSNPacket* snPacket = 0;
MQTTGWPacket* brPacket = 0; MQTTGWPacket* brPacket = 0;
@@ -95,23 +94,18 @@ void PacketHandleTask::run()
while (true) while (true)
{ {
if (theProcess->checkSignal() == SIGINT)
{
throw Exception("Terminated by CTL-C");
}
/* wait Event */ /* wait Event */
ev = eventQue->timedwait(EVENT_QUE_TIME_OUT); ev = eventQue->timedwait(EVENT_QUE_TIME_OUT);
if (ev->getEventType() == EtStop)
{
WRITELOG("%s PacketHandleTask stopped.\n", currentDateTime());
delete ev;
return;
}
if (ev->getEventType() == EtTimeout) if (ev->getEventType() == EtTimeout)
{ {
/*------ Is Client Lost ? ---------*/
client = clist->getClient();
while (client > 0)
{
client->checkTimeover();
client = client->getNextClient();
}
/*------ Check Keep Alive Timer & send Advertise ------*/ /*------ Check Keep Alive Timer & send Advertise ------*/
if (_advertiseTimer.isTimeup()) if (_advertiseTimer.isTimeup())
{ {
@@ -200,9 +194,6 @@ void PacketHandleTask::run()
case CONNACK: case CONNACK:
_mqttConnection->handleConnack(client, brPacket); _mqttConnection->handleConnack(client, brPacket);
break; break;
case DISCONNECT:
_mqttConnection->handleDisconnect(client, brPacket);
break;
case PINGRESP: case PINGRESP:
_mqttConnection->handlePingresp(client, brPacket); _mqttConnection->handlePingresp(client, brPacket);
break; break;

View File

@@ -22,6 +22,7 @@
#include <Timer.h> #include <Timer.h>
#include <exception> #include <exception>
#include <getopt.h> #include <getopt.h>
#include <unistd.h>
#include "MQTTSNGWProcess.h" #include "MQTTSNGWProcess.h"
#include "Threading.h" #include "Threading.h"
@@ -53,8 +54,9 @@ Process::Process()
{ {
_argc = 0; _argc = 0;
_argv = 0; _argv = 0;
_configDir = MQTTSNGW_CONFIG_DIRECTORY; _configDir = CONFIG_DIRECTORY;
_configFile = MQTTSNGW_CONFIG_FILE; _configFile = CONFIG_FILE;
_log = 0;
} }
Process::~Process() Process::~Process()
@@ -76,6 +78,7 @@ void Process::run()
void Process::initialize(int argc, char** argv) void Process::initialize(int argc, char** argv)
{ {
char param[MQTTSNGW_PARAM_MAX];
_argc = argc; _argc = argc;
_argv = argv; _argv = argv;
signal(SIGINT, signalHandler); signal(SIGINT, signalHandler);
@@ -102,6 +105,40 @@ void Process::initialize(int argc, char** argv)
} }
_rbsem = new Semaphore(MQTTSNGW_RB_SEMAPHOR_NAME, 0); _rbsem = new Semaphore(MQTTSNGW_RB_SEMAPHOR_NAME, 0);
_rb = new RingBuffer(_configDir.c_str()); _rb = new RingBuffer(_configDir.c_str());
if (getParam("ShearedMemory", param) == 0)
{
if (!strcasecmp(param, "YES"))
{
_log = 1;
}
else
{
_log = 0;
}
}
}
void Process::putLog(const char* format, ...)
{
_mt.lock();
va_list arg;
va_start(arg, format);
vsprintf(_rbdata, format, arg);
va_end(arg);
if (strlen(_rbdata))
{
if ( _log > 0 )
{
_rb->put(_rbdata);
_rbsem->post();
}
else
{
printf("%s", _rbdata);
}
}
_mt.unlock();
} }
int Process::getArgc() int Process::getArgc()
@@ -169,21 +206,6 @@ int Process::getParam(const char* parameter, char* value)
return -2; return -2;
} }
void Process::putLog(const char* format, ...)
{
_mt.lock();
va_list arg;
va_start(arg, format);
vsprintf(_rbdata, format, arg);
va_end(arg);
if (strlen(_rbdata))
{
_rb->put(_rbdata);
_rbsem->post();
}
_mt.unlock();
}
const char* Process::getLog() const char* Process::getLog()
{ {
int len = 0; int len = 0;
@@ -228,6 +250,7 @@ MultiTaskProcess::MultiTaskProcess()
{ {
theMultiTaskProcess = this; theMultiTaskProcess = this;
_threadCount = 0; _threadCount = 0;
_stopCount = 0;
} }
MultiTaskProcess::~MultiTaskProcess() MultiTaskProcess::~MultiTaskProcess()
@@ -254,28 +277,43 @@ void MultiTaskProcess::run(void)
try try
{ {
_stopProcessEvent.wait(); while(true)
}
catch (Exception* ex)
{
for (int i = 0; i < _threadCount; i++)
{ {
if ( _threadList[i] ) if (theProcess->checkSignal() == SIGINT)
{ {
_threadList[i]->cancel();; return;
} }
sleep(1);
} }
}
catch(Exception* ex)
{
ex->writeMessage(); ex->writeMessage();
} }
catch(...)
{
throw;
}
} }
Semaphore* MultiTaskProcess::getStopProcessEvent(void) void MultiTaskProcess::waitStop(void)
{ {
return &_stopProcessEvent; while (_stopCount < _threadCount)
{
sleep(1);
}
}
void MultiTaskProcess::threadStoped(void)
{
_mutex.lock();
_stopCount++;
_mutex.unlock();
} }
void MultiTaskProcess::attach(Thread* thread) void MultiTaskProcess::attach(Thread* thread)
{ {
_mutex.lock();
if (_threadCount < MQTTSNGW_MAX_TASK) if (_threadCount < MQTTSNGW_MAX_TASK)
{ {
_threadList[_threadCount] = thread; _threadList[_threadCount] = thread;
@@ -283,8 +321,10 @@ void MultiTaskProcess::attach(Thread* thread)
} }
else else
{ {
_mutex.unlock();
throw Exception("Full of Threads"); throw Exception("Full of Threads");
} }
_mutex.unlock();
} }
int MultiTaskProcess::getParam(const char* parameter, char* value) int MultiTaskProcess::getParam(const char* parameter, char* value)

View File

@@ -31,11 +31,6 @@ namespace MQTTSNGW
/*================================= /*=================================
* Parameters * Parameters
==================================*/ ==================================*/
#define MQTTSNGW_CONFIG_DIRECTORY "./"
#define MQTTSNGW_CONFIG_FILE "gateway.conf"
#define MQTTSNGW_CLIENT_LIST "clients.conf"
#define MQTTSNGW_MAX_TASK 10 // number of Tasks #define MQTTSNGW_MAX_TASK 10 // number of Tasks
#define PROCESS_LOG_BUFFER_SIZE 16384 // Ring buffer size for Logs #define PROCESS_LOG_BUFFER_SIZE 16384 // Ring buffer size for Logs
#define MQTTSNGW_PARAM_MAX 128 // Max length of config records. #define MQTTSNGW_PARAM_MAX 128 // Max length of config records.
@@ -43,13 +38,8 @@ namespace MQTTSNGW
/*================================= /*=================================
* Macros * Macros
==================================*/ ==================================*/
#ifdef RINGBUFFER
#define WRITELOG theProcess->putLog #define WRITELOG theProcess->putLog
#else #define CHK_SIGINT (theProcess->checkSignal() == SIGINT)
#define WRITELOG printf
#endif
/*================================= /*=================================
Class Process Class Process
==================================*/ ==================================*/
@@ -77,6 +67,7 @@ private:
RingBuffer* _rb; RingBuffer* _rb;
Semaphore* _rbsem; Semaphore* _rbsem;
Mutex _mt; Mutex _mt;
int _log;
char _rbdata[PROCESS_LOG_BUFFER_SIZE + 1]; char _rbdata[PROCESS_LOG_BUFFER_SIZE + 1];
}; };
@@ -88,17 +79,18 @@ class MultiTaskProcess: public Process
public: public:
MultiTaskProcess(void); MultiTaskProcess(void);
~MultiTaskProcess(); ~MultiTaskProcess();
virtual void initialize(int argc, char** argv); void initialize(int argc, char** argv);
virtual int getParam(const char* parameter, char* value); int getParam(const char* parameter, char* value);
void run(void); void run(void);
void waitStop(void);
void threadStoped(void);
void attach(Thread* thread); void attach(Thread* thread);
Semaphore* getStopProcessEvent(void);
private: private:
Thread* _threadList[MQTTSNGW_MAX_TASK]; Thread* _threadList[MQTTSNGW_MAX_TASK];
Semaphore _stopProcessEvent;
Mutex _mutex; Mutex _mutex;
int _threadCount; int _threadCount;
int _stopCount;
}; };
/*===================================== /*=====================================

View File

@@ -48,7 +48,6 @@ void MQTTSNPublishHandler::handlePublish(Client* client, MQTTSNPacket* packet)
if ( !client->isActive() ) if ( !client->isActive() )
{ {
/* Reply DISCONNECT to the client */ /* Reply DISCONNECT to the client */
WRITELOG(" The client is not active. status = %s\n", client->getStatus());
Event* ev = new Event(); Event* ev = new Event();
MQTTSNPacket* disconnect = new MQTTSNPacket(); MQTTSNPacket* disconnect = new MQTTSNPacket();
disconnect->setDISCONNECT(0); disconnect->setDISCONNECT(0);

View File

@@ -31,6 +31,20 @@ Gateway::Gateway()
theProcess = this; theProcess = this;
_params.loginId = 0; _params.loginId = 0;
_params.password = 0; _params.password = 0;
_params.keepAlive = 0;
_params.gatewayId = 0;
_params.mqttVersion = 0;
_params.maxInflightMsgs = 0;
_params.gatewayName = 0;
_params.brokerName = 0;
_params.port = 0;
_params.portSecure = 0;
_params.rootCApath = 0;
_params.rootCAfile = 0;
_params.certKey = 0;
_params.privateKey = 0;
_params.clientListName = 0;
_params.configName = 0;
_packetEventQue.setMaxSize(MAX_INFLIGHTMESSAGES * MAX_CLIENTS); _packetEventQue.setMaxSize(MAX_INFLIGHTMESSAGES * MAX_CLIENTS);
} }
@@ -44,15 +58,85 @@ Gateway::~Gateway()
{ {
free(_params.password); free(_params.password);
} }
if ( _params.gatewayName )
{
free(_params.gatewayName);
}
if ( _params.brokerName )
{
free(_params.brokerName);
}
if ( _params.port )
{
free(_params.port);
}
if ( _params.portSecure )
{
free(_params.portSecure);
}
if ( _params.certKey )
{
free(_params.certKey);
}
if ( _params.privateKey )
{
free(_params.privateKey);
}
if ( _params.rootCApath )
{
free(_params.rootCApath);
}
if ( _params.rootCAfile )
{
free(_params.rootCAfile);
}
if ( _params.clientListName )
{
free(_params.clientListName);
}
if ( _params.configName )
{
free(_params.configName);
}
} }
void Gateway::initialize(int argc, char** argv) void Gateway::initialize(int argc, char** argv)
{ {
char param[MQTTSNGW_PARAM_MAX]; char param[MQTTSNGW_PARAM_MAX];
string fileName;
MultiTaskProcess::initialize(argc, argv); MultiTaskProcess::initialize(argc, argv);
resetRingBuffer(); resetRingBuffer();
_params.gatewayId = 0; if (getParam("BrokerName", param) == 0)
{
_params.brokerName = strdup(param);
}
if (getParam("BrokerPortNo", param) == 0)
{
_params.port = strdup(param);
}
if (getParam("BrokerSecurePortNo", param) == 0)
{
_params.portSecure = strdup(param);
}
if (getParam("CertKey", param) == 0)
{
_params.certKey = strdup(param);
}
if (getParam("PrivateKey", param) == 0)
{
_params.privateKey = strdup(param);
}
if (getParam("RootCApath", param) == 0)
{
_params.rootCApath = strdup(param);
}
if (getParam("RootCAfile", param) == 0)
{
_params.rootCAfile = strdup(param);
}
if (getParam("GatewayID", param) == 0) if (getParam("GatewayID", param) == 0)
{ {
_params.gatewayId = atoi(param); _params.gatewayId = atoi(param);
@@ -65,7 +149,7 @@ void Gateway::initialize(int argc, char** argv)
if (getParam("GatewayName", param) == 0) if (getParam("GatewayName", param) == 0)
{ {
_params.gatewayName = (uint8_t*) strdup(param); _params.gatewayName = strdup(param);
} }
_params.mqttVersion = DEFAULT_MQTT_VERSION; _params.mqttVersion = DEFAULT_MQTT_VERSION;
@@ -94,17 +178,16 @@ void Gateway::initialize(int argc, char** argv)
if (getParam("LoginID", param) == 0) if (getParam("LoginID", param) == 0)
{ {
_params.loginId = (uint8_t*) strdup(param); _params.loginId = strdup(param);
} }
if (getParam("Password", param) == 0) if (getParam("Password", param) == 0)
{ {
_params.password = (uint8_t*) strdup(param); _params.password = strdup(param);
} }
if (getParam("ClientAuthentication", param) == 0) if (getParam("ClientAuthentication", param) == 0)
{ {
string fileName;
if (!strcasecmp(param, "YES")) if (!strcasecmp(param, "YES"))
{ {
if (getParam("ClientsList", param) == 0) if (getParam("ClientsList", param) == 0)
@@ -113,15 +196,18 @@ void Gateway::initialize(int argc, char** argv)
} }
else else
{ {
fileName = *getConfigDirName() + string(MQTTSNGW_CLIENT_LIST); fileName = *getConfigDirName() + string(CLIENT_LIST);
} }
if (!_clientList.authorize(fileName.c_str())) if (!_clientList.authorize(fileName.c_str()))
{ {
throw Exception("Gateway::initialize: No client list defined by configuration."); throw Exception("Gateway::initialize: No client list defined by configuration.");
} }
_params.clientListName = strdup(fileName.c_str());
} }
} }
fileName = *getConfigDirName() + *getConfigFileName();
_params.configName = strdup(fileName.c_str());
} }
void Gateway::run(void) void Gateway::run(void)
@@ -134,17 +220,36 @@ void Gateway::run(void)
WRITELOG(" *\n%s\n", PAHO_COPYRIGHT3); WRITELOG(" *\n%s\n", PAHO_COPYRIGHT3);
WRITELOG("%s\n", GATEWAY_VERSION); WRITELOG("%s\n", GATEWAY_VERSION);
WRITELOG("%s\n", PAHO_COPYRIGHT4); WRITELOG("%s\n", PAHO_COPYRIGHT4);
WRITELOG("\n%s %s has been started.\n listening on, %s\n", currentDateTime(), _params.gatewayName, _sensorNetwork.getDescription()); WRITELOG("\n%s %s has been started.\n\n", currentDateTime(), _params.gatewayName);
WRITELOG(" ConfigFile: %s\n", _params.configName);
if ( getClientList()->isAuthorized() ) if ( getClientList()->isAuthorized() )
{ {
WRITELOG("\nClient authentication is required by the configuration settings.\n\n"); WRITELOG(" ClientList: %s\n", _params.clientListName);
} }
WRITELOG(" SensorN/W: %s\n", _sensorNetwork.getDescription());
/* execute threads & wait StopProcessEvent MQTTSNGWPacketHandleTask posts by CTL-C */ WRITELOG(" Broker: %s : %s, %s\n", _params.brokerName, _params.port, _params.portSecure);
WRITELOG(" RootCApath: %s\n", _params.rootCApath);
WRITELOG(" RootCAfile: %s\n", _params.rootCAfile);
WRITELOG(" CertKey: %s\n", _params.certKey);
WRITELOG(" PrivateKey: %s\n", _params.privateKey);
MultiTaskProcess::run(); MultiTaskProcess::run();
WRITELOG("%s MQTT-SN Gateway stoped\n", currentDateTime());
/* stop threads */
Event* ev = new Event();
ev->setStop();
_packetEventQue.post(ev);
ev = new Event();
ev->setStop();
_brokerSendQue.post(ev);
ev = new Event();
ev->setStop();
_clientSendQue.post(ev);
/* wait until all threads stop */
MultiTaskProcess::waitStop();
WRITELOG("\n%s MQTT-SN Gateway stoped\n\n", currentDateTime());
_lightIndicator.allLightOff(); _lightIndicator.allLightOff();
} }
@@ -193,7 +298,13 @@ EventQue::EventQue()
EventQue::~EventQue() EventQue::~EventQue()
{ {
_mutex.lock();
while (_que.size() > 0)
{
delete _que.front();
_que.pop();
}
_mutex.unlock();
} }
void EventQue::setMaxSize(uint16_t maxSize) void EventQue::setMaxSize(uint16_t maxSize)
@@ -243,14 +354,18 @@ Event* EventQue::timedwait(uint16_t millsec)
return ev; return ev;
} }
int EventQue::post(Event* ev) void EventQue::post(Event* ev)
{ {
int rc = 0;
_mutex.lock(); _mutex.lock();
rc = _que.post(ev); if ( _que.post(ev) )
_sem.post(); {
_sem.post();
}
else
{
delete ev;
}
_mutex.unlock(); _mutex.unlock();
return rc;
} }
int EventQue::size() int EventQue::size()
@@ -269,20 +384,18 @@ Event::Event()
{ {
_eventType = Et_NA; _eventType = Et_NA;
_client = 0; _client = 0;
_mqttSNPacket = 0; _sensorNetAddr = 0;
_mqttGWPacket = 0;
}
Event::Event(EventType type)
{
_eventType = type;
_client = 0;
_mqttSNPacket = 0; _mqttSNPacket = 0;
_mqttGWPacket = 0; _mqttGWPacket = 0;
} }
Event::~Event() Event::~Event()
{ {
if (_sensorNetAddr)
{
delete _sensorNetAddr;
}
if (_mqttSNPacket) if (_mqttSNPacket)
{ {
delete _mqttSNPacket; delete _mqttSNPacket;
@@ -332,17 +445,34 @@ void Event::setTimeout(void)
_eventType = EtTimeout; _eventType = EtTimeout;
} }
void Event::setStop(void)
{
_eventType = EtStop;
}
void Event::setBrodcastEvent(MQTTSNPacket* msg) void Event::setBrodcastEvent(MQTTSNPacket* msg)
{ {
_mqttSNPacket = msg; _mqttSNPacket = msg;
_eventType = EtBroadcast; _eventType = EtBroadcast;
} }
void Event::setClientSendEvent(SensorNetAddress* addr, MQTTSNPacket* msg)
{
_eventType = EtSensornetSend;
_sensorNetAddr = addr;
_mqttSNPacket = msg;
}
Client* Event::getClient(void) Client* Event::getClient(void)
{ {
return _client; return _client;
} }
SensorNetAddress* Event::getSensorNetAddress(void)
{
return _sensorNetAddr;
}
MQTTSNPacket* Event::getMQTTSNPacket() MQTTSNPacket* Event::getMQTTSNPacket()
{ {
return _mqttSNPacket; return _mqttSNPacket;

View File

@@ -22,16 +22,10 @@
namespace MQTTSNGW namespace MQTTSNGW
{ {
/*==========================================================
* Gateway default parameters
===========================================================*/
#define DEFAULT_KEEP_ALIVE_TIME (900) // 900 secs = 15 mins
#define DEFAULT_MQTT_VERSION (4) // Defualt MQTT version
/*================================= /*=================================
* Starting prompt * Starting prompt
==================================*/ ==================================*/
#define GATEWAY_VERSION " * Version: 0.6.0" #define GATEWAY_VERSION " * Version: 0.9.1"
#define PAHO_COPYRIGHT0 " * MQTT-SN Transparent Gateway" #define PAHO_COPYRIGHT0 " * MQTT-SN Transparent Gateway"
#define PAHO_COPYRIGHT1 " * Part of Project Paho in Eclipse" #define PAHO_COPYRIGHT1 " * Part of Project Paho in Eclipse"
@@ -49,6 +43,7 @@ namespace MQTTSNGW
===========================================================*/ ===========================================================*/
#define CLIENT "Client" #define CLIENT "Client"
#define CLIENTS "Clients" #define CLIENTS "Clients"
#define UNKNOWNCL "Unknown Client !"
#define LEFTARROW "<---" #define LEFTARROW "<---"
#define RIGHTARROW "--->" #define RIGHTARROW "--->"
@@ -87,20 +82,20 @@ namespace MQTTSNGW
====================================*/ ====================================*/
enum EventType{ enum EventType{
Et_NA = 0, Et_NA = 0,
EtStop,
EtTimeout, EtTimeout,
EtBrokerRecv, EtBrokerRecv,
EtBrokerSend, EtBrokerSend,
EtClientRecv, EtClientRecv,
EtClientSend, EtClientSend,
EtBroadcast, EtBroadcast,
EtSocketAlive EtSensornetSend
}; };
class Event{ class Event{
public: public:
Event(); Event();
Event(EventType);
~Event(); ~Event();
EventType getEventType(void); EventType getEventType(void);
void setClientRecvEvent(Client*, MQTTSNPacket*); void setClientRecvEvent(Client*, MQTTSNPacket*);
@@ -109,13 +104,17 @@ public:
void setBrokerSendEvent(Client*, MQTTGWPacket*); void setBrokerSendEvent(Client*, MQTTGWPacket*);
void setBrodcastEvent(MQTTSNPacket*); // ADVERTISE and GWINFO void setBrodcastEvent(MQTTSNPacket*); // ADVERTISE and GWINFO
void setTimeout(void); // Required by EventQue<Event>.timedwait() void setTimeout(void); // Required by EventQue<Event>.timedwait()
void setStop(void);
void setClientSendEvent(SensorNetAddress*, MQTTSNPacket*);
Client* getClient(void); Client* getClient(void);
SensorNetAddress* getSensorNetAddress(void);
MQTTSNPacket* getMQTTSNPacket(void); MQTTSNPacket* getMQTTSNPacket(void);
MQTTGWPacket* getMQTTGWPacket(void); MQTTGWPacket* getMQTTGWPacket(void);
private: private:
EventType _eventType; EventType _eventType;
Client* _client; Client* _client;
SensorNetAddress* _sensorNetAddr;
MQTTSNPacket* _mqttSNPacket; MQTTSNPacket* _mqttSNPacket;
MQTTGWPacket* _mqttGWPacket; MQTTGWPacket* _mqttGWPacket;
}; };
@@ -131,8 +130,8 @@ public:
Event* wait(void); Event* wait(void);
Event* timedwait(uint16_t millsec); Event* timedwait(uint16_t millsec);
void setMaxSize(uint16_t maxSize); void setMaxSize(uint16_t maxSize);
int post(Event*); void post(Event*);
int size(); int size();
private: private:
Que<Event> _que; Que<Event> _que;
@@ -145,13 +144,22 @@ private:
*/ */
typedef struct typedef struct
{ {
uint8_t* loginId; char* configName;
uint8_t* password; char* clientListName;
char* loginId;
char* password;
uint16_t keepAlive; uint16_t keepAlive;
uint8_t gatewayId; uint8_t gatewayId;
uint8_t mqttVersion; uint8_t mqttVersion;
uint16_t maxInflightMsgs; uint16_t maxInflightMsgs;
uint8_t* gatewayName; char* gatewayName;
char* brokerName;
char* port;
char* portSecure;
char* rootCApath;
char* rootCAfile;
char* certKey;
char* privateKey;
}GatewayParams; }GatewayParams;
/*===================================== /*=====================================

View File

@@ -158,7 +158,7 @@ bool TCPStack::connect(const char* host, const char* service)
{ {
if (isValid()) if (isValid())
{ {
return false; return true;
} }
addrinfo hints; addrinfo hints;
memset(&hints, 0, sizeof(addrinfo)); memset(&hints, 0, sizeof(addrinfo));
@@ -192,7 +192,7 @@ bool TCPStack::connect(const char* host, const char* service)
if (::connect(sockfd, _addrinfo->ai_addr, _addrinfo->ai_addrlen) < 0) if (::connect(sockfd, _addrinfo->ai_addr, _addrinfo->ai_addrlen) < 0)
{ {
//perror("TCPStack connect"); DEBUGLOG("Can not connect the socket. Check the PortNo! \n");
::close(sockfd); ::close(sockfd);
return false; return false;
} }
@@ -233,6 +233,7 @@ int TCPStack::getSock()
=======================================*/ =======================================*/
int Network::_numOfInstance = 0; int Network::_numOfInstance = 0;
SSL_CTX* Network::_ctx = 0; SSL_CTX* Network::_ctx = 0;
SSL_SESSION* Network::_session = 0;
Network::Network(bool secure) : Network::Network(bool secure) :
TCPStack() TCPStack()
@@ -240,196 +241,168 @@ Network::Network(bool secure) :
_ssl = 0; _ssl = 0;
_secureFlg = secure; _secureFlg = secure;
_busy = false; _busy = false;
_session = 0;
_sslValid = false; _sslValid = false;
} }
Network::~Network() Network::~Network()
{ {
if (_ssl) close();
{
SSL_free(_ssl);
_numOfInstance--;
}
if (_session )
{
SSL_SESSION_free(_session);
}
if (_ctx && _numOfInstance == 0)
{
SSL_CTX_free(_ctx);
_ctx = 0;
ERR_free_strings();
}
} }
bool Network::connect(const char* host, const char* port) bool Network::connect(const char* host, const char* port)
{ {
bool rc = false;
_mutex.lock();
if (_secureFlg) if (_secureFlg)
{ {
return false; goto exit;
} }
if (getSock() == 0) if (getSock() == 0)
{ {
if (!TCPStack::connect(host, port)) if (!TCPStack::connect(host, port))
{ {
return false; goto exit;
} }
} }
return true; rc = true;
exit:
_mutex.unlock();
return rc;
} }
bool Network::connect(const char* host, const char* port, const char* caPath, const char* caFile, const char* cert, const char* prvkey) bool Network::connect(const char* host, const char* port, const char* caPath, const char* caFile, const char* certkey, const char* prvkey)
{ {
char errmsg[256]; char errmsg[256];
char peer_CN[256]; char peer_CN[256];
int rc = 0; bool rc;
if (!_secureFlg) _mutex.lock();
try
{ {
WRITELOG("TLS is not required.\n"); if (!_secureFlg)
return false; {
} WRITELOG("TLS is not required.\n");
throw;
}
if (_ctx == 0)
{
SSL_load_error_strings();
SSL_library_init();
_ctx = SSL_CTX_new(TLS_client_method());
if (_ctx == 0) if (_ctx == 0)
{ {
ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); SSL_load_error_strings();
WRITELOG("SSL_CTX_new() %s\n", errmsg); SSL_library_init();
return false; _ctx = SSL_CTX_new(TLS_client_method());
} if (_ctx == 0)
if (!SSL_CTX_load_verify_locations(_ctx, caFile, caPath))
{
ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg));
WRITELOG("SSL_CTX_load_verify_locations() %s\n", errmsg);
return false;
}
}
if (!_sslValid)
{
if ( !TCPStack::connect(host, port) )
{
return false;
}
/*
if ( _ssl )
{
if (!SSL_set_fd(_ssl, getSock()))
{ {
ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg));
WRITELOG("SSL_set_fd() %s\n", errmsg); WRITELOG("SSL_CTX_new() %s\n", errmsg);
SSL_free(_ssl); throw;
} }
else
if (!SSL_CTX_load_verify_locations(_ctx, caFile, caPath))
{ {
_sslValid = true; ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg));
return true; WRITELOG("SSL_CTX_load_verify_locations() %s\n", errmsg);
throw;
}
if ( certkey )
{
if ( SSL_CTX_use_certificate_file(_ctx, certkey, SSL_FILETYPE_PEM) != 1 )
{
ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg));
WRITELOG("SSL_CTX_use_certificate_file() %s %s\n", certkey, errmsg);
throw;
}
}
if ( prvkey )
{
if ( SSL_CTX_use_PrivateKey_file(_ctx, prvkey, SSL_FILETYPE_PEM) != 1 )
{
ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg));
WRITELOG("SSL_use_PrivateKey_file() %s %s\n", prvkey, errmsg);
throw;
}
} }
} }
*/
}
_ssl = SSL_new(_ctx); if (! TCPStack::isValid())
if (_ssl == 0) {
{ if ( !TCPStack::connect(host, port) )
ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); {
WRITELOG("SSL_new() %s\n", errmsg); throw;
return false; }
} }
if (!SSL_set_fd(_ssl, getSock())) _ssl = SSL_new(_ctx);
{ if (_ssl == 0)
ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg));
WRITELOG("SSL_set_fd() %s\n", errmsg);
SSL_free(_ssl);
}
SSL_set_options(_ssl, SSL_OP_NO_TICKET);
if ( cert )
{
if ( SSL_use_certificate_file(_ssl, cert, SSL_FILETYPE_PEM) <= 0 )
{ {
ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg));
WRITELOG("SSL_use_certificate_file() %s %s\n", cert, errmsg); WRITELOG("SSL_new() %s\n", errmsg);
SSL_free(_ssl); throw;
_ssl = 0;
return false;
} }
}
if ( prvkey ) if (!SSL_set_fd(_ssl, TCPStack::getSock()))
{
if ( SSL_use_PrivateKey_file(_ssl, prvkey, SSL_FILETYPE_PEM) <= 0 )
{ {
ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg));
WRITELOG("SSL_use_PrivateKey_file() %s %s\n", prvkey, errmsg); WRITELOG("SSL_set_fd() %s\n", errmsg);
SSL_free(_ssl); SSL_free(_ssl);
_ssl = 0; _ssl = 0;
return false; throw;
} }
}
if (!SSL_set_fd(_ssl, TCPStack::getSock())) if (_session)
{ {
ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); SSL_set_session(_ssl, _session);
WRITELOG("SSL_set_fd() %s\n", errmsg); }
SSL_free(_ssl);
_ssl = 0;
return false;
}
if (_session) if (SSL_connect(_ssl) != 1)
{ {
rc = SSL_set_session(_ssl, _session); ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg));
} WRITELOG("SSL_connect() %s\n", errmsg);
SSL_free(_ssl);
_ssl = 0;
throw;
}
if (SSL_connect(_ssl) != 1) int result;
{ if ( (result = SSL_get_verify_result(_ssl)) != X509_V_OK)
ERR_error_string_n(ERR_get_error(), errmsg, sizeof(errmsg)); {
WRITELOG("SSL_connect() %s\n", errmsg); WRITELOG("SSL_get_verify_result() error: %s.\n", X509_verify_cert_error_string(result));
SSL_free(_ssl); SSL_free(_ssl);
_ssl = 0; _ssl = 0;
return false; throw;
} }
if ( (rc = SSL_get_verify_result(_ssl)) != X509_V_OK) X509* peer = SSL_get_peer_certificate(_ssl);
{ X509_NAME_get_text_by_NID(X509_get_subject_name(peer), NID_commonName, peer_CN, 256);
WRITELOG("SSL_get_verify_result() error: %s.\n", X509_verify_cert_error_string(rc)); char* pos = peer_CN;
SSL_free(_ssl); if ( *pos == '*')
_ssl = 0; {
return false; while (*host++ != '.');
} pos += 2;
}
if ( strcmp(host, pos))
{
WRITELOG("SSL_get_peer_certificate() error: Broker %s dosen't match the host name %s\n", peer_CN, host);
SSL_free(_ssl);
_ssl = 0;
throw;
}
X509* peer = SSL_get_peer_certificate(_ssl); if (_session == 0)
X509_NAME_get_text_by_NID(X509_get_subject_name(peer), NID_commonName, peer_CN, 256); {
char* pos = peer_CN; _session = SSL_get1_session(_ssl);
if ( *pos == '*') }
{ _numOfInstance++;
while (*host++ != '.'); _sslValid = true;
pos += 2; rc = true;
} }
if ( strcmp(host, pos)) catch (...)
{ {
WRITELOG("SSL_get_peer_certificate() error: Broker %s dosen't match the host name %s\n", peer_CN, host); rc = false;
SSL_free(_ssl);
_ssl = 0;
return false;
} }
_mutex.unlock();
if (_session == 0) return rc;
{
_session = SSL_get1_session(_ssl);
}
_numOfInstance++;
_sslValid = true;
return true;
} }
int Network::send(const uint8_t* buf, uint16_t length) int Network::send(const uint8_t* buf, uint16_t length)
@@ -447,6 +420,12 @@ int Network::send(const uint8_t* buf, uint16_t length)
else else
{ {
_mutex.lock(); _mutex.lock();
if ( !_ssl )
{
_mutex.unlock();
return -1;
}
_busy = true; _busy = true;
while (true) while (true)
@@ -516,8 +495,14 @@ int Network::recv(uint8_t* buf, uint16_t len)
return 0; return 0;
} }
_mutex.lock(); _mutex.lock();
_busy = true;
if ( !_ssl )
{
_mutex.unlock();
return 0;
}
_busy = true;
loop: do loop: do
{ {
readBlockedOnWrite = false; readBlockedOnWrite = false;
@@ -535,7 +520,8 @@ int Network::recv(uint8_t* buf, uint16_t len)
case SSL_ERROR_ZERO_RETURN: case SSL_ERROR_ZERO_RETURN:
SSL_shutdown(_ssl); SSL_shutdown(_ssl);
_ssl = 0; _ssl = 0;
TCPStack::close(); _numOfInstance--;
//TCPStack::close();
_busy = false; _busy = false;
_mutex.unlock(); _mutex.unlock();
return -1; return -1;
@@ -585,44 +571,53 @@ int Network::recv(uint8_t* buf, uint16_t len)
void Network::close(void) void Network::close(void)
{ {
_mutex.lock();
if (_secureFlg) if (_secureFlg)
{ {
_sslValid = false; if (_ssl)
SSL_free(_ssl); {
_ssl = 0; SSL_shutdown(_ssl);
SSL_free(_ssl);
_numOfInstance--;
_ssl = 0;
_sslValid = false;
_busy = false;
}
if (_session && _numOfInstance == 0)
{
SSL_SESSION_free(_session);
_session = 0;
}
if (_ctx && _numOfInstance == 0)
{
SSL_CTX_free(_ctx);
_ctx = 0;
ERR_free_strings();
}
} }
TCPStack::close(); TCPStack::close();
_mutex.unlock();
} }
bool Network::isValid() bool Network::isValid()
{ {
if (_secureFlg) if ( TCPStack::isValid() )
{ {
if (_sslValid && !_busy) if (_secureFlg)
{
if (_sslValid && !_busy)
{
return true;
}
}
else
{ {
return true; return true;
} }
} }
else
{
return TCPStack::isValid();
}
return false; return false;
} }
void Network::disconnect()
{
if (_ssl)
{
SSL_shutdown(_ssl);
_ssl = 0;
}
_sslValid = false;
_busy = false;
TCPStack::close();
}
int Network::getSock() int Network::getSock()
{ {
return TCPStack::getSock(); return TCPStack::getSock();

View File

@@ -71,9 +71,8 @@ public:
Network(bool secure); Network(bool secure);
virtual ~Network(); virtual ~Network();
bool connect(const char* host, const char* port, const char* caPath, const char* caFile, const char* sert, const char* prvkey); bool connect(const char* host, const char* port, const char* caPath, const char* caFile, const char* cert, const char* prvkey);
bool connect(const char* host, const char* port); bool connect(const char* host, const char* port);
void disconnect(void);
void close(void); void close(void);
int send(const uint8_t* buf, uint16_t length); int send(const uint8_t* buf, uint16_t length);
int recv(uint8_t* buf, uint16_t len); int recv(uint8_t* buf, uint16_t len);
@@ -84,9 +83,8 @@ public:
private: private:
static SSL_CTX* _ctx; static SSL_CTX* _ctx;
static SSL_SESSION* _session;
static int _numOfInstance; static int _numOfInstance;
SSL_SESSION* _session;
SSL* _ssl; SSL* _ssl;
bool _secureFlg; bool _secureFlg;
Mutex _mutex; Mutex _mutex;

View File

@@ -160,12 +160,11 @@ Semaphore::Semaphore(const char* name, unsigned int val)
{ {
throw Exception( -1, "Semaphore can't be created."); throw Exception( -1, "Semaphore can't be created.");
} }
_name = (char*) calloc(strlen(name + 1), 1); _name = strdup(name);
if (_name == NULL) if (_name == NULL)
{ {
throw Exception( -1, "Semaphore can't allocate memories."); throw Exception( -1, "Semaphore can't allocate memories.");
} }
_name = strdup(name);
} }
Semaphore::~Semaphore() Semaphore::~Semaphore()
@@ -174,7 +173,7 @@ Semaphore::~Semaphore()
{ {
sem_close(_psem); sem_close(_psem);
sem_unlink(_name); sem_unlink(_name);
free((void*) _name); free(_name);
} }
else else
{ {
@@ -236,19 +235,25 @@ void Semaphore::timedwait(uint16_t millsec)
=========================================*/ =========================================*/
RingBuffer::RingBuffer() RingBuffer::RingBuffer()
{ {
RingBuffer(MQTTSNGW_CONFIG_DIRECTORY); RingBuffer(MQTTSNGW_KEY_DIRECTORY);
} }
RingBuffer::RingBuffer(const char* keyDirectory) RingBuffer::RingBuffer(const char* keyDirectory)
{ {
int fp = 0; int fp = 0;
string fileName = keyDirectory + string(MQTTSNGW_RINGBUFFER_KEY); string fileName = keyDirectory + string(MQTTSNGW_RINGBUFFER_KEY);
fp = open(fileName.c_str(), O_CREAT, 0); fp = open(fileName.c_str(), O_CREAT, S_IRGRP);
close(fp); if ( fp > 0 )
{
close(fp);
}
fileName = keyDirectory + string(MQTTSNGW_RB_MUTEX_KEY); fileName = keyDirectory + string(MQTTSNGW_RB_MUTEX_KEY);
fp = open(fileName.c_str(), O_CREAT, 0); fp = open(fileName.c_str(), O_CREAT, S_IRGRP);
close(fp); if ( fp > 0 )
{
close(fp);
}
key_t key = ftok(MQTTSNGW_RINGBUFFER_KEY, 1); key_t key = ftok(MQTTSNGW_RINGBUFFER_KEY, 1);
@@ -302,10 +307,6 @@ RingBuffer::~RingBuffer()
{ {
shmctl(_shmid, IPC_RMID, NULL); shmctl(_shmid, IPC_RMID, NULL);
} }
if (_pmx > 0)
{
delete _pmx;
}
} }
else else
{ {
@@ -314,6 +315,11 @@ RingBuffer::~RingBuffer()
shmdt(_shmaddr); shmdt(_shmaddr);
} }
} }
if (_pmx > 0)
{
delete _pmx;
}
} }
void RingBuffer::put(char* data) void RingBuffer::put(char* data)
@@ -469,14 +475,11 @@ void RingBuffer::reset()
=====================================*/ =====================================*/
Thread::Thread() Thread::Thread()
{ {
_stopProcessEvent = theMultiTaskProcess->getStopProcessEvent();
_threadID = 0; _threadID = 0;
} }
Thread::~Thread() Thread::~Thread()
{ {
pthread_cancel(_threadID);
pthread_join(_threadID, 0);
} }
void* Thread::_run(void* runnable) void* Thread::_run(void* runnable)
@@ -508,10 +511,6 @@ int Thread::start(void)
void Thread::stopProcess(void) void Thread::stopProcess(void)
{ {
_stopProcessEvent->post(); theMultiTaskProcess->threadStoped();
} }
void Thread::cancel(void)
{
pthread_cancel(_threadID);
}

View File

@@ -23,7 +23,7 @@
namespace MQTTSNGW namespace MQTTSNGW
{ {
#define MQTTSNGW_KEY_DIRECTORY "./"
#define MQTTSNGW_RINGBUFFER_KEY "ringbuffer.key" #define MQTTSNGW_RINGBUFFER_KEY "ringbuffer.key"
#define MQTTSNGW_RB_MUTEX_KEY "rbmutex.key" #define MQTTSNGW_RB_MUTEX_KEY "rbmutex.key"
#define MQTTSNGW_RB_SEMAPHOR_NAME "/rbsemaphor" #define MQTTSNGW_RB_SEMAPHOR_NAME "/rbsemaphor"
@@ -101,17 +101,12 @@ public:
virtual void EXECRUN(){} virtual void EXECRUN(){}
}; };
#define MAGIC_WORD_FOR_THREAD \ #define MAGIC_WORD_FOR_THREAD \
public: void EXECRUN() \ public: void EXECRUN() \
{ \ { \
try \ try \
{ \ { \
run(); \ run(); \
} \
catch(Exception& ex) \
{ \
ex.writeMessage();\
stopProcess(); \ stopProcess(); \
} \ } \
catch(...) \ catch(...) \
@@ -132,13 +127,10 @@ public:
static bool equals(pthread_t*, pthread_t*); static bool equals(pthread_t*, pthread_t*);
virtual void initialize(int argc, char** argv); virtual void initialize(int argc, char** argv);
void stopProcess(void); void stopProcess(void);
void cancel(void); void waitStop(void);
private: private:
pthread_t _threadID;
Semaphore* _stopProcessEvent;
static void* _run(void*); static void* _run(void*);
pthread_t _threadID;
}; };
} }

View File

@@ -180,24 +180,30 @@ void LightIndicator::init()
pinMode(LIGHT_INDICATOR_BLUE); pinMode(LIGHT_INDICATOR_BLUE);
} }
void LightIndicator::lit(int gpioNo, const char* onoff) int LightIndicator::lit(int gpioNo, const char* onoff)
{ {
int rc = 0;
if( _gpio[gpioNo] ) if( _gpio[gpioNo] )
{ {
write(_gpio[gpioNo], onoff, 1); rc = write(_gpio[gpioNo], onoff, 1);
} }
return rc;
} }
void LightIndicator::pinMode(int gpioNo) void LightIndicator::pinMode(int gpioNo)
{ {
int fd = open("/sys/class/gpio/export", O_WRONLY); int rc = 0;
int fd = rc; // eliminate unused warnning of compiler
fd = open("/sys/class/gpio/export", O_WRONLY);
if ( fd < 0 ) if ( fd < 0 )
{ {
return; return;
} }
char no[4]; char no[4];
sprintf(no,"%d", gpioNo); sprintf(no,"%d", gpioNo);
write(fd, no, strlen(no)); rc = write(fd, no, strlen(no));
close(fd); close(fd);
char fileName[64]; char fileName[64];
@@ -208,9 +214,8 @@ void LightIndicator::pinMode(int gpioNo)
{ {
return; return;
} }
write(fd,"out", 3); rc = write(fd,"out", 3);
close(fd); close(fd);
sprintf( fileName, "/sys/class/gpio/gpio%d/value", gpioNo); sprintf( fileName, "/sys/class/gpio/gpio%d/value", gpioNo);
fd = open(fileName, O_WRONLY); fd = open(fileName, O_WRONLY);
if ( fd > 0 ) if ( fd > 0 )

View File

@@ -61,7 +61,7 @@ public:
private: private:
void init(); void init();
void lit(int gpioNo, const char* onoff); int lit(int gpioNo, const char* onoff);
void pinMode(int gpioNo); void pinMode(int gpioNo);
bool _greenStatus; bool _greenStatus;
int _gpio[MAX_GPIO + 1]; int _gpio[MAX_GPIO + 1];

View File

@@ -45,6 +45,16 @@ SensorNetAddress::~SensorNetAddress()
} }
uint32_t SensorNetAddress::getIpAddress(void)
{
return _IpAddr;
}
uint16_t SensorNetAddress::getPortNo(void)
{
return _portNo;
}
void SensorNetAddress::setAddress(uint32_t IpAddr, uint16_t port) void SensorNetAddress::setAddress(uint32_t IpAddr, uint16_t port)
{ {
_IpAddr = IpAddr; _IpAddr = IpAddr;
@@ -96,12 +106,10 @@ SensorNetAddress& SensorNetAddress::operator =(SensorNetAddress& addr)
============================================*/ ============================================*/
SensorNetwork::SensorNetwork() SensorNetwork::SensorNetwork()
{ {
} }
SensorNetwork::~SensorNetwork() SensorNetwork::~SensorNetwork()
{ {
} }
int SensorNetwork::unicast(const uint8_t* payload, uint16_t payloadLength, SensorNetAddress* sendToAddr) int SensorNetwork::unicast(const uint8_t* payload, uint16_t payloadLength, SensorNetAddress* sendToAddr)
@@ -141,7 +149,7 @@ int SensorNetwork::initialize(void)
if (theProcess->getParam("GatewayPortNo", param) == 0) if (theProcess->getParam("GatewayPortNo", param) == 0)
{ {
unicastPortNo = atoi(param); unicastPortNo = atoi(param);
_description += " and Gateway Port "; _description += " Gateway Port ";
_description += param; _description += param;
} }
@@ -302,10 +310,12 @@ int UDPPort::broadcast(const uint8_t* buf, uint32_t length)
int UDPPort::recv(uint8_t* buf, uint16_t len, SensorNetAddress* addr) int UDPPort::recv(uint8_t* buf, uint16_t len, SensorNetAddress* addr)
{ {
struct timeval timeout;
fd_set recvfds; fd_set recvfds;
int maxSock = 0; int maxSock = 0;
int rc = 0;
timeout.tv_sec = 0;
timeout.tv_usec = 1000000; // 1 sec
FD_ZERO(&recvfds); FD_ZERO(&recvfds);
FD_SET(_sockfdUnicast, &recvfds); FD_SET(_sockfdUnicast, &recvfds);
FD_SET(_sockfdMulticast, &recvfds); FD_SET(_sockfdMulticast, &recvfds);
@@ -319,15 +329,17 @@ int UDPPort::recv(uint8_t* buf, uint16_t len, SensorNetAddress* addr)
maxSock = _sockfdUnicast; maxSock = _sockfdUnicast;
} }
select(maxSock + 1, &recvfds, 0, 0, 0); int rc = 0;
if ( select(maxSock + 1, &recvfds, 0, 0, &timeout) > 0 )
if (FD_ISSET(_sockfdUnicast, &recvfds))
{ {
rc = recvfrom(_sockfdUnicast, buf, len, 0, addr); if (FD_ISSET(_sockfdUnicast, &recvfds))
} {
else if (FD_ISSET(_sockfdMulticast, &recvfds)) rc = recvfrom(_sockfdUnicast, buf, len, 0, addr);
{ }
rc = recvfrom(_sockfdMulticast, buf, len, 0, &_grpAddr); else if (FD_ISSET(_sockfdMulticast, &recvfds))
{
rc = recvfrom(_sockfdMulticast, buf, len, 0, &_grpAddr);
}
} }
return rc; return rc;
} }

View File

@@ -41,14 +41,8 @@ public:
~SensorNetAddress(); ~SensorNetAddress();
void setAddress(uint32_t IpAddr, uint16_t port); void setAddress(uint32_t IpAddr, uint16_t port);
int setAddress(string* data); int setAddress(string* data);
uint32_t getIpAddress(void) uint16_t getPortNo(void);
{ uint32_t getIpAddress(void);
return _IpAddr;
}
uint16_t getPortNo(void)
{
return _portNo;
}
bool isMatch(SensorNetAddress* addr); bool isMatch(SensorNetAddress* addr);
SensorNetAddress& operator =(SensorNetAddress& addr); SensorNetAddress& operator =(SensorNetAddress& addr);

View File

@@ -115,6 +115,7 @@ void TestProcessFramework::run(void)
printf("%s Timer 1sec\n", currentDateTime()); printf("%s Timer 1sec\n", currentDateTime());
MultiTaskProcess::run(); MultiTaskProcess::run();
printf("ProcessFramework test complited.\n"); printf("ProcessFramework test complited.\n");
} }

View File

@@ -39,11 +39,12 @@ void TestTask::run(void)
{ {
while(true) while(true)
{ {
printf("Task is running. Enter CTRL+C \n"); if ( CHK_SIGINT)
if (theProcess->checkSignal() == SIGINT)
{ {
throw Exception("Terminated by CTL-C"); printf("Task stopped.\n");
return;
} }
printf("Task is running. Enter CTRL+C\n");
sleep(1); sleep(1);
} }
} }