GCC Code Coverage Report


Directory: src/gate/
File: src/gate/net/mqttprotocols.c
Date: 2025-12-12 23:40:09
Exec Total Coverage
Lines: 0 528 0.0%
Functions: 0 28 0.0%
Branches: 0 306 0.0%

Line Branch Exec Source
1 /* GATE PROJECT LICENSE:
2 +----------------------------------------------------------------------------+
3 | Copyright(c) 2018-2025, Stefan Meislinger <sm@opengate.at> |
4 | All rights reserved. |
5 | |
6 | Redistribution and use in source and binary forms, with or without |
7 | modification, are permitted provided that the following conditions are met:|
8 | |
9 | 1. Redistributions of source code must retain the above copyright notice, |
10 | this list of conditions and the following disclaimer. |
11 | 2. Redistributions in binary form must reproduce the above copyright |
12 | notice, this list of conditions and the following disclaimer in the |
13 | documentation and/or other materials provided with the distribution. |
14 | |
15 | THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"|
16 | AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
17 | IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
18 | ARE DISCLAIMED.IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
19 | LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
20 | CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
21 | SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
22 | INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
23 | CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
24 | ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF |
25 | THE POSSIBILITY OF SUCH DAMAGE. |
26 +----------------------------------------------------------------------------+
27 */
28
29 #include "gate/net/mqttprotocols.h"
30 #include "gate/libraries.h"
31 #include "gate/streams.h"
32 #include "gate/results.h"
33 #include "gate/serializers.h"
34 #include "gate/debugging.h"
35
36
37 /* #include <mosquitto.h> */
38
39
40 gate_result_t gate_mqtt_write_byte(gate_stream_t* stream, gate_uint8_t value)
41 {
42 return gate_stream_write(stream, (char const*)&value, 1, NULL);
43 }
44
45 gate_result_t gate_mqtt_write_word(gate_stream_t* stream, gate_uint16_t value)
46 {
47 char buff[4];
48 gate_size_t len = gate_serialize_uint16_b(buff, sizeof(buff), &value);
49 return gate_stream_write_block(stream, buff, len, NULL);
50 }
51
52 gate_result_t gate_mqtt_write_length(gate_stream_t* stream, gate_size_t value)
53 {
54 gate_result_t ret = GATE_RESULT_OK;
55 unsigned char encoded_char;
56
57 while (GATE_SUCCEEDED(ret))
58 {
59 encoded_char = value % 128;
60 value /= 128;
61 if (value > 0)
62 {
63 encoded_char |= 128;
64 }
65 ret = gate_stream_write(stream, (char const*)&encoded_char, 1, NULL);
66 if (value == 0)
67 {
68 /* all bits are encoded, we are done */
69 break;
70 }
71 }
72 return ret;
73 }
74
75 gate_result_t gate_mqtt_write_string(gate_stream_t* stream, char const* str, gate_size_t length)
76 {
77 gate_uint16_t const len = (gate_uint16_t)length;
78 gate_result_t ret = gate_mqtt_write_word(stream, len);
79 if (GATE_SUCCEEDED(ret))
80 {
81 ret = gate_stream_write(stream, str, length, NULL);
82 }
83 return ret;
84 }
85
86 gate_result_t gate_mqtt_read_byte(gate_stream_t* stream, gate_uint8_t* value)
87 {
88 char chr = 0;
89 gate_size_t received = 0;
90 gate_result_t ret = gate_stream_read(stream, &chr, 1, &received);
91 if (GATE_SUCCEEDED(ret))
92 {
93 if (received != 1)
94 {
95 ret = GATE_RESULT_ENDOFSTREAM;
96 }
97 else
98 {
99 *value = (gate_uint8_t)chr;
100 }
101 }
102 return ret;
103 }
104
105 gate_result_t gate_mqtt_read_word(gate_stream_t* stream, gate_uint16_t* value)
106 {
107 char buff[2];
108 gate_size_t received = 0;
109 gate_result_t ret = gate_stream_read_block(stream, &buff[0], 2, &received);
110 if (GATE_SUCCEEDED(ret))
111 {
112 if (received != 2)
113 {
114 ret = GATE_RESULT_ENDOFSTREAM;
115 }
116 else
117 {
118 gate_deserialize_uint16_b(buff, 2, value);
119 }
120 }
121 return ret;
122 }
123 gate_result_t gate_mqtt_read_length(gate_stream_t* stream, gate_size_t* ptr_value)
124 {
125 gate_result_t ret = GATE_RESULT_FAILED;
126 gate_size_t value = 0;
127 unsigned char b;
128 gate_size_t len_received = 0;
129 gate_size_t multiplier = 1;
130
131 do
132 {
133 ret = gate_stream_read(stream, (char*)&b, 1, &len_received);
134 GATE_BREAK_IF_FAILED(ret);
135 if (len_received == 0)
136 {
137 ret = GATE_RESULT_ENDOFSTREAM;
138 break;
139 }
140 value += (b & 127) * multiplier;
141 multiplier *= 128;
142 if (multiplier > 128 * 128 * 128)
143 {
144 ret = GATE_RESULT_INVALIDCONTENT;
145 break;
146 }
147 } while ((b & 128) != 0);
148
149 if (ptr_value)
150 {
151 *ptr_value = value;
152 }
153 return ret;
154 }
155 gate_result_t gate_mqtt_read_string(gate_stream_t* stream, gate_strbuilder_t* builder)
156 {
157 gate_uint16_t str_length = 0;
158 gate_result_t ret = gate_mqtt_read_word(stream, &str_length);
159
160 if (GATE_SUCCEEDED(ret))
161 {
162 char buffer[4096];
163 gate_size_t transferred = 0;
164 gate_size_t received = 0;
165 while (transferred < str_length)
166 {
167 gate_size_t required = str_length - transferred;
168 if (required > sizeof(buffer))
169 {
170 required = sizeof(buffer);
171 }
172 ret = gate_stream_read_block(stream, buffer, required, &received);
173 GATE_BREAK_IF_FAILED(ret);
174 gate_strbuilder_append_text(builder, buffer, received);
175 if (received < required)
176 {
177 ret = GATE_RESULT_INVALIDCONTENT;
178 break;
179 }
180 transferred += received;
181 }
182 }
183 return ret;
184 }
185
186 gate_result_t gate_mqtt_read_string_length_skip_data(gate_stream_t* stream, gate_uint16_t* ptr_length)
187 {
188 gate_uint16_t str_length = 0;
189 gate_result_t ret = gate_mqtt_read_word(stream, &str_length);
190 if (GATE_SUCCEEDED(ret))
191 {
192 char buffer[4096];
193 gate_size_t transferred = 0;
194 if (ptr_length)
195 {
196 *ptr_length = str_length;
197 }
198 while (transferred < str_length)
199 {
200 gate_size_t required = str_length - transferred;
201 gate_size_t received = 0;
202 if (required > sizeof(buffer))
203 {
204 required = sizeof(buffer);
205 }
206 ret = gate_stream_read_block(stream, buffer, required, &received);
207 GATE_BREAK_IF_FAILED(ret);
208 if (received < required)
209 {
210 ret = GATE_RESULT_INVALIDCONTENT;
211 break;
212 }
213 transferred += received;
214 }
215 }
216 return ret;
217 }
218
219
220 static gate_result_t serialize_memstream_packet(
221 gate_stream_t* output_stream,
222 gate_mqtt_packet_type_t packet_type,
223 unsigned char packet_flags,
224 gate_memstream_t* memstream)
225 {
226 gate_result_t ret;
227
228 do
229 {
230 gate_uint8_t const fixed_header_byte = (((gate_uint8_t)packet_type & 0x0f) << 4) | (packet_flags & 0x0f);
231 char const* const ptr_content = gate_memstream_get_data(memstream);
232 gate_size_t const content_size = gate_memstream_size(memstream);
233
234 ret = gate_mqtt_write_byte(output_stream, fixed_header_byte);
235 GATE_BREAK_IF_FAILED(ret);
236 ret = gate_mqtt_write_length(output_stream, content_size);
237 GATE_BREAK_IF_FAILED(ret);
238 ret = gate_stream_write_block(output_stream, ptr_content, content_size, NULL);
239 GATE_BREAK_IF_FAILED(ret);
240 } while (0);
241 return ret;
242 }
243
244 static gate_result_t serialize_connect_message(gate_stream_t* output_stream, gate_mqtt_message_t const* msg)
245 {
246 gate_result_t ret = GATE_RESULT_OK;
247
248 do
249 {
250 gate_memstream_impl_t data_memstream_impl;
251 char data_memstream_buffer[4096];
252
253 gate_memstream_t* const data_memstream = gate_memstream_create_static_unmanaged(&data_memstream_impl, data_memstream_buffer, sizeof(data_memstream_buffer), 0);
254 gate_stream_t* const data_stream = (gate_stream_t*)data_memstream;
255
256 /* var header */
257 ret = gate_mqtt_write_string(data_stream, "MQTT", 4);
258 GATE_BREAK_IF_FAILED(ret);
259 ret = gate_mqtt_write_byte(data_stream, msg->content.connect.level);
260 GATE_BREAK_IF_FAILED(ret);
261 ret = gate_mqtt_write_byte(data_stream, msg->content.connect.flags);
262 GATE_BREAK_IF_FAILED(ret);
263 ret = gate_mqtt_write_word(data_stream, msg->content.connect.keep_alive);
264 GATE_BREAK_IF_FAILED(ret);
265
266 /* payload */
267 ret = gate_mqtt_write_string(data_stream, msg->content.connect.client_id, msg->content.connect.client_id_len);
268 GATE_BREAK_IF_FAILED(ret);
269 if (GATE_FLAG_ENABLED(msg->content.connect.flags, GATE_MQTT_CONNECT_FLAG_WILLFLAG))
270 {
271 ret = gate_mqtt_write_string(data_stream, msg->content.connect.will_topic, msg->content.connect.will_topic_len);
272 GATE_BREAK_IF_FAILED(ret);
273 ret = gate_mqtt_write_string(data_stream, msg->content.connect.will_message, msg->content.connect.will_message_len);
274 GATE_BREAK_IF_FAILED(ret);
275 }
276 if (GATE_FLAG_ENABLED(msg->content.connect.flags, GATE_MQTT_CONNECT_FLAG_USER))
277 {
278 ret = gate_mqtt_write_string(data_stream, msg->content.connect.user, msg->content.connect.user_len);
279 GATE_BREAK_IF_FAILED(ret);
280 }
281 if (GATE_FLAG_ENABLED(msg->content.connect.flags, GATE_MQTT_CONNECT_FLAG_PASSWORD))
282 {
283 ret = gate_mqtt_write_string(data_stream, msg->content.connect.password, msg->content.connect.password_len);
284 GATE_BREAK_IF_FAILED(ret);
285 }
286
287 /* build full packet */
288 ret = serialize_memstream_packet(output_stream, gate_mqtt_packet_connect, msg->packet_flags, data_memstream);
289 } while (0);
290 return ret;
291 }
292
293 static gate_result_t serialize_connect_ack_message(gate_stream_t* output_stream, gate_mqtt_message_t const* msg)
294 {
295 gate_result_t ret = GATE_RESULT_OK;
296
297 do
298 {
299 gate_memstream_impl_t data_memstream_impl;
300 char data_memstream_buffer[64];
301
302 gate_memstream_t* const data_memstream = gate_memstream_create_static_unmanaged(&data_memstream_impl, data_memstream_buffer, sizeof(data_memstream_buffer), 0);
303 gate_stream_t* const data_stream = (gate_stream_t*)data_memstream;
304
305 /* var header */
306 ret = gate_mqtt_write_byte(data_stream, msg->content.connect_ack.flags);
307 GATE_BREAK_IF_FAILED(ret);
308 ret = gate_mqtt_write_byte(data_stream, msg->content.connect_ack.return_code);
309 GATE_BREAK_IF_FAILED(ret);
310
311 /* build full packet */
312 ret = serialize_memstream_packet(output_stream, gate_mqtt_packet_connect_ack, msg->packet_flags, data_memstream);
313 GATE_BREAK_IF_FAILED(ret);
314
315 /* succeeded */
316 } while (0);
317 return ret;
318 }
319
320 static gate_result_t serialize_pubish_message(gate_stream_t* output_stream, gate_mqtt_message_t const* msg)
321 {
322 gate_result_t ret;
323 gate_memstream_t* data_memstream = NULL;
324
325 do
326 {
327 gate_size_t written = 0;
328 const gate_size_t content_len = msg->content.publish.data_len + msg->content.publish.topic_len + 5;
329 gate_stream_t* data_stream;
330 data_memstream = gate_memstream_create(content_len);
331 if (NULL == data_memstream)
332 {
333 ret = GATE_RESULT_OUTOFMEMORY;
334 break;
335 }
336 data_stream = (gate_stream_t*)data_memstream;
337
338
339 /* var header */
340 ret = gate_mqtt_write_string(data_stream, msg->content.publish.topic, msg->content.publish.topic_len);
341 GATE_BREAK_IF_FAILED(ret);
342
343 if (GATE_FLAG_ENABLED(msg->packet_flags, GATE_MQTT_FLAG_QOS1) || GATE_FLAG_ENABLED(msg->packet_flags, GATE_MQTT_FLAG_QOS2))
344 {
345 ret = gate_mqtt_write_word(data_stream, msg->content.publish.packet_identifier);
346 GATE_BREAK_IF_FAILED(ret);
347 }
348
349 /* payload */
350 ret = gate_stream_write_block(data_stream, msg->content.publish.data, msg->content.publish.data_len, &written);
351 GATE_BREAK_IF_FAILED(ret);
352
353 /* build full packet */
354 ret = serialize_memstream_packet(output_stream, gate_mqtt_packet_publish, msg->packet_flags, data_memstream);
355 GATE_BREAK_IF_FAILED(ret);
356
357 /* success case reached */
358 } while (0);
359
360 if (data_memstream != NULL)
361 {
362 gate_object_release(data_memstream);
363 }
364
365 return ret;
366 }
367
368
369 static gate_result_t serialize_nocontent_message(gate_stream_t* output_stream, gate_mqtt_message_t const* msg)
370 {
371 gate_memstream_impl_t data_memstream_impl;
372 gate_memstream_t* data_memstream = gate_memstream_create_static_unmanaged(&data_memstream_impl, NULL, 0, 0);
373 return serialize_memstream_packet(output_stream, msg->packet_type, msg->packet_flags, data_memstream);
374 }
375
376 static gate_result_t serialize_packet_identifier_only_message(gate_stream_t* output_stream, gate_mqtt_message_t const* msg, gate_uint16_t id)
377 {
378 char buffer[16];
379 gate_memstream_impl_t data_memstream_impl;
380 gate_memstream_t* data_memstream = gate_memstream_create_static_unmanaged(&data_memstream_impl, buffer, sizeof(buffer), 0);
381
382 gate_result_t result = gate_mqtt_write_word((gate_stream_t*)data_memstream, id);
383
384 if (GATE_SUCCEEDED(result))
385 {
386 result = serialize_memstream_packet(output_stream, msg->packet_type, msg->packet_flags, data_memstream);
387 }
388 return result;
389 }
390
391 static gate_result_t serialize_subscribe_message(gate_stream_t* output_stream, gate_mqtt_message_t const* msg)
392 {
393 gate_result_t ret;
394 gate_memstream_t* data_memstream = NULL;
395
396 do
397 {
398 gate_stream_t* data_stream;
399 gate_mqtt_subscription_t const* subscr;
400 gate_uint16_t subscr_count;
401
402 data_memstream = gate_memstream_create(128);
403 if (NULL == data_memstream)
404 {
405 ret = GATE_RESULT_OUTOFMEMORY;
406 break;
407 }
408 data_stream = (gate_stream_t*)data_memstream;
409
410
411 /* var header */
412 ret = gate_mqtt_write_word(data_stream, msg->content.subscribe.packet_identifier);
413 GATE_BREAK_IF_FAILED(ret);
414
415 /* payload */
416 subscr_count = msg->content.subscribe.topics_count;
417 subscr = msg->content.subscribe.topics;
418 for (; subscr_count > 0; --subscr_count, ++subscr)
419 {
420 ret = gate_mqtt_write_string(data_stream, subscr->topic, subscr->topic_len);
421 GATE_BREAK_IF_FAILED(ret);
422
423 ret = gate_mqtt_write_byte(data_stream, subscr->qos);
424 GATE_BREAK_IF_FAILED(ret);
425 }
426
427 GATE_BREAK_IF_FAILED(ret);
428
429 /* build full packet */
430 ret = serialize_memstream_packet(output_stream, gate_mqtt_packet_subscribe, 0x02, data_memstream);
431 GATE_BREAK_IF_FAILED(ret);
432
433 /* success case reached */
434 } while (0);
435
436 if (data_memstream != NULL)
437 {
438 gate_object_release(data_memstream);
439 }
440
441 return ret;
442 }
443
444 static gate_result_t serialize_subscribe_ack_message(gate_stream_t* output_stream, gate_mqtt_message_t const* msg)
445 {
446 gate_result_t ret;
447 gate_memstream_t* data_memstream;
448
449 do
450 {
451 gate_stream_t* data_stream;
452 gate_uint8_t const* ret_codes;
453 gate_uint16_t ret_codes_count;
454
455 data_memstream = gate_memstream_create(128);
456 if (NULL == data_memstream)
457 {
458 ret = GATE_RESULT_OUTOFMEMORY;
459 break;
460 }
461 data_stream = (gate_stream_t*)data_memstream;
462
463
464 /* var header */
465 ret = gate_mqtt_write_word(data_stream, msg->content.subscribe_ack.packet_identifier);
466 GATE_BREAK_IF_FAILED(ret);
467
468 /* payload */
469 ret_codes_count = msg->content.subscribe_ack.ret_codes_count;
470 ret_codes = msg->content.subscribe_ack.ret_codes;
471 for (; ret_codes_count > 0; --ret_codes_count, ++ret_codes)
472 {
473 ret = gate_mqtt_write_byte(data_stream, *ret_codes);
474 GATE_BREAK_IF_FAILED(ret);
475 }
476
477 GATE_BREAK_IF_FAILED(ret);
478
479 /* build full packet */
480 ret = serialize_memstream_packet(output_stream, gate_mqtt_packet_subscribe_ack, msg->packet_flags, data_memstream);
481 GATE_BREAK_IF_FAILED(ret);
482
483 /* success case reached */
484 } while (0);
485
486 if (data_memstream != NULL)
487 {
488 gate_object_release(data_memstream);
489 }
490
491 return ret;
492 }
493
494 static gate_result_t serialize_unsubscribe_message(gate_stream_t* output_stream, gate_mqtt_message_t const* msg)
495 {
496 gate_result_t ret;
497 gate_memstream_t* data_memstream;
498
499 do
500 {
501 gate_stream_t* data_stream;
502 gate_mqtt_subscription_t const* subscr;
503 gate_uint16_t subscr_count;
504
505 data_memstream = gate_memstream_create(128);
506 if (NULL == data_memstream)
507 {
508 ret = GATE_RESULT_OUTOFMEMORY;
509 break;
510 }
511 data_stream = (gate_stream_t*)data_memstream;
512
513
514 /* var header */
515 ret = gate_mqtt_write_word(data_stream, msg->content.subscribe.packet_identifier);
516 GATE_BREAK_IF_FAILED(ret);
517
518 /* payload */
519 subscr_count = msg->content.unsubscribe.topics_count;
520 subscr = msg->content.unsubscribe.topics;
521 for (; subscr_count > 0; --subscr_count, ++subscr)
522 {
523 ret = gate_mqtt_write_string(data_stream, subscr->topic, subscr->topic_len);
524 GATE_BREAK_IF_FAILED(ret);
525 }
526
527 GATE_BREAK_IF_FAILED(ret);
528
529 /* build full packet */
530 ret = serialize_memstream_packet(output_stream, gate_mqtt_packet_subscribe, msg->packet_flags, data_memstream);
531 GATE_BREAK_IF_FAILED(ret);
532
533 /* success case reached */
534 } while (0);
535
536 if (data_memstream != NULL)
537 {
538 gate_object_release(data_memstream);
539 }
540
541 return ret;
542 }
543
544
545 gate_result_t gate_mqtt_send_message(gate_stream_t* mqtt_stream, gate_mqtt_message_t const* msg)
546 {
547 if (!mqtt_stream || !msg)
548 {
549 return GATE_RESULT_INVALIDARG;
550 }
551
552 switch (msg->packet_type)
553 {
554 case gate_mqtt_packet_connect: return serialize_connect_message(mqtt_stream, msg);
555 case gate_mqtt_packet_connect_ack: return serialize_connect_ack_message(mqtt_stream, msg);
556 case gate_mqtt_packet_publish: return serialize_pubish_message(mqtt_stream, msg);
557 case gate_mqtt_packet_publish_ack: return serialize_packet_identifier_only_message(mqtt_stream, msg, msg->content.publish_ack.packet_identifier);
558 case gate_mqtt_packet_publish_receive: return serialize_packet_identifier_only_message(mqtt_stream, msg, msg->content.publish_received.packet_identifier);
559 case gate_mqtt_packet_publish_release: return serialize_packet_identifier_only_message(mqtt_stream, msg, msg->content.publish_release.packet_identifier);
560 case gate_mqtt_packet_publish_complete: return serialize_packet_identifier_only_message(mqtt_stream, msg, msg->content.publish_complete.packet_identifier);
561 case gate_mqtt_packet_subscribe: return serialize_subscribe_message(mqtt_stream, msg);
562 case gate_mqtt_packet_subscribe_ack: return serialize_subscribe_ack_message(mqtt_stream, msg);
563 case gate_mqtt_packet_unsubscribe: return serialize_unsubscribe_message(mqtt_stream, msg);
564 case gate_mqtt_packet_unsubscribe_ack: return serialize_packet_identifier_only_message(mqtt_stream, msg, msg->content.unsubscribe_ack.packet_identifier);
565 case gate_mqtt_packet_ping_request: return serialize_nocontent_message(mqtt_stream, msg);
566 case gate_mqtt_packet_ping_response: return serialize_nocontent_message(mqtt_stream, msg);
567 case gate_mqtt_packet_disconnect: return serialize_nocontent_message(mqtt_stream, msg);
568 default: return GATE_RESULT_NOTSUPPORTED;
569 }
570 }
571
572 static gate_result_t deserialize_connect(gate_memoryblock_t* block, gate_mqtt_message_t* msg)
573 {
574 gate_result_t ret = GATE_RESULT_FAILED;
575
576 do
577 {
578 gate_size_t offset;
579 gate_strbuilder_t builder = GATE_INIT_EMPTY;
580 char buffer[4096];
581 gate_memstream_impl_t block_stream;
582 char const* const ptr_block_content = (char const*)gate_memoryblock_get_content(block);
583 gate_size_t const block_content_len = gate_memoryblock_get_size(block);
584
585 gate_stream_t* const strm = (gate_stream_t*)gate_memstream_create_static_unmanaged_readonly(
586 &block_stream, ptr_block_content, block_content_len, block_content_len);
587
588 if (strm == NULL)
589 {
590 ret = GATE_RESULT_OUTOFMEMORY;
591 break;
592 }
593
594 gate_strbuilder_create_static(&builder, buffer, sizeof(buffer), 0);
595
596 ret = gate_mqtt_read_string(strm, &builder);
597 GATE_BREAK_IF_FAILED(ret);
598
599 if (gate_str_compare(gate_strbuilder_ptr(&builder, 0), gate_strbuilder_length(&builder), "MQTT", 4) != 0)
600 {
601 /* Not a valid MQTT connect message */
602 ret = GATE_RESULT_INVALIDHEADER;
603 break;
604 }
605
606 ret = gate_mqtt_read_byte(strm, &msg->content.connect.level);
607 GATE_BREAK_IF_FAILED(ret);
608
609 ret = gate_mqtt_read_byte(strm, &msg->content.connect.flags);
610 GATE_BREAK_IF_FAILED(ret);
611
612 ret = gate_mqtt_read_word(strm, &msg->content.connect.keep_alive);
613 GATE_BREAK_IF_FAILED(ret);
614
615 offset = 10; /* payload starts at block offset 10: */
616
617 /* read client id */
618 gate_mqtt_read_string_length_skip_data(strm, &msg->content.connect.client_id_len);
619 offset += 2;
620 msg->content.connect.client_id = &ptr_block_content[offset];
621 offset += msg->content.connect.client_id_len;
622
623 if (GATE_FLAG_ENABLED(msg->content.connect.flags, GATE_MQTT_CONNECT_FLAG_WILLFLAG))
624 {
625 /* read optional will topic+content */
626 gate_mqtt_read_string_length_skip_data(strm, &msg->content.connect.will_topic_len);
627 offset += 2;
628 msg->content.connect.will_topic = &ptr_block_content[offset];
629 offset += msg->content.connect.will_topic_len;
630
631 gate_mqtt_read_string_length_skip_data(strm, &msg->content.connect.will_message_len);
632 offset += 2;
633 msg->content.connect.will_message = &ptr_block_content[offset];
634 offset += msg->content.connect.will_message_len;
635 }
636
637 if (GATE_FLAG_ENABLED(msg->content.connect.flags, GATE_MQTT_CONNECT_FLAG_USER))
638 {
639 /* read optional user string */
640 gate_mqtt_read_string_length_skip_data(strm, &msg->content.connect.user_len);
641 offset += 2;
642 msg->content.connect.user = &ptr_block_content[offset];
643 offset += msg->content.connect.user_len;
644 }
645
646 if (GATE_FLAG_ENABLED(msg->content.connect.flags, GATE_MQTT_CONNECT_FLAG_PASSWORD))
647 {
648 /* read optional will topic+content */
649 gate_mqtt_read_string_length_skip_data(strm, &msg->content.connect.password_len);
650 offset += 2;
651 msg->content.connect.password = &ptr_block_content[offset];
652 offset += msg->content.connect.password_len;
653 }
654 } while (0);
655
656 return ret;
657 }
658
659
660 static gate_result_t deserialize_connect_ack(gate_memoryblock_t* block, gate_mqtt_message_t* msg)
661 {
662 gate_result_t ret = GATE_RESULT_FAILED;
663 gate_uint8_t ack_flags = 0;
664 gate_uint8_t ret_code = 0;
665
666 do
667 {
668 gate_memstream_impl_t block_stream;
669 char const* const ptr_block_content = (char const*)gate_memoryblock_get_content(block);
670 gate_size_t const block_content_len = gate_memoryblock_get_size(block);
671
672 gate_stream_t* const strm = (gate_stream_t*)gate_memstream_create_static_unmanaged_readonly(
673 &block_stream, ptr_block_content, block_content_len, block_content_len);
674
675 if (strm == NULL)
676 {
677 ret = GATE_RESULT_OUTOFMEMORY;
678 break;
679 }
680
681
682 if (block_content_len != 2)
683 {
684 ret = GATE_RESULT_INVALIDHEADER;
685 break;
686 }
687
688 ret = gate_mqtt_read_byte(strm, &ack_flags);
689 GATE_BREAK_IF_FAILED(ret);
690 ret = gate_mqtt_read_byte(strm, &ret_code);
691 GATE_BREAK_IF_FAILED(ret);
692
693 msg->content.connect_ack.flags = ack_flags;
694 msg->content.connect_ack.return_code = ret_code;
695 } while (0);
696
697 return ret;
698 }
699
700 static gate_result_t deserialize_package_identifier_only_message(gate_memoryblock_t* block, gate_uint16_t* ptr_packet_id)
701 {
702 gate_result_t ret;
703 void const* const ptr_block_data = gate_memoryblock_get_content(block);
704 gate_size_t const block_len = gate_memoryblock_get_size(block);
705
706 do
707 {
708 gate_memstream_impl_t data_strm;
709 gate_stream_t* input_strm = (gate_stream_t*)gate_memstream_create_static_unmanaged_readonly(
710 &data_strm, ptr_block_data, block_len, block_len);
711 if (input_strm == NULL)
712 {
713 ret = GATE_RESULT_OUTOFMEMORY;
714 break;
715 }
716
717 ret = gate_mqtt_read_word(input_strm, ptr_packet_id);
718 GATE_BREAK_IF_FAILED(ret);
719
720 /* success case reached */
721 } while (0);
722
723 return ret;
724
725 }
726
727 static gate_result_t deserialize_publish(gate_memoryblock_t* block, gate_mqtt_message_t* msg)
728 {
729 gate_result_t ret = GATE_RESULT_FAILED;
730
731 do
732 {
733 gate_size_t offset;
734 gate_memstream_impl_t block_stream;
735 char const* const ptr_block_content = (char const*)gate_memoryblock_get_content(block);
736 gate_size_t const block_content_len = gate_memoryblock_get_size(block);
737
738 gate_stream_t* const strm = (gate_stream_t*)gate_memstream_create_static_unmanaged_readonly(
739 &block_stream, ptr_block_content, block_content_len, block_content_len);
740
741 if (strm == NULL)
742 {
743 ret = GATE_RESULT_OUTOFMEMORY;
744 break;
745 }
746
747 ret = gate_mqtt_read_string_length_skip_data(strm, &msg->content.publish.topic_len);
748 GATE_BREAK_IF_FAILED(ret);
749 offset = 2;
750 msg->content.publish.topic = &ptr_block_content[offset];
751 offset += msg->content.publish.topic_len;
752
753 if (GATE_FLAG_ENABLED(msg->packet_flags, GATE_MQTT_FLAG_QOS1) || GATE_FLAG_ENABLED(msg->packet_flags, GATE_MQTT_FLAG_QOS2))
754 {
755 ret = gate_mqtt_read_word(strm, &msg->content.publish.packet_identifier);
756 GATE_BREAK_IF_FAILED(ret);
757 offset += 2;
758 }
759 else
760 {
761 msg->content.publish.packet_identifier = 0;
762 }
763
764 msg->content.publish.data_len = (gate_uint32_t)(block_content_len - offset);
765 msg->content.publish.data = &ptr_block_content[offset];
766
767 /* success case reached */
768 } while (0);
769
770 return ret;
771 }
772
773 static gate_result_t deserialize_subscribe(gate_memoryblock_t* block, gate_mqtt_message_t* msg)
774 {
775 gate_result_t ret = GATE_RESULT_FAILED;
776
777 do
778 {
779 gate_size_t offset;
780 gate_memstream_impl_t block_stream;
781 char const* const ptr_block_content = (char const*)gate_memoryblock_get_content(block);
782 gate_size_t const block_content_len = gate_memoryblock_get_size(block);
783
784 gate_stream_t* const strm = (gate_stream_t*)gate_memstream_create_static_unmanaged_readonly(
785 &block_stream, ptr_block_content, block_content_len, block_content_len);
786
787 if (strm == NULL)
788 {
789 ret = GATE_RESULT_OUTOFMEMORY;
790 break;
791 }
792
793 ret = gate_mqtt_read_word(strm, &msg->content.subscribe.packet_identifier);
794 GATE_BREAK_IF_FAILED(ret);
795
796 offset = 2;
797 msg->content.subscribe.topics_count = 0;
798 while ((gate_memstream_size(&block_stream) >= 4) && (msg->content.subscribe.topics_count < GATE_MQTT_ARRAY_MAX))
799 {
800 gate_mqtt_subscription_t* ptr_sub = &msg->content.subscribe.topics[msg->content.subscribe.topics_count];
801
802 ret = gate_mqtt_read_string_length_skip_data(strm, &ptr_sub->topic_len);
803 GATE_BREAK_IF_FAILED(ret);
804 offset += 2;
805 ptr_sub->topic = &ptr_block_content[offset];
806 offset += ptr_sub->topic_len;
807
808 ret = gate_mqtt_read_byte(strm, &ptr_sub->qos);
809 GATE_BREAK_IF_FAILED(ret);
810 offset += 1;
811
812 ++msg->content.subscribe.topics_count;
813 }
814 GATE_BREAK_IF_FAILED(ret);
815
816 /* success case reached */
817 } while (0);
818
819 return ret;
820 }
821
822 static gate_result_t deserialize_subscribe_ack(gate_memoryblock_t* block, gate_mqtt_message_t* msg)
823 {
824 gate_result_t ret = GATE_RESULT_FAILED;
825
826 do
827 {
828 gate_size_t offset;
829 gate_memstream_impl_t block_stream;
830 char const* const ptr_block_content = (char const*)gate_memoryblock_get_content(block);
831 gate_size_t const block_content_len = gate_memoryblock_get_size(block);
832
833 gate_stream_t* const strm = (gate_stream_t*)gate_memstream_create_static_unmanaged_readonly(
834 &block_stream, ptr_block_content, block_content_len, block_content_len);
835
836 if (strm == NULL)
837 {
838 ret = GATE_RESULT_OUTOFMEMORY;
839 break;
840 }
841
842 ret = gate_mqtt_read_word(strm, &msg->content.subscribe_ack.packet_identifier);
843 GATE_BREAK_IF_FAILED(ret);
844
845 offset = 2;
846 msg->content.subscribe_ack.ret_codes_count = 0;
847 while ((gate_memstream_size(&block_stream) > 0) && (msg->content.subscribe_ack.ret_codes_count < GATE_MQTT_ARRAY_MAX))
848 {
849 ret = gate_mqtt_read_byte(strm, &msg->content.subscribe_ack.ret_codes[msg->content.subscribe_ack.ret_codes_count]);
850 GATE_BREAK_IF_FAILED(ret);
851
852 ++msg->content.subscribe_ack.ret_codes_count;
853 }
854 GATE_BREAK_IF_FAILED(ret);
855
856 /* success case reached */
857 } while (0);
858
859 return ret;
860 }
861
862 static gate_result_t deserialize_unsubscribe(gate_memoryblock_t* block, gate_mqtt_message_t* msg)
863 {
864 gate_result_t ret = GATE_RESULT_FAILED;
865
866 do
867 {
868 gate_size_t offset;
869 gate_memstream_impl_t block_stream;
870 char const* const ptr_block_content = (char const*)gate_memoryblock_get_content(block);
871 gate_size_t const block_content_len = gate_memoryblock_get_size(block);
872
873 gate_stream_t* const strm = (gate_stream_t*)gate_memstream_create_static_unmanaged_readonly(
874 &block_stream, ptr_block_content, block_content_len, block_content_len);
875
876 if (strm == NULL)
877 {
878 ret = GATE_RESULT_OUTOFMEMORY;
879 break;
880 }
881
882 ret = gate_mqtt_read_word(strm, &msg->content.unsubscribe.packet_identifier);
883 GATE_BREAK_IF_FAILED(ret);
884
885 offset = 2;
886 msg->content.unsubscribe.topics_count = 0;
887 while ((gate_memstream_size(&block_stream) >= 3) && (msg->content.unsubscribe.topics_count < GATE_MQTT_ARRAY_MAX))
888 {
889 gate_mqtt_subscription_t* ptr_sub = &msg->content.unsubscribe.topics[msg->content.unsubscribe.topics_count];
890
891 ret = gate_mqtt_read_string_length_skip_data(strm, &ptr_sub->topic_len);
892 GATE_BREAK_IF_FAILED(ret);
893 offset += 2;
894 ptr_sub->topic = &ptr_block_content[offset];
895 offset += ptr_sub->topic_len;
896
897 ptr_sub->qos = 0;
898
899 ++msg->content.unsubscribe.topics_count;
900 }
901 GATE_BREAK_IF_FAILED(ret);
902
903 /* success case reached */
904 } while (0);
905
906 return ret;
907 }
908
909
910 gate_result_t gate_mqtt_has_queued_message(gate_stream_t* mqtt_stream)
911 {
912 gate_result_t ret = GATE_RESULT_FAILED;
913 char buffer[1024];
914 char* ptr_buffer = &buffer[0];
915 gate_size_t buffer_capacity = sizeof(buffer);
916 gate_size_t returned;
917 gate_memstream_impl_t memstrm;
918 gate_stream_t* strm = NULL;
919
920 do
921 {
922 gate_uint8_t fixed_header_byte = 0;
923 gate_size_t fixed_header_contentlen = 0;
924 gate_size_t remaining_bytes;
925 gate_size_t fixed_header_length;
926 gate_size_t total_packet_length;
927
928 ret = gate_stream_peek(mqtt_stream, ptr_buffer, buffer_capacity, &returned);
929 GATE_BREAK_IF_FAILED(ret);
930
931 if (returned < 2)
932 {
933 ret = GATE_RESULT_ENDOFSTREAM;
934 break;
935 }
936
937 strm = (gate_stream_t*)gate_memstream_create_static_unmanaged_readonly(&memstrm, ptr_buffer, returned, returned);
938 if (!strm)
939 {
940 ret = GATE_RESULT_OUTOFMEMORY;
941 break;
942 }
943
944 ret = gate_mqtt_read_byte(strm, &fixed_header_byte);
945 GATE_BREAK_IF_FAILED(ret);
946
947 ret = gate_mqtt_read_length(strm, &fixed_header_contentlen);
948 GATE_BREAK_IF_FAILED(ret);
949
950 remaining_bytes = gate_memstream_size(&memstrm);
951 GATE_DEBUG_ASSERT(returned >= remaining_bytes);
952 fixed_header_length = returned - remaining_bytes;
953 total_packet_length = fixed_header_length + fixed_header_contentlen;
954
955 if (total_packet_length > buffer_capacity)
956 {
957 ptr_buffer = (char*)gate_mem_alloc(total_packet_length);
958 if (ptr_buffer == NULL)
959 {
960 ret = GATE_RESULT_OUTOFMEMORY;
961 break;
962 }
963 }
964
965 ret = gate_stream_peek(mqtt_stream, ptr_buffer, total_packet_length, &returned);
966 GATE_BREAK_IF_FAILED(ret);
967
968 if (returned < total_packet_length)
969 {
970 /* not enough bytes in stream to decode a full package */
971 ret = GATE_RESULT_ENDOFSTREAM;
972 break;
973 }
974
975 /* success case, we have a full packet in the stream */
976 ret = GATE_RESULT_OK;
977 } while (0);
978
979 if ((ptr_buffer != NULL) && (ptr_buffer != &buffer[0]))
980 {
981 gate_mem_dealloc(ptr_buffer);
982 }
983
984 return ret;
985 }
986
987
988 gate_result_t gate_mqtt_receive_message(gate_stream_t* mqtt_stream, gate_mqtt_message_t* msg, gate_memoryblock_t** ptr_new_memblock)
989 {
990 gate_result_t ret;
991 gate_memoryblock_t* mem_block = NULL;
992
993 do
994 {
995 gate_uint8_t fixed_header_byte = 0;
996 gate_size_t fixed_header_len = 0;
997 gate_uint64_t content_transferred = 0;
998
999 if (!mqtt_stream || !msg || !ptr_new_memblock)
1000 {
1001 ret = GATE_RESULT_INVALIDARG;
1002 break;
1003 }
1004
1005 ret = gate_mqtt_read_byte(mqtt_stream, &fixed_header_byte);
1006 GATE_BREAK_IF_FAILED(ret);
1007
1008 ret = gate_mqtt_read_length(mqtt_stream, &fixed_header_len);
1009 GATE_BREAK_IF_FAILED(ret);
1010
1011 msg->packet_type = (gate_mqtt_packet_type_t)((fixed_header_byte & 0xf0) >> 4);
1012 msg->packet_flags = fixed_header_byte & 0x0f;
1013
1014 if (fixed_header_len > 0)
1015 {
1016 gate_memstream_impl_t memstream;
1017
1018 mem_block = gate_memoryblock_create(fixed_header_len);
1019 if (NULL == mem_block)
1020 {
1021 ret = GATE_RESULT_OUTOFMEMORY;
1022 break;
1023 }
1024
1025 if (NULL == gate_memstream_create_static_unmanaged(&memstream, gate_memoryblock_get_content(mem_block), gate_memoryblock_get_size(mem_block), 0))
1026 {
1027 ret = GATE_RESULT_OUTOFMEMORY;
1028 break;
1029 }
1030
1031 ret = gate_stream_transfer_limit(mqtt_stream, fixed_header_len, (gate_stream_t*)&memstream, &content_transferred);
1032 GATE_BREAK_IF_FAILED(ret);
1033 if (content_transferred < fixed_header_len)
1034 {
1035 ret = GATE_RESULT_ENDOFSTREAM;
1036 break;
1037 }
1038 }
1039
1040 switch (msg->packet_type)
1041 {
1042 case gate_mqtt_packet_connect:
1043 ret = deserialize_connect(mem_block, msg);
1044 break;
1045 case gate_mqtt_packet_connect_ack:
1046 ret = deserialize_connect_ack(mem_block, msg);
1047 break;
1048 case gate_mqtt_packet_publish:
1049 ret = deserialize_publish(mem_block, msg);
1050 break;
1051 case gate_mqtt_packet_publish_ack:
1052 ret = deserialize_package_identifier_only_message(mem_block, &msg->content.publish_ack.packet_identifier);
1053 break;
1054 case gate_mqtt_packet_publish_receive:
1055 ret = deserialize_package_identifier_only_message(mem_block, &msg->content.publish_received.packet_identifier);
1056 break;
1057 case gate_mqtt_packet_publish_release:
1058 ret = deserialize_package_identifier_only_message(mem_block, &msg->content.publish_release.packet_identifier);
1059 break;
1060 case gate_mqtt_packet_publish_complete:
1061 ret = deserialize_package_identifier_only_message(mem_block, &msg->content.publish_complete.packet_identifier);
1062 break;
1063 case gate_mqtt_packet_subscribe:
1064 ret = deserialize_subscribe(mem_block, msg);
1065 break;
1066 case gate_mqtt_packet_subscribe_ack:
1067 ret = deserialize_subscribe_ack(mem_block, msg);
1068 break;
1069 case gate_mqtt_packet_unsubscribe:
1070 ret = deserialize_unsubscribe(mem_block, msg);
1071 break;
1072 case gate_mqtt_packet_unsubscribe_ack:
1073 ret = deserialize_package_identifier_only_message(mem_block, &msg->content.unsubscribe_ack.packet_identifier);
1074 break;
1075
1076 case gate_mqtt_packet_ping_request:
1077 case gate_mqtt_packet_ping_response:
1078 case gate_mqtt_packet_disconnect:
1079 /* no content to deserialize */
1080 break;
1081 default:
1082 ret = GATE_RESULT_NOTSUPPORTED;
1083 break;
1084 }
1085 GATE_BREAK_IF_FAILED(ret);
1086
1087 if (ptr_new_memblock != NULL)
1088 {
1089 *ptr_new_memblock = mem_block;
1090 mem_block = NULL;
1091 }
1092 } while (0);
1093
1094 if (mem_block)
1095 {
1096 gate_object_release(mem_block);
1097 mem_block = NULL;
1098 }
1099
1100 return ret;
1101 }
1102