GCC Code Coverage Report


Directory: src/gate/
File: src/gate/net/cxx_mqttprotocols.cpp
Date: 2025-09-14 13:10:38
Exec Total Coverage
Lines: 0 403 0.0%
Functions: 0 52 0.0%
Branches: 0 286 0.0%

Line Branch Exec Source
1 /* GATE PROJECT LICENSE:
2 +----------------------------------------------------------------------------+
3 | Copyright(c) 2018-2025, Stefan Meislinger <sm@opengate.at> |
4 | All rights reserved. |
5 | |
6 | Redistribution and use in source and binary forms, with or without |
7 | modification, are permitted provided that the following conditions are met:|
8 | |
9 | 1. Redistributions of source code must retain the above copyright notice, |
10 | this list of conditions and the following disclaimer. |
11 | 2. Redistributions in binary form must reproduce the above copyright |
12 | notice, this list of conditions and the following disclaimer in the |
13 | documentation and/or other materials provided with the distribution. |
14 | |
15 | THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"|
16 | AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
17 | IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
18 | ARE DISCLAIMED.IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
19 | LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
20 | CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
21 | SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
22 | INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
23 | CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
24 | ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF |
25 | THE POSSIBILITY OF SUCH DAMAGE. |
26 +----------------------------------------------------------------------------+
27 */
28
29 #include "gate/net/mqttprotocols.hpp"
30 #include "gate/exceptions.hpp"
31 #include "gate/net/sockets.hpp"
32 #include "gate/net/sockettools.hpp"
33
34 namespace gate
35 {
36 namespace net
37 {
38 MQTT::Message::Message()
39 {
40 gate_mqtt_message_t* ptr = this;
41 gate_mem_clear(ptr, sizeof(gate_mqtt_message_t));
42 }
43
44 MQTT::Message::Message(gate_mqtt_message_t const& src)
45 {
46 gate_mqtt_message_t* ptr = this;
47 gate_mem_clear(ptr, sizeof(gate_mqtt_message_t));
48
49 switch (src.packet_type)
50 {
51 case gate_mqtt_packet_connect:
52 {
53 const StaticString clientId(src.content.connect.client_id, src.content.connect.client_id_len);
54 const StaticString willTopic(src.content.connect.will_topic, src.content.connect.will_topic_len);
55 const StaticString willMsg(src.content.connect.will_message, src.content.connect.will_message_len);
56 const StaticString user(src.content.connect.user, src.content.connect.user_len);
57 const StaticString pass(src.content.connect.password, src.content.connect.password_len);
58 *this = Message::createConnect(src.content.connect.keep_alive, clientId, willTopic, willMsg, user, pass);
59 break;
60 }
61 default:
62 break;
63 }
64 }
65
66 MQTT::Message::Message(Message const& src)
67 : datablock(src.datablock)
68 {
69 gate_mqtt_message_t* ptrDest = this;
70 gate_mqtt_message_t const* ptrSrc = &src;
71 gate_mem_copy(ptrDest, ptrSrc, sizeof(gate_mqtt_message_t));
72 }
73
74 MQTT::Message& MQTT::Message::operator=(Message const& src)
75 {
76 if (this != &src)
77 {
78 this->datablock = src.datablock;
79 gate_mqtt_message_t* ptrDest = this;
80 gate_mqtt_message_t const* ptrSrc = &src;
81 gate_mem_copy(ptrDest, ptrSrc, sizeof(gate_mqtt_message_t));
82 }
83 return *this;
84 }
85
86 MQTT::Message::~Message() noexcept
87 {
88 }
89
90 void MQTT::Message::attachData(MemoryBlock& block)
91 {
92 this->datablock = block;
93 }
94
95 MemoryBlock const& MQTT::Message::getData() const
96 {
97 return this->datablock;
98 }
99
100
101 MQTT::Message MQTT::Message::createConnect(uint16_t keepAlive, String const& clientId, String const& willTopic, String const& willMessage, String const& user, String const& password)
102 {
103 size_t blockSize = clientId.length() + willTopic.length() + willMessage.length() + user.length() + password.length() + 10;
104 MemoryBlock block(blockSize);
105
106 Message msg;
107 msg.packet_type = gate_mqtt_packet_connect;
108 msg.packet_flags = 0;
109 msg.content.connect.flags = GATE_MQTT_CONNECT_FLAG_CLEAN;
110 msg.content.connect.level = 4;
111 msg.content.connect.keep_alive = keepAlive;
112
113 char* ptrBlock = static_cast<char*>(block.getContent());
114 size_t copied;
115
116 copied = clientId.copyTo(ptrBlock, blockSize);
117 msg.content.connect.client_id = ptrBlock;
118 msg.content.connect.client_id_len = static_cast<uint16_t>(copied);
119 ptrBlock += (copied + 1);
120 blockSize -= (copied + 1);
121
122 if (!willTopic.empty())
123 {
124 msg.content.connect.flags |= GATE_MQTT_CONNECT_FLAG_WILLFLAG;
125
126 copied = willTopic.copyTo(ptrBlock, blockSize);
127 msg.content.connect.will_topic = ptrBlock;
128 msg.content.connect.will_topic_len = static_cast<uint16_t>(copied);
129 ptrBlock += (copied + 1);
130 blockSize -= (copied + 1);
131
132 copied = willMessage.copyTo(ptrBlock, blockSize);
133 msg.content.connect.will_message = ptrBlock;
134 msg.content.connect.will_message_len = static_cast<uint16_t>(copied);
135 ptrBlock += (copied + 1);
136 blockSize -= (copied + 1);
137 }
138
139 if (!user.empty())
140 {
141 msg.content.connect.flags |= GATE_MQTT_CONNECT_FLAG_USER;
142
143 copied = user.copyTo(ptrBlock, blockSize);
144 msg.content.connect.user = ptrBlock;
145 msg.content.connect.user_len = static_cast<uint16_t>(copied);
146 ptrBlock += (copied + 1);
147 blockSize -= (copied + 1);
148 }
149
150 if (!password.empty())
151 {
152 msg.content.connect.flags |= GATE_MQTT_CONNECT_FLAG_PASSWORD;
153
154 copied = password.copyTo(ptrBlock, blockSize);
155 msg.content.connect.password = ptrBlock;
156 msg.content.connect.password_len = static_cast<uint16_t>(copied);
157 ptrBlock += (copied + 1);
158 blockSize -= (copied + 1);
159 }
160
161 msg.datablock = block;
162 return msg;
163 }
164
165 MQTT::Message MQTT::Message::createConnectAck(uint8_t flags, uint8_t retCode)
166 {
167 Message msg;
168 msg.packet_type = gate_mqtt_packet_connect_ack;
169 msg.packet_flags = 0;
170 msg.content.connect_ack.flags = flags;
171 msg.content.connect_ack.return_code = retCode;
172 return msg;
173 }
174
175 MQTT::Message MQTT::Message::createPublish(uint16_t id, uint8_t flags, String const& topic, String const& data)
176 {
177 size_t blockSize = topic.length() + data.length() + 4;
178 MemoryBlock block(blockSize);
179
180 Message msg;
181 msg.packet_type = gate_mqtt_packet_publish;
182 msg.packet_flags = flags;
183 msg.content.publish.packet_identifier = id;
184
185 char* ptrBlock = static_cast<char*>(block.getContent());
186 size_t copied;
187
188 copied = topic.copyTo(ptrBlock, blockSize);
189 msg.content.publish.topic = ptrBlock;
190 msg.content.publish.topic_len = static_cast<uint16_t>(copied);
191 ptrBlock += (copied + 1);
192 blockSize -= (copied + 1);
193
194 copied = data.copyTo(ptrBlock, blockSize);
195 msg.content.publish.data = ptrBlock;
196 msg.content.publish.data_len = static_cast<uint16_t>(copied);
197 ptrBlock += (copied + 1);
198 blockSize -= (copied + 1);
199
200 msg.datablock = block;
201 return msg;
202 }
203
204 MQTT::Message MQTT::Message::createPublishAck(uint16_t id)
205 {
206 Message msg;
207 msg.packet_type = gate_mqtt_packet_publish_ack;
208 msg.packet_flags = 0;
209 msg.content.publish_ack.packet_identifier = id;
210 return msg;
211 }
212
213 MQTT::Message MQTT::Message::createPublishReceive(uint16_t id)
214 {
215 Message msg;
216 msg.packet_type = gate_mqtt_packet_publish_ack;
217 msg.packet_flags = 0;
218 msg.content.publish_received.packet_identifier = id;
219 return msg;
220 }
221
222 MQTT::Message MQTT::Message::createPublishRelease(uint16_t id)
223 {
224 Message msg;
225 msg.packet_type = gate_mqtt_packet_publish_ack;
226 msg.packet_flags = 0x02;
227 msg.content.publish_release.packet_identifier = id;
228 return msg;
229
230 }
231
232 MQTT::Message MQTT::Message::createPublishComplete(uint16_t id)
233 {
234 Message msg;
235 msg.packet_type = gate_mqtt_packet_publish_ack;
236 msg.packet_flags = 0;
237 msg.content.publish_complete.packet_identifier = id;
238 return msg;
239 }
240
241 MQTT::Message MQTT::Message::createSubscribe(uint16_t id, Array<String> const& topics, Array<uint8_t> const& qos)
242 {
243 Message msg;
244 msg.packet_type = gate_mqtt_packet_subscribe;
245 msg.packet_flags = 0;
246 msg.content.subscribe.packet_identifier = id;
247
248 msg.content.subscribe.topics_count = static_cast<uint16_t>(topics.length());
249 if (msg.content.subscribe.topics_count > GATE_MQTT_ARRAY_MAX)
250 {
251 msg.content.subscribe.topics_count = GATE_MQTT_ARRAY_MAX;
252 }
253
254 ArrayList<size_t> lengths;
255 StringBuilder text;
256 for (size_t ndx = 0; ndx < msg.content.subscribe.topics_count; ++ndx)
257 {
258 text.append(topics[ndx].c_str(), topics[ndx].length());
259 lengths.add(topics[ndx].length());
260 }
261 MemoryBlock block(text.ptr(), text.length());
262 msg.datablock = block;
263 char const* ptrContent = static_cast<char const*>(msg.datablock.getContent());
264
265 size_t offset = 0;
266 for (size_t ndx = 0; ndx < msg.content.subscribe.topics_count; ++ndx)
267 {
268 gate_mqtt_subscription_t& sub = msg.content.subscribe.topics[ndx];
269 sub.topic_len = static_cast<uint16_t>(lengths[ndx]);
270 sub.topic = &ptrContent[offset];
271 sub.qos = qos[ndx];
272 offset += lengths[ndx];
273 }
274
275 return msg;
276 }
277
278 MQTT::Message MQTT::Message::createSubscribe(uint16_t id, String const& topic, uint8_t qos)
279 {
280 ArrayList<String> topics;
281 ArrayList<uint8_t> qoss;
282 topics.add(topic);
283 qoss.add(qos);
284 return MQTT::Message::createSubscribe(id, topics.toArray(), qoss.toArray());
285 }
286
287
288 MQTT::Message MQTT::Message::createSubscribeAck(uint16_t id, Array<uint8_t> const& retCodes)
289 {
290 Message msg;
291 msg.packet_type = gate_mqtt_packet_subscribe_ack;
292 msg.packet_flags = 0;
293 msg.content.subscribe_ack.packet_identifier = id;
294 return msg;
295 }
296
297 MQTT::Message MQTT::Message::createUnsubscribe(uint16_t id, Array<String> const& topics)
298 {
299 Message msg;
300 msg.packet_type = gate_mqtt_packet_unsubscribe;
301 msg.packet_flags = 0x02;
302 msg.content.unsubscribe.packet_identifier = id;
303
304 msg.content.unsubscribe.topics_count = static_cast<uint16_t>(topics.length());
305 if (msg.content.unsubscribe.topics_count > GATE_MQTT_ARRAY_MAX)
306 {
307 msg.content.unsubscribe.topics_count = GATE_MQTT_ARRAY_MAX;
308 }
309
310 ArrayList<size_t> lengths;
311 StringBuilder text;
312 for (size_t ndx = 0; ndx < msg.content.unsubscribe.topics_count; ++ndx)
313 {
314 text.append(topics[ndx].c_str(), topics[ndx].length());
315 lengths.add(topics[ndx].length());
316 }
317 MemoryBlock block(text.ptr(), text.length());
318 msg.datablock = block;
319 char const* ptrContent = static_cast<char const*>(msg.datablock.getContent());
320
321 size_t offset = 0;
322 for (size_t ndx = 0; ndx < msg.content.unsubscribe.topics_count; ++ndx)
323 {
324 gate_mqtt_subscription_t& sub = msg.content.unsubscribe.topics[ndx];
325 sub.topic_len = static_cast<uint16_t>(lengths[ndx]);
326 sub.topic = &ptrContent[offset];
327 sub.qos = 0;
328 offset += lengths[ndx];
329 }
330 return msg;
331 }
332 MQTT::Message MQTT::Message::createUnsubscribe(uint16_t id, String const& topic)
333 {
334 ArrayList<String> topics;
335 topics.add(topic);
336 return MQTT::Message::createUnsubscribe(id, topics.toArray());
337 }
338
339 MQTT::Message MQTT::Message::createUnsubscribeAck(uint16_t id)
340 {
341 Message msg;
342 msg.packet_type = gate_mqtt_packet_unsubscribe_ack;
343 msg.packet_flags = 0;
344 msg.content.unsubscribe_ack.packet_identifier = id;
345 return msg;
346 }
347
348 MQTT::Message MQTT::Message::createPingRequest()
349 {
350 Message msg;
351 msg.packet_type = gate_mqtt_packet_ping_request;
352 msg.packet_flags = 0;
353 return msg;
354 }
355
356 MQTT::Message MQTT::Message::createPingResponse()
357 {
358 Message msg;
359 msg.packet_type = gate_mqtt_packet_ping_response;
360 msg.packet_flags = 0;
361 return msg;
362 }
363
364 MQTT::Message MQTT::Message::createDisconnect()
365 {
366 Message msg;
367 msg.packet_type = gate_mqtt_packet_disconnect;
368 msg.packet_flags = 0;
369 return msg;
370 }
371
372 bool_t MQTT::Message::isType(gate_mqtt_packet_type_t tp) const
373 {
374 return this->packet_type == tp;
375 }
376
377 bool_t MQTT::Message::getConnect(uint16_t& keepAlive, String& clientId, String& willTopic, String& willMessage, String& user, String& password) const
378 {
379 if (!this->isType(gate_mqtt_packet_connect))
380 {
381 return false;
382 }
383 keepAlive = this->content.connect.keep_alive;
384 clientId = String(this->content.connect.client_id, this->content.connect.client_id_len);
385 willTopic = String(this->content.connect.will_topic, this->content.connect.will_topic_len);
386 willMessage = String(this->content.connect.will_message, this->content.connect.will_message_len);
387 user = String(this->content.connect.user, this->content.connect.user_len);
388 password = String(this->content.connect.password, this->content.connect.password_len);
389 return true;
390 }
391
392 bool_t MQTT::Message::getConnectAck(uint8_t& flags, uint8_t& retCode) const
393 {
394 if (!this->isType(gate_mqtt_packet_connect_ack))
395 {
396 return false;
397 }
398 flags = this->content.connect_ack.flags;
399 retCode = this->content.connect_ack.return_code;
400 return true;
401 }
402
403 bool_t MQTT::Message::getPublish(uint16_t& packId, uint8_t& flags, String& topic, String& data) const
404 {
405 if (!this->isType(gate_mqtt_packet_publish))
406 {
407 return false;
408 }
409 packId = this->content.publish.packet_identifier;
410 flags = this->packet_flags;
411 topic = String(this->content.publish.topic, this->content.publish.topic_len);
412 data = String(this->content.publish.data, this->content.publish.data_len);
413 return true;
414 }
415
416 bool_t MQTT::Message::getPublishAck(uint16_t& id) const
417 {
418 if (!this->isType(gate_mqtt_packet_publish_ack))
419 {
420 return false;
421 }
422 id = this->content.publish_ack.packet_identifier;
423 return true;
424 }
425
426 bool_t MQTT::Message::getPublishReceive(uint16_t& id) const
427 {
428 if (!this->isType(gate_mqtt_packet_publish_receive))
429 {
430 return false;
431 }
432 id = this->content.publish_received.packet_identifier;
433 return true;
434 }
435
436 bool_t MQTT::Message::getPublishRelease(uint16_t& id) const
437 {
438 if (!this->isType(gate_mqtt_packet_publish_release))
439 {
440 return false;
441 }
442 id = this->content.publish_release.packet_identifier;
443 return true;
444 }
445
446 bool_t MQTT::Message::getPublishComplete(uint16_t& id) const
447 {
448 if (!this->isType(gate_mqtt_packet_publish_complete))
449 {
450 return false;
451 }
452 id = this->content.publish_complete.packet_identifier;
453 return true;
454 }
455
456 bool_t MQTT::Message::getSubscribe(uint16_t& id, ArrayList<String>& topics, ArrayList<uint8_t>& qos) const
457 {
458 if (!this->isType(gate_mqtt_packet_subscribe))
459 {
460 return false;
461 }
462 id = this->content.subscribe.packet_identifier;
463 for (uint16_t ndx = 0; ndx != this->content.subscribe.topics_count; ++ndx)
464 {
465 topics.add(String(
466 this->content.subscribe.topics[ndx].topic, this->content.subscribe.topics[ndx].topic_len));
467 qos.add(this->content.subscribe.topics[ndx].qos);
468 }
469 return true;
470 return true;
471 }
472
473 bool_t MQTT::Message::getSubscribeAck(uint16_t& id, ArrayList<uint8_t>& retCodes) const
474 {
475 if (!this->isType(gate_mqtt_packet_subscribe_ack))
476 {
477 return false;
478 }
479
480 id = this->content.subscribe_ack.packet_identifier;
481 for (uint16_t ndx = 0; ndx != this->content.subscribe_ack.ret_codes_count; ++ndx)
482 {
483 retCodes.add(this->content.subscribe_ack.ret_codes[ndx]);
484 }
485 return true;
486 }
487
488 bool_t MQTT::Message::getUnsubscribe(uint16_t& id, ArrayList<String>& topics) const
489 {
490 if (!this->isType(gate_mqtt_packet_unsubscribe))
491 {
492 return false;
493 }
494 id = this->content.unsubscribe.packet_identifier;
495 for (uint16_t ndx = 0; ndx != this->content.unsubscribe.topics_count; ++ndx)
496 {
497 topics.add(String(
498 this->content.unsubscribe.topics[ndx].topic, this->content.unsubscribe.topics[ndx].topic_len));
499 }
500 return true;
501 }
502
503 bool_t MQTT::Message::getUnsubscribeAck(uint16_t& id) const
504 {
505 if (!this->isType(gate_mqtt_packet_unsubscribe_ack))
506 {
507 return false;
508 }
509 id = this->content.unsubscribe_ack.packet_identifier;
510 return true;
511 }
512
513
514
515
516 void MQTT::sendMessage(Stream& mqttStream, Message const& msg)
517 {
518 result_t result = gate_mqtt_send_message(mqttStream.c_impl(), &msg);
519 GATEXX_CHECK_EXCEPTION(result);
520 }
521
522 bool_t MQTT::hasQueuedMessage(Stream& mqttStream)
523 {
524 result_t result = gate_mqtt_has_queued_message(mqttStream.c_impl());
525 if (result == GATE_RESULT_ENDOFSTREAM)
526 {
527 return false;
528 }
529 GATEXX_CHECK_EXCEPTION(result);
530 return true;
531 }
532
533
534 void MQTT::receiveMessage(Stream& mqttStream, Message& msg)
535 {
536 gate_memoryblock_t* ptr_block = NULL;
537 result_t result = gate_mqtt_receive_message(mqttStream.c_impl(), &msg, &ptr_block);
538 GATEXX_CHECK_EXCEPTION(result);
539 MemoryBlock newBlock(ptr_block);
540 msg.attachData(newBlock);
541 }
542
543
544 MQTTClient::MQTTClient(Stream& mqttConnection)
545 : transport(mqttConnection)
546 {
547 }
548
549 static Stream createTcpConnection(String const& host, uint16_t port)
550 {
551 StringBuilder builder;
552 builder << host << ":" << port;
553 ControlStream stream = SocketStream::createClient(builder.toString());
554 return stream;
555 }
556
557 MQTTClient::MQTTClient(String const& mqttHost, uint16_t port)
558 : transport(createTcpConnection(mqttHost, port))
559 {
560 }
561
562 MQTTClient::~MQTTClient() noexcept
563 {
564 }
565
566 uint16_t MQTTClient::getNextPacketId()
567 {
568 int32_t id = ++this->nextPacketId;
569 return static_cast<uint16_t>(id);
570 }
571
572
573 void MQTTClient::connect(String const& clientId, String const& user, String const& password, uint16_t timeoutSeconds)
574 {
575 const MQTT::Message request = MQTT::Message::createConnect(timeoutSeconds, clientId, String(), String(), user, password);
576 MQTT::sendMessage(this->transport, request);
577
578 MQTT::Message response;
579 MQTT::receiveMessage(this->transport, response);
580
581 if (response.packet_type != gate_mqtt_packet_connect_ack)
582 {
583 raiseException(results::InvalidHeader, "Invalid response message type");
584 }
585
586 if (response.content.connect_ack.return_code != 0)
587 {
588 raiseException(results::InvalidData, "Connect failed", NULL, response.content.connect_ack.return_code);
589 }
590 }
591
592 void MQTTClient::disconnect()
593 {
594 const MQTT::Message request = MQTT::Message::createDisconnect();
595 MQTT::sendMessage(this->transport, request);
596 }
597
598
599 MQTTClient::PacketId MQTTClient::subscribe(String const& topic)
600 {
601 const PacketId packetId = this->getNextPacketId();
602 const MQTT::Message request = MQTT::Message::createSubscribe(packetId, topic);
603 MQTT::sendMessage(this->transport, request);
604
605 MQTT::Message response;
606 MQTT::receiveMessage(this->transport, response);
607
608 if (response.packet_type != gate_mqtt_packet_subscribe_ack)
609 {
610 raiseException(results::InvalidHeader, "Invalid response message type");
611 }
612 if (response.content.subscribe_ack.packet_identifier != packetId)
613 {
614 raiseException(results::InvalidData, "Connect failed", NULL, response.content.connect_ack.return_code);
615 }
616 return packetId;
617 }
618
619 MQTTClient::PacketId MQTTClient::unsubscribe(String const& topic)
620 {
621 const PacketId packetId = this->getNextPacketId();
622 const MQTT::Message request = MQTT::Message::createUnsubscribe(packetId, topic);
623 MQTT::sendMessage(this->transport, request);
624
625 MQTT::Message response;
626 MQTT::receiveMessage(this->transport, response);
627
628 if (response.packet_type != gate_mqtt_packet_unsubscribe_ack)
629 {
630 raiseException(results::InvalidHeader, "Invalid response message type");
631 }
632 if (response.content.unsubscribe_ack.packet_identifier != packetId)
633 {
634 raiseException(results::InvalidData, "Connect failed", NULL, response.content.connect_ack.return_code);
635 }
636 return packetId;
637 }
638
639 MQTTClient::PacketId MQTTClient::publish(String const& topic, String const& data, uint8_t flags)
640 {
641 const PacketId packetId = this->getNextPacketId();
642 const MQTT::Message request = MQTT::Message::createPublish(packetId, flags, topic, data);
643 MQTT::sendMessage(this->transport, request);
644
645 if (GATE_FLAG_ENABLED(flags, GATE_MQTT_FLAG_QOS1))
646 {
647 MQTT::Message response;
648 MQTT::receiveMessage(this->transport, response);
649
650 if (response.packet_type != gate_mqtt_packet_publish_ack)
651 {
652 raiseException(results::InvalidHeader, "Invalid response message type, publish_ack expected", NULL, (int32_t)response.packet_type);
653 }
654 if (response.content.publish_ack.packet_identifier != packetId)
655 {
656 raiseException(results::InvalidData, "Publish-acknowledge failed", NULL, response.content.publish_ack.packet_identifier);
657 }
658 }
659
660 if (GATE_FLAG_ENABLED(flags, GATE_MQTT_FLAG_QOS2))
661 {
662 MQTT::Message response;
663 MQTT::receiveMessage(this->transport, response);
664
665 if (response.packet_type != gate_mqtt_packet_publish_receive)
666 {
667 raiseException(results::InvalidHeader, "Invalid response message type, publish_rec expected", NULL, (int32_t)response.packet_type);
668 }
669 if (response.content.publish_received.packet_identifier != packetId)
670 {
671 raiseException(results::InvalidData, "Publish-received failed", NULL, response.content.publish_received.packet_identifier);
672 }
673
674 // send PUBREL msg
675 const MQTT::Message request2 = MQTT::Message::createPublishRelease(packetId);
676 MQTT::sendMessage(this->transport, request);
677
678 MQTT::Message response2;
679 MQTT::receiveMessage(this->transport, response2);
680
681 if (response2.packet_type != gate_mqtt_packet_publish_complete)
682 {
683 raiseException(results::InvalidHeader, "Invalid response message type, publish_comp expected", NULL, (int32_t)response2.packet_type);
684 }
685 if (response2.content.publish_complete.packet_identifier != packetId)
686 {
687 raiseException(results::InvalidData, "Publish-complete failed", NULL, response2.content.publish_complete.packet_identifier);
688 }
689 }
690
691 return packetId;
692 }
693
694 void MQTTClient::sendPing()
695 {
696 MQTT::Message request = MQTT::Message::createPingRequest();
697 MQTT::sendMessage(this->transport, request);
698 }
699
700 void MQTTClient::sendPingResponse()
701 {
702 MQTT::Message request = MQTT::Message::createPingResponse();
703 MQTT::sendMessage(this->transport, request);
704 }
705
706
707 bool_t MQTTClient::hasNextMessage()
708 {
709 return MQTT::hasQueuedMessage(this->transport);
710 }
711
712 MQTT::Message MQTTClient::getNextMessage()
713 {
714 MQTT::Message msg;
715 MQTT::receiveMessage(this->transport, msg);
716 return msg;
717 }
718
719
720 } // end of namespace net
721 } // end of namespace gate
722