| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | /* GATE PROJECT LICENSE: | ||
| 2 | +----------------------------------------------------------------------------+ | ||
| 3 | | Copyright(c) 2018-2025, Stefan Meislinger <sm@opengate.at> | | ||
| 4 | | All rights reserved. | | ||
| 5 | | | | ||
| 6 | | Redistribution and use in source and binary forms, with or without | | ||
| 7 | | modification, are permitted provided that the following conditions are met:| | ||
| 8 | | | | ||
| 9 | | 1. Redistributions of source code must retain the above copyright notice, | | ||
| 10 | | this list of conditions and the following disclaimer. | | ||
| 11 | | 2. Redistributions in binary form must reproduce the above copyright | | ||
| 12 | | notice, this list of conditions and the following disclaimer in the | | ||
| 13 | | documentation and/or other materials provided with the distribution. | | ||
| 14 | | | | ||
| 15 | | THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"| | ||
| 16 | | AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE | | ||
| 17 | | IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE | | ||
| 18 | | ARE DISCLAIMED.IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE | | ||
| 19 | | LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR | | ||
| 20 | | CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF | | ||
| 21 | | SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS | | ||
| 22 | | INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN | | ||
| 23 | | CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) | | ||
| 24 | | ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF | | ||
| 25 | | THE POSSIBILITY OF SUCH DAMAGE. | | ||
| 26 | +----------------------------------------------------------------------------+ | ||
| 27 | */ | ||
| 28 | |||
| 29 | #include "gate/net/mqttprotocols.hpp" | ||
| 30 | #include "gate/exceptions.hpp" | ||
| 31 | #include "gate/net/sockets.hpp" | ||
| 32 | #include "gate/net/sockettools.hpp" | ||
| 33 | |||
| 34 | namespace gate | ||
| 35 | { | ||
| 36 | namespace net | ||
| 37 | { | ||
| 38 | ✗ | MQTT::Message::Message() | |
| 39 | { | ||
| 40 | ✗ | gate_mqtt_message_t* ptr = this; | |
| 41 | ✗ | gate_mem_clear(ptr, sizeof(gate_mqtt_message_t)); | |
| 42 | ✗ | } | |
| 43 | |||
| 44 | ✗ | MQTT::Message::Message(gate_mqtt_message_t const& src) | |
| 45 | { | ||
| 46 | ✗ | gate_mqtt_message_t* ptr = this; | |
| 47 | ✗ | gate_mem_clear(ptr, sizeof(gate_mqtt_message_t)); | |
| 48 | |||
| 49 | ✗ | switch (src.packet_type) | |
| 50 | { | ||
| 51 | ✗ | case gate_mqtt_packet_connect: | |
| 52 | { | ||
| 53 | ✗ | const StaticString clientId(src.content.connect.client_id, src.content.connect.client_id_len); | |
| 54 | ✗ | const StaticString willTopic(src.content.connect.will_topic, src.content.connect.will_topic_len); | |
| 55 | ✗ | const StaticString willMsg(src.content.connect.will_message, src.content.connect.will_message_len); | |
| 56 | ✗ | const StaticString user(src.content.connect.user, src.content.connect.user_len); | |
| 57 | ✗ | const StaticString pass(src.content.connect.password, src.content.connect.password_len); | |
| 58 | ✗ | *this = Message::createConnect(src.content.connect.keep_alive, clientId, willTopic, willMsg, user, pass); | |
| 59 | ✗ | break; | |
| 60 | } | ||
| 61 | ✗ | default: | |
| 62 | ✗ | break; | |
| 63 | } | ||
| 64 | ✗ | } | |
| 65 | |||
| 66 | ✗ | MQTT::Message::Message(Message const& src) | |
| 67 | ✗ | : datablock(src.datablock) | |
| 68 | { | ||
| 69 | ✗ | gate_mqtt_message_t* ptrDest = this; | |
| 70 | ✗ | gate_mqtt_message_t const* ptrSrc = &src; | |
| 71 | ✗ | gate_mem_copy(ptrDest, ptrSrc, sizeof(gate_mqtt_message_t)); | |
| 72 | ✗ | } | |
| 73 | |||
| 74 | ✗ | MQTT::Message& MQTT::Message::operator=(Message const& src) | |
| 75 | { | ||
| 76 | ✗ | if (this != &src) | |
| 77 | { | ||
| 78 | ✗ | this->datablock = src.datablock; | |
| 79 | ✗ | gate_mqtt_message_t* ptrDest = this; | |
| 80 | ✗ | gate_mqtt_message_t const* ptrSrc = &src; | |
| 81 | ✗ | gate_mem_copy(ptrDest, ptrSrc, sizeof(gate_mqtt_message_t)); | |
| 82 | } | ||
| 83 | ✗ | return *this; | |
| 84 | } | ||
| 85 | |||
| 86 | ✗ | MQTT::Message::~Message() noexcept | |
| 87 | { | ||
| 88 | ✗ | } | |
| 89 | |||
| 90 | ✗ | void MQTT::Message::attachData(MemoryBlock& block) | |
| 91 | { | ||
| 92 | ✗ | this->datablock = block; | |
| 93 | ✗ | } | |
| 94 | |||
| 95 | ✗ | MemoryBlock const& MQTT::Message::getData() const | |
| 96 | { | ||
| 97 | ✗ | return this->datablock; | |
| 98 | } | ||
| 99 | |||
| 100 | |||
| 101 | ✗ | MQTT::Message MQTT::Message::createConnect(uint16_t keepAlive, String const& clientId, String const& willTopic, String const& willMessage, String const& user, String const& password) | |
| 102 | { | ||
| 103 | ✗ | size_t blockSize = clientId.length() + willTopic.length() + willMessage.length() + user.length() + password.length() + 10; | |
| 104 | ✗ | MemoryBlock block(blockSize); | |
| 105 | |||
| 106 | ✗ | Message msg; | |
| 107 | ✗ | msg.packet_type = gate_mqtt_packet_connect; | |
| 108 | ✗ | msg.packet_flags = 0; | |
| 109 | ✗ | msg.content.connect.flags = GATE_MQTT_CONNECT_FLAG_CLEAN; | |
| 110 | ✗ | msg.content.connect.level = 4; | |
| 111 | ✗ | msg.content.connect.keep_alive = keepAlive; | |
| 112 | |||
| 113 | ✗ | char* ptrBlock = static_cast<char*>(block.getContent()); | |
| 114 | size_t copied; | ||
| 115 | |||
| 116 | ✗ | copied = clientId.copyTo(ptrBlock, blockSize); | |
| 117 | ✗ | msg.content.connect.client_id = ptrBlock; | |
| 118 | ✗ | msg.content.connect.client_id_len = static_cast<uint16_t>(copied); | |
| 119 | ✗ | ptrBlock += (copied + 1); | |
| 120 | ✗ | blockSize -= (copied + 1); | |
| 121 | |||
| 122 | ✗ | if (!willTopic.empty()) | |
| 123 | { | ||
| 124 | ✗ | msg.content.connect.flags |= GATE_MQTT_CONNECT_FLAG_WILLFLAG; | |
| 125 | |||
| 126 | ✗ | copied = willTopic.copyTo(ptrBlock, blockSize); | |
| 127 | ✗ | msg.content.connect.will_topic = ptrBlock; | |
| 128 | ✗ | msg.content.connect.will_topic_len = static_cast<uint16_t>(copied); | |
| 129 | ✗ | ptrBlock += (copied + 1); | |
| 130 | ✗ | blockSize -= (copied + 1); | |
| 131 | |||
| 132 | ✗ | copied = willMessage.copyTo(ptrBlock, blockSize); | |
| 133 | ✗ | msg.content.connect.will_message = ptrBlock; | |
| 134 | ✗ | msg.content.connect.will_message_len = static_cast<uint16_t>(copied); | |
| 135 | ✗ | ptrBlock += (copied + 1); | |
| 136 | ✗ | blockSize -= (copied + 1); | |
| 137 | } | ||
| 138 | |||
| 139 | ✗ | if (!user.empty()) | |
| 140 | { | ||
| 141 | ✗ | msg.content.connect.flags |= GATE_MQTT_CONNECT_FLAG_USER; | |
| 142 | |||
| 143 | ✗ | copied = user.copyTo(ptrBlock, blockSize); | |
| 144 | ✗ | msg.content.connect.user = ptrBlock; | |
| 145 | ✗ | msg.content.connect.user_len = static_cast<uint16_t>(copied); | |
| 146 | ✗ | ptrBlock += (copied + 1); | |
| 147 | ✗ | blockSize -= (copied + 1); | |
| 148 | } | ||
| 149 | |||
| 150 | ✗ | if (!password.empty()) | |
| 151 | { | ||
| 152 | ✗ | msg.content.connect.flags |= GATE_MQTT_CONNECT_FLAG_PASSWORD; | |
| 153 | |||
| 154 | ✗ | copied = password.copyTo(ptrBlock, blockSize); | |
| 155 | ✗ | msg.content.connect.password = ptrBlock; | |
| 156 | ✗ | msg.content.connect.password_len = static_cast<uint16_t>(copied); | |
| 157 | //ptrBlock += (copied + 1); // no further usage | ||
| 158 | //blockSize -= (copied + 1); // no further usage | ||
| 159 | } | ||
| 160 | |||
| 161 | ✗ | msg.datablock = block; | |
| 162 | ✗ | return msg; | |
| 163 | } | ||
| 164 | |||
| 165 | ✗ | MQTT::Message MQTT::Message::createConnectAck(uint8_t flags, uint8_t retCode) | |
| 166 | { | ||
| 167 | ✗ | Message msg; | |
| 168 | ✗ | msg.packet_type = gate_mqtt_packet_connect_ack; | |
| 169 | ✗ | msg.packet_flags = 0; | |
| 170 | ✗ | msg.content.connect_ack.flags = flags; | |
| 171 | ✗ | msg.content.connect_ack.return_code = retCode; | |
| 172 | ✗ | return msg; | |
| 173 | } | ||
| 174 | |||
| 175 | ✗ | MQTT::Message MQTT::Message::createPublish(uint16_t id, uint8_t flags, String const& topic, String const& data) | |
| 176 | { | ||
| 177 | ✗ | size_t blockSize = topic.length() + data.length() + 4; | |
| 178 | ✗ | MemoryBlock block(blockSize); | |
| 179 | |||
| 180 | ✗ | Message msg; | |
| 181 | ✗ | msg.packet_type = gate_mqtt_packet_publish; | |
| 182 | ✗ | msg.packet_flags = flags; | |
| 183 | ✗ | msg.content.publish.packet_identifier = id; | |
| 184 | |||
| 185 | ✗ | char* ptrBlock = static_cast<char*>(block.getContent()); | |
| 186 | size_t copied; | ||
| 187 | |||
| 188 | ✗ | copied = topic.copyTo(ptrBlock, blockSize); | |
| 189 | ✗ | msg.content.publish.topic = ptrBlock; | |
| 190 | ✗ | msg.content.publish.topic_len = static_cast<uint16_t>(copied); | |
| 191 | ✗ | ptrBlock += (copied + 1); | |
| 192 | ✗ | blockSize -= (copied + 1); | |
| 193 | |||
| 194 | ✗ | copied = data.copyTo(ptrBlock, blockSize); | |
| 195 | ✗ | msg.content.publish.data = ptrBlock; | |
| 196 | ✗ | msg.content.publish.data_len = static_cast<uint16_t>(copied); | |
| 197 | //ptrBlock += (copied + 1); // no further usage | ||
| 198 | //blockSize -= (copied + 1); // no further usage | ||
| 199 | |||
| 200 | ✗ | msg.datablock = block; | |
| 201 | ✗ | return msg; | |
| 202 | } | ||
| 203 | |||
| 204 | ✗ | MQTT::Message MQTT::Message::createPublishAck(uint16_t id) | |
| 205 | { | ||
| 206 | ✗ | Message msg; | |
| 207 | ✗ | msg.packet_type = gate_mqtt_packet_publish_ack; | |
| 208 | ✗ | msg.packet_flags = 0; | |
| 209 | ✗ | msg.content.publish_ack.packet_identifier = id; | |
| 210 | ✗ | return msg; | |
| 211 | } | ||
| 212 | |||
| 213 | ✗ | MQTT::Message MQTT::Message::createPublishReceive(uint16_t id) | |
| 214 | { | ||
| 215 | ✗ | Message msg; | |
| 216 | ✗ | msg.packet_type = gate_mqtt_packet_publish_ack; | |
| 217 | ✗ | msg.packet_flags = 0; | |
| 218 | ✗ | msg.content.publish_received.packet_identifier = id; | |
| 219 | ✗ | return msg; | |
| 220 | } | ||
| 221 | |||
| 222 | ✗ | MQTT::Message MQTT::Message::createPublishRelease(uint16_t id) | |
| 223 | { | ||
| 224 | ✗ | Message msg; | |
| 225 | ✗ | msg.packet_type = gate_mqtt_packet_publish_ack; | |
| 226 | ✗ | msg.packet_flags = 0x02; | |
| 227 | ✗ | msg.content.publish_release.packet_identifier = id; | |
| 228 | ✗ | return msg; | |
| 229 | |||
| 230 | } | ||
| 231 | |||
| 232 | ✗ | MQTT::Message MQTT::Message::createPublishComplete(uint16_t id) | |
| 233 | { | ||
| 234 | ✗ | Message msg; | |
| 235 | ✗ | msg.packet_type = gate_mqtt_packet_publish_ack; | |
| 236 | ✗ | msg.packet_flags = 0; | |
| 237 | ✗ | msg.content.publish_complete.packet_identifier = id; | |
| 238 | ✗ | return msg; | |
| 239 | } | ||
| 240 | |||
| 241 | ✗ | MQTT::Message MQTT::Message::createSubscribe(uint16_t id, Array<String> const& topics, Array<uint8_t> const& qos) | |
| 242 | { | ||
| 243 | ✗ | Message msg; | |
| 244 | ✗ | msg.packet_type = gate_mqtt_packet_subscribe; | |
| 245 | ✗ | msg.packet_flags = 0; | |
| 246 | ✗ | msg.content.subscribe.packet_identifier = id; | |
| 247 | |||
| 248 | ✗ | msg.content.subscribe.topics_count = static_cast<uint16_t>(topics.length()); | |
| 249 | ✗ | if (msg.content.subscribe.topics_count > GATE_MQTT_ARRAY_MAX) | |
| 250 | { | ||
| 251 | ✗ | msg.content.subscribe.topics_count = GATE_MQTT_ARRAY_MAX; | |
| 252 | } | ||
| 253 | |||
| 254 | ✗ | ArrayList<size_t> lengths; | |
| 255 | ✗ | StringBuilder text; | |
| 256 | ✗ | for (size_t ndx = 0; ndx < msg.content.subscribe.topics_count; ++ndx) | |
| 257 | { | ||
| 258 | ✗ | text.append(topics[ndx].c_str(), topics[ndx].length()); | |
| 259 | ✗ | lengths.add(topics[ndx].length()); | |
| 260 | } | ||
| 261 | ✗ | MemoryBlock block(text.ptr(), text.length()); | |
| 262 | ✗ | msg.datablock = block; | |
| 263 | ✗ | char const* ptrContent = static_cast<char const*>(msg.datablock.getContent()); | |
| 264 | |||
| 265 | ✗ | size_t offset = 0; | |
| 266 | ✗ | for (size_t ndx = 0; ndx < msg.content.subscribe.topics_count; ++ndx) | |
| 267 | { | ||
| 268 | ✗ | gate_mqtt_subscription_t& sub = msg.content.subscribe.topics[ndx]; | |
| 269 | ✗ | sub.topic_len = static_cast<uint16_t>(lengths[ndx]); | |
| 270 | ✗ | sub.topic = &ptrContent[offset]; | |
| 271 | ✗ | sub.qos = qos[ndx]; | |
| 272 | ✗ | offset += lengths[ndx]; | |
| 273 | } | ||
| 274 | |||
| 275 | ✗ | return msg; | |
| 276 | } | ||
| 277 | |||
| 278 | ✗ | MQTT::Message MQTT::Message::createSubscribe(uint16_t id, String const& topic, uint8_t qos) | |
| 279 | { | ||
| 280 | ✗ | ArrayList<String> topics; | |
| 281 | ✗ | ArrayList<uint8_t> qoss; | |
| 282 | ✗ | topics.add(topic); | |
| 283 | ✗ | qoss.add(qos); | |
| 284 | ✗ | return MQTT::Message::createSubscribe(id, topics.toArray(), qoss.toArray()); | |
| 285 | } | ||
| 286 | |||
| 287 | |||
| 288 | ✗ | MQTT::Message MQTT::Message::createSubscribeAck(uint16_t id, Array<uint8_t> const& retCodes) | |
| 289 | { | ||
| 290 | ✗ | Message msg; | |
| 291 | ✗ | msg.packet_type = gate_mqtt_packet_subscribe_ack; | |
| 292 | ✗ | msg.packet_flags = 0; | |
| 293 | ✗ | msg.content.subscribe_ack.packet_identifier = id; | |
| 294 | ✗ | return msg; | |
| 295 | } | ||
| 296 | |||
| 297 | ✗ | MQTT::Message MQTT::Message::createUnsubscribe(uint16_t id, Array<String> const& topics) | |
| 298 | { | ||
| 299 | ✗ | Message msg; | |
| 300 | ✗ | msg.packet_type = gate_mqtt_packet_unsubscribe; | |
| 301 | ✗ | msg.packet_flags = 0x02; | |
| 302 | ✗ | msg.content.unsubscribe.packet_identifier = id; | |
| 303 | |||
| 304 | ✗ | msg.content.unsubscribe.topics_count = static_cast<uint16_t>(topics.length()); | |
| 305 | ✗ | if (msg.content.unsubscribe.topics_count > GATE_MQTT_ARRAY_MAX) | |
| 306 | { | ||
| 307 | ✗ | msg.content.unsubscribe.topics_count = GATE_MQTT_ARRAY_MAX; | |
| 308 | } | ||
| 309 | |||
| 310 | ✗ | ArrayList<size_t> lengths; | |
| 311 | ✗ | StringBuilder text; | |
| 312 | ✗ | for (size_t ndx = 0; ndx < msg.content.unsubscribe.topics_count; ++ndx) | |
| 313 | { | ||
| 314 | ✗ | text.append(topics[ndx].c_str(), topics[ndx].length()); | |
| 315 | ✗ | lengths.add(topics[ndx].length()); | |
| 316 | } | ||
| 317 | ✗ | MemoryBlock block(text.ptr(), text.length()); | |
| 318 | ✗ | msg.datablock = block; | |
| 319 | ✗ | char const* ptrContent = static_cast<char const*>(msg.datablock.getContent()); | |
| 320 | |||
| 321 | ✗ | size_t offset = 0; | |
| 322 | ✗ | for (size_t ndx = 0; ndx < msg.content.unsubscribe.topics_count; ++ndx) | |
| 323 | { | ||
| 324 | ✗ | gate_mqtt_subscription_t& sub = msg.content.unsubscribe.topics[ndx]; | |
| 325 | ✗ | sub.topic_len = static_cast<uint16_t>(lengths[ndx]); | |
| 326 | ✗ | sub.topic = &ptrContent[offset]; | |
| 327 | ✗ | sub.qos = 0; | |
| 328 | ✗ | offset += lengths[ndx]; | |
| 329 | } | ||
| 330 | ✗ | return msg; | |
| 331 | } | ||
| 332 | ✗ | MQTT::Message MQTT::Message::createUnsubscribe(uint16_t id, String const& topic) | |
| 333 | { | ||
| 334 | ✗ | ArrayList<String> topics; | |
| 335 | ✗ | topics.add(topic); | |
| 336 | ✗ | return MQTT::Message::createUnsubscribe(id, topics.toArray()); | |
| 337 | } | ||
| 338 | |||
| 339 | ✗ | MQTT::Message MQTT::Message::createUnsubscribeAck(uint16_t id) | |
| 340 | { | ||
| 341 | ✗ | Message msg; | |
| 342 | ✗ | msg.packet_type = gate_mqtt_packet_unsubscribe_ack; | |
| 343 | ✗ | msg.packet_flags = 0; | |
| 344 | ✗ | msg.content.unsubscribe_ack.packet_identifier = id; | |
| 345 | ✗ | return msg; | |
| 346 | } | ||
| 347 | |||
| 348 | ✗ | MQTT::Message MQTT::Message::createPingRequest() | |
| 349 | { | ||
| 350 | ✗ | Message msg; | |
| 351 | ✗ | msg.packet_type = gate_mqtt_packet_ping_request; | |
| 352 | ✗ | msg.packet_flags = 0; | |
| 353 | ✗ | return msg; | |
| 354 | } | ||
| 355 | |||
| 356 | ✗ | MQTT::Message MQTT::Message::createPingResponse() | |
| 357 | { | ||
| 358 | ✗ | Message msg; | |
| 359 | ✗ | msg.packet_type = gate_mqtt_packet_ping_response; | |
| 360 | ✗ | msg.packet_flags = 0; | |
| 361 | ✗ | return msg; | |
| 362 | } | ||
| 363 | |||
| 364 | ✗ | MQTT::Message MQTT::Message::createDisconnect() | |
| 365 | { | ||
| 366 | ✗ | Message msg; | |
| 367 | ✗ | msg.packet_type = gate_mqtt_packet_disconnect; | |
| 368 | ✗ | msg.packet_flags = 0; | |
| 369 | ✗ | return msg; | |
| 370 | } | ||
| 371 | |||
| 372 | ✗ | bool_t MQTT::Message::isType(gate_mqtt_packet_type_t tp) const | |
| 373 | { | ||
| 374 | ✗ | return this->packet_type == tp; | |
| 375 | } | ||
| 376 | |||
| 377 | ✗ | bool_t MQTT::Message::getConnect(uint16_t& keepAlive, String& clientId, String& willTopic, String& willMessage, String& user, String& password) const | |
| 378 | { | ||
| 379 | ✗ | if (!this->isType(gate_mqtt_packet_connect)) | |
| 380 | { | ||
| 381 | ✗ | return false; | |
| 382 | } | ||
| 383 | ✗ | keepAlive = this->content.connect.keep_alive; | |
| 384 | ✗ | clientId = String(this->content.connect.client_id, this->content.connect.client_id_len); | |
| 385 | ✗ | willTopic = String(this->content.connect.will_topic, this->content.connect.will_topic_len); | |
| 386 | ✗ | willMessage = String(this->content.connect.will_message, this->content.connect.will_message_len); | |
| 387 | ✗ | user = String(this->content.connect.user, this->content.connect.user_len); | |
| 388 | ✗ | password = String(this->content.connect.password, this->content.connect.password_len); | |
| 389 | ✗ | return true; | |
| 390 | } | ||
| 391 | |||
| 392 | ✗ | bool_t MQTT::Message::getConnectAck(uint8_t& flags, uint8_t& retCode) const | |
| 393 | { | ||
| 394 | ✗ | if (!this->isType(gate_mqtt_packet_connect_ack)) | |
| 395 | { | ||
| 396 | ✗ | return false; | |
| 397 | } | ||
| 398 | ✗ | flags = this->content.connect_ack.flags; | |
| 399 | ✗ | retCode = this->content.connect_ack.return_code; | |
| 400 | ✗ | return true; | |
| 401 | } | ||
| 402 | |||
| 403 | ✗ | bool_t MQTT::Message::getPublish(uint16_t& packId, uint8_t& flags, String& topic, String& data) const | |
| 404 | { | ||
| 405 | ✗ | if (!this->isType(gate_mqtt_packet_publish)) | |
| 406 | { | ||
| 407 | ✗ | return false; | |
| 408 | } | ||
| 409 | ✗ | packId = this->content.publish.packet_identifier; | |
| 410 | ✗ | flags = this->packet_flags; | |
| 411 | ✗ | topic = String(this->content.publish.topic, this->content.publish.topic_len); | |
| 412 | ✗ | data = String(this->content.publish.data, this->content.publish.data_len); | |
| 413 | ✗ | return true; | |
| 414 | } | ||
| 415 | |||
| 416 | ✗ | bool_t MQTT::Message::getPublishAck(uint16_t& id) const | |
| 417 | { | ||
| 418 | ✗ | if (!this->isType(gate_mqtt_packet_publish_ack)) | |
| 419 | { | ||
| 420 | ✗ | return false; | |
| 421 | } | ||
| 422 | ✗ | id = this->content.publish_ack.packet_identifier; | |
| 423 | ✗ | return true; | |
| 424 | } | ||
| 425 | |||
| 426 | ✗ | bool_t MQTT::Message::getPublishReceive(uint16_t& id) const | |
| 427 | { | ||
| 428 | ✗ | if (!this->isType(gate_mqtt_packet_publish_receive)) | |
| 429 | { | ||
| 430 | ✗ | return false; | |
| 431 | } | ||
| 432 | ✗ | id = this->content.publish_received.packet_identifier; | |
| 433 | ✗ | return true; | |
| 434 | } | ||
| 435 | |||
| 436 | ✗ | bool_t MQTT::Message::getPublishRelease(uint16_t& id) const | |
| 437 | { | ||
| 438 | ✗ | if (!this->isType(gate_mqtt_packet_publish_release)) | |
| 439 | { | ||
| 440 | ✗ | return false; | |
| 441 | } | ||
| 442 | ✗ | id = this->content.publish_release.packet_identifier; | |
| 443 | ✗ | return true; | |
| 444 | } | ||
| 445 | |||
| 446 | ✗ | bool_t MQTT::Message::getPublishComplete(uint16_t& id) const | |
| 447 | { | ||
| 448 | ✗ | if (!this->isType(gate_mqtt_packet_publish_complete)) | |
| 449 | { | ||
| 450 | ✗ | return false; | |
| 451 | } | ||
| 452 | ✗ | id = this->content.publish_complete.packet_identifier; | |
| 453 | ✗ | return true; | |
| 454 | } | ||
| 455 | |||
| 456 | ✗ | bool_t MQTT::Message::getSubscribe(uint16_t& id, ArrayList<String>& topics, ArrayList<uint8_t>& qos) const | |
| 457 | { | ||
| 458 | ✗ | if (!this->isType(gate_mqtt_packet_subscribe)) | |
| 459 | { | ||
| 460 | ✗ | return false; | |
| 461 | } | ||
| 462 | ✗ | id = this->content.subscribe.packet_identifier; | |
| 463 | ✗ | for (uint16_t ndx = 0; ndx != this->content.subscribe.topics_count; ++ndx) | |
| 464 | { | ||
| 465 | ✗ | topics.add(String( | |
| 466 | ✗ | this->content.subscribe.topics[ndx].topic, this->content.subscribe.topics[ndx].topic_len)); | |
| 467 | ✗ | qos.add(this->content.subscribe.topics[ndx].qos); | |
| 468 | } | ||
| 469 | ✗ | return true; | |
| 470 | } | ||
| 471 | |||
| 472 | ✗ | bool_t MQTT::Message::getSubscribeAck(uint16_t& id, ArrayList<uint8_t>& retCodes) const | |
| 473 | { | ||
| 474 | ✗ | if (!this->isType(gate_mqtt_packet_subscribe_ack)) | |
| 475 | { | ||
| 476 | ✗ | return false; | |
| 477 | } | ||
| 478 | |||
| 479 | ✗ | id = this->content.subscribe_ack.packet_identifier; | |
| 480 | ✗ | for (uint16_t ndx = 0; ndx != this->content.subscribe_ack.ret_codes_count; ++ndx) | |
| 481 | { | ||
| 482 | ✗ | retCodes.add(this->content.subscribe_ack.ret_codes[ndx]); | |
| 483 | } | ||
| 484 | ✗ | return true; | |
| 485 | } | ||
| 486 | |||
| 487 | ✗ | bool_t MQTT::Message::getUnsubscribe(uint16_t& id, ArrayList<String>& topics) const | |
| 488 | { | ||
| 489 | ✗ | if (!this->isType(gate_mqtt_packet_unsubscribe)) | |
| 490 | { | ||
| 491 | ✗ | return false; | |
| 492 | } | ||
| 493 | ✗ | id = this->content.unsubscribe.packet_identifier; | |
| 494 | ✗ | for (uint16_t ndx = 0; ndx != this->content.unsubscribe.topics_count; ++ndx) | |
| 495 | { | ||
| 496 | ✗ | topics.add(String( | |
| 497 | ✗ | this->content.unsubscribe.topics[ndx].topic, this->content.unsubscribe.topics[ndx].topic_len)); | |
| 498 | } | ||
| 499 | ✗ | return true; | |
| 500 | } | ||
| 501 | |||
| 502 | ✗ | bool_t MQTT::Message::getUnsubscribeAck(uint16_t& id) const | |
| 503 | { | ||
| 504 | ✗ | if (!this->isType(gate_mqtt_packet_unsubscribe_ack)) | |
| 505 | { | ||
| 506 | ✗ | return false; | |
| 507 | } | ||
| 508 | ✗ | id = this->content.unsubscribe_ack.packet_identifier; | |
| 509 | ✗ | return true; | |
| 510 | } | ||
| 511 | |||
| 512 | |||
| 513 | |||
| 514 | |||
| 515 | ✗ | void MQTT::sendMessage(Stream& mqttStream, Message const& msg) | |
| 516 | { | ||
| 517 | ✗ | result_t result = gate_mqtt_send_message(mqttStream.c_impl(), &msg); | |
| 518 | ✗ | GATEXX_CHECK_EXCEPTION(result); | |
| 519 | ✗ | } | |
| 520 | |||
| 521 | ✗ | bool_t MQTT::hasQueuedMessage(Stream& mqttStream) | |
| 522 | { | ||
| 523 | ✗ | result_t result = gate_mqtt_has_queued_message(mqttStream.c_impl()); | |
| 524 | ✗ | if (result == GATE_RESULT_ENDOFSTREAM) | |
| 525 | { | ||
| 526 | ✗ | return false; | |
| 527 | } | ||
| 528 | ✗ | GATEXX_CHECK_EXCEPTION(result); | |
| 529 | ✗ | return true; | |
| 530 | } | ||
| 531 | |||
| 532 | |||
| 533 | ✗ | void MQTT::receiveMessage(Stream& mqttStream, Message& msg) | |
| 534 | { | ||
| 535 | ✗ | gate_memoryblock_t* ptr_block = NULL; | |
| 536 | ✗ | result_t result = gate_mqtt_receive_message(mqttStream.c_impl(), &msg, &ptr_block); | |
| 537 | ✗ | GATEXX_CHECK_EXCEPTION(result); | |
| 538 | ✗ | MemoryBlock newBlock(ptr_block); | |
| 539 | ✗ | msg.attachData(newBlock); | |
| 540 | ✗ | } | |
| 541 | |||
| 542 | |||
| 543 | ✗ | MQTTClient::MQTTClient(Stream& mqttConnection) | |
| 544 | ✗ | : transport(mqttConnection) | |
| 545 | { | ||
| 546 | ✗ | } | |
| 547 | |||
| 548 | ✗ | static Stream createTcpConnection(String const& host, uint16_t port) | |
| 549 | { | ||
| 550 | ✗ | StringBuilder builder; | |
| 551 | ✗ | builder << host << ":" << port; | |
| 552 | ✗ | ControlStream stream = SocketStream::createClient(builder.toString()); | |
| 553 | ✗ | return stream; | |
| 554 | } | ||
| 555 | |||
| 556 | ✗ | MQTTClient::MQTTClient(String const& mqttHost, uint16_t port) | |
| 557 | ✗ | : transport(createTcpConnection(mqttHost, port)) | |
| 558 | { | ||
| 559 | ✗ | } | |
| 560 | |||
| 561 | ✗ | MQTTClient::~MQTTClient() noexcept | |
| 562 | { | ||
| 563 | ✗ | } | |
| 564 | |||
| 565 | ✗ | uint16_t MQTTClient::getNextPacketId() | |
| 566 | { | ||
| 567 | ✗ | int32_t id = ++this->nextPacketId; | |
| 568 | ✗ | return static_cast<uint16_t>(id); | |
| 569 | } | ||
| 570 | |||
| 571 | |||
| 572 | ✗ | void MQTTClient::connect(String const& clientId, String const& user, String const& password, uint16_t timeoutSeconds) | |
| 573 | { | ||
| 574 | ✗ | const MQTT::Message request = MQTT::Message::createConnect(timeoutSeconds, clientId, String(), String(), user, password); | |
| 575 | ✗ | MQTT::sendMessage(this->transport, request); | |
| 576 | |||
| 577 | ✗ | MQTT::Message response; | |
| 578 | ✗ | MQTT::receiveMessage(this->transport, response); | |
| 579 | |||
| 580 | ✗ | if (response.packet_type != gate_mqtt_packet_connect_ack) | |
| 581 | { | ||
| 582 | ✗ | raiseException(results::InvalidHeader, "Invalid response message type"); | |
| 583 | } | ||
| 584 | |||
| 585 | ✗ | if (response.content.connect_ack.return_code != 0) | |
| 586 | { | ||
| 587 | ✗ | raiseException(results::InvalidData, "Connect failed", NULL, response.content.connect_ack.return_code); | |
| 588 | } | ||
| 589 | ✗ | } | |
| 590 | |||
| 591 | ✗ | void MQTTClient::disconnect() | |
| 592 | { | ||
| 593 | ✗ | const MQTT::Message request = MQTT::Message::createDisconnect(); | |
| 594 | ✗ | MQTT::sendMessage(this->transport, request); | |
| 595 | ✗ | } | |
| 596 | |||
| 597 | |||
| 598 | ✗ | MQTTClient::PacketId MQTTClient::subscribe(String const& topic) | |
| 599 | { | ||
| 600 | ✗ | const PacketId packetId = this->getNextPacketId(); | |
| 601 | ✗ | const MQTT::Message request = MQTT::Message::createSubscribe(packetId, topic); | |
| 602 | ✗ | MQTT::sendMessage(this->transport, request); | |
| 603 | |||
| 604 | ✗ | MQTT::Message response; | |
| 605 | ✗ | MQTT::receiveMessage(this->transport, response); | |
| 606 | |||
| 607 | ✗ | if (response.packet_type != gate_mqtt_packet_subscribe_ack) | |
| 608 | { | ||
| 609 | ✗ | raiseException(results::InvalidHeader, "Invalid response message type"); | |
| 610 | } | ||
| 611 | ✗ | if (response.content.subscribe_ack.packet_identifier != packetId) | |
| 612 | { | ||
| 613 | ✗ | raiseException(results::InvalidData, "Connect failed", NULL, response.content.connect_ack.return_code); | |
| 614 | } | ||
| 615 | ✗ | return packetId; | |
| 616 | } | ||
| 617 | |||
| 618 | ✗ | MQTTClient::PacketId MQTTClient::unsubscribe(String const& topic) | |
| 619 | { | ||
| 620 | ✗ | const PacketId packetId = this->getNextPacketId(); | |
| 621 | ✗ | const MQTT::Message request = MQTT::Message::createUnsubscribe(packetId, topic); | |
| 622 | ✗ | MQTT::sendMessage(this->transport, request); | |
| 623 | |||
| 624 | ✗ | MQTT::Message response; | |
| 625 | ✗ | MQTT::receiveMessage(this->transport, response); | |
| 626 | |||
| 627 | ✗ | if (response.packet_type != gate_mqtt_packet_unsubscribe_ack) | |
| 628 | { | ||
| 629 | ✗ | raiseException(results::InvalidHeader, "Invalid response message type"); | |
| 630 | } | ||
| 631 | ✗ | if (response.content.unsubscribe_ack.packet_identifier != packetId) | |
| 632 | { | ||
| 633 | ✗ | raiseException(results::InvalidData, "Connect failed", NULL, response.content.connect_ack.return_code); | |
| 634 | } | ||
| 635 | ✗ | return packetId; | |
| 636 | } | ||
| 637 | |||
| 638 | ✗ | MQTTClient::PacketId MQTTClient::publish(String const& topic, String const& data, uint8_t flags) | |
| 639 | { | ||
| 640 | ✗ | const PacketId packetId = this->getNextPacketId(); | |
| 641 | ✗ | const MQTT::Message request = MQTT::Message::createPublish(packetId, flags, topic, data); | |
| 642 | ✗ | MQTT::sendMessage(this->transport, request); | |
| 643 | |||
| 644 | ✗ | if (GATE_FLAG_ENABLED(flags, GATE_MQTT_FLAG_QOS1)) | |
| 645 | { | ||
| 646 | ✗ | MQTT::Message response; | |
| 647 | ✗ | MQTT::receiveMessage(this->transport, response); | |
| 648 | |||
| 649 | ✗ | if (response.packet_type != gate_mqtt_packet_publish_ack) | |
| 650 | { | ||
| 651 | ✗ | raiseException(results::InvalidHeader, "Invalid response message type, publish_ack expected", NULL, (int32_t)response.packet_type); | |
| 652 | } | ||
| 653 | ✗ | if (response.content.publish_ack.packet_identifier != packetId) | |
| 654 | { | ||
| 655 | ✗ | raiseException(results::InvalidData, "Publish-acknowledge failed", NULL, response.content.publish_ack.packet_identifier); | |
| 656 | } | ||
| 657 | } | ||
| 658 | |||
| 659 | ✗ | if (GATE_FLAG_ENABLED(flags, GATE_MQTT_FLAG_QOS2)) | |
| 660 | { | ||
| 661 | ✗ | MQTT::Message response; | |
| 662 | ✗ | MQTT::receiveMessage(this->transport, response); | |
| 663 | |||
| 664 | ✗ | if (response.packet_type != gate_mqtt_packet_publish_receive) | |
| 665 | { | ||
| 666 | ✗ | raiseException(results::InvalidHeader, "Invalid response message type, publish_rec expected", NULL, (int32_t)response.packet_type); | |
| 667 | } | ||
| 668 | ✗ | if (response.content.publish_received.packet_identifier != packetId) | |
| 669 | { | ||
| 670 | ✗ | raiseException(results::InvalidData, "Publish-received failed", NULL, response.content.publish_received.packet_identifier); | |
| 671 | } | ||
| 672 | |||
| 673 | // send PUBREL msg | ||
| 674 | ✗ | const MQTT::Message request2 = MQTT::Message::createPublishRelease(packetId); | |
| 675 | ✗ | MQTT::sendMessage(this->transport, request); | |
| 676 | |||
| 677 | ✗ | MQTT::Message response2; | |
| 678 | ✗ | MQTT::receiveMessage(this->transport, response2); | |
| 679 | |||
| 680 | ✗ | if (response2.packet_type != gate_mqtt_packet_publish_complete) | |
| 681 | { | ||
| 682 | ✗ | raiseException(results::InvalidHeader, "Invalid response message type, publish_comp expected", NULL, (int32_t)response2.packet_type); | |
| 683 | } | ||
| 684 | ✗ | if (response2.content.publish_complete.packet_identifier != packetId) | |
| 685 | { | ||
| 686 | ✗ | raiseException(results::InvalidData, "Publish-complete failed", NULL, response2.content.publish_complete.packet_identifier); | |
| 687 | } | ||
| 688 | } | ||
| 689 | |||
| 690 | ✗ | return packetId; | |
| 691 | } | ||
| 692 | |||
| 693 | ✗ | void MQTTClient::sendPing() | |
| 694 | { | ||
| 695 | ✗ | MQTT::Message request = MQTT::Message::createPingRequest(); | |
| 696 | ✗ | MQTT::sendMessage(this->transport, request); | |
| 697 | ✗ | } | |
| 698 | |||
| 699 | ✗ | void MQTTClient::sendPingResponse() | |
| 700 | { | ||
| 701 | ✗ | MQTT::Message request = MQTT::Message::createPingResponse(); | |
| 702 | ✗ | MQTT::sendMessage(this->transport, request); | |
| 703 | ✗ | } | |
| 704 | |||
| 705 | |||
| 706 | ✗ | bool_t MQTTClient::hasNextMessage() | |
| 707 | { | ||
| 708 | ✗ | return MQTT::hasQueuedMessage(this->transport); | |
| 709 | } | ||
| 710 | |||
| 711 | ✗ | MQTT::Message MQTTClient::getNextMessage() | |
| 712 | { | ||
| 713 | ✗ | MQTT::Message msg; | |
| 714 | ✗ | MQTT::receiveMessage(this->transport, msg); | |
| 715 | ✗ | return msg; | |
| 716 | } | ||
| 717 | |||
| 718 | |||
| 719 | } // end of namespace net | ||
| 720 | } // end of namespace gate | ||
| 721 |