GCC Code Coverage Report


Directory: src/gate/
File: src/gate/net/mqttprotocols.c
Date: 2025-09-14 13:10:38
Exec Total Coverage
Lines: 0 534 0.0%
Functions: 0 28 0.0%
Branches: 0 304 0.0%

Line Branch Exec Source
1 /* GATE PROJECT LICENSE:
2 +----------------------------------------------------------------------------+
3 | Copyright(c) 2018-2025, Stefan Meislinger <sm@opengate.at> |
4 | All rights reserved. |
5 | |
6 | Redistribution and use in source and binary forms, with or without |
7 | modification, are permitted provided that the following conditions are met:|
8 | |
9 | 1. Redistributions of source code must retain the above copyright notice, |
10 | this list of conditions and the following disclaimer. |
11 | 2. Redistributions in binary form must reproduce the above copyright |
12 | notice, this list of conditions and the following disclaimer in the |
13 | documentation and/or other materials provided with the distribution. |
14 | |
15 | THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"|
16 | AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
17 | IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
18 | ARE DISCLAIMED.IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
19 | LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
20 | CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
21 | SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
22 | INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
23 | CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
24 | ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF |
25 | THE POSSIBILITY OF SUCH DAMAGE. |
26 +----------------------------------------------------------------------------+
27 */
28
29 #include "gate/net/mqttprotocols.h"
30 #include "gate/libraries.h"
31 #include "gate/streams.h"
32 #include "gate/results.h"
33 #include "gate/serializers.h"
34 #include "gate/debugging.h"
35
36
37 /* #include <mosquitto.h> */
38
39
40 gate_result_t gate_mqtt_write_byte(gate_stream_t* stream, gate_uint8_t value)
41 {
42 return gate_stream_write(stream, (char const*)&value, 1, NULL);
43 }
44
45 gate_result_t gate_mqtt_write_word(gate_stream_t* stream, gate_uint16_t value)
46 {
47 char buff[4];
48 gate_size_t len = gate_serialize_uint16_b(buff, sizeof(buff), &value);
49 return gate_stream_write_block(stream, buff, len, NULL);
50 }
51
52 gate_result_t gate_mqtt_write_length(gate_stream_t* stream, gate_size_t value)
53 {
54 gate_result_t ret = GATE_RESULT_OK;
55 gate_size_t bufferused = 0;
56 unsigned char encoded_char;
57
58 while (GATE_SUCCEEDED(ret))
59 {
60 encoded_char = value % 128;
61 value /= 128;
62 if (value > 0)
63 {
64 encoded_char |= 128;
65 }
66 ret = gate_stream_write(stream, (char const*)&encoded_char, 1, NULL);
67 if (value == 0)
68 {
69 /* all bits are encoded, we are done */
70 break;
71 }
72 }
73 return ret;
74 }
75
76 gate_result_t gate_mqtt_write_string(gate_stream_t* stream, char const* str, gate_size_t length)
77 {
78 gate_uint16_t const len = (gate_uint16_t)length;
79 gate_result_t ret = gate_mqtt_write_word(stream, len);
80 if (GATE_SUCCEEDED(ret))
81 {
82 ret = gate_stream_write(stream, str, length, NULL);
83 }
84 return ret;
85 }
86
87 gate_result_t gate_mqtt_read_byte(gate_stream_t* stream, gate_uint8_t* value)
88 {
89 char chr = 0;
90 gate_size_t received = 0;
91 gate_result_t ret = gate_stream_read(stream, &chr, 1, &received);
92 if (GATE_SUCCEEDED(ret))
93 {
94 if (received != 1)
95 {
96 ret = GATE_RESULT_ENDOFSTREAM;
97 }
98 else
99 {
100 *value = (gate_uint8_t)chr;
101 }
102 }
103 return ret;
104 }
105
106 gate_result_t gate_mqtt_read_word(gate_stream_t* stream, gate_uint16_t* value)
107 {
108 char buff[2];
109 gate_size_t received = 0;
110 gate_result_t ret = gate_stream_read_block(stream, &buff[0], 2, &received);
111 if (GATE_SUCCEEDED(ret))
112 {
113 if (received != 2)
114 {
115 ret = GATE_RESULT_ENDOFSTREAM;
116 }
117 else
118 {
119 gate_deserialize_uint16_b(buff, 2, value);
120 }
121 }
122 return ret;
123 }
124 gate_result_t gate_mqtt_read_length(gate_stream_t* stream, gate_size_t* ptr_value)
125 {
126 gate_result_t ret = GATE_RESULT_FAILED;
127 gate_size_t value = 0;
128 unsigned char b;
129 gate_size_t len_received = 0;
130 gate_size_t multiplier = 1;
131
132 do
133 {
134 ret = gate_stream_read(stream, (char*)&b, 1, &len_received);
135 GATE_BREAK_IF_FAILED(ret);
136 if (len_received == 0)
137 {
138 ret = GATE_RESULT_ENDOFSTREAM;
139 break;
140 }
141 value += (b & 127) * multiplier;
142 multiplier *= 128;
143 if (multiplier > 128 * 128 * 128)
144 {
145 ret = GATE_RESULT_INVALIDCONTENT;
146 break;
147 }
148 } while ((b & 128) != 0);
149
150 if (ptr_value)
151 {
152 *ptr_value = value;
153 }
154 return ret;
155 }
156 gate_result_t gate_mqtt_read_string(gate_stream_t* stream, gate_strbuilder_t* builder)
157 {
158 char buffer[4096];
159 gate_size_t transferred = 0;
160 gate_size_t received = 0;
161 gate_uint16_t str_length = 0;
162 gate_result_t ret = gate_mqtt_read_word(stream, &str_length);
163 if (GATE_SUCCEEDED(ret))
164 {
165 while (transferred < str_length)
166 {
167 gate_size_t required = str_length - transferred;
168 if (required > sizeof(buffer))
169 {
170 required = sizeof(buffer);
171 }
172 ret = gate_stream_read_block(stream, buffer, required, &received);
173 GATE_BREAK_IF_FAILED(ret);
174 gate_strbuilder_append_text(builder, buffer, received);
175 if (received < required)
176 {
177 ret = GATE_RESULT_INVALIDCONTENT;
178 break;
179 }
180 transferred += received;
181 }
182 }
183 return ret;
184 }
185
186 gate_result_t gate_mqtt_read_string_length_skip_data(gate_stream_t* stream, gate_uint16_t* ptr_length)
187 {
188 char buffer[4096];
189 gate_size_t transferred = 0;
190 gate_size_t received = 0;
191 gate_uint16_t str_length = 0;
192 gate_result_t ret = gate_mqtt_read_word(stream, &str_length);
193 if (GATE_SUCCEEDED(ret))
194 {
195 if (ptr_length)
196 {
197 *ptr_length = str_length;
198 }
199 while (transferred < str_length)
200 {
201 gate_size_t required = str_length - transferred;
202 if (required > sizeof(buffer))
203 {
204 required = sizeof(buffer);
205 }
206 ret = gate_stream_read_block(stream, buffer, required, &received);
207 GATE_BREAK_IF_FAILED(ret);
208 if (received < required)
209 {
210 ret = GATE_RESULT_INVALIDCONTENT;
211 break;
212 }
213 transferred += received;
214 }
215 }
216 return ret;
217 }
218
219
220 static gate_result_t serialize_memstream_packet(
221 gate_stream_t* output_stream,
222 gate_mqtt_packet_type_t packet_type,
223 unsigned char packet_flags,
224 gate_memstream_t* memstream)
225 {
226 gate_uint8_t const fixed_header_byte = (((gate_uint8_t)packet_type & 0x0f) << 4) | (packet_flags & 0x0f);
227 char const* const ptr_content = gate_memstream_get_data(memstream);
228 gate_size_t const content_size = gate_memstream_size(memstream);
229 gate_result_t ret;
230
231 do
232 {
233 ret = gate_mqtt_write_byte(output_stream, fixed_header_byte);
234 GATE_BREAK_IF_FAILED(ret);
235 ret = gate_mqtt_write_length(output_stream, content_size);
236 GATE_BREAK_IF_FAILED(ret);
237 ret = gate_stream_write_block(output_stream, ptr_content, content_size, NULL);
238 GATE_BREAK_IF_FAILED(ret);
239 } while (0);
240 return ret;
241 }
242
243 static gate_result_t serialize_connect_message(gate_stream_t* output_stream, gate_mqtt_message_t const* msg)
244 {
245 gate_result_t ret = GATE_RESULT_OK;
246 gate_memstream_impl_t data_memstream_impl;
247 char data_memstream_buffer[4096];
248
249 gate_memstream_t* const data_memstream = gate_memstream_create_static_unmanaged(&data_memstream_impl, data_memstream_buffer, sizeof(data_memstream_buffer), 0);
250 gate_stream_t* const data_stream = (gate_stream_t*)data_memstream;
251
252 do
253 {
254 /* var header */
255 ret = gate_mqtt_write_string(data_stream, "MQTT", 4);
256 GATE_BREAK_IF_FAILED(ret);
257 ret = gate_mqtt_write_byte(data_stream, msg->content.connect.level);
258 GATE_BREAK_IF_FAILED(ret);
259 ret = gate_mqtt_write_byte(data_stream, msg->content.connect.flags);
260 GATE_BREAK_IF_FAILED(ret);
261 ret = gate_mqtt_write_word(data_stream, msg->content.connect.keep_alive);
262 GATE_BREAK_IF_FAILED(ret);
263
264 /* payload */
265 ret = gate_mqtt_write_string(data_stream, msg->content.connect.client_id, msg->content.connect.client_id_len);
266 GATE_BREAK_IF_FAILED(ret);
267 if (GATE_FLAG_ENABLED(msg->content.connect.flags, GATE_MQTT_CONNECT_FLAG_WILLFLAG))
268 {
269 ret = gate_mqtt_write_string(data_stream, msg->content.connect.will_topic, msg->content.connect.will_topic_len);
270 GATE_BREAK_IF_FAILED(ret);
271 ret = gate_mqtt_write_string(data_stream, msg->content.connect.will_message, msg->content.connect.will_message_len);
272 GATE_BREAK_IF_FAILED(ret);
273 }
274 if (GATE_FLAG_ENABLED(msg->content.connect.flags, GATE_MQTT_CONNECT_FLAG_USER))
275 {
276 ret = gate_mqtt_write_string(data_stream, msg->content.connect.user, msg->content.connect.user_len);
277 GATE_BREAK_IF_FAILED(ret);
278 }
279 if (GATE_FLAG_ENABLED(msg->content.connect.flags, GATE_MQTT_CONNECT_FLAG_PASSWORD))
280 {
281 ret = gate_mqtt_write_string(data_stream, msg->content.connect.password, msg->content.connect.password_len);
282 GATE_BREAK_IF_FAILED(ret);
283 }
284
285 /* build full packet */
286 ret = serialize_memstream_packet(output_stream, gate_mqtt_packet_connect, msg->packet_flags, data_memstream);
287 } while (0);
288 return ret;
289 }
290
291 static gate_result_t serialize_connect_ack_message(gate_stream_t* output_stream, gate_mqtt_message_t const* msg)
292 {
293 gate_result_t ret = GATE_RESULT_OK;
294 gate_memstream_impl_t data_memstream_impl;
295 char data_memstream_buffer[64];
296
297 gate_memstream_t* const data_memstream = gate_memstream_create_static_unmanaged(&data_memstream_impl, data_memstream_buffer, sizeof(data_memstream_buffer), 0);
298 gate_stream_t* const data_stream = (gate_stream_t*)data_memstream;
299
300 do
301 {
302 /* var header */
303 ret = gate_mqtt_write_byte(data_stream, msg->content.connect_ack.flags);
304 GATE_BREAK_IF_FAILED(ret);
305 ret = gate_mqtt_write_byte(data_stream, msg->content.connect_ack.return_code);
306 GATE_BREAK_IF_FAILED(ret);
307
308 /* build full packet */
309 ret = serialize_memstream_packet(output_stream, gate_mqtt_packet_connect_ack, msg->packet_flags, data_memstream);
310 GATE_BREAK_IF_FAILED(ret);
311
312 /* succeeded */
313 } while (0);
314 return ret;
315 }
316
317 static gate_result_t serialize_pubish_message(gate_stream_t* output_stream, gate_mqtt_message_t const* msg)
318 {
319 gate_result_t ret;
320 gate_size_t written = 0;
321 gate_memstream_t* data_memstream = NULL;
322 gate_stream_t* data_stream;
323
324 do
325 {
326 const gate_size_t content_len = msg->content.publish.data_len + msg->content.publish.topic_len + 5;
327 data_memstream = gate_memstream_create(content_len);
328 if (NULL == data_memstream)
329 {
330 ret = GATE_RESULT_OUTOFMEMORY;
331 break;
332 }
333 data_stream = (gate_stream_t*)data_memstream;
334
335
336 /* var header */
337 ret = gate_mqtt_write_string(data_stream, msg->content.publish.topic, msg->content.publish.topic_len);
338 GATE_BREAK_IF_FAILED(ret);
339
340 if (GATE_FLAG_ENABLED(msg->packet_flags, GATE_MQTT_FLAG_QOS1) || GATE_FLAG_ENABLED(msg->packet_flags, GATE_MQTT_FLAG_QOS2))
341 {
342 ret = gate_mqtt_write_word(data_stream, msg->content.publish.packet_identifier);
343 GATE_BREAK_IF_FAILED(ret);
344 }
345
346 /* payload */
347 ret = gate_stream_write_block(data_stream, msg->content.publish.data, msg->content.publish.data_len, &written);
348 GATE_BREAK_IF_FAILED(ret);
349
350 /* build full packet */
351 ret = serialize_memstream_packet(output_stream, gate_mqtt_packet_publish, msg->packet_flags, data_memstream);
352 GATE_BREAK_IF_FAILED(ret);
353
354 /* success case reached */
355 } while (0);
356
357 if (data_memstream != NULL)
358 {
359 gate_object_release(data_memstream);
360 }
361
362 return ret;
363 }
364
365
366 static gate_result_t serialize_nocontent_message(gate_stream_t* output_stream, gate_mqtt_message_t const* msg)
367 {
368 gate_memstream_impl_t data_memstream_impl;
369 gate_memstream_t* data_memstream = gate_memstream_create_static_unmanaged(&data_memstream_impl, NULL, 0, 0);
370 return serialize_memstream_packet(output_stream, msg->packet_type, msg->packet_flags, data_memstream);
371 }
372
373 static gate_result_t serialize_packet_identifier_only_message(gate_stream_t* output_stream, gate_mqtt_message_t const* msg, gate_uint16_t id)
374 {
375 char buffer[16];
376 gate_memstream_impl_t data_memstream_impl;
377 gate_memstream_t* data_memstream = gate_memstream_create_static_unmanaged(&data_memstream_impl, buffer, sizeof(buffer), 0);
378
379 gate_result_t result = gate_mqtt_write_word((gate_stream_t*)data_memstream, id);
380
381 if (GATE_SUCCEEDED(result))
382 {
383 result = serialize_memstream_packet(output_stream, msg->packet_type, msg->packet_flags, data_memstream);
384 }
385 return result;
386 }
387
388 static gate_result_t serialize_subscribe_message(gate_stream_t* output_stream, gate_mqtt_message_t const* msg)
389 {
390 gate_result_t ret;
391 gate_size_t written = 0;
392 gate_memstream_t* data_memstream = NULL;
393 gate_stream_t* data_stream;
394 gate_mqtt_subscription_t const* subscr;
395 gate_uint16_t subscr_count;
396
397 do
398 {
399 data_memstream = gate_memstream_create(128);
400 if (NULL == data_memstream)
401 {
402 ret = GATE_RESULT_OUTOFMEMORY;
403 break;
404 }
405 data_stream = (gate_stream_t*)data_memstream;
406
407
408 /* var header */
409 ret = gate_mqtt_write_word(data_stream, msg->content.subscribe.packet_identifier);
410 GATE_BREAK_IF_FAILED(ret);
411
412 /* payload */
413 subscr_count = msg->content.subscribe.topics_count;
414 subscr = msg->content.subscribe.topics;
415 for (; subscr_count > 0; --subscr_count, ++subscr)
416 {
417 ret = gate_mqtt_write_string(data_stream, subscr->topic, subscr->topic_len);
418 GATE_BREAK_IF_FAILED(ret);
419
420 ret = gate_mqtt_write_byte(data_stream, subscr->qos);
421 GATE_BREAK_IF_FAILED(ret);
422 }
423
424 GATE_BREAK_IF_FAILED(ret);
425
426 /* build full packet */
427 ret = serialize_memstream_packet(output_stream, gate_mqtt_packet_subscribe, 0x02, data_memstream);
428 GATE_BREAK_IF_FAILED(ret);
429
430 /* success case reached */
431 } while (0);
432
433 if (data_memstream != NULL)
434 {
435 gate_object_release(data_memstream);
436 }
437
438 return ret;
439 }
440
441 static gate_result_t serialize_subscribe_ack_message(gate_stream_t* output_stream, gate_mqtt_message_t const* msg)
442 {
443 gate_result_t ret;
444 gate_size_t written = 0;
445 gate_memstream_t* data_memstream = NULL;
446 gate_stream_t* data_stream;
447 gate_uint8_t const* ret_codes;
448 gate_uint16_t ret_codes_count;
449
450 do
451 {
452 data_memstream = gate_memstream_create(128);
453 if (NULL == data_memstream)
454 {
455 ret = GATE_RESULT_OUTOFMEMORY;
456 break;
457 }
458 data_stream = (gate_stream_t*)data_memstream;
459
460
461 /* var header */
462 ret = gate_mqtt_write_word(data_stream, msg->content.subscribe_ack.packet_identifier);
463 GATE_BREAK_IF_FAILED(ret);
464
465 /* payload */
466 ret_codes_count = msg->content.subscribe_ack.ret_codes_count;
467 ret_codes = msg->content.subscribe_ack.ret_codes;
468 for (; ret_codes_count > 0; --ret_codes_count, ++ret_codes)
469 {
470 ret = gate_mqtt_write_byte(data_stream, *ret_codes);
471 GATE_BREAK_IF_FAILED(ret);
472 }
473
474 GATE_BREAK_IF_FAILED(ret);
475
476 /* build full packet */
477 ret = serialize_memstream_packet(output_stream, gate_mqtt_packet_subscribe_ack, msg->packet_flags, data_memstream);
478 GATE_BREAK_IF_FAILED(ret);
479
480 /* success case reached */
481 } while (0);
482
483 if (data_memstream != NULL)
484 {
485 gate_object_release(data_memstream);
486 }
487
488 return ret;
489 }
490
491 static gate_result_t serialize_unsubscribe_message(gate_stream_t* output_stream, gate_mqtt_message_t const* msg)
492 {
493 gate_result_t ret;
494 gate_size_t written = 0;
495 gate_memstream_t* data_memstream = NULL;
496 gate_stream_t* data_stream;
497 gate_mqtt_subscription_t const* subscr;
498 gate_uint16_t subscr_count;
499
500 do
501 {
502 data_memstream = gate_memstream_create(128);
503 if (NULL == data_memstream)
504 {
505 ret = GATE_RESULT_OUTOFMEMORY;
506 break;
507 }
508 data_stream = (gate_stream_t*)data_memstream;
509
510
511 /* var header */
512 ret = gate_mqtt_write_word(data_stream, msg->content.subscribe.packet_identifier);
513 GATE_BREAK_IF_FAILED(ret);
514
515 /* payload */
516 subscr_count = msg->content.unsubscribe.topics_count;
517 subscr = msg->content.unsubscribe.topics;
518 for (; subscr_count > 0; --subscr_count, ++subscr)
519 {
520 ret = gate_mqtt_write_string(data_stream, subscr->topic, subscr->topic_len);
521 GATE_BREAK_IF_FAILED(ret);
522 }
523
524 GATE_BREAK_IF_FAILED(ret);
525
526 /* build full packet */
527 ret = serialize_memstream_packet(output_stream, gate_mqtt_packet_subscribe, msg->packet_flags, data_memstream);
528 GATE_BREAK_IF_FAILED(ret);
529
530 /* success case reached */
531 } while (0);
532
533 if (data_memstream != NULL)
534 {
535 gate_object_release(data_memstream);
536 }
537
538 return ret;
539 }
540
541
542 gate_result_t gate_mqtt_send_message(gate_stream_t* mqtt_stream, gate_mqtt_message_t const* msg)
543 {
544 if (!mqtt_stream || !msg)
545 {
546 return GATE_RESULT_INVALIDARG;
547 }
548
549 switch (msg->packet_type)
550 {
551 case gate_mqtt_packet_connect: return serialize_connect_message(mqtt_stream, msg);
552 case gate_mqtt_packet_connect_ack: return serialize_connect_ack_message(mqtt_stream, msg);
553 case gate_mqtt_packet_publish: return serialize_pubish_message(mqtt_stream, msg);
554 case gate_mqtt_packet_publish_ack: return serialize_packet_identifier_only_message(mqtt_stream, msg, msg->content.publish_ack.packet_identifier);
555 case gate_mqtt_packet_publish_receive: return serialize_packet_identifier_only_message(mqtt_stream, msg, msg->content.publish_received.packet_identifier);
556 case gate_mqtt_packet_publish_release: return serialize_packet_identifier_only_message(mqtt_stream, msg, msg->content.publish_release.packet_identifier);
557 case gate_mqtt_packet_publish_complete: return serialize_packet_identifier_only_message(mqtt_stream, msg, msg->content.publish_complete.packet_identifier);
558 case gate_mqtt_packet_subscribe: return serialize_subscribe_message(mqtt_stream, msg);
559 case gate_mqtt_packet_subscribe_ack: return serialize_subscribe_ack_message(mqtt_stream, msg);
560 case gate_mqtt_packet_unsubscribe: return serialize_unsubscribe_message(mqtt_stream, msg);
561 case gate_mqtt_packet_unsubscribe_ack: return serialize_packet_identifier_only_message(mqtt_stream, msg, msg->content.unsubscribe_ack.packet_identifier);
562 case gate_mqtt_packet_ping_request: return serialize_nocontent_message(mqtt_stream, msg);
563 case gate_mqtt_packet_ping_response: return serialize_nocontent_message(mqtt_stream, msg);
564 case gate_mqtt_packet_disconnect: return serialize_nocontent_message(mqtt_stream, msg);
565 default: return GATE_RESULT_NOTSUPPORTED;
566 }
567 }
568
569 static gate_result_t deserialize_connect(gate_memoryblock_t* block, gate_mqtt_message_t* msg)
570 {
571 gate_result_t ret = GATE_RESULT_FAILED;
572
573 do
574 {
575 gate_size_t offset;
576 gate_strbuilder_t builder = GATE_INIT_EMPTY;
577 char buffer[4096];
578 gate_memstream_impl_t block_stream;
579 char const* const ptr_block_content = (char const*)gate_memoryblock_get_content(block);
580 gate_size_t const block_content_len = gate_memoryblock_get_size(block);
581
582 gate_stream_t* const strm = (gate_stream_t*)gate_memstream_create_static_unmanaged_readonly(
583 &block_stream, ptr_block_content, block_content_len, block_content_len);
584
585 if (strm == NULL)
586 {
587 ret = GATE_RESULT_OUTOFMEMORY;
588 break;
589 }
590
591 gate_strbuilder_create_static(&builder, buffer, sizeof(buffer), 0);
592
593 ret = gate_mqtt_read_string(strm, &builder);
594 GATE_BREAK_IF_FAILED(ret);
595
596 if (gate_str_compare(gate_strbuilder_ptr(&builder, 0), gate_strbuilder_length(&builder), "MQTT", 4) != 0)
597 {
598 /* Not a valid MQTT connect message */
599 ret = GATE_RESULT_INVALIDHEADER;
600 break;
601 }
602
603 ret = gate_mqtt_read_byte(strm, &msg->content.connect.level);
604 GATE_BREAK_IF_FAILED(ret);
605
606 ret = gate_mqtt_read_byte(strm, &msg->content.connect.flags);
607 GATE_BREAK_IF_FAILED(ret);
608
609 ret = gate_mqtt_read_word(strm, &msg->content.connect.keep_alive);
610 GATE_BREAK_IF_FAILED(ret);
611
612 offset = 10; /* payload starts at block offset 10: */
613
614 /* read client id */
615 gate_mqtt_read_string_length_skip_data(strm, &msg->content.connect.client_id_len);
616 offset += 2;
617 msg->content.connect.client_id = &ptr_block_content[offset];
618 offset += msg->content.connect.client_id_len;
619
620 if (GATE_FLAG_ENABLED(msg->content.connect.flags, GATE_MQTT_CONNECT_FLAG_WILLFLAG))
621 {
622 /* read optional will topic+content */
623 gate_mqtt_read_string_length_skip_data(strm, &msg->content.connect.will_topic_len);
624 offset += 2;
625 msg->content.connect.will_topic = &ptr_block_content[offset];
626 offset += msg->content.connect.will_topic_len;
627
628 gate_mqtt_read_string_length_skip_data(strm, &msg->content.connect.will_message_len);
629 offset += 2;
630 msg->content.connect.will_message = &ptr_block_content[offset];
631 offset += msg->content.connect.will_message_len;
632 }
633
634 if (GATE_FLAG_ENABLED(msg->content.connect.flags, GATE_MQTT_CONNECT_FLAG_USER))
635 {
636 /* read optional user string */
637 gate_mqtt_read_string_length_skip_data(strm, &msg->content.connect.user_len);
638 offset += 2;
639 msg->content.connect.user = &ptr_block_content[offset];
640 offset += msg->content.connect.user_len;
641 }
642
643 if (GATE_FLAG_ENABLED(msg->content.connect.flags, GATE_MQTT_CONNECT_FLAG_PASSWORD))
644 {
645 /* read optional will topic+content */
646 gate_mqtt_read_string_length_skip_data(strm, &msg->content.connect.password_len);
647 offset += 2;
648 msg->content.connect.password = &ptr_block_content[offset];
649 offset += msg->content.connect.password_len;
650 }
651 } while (0);
652
653 return ret;
654 }
655
656
657 static gate_result_t deserialize_connect_ack(gate_memoryblock_t* block, gate_mqtt_message_t* msg)
658 {
659 gate_result_t ret = GATE_RESULT_FAILED;
660 gate_uint8_t ack_flags = 0;
661 gate_uint8_t ret_code = 0;
662
663 do
664 {
665 gate_memstream_impl_t block_stream;
666 char const* const ptr_block_content = (char const*)gate_memoryblock_get_content(block);
667 gate_size_t const block_content_len = gate_memoryblock_get_size(block);
668
669 gate_stream_t* const strm = (gate_stream_t*)gate_memstream_create_static_unmanaged_readonly(
670 &block_stream, ptr_block_content, block_content_len, block_content_len);
671
672 if (strm == NULL)
673 {
674 ret = GATE_RESULT_OUTOFMEMORY;
675 break;
676 }
677
678
679 if (block_content_len != 2)
680 {
681 ret = GATE_RESULT_INVALIDHEADER;
682 break;
683 }
684
685 ret = gate_mqtt_read_byte(strm, &ack_flags);
686 GATE_BREAK_IF_FAILED(ret);
687 ret = gate_mqtt_read_byte(strm, &ret_code);
688 GATE_BREAK_IF_FAILED(ret);
689
690 msg->content.connect_ack.flags = ack_flags;
691 msg->content.connect_ack.return_code = ret_code;
692 } while (0);
693
694 return ret;
695 }
696
697 static gate_result_t deserialize_package_identifier_only_message(gate_memoryblock_t* block, gate_uint16_t* ptr_packet_id)
698 {
699 gate_result_t ret;
700 void const* const ptr_block_data = gate_memoryblock_get_content(block);
701 gate_size_t const block_len = gate_memoryblock_get_size(block);
702
703 do
704 {
705 gate_memstream_impl_t data_strm;
706 gate_stream_t* input_strm = (gate_stream_t*)gate_memstream_create_static_unmanaged_readonly(
707 &data_strm, ptr_block_data, block_len, block_len);
708 if (input_strm == NULL)
709 {
710 ret = GATE_RESULT_OUTOFMEMORY;
711 break;
712 }
713
714 ret = gate_mqtt_read_word(input_strm, ptr_packet_id);
715 GATE_BREAK_IF_FAILED(ret);
716
717 /* success case reached */
718 } while (0);
719
720 return ret;
721
722 }
723
724 static gate_result_t deserialize_publish(gate_memoryblock_t* block, gate_mqtt_message_t* msg)
725 {
726 gate_result_t ret = GATE_RESULT_FAILED;
727
728 do
729 {
730 gate_size_t offset;
731 gate_memstream_impl_t block_stream;
732 char const* const ptr_block_content = (char const*)gate_memoryblock_get_content(block);
733 gate_size_t const block_content_len = gate_memoryblock_get_size(block);
734
735 gate_stream_t* const strm = (gate_stream_t*)gate_memstream_create_static_unmanaged_readonly(
736 &block_stream, ptr_block_content, block_content_len, block_content_len);
737
738 if (strm == NULL)
739 {
740 ret = GATE_RESULT_OUTOFMEMORY;
741 break;
742 }
743
744 ret = gate_mqtt_read_string_length_skip_data(strm, &msg->content.publish.topic_len);
745 GATE_BREAK_IF_FAILED(ret);
746 offset = 2;
747 msg->content.publish.topic = &ptr_block_content[offset];
748 offset += msg->content.publish.topic_len;
749
750 if (GATE_FLAG_ENABLED(msg->packet_flags, GATE_MQTT_FLAG_QOS1) || GATE_FLAG_ENABLED(msg->packet_flags, GATE_MQTT_FLAG_QOS2))
751 {
752 ret = gate_mqtt_read_word(strm, &msg->content.publish.packet_identifier);
753 GATE_BREAK_IF_FAILED(ret);
754 offset += 2;
755 }
756 else
757 {
758 msg->content.publish.packet_identifier = 0;
759 }
760
761 msg->content.publish.data_len = (gate_uint32_t)(block_content_len - offset);
762 msg->content.publish.data = &ptr_block_content[offset];
763
764 /* success case reached */
765 } while (0);
766
767 return ret;
768 }
769
770 static gate_result_t deserialize_subscribe(gate_memoryblock_t* block, gate_mqtt_message_t* msg)
771 {
772 gate_result_t ret = GATE_RESULT_FAILED;
773
774 do
775 {
776 gate_size_t offset;
777 gate_memstream_impl_t block_stream;
778 char const* const ptr_block_content = (char const*)gate_memoryblock_get_content(block);
779 gate_size_t const block_content_len = gate_memoryblock_get_size(block);
780
781 gate_stream_t* const strm = (gate_stream_t*)gate_memstream_create_static_unmanaged_readonly(
782 &block_stream, ptr_block_content, block_content_len, block_content_len);
783
784 if (strm == NULL)
785 {
786 ret = GATE_RESULT_OUTOFMEMORY;
787 break;
788 }
789
790 ret = gate_mqtt_read_word(strm, &msg->content.subscribe.packet_identifier);
791 GATE_BREAK_IF_FAILED(ret);
792
793 offset = 2;
794 msg->content.subscribe.topics_count = 0;
795 while ((gate_memstream_size(&block_stream) >= 4) && (msg->content.subscribe.topics_count < GATE_MQTT_ARRAY_MAX))
796 {
797 gate_mqtt_subscription_t* ptr_sub = &msg->content.subscribe.topics[msg->content.subscribe.topics_count];
798
799 ret = gate_mqtt_read_string_length_skip_data(strm, &ptr_sub->topic_len);
800 GATE_BREAK_IF_FAILED(ret);
801 offset += 2;
802 ptr_sub->topic = &ptr_block_content[offset];
803 offset += ptr_sub->topic_len;
804
805 ret = gate_mqtt_read_byte(strm, &ptr_sub->qos);
806 GATE_BREAK_IF_FAILED(ret);
807 offset += 1;
808
809 ++msg->content.subscribe.topics_count;
810 }
811 GATE_BREAK_IF_FAILED(ret);
812
813 /* success case reached */
814 } while (0);
815
816 return ret;
817 }
818
819 static gate_result_t deserialize_subscribe_ack(gate_memoryblock_t* block, gate_mqtt_message_t* msg)
820 {
821 gate_result_t ret = GATE_RESULT_FAILED;
822
823 do
824 {
825 gate_size_t offset;
826 gate_memstream_impl_t block_stream;
827 char const* const ptr_block_content = (char const*)gate_memoryblock_get_content(block);
828 gate_size_t const block_content_len = gate_memoryblock_get_size(block);
829
830 gate_stream_t* const strm = (gate_stream_t*)gate_memstream_create_static_unmanaged_readonly(
831 &block_stream, ptr_block_content, block_content_len, block_content_len);
832
833 if (strm == NULL)
834 {
835 ret = GATE_RESULT_OUTOFMEMORY;
836 break;
837 }
838
839 ret = gate_mqtt_read_word(strm, &msg->content.subscribe_ack.packet_identifier);
840 GATE_BREAK_IF_FAILED(ret);
841
842 offset = 2;
843 msg->content.subscribe_ack.ret_codes_count = 0;
844 while ((gate_memstream_size(&block_stream) > 0) && (msg->content.subscribe_ack.ret_codes_count < GATE_MQTT_ARRAY_MAX))
845 {
846 ret = gate_mqtt_read_byte(strm, &msg->content.subscribe_ack.ret_codes[msg->content.subscribe_ack.ret_codes_count]);
847 GATE_BREAK_IF_FAILED(ret);
848
849 ++msg->content.subscribe_ack.ret_codes_count;
850 }
851 GATE_BREAK_IF_FAILED(ret);
852
853 /* success case reached */
854 } while (0);
855
856 return ret;
857 }
858
859 static gate_result_t deserialize_unsubscribe(gate_memoryblock_t* block, gate_mqtt_message_t* msg)
860 {
861 gate_result_t ret = GATE_RESULT_FAILED;
862
863 do
864 {
865 gate_size_t offset;
866 gate_memstream_impl_t block_stream;
867 char const* const ptr_block_content = (char const*)gate_memoryblock_get_content(block);
868 gate_size_t const block_content_len = gate_memoryblock_get_size(block);
869
870 gate_stream_t* const strm = (gate_stream_t*)gate_memstream_create_static_unmanaged_readonly(
871 &block_stream, ptr_block_content, block_content_len, block_content_len);
872
873 if (strm == NULL)
874 {
875 ret = GATE_RESULT_OUTOFMEMORY;
876 break;
877 }
878
879 ret = gate_mqtt_read_word(strm, &msg->content.unsubscribe.packet_identifier);
880 GATE_BREAK_IF_FAILED(ret);
881
882 offset = 2;
883 msg->content.unsubscribe.topics_count = 0;
884 while ((gate_memstream_size(&block_stream) >= 3) && (msg->content.unsubscribe.topics_count < GATE_MQTT_ARRAY_MAX))
885 {
886 gate_mqtt_subscription_t* ptr_sub = &msg->content.unsubscribe.topics[msg->content.unsubscribe.topics_count];
887
888 ret = gate_mqtt_read_string_length_skip_data(strm, &ptr_sub->topic_len);
889 GATE_BREAK_IF_FAILED(ret);
890 offset += 2;
891 ptr_sub->topic = &ptr_block_content[offset];
892 offset += ptr_sub->topic_len;
893
894 ptr_sub->qos = 0;
895
896 ++msg->content.unsubscribe.topics_count;
897 }
898 GATE_BREAK_IF_FAILED(ret);
899
900 /* success case reached */
901 } while (0);
902
903 return ret;
904 }
905
906
907 gate_result_t gate_mqtt_has_queued_message(gate_stream_t* mqtt_stream)
908 {
909 gate_result_t ret = GATE_RESULT_FAILED;
910 char buffer[1024];
911 char* ptr_buffer = &buffer[0];
912 gate_size_t buffer_capacity = sizeof(buffer);
913 gate_size_t returned;
914 gate_memstream_impl_t memstrm;
915 gate_stream_t* strm = NULL;
916
917 do
918 {
919 gate_uint8_t fixed_header_byte = 0;
920 gate_size_t fixed_header_contentlen = 0;
921 gate_size_t remaining_bytes;
922 gate_size_t fixed_header_length;
923 gate_size_t total_packet_length;
924
925 ret = gate_stream_peek(mqtt_stream, ptr_buffer, buffer_capacity, &returned);
926 GATE_BREAK_IF_FAILED(ret);
927
928 if (returned < 2)
929 {
930 ret = GATE_RESULT_ENDOFSTREAM;
931 break;
932 }
933
934 strm = (gate_stream_t*)gate_memstream_create_static_unmanaged_readonly(&memstrm, ptr_buffer, returned, returned);
935 if (!strm)
936 {
937 ret = GATE_RESULT_OUTOFMEMORY;
938 break;
939 }
940
941 ret = gate_mqtt_read_byte(strm, &fixed_header_byte);
942 GATE_BREAK_IF_FAILED(ret);
943
944 ret = gate_mqtt_read_length(strm, &fixed_header_contentlen);
945 GATE_BREAK_IF_FAILED(ret);
946
947 remaining_bytes = gate_memstream_size(&memstrm);
948 GATE_DEBUG_ASSERT(returned >= remaining_bytes);
949 fixed_header_length = returned - remaining_bytes;
950 total_packet_length = fixed_header_length + fixed_header_contentlen;
951
952 if (total_packet_length > buffer_capacity)
953 {
954 ptr_buffer = (char*)gate_mem_alloc(total_packet_length);
955 if (ptr_buffer == NULL)
956 {
957 ret = GATE_RESULT_OUTOFMEMORY;
958 break;
959 }
960 }
961
962 ret = gate_stream_peek(mqtt_stream, ptr_buffer, total_packet_length, &returned);
963 GATE_BREAK_IF_FAILED(ret);
964
965 if (returned < total_packet_length)
966 {
967 /* not enough bytes in stream to decode a full package */
968 ret = GATE_RESULT_ENDOFSTREAM;
969 break;
970 }
971
972 /* success case, we have a full packet in the stream */
973 ret = GATE_RESULT_OK;
974 } while (0);
975
976 if ((ptr_buffer != NULL) && (ptr_buffer != &buffer[0]))
977 {
978 gate_mem_dealloc(ptr_buffer);
979 }
980
981 return ret;
982 }
983
984
985 gate_result_t gate_mqtt_receive_message(gate_stream_t* mqtt_stream, gate_mqtt_message_t* msg, gate_memoryblock_t** ptr_new_memblock)
986 {
987 gate_result_t ret;
988 gate_memoryblock_t* mem_block = NULL;
989
990 do
991 {
992 gate_uint8_t fixed_header_byte = 0;
993 gate_size_t fixed_header_len = 0;
994 gate_uint64_t content_transferred = 0;
995
996 if (!mqtt_stream || !msg || !ptr_new_memblock)
997 {
998 ret = GATE_RESULT_INVALIDARG;
999 break;
1000 }
1001
1002 ret = gate_mqtt_read_byte(mqtt_stream, &fixed_header_byte);
1003 GATE_BREAK_IF_FAILED(ret);
1004
1005 ret = gate_mqtt_read_length(mqtt_stream, &fixed_header_len);
1006 GATE_BREAK_IF_FAILED(ret);
1007
1008 msg->packet_type = (gate_mqtt_packet_type_t)((fixed_header_byte & 0xf0) >> 4);
1009 msg->packet_flags = fixed_header_byte & 0x0f;
1010
1011 if (fixed_header_len > 0)
1012 {
1013 gate_memstream_impl_t memstream;
1014
1015 mem_block = gate_memoryblock_create(fixed_header_len);
1016 if (NULL == mem_block)
1017 {
1018 ret = GATE_RESULT_OUTOFMEMORY;
1019 break;
1020 }
1021
1022 if (NULL == gate_memstream_create_static_unmanaged(&memstream, gate_memoryblock_get_content(mem_block), gate_memoryblock_get_size(mem_block), 0))
1023 {
1024 ret = GATE_RESULT_OUTOFMEMORY;
1025 break;
1026 }
1027
1028 ret = gate_stream_transfer_limit(mqtt_stream, fixed_header_len, (gate_stream_t*)&memstream, &content_transferred);
1029 GATE_BREAK_IF_FAILED(ret);
1030 if (content_transferred < fixed_header_len)
1031 {
1032 ret = GATE_RESULT_ENDOFSTREAM;
1033 break;
1034 }
1035 }
1036
1037 switch (msg->packet_type)
1038 {
1039 case gate_mqtt_packet_connect:
1040 ret = deserialize_connect(mem_block, msg);
1041 break;
1042 case gate_mqtt_packet_connect_ack:
1043 ret = deserialize_connect_ack(mem_block, msg);
1044 break;
1045 case gate_mqtt_packet_publish:
1046 ret = deserialize_publish(mem_block, msg);
1047 break;
1048 case gate_mqtt_packet_publish_ack:
1049 ret = deserialize_package_identifier_only_message(mem_block, &msg->content.publish_ack.packet_identifier);
1050 break;
1051 case gate_mqtt_packet_publish_receive:
1052 ret = deserialize_package_identifier_only_message(mem_block, &msg->content.publish_received.packet_identifier);
1053 break;
1054 case gate_mqtt_packet_publish_release:
1055 ret = deserialize_package_identifier_only_message(mem_block, &msg->content.publish_release.packet_identifier);
1056 break;
1057 case gate_mqtt_packet_publish_complete:
1058 ret = deserialize_package_identifier_only_message(mem_block, &msg->content.publish_complete.packet_identifier);
1059 break;
1060 case gate_mqtt_packet_subscribe:
1061 ret = deserialize_subscribe(mem_block, msg);
1062 break;
1063 case gate_mqtt_packet_subscribe_ack:
1064 ret = deserialize_subscribe_ack(mem_block, msg);
1065 break;
1066 case gate_mqtt_packet_unsubscribe:
1067 ret = deserialize_unsubscribe(mem_block, msg);
1068 break;
1069 case gate_mqtt_packet_unsubscribe_ack:
1070 ret = deserialize_package_identifier_only_message(mem_block, &msg->content.unsubscribe_ack.packet_identifier);
1071 break;
1072
1073 case gate_mqtt_packet_ping_request:
1074 case gate_mqtt_packet_ping_response:
1075 case gate_mqtt_packet_disconnect:
1076 /* no content to deserialize */
1077 break;
1078 default:
1079 ret = GATE_RESULT_NOTSUPPORTED;
1080 break;
1081 }
1082 GATE_BREAK_IF_FAILED(ret);
1083
1084 if (ptr_new_memblock != NULL)
1085 {
1086 *ptr_new_memblock = mem_block;
1087 mem_block = NULL;
1088 }
1089 } while (0);
1090
1091 if (mem_block)
1092 {
1093 gate_object_release(mem_block);
1094 mem_block = NULL;
1095 }
1096
1097 return ret;
1098 }
1099