From d6831bb7a78161b8fc26bf320d9979c2c165b723 Mon Sep 17 00:00:00 2001 From: Ian Craggs Date: Fri, 21 Mar 2014 15:32:55 +0000 Subject: [PATCH] Add QoS -1 sample --- samples/qos-1pub.c | 146 +++++++++++++++++++++++++++++++++ src/MQTTSNDeserializePublish.c | 13 ++- src/MQTTSNPacket.h | 5 ++ src/MQTTSNSerializePublish.c | 23 +++++- 4 files changed, 182 insertions(+), 5 deletions(-) create mode 100644 samples/qos-1pub.c diff --git a/samples/qos-1pub.c b/samples/qos-1pub.c new file mode 100644 index 0000000..14f2a01 --- /dev/null +++ b/samples/qos-1pub.c @@ -0,0 +1,146 @@ +/******************************************************************************* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + *******************************************************************************/ + +#include "MQTTSNPacket.h" + +#include + +#if !defined(SOCKET_ERROR) + /** error in socket operation */ + #define SOCKET_ERROR -1 +#endif + +#if defined(WIN32) +/* default on Windows is 64 - increase to make Linux and Windows the same */ +#define FD_SETSIZE 1024 +#include +#include +#define MAXHOSTNAMELEN 256 +#define EAGAIN WSAEWOULDBLOCK +#define EINTR WSAEINTR +#define EINVAL WSAEINVAL +#define EINPROGRESS WSAEINPROGRESS +#define EWOULDBLOCK WSAEWOULDBLOCK +#define ENOTCONN WSAENOTCONN +#define ECONNRESET WSAECONNRESET +#define ioctl ioctlsocket +#define socklen_t int +#else +#define INVALID_SOCKET SOCKET_ERROR +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#endif + +#if defined(WIN32) +#include +#else +#include +#include +#endif + + +int Socket_error(char* aString, int sock) +{ +#if defined(WIN32) + int errno; +#endif + +#if defined(WIN32) + errno = WSAGetLastError(); +#endif + if (errno != EINTR && errno != EAGAIN && errno != EINPROGRESS && errno != EWOULDBLOCK) + { + if (strcmp(aString, "shutdown") != 0 || (errno != ENOTCONN && errno != ECONNRESET)) + { + int orig_errno = errno; + char* errmsg = strerror(errno); + + printf("Socket error %d (%s) in %s for socket %d\n", orig_errno, errmsg, aString, sock); + } + } + return errno; +} + + +int sendPacketBuffer(int asocket, char* host, int port, unsigned char* buf, int buflen) +{ + struct sockaddr_in cliaddr; + int rc = 0; + + memset(&cliaddr, 0, sizeof(cliaddr)); + cliaddr.sin_family = AF_INET; + cliaddr.sin_addr.s_addr = inet_addr(host); + cliaddr.sin_port = htons(port); + + if ((rc = sendto(asocket, buf, buflen, 0, (const struct sockaddr*)&cliaddr, sizeof(cliaddr))) == SOCKET_ERROR) + Socket_error("sendto", asocket); + else + rc = 0; + return rc; +} + + +int main(int argc, char** argv) +{ + int rc = 0; + unsigned char buf[200]; + int buflen = sizeof(buf); + int mysock = 0; + MQTTSN_topicid topic; + unsigned char* payload = (unsigned char*)"mypayload"; + int payloadlen = strlen((char*)payload); + int len = 0; + int dup = 0; + int qos = 3; + int retained = 0, packetid = 0; + char *host = argv[1]; + char *topicname = "a long topic name"; + int port = 1883; + + if (argc > 2) + port = atoi(argv[2]); + + printf("Sending to hostname %s port %d\n", host, port); + + mysock = socket(AF_INET, SOCK_DGRAM, 0); + if (mysock == INVALID_SOCKET) + rc = Socket_error("socket", mysock); + + topic.type = MQTTSN_TOPIC_TYPE_NORMAL; + topic.data.qos3.longname = topicname; + topic.data.qos3.longlen = strlen(topicname); + + len = MQTTSNSerialize_publish(buf, buflen, dup, qos, retained, packetid, + topic, payload, payloadlen); + + rc = sendPacketBuffer(mysock, host, port, buf, len); + + rc = shutdown(mysock, SHUT_WR); + rc = close(mysock); + + return 0; +} diff --git a/src/MQTTSNDeserializePublish.c b/src/MQTTSNDeserializePublish.c index 7371189..d18c7da 100644 --- a/src/MQTTSNDeserializePublish.c +++ b/src/MQTTSNDeserializePublish.c @@ -57,7 +57,12 @@ int MQTTSNDeserialize_publish(int* dup, int* qos, int* retained, int* packetid, *retained = flags.bits.retain; topic->type = flags.bits.topicIdType; - if (topic->type == MQTTSN_TOPIC_TYPE_NORMAL || topic->type == MQTTSN_TOPIC_TYPE_PREDEFINED) + if (topic.type == MQTTSN_TOPIC_TYPE_NORMAL && qos == 3) + { + /* special arrangement for long topic names in QoS -1 publishes. The length of the topic is in the topicid field */ + topic->data.qos3.longlen = readInt(&curdata); + } + else if (topic->type == MQTTSN_TOPIC_TYPE_NORMAL || topic->type == MQTTSN_TOPIC_TYPE_PREDEFINED) topic->data.id = readInt(&curdata); else { @@ -66,6 +71,12 @@ int MQTTSNDeserialize_publish(int* dup, int* qos, int* retained, int* packetid, } *packetid = readInt(&curdata); + if (topic.type == MQTTSN_TOPIC_TYPE_NORMAL && qos == 3) + { + topic->data.qos3.longname = curdata; + curdata += topic->data.qos3.longlen; + } + *payloadlen = enddata - curdata; *payload = curdata; rc = 1; diff --git a/src/MQTTSNPacket.h b/src/MQTTSNPacket.h index 05a7782..64e6e69 100644 --- a/src/MQTTSNPacket.h +++ b/src/MQTTSNPacket.h @@ -65,6 +65,11 @@ typedef struct { unsigned short id; char name[2]; + struct + { + char* longname; + int longlen; + } qos3; } data; } MQTTSN_topicid; diff --git a/src/MQTTSNSerializePublish.c b/src/MQTTSNSerializePublish.c index c56250f..60b25e1 100644 --- a/src/MQTTSNSerializePublish.c +++ b/src/MQTTSNSerializePublish.c @@ -27,9 +27,14 @@ * @param payloadlen the length of the payload to be sent * @return the length of buffer needed to contain the serialized version of the packet */ -int MQTTSNSerialize_publishLength(int payloadlen) +int MQTTSNSerialize_publishLength(int payloadlen, MQTTSN_topicid topic, int qos) { - return payloadlen + 6; + int len = 6; + + if (topic.type == MQTTSN_TOPIC_TYPE_NORMAL && qos == 3) + len += topic.data.qos3.longlen; + + return payloadlen + len; } @@ -55,7 +60,7 @@ int MQTTSNSerialize_publish(unsigned char* buf, int buflen, int dup, int qos, in int rc = 0; FUNC_ENTRY; - if ((len = MQTTSNPacket_len(MQTTSNSerialize_publishLength(payloadlen))) > buflen) + if ((len = MQTTSNPacket_len(MQTTSNSerialize_publishLength(payloadlen, topic, qos))) > buflen) { rc = MQTTSNPACKET_BUFFER_TOO_SHORT; goto exit; @@ -70,7 +75,12 @@ int MQTTSNSerialize_publish(unsigned char* buf, int buflen, int dup, int qos, in flags.bits.topicIdType = topic.type; writeChar(&ptr, flags.all); - if (topic.type == MQTTSN_TOPIC_TYPE_NORMAL || topic.type == MQTTSN_TOPIC_TYPE_PREDEFINED) + if (topic.type == MQTTSN_TOPIC_TYPE_NORMAL && qos == 3) + { + /* special arrangement for long topic names in QoS -1 publishes. The length of the topic is in the topicid field */ + writeInt(&ptr, topic.data.qos3.longlen); /* topic length */ + } + else if (topic.type == MQTTSN_TOPIC_TYPE_NORMAL || topic.type == MQTTSN_TOPIC_TYPE_PREDEFINED) writeInt(&ptr, topic.data.id); else { @@ -78,6 +88,11 @@ int MQTTSNSerialize_publish(unsigned char* buf, int buflen, int dup, int qos, in writeChar(&ptr, topic.data.name[1]); } writeInt(&ptr, packetid); + if (topic.type == MQTTSN_TOPIC_TYPE_NORMAL && qos == 3) + { + memcpy(ptr, topic.data.qos3.longname, topic.data.qos3.longlen); + ptr += topic.data.qos3.longlen; + } memcpy(ptr, payload, payloadlen); ptr += payloadlen;