/************************************************************************************** * Copyright (c) 2016, Tomoaki Yamaguchi * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * and Eclipse Distribution License v1.0 which accompany this distribution. * * The Eclipse Public License is available at * http://www.eclipse.org/legal/epl-v10.html * and the Eclipse Distribution License is available at * http://www.eclipse.org/org/documents/edl-v10.php. * * Contributors: * Tomoaki Yamaguchi - initial API and implementation and/or initial documentation **************************************************************************************/ #include #include #include "LMqttsnClientApp.h" #include "LTimer.h" #include "LScreen.h" #include "LGwProxy.h" #include "LMqttsnClient.h" #include "LSubscribeManager.h" using namespace std; using namespace linuxAsyncClient; extern void setUint16(uint8_t* pos, uint16_t val); extern uint16_t getUint16(const uint8_t* pos); extern LMqttsnClient* theClient; extern SUBSCRIBE_LIST; extern LScreen* theScreen; #define SUB_DONE 1 #define SUB_READY 0 /*======================================== Class SubscribeManager =======================================*/ LSubscribeManager::LSubscribeManager() { _first = 0; _last = 0; } LSubscribeManager::~LSubscribeManager() { SubElement* elm = _first; SubElement* sav = 0; while (elm) { sav = elm->next; if (elm != 0) { free(elm); } elm = sav; } } void LSubscribeManager::onConnect(void) { ASSERT("\033[0m\033[0;32m Attempting OnConnect.....\033[0m\033[0;37m\n"); if (_first == 0) { for (uint8_t i = 0; theOnPublishList[i].topic != 0; i++) { subscribe(theOnPublishList[i].topic, theOnPublishList[i].pubCallback, theOnPublishList[i].qos); } } else { SubElement* elm = _first; SubElement* pelm; do { pelm = elm; if (elm->msgType == MQTTSN_TYPE_SUBSCRIBE) { elm->done = SUB_READY; elm->retryCount = MQTTSN_RETRY_COUNT; subscribe(elm->topicName, elm->callback, elm->qos); } elm = pelm->next; } while (pelm->next); } while (!theClient->getSubscribeManager()->isDone()) { theClient->getGwProxy()->getMessage(); } ASSERT("\033[0m\033[0;32m OnConnect complete\033[0m\033[0;37m\n"); ASSERT("\033[0m\033[0;32m Test is Ready.\033[0m\033[0;37m\n"); } bool LSubscribeManager::isDone(void) { SubElement* elm = _first; SubElement* prevelm; while (elm) { prevelm = elm; if (elm->done == SUB_READY) { return false; } elm = prevelm->next; } return true; } void LSubscribeManager::send(SubElement* elm) { if (elm->done == SUB_DONE) { return; } uint8_t msg[MQTTSN_MAX_MSG_LENGTH + 1]; if (elm->topicType == MQTTSN_TOPIC_TYPE_NORMAL) { msg[0] = 5 + strlen(elm->topicName); strcpy((char*) msg + 5, elm->topicName); } else { msg[0] = 7; setUint16(msg + 5, elm->topicId); } msg[1] = elm->msgType; msg[2] = elm->qos | elm->topicType; if (elm->retryCount == MQTTSN_RETRY_COUNT) { elm->msgId = theClient->getGwProxy()->getNextMsgId(); } if ((elm->retryCount < MQTTSN_RETRY_COUNT) && elm->msgType == MQTTSN_TYPE_SUBSCRIBE) { msg[2] = msg[2] | MQTTSN_FLAG_DUP; } setUint16(msg + 3, elm->msgId); theClient->getGwProxy()->connect(); theClient->getGwProxy()->writeMsg(msg); theClient->getGwProxy()->resetPingReqTimer(); elm->sendUTC = time(NULL); elm->retryCount--; } void LSubscribeManager::subscribe(const char* topicName, TopicCallback onPublish, uint8_t qos) { uint8_t topicType = MQTTSN_TOPIC_TYPE_NORMAL; if ( strlen(topicName) <= 2) { topicType = MQTTSN_TOPIC_TYPE_SHORT; } SubElement* elm = add(MQTTSN_TYPE_SUBSCRIBE, topicName, 0, topicType, qos, onPublish); send(elm); } void LSubscribeManager::subscribe(uint16_t topicId, TopicCallback onPublish, uint8_t qos, uint8_t topicType) { SubElement* elm = add(MQTTSN_TYPE_SUBSCRIBE, 0, topicId, topicType, qos, onPublish); send(elm); } void LSubscribeManager::unsubscribe(const char* topicName) { SubElement* elm = add(MQTTSN_TYPE_UNSUBSCRIBE, topicName, 0, MQTTSN_TOPIC_TYPE_NORMAL, 0, 0); send(elm); } void LSubscribeManager::unsubscribe(uint16_t topicId, uint8_t topicType) { SubElement* elm = add(MQTTSN_TYPE_UNSUBSCRIBE, 0, topicId, topicType, 0, 0); send(elm); } void LSubscribeManager::checkTimeout(void) { SubElement* elm = _first; while (elm) { if (elm->sendUTC + MQTTSN_TIME_RETRY < time(NULL)) { if (elm->retryCount >= 0) { send(elm); } else { if ( elm->done == SUB_READY ) { if (elm->msgType == MQTTSN_TYPE_SUBSCRIBE) { ASSERT("\033[0m\033[0;31m\n!!!!!! SUBSCRIBE Error !!!!! Topic : %s\033[0m\033[0;37m\n\n", (char*)elm->topicName); }else{ ASSERT("\033[0m\033[0;31m\n!!!!!! UNSUBSCRIBE Error !!!!! Topic : %s\033[0m\033[0;37m\n\n", (char*)elm->topicName); } elm->done = SUB_DONE; } } } elm = elm->next; } } void LSubscribeManager::responce(const uint8_t* msg) { if (msg[0] == MQTTSN_TYPE_SUBACK) { uint16_t topicId = getUint16(msg + 2); uint16_t msgId = getUint16(msg + 4); uint8_t rc = msg[6]; LTopicTable* tt = theClient->getGwProxy()->getTopicTable(); SubElement* elm = getElement(msgId); if (elm) { if ( rc == MQTTSN_RC_ACCEPTED ) { tt->add((char*) elm->topicName, topicId, elm->topicType, elm->callback); getElement(msgId)->done = SUB_DONE; ASSERT("\033[0m\033[0;32m Topic \"%s\" Id : %d was Subscribed. \033[0m\033[0;37m\n\n", getElement(msgId)->topicName, getElement(msgId)->topicId); } else { remove(elm); ASSERT("\033[0m\033[0;31m SUBACK Invalid messageId. %s\033[0m\033[0;37m\n\n", getElement(msgId)->topicName); } } } else if (msg[0] == MQTTSN_TYPE_UNSUBACK) { uint16_t msgId = getUint16(msg + 1); SubElement* elm = getElement(msgId); if (elm) { LTopicTable* tt = theClient->getGwProxy()->getTopicTable(); tt->setCallback(elm->topicName, 0); ASSERT("\033[0m\033[0;32m Topic \"%s\" Id : %d was Unsubscribed. \033[0m\033[0;37m\n\n", getElement(msgId)->topicName, getElement(msgId)->topicId); remove(getElement(msgId)); } else { ASSERT("\033[0m\033[0;31m UNSUBACK Invalid messageId. %s\033[0m\033[0;37m\n\n", getElement(msgId)->topicName); remove(getElement(msgId)); } } } /* SubElement operations */ SubElement* LSubscribeManager::add(uint8_t msgType, const char* topicName, uint16_t topicId, uint8_t topicType, uint8_t qos, TopicCallback callback) { SubElement* elm = 0; if (topicName ) { elm = getElement(topicName, msgType); } else { elm = getElement(topicId, topicType); } if ( elm == 0 ) { elm = (SubElement*) calloc(1, sizeof(SubElement)); if (elm == 0) { return 0; } if (_last == 0) { _first = elm; _last = elm; } else { elm->prev = _last; _last->next = elm; _last = elm; } } elm->msgType = msgType; elm->callback = callback; elm->topicName = topicName; elm->topicId = topicId; elm->topicType = topicType; if (qos == 1) { elm->qos = MQTTSN_FLAG_QOS_1; } else if (qos == 2) { elm->qos = MQTTSN_FLAG_QOS_2; } else { elm->qos = MQTTSN_FLAG_QOS_0; } elm->msgId = 0; elm->retryCount = MQTTSN_RETRY_COUNT; elm->done = SUB_READY; elm->sendUTC = 0; return elm; } void LSubscribeManager::remove(SubElement* elm) { if (elm) { if (elm->prev == 0) { _first = elm->next; if (elm->next != 0) { elm->next->prev = 0; _last = elm->next; } free(elm); } else { if ( elm->next == 0 ) { _last = elm->prev; } elm->prev->next = elm->next; free(elm); } } } SubElement* LSubscribeManager::getElement(uint16_t msgId) { SubElement* elm = _first; while (elm) { if (elm->msgId == msgId) { return elm; } else { elm = elm->next; } } return 0; } SubElement* LSubscribeManager::getElement(const char* topicName, uint8_t msgType) { SubElement* elm = _first; while (elm) { if (strcmp(elm->topicName, topicName) == 0 && elm->msgType == msgType) { return elm; } else { elm = elm->next; } } return 0; } SubElement* LSubscribeManager::getElement(uint16_t topicId, uint8_t topicType) { SubElement* elm = _first; while (elm) { if (elm->topicId == topicId && elm->topicType == topicType) { return elm; } else { elm = elm->next; } } return 0; }