mirror of
https://github.com/eclipse/paho.mqtt-sn.embedded-c.git
synced 2025-12-13 23:46:51 +01:00
@@ -63,75 +63,76 @@ void MQTTGWPublishHandler::handlePublish(Client* client, MQTTGWPacket* packet)
|
|||||||
topicId.data.long_.len = pub.topiclen;
|
topicId.data.long_.len = pub.topiclen;
|
||||||
topicId.data.long_.name = pub.topic;
|
topicId.data.long_.name = pub.topic;
|
||||||
id = client->getTopics()->getTopicId(&topicId);
|
id = client->getTopics()->getTopicId(&topicId);
|
||||||
}
|
|
||||||
|
|
||||||
if (id > 0)
|
if ( id > 0 )
|
||||||
{
|
|
||||||
topicId.data.id = id;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
/* This message might be subscribed with wild card. */
|
|
||||||
Topic* topic = client->getTopics()->match(&topicId);
|
|
||||||
if (topic == 0)
|
|
||||||
{
|
{
|
||||||
WRITELOG(" Invalid Topic. PUBLISH message is canceled.\n");
|
|
||||||
if (pub.header.bits.qos == 1)
|
|
||||||
{
|
|
||||||
replyACK(client, &pub, PUBACK);
|
|
||||||
}
|
|
||||||
else if ( pub.header.bits.qos == 2 )
|
|
||||||
{
|
|
||||||
replyACK(client, &pub, PUBREC);
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* add the Topic and get a TopicId */
|
|
||||||
topic = client->getTopics()->add(&topicId);
|
|
||||||
id = topic->getTopicId();
|
|
||||||
|
|
||||||
if (id > 0)
|
|
||||||
{
|
|
||||||
/* create REGACK */
|
|
||||||
MQTTSNPacket* regPacket = new MQTTSNPacket();
|
|
||||||
|
|
||||||
MQTTSNString topicName;
|
|
||||||
topicName.lenstring.len = topicId.data.long_.len;
|
|
||||||
topicName.lenstring.data = topicId.data.long_.name;
|
|
||||||
|
|
||||||
uint16_t regackMsgId = client->getNextSnMsgId();
|
|
||||||
regPacket->setREGISTER(id, regackMsgId, &topicName);
|
|
||||||
|
|
||||||
if (client->isSleep())
|
|
||||||
{
|
|
||||||
client->setClientSleepPacket(regPacket);
|
|
||||||
WRITELOG(FORMAT_BL_NL, currentDateTime(), regPacket->getName(),
|
|
||||||
RIGHTARROW, client->getClientId(), "is sleeping. REGISTER was saved.");
|
|
||||||
}
|
|
||||||
else if (client->isActive())
|
|
||||||
{
|
|
||||||
/* send REGISTER */
|
|
||||||
Event* evrg = new Event();
|
|
||||||
evrg->setClientSendEvent(client, regPacket);
|
|
||||||
_gateway->getClientSendQue()->post(evrg);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* send PUBLISH */
|
|
||||||
topicId.data.id = id;
|
topicId.data.id = id;
|
||||||
snPacket->setPUBLISH((uint8_t) pub.header.bits.dup, (int) pub.header.bits.qos,
|
|
||||||
(uint8_t) pub.header.bits.retain, (uint16_t) pub.msgId, topicId, (uint8_t*) pub.payload,
|
|
||||||
pub.payloadlen);
|
|
||||||
client->getWaitREGACKPacketList()->setPacket(snPacket, regackMsgId);
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
WRITELOG("\x1b[0m\x1b[31mMQTTGWPublishHandler Can't create a Topic.\n");
|
/* This message might be subscribed with wild card. */
|
||||||
return;
|
Topic* topic = client->getTopics()->match(&topicId);
|
||||||
|
if (topic == 0)
|
||||||
|
{
|
||||||
|
WRITELOG(" Invalid Topic. PUBLISH message is canceled.\n");
|
||||||
|
if (pub.header.bits.qos == 1)
|
||||||
|
{
|
||||||
|
replyACK(client, &pub, PUBACK);
|
||||||
|
}
|
||||||
|
else if ( pub.header.bits.qos == 2 )
|
||||||
|
{
|
||||||
|
replyACK(client, &pub, PUBREC);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* add the Topic and get a TopicId */
|
||||||
|
topic = client->getTopics()->add(&topicId);
|
||||||
|
id = topic->getTopicId();
|
||||||
|
|
||||||
|
if (id > 0)
|
||||||
|
{
|
||||||
|
/* create REGACK */
|
||||||
|
MQTTSNPacket* regPacket = new MQTTSNPacket();
|
||||||
|
|
||||||
|
MQTTSNString topicName;
|
||||||
|
topicName.lenstring.len = topicId.data.long_.len;
|
||||||
|
topicName.lenstring.data = topicId.data.long_.name;
|
||||||
|
|
||||||
|
uint16_t regackMsgId = client->getNextSnMsgId();
|
||||||
|
regPacket->setREGISTER(id, regackMsgId, &topicName);
|
||||||
|
|
||||||
|
if (client->isSleep())
|
||||||
|
{
|
||||||
|
client->setClientSleepPacket(regPacket);
|
||||||
|
WRITELOG(FORMAT_BL_NL, currentDateTime(), regPacket->getName(),
|
||||||
|
RIGHTARROW, client->getClientId(), "is sleeping. REGISTER was saved.");
|
||||||
|
}
|
||||||
|
else if (client->isActive())
|
||||||
|
{
|
||||||
|
/* send REGISTER */
|
||||||
|
Event* evrg = new Event();
|
||||||
|
evrg->setClientSendEvent(client, regPacket);
|
||||||
|
_gateway->getClientSendQue()->post(evrg);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* send PUBLISH */
|
||||||
|
topicId.data.id = id;
|
||||||
|
snPacket->setPUBLISH((uint8_t) pub.header.bits.dup, (int) pub.header.bits.qos,
|
||||||
|
(uint8_t) pub.header.bits.retain, (uint16_t) pub.msgId, topicId, (uint8_t*) pub.payload,
|
||||||
|
pub.payloadlen);
|
||||||
|
client->getWaitREGACKPacketList()->setPacket(snPacket, regackMsgId);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
WRITELOG("\x1b[0m\x1b[31mMQTTGWPublishHandler Can't create a Topic.\n");
|
||||||
|
delete snPacket;
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* TopicId was aquired. */
|
/* TopicId was acquired. */
|
||||||
if (client->isSleep())
|
if (client->isSleep())
|
||||||
{
|
{
|
||||||
/* client is sleeping. save PUBLISH */
|
/* client is sleeping. save PUBLISH */
|
||||||
|
|||||||
Reference in New Issue
Block a user