Signed-off-by: tomoaki <tomoaki@tomy-tech.com>
This commit is contained in:
tomoaki
2018-07-18 21:03:03 +09:00
parent a658bd5714
commit 74a9ebaa55
10 changed files with 366 additions and 293 deletions

View File

@@ -77,7 +77,7 @@ MQTTSNCONF = {
const char* topic1 = "ty4tw/topic1";
const char* topic2 = "ty4tw/topic2";
const char* topic3 = "ty4tw/topic3";
const char* topic57 = "ty4tw/topic5/7";
/*------------------------------------------------------
* Callback routines for Subscribed Topics
@@ -112,6 +112,14 @@ void publishTopic2(void)
PUBLISH(topic2,(uint8_t*)payload, strlen(payload), qos);
}
void publishTopic57(void)
{
char payload[300];
sprintf(payload, "publish \"ty4tw/topic57\" \n");
uint8_t qos = 0;
PUBLISH(topic2,(uint8_t*)payload, strlen(payload), qos);
}
void disconnect(void)
{
@@ -127,7 +135,7 @@ void disconnect(void)
TEST_LIST = {// e.g. TEST( Label, Test),
TEST("Step1:Publish topic1", publishTopic1),
TEST("Step2:Publish topic2", publishTopic2),
TEST("Step2:Publish topic57", publishTopic57),
TEST("Step3:Publish topic2", publishTopic2),
TEST("Step4:Disconnect", disconnect),
END_OF_TEST_LIST

View File

@@ -62,7 +62,7 @@ UDPCONF = {
* Client Configuration (theMqcon)
*------------------------------------------------------*/
MQTTSNCONF = {
300, //KeepAlive [seconds]
60, //KeepAlive [seconds]
false, //Clean session
300, //Sleep duration [seconds]
"", //WillTopic
@@ -109,14 +109,22 @@ int on_Topic03(uint8_t* pload, uint16_t ploadlen)
return 0;
}
int on_Topic05(uint8_t* pload, uint16_t ploadlen)
{
DISPLAY("\n\nNew callback recv TopicA\n");
pload[ploadlen-1]= 0; // set null terminator
DISPLAY("Payload -->%s <--\n\n",pload);
return 0;
}
/*------------------------------------------------------
* A Link list of Callback routines and Topics
*------------------------------------------------------*/
SUBSCRIBE_LIST = {// e.g. SUB(TopicType, topicName, TopicId, callback, QoSx),
// SUB(MQTTSN_TOPIC_TYPE_NORMAL, topic1, 0, on_Topic01, QoS1),
// SUB(MQTTSN_TOPIC_TYPE_NORMAL, topic2, 0, on_Topic02, QoS1),
// SUB(MQTTSN_TOPIC_TYPE_NORMAL, topic5, 0, on_Topic05, QoS1),
//SUB(MQTTSN_TOPIC_TYPE_NORMAL, topic2, 0, on_Topic02, QoS1),
END_OF_SUBSCRIBE_LIST
};
@@ -139,6 +147,11 @@ void subscribeTopic2(void)
SUBSCRIBE(topic2, on_Topic02, qos);
}
void subscribeTopic5(void)
{
uint8_t qos = 1;
SUBSCRIBE(topic5, on_Topic05, qos);
}
void disconnect(void)
{
@@ -162,9 +175,9 @@ void asleep(void)
*------------------------------------------------------*/
TEST_LIST = {// e.g. TEST( Label, Test),
TEST("Step1:Subscribe topic1", subscribeTopic1),
TEST("Step1:Subscribe topic5", subscribeTopic5),
//TEST("Step2:Subscribe topic2", subscribeTopic2),
TEST("Step2:Disconnect", disconnect),
TEST("Step2:Disconnect", asleep),
END_OF_TEST_LIST
};

View File

@@ -78,7 +78,10 @@ const char* topic1 = "ty4tw/topic1";
const char* topic2 = "ty4tw/topic2";
const char* topic3 = "ty4tw/topic3";
const char* topic4 = "ty4tw/topic4";
const char* topic5 = "ty4tw/topic5";
const char* topic51 = "ty4tw/topic5/1";
const char* topic52 = "ty4tw/topic5/2";
const char* topic53 = "ty4tw/topic5/3";
const char* topic50 = "ty4tw/topic5/+";
/*------------------------------------------------------

View File

@@ -206,7 +206,7 @@ void LRegisterManager::responceRegister(uint8_t* msg, uint16_t msglen)
uint8_t regack[7];
regack[0] = 7;
regack[1] = MQTTSN_TYPE_REGACK;
memcpy(regack + 2, msg + 2, 4);
memcpy(regack + 2, msg + 1, 4);
LTopic* tp = theClient->getGwProxy()->getTopicTable()->match((char*) msg + 5);
if (tp)

View File

@@ -123,16 +123,16 @@ void LSubscribeManager::send(SubElement* elm)
return;
}
uint8_t msg[MQTTSN_MAX_MSG_LENGTH + 1];
if (elm->topicType == MQTTSN_TOPIC_TYPE_NORMAL)
{
msg[0] = 5 + strlen(elm->topicName);
strcpy((char*) msg + 5, elm->topicName);
}
else
if (elm->topicType == MQTTSN_TOPIC_TYPE_PREDEFINED)
{
msg[0] = 7;
setUint16(msg + 5, elm->topicId);
}
else
{
msg[0] = 5 + strlen(elm->topicName);
strcpy((char*) msg + 5, elm->topicName);
}
msg[1] = elm->msgType;
msg[2] = elm->qos | elm->topicType;
if (elm->retryCount == MQTTSN_RETRY_COUNT)
@@ -281,6 +281,7 @@ SubElement* LSubscribeManager::add(uint8_t msgType, const char* topicName, MQTTS
{
elm = getElement(topicId, topicType);
}
if ( elm == 0 )
{
elm = (SubElement*) calloc(1, sizeof(SubElement));

View File

@@ -82,7 +82,7 @@ void MQTTGWPublishHandler::handlePublish(Client* client, MQTTGWPacket* packet)
MQTTSN_topicid topicId;
uint16_t id = 0;
if (pub.topiclen == 2)
if (pub.topiclen <= 2)
{
topicId.type = MQTTSN_TOPIC_TYPE_SHORT;
*(topicId.data.short_name) = *pub.topic;

View File

@@ -400,6 +400,7 @@ Client::Client(bool secure)
_nextClient = 0;
_clientSleepPacketQue.setMaxSize(MAX_SAVED_PUBLISH);
_hasPredefTopic = false;
_holdPingRequest = false;
}
Client::~Client()
@@ -796,6 +797,21 @@ void Client::setOTAClient(Client* cl)
_otaClient =cl;
}
void Client::holdPingRequest(void)
{
_holdPingRequest = true;
}
void Client::resetPingRequest(void)
{
_holdPingRequest = false;
}
bool Client::isHoldPringReqest(void)
{
return _holdPingRequest;
}
/*=====================================
Class Topic
======================================*/
@@ -1214,7 +1230,7 @@ TopicIdMapelement* TopicIdMap::getElement(uint16_t msgId)
TopicIdMapelement* TopicIdMap::add(uint16_t msgId, uint16_t topicId, MQTTSN_topicTypes type)
{
if ( _cnt > _maxInflight * 2 || topicId == 0)
if ( _cnt > _maxInflight * 2 || ( topicId == 0 && type != MQTTSN_TOPIC_TYPE_SHORT ) )
{
return 0;
}
@@ -1314,6 +1330,7 @@ WaitREGACKPacketList::WaitREGACKPacketList()
{
_first = 0;
_end = 0;
_cnt = 0;
}
WaitREGACKPacketList::~WaitREGACKPacketList()
@@ -1346,6 +1363,7 @@ int WaitREGACKPacketList::setPacket(MQTTSNPacket* packet, uint16_t REGACKMsgId)
elm->_prev = _end;
_end = elm;
}
_cnt++;
return 1;
}
@@ -1387,6 +1405,7 @@ void WaitREGACKPacketList::erase(uint16_t REGACKMsgId)
{
p->_next->_prev = p->_prev;
}
_cnt--;
break;
// Do not delete element. Element is deleted after sending to Client.
}
@@ -1394,3 +1413,9 @@ void WaitREGACKPacketList::erase(uint16_t REGACKMsgId)
}
}
uint8_t WaitREGACKPacketList::getCount(void)
{
return _cnt;
}

View File

@@ -219,8 +219,10 @@ public:
int setPacket(MQTTSNPacket* packet, uint16_t REGACKMsgId);
MQTTSNPacket* getPacket(uint16_t REGACKMsgId);
void erase(uint16_t REGACKMsgId);
uint8_t getCount(void);
private:
uint8_t _cnt;
waitREGACKPacket* _first;
waitREGACKPacket* _end;
};
@@ -298,6 +300,10 @@ public:
bool isSensorNetStable(void);
bool isWaitWillMsg(void);
void holdPingRequest(void);
void resetPingRequest(void);
bool isHoldPringReqest(void);
Client* getNextClient(void);
Client* getOTAClient(void);
void setOTAClient(Client* cl);
@@ -317,6 +323,8 @@ private:
char* _willTopic;
char* _willMsg;
bool _holdPingRequest;
Timer _keepAliveTimer;
uint32_t _keepAliveMsec;

View File

@@ -270,7 +270,7 @@ void MQTTSNConnectionHandler::handlePingreq(Client* client, MQTTSNPacket* packet
{
MQTTGWPacket* msg = 0;
if ( client->isSleep() || client->isAwake() )
if ( ( client->isSleep() || client->isAwake() ) && client->getClientSleepPacket() )
{
while ( ( msg = client->getClientSleepPacket() ) != 0 )
{
@@ -281,12 +281,16 @@ void MQTTSNConnectionHandler::handlePingreq(Client* client, MQTTSNPacket* packet
ev->setBrokerRecvEvent(client, msg);
_gateway->getPacketEventQue()->post(ev);
}
client->holdPingRequest();
}
else
{
/* send PINGREQ to the broker */
client->resetPingRequest();
MQTTGWPacket* pingreq = new MQTTGWPacket();
pingreq->setHeader(PINGREQ);
Event* evt = new Event();
evt->setBrokerSendEvent(client, pingreq);
_gateway->getBrokerSendQue()->post(evt);
}
}

View File

@@ -198,6 +198,7 @@ void MQTTSNPublishHandler::handleRegAck( Client* client, MQTTSNPacket* packet)
}
MQTTSNPacket* regAck = client->getWaitREGACKPacketList()->getPacket(msgId);
if ( regAck != 0 )
{
client->getWaitREGACKPacketList()->erase(msgId);
@@ -205,6 +206,16 @@ void MQTTSNPublishHandler::handleRegAck( Client* client, MQTTSNPacket* packet)
ev->setClientSendEvent(client, regAck);
_gateway->getClientSendQue()->post(ev);
}
if (client->isHoldPringReqest() && client->getWaitREGACKPacketList()->getCount() == 0 )
{
/* send PINGREQ to the broker */
client->resetPingRequest();
MQTTGWPacket* pingreq = new MQTTGWPacket();
pingreq->setHeader(PINGREQ);
Event* evt = new Event();
evt->setBrokerSendEvent(client, pingreq);
_gateway->getBrokerSendQue()->post(evt);
}
}
}