GCC Code Coverage Report


Directory: src/gate/
File: src/gate/cxx_queues.cpp
Date: 2025-12-12 23:40:09
Exec Total Coverage
Lines: 151 185 81.6%
Functions: 40 46 87.0%
Branches: 34 85 40.0%

Line Branch Exec Source
1 /* GATE PROJECT LICENSE:
2 +----------------------------------------------------------------------------+
3 | Copyright(c) 2018-2025, Stefan Meislinger <sm@opengate.at> |
4 | All rights reserved. |
5 | |
6 | Redistribution and use in source and binary forms, with or without |
7 | modification, are permitted provided that the following conditions are met:|
8 | |
9 | 1. Redistributions of source code must retain the above copyright notice, |
10 | this list of conditions and the following disclaimer. |
11 | 2. Redistributions in binary form must reproduce the above copyright |
12 | notice, this list of conditions and the following disclaimer in the |
13 | documentation and/or other materials provided with the distribution. |
14 | |
15 | THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"|
16 | AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
17 | IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
18 | ARE DISCLAIMED.IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
19 | LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
20 | CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
21 | SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
22 | INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
23 | CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
24 | ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF |
25 | THE POSSIBILITY OF SUCH DAMAGE. |
26 +----------------------------------------------------------------------------+
27 */
28 #include "gate/queues.hpp"
29 #include "gate/results.hpp"
30 #include "gate/exceptions.hpp"
31
32 namespace gate
33 {
34
35 /////////////////////////////////////
36 // //
37 // class ExeQueue implementation //
38 // //
39 /////////////////////////////////////
40
41 1 ExeQueue::ExeQueue(gate_entrypoint_t init_code,
42 gate_entrypoint_t shutdown_code,
43 1 void* code_param)
44 {
45 1 result_t result = gate_exequeue_create(&this->impl, init_code, shutdown_code, code_param);
46
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 GATEXX_CHECK_ERROR(result);
47 1 }
48 2 ExeQueue::~ExeQueue() noexcept
49 {
50 1 gate_exequeue_destroy(this->impl);
51 1 }
52
53
54 4 void ExeQueue::push(gate_entrypoint_t function, void* parameter, gate_mem_dtor_t parameter_destructor)
55 {
56 4 result_t result = gate_exequeue_push(this->impl, function, parameter, parameter_destructor);
57
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4 times.
4 GATEXX_CHECK_ERROR(result);
58 4 }
59
60 1 static gate_result_t execute_runnable(void* ptr)
61 {
62 1 gate_result_t ret = GATE_RESULT_NULLPOINTER;
63 1 gate_runnable_t* runner = (gate_runnable_t*)ptr;
64
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (runner)
65 {
66 1 ret = gate_runnable_run(runner);
67 }
68 1 return ret;
69 }
70
71 1 static void release_runnable(void* ptr)
72 {
73 1 gate_runnable_t* runner = (gate_runnable_t*)ptr;
74
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (runner)
75 {
76 1 gate_object_release(runner);
77 }
78 1 }
79
80 1 void ExeQueue::push(Runnable const& code)
81 {
82 1 gate_runnable_t* runner = code.c_impl();
83
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (runner)
84 {
85 1 gate_object_retain(runner);
86 1 result_t result = gate_exequeue_push(this->impl, &execute_runnable, (void*)runner, &release_runnable);
87
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (GATE_FAILED(result))
88 {
89 gate_object_release(runner);
90 GATEXX_RAISE_ERROR(result);
91 }
92 }
93 1 }
94 1 size_t ExeQueue::itemCount()
95 {
96 1 gate_size_t item_count = 0;
97
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 result_t result = gate_exequeue_itemcount(this->impl, &item_count);
98
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
1 GATEXX_CHECK_ERROR(result);
99 1 return item_count;
100 }
101
102
103
104
105
106 //////////////////////////////////////
107 // //
108 // class DataQueue implementation //
109 // //
110 //////////////////////////////////////
111
112 2 DataQueue::DataQueue(gate_dataqueue_t* ptr_queue_to_embed)
113 2 : Startable((gate_startable_t*)ptr_queue_to_embed)
114 {
115 2 }
116 1 DataQueue::DataQueue(DataQueue const& src)
117 1 : Startable(src)
118 {
119 1 }
120 1 DataQueue& DataQueue::operator=(DataQueue const& src)
121 {
122
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (this != &src)
123 {
124 1 Startable::operator=(src);
125 }
126 1 return *this;
127 }
128 3 DataQueue::~DataQueue()
129 {
130 3 }
131
132 14 gate_dataqueue_t* DataQueue::c_impl() const noexcept
133 {
134 14 return (gate_dataqueue_t*)Startable::c_impl();
135 }
136
137 1 void DataQueue::setCallback(gate_dataqueue_status_t statusCallback, void* callbackData)
138 {
139 1 result_t result = gate_dataqueue_set_callback(this->c_impl(), statusCallback, callbackData);
140
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 GATEXX_CHECK_ERROR(result);
141 1 }
142
143 1 void DataQueue::setCallback(DataQueueSink& sink)
144 {
145 1 this->setCallback(&DataQueueSink::queue_callback, &sink);
146 1 }
147
148 3 DataQueue::channel_id_t DataQueue::open(String const& address, enumint_t flags, void* userParam)
149 {
150 3 channel_id_t channelId = 0;
151
1/2
✓ Branch 4 taken 3 times.
✗ Branch 5 not taken.
3 result_t result = gate_dataqueue_open(this->c_impl(), address.c_impl(), flags, userParam, &channelId);
152
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
3 GATEXX_CHECK_EXCEPTION(result);
153 3 return channelId;
154 }
155 1 void DataQueue::close(channel_id_t channel_id)
156 {
157 1 result_t result = gate_dataqueue_close(this->c_impl(), channel_id);
158
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 GATEXX_CHECK_EXCEPTION(result);
159 1 }
160 1 void DataQueue::beginRead(channel_id_t channel_id, size_t size, void* userParam)
161 {
162 1 result_t result = gate_dataqueue_begin_read(this->c_impl(), channel_id, size, userParam);
163
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 GATEXX_CHECK_EXCEPTION(result);
164 1 }
165 1 void DataQueue::beginWrite(channel_id_t channel_id, char const* buffer, gate_size_t bufferLen, void* userParam)
166 {
167 1 result_t result = gate_dataqueue_begin_write(this->c_impl(), channel_id, buffer, bufferLen, userParam);
168
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 GATEXX_CHECK_EXCEPTION(result);
169 1 }
170
171
172 2 DataQueueSink::DataQueueSink()
173 {
174 2 }
175 4 DataQueueSink::~DataQueueSink() noexcept
176 {
177 4 }
178 13 void DataQueueSink::onStatus(uint32_t resultType, channel_id_t channelId, result_t resultCode, char const* buffer, size_t bufferLength, void* userParam)
179 {
180
5/7
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 times.
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 8 times.
13 switch (resultType)
181 {
182 2 case GATE_DATAQUEUE_RESULT_OPEN:
183 {
184
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
2 if (GATE_SUCCEEDED(resultCode))
185 {
186 1 this->onOpen(channelId, buffer, bufferLength, userParam);
187 }
188 else
189 {
190 1 this->onOpenFailed(channelId, resultCode, buffer, bufferLength, userParam);
191 }
192 2 break;
193 }
194 1 case GATE_DATAQUEUE_RESULT_OPENNEW:
195 {
196
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (GATE_SUCCEEDED(resultCode))
197 {
198 1 this->onOpenNew(channelId, buffer, bufferLength, userParam);
199 }
200 else
201 {
202 this->onOpenNewFailed(channelId, resultCode, buffer, bufferLength, userParam);
203 }
204 1 break;
205 }
206 case GATE_DATAQUEUE_RESULT_CLOSE:
207 {
208 break;
209 }
210 1 case GATE_DATAQUEUE_RESULT_READ:
211 {
212
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (GATE_SUCCEEDED(resultCode))
213 {
214 1 this->onReadCompleted(channelId, buffer, bufferLength, userParam);
215 }
216 else
217 {
218 this->onReadError(channelId, resultCode, userParam);
219 }
220 1 break;
221 }
222 1 case GATE_DATAQUEUE_RESULT_WRITE:
223 {
224
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (GATE_SUCCEEDED(resultCode))
225 {
226 1 this->onWriteCompleted(channelId, buffer, bufferLength, userParam);
227 }
228 else
229 {
230 this->onWriteError(channelId, resultCode, userParam);
231 }
232 1 break;
233 }
234 case GATE_DATAQUEUE_RESULT_ERROR:
235 {
236 this->onError(channelId, resultCode, buffer, bufferLength);
237 break;
238 }
239 }
240 13 }
241
242 1 void DataQueueSink::onOpen(channel_id_t channelId, char const* address, size_t addressLength, void* userParam)
243 {
244 GATE_UNUSED_ARG(channelId);
245 GATE_UNUSED_ARG(address);
246 GATE_UNUSED_ARG(addressLength);
247 GATE_UNUSED_ARG(userParam);
248 1 }
249 1 void DataQueueSink::onOpenFailed(channel_id_t channelId, result_t result, char const* errorMsg, size_t errorMsgLength, void* userParam)
250 {
251 GATE_UNUSED_ARG(channelId);
252 GATE_UNUSED_ARG(result);
253 GATE_UNUSED_ARG(errorMsg);
254 GATE_UNUSED_ARG(errorMsgLength);
255 GATE_UNUSED_ARG(userParam);
256 1 }
257 1 void DataQueueSink::onOpenNew(channel_id_t channelId, char const* address, size_t addressLength, void* userParam)
258 {
259 GATE_UNUSED_ARG(channelId);
260 GATE_UNUSED_ARG(address);
261 GATE_UNUSED_ARG(addressLength);
262 GATE_UNUSED_ARG(userParam);
263 1 }
264 1 void DataQueueSink::onOpenNewFailed(channel_id_t channelId, result_t result, char const* errorMsg, size_t errorMsgLength, void* userParam)
265 {
266 GATE_UNUSED_ARG(channelId);
267 GATE_UNUSED_ARG(result);
268 GATE_UNUSED_ARG(errorMsg);
269 GATE_UNUSED_ARG(errorMsgLength);
270 GATE_UNUSED_ARG(userParam);
271 1 }
272
273 1 void DataQueueSink::onReadCompleted(channel_id_t channelId, char const* buffer, size_t bufferLength, void* userParam)
274 {
275 GATE_UNUSED_ARG(channelId);
276 GATE_UNUSED_ARG(buffer);
277 GATE_UNUSED_ARG(bufferLength);
278 GATE_UNUSED_ARG(userParam);
279 1 }
280 1 void DataQueueSink::onWriteCompleted(channel_id_t channelId, char const* buffer, size_t bufferLength, void* userParam)
281 {
282 GATE_UNUSED_ARG(channelId);
283 GATE_UNUSED_ARG(buffer);
284 GATE_UNUSED_ARG(bufferLength);
285 GATE_UNUSED_ARG(userParam);
286 1 }
287
288 1 void DataQueueSink::onReadError(channel_id_t channelId, result_t errorCode, void* userParam)
289 {
290 GATE_UNUSED_ARG(channelId);
291 GATE_UNUSED_ARG(errorCode);
292 GATE_UNUSED_ARG(userParam);
293 1 }
294 1 void DataQueueSink::onWriteError(channel_id_t channelId, result_t errorCode, void* userParam)
295 {
296 GATE_UNUSED_ARG(channelId);
297 GATE_UNUSED_ARG(errorCode);
298 GATE_UNUSED_ARG(userParam);
299 1 }
300
301 1 void DataQueueSink::onError(channel_id_t channelId, result_t errorCode, char const* errorMsg, size_t errorMsgSize)
302 {
303 GATE_UNUSED_ARG(channelId);
304 GATE_UNUSED_ARG(errorCode);
305 GATE_UNUSED_ARG(errorMsg);
306 GATE_UNUSED_ARG(errorMsgSize);
307 1 }
308
309
310 12 void DataQueueSink::queue_callback(void* callbackData, uint32_t statusType, channel_id_t channelId, result_t resultCode,
311 char const* buffer, size_t bufferLength, void* userParam)
312 {
313 12 DataQueueSink* ptrSink = static_cast<DataQueueSink*>(callbackData);
314 12 ptrSink->onStatus(statusType, channelId, resultCode, buffer, bufferLength, userParam);
315 12 }
316
317
318
319
320
321
322 2 MsgQueueSink::~MsgQueueSink()
323 {
324 2 }
325
326
327 1 MsgQueue::MsgQueue(gate_msgqueue_t* ptr_queue)
328 1 : Startable((gate_startable_t*)ptr_queue)
329 {
330 1 }
331 MsgQueue::MsgQueue(MsgQueue const& src)
332 : Startable(src)
333 {
334 }
335 MsgQueue& MsgQueue::operator=(MsgQueue const& src)
336 {
337 Startable::operator=(src);
338 return *this;
339 }
340
341 10 gate_msgqueue_t* MsgQueue::c_impl() const noexcept
342 {
343 10 return (gate_msgqueue_t*)Startable::c_impl();
344 }
345
346
347 1 MsgQueue::target_id_t MsgQueue::addTarget(String const& address)
348 {
349 target_id_t ret;
350
1/2
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
1 result_t result = gate_msgqueue_add_target(this->c_impl(), address.c_impl(), &ret);
351
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
1 GATEXX_CHECK_EXCEPTION(result);
352 1 return ret;
353 }
354 String MsgQueue::getTarget(target_id_t targetID)
355 {
356 gate_string_t address;
357 result_t result = gate_msgqueue_get_target(this->c_impl(), targetID, &address);
358 GATEXX_CHECK_EXCEPTION(result);
359 return String::createFrom(address);
360 }
361 MsgQueue::target_id_t MsgQueue::resolveTarget(String const& address)
362 {
363 target_id_t ret;
364 result_t result = gate_msgqueue_resolve_target(this->c_impl(), address.c_impl(), &ret);
365 GATEXX_CHECK_EXCEPTION(result);
366 return ret;
367 }
368 void MsgQueue::removeTarget(target_id_t targetID)
369 {
370 result_t result = gate_msgqueue_remove_target(this->c_impl(), targetID);
371 GATEXX_CHECK_EXCEPTION(result);
372 }
373
374 3 void MsgQueue::publish(target_id_t targetID, uint32_t messageType, void const* data, size_t length)
375 {
376 3 result_t result = gate_msgqueue_publish(this->c_impl(), targetID, messageType, data, length);
377
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 2 times.
3 GATEXX_CHECK_EXCEPTION(result);
378 2 }
379 1 MsgQueue::subscription_id_t MsgQueue::subscribe(target_id_t targetID, receiver_callback_t callback, void* user_param)
380 {
381 subscription_id_t ret;
382
1/2
✓ Branch 3 taken 1 times.
✗ Branch 4 not taken.
1 result_t result = gate_msgqueue_subscribe(this->c_impl(), targetID, callback, user_param, &ret);
383
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
1 GATEXX_CHECK_EXCEPTION(result);
384 1 return ret;
385 }
386 2 static void MsgQueue_callback_sink(gate_msgqueue_t* msgqueue, gate_msgqueue_target_id_t target_id, gate_uint32_t message_type, void const* data, gate_size_t length, void* user_param)
387 {
388 2 MsgQueueSink* ptr_sink = (MsgQueueSink*)user_param;
389
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 if (ptr_sink)
390 {
391 try
392 {
393
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 ptr_sink->onMessage(target_id, message_type, data, length);
394 }
395 catch (...) {}
396 }
397 2 }
398 1 MsgQueue::subscription_id_t MsgQueue::subscribe(target_id_t targetID, MsgQueueSink* ptr_sink)
399 {
400 1 subscription_id_t id = this->subscribe(targetID, &MsgQueue_callback_sink, ptr_sink);
401 1 return id;
402 }
403 void MsgQueue::unsubscribe(subscription_id_t subscriptionID)
404 {
405 result_t result = gate_msgqueue_unsubscribe(this->c_impl(), subscriptionID);
406 GATEXX_CHECK_EXCEPTION(result);
407 }
408
409
410 1 MsgQueue MemoryMsgQueue::create()
411 {
412 1 gate_msgqueue_t* ptr_queue = gate_memory_msgqueue_create();
413
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (NULL == ptr_queue)
414 {
415 GATEXX_RAISE_ERROR(results::OutOfMemory);
416 }
417 1 MsgQueue queue(ptr_queue);
418 1 return queue;
419 }
420
421
422
423 } // end of namespace gate
424