diff --git a/samples/build b/samples/build index e397cad..70e7b4e 100644 --- a/samples/build +++ b/samples/build @@ -1,3 +1,4 @@ gcc -Wall qos0pub.c -I ../src ../src/MQTTSNSerializePublish.c ../src/MQTTSNPacket.c ../src/MQTTSNConnectClient.c -o qos0pub -Os -s gcc -Wall qos-1pub.c -I ../src ../src/MQTTSNSerializePublish.c ../src/MQTTSNPacket.c -o qos-1pub -Os -s gcc -Wall qos1pub.c -I ../src ../src/MQTTSNSerializePublish.c ../src/MQTTSNDeserializePublish.c ../src/MQTTSNPacket.c ../src/MQTTSNConnectClient.c -o qos1pub -Os -s +gcc -Wall pub0sub1.c -I ../src ../src/MQTTSNSerializePublish.c ../src/MQTTSNDeserializePublish.c ../src/MQTTSNPacket.c ../src/MQTTSNConnectClient.c ../src/MQTTSNSubscribeClient.c -o pub0sub1 -Os -s diff --git a/samples/pub0sub1.c b/samples/pub0sub1.c new file mode 100644 index 0000000..a33b663 --- /dev/null +++ b/samples/pub0sub1.c @@ -0,0 +1,249 @@ +/******************************************************************************* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + *******************************************************************************/ + +#include "MQTTSNPacket.h" + +#include + +#if !defined(SOCKET_ERROR) + /** error in socket operation */ + #define SOCKET_ERROR -1 +#endif + +#if defined(WIN32) +/* default on Windows is 64 - increase to make Linux and Windows the same */ +#define FD_SETSIZE 1024 +#include +#include +#define MAXHOSTNAMELEN 256 +#define EAGAIN WSAEWOULDBLOCK +#define EINTR WSAEINTR +#define EINVAL WSAEINVAL +#define EINPROGRESS WSAEINPROGRESS +#define EWOULDBLOCK WSAEWOULDBLOCK +#define ENOTCONN WSAENOTCONN +#define ECONNRESET WSAECONNRESET +#define ioctl ioctlsocket +#define socklen_t int +#else +#define INVALID_SOCKET SOCKET_ERROR +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#endif + +#if defined(WIN32) +#include +#else +#include +#include +#endif + + +int Socket_error(char* aString, int sock) +{ +#if defined(WIN32) + int errno; +#endif + +#if defined(WIN32) + errno = WSAGetLastError(); +#endif + if (errno != EINTR && errno != EAGAIN && errno != EINPROGRESS && errno != EWOULDBLOCK) + { + if (strcmp(aString, "shutdown") != 0 || (errno != ENOTCONN && errno != ECONNRESET)) + { + int orig_errno = errno; + char* errmsg = strerror(errno); + + printf("Socket error %d (%s) in %s for socket %d\n", orig_errno, errmsg, aString, sock); + } + } + return errno; +} + + +int sendPacketBuffer(int asocket, char* host, int port, unsigned char* buf, int buflen) +{ + struct sockaddr_in cliaddr; + int rc = 0; + + memset(&cliaddr, 0, sizeof(cliaddr)); + cliaddr.sin_family = AF_INET; + cliaddr.sin_addr.s_addr = inet_addr(host); + cliaddr.sin_port = htons(port); + + if ((rc = sendto(asocket, buf, buflen, 0, (const struct sockaddr*)&cliaddr, sizeof(cliaddr))) == SOCKET_ERROR) + Socket_error("sendto", asocket); + else if (rc == buflen) + rc = 0; + return rc; +} + + +int mysock = 0; +char *host = "127.0.0.1"; +int port = 1884; + +int getdata(unsigned char* buf, size_t count) +{ + int rc = recvfrom(mysock, buf, count, 0, NULL, NULL); + printf("received %d bytes count %d\n", rc, (int)count); + return rc; +} + + +int main(int argc, char** argv) +{ + int rc = 0; + unsigned char buf[200]; + int buflen = sizeof(buf); + MQTTSN_topicid topic; + unsigned char* payload = (unsigned char*)"mypayload"; + int payloadlen = strlen((char*)payload); + int len = 0; + int dup = 0; + int qos = 1; + int retained = 0, packetid = 1; + char *topicname = "a long topic name"; + MQTTSNPacket_connectData options = MQTTSNPacket_connectData_initializer; + unsigned short topicid; + + if (argc > 1) + host = argv[1]; + + if (argc > 2) + port = atoi(argv[2]); + + printf("Sending to hostname %s port %d\n", host, port); + + mysock = socket(AF_INET, SOCK_DGRAM, 0); + if (mysock == INVALID_SOCKET) + rc = Socket_error("socket", mysock); + + options.clientID.cstring = "pub0sub1 MQTT-SN"; + len = MQTTSNSerialize_connect(buf, buflen, &options); + rc = sendPacketBuffer(mysock, host, port, buf, len); + + /* wait for connack */ + if (MQTTSNPacket_read(buf, buflen, getdata) == MQTTSN_CONNACK) + { + int connack_rc = -1; + + if (MQTTSNDeserialize_connack(&connack_rc, buf, buflen) != 1 || connack_rc != 0) + { + printf("Unable to connect, return code %d\n", connack_rc); + goto exit; + } + else + printf("connected rc %d\n", connack_rc); + } + else + goto exit; + + + /* subscribe */ + printf("Subscribing\n"); + topic.type = MQTTSN_TOPIC_TYPE_NORMAL; + topic.data.long_.name = "substopic"; + topic.data.long_.len = strlen(topic.data.long_.name); + len = MQTTSNSerialize_subscribe(buf, buflen, 0, 2, /*msgid*/ 1, &topic); + rc = sendPacketBuffer(mysock, host, port, buf, len); + + if (MQTTSNPacket_read(buf, buflen, getdata) == MQTTSN_SUBACK) /* wait for suback */ + { + unsigned short submsgid; + int granted_qos; + unsigned char returncode; + + rc = MQTTSNDeserialize_suback(&granted_qos, &topicid, &submsgid, &returncode, buf, buflen); + if (granted_qos != 2 || returncode != 0) + { + printf("granted qos != 2, %d return code %d\n", granted_qos, returncode); + goto exit; + } + else + printf("suback topic id %d\n", topicid); + } + else + goto exit; + + printf("Publishing\n"); + /* publish with short name */ + topic.type = MQTTSN_TOPIC_TYPE_NORMAL; + topic.data.id = topicid; + len = MQTTSNSerialize_publish(buf, buflen, dup, qos, retained, packetid, + topic, payload, payloadlen); + rc = sendPacketBuffer(mysock, host, port, buf, len); + + /* wait for puback */ + if (MQTTSNPacket_read(buf, buflen, getdata) == MQTTSN_PUBACK) + { + unsigned short packet_id, topic_id; + unsigned char returncode; + + if (MQTTSNDeserialize_puback(&topic_id, &packet_id, &returncode, buf, buflen) != 1 || returncode != MQTTSN_RC_ACCEPTED) + printf("Unable to publish, return code %d\n", returncode); + else + printf("puback received, msgid %d topic id %d\n", packet_id, topic_id); + } + else + goto exit; + + printf("Receive publish\n"); + if (MQTTSNPacket_read(buf, buflen, getdata) == MQTTSN_PUBLISH) + { + unsigned short packet_id; + int dup, qos, retained, payloadlen; + unsigned char* payload; + MQTTSN_topicid pubtopic; + + if (MQTTSNDeserialize_publish(&dup, &qos, &retained, &packet_id, &pubtopic, + &payload, &payloadlen, buf, buflen) != 1) + printf("Error deserializing publish\n"); + else + printf("publish received, id %d qos %d\n", packet_id, qos); + + if (qos == 1) + { + len = MQTTSNSerialize_puback(buf, buflen, pubtopic.data.id, packet_id, MQTTSN_RC_ACCEPTED); + rc = sendPacketBuffer(mysock, host, port, buf, len); + if (rc == 0) + printf("puback sent\n"); + } + } + else + goto exit; + + len = MQTTSNSerialize_disconnect(buf, buflen, 0); + rc = sendPacketBuffer(mysock, host, port, buf, len); + +exit: + rc = shutdown(mysock, SHUT_WR); + rc = close(mysock); + + return 0; +} diff --git a/samples/qos-1pub.c b/samples/qos-1pub.c index 14f2a01..7113dec 100644 --- a/samples/qos-1pub.c +++ b/samples/qos-1pub.c @@ -131,8 +131,8 @@ int main(int argc, char** argv) rc = Socket_error("socket", mysock); topic.type = MQTTSN_TOPIC_TYPE_NORMAL; - topic.data.qos3.longname = topicname; - topic.data.qos3.longlen = strlen(topicname); + topic.data.long_.name = topicname; + topic.data.long_.len = strlen(topicname); len = MQTTSNSerialize_publish(buf, buflen, dup, qos, retained, packetid, topic, payload, payloadlen); diff --git a/samples/qos0pub.c b/samples/qos0pub.c index 77a07dd..d40b238 100644 --- a/samples/qos0pub.c +++ b/samples/qos0pub.c @@ -139,8 +139,8 @@ int main(int argc, char** argv) rc = sendPacketBuffer(mysock, host, port, buf, len); topic.type = MQTTSN_TOPIC_TYPE_NORMAL; - topic.data.qos3.longname = topicname; - topic.data.qos3.longlen = strlen(topicname); + topic.data.long_.name = topicname; + topic.data.long_.len = strlen(topicname); len = MQTTSNSerialize_publish(buf, buflen - len, dup, qos, retained, packetid, topic, payload, payloadlen); rc = sendPacketBuffer(mysock, host, port, buf, len); diff --git a/samples/qos1pub.c b/samples/qos1pub.c index 6d66707..c62d0eb 100644 --- a/samples/qos1pub.c +++ b/samples/qos1pub.c @@ -165,7 +165,7 @@ int main(int argc, char** argv) /* publish with short name */ topic.type = MQTTSN_TOPIC_TYPE_SHORT; - memcpy(topic.data.name, "tt", 2); + memcpy(topic.data.short_name, "tt", 2); len = MQTTSNSerialize_publish(buf, buflen - len, dup, qos, retained, packetid, topic, payload, payloadlen); rc = sendPacketBuffer(mysock, host, port, buf, len); diff --git a/src/MQTTSNConnect.h b/src/MQTTSNConnect.h index 82ab961..40ec8cf 100644 --- a/src/MQTTSNConnect.h +++ b/src/MQTTSNConnect.h @@ -30,7 +30,7 @@ typedef struct int willFlag; } MQTTSNPacket_connectData; -#define MQTTSNPacket_connectData_initializer { {'M', 'Q', 'S', 'C'}, 0, {NULL, {0, NULL}}, 10, 0, 0 } +#define MQTTSNPacket_connectData_initializer { {'M', 'Q', 'S', 'C'}, 0, {NULL, {0, NULL}}, 10, 1, 0 } int MQTTSNSerialize_connect(unsigned char* buf, int buflen, MQTTSNPacket_connectData* options); int MQTTSNDeserialize_connect(MQTTSNPacket_connectData* data, unsigned char* buf, int len); diff --git a/src/MQTTSNDeserializePublish.c b/src/MQTTSNDeserializePublish.c index 7593bbd..b654314 100644 --- a/src/MQTTSNDeserializePublish.c +++ b/src/MQTTSNDeserializePublish.c @@ -60,21 +60,21 @@ int MQTTSNDeserialize_publish(int* dup, int* qos, int* retained, unsigned short* if (topic->type == MQTTSN_TOPIC_TYPE_NORMAL && *qos == 3) { /* special arrangement for long topic names in QoS -1 publishes. The length of the topic is in the topicid field */ - topic->data.qos3.longlen = readInt(&curdata); + topic->data.long_.len = readInt(&curdata); } else if (topic->type == MQTTSN_TOPIC_TYPE_NORMAL || topic->type == MQTTSN_TOPIC_TYPE_PREDEFINED) topic->data.id = readInt(&curdata); else { - topic->data.name[0] = readChar(&curdata); - topic->data.name[1] = readChar(&curdata); + topic->data.short_name[0] = readChar(&curdata); + topic->data.short_name[1] = readChar(&curdata); } *packetid = readInt(&curdata); if (topic->type == MQTTSN_TOPIC_TYPE_NORMAL && *qos == 3) { - topic->data.qos3.longname = (char*)curdata; - curdata += topic->data.qos3.longlen; + topic->data.long_.name = (char*)curdata; + curdata += topic->data.long_.len; } *payloadlen = enddata - curdata; diff --git a/src/MQTTSNPacket.h b/src/MQTTSNPacket.h index abb9512..1dae284 100644 --- a/src/MQTTSNPacket.h +++ b/src/MQTTSNPacket.h @@ -64,12 +64,12 @@ typedef struct union { unsigned short id; - char name[2]; + char short_name[2]; struct { - char* longname; - int longlen; - } qos3; + char* name; + int len; + } long_; } data; } MQTTSN_topicid; @@ -122,8 +122,8 @@ int MQTTstrlen(MQTTString mqttstring); #include "MQTTSNConnect.h" #include "MQTTSNPublish.h" -/*#include "MQTTSNSubscribe.h" -#include "MQTTSNUnsubscribe.h" +#include "MQTTSNSubscribe.h" +/*#include "MQTTSNUnsubscribe.h" */ #include diff --git a/src/MQTTSNSerializePublish.c b/src/MQTTSNSerializePublish.c index f9e8176..45da30e 100644 --- a/src/MQTTSNSerializePublish.c +++ b/src/MQTTSNSerializePublish.c @@ -32,7 +32,7 @@ int MQTTSNSerialize_publishLength(int payloadlen, MQTTSN_topicid topic, int qos) int len = 6; if (topic.type == MQTTSN_TOPIC_TYPE_NORMAL && qos == 3) - len += topic.data.qos3.longlen; + len += topic.data.long_.len; return payloadlen + len; } @@ -78,20 +78,20 @@ int MQTTSNSerialize_publish(unsigned char* buf, int buflen, int dup, int qos, in if (topic.type == MQTTSN_TOPIC_TYPE_NORMAL && qos == 3) { /* special arrangement for long topic names in QoS -1 publishes. The length of the topic is in the topicid field */ - writeInt(&ptr, topic.data.qos3.longlen); /* topic length */ + writeInt(&ptr, topic.data.long_.len); /* topic length */ } else if (topic.type == MQTTSN_TOPIC_TYPE_NORMAL || topic.type == MQTTSN_TOPIC_TYPE_PREDEFINED) writeInt(&ptr, topic.data.id); else { - writeChar(&ptr, topic.data.name[0]); - writeChar(&ptr, topic.data.name[1]); + writeChar(&ptr, topic.data.short_name[0]); + writeChar(&ptr, topic.data.short_name[1]); } writeInt(&ptr, packetid); if (topic.type == MQTTSN_TOPIC_TYPE_NORMAL && qos == 3) { - memcpy(ptr, topic.data.qos3.longname, topic.data.qos3.longlen); - ptr += topic.data.qos3.longlen; + memcpy(ptr, topic.data.long_.name, topic.data.long_.len); + ptr += topic.data.long_.len; } memcpy(ptr, payload, payloadlen); ptr += payloadlen; diff --git a/src/MQTTSNSubscribe.h b/src/MQTTSNSubscribe.h new file mode 100644 index 0000000..3261ac1 --- /dev/null +++ b/src/MQTTSNSubscribe.h @@ -0,0 +1,26 @@ +/******************************************************************************* + * Copyright (c) 2014 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Ian Craggs - initial API and implementation and/or initial documentation + *******************************************************************************/ + +#if !defined(MQTTSNSUBSCRIBE_H_) +#define MQTTSNSUBSCRIBE_H_ + +int MQTTSNSerialize_subscribe(unsigned char* buf, int buflen, int dup, int qos, unsigned short packetid, + MQTTSN_topicid* topicFilter); + +int MQTTSNDeserialize_suback(int* qos, unsigned short* topicid, unsigned short* packetid, + unsigned char* returncode, unsigned char* buf, int buflen); + +#endif /* MQTTSNSUBSCRIBE_H_ */ diff --git a/src/MQTTSNSubscribeClient.c b/src/MQTTSNSubscribeClient.c index 1153f97..f06a5b7 100644 --- a/src/MQTTSNSubscribeClient.c +++ b/src/MQTTSNSubscribeClient.c @@ -14,15 +14,104 @@ * Ian Craggs - initial API and implementation and/or initial documentation *******************************************************************************/ +#include "MQTTSNPacket.h" +#include "StackTrace.h" +#include -int MQTTDeserialize_subscribe(int* dup, int* packetid, int maxcount, int* count, MQTTString topicFilters[], int requestedQoSs[], - char* buf, int buflen) +/** + * Determines the length of the MQTTSN subscribe packet that would be produced using the supplied parameters, + * excluding length + * @param topicName the topic name to be used in the publish + * @return the length of buffer needed to contain the serialized version of the packet + */ +int MQTTSNSerialize_subscribeLength(MQTTSN_topicid* topicFilter) { + int len = 4; + + if (topicFilter->type == MQTTSN_TOPIC_TYPE_NORMAL) + len += topicFilter->data.long_.len; + + return len; +} + + +int MQTTSNSerialize_subscribe(unsigned char* buf, int buflen, int dup, int qos, unsigned short packetid, MQTTSN_topicid* topicFilter) +{ + unsigned char *ptr = buf; + MQTTSNFlags flags; + int len = 0; + int rc = 0; + + FUNC_ENTRY; + if ((len = MQTTSNPacket_len(MQTTSNSerialize_subscribeLength(topicFilter))) > buflen) + { + rc = MQTTSNPACKET_BUFFER_TOO_SHORT; + goto exit; + } + ptr += MQTTSNPacket_encode(ptr, len); /* write length */ + writeChar(&ptr, MQTTSN_SUBSCRIBE); /* write message type */ + + flags.all = 0; + flags.bits.dup = dup; + flags.bits.QoS = qos; + flags.bits.topicIdType = topicFilter->type; + writeChar(&ptr, flags.all); + + writeInt(&ptr, packetid); + + /* now the topic id or name */ + if (topicFilter->type == MQTTSN_TOPIC_TYPE_NORMAL) /* means long topic name */ + { + memcpy(ptr, topicFilter->data.long_.name, topicFilter->data.long_.len); + ptr += topicFilter->data.long_.len; + } + else if (topicFilter->type == MQTTSN_TOPIC_TYPE_PREDEFINED) + writeInt(&ptr, topicFilter->data.id); + else if (topicFilter->type == MQTTSN_TOPIC_TYPE_SHORT) + { + writeChar(&ptr, topicFilter->data.short_name[0]); + writeChar(&ptr, topicFilter->data.short_name[1]); + } + + rc = ptr - buf; +exit: + FUNC_EXIT_RC(rc); + return rc; } -int MQTTSerialize_suback(char* buf, int buflen, int packetid, int count, int* grantedQoSs) +int MQTTSNDeserialize_suback(int* qos, unsigned short* topicid, unsigned short* packetid, + unsigned char* returncode, unsigned char* buf, int buflen) { + MQTTSNFlags flags; + unsigned char* curdata = buf; + unsigned char* enddata = NULL; + int rc = 0; + int mylen = 0; + + FUNC_ENTRY; + curdata += (rc = MQTTSNPacket_decode(curdata, buflen, &mylen)); /* read length */ + enddata = buf + mylen; + if (enddata - curdata > buflen) + goto exit; + + if (readChar(&curdata) != MQTTSN_SUBACK) + goto exit; + + flags.all = readChar(&curdata); + *qos = flags.bits.QoS; + + *topicid = readInt(&curdata); + *packetid = readInt(&curdata); + *returncode = readChar(&curdata); + + rc = 1; +exit: + FUNC_EXIT_RC(rc); + return rc; +} + +