Line |
Branch |
Exec |
Source |
1 |
|
|
/* GATE PROJECT LICENSE: |
2 |
|
|
+----------------------------------------------------------------------------+ |
3 |
|
|
| Copyright(c) 2018-2025, Stefan Meislinger <sm@opengate.at> | |
4 |
|
|
| All rights reserved. | |
5 |
|
|
| | |
6 |
|
|
| Redistribution and use in source and binary forms, with or without | |
7 |
|
|
| modification, are permitted provided that the following conditions are met:| |
8 |
|
|
| | |
9 |
|
|
| 1. Redistributions of source code must retain the above copyright notice, | |
10 |
|
|
| this list of conditions and the following disclaimer. | |
11 |
|
|
| 2. Redistributions in binary form must reproduce the above copyright | |
12 |
|
|
| notice, this list of conditions and the following disclaimer in the | |
13 |
|
|
| documentation and/or other materials provided with the distribution. | |
14 |
|
|
| | |
15 |
|
|
| THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"| |
16 |
|
|
| AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE | |
17 |
|
|
| IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE | |
18 |
|
|
| ARE DISCLAIMED.IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE | |
19 |
|
|
| LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR | |
20 |
|
|
| CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF | |
21 |
|
|
| SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS | |
22 |
|
|
| INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN | |
23 |
|
|
| CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) | |
24 |
|
|
| ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF | |
25 |
|
|
| THE POSSIBILITY OF SUCH DAMAGE. | |
26 |
|
|
+----------------------------------------------------------------------------+ |
27 |
|
|
*/ |
28 |
|
|
|
29 |
|
|
#include "gate/net/mqttprotocols.hpp" |
30 |
|
|
#include "gate/exceptions.hpp" |
31 |
|
|
#include "gate/net/sockets.hpp" |
32 |
|
|
#include "gate/net/sockettools.hpp" |
33 |
|
|
|
34 |
|
|
namespace gate |
35 |
|
|
{ |
36 |
|
|
namespace net |
37 |
|
|
{ |
38 |
|
✗ |
MQTT::Message::Message() |
39 |
|
|
{ |
40 |
|
✗ |
gate_mqtt_message_t* ptr = this; |
41 |
|
✗ |
gate_mem_clear(ptr, sizeof(gate_mqtt_message_t)); |
42 |
|
✗ |
} |
43 |
|
|
|
44 |
|
✗ |
MQTT::Message::Message(gate_mqtt_message_t const& src) |
45 |
|
|
{ |
46 |
|
✗ |
gate_mqtt_message_t* ptr = this; |
47 |
|
✗ |
gate_mem_clear(ptr, sizeof(gate_mqtt_message_t)); |
48 |
|
|
|
49 |
|
✗ |
switch (src.packet_type) |
50 |
|
|
{ |
51 |
|
✗ |
case gate_mqtt_packet_connect: |
52 |
|
|
{ |
53 |
|
✗ |
const StaticString clientId(src.content.connect.client_id, src.content.connect.client_id_len); |
54 |
|
✗ |
const StaticString willTopic(src.content.connect.will_topic, src.content.connect.will_topic_len); |
55 |
|
✗ |
const StaticString willMsg(src.content.connect.will_message, src.content.connect.will_message_len); |
56 |
|
✗ |
const StaticString user(src.content.connect.user, src.content.connect.user_len); |
57 |
|
✗ |
const StaticString pass(src.content.connect.password, src.content.connect.password_len); |
58 |
|
✗ |
*this = Message::createConnect(src.content.connect.keep_alive, clientId, willTopic, willMsg, user, pass); |
59 |
|
✗ |
break; |
60 |
|
|
} |
61 |
|
✗ |
default: |
62 |
|
✗ |
break; |
63 |
|
|
} |
64 |
|
✗ |
} |
65 |
|
|
|
66 |
|
✗ |
MQTT::Message::Message(Message const& src) |
67 |
|
✗ |
: datablock(src.datablock) |
68 |
|
|
{ |
69 |
|
✗ |
gate_mqtt_message_t* ptrDest = this; |
70 |
|
✗ |
gate_mqtt_message_t const* ptrSrc = &src; |
71 |
|
✗ |
gate_mem_copy(ptrDest, ptrSrc, sizeof(gate_mqtt_message_t)); |
72 |
|
✗ |
} |
73 |
|
|
|
74 |
|
✗ |
MQTT::Message& MQTT::Message::operator=(Message const& src) |
75 |
|
|
{ |
76 |
|
✗ |
if (this != &src) |
77 |
|
|
{ |
78 |
|
✗ |
this->datablock = src.datablock; |
79 |
|
✗ |
gate_mqtt_message_t* ptrDest = this; |
80 |
|
✗ |
gate_mqtt_message_t const* ptrSrc = &src; |
81 |
|
✗ |
gate_mem_copy(ptrDest, ptrSrc, sizeof(gate_mqtt_message_t)); |
82 |
|
|
} |
83 |
|
✗ |
return *this; |
84 |
|
|
} |
85 |
|
|
|
86 |
|
✗ |
MQTT::Message::~Message() noexcept |
87 |
|
|
{ |
88 |
|
✗ |
} |
89 |
|
|
|
90 |
|
✗ |
void MQTT::Message::attachData(MemoryBlock& block) |
91 |
|
|
{ |
92 |
|
✗ |
this->datablock = block; |
93 |
|
✗ |
} |
94 |
|
|
|
95 |
|
✗ |
MemoryBlock const& MQTT::Message::getData() const |
96 |
|
|
{ |
97 |
|
✗ |
return this->datablock; |
98 |
|
|
} |
99 |
|
|
|
100 |
|
|
|
101 |
|
✗ |
MQTT::Message MQTT::Message::createConnect(uint16_t keepAlive, String const& clientId, String const& willTopic, String const& willMessage, String const& user, String const& password) |
102 |
|
|
{ |
103 |
|
✗ |
size_t blockSize = clientId.length() + willTopic.length() + willMessage.length() + user.length() + password.length() + 10; |
104 |
|
✗ |
MemoryBlock block(blockSize); |
105 |
|
|
|
106 |
|
✗ |
Message msg; |
107 |
|
✗ |
msg.packet_type = gate_mqtt_packet_connect; |
108 |
|
✗ |
msg.packet_flags = 0; |
109 |
|
✗ |
msg.content.connect.flags = GATE_MQTT_CONNECT_FLAG_CLEAN; |
110 |
|
✗ |
msg.content.connect.level = 4; |
111 |
|
✗ |
msg.content.connect.keep_alive = keepAlive; |
112 |
|
|
|
113 |
|
✗ |
char* ptrBlock = static_cast<char*>(block.getContent()); |
114 |
|
|
size_t copied; |
115 |
|
|
|
116 |
|
✗ |
copied = clientId.copyTo(ptrBlock, blockSize); |
117 |
|
✗ |
msg.content.connect.client_id = ptrBlock; |
118 |
|
✗ |
msg.content.connect.client_id_len = static_cast<uint16_t>(copied); |
119 |
|
✗ |
ptrBlock += (copied + 1); |
120 |
|
✗ |
blockSize -= (copied + 1); |
121 |
|
|
|
122 |
|
✗ |
if (!willTopic.empty()) |
123 |
|
|
{ |
124 |
|
✗ |
msg.content.connect.flags |= GATE_MQTT_CONNECT_FLAG_WILLFLAG; |
125 |
|
|
|
126 |
|
✗ |
copied = willTopic.copyTo(ptrBlock, blockSize); |
127 |
|
✗ |
msg.content.connect.will_topic = ptrBlock; |
128 |
|
✗ |
msg.content.connect.will_topic_len = static_cast<uint16_t>(copied); |
129 |
|
✗ |
ptrBlock += (copied + 1); |
130 |
|
✗ |
blockSize -= (copied + 1); |
131 |
|
|
|
132 |
|
✗ |
copied = willMessage.copyTo(ptrBlock, blockSize); |
133 |
|
✗ |
msg.content.connect.will_message = ptrBlock; |
134 |
|
✗ |
msg.content.connect.will_message_len = static_cast<uint16_t>(copied); |
135 |
|
✗ |
ptrBlock += (copied + 1); |
136 |
|
✗ |
blockSize -= (copied + 1); |
137 |
|
|
} |
138 |
|
|
|
139 |
|
✗ |
if (!user.empty()) |
140 |
|
|
{ |
141 |
|
✗ |
msg.content.connect.flags |= GATE_MQTT_CONNECT_FLAG_USER; |
142 |
|
|
|
143 |
|
✗ |
copied = user.copyTo(ptrBlock, blockSize); |
144 |
|
✗ |
msg.content.connect.user = ptrBlock; |
145 |
|
✗ |
msg.content.connect.user_len = static_cast<uint16_t>(copied); |
146 |
|
✗ |
ptrBlock += (copied + 1); |
147 |
|
✗ |
blockSize -= (copied + 1); |
148 |
|
|
} |
149 |
|
|
|
150 |
|
✗ |
if (!password.empty()) |
151 |
|
|
{ |
152 |
|
✗ |
msg.content.connect.flags |= GATE_MQTT_CONNECT_FLAG_PASSWORD; |
153 |
|
|
|
154 |
|
✗ |
copied = password.copyTo(ptrBlock, blockSize); |
155 |
|
✗ |
msg.content.connect.password = ptrBlock; |
156 |
|
✗ |
msg.content.connect.password_len = static_cast<uint16_t>(copied); |
157 |
|
✗ |
ptrBlock += (copied + 1); |
158 |
|
✗ |
blockSize -= (copied + 1); |
159 |
|
|
} |
160 |
|
|
|
161 |
|
✗ |
msg.datablock = block; |
162 |
|
✗ |
return msg; |
163 |
|
|
} |
164 |
|
|
|
165 |
|
✗ |
MQTT::Message MQTT::Message::createConnectAck(uint8_t flags, uint8_t retCode) |
166 |
|
|
{ |
167 |
|
✗ |
Message msg; |
168 |
|
✗ |
msg.packet_type = gate_mqtt_packet_connect_ack; |
169 |
|
✗ |
msg.packet_flags = 0; |
170 |
|
✗ |
msg.content.connect_ack.flags = flags; |
171 |
|
✗ |
msg.content.connect_ack.return_code = retCode; |
172 |
|
✗ |
return msg; |
173 |
|
|
} |
174 |
|
|
|
175 |
|
✗ |
MQTT::Message MQTT::Message::createPublish(uint16_t id, uint8_t flags, String const& topic, String const& data) |
176 |
|
|
{ |
177 |
|
✗ |
size_t blockSize = topic.length() + data.length() + 4; |
178 |
|
✗ |
MemoryBlock block(blockSize); |
179 |
|
|
|
180 |
|
✗ |
Message msg; |
181 |
|
✗ |
msg.packet_type = gate_mqtt_packet_publish; |
182 |
|
✗ |
msg.packet_flags = flags; |
183 |
|
✗ |
msg.content.publish.packet_identifier = id; |
184 |
|
|
|
185 |
|
✗ |
char* ptrBlock = static_cast<char*>(block.getContent()); |
186 |
|
|
size_t copied; |
187 |
|
|
|
188 |
|
✗ |
copied = topic.copyTo(ptrBlock, blockSize); |
189 |
|
✗ |
msg.content.publish.topic = ptrBlock; |
190 |
|
✗ |
msg.content.publish.topic_len = static_cast<uint16_t>(copied); |
191 |
|
✗ |
ptrBlock += (copied + 1); |
192 |
|
✗ |
blockSize -= (copied + 1); |
193 |
|
|
|
194 |
|
✗ |
copied = data.copyTo(ptrBlock, blockSize); |
195 |
|
✗ |
msg.content.publish.data = ptrBlock; |
196 |
|
✗ |
msg.content.publish.data_len = static_cast<uint16_t>(copied); |
197 |
|
✗ |
ptrBlock += (copied + 1); |
198 |
|
✗ |
blockSize -= (copied + 1); |
199 |
|
|
|
200 |
|
✗ |
msg.datablock = block; |
201 |
|
✗ |
return msg; |
202 |
|
|
} |
203 |
|
|
|
204 |
|
✗ |
MQTT::Message MQTT::Message::createPublishAck(uint16_t id) |
205 |
|
|
{ |
206 |
|
✗ |
Message msg; |
207 |
|
✗ |
msg.packet_type = gate_mqtt_packet_publish_ack; |
208 |
|
✗ |
msg.packet_flags = 0; |
209 |
|
✗ |
msg.content.publish_ack.packet_identifier = id; |
210 |
|
✗ |
return msg; |
211 |
|
|
} |
212 |
|
|
|
213 |
|
✗ |
MQTT::Message MQTT::Message::createPublishReceive(uint16_t id) |
214 |
|
|
{ |
215 |
|
✗ |
Message msg; |
216 |
|
✗ |
msg.packet_type = gate_mqtt_packet_publish_ack; |
217 |
|
✗ |
msg.packet_flags = 0; |
218 |
|
✗ |
msg.content.publish_received.packet_identifier = id; |
219 |
|
✗ |
return msg; |
220 |
|
|
} |
221 |
|
|
|
222 |
|
✗ |
MQTT::Message MQTT::Message::createPublishRelease(uint16_t id) |
223 |
|
|
{ |
224 |
|
✗ |
Message msg; |
225 |
|
✗ |
msg.packet_type = gate_mqtt_packet_publish_ack; |
226 |
|
✗ |
msg.packet_flags = 0x02; |
227 |
|
✗ |
msg.content.publish_release.packet_identifier = id; |
228 |
|
✗ |
return msg; |
229 |
|
|
|
230 |
|
|
} |
231 |
|
|
|
232 |
|
✗ |
MQTT::Message MQTT::Message::createPublishComplete(uint16_t id) |
233 |
|
|
{ |
234 |
|
✗ |
Message msg; |
235 |
|
✗ |
msg.packet_type = gate_mqtt_packet_publish_ack; |
236 |
|
✗ |
msg.packet_flags = 0; |
237 |
|
✗ |
msg.content.publish_complete.packet_identifier = id; |
238 |
|
✗ |
return msg; |
239 |
|
|
} |
240 |
|
|
|
241 |
|
✗ |
MQTT::Message MQTT::Message::createSubscribe(uint16_t id, Array<String> const& topics, Array<uint8_t> const& qos) |
242 |
|
|
{ |
243 |
|
✗ |
Message msg; |
244 |
|
✗ |
msg.packet_type = gate_mqtt_packet_subscribe; |
245 |
|
✗ |
msg.packet_flags = 0; |
246 |
|
✗ |
msg.content.subscribe.packet_identifier = id; |
247 |
|
|
|
248 |
|
✗ |
msg.content.subscribe.topics_count = static_cast<uint16_t>(topics.length()); |
249 |
|
✗ |
if (msg.content.subscribe.topics_count > GATE_MQTT_ARRAY_MAX) |
250 |
|
|
{ |
251 |
|
✗ |
msg.content.subscribe.topics_count = GATE_MQTT_ARRAY_MAX; |
252 |
|
|
} |
253 |
|
|
|
254 |
|
✗ |
ArrayList<size_t> lengths; |
255 |
|
✗ |
StringBuilder text; |
256 |
|
✗ |
for (size_t ndx = 0; ndx < msg.content.subscribe.topics_count; ++ndx) |
257 |
|
|
{ |
258 |
|
✗ |
text.append(topics[ndx].c_str(), topics[ndx].length()); |
259 |
|
✗ |
lengths.add(topics[ndx].length()); |
260 |
|
|
} |
261 |
|
✗ |
MemoryBlock block(text.ptr(), text.length()); |
262 |
|
✗ |
msg.datablock = block; |
263 |
|
✗ |
char const* ptrContent = static_cast<char const*>(msg.datablock.getContent()); |
264 |
|
|
|
265 |
|
✗ |
size_t offset = 0; |
266 |
|
✗ |
for (size_t ndx = 0; ndx < msg.content.subscribe.topics_count; ++ndx) |
267 |
|
|
{ |
268 |
|
✗ |
gate_mqtt_subscription_t& sub = msg.content.subscribe.topics[ndx]; |
269 |
|
✗ |
sub.topic_len = static_cast<uint16_t>(lengths[ndx]); |
270 |
|
✗ |
sub.topic = &ptrContent[offset]; |
271 |
|
✗ |
sub.qos = qos[ndx]; |
272 |
|
✗ |
offset += lengths[ndx]; |
273 |
|
|
} |
274 |
|
|
|
275 |
|
✗ |
return msg; |
276 |
|
|
} |
277 |
|
|
|
278 |
|
✗ |
MQTT::Message MQTT::Message::createSubscribe(uint16_t id, String const& topic, uint8_t qos) |
279 |
|
|
{ |
280 |
|
✗ |
ArrayList<String> topics; |
281 |
|
✗ |
ArrayList<uint8_t> qoss; |
282 |
|
✗ |
topics.add(topic); |
283 |
|
✗ |
qoss.add(qos); |
284 |
|
✗ |
return MQTT::Message::createSubscribe(id, topics.toArray(), qoss.toArray()); |
285 |
|
|
} |
286 |
|
|
|
287 |
|
|
|
288 |
|
✗ |
MQTT::Message MQTT::Message::createSubscribeAck(uint16_t id, Array<uint8_t> const& retCodes) |
289 |
|
|
{ |
290 |
|
✗ |
Message msg; |
291 |
|
✗ |
msg.packet_type = gate_mqtt_packet_subscribe_ack; |
292 |
|
✗ |
msg.packet_flags = 0; |
293 |
|
✗ |
msg.content.subscribe_ack.packet_identifier = id; |
294 |
|
✗ |
return msg; |
295 |
|
|
} |
296 |
|
|
|
297 |
|
✗ |
MQTT::Message MQTT::Message::createUnsubscribe(uint16_t id, Array<String> const& topics) |
298 |
|
|
{ |
299 |
|
✗ |
Message msg; |
300 |
|
✗ |
msg.packet_type = gate_mqtt_packet_unsubscribe; |
301 |
|
✗ |
msg.packet_flags = 0x02; |
302 |
|
✗ |
msg.content.unsubscribe.packet_identifier = id; |
303 |
|
|
|
304 |
|
✗ |
msg.content.unsubscribe.topics_count = static_cast<uint16_t>(topics.length()); |
305 |
|
✗ |
if (msg.content.unsubscribe.topics_count > GATE_MQTT_ARRAY_MAX) |
306 |
|
|
{ |
307 |
|
✗ |
msg.content.unsubscribe.topics_count = GATE_MQTT_ARRAY_MAX; |
308 |
|
|
} |
309 |
|
|
|
310 |
|
✗ |
ArrayList<size_t> lengths; |
311 |
|
✗ |
StringBuilder text; |
312 |
|
✗ |
for (size_t ndx = 0; ndx < msg.content.unsubscribe.topics_count; ++ndx) |
313 |
|
|
{ |
314 |
|
✗ |
text.append(topics[ndx].c_str(), topics[ndx].length()); |
315 |
|
✗ |
lengths.add(topics[ndx].length()); |
316 |
|
|
} |
317 |
|
✗ |
MemoryBlock block(text.ptr(), text.length()); |
318 |
|
✗ |
msg.datablock = block; |
319 |
|
✗ |
char const* ptrContent = static_cast<char const*>(msg.datablock.getContent()); |
320 |
|
|
|
321 |
|
✗ |
size_t offset = 0; |
322 |
|
✗ |
for (size_t ndx = 0; ndx < msg.content.unsubscribe.topics_count; ++ndx) |
323 |
|
|
{ |
324 |
|
✗ |
gate_mqtt_subscription_t& sub = msg.content.unsubscribe.topics[ndx]; |
325 |
|
✗ |
sub.topic_len = static_cast<uint16_t>(lengths[ndx]); |
326 |
|
✗ |
sub.topic = &ptrContent[offset]; |
327 |
|
✗ |
sub.qos = 0; |
328 |
|
✗ |
offset += lengths[ndx]; |
329 |
|
|
} |
330 |
|
✗ |
return msg; |
331 |
|
|
} |
332 |
|
✗ |
MQTT::Message MQTT::Message::createUnsubscribe(uint16_t id, String const& topic) |
333 |
|
|
{ |
334 |
|
✗ |
ArrayList<String> topics; |
335 |
|
✗ |
topics.add(topic); |
336 |
|
✗ |
return MQTT::Message::createUnsubscribe(id, topics.toArray()); |
337 |
|
|
} |
338 |
|
|
|
339 |
|
✗ |
MQTT::Message MQTT::Message::createUnsubscribeAck(uint16_t id) |
340 |
|
|
{ |
341 |
|
✗ |
Message msg; |
342 |
|
✗ |
msg.packet_type = gate_mqtt_packet_unsubscribe_ack; |
343 |
|
✗ |
msg.packet_flags = 0; |
344 |
|
✗ |
msg.content.unsubscribe_ack.packet_identifier = id; |
345 |
|
✗ |
return msg; |
346 |
|
|
} |
347 |
|
|
|
348 |
|
✗ |
MQTT::Message MQTT::Message::createPingRequest() |
349 |
|
|
{ |
350 |
|
✗ |
Message msg; |
351 |
|
✗ |
msg.packet_type = gate_mqtt_packet_ping_request; |
352 |
|
✗ |
msg.packet_flags = 0; |
353 |
|
✗ |
return msg; |
354 |
|
|
} |
355 |
|
|
|
356 |
|
✗ |
MQTT::Message MQTT::Message::createPingResponse() |
357 |
|
|
{ |
358 |
|
✗ |
Message msg; |
359 |
|
✗ |
msg.packet_type = gate_mqtt_packet_ping_response; |
360 |
|
✗ |
msg.packet_flags = 0; |
361 |
|
✗ |
return msg; |
362 |
|
|
} |
363 |
|
|
|
364 |
|
✗ |
MQTT::Message MQTT::Message::createDisconnect() |
365 |
|
|
{ |
366 |
|
✗ |
Message msg; |
367 |
|
✗ |
msg.packet_type = gate_mqtt_packet_disconnect; |
368 |
|
✗ |
msg.packet_flags = 0; |
369 |
|
✗ |
return msg; |
370 |
|
|
} |
371 |
|
|
|
372 |
|
✗ |
bool_t MQTT::Message::isType(gate_mqtt_packet_type_t tp) const |
373 |
|
|
{ |
374 |
|
✗ |
return this->packet_type == tp; |
375 |
|
|
} |
376 |
|
|
|
377 |
|
✗ |
bool_t MQTT::Message::getConnect(uint16_t& keepAlive, String& clientId, String& willTopic, String& willMessage, String& user, String& password) const |
378 |
|
|
{ |
379 |
|
✗ |
if (!this->isType(gate_mqtt_packet_connect)) |
380 |
|
|
{ |
381 |
|
✗ |
return false; |
382 |
|
|
} |
383 |
|
✗ |
keepAlive = this->content.connect.keep_alive; |
384 |
|
✗ |
clientId = String(this->content.connect.client_id, this->content.connect.client_id_len); |
385 |
|
✗ |
willTopic = String(this->content.connect.will_topic, this->content.connect.will_topic_len); |
386 |
|
✗ |
willMessage = String(this->content.connect.will_message, this->content.connect.will_message_len); |
387 |
|
✗ |
user = String(this->content.connect.user, this->content.connect.user_len); |
388 |
|
✗ |
password = String(this->content.connect.password, this->content.connect.password_len); |
389 |
|
✗ |
return true; |
390 |
|
|
} |
391 |
|
|
|
392 |
|
✗ |
bool_t MQTT::Message::getConnectAck(uint8_t& flags, uint8_t& retCode) const |
393 |
|
|
{ |
394 |
|
✗ |
if (!this->isType(gate_mqtt_packet_connect_ack)) |
395 |
|
|
{ |
396 |
|
✗ |
return false; |
397 |
|
|
} |
398 |
|
✗ |
flags = this->content.connect_ack.flags; |
399 |
|
✗ |
retCode = this->content.connect_ack.return_code; |
400 |
|
✗ |
return true; |
401 |
|
|
} |
402 |
|
|
|
403 |
|
✗ |
bool_t MQTT::Message::getPublish(uint16_t& packId, uint8_t& flags, String& topic, String& data) const |
404 |
|
|
{ |
405 |
|
✗ |
if (!this->isType(gate_mqtt_packet_publish)) |
406 |
|
|
{ |
407 |
|
✗ |
return false; |
408 |
|
|
} |
409 |
|
✗ |
packId = this->content.publish.packet_identifier; |
410 |
|
✗ |
flags = this->packet_flags; |
411 |
|
✗ |
topic = String(this->content.publish.topic, this->content.publish.topic_len); |
412 |
|
✗ |
data = String(this->content.publish.data, this->content.publish.data_len); |
413 |
|
✗ |
return true; |
414 |
|
|
} |
415 |
|
|
|
416 |
|
✗ |
bool_t MQTT::Message::getPublishAck(uint16_t& id) const |
417 |
|
|
{ |
418 |
|
✗ |
if (!this->isType(gate_mqtt_packet_publish_ack)) |
419 |
|
|
{ |
420 |
|
✗ |
return false; |
421 |
|
|
} |
422 |
|
✗ |
id = this->content.publish_ack.packet_identifier; |
423 |
|
✗ |
return true; |
424 |
|
|
} |
425 |
|
|
|
426 |
|
✗ |
bool_t MQTT::Message::getPublishReceive(uint16_t& id) const |
427 |
|
|
{ |
428 |
|
✗ |
if (!this->isType(gate_mqtt_packet_publish_receive)) |
429 |
|
|
{ |
430 |
|
✗ |
return false; |
431 |
|
|
} |
432 |
|
✗ |
id = this->content.publish_received.packet_identifier; |
433 |
|
✗ |
return true; |
434 |
|
|
} |
435 |
|
|
|
436 |
|
✗ |
bool_t MQTT::Message::getPublishRelease(uint16_t& id) const |
437 |
|
|
{ |
438 |
|
✗ |
if (!this->isType(gate_mqtt_packet_publish_release)) |
439 |
|
|
{ |
440 |
|
✗ |
return false; |
441 |
|
|
} |
442 |
|
✗ |
id = this->content.publish_release.packet_identifier; |
443 |
|
✗ |
return true; |
444 |
|
|
} |
445 |
|
|
|
446 |
|
✗ |
bool_t MQTT::Message::getPublishComplete(uint16_t& id) const |
447 |
|
|
{ |
448 |
|
✗ |
if (!this->isType(gate_mqtt_packet_publish_complete)) |
449 |
|
|
{ |
450 |
|
✗ |
return false; |
451 |
|
|
} |
452 |
|
✗ |
id = this->content.publish_complete.packet_identifier; |
453 |
|
✗ |
return true; |
454 |
|
|
} |
455 |
|
|
|
456 |
|
✗ |
bool_t MQTT::Message::getSubscribe(uint16_t& id, ArrayList<String>& topics, ArrayList<uint8_t>& qos) const |
457 |
|
|
{ |
458 |
|
✗ |
if (!this->isType(gate_mqtt_packet_subscribe)) |
459 |
|
|
{ |
460 |
|
✗ |
return false; |
461 |
|
|
} |
462 |
|
✗ |
id = this->content.subscribe.packet_identifier; |
463 |
|
✗ |
for (uint16_t ndx = 0; ndx != this->content.subscribe.topics_count; ++ndx) |
464 |
|
|
{ |
465 |
|
✗ |
topics.add(String( |
466 |
|
✗ |
this->content.subscribe.topics[ndx].topic, this->content.subscribe.topics[ndx].topic_len)); |
467 |
|
✗ |
qos.add(this->content.subscribe.topics[ndx].qos); |
468 |
|
|
} |
469 |
|
✗ |
return true; |
470 |
|
|
return true; |
471 |
|
|
} |
472 |
|
|
|
473 |
|
✗ |
bool_t MQTT::Message::getSubscribeAck(uint16_t& id, ArrayList<uint8_t>& retCodes) const |
474 |
|
|
{ |
475 |
|
✗ |
if (!this->isType(gate_mqtt_packet_subscribe_ack)) |
476 |
|
|
{ |
477 |
|
✗ |
return false; |
478 |
|
|
} |
479 |
|
|
|
480 |
|
✗ |
id = this->content.subscribe_ack.packet_identifier; |
481 |
|
✗ |
for (uint16_t ndx = 0; ndx != this->content.subscribe_ack.ret_codes_count; ++ndx) |
482 |
|
|
{ |
483 |
|
✗ |
retCodes.add(this->content.subscribe_ack.ret_codes[ndx]); |
484 |
|
|
} |
485 |
|
✗ |
return true; |
486 |
|
|
} |
487 |
|
|
|
488 |
|
✗ |
bool_t MQTT::Message::getUnsubscribe(uint16_t& id, ArrayList<String>& topics) const |
489 |
|
|
{ |
490 |
|
✗ |
if (!this->isType(gate_mqtt_packet_unsubscribe)) |
491 |
|
|
{ |
492 |
|
✗ |
return false; |
493 |
|
|
} |
494 |
|
✗ |
id = this->content.unsubscribe.packet_identifier; |
495 |
|
✗ |
for (uint16_t ndx = 0; ndx != this->content.unsubscribe.topics_count; ++ndx) |
496 |
|
|
{ |
497 |
|
✗ |
topics.add(String( |
498 |
|
✗ |
this->content.unsubscribe.topics[ndx].topic, this->content.unsubscribe.topics[ndx].topic_len)); |
499 |
|
|
} |
500 |
|
✗ |
return true; |
501 |
|
|
} |
502 |
|
|
|
503 |
|
✗ |
bool_t MQTT::Message::getUnsubscribeAck(uint16_t& id) const |
504 |
|
|
{ |
505 |
|
✗ |
if (!this->isType(gate_mqtt_packet_unsubscribe_ack)) |
506 |
|
|
{ |
507 |
|
✗ |
return false; |
508 |
|
|
} |
509 |
|
✗ |
id = this->content.unsubscribe_ack.packet_identifier; |
510 |
|
✗ |
return true; |
511 |
|
|
} |
512 |
|
|
|
513 |
|
|
|
514 |
|
|
|
515 |
|
|
|
516 |
|
✗ |
void MQTT::sendMessage(Stream& mqttStream, Message const& msg) |
517 |
|
|
{ |
518 |
|
✗ |
result_t result = gate_mqtt_send_message(mqttStream.c_impl(), &msg); |
519 |
|
✗ |
GATEXX_CHECK_EXCEPTION(result); |
520 |
|
✗ |
} |
521 |
|
|
|
522 |
|
✗ |
bool_t MQTT::hasQueuedMessage(Stream& mqttStream) |
523 |
|
|
{ |
524 |
|
✗ |
result_t result = gate_mqtt_has_queued_message(mqttStream.c_impl()); |
525 |
|
✗ |
if (result == GATE_RESULT_ENDOFSTREAM) |
526 |
|
|
{ |
527 |
|
✗ |
return false; |
528 |
|
|
} |
529 |
|
✗ |
GATEXX_CHECK_EXCEPTION(result); |
530 |
|
✗ |
return true; |
531 |
|
|
} |
532 |
|
|
|
533 |
|
|
|
534 |
|
✗ |
void MQTT::receiveMessage(Stream& mqttStream, Message& msg) |
535 |
|
|
{ |
536 |
|
✗ |
gate_memoryblock_t* ptr_block = NULL; |
537 |
|
✗ |
result_t result = gate_mqtt_receive_message(mqttStream.c_impl(), &msg, &ptr_block); |
538 |
|
✗ |
GATEXX_CHECK_EXCEPTION(result); |
539 |
|
✗ |
MemoryBlock newBlock(ptr_block); |
540 |
|
✗ |
msg.attachData(newBlock); |
541 |
|
✗ |
} |
542 |
|
|
|
543 |
|
|
|
544 |
|
✗ |
MQTTClient::MQTTClient(Stream& mqttConnection) |
545 |
|
✗ |
: transport(mqttConnection) |
546 |
|
|
{ |
547 |
|
✗ |
} |
548 |
|
|
|
549 |
|
✗ |
static Stream createTcpConnection(String const& host, uint16_t port) |
550 |
|
|
{ |
551 |
|
✗ |
StringBuilder builder; |
552 |
|
✗ |
builder << host << ":" << port; |
553 |
|
✗ |
ControlStream stream = SocketStream::createClient(builder.toString()); |
554 |
|
✗ |
return stream; |
555 |
|
|
} |
556 |
|
|
|
557 |
|
✗ |
MQTTClient::MQTTClient(String const& mqttHost, uint16_t port) |
558 |
|
✗ |
: transport(createTcpConnection(mqttHost, port)) |
559 |
|
|
{ |
560 |
|
✗ |
} |
561 |
|
|
|
562 |
|
✗ |
MQTTClient::~MQTTClient() noexcept |
563 |
|
|
{ |
564 |
|
✗ |
} |
565 |
|
|
|
566 |
|
✗ |
uint16_t MQTTClient::getNextPacketId() |
567 |
|
|
{ |
568 |
|
✗ |
int32_t id = ++this->nextPacketId; |
569 |
|
✗ |
return static_cast<uint16_t>(id); |
570 |
|
|
} |
571 |
|
|
|
572 |
|
|
|
573 |
|
✗ |
void MQTTClient::connect(String const& clientId, String const& user, String const& password, uint16_t timeoutSeconds) |
574 |
|
|
{ |
575 |
|
✗ |
const MQTT::Message request = MQTT::Message::createConnect(timeoutSeconds, clientId, String(), String(), user, password); |
576 |
|
✗ |
MQTT::sendMessage(this->transport, request); |
577 |
|
|
|
578 |
|
✗ |
MQTT::Message response; |
579 |
|
✗ |
MQTT::receiveMessage(this->transport, response); |
580 |
|
|
|
581 |
|
✗ |
if (response.packet_type != gate_mqtt_packet_connect_ack) |
582 |
|
|
{ |
583 |
|
✗ |
raiseException(results::InvalidHeader, "Invalid response message type"); |
584 |
|
|
} |
585 |
|
|
|
586 |
|
✗ |
if (response.content.connect_ack.return_code != 0) |
587 |
|
|
{ |
588 |
|
✗ |
raiseException(results::InvalidData, "Connect failed", NULL, response.content.connect_ack.return_code); |
589 |
|
|
} |
590 |
|
✗ |
} |
591 |
|
|
|
592 |
|
✗ |
void MQTTClient::disconnect() |
593 |
|
|
{ |
594 |
|
✗ |
const MQTT::Message request = MQTT::Message::createDisconnect(); |
595 |
|
✗ |
MQTT::sendMessage(this->transport, request); |
596 |
|
✗ |
} |
597 |
|
|
|
598 |
|
|
|
599 |
|
✗ |
MQTTClient::PacketId MQTTClient::subscribe(String const& topic) |
600 |
|
|
{ |
601 |
|
✗ |
const PacketId packetId = this->getNextPacketId(); |
602 |
|
✗ |
const MQTT::Message request = MQTT::Message::createSubscribe(packetId, topic); |
603 |
|
✗ |
MQTT::sendMessage(this->transport, request); |
604 |
|
|
|
605 |
|
✗ |
MQTT::Message response; |
606 |
|
✗ |
MQTT::receiveMessage(this->transport, response); |
607 |
|
|
|
608 |
|
✗ |
if (response.packet_type != gate_mqtt_packet_subscribe_ack) |
609 |
|
|
{ |
610 |
|
✗ |
raiseException(results::InvalidHeader, "Invalid response message type"); |
611 |
|
|
} |
612 |
|
✗ |
if (response.content.subscribe_ack.packet_identifier != packetId) |
613 |
|
|
{ |
614 |
|
✗ |
raiseException(results::InvalidData, "Connect failed", NULL, response.content.connect_ack.return_code); |
615 |
|
|
} |
616 |
|
✗ |
return packetId; |
617 |
|
|
} |
618 |
|
|
|
619 |
|
✗ |
MQTTClient::PacketId MQTTClient::unsubscribe(String const& topic) |
620 |
|
|
{ |
621 |
|
✗ |
const PacketId packetId = this->getNextPacketId(); |
622 |
|
✗ |
const MQTT::Message request = MQTT::Message::createUnsubscribe(packetId, topic); |
623 |
|
✗ |
MQTT::sendMessage(this->transport, request); |
624 |
|
|
|
625 |
|
✗ |
MQTT::Message response; |
626 |
|
✗ |
MQTT::receiveMessage(this->transport, response); |
627 |
|
|
|
628 |
|
✗ |
if (response.packet_type != gate_mqtt_packet_unsubscribe_ack) |
629 |
|
|
{ |
630 |
|
✗ |
raiseException(results::InvalidHeader, "Invalid response message type"); |
631 |
|
|
} |
632 |
|
✗ |
if (response.content.unsubscribe_ack.packet_identifier != packetId) |
633 |
|
|
{ |
634 |
|
✗ |
raiseException(results::InvalidData, "Connect failed", NULL, response.content.connect_ack.return_code); |
635 |
|
|
} |
636 |
|
✗ |
return packetId; |
637 |
|
|
} |
638 |
|
|
|
639 |
|
✗ |
MQTTClient::PacketId MQTTClient::publish(String const& topic, String const& data, uint8_t flags) |
640 |
|
|
{ |
641 |
|
✗ |
const PacketId packetId = this->getNextPacketId(); |
642 |
|
✗ |
const MQTT::Message request = MQTT::Message::createPublish(packetId, flags, topic, data); |
643 |
|
✗ |
MQTT::sendMessage(this->transport, request); |
644 |
|
|
|
645 |
|
✗ |
if (GATE_FLAG_ENABLED(flags, GATE_MQTT_FLAG_QOS1)) |
646 |
|
|
{ |
647 |
|
✗ |
MQTT::Message response; |
648 |
|
✗ |
MQTT::receiveMessage(this->transport, response); |
649 |
|
|
|
650 |
|
✗ |
if (response.packet_type != gate_mqtt_packet_publish_ack) |
651 |
|
|
{ |
652 |
|
✗ |
raiseException(results::InvalidHeader, "Invalid response message type, publish_ack expected", NULL, (int32_t)response.packet_type); |
653 |
|
|
} |
654 |
|
✗ |
if (response.content.publish_ack.packet_identifier != packetId) |
655 |
|
|
{ |
656 |
|
✗ |
raiseException(results::InvalidData, "Publish-acknowledge failed", NULL, response.content.publish_ack.packet_identifier); |
657 |
|
|
} |
658 |
|
|
} |
659 |
|
|
|
660 |
|
✗ |
if (GATE_FLAG_ENABLED(flags, GATE_MQTT_FLAG_QOS2)) |
661 |
|
|
{ |
662 |
|
✗ |
MQTT::Message response; |
663 |
|
✗ |
MQTT::receiveMessage(this->transport, response); |
664 |
|
|
|
665 |
|
✗ |
if (response.packet_type != gate_mqtt_packet_publish_receive) |
666 |
|
|
{ |
667 |
|
✗ |
raiseException(results::InvalidHeader, "Invalid response message type, publish_rec expected", NULL, (int32_t)response.packet_type); |
668 |
|
|
} |
669 |
|
✗ |
if (response.content.publish_received.packet_identifier != packetId) |
670 |
|
|
{ |
671 |
|
✗ |
raiseException(results::InvalidData, "Publish-received failed", NULL, response.content.publish_received.packet_identifier); |
672 |
|
|
} |
673 |
|
|
|
674 |
|
|
// send PUBREL msg |
675 |
|
✗ |
const MQTT::Message request2 = MQTT::Message::createPublishRelease(packetId); |
676 |
|
✗ |
MQTT::sendMessage(this->transport, request); |
677 |
|
|
|
678 |
|
✗ |
MQTT::Message response2; |
679 |
|
✗ |
MQTT::receiveMessage(this->transport, response2); |
680 |
|
|
|
681 |
|
✗ |
if (response2.packet_type != gate_mqtt_packet_publish_complete) |
682 |
|
|
{ |
683 |
|
✗ |
raiseException(results::InvalidHeader, "Invalid response message type, publish_comp expected", NULL, (int32_t)response2.packet_type); |
684 |
|
|
} |
685 |
|
✗ |
if (response2.content.publish_complete.packet_identifier != packetId) |
686 |
|
|
{ |
687 |
|
✗ |
raiseException(results::InvalidData, "Publish-complete failed", NULL, response2.content.publish_complete.packet_identifier); |
688 |
|
|
} |
689 |
|
|
} |
690 |
|
|
|
691 |
|
✗ |
return packetId; |
692 |
|
|
} |
693 |
|
|
|
694 |
|
✗ |
void MQTTClient::sendPing() |
695 |
|
|
{ |
696 |
|
✗ |
MQTT::Message request = MQTT::Message::createPingRequest(); |
697 |
|
✗ |
MQTT::sendMessage(this->transport, request); |
698 |
|
✗ |
} |
699 |
|
|
|
700 |
|
✗ |
void MQTTClient::sendPingResponse() |
701 |
|
|
{ |
702 |
|
✗ |
MQTT::Message request = MQTT::Message::createPingResponse(); |
703 |
|
✗ |
MQTT::sendMessage(this->transport, request); |
704 |
|
✗ |
} |
705 |
|
|
|
706 |
|
|
|
707 |
|
✗ |
bool_t MQTTClient::hasNextMessage() |
708 |
|
|
{ |
709 |
|
✗ |
return MQTT::hasQueuedMessage(this->transport); |
710 |
|
|
} |
711 |
|
|
|
712 |
|
✗ |
MQTT::Message MQTTClient::getNextMessage() |
713 |
|
|
{ |
714 |
|
✗ |
MQTT::Message msg; |
715 |
|
✗ |
MQTT::receiveMessage(this->transport, msg); |
716 |
|
✗ |
return msg; |
717 |
|
|
} |
718 |
|
|
|
719 |
|
|
|
720 |
|
|
} // end of namespace net |
721 |
|
|
} // end of namespace gate |
722 |
|
|
|