Publish acks

This commit is contained in:
Ian Craggs
2014-03-30 23:14:38 +01:00
parent 87d51a560b
commit 4953d3c200
4 changed files with 126 additions and 55 deletions

View File

@@ -33,7 +33,7 @@
* @param buflen the length in bytes of the data in the supplied buffer
* @return error code. 1 is success
*/
int MQTTSNDeserialize_publish(int* dup, int* qos, int* retained, int* packetid, MQTTSN_topicid* topic,
int MQTTSNDeserialize_publish(int* dup, int* qos, int* retained, unsigned short* packetid, MQTTSN_topicid* topic,
unsigned char** payload, int* payloadlen, unsigned char* buf, int buflen)
{
MQTTSNFlags flags;
@@ -85,36 +85,27 @@ exit:
return rc;
}
#if 0
/**
* Deserializes the supplied (wire) buffer into an ack
* @param type returned integer - the MQTT packet type
* @param dup returned integer - the MQTT dup flag
* @param packetid returned integer - the MQTT packet identifier
* @param buf the raw buffer data, of the correct length determined by the remaining length field
* @param buflen the length in bytes of the data in the supplied buffer
* @return error code. 1 is success, 0 is failure
*/
int MQTTDeserialize_ack(int* type, int* dup, int* packetid, char* buf, int buflen)
int MQTTSNDeserialize_puback(unsigned short* topicid, unsigned short* packetid,
unsigned char* returncode, unsigned char* buf, int buflen)
{
MQTTHeader header;
char* curdata = buf;
char* enddata = NULL;
unsigned char* curdata = buf;
unsigned char* enddata = NULL;
int rc = 0;
int mylen;
int mylen = 0;
FUNC_ENTRY;
header.byte = readChar(&curdata);
*dup = header.bits.dup;
*type = header.bits.type;
curdata += (rc = MQTTPacket_decodeBuf(curdata, &mylen)); /* read remaining length */
enddata = curdata + mylen;
if (enddata - curdata < 2)
curdata += (rc = MQTTSNPacket_decodeBuf(curdata, &mylen)); /* read length */
enddata = buf + mylen;
if (enddata - curdata > buflen)
goto exit;
if (readChar(&curdata) != MQTTSN_PUBACK)
goto exit;
*topicid = readInt(&curdata);
*packetid = readInt(&curdata);
*returncode = readChar(&curdata);
rc = 1;
exit:
@@ -122,4 +113,36 @@ exit:
return rc;
}
#endif
/**
* Deserializes the supplied (wire) buffer into an ack
* @param type returned integer - the MQTT packet type
* @param packetid returned integer - the MQTT packet identifier
* @param buf the raw buffer data, of the correct length determined by the remaining length field
* @param buflen the length in bytes of the data in the supplied buffer
* @return error code. 1 is success, 0 is failure
*/
int MQTTSNDeserialize_ack(unsigned char* type, unsigned short* packetid, unsigned char* buf, int buflen)
{
unsigned char* curdata = buf;
unsigned char* enddata = NULL;
int rc = 0;
int mylen = 0;
FUNC_ENTRY;
curdata += (rc = MQTTSNPacket_decodeBuf(curdata, &mylen)); /* read length */
enddata = buf + mylen;
if (enddata - curdata > buflen)
goto exit;
*type = readChar(&curdata);
if (*type != MQTTSN_PUBREL && *type != MQTTSN_PUBREC && *type != MQTTSN_PUBCOMP)
goto exit;
*packetid = readInt(&curdata);
rc = 1;
exit:
FUNC_EXIT_RC(rc);
return rc;
}

View File

@@ -14,17 +14,23 @@
* Ian Craggs - initial API and implementation and/or initial documentation
*******************************************************************************/
#ifndef MQTTSNPUBLISH_H_
#if !defined(MQTTSNPUBLISH_H_)
#define MQTTSNPUBLISH_H_
int MQTTSNSerialize_publish(unsigned char* buf, int buflen, int dup, int qos, int retained, int packetid,
int MQTTSNSerialize_publish(unsigned char* buf, int buflen, int dup, int qos, int retained, unsigned short packetid,
MQTTSN_topicid topic, unsigned char* payload, int payloadlen);
int MQTTSNDeserialize_publish(int* dup, int* qos, int* retained, int* packetid,
int MQTTSNDeserialize_publish(int* dup, int* qos, int* retained, unsigned short* packetid,
MQTTSN_topicid* topic, unsigned char** payload, int* payloadlen, unsigned char* buf, int len);
int MQTTSerialize_puback(char* buf, int buflen, unsigned short packetid, unsigned short topicid, unsigned char returncode);
int MQTTSerialize_pubrel(char* buf, int buflen, int packetid);
int MQTTSerialize_pubcomp(char* buf, int buflen, int packetid);
int MQTTSNSerialize_puback(unsigned char* buf, int buflen, unsigned short topicid, unsigned short packetid,
unsigned char returncode);
int MQTTSNDeserialize_puback(unsigned short* topicid, unsigned short* packetid,
unsigned char* returncode, unsigned char* buf, int buflen);
int MQTTSNSerialize_pubrec(unsigned char* buf, int buflen, unsigned short packetid);
int MQTTSNSerialize_pubrel(unsigned char* buf, int buflen, int dup, unsigned short packetid);
int MQTTSNSerialize_pubcomp(unsigned char* buf, int buflen, unsigned short packetid);
int MQTTSNDeserialize_ack(unsigned char* packettype, unsigned short* packetid, unsigned char* buf, int buflen);
#endif /* MQTTSNPUBLISH_H_ */

View File

@@ -51,7 +51,7 @@ int MQTTSNSerialize_publishLength(int payloadlen, MQTTSN_topicid topic, int qos)
* @param payloadlen integer - the length of the MQTT payload
* @return the length of the serialized data. <= 0 indicates error
*/
int MQTTSNSerialize_publish(unsigned char* buf, int buflen, int dup, int qos, int retained, int packetid,
int MQTTSNSerialize_publish(unsigned char* buf, int buflen, int dup, int qos, int retained, unsigned short packetid,
MQTTSN_topicid topic, unsigned char* payload, int payloadlen)
{
unsigned char *ptr = buf;
@@ -103,7 +103,35 @@ exit:
}
#if 0
int MQTTSNSerialize_puback(unsigned char* buf, int buflen, unsigned short topicid, unsigned short packetid,
unsigned char returncode)
{
unsigned char *ptr = buf;
int len = 0;
int rc = 0;
FUNC_ENTRY;
if ((len = MQTTSNPacket_len(6)) > buflen)
{
rc = MQTTSNPACKET_BUFFER_TOO_SHORT;
goto exit;
}
ptr += MQTTSNPacket_encode(ptr, len); /* write length */
writeChar(&ptr, MQTTSN_PUBACK); /* write message type */
writeInt(&ptr, topicid);
writeInt(&ptr, packetid);
writeChar(&ptr, returncode);
rc = ptr - buf;
exit:
FUNC_EXIT_RC(rc);
return rc;
}
/**
* Serializes the ack packet into the supplied buffer.
* @param buf the buffer into which the packet will be serialized
@@ -113,25 +141,23 @@ exit:
* @param packetid integer - the MQTT packet identifier
* @return serialized length, or error if 0
*/
int MQTTSerialize_ack(char* buf, int buflen, int type, int dup, int packetid)
int MQTTSNSerialize_ack(unsigned char* buf, int buflen, unsigned short packet_type, int dup, unsigned short packetid)
{
MQTTHeader header;
int rc = 0;
char *ptr = buf;
unsigned char *ptr = buf;
int len = 4; /* ack packet length */
FUNC_ENTRY;
if (buflen < 4)
if (len > buflen)
{
rc = MQTTPACKET_BUFFER_TOO_SHORT;
rc = MQTTSNPACKET_BUFFER_TOO_SHORT;
goto exit;
}
header.bits.type = type;
header.bits.dup = dup;
header.bits.qos = 0;
writeChar(&ptr, header.byte); /* write header */
ptr += MQTTSNPacket_encode(ptr, len); /* write length */
writeChar(&ptr, packet_type); /* write packet type */
ptr += MQTTPacket_encode(ptr, 2); /* write remaining length */
writeInt(&ptr, packetid);
rc = ptr - buf;
exit:
FUNC_EXIT_RC(rc);
@@ -146,9 +172,9 @@ exit:
* @param packetid integer - the MQTT packet identifier
* @return serialized length, or error if 0
*/
int MQTTSerialize_puback(char* buf, int buflen, int packetid)
int MQTTSNSerialize_pubrec(unsigned char* buf, int buflen, unsigned short packetid)
{
return MQTTSerialize_ack(buf, buflen, PUBACK, packetid, 0);
return MQTTSNSerialize_ack(buf, buflen, MQTTSN_PUBREC, 0, packetid);
}
@@ -160,9 +186,9 @@ int MQTTSerialize_puback(char* buf, int buflen, int packetid)
* @param packetid integer - the MQTT packet identifier
* @return serialized length, or error if 0
*/
int MQTTSerialize_pubrel(char* buf, int buflen, int dup, int packetid)
int MQTTSNSerialize_pubrel(unsigned char* buf, int buflen, int dup, unsigned short packetid)
{
return MQTTSerialize_ack(buf, buflen, PUBREL, packetid, dup);
return MQTTSNSerialize_ack(buf, buflen, MQTTSN_PUBREL, dup, packetid);
}
@@ -173,10 +199,7 @@ int MQTTSerialize_pubrel(char* buf, int buflen, int dup, int packetid)
* @param packetid integer - the MQTT packet identifier
* @return serialized length, or error if 0
*/
int MQTTSerialize_pubcomp(char* buf, int buflen, int packetid)
int MQTTSNSerialize_pubcomp(unsigned char* buf, int buflen, unsigned short packetid)
{
return MQTTSerialize_ack(buf, buflen, PUBCOMP, packetid, 0);
return MQTTSNSerialize_ack(buf, buflen, MQTTSN_PUBCOMP, 0, packetid);
}
#endif

View File

@@ -343,7 +343,7 @@ int test2(struct Options options)
int dup = 0;
int qos = 2;
int retained = 0;
int msgid = 23;
unsigned short msgid = 23;
MQTTSN_topicid topic;
unsigned char *payload = (unsigned char*)"kkhkhkjkj jkjjk jk jk ";
int payloadlen = strlen((char*)payload);
@@ -351,10 +351,13 @@ int test2(struct Options options)
int dup2 = 1;
int qos2 = 1;
int retained2 = 1;
int msgid2 = 3243;
unsigned short msgid2 = 3243;
MQTTSN_topicid topic2;
unsigned char *payload2 = NULL;
int payloadlen2 = 0;
unsigned char acktype;
unsigned char returncode = 3, returncode2 = -99;
fprintf(xml, "<testcase classname=\"test1\" name=\"de/serialization\"");
global_start_time = start_clock();
@@ -388,6 +391,22 @@ int test2(struct Options options)
assert("payloads should be the same",
memcmp(payload, payload2, payloadlen) == 0, "payloads were different %s\n", "");
rc = MQTTSNSerialize_puback(buf, buflen, topic.data.id, msgid, returncode);
assert("good rc from serialize puback", rc > 0, "rc was %d\n", rc);
rc = MQTTSNDeserialize_puback(&topic2.data.id, &msgid2, &returncode2, buf, buflen);
assert("good rc from deserialize puback", rc > 0, "rc was %d\n", rc);
assert("msgids should be the same", msgid == msgid2, "msgids were different %d\n", msgid2);
assert("return codes should be the same", returncode == returncode2, "return codes were different %d\n", returncode2);
rc = MQTTSNSerialize_pubrec(buf, buflen, msgid);
assert("good rc from serialize pubrec", rc > 0, "rc was %d\n", rc);
rc = MQTTSNDeserialize_ack(&acktype, &msgid2, buf, buflen);
assert("good rc from deserialize pubrec", rc == 1, "rc was %d\n", rc);
assert("Acktype should be MQTTSN_PUBREC", acktype == MQTTSN_PUBREC, "acktype was %d\n", acktype);
assert("msgids should be the same", msgid == msgid2, "msgids were different %d\n", msgid2);
/*exit:*/
MyLog(LOGA_INFO, "TEST2: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", tests, failures);