/*******************************************************************************
 * Copyright (c) 2014 IBM Corp.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * and Eclipse Distribution License v1.0 which accompany this distribution.
 *
 * The Eclipse Public License is available at
 *    http://www.eclipse.org/legal/epl-v10.html
 * and the Eclipse Distribution License is available at
 *   http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * Contributors:
 *    Ian Craggs - initial API and implementation and/or initial documentation
 *******************************************************************************/

#include "MQTTSNPacket.h"

/* NOTE(review): the system header names in this file were restored from the
 * functions it uses (socket/sendto/recvfrom/inet_addr/strerror/atoi/...);
 * confirm them against the project's build before merging. */
#include <sys/types.h>

#if !defined(SOCKET_ERROR)
	/** error in socket operation */
	#define SOCKET_ERROR -1
#endif

#if defined(WIN32)
/* default on Windows is 64 - increase to make Linux and Windows the same */
#define FD_SETSIZE 1024
#include <winsock2.h>
#include <ws2tcpip.h>
#define MAXHOSTNAMELEN 256
#define EAGAIN WSAEWOULDBLOCK
#define EINTR WSAEINTR
#define EINVAL WSAEINVAL
#define EINPROGRESS WSAEINPROGRESS
#define EWOULDBLOCK WSAEWOULDBLOCK
#define ENOTCONN WSAENOTCONN
#define ECONNRESET WSAECONNRESET
#define ioctl ioctlsocket
#define socklen_t int
#else
#define INVALID_SOCKET SOCKET_ERROR
#include <sys/socket.h>
#include <sys/param.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <stdlib.h>
#endif

#if defined(WIN32)
#include <Iphlpapi.h>
#else
#include <sys/ioctl.h>
#include <net/if.h>
#endif


/**
 * Report the error left behind by the most recent socket call.
 *
 * The error code is captured once on entry (WSAGetLastError() on WIN32,
 * errno elsewhere) so that the value returned is the one that was reported,
 * even if strerror()/printf() modify errno while the message is produced.
 *
 * Nothing is printed for EINTR, EAGAIN, EINPROGRESS or EWOULDBLOCK, nor for
 * ENOTCONN/ECONNRESET when aString is "shutdown".
 *
 * @param aString name of the operation that failed, used in the message
 * @param sock the socket the operation was applied to, used in the message
 * @return the captured error code
 */
int Socket_error(char* aString, int sock)
{
#if defined(WIN32)
	int err = WSAGetLastError();
#else
	int err = errno;
#endif

	if (err != EINTR && err != EAGAIN && err != EINPROGRESS && err != EWOULDBLOCK)
	{
		if (strcmp(aString, "shutdown") != 0 || (err != ENOTCONN && err != ECONNRESET))
			printf("Socket error %d (%s) in %s for socket %d\n", err, strerror(err), aString, sock);
	}
	return err;
}


/**
 * Send one buffer as a datagram to an IPv4 address and port.
 *
 * @param asocket socket to send on
 * @param host destination address in the textual form accepted by inet_addr()
 * @param port destination port, host byte order
 * @param buf bytes to send
 * @param buflen number of bytes in buf
 * @return 0 when sendto() reported exactly buflen bytes sent, SOCKET_ERROR
 *   when the address could not be parsed or sendto() failed, otherwise the
 *   byte count sendto() reported
 */
int sendPacketBuffer(int asocket, char* host, int port, unsigned char* buf, int buflen)
{
	struct sockaddr_in cliaddr;
	int rc = 0;

	memset(&cliaddr, 0, sizeof(cliaddr));
	cliaddr.sin_family = AF_INET;
	cliaddr.sin_addr.s_addr = inet_addr(host);
	cliaddr.sin_port = htons(port);

	/* inet_addr() does not resolve names; reject anything it could not parse
	 * instead of sending to the address its error value happens to encode. */
	if (cliaddr.sin_addr.s_addr == INADDR_NONE)
	{
		printf("Invalid IPv4 address %s\n", host);
		return SOCKET_ERROR;
	}

	if ((rc = sendto(asocket, buf, buflen, 0, (const struct sockaddr*)&cliaddr, sizeof(cliaddr))) == SOCKET_ERROR)
		Socket_error("sendto", asocket);
	else if (rc == buflen)
		rc = 0;
	return rc;
}


/* Socket and destination shared by main() and getdata(). */
int mysock = 0;
char *host = "127.0.0.1";
int port = 1884;


/**
 * Read callback handed to MQTTSNPacket_read(): receives from mysock into buf
 * and logs the result.
 *
 * @param buf destination buffer
 * @param count maximum number of bytes to receive
 * @return the value returned by recvfrom()
 */
int getdata(unsigned char* buf, size_t count)
{
	int rc = recvfrom(mysock, buf, count, 0, NULL, NULL);
	printf("received %d bytes count %d\n", rc, (int)count);
	return rc;
}


/**
 * Sample client: connect, subscribe to "substopic" requesting QoS 2, publish
 * "mypayload" at QoS 1 on the topic id returned in the SUBACK, wait for one
 * incoming PUBLISH (acknowledging it when it is QoS 1), then disconnect.
 *
 * argv[1], when present, overrides the destination address and argv[2] the
 * destination port. Any send failure or unexpected/undecodable reply during
 * connect or subscribe ends the exchange early. The process exit status is 0
 * on every path.
 */
int main(int argc, char** argv)
{
	int rc = 0;
	unsigned char buf[200];
	int buflen = sizeof(buf);
	MQTTSN_topicid topic;
	unsigned char* payload = (unsigned char*)"mypayload";
	int payloadlen = (int)strlen((char*)payload);
	int len = 0;
	int dup = 0;
	int qos = 1;
	int retained = 0, packetid = 1;
	MQTTSNPacket_connectData options = MQTTSNPacket_connectData_initializer;
	unsigned short topicid = 0;

	if (argc > 1)
		host = argv[1];

	if (argc > 2)
		port = atoi(argv[2]);

	printf("Sending to hostname %s port %d\n", host, port);

	mysock = socket(AF_INET, SOCK_DGRAM, 0);
	if (mysock == INVALID_SOCKET)
	{
		/* Nothing to shut down or close, so do not fall into the exit path. */
		Socket_error("socket", mysock);
		return 0;
	}

	/* connect */
	options.clientID.cstring = "pub0sub1 MQTT-SN";
	len = MQTTSNSerialize_connect(buf, buflen, &options);
	rc = sendPacketBuffer(mysock, host, port, buf, len);
	if (rc != 0)
		goto exit; /* no request went out, so no reply will arrive */

	/* wait for connack */
	if (MQTTSNPacket_read(buf, buflen, getdata) == MQTTSN_CONNACK)
	{
		int connack_rc = -1;

		if (MQTTSNDeserialize_connack(&connack_rc, buf, buflen) != 1 || connack_rc != 0)
		{
			printf("Unable to connect, return code %d\n", connack_rc);
			goto exit;
		}
		else
			printf("connected rc %d\n", connack_rc);
	}
	else
		goto exit;

	/* subscribe */
	printf("Subscribing\n");
	topic.type = MQTTSN_TOPIC_TYPE_NORMAL;
	topic.data.long_.name = "substopic";
	topic.data.long_.len = strlen(topic.data.long_.name);
	len = MQTTSNSerialize_subscribe(buf, buflen, 0, 2, /*msgid*/ 1, &topic);
	rc = sendPacketBuffer(mysock, host, port, buf, len);
	if (rc != 0)
		goto exit;

	if (MQTTSNPacket_read(buf, buflen, getdata) == MQTTSN_SUBACK) 	/* wait for suback */
	{
		unsigned short submsgid = 0;
		int granted_qos = -1;
		unsigned char returncode = 0;

		/* Same success convention (== 1) as the other deserializers used in
		 * this file; the out-parameters are only meaningful on success. */
		rc = MQTTSNDeserialize_suback(&granted_qos, &topicid, &submsgid, &returncode, buf, buflen);
		if (rc != 1 || granted_qos != 2 || returncode != 0)
		{
			printf("granted qos != 2, %d return code %d\n", granted_qos, returncode);
			goto exit;
		}
		else
			printf("suback topic id %d\n", topicid);
	}
	else
		goto exit;

	printf("Publishing\n");
	/* publish with short name */
	topic.type = MQTTSN_TOPIC_TYPE_NORMAL;
	topic.data.id = topicid;
	len = MQTTSNSerialize_publish(buf, buflen, dup, qos, retained, packetid,
			topic, payload, payloadlen);
	rc = sendPacketBuffer(mysock, host, port, buf, len);
	if (rc != 0)
		goto exit;

	/* wait for puback */
	if (MQTTSNPacket_read(buf, buflen, getdata) == MQTTSN_PUBACK)
	{
		unsigned short packet_id = 0, topic_id = 0;
		/* Sentinel so a failed deserialization never prints an indeterminate value. */
		unsigned char returncode = 0xFF;

		if (MQTTSNDeserialize_puback(&topic_id, &packet_id, &returncode, buf, buflen) != 1 || returncode != MQTTSN_RC_ACCEPTED)
			printf("Unable to publish, return code %d\n", returncode);
		else
			printf("puback received, msgid %d topic id %d\n", packet_id, topic_id);
	}
	else
		goto exit;

	printf("Receive publish\n");
	if (MQTTSNPacket_read(buf, buflen, getdata) == MQTTSN_PUBLISH)
	{
		unsigned short packet_id = 0;
		int rx_dup, rx_qos, rx_retained, rx_payloadlen;
		unsigned char* rx_payload;
		MQTTSN_topicid pubtopic;

		if (MQTTSNDeserialize_publish(&rx_dup, &rx_qos, &rx_retained, &packet_id, &pubtopic,
				&rx_payload, &rx_payloadlen, buf, buflen) != 1)
			printf("Error deserializing publish\n");
		else
		{
			printf("publish received, id %d qos %d\n", packet_id, rx_qos);

			/* Acknowledge only a successfully decoded QoS 1 publish. */
			if (rx_qos == 1)
			{
				len = MQTTSNSerialize_puback(buf, buflen, pubtopic.data.id, packet_id, MQTTSN_RC_ACCEPTED);
				rc = sendPacketBuffer(mysock, host, port, buf, len);
				if (rc == 0)
					printf("puback sent\n");
			}
		}
	}
	else
		goto exit;

	len = MQTTSNSerialize_disconnect(buf, buflen, 0);
	rc = sendPacketBuffer(mysock, host, port, buf, len);

exit:
	rc = shutdown(mysock, SHUT_WR);
	rc = close(mysock);

	return 0;
}