/************************************************************************************** * Copyright (c) 2016, Tomoaki Yamaguchi * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * and Eclipse Distribution License v1.0 which accompany this distribution. * * The Eclipse Public License is available at * http://www.eclipse.org/legal/epl-v10.html * and the Eclipse Distribution License is available at * http://www.eclipse.org/org/documents/edl-v10.php. * * Contributors: * Tomoaki Yamaguchi - initial API and implementation and/or initial documentation **************************************************************************************/ #include "MQTTSNGWBrokerSendTask.h" #include "MQTTSNGWDefines.h" #include "MQTTSNGateway.h" #include "MQTTSNGWClient.h" #include "MQTTGWPacket.h" #include using namespace std; using namespace MQTTSNGW; char* currentDateTime(); #define ERRMSG_FORMAT "\n%s \x1b[0m\x1b[31merror:\x1b[0m\x1b[37m Can't Xmit to the Broker. errno=%d\n" /*===================================== Class BrokerSendTask =====================================*/ BrokerSendTask::BrokerSendTask(Gateway* gateway) { _gateway = gateway; _gateway->attach((Thread*)this); _gwparams = 0; _light = 0; } BrokerSendTask::~BrokerSendTask() { } /** * Initialize attributs of this class */ void BrokerSendTask::initialize(int argc, char** argv) { _gwparams = _gateway->getGWParams(); _light = _gateway->getLightIndicator(); } /** * connect to the broker and send MQTT messges */ void BrokerSendTask::run() { Event* ev = 0; MQTTGWPacket* packet = 0; Client* client = 0; int rc = 0; while (true) { ev = _gateway->getBrokerSendQue()->wait(); if ( ev->getEventType() == EtStop ) { WRITELOG("%s BrokerSendTask stopped.\n", currentDateTime()); delete ev; return; } if ( ev->getEventType() == EtBrokerSend) { client = ev->getClient(); packet = ev->getMQTTGWPacket(); if ( packet->getType() == CONNECT && client->getNetwork()->isValid() ) { client->getNetwork()->close(); } if ( !client->getNetwork()->isValid() ) { /* connect to the broker and send a packet */ if (client->isSecureNetwork()) { rc = client->getNetwork()->connect(_gwparams->brokerName, _gwparams->portSecure, _gwparams->rootCApath, _gwparams->rootCAfile, _gwparams->certKey, _gwparams->privateKey); } else { rc = client->getNetwork()->connect(_gwparams->brokerName, _gwparams->port); } if ( !rc ) { /* disconnect the broker and the client */ WRITELOG("%s BrokerSendTask: %s can't connect to the broker. errno=%d %s %s\n", ERRMSG_HEADER, client->getClientId(), errno, strerror(errno), ERRMSG_FOOTER); delete ev; client->getNetwork()->close(); continue; } } /* send a packet */ _light->blueLight(true); if ( (rc = packet->send(client->getNetwork())) > 0 ) { if ( packet->getType() == CONNECT ) { client->connectSended(); } log(client, packet); } else { WRITELOG("%s BrokerSendTask: %s can't send a packet to the broker. errno=%d %s %s\n", ERRMSG_HEADER, client->getClientId(), rc == -1 ? errno : 0, strerror(errno), ERRMSG_FOOTER); client->getNetwork()->close(); /* Disconnect the client */ packet = new MQTTGWPacket(); packet->setHeader(DISCONNECT); Event* ev1 = new Event(); ev1->setBrokerRecvEvent(client, packet); _gateway->getPacketEventQue()->post(ev1); } _light->blueLight(false); } delete ev; } } /** * write message content into stdout or Ringbuffer */ void BrokerSendTask::log(Client* client, MQTTGWPacket* packet) { char pbuf[(SIZE_OF_LOG_PACKET + 5 )* 3]; char msgId[6]; switch (packet->getType()) { case CONNECT: WRITELOG(FORMAT_Y_Y_W, currentDateTime(), packet->getName(), RIGHTARROWB, client->getClientId(), packet->print(pbuf)); break; case PUBLISH: WRITELOG(FORMAT_W_MSGID_Y_W, currentDateTime(), packet->getName(), packet->getMsgId(msgId), RIGHTARROWB, client->getClientId(), packet->print(pbuf)); break; case SUBSCRIBE: case UNSUBSCRIBE: case PUBACK: case PUBREC: case PUBREL: case PUBCOMP: WRITELOG(FORMAT_W_MSGID_Y_W, currentDateTime(), packet->getName(), packet->getMsgId(msgId), RIGHTARROWB, client->getClientId(), packet->print(pbuf)); break; case PINGREQ: WRITELOG(FORMAT_Y_Y_W, currentDateTime(), packet->getName(), RIGHTARROWB, client->getClientId(), packet->print(pbuf)); break; case DISCONNECT: WRITELOG(FORMAT_Y_Y_W, currentDateTime(), packet->getName(), RIGHTARROWB, client->getClientId(), packet->print(pbuf)); break; default: break; } }