Add The forwarder Encapsulation mesage #27 #69

Signed-off-by: tomoaki <tomoaki@tomy-tech.com>
This commit is contained in:
tomoaki
2018-07-26 17:42:12 +09:00
parent 4d75351a06
commit 9940aadd4b
28 changed files with 1322 additions and 433 deletions

View File

@@ -14,14 +14,14 @@
</extensions> </extensions>
</storageModule> </storageModule>
<storageModule moduleId="cdtBuildSystem" version="4.0.0"> <storageModule moduleId="cdtBuildSystem" version="4.0.0">
<configuration artifactName="${ProjName}" buildArtefactType="org.eclipse.cdt.build.core.buildArtefactType.exe" buildProperties="org.eclipse.cdt.build.core.buildArtefactType=org.eclipse.cdt.build.core.buildArtefactType.exe,org.eclipse.cdt.build.core.buildType=org.eclipse.cdt.build.core.buildType.debug" cleanCommand="rm -rf" description="" id="cdt.managedbuild.config.gnu.exe.debug.1685199227" name="Debug" parent="cdt.managedbuild.config.gnu.exe.debug"> <configuration artifactName="${ProjName}" buildArtefactType="org.eclipse.cdt.build.core.buildArtefactType.exe" buildProperties="org.eclipse.cdt.build.core.buildArtefactType=org.eclipse.cdt.build.core.buildArtefactType.exe,org.eclipse.cdt.build.core.buildType=org.eclipse.cdt.build.core.buildType.debug" cleanCommand="rm -rf" description="" id="cdt.managedbuild.config.gnu.exe.debug.1685199227" name="Debug" optionalBuildProperties="org.eclipse.cdt.docker.launcher.containerbuild.property.volumes=,org.eclipse.cdt.docker.launcher.containerbuild.property.selectedvolumes=" parent="cdt.managedbuild.config.gnu.exe.debug">
<folderInfo id="cdt.managedbuild.config.gnu.exe.debug.1685199227." name="/" resourcePath=""> <folderInfo id="cdt.managedbuild.config.gnu.exe.debug.1685199227." name="/" resourcePath="">
<toolChain id="cdt.managedbuild.toolchain.gnu.exe.debug.102264757" name="Linux GCC" superClass="cdt.managedbuild.toolchain.gnu.exe.debug"> <toolChain id="cdt.managedbuild.toolchain.gnu.exe.debug.102264757" name="Linux GCC" superClass="cdt.managedbuild.toolchain.gnu.exe.debug">
<targetPlatform id="cdt.managedbuild.target.gnu.platform.exe.debug.2137160061" name="Debug Platform" superClass="cdt.managedbuild.target.gnu.platform.exe.debug"/> <targetPlatform id="cdt.managedbuild.target.gnu.platform.exe.debug.2137160061" name="Debug Platform" superClass="cdt.managedbuild.target.gnu.platform.exe.debug"/>
<builder buildPath="${workspace_loc:/MQTTSN-embedded-C}/Debug" id="cdt.managedbuild.target.gnu.builder.exe.debug.767762182" keepEnvironmentInBuildfile="false" managedBuildOn="true" name="Gnu Make Builder" superClass="cdt.managedbuild.target.gnu.builder.exe.debug"/> <builder buildPath="${workspace_loc:/MQTTSN-embedded-C}/Debug" id="cdt.managedbuild.target.gnu.builder.exe.debug.767762182" keepEnvironmentInBuildfile="false" managedBuildOn="true" name="Gnu Make Builder" superClass="cdt.managedbuild.target.gnu.builder.exe.debug"/>
<tool id="cdt.managedbuild.tool.gnu.archiver.base.602829827" name="GCC Archiver" superClass="cdt.managedbuild.tool.gnu.archiver.base"/> <tool id="cdt.managedbuild.tool.gnu.archiver.base.602829827" name="GCC Archiver" superClass="cdt.managedbuild.tool.gnu.archiver.base"/>
<tool id="cdt.managedbuild.tool.gnu.cpp.compiler.exe.debug.109068141" name="GCC C++ Compiler" superClass="cdt.managedbuild.tool.gnu.cpp.compiler.exe.debug"> <tool id="cdt.managedbuild.tool.gnu.cpp.compiler.exe.debug.109068141" name="GCC C++ Compiler" superClass="cdt.managedbuild.tool.gnu.cpp.compiler.exe.debug">
<option id="gnu.cpp.compiler.exe.debug.option.optimization.level.1710260957" name="Optimization Level" superClass="gnu.cpp.compiler.exe.debug.option.optimization.level" useByScannerDiscovery="false" value="gnu.cpp.compiler.optimization.level.most" valueType="enumerated"/> <option id="gnu.cpp.compiler.exe.debug.option.optimization.level.1710260957" name="Optimization Level" superClass="gnu.cpp.compiler.exe.debug.option.optimization.level" useByScannerDiscovery="false" value="gnu.cpp.compiler.optimization.level.none" valueType="enumerated"/>
<option id="gnu.cpp.compiler.exe.debug.option.debugging.level.1258567310" name="Debug Level" superClass="gnu.cpp.compiler.exe.debug.option.debugging.level" useByScannerDiscovery="false" value="gnu.cpp.compiler.debugging.level.max" valueType="enumerated"/> <option id="gnu.cpp.compiler.exe.debug.option.debugging.level.1258567310" name="Debug Level" superClass="gnu.cpp.compiler.exe.debug.option.debugging.level" useByScannerDiscovery="false" value="gnu.cpp.compiler.debugging.level.max" valueType="enumerated"/>
<option id="gnu.cpp.compiler.option.dialect.std.1678347248" name="Language standard" superClass="gnu.cpp.compiler.option.dialect.std" useByScannerDiscovery="true" value="gnu.cpp.compiler.dialect.default" valueType="enumerated"/> <option id="gnu.cpp.compiler.option.dialect.std.1678347248" name="Language standard" superClass="gnu.cpp.compiler.option.dialect.std" useByScannerDiscovery="true" value="gnu.cpp.compiler.dialect.default" valueType="enumerated"/>
<option id="gnu.cpp.compiler.option.dialect.flags.1711182709" name="Other dialect flags" superClass="gnu.cpp.compiler.option.dialect.flags" useByScannerDiscovery="true" value="-std=c++11" valueType="string"/> <option id="gnu.cpp.compiler.option.dialect.flags.1711182709" name="Other dialect flags" superClass="gnu.cpp.compiler.option.dialect.flags" useByScannerDiscovery="true" value="-std=c++11" valueType="string"/>

View File

@@ -220,7 +220,7 @@ TASK_LIST = {// e.g. TASK( task, executing duration in second),
*------------------------------------------------------*/ *------------------------------------------------------*/
void setup(void) void setup(void)
{ {
//SetForwarderMode();
} }

View File

@@ -1,5 +1,5 @@
/************************************************************************************** /**************************************************************************************
* Copyright (c) 2016, Tomoaki Yamaguchi * Copyright (c) 2016-2018, Tomoaki Yamaguchi
* *
* All rights reserved. This program and the accompanying materials * All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0 * are made available under the terms of the Eclipse Public License v1.0
@@ -29,44 +29,47 @@ extern void setUint16(uint8_t* pos, uint16_t val);
extern uint16_t getUint16(const uint8_t* pos); extern uint16_t getUint16(const uint8_t* pos);
extern LMqttsnClient* theClient; extern LMqttsnClient* theClient;
extern LScreen* theScreen; extern LScreen* theScreen;
/*===================================== /*=====================================
Class GwProxy Class GwProxy
======================================*/ ======================================*/
static const char* packet_names[] = static const char* packet_names[] =
{ {
"ADVERTISE", "SEARCHGW", "GWINFO", "RESERVED", "CONNECT", "CONNACK", "ADVERTISE", "SEARCHGW", "GWINFO", "RESERVED", "CONNECT", "CONNACK",
"WILLTOPICREQ", "WILLTOPIC", "WILLMSGREQ", "WILLMSG", "REGISTER", "REGACK", "WILLTOPICREQ", "WILLTOPIC", "WILLMSGREQ", "WILLMSG", "REGISTER", "REGACK",
"PUBLISH", "PUBACK", "PUBCOMP", "PUBREC", "PUBREL", "RESERVED", "PUBLISH", "PUBACK", "PUBCOMP", "PUBREC", "PUBREL", "RESERVED",
"SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK", "PINGREQ", "PINGRESP", "SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK", "PINGREQ", "PINGRESP",
"DISCONNECT", "RESERVED", "WILLTOPICUPD", "WILLTOPICRESP", "WILLMSGUPD", "DISCONNECT", "RESERVED", "WILLTOPICUPD", "WILLTOPICRESP", "WILLMSGUPD",
"WILLMSGRESP" "WILLMSGRESP"
}; };
LGwProxy::LGwProxy(){ LGwProxy::LGwProxy(){
_nextMsgId = 0; _nextMsgId = 0;
_status = GW_LOST; _status = GW_LOST;
_gwId = 0; _gwId = 0;
_willTopic = 0; _willTopic = 0;
_willMsg = 0; _willMsg = 0;
_qosWill = 0; _qosWill = 0;
_retainWill = 0; _retainWill = 0;
_tkeepAlive = MQTTSN_DEFAULT_KEEPALIVE; _tkeepAlive = MQTTSN_DEFAULT_KEEPALIVE;
_tAdv = MQTTSN_DEFAULT_DURATION; _tAdv = MQTTSN_DEFAULT_DURATION;
_cleanSession = 0; _cleanSession = 0;
_pingStatus = 0; _pingStatus = 0;
_connectRetry = MQTTSN_RETRY_COUNT; _connectRetry = MQTTSN_RETRY_COUNT;
_tSleep = 0; _tSleep = 0;
_tWake = 0; _tWake = 0;
_initialized = 0; _initialized = 0;
_isForwarderMode = false;
} }
LGwProxy::~LGwProxy(){ LGwProxy::~LGwProxy(){
_topicTbl.clearTopic(); _topicTbl.clearTopic();
} }
void LGwProxy::initialize(LUdpConfig netconf, LMqttsnConfig mqconf){ void LGwProxy::initialize(LUdpConfig netconf, LMqttsnConfig mqconf){
_network.initialize(netconf); _network.initialize(netconf);
_clientId = netconf.clientId; _clientId = netconf.clientId;
_willTopic = mqconf.willTopic; _willTopic = mqconf.willTopic;
_willMsg = mqconf.willMsg; _willMsg = mqconf.willMsg;
_qosWill = mqconf.willQos; _qosWill = mqconf.willQos;
@@ -77,220 +80,220 @@ void LGwProxy::initialize(LUdpConfig netconf, LMqttsnConfig mqconf){
} }
void LGwProxy::connect(){ void LGwProxy::connect(){
char* pos; char* pos;
while (_status != GW_CONNECTED){ while (_status != GW_CONNECTED){
pos = _msg; pos = _msg;
if (_status == GW_SEND_WILLMSG){ if (_status == GW_SEND_WILLMSG){
*pos++ = 2 + (uint8_t)strlen(_willMsg); *pos++ = 2 + (uint8_t)strlen(_willMsg);
*pos++ = MQTTSN_TYPE_WILLMSG; *pos++ = MQTTSN_TYPE_WILLMSG;
strcpy(pos,_willMsg); // WILLMSG strcpy(pos,_willMsg); // WILLMSG
_status = GW_WAIT_CONNACK; _status = GW_WAIT_CONNACK;
writeGwMsg(); writeGwMsg();
}else if (_status == GW_SEND_WILLTOPIC){ }else if (_status == GW_SEND_WILLTOPIC){
*pos++ = 3 + (uint8_t)strlen(_willTopic); *pos++ = 3 + (uint8_t)strlen(_willTopic);
*pos++ = MQTTSN_TYPE_WILLTOPIC; *pos++ = MQTTSN_TYPE_WILLTOPIC;
*pos++ = _qosWill | _retainWill; *pos++ = _qosWill | _retainWill;
strcpy(pos,_willTopic); // WILLTOPIC strcpy(pos,_willTopic); // WILLTOPIC
_status = GW_WAIT_WILLMSGREQ; _status = GW_WAIT_WILLMSGREQ;
writeGwMsg(); writeGwMsg();
}else if (_status == GW_CONNECTING || _status == GW_DISCONNECTED || _status == GW_SLEPT ){ }else if (_status == GW_CONNECTING || _status == GW_DISCONNECTED || _status == GW_SLEPT ){
uint8_t clientIdLen = uint8_t(strlen(_clientId) > 23 ? 23 : strlen(_clientId)); uint8_t clientIdLen = uint8_t(strlen(_clientId) > 23 ? 23 : strlen(_clientId));
*pos++ = 6 + clientIdLen; *pos++ = 6 + clientIdLen;
*pos++ = MQTTSN_TYPE_CONNECT; *pos++ = MQTTSN_TYPE_CONNECT;
pos++; pos++;
if (_cleanSession){ if (_cleanSession){
_msg[2] = MQTTSN_FLAG_CLEAN; _msg[2] = MQTTSN_FLAG_CLEAN;
} }
*pos++ = MQTTSN_PROTOCOL_ID; *pos++ = MQTTSN_PROTOCOL_ID;
setUint16((uint8_t*)pos, _tkeepAlive); setUint16((uint8_t*)pos, _tkeepAlive);
pos += 2; pos += 2;
strncpy(pos, _clientId, clientIdLen); strncpy(pos, _clientId, clientIdLen);
_msg[ 6 + clientIdLen] = 0; _msg[ 6 + clientIdLen] = 0;
_status = GW_WAIT_CONNACK; _status = GW_WAIT_CONNACK;
if ( _willMsg && _willTopic && _status != GW_SLEPT ){ if ( _willMsg && _willTopic && _status != GW_SLEPT ){
if (strlen(_willMsg) && strlen(_willTopic)){ if (strlen(_willMsg) && strlen(_willTopic)){
_msg[2] = _msg[2] | MQTTSN_FLAG_WILL; // CONNECT _msg[2] = _msg[2] | MQTTSN_FLAG_WILL; // CONNECT
_status = GW_WAIT_WILLTOPICREQ; _status = GW_WAIT_WILLTOPICREQ;
} }
} }
writeGwMsg(); writeGwMsg();
_connectRetry = MQTTSN_RETRY_COUNT; _connectRetry = MQTTSN_RETRY_COUNT;
}else if (_status == GW_LOST){ }else if (_status == GW_LOST){
*pos++ = 3; *pos++ = 3;
*pos++ = MQTTSN_TYPE_SEARCHGW; *pos++ = MQTTSN_TYPE_SEARCHGW;
*pos = 0; // SERCHGW *pos = 0; // SERCHGW
_status = GW_SEARCHING; _status = GW_SEARCHING;
writeGwMsg(); writeGwMsg();
} }
getConnectResponce(); getConnectResponce();
} }
return; return;
} }
int LGwProxy::getConnectResponce(void){ int LGwProxy::getConnectResponce(void){
int len = readMsg(); int len = readMsg();
if (len == 0){ if (len == 0){
if (_sendUTC + MQTTSN_TIME_RETRY < time(NULL)){ if (_sendUTC + MQTTSN_TIME_RETRY < time(NULL)){
if (_msg[1] == MQTTSN_TYPE_CONNECT) if (_msg[1] == MQTTSN_TYPE_CONNECT)
{ {
_connectRetry--; _connectRetry--;
} }
if (--_retryCount > 0){ if (--_retryCount > 0){
writeMsg((const uint8_t*)_msg); // Not writeGwMsg() : not to reset the counter. writeMsg((const uint8_t*)_msg); // Not writeGwMsg() : not to reset the counter.
_sendUTC = time(NULL); _sendUTC = time(NULL);
}else{ }else{
_sendUTC = 0; _sendUTC = 0;
if ( _status > GW_SEARCHING && _connectRetry > 0){ if ( _status > GW_SEARCHING && _connectRetry > 0){
_status = GW_CONNECTING; _status = GW_CONNECTING;
}else{ }else{
_status = GW_LOST; _status = GW_LOST;
_gwId = 0; _gwId = 0;
} }
return -1; return -1;
} }
} }
return 0; return 0;
}else if (_mqttsnMsg[0] == MQTTSN_TYPE_GWINFO && _status == GW_SEARCHING){ }else if (_mqttsnMsg[0] == MQTTSN_TYPE_GWINFO && _status == GW_SEARCHING){
_network.setGwAddress(); _network.setGwAddress();
_gwId = _mqttsnMsg[1]; _gwId = _mqttsnMsg[1];
_status = GW_CONNECTING; _status = GW_CONNECTING;
}else if (_mqttsnMsg[0] == MQTTSN_TYPE_WILLTOPICREQ && _status == GW_WAIT_WILLTOPICREQ){ }else if (_mqttsnMsg[0] == MQTTSN_TYPE_WILLTOPICREQ && _status == GW_WAIT_WILLTOPICREQ){
_status = GW_SEND_WILLTOPIC; _status = GW_SEND_WILLTOPIC;
}else if (_mqttsnMsg[0] == MQTTSN_TYPE_WILLMSGREQ && _status == GW_WAIT_WILLMSGREQ){ }else if (_mqttsnMsg[0] == MQTTSN_TYPE_WILLMSGREQ && _status == GW_WAIT_WILLMSGREQ){
_status = GW_SEND_WILLMSG; _status = GW_SEND_WILLMSG;
}else if (_mqttsnMsg[0] == MQTTSN_TYPE_CONNACK && _status == GW_WAIT_CONNACK){ }else if (_mqttsnMsg[0] == MQTTSN_TYPE_CONNACK && _status == GW_WAIT_CONNACK){
if (_mqttsnMsg[1] == MQTTSN_RC_ACCEPTED){ if (_mqttsnMsg[1] == MQTTSN_RC_ACCEPTED){
_status = GW_CONNECTED; _status = GW_CONNECTED;
_connectRetry = MQTTSN_RETRY_COUNT; _connectRetry = MQTTSN_RETRY_COUNT;
setPingReqTimer(); setPingReqTimer();
if ( _tSleep ){ if ( _tSleep ){
_tSleep = 0; _tSleep = 0;
}else{ }else{
DISPLAY("\033[0m\033[0;32m\n\n Connected to the Broker\033[0m\033[0;37m\n\n"); DISPLAY("\033[0m\033[0;32m\n\n Connected to the Broker\033[0m\033[0;37m\n\n");
if ( _cleanSession || _initialized == 1 ) if ( _cleanSession || _initialized == 1 )
{ {
_topicTbl.clearTopic(); _topicTbl.clearTopic();
_initialized = 0; _initialized = 0;
theClient->onConnect(); // SUBSCRIBEs are conducted theClient->onConnect(); // SUBSCRIBEs are conducted
} }
} }
}else{ }else{
_status = GW_CONNECTING; _status = GW_CONNECTING;
} }
} }
return 1; return 1;
} }
void LGwProxy::reconnect(void){ void LGwProxy::reconnect(void){
D_MQTTLOG("...Gateway reconnect\r\n"); D_MQTTLOG("...Gateway reconnect\r\n");
_status = GW_DISCONNECTED; _status = GW_DISCONNECTED;
connect(); connect();
} }
void LGwProxy::disconnect(uint16_t secs){ void LGwProxy::disconnect(uint16_t secs){
_tSleep = secs; _tSleep = secs;
_tWake = 0; _tWake = 0;
_msg[1] = MQTTSN_TYPE_DISCONNECT; _msg[1] = MQTTSN_TYPE_DISCONNECT;
if (secs){ if (secs){
_msg[0] = 4; _msg[0] = 4;
setUint16((uint8_t*) _msg + 2, secs); setUint16((uint8_t*) _msg + 2, secs);
_status = GW_SLEEPING; _status = GW_SLEEPING;
}else{ }else{
_msg[0] = 2; _msg[0] = 2;
_keepAliveTimer.stop(); _keepAliveTimer.stop();
_status = GW_DISCONNECTING; _status = GW_DISCONNECTING;
} }
_retryCount = MQTTSN_RETRY_COUNT; _retryCount = MQTTSN_RETRY_COUNT;
writeMsg((const uint8_t*)_msg); writeMsg((const uint8_t*)_msg);
_sendUTC = time(NULL); _sendUTC = time(NULL);
while ( _status != GW_DISCONNECTED && _status != GW_SLEPT){ while ( _status != GW_DISCONNECTED && _status != GW_SLEPT){
if (getDisconnectResponce() < 0){ if (getDisconnectResponce() < 0){
_status = GW_LOST; _status = GW_LOST;
DISPLAY("\033[0m\033[0;31m\n\n!!!!!! DISCONNECT Error !!!!!\033[0m\033[0;37m \n\n"); DISPLAY("\033[0m\033[0;31m\n\n!!!!!! DISCONNECT Error !!!!!\033[0m\033[0;37m \n\n");
return; return;
} }
} }
} }
int LGwProxy::getDisconnectResponce(void){ int LGwProxy::getDisconnectResponce(void){
int len = readMsg(); int len = readMsg();
if (len == 0){ if (len == 0){
if (_sendUTC + MQTTSN_TIME_RETRY < time(NULL)){ if (_sendUTC + MQTTSN_TIME_RETRY < time(NULL)){
if (--_retryCount >= 0){ if (--_retryCount >= 0){
writeMsg((const uint8_t*)_msg); writeMsg((const uint8_t*)_msg);
_sendUTC = time(NULL); _sendUTC = time(NULL);
}else{ }else{
_status = GW_LOST; _status = GW_LOST;
_gwId = 0; _gwId = 0;
return -1; return -1;
} }
} }
return 0; return 0;
}else if (_mqttsnMsg[0] == MQTTSN_TYPE_DISCONNECT){ }else if (_mqttsnMsg[0] == MQTTSN_TYPE_DISCONNECT){
if (_status == GW_SLEEPING ){ if (_status == GW_SLEEPING ){
_status = GW_SLEPT; _status = GW_SLEPT;
uint32_t remain = _keepAliveTimer.getRemain(); uint32_t remain = _keepAliveTimer.getRemain();
theClient->setSleepMode(remain); theClient->setSleepMode(remain);
/* Wake up and starts from this point. */ /* Wake up and starts from this point. */
}else{ }else{
_status = GW_DISCONNECTED; _status = GW_DISCONNECTED;
} }
} }
return 0; return 0;
} }
int LGwProxy::getMessage(void){ int LGwProxy::getMessage(void){
int len = readMsg(); int len = readMsg();
if (len < 0){ if (len < 0){
return len; //error return len; //error
} }
#ifdef DEBUG_MQTTSN #ifdef DEBUG_MQTTSN
if (len){ if (len){
D_MQTTLOG(" recved msgType %x\n", _mqttsnMsg[0]); D_MQTTLOG(" recved msgType %x\n", _mqttsnMsg[0]);
} }
#endif #endif
if (len == 0){ if (len == 0){
// Check PINGREQ required // Check PINGREQ required
checkPingReq(); checkPingReq();
// Check ADVERTISE valid // Check ADVERTISE valid
checkAdvertise(); checkAdvertise();
// Check Timeout of REGISTERs // Check Timeout of REGISTERs
_regMgr.checkTimeout(); _regMgr.checkTimeout();
// Check Timeout of PUBLISHes, // Check Timeout of PUBLISHes,
theClient->getPublishManager()->checkTimeout(); theClient->getPublishManager()->checkTimeout();
// Check Timeout of SUBSCRIBEs, // Check Timeout of SUBSCRIBEs,
theClient->getSubscribeManager()->checkTimeout(); theClient->getSubscribeManager()->checkTimeout();
}else if (_mqttsnMsg[0] == MQTTSN_TYPE_PUBLISH){ }else if (_mqttsnMsg[0] == MQTTSN_TYPE_PUBLISH){
theClient->getPublishManager()->published(_mqttsnMsg, len); theClient->getPublishManager()->published(_mqttsnMsg, len);
}else if (_mqttsnMsg[0] == MQTTSN_TYPE_PUBACK || _mqttsnMsg[0] == MQTTSN_TYPE_PUBCOMP || }else if (_mqttsnMsg[0] == MQTTSN_TYPE_PUBACK || _mqttsnMsg[0] == MQTTSN_TYPE_PUBCOMP ||
_mqttsnMsg[0] == MQTTSN_TYPE_PUBREC || _mqttsnMsg[0] == MQTTSN_TYPE_PUBREL ){ _mqttsnMsg[0] == MQTTSN_TYPE_PUBREC || _mqttsnMsg[0] == MQTTSN_TYPE_PUBREL ){
theClient->getPublishManager()->responce(_mqttsnMsg, (uint16_t)len); theClient->getPublishManager()->responce(_mqttsnMsg, (uint16_t)len);
}else if (_mqttsnMsg[0] == MQTTSN_TYPE_SUBACK || _mqttsnMsg[0] == MQTTSN_TYPE_UNSUBACK){ }else if (_mqttsnMsg[0] == MQTTSN_TYPE_SUBACK || _mqttsnMsg[0] == MQTTSN_TYPE_UNSUBACK){
theClient->getSubscribeManager()->responce(_mqttsnMsg); theClient->getSubscribeManager()->responce(_mqttsnMsg);
}else if (_mqttsnMsg[0] == MQTTSN_TYPE_REGISTER){ }else if (_mqttsnMsg[0] == MQTTSN_TYPE_REGISTER){
_regMgr.responceRegister(_mqttsnMsg, len); _regMgr.responceRegister(_mqttsnMsg, len);
}else if (_mqttsnMsg[0] == MQTTSN_TYPE_REGACK){ }else if (_mqttsnMsg[0] == MQTTSN_TYPE_REGACK){
_regMgr.responceRegAck(getUint16(_mqttsnMsg + 3), getUint16(_mqttsnMsg + 1)); _regMgr.responceRegAck(getUint16(_mqttsnMsg + 3), getUint16(_mqttsnMsg + 1));
@@ -301,26 +304,26 @@ int LGwProxy::getMessage(void){
setPingReqTimer(); setPingReqTimer();
if ( _tSleep > 0 ){ if ( _tSleep > 0 ){
_tWake += _tkeepAlive; _tWake += _tkeepAlive;
if ( _tWake < _tSleep ){ if ( _tWake < _tSleep ){
theClient->setSleepMode(_tkeepAlive * 1000UL); theClient->setSleepMode(_tkeepAlive * 1000UL);
}else{ }else{
DISPLAY("\033[0m\033[0;32m\n\n Get back to ACTIVE.\033[0m\033[0;37m\n\n"); DISPLAY("\033[0m\033[0;32m\n\n Get back to ACTIVE.\033[0m\033[0;37m\n\n");
_tWake = 0; _tWake = 0;
connect(); connect();
} }
} }
} }
}else if (_mqttsnMsg[0] == MQTTSN_TYPE_DISCONNECT){ }else if (_mqttsnMsg[0] == MQTTSN_TYPE_DISCONNECT){
_status = GW_LOST; _status = GW_LOST;
_gwAliveTimer.stop(); _gwAliveTimer.stop();
_keepAliveTimer.stop(); _keepAliveTimer.stop();
}else if (_mqttsnMsg[0] == MQTTSN_TYPE_ADVERTISE){ }else if (_mqttsnMsg[0] == MQTTSN_TYPE_ADVERTISE){
if (getUint16((const uint8_t*)(_mqttsnMsg + 2)) < 61){ if (getUint16((const uint8_t*)(_mqttsnMsg + 2)) < 61){
_tAdv = getUint16((const uint8_t*)(_mqttsnMsg + 2)) * 1500; _tAdv = getUint16((const uint8_t*)(_mqttsnMsg + 2)) * 1500;
}else{ }else{
_tAdv = getUint16((const uint8_t*)(_mqttsnMsg + 2)) * 1100; _tAdv = getUint16((const uint8_t*)(_mqttsnMsg + 2)) * 1100;
} }
_gwAliveTimer.start(_tAdv); _gwAliveTimer.start(_tAdv);
} }
return 0; return 0;
@@ -330,165 +333,203 @@ int LGwProxy::getMessage(void){
uint16_t LGwProxy::registerTopic(char* topicName, uint16_t topicId){ uint16_t LGwProxy::registerTopic(char* topicName, uint16_t topicId){
uint16_t id = topicId; uint16_t id = topicId;
if (id == 0){ if (id == 0){
id = _topicTbl.getTopicId(topicName); id = _topicTbl.getTopicId(topicName);
_regMgr.registerTopic(topicName); _regMgr.registerTopic(topicName);
} }
return id; return id;
} }
int LGwProxy::writeMsg(const uint8_t* msg){ int LGwProxy::writeMsg(const uint8_t* msg){
uint16_t len; uint16_t len;
uint8_t pos; uint8_t pos;
uint8_t rc; uint8_t rc = 0;
if (msg[0] == 0x01){ if (msg[0] == 0x01){
len = getUint16(msg + 1); len = getUint16(msg + 1);
pos = 2; pos = 2;
}else{ }else{
len = msg[0]; len = msg[0];
pos = 1; pos = 1;
} }
if (msg[0] == 3 && msg[1] == MQTTSN_TYPE_SEARCHGW){ if (msg[0] == 3 && msg[1] == MQTTSN_TYPE_SEARCHGW){
rc = _network.broadcast(msg,len); rc = _network.broadcast(msg,len);
}else{ }else
rc = _network.unicast(msg,len); {
} if ( _isForwarderMode )
{
// create a forwarder encapsulation message WirelessNodeId is a 4bytes fake data
uint8_t* buf = (uint8_t*)malloc(len + 7);
buf[0] = 7;
buf[1] = MQTTSN_TYPE_ENCAPSULATED;
buf[2] = 1;
buf[3] = 'w';
buf[4] = 'n';
buf[5] = 'I';
buf[6] = 'd';
memcpy(buf + 7, msg, len);
if ( buf)
rc = _network.unicast(buf, len + 7);
free(buf);
DISPLAY(" Encapsulated\n ");
}else{
rc = _network.unicast(msg,len);
}
if (rc > 0){ if (rc > 0){
if ( msg[pos] >= MQTTSN_TYPE_ADVERTISE && msg[pos] <= MQTTSN_TYPE_WILLMSGRESP ) if ( msg[pos] >= MQTTSN_TYPE_ADVERTISE && msg[pos] <= MQTTSN_TYPE_WILLMSGRESP )
{ {
DISPLAY(" send %s\n", packet_names[msg[pos]]); DISPLAY(" send %s\n", packet_names[msg[pos]]);
} }
return rc; }
} }
//_status = GW_LOST; return rc;
//_gwId = 0;
return rc;
} }
void LGwProxy::writeGwMsg(void){ void LGwProxy::writeGwMsg(void){
_retryCount = MQTTSN_RETRY_COUNT; _retryCount = MQTTSN_RETRY_COUNT;
writeMsg((const uint8_t*)_msg); writeMsg((const uint8_t*)_msg);
_sendUTC = time(NULL); _sendUTC = time(NULL);
} }
int LGwProxy::readMsg(void){ int LGwProxy::readMsg(void){
int len = 0; int len = 0;
_mqttsnMsg = _network.getMessage(&len); uint8_t* msg = _network.getMessage(&len);
if (len == 0){ _mqttsnMsg = msg;
return 0;
}
if (_mqttsnMsg[0] == 0x01){ if (len == 0){
int msgLen = (int) getUint16((const uint8_t*)_mqttsnMsg + 1); return 0;
if (len != msgLen){ }
_mqttsnMsg += 3;
len = msgLen - 3; if (_mqttsnMsg[0] == 0x01){
} int msgLen = (int) getUint16((const uint8_t*)_mqttsnMsg + 1);
}else{ if (len != msgLen){
_mqttsnMsg += 1; _mqttsnMsg += 3;
len -= 1; len = msgLen - 3;
} }
if ( *_mqttsnMsg >= MQTTSN_TYPE_ADVERTISE && *_mqttsnMsg <= MQTTSN_TYPE_WILLMSGRESP ) }else{
{ _mqttsnMsg += 1;
DISPLAY(" recv %s\n", packet_names[*_mqttsnMsg]); len -= 1;
} }
return len;
if ( *_mqttsnMsg == MQTTSN_TYPE_ENCAPSULATED )
{
int lenEncap = len + 1;
if (msg[lenEncap] == 0x01){
int msgLen = (int) getUint16((const uint8_t*)(msg + lenEncap + 1));
msg += (lenEncap + 3);
len = msgLen - 3;
}else{
msg += (lenEncap + 1);
len = *(msg - 1);
}
_mqttsnMsg = msg;
DISPLAY(" recv encapslated message\n" );
}
if ( *_mqttsnMsg >= MQTTSN_TYPE_ADVERTISE && *_mqttsnMsg <= MQTTSN_TYPE_WILLMSGRESP )
{
DISPLAY(" recv %s\n", packet_names[*_mqttsnMsg]);
}
return len;
} }
void LGwProxy::setWillTopic(const char* willTopic, uint8_t qos, bool retain){ void LGwProxy::setWillTopic(const char* willTopic, uint8_t qos, bool retain){
_willTopic = willTopic; _willTopic = willTopic;
_retainWill = _qosWill = 0; _retainWill = _qosWill = 0;
if (qos == 1){ if (qos == 1){
_qosWill = MQTTSN_FLAG_QOS_1; _qosWill = MQTTSN_FLAG_QOS_1;
}else if (qos == 2){ }else if (qos == 2){
_qosWill = MQTTSN_FLAG_QOS_2; _qosWill = MQTTSN_FLAG_QOS_2;
} }
if (retain){ if (retain){
_retainWill = MQTTSN_FLAG_RETAIN; _retainWill = MQTTSN_FLAG_RETAIN;
} }
} }
void LGwProxy::setWillMsg(const char* willMsg){ void LGwProxy::setWillMsg(const char* willMsg){
_willMsg = willMsg; _willMsg = willMsg;
} }
void LGwProxy::setCleanSession(bool flg){ void LGwProxy::setCleanSession(bool flg){
if (flg){ if (flg){
_cleanSession = MQTTSN_FLAG_CLEAN; _cleanSession = MQTTSN_FLAG_CLEAN;
}else{ }else{
_cleanSession = 0; _cleanSession = 0;
} }
} }
uint16_t LGwProxy::getNextMsgId(void){ uint16_t LGwProxy::getNextMsgId(void){
_nextMsgId++; _nextMsgId++;
if (_nextMsgId == 0){ if (_nextMsgId == 0){
_nextMsgId = 1; _nextMsgId = 1;
} }
return _nextMsgId; return _nextMsgId;
} }
void LGwProxy::checkPingReq(void){ void LGwProxy::checkPingReq(void){
uint8_t msg[2]; uint8_t msg[2];
msg[0] = 0x02; msg[0] = 0x02;
msg[1] = MQTTSN_TYPE_PINGREQ; msg[1] = MQTTSN_TYPE_PINGREQ;
if ( (_status == GW_CONNECTED || _status == GW_SLEPT) && isPingReqRequired() && _pingStatus != GW_WAIT_PINGRESP){ if ( (_status == GW_CONNECTED || _status == GW_SLEPT) && isPingReqRequired() && _pingStatus != GW_WAIT_PINGRESP){
_pingStatus = GW_WAIT_PINGRESP; _pingStatus = GW_WAIT_PINGRESP;
_pingRetryCount = MQTTSN_RETRY_COUNT; _pingRetryCount = MQTTSN_RETRY_COUNT;
writeMsg((const uint8_t*)msg); writeMsg((const uint8_t*)msg);
_pingSendUTC = time(NULL); _pingSendUTC = time(NULL);
}else if (_pingStatus == GW_WAIT_PINGRESP){ }else if (_pingStatus == GW_WAIT_PINGRESP){
if (_pingSendUTC + MQTTSN_TIME_RETRY < time(NULL)){ if (_pingSendUTC + MQTTSN_TIME_RETRY < time(NULL)){
if (--_pingRetryCount > 0){ if (--_pingRetryCount > 0){
writeMsg((const uint8_t*)msg); writeMsg((const uint8_t*)msg);
_pingSendUTC = time(NULL); _pingSendUTC = time(NULL);
}else{ }else{
_status = GW_LOST; _status = GW_LOST;
_gwId = 0; _gwId = 0;
_pingStatus = 0; _pingStatus = 0;
_keepAliveTimer.stop(); _keepAliveTimer.stop();
D_MQTTLOG(" !!! PINGREQ Timeout\n"); D_MQTTLOG(" !!! PINGREQ Timeout\n");
} }
} }
} }
} }
void LGwProxy::checkAdvertise(void){ void LGwProxy::checkAdvertise(void){
if ( _gwAliveTimer.isTimeUp()){ if ( _gwAliveTimer.isTimeUp()){
_status = GW_LOST; _status = GW_LOST;
_gwId = 0; _gwId = 0;
_pingStatus = 0; _pingStatus = 0;
_gwAliveTimer.stop(); _gwAliveTimer.stop();
_keepAliveTimer.stop(); _keepAliveTimer.stop();
D_MQTTLOG(" !!! ADVERTISE Timeout\n"); D_MQTTLOG(" !!! ADVERTISE Timeout\n");
} }
} }
LTopicTable* LGwProxy::getTopicTable(void){ LTopicTable* LGwProxy::getTopicTable(void){
return &_topicTbl; return &_topicTbl;
} }
LRegisterManager* LGwProxy::getRegisterManager(void){ LRegisterManager* LGwProxy::getRegisterManager(void){
return &_regMgr; return &_regMgr;
} }
bool LGwProxy::isPingReqRequired(void){ bool LGwProxy::isPingReqRequired(void){
return _keepAliveTimer.isTimeUp(_tkeepAlive * 1000UL); return _keepAliveTimer.isTimeUp(_tkeepAlive * 1000UL);
} }
void LGwProxy::setPingReqTimer(void){ void LGwProxy::setPingReqTimer(void){
_keepAliveTimer.start(_tkeepAlive * 1000UL); _keepAliveTimer.start(_tkeepAlive * 1000UL);
} }
const char* LGwProxy::getClientId(void) { const char* LGwProxy::getClientId(void) {
return _clientId; return _clientId;
} }
void LGwProxy::setForwarderMode(void)
{
_isForwarderMode = true;
}

View File

@@ -51,63 +51,65 @@ namespace linuxAsyncClient {
=======================================*/ =======================================*/
class LGwProxy{ class LGwProxy{
public: public:
LGwProxy(); LGwProxy();
~LGwProxy(); ~LGwProxy();
void initialize(LUdpConfig netconf, LMqttsnConfig mqconf); void initialize(LUdpConfig netconf, LMqttsnConfig mqconf);
void connect(void); void connect(void);
void disconnect(uint16_t sec = 0); void disconnect(uint16_t sec = 0);
int getMessage(void); int getMessage(void);
uint16_t registerTopic(char* topic, uint16_t toipcId); uint16_t registerTopic(char* topic, uint16_t toipcId);
void setWillTopic(const char* willTopic, uint8_t qos, bool retain = false); void setWillTopic(const char* willTopic, uint8_t qos, bool retain = false);
void setWillMsg(const char* willMsg); void setWillMsg(const char* willMsg);
void setCleanSession(bool); void setCleanSession(bool);
void setKeepAliveDuration(uint16_t duration); void setKeepAliveDuration(uint16_t duration);
void setAdvertiseDuration(uint16_t duration); void setAdvertiseDuration(uint16_t duration);
void reconnect(void); void setForwarderMode(void);
int writeMsg(const uint8_t* msg); void reconnect(void);
void setPingReqTimer(void); int writeMsg(const uint8_t* msg);
uint16_t getNextMsgId(); void setPingReqTimer(void);
LTopicTable* getTopicTable(void); uint16_t getNextMsgId();
LRegisterManager* getRegisterManager(void); LTopicTable* getTopicTable(void);
const char* getClientId(void); LRegisterManager* getRegisterManager(void);
const char* getClientId(void);
private: private:
int readMsg(void); int readMsg(void);
void writeGwMsg(void); void writeGwMsg(void);
void checkPingReq(void); void checkPingReq(void);
void checkAdvertise(void); void checkAdvertise(void);
int getConnectResponce(void); int getConnectResponce(void);
int getDisconnectResponce(void); int getDisconnectResponce(void);
bool isPingReqRequired(void); bool isPingReqRequired(void);
LNetwork _network; LNetwork _network;
uint8_t* _mqttsnMsg; uint8_t* _mqttsnMsg;
uint16_t _nextMsgId; uint16_t _nextMsgId;
const char* _clientId; const char* _clientId;
const char* _willTopic; const char* _willTopic;
const char* _willMsg; const char* _willMsg;
uint8_t _cleanSession; uint8_t _cleanSession;
uint8_t _initialized; uint8_t _initialized;
uint8_t _retainWill; uint8_t _retainWill;
uint8_t _qosWill; uint8_t _qosWill;
uint8_t _gwId; uint8_t _gwId;
uint16_t _tkeepAlive; uint16_t _tkeepAlive;
uint32_t _tAdv; uint32_t _tAdv;
time_t _sendUTC; time_t _sendUTC;
int _retryCount; int _retryCount;
int _connectRetry; int _connectRetry;
uint8_t _status; uint8_t _status;
time_t _pingSendUTC; time_t _pingSendUTC;
uint8_t _pingRetryCount; uint8_t _pingRetryCount;
uint8_t _pingStatus; uint8_t _pingStatus;
LRegisterManager _regMgr; LRegisterManager _regMgr;
LTopicTable _topicTbl; LTopicTable _topicTbl;
LTimer _gwAliveTimer; LTimer _gwAliveTimer;
LTimer _keepAliveTimer; LTimer _keepAliveTimer;
uint16_t _tSleep; uint16_t _tSleep;
uint16_t _tWake; uint16_t _tWake;
char _msg[MQTTSN_MAX_MSG_LENGTH + 1]; bool _isForwarderMode;
char _msg[MQTTSN_MAX_MSG_LENGTH + 1];
}; };
} /* end of namespace */ } /* end of namespace */

View File

@@ -1,5 +1,5 @@
/************************************************************************************** /**************************************************************************************
* Copyright (c) 2016, Tomoaki Yamaguchi * Copyright (c) 2016-2018, Tomoaki Yamaguchi
* *
* All rights reserved. This program and the accompanying materials * All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0 * are made available under the terms of the Eclipse Public License v1.0
@@ -105,6 +105,7 @@ typedef enum
#define END_OF_SUBSCRIBE_LIST {MQTTSN_TOPIC_TYPE_NORMAL,0,0,0, 0} #define END_OF_SUBSCRIBE_LIST {MQTTSN_TOPIC_TYPE_NORMAL,0,0,0, 0}
#define UDPCONF LUdpConfig theNetcon #define UDPCONF LUdpConfig theNetcon
#define MQTTSNCONF LMqttsnConfig theMqcon #define MQTTSNCONF LMqttsnConfig theMqcon
#define SetForwarderMode theClient->getGwProxy()->setForwarderMode
#ifdef CLIENT_MODE #ifdef CLIENT_MODE
#define DISPLAY(...) #define DISPLAY(...)
#define PROMPT(...) #define PROMPT(...)
@@ -168,6 +169,7 @@ typedef enum
#define MQTTSN_TYPE_WILLTOPICRESP 0x1B #define MQTTSN_TYPE_WILLTOPICRESP 0x1B
#define MQTTSN_TYPE_WILLMSGUPD 0x1C #define MQTTSN_TYPE_WILLMSGUPD 0x1C
#define MQTTSN_TYPE_WILLMSGRESP 0x1D #define MQTTSN_TYPE_WILLMSGRESP 0x1D
#define MQTTSN_TYPE_ENCAPSULATED 0xFE
#define MQTTSN_TOPIC_TYPE 0x03 #define MQTTSN_TOPIC_TYPE 0x03

View File

@@ -76,12 +76,12 @@ uint8_t* LNetwork::getMessage(int* len){
}else{ }else{
*len = _rxDataBuf[0]; *len = _rxDataBuf[0];
} }
if(recvLen != *len){ //if(recvLen != *len){
*len = 0; // *len = 0;
return 0; // return 0;
}else{ //}else{
return _rxDataBuf; return _rxDataBuf;
} //}
} }
} }
return 0; return 0;

View File

@@ -10,6 +10,7 @@ TESTAPPL := mainTestProcess
CONFIG := gateway.conf CONFIG := gateway.conf
CLIENTS := clients.conf CLIENTS := clients.conf
PREDEFTOPIC := predefinedTopic.conf PREDEFTOPIC := predefinedTopic.conf
FORWARDERS := forwarders.conf
SRCDIR := src SRCDIR := src
SUBDIR := ../MQTTSNPacket/src SUBDIR := ../MQTTSNPacket/src
@@ -36,6 +37,8 @@ $(SRCDIR)/MQTTSNGWPacketHandleTask.cpp \
$(SRCDIR)/MQTTSNGWProcess.cpp \ $(SRCDIR)/MQTTSNGWProcess.cpp \
$(SRCDIR)/MQTTSNGWPublishHandler.cpp \ $(SRCDIR)/MQTTSNGWPublishHandler.cpp \
$(SRCDIR)/MQTTSNGWSubscribeHandler.cpp \ $(SRCDIR)/MQTTSNGWSubscribeHandler.cpp \
$(SRCDIR)/MQTTSNGWEncapsulatedPacket.cpp \
$(SRCDIR)/MQTTSNGWForwarder.cpp \
$(SRCDIR)/$(OS)/$(SENSORNET)/SensorNetwork.cpp \ $(SRCDIR)/$(OS)/$(SENSORNET)/SensorNetwork.cpp \
$(SRCDIR)/$(OS)/Timer.cpp \ $(SRCDIR)/$(OS)/Timer.cpp \
$(SRCDIR)/$(OS)/Network.cpp \ $(SRCDIR)/$(OS)/Network.cpp \
@@ -137,6 +140,8 @@ install:
cp -pf $(CONFIG) ../../ cp -pf $(CONFIG) ../../
cp -pf $(CLIENTS) ../../ cp -pf $(CLIENTS) ../../
cp -pf $(PREDEFTOPIC) ../../ cp -pf $(PREDEFTOPIC) ../../
cp -pf $(FORWARDERS) ../../
exectest: exectest:
./$(OUTDIR)/$(TESTPROGNAME) -f ./gateway.conf ./$(OUTDIR)/$(TESTPROGNAME) -f ./gateway.conf

View File

@@ -31,10 +31,13 @@ BrokerPortNo=1883
BrokerSecurePortNo=8883 BrokerSecurePortNo=8883
ClientAuthentication=NO ClientAuthentication=NO
#ClientsList=/path/to/your_clients.conf ClientsList=/path/to/your_clients.conf
PredefinedTopic=NO PredefinedTopic=NO
#PredefinedTopicFile=/path/to/your_predefinedTopic.conf PredefinedTopicList=/path/to/your_predefinedTopic.conf
Forwarder=NO
ForwardersList=/home/tomoaki/tmp/forwarders.conf
#RootCAfile=/path/to/your_Root_CA.crt #RootCAfile=/path/to/your_Root_CA.crt
#RootCApath=/path/to/your_certs_directory/ #RootCApath=/path/to/your_certs_directory/
@@ -64,7 +67,8 @@ Client should know the MulticastIP and MulticastPortNo to send a SEARCHGW messag
**GatewayId** is used by GWINFO message. **GatewayId** is used by GWINFO message.
**KeepAlive** is a duration of ADVERTISE message in seconds. **KeepAlive** is a duration of ADVERTISE message in seconds.
when **ClientAuthentication** is YES, see MQTTSNGWClient.cpp line53, clients file specified by ClientsList is required. This file defines connect allowed clients by ClientId and SensorNetwork Address. e.g. IP address and Port No. when **ClientAuthentication** is YES, see MQTTSNGWClient.cpp line53, clients file specified by ClientsList is required. This file defines connect allowed clients by ClientId and SensorNetwork Address. e.g. IP address and Port No.
When **PredefinedTopic** is YES, Pre-definedTopicID file specified by PredefinedTopicFile is effective. This file defines Pre-definedTopics of the clients. In this file, ClientID,TopicName and TopicID are declared in CSV format. When **PredefinedTopic** is YES, Pre-definedTopicID file specified by PredefinedTopicList is effective. This file defines Pre-definedTopics of the clients. In this file, ClientID,TopicName and TopicID are declared in CSV format.
When **Forwarder** is YES, Forwarder Encapsulation Message is available. Connectable Forwarders are specifed by ForwardersList file. In this file, ForwarderIds and those sensorNet addresses are declared in CSV format.

View File

@@ -0,0 +1,16 @@
#***********************************************************************
# Copyright (c) 2018, Tomoaki Yamaguchi
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Eclipse Public License v1.0
# and Eclipse Distribution License v1.0 which accompany this distribution.
#
# The Eclipse Public License is available at
# http://www.eclipse.org/legal/epl-v10.html
# and the Eclipse Distribution License is available at
# http://www.eclipse.org/org/documents/edl-v10.php.
#***********************************************************************
#
# SensorNetwork address format is defined by SensorNetAddress::setAddress(string* data) function.
#
Forwarder01,172.16.1.7:12002

View File

@@ -1,5 +1,5 @@
#************************************************************************** #**************************************************************************
# Copyright (c) 2016, Tomoaki Yamaguchi # Copyright (c) 2016-2018, Tomoaki Yamaguchi
# #
# All rights reserved. This program and the accompanying materials # All rights reserved. This program and the accompanying materials
# are made available under the terms of the Eclipse Public License v1.0 # are made available under the terms of the Eclipse Public License v1.0
@@ -18,10 +18,13 @@ BrokerPortNo=1883
BrokerSecurePortNo=8883 BrokerSecurePortNo=8883
ClientAuthentication=NO ClientAuthentication=NO
#ClientsList=/path/to/your_clients.conf ClientsList=/path/to/your_clients.conf
PredefinedTopic=NO PredefinedTopic=NO
#PredefinedTopicFile=/path/to/your_predefinedTopic.conf PredefinedTopicList=/path/to/your_predefinedTopic.conf
Forwarder=NO
ForwardersList=/path/to/your_forwarers.conf
#RootCAfile=/etc/ssl/certs/ca-certificates.crt #RootCAfile=/etc/ssl/certs/ca-certificates.crt
#RootCApath=/etc/ssl/certs/ #RootCApath=/etc/ssl/certs/

View File

@@ -19,6 +19,7 @@
#include "MQTTSNGWClient.h" #include "MQTTSNGWClient.h"
#include "MQTTSNGateway.h" #include "MQTTSNGateway.h"
#include "SensorNetwork.h" #include "SensorNetwork.h"
#include "MQTTSNGWForwarder.h"
#include "Network.h" #include "Network.h"
#include <string> #include <string>
#include <string.h> #include <string.h>
@@ -75,7 +76,7 @@ bool ClientList::authorize(const char* fileName)
{ {
FILE* fp; FILE* fp;
char buf[MAX_CLIENTID_LENGTH + 256]; char buf[MAX_CLIENTID_LENGTH + 256];
size_t pos;; size_t pos;
bool secure; bool secure;
bool stable; bool stable;
SensorNetAddress netAddr; SensorNetAddress netAddr;
@@ -167,6 +168,11 @@ bool ClientList::setPredefinedTopics(const char* fileName)
fclose(fp); fclose(fp);
rc = true; rc = true;
} }
else
{
WRITELOG("Can not open the Predefined Topic List. %s\n", fileName);
return false;
}
return rc; return rc;
} }
@@ -196,6 +202,11 @@ void ClientList::erase(Client*& client)
_endClient = prev; _endClient = prev;
} }
_clientCnt--; _clientCnt--;
Forwarder* fwd = client->getForwarder();
if ( fwd )
{
fwd->eraseClient(client);
}
delete client; delete client;
client = 0; client = 0;
_mutex.unlock(); _mutex.unlock();
@@ -204,19 +215,22 @@ void ClientList::erase(Client*& client)
Client* ClientList::getClient(SensorNetAddress* addr) Client* ClientList::getClient(SensorNetAddress* addr)
{ {
_mutex.lock(); if ( addr )
Client* client = _firstClient; {
_mutex.lock();
Client* client = _firstClient;
while (client != 0) while (client != 0)
{ {
if (client->getSensorNetAddress()->isMatch(addr) ) if (client->getSensorNetAddress()->isMatch(addr) )
{ {
_mutex.unlock(); _mutex.unlock();
return client; return client;
} }
client = client->_nextClient; client = client->_nextClient;
} }
_mutex.unlock(); _mutex.unlock();
}
return 0; return 0;
} }
@@ -401,6 +415,7 @@ Client::Client(bool secure)
_clientSleepPacketQue.setMaxSize(MAX_SAVED_PUBLISH); _clientSleepPacketQue.setMaxSize(MAX_SAVED_PUBLISH);
_hasPredefTopic = false; _hasPredefTopic = false;
_holdPingRequest = false; _holdPingRequest = false;
_forwarder = 0;
} }
Client::~Client() Client::~Client()
@@ -519,6 +534,16 @@ void Client::setKeepAlive(MQTTSNPacket* packet)
} }
} }
void Client::setForwarder(Forwarder* forwarder)
{
_forwarder = forwarder;
}
Forwarder* Client::getForwarder(void)
{
return _forwarder;
}
void Client::setSessionStatus(bool status) void Client::setSessionStatus(bool status)
{ {
_sessionStatus = status; _sessionStatus = status;

View File

@@ -26,6 +26,8 @@
#include "Network.h" #include "Network.h"
#include "SensorNetwork.h" #include "SensorNetwork.h"
#include "MQTTSNPacket.h" #include "MQTTSNPacket.h"
#include "MQTTSNGWEncapsulatedPacket.h"
#include "MQTTSNGWForwarder.h"
namespace MQTTSNGW namespace MQTTSNGW
{ {
@@ -230,13 +232,13 @@ private:
/*===================================== /*=====================================
Class Client Class Client
=====================================*/ =====================================*/
#define MQTTSN_CLIENTID_LENGTH 23
typedef enum typedef enum
{ {
Cstat_Disconnected = 0, Cstat_TryConnecting, Cstat_Connecting, Cstat_Active, Cstat_Asleep, Cstat_Awake, Cstat_Lost Cstat_Disconnected = 0, Cstat_TryConnecting, Cstat_Connecting, Cstat_Active, Cstat_Asleep, Cstat_Awake, Cstat_Lost
} ClientStatus; } ClientStatus;
class Forwarder;
class Client class Client
{ {
@@ -281,6 +283,9 @@ public:
void setClientAddress(SensorNetAddress* sensorNetAddr); void setClientAddress(SensorNetAddress* sensorNetAddr);
void setSensorNetType(bool stable); void setSensorNetType(bool stable);
Forwarder* getForwarder(void);
void setForwarder(Forwarder* forwader);
void setClientId(MQTTSNString id); void setClientId(MQTTSNString id);
void setWillTopic(MQTTSNString willTopic); void setWillTopic(MQTTSNString willTopic);
void setWillMsg(MQTTSNString willmsg); void setWillMsg(MQTTSNString willmsg);
@@ -339,6 +344,9 @@ private:
bool _sensorNetype; // false: unstable network like a G3 bool _sensorNetype; // false: unstable network like a G3
SensorNetAddress _sensorNetAddr; SensorNetAddress _sensorNetAddr;
Forwarder* _forwarder;
bool _sessionStatus; bool _sessionStatus;
bool _hasPredefTopic; bool _hasPredefTopic;
@@ -373,5 +381,7 @@ private:
bool _authorize; bool _authorize;
}; };
} }
#endif /* MQTTSNGWCLIENT_H_ */ #endif /* MQTTSNGWCLIENT_H_ */

View File

@@ -16,7 +16,12 @@
#include "MQTTSNGWClientRecvTask.h" #include "MQTTSNGWClientRecvTask.h"
#include "MQTTSNGateway.h" #include "MQTTSNGateway.h"
#include <string.h> #include "MQTTSNPacket.h"
#include "MQTTSNGWForwarder.h"
#include "MQTTSNGWEncapsulatedPacket.h"
#include <cstring>
using namespace MQTTSNGW;
char* currentDateTime(void); char* currentDateTime(void);
/*===================================== /*=====================================
Class ClientRecvTask Class ClientRecvTask
@@ -55,8 +60,12 @@ void ClientRecvTask::run()
Client* client = 0; Client* client = 0;
char buf[128]; char buf[128];
while (true) while (true)
{ {
Forwarder* fwd = 0;
WirelessNodeId nodeId;
MQTTSNPacket* packet = new MQTTSNPacket(); MQTTSNPacket* packet = new MQTTSNPacket();
int packetLen = packet->recv(_sensorNetwork); int packetLen = packet->recv(_sensorNetwork);
@@ -89,8 +98,38 @@ void ClientRecvTask::run()
continue; continue;
} }
/* get client from the ClientList of Gateway by sensorNetAddress. */ if ( packet->getType() == MQTTSN_ENCAPSULATED )
client = _gateway->getClientList()->getClient(_sensorNetwork->getSenderAddress()); {
fwd = _gateway->getForwarderList()->getForwarder(_sensorNetwork->getSenderAddress());
if ( fwd == 0 )
{
log(0, packet);
WRITELOG("%s Forwarder %s is not authorized.%s\n", ERRMSG_HEADER, _sensorNetwork->getSenderAddress()->sprint(buf), ERRMSG_FOOTER);
delete packet;
continue;
}
else
{
MQTTSNString fwdName;
fwdName.lenstring.data = const_cast<char *>( fwd->getName() );
fwdName.lenstring.len = strlen(fwdName.lenstring.data);
log(0, packet, &fwdName);
MQTTSNGWEncapsulatedPacket encap;
encap.desirialize(packet->getPacketData(), packet->getPacketLength());
nodeId.setId( encap.getWirelessNodeId() );
client = fwd->getClient(&nodeId);
delete packet;
packet = encap.getMQTTSNPacket();
}
}
else
{
/* get client from the ClientList of Gateway by sensorNetAddress. */
client = _gateway->getClientList()->getClient(_sensorNetwork->getSenderAddress());
}
if ( client ) if ( client )
{ {
@@ -117,27 +156,40 @@ void ClientRecvTask::run()
client = _gateway->getClientList()->getClient(&data.clientID); client = _gateway->getClientList()->getClient(&data.clientID);
if ( client ) if ( fwd )
{ {
/* set SensorNet Address */ if ( client == 0 )
client->setClientAddress(_sensorNetwork->getSenderAddress()); {
/* create a new client */
client = _gateway->getClientList()->createClient(0, &data.clientID, false, false);
}
/* Add to af forwarded client list of forwarder. */
fwd->addClient(client, &nodeId);
} }
else else
{ {
/* create a client */ if ( client )
client = _gateway->getClientList()->createClient(_sensorNetwork->getSenderAddress(), &data.clientID, false, false); {
/* Client exists. Set SensorNet Address of it. */
client->setClientAddress(_sensorNetwork->getSenderAddress());
}
else
{
/* create a new client */
client = _gateway->getClientList()->createClient(_sensorNetwork->getSenderAddress(), &data.clientID, false, false);
}
} }
log(client, packet, &data.clientID); log(client, packet, &data.clientID);
if (!client) if (!client)
{ {
WRITELOG("%s Client(%s) was rejected. CONNECT message has been discarded.%s\n", ERRMSG_HEADER, _sensorNetwork->getSenderAddress()->sprint(buf), ERRMSG_FOOTER); WRITELOG("%s Client(%s) was rejected. CONNECT message has been discarded.%s\n", ERRMSG_HEADER, _sensorNetwork->getSenderAddress()->sprint(buf), ERRMSG_FOOTER);
delete packet; delete packet;
continue; continue;
} }
/* set sensorNetAddress & post Event */ /* post Client RecvEvent */
ev = new Event(); ev = new Event();
ev->setClientRecvEvent(client, packet); ev->setClientRecvEvent(client, packet);
_gateway->getPacketEventQue()->post(ev); _gateway->getPacketEventQue()->post(ev);
@@ -145,16 +197,18 @@ void ClientRecvTask::run()
else else
{ {
log(client, packet); log(client, packet);
delete packet; WRITELOG("%s Client(%s) is not connecting. message has been discarded.%s\n", ERRMSG_HEADER, _sensorNetwork->getSenderAddress()->sprint(buf), ERRMSG_FOOTER);
/* Send DISCONNECT */ delete packet;
SensorNetAddress* addr = new SensorNetAddress();
*addr = (*_sensorNetwork->getSenderAddress()); /* Send DISCONNECT */
packet = new MQTTSNPacket(); if ( fwd == 0 )
packet->setDISCONNECT(0); {
ev = new Event(); packet = new MQTTSNPacket();
ev->setClientSendEvent(addr, packet); packet->setDISCONNECT(0);
_gateway->getClientSendQue()->post(ev); ev = new Event();
continue; ev->setClientSendEvent(_sensorNetwork->getSenderAddress(), packet);
_gateway->getClientSendQue()->post(ev);
}
} }
} }
} }
@@ -211,6 +265,9 @@ void ClientRecvTask::log(Client* client, MQTTSNPacket* packet, MQTTSNString* id)
case MQTTSN_PUBCOMP: case MQTTSN_PUBCOMP:
WRITELOG(FORMAT_G_MSGID_G_G, currentDateTime(), packet->getName(), packet->getMsgId(msgId), LEFTARROW, clientId, packet->print(pbuf)); WRITELOG(FORMAT_G_MSGID_G_G, currentDateTime(), packet->getName(), packet->getMsgId(msgId), LEFTARROW, clientId, packet->print(pbuf));
break; break;
case MQTTSN_ENCAPSULATED:
WRITELOG(FORMAT_Y_G_G, currentDateTime(), packet->getName(), LEFTARROW, clientId, packet->print(pbuf));
break;
default: default:
WRITELOG(FORMAT_W_NL, currentDateTime(), packet->getName(), LEFTARROW, clientId, packet->print(pbuf)); WRITELOG(FORMAT_W_NL, currentDateTime(), packet->getName(), LEFTARROW, clientId, packet->print(pbuf));
break; break;

View File

@@ -17,6 +17,7 @@
#include "MQTTSNGWPacket.h" #include "MQTTSNGWPacket.h"
#include "MQTTSNGWPacket.h" #include "MQTTSNGWPacket.h"
#include "MQTTSNGateway.h" #include "MQTTSNGateway.h"
#include "MQTTSNGWEncapsulatedPacket.h"
using namespace MQTTSNGW; using namespace MQTTSNGW;
using namespace std; using namespace std;
@@ -56,8 +57,22 @@ void ClientSendTask::run()
{ {
client = ev->getClient(); client = ev->getClient();
packet = ev->getMQTTSNPacket(); packet = ev->getMQTTSNPacket();
log(client, packet); Forwarder* fwd = client->getForwarder();
rc = packet->unicast(_sensorNetwork, client->getSensorNetAddress());
if ( fwd )
{
MQTTSNGWEncapsulatedPacket encap(packet);
WirelessNodeId* wnId = fwd->getWirelessNodeId(client);
encap.setWirelessNodeId(wnId);
log(fwd, &encap);
log(client, packet);
rc = encap.unicast(_sensorNetwork,fwd->getSensorNetAddr());
}
else
{
log(client, packet);
rc = packet->unicast(_sensorNetwork, client->getSensorNetAddress());
}
} }
else if (ev->getEventType() == EtBroadcast) else if (ev->getEventType() == EtBroadcast)
{ {
@@ -119,3 +134,10 @@ void ClientSendTask::log(Client* client, MQTTSNPacket* packet)
break; break;
} }
} }
void ClientSendTask::log(Forwarder* forwarder, MQTTSNGWEncapsulatedPacket* packet)
{
char pbuf[SIZE_OF_LOG_PACKET * 3];
const char* forwarderId = forwarder->getId();
WRITELOG(FORMAT_Y_W_G, currentDateTime(), packet->getName(), RIGHTARROW, forwarderId, packet->print(pbuf));
}

View File

@@ -35,6 +35,7 @@ public:
private: private:
void log(Client*, MQTTSNPacket*); void log(Client*, MQTTSNPacket*);
void log(Forwarder*, MQTTSNGWEncapsulatedPacket*);
Gateway* _gateway; Gateway* _gateway;
SensorNetwork* _sensorNetwork; SensorNetwork* _sensorNetwork;

View File

@@ -218,7 +218,6 @@ void MQTTSNConnectionHandler::handleWillmsg(Client* client, MQTTSNPacket* packet
void MQTTSNConnectionHandler::handleDisconnect(Client* client, MQTTSNPacket* packet) void MQTTSNConnectionHandler::handleDisconnect(Client* client, MQTTSNPacket* packet)
{ {
uint16_t duration = 0; uint16_t duration = 0;
Event* ev = new Event();
if ( packet->getDISCONNECT(&duration) != 0 ) if ( packet->getDISCONNECT(&duration) != 0 )
{ {
@@ -226,7 +225,7 @@ void MQTTSNConnectionHandler::handleDisconnect(Client* client, MQTTSNPacket* pac
{ {
MQTTGWPacket* mqMsg = new MQTTGWPacket(); MQTTGWPacket* mqMsg = new MQTTGWPacket();
mqMsg->setHeader(DISCONNECT); mqMsg->setHeader(DISCONNECT);
ev = new Event(); Event* ev = new Event();
ev->setBrokerSendEvent(client, mqMsg); ev->setBrokerSendEvent(client, mqMsg);
_gateway->getBrokerSendQue()->post(ev); _gateway->getBrokerSendQue()->post(ev);
} }
@@ -270,8 +269,6 @@ void MQTTSNConnectionHandler::handleWillmsgupd(Client* client, MQTTSNPacket* pac
*/ */
void MQTTSNConnectionHandler::handlePingreq(Client* client, MQTTSNPacket* packet) void MQTTSNConnectionHandler::handlePingreq(Client* client, MQTTSNPacket* packet)
{ {
MQTTGWPacket* msg = 0;
if ( ( client->isSleep() || client->isAwake() ) && client->getClientSleepPacket() ) if ( ( client->isSleep() || client->isAwake() ) && client->getClientSleepPacket() )
{ {
sendStoredPublish(client); sendStoredPublish(client);

View File

@@ -26,6 +26,7 @@ namespace MQTTSNGW
#define CONFIG_FILE "gateway.conf" #define CONFIG_FILE "gateway.conf"
#define CLIENT_LIST "clients.conf" #define CLIENT_LIST "clients.conf"
#define PREDEFINEDTOPIC_FILE "predefinedTopic.conf" #define PREDEFINEDTOPIC_FILE "predefinedTopic.conf"
#define FORWARDER_LIST "forwarders.conf"
/*========================================================== /*==========================================================
* Gateway default parameters * Gateway default parameters
@@ -41,7 +42,7 @@ namespace MQTTSNGW
#define MAX_INFLIGHTMESSAGES (10) // Number of inflight messages #define MAX_INFLIGHTMESSAGES (10) // Number of inflight messages
#define MAX_SAVED_PUBLISH (20) // Max number of PUBLISH message for Asleep state #define MAX_SAVED_PUBLISH (20) // Max number of PUBLISH message for Asleep state
#define MAX_TOPIC_PAR_CLIENT (50) // Max Topic count for a client. it should be less than 256 #define MAX_TOPIC_PAR_CLIENT (50) // Max Topic count for a client. it should be less than 256
#define MQTTSNGW_MAX_PACKET_SIZE (1024) // Max Packet size (5+2+TopicLen+PayloadLen) #define MQTTSNGW_MAX_PACKET_SIZE (1024) // Max Packet size (5+2+TopicLen+PayloadLen + Foward Encapsulation)
#define SIZE_OF_LOG_PACKET (500) // Length of the packet log in bytes #define SIZE_OF_LOG_PACKET (500) // Length of the packet log in bytes
/*================================= /*=================================

View File

@@ -0,0 +1,179 @@
/**************************************************************************************
* Copyright (c) 2018, Tomoaki Yamaguchi
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
**************************************************************************************/
#include "MQTTSNGWPacket.h"
#include "MQTTSNGWEncapsulatedPacket.h"
#include "MQTTSNPacket.h"
#include <string.h>
using namespace MQTTSNGW;
using namespace std;
WirelessNodeId::WirelessNodeId()
:
_len{0},
_nodeId{0}
{
}
WirelessNodeId::~WirelessNodeId()
{
if ( _nodeId )
{
free(_nodeId);
}
}
void WirelessNodeId::setId(uint8_t* id, uint8_t len)
{
if ( _nodeId )
{
free(_nodeId);
}
uint8_t* buf = (uint8_t*)malloc(len);
if ( buf )
{
memcpy(buf, id, len);
_len = len;
_nodeId = buf;
}
else
{
_nodeId = 0;
_len = 0;
}
}
void WirelessNodeId::setId(WirelessNodeId* id)
{
setId(id->_nodeId, id->_len);
}
bool WirelessNodeId::operator ==(WirelessNodeId& id)
{
if ( _len == id._len )
{
return memcmp(_nodeId, id._nodeId, _len) == 0;
}
else
{
return false;
}
}
/*
* Class MQTTSNGWEncapsulatedPacket
*/
MQTTSNGWEncapsulatedPacket::MQTTSNGWEncapsulatedPacket()
: _mqttsn{0},
_ctrl{0}
{
}
MQTTSNGWEncapsulatedPacket::MQTTSNGWEncapsulatedPacket(MQTTSNPacket* packet)
: _mqttsn{packet},
_ctrl{0}
{
}
MQTTSNGWEncapsulatedPacket::~MQTTSNGWEncapsulatedPacket()
{
/* Do not delete the MQTTSNPacket. MQTTSNPacket is deleted by delete Event */
}
int MQTTSNGWEncapsulatedPacket::unicast(SensorNetwork* network, SensorNetAddress* sendTo)
{
uint8_t buf[MQTTSNGW_MAX_PACKET_SIZE];
int len = serialize(buf);
return network->unicast(buf, len, sendTo);
}
int MQTTSNGWEncapsulatedPacket::serialize(uint8_t* buf)
{
int len = 0;
buf[0] = _id._len + 3;
buf[1] = MQTTSN_ENCAPSULATED;
buf[2] = _ctrl;
memcpy( buf + 3, _id._nodeId, _id._len);
if ( _mqttsn )
{
len = _mqttsn->getPacketLength();
memcpy(buf + buf[0], _mqttsn->getPacketData(), len);
}
return buf[0] + len;
}
int MQTTSNGWEncapsulatedPacket::desirialize(unsigned char* buf, unsigned short len)
{
if ( _mqttsn )
{
delete _mqttsn;
_mqttsn = 0;
}
_ctrl = buf[2];
_id.setId(buf + 3, buf[0] - 3);
_mqttsn = new MQTTSNPacket;
_mqttsn->desirialize(buf + buf[0], len - buf[0]);
return buf[0];
}
int MQTTSNGWEncapsulatedPacket::getType(void)
{
return MQTTSN_ENCAPSULATED;
}
const char* MQTTSNGWEncapsulatedPacket::getName()
{
return MQTTSNPacket_name(MQTTSN_ENCAPSULATED);
}
MQTTSNPacket* MQTTSNGWEncapsulatedPacket::getMQTTSNPacket(void)
{
return _mqttsn;
}
WirelessNodeId* MQTTSNGWEncapsulatedPacket::getWirelessNodeId(void)
{
return &_id;
}
void MQTTSNGWEncapsulatedPacket::setWirelessNodeId(WirelessNodeId* id)
{
_id.setId(id);
}
char* MQTTSNGWEncapsulatedPacket::print(char* pbuf)
{
char* ptr = pbuf;
char** pptr = &pbuf;
uint8_t buf[MQTTSNGW_MAX_PACKET_SIZE];
int len = serialize(buf);
int size = len > SIZE_OF_LOG_PACKET ? SIZE_OF_LOG_PACKET : len;
for (int i = 1; i < size; i++)
{
sprintf(*pptr, " %02X", *(buf + i));
*pptr += 3;
}
**pptr = 0;
return ptr;
}

View File

@@ -0,0 +1,65 @@
/**************************************************************************************
* Copyright (c) 2018, Tomoaki Yamaguchi
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
**************************************************************************************/
#ifndef MQTTSNGATEWAY_SRC_MQTTSNGWENCAPSULATEDPACKET_H_
#define MQTTSNGATEWAY_SRC_MQTTSNGWENCAPSULATEDPACKET_H_
namespace MQTTSNGW
{
class WirelessNodeId
{
friend class MQTTSNGWEncapsulatedPacket;
public:
WirelessNodeId();
~WirelessNodeId();
void setId(uint8_t* id, uint8_t len);
void setId(WirelessNodeId* id);
bool operator ==(WirelessNodeId& id);
private:
uint8_t _len;
uint8_t* _nodeId;
};
class MQTTSNGWEncapsulatedPacket
{
public:
MQTTSNGWEncapsulatedPacket();
MQTTSNGWEncapsulatedPacket(MQTTSNPacket* packet);
~MQTTSNGWEncapsulatedPacket();
int unicast(SensorNetwork* network, SensorNetAddress* sendTo);
int serialize(uint8_t* buf);
int desirialize(unsigned char* buf, unsigned short len);
int getType(void);
unsigned char* getPacketData(void);
int getPacketLength(void);
const char* getName();
MQTTSNPacket* getMQTTSNPacket(void);
void setWirelessNodeId(WirelessNodeId* id);
WirelessNodeId* getWirelessNodeId(void);
char* print(char* buf);
private:
MQTTSNPacket* _mqttsn;
WirelessNodeId _id;
uint8_t _ctrl;
};
}
#endif /* MQTTSNGATEWAY_SRC_MQTTSNGWENCAPSULATEDPACKET_H_ */

View File

@@ -0,0 +1,321 @@
/**************************************************************************************
* Copyright (c) 2018, Tomoaki Yamaguchi
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
**************************************************************************************/
#include <string.h>
#include "MQTTSNGWForwarder.h"
using namespace MQTTSNGW;
using namespace std;
/*
* Class ForwarderList
*/
ForwarderList::ForwarderList()
{
_head = 0;
}
ForwarderList::~ForwarderList()
{
if ( _head )
{
Forwarder* p = _head;
while ( p )
{
Forwarder* next = p->_next;
delete p;
p = next;
}
}
}
Forwarder* ForwarderList::getForwarder(SensorNetAddress* addr)
{
Forwarder* p = _head;
while ( p )
{
if ( p->_sensorNetAddr.isMatch(addr) )
{
break;
}
p = p->_next;
}
return p;
}
bool ForwarderList::setFowerder(const char* fileName)
{
FILE* fp;
char buf[MAX_CLIENTID_LENGTH + 256];
size_t pos;
SensorNetAddress netAddr;
if ((fp = fopen(fileName, "r")) != 0)
{
while (fgets(buf, MAX_CLIENTID_LENGTH + 254, fp) != 0)
{
if (*buf == '#')
{
continue;
}
string data = string(buf);
while ((pos = data.find_first_of("  \t\n")) != string::npos)
{
data.erase(pos, 1);
}
if (data.empty())
{
continue;
}
pos = data.find_first_of(",");
string id = data.substr(0, pos);
string addr = data.substr(pos + 1);
if (netAddr.setAddress(&addr) == 0)
{
addForwarder(&netAddr, &id);
}
else
{
WRITELOG("Invalid address %s\n", data.c_str());
return false;
}
}
fclose(fp);
}
else
{
WRITELOG("Can not open the forwarders List. %s\n", fileName);
return false;
}
return true;
}
Forwarder* ForwarderList::addForwarder(SensorNetAddress* addr, string* forwarderId)
{
Forwarder* fdr = new Forwarder(addr, forwarderId);
if ( _head == 0 )
{
_head = fdr;
}
else
{
Forwarder* p = _head;
while ( p )
{
if ( p->_next == 0 )
{
p->_next = fdr;
break;
}
else
{
p = p->_next;
}
}
}
return fdr;
}
Forwarder::Forwarder()
{
_headClient = 0;
_next = 0;
}
/*
* Class Forwarder
*/
Forwarder::Forwarder(SensorNetAddress* addr, string* forwarderId)
{
_forwarderName = *forwarderId;
_sensorNetAddr = *addr;
_headClient = 0;
_next = 0;
}
Forwarder::~Forwarder(void)
{
if ( _headClient )
{
ForwardedClient* p = _headClient;
while ( p )
{
ForwardedClient* next = p->_next;
delete p;
p = next;
}
}
}
const char* Forwarder::getId(void)
{
return _forwarderName.c_str();
}
void Forwarder::addClient(Client* client, WirelessNodeId* id)
{
ForwardedClient* p = _headClient;
ForwardedClient* prev = 0;
client->setForwarder(this);
if ( p != 0 )
{
while ( p )
{
if ( p->_client == client )
{
client->setForwarder(this);
return;
}
prev = p;
p = p->_next;
}
}
ForwardedClient* fclient = new ForwardedClient();
fclient->setClient(client);
fclient->setWirelessNodeId(id);
if ( prev )
{
prev->_next = fclient;
}
else
{
_headClient = fclient;
}
}
Client* Forwarder::getClient(WirelessNodeId* id)
{
Client* cl = 0;
_mutex.lock();
ForwardedClient* p = _headClient;
while ( p )
{
if ( *(p->_wirelessNodeId) == *id )
{
cl = p->_client;
break;
}
else
{
p = p->_next;
}
}
_mutex.unlock();
return cl;
}
const char* Forwarder::getName(void)
{
return _forwarderName.c_str();
}
WirelessNodeId* Forwarder::getWirelessNodeId(Client* client)
{
WirelessNodeId* nodeId = 0;
_mutex.lock();
ForwardedClient* p = _headClient;
while ( p )
{
if ( p->_client == client )
{
nodeId = p->_wirelessNodeId;
break;
}
else
{
p = p->_next;
}
}
_mutex.unlock();
return nodeId;
}
void Forwarder::eraseClient(Client* client)
{
ForwardedClient* prev = 0;
_mutex.lock();
ForwardedClient* p = _headClient;
while ( p )
{
if ( p->_client == client )
{
if ( prev )
{
prev->_next = p->_next;
}
else
{
_headClient = p->_next;
}
delete p;
break;
}
else
{
p = p->_next;
}
}
_mutex.unlock();
}
SensorNetAddress* Forwarder::getSensorNetAddr(void)
{
return &_sensorNetAddr;
}
/*
* Class ForwardedClient
*/
ForwardedClient::ForwardedClient()
: _client{0}
, _wirelessNodeId{0}
, _next{0}
{
}
ForwardedClient::~ForwardedClient()
{
if (_wirelessNodeId)
{
delete _wirelessNodeId;
}
}
void ForwardedClient::setClient(Client* client)
{
_client = client;
}
void ForwardedClient::setWirelessNodeId(WirelessNodeId* id)
{
if ( _wirelessNodeId == 0 )
{
_wirelessNodeId = new WirelessNodeId();
}
_wirelessNodeId->setId(id);
}

View File

@@ -0,0 +1,88 @@
/**************************************************************************************
* Copyright (c) 2018, Tomoaki Yamaguchi
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
**************************************************************************************/
#ifndef MQTTSNGATEWAY_SRC_MQTTSNGWFORWARDER_H_
#define MQTTSNGATEWAY_SRC_MQTTSNGWFORWARDER_H_
#include "MQTTSNGWClient.h"
#include "MQTTSNGWEncapsulatedPacket.h"
#include "SensorNetwork.h"
namespace MQTTSNGW
{
class Client;
class WirelessNodeId;
class ForwardedClient
{
friend class Forwarder;
public:
ForwardedClient();
~ForwardedClient();
void setClient(Client* client);
void setWirelessNodeId(WirelessNodeId* id);
private:
Client* _client;
WirelessNodeId* _wirelessNodeId;
ForwardedClient* _next;
};
class Forwarder
{
friend class ForwarderList;
public:
Forwarder();
Forwarder(SensorNetAddress* addr, string* forwarderId);
~Forwarder();
const char* getId(void);
void addClient(Client* client, WirelessNodeId* id);
Client* getClient(WirelessNodeId* id);
WirelessNodeId* getWirelessNodeId(Client* client);
void eraseClient(Client* client);
SensorNetAddress* getSensorNetAddr(void);
const char* getName(void);
private:
string _forwarderName;
SensorNetAddress _sensorNetAddr;
ForwardedClient* _headClient;
Forwarder* _next;
Mutex _mutex;
};
class ForwarderList
{
public:
ForwarderList();
~ForwarderList();
Forwarder* getForwarder(SensorNetAddress* addr);
bool setFowerder(const char* fileName);
Forwarder* addForwarder(SensorNetAddress* addr, string* forwarderId);
private:
Forwarder* _head;
};
}
#endif /* MQTTSNGATEWAY_SRC_MQTTSNGWFORWARDER_H_ */

View File

@@ -17,6 +17,6 @@
#ifndef MQTTSNGWVERSION_H_IN_ #ifndef MQTTSNGWVERSION_H_IN_
#define MQTTSNGWVERSION_H_IN_ #define MQTTSNGWVERSION_H_IN_
#define PAHO_GATEWAY_VERSION "1.1.0" #define PAHO_GATEWAY_VERSION "1.2.0"
#endif /* MQTTSNGWVERSION_H_IN_ */ #endif /* MQTTSNGWVERSION_H_IN_ */

View File

@@ -46,6 +46,8 @@ Gateway::Gateway()
_params.privateKey = 0; _params.privateKey = 0;
_params.clientListName = 0; _params.clientListName = 0;
_params.configName = 0; _params.configName = 0;
_params.predefinedTopicFileName = 0;
_params.forwarderListName = 0;
_packetEventQue.setMaxSize(MAX_INFLIGHTMESSAGES * MAX_CLIENTS); _packetEventQue.setMaxSize(MAX_INFLIGHTMESSAGES * MAX_CLIENTS);
} }
@@ -99,6 +101,14 @@ Gateway::~Gateway()
{ {
free(_params.configName); free(_params.configName);
} }
if ( _params.predefinedTopicFileName )
{
free(_params.predefinedTopicFileName);
}
if ( _params.forwarderListName )
{
free(_params.forwarderListName);
}
} }
void Gateway::initialize(int argc, char** argv) void Gateway::initialize(int argc, char** argv)
@@ -214,7 +224,7 @@ void Gateway::initialize(int argc, char** argv)
{ {
if (!strcasecmp(param, "YES") ) if (!strcasecmp(param, "YES") )
{ {
if (getParam("PredefinedTopicFile", param) == 0) if (getParam("PredefinedTopicList", param) == 0)
{ {
fileName = string(param); fileName = string(param);
} }
@@ -234,6 +244,30 @@ void Gateway::initialize(int argc, char** argv)
} }
} }
if (getParam("Forwarder", param) == 0 )
{
if (!strcasecmp(param, "YES") )
{
if (getParam("ForwardersList", param) == 0)
{
fileName = string(param);
}
else
{
fileName = *getConfigDirName() + string(FORWARDER_LIST);
}
if ( !_forwarderList.setFowerder(fileName.c_str()) )
{
throw Exception("Gateway::initialize: No ForwardersList file defined by the configuration..");
}
_params.forwarderListName = strdup(fileName.c_str());
}
else
{
_params.forwarderListName = 0;
}
}
fileName = *getConfigDirName() + *getConfigFileName(); fileName = *getConfigDirName() + *getConfigFileName();
_params.configName = strdup(fileName.c_str()); _params.configName = strdup(fileName.c_str());
} }
@@ -254,12 +288,17 @@ void Gateway::run(void)
{ {
WRITELOG(" ClientList: %s\n", _params.clientListName); WRITELOG(" ClientList: %s\n", _params.clientListName);
} }
WRITELOG(" SensorN/W: %s\n", _sensorNetwork.getDescription());
WRITELOG(" Broker: %s : %s, %s\n", _params.brokerName, _params.port, _params.portSecure);
if ( _params.predefinedTopicFileName ) if ( _params.predefinedTopicFileName )
{ {
WRITELOG(" PreDefFile: %s\n", _params.predefinedTopicFileName); WRITELOG(" PreDefFile: %s\n", _params.predefinedTopicFileName);
} }
if ( _params.forwarderListName )
{
WRITELOG(" Forwarders: %s\n", _params.forwarderListName);
}
WRITELOG(" SensorN/W: %s\n", _sensorNetwork.getDescription());
WRITELOG(" Broker: %s : %s, %s\n", _params.brokerName, _params.port, _params.portSecure);
WRITELOG(" RootCApath: %s\n", _params.rootCApath); WRITELOG(" RootCApath: %s\n", _params.rootCApath);
WRITELOG(" RootCAfile: %s\n", _params.rootCAfile); WRITELOG(" RootCAfile: %s\n", _params.rootCAfile);
WRITELOG(" CertKey: %s\n", _params.certKey); WRITELOG(" CertKey: %s\n", _params.certKey);
@@ -305,6 +344,11 @@ ClientList* Gateway::getClientList()
return &_clientList; return &_clientList;
} }
ForwarderList* Gateway::getForwarderList(void)
{
return &_forwarderList;
}
SensorNetwork* Gateway::getSensorNetwork() SensorNetwork* Gateway::getSensorNetwork()
{ {
return &_sensorNetwork; return &_sensorNetwork;

View File

@@ -17,6 +17,7 @@
#define MQTTSNGATEWAY_H_ #define MQTTSNGATEWAY_H_
#include "MQTTSNGWClient.h" #include "MQTTSNGWClient.h"
#include "MQTTSNGWForwarder.h"
#include "MQTTSNGWProcess.h" #include "MQTTSNGWProcess.h"
#include "MQTTSNPacket.h" #include "MQTTSNPacket.h"
@@ -42,6 +43,7 @@ namespace MQTTSNGW
#define CLIENT "Client" #define CLIENT "Client"
#define CLIENTS "Clients" #define CLIENTS "Clients"
#define UNKNOWNCL "Unknown Client !" #define UNKNOWNCL "Unknown Client !"
#define LEFTARROW "<---" #define LEFTARROW "<---"
#define RIGHTARROW "--->" #define RIGHTARROW "--->"
#define LEFTARROWB "<===" #define LEFTARROWB "<==="
@@ -69,14 +71,6 @@ namespace MQTTSNGW
#define ERRMSG_HEADER "\033[0m\033[0;31mError:" #define ERRMSG_HEADER "\033[0m\033[0;31mError:"
#define ERRMSG_FOOTER "\033[0m\033[0;37m" #define ERRMSG_FOOTER "\033[0m\033[0;37m"
/*=====================================
Predefined TopicId for OTA
====================================*/
//#define OTA_CLIENTS
//#define PREDEFINEDID_OTA_REQ (0x0ff0)
//#define PREDEFINEDID_OTA_READY (0x0ff1)
//#define PREDEFINEDID_OTA_NO_CLIENT (0x0ff2)
/*===================================== /*=====================================
Class Event Class Event
====================================*/ ====================================*/
@@ -161,6 +155,7 @@ typedef struct
char* certKey; char* certKey;
char* privateKey; char* privateKey;
char* predefinedTopicFileName; char* predefinedTopicFileName;
char* forwarderListName;
}GatewayParams; }GatewayParams;
/*===================================== /*=====================================
@@ -177,12 +172,14 @@ public:
EventQue* getClientSendQue(void); EventQue* getClientSendQue(void);
EventQue* getBrokerSendQue(void); EventQue* getBrokerSendQue(void);
ClientList* getClientList(void); ClientList* getClientList(void);
ForwarderList* getForwarderList(void);
SensorNetwork* getSensorNetwork(void); SensorNetwork* getSensorNetwork(void);
LightIndicator* getLightIndicator(void); LightIndicator* getLightIndicator(void);
GatewayParams* getGWParams(void); GatewayParams* getGWParams(void);
private: private:
ClientList _clientList; ClientList _clientList;
ForwarderList _forwarderList;
EventQue _packetEventQue; EventQue _packetEventQue;
EventQue _brokerSendQue; EventQue _brokerSendQue;
EventQue _clientSendQue; EventQue _clientSendQue;

View File

@@ -93,10 +93,10 @@ void TestProcess::run(void)
delete tque; delete tque;
/* Test Tree23 */ /* Test Tree23 */
printf("Test Tree23 "); //printf("Test Tree23 ");
TestTree23* tree23 = new TestTree23(); //TestTree23* tree23 = new TestTree23();
tree23->test(); //tree23->test();
delete tree23; //delete tree23;
/* Test TopicTable */ /* Test TopicTable */
printf("Test Topic "); printf("Test Topic ");
@@ -111,6 +111,7 @@ void TestProcess::run(void)
delete testMap; delete testMap;
/* Test EventQue */ /* Test EventQue */
/*
printf("Test EventQue "); printf("Test EventQue ");
Client* client = new Client(); Client* client = new Client();
_evQue.setMaxSize(EVENT_CNT); _evQue.setMaxSize(EVENT_CNT);
@@ -122,6 +123,7 @@ void TestProcess::run(void)
ev->setClientSendEvent(client, packet); ev->setClientSendEvent(client, packet);
_evQue.post(ev); _evQue.post(ev);
} }
delete client;
*/
//MultiTaskProcess::run(); //MultiTaskProcess::run();
} }

View File

@@ -19,7 +19,7 @@
using namespace MQTTSNGW; using namespace MQTTSNGW;
TestProcess* test = new TestProcess(); TestProcess* test = new TestProcess();
TestTask* task = new TestTask(test); //TestTask* task = new TestTask(test);
int main(int argc, char** argv) int main(int argc, char** argv)
{ {

View File

@@ -29,6 +29,7 @@ static const char* packet_names[] =
"WILLMSGRESP" "WILLMSGRESP"
}; };
static const char* encapsulation_packet_name = "ENCAPSULATED";
/** /**
* Returns a character string representing the packet name given a MsgType code * Returns a character string representing the packet name given a MsgType code
@@ -37,6 +38,10 @@ static const char* packet_names[] =
*/ */
const char* MQTTSNPacket_name(int code) const char* MQTTSNPacket_name(int code)
{ {
if ( code == MQTTSN_ENCAPSULATED )
{
return encapsulation_packet_name;
}
return (code >= 0 && code <= MQTTSN_WILLMSGRESP) ? packet_names[code] : "UNKNOWN"; return (code >= 0 && code <= MQTTSN_WILLMSGRESP) ? packet_names[code] : "UNKNOWN";
} }

View File

@@ -26,7 +26,7 @@ enum errors
{ {
MQTTSNPACKET_BUFFER_TOO_SHORT = -2, MQTTSNPACKET_BUFFER_TOO_SHORT = -2,
MQTTSNPACKET_READ_ERROR = -1, MQTTSNPACKET_READ_ERROR = -1,
MQTTSNPACKET_READ_COMPLETE, MQTTSNPACKET_READ_COMPLETE
}; };
#define MQTTSN_PROTOCOL_VERSION 0x01 #define MQTTSN_PROTOCOL_VERSION 0x01
@@ -43,12 +43,12 @@ typedef enum
{ {
MQTTSN_TOPIC_TYPE_NORMAL, /* topic id in publish, topic name in subscribe */ MQTTSN_TOPIC_TYPE_NORMAL, /* topic id in publish, topic name in subscribe */
MQTTSN_TOPIC_TYPE_PREDEFINED, MQTTSN_TOPIC_TYPE_PREDEFINED,
MQTTSN_TOPIC_TYPE_SHORT, MQTTSN_TOPIC_TYPE_SHORT
}MQTTSN_topicTypes; }MQTTSN_topicTypes;
enum MQTTSN_msgTypes enum MQTTSN_msgTypes
{ {
MQTTSN_ADVERTISE, MQTTSN_SEARCHGW, MQTTSN_GWINFO, MQTTSN_RESERVED1, MQTTSN_ADVERTISE, MQTTSN_SEARCHGW, MQTTSN_GWINFO, MQTTSN_RESERVED1,
MQTTSN_CONNECT, MQTTSN_CONNACK, MQTTSN_CONNECT, MQTTSN_CONNACK,
MQTTSN_WILLTOPICREQ, MQTTSN_WILLTOPIC, MQTTSN_WILLMSGREQ, MQTTSN_WILLMSG, MQTTSN_WILLTOPICREQ, MQTTSN_WILLTOPIC, MQTTSN_WILLMSGREQ, MQTTSN_WILLMSG,
MQTTSN_REGISTER, MQTTSN_REGACK, MQTTSN_REGISTER, MQTTSN_REGACK,
@@ -57,8 +57,10 @@ enum MQTTSN_msgTypes
MQTTSN_PINGREQ, MQTTSN_PINGRESP, MQTTSN_PINGREQ, MQTTSN_PINGRESP,
MQTTSN_DISCONNECT, MQTTSN_RESERVED3, MQTTSN_DISCONNECT, MQTTSN_RESERVED3,
MQTTSN_WILLTOPICUPD, MQTTSN_WILLTOPICRESP, MQTTSN_WILLMSGUPD, MQTTSN_WILLMSGRESP, MQTTSN_WILLTOPICUPD, MQTTSN_WILLTOPICRESP, MQTTSN_WILLMSGUPD, MQTTSN_WILLMSGRESP,
MQTTSN_ENCAPSULATED = 0xfe
}; };
typedef struct typedef struct
{ {
MQTTSN_topicTypes type; MQTTSN_topicTypes type;