GCC Code Coverage Report


Directory: src/gate/
File: src/gate/cxx_queues.cpp
Date: 2025-09-14 13:10:38
Exec Total Coverage
Lines: 49 177 27.7%
Functions: 13 44 29.5%
Branches: 13 82 15.9%

Line Branch Exec Source
1 #include "gate/queues.hpp"
2 #include "gate/results.hpp"
3 #include "gate/exceptions.hpp"
4
5 namespace gate
6 {
7
8 /////////////////////////////////////
9 // //
10 // class ExeQueue implementation //
11 // //
12 /////////////////////////////////////
13
// Constructs an execution queue by calling gate_exequeue_create(), which
// receives the address of this->impl plus the init/shutdown entry points and
// their shared parameter. The result code is handed to GATEXX_CHECK_ERROR
// (macro defined elsewhere; presumably raises on a failed result - confirm).
14 1 ExeQueue::ExeQueue(gate_entrypoint_t init_code,
15 gate_entrypoint_t shutdown_code,
16 1 void* code_param)
17 {
18 1 result_t result = gate_exequeue_create(&this->impl, init_code, shutdown_code, code_param);
19
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 GATEXX_CHECK_ERROR(result);
20 1 }
// Destroys the underlying queue handle via gate_exequeue_destroy().
// Declared noexcept; the return value of the destroy call (if any) is ignored.
21 2 ExeQueue::~ExeQueue() noexcept
22 {
23 1 gate_exequeue_destroy(this->impl);
24 1 }
25
26
// Enqueues a raw entry point together with its parameter and the destructor
// for that parameter by forwarding all three to gate_exequeue_push().
// The result code is passed to GATEXX_CHECK_ERROR (presumably raises on
// failure - macro defined elsewhere).
27 5 void ExeQueue::push(gate_entrypoint_t function, void* parameter, gate_mem_dtor_t parameter_destructor)
28 {
29 5 result_t result = gate_exequeue_push(this->impl, function, parameter, parameter_destructor);
30
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5 times.
5 GATEXX_CHECK_ERROR(result);
31 5 }
32
// Queue entry point used by ExeQueue::push(Runnable const&): treats ptr as a
// gate_runnable_t*, runs it and then releases one reference on it.
// Returns GATE_RESULT_NULLPOINTER when ptr is null, otherwise the result of
// gate_runnable_run().
// NOTE(review): release_runnable() is registered as the parameter destructor
// for the same pointer and also releases it, while push() retains only once.
// If the queue invokes the destructor after a successful run this would be a
// double release - confirm against the gate_exequeue_push contract.
33 static gate_result_t execute_runnable(void* ptr)
34 {
35 gate_result_t ret = GATE_RESULT_NULLPOINTER;
36 gate_runnable_t* runner = (gate_runnable_t*)ptr;
37 if (runner)
38 {
39 ret = gate_runnable_run(runner);
40 gate_object_release(runner);
41 }
42 return ret;
43 }
44
// Parameter destructor passed to gate_exequeue_push() by
// ExeQueue::push(Runnable const&): releases one reference on the runnable
// when ptr is non-null, and does nothing for a null pointer.
45 static void release_runnable(void* ptr)
46 {
47 gate_runnable_t* runner = (gate_runnable_t*)ptr;
48 if (runner)
49 {
50 gate_object_release(runner);
51 }
52 }
53
// Enqueues a Runnable object. The underlying gate_runnable_t is retained
// before it is handed to the queue, with execute_runnable as the entry point
// and release_runnable as the parameter destructor.
// If gate_exequeue_push() fails, the extra reference is released again and the
// result is raised through GATEXX_RAISE_ERROR.
// NOTE(review): a Runnable whose c_impl() is null is silently ignored - no
// error is reported to the caller.
54 void ExeQueue::push(Runnable const& code)
55 {
56 gate_runnable_t* runner = code.c_impl();
57 if (runner)
58 {
59 gate_object_retain(runner);
60 result_t result = gate_exequeue_push(this->impl, &execute_runnable, (void*)runner, &release_runnable);
61 if (GATE_FAILED(result))
62 {
63 gate_object_release(runner);
64 GATEXX_RAISE_ERROR(result);
65 }
66 }
67 }
// Returns the item count reported by gate_exequeue_itemcount() for this queue.
// item_count starts at 0 and is filled in by the C call; the result code is
// passed to GATEXX_CHECK_ERROR before the value is returned.
68 1 size_t ExeQueue::itemCount()
69 {
70 1 gate_size_t item_count = 0;
71
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 result_t result = gate_exequeue_itemcount(this->impl, &item_count);
72
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
1 GATEXX_CHECK_ERROR(result);
73 1 return item_count;
74 }
75
76
77
78
79
80 //////////////////////////////////////
81 // //
82 // class DataQueue implementation //
83 // //
84 //////////////////////////////////////
85
// Wraps an existing gate_dataqueue_t by passing it, cast to
// gate_startable_t*, to the Startable base class.
// NOTE(review): whether the base takes over the caller's reference or adds
// its own is decided by Startable and not visible here - confirm.
86 DataQueue::DataQueue(gate_dataqueue_t* ptr_queue_to_embed)
87 : Startable((gate_startable_t*)ptr_queue_to_embed)
88 {
89 }
// Copy constructor: delegates entirely to the Startable copy constructor.
90 DataQueue::DataQueue(DataQueue const& src)
91 : Startable(src)
92 {
93 }
// Copy assignment: forwards to Startable::operator= unless src is this very
// object, then returns *this.
94 DataQueue& DataQueue::operator=(DataQueue const& src)
95 {
96 if (this != &src)
97 {
98 Startable::operator=(src);
99 }
100 return *this;
101 }
// Destructor with an empty body; any cleanup is left to the Startable base.
102 DataQueue::~DataQueue()
103 {
104 }
105
// Returns the handle held by the Startable base, cast to gate_dataqueue_t*.
106 gate_dataqueue_t* DataQueue::c_impl() const noexcept
107 {
108 return (gate_dataqueue_t*)Startable::c_impl();
109 }
110
// Registers a raw status callback and its user data on the underlying queue
// via gate_dataqueue_set_callback().
// NOTE(review): this method checks the result with GATEXX_CHECK_ERROR while
// the other DataQueue operations use GATEXX_CHECK_EXCEPTION - confirm the
// difference is intentional.
111 void DataQueue::setCallback(gate_dataqueue_status_t statusCallback, void* callbackData)
112 {
113 result_t result = gate_dataqueue_set_callback(this->c_impl(), statusCallback, callbackData);
114 GATEXX_CHECK_ERROR(result);
115 }
116
// Registers a DataQueueSink object as the status receiver: installs
// DataQueueSink::queue_callback as the callback and the sink's address as
// its callback data.
// NOTE(review): only the address of sink is stored, so the sink presumably
// has to outlive the registration - confirm with callers.
117 void DataQueue::setCallback(DataQueueSink& sink)
118 {
119 this->setCallback(&DataQueueSink::queue_callback, &sink);
120 }
121
// Opens a channel for the given address through gate_dataqueue_open() and
// returns the channel id written by that call. channelId starts at 0; the
// result code is passed to GATEXX_CHECK_EXCEPTION before returning.
122 DataQueue::channel_id_t DataQueue::open(String const& address, enumint_t flags, void* userParam)
123 {
124 channel_id_t channelId = 0;
125 result_t result = gate_dataqueue_open(this->c_impl(), address.c_impl(), flags, userParam, &channelId);
126 GATEXX_CHECK_EXCEPTION(result);
127 return channelId;
128 }
// Closes the given channel via gate_dataqueue_close(); the result code is
// passed to GATEXX_CHECK_EXCEPTION.
129 void DataQueue::close(channel_id_t channel_id)
130 {
131 result_t result = gate_dataqueue_close(this->c_impl(), channel_id);
132 GATEXX_CHECK_EXCEPTION(result);
133 }
// Starts a read of `size` on the given channel via
// gate_dataqueue_begin_read(), passing userParam through unchanged.
// The result code is passed to GATEXX_CHECK_EXCEPTION.
134 void DataQueue::beginRead(channel_id_t channel_id, size_t size, void* userParam)
135 {
136 result_t result = gate_dataqueue_begin_read(this->c_impl(), channel_id, size, userParam);
137 GATEXX_CHECK_EXCEPTION(result);
138 }
// Starts a write of bufferLen bytes from buffer on the given channel via
// gate_dataqueue_begin_write(), passing userParam through unchanged.
// The result code is passed to GATEXX_CHECK_EXCEPTION.
// NOTE(review): whether buffer must stay valid until the write completes is
// defined by the C API and not visible here - confirm.
139 void DataQueue::beginWrite(channel_id_t channel_id, char const* buffer, gate_size_t bufferLen, void* userParam)
140 {
141 result_t result = gate_dataqueue_begin_write(this->c_impl(), channel_id, buffer, bufferLen, userParam);
142 GATEXX_CHECK_EXCEPTION(result);
143 }
144
145
// Default constructor; no state is initialized here.
146 DataQueueSink::DataQueueSink()
147 {
148 }
// Destructor with an empty body.
149 DataQueueSink::~DataQueueSink()
150 {
151 }
// Dispatches a queue status notification to the matching handler:
//   OPEN  -> onOpen on success, onOpenFailed otherwise
//   READ  -> onReadCompleted on success, onReadError otherwise
//   WRITE -> onWriteCompleted on success, onWriteError otherwise
//   ERROR -> onError
// NOTE(review): GATE_DATAQUEUE_RESULT_CLOSE is accepted but triggers no
// handler, result types without a case are ignored (there is no default
// branch), and userParam is not forwarded to onError.
152 void DataQueueSink::onStatus(uint32_t resultType, channel_id_t channelId, result_t resultCode, char const* buffer, size_t bufferLength, void* userParam)
153 {
154 switch (resultType)
155 {
156 case GATE_DATAQUEUE_RESULT_OPEN:
157 {
158 if (GATE_SUCCEEDED(resultCode))
159 {
160 this->onOpen(channelId, buffer, bufferLength, userParam);
161 }
162 else
163 {
164 this->onOpenFailed(channelId, resultCode, buffer, bufferLength, userParam);
165 }
166 break;
167 }
168 case GATE_DATAQUEUE_RESULT_CLOSE:
169 {
170 break;
171 }
172 case GATE_DATAQUEUE_RESULT_READ:
173 {
174 if (GATE_SUCCEEDED(resultCode))
175 {
176 this->onReadCompleted(channelId, buffer, bufferLength, userParam);
177 }
178 else
179 {
180 this->onReadError(channelId, resultCode, userParam);
181 }
182 break;
183 }
184 case GATE_DATAQUEUE_RESULT_WRITE:
185 {
186 if (GATE_SUCCEEDED(resultCode))
187 {
188 this->onWriteCompleted(channelId, buffer, bufferLength, userParam);
189 }
190 else
191 {
192 this->onWriteError(channelId, resultCode, userParam);
193 }
194 break;
195 }
196 case GATE_DATAQUEUE_RESULT_ERROR:
197 {
198 this->onError(channelId, resultCode, buffer, bufferLength);
199 break;
200 }
201 }
202 }
203
// Default handler for a successful open notification: does nothing and only
// marks its arguments as unused. Called from onStatus() with the status
// buffer and its length as address/addressLength.
204 void DataQueueSink::onOpen(channel_id_t channelId, char const* address, size_t addressLength, void* userParam)
205 {
206 GATE_UNUSED_ARG(channelId);
207 GATE_UNUSED_ARG(address);
208 GATE_UNUSED_ARG(addressLength);
209 GATE_UNUSED_ARG(userParam);
210 }
// Default handler for a failed open notification: does nothing and only marks
// its arguments as unused. Called from onStatus() with the failing result
// code and the status buffer/length as errorMsg/errorMsgLength.
211 void DataQueueSink::onOpenFailed(channel_id_t channelId, result_t result, char const* errorMsg, size_t errorMsgLength, void* userParam)
212 {
213 GATE_UNUSED_ARG(channelId);
214 GATE_UNUSED_ARG(result);
215 GATE_UNUSED_ARG(errorMsg);
216 GATE_UNUSED_ARG(errorMsgLength);
217 GATE_UNUSED_ARG(userParam);
218 }
219
// Default handler for a successful read notification: does nothing and only
// marks its arguments as unused.
220 void DataQueueSink::onReadCompleted(channel_id_t channelId, char const* buffer, size_t bufferLength, void* userParam)
221 {
222 GATE_UNUSED_ARG(channelId);
223 GATE_UNUSED_ARG(buffer);
224 GATE_UNUSED_ARG(bufferLength);
225 GATE_UNUSED_ARG(userParam);
226 }
// Default handler for a successful write notification: does nothing and only
// marks its arguments as unused.
227 void DataQueueSink::onWriteCompleted(channel_id_t channelId, char const* buffer, size_t bufferLength, void* userParam)
228 {
229 GATE_UNUSED_ARG(channelId);
230 GATE_UNUSED_ARG(buffer);
231 GATE_UNUSED_ARG(bufferLength);
232 GATE_UNUSED_ARG(userParam);
233 }
234
// Default handler for a failed read notification: does nothing and only marks
// its arguments as unused.
235 void DataQueueSink::onReadError(channel_id_t channelId, result_t errorCode, void* userParam)
236 {
237 GATE_UNUSED_ARG(channelId);
238 GATE_UNUSED_ARG(errorCode);
239 GATE_UNUSED_ARG(userParam);
240 }
// Default handler for a failed write notification: does nothing and only
// marks its arguments as unused.
241 void DataQueueSink::onWriteError(channel_id_t channelId, result_t errorCode, void* userParam)
242 {
243 GATE_UNUSED_ARG(channelId);
244 GATE_UNUSED_ARG(errorCode);
245 GATE_UNUSED_ARG(userParam);
246 }
247
// Default handler for a GATE_DATAQUEUE_RESULT_ERROR notification: does
// nothing and only marks its arguments as unused.
248 void DataQueueSink::onError(channel_id_t channelId, result_t errorCode, char const* errorMsg, size_t errorMsgSize)
249 {
250 GATE_UNUSED_ARG(channelId);
251 GATE_UNUSED_ARG(errorCode);
252 GATE_UNUSED_ARG(errorMsg);
253 GATE_UNUSED_ARG(errorMsgSize);
254 }
255
256
// Callback installed by DataQueue::setCallback(DataQueueSink&): interprets
// callbackData as the DataQueueSink registered there and forwards the
// notification to its onStatus().
// NOTE(review): unlike MsgQueue_callback_sink in this file, there is no null
// check on the sink pointer and no try/catch, so an exception thrown by a
// handler would propagate into the code invoking this callback - confirm
// that this is acceptable for the caller.
257 void DataQueueSink::queue_callback(void* callbackData, uint32_t statusType, channel_id_t channelId, result_t resultCode,
258 char const* buffer, size_t bufferLength, void* userParam)
259 {
260 DataQueueSink* ptrSink = static_cast<DataQueueSink*>(callbackData);
261 ptrSink->onStatus(statusType, channelId, resultCode, buffer, bufferLength, userParam);
262 }
263
264
265
266
267
268
// Destructor with an empty body.
269 2 MsgQueueSink::~MsgQueueSink()
270 {
271 2 }
272
273
// Wraps an existing gate_msgqueue_t by passing it, cast to gate_startable_t*,
// to the Startable base class.
// NOTE(review): whether the base takes over the caller's reference or adds
// its own is decided by Startable and not visible here - confirm.
274 1 MsgQueue::MsgQueue(gate_msgqueue_t* ptr_queue)
275 1 : Startable((gate_startable_t*)ptr_queue)
276 {
277 1 }
// Copy constructor: delegates entirely to the Startable copy constructor.
278 MsgQueue::MsgQueue(MsgQueue const& src)
279 : Startable(src)
280 {
281 }
// Copy assignment: forwards to Startable::operator= and returns *this.
// NOTE(review): there is no self-assignment guard here, unlike
// DataQueue::operator= in this file; safe only if Startable::operator=
// handles self-assignment itself - confirm.
282 MsgQueue& MsgQueue::operator=(MsgQueue const& src)
283 {
284 Startable::operator=(src);
285 return *this;
286 }
287
// Returns the handle held by the Startable base, cast to gate_msgqueue_t*.
288 10 gate_msgqueue_t* MsgQueue::c_impl() const noexcept
289 {
290 10 return (gate_msgqueue_t*)Startable::c_impl();
291 }
292
293
// Adds a target for the given address through gate_msgqueue_add_target() and
// returns the target id written by that call. The result code is passed to
// GATEXX_CHECK_EXCEPTION before returning.
// NOTE(review): ret is not initialized; it carries a defined value only if
// the C call writes it, and the return is reached only if
// GATEXX_CHECK_EXCEPTION lets a successful result through.
294 1 MsgQueue::target_id_t MsgQueue::addTarget(String const& address)
295 {
296 target_id_t ret;
297
1/2
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
1 result_t result = gate_msgqueue_add_target(this->c_impl(), address.c_impl(), &ret);
298
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
1 GATEXX_CHECK_EXCEPTION(result);
299 1 return ret;
300 }
// Looks up the address of a target id through gate_msgqueue_get_target() and
// returns it as a String built with String::createFrom(). The result code is
// passed to GATEXX_CHECK_EXCEPTION first.
// NOTE(review): address is an uninitialized gate_string_t until the C call
// fills it; createFrom() presumably takes over its content - confirm.
301 String MsgQueue::getTarget(target_id_t targetID)
302 {
303 gate_string_t address;
304 result_t result = gate_msgqueue_get_target(this->c_impl(), targetID, &address);
305 GATEXX_CHECK_EXCEPTION(result);
306 return String::createFrom(address);
307 }
// Resolves an address to its target id through
// gate_msgqueue_resolve_target() and returns the id written by that call.
// The result code is passed to GATEXX_CHECK_EXCEPTION before returning.
// NOTE(review): ret is not initialized before the C call.
308 MsgQueue::target_id_t MsgQueue::resolveTarget(String const& address)
309 {
310 target_id_t ret;
311 result_t result = gate_msgqueue_resolve_target(this->c_impl(), address.c_impl(), &ret);
312 GATEXX_CHECK_EXCEPTION(result);
313 return ret;
314 }
// Removes the given target via gate_msgqueue_remove_target(); the result
// code is passed to GATEXX_CHECK_EXCEPTION.
315 void MsgQueue::removeTarget(target_id_t targetID)
316 {
317 result_t result = gate_msgqueue_remove_target(this->c_impl(), targetID);
318 GATEXX_CHECK_EXCEPTION(result);
319 }
320
// Publishes a message of the given type and payload (data/length) to a target
// via gate_msgqueue_publish(); the result code is passed to
// GATEXX_CHECK_EXCEPTION.
321 3 void MsgQueue::publish(target_id_t targetID, uint32_t messageType, void const* data, size_t length)
322 {
323 3 result_t result = gate_msgqueue_publish(this->c_impl(), targetID, messageType, data, length);
324
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 2 times.
3 GATEXX_CHECK_EXCEPTION(result);
325 2 }
// Subscribes a raw receiver callback (with its user parameter) to a target
// through gate_msgqueue_subscribe() and returns the subscription id written
// by that call. The result code is passed to GATEXX_CHECK_EXCEPTION first.
// NOTE(review): ret is not initialized before the C call.
326 1 MsgQueue::subscription_id_t MsgQueue::subscribe(target_id_t targetID, receiver_callback_t callback, void* user_param)
327 {
328 subscription_id_t ret;
329
1/2
✓ Branch 3 taken 1 times.
✗ Branch 4 not taken.
1 result_t result = gate_msgqueue_subscribe(this->c_impl(), targetID, callback, user_param, &ret);
330
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
1 GATEXX_CHECK_EXCEPTION(result);
331 1 return ret;
332 }
// Receiver callback installed by MsgQueue::subscribe(target_id_t,
// MsgQueueSink*): treats user_param as a MsgQueueSink* and forwards the
// message to its onMessage(). A null sink is ignored. The msgqueue argument
// is not used.
// Every exception thrown by onMessage() is caught and discarded, so nothing
// propagates back into the code invoking this callback.
// NOTE(review): failures in onMessage() are dropped without any report.
333 2 static void MsgQueue_callback_sink(gate_msgqueue_t* msgqueue, gate_msgqueue_target_id_t target_id, gate_uint32_t message_type, void const* data, gate_size_t length, void* user_param)
334 {
335 2 MsgQueueSink* ptr_sink = (MsgQueueSink*)user_param;
336
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 if (ptr_sink)
337 {
338 try
339 {
340
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 ptr_sink->onMessage(target_id, message_type, data, length);
341 }
342 catch (...) {}
343 }
344 2 }
// Subscribes a MsgQueueSink object to a target by registering
// MsgQueue_callback_sink with the sink pointer as its user parameter, and
// returns the resulting subscription id.
// NOTE(review): only the raw pointer is stored, so the sink presumably has to
// stay alive until it is unsubscribed - confirm with callers.
345 1 MsgQueue::subscription_id_t MsgQueue::subscribe(target_id_t targetID, MsgQueueSink* ptr_sink)
346 {
347 1 subscription_id_t id = this->subscribe(targetID, &MsgQueue_callback_sink, ptr_sink);
348 1 return id;
349 }
// Cancels the given subscription via gate_msgqueue_unsubscribe(); the result
// code is passed to GATEXX_CHECK_EXCEPTION.
350 void MsgQueue::unsubscribe(subscription_id_t subscriptionID)
351 {
352 result_t result = gate_msgqueue_unsubscribe(this->c_impl(), subscriptionID);
353 GATEXX_CHECK_EXCEPTION(result);
354 }
355
356
// Creates a message queue with gate_memory_msgqueue_create() and returns it
// wrapped in a MsgQueue object.
// A NULL result from the C factory is reported as results::OutOfMemory
// through GATEXX_RAISE_ERROR.
357 1 MsgQueue MemoryMsgQueue::create()
358 {
359 1 gate_msgqueue_t* ptr_queue = gate_memory_msgqueue_create();
360
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (NULL == ptr_queue)
361 {
362 GATEXX_RAISE_ERROR(results::OutOfMemory);
363 }
364 1 MsgQueue queue(ptr_queue);
365 1 return queue;
366 }
367
368
369
370 } // end of namespace gate
371