Update & BugFix Add GatewayTester

Signed-off-by: tomoaki <tomoaki@tomy-tech.com>
This commit is contained in:
tomoaki
2016-08-28 20:44:40 +09:00
parent bb1455f528
commit 5019ead91d
59 changed files with 5392 additions and 231 deletions

View File

@@ -0,0 +1,69 @@
PROGNAME := MQTT-SNGatewayTester
APPL := mainTest
SRCDIR := samples
SUBDIR := src
CPPSRCS := \
$(SRCDIR)/$(APPL).cpp \
$(SUBDIR)/LGwProxy.cpp \
$(SUBDIR)/LMqttsnClient.cpp \
$(SUBDIR)/LNetworkUdp.cpp \
$(SUBDIR)/LPublishManager.cpp \
$(SUBDIR)/LRegisterManager.cpp \
$(SUBDIR)/LSubscribeManager.cpp \
$(SUBDIR)/LTaskManager.cpp \
$(SUBDIR)/LTimer.cpp \
$(SUBDIR)/LTopicTable.cpp \
$(SUBDIR)/LScreen.cpp \
$(SUBDIR)/Payload.cpp \
$(SUBDIR)/Util.cpp \
CXX := g++
CPPFLAGS +=
INCLUDES += -I$(SUBDIR)
DEFS :=
LIBS +=
LDFLAGS :=
CXXFLAGS := -Wall -O3 -std=c++11
LDADD :=
OUTDIR := Build
PROG := $(OUTDIR)/$(PROGNAME)
OBJS := $(CPPSRCS:%.cpp=$(OUTDIR)/%.o)
DEPS := $(CPPSRCS:%.cpp=$(OUTDIR)/%.d)
.PHONY: install clean
all: $(PROG)
-include $(DEPS)
$(PROG): $(OBJS) $(OUTDIR)/$(SRCDIR)/$(APPL).o
$(CXX) $(LDFLAGS) -o $@ $^ $(LIBS) $(LDADD)
$(OUTDIR)/$(SUBDIR)/%.o:$(SUBDIR)/%.cpp
@if [ ! -e `dirname $@` ]; then mkdir -p `dirname $@`; fi
$(CXX) $(CXXFLAGS) $(CPPFLAGS) $(INCLUDES) $(DEFS) -o $@ -c -MMD -MP -MF $(@:%.o=%.d) $<
$(OUTDIR)/$(SRCDIR)/%.o:$(SRCDIR)/%.cpp
@if [ ! -e `dirname $@` ]; then mkdir -p `dirname $@`; fi
$(CXX) $(CXXFLAGS) $(CPPFLAGS) $(INCLUDES) $(DEFS) -o $@ -c -MMD -MP -MF $(@:%.o=%.d) $<
clean:
rm -rf $(OUTDIR)
install:
cp -pf $(PROG) ../../../

View File

@@ -0,0 +1,119 @@
###Gateway Test Program.
**sample/mainTest.cpp** is a Test sample coading.
Each test is described as one function. test1(), test2()...
````
/*------------------------------------------------------
* Test functions
*
* you can use 4 commands in Test functions
*
* 1) PUBLISH(const char* topicName,
* uint8_t* payload,
* uint16_t len,
* uint8_t qos,
* bool retain = false);
*
* 2) SUBSCRIBE(const char* topicName,
* TopicCallback onPublish,
* uint8_t qos);
*
* 3) UNSUBSCRIBE(const char* topicName);
*
* 4) DISCONNECT(uint16_t sleepInSecs);
*
*------------------------------------------------------*/
void test1(void)
{
char payload[300];
sprintf(payload, "ESP8266-08b133 ");
uint8_t qos = 0;
PUBLISH(topic1,(uint8_t*)payload, strlen(payload), qos);
}
void test2(void)
{
uint8_t qos = 1;
SUBSCRIBE(topic2, on_publish02, qos);
}
````
**TEST_LIST** is a test senario. Test functions are executed one by one.
````
/*------------------------------------------------------
* A List of Test Tasks
*------------------------------------------------------*/
TEST_LIST = {// e.g. TEST( Label, Test),
TEST("Publish topic1", test1),
TEST("Subscribe topic2", test2),
TEST("Publish topic2", test3),
TEST("Unsubscribe topic2", test4),
TEST("Publish topic2", test3),
TEST("Disconnect", test5),
END_OF_TEST_LIST
};
````
### **step1. Build **
````
$ git clone -b gateway https://github.com/eclipse/paho.mqtt-sn.embedded-c
$ cd paho.mqtt-sn.embedded-c/MQTTSNGateway/GatewayTester
$ make
$ make install
$ make clean
```
MQTT-SNGatewayTester program is copied into ../../../ directory.
### **step2. Execute Gateway Tester.**
````
$ cd ../../..
$ ./MQTT-SNGatewayTester
***************************************************************************
* MQTT-SN Gateway Tester
* Part of Project Paho in Eclipse
* (http://git.eclipse.org/c/paho/org.eclipse.paho.mqtt-sn.embedded-c.git/)
*
* Author : Tomoaki YAMAGUCHI
* Version: 0.0.0
***************************************************************************
Attempting to Connect the Broker.....
sendto 225.1.1.1 :1883 03 01 00
recved 192.168.11.5 :1883 03 01 00
sendto 225.1.1.1 :1883 03 01 00
recved 192.168.11.5 :1883 03 01 00
recved 192.168.11.17 :10000 03 02 01
sendto 192.168.11.17 :10000 13 04 0c 01 03 84 47 61 74 65 77 61 79 54 65 73 74 65 72
recved 192.168.11.17 :10000 02 06
sendto 192.168.11.17 :10000 0c 07 00 77 69 6c 6c 54 6f 70 69 63
recved 192.168.11.17 :10000 02 08
sendto 192.168.11.17 :10000 0d 09 77 69 6c 6c 4d 65 73 73 61 67 65
recved 192.168.11.17 :10000 03 05 00
Connected to the Broker
Attempting OnConnect.....
sendto 192.168.11.17 :10000 13 12 20 00 01 74 79 34 74 77 2f 63 6c 69 65 6e 74 49 64
recved 192.168.11.17 :10000 08 13 20 00 01 00 01 00
SUBSCRIBE complete. ty4tw/clientId
OnConnect complete
Test Ready.
Execute Publish topic1 Test ? ( Y/N ) :
````

View File

@@ -0,0 +1,162 @@
/**************************************************************************************
* Copyright (c) 2016, Tomoaki Yamaguchi
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Tomoaki Yamaguchi - initial API and implementation
**************************************************************************************/
#include "LMqttsnClientApp.h"
#include "LMqttsnClient.h"
#include "LScreen.h"
using namespace std;
using namespace linuxAsyncClient;
extern LMqttsnClient* theClient;
extern LScreen* theScreen;
extern int run(void);
/*
* MQTT-SN Functions supported :
*
* void PUBLISH ( const char* topicName, uint8_t* payload,
* uint16_t len, uint8_t qos, bool retain = false );
*
* void PUBLISH ( uint16_t topicId, uint8_t* payload,
* uint16_t len, uint8_t qos, bool retain = false );
*
* void SUBSCRIBE ( const char* topicName, TopicCallback onPublish,
* uint8_t qos );
*
* void UNSUBSCRIBE( const char* topicName );
*
* void DISCONNECT ( uint16_t sleepInSecs );
*
* void ASSERT( format, valiables, .....); <== instead of printf()
*
*/
/*------------------------------------------------------
* UDP Configuration
*------------------------------------------------------*/
UDPCONF = {
"GatewayTester", // ClientId
{225,1,1,1}, // Multicast group IP
1883, // Multicast group Port
20000, // Local PortNo
};
/*------------------------------------------------------
* Client Configuration
*------------------------------------------------------*/
MQTTSNCONF = {
60, //KeepAlive (seconds)
true, //Clean session
0, //Sleep duration in msecs
"willTopic", //WillTopic
"willMessage", //WillMessage
0, //WillQos
false //WillRetain
};
/*------------------------------------------------------
* Define Topics
*------------------------------------------------------*/
const char* topic1 = "ty4tw/clientId";
/*------------------------------------------------------
* Callback routines for Subscribed Topics
*------------------------------------------------------*/
int on_publish01(uint8_t* pload, uint16_t ploadlen)
{
return 0;
}
/*------------------------------------------------------
* A Link list of Callback routines and Topics
*------------------------------------------------------*/
SUBSCRIBE_LIST = {// e.g. SUB(topic, callback, QoS),
//SUB(topic1, on_publish01, 1),
END_OF_SUBSCRIBE_LIST
};
/*------------------------------------------------------
* Test functions
*------------------------------------------------------*/
void test1(void)
{
char payload[300];
sprintf(payload, "Client-01 ");
uint8_t qos = 0;
PUBLISH(topic1,(uint8_t*)payload, strlen(payload), qos);
}
void test2(void)
{
}
void test3(void)
{
}
void test4(void)
{
}
void test5(void)
{
}
/*------------------------------------------------------
* A List of Test functions
*------------------------------------------------------*/
TEST_LIST = {// e.g. TEST( Label, Test),
TEST("Publish topic1", test1),
END_OF_TEST_LIST
};
/*------------------------------------------------------
* unused for Test
*------------------------------------------------------*/
TASK_LIST = {// e.g. TASK( task, executing duration in second),
TASK(test1, 4),
END_OF_TASK_LIST
};
/*------------------------------------------------------
* Initialize function
*------------------------------------------------------*/
void setup(void)
{
}
/*======================================================
* main
*======================================================*/
/* uncomment this
int main(int argc, char** argv)
{
return run();
}
*/

View File

@@ -0,0 +1,208 @@
/**************************************************************************************
* Copyright (c) 2016, Tomoaki Yamaguchi
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Tomoaki Yamaguchi - initial API and implementation
**************************************************************************************/
#include "LMqttsnClientApp.h"
#include "LMqttsnClient.h"
#include "LScreen.h"
using namespace std;
using namespace linuxAsyncClient;
extern LMqttsnClient* theClient;
extern LScreen* theScreen;
extern int run(void);
/*
* Functions supported.
*
* void PUBLISH ( const char* topicName, uint8_t* payload,
* uint16_t len, uint8_t qos, bool retain = false );
*
* void PUBLISH ( uint16_t topicId, uint8_t* payload,
* uint16_t len, uint8_t qos, bool retain = false );
*
* void SUBSCRIBE ( const char* topicName, TopicCallback onPublish,
* uint8_t qos );
*
* void UNSUBSCRIBE( const char* topicName );
*
* void DISCONNECT ( uint16_t sleepInSecs );
*
* void ASSERT( format, .....); <== instead of printf()
*
*/
/*------------------------------------------------------
* UDP Configuration
*------------------------------------------------------*/
UDPCONF = {
"GatewayTester", // ClientId
{225,1,1,1}, // Multicast group IP
1883, // Multicast group Port
20001, // Local PortNo
};
/*------------------------------------------------------
* Client Configuration
*------------------------------------------------------*/
MQTTSNCONF = {
300, //KeepAlive (seconds)
true, //Clean session
0, //Sleep duration in msecs
//"willTopic", //WillTopic
//"willMessage", //WillMessage
"",
"",
0, //WillQos
false //WillRetain
};
/*------------------------------------------------------
* Define Topics
*------------------------------------------------------*/
const char* topic1 = "ty4tw/topic1";
const char* topic2 = "ty4tw/topic2";
const char* topic3 = "ty4tw/topic3";
/*------------------------------------------------------
* Callback routines for Subscribed Topics
*------------------------------------------------------*/
int on_Topic01(uint8_t* pload, uint16_t ploadlen)
{
ASSERT("\n\nTopic1 recv.\n");
char c = pload[ploadlen-1];
pload[ploadlen-1]= 0; // set null terminator
ASSERT("Payload -->%s%c<--\n\n",pload, c);
return 0;
}
int on_Topic02(uint8_t* pload, uint16_t ploadlen)
{
ASSERT("\n\nTopic2 recv.\n");
pload[ploadlen-1]= 0; // set null terminator
ASSERT("Payload -->%s <--\n\n",pload);
return 0;
}
int on_Topic03(uint8_t* pload, uint16_t ploadlen)
{
ASSERT("\n\nNew callback recv Topic2\n");
pload[ploadlen-1]= 0; // set null terminator
ASSERT("Payload -->%s <--\n\n",pload);
return 0;
}
/*------------------------------------------------------
* A Link list of Callback routines and Topics
*------------------------------------------------------*/
SUBSCRIBE_LIST = {// e.g. SUB(topic, callback, QoS),
SUB(topic1, on_Topic01, 1),
END_OF_SUBSCRIBE_LIST
};
/*------------------------------------------------------
* Test functions
*------------------------------------------------------*/
void publishTopic1(void)
{
char payload[300];
sprintf(payload, "publish \"ty4tw/Topic1\" \n");
uint8_t qos = 0;
PUBLISH(topic1,(uint8_t*)payload, strlen(payload), qos);
}
void subscribeTopic2(void)
{
uint8_t qos = 1;
SUBSCRIBE(topic2, on_Topic02, qos);
}
void publishTopic2(void)
{
char payload[300];
sprintf(payload, "publish \"ty4tw/topic2\" \n");
uint8_t qos = 0;
PUBLISH(topic2,(uint8_t*)payload, strlen(payload), qos);
}
void unsubscribe(void)
{
UNSUBSCRIBE(topic2);
}
void subscribechangeCallback(void)
{
uint8_t qos = 1;
SUBSCRIBE(topic2, on_Topic03, qos);
}
void test3(void)
{
char payload[300];
sprintf(payload, "TEST3 ");
uint8_t qos = 0;
PUBLISH(topic2,(uint8_t*)payload, strlen(payload), qos);
}
void disconnect(void)
{
DISCONNECT(0);
}
/*------------------------------------------------------
* A List of Test functions
*------------------------------------------------------*/
TEST_LIST = {// e.g. TEST( Label, Test),
TEST("Step1:Publish topic1", publishTopic1),
TEST("Step2:Publish topic2", publishTopic2),
TEST("Step3:Subscribe topic2", subscribeTopic2),
TEST("Step4:Publish topic2", publishTopic2),
TEST("Step5:Unsubscribe topic2", unsubscribe),
TEST("Step6:Publish topic2", publishTopic2),
TEST("Step7:subscribe again", subscribechangeCallback),
TEST("Step8:Publish topic2", publishTopic2),
TEST("Step9:Disconnect", disconnect),
END_OF_TEST_LIST
};
/*------------------------------------------------------
* unused for Test
*------------------------------------------------------*/
TASK_LIST = {// e.g. TASK( task, executing duration in second),
//TASK(test1, 4);
END_OF_TASK_LIST
};
/*------------------------------------------------------
* Initialize function
*------------------------------------------------------*/
void setup(void)
{
}
/*------------------------------------------------------
* main
*------------------------------------------------------*/
int main(int argc, char** argv)
{
return run();
}

View File

@@ -0,0 +1,458 @@
/**************************************************************************************
* Copyright (c) 2016, Tomoaki Yamaguchi
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
**************************************************************************************/
#include <string.h>
#include <stdio.h>
#include "LMqttsnClientApp.h"
#include "LGwProxy.h"
#include "LMqttsnClient.h"
#include "LScreen.h"
using namespace std;
using namespace linuxAsyncClient;
extern void setUint16(uint8_t* pos, uint16_t val);
extern uint16_t getUint16(const uint8_t* pos);
extern LMqttsnClient* theClient;
extern LScreen* theScreen;
/*=====================================
Class GwProxy
======================================*/
static const char* packet_names[] =
{
"ADVERTISE", "SEARCHGW", "GWINFO", "RESERVED", "CONNECT", "CONNACK",
"WILLTOPICREQ", "WILLTOPIC", "WILLMSGREQ", "WILLMSG", "REGISTER", "REGACK",
"PUBLISH", "PUBACK", "PUBCOMP", "PUBREC", "PUBREL", "RESERVED",
"SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK", "PINGREQ", "PINGRESP",
"DISCONNECT", "RESERVED", "WILLTOPICUPD", "WILLTOPICRESP", "WILLMSGUPD",
"WILLMSGRESP"
};
LGwProxy::LGwProxy(){
_nextMsgId = 0;
_status = GW_LOST;
_gwId = 0;
_willTopic = 0;
_willMsg = 0;
_qosWill = 0;
_retainWill = 0;
_tkeepAlive = MQTTSN_DEFAULT_KEEPALIVE;
_tAdv = MQTTSN_DEFAULT_DURATION;
_cleanSession = 0;
_pingStatus = 0;
_connectRetry = MQTTSN_RETRY_COUNT;
}
LGwProxy::~LGwProxy(){
_topicTbl.clearTopic();
}
void LGwProxy::initialize(LUdpConfig netconf, LMqttsnConfig mqconf){
_network.initialize(netconf);
_clientId = netconf.clientId;
_willTopic = mqconf.willTopic;
_willMsg = mqconf.willMsg;
_qosWill = mqconf.willQos;
_retainWill = mqconf.willRetain;
_cleanSession = mqconf.cleanSession;
_tkeepAlive = mqconf.keepAlive;
}
void LGwProxy::connect(){
char* pos;
while (_status != GW_CONNECTED){
pos = _msg;
if (_status == GW_SEND_WILLMSG){
*pos++ = 2 + (uint8_t)strlen(_willMsg);
*pos++ = MQTTSN_TYPE_WILLMSG;
strcpy(pos,_willMsg); // WILLMSG
_status = GW_WAIT_CONNACK;
writeGwMsg();
}else if (_status == GW_SEND_WILLTOPIC){
*pos++ = 3 + (uint8_t)strlen(_willTopic);
*pos++ = MQTTSN_TYPE_WILLTOPIC;
*pos++ = _qosWill | _retainWill;
strcpy(pos,_willTopic); // WILLTOPIC
_status = GW_WAIT_WILLMSGREQ;
writeGwMsg();
}else if (_status == GW_CONNECTING || _status == GW_DISCONNECTED){
uint8_t clientIdLen = uint8_t(strlen(_clientId) > 23 ? 23 : strlen(_clientId));
*pos++ = 6 + clientIdLen;
*pos++ = MQTTSN_TYPE_CONNECT;
pos++;
if (_cleanSession){
_msg[2] = MQTTSN_FLAG_CLEAN;
}
*pos++ = MQTTSN_PROTOCOL_ID;
setUint16((uint8_t*)pos, _tkeepAlive);
pos += 2;
strncpy(pos, _clientId, clientIdLen);
_msg[ 6 + clientIdLen] = 0;
_status = GW_WAIT_CONNACK;
if (_willMsg && _willTopic){
if (strlen(_willMsg) && strlen(_willTopic)){
_msg[2] = _msg[2] | MQTTSN_FLAG_WILL; // CONNECT
_status = GW_WAIT_WILLTOPICREQ;
}
}
writeGwMsg();
_connectRetry = MQTTSN_RETRY_COUNT;
}else if (_status == GW_LOST){
*pos++ = 3;
*pos++ = MQTTSN_TYPE_SEARCHGW;
*pos = 0; // SERCHGW
_status = GW_SEARCHING;
writeGwMsg();
}
getConnectResponce();
}
return;
}
int LGwProxy::getConnectResponce(void){
int len = readMsg();
if (len == 0){
if (_sendUTC + MQTTSN_TIME_RETRY < time(NULL)){
if (_msg[1] == MQTTSN_TYPE_CONNECT)
{
_connectRetry--;
}
if (--_retryCount > 0){
writeMsg((const uint8_t*)_msg); // Not writeGwMsg() : not to reset the counter.
_sendUTC = time(NULL);
}else{
_sendUTC = 0;
if ( _status > GW_SEARCHING && _connectRetry > 0){
_status = GW_CONNECTING;
}else{
_status = GW_LOST;
_gwId = 0;
}
return -1;
}
}
return 0;
}else if (_mqttsnMsg[0] == MQTTSN_TYPE_GWINFO && _status == GW_SEARCHING){
_network.setGwAddress();
_gwId = _mqttsnMsg[1];
_status = GW_CONNECTING;
}else if (_mqttsnMsg[0] == MQTTSN_TYPE_WILLTOPICREQ && _status == GW_WAIT_WILLTOPICREQ){
_status = GW_SEND_WILLTOPIC;
}else if (_mqttsnMsg[0] == MQTTSN_TYPE_WILLMSGREQ && _status == GW_WAIT_WILLMSGREQ){
_status = GW_SEND_WILLMSG;
}else if (_mqttsnMsg[0] == MQTTSN_TYPE_CONNACK && _status == GW_WAIT_CONNACK){
if (_mqttsnMsg[1] == MQTTSN_RC_ACCEPTED){
_status = GW_CONNECTED;
_connectRetry = MQTTSN_RETRY_COUNT;
_keepAliveTimer.start(_tkeepAlive * 1000);
_topicTbl.clearTopic();
ASSERT("\033[0m\033[0;32m\n\n Connected to the Broker\033[0m\033[0;37m\n\n");
theClient->onConnect(); // SUBSCRIBEs are conducted
}else{
_status = GW_CONNECTING;
}
}
return 1;
}
void LGwProxy::reconnect(void){
D_MQTTLOG("...Gateway reconnect\r\n");
_status = GW_DISCONNECTED;
connect();
}
void LGwProxy::disconnect(uint16_t secs){
_tSleep = secs;
_status = GW_DISCONNECTING;
_msg[1] = MQTTSN_TYPE_DISCONNECT;
if (secs){
_msg[0] = 4;
setUint16((uint8_t*) _msg + 2, secs);
}else{
_msg[0] = 2;
_keepAliveTimer.stop();
}
_retryCount = MQTTSN_RETRY_COUNT;
writeMsg((const uint8_t*)_msg);
_sendUTC = time(NULL);
while ( _status != GW_DISCONNECTED && _status != GW_SLEPT){
if (getDisconnectResponce() < 0){
_status = GW_LOST;
ASSERT("\033[0m\033[0;31m\n\n!!!!!! DISCONNECT Error !!!!!\033[0m\033[0;37m \n\n");
return;
}
}
}
int LGwProxy::getDisconnectResponce(void){
int len = readMsg();
if (len == 0){
if (_sendUTC + MQTTSN_TIME_RETRY < time(NULL)){
if (--_retryCount >= 0){
writeMsg((const uint8_t*)_msg);
_sendUTC = time(NULL);
}else{
_status = GW_LOST;
_gwId = 0;
return -1;
}
}
return 0;
}else if (_mqttsnMsg[0] == MQTTSN_TYPE_DISCONNECT){
if (_tSleep){
_status = GW_SLEEPING;
_keepAliveTimer.start(_tSleep);
}else{
_status = GW_DISCONNECTED;
}
}
return 0;
}
int LGwProxy::getMessage(void){
int len = readMsg();
if (len < 0){
return len; //error
}
#ifdef DEBUG_MQTTSN
if (len){
D_MQTTLOG(" recved msgType %x\n", _mqttsnMsg[0]);
}
#endif
if (len == 0){
// Check PINGREQ required
checkPingReq();
// Check ADVERTISE valid
checkAdvertise();
// Check Timeout of REGISTERs
_regMgr.checkTimeout();
// Check Timeout of PUBLISHes,
theClient->getPublishManager()->checkTimeout();
// Check Timeout of SUBSCRIBEs,
theClient->getSubscribeManager()->checkTimeout();
}else if (_mqttsnMsg[0] == MQTTSN_TYPE_PUBLISH){
theClient->getPublishManager()->published(_mqttsnMsg, len);
}else if (_mqttsnMsg[0] == MQTTSN_TYPE_PUBACK || _mqttsnMsg[0] == MQTTSN_TYPE_PUBCOMP ||
_mqttsnMsg[0] == MQTTSN_TYPE_PUBREC || _mqttsnMsg[0] == MQTTSN_TYPE_PUBREL ){
theClient->getPublishManager()->responce(_mqttsnMsg, (uint16_t)len);
}else if (_mqttsnMsg[0] == MQTTSN_TYPE_SUBACK || _mqttsnMsg[0] == MQTTSN_TYPE_UNSUBACK){
theClient->getSubscribeManager()->responce(_mqttsnMsg);
}else if (_mqttsnMsg[0] == MQTTSN_TYPE_REGISTER){
_regMgr.responceRegister(_mqttsnMsg, len);
}else if (_mqttsnMsg[0] == MQTTSN_TYPE_REGACK){
_regMgr.responceRegAck(getUint16(_mqttsnMsg + 3), getUint16(_mqttsnMsg + 1));
}else if (_mqttsnMsg[0] == MQTTSN_TYPE_PINGRESP){
if (_pingStatus == GW_WAIT_PINGRESP){
_pingStatus = 0;
resetPingReqTimer();
}
}else if (_mqttsnMsg[0] == MQTTSN_TYPE_DISCONNECT){
_status = GW_LOST;
_gwAliveTimer.stop();
_keepAliveTimer.stop();
}else if (_mqttsnMsg[0] == MQTTSN_TYPE_ADVERTISE){
if (getUint16((const uint8_t*)(_mqttsnMsg + 2)) < 61){
_tAdv = getUint16((const uint8_t*)(_mqttsnMsg + 2)) * 1500;
}else{
_tAdv = getUint16((const uint8_t*)(_mqttsnMsg + 2)) * 1100;
}
_gwAliveTimer.start(_tAdv);
}
return 0;
}
uint16_t LGwProxy::registerTopic(char* topicName, uint16_t topicId){
uint16_t id = topicId;
if (id == 0){
id = _topicTbl.getTopicId(topicName);
_regMgr.registerTopic(topicName);
}
return id;
}
int LGwProxy::writeMsg(const uint8_t* msg){
uint16_t len;
uint8_t pos;
uint8_t rc;
if (msg[0] == 0x01){
len = getUint16(msg + 1);
pos = 2;
}else{
len = msg[0];
pos = 1;
}
if (msg[0] == 3 && msg[1] == MQTTSN_TYPE_SEARCHGW){
rc = _network.broadcast(msg,len);
}else{
rc = _network.unicast(msg,len);
}
if (rc > 0){
if ( msg[pos] >= MQTTSN_TYPE_ADVERTISE && msg[pos] <= MQTTSN_TYPE_WILLMSGRESP )
{
ASSERT(" send %s\n", packet_names[msg[pos]]);
}
return rc;
}
//_status = GW_LOST;
//_gwId = 0;
return rc;
}
void LGwProxy::writeGwMsg(void){
_retryCount = MQTTSN_RETRY_COUNT;
writeMsg((const uint8_t*)_msg);
_sendUTC = time(NULL);
}
int LGwProxy::readMsg(void){
int len = 0;
_mqttsnMsg = _network.getMessage(&len);
if (len == 0){
return 0;
}
if (_mqttsnMsg[0] == 0x01){
int msgLen = (int) getUint16((const uint8_t*)_mqttsnMsg + 1);
if (len != msgLen){
_mqttsnMsg += 3;
len = msgLen - 3;
}
}else{
_mqttsnMsg += 1;
len -= 1;
}
if ( *_mqttsnMsg >= MQTTSN_TYPE_ADVERTISE && *_mqttsnMsg <= MQTTSN_TYPE_WILLMSGRESP )
{
ASSERT(" recv %s\n", packet_names[*_mqttsnMsg]);
}
return len;
}
void LGwProxy::setWillTopic(const char* willTopic, uint8_t qos, bool retain){
_willTopic = willTopic;
_retainWill = _qosWill = 0;
if (qos == 1){
_qosWill = MQTTSN_FLAG_QOS_1;
}else if (qos == 2){
_qosWill = MQTTSN_FLAG_QOS_2;
}
if (retain){
_retainWill = MQTTSN_FLAG_RETAIN;
}
}
void LGwProxy::setWillMsg(const char* willMsg){
_willMsg = willMsg;
}
void LGwProxy::setCleanSession(bool flg){
if (flg){
_cleanSession = MQTTSN_FLAG_CLEAN;
}else{
_cleanSession = 0;
}
}
uint16_t LGwProxy::getNextMsgId(void){
_nextMsgId++;
if (_nextMsgId == 0){
_nextMsgId = 1;
}
return _nextMsgId;
}
void LGwProxy::checkPingReq(void){
uint8_t msg[2];
msg[0] = 0x02;
msg[1] = MQTTSN_TYPE_PINGREQ;
if (_status == GW_CONNECTED && _keepAliveTimer.isTimeUp() && _pingStatus != GW_WAIT_PINGRESP){
_pingStatus = GW_WAIT_PINGRESP;
_pingRetryCount = MQTTSN_RETRY_COUNT;
writeMsg((const uint8_t*)msg);
_pingSendUTC = time(NULL);
}else if (_pingStatus == GW_WAIT_PINGRESP){
if (_pingSendUTC + MQTTSN_TIME_RETRY < time(NULL)){
if (--_pingRetryCount > 0){
writeMsg((const uint8_t*)msg);
_pingSendUTC = time(NULL);
}else{
_status = GW_LOST;
_gwId = 0;
_pingStatus = 0;
_keepAliveTimer.stop();
D_MQTTLOG(" !!! PINGREQ Timeout\n");
}
}
}
}
void LGwProxy::checkAdvertise(void){
if ( _gwAliveTimer.isTimeUp()){
_status = GW_LOST;
_gwId = 0;
_pingStatus = 0;
_gwAliveTimer.stop();
_keepAliveTimer.stop();
D_MQTTLOG(" !!! ADVERTISE Timeout\n");
}
}
LTopicTable* LGwProxy::getTopicTable(void){
return &_topicTbl;
}
LRegisterManager* LGwProxy::getRegisterManager(void){
return &_regMgr;
}
void LGwProxy::resetPingReqTimer(void){
_keepAliveTimer.start(_tkeepAlive * 1000);
}
const char* LGwProxy::getClientId(void) {
return _clientId;
}

View File

@@ -0,0 +1,110 @@
/**************************************************************************************
* Copyright (c) 2016, Tomoaki Yamaguchi
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
**************************************************************************************/
#ifndef GWPROXY_H_
#define GWPROXY_H_
#include <stdio.h>
#include <string.h>
#include "LMqttsnClientApp.h"
#include "LNetworkUdp.h"
#include "LRegisterManager.h"
#include "LTimer.h"
#include "LTopicTable.h"
using namespace std;
#define GW_LOST 0
#define GW_SEARCHING 1
#define GW_CONNECTING 2
#define GW_WAIT_WILLTOPICREQ 3
#define GW_SEND_WILLTOPIC 4
#define GW_WAIT_WILLMSGREQ 5
#define GW_SEND_WILLMSG 6
#define GW_WAIT_CONNACK 7
#define GW_CONNECTED 8
#define GW_DISCONNECTING 9
#define GW_SLEEPING 10
#define GW_DISCONNECTED 11
#define GW_SLEPT 12
#define GW_WAIT_PINGRESP 1
namespace linuxAsyncClient {
/*========================================
Class LGwProxy
=======================================*/
class LGwProxy{
public:
LGwProxy();
~LGwProxy();
void initialize(LUdpConfig netconf, LMqttsnConfig mqconf);
void connect(void);
void disconnect(uint16_t sec = 0);
int getMessage(void);
uint16_t registerTopic(char* topic, uint16_t toipcId);
void setWillTopic(const char* willTopic, uint8_t qos, bool retain = false);
void setWillMsg(const char* willMsg);
void setCleanSession(bool);
void setKeepAliveDuration(uint16_t duration);
void setAdvertiseDuration(uint16_t duration);
void reconnect(void);
int writeMsg(const uint8_t* msg);
void resetPingReqTimer(void);
uint16_t getNextMsgId();
LTopicTable* getTopicTable(void);
LRegisterManager* getRegisterManager(void);
const char* getClientId(void);
private:
int readMsg(void);
void writeGwMsg(void);
void checkPingReq(void);
void checkAdvertise(void);
int getConnectResponce(void);
int getDisconnectResponce(void);
LNetwork _network;
uint8_t* _mqttsnMsg;
uint16_t _nextMsgId;
const char* _clientId;
const char* _willTopic;
const char* _willMsg;
uint8_t _cleanSession;
uint8_t _retainWill;
uint8_t _qosWill;
uint8_t _gwId;
uint16_t _tkeepAlive;
uint32_t _tAdv;
uint32_t _sendUTC;
int _retryCount;
int _connectRetry;
uint8_t _status;
uint32_t _pingSendUTC;
uint8_t _pingRetryCount;
uint8_t _pingStatus;
LRegisterManager _regMgr;
LTopicTable _topicTbl;
LTimer _gwAliveTimer;
LTimer _keepAliveTimer;
uint16_t _tSleep;
char _msg[MQTTSN_MAX_MSG_LENGTH + 1];
};
} /* end of namespace */
#endif /* GWPROXY_H_ */

View File

@@ -0,0 +1,226 @@
/**************************************************************************************
* Copyright (c) 2016, Tomoaki Yamaguchi
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
**************************************************************************************/
#include <string.h>
#include <stdio.h>
#include "LGwProxy.h"
#include "LMqttsnClientApp.h"
#include "LMqttsnClient.h"
#include "LScreen.h"
using namespace std;
using namespace linuxAsyncClient;
extern TaskList theTaskList[];
extern TestList theTestList[];
extern OnPublishList theOnPublishList[];
extern MQTTSNCONF;
extern UDPCONF;
extern void setup(void);
/*=====================================
LMqttsnClient
======================================*/
LMqttsnClient* theClient = new LMqttsnClient();
LScreen* theScreen = new LScreen();
bool theOTAflag = false;
bool theClientMode = true;
int run(void)
{
char c = 0;
setup();
printf("\n%s", PAHO_COPYRIGHT4);
printf("\n%s\n", PAHO_COPYRIGHT0);
printf("%s\n", PAHO_COPYRIGHT1);
printf("%s\n", PAHO_COPYRIGHT2);
printf(" *\n%s\n", PAHO_COPYRIGHT3);
printf("%s\n", TESTER_VERSION);
printf("%s\n", PAHO_COPYRIGHT4);
#ifndef CLIENT_MODE
{
theClientMode = false;
PROMPT(" Do you like Tomoaki ? ( y/n ) : ");
while (true)
{
if (CHECKKEYIN(&c))
{
if ( toupper(c) == 'N' )
{
ASSERT("\033[0;31m\n**** Sorry ****\033[0;37m\n\n");
PROMPT("");
return 0;
}
}
else if ( toupper(c) == 'Y' )
{
ASSERT("\033[0m\033[0;32mAttempting to Connect the Broker.....\033[0m\033[0;37m\n");
PROMPT("");
break;
}
}
}
#endif
theClient->addTask(theClientMode);
theClient->initialize( theNetcon, theMqcon);
do
{
theClient->run();
}
while (theClientMode);
delete theScreen;
delete theClient;
return 0;
}
/*=====================================
Class LMqttsnClient
======================================*/
LMqttsnClient::LMqttsnClient()
{
}
LMqttsnClient::~LMqttsnClient()
{
}
void LMqttsnClient::initialize(LUdpConfig netconf, LMqttsnConfig mqconf)
{
_gwProxy.initialize(netconf, mqconf);
setSleepDuration(mqconf.sleepDuration);
}
void LMqttsnClient::addTask(bool clientMode)
{
if ( clientMode )
{
_taskMgr.add(theTaskList);
}
else
{
_taskMgr.add(theTestList);
}
}
LGwProxy* LMqttsnClient::getGwProxy(void)
{
return &_gwProxy;
}
LPublishManager* LMqttsnClient::getPublishManager(void)
{
return &_pubMgr;
}
;
LSubscribeManager* LMqttsnClient::getSubscribeManager(void)
{
return &_subMgr;
}
;
LRegisterManager* LMqttsnClient::getRegisterManager(void)
{
return _gwProxy.getRegisterManager();
}
LTaskManager* LMqttsnClient::getTaskManager(void)
{
return &_taskMgr;
}
;
LTopicTable* LMqttsnClient::getTopicTable(void)
{
return _gwProxy.getTopicTable();
}
void LMqttsnClient::publish(const char* topicName, Payload* payload, uint8_t qos, bool retain)
{
_pubMgr.publish(topicName, payload, qos, retain);
}
void LMqttsnClient::publish(const char* topicName, uint8_t* payload, uint16_t len, uint8_t qos, bool retain)
{
_pubMgr.publish(topicName, payload, len, qos, retain);
}
void LMqttsnClient::publish(uint16_t topicId, Payload* payload, uint8_t qos, bool retain)
{
_pubMgr.publish(topicId, payload, qos, retain);
}
void LMqttsnClient::publish(uint16_t topicId, uint8_t* payload, uint16_t len, uint8_t qos, bool retain)
{
_pubMgr.publish(topicId, payload, len, qos, retain);
}
void LMqttsnClient::subscribe(const char* topicName, TopicCallback onPublish, uint8_t qos)
{
_subMgr.subscribe(topicName, onPublish, qos);
}
void LMqttsnClient::subscribe(uint16_t topicId, TopicCallback onPublish, uint8_t qos, uint8_t topicType)
{
_subMgr.subscribe(topicId, onPublish, qos, topicType);
}
void LMqttsnClient::unsubscribe(const char* topicName)
{
_subMgr.unsubscribe(topicName);
}
void LMqttsnClient::disconnect(uint16_t sleepInSecs)
{
_gwProxy.disconnect(sleepInSecs);
}
void LMqttsnClient::run()
{
_gwProxy.connect();
_taskMgr.run();
sleep();
}
void LMqttsnClient::sleep(void)
{
}
void LMqttsnClient::setSleepDuration(uint32_t duration)
{
_sleepDuration = duration;
}
void LMqttsnClient::onConnect(void)
{
_subMgr.onConnect();
}
const char* LMqttsnClient::getClientId(void)
{
return _gwProxy.getClientId();
}

View File

@@ -0,0 +1,80 @@
/**************************************************************************************
* Copyright (c) 2016, Tomoaki Yamaguchi
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
**************************************************************************************/
#ifndef MQTTSNCLIENT_H_
#define MQTTSNCLIENT_H_
#include <stdio.h>
#include <string.h>
#include "LGwProxy.h"
#include "LMqttsnClientApp.h"
#include "LTimer.h"
#include "Payload.h"
#include "LPublishManager.h"
#include "LSubscribeManager.h"
#include "LTaskManager.h"
using namespace std;
namespace linuxAsyncClient {
struct OnPublishList
{
const char* topic;
int (*pubCallback)(uint8_t* payload, uint16_t payloadlen);
uint8_t qos;
};
/*========================================
Class LMqttsnClient
=======================================*/
class LMqttsnClient{
public:
LMqttsnClient();
~LMqttsnClient();
void onConnect(void);
void publish(const char* topicName, Payload* payload, uint8_t qos, bool retain = false);
void publish(const char* topicName, uint8_t* payload, uint16_t len, uint8_t qos, bool retain = false);
void publish(uint16_t topicId, Payload* payload, uint8_t qos, bool retain = false);
void publish(uint16_t topicId, uint8_t* payload, uint16_t len, uint8_t qos, bool retain = false);
void subscribe(const char* topicName, TopicCallback onPublish, uint8_t qos);
void subscribe(uint16_t topicId, TopicCallback onPublish, uint8_t qos, uint8_t topicType);
void unsubscribe(const char* topicName);
void disconnect(uint16_t sleepInSecs);
void initialize(LUdpConfig netconf, LMqttsnConfig mqconf);
void run(void);
void addTask(bool test);
void setSleepDuration(uint32_t duration);
void sleep(void);
const char* getClientId(void);
LGwProxy* getGwProxy(void);
LPublishManager* getPublishManager(void);
LSubscribeManager* getSubscribeManager(void);
LRegisterManager* getRegisterManager(void);
LTaskManager* getTaskManager(void);
LTopicTable* getTopicTable(void);
private:
LTaskManager _taskMgr;
LPublishManager _pubMgr;
LSubscribeManager _subMgr;
LGwProxy _gwProxy;
uint32_t _sleepDuration;
};
} /* end of namespace */
#endif /* MQTTSNCLIENT_H_ */

View File

@@ -0,0 +1,196 @@
/**************************************************************************************
* Copyright (c) 2016, Tomoaki Yamaguchi
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
**************************************************************************************/
#ifndef MQTTSNCLIENTAPP_H_
#define MQTTSNCLIENTAPP_H_
/*======================================
* Program mode Flag
======================================*/
//#define CLIENT_MODE
/*======================================
* Debug Flag
======================================*/
//#define DEBUG_NW
//#define DEBUG_MQTTSN
//#define DEBUG_OTA
/****************************************
MQTT-SN Parameters
*****************************************/
#define MAX_INFLIGHT_MSG 10
#define MQTTSN_MAX_MSG_LENGTH 1024
#define MQTTSN_MAX_PACKET_SIZE 1024
#define MQTTSN_DEFAULT_KEEPALIVE 900 // 1H
#define MQTTSN_DEFAULT_DURATION 900 // 15min
#define MQTTSN_TIME_SEARCHGW 3
#define MQTTSN_TIME_RETRY 10
#define MQTTSN_RETRY_COUNT 3
/****************************************
Application config structures
*****************************************/
typedef unsigned char uint8_t;
typedef unsigned short uint16_t;
typedef unsigned int uint32_t;
typedef signed char int8_t;
typedef signed short int16_t;
typedef signed int int32_t;
/****************************************
Application config structures
*****************************************/
struct LMqttsnConfig{
uint16_t keepAlive;
bool cleanSession;
uint32_t sleepDuration;
const char* willTopic;
const char* willMsg;
uint8_t willQos;
bool willRetain;
};
struct LUdpConfig{
const char* clientId;
uint8_t ipAddress[4];
uint16_t gPortNo;
uint16_t uPortNo;
};
/*======================================
MACROs for Application
=======================================*/
#define MQTTSN_CONFIG MqttsnConfig theMqttsnConfig
#define NETWORK_CONFIG UdpConfig theNetworkConfig
#define PUBLISH(...) theClient->publish(__VA_ARGS__)
#define SUBSCRIBE(...) theClient->subscribe(__VA_ARGS__)
#define UNSUBSCRIBE(...) theClient->unsubscribe(__VA_ARGS__)
#define DISCONNECT(...) theClient->disconnect(__VA_ARGS__)
#define TASK_LIST TaskList theTaskList[]
#define TASK(...) {__VA_ARGS__, 0, 0}
#define END_OF_TASK_LIST {0, 0, 0, 0}
#define TEST_LIST TestList theTestList[]
#define TEST(...) {__VA_ARGS__, 0}
#define END_OF_TEST_LIST {0, 0, 0}
#define SUBSCRIBE_LIST OnPublishList theOnPublishList[]
#define SUB(...) {__VA_ARGS__}
#define END_OF_SUBSCRIBE_LIST {0,0,0}
#define UDPCONF LUdpConfig theNetcon
#define MQTTSNCONF LMqttsnConfig theMqcon
#ifdef CLIENT_MODE
#define ASSERT(...)
#define PROMPT(...)
#define CHECKKEYIN(...)
#else
#define ASSERT(...) theScreen->display(__VA_ARGS__)
#define PROMPT(...) theScreen->prompt(__VA_ARGS__)
#define CHECKKEYIN(...) theScreen->checkKeyIn(__VA_ARGS__)
#endif
/*======================================
MACROs for debugging
========================================*/
#ifndef DEBUG_NW
#define D_NWLOG(...)
#else
#define D_NWLOG(...) printf(__VA_ARGS__)
#endif
#ifndef DEBUG_MQTTSN
#define D_MQTTLOG(...)
#else
#define D_MQTTLOG(...) printf(__VA_ARGS__)
#endif
#ifndef DEBUG_OTA
#define D_OTALOG(...)
#else
#define D_OTALOG(...) printf(__VA_ARGS__)
#endif
/*======================================
MQTT-SN Defines
========================================*/
#define MQTTSN_TYPE_ADVERTISE 0x00
#define MQTTSN_TYPE_SEARCHGW 0x01
#define MQTTSN_TYPE_GWINFO 0x02
#define MQTTSN_TYPE_CONNECT 0x04
#define MQTTSN_TYPE_CONNACK 0x05
#define MQTTSN_TYPE_WILLTOPICREQ 0x06
#define MQTTSN_TYPE_WILLTOPIC 0x07
#define MQTTSN_TYPE_WILLMSGREQ 0x08
#define MQTTSN_TYPE_WILLMSG 0x09
#define MQTTSN_TYPE_REGISTER 0x0A
#define MQTTSN_TYPE_REGACK 0x0B
#define MQTTSN_TYPE_PUBLISH 0x0C
#define MQTTSN_TYPE_PUBACK 0x0D
#define MQTTSN_TYPE_PUBCOMP 0x0E
#define MQTTSN_TYPE_PUBREC 0x0F
#define MQTTSN_TYPE_PUBREL 0x10
#define MQTTSN_TYPE_SUBSCRIBE 0x12
#define MQTTSN_TYPE_SUBACK 0x13
#define MQTTSN_TYPE_UNSUBSCRIBE 0x14
#define MQTTSN_TYPE_UNSUBACK 0x15
#define MQTTSN_TYPE_PINGREQ 0x16
#define MQTTSN_TYPE_PINGRESP 0x17
#define MQTTSN_TYPE_DISCONNECT 0x18
#define MQTTSN_TYPE_WILLTOPICUPD 0x1A
#define MQTTSN_TYPE_WILLTOPICRESP 0x1B
#define MQTTSN_TYPE_WILLMSGUPD 0x1C
#define MQTTSN_TYPE_WILLMSGRESP 0x1D
#define MQTTSN_TOPIC_TYPE_NORMAL 0x00
#define MQTTSN_TOPIC_TYPE_PREDEFINED 0x01
#define MQTTSN_TOPIC_TYPE_SHORT 0x02
#define MQTTSN_TOPIC_TYPE 0x03
#define MQTTSN_FLAG_DUP 0x80
#define MQTTSN_FLAG_QOS_0 0x0
#define MQTTSN_FLAG_QOS_1 0x20
#define MQTTSN_FLAG_QOS_2 0x40
#define MQTTSN_FLAG_QOS_N1 0xc0
#define MQTTSN_FLAG_RETAIN 0x10
#define MQTTSN_FLAG_WILL 0x08
#define MQTTSN_FLAG_CLEAN 0x04
#define MQTTSN_PROTOCOL_ID 0x01
#define MQTTSN_HEADER_SIZE 2
#define MQTTSN_RC_ACCEPTED 0x00
#define MQTTSN_RC_REJECTED_CONGESTION 0x01
#define MQTTSN_RC_REJECTED_INVALID_TOPIC_ID 0x02
#define MQTTSN_RC_REJECTED_NOT_SUPPORTED 0x03
#define PREDEFINEDID_OTA_REQ (0x0ff0)
#define PREDEFINEDID_OTA_READY (0x0ff1)
#define PREDEFINEDID_OTA_NO_CLIENT (0x0ff2)
/*=================================
* Starting prompt
==================================*/
#define TESTER_VERSION " * Version: 0.1.0"
#define PAHO_COPYRIGHT0 " * MQTT-SN Gateway Tester"
#define PAHO_COPYRIGHT1 " * Part of Project Paho in Eclipse"
#define PAHO_COPYRIGHT2 " * (http://git.eclipse.org/c/paho/org.eclipse.paho.mqtt-sn.embedded-c.git/)"
#define PAHO_COPYRIGHT3 " * Author : Tomoaki YAMAGUCHI"
#define PAHO_COPYRIGHT4 " ***************************************************************************"
#endif /* MQTTSNCLIENTAPP_H_ */

View File

@@ -0,0 +1,390 @@
/**************************************************************************************
* Copyright (c) 2016, Tomoaki Yamaguchi
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
**************************************************************************************/
#include <stdio.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <errno.h>
#include <termios.h>
#include "LNetworkUdp.h"
#include "LTimer.h"
#include "LScreen.h"
#include "LMqttsnClientApp.h"
using namespace std;
using namespace linuxAsyncClient;
extern uint16_t getUint16(const uint8_t* pos);
extern uint32_t getUint32(const uint8_t* pos);
extern LScreen* theScreen;
extern bool theClientMode;
/*=========================================
Class LNetwork
=========================================*/
LNetwork::LNetwork(){
_sleepflg = false;
resetGwAddress();
}
LNetwork::~LNetwork(){
}
int LNetwork::broadcast(const uint8_t* xmitData, uint16_t dataLen){
return LUdpPort::multicast(xmitData, (uint32_t)dataLen);
}
int LNetwork::unicast(const uint8_t* xmitData, uint16_t dataLen){
return LUdpPort::unicast(xmitData, dataLen, _gwIpAddress, _gwPortNo);
}
uint8_t* LNetwork::getMessage(int* len){
*len = 0;
if (checkRecvBuf()){
uint16_t recvLen = LUdpPort::recv(_rxDataBuf, MQTTSN_MAX_PACKET_SIZE, false, &_ipAddress, &_portNo);
if(_gwIpAddress && isUnicast() && (_ipAddress != _gwIpAddress) && (_portNo != _gwPortNo)){
return 0;
}
if(recvLen < 0){
*len = recvLen;
return 0;
}else{
if(_rxDataBuf[0] == 0x01){
*len = getUint16(_rxDataBuf + 1 );
}else{
*len = _rxDataBuf[0];
}
if(recvLen != *len){
*len = 0;
return 0;
}else{
return _rxDataBuf;
}
}
}
return 0;
}
void LNetwork::setGwAddress(void){
_gwPortNo = _portNo;
_gwIpAddress = _ipAddress;
}
void LNetwork::setFixedGwAddress(void){
_gwPortNo = LUdpPort::_gPortNo;
_gwIpAddress = LUdpPort::_gIpAddr;
}
void LNetwork::resetGwAddress(void){
_gwIpAddress = 0;
_gwPortNo = 0;
}
bool LNetwork::initialize(LUdpConfig config){
return LUdpPort::open(config);
}
void LNetwork::setSleep(){
_sleepflg = true;
}
/*=========================================
Class udpStack
=========================================*/
LUdpPort::LUdpPort(){
_disconReq = false;
_sockfdUcast = -1;
_sockfdMcast = -1;
_castStat = 0;
}
LUdpPort::~LUdpPort(){
close();
}
void LUdpPort::close(){
if(_sockfdMcast > 0){
::close( _sockfdMcast);
_sockfdMcast = -1;
if(_sockfdUcast > 0){
::close( _sockfdUcast);
_sockfdUcast = -1;
}
}
}
bool LUdpPort::open(LUdpConfig config){
const int reuse = 1;
char loopch = 1;
uint8_t sav = config.ipAddress[3];
config.ipAddress[3] = config.ipAddress[0];
config.ipAddress[0] = sav;
sav = config.ipAddress[2];
config.ipAddress[2] = config.ipAddress[1];
config.ipAddress[1] = sav;
_gPortNo = htons(config.gPortNo);
_gIpAddr = getUint32((const uint8_t*)config.ipAddress);
_uPortNo = htons(config.uPortNo);
if( _gPortNo == 0 || _gIpAddr == 0 || _uPortNo == 0){
return false;
}
_sockfdUcast = socket(AF_INET, SOCK_DGRAM, 0);
if (_sockfdUcast < 0){
return false;
}
setsockopt(_sockfdUcast, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = _uPortNo;
addr.sin_addr.s_addr = INADDR_ANY;
if( ::bind ( _sockfdUcast, (struct sockaddr*)&addr, sizeof(addr)) <0){
return false;
}
_sockfdMcast = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (_sockfdMcast < 0){
return false;
}
struct sockaddr_in addrm;
addrm.sin_family = AF_INET;
addrm.sin_port = _gPortNo;
addrm.sin_addr.s_addr = htonl(INADDR_ANY);
setsockopt(_sockfdMcast, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
if( ::bind ( _sockfdMcast, (struct sockaddr*)&addrm, sizeof(addrm)) <0){
return false;
}
if(setsockopt(_sockfdMcast, IPPROTO_IP, IP_MULTICAST_LOOP,(char*)&loopch, sizeof(loopch)) <0 ){
D_NWLOG("\033[0m\033[0;31merror IP_MULTICAST_LOOP in UdpPPort::open\033[0m\033[0;37m\n");
ASSERT("\033[0m\033[0;31m\nerror IP_MULTICAST_LOOP in UdpPPort::open\033[0m\033[0;37m\n");
close();
return false;
}
ip_mreq mreq;
mreq.imr_interface.s_addr = INADDR_ANY;
mreq.imr_multiaddr.s_addr = _gIpAddr;
if( setsockopt(_sockfdMcast, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq) )< 0){
D_NWLOG("\033[0m\033[0;31merror IP_ADD_MEMBERSHIP in UdpPort::open\033[0m\033[0;37m\n");
ASSERT("\033[0m\033[0;31m\nerror IP_ADD_MEMBERSHIP in UdpPort::open\033[0m\033[0;37m\n");
close();
return false;
}
return true;
}
bool LUdpPort::isUnicast(){
return ( _castStat == STAT_UNICAST);
}
int LUdpPort::unicast(const uint8_t* buf, uint32_t length, uint32_t ipAddress, uint16_t port ){
struct sockaddr_in dest;
dest.sin_family = AF_INET;
dest.sin_port = port;
dest.sin_addr.s_addr = ipAddress;
int status = ::sendto( _sockfdUcast, buf, length, 0, (const sockaddr*)&dest, sizeof(dest) );
if( status < 0){
D_NWLOG("errno == %d in UdpPort::unicast\n", errno);
ASSERT("errno == %d in UdpPort::unicast\n", errno);
}else{
D_NWLOG("sendto %-15s:%-6u",inet_ntoa(dest.sin_addr),htons(port));
for(uint16_t i = 0; i < length ; i++){
D_NWLOG(" %02x", *(buf + i));
}
D_NWLOG("\n");
if ( !theClientMode )
{
char sbuf[SCREEN_BUFF_SIZE];
int pos = 0;
sprintf(sbuf,"\033[0;34msendto %-15s:%-6u",inet_ntoa(dest.sin_addr),htons(port));
pos = strlen(sbuf);
for(uint16_t i = 0; i < length ; i++){
sprintf(sbuf + pos, " %02x", *(buf + i));
if (strlen(sbuf) > SCREEN_BUFF_SIZE - 20 ) // -20 for Escape sequence
{
break;
}
pos += 3;
}
sprintf(sbuf + strlen(sbuf), "\033[0;37m\n");
theScreen->display(sbuf);
}
}
return status;
}
int LUdpPort::multicast( const uint8_t* buf, uint32_t length ){
struct sockaddr_in dest;
dest.sin_family = AF_INET;
dest.sin_port = _gPortNo;
dest.sin_addr.s_addr = _gIpAddr;
int status = ::sendto( _sockfdMcast, buf, length, 0, (const sockaddr*)&dest, sizeof(dest) );
if( status < 0){
D_NWLOG("\033[0m\033[0;31merrno == %d in UdpPort::multicast\033[0m\033[0;37m\n", errno);
ASSERT("\033[0m\033[0;31merrno == %d in UdpPort::multicast\033[0m\033[0;37m\n", errno);
return errno;
}else{
D_NWLOG("sendto %-15s:%-6u",inet_ntoa(dest.sin_addr),htons(_gPortNo));
for(uint16_t i = 0; i < length ; i++){
D_NWLOG(" %02x", *(buf + i));
ASSERT(" %02x", *(buf + i));
}
D_NWLOG("\n");
if ( !theClientMode )
{
char sbuf[SCREEN_BUFF_SIZE];
int pos = 0;
sprintf(sbuf,"\033[0;34msendto %-15s:%-6u",inet_ntoa(dest.sin_addr),htons(_gPortNo));
pos = strlen(sbuf);
for(uint16_t i = 0; i < length ; i++){
sprintf(sbuf + pos, " %02x", *(buf + i));
if (strlen(sbuf) > SCREEN_BUFF_SIZE - 20 )
{
break;
}
pos += 3;
}
sprintf(sbuf + strlen(sbuf), "\033[0;37m\n");
theScreen->display(sbuf);
}
return status;
}
}
bool LUdpPort::checkRecvBuf(){
struct timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = 50000; // 50 msec
uint8_t buf[2];
fd_set recvfds;
int maxSock = 0;
FD_ZERO(&recvfds);
FD_SET(_sockfdUcast, &recvfds);
FD_SET(_sockfdMcast, &recvfds);
if(_sockfdMcast > _sockfdUcast){
maxSock = _sockfdMcast;
}else{
maxSock = _sockfdUcast;
}
select(maxSock + 1, &recvfds, 0, 0, &timeout);
if(FD_ISSET(_sockfdUcast, &recvfds)){
if( ::recv(_sockfdUcast, buf, 1, MSG_DONTWAIT | MSG_PEEK) > 0){
_castStat = STAT_UNICAST;
return true;
}
}else if(FD_ISSET(_sockfdMcast, &recvfds)){
if( ::recv(_sockfdMcast, buf, 1, MSG_DONTWAIT | MSG_PEEK) > 0){
_castStat = STAT_MULTICAST;
return true;
}
}
_castStat = 0;
return false;
}
int LUdpPort::recv(uint8_t* buf, uint16_t len, bool flg, uint32_t* ipAddressPtr, uint16_t* portPtr){
int flags = flg ? MSG_DONTWAIT : 0;
return recvfrom (buf, len, flags, ipAddressPtr, portPtr );
}
int LUdpPort::recvfrom (uint8_t* buf, uint16_t length, int flags, uint32_t* ipAddressPtr, uint16_t* portPtr ){
struct sockaddr_in sender;
int status;
socklen_t addrlen = sizeof(sender);
memset(&sender, 0, addrlen);
if(isUnicast()){
status = ::recvfrom( _sockfdUcast, buf, length, flags, (struct sockaddr*)&sender, &addrlen );
}else if(_castStat == STAT_MULTICAST){
status = ::recvfrom( _sockfdMcast, buf, length, flags, (struct sockaddr*)&sender, &addrlen );
}else{
return 0;
}
if (status < 0 && errno != EAGAIN) {
D_NWLOG("\033[0m\033[0;31merrno == %d in UdpPort::recvfrom \033[0m\033[0;37m\n", errno);
ASSERT("\033[0m\033[0;31merrno == %d in UdpPort::recvfrom \033[0m\033[0;37m\n", errno);
}else if(status > 0){
*ipAddressPtr = sender.sin_addr.s_addr;
*portPtr = sender.sin_port;
D_NWLOG("\nrecved %-15s:%-6u",inet_ntoa(sender.sin_addr), htons(*portPtr));
for(uint16_t i = 0; i < status ; i++){
D_NWLOG(" %02x", *(buf + i));
}
D_NWLOG("\n");
if ( !theClientMode )
{
char sbuf[SCREEN_BUFF_SIZE];
int pos = 0;
sprintf(sbuf, "\033[0;34mrecved %-15s:%-6u",inet_ntoa(sender.sin_addr), htons(*portPtr));
pos = strlen(sbuf);
for(uint16_t i = 0; i < status ; i++){
sprintf(sbuf + pos, " %02x", *(buf + i));
if (strlen(sbuf) > SCREEN_BUFF_SIZE - 20 )
{
break;
}
pos += 3;
}
sprintf(sbuf + strlen(sbuf), "\033[0;37m\n");
theScreen->display(sbuf);
}
return status;
}else{
return 0;
}
return status;
}

View File

@@ -0,0 +1,108 @@
/**************************************************************************************
* Copyright (c) 2016, Tomoaki Yamaguchi
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
**************************************************************************************/
#ifndef NETWORKUDP_H_
#define NETWORKUDP_H_
#include <sys/time.h>
#include <iostream>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <unistd.h>
#include <string>
#include <arpa/inet.h>
#include "LMqttsnClientApp.h"
#define SOCKET_MAXHOSTNAME 200
#define SOCKET_MAXCONNECTIONS 5
#define SOCKET_MAXRECV 500
#define SOCKET_MAXBUFFER_LENGTH 500 // buffer size
#define STAT_UNICAST 1
#define STAT_MULTICAST 2
using namespace std;
namespace linuxAsyncClient {
/*========================================
Class LUpdPort
=======================================*/
class LUdpPort{
friend class LNetwork;
public:
LUdpPort();
virtual ~LUdpPort();
bool open(LUdpConfig config);
int unicast(const uint8_t* buf, uint32_t length, uint32_t ipaddress, uint16_t port );
int multicast( const uint8_t* buf, uint32_t length );
int recv(uint8_t* buf, uint16_t len, bool nonblock, uint32_t* ipaddress, uint16_t* port );
int recv(uint8_t* buf, int flags);
bool checkRecvBuf();
bool isUnicast();
private:
void close();
int recvfrom ( uint8_t* buf, uint16_t len, int flags, uint32_t* ipaddress, uint16_t* port );
int _sockfdUcast;
int _sockfdMcast;
uint16_t _gPortNo;
uint16_t _uPortNo;
uint32_t _gIpAddr;
uint8_t _castStat;
bool _disconReq;
};
#define NO_ERROR 0
#define PACKET_EXCEEDS_LENGTH 1
/*===========================================
Class Network
============================================*/
class LNetwork : public LUdpPort {
public:
LNetwork();
~LNetwork();
int broadcast(const uint8_t* payload, uint16_t payloadLen);
int unicast(const uint8_t* payload, uint16_t payloadLen);
void setGwAddress(void);
void resetGwAddress(void);
void setFixedGwAddress(void);
bool initialize(LUdpConfig config);
uint8_t* getMessage(int* len);
private:
void setSleep();
int readApiFrame(void);
uint32_t _gwIpAddress;
uint16_t _gwPortNo;
uint32_t _ipAddress;
uint16_t _portNo;
int _returnCode;
bool _sleepflg;
uint8_t _rxDataBuf[MQTTSN_MAX_PACKET_SIZE + 1]; // defined in MqttsnClientApp.h
};
} /* end of namespace */
#endif /* NETWORKUDP_H_ */

View File

@@ -0,0 +1,470 @@
/**************************************************************************************
* Copyright (c) 2016, Tomoaki Yamaguchi
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
**************************************************************************************/
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include "LMqttsnClientApp.h"
#include "LTimer.h"
#include "LGwProxy.h"
#include "LMqttsnClient.h"
#include "LPublishManager.h"
#include "LScreen.h"
using namespace std;
using namespace linuxAsyncClient;
extern void setUint16(uint8_t* pos, uint16_t val);
extern uint16_t getUint16(const uint8_t* pos);
extern LMqttsnClient* theClient;
extern bool theOTAflag;
extern LScreen* theScreen;
/*========================================
Class PublishManager
=======================================*/
const char* NULLCHAR = "";
LPublishManager::LPublishManager()
{
_first = 0;
_last = 0;
_elmCnt = 0;
_publishedFlg = SAVE_TASK_INDEX;
}
LPublishManager::~LPublishManager()
{
PubElement* elm = _first;
PubElement* sav = 0;
while (elm)
{
sav = elm->next;
if (elm != 0)
{
delElement(elm);
}
elm = sav;
}
}
void LPublishManager::publish(const char* topicName, Payload* payload, uint8_t qos, bool retain)
{
publish(topicName, payload->getRowData(), payload->getLen(), qos, retain);
}
void LPublishManager::publish(const char* topicName, uint8_t* payload, uint16_t len, uint8_t qos, bool retain)
{
uint8_t topicType = MQTTSN_TOPIC_TYPE_NORMAL;
if ( strlen(topicName) < 2 )
{
topicType = MQTTSN_TOPIC_TYPE_SHORT;
}
publish(topicName, payload, len, qos, topicType, retain);
}
void LPublishManager::publish(const char* topicName, uint8_t* payload, uint16_t len, uint8_t qos, uint8_t topicType, bool retain)
{
uint16_t msgId = 0;
if ( qos > 0 )
{
msgId = theClient->getGwProxy()->getNextMsgId();
}
PubElement* elm = add(topicName, 0, payload, len, qos, retain, msgId, topicType);
if (elm->status == TOPICID_IS_READY)
{
sendPublish(elm);
}
else
{
theClient->getGwProxy()->registerTopic((char*) topicName, 0);
}
}
void LPublishManager::publish(uint16_t topicId, Payload* payload, uint8_t qos, bool retain)
{
publish(topicId, payload->getRowData(), payload->getLen(), qos, retain);
}
void LPublishManager::publish(uint16_t topicId, uint8_t* payload, uint16_t len, uint8_t qos, bool retain)
{
uint16_t msgId = 0;
if ( qos > 0 )
{
msgId = theClient->getGwProxy()->getNextMsgId();
}
PubElement* elm = add(NULLCHAR, topicId, payload, len, qos, retain, msgId, MQTTSN_TOPIC_TYPE_PREDEFINED);
sendPublish(elm);
}
void LPublishManager::sendPublish(PubElement* elm)
{
if (elm == 0)
{
return;
}
theClient->getGwProxy()->connect();
uint8_t msg[MQTTSN_MAX_MSG_LENGTH + 1];
uint8_t org = 0;
if (elm->payloadlen > 128)
{
msg[0] = 0x01;
setUint16(msg + 1, elm->payloadlen + 9);
org = 2;
}
else
{
msg[0] = (uint8_t) elm->payloadlen + 7;
}
msg[org + 1] = MQTTSN_TYPE_PUBLISH;
msg[org + 2] = elm->flag;
if ((elm->retryCount < MQTTSN_RETRY_COUNT))
{
msg[org + 2] = msg[org + 2] | MQTTSN_FLAG_DUP;
}
if ((elm->flag & 0x03) == MQTTSN_TOPIC_TYPE_SHORT )
{
memcpy(msg + org + 3, elm->topicName, 2);
}
else
{
setUint16(msg + org + 3, elm->topicId);
}
setUint16(msg + org + 5, elm->msgId);
memcpy(msg + org + 7, elm->payload, elm->payloadlen);
theClient->getGwProxy()->writeMsg(msg);
theClient->getGwProxy()->resetPingReqTimer();
if ((elm->flag & 0x60) == MQTTSN_FLAG_QOS_0)
{
ASSERT("\033[0m\033[0;32m Topic \"%s\" was Published. \033[0m\033[0;37m\n\n", elm->topicName);
remove(elm); // PUBLISH Done
return;
}
else if ((elm->flag & 0x60) == MQTTSN_FLAG_QOS_1)
{
elm->status = WAIT_PUBACK;
}
else if ((elm->flag & 0x60) == MQTTSN_FLAG_QOS_2)
{
elm->status = WAIT_PUBREC;
}
elm->sendUTC = time(NULL);
elm->retryCount--;
}
void LPublishManager::sendSuspend(const char* topicName, uint16_t topicId, uint8_t topicType)
{
PubElement* elm = _first;
while (elm)
{
if (strcmp(elm->topicName, topicName) == 0 && elm->status == TOPICID_IS_SUSPEND)
{
elm->topicId = topicId;
elm->flag |= topicType;
elm->status = TOPICID_IS_READY;
sendPublish(elm);
elm = 0;
}
else
{
elm = elm->next;
}
}
}
void LPublishManager::sendPubAck(uint16_t topicId, uint16_t msgId, uint8_t rc)
{
uint8_t msg[7];
msg[0] = 7;
msg[1] = MQTTSN_TYPE_PUBACK;
setUint16(msg + 2, topicId);
setUint16(msg + 4, msgId);
msg[6] = rc;
theClient->getGwProxy()->writeMsg(msg);
}
void LPublishManager::sendPubRel(PubElement* elm)
{
uint8_t msg[4];
msg[0] = 4;
msg[1] = MQTTSN_TYPE_PUBREL;
setUint16(msg + 2, elm->msgId);
theClient->getGwProxy()->writeMsg(msg);
}
bool LPublishManager::isDone(void)
{
return (_first == 0);
}
bool LPublishManager::isMaxFlight(void)
{
return (_elmCnt > MAX_INFLIGHT_MSG / 2);
}
void LPublishManager::responce(const uint8_t* msg, uint16_t msglen)
{
if (msg[0] == MQTTSN_TYPE_PUBACK)
{
uint16_t msgId = getUint16(msg + 3);
PubElement* elm = getElement(msgId);
if (elm == 0)
{
return;
}
if (msg[5] == MQTTSN_RC_ACCEPTED)
{
if (elm->status == WAIT_PUBACK)
{
ASSERT("\033[0m\033[0;32m Topic \"%s\" Id : %d was Published. \033[0m\033[0;37m\n\n", elm->topicName, elm->topicId);
remove(elm); // PUBLISH Done
}
}
else if (msg[5] == MQTTSN_RC_REJECTED_INVALID_TOPIC_ID)
{
elm->status = TOPICID_IS_SUSPEND;
elm->topicId = 0;
elm->retryCount = MQTTSN_RETRY_COUNT;
elm->sendUTC = 0;
theClient->getGwProxy()->registerTopic((char*) elm->topicName, 0);
}
}
else if (msg[0] == MQTTSN_TYPE_PUBREC)
{
PubElement* elm = getElement(getUint16(msg + 1));
if (elm == 0)
{
return;
}
if (elm->status == WAIT_PUBREC || elm->status == WAIT_PUBCOMP)
{
sendPubRel(elm);
elm->status = WAIT_PUBCOMP;
elm->sendUTC = time(NULL);
}
}
else if (msg[0] == MQTTSN_TYPE_PUBCOMP)
{
PubElement* elm = getElement(getUint16(msg + 1));
if (elm == 0)
{
return;
}
if (elm->status == WAIT_PUBCOMP)
{
ASSERT("\033[0m\033[0;32m Topic \"%s\" Id : %d was Published. \033[0m\033[0;37m\n\n", elm->topicName, elm->topicId);
remove(elm); // PUBLISH Done
}
}
}
void LPublishManager::published(uint8_t* msg, uint16_t msglen)
{
uint16_t topicId = getUint16(msg + 2);
if (msg[1] & MQTTSN_FLAG_QOS_1)
{
sendPubAck(topicId, getUint16(msg + 4), MQTTSN_RC_ACCEPTED);
}
_publishedFlg = NEG_TASK_INDEX;
theClient->getTopicTable()->execCallback(topicId, msg + 6, msglen - 6, msg[1] & 0x03);
_publishedFlg = SAVE_TASK_INDEX;
}
void LPublishManager::checkTimeout(void)
{
PubElement* elm = _first;
while (elm)
{
if (elm->sendUTC > 0 && elm->sendUTC + MQTTSN_TIME_RETRY < time(NULL))
{
if (elm->retryCount >= 0)
{
sendPublish(elm);
D_MQTTLOG("...Timeout retry\r\n");
}
else
{
theClient->getGwProxy()->reconnect();
elm->retryCount = MQTTSN_RETRY_COUNT;
break;
}
}
elm = elm->next;
}
}
PubElement* LPublishManager::getElement(uint16_t msgId)
{
PubElement* elm = _first;
while (elm)
{
if (elm->msgId == msgId)
{
break;
}
else
{
elm = elm->next;
}
}
return elm;
}
PubElement* LPublishManager::getElement(const char* topicName)
{
PubElement* elm = _first;
while (elm)
{
if (strcmp(elm->topicName, topicName) == 0)
{
break;
}
else
{
elm = elm->next;
}
}
return elm;
}
void LPublishManager::remove(PubElement* elm)
{
if (elm)
{
if (elm->prev == 0)
{
_first = elm->next;
if (elm->next == 0)
{
_last = 0;
}
else
{
elm->next->prev = 0;
_last = elm->next;
}
}
else
{
if ( elm->next == 0 )
{
_last = elm->prev;
}
elm->prev->next = elm->next;
}
delElement(elm);
}
}
void LPublishManager::delElement(PubElement* elm)
{
if (elm->taskIndex >= 0)
{
theClient->getTaskManager()->done(elm->taskIndex);
}
_elmCnt--;
if ( elm->payload )
{
free(elm->payload);
}
free(elm);
}
/*
PubElement* PublishManager::add(const char* topicName, uint16_t topicId, MQTTSNPayload* payload, uint8_t qos, uint8_t retain, uint16_t msgId){
return add(topicName, topicId, payload->getRowData(), payload->getLen(), qos, retain, msgId);
}*/
PubElement* LPublishManager::add(const char* topicName, uint16_t topicId, uint8_t* payload, uint16_t len, uint8_t qos,
uint8_t retain, uint16_t msgId, uint8_t topicType)
{
PubElement* elm = (PubElement*) calloc(1, sizeof(PubElement));
if (elm == 0)
{
return elm;
}
if (_last == 0)
{
_first = elm;
_last = elm;
}
else
{
elm->prev = _last;
_last->next = elm;
_last = elm;
}
elm->topicName = topicName;
elm->flag |= topicType;
if (qos == 0)
{
elm->flag |= MQTTSN_FLAG_QOS_0;
}
else if (qos == 1)
{
elm->flag |= MQTTSN_FLAG_QOS_1;
}
else if (qos == 2)
{
elm->flag |= MQTTSN_FLAG_QOS_2;
}
if (retain)
{
elm->flag |= MQTTSN_FLAG_RETAIN;
}
if (topicId)
{
elm->status = TOPICID_IS_READY;
elm->topicId = topicId;
}
elm->payloadlen = len;
elm->msgId = msgId;
elm->retryCount = MQTTSN_RETRY_COUNT;
elm->sendUTC = 0;
if (_publishedFlg == NEG_TASK_INDEX)
{
elm->taskIndex = -1;
}
else
{
elm->taskIndex = theClient->getTaskManager()->getIndex();
theClient->getTaskManager()->suspend(elm->taskIndex);
}
elm->payload = (uint8_t*) malloc(len);
if (elm->payload == 0)
{
delElement(elm);
return 0;
}
memcpy(elm->payload, payload, len);
++_elmCnt;
return elm;
}

View File

@@ -0,0 +1,90 @@
/**************************************************************************************
* Copyright (c) 2016, Tomoaki Yamaguchi
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
**************************************************************************************/
#ifndef PUBLISHMANAGER_H_
#define PUBLISHMANAGER_H_
#include "LMqttsnClientApp.h"
#include "LTimer.h"
#include "LTopicTable.h"
#include "Payload.h"
using namespace std;
namespace linuxAsyncClient {
#define TOPICID_IS_SUSPEND 0
#define TOPICID_IS_READY 1
#define WAIT_PUBACK 2
#define WAIT_PUBREC 3
#define WAIT_PUBREL 4
#define WAIT_PUBCOMP 5
#define SAVE_TASK_INDEX 1
#define NEG_TASK_INDEX 0
typedef struct PubElement{
uint16_t msgId;
uint16_t topicId;
const char* topicName;
uint8_t* payload;
uint16_t payloadlen;
uint32_t sendUTC;
int (*callback)(void);
int retryCount;
int taskIndex;
PubElement* prev;
PubElement* next;
uint8_t flag;
uint8_t status; // 0:SUSPEND, 1:READY
} PubElement;
/*========================================
Class LPublishManager
=======================================*/
class LPublishManager{
public:
LPublishManager();
~LPublishManager();
void publish(const char* topicName, Payload* payload, uint8_t qos, bool retain = false);
void publish(const char* topicName, uint8_t* payload, uint16_t len, uint8_t qos, bool retain = false);
void publish(const char* topicName, uint8_t* payload, uint16_t len, uint8_t qos, uint8_t topicType, bool retain = false);
void publish(uint16_t topicId, Payload* payload, uint8_t qos, bool retain = false);
void publish(uint16_t topicId, uint8_t* payload, uint16_t len, uint8_t qos, bool retain = false);
void responce(const uint8_t* msg, uint16_t msglen);
void published(uint8_t* msg, uint16_t msglen);
void checkTimeout(void);
void sendSuspend(const char* topicName, uint16_t topicId, uint8_t topicType);
bool isDone(void);
bool isMaxFlight(void);
private:
PubElement* getElement(uint16_t msgId);
PubElement* getElement(const char* topicName);
PubElement* add(const char* topicName, uint16_t topicId, uint8_t* payload, uint16_t len,
uint8_t qos, uint8_t retain, uint16_t msgId, uint8_t topicType);
void remove(PubElement* elm);
void sendPublish(PubElement* elm);
void sendPubAck(uint16_t topicId, uint16_t msgId, uint8_t rc);
void sendPubRel(PubElement* elm);
void delElement(PubElement* elm);
PubElement* _first;
PubElement* _last;
uint8_t _elmCnt;
uint8_t _publishedFlg;
};
} /* tomyAsyncClient */
#endif /* PUBLISHMANAGER_H_ */

View File

@@ -0,0 +1,263 @@
/**************************************************************************************
* Copyright (c) 2016, Tomoaki Yamaguchi
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
**************************************************************************************/
#include <stdlib.h>
#include <string.h>
#include "LMqttsnClientApp.h"
#include "LTimer.h"
#include "LGwProxy.h"
#include "LMqttsnClient.h"
#include "LRegisterManager.h"
using namespace std;
using namespace linuxAsyncClient;
extern void setUint16(uint8_t* pos, uint16_t val);
extern LMqttsnClient* theClient;
/*=====================================
Class RegisterQue
=====================================*/
LRegisterManager::LRegisterManager()
{
_first = 0;
_last = 0;
}
LRegisterManager::~LRegisterManager()
{
RegQueElement* elm = _first;
RegQueElement* sav = 0;
while (elm)
{
sav = elm->next;
if (elm != 0)
{
free(elm);
}
elm = sav;
}
}
RegQueElement* LRegisterManager::add(const char* topic, uint16_t msgId)
{
RegQueElement* elm = (RegQueElement*) calloc(1, sizeof(RegQueElement));
if (elm)
{
if (_last == 0)
{
_first = elm;
_last = elm;
}
else
{
elm->prev = _last;
_last->next = elm;
_last = elm;
}
elm->topicName = topic;
elm->msgId = msgId;
elm->retryCount = MQTTSN_RETRY_COUNT;
elm->sendUTC = 0;
}
return elm;
}
void LRegisterManager::remove(RegQueElement* elm)
{
if (elm)
{
if (elm->prev == 0)
{
_first = elm->next;
if (elm->next == 0)
{
_last = 0;
}
else
{
elm->next->prev = 0;
_last = elm->next;
}
}
else
{
if ( elm->next == 0 )
{
_last = elm->prev;
}
elm->prev->next = elm->next;
}
free(elm);
}
}
bool LRegisterManager::isDone(void)
{
return _first == 0;
}
const char* LRegisterManager::getTopic(uint16_t msgId)
{
RegQueElement* elm = _first;
while (elm)
{
if (elm->msgId == msgId)
{
return elm->topicName;
}
else
{
elm = elm->next;
}
}
return 0;
}
void LRegisterManager::send(RegQueElement* elm)
{
uint8_t msg[MQTTSN_MAX_MSG_LENGTH + 1];
msg[0] = 6 + strlen(elm->topicName);
msg[1] = MQTTSN_TYPE_REGISTER;
msg[2] = msg[3] = 0;
setUint16(msg + 4, elm->msgId);
strcpy((char*) msg + 6, elm->topicName);
theClient->getGwProxy()->connect();
theClient->getGwProxy()->writeMsg(msg);
elm->sendUTC = time(NULL);
elm->retryCount--;
}
RegQueElement* LRegisterManager::getElement(const char* topicName)
{
RegQueElement* elm = _first;
while (elm)
{
if (strcmp(elm->topicName, topicName))
{
elm = elm->next;
}
else
{
return elm;
}
}
return 0;
}
RegQueElement* LRegisterManager::getElement(uint16_t msgId)
{
RegQueElement* elm = _first;
while (elm)
{
if (elm->msgId == msgId)
{
break;
}
else
{
elm = elm->next;
}
}
return elm;
}
void LRegisterManager::registerTopic(char* topicName)
{
RegQueElement* elm = getElement(topicName);
if (elm == 0)
{
uint16_t msgId = theClient->getGwProxy()->getNextMsgId();
elm = add(topicName, msgId);
send(elm);
}
}
void LRegisterManager::responceRegAck(uint16_t msgId, uint16_t topicId)
{
const char* topicName = getTopic(msgId);
if (topicName)
{
uint8_t topicType = strlen((char*) topicName) > 2 ? MQTTSN_TOPIC_TYPE_NORMAL : MQTTSN_TOPIC_TYPE_SHORT;
theClient->getGwProxy()->getTopicTable()->setTopicId((char*) topicName, topicId, topicType); // Add Topic to TopicTable
RegQueElement* elm = getElement(msgId);
remove(elm);
theClient->getPublishManager()->sendSuspend((char*) topicName, topicId, topicType);
}
}
void LRegisterManager::responceRegister(uint8_t* msg, uint16_t msglen)
{
// *msg is terminated with 0x00 by Network::getMessage()
uint8_t regack[7];
regack[0] = 7;
regack[1] = MQTTSN_TYPE_REGACK;
memcpy(regack + 2, msg + 2, 4);
LTopic* tp = theClient->getGwProxy()->getTopicTable()->match((char*) msg + 5);
if (tp)
{
TopicCallback callback = tp->getCallback();
void* topicName = calloc(strlen((char*) msg + 5) + 1, sizeof(char));
theClient->getGwProxy()->getTopicTable()->add((char*) topicName, 0, MQTTSN_TOPIC_TYPE_NORMAL, callback, 1);
regack[6] = MQTTSN_RC_ACCEPTED;
}
else
{
regack[6] = MQTTSN_RC_REJECTED_INVALID_TOPIC_ID;
}
theClient->getGwProxy()->writeMsg(regack);
}
uint8_t LRegisterManager::checkTimeout(void)
{
RegQueElement* elm = _first;
RegQueElement* sav;
while (elm)
{
if (elm->sendUTC + MQTTSN_TIME_RETRY < time(NULL))
{
if (elm->retryCount >= 0)
{
send(elm);
}
else
{
if (elm->next)
{
sav = elm->prev;
remove(elm);
if (sav)
{
elm = sav;
}
else
{
break;
}
}
else
{
remove(elm);
break;
}
}
}
elm = elm->next;
}
return 0;
}

View File

@@ -0,0 +1,56 @@
/**************************************************************************************
* Copyright (c) 2016, Tomoaki Yamaguchi
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
**************************************************************************************/
#ifndef REGISTERQUE_H_
#define REGISTERQUE_H_
#include "LMqttsnClientApp.h"
namespace linuxAsyncClient {
/*======================================
structure LRegisterQue
======================================*/
typedef struct RegQueElement{
const char* topicName;
uint16_t msgId;
int retryCount;
uint32_t sendUTC;
RegQueElement* prev;
RegQueElement* next;
}RegQueElement;
class LRegisterManager{
public:
LRegisterManager();
~LRegisterManager();
void registerTopic(char* topicName);
void responceRegAck(uint16_t msgId, uint16_t topicId);
void responceRegister(uint8_t* msg, uint16_t msglen);
bool isDone(void);
uint8_t checkTimeout();
const char* getTopic(uint16_t msgId);
private:
RegQueElement* getElement(const char* topicName);
RegQueElement* getElement(uint16_t msgId);
RegQueElement* add(const char* topicName, uint16_t msgId);
void remove(RegQueElement* elm);
void send(RegQueElement* elm);
RegQueElement* _first;
RegQueElement* _last;
};
}
#endif /* REGISTERQUE_H_ */

View File

@@ -0,0 +1,135 @@
/**************************************************************************************
* Copyright (c) 2016, Tomoaki Yamaguchi
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Tomoaki Yamaguchi - initial API and implementation
**************************************************************************************/
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <string>
#include <sys/ioctl.h>
#include <unistd.h>
#include <termios.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <stdarg.h>
#include "LScreen.h"
using namespace std;
using namespace linuxAsyncClient;
LScreen::LScreen()
{
_hight = 0;
_width = 0;
fcntl(0, F_SETFL, O_NONBLOCK );
}
LScreen::~LScreen()
{
}
void LScreen::getSize(void)
{
struct winsize wsize ;
ioctl(STDOUT_FILENO, TIOCGWINSZ, &wsize);
_width = wsize.ws_col ? wsize.ws_col : 132;
_hight = wsize.ws_row ? wsize.ws_row : 10;
}
void LScreen::clear(void)
{
getSize();
printf("\033[2J");
reprompt();
}
void LScreen::display(const char* format, ...)
{
va_list args;
va_start(args, format);
vdisplay(format, args);
va_end(args);
reprompt();
}
void LScreen::vdisplay(const char* format, va_list args)
{
vsprintf(_buffer, format, args);
fprintf(stdout, "\033[%d;%dH\033[2K", _hight, 1);
fprintf(stdout,"%s", _buffer);
reprompt();
}
void LScreen::prompt(const char* format, ...)
{
va_list args;
va_start(args, format);
vprompt(format, args);
va_end(args);
}
void LScreen::vprompt(const char* format, va_list args)
{
getSize();
int pos = 0;
string fmt = format;
if ( ( pos = fmt.find("\n")) > 0 )
{
fmt.replace(pos, 1, " ");
}
vsprintf(_buffer, format, args);
_prompt = _buffer;
reprompt();
}
void LScreen::reprompt(void)
{
int len = 0;
if ( (len =_prompt.size()) >= _width )
{
len = _width - 1;
}
fprintf(stdout,"\033[%d;%dH", _hight, 1);
fprintf(stdout,"\033[0;33m%s\033[0K\033[0;37m", _prompt.substr(0, len).c_str());
fflush(stdout);
}
bool LScreen::checkKeyIn(char* val)
{
int c = 0;
int cprev = 0;
while ( read(0, &c, 1) == 1 )
{
if ( c == '\n' )
{
*val = cprev;
fprintf(stdout, "\033[1T");
fflush(stdout);
reprompt();
return true;
}
cprev = c;
}
return false;
}

View File

@@ -0,0 +1,49 @@
/**************************************************************************************
* Copyright (c) 2016, Tomoaki Yamaguchi
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Tomoaki Yamaguchi - initial API and implementation
**************************************************************************************/
#ifndef LSCREEN_H_
#define LSCREEN_H_
#include <stdarg.h>
#include <string>
using namespace std;
namespace linuxAsyncClient {
#define SCREEN_BUFF_SIZE 1024
/*========================================
Class Screen
=======================================*/
class LScreen{
public:
LScreen();
~LScreen();
void clear(void);
void display(const char* format, ...);
void prompt(const char* format, ...);
bool checkKeyIn(char* val);
private:
void reprompt(void);
void getSize(void);
void vdisplay(const char* format, va_list args);
void vprompt(const char* format, va_list args);
char _buffer[SCREEN_BUFF_SIZE];
int _hight;
int _width;
string _prompt;
};
} /* end of namespace */
#endif /* LSCREEN_H_ */

View File

@@ -0,0 +1,388 @@
/**************************************************************************************
* Copyright (c) 2016, Tomoaki Yamaguchi
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
**************************************************************************************/
#include <stdlib.h>
#include <string.h>
#include "LMqttsnClientApp.h"
#include "LTimer.h"
#include "LScreen.h"
#include "LGwProxy.h"
#include "LMqttsnClient.h"
#include "LSubscribeManager.h"
using namespace std;
using namespace linuxAsyncClient;
extern void setUint16(uint8_t* pos, uint16_t val);
extern uint16_t getUint16(const uint8_t* pos);
extern LMqttsnClient* theClient;
extern SUBSCRIBE_LIST;
extern LScreen* theScreen;
#define SUB_DONE 1
#define SUB_READY 0
/*========================================
Class SubscribeManager
=======================================*/
LSubscribeManager::LSubscribeManager()
{
_first = 0;
_last = 0;
}
LSubscribeManager::~LSubscribeManager()
{
SubElement* elm = _first;
SubElement* sav = 0;
while (elm)
{
sav = elm->next;
if (elm != 0)
{
free(elm);
}
elm = sav;
}
}
void LSubscribeManager::onConnect(void)
{
ASSERT("\033[0m\033[0;32m Attempting OnConnect.....\033[0m\033[0;37m\n");
if (_first == 0)
{
for (uint8_t i = 0; theOnPublishList[i].topic != 0; i++)
{
subscribe(theOnPublishList[i].topic, theOnPublishList[i].pubCallback, theOnPublishList[i].qos);
}
}
else
{
SubElement* elm = _first;
SubElement* pelm;
do
{
pelm = elm;
if (elm->msgType == MQTTSN_TYPE_SUBSCRIBE)
{
elm->done = SUB_READY;
elm->retryCount = MQTTSN_RETRY_COUNT;
subscribe(elm->topicName, elm->callback, elm->qos);
}
elm = pelm->next;
} while (pelm->next);
}
while (!theClient->getSubscribeManager()->isDone())
{
theClient->getGwProxy()->getMessage();
}
ASSERT("\033[0m\033[0;32m OnConnect complete\033[0m\033[0;37m\n");
ASSERT("\033[0m\033[0;32m Test is Ready.\033[0m\033[0;37m\n");
}
bool LSubscribeManager::isDone(void)
{
SubElement* elm = _first;
SubElement* prevelm;
while (elm)
{
prevelm = elm;
if (elm->done == SUB_READY)
{
return false;
}
elm = prevelm->next;
}
return true;
}
void LSubscribeManager::send(SubElement* elm)
{
if (elm->done == SUB_DONE)
{
return;
}
uint8_t msg[MQTTSN_MAX_MSG_LENGTH + 1];
if (elm->topicType == MQTTSN_TOPIC_TYPE_NORMAL)
{
msg[0] = 5 + strlen(elm->topicName);
strcpy((char*) msg + 5, elm->topicName);
}
else
{
msg[0] = 7;
setUint16(msg + 5, elm->topicId);
}
msg[1] = elm->msgType;
msg[2] = elm->qos | elm->topicType;
if (elm->retryCount == MQTTSN_RETRY_COUNT)
{
elm->msgId = theClient->getGwProxy()->getNextMsgId();
}
if ((elm->retryCount < MQTTSN_RETRY_COUNT) && elm->msgType == MQTTSN_TYPE_SUBSCRIBE)
{
msg[2] = msg[2] | MQTTSN_FLAG_DUP;
}
setUint16(msg + 3, elm->msgId);
theClient->getGwProxy()->connect();
theClient->getGwProxy()->writeMsg(msg);
theClient->getGwProxy()->resetPingReqTimer();
elm->sendUTC = time(NULL);
elm->retryCount--;
}
void LSubscribeManager::subscribe(const char* topicName, TopicCallback onPublish, uint8_t qos)
{
uint8_t topicType = MQTTSN_TOPIC_TYPE_NORMAL;
if ( strlen(topicName) <= 2)
{
topicType = MQTTSN_TOPIC_TYPE_SHORT;
}
SubElement* elm = add(MQTTSN_TYPE_SUBSCRIBE, topicName, 0, topicType, qos, onPublish);
send(elm);
}
void LSubscribeManager::subscribe(uint16_t topicId, TopicCallback onPublish, uint8_t qos, uint8_t topicType)
{
SubElement* elm = add(MQTTSN_TYPE_SUBSCRIBE, 0, topicId, topicType, qos, onPublish);
send(elm);
}
void LSubscribeManager::unsubscribe(const char* topicName)
{
SubElement* elm = add(MQTTSN_TYPE_UNSUBSCRIBE, topicName, 0, MQTTSN_TOPIC_TYPE_NORMAL, 0, 0);
send(elm);
}
void LSubscribeManager::unsubscribe(uint16_t topicId, uint8_t topicType)
{
SubElement* elm = add(MQTTSN_TYPE_UNSUBSCRIBE, 0, topicId, topicType, 0, 0);
send(elm);
}
void LSubscribeManager::checkTimeout(void)
{
SubElement* elm = _first;
while (elm)
{
if (elm->sendUTC + MQTTSN_TIME_RETRY < time(NULL))
{
if (elm->retryCount >= 0)
{
send(elm);
}
else
{
if ( elm->done == SUB_READY )
{
if (elm->msgType == MQTTSN_TYPE_SUBSCRIBE)
{
ASSERT("\033[0m\033[0;31m\n!!!!!! SUBSCRIBE Error !!!!! Topic : %s\033[0m\033[0;37m\n\n", (char*)elm->topicName);
}else{
ASSERT("\033[0m\033[0;31m\n!!!!!! UNSUBSCRIBE Error !!!!! Topic : %s\033[0m\033[0;37m\n\n", (char*)elm->topicName);
}
elm->done = SUB_DONE;
}
}
}
elm = elm->next;
}
}
void LSubscribeManager::responce(const uint8_t* msg)
{
if (msg[0] == MQTTSN_TYPE_SUBACK)
{
uint16_t topicId = getUint16(msg + 2);
uint16_t msgId = getUint16(msg + 4);
uint8_t rc = msg[6];
LTopicTable* tt = theClient->getGwProxy()->getTopicTable();
SubElement* elm = getElement(msgId);
if (elm)
{
if ( rc == MQTTSN_RC_ACCEPTED )
{
tt->add((char*) elm->topicName, topicId, elm->topicType, elm->callback);
getElement(msgId)->done = SUB_DONE;
ASSERT("\033[0m\033[0;32m Topic \"%s\" Id : %d was Subscribed. \033[0m\033[0;37m\n\n", getElement(msgId)->topicName, getElement(msgId)->topicId);
}
else
{
remove(elm);
ASSERT("\033[0m\033[0;31m SUBACK Invalid messageId. %s\033[0m\033[0;37m\n\n", getElement(msgId)->topicName);
}
}
}
else if (msg[0] == MQTTSN_TYPE_UNSUBACK)
{
uint16_t msgId = getUint16(msg + 1);
SubElement* elm = getElement(msgId);
if (elm)
{
LTopicTable* tt = theClient->getGwProxy()->getTopicTable();
tt->setCallback(elm->topicName, 0);
ASSERT("\033[0m\033[0;32m Topic \"%s\" Id : %d was Unsubscribed. \033[0m\033[0;37m\n\n", getElement(msgId)->topicName, getElement(msgId)->topicId);
remove(getElement(msgId));
}
else
{
ASSERT("\033[0m\033[0;31m UNSUBACK Invalid messageId. %s\033[0m\033[0;37m\n\n", getElement(msgId)->topicName);
remove(getElement(msgId));
}
}
}
/* SubElement operations */
SubElement* LSubscribeManager::add(uint8_t msgType, const char* topicName, uint16_t topicId, uint8_t topicType,
uint8_t qos, TopicCallback callback)
{
SubElement* elm = 0;
if (topicName )
{
elm = getElement(topicName, msgType);
}
else
{
elm = getElement(topicId, topicType);
}
if ( elm == 0 )
{
elm = (SubElement*) calloc(1, sizeof(SubElement));
if (elm == 0)
{
return 0;
}
if (_last == 0)
{
_first = elm;
_last = elm;
}
else
{
elm->prev = _last;
_last->next = elm;
_last = elm;
}
}
elm->msgType = msgType;
elm->callback = callback;
elm->topicName = topicName;
elm->topicId = topicId;
elm->topicType = topicType;
if (qos == 1)
{
elm->qos = MQTTSN_FLAG_QOS_1;
}
else if (qos == 2)
{
elm->qos = MQTTSN_FLAG_QOS_2;
}
else
{
elm->qos = MQTTSN_FLAG_QOS_0;
}
elm->msgId = 0;
elm->retryCount = MQTTSN_RETRY_COUNT;
elm->done = SUB_READY;
elm->sendUTC = 0;
return elm;
}
void LSubscribeManager::remove(SubElement* elm)
{
if (elm)
{
if (elm->prev == 0)
{
_first = elm->next;
if (elm->next != 0)
{
elm->next->prev = 0;
_last = elm->next;
}
free(elm);
}
else
{
if ( elm->next == 0 )
{
_last = elm->prev;
}
elm->prev->next = elm->next;
free(elm);
}
}
}
SubElement* LSubscribeManager::getElement(uint16_t msgId)
{
SubElement* elm = _first;
while (elm)
{
if (elm->msgId == msgId)
{
return elm;
}
else
{
elm = elm->next;
}
}
return 0;
}
SubElement* LSubscribeManager::getElement(const char* topicName, uint8_t msgType)
{
SubElement* elm = _first;
while (elm)
{
if (strcmp(elm->topicName, topicName) == 0 && elm->msgType == msgType)
{
return elm;
}
else
{
elm = elm->next;
}
}
return 0;
}
SubElement* LSubscribeManager::getElement(uint16_t topicId, uint8_t topicType)
{
SubElement* elm = _first;
while (elm)
{
if (elm->topicId == topicId && elm->topicType == topicType)
{
return elm;
}
else
{
elm = elm->next;
}
}
return 0;
}

View File

@@ -0,0 +1,77 @@
/**************************************************************************************
* Copyright (c) 2016, Tomoaki Yamaguchi
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
**************************************************************************************/
#ifndef SUBSCRIBEMANAGER_H_
#define SUBSCRIBEMANAGER_H_
#include <stdio.h>
#include <string.h>
#include "LMqttsnClientApp.h"
#include "LRegisterManager.h"
#include "LTimer.h"
#include "LTopicTable.h"
using namespace std;
namespace linuxAsyncClient {
typedef struct SubElement{
TopicCallback callback;
const char* topicName;
uint16_t msgId;
uint32_t sendUTC;
uint16_t topicId;
uint8_t msgType;
uint8_t topicType;
uint8_t qos;
int retryCount;
uint8_t done;
SubElement* prev;
SubElement* next;
} SubElement;
/*========================================
Class LSubscribeManager
=======================================*/
class LSubscribeManager{
public:
LSubscribeManager();
~LSubscribeManager();
void onConnect(void);
void subscribe(const char* topicName, TopicCallback onPublish, uint8_t qos);
void subscribe(uint16_t topicId, TopicCallback onPublish, uint8_t qos, uint8_t topicType);
void unsubscribe(const char* topicName);
void unsubscribe(uint16_t topicId, uint8_t topicType);
void responce(const uint8_t* msg);
void checkTimeout(void);
bool isDone(void);
private:
void send(SubElement* elm);
SubElement* getFirstElement(void);
SubElement* getElement(uint16_t msgId);
SubElement* getElement(uint16_t topicId, uint8_t topicType);
SubElement* getElement(const char* topicName, uint8_t msgType);
SubElement* add(uint8_t msgType, const char* topicName, uint16_t topicId, uint8_t topicType, uint8_t qos, TopicCallback callback);
void remove(SubElement* elm);
SubElement* _first;
SubElement* _last;
};
} /* end of namespace */
#endif /* SUBSCRIBEMANAGER_H_ */

View File

@@ -0,0 +1,177 @@
/**************************************************************************************
* Copyright (c) 2016, Tomoaki Yamaguchi
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
**************************************************************************************/
#include <stdio.h>
#include <string.h>
#include "LMqttsnClientApp.h"
#include "LSubscribeManager.h"
#include "LTimer.h"
#include "LMqttsnClient.h"
#include "LTaskManager.h"
#include "LScreen.h"
using namespace std;
using namespace linuxAsyncClient;
extern LMqttsnClient* theClient;
extern LScreen* theScreen;
extern bool theClientMode;
/*=====================================
TaskManager
======================================*/
LTaskManager::LTaskManager(void){
_tasks = 0;
_tests = 0;
_index = 0;
}
LTaskManager::~LTaskManager(void){
}
void LTaskManager::add(TaskList* task){
_tasks = task;
}
void LTaskManager::add(TestList* test){
_tests = test;
}
void LTaskManager::run(void){
int i = 0;
char c = 0;
bool cancelFlg = false;
if ( !theClientMode )
{
theClient->getGwProxy()->getMessage();
for (i = 0; _tests[i].testTask > 0; i++)
{
PROMPT("Execute \"%s\" ? ( y/n ) : ", _tests[i].testLabel);
while (true)
{
if (CHECKKEYIN(&c))
{
if ( toupper(c) == 'N' )
{
ASSERT("\033[0m\033[0;32m\n**** %s is canceled ****\033[0m\033[0;37m\n\n", _tests[i].testLabel);
theScreen->prompt("");
cancelFlg = true;
break;
}
else if ( toupper(c) == 'Y' )
{
ASSERT("\033[0m\033[0;32m\n\n**** %s start ****\033[0m\033[0;37m\n", _tests[i].testLabel);
theScreen->prompt("");
(_tests[i].testTask)();
cancelFlg = false;
break;
}
}
else
{
theClient->getGwProxy()->getMessage();
}
}
while ( true )
{
do
{
theClient->getGwProxy()->getMessage();
}
while(theClient->getPublishManager()->isMaxFlight() ||
!theClient->getSubscribeManager()->isDone() ||
!theClient->getRegisterManager()->isDone());
if (theClient->getPublishManager()->isDone())
{
break;
}
}
if ( !cancelFlg )
{
ASSERT("\033[0m\033[0;32m\n**** %s complete ****\033[0m\033[0;37m\n\n", _tests[i].testLabel);
}
}
ASSERT("\033[0m\033[0;32m\n\n######### All tests complete! ###########\033[0m\033[0;37m\n\n");
}
else
{
while (true)
{
theClient->getGwProxy()->getMessage();
for (_index = 0; _tasks[_index].callback > 0; _index++)
{
if ((_tasks[_index].prevTime + _tasks[_index].interval <= time(NULL)) &&
_tasks[_index].count == 0)
{
_tasks[_index].prevTime = time(NULL);
(_tasks[_index].callback)();
}
}
do
{
theClient->getGwProxy()->getMessage();
}
while(theClient->getPublishManager()->isMaxFlight() ||
!theClient->getSubscribeManager()->isDone() ||
!theClient->getRegisterManager()->isDone());
if (theClient->getPublishManager()->isDone())
{
break;
}
}
}
}
uint8_t LTaskManager::getIndex(void){
return _index;
}
void LTaskManager::done(uint8_t index){
if (_tasks )
{
if (_tasks[index].count > 0)
{
_tasks[index].count--;
}
}
if (_tests )
{
if (_tests[index].count > 0)
{
_tests[index].count--;
}
}
}
void LTaskManager::suspend(uint8_t index){
if ( _tasks )
{
_tasks[index].count++;
}
if ( _tests )
{
_tests[index].count++;
}
}

View File

@@ -0,0 +1,65 @@
/**************************************************************************************
* Copyright (c) 2016, Tomoaki Yamaguchi
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
**************************************************************************************/
#ifndef TASKMANAGER_H_
#define TASKMANAGER_H_
#include <stdio.h>
#include <string.h>
#include "LMqttsnClientApp.h"
#include "LTimer.h"
using namespace std;
namespace linuxAsyncClient {
struct TaskList{
void (*callback)(void);
uint32_t interval;
uint32_t prevTime;
uint8_t count;
};
struct TestList {
const char* testLabel;
void (*testTask)(void);
uint8_t count;
};
/*========================================
Class TaskManager
=======================================*/
class LTaskManager{
public:
LTaskManager();
~LTaskManager();
void add(TaskList* task);
void add(TestList* test);
void run(void);
void done(uint8_t index);
void suspend(uint8_t index);
uint8_t getIndex(void);
private:
TaskList* _tasks;
TestList* _tests;
uint8_t _index;
};
} /* end of namespace */
#endif /* TASKMANAGER_H_ */

View File

@@ -0,0 +1,65 @@
/**************************************************************************************
* Copyright (c) 2016, Tomoaki Yamaguchi
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
**************************************************************************************/
#include <stdlib.h>
#include <string.h>
#include "LMqttsnClientApp.h"
#include "LTimer.h"
using namespace std;
using namespace linuxAsyncClient;
/*=====================================
Class Timer
=====================================*/
LTimer::LTimer(){
_startTime.tv_sec = 0;
_millis = 0;
}
LTimer::~LTimer(){
}
void LTimer::start(uint32_t msec){
gettimeofday(&_startTime, 0);
_millis = msec;
}
bool LTimer::isTimeUp(void){
return isTimeUp(_millis);
}
bool LTimer::isTimeUp(uint32_t msec){
struct timeval curTime;
uint32_t secs, usecs;
if (_startTime.tv_sec == 0){
return false;
}else{
gettimeofday(&curTime, 0);
secs = (curTime.tv_sec - _startTime.tv_sec) * 1000;
usecs = (curTime.tv_usec - _startTime.tv_usec) / 1000.0;
return ((secs + usecs) > (uint32_t)msec);
}
}
void LTimer::stop(){
_startTime.tv_sec = 0;
_millis = 0;
}

View File

@@ -0,0 +1,46 @@
/**************************************************************************************
* Copyright (c) 2016, Tomoaki Yamaguchi
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
**************************************************************************************/
#ifndef TIMER_H_
#define TIMER_H_
#include <sys/time.h>
#include "LMqttsnClientApp.h"
namespace linuxAsyncClient {
/*============================================
LTimer
============================================*/
class LTimer{
public:
LTimer();
~LTimer();
void start(uint32_t msec = 0);
bool isTimeUp(uint32_t msec);
bool isTimeUp(void);
void stop(void);
void changeUTC(void){};
static void setUnixTime(uint32_t utc){};
private:
struct timeval _startTime;
uint32_t _millis;
};
} /* end of namespace */
#endif /* TIMER_H_ */

View File

@@ -0,0 +1,287 @@
/**************************************************************************************
* Copyright (c) 2016, Tomoaki Yamaguchi
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
**************************************************************************************/
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include "LMqttsnClientApp.h"
#include "LTopicTable.h"
using namespace std;
using namespace linuxAsyncClient;
/*=====================================
Class Topic
======================================*/
LTopic::LTopic(){
_topicStr = 0;
_callback = 0;
_topicId = 0;
_topicType = MQTTSN_TOPIC_TYPE_NORMAL;
_next = 0;
_malocFlg = 0;
}
LTopic::~LTopic(){
if (_malocFlg){
free(_topicStr);
}
}
TopicCallback LTopic::getCallback(void){
return _callback;
}
int LTopic::execCallback(uint8_t* payload, uint16_t payloadlen){
if(_callback != 0){
return _callback(payload, payloadlen);
}
return 0;
}
uint8_t LTopic::hasWildCard(uint8_t* pos){
*pos = strlen(_topicStr) - 1;
if (*(_topicStr + *pos) == '#'){
return MQTTSN_TOPIC_MULTI_WILDCARD;
}else{
for(uint8_t p = 0; p < strlen(_topicStr); p++){
if (*(_topicStr + p) == '+'){
*pos = p;
return MQTTSN_TOPIC_SINGLE_WILDCARD;
}
}
}
return 0;
}
bool LTopic::isMatch(const char* topic){
uint8_t pos;
if ( strlen(topic) < strlen(_topicStr)){
return false;
}
uint8_t wc = hasWildCard(&pos);
if (wc == MQTTSN_TOPIC_SINGLE_WILDCARD){
if ( strncmp(_topicStr, topic, pos - 1) == 0){
if (*(_topicStr + pos + 1) == '/'){
for(uint8_t p = pos; p < strlen(topic); p++){
if (*(topic + p) == '/'){
if (strcmp(_topicStr + pos + 1, topic + p ) == 0){
return true;
}
}
}
}else{
for(uint8_t p = pos + 1;p < strlen(topic); p++){
if (*(topic + p) == '/'){
return false;
}
}
}
return true;
}
}else if (wc == MQTTSN_TOPIC_MULTI_WILDCARD){
if (strncmp(_topicStr, topic, pos) == 0){
return true;
}
}else if (strcmp(_topicStr, topic) == 0){
return true;
}
return false;
}
/*=====================================
Class TopicTable
======================================*/
LTopicTable::LTopicTable(){
_first = 0;
_last = 0;
}
LTopicTable::~LTopicTable(){
clearTopic();
}
LTopic* LTopicTable::getTopic(const char* topic){
LTopic* p = _first;
while(p){
if (p->_topicStr != 0 && strcmp(p->_topicStr, topic) == 0){
return p;
}
p = p->_next;
}
return 0;
}
LTopic* LTopicTable::getTopic(uint16_t topicId, uint8_t topicType){
LTopic* p = _first;
while(p){
if (p->_topicId == topicId && p->_topicType == topicType){
return p;
}
p = p->_next;
}
return 0;
}
uint16_t LTopicTable::getTopicId(const char* topic){
LTopic* p = getTopic(topic);
if (p){
return p->_topicId;
}
return 0;
}
char* LTopicTable::getTopicName(LTopic* topic){
return topic->_topicStr;
}
void LTopicTable::setTopicId(const char* topic, uint16_t id, uint8_t type){
LTopic* tp = getTopic(topic);
if (tp){
tp->_topicId = id;
}else{
add(topic, id, type, 0);
}
}
bool LTopicTable::setCallback(const char* topic, TopicCallback callback){
LTopic* p = getTopic(topic);
if (p){
p->_callback = callback;
return true;
}
return false;
}
bool LTopicTable::setCallback(uint16_t topicId, uint8_t topicType, TopicCallback callback){
LTopic* p = getTopic(topicId, topicType);
if (p){
p->_callback = callback;
return true;
}
return false;
}
int LTopicTable::execCallback(uint16_t topicId, uint8_t* payload, uint16_t payloadlen, uint8_t topicType){
LTopic* p = getTopic(topicId, topicType);
if (p){;
return p->execCallback(payload, payloadlen);
}
return 0;
}
LTopic* LTopicTable::add(const char* topicName, uint16_t id, uint8_t type, TopicCallback callback, uint8_t alocFlg)
{
LTopic* elm;
if (topicName){
elm = getTopic(topicName);
}else{
elm = getTopic(id, type);
}
if (elm == 0){
elm = new LTopic();
if(elm == 0){
goto exit;
}
if ( _last == 0){
_first = elm;
_last = elm;
}
else
{
elm->_prev = _last;
_last->_next = elm;
_last = elm;
}
elm->_topicStr = const_cast <char*>(topicName);
elm->_topicId = id;
elm->_topicType = type;
elm->_callback = callback;
elm->_malocFlg = alocFlg;
elm->_prev = 0;
}else{
elm->_callback = callback;
}
exit:
return elm;
}
void LTopicTable::remove(uint16_t topicId)
{
LTopic* elm = getTopic(topicId);
if (elm){
if (elm->_prev == 0)
{
_first = elm->_next;
if (elm->_next == 0)
{
_last = 0;
}
else
{
elm->_next->_prev = 0;
_last = elm->_next;
}
}
else
{
if ( elm->_next == 0 )
{
_last = elm->_prev;
}
elm->_prev->_next = elm->_next;
}
delete elm;
}
}
LTopic* LTopicTable::match(const char* topicName){
LTopic* elm = _first;
while(elm){
if (elm->isMatch(topicName)){
break;
}
elm = elm->_next;
}
return elm;
}
void LTopicTable::clearTopic(void){
LTopic* p = _first;
while(p){
_first = p->_next;
delete p;
p = _first;
}
_last = 0;
}

View File

@@ -0,0 +1,82 @@
/**************************************************************************************
* Copyright (c) 2016, Tomoaki Yamaguchi
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
**************************************************************************************/
#ifndef TOPICTABLE_H_
#define TOPICTABLE_H_
#include <stdio.h>
#include "LMqttsnClientApp.h"
#include "Payload.h"
#define MQTTSN_TOPIC_MULTI_WILDCARD 1
#define MQTTSN_TOPIC_SINGLE_WILDCARD 2
namespace linuxAsyncClient {
/*=====================================
Class LTopic
======================================*/
typedef int (*TopicCallback)(uint8_t*, uint16_t);
class LTopic {
friend class LTopicTable;
public:
LTopic();
~LTopic();
int execCallback(uint8_t* payload, uint16_t payloadlen);
uint8_t hasWildCard(uint8_t* pos);
bool isMatch(const char* topic);
TopicCallback getCallback(void);
private:
uint16_t _topicId;
uint8_t _topicType;
char* _topicStr;
TopicCallback _callback;
uint8_t _malocFlg;
LTopic* _prev;
LTopic* _next;
};
/*=====================================
Class LTopicTable
======================================*/
class LTopicTable {
public:
LTopicTable();
~LTopicTable();
uint16_t getTopicId(const char* topic);
char* getTopicName(LTopic* topic);
LTopic* getTopic(const char* topic);
LTopic* getTopic(uint16_t topicId, uint8_t topicType = MQTTSN_TOPIC_TYPE_NORMAL);
void setTopicId(const char* topic, uint16_t id, uint8_t topicType);
bool setCallback(const char* topic, TopicCallback callback);
bool setCallback(uint16_t topicId, uint8_t type, TopicCallback callback);
int execCallback(uint16_t topicId, uint8_t* payload, uint16_t payloadlen, uint8_t topicType = MQTTSN_TOPIC_TYPE_NORMAL);
LTopic* add(const char* topic, uint16_t id = 0, uint8_t type = MQTTSN_TOPIC_TYPE_NORMAL, TopicCallback callback = 0, uint8_t alocFlg = 0);
LTopic* add(uint16_t topicId, uint16_t id, uint8_t type, TopicCallback callback, uint8_t alocFlg);
LTopic* match(const char* topic);
void clearTopic(void);
void remove(uint16_t topicId);
private:
LTopic* _first;
LTopic* _last;
};
} /* end of namespace */
#endif /* TOPICTABLE_H_ */

View File

@@ -0,0 +1,348 @@
/**************************************************************************************
* Copyright (c) 2016, Tomoaki Yamaguchi
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
**************************************************************************************/
#include <sys/stat.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <fcntl.h>
#include "LMqttsnClientApp.h"
#include "Payload.h"
using namespace std;
using namespace linuxAsyncClient;
extern uint16_t getUint16(const uint8_t* pos);
extern uint32_t getUint32(const uint8_t* pos);
extern float getFloat32(const uint8_t* pos);
extern void setUint16(uint8_t* pos, uint16_t val);
extern void setUint32(uint8_t* pos, uint32_t val);
extern void setFloat32(uint8_t* pos, float val);
/*=====================================
Class Payload
=====================================*/
Payload::Payload(){
_buff = _pos = 0;
_len = 0;
_elmCnt = 0;
_memDlt = 0;
}
Payload::Payload(uint16_t len){
_buff = (uint8_t*)calloc(len, sizeof(uint8_t));
if(_buff == 0){
exit(-1);
}
_pos = _buff;
_elmCnt = 0;
_len = len;
_memDlt = 1;
}
Payload::~Payload(){
if(_memDlt){
free(_buff);
}
}
void Payload::init(){
_pos = _buff;
_elmCnt = 0;
}
uint16_t Payload::getAvailableLength(){
return _len - (_pos - _buff);
}
uint16_t Payload::getLen(){
return _pos - _buff;
}
uint8_t* Payload::getRowData(){
return _buff;
}
/*======================
* setter
======================*/
int8_t Payload::set_uint32(uint32_t val){
if(getAvailableLength() < 6){
return -1;
}
if(val < 128){
*_pos++ = (uint8_t)val;
}else if(val < 256){
*_pos++ = MSGPACK_UINT8;
*_pos++ = (uint8_t)val;
}else if(val < 65536){
*_pos++ = MSGPACK_UINT16;
setUint16(_pos,(uint16_t) val);
_pos += 2;
}else{
*_pos++ = MSGPACK_UINT32;
setUint32(_pos, val);
_pos += 4;
}
_elmCnt++;
return 0;
}
int8_t Payload::set_int32(int32_t val){
if(getAvailableLength() < 6){
return -1;
}
if((val > -32) && (val < 0)){
*_pos++ = val | MSGPACK_NEGINT;
}else if((val >= 0) && (val < 128)){
*_pos++ = val;
}else if(val > -128 && val < 128){
*_pos++ = MSGPACK_INT8;
*_pos++ = (uint8_t)val;
}else if(val > -32768 && val < 32768){
*_pos++ = MSGPACK_INT16;
setUint16(_pos, (uint16_t)val);
_pos += 2;
}else{
*_pos++ = MSGPACK_INT32;
setUint32(_pos, (uint32_t)val);
_pos += 4;
}
_elmCnt++;
return 0;
}
int8_t Payload::set_float(float val){
if(getAvailableLength() < 6){
return -1;
}
*_pos++ = MSGPACK_FLOAT32;
setFloat32(_pos, val);
_pos += 4;
_elmCnt++;
return 0;
}
int8_t Payload::set_str(char* val){
return set_str((const char*) val);
}
int8_t Payload::set_str(const char* val){
if(getAvailableLength() < strlen(val) + 3){
return -1;
}else if(strlen(val) < 32){
*_pos++ = (uint8_t)strlen(val) | MSGPACK_FIXSTR;
}else if(strlen(val) < 256){
*_pos++ = MSGPACK_STR8;
*_pos++ = (uint8_t)strlen(val);
}else if(strlen(val) < 65536){
*_pos++ = MSGPACK_STR16;
setUint16(_pos, (uint16_t)strlen(val));
_pos += 2;
}
memcpy(_pos, val, strlen(val));
_pos += strlen(val);
return 0;
}
int8_t Payload::set_array(uint8_t val){
if(getAvailableLength() < (uint16_t)val+ 1){
return -1;
}
if(val < 16){
*_pos++ = MSGPACK_ARRAY15 | val;
}else{
*_pos++ = MSGPACK_ARRAY16;
setUint16(_pos,(uint16_t)val);
_pos += 2;
}
_elmCnt++;
return 0;
}
int8_t Payload::set_bool(bool val){
if (getAvailableLength() < 1){
return -1;
}
if (val){
*_pos++ = MSGPACK_TRUE;
}else {
*_pos++ = MSGPACK_FALSE;
}
_elmCnt++;
return 0;
}
/*======================
* getter
======================*/
uint8_t Payload::getArray(uint8_t index){
uint8_t rc = 0;
uint8_t* val = getBufferPos(index);
if(val != 0){
if(*val == MSGPACK_ARRAY15){
rc = *val & 0x0F;
}else if(*val == MSGPACK_ARRAY16){
rc = (uint8_t)getUint16(val + 1);
}
}
return rc;
}
bool Payload::get_bool(uint8_t index){
uint8_t* val = getBufferPos(index);
if (*val == MSGPACK_FALSE){
return false;
}else{
return true;
}
}
uint32_t Payload::get_uint32(uint8_t index){
uint32_t rc = 0;
uint8_t* val = getBufferPos(index);
if(val != 0){
if(*val == MSGPACK_UINT32){
rc = getUint32(val + 1);
}else if(*val == MSGPACK_UINT16){
rc = (uint32_t)getUint16(val + 1);
}else if(*val == MSGPACK_UINT8){
rc = (uint32_t)*(val + 1);
}else if(*val < 128){
rc = (uint32_t)*val;
}
}
return rc;
}
int32_t Payload::get_int32(uint8_t index){
int32_t rc = 0;
uint8_t* val = getBufferPos(index);
if(val != 0){
if(*val == MSGPACK_INT32){
rc = (int32_t) getUint32(val + 1);
}else if(*val == MSGPACK_INT16){
uint16_t d16 = getUint16(val + 1);
if(d16 >= 32768){
rc = d16 - 65536;
}else{
rc = (int32_t)d16;
}
}else if(*val == MSGPACK_INT8){
rc = (int32_t)*(val + 1);
}else if((*val & MSGPACK_NEGINT) == MSGPACK_NEGINT){
*val &= ~MSGPACK_NEGINT;
rc = ((int32_t)*val) * -1;
}else{
rc = (int32_t) *val;
}
}
return rc;
}
float Payload::get_float(uint8_t index){
uint8_t* val = getBufferPos(index);
if(val != 0){
if(*val == MSGPACK_FLOAT32){
return getFloat32(val + 1);
}
}
return 0;
}
const char* Payload::get_str(uint8_t index, uint16_t* len){
uint8_t* val = getBufferPos(index);
if(val != 0){
if(*val == MSGPACK_STR16){
*len = getUint16(val + 1);
return (const char*)(val + 3);
}else if(*val == MSGPACK_STR8){
*len = *(val + 1);
return (const char*)(val + 2);
}else if( (*val & 0xf0) == MSGPACK_FIXSTR ){
*len = *val & 0x0f;
return (const char*)(val + 1);
}
}
*len = 0;
return (const char*) 0;
}
uint8_t* Payload::getBufferPos(uint8_t index){
uint8_t* bpos = 0;
uint8_t* pos = _buff;
for(uint8_t i = 0; i <= index; i++){
bpos = pos;
switch(*pos){
case MSGPACK_FALSE:
case MSGPACK_TRUE:
pos++;
break;
case MSGPACK_UINT8:
case MSGPACK_INT8:
pos += 2;
break;
case MSGPACK_UINT16:
case MSGPACK_INT16:
case MSGPACK_ARRAY16:
pos += 3;
break;
case MSGPACK_UINT32:
case MSGPACK_INT32:
case MSGPACK_FLOAT32:
pos += 5;
break;
case MSGPACK_STR8:
pos += *(pos + 1) + 2;
break;
case MSGPACK_STR16:
pos += getUint16(pos + 1) + 3;
break;
default:
if((*pos < MSGPACK_POSINT) ||
((*pos & 0xf0) == MSGPACK_NEGINT) ||
((*pos & 0xf0) == MSGPACK_ARRAY15)) {
pos++;
}else if((*pos & 0xf0) == MSGPACK_FIXSTR){
pos += (*pos & 0x0f) + 1;
}
}
/*
if((pos - _buff) >= _len){
return 0;
}
*/
}
return bpos;
}
void Payload::setRowData(uint8_t* payload, uint16_t payloadLen){
if(_memDlt){
free(_buff);
_memDlt = 0;
}
_buff = payload;
_len = payloadLen;
_pos = _buff + _len;
}

View File

@@ -0,0 +1,87 @@
/**************************************************************************************
* Copyright (c) 2016, Tomoaki Yamaguchi
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
**************************************************************************************/
#ifndef PAYLOAD_H_
#define PAYLOAD_H_
#include "LMqttsnClientApp.h"
#define MSGPACK_FALSE 0xc2
#define MSGPACK_TRUE 0xc3
#define MSGPACK_POSINT 0x80
#define MSGPACK_NEGINT 0xe0
#define MSGPACK_UINT8 0xcc
#define MSGPACK_UINT16 0xcd
#define MSGPACK_UINT32 0xce
#define MSGPACK_INT8 0xd0
#define MSGPACK_INT16 0xd1
#define MSGPACK_INT32 0xd2
#define MSGPACK_FLOAT32 0xca
#define MSGPACK_FIXSTR 0xa0
#define MSGPACK_STR8 0xd9
#define MSGPACK_STR16 0xda
#define MSGPACK_ARRAY15 0x90
#define MSGPACK_ARRAY16 0xdc
#define MSGPACK_MAX_ELEMENTS 50 // Less than 256
namespace linuxAsyncClient {
/*=====================================
Class Payload
=====================================*/
class Payload{
public:
Payload();
Payload(uint16_t len);
~Payload();
/*---------------------------------------------
getLen() and getRowData() are
minimum required functions of Payload class.
----------------------------------------------*/
uint16_t getLen(); // get data length
uint8_t* getRowData(); // get data pointer
/*--- Functions for MessagePack ---*/
void init(void);
int8_t set_bool(bool val);
int8_t set_uint32(uint32_t val);
int8_t set_int32(int32_t val);
int8_t set_float(float val);
int8_t set_str(char* val);
int8_t set_str(const char* val);
int8_t set_array(uint8_t val);
bool get_bool(uint8_t index);
uint8_t getArray(uint8_t index);
uint32_t get_uint32(uint8_t index);
int32_t get_int32(uint8_t index);
float get_float(uint8_t index);
const char* get_str(uint8_t index, uint16_t* len);
void setRowData(uint8_t* payload, uint16_t payloadLen);
uint16_t getAvailableLength();
private:
uint8_t* getBufferPos(uint8_t index);
uint8_t* _buff;
uint16_t _len;
uint8_t _elmCnt;
uint8_t* _pos;
uint8_t _memDlt;
};
} /* linuxAsyncClient */
#endif /* PAYLOAD_H_ */

View File

@@ -0,0 +1,134 @@
/**************************************************************************************
* Copyright (c) 2016, Tomoaki Yamaguchi
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
**************************************************************************************/
#include <stdio.h>
#include <unistd.h>
#include "LMqttsnClientApp.h"
using namespace std;
/*=====================================
Global functions
======================================*/
#ifndef CPU_BIGENDIANN
/*--- For Little endianness ---*/
uint16_t getUint16(const uint8_t* pos){
uint16_t val = ((uint16_t)*pos++ << 8);
return val += *pos;
}
void setUint16(uint8_t* pos, uint16_t val){
*pos++ = (val >> 8) & 0xff;
*pos = val & 0xff;
}
uint32_t getUint32(const uint8_t* pos){
uint32_t val = uint32_t(*pos++) << 24;
val += uint32_t(*pos++) << 16;
val += uint32_t(*pos++) << 8;
return val += *pos++;
}
void setUint32(uint8_t* pos, uint32_t val){
*pos++ = (val >> 24) & 0xff;
*pos++ = (val >> 16) & 0xff;
*pos++ = (val >> 8) & 0xff;
*pos = val & 0xff;
}
float getFloat32(const uint8_t* pos){
union{
float flt;
uint8_t d[4];
}val;
val.d[3] = *pos++;
val.d[2] = *pos++;
val.d[1] = *pos++;
val.d[0] = *pos;
return val.flt;
}
void setFloat32(uint8_t* pos, float flt){
union{
float flt;
uint8_t d[4];
}val;
val.flt = flt;
*pos++ = val.d[3];
*pos++ = val.d[2];
*pos++ = val.d[1];
*pos = val.d[0];
}
#else
/*--- For Big endianness ---*/
uint16_t getUint16(const uint8_t* pos){
uint16_t val = *pos++;
return val += ((uint16_t)*pos++ << 8);
}
void setUint16(uint8_t* pos, uint16_t val){
*pos++ = val & 0xff;
*pos = (val >> 8) & 0xff;
}
uint32_t getUint32(const uint8_t* pos){
long val = uint32_t(*(pos + 3)) << 24;
val += uint32_t(*(pos + 2)) << 16;
val += uint32_t(*(pos + 1)) << 8;
return val += *pos;
}
void setUint32(uint8_t* pos, uint32_t val){
*pos++ = val & 0xff;
*pos++ = (val >> 8) & 0xff;
*pos++ = (val >> 16) & 0xff;
*pos = (val >> 24) & 0xff;
}
float getFloat32(const uint8_t* pos){
union{
float flt;
uint8_t d[4];
}val;
val.d[0] = *pos++;
val.d[1] = *pos++;
val.d[2] = *pos++;
val.d[3] = *pos;
return val.flt;
}
void setFloat32(uint8_t* pos, float flt){
union{
float flt;
uint8_t d[4];
}val;
val.flt = flt;
*pos++ = val.d[0];
*pos++ = val.d[1];
*pos++ = val.d[2];
*pos = val.d[3];
}
#endif // CPU_LITTLEENDIANN