Files
paho.mqtt-sn.embedded-c/MQTTSNGateway/GatewayTester/src/LPublishManager.cpp
tomoaki bc731210ae BugFix of #76 and #77
1.Return CONNACK instead of the broker when the gateway receives CONNECT
while the client is Sleep or Awake mode.

2.Define the max size of a queue for PUBLISH while the client state is
Asleep mode.  Dispose of packets when the queue is full of packets.

3.Return PUBACK or PUBREL to the broker when the client is Asleep or
Awake.


Signed-off-by: tomoaki <tomoaki@tomy-tech.com>



Signed-off-by: tomoaki <tomoaki@tomy-tech.com>
2017-08-27 16:39:29 +09:00

471 lines
9.7 KiB
C++

/**************************************************************************************
* Copyright (c) 2016, Tomoaki Yamaguchi
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
**************************************************************************************/
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include "LMqttsnClientApp.h"
#include "LTimer.h"
#include "LGwProxy.h"
#include "LMqttsnClient.h"
#include "LPublishManager.h"
#include "LScreen.h"
using namespace std;
using namespace linuxAsyncClient;
extern void setUint16(uint8_t* pos, uint16_t val);
extern uint16_t getUint16(const uint8_t* pos);
extern LMqttsnClient* theClient;
extern bool theOTAflag;
extern LScreen* theScreen;
/*========================================
Class PublishManager
=======================================*/
// Empty topic name passed to add() by the topic-id based publish() overload,
// which has no topic name of its own.
const char* NULLCHAR = "";
/*
 * Builds an empty publish queue.
 * _publishedFlg starts as SAVE_TASK_INDEX, so add() records a task index
 * for every element it creates.
 */
LPublishManager::LPublishManager()
{
    _elmCnt = 0;
    _first = _last = 0;
    _publishedFlg = SAVE_TASK_INDEX;
}
/*
 * Releases every element still held in the queue, front to back.
 */
LPublishManager::~LPublishManager()
{
    for (PubElement* cur = _first; cur != 0; )
    {
        // Fetch the successor first: delElement() frees cur.
        PubElement* following = cur->next;
        delElement(cur);
        cur = following;
    }
}
void LPublishManager::publish(const char* topicName, Payload* payload, uint8_t qos, bool retain)
{
publish(topicName, payload->getRowData(), payload->getLen(), qos, retain);
}
/*
 * Publishes by topic name, choosing the topic type from the name length.
 *
 * Names of up to two characters are sent as short topic names; anything
 * longer is a normal topic name.  The previous test (strlen < 2) excluded
 * two-character names, the very names sendPublish() copies as two bytes.
 *
 * topicName : NUL-terminated topic name (must not be null).
 * payload   : data to publish, len bytes.
 * qos       : requested QoS level.
 * retain    : true to set the RETAIN flag.
 */
void LPublishManager::publish(const char* topicName, uint8_t* payload, uint16_t len, uint8_t qos, bool retain)
{
    uint8_t topicType = MQTTSN_TOPIC_TYPE_NORMAL;
    if ( strlen(topicName) <= 2 )
    {
        topicType = MQTTSN_TOPIC_TYPE_SHORT;
    }
    publish(topicName, payload, len, qos, topicType, retain);
}
/*
 * Queues a publish request for a named topic and either sends it at once
 * or asks the gateway proxy to register the topic first.
 *
 * A message id is drawn from the gateway proxy only for qos > 0.
 * If add() cannot allocate the queue element the request is dropped;
 * previously the null result was dereferenced.
 *
 * topicName : NUL-terminated topic name; the pointer is stored by add().
 * payload   : data to publish, len bytes.
 * qos       : requested QoS level.
 * topicType : MQTTSN_TOPIC_TYPE_* value placed in the flags.
 * retain    : true to set the RETAIN flag.
 */
void LPublishManager::publish(const char* topicName, uint8_t* payload, uint16_t len, uint8_t qos, uint8_t topicType, bool retain)
{
    uint16_t msgId = 0;
    if ( qos > 0 )
    {
        msgId = theClient->getGwProxy()->getNextMsgId();
    }

    PubElement* elm = add(topicName, 0, payload, len, qos, retain, msgId, topicType);
    if (elm == 0)
    {
        // Out of memory: nothing was queued, so there is nothing to send.
        return;
    }

    if (elm->status == TOPICID_IS_READY)
    {
        sendPublish(elm);
    }
    else
    {
        // No topic id yet: the element stays queued until sendSuspend()
        // is called for this topic name.
        theClient->getGwProxy()->registerTopic((char*) topicName, 0);
    }
}
void LPublishManager::publish(uint16_t topicId, Payload* payload, uint8_t qos, bool retain)
{
publish(topicId, payload->getRowData(), payload->getLen(), qos, retain);
}
/*
 * Publishes to a predefined topic id.
 * The element is queued with an empty topic name and sent immediately;
 * sendPublish() ignores a null element, so a failed add() is harmless.
 */
void LPublishManager::publish(uint16_t topicId, uint8_t* payload, uint16_t len, uint8_t qos, bool retain)
{
    // Only QoS levels above 0 consume a message id.
    uint16_t msgId = 0;
    if ( qos > 0 )
    {
        msgId = theClient->getGwProxy()->getNextMsgId();
    }

    sendPublish(add(NULLCHAR, topicId, payload, len, qos, retain, msgId, MQTTSN_TOPIC_TYPE_PREDEFINED));
}
/*
 * Serialises one queued element into a PUBLISH frame and hands it to the
 * gateway proxy through writeMsg().
 *
 * Frame layout as written below (org is 0 or 2):
 *   [0]            length byte, or 0x01 followed by a 16-bit length
 *   [org + 1]      MQTTSN_TYPE_PUBLISH
 *   [org + 2]      flags
 *   [org + 3..4]   first two bytes of the topic name (short topics) or topic id
 *   [org + 5..6]   message id
 *   [org + 7..]    payload
 *
 * A frame that would not fit into MQTTSN_MAX_MSG_LENGTH is discarded and the
 * element removed; previously the payload was copied without any bounds
 * check and could overrun the stack buffer.
 *
 * QoS 0 elements are removed right after sending.  QoS 1 / QoS 2 elements
 * move to WAIT_PUBACK / WAIT_PUBREC, get a send timestamp and have their
 * retry counter decremented.
 *
 * elm : element to send; a null pointer is ignored.  The element may have
 *       been freed when this function returns.
 */
void LPublishManager::sendPublish(PubElement* elm)
{
    if (elm == 0)
    {
        return;
    }

    // Payloads above 128 bytes use the 3-byte length form, which shifts every
    // following field by two bytes.
    const uint8_t org = (elm->payloadlen > 128) ? 2 : 0;
    const unsigned int frameLen = (unsigned int) elm->payloadlen + 7 + org;

    if (frameLen > (unsigned int) MQTTSN_MAX_MSG_LENGTH)
    {
        DISPLAY("\033[0m\033[0;31m Topic \"%s\" was not Published. Payload is too long. \033[0m\033[0;37m\n\n", elm->topicName);
        remove(elm);
        return;
    }

    theClient->getGwProxy()->connect();

    uint8_t msg[MQTTSN_MAX_MSG_LENGTH + 1];

    if (org != 0)
    {
        msg[0] = 0x01;
        setUint16(msg + 1, (uint16_t) frameLen);
    }
    else
    {
        msg[0] = (uint8_t) frameLen;
    }

    msg[org + 1] = MQTTSN_TYPE_PUBLISH;
    msg[org + 2] = elm->flag;

    // retryCount starts at MQTTSN_RETRY_COUNT and is decremented after each
    // send, so a lower value means this is a retransmission.
    if (elm->retryCount < MQTTSN_RETRY_COUNT)
    {
        msg[org + 2] = msg[org + 2] | MQTTSN_FLAG_DUP;
    }

    if ((elm->flag & 0x03) == MQTTSN_TOPIC_TYPE_SHORT )
    {
        memcpy(msg + org + 3, elm->topicName, 2);
    }
    else
    {
        setUint16(msg + org + 3, elm->topicId);
    }

    setUint16(msg + org + 5, elm->msgId);
    memcpy(msg + org + 7, elm->payload, elm->payloadlen);

    theClient->getGwProxy()->writeMsg(msg);
    theClient->getGwProxy()->setPingReqTimer();

    if ((elm->flag & 0x60) == MQTTSN_FLAG_QOS_0)
    {
        DISPLAY("\033[0m\033[0;32m Topic \"%s\" was Published. \033[0m\033[0;37m\n\n", elm->topicName);
        remove(elm);  // PUBLISH Done
        return;
    }
    else if ((elm->flag & 0x60) == MQTTSN_FLAG_QOS_1)
    {
        elm->status = WAIT_PUBACK;
    }
    else if ((elm->flag & 0x60) == MQTTSN_FLAG_QOS_2)
    {
        elm->status = WAIT_PUBREC;
    }

    elm->sendUTC = time(NULL);
    elm->retryCount--;
}
void LPublishManager::sendSuspend(const char* topicName, uint16_t topicId, uint8_t topicType)
{
PubElement* elm = _first;
while (elm)
{
if (strcmp(elm->topicName, topicName) == 0 && elm->status == TOPICID_IS_SUSPEND)
{
elm->topicId = topicId;
elm->flag |= topicType;
elm->status = TOPICID_IS_READY;
sendPublish(elm);
elm = 0;
}
else
{
elm = elm->next;
}
}
}
/*
 * Writes a 7-byte PUBACK frame: length, type, topic id, message id and
 * return code, in that order.
 */
void LPublishManager::sendPubAck(uint16_t topicId, uint16_t msgId, uint8_t rc)
{
    uint8_t frame[7];

    frame[0] = 7;
    frame[1] = MQTTSN_TYPE_PUBACK;
    frame[6] = rc;
    setUint16(frame + 2, topicId);
    setUint16(frame + 4, msgId);

    theClient->getGwProxy()->writeMsg(frame);
}
/*
 * Writes a 4-byte PUBREL frame carrying the element's message id.
 * elm must not be null.
 */
void LPublishManager::sendPubRel(PubElement* elm)
{
    uint8_t frame[4];

    frame[0] = 4;
    frame[1] = MQTTSN_TYPE_PUBREL;
    setUint16(frame + 2, elm->msgId);

    theClient->getGwProxy()->writeMsg(frame);
}
/* Returns true when no element is left in the queue. */
bool LPublishManager::isDone(void)
{
    return !_first;
}
/* Returns true once the element count exceeds half of MAX_INFLIGHT_MSG. */
bool LPublishManager::isMaxFlight(void)
{
    return (MAX_INFLIGHT_MSG / 2) < _elmCnt;
}
void LPublishManager::responce(const uint8_t* msg, uint16_t msglen)
{
if (msg[0] == MQTTSN_TYPE_PUBACK)
{
uint16_t msgId = getUint16(msg + 3);
PubElement* elm = getElement(msgId);
if (elm == 0)
{
return;
}
if (msg[5] == MQTTSN_RC_ACCEPTED)
{
if (elm->status == WAIT_PUBACK)
{
DISPLAY("\033[0m\033[0;32m Topic \"%s\" Id : %d was Published. \033[0m\033[0;37m\n\n", elm->topicName, elm->topicId);
remove(elm); // PUBLISH Done
}
}
else if (msg[5] == MQTTSN_RC_REJECTED_INVALID_TOPIC_ID)
{
elm->status = TOPICID_IS_SUSPEND;
elm->topicId = 0;
elm->retryCount = MQTTSN_RETRY_COUNT;
elm->sendUTC = 0;
theClient->getGwProxy()->registerTopic((char*) elm->topicName, 0);
}
}
else if (msg[0] == MQTTSN_TYPE_PUBREC)
{
PubElement* elm = getElement(getUint16(msg + 1));
if (elm == 0)
{
return;
}
if (elm->status == WAIT_PUBREC || elm->status == WAIT_PUBCOMP)
{
sendPubRel(elm);
elm->status = WAIT_PUBCOMP;
elm->sendUTC = time(NULL);
}
}
else if (msg[0] == MQTTSN_TYPE_PUBCOMP)
{
PubElement* elm = getElement(getUint16(msg + 1));
if (elm == 0)
{
return;
}
if (elm->status == WAIT_PUBCOMP)
{
DISPLAY("\033[0m\033[0;32m Topic \"%s\" Id : %d was Published. \033[0m\033[0;37m\n\n", elm->topicName, elm->topicId);
remove(elm); // PUBLISH Done
}
}
}
/*
 * Handles an incoming PUBLISH: acknowledges it when the QOS_1 flag bit is
 * set, then runs the callback registered for the topic id.
 *
 * Fields read: flags at msg[1], topic id at msg + 2, message id at msg + 4,
 * payload from msg + 6 (msglen - 6 bytes).
 *
 * While the callback runs, _publishedFlg is NEG_TASK_INDEX so that add()
 * gives any element created from the callback a task index of -1.
 */
void LPublishManager::published(uint8_t* msg, uint16_t msglen)
{
    const uint8_t flags = msg[1];
    const uint16_t topicId = getUint16(msg + 2);

    if (flags & MQTTSN_FLAG_QOS_1)
    {
        const uint16_t msgId = getUint16(msg + 4);
        sendPubAck(topicId, msgId, MQTTSN_RC_ACCEPTED);
    }

    _publishedFlg = NEG_TASK_INDEX;
    theClient->getTopicTable()->execCallback(topicId, msg + 6, msglen - 6, flags & 0x03);
    _publishedFlg = SAVE_TASK_INDEX;
}
void LPublishManager::checkTimeout(void)
{
PubElement* elm = _first;
while (elm)
{
if (elm->sendUTC > 0 && elm->sendUTC + MQTTSN_TIME_RETRY < time(NULL))
{
if (elm->retryCount >= 0)
{
sendPublish(elm);
D_MQTTLOG("...Timeout retry\r\n");
}
else
{
theClient->getGwProxy()->reconnect();
elm->retryCount = MQTTSN_RETRY_COUNT;
break;
}
}
elm = elm->next;
}
}
/*
 * Returns the first queued element with the given message id,
 * or 0 when there is none.
 */
PubElement* LPublishManager::getElement(uint16_t msgId)
{
    for (PubElement* cur = _first; cur != 0; cur = cur->next)
    {
        if (cur->msgId == msgId)
        {
            return cur;
        }
    }
    return 0;
}
/*
 * Returns the first queued element whose topic name equals topicName,
 * or 0 when there is none.
 */
PubElement* LPublishManager::getElement(const char* topicName)
{
    for (PubElement* cur = _first; cur != 0; cur = cur->next)
    {
        if (strcmp(cur->topicName, topicName) == 0)
        {
            return cur;
        }
    }
    return 0;
}
/*
 * Unlinks an element from the doubly linked queue and releases it.
 *
 * Both neighbours (or _first / _last when a neighbour is missing) are
 * re-pointed around the element.  The previous code set _last to the new
 * head when the head was removed and never updated next->prev when a middle
 * element was removed; either left the list corrupted.
 *
 * elm : element to remove; a null pointer is ignored.
 */
void LPublishManager::remove(PubElement* elm)
{
    if (elm == 0)
    {
        return;
    }

    // Forward link: the predecessor (or the list head) skips elm.
    if (elm->prev != 0)
    {
        elm->prev->next = elm->next;
    }
    else
    {
        _first = elm->next;
    }

    // Backward link: the successor (or the list tail) skips elm.
    if (elm->next != 0)
    {
        elm->next->prev = elm->prev;
    }
    else
    {
        _last = elm->prev;
    }

    delElement(elm);
}
/*
 * Releases one element: reports its task as done when it has a task index,
 * decrements the element counter and frees the payload copy and the
 * element itself.  The element is not unlinked here.
 */
void LPublishManager::delElement(PubElement* elm)
{
    const bool hasTask = (elm->taskIndex >= 0);
    if (hasTask)
    {
        theClient->getTaskManager()->done(elm->taskIndex);
    }

    --_elmCnt;

    // free() accepts a null pointer, so no check is needed for the payload.
    free(elm->payload);
    free(elm);
}
/*
PubElement* PublishManager::add(const char* topicName, uint16_t topicId, MQTTSNPayload* payload, uint8_t qos, uint8_t retain, uint16_t msgId){
return add(topicName, topicId, payload->getRowData(), payload->getLen(), qos, retain, msgId);
}*/
/*
 * Creates a queue element for one publish request and appends it to the
 * end of the queue.
 *
 * The payload is copied; the topic name is NOT copied, only the pointer is
 * stored, so the caller's string must stay valid while the element lives.
 *
 * All allocations are made before the queue, the element counter or the
 * task manager are touched, so an allocation failure leaves the manager
 * unchanged.  Previously a failed payload allocation freed an element that
 * was already linked into the list and decremented _elmCnt before it had
 * been incremented.
 *
 * topicName : topic name pointer to store.
 * topicId   : topic id; when non-zero the element is marked TOPICID_IS_READY,
 *             otherwise its status keeps the zero value left by calloc()
 *             (presumably TOPICID_IS_SUSPEND - confirm against the header).
 * payload   : data to copy, len bytes.
 * qos       : 0, 1 or 2; other values set no QoS flag.
 * retain    : non-zero sets MQTTSN_FLAG_RETAIN.
 * msgId     : message id to store.
 * topicType : MQTTSN_TOPIC_TYPE_* value OR-ed into the flags.
 *
 * Returns the new element, or 0 when memory could not be allocated.
 */
PubElement* LPublishManager::add(const char* topicName, uint16_t topicId, uint8_t* payload, uint16_t len, uint8_t qos,
        uint8_t retain, uint16_t msgId, uint8_t topicType)
{
    PubElement* elm = (PubElement*) calloc(1, sizeof(PubElement));
    if (elm == 0)
    {
        return 0;
    }

    // Ask for at least one byte: malloc(0) may return a null pointer, which
    // must not be mistaken for an allocation failure.
    elm->payload = (uint8_t*) malloc(len > 0 ? len : 1);
    if (elm->payload == 0)
    {
        free(elm);
        return 0;
    }
    if (len > 0)
    {
        memcpy(elm->payload, payload, len);
    }

    elm->topicName = topicName;
    elm->flag |= topicType;

    if (qos == 0)
    {
        elm->flag |= MQTTSN_FLAG_QOS_0;
    }
    else if (qos == 1)
    {
        elm->flag |= MQTTSN_FLAG_QOS_1;
    }
    else if (qos == 2)
    {
        elm->flag |= MQTTSN_FLAG_QOS_2;
    }

    if (retain)
    {
        elm->flag |= MQTTSN_FLAG_RETAIN;
    }

    if (topicId)
    {
        elm->status = TOPICID_IS_READY;
        elm->topicId = topicId;
    }

    elm->payloadlen = len;
    elm->msgId = msgId;
    elm->retryCount = MQTTSN_RETRY_COUNT;
    elm->sendUTC = 0;

    // Elements created while published() runs a callback get no task index.
    if (_publishedFlg == NEG_TASK_INDEX)
    {
        elm->taskIndex = -1;
    }
    else
    {
        elm->taskIndex = theClient->getTaskManager()->getIndex();
        theClient->getTaskManager()->suspend(elm->taskIndex);
    }

    // Append to the tail of the queue; prev / next are already 0 from calloc().
    if (_last == 0)
    {
        _first = elm;
        _last = elm;
    }
    else
    {
        elm->prev = _last;
        _last->next = elm;
        _last = elm;
    }

    ++_elmCnt;
    return elm;
}