GCC Code Coverage Report


Directory: src/gate/
File: src/gate/net/cxx_mqttprotocols.cpp
Date: 2025-12-12 23:40:09
Exec Total Coverage
Lines: 0 399 0.0%
Functions: 0 52 0.0%
Branches: 0 286 0.0%

Line Branch Exec Source
1 /* GATE PROJECT LICENSE:
2 +----------------------------------------------------------------------------+
3 | Copyright(c) 2018-2025, Stefan Meislinger <sm@opengate.at> |
4 | All rights reserved. |
5 | |
6 | Redistribution and use in source and binary forms, with or without |
7 | modification, are permitted provided that the following conditions are met:|
8 | |
9 | 1. Redistributions of source code must retain the above copyright notice, |
10 | this list of conditions and the following disclaimer. |
11 | 2. Redistributions in binary form must reproduce the above copyright |
12 | notice, this list of conditions and the following disclaimer in the |
13 | documentation and/or other materials provided with the distribution. |
14 | |
15 | THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"|
16 | AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
17 | IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
18 | ARE DISCLAIMED.IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
19 | LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
20 | CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
21 | SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
22 | INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
23 | CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
24 | ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF |
25 | THE POSSIBILITY OF SUCH DAMAGE. |
26 +----------------------------------------------------------------------------+
27 */
28
29 #include "gate/net/mqttprotocols.hpp"
30 #include "gate/exceptions.hpp"
31 #include "gate/net/sockets.hpp"
32 #include "gate/net/sockettools.hpp"
33
34 namespace gate
35 {
36 namespace net
37 {
38 MQTT::Message::Message()
39 {
40 gate_mqtt_message_t* ptr = this;
41 gate_mem_clear(ptr, sizeof(gate_mqtt_message_t));
42 }
43
44 MQTT::Message::Message(gate_mqtt_message_t const& src)
45 {
46 gate_mqtt_message_t* ptr = this;
47 gate_mem_clear(ptr, sizeof(gate_mqtt_message_t));
48
49 switch (src.packet_type)
50 {
51 case gate_mqtt_packet_connect:
52 {
53 const StaticString clientId(src.content.connect.client_id, src.content.connect.client_id_len);
54 const StaticString willTopic(src.content.connect.will_topic, src.content.connect.will_topic_len);
55 const StaticString willMsg(src.content.connect.will_message, src.content.connect.will_message_len);
56 const StaticString user(src.content.connect.user, src.content.connect.user_len);
57 const StaticString pass(src.content.connect.password, src.content.connect.password_len);
58 *this = Message::createConnect(src.content.connect.keep_alive, clientId, willTopic, willMsg, user, pass);
59 break;
60 }
61 default:
62 break;
63 }
64 }
65
66 MQTT::Message::Message(Message const& src)
67 : datablock(src.datablock)
68 {
69 gate_mqtt_message_t* ptrDest = this;
70 gate_mqtt_message_t const* ptrSrc = &src;
71 gate_mem_copy(ptrDest, ptrSrc, sizeof(gate_mqtt_message_t));
72 }
73
74 MQTT::Message& MQTT::Message::operator=(Message const& src)
75 {
76 if (this != &src)
77 {
78 this->datablock = src.datablock;
79 gate_mqtt_message_t* ptrDest = this;
80 gate_mqtt_message_t const* ptrSrc = &src;
81 gate_mem_copy(ptrDest, ptrSrc, sizeof(gate_mqtt_message_t));
82 }
83 return *this;
84 }
85
86 MQTT::Message::~Message() noexcept
87 {
88 }
89
90 void MQTT::Message::attachData(MemoryBlock& block)
91 {
92 this->datablock = block;
93 }
94
95 MemoryBlock const& MQTT::Message::getData() const
96 {
97 return this->datablock;
98 }
99
100
101 MQTT::Message MQTT::Message::createConnect(uint16_t keepAlive, String const& clientId, String const& willTopic, String const& willMessage, String const& user, String const& password)
102 {
103 size_t blockSize = clientId.length() + willTopic.length() + willMessage.length() + user.length() + password.length() + 10;
104 MemoryBlock block(blockSize);
105
106 Message msg;
107 msg.packet_type = gate_mqtt_packet_connect;
108 msg.packet_flags = 0;
109 msg.content.connect.flags = GATE_MQTT_CONNECT_FLAG_CLEAN;
110 msg.content.connect.level = 4;
111 msg.content.connect.keep_alive = keepAlive;
112
113 char* ptrBlock = static_cast<char*>(block.getContent());
114 size_t copied;
115
116 copied = clientId.copyTo(ptrBlock, blockSize);
117 msg.content.connect.client_id = ptrBlock;
118 msg.content.connect.client_id_len = static_cast<uint16_t>(copied);
119 ptrBlock += (copied + 1);
120 blockSize -= (copied + 1);
121
122 if (!willTopic.empty())
123 {
124 msg.content.connect.flags |= GATE_MQTT_CONNECT_FLAG_WILLFLAG;
125
126 copied = willTopic.copyTo(ptrBlock, blockSize);
127 msg.content.connect.will_topic = ptrBlock;
128 msg.content.connect.will_topic_len = static_cast<uint16_t>(copied);
129 ptrBlock += (copied + 1);
130 blockSize -= (copied + 1);
131
132 copied = willMessage.copyTo(ptrBlock, blockSize);
133 msg.content.connect.will_message = ptrBlock;
134 msg.content.connect.will_message_len = static_cast<uint16_t>(copied);
135 ptrBlock += (copied + 1);
136 blockSize -= (copied + 1);
137 }
138
139 if (!user.empty())
140 {
141 msg.content.connect.flags |= GATE_MQTT_CONNECT_FLAG_USER;
142
143 copied = user.copyTo(ptrBlock, blockSize);
144 msg.content.connect.user = ptrBlock;
145 msg.content.connect.user_len = static_cast<uint16_t>(copied);
146 ptrBlock += (copied + 1);
147 blockSize -= (copied + 1);
148 }
149
150 if (!password.empty())
151 {
152 msg.content.connect.flags |= GATE_MQTT_CONNECT_FLAG_PASSWORD;
153
154 copied = password.copyTo(ptrBlock, blockSize);
155 msg.content.connect.password = ptrBlock;
156 msg.content.connect.password_len = static_cast<uint16_t>(copied);
157 //ptrBlock += (copied + 1); // no further usage
158 //blockSize -= (copied + 1); // no further usage
159 }
160
161 msg.datablock = block;
162 return msg;
163 }
164
165 MQTT::Message MQTT::Message::createConnectAck(uint8_t flags, uint8_t retCode)
166 {
167 Message msg;
168 msg.packet_type = gate_mqtt_packet_connect_ack;
169 msg.packet_flags = 0;
170 msg.content.connect_ack.flags = flags;
171 msg.content.connect_ack.return_code = retCode;
172 return msg;
173 }
174
175 MQTT::Message MQTT::Message::createPublish(uint16_t id, uint8_t flags, String const& topic, String const& data)
176 {
177 size_t blockSize = topic.length() + data.length() + 4;
178 MemoryBlock block(blockSize);
179
180 Message msg;
181 msg.packet_type = gate_mqtt_packet_publish;
182 msg.packet_flags = flags;
183 msg.content.publish.packet_identifier = id;
184
185 char* ptrBlock = static_cast<char*>(block.getContent());
186 size_t copied;
187
188 copied = topic.copyTo(ptrBlock, blockSize);
189 msg.content.publish.topic = ptrBlock;
190 msg.content.publish.topic_len = static_cast<uint16_t>(copied);
191 ptrBlock += (copied + 1);
192 blockSize -= (copied + 1);
193
194 copied = data.copyTo(ptrBlock, blockSize);
195 msg.content.publish.data = ptrBlock;
196 msg.content.publish.data_len = static_cast<uint16_t>(copied);
197 //ptrBlock += (copied + 1); // no further usage
198 //blockSize -= (copied + 1); // no further usage
199
200 msg.datablock = block;
201 return msg;
202 }
203
204 MQTT::Message MQTT::Message::createPublishAck(uint16_t id)
205 {
206 Message msg;
207 msg.packet_type = gate_mqtt_packet_publish_ack;
208 msg.packet_flags = 0;
209 msg.content.publish_ack.packet_identifier = id;
210 return msg;
211 }
212
213 MQTT::Message MQTT::Message::createPublishReceive(uint16_t id)
214 {
215 Message msg;
216 msg.packet_type = gate_mqtt_packet_publish_ack;
217 msg.packet_flags = 0;
218 msg.content.publish_received.packet_identifier = id;
219 return msg;
220 }
221
222 MQTT::Message MQTT::Message::createPublishRelease(uint16_t id)
223 {
224 Message msg;
225 msg.packet_type = gate_mqtt_packet_publish_ack;
226 msg.packet_flags = 0x02;
227 msg.content.publish_release.packet_identifier = id;
228 return msg;
229
230 }
231
232 MQTT::Message MQTT::Message::createPublishComplete(uint16_t id)
233 {
234 Message msg;
235 msg.packet_type = gate_mqtt_packet_publish_ack;
236 msg.packet_flags = 0;
237 msg.content.publish_complete.packet_identifier = id;
238 return msg;
239 }
240
241 MQTT::Message MQTT::Message::createSubscribe(uint16_t id, Array<String> const& topics, Array<uint8_t> const& qos)
242 {
243 Message msg;
244 msg.packet_type = gate_mqtt_packet_subscribe;
245 msg.packet_flags = 0;
246 msg.content.subscribe.packet_identifier = id;
247
248 msg.content.subscribe.topics_count = static_cast<uint16_t>(topics.length());
249 if (msg.content.subscribe.topics_count > GATE_MQTT_ARRAY_MAX)
250 {
251 msg.content.subscribe.topics_count = GATE_MQTT_ARRAY_MAX;
252 }
253
254 ArrayList<size_t> lengths;
255 StringBuilder text;
256 for (size_t ndx = 0; ndx < msg.content.subscribe.topics_count; ++ndx)
257 {
258 text.append(topics[ndx].c_str(), topics[ndx].length());
259 lengths.add(topics[ndx].length());
260 }
261 MemoryBlock block(text.ptr(), text.length());
262 msg.datablock = block;
263 char const* ptrContent = static_cast<char const*>(msg.datablock.getContent());
264
265 size_t offset = 0;
266 for (size_t ndx = 0; ndx < msg.content.subscribe.topics_count; ++ndx)
267 {
268 gate_mqtt_subscription_t& sub = msg.content.subscribe.topics[ndx];
269 sub.topic_len = static_cast<uint16_t>(lengths[ndx]);
270 sub.topic = &ptrContent[offset];
271 sub.qos = qos[ndx];
272 offset += lengths[ndx];
273 }
274
275 return msg;
276 }
277
278 MQTT::Message MQTT::Message::createSubscribe(uint16_t id, String const& topic, uint8_t qos)
279 {
280 ArrayList<String> topics;
281 ArrayList<uint8_t> qoss;
282 topics.add(topic);
283 qoss.add(qos);
284 return MQTT::Message::createSubscribe(id, topics.toArray(), qoss.toArray());
285 }
286
287
288 MQTT::Message MQTT::Message::createSubscribeAck(uint16_t id, Array<uint8_t> const& retCodes)
289 {
290 Message msg;
291 msg.packet_type = gate_mqtt_packet_subscribe_ack;
292 msg.packet_flags = 0;
293 msg.content.subscribe_ack.packet_identifier = id;
294 return msg;
295 }
296
297 MQTT::Message MQTT::Message::createUnsubscribe(uint16_t id, Array<String> const& topics)
298 {
299 Message msg;
300 msg.packet_type = gate_mqtt_packet_unsubscribe;
301 msg.packet_flags = 0x02;
302 msg.content.unsubscribe.packet_identifier = id;
303
304 msg.content.unsubscribe.topics_count = static_cast<uint16_t>(topics.length());
305 if (msg.content.unsubscribe.topics_count > GATE_MQTT_ARRAY_MAX)
306 {
307 msg.content.unsubscribe.topics_count = GATE_MQTT_ARRAY_MAX;
308 }
309
310 ArrayList<size_t> lengths;
311 StringBuilder text;
312 for (size_t ndx = 0; ndx < msg.content.unsubscribe.topics_count; ++ndx)
313 {
314 text.append(topics[ndx].c_str(), topics[ndx].length());
315 lengths.add(topics[ndx].length());
316 }
317 MemoryBlock block(text.ptr(), text.length());
318 msg.datablock = block;
319 char const* ptrContent = static_cast<char const*>(msg.datablock.getContent());
320
321 size_t offset = 0;
322 for (size_t ndx = 0; ndx < msg.content.unsubscribe.topics_count; ++ndx)
323 {
324 gate_mqtt_subscription_t& sub = msg.content.unsubscribe.topics[ndx];
325 sub.topic_len = static_cast<uint16_t>(lengths[ndx]);
326 sub.topic = &ptrContent[offset];
327 sub.qos = 0;
328 offset += lengths[ndx];
329 }
330 return msg;
331 }
332 MQTT::Message MQTT::Message::createUnsubscribe(uint16_t id, String const& topic)
333 {
334 ArrayList<String> topics;
335 topics.add(topic);
336 return MQTT::Message::createUnsubscribe(id, topics.toArray());
337 }
338
339 MQTT::Message MQTT::Message::createUnsubscribeAck(uint16_t id)
340 {
341 Message msg;
342 msg.packet_type = gate_mqtt_packet_unsubscribe_ack;
343 msg.packet_flags = 0;
344 msg.content.unsubscribe_ack.packet_identifier = id;
345 return msg;
346 }
347
348 MQTT::Message MQTT::Message::createPingRequest()
349 {
350 Message msg;
351 msg.packet_type = gate_mqtt_packet_ping_request;
352 msg.packet_flags = 0;
353 return msg;
354 }
355
356 MQTT::Message MQTT::Message::createPingResponse()
357 {
358 Message msg;
359 msg.packet_type = gate_mqtt_packet_ping_response;
360 msg.packet_flags = 0;
361 return msg;
362 }
363
364 MQTT::Message MQTT::Message::createDisconnect()
365 {
366 Message msg;
367 msg.packet_type = gate_mqtt_packet_disconnect;
368 msg.packet_flags = 0;
369 return msg;
370 }
371
372 bool_t MQTT::Message::isType(gate_mqtt_packet_type_t tp) const
373 {
374 return this->packet_type == tp;
375 }
376
377 bool_t MQTT::Message::getConnect(uint16_t& keepAlive, String& clientId, String& willTopic, String& willMessage, String& user, String& password) const
378 {
379 if (!this->isType(gate_mqtt_packet_connect))
380 {
381 return false;
382 }
383 keepAlive = this->content.connect.keep_alive;
384 clientId = String(this->content.connect.client_id, this->content.connect.client_id_len);
385 willTopic = String(this->content.connect.will_topic, this->content.connect.will_topic_len);
386 willMessage = String(this->content.connect.will_message, this->content.connect.will_message_len);
387 user = String(this->content.connect.user, this->content.connect.user_len);
388 password = String(this->content.connect.password, this->content.connect.password_len);
389 return true;
390 }
391
392 bool_t MQTT::Message::getConnectAck(uint8_t& flags, uint8_t& retCode) const
393 {
394 if (!this->isType(gate_mqtt_packet_connect_ack))
395 {
396 return false;
397 }
398 flags = this->content.connect_ack.flags;
399 retCode = this->content.connect_ack.return_code;
400 return true;
401 }
402
403 bool_t MQTT::Message::getPublish(uint16_t& packId, uint8_t& flags, String& topic, String& data) const
404 {
405 if (!this->isType(gate_mqtt_packet_publish))
406 {
407 return false;
408 }
409 packId = this->content.publish.packet_identifier;
410 flags = this->packet_flags;
411 topic = String(this->content.publish.topic, this->content.publish.topic_len);
412 data = String(this->content.publish.data, this->content.publish.data_len);
413 return true;
414 }
415
416 bool_t MQTT::Message::getPublishAck(uint16_t& id) const
417 {
418 if (!this->isType(gate_mqtt_packet_publish_ack))
419 {
420 return false;
421 }
422 id = this->content.publish_ack.packet_identifier;
423 return true;
424 }
425
426 bool_t MQTT::Message::getPublishReceive(uint16_t& id) const
427 {
428 if (!this->isType(gate_mqtt_packet_publish_receive))
429 {
430 return false;
431 }
432 id = this->content.publish_received.packet_identifier;
433 return true;
434 }
435
436 bool_t MQTT::Message::getPublishRelease(uint16_t& id) const
437 {
438 if (!this->isType(gate_mqtt_packet_publish_release))
439 {
440 return false;
441 }
442 id = this->content.publish_release.packet_identifier;
443 return true;
444 }
445
446 bool_t MQTT::Message::getPublishComplete(uint16_t& id) const
447 {
448 if (!this->isType(gate_mqtt_packet_publish_complete))
449 {
450 return false;
451 }
452 id = this->content.publish_complete.packet_identifier;
453 return true;
454 }
455
456 bool_t MQTT::Message::getSubscribe(uint16_t& id, ArrayList<String>& topics, ArrayList<uint8_t>& qos) const
457 {
458 if (!this->isType(gate_mqtt_packet_subscribe))
459 {
460 return false;
461 }
462 id = this->content.subscribe.packet_identifier;
463 for (uint16_t ndx = 0; ndx != this->content.subscribe.topics_count; ++ndx)
464 {
465 topics.add(String(
466 this->content.subscribe.topics[ndx].topic, this->content.subscribe.topics[ndx].topic_len));
467 qos.add(this->content.subscribe.topics[ndx].qos);
468 }
469 return true;
470 }
471
472 bool_t MQTT::Message::getSubscribeAck(uint16_t& id, ArrayList<uint8_t>& retCodes) const
473 {
474 if (!this->isType(gate_mqtt_packet_subscribe_ack))
475 {
476 return false;
477 }
478
479 id = this->content.subscribe_ack.packet_identifier;
480 for (uint16_t ndx = 0; ndx != this->content.subscribe_ack.ret_codes_count; ++ndx)
481 {
482 retCodes.add(this->content.subscribe_ack.ret_codes[ndx]);
483 }
484 return true;
485 }
486
487 bool_t MQTT::Message::getUnsubscribe(uint16_t& id, ArrayList<String>& topics) const
488 {
489 if (!this->isType(gate_mqtt_packet_unsubscribe))
490 {
491 return false;
492 }
493 id = this->content.unsubscribe.packet_identifier;
494 for (uint16_t ndx = 0; ndx != this->content.unsubscribe.topics_count; ++ndx)
495 {
496 topics.add(String(
497 this->content.unsubscribe.topics[ndx].topic, this->content.unsubscribe.topics[ndx].topic_len));
498 }
499 return true;
500 }
501
502 bool_t MQTT::Message::getUnsubscribeAck(uint16_t& id) const
503 {
504 if (!this->isType(gate_mqtt_packet_unsubscribe_ack))
505 {
506 return false;
507 }
508 id = this->content.unsubscribe_ack.packet_identifier;
509 return true;
510 }
511
512
513
514
515 void MQTT::sendMessage(Stream& mqttStream, Message const& msg)
516 {
517 result_t result = gate_mqtt_send_message(mqttStream.c_impl(), &msg);
518 GATEXX_CHECK_EXCEPTION(result);
519 }
520
521 bool_t MQTT::hasQueuedMessage(Stream& mqttStream)
522 {
523 result_t result = gate_mqtt_has_queued_message(mqttStream.c_impl());
524 if (result == GATE_RESULT_ENDOFSTREAM)
525 {
526 return false;
527 }
528 GATEXX_CHECK_EXCEPTION(result);
529 return true;
530 }
531
532
533 void MQTT::receiveMessage(Stream& mqttStream, Message& msg)
534 {
535 gate_memoryblock_t* ptr_block = NULL;
536 result_t result = gate_mqtt_receive_message(mqttStream.c_impl(), &msg, &ptr_block);
537 GATEXX_CHECK_EXCEPTION(result);
538 MemoryBlock newBlock(ptr_block);
539 msg.attachData(newBlock);
540 }
541
542
543 MQTTClient::MQTTClient(Stream& mqttConnection)
544 : transport(mqttConnection)
545 {
546 }
547
548 static Stream createTcpConnection(String const& host, uint16_t port)
549 {
550 StringBuilder builder;
551 builder << host << ":" << port;
552 ControlStream stream = SocketStream::createClient(builder.toString());
553 return stream;
554 }
555
556 MQTTClient::MQTTClient(String const& mqttHost, uint16_t port)
557 : transport(createTcpConnection(mqttHost, port))
558 {
559 }
560
561 MQTTClient::~MQTTClient() noexcept
562 {
563 }
564
565 uint16_t MQTTClient::getNextPacketId()
566 {
567 int32_t id = ++this->nextPacketId;
568 return static_cast<uint16_t>(id);
569 }
570
571
572 void MQTTClient::connect(String const& clientId, String const& user, String const& password, uint16_t timeoutSeconds)
573 {
574 const MQTT::Message request = MQTT::Message::createConnect(timeoutSeconds, clientId, String(), String(), user, password);
575 MQTT::sendMessage(this->transport, request);
576
577 MQTT::Message response;
578 MQTT::receiveMessage(this->transport, response);
579
580 if (response.packet_type != gate_mqtt_packet_connect_ack)
581 {
582 raiseException(results::InvalidHeader, "Invalid response message type");
583 }
584
585 if (response.content.connect_ack.return_code != 0)
586 {
587 raiseException(results::InvalidData, "Connect failed", NULL, response.content.connect_ack.return_code);
588 }
589 }
590
591 void MQTTClient::disconnect()
592 {
593 const MQTT::Message request = MQTT::Message::createDisconnect();
594 MQTT::sendMessage(this->transport, request);
595 }
596
597
598 MQTTClient::PacketId MQTTClient::subscribe(String const& topic)
599 {
600 const PacketId packetId = this->getNextPacketId();
601 const MQTT::Message request = MQTT::Message::createSubscribe(packetId, topic);
602 MQTT::sendMessage(this->transport, request);
603
604 MQTT::Message response;
605 MQTT::receiveMessage(this->transport, response);
606
607 if (response.packet_type != gate_mqtt_packet_subscribe_ack)
608 {
609 raiseException(results::InvalidHeader, "Invalid response message type");
610 }
611 if (response.content.subscribe_ack.packet_identifier != packetId)
612 {
613 raiseException(results::InvalidData, "Connect failed", NULL, response.content.connect_ack.return_code);
614 }
615 return packetId;
616 }
617
618 MQTTClient::PacketId MQTTClient::unsubscribe(String const& topic)
619 {
620 const PacketId packetId = this->getNextPacketId();
621 const MQTT::Message request = MQTT::Message::createUnsubscribe(packetId, topic);
622 MQTT::sendMessage(this->transport, request);
623
624 MQTT::Message response;
625 MQTT::receiveMessage(this->transport, response);
626
627 if (response.packet_type != gate_mqtt_packet_unsubscribe_ack)
628 {
629 raiseException(results::InvalidHeader, "Invalid response message type");
630 }
631 if (response.content.unsubscribe_ack.packet_identifier != packetId)
632 {
633 raiseException(results::InvalidData, "Connect failed", NULL, response.content.connect_ack.return_code);
634 }
635 return packetId;
636 }
637
638 MQTTClient::PacketId MQTTClient::publish(String const& topic, String const& data, uint8_t flags)
639 {
640 const PacketId packetId = this->getNextPacketId();
641 const MQTT::Message request = MQTT::Message::createPublish(packetId, flags, topic, data);
642 MQTT::sendMessage(this->transport, request);
643
644 if (GATE_FLAG_ENABLED(flags, GATE_MQTT_FLAG_QOS1))
645 {
646 MQTT::Message response;
647 MQTT::receiveMessage(this->transport, response);
648
649 if (response.packet_type != gate_mqtt_packet_publish_ack)
650 {
651 raiseException(results::InvalidHeader, "Invalid response message type, publish_ack expected", NULL, (int32_t)response.packet_type);
652 }
653 if (response.content.publish_ack.packet_identifier != packetId)
654 {
655 raiseException(results::InvalidData, "Publish-acknowledge failed", NULL, response.content.publish_ack.packet_identifier);
656 }
657 }
658
659 if (GATE_FLAG_ENABLED(flags, GATE_MQTT_FLAG_QOS2))
660 {
661 MQTT::Message response;
662 MQTT::receiveMessage(this->transport, response);
663
664 if (response.packet_type != gate_mqtt_packet_publish_receive)
665 {
666 raiseException(results::InvalidHeader, "Invalid response message type, publish_rec expected", NULL, (int32_t)response.packet_type);
667 }
668 if (response.content.publish_received.packet_identifier != packetId)
669 {
670 raiseException(results::InvalidData, "Publish-received failed", NULL, response.content.publish_received.packet_identifier);
671 }
672
673 // send PUBREL msg
674 const MQTT::Message request2 = MQTT::Message::createPublishRelease(packetId);
675 MQTT::sendMessage(this->transport, request);
676
677 MQTT::Message response2;
678 MQTT::receiveMessage(this->transport, response2);
679
680 if (response2.packet_type != gate_mqtt_packet_publish_complete)
681 {
682 raiseException(results::InvalidHeader, "Invalid response message type, publish_comp expected", NULL, (int32_t)response2.packet_type);
683 }
684 if (response2.content.publish_complete.packet_identifier != packetId)
685 {
686 raiseException(results::InvalidData, "Publish-complete failed", NULL, response2.content.publish_complete.packet_identifier);
687 }
688 }
689
690 return packetId;
691 }
692
693 void MQTTClient::sendPing()
694 {
695 MQTT::Message request = MQTT::Message::createPingRequest();
696 MQTT::sendMessage(this->transport, request);
697 }
698
699 void MQTTClient::sendPingResponse()
700 {
701 MQTT::Message request = MQTT::Message::createPingResponse();
702 MQTT::sendMessage(this->transport, request);
703 }
704
705
706 bool_t MQTTClient::hasNextMessage()
707 {
708 return MQTT::hasQueuedMessage(this->transport);
709 }
710
711 MQTT::Message MQTTClient::getNextMessage()
712 {
713 MQTT::Message msg;
714 MQTT::receiveMessage(this->transport, msg);
715 return msg;
716 }
717
718
719 } // end of namespace net
720 } // end of namespace gate
721