GCC Code Coverage Report


Directory: src/gate/
File: src/gate/queues.c
Date: 2025-09-14 13:10:38
Exec Total Coverage
Lines: 223 404 55.2%
Functions: 23 35 65.7%
Branches: 65 172 37.8%

Line Branch Exec Source
1 /* GATE PROJECT LICENSE:
2 +----------------------------------------------------------------------------+
3 | Copyright(c) 2018-2025, Stefan Meislinger <sm@opengate.at> |
4 | All rights reserved. |
5 | |
6 | Redistribution and use in source and binary forms, with or without |
7 | modification, are permitted provided that the following conditions are met:|
8 | |
9 | 1. Redistributions of source code must retain the above copyright notice, |
10 | this list of conditions and the following disclaimer. |
11 | 2. Redistributions in binary form must reproduce the above copyright |
12 | notice, this list of conditions and the following disclaimer in the |
13 | documentation and/or other materials provided with the distribution. |
14 | |
15 | THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"|
16 | AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
17 | IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
18 | ARE DISCLAIMED.IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
19 | LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
20 | CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
21 | SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
22 | INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
23 | CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
24 | ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF |
25 | THE POSSIBILITY OF SUCH DAMAGE. |
26 +----------------------------------------------------------------------------+
27 */
28
29 #include "gate/queues.h"
30 #include "gate/threading.h"
31 #include "gate/synchronization.h"
32 #include "gate/results.h"
33 #include "gate/atomics.h"
34 #include "gate/arrays.h"
35 #include "gate/hashes.h"
36 #include "gate/maps.h"
37 #include "gate/comparers.h"
38
39
40 /**************************************
41 ** queueitembuffer implementation **
42 **************************************/
43
44 struct gate_queueitembuffer_class
45 {
46 gate_size_t max_length;
47 gate_mutex_t mutex;
48 gate_synccondition_t cond;
49 gate_arraylist_t buffer;
50 };
51
52 gate_queueitembuffer_t gate_queueitembuffer_create(gate_size_t max_length, gate_size_t workitem_size,
53 gate_mem_copyctor_t workitem_cctor, gate_mem_dtor_t workitem_dtor)
54 {
55 gate_queueitembuffer_t ret = NULL;
56 gate_queueitembuffer_t impl = NULL;
57 gate_result_t result;
58 do
59 {
60 impl = (gate_queueitembuffer_t)gate_mem_alloc(sizeof(struct gate_queueitembuffer_class));
61 if (!impl)
62 {
63 break;
64 }
65 gate_mem_clear(impl, sizeof(struct gate_queueitembuffer_class));
66 impl->max_length = max_length;
67 result = gate_mutex_create(&impl->mutex);
68 if (GATE_FAILED(result))
69 {
70 break;
71 }
72 result = gate_synccondition_create(&impl->cond);
73 if (GATE_FAILED(result))
74 {
75 gate_mutex_destroy(&impl->mutex);
76 break;
77 }
78 impl->buffer = gate_arraylist_create(workitem_size, NULL, 0, workitem_cctor, workitem_dtor);
79 if (impl->buffer == NULL)
80 {
81 gate_synccondition_destroy(&impl->cond);
82 gate_mutex_destroy(&impl->mutex);
83 break;
84 }
85
86 ret = impl;
87 impl = NULL;
88 } while (0);
89
90 if (impl != NULL)
91 {
92 gate_mem_dealloc(impl);
93 }
94
95 return ret;
96 }
97
98 gate_result_t gate_queueitembuffer_destroy(gate_queueitembuffer_t buffer)
99 {
100 if (buffer)
101 {
102 gate_mutex_destroy(&buffer->mutex);
103 gate_synccondition_destroy(&buffer->cond);
104 gate_arraylist_release(buffer->buffer);
105 gate_mem_clear(buffer, sizeof(struct gate_queueitembuffer_class));
106 }
107 return GATE_RESULT_OK;
108 }
109
110 gate_size_t gate_queueitembuffer_length(gate_queueitembuffer_t buffer)
111 {
112 gate_size_t ret = 0;
113 gate_result_t result;
114 do
115 {
116 result = gate_mutex_acquire(&buffer->mutex);
117 GATE_BREAK_IF_FAILED(result);
118 ret = gate_arraylist_length(buffer->buffer);
119 gate_mutex_release(&buffer->mutex);
120 } while (0);
121 return ret;
122 }
123
124 gate_result_t gate_queueitembuffer_push(gate_queueitembuffer_t buffer, void const* item_input)
125 {
126 gate_result_t ret = GATE_RESULT_FAILED;
127 gate_size_t curlen;
128 do
129 {
130 ret = gate_mutex_acquire(&buffer->mutex);
131 GATE_BREAK_IF_FAILED(ret);
132
133 curlen = gate_arraylist_length(buffer->buffer);
134 if (curlen >= buffer->max_length)
135 {
136 ret = GATE_RESULT_BUFFERTOOSMALL;
137 }
138 else if (NULL == gate_arraylist_add(buffer->buffer, item_input))
139 {
140 ret = GATE_RESULT_OUTOFMEMORY;
141 }
142 else
143 {
144 ret = GATE_RESULT_OK;
145 }
146
147 gate_mutex_release(&buffer->mutex);
148
149 if (GATE_SUCCEEDED(ret))
150 {
151 gate_synccondition_signal_all(&buffer->cond);
152 }
153 } while (0);
154 return ret;
155 }
156 gate_result_t gate_queueitembuffer_pop(gate_queueitembuffer_t buffer, gate_uint32_t timeout_ms, void* item_output, gate_size_t item_size)
157 {
158 gate_result_t ret = GATE_RESULT_FAILED;
159 gate_size_t sz;
160
161 do
162 {
163 ret = gate_mutex_acquire(&buffer->mutex);
164 GATE_BREAK_IF_FAILED(ret);
165 do
166 {
167 if (timeout_ms != 0)
168 {
169 if (gate_arraylist_length(buffer->buffer) == 0)
170 {
171 if (timeout_ms == GATE_QUEUEITEMBUFFER_WAIT_INFINITE)
172 {
173 ret = gate_synccondition_wait(&buffer->cond, &buffer->mutex);
174 }
175 else
176 {
177 ret = gate_synccondition_timed_wait(&buffer->cond, &buffer->mutex, timeout_ms);
178 }
179 }
180 GATE_BREAK_IF_FAILED(ret);
181 }
182
183 if (gate_arraylist_length(buffer->buffer) == 0)
184 {
185 ret = GATE_RESULT_TIMEOUT;
186 }
187 else
188 {
189 sz = gate_arraylist_get_value(buffer->buffer, 0, item_output, item_size);
190 if (sz == 0)
191 {
192 ret = GATE_RESULT_OUTOFMEMORY;
193 }
194 else
195 {
196 gate_arraylist_remove(buffer->buffer, 0, 1);
197 ret = GATE_RESULT_OK;
198 }
199 }
200 } while (0);
201 gate_mutex_release(&buffer->mutex);
202 } while (0);
203 return ret;
204 }
205
206
207
208 28 int gate_compare_channel_id(void const* item1, void const* item2)
209 {
210 28 gate_channel_id_t const* ch1 = (gate_channel_id_t const*)item1;
211 28 gate_channel_id_t const* ch2 = (gate_channel_id_t const*)item2;
212
2/2
✓ Branch 0 taken 13 times.
✓ Branch 1 taken 15 times.
28 if (*ch1 == *ch2)
213 {
214 13 return 0;
215 }
216
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15 times.
15 else if (*ch1 < *ch2)
217 {
218 return -1;
219 }
220 else
221 {
222 15 return +1;
223 }
224 }
225
226
227
228
229 /*******************************
230 ** exequeue implementation **
231 *******************************/
232
233 typedef struct gate_exequeue_item_class
234 {
235 gate_entrypoint_t func;
236 void* param;
237 gate_mem_dtor_t param_destructor;
238
239 } gate_exequeue_item_t;
240
241
242
243 typedef struct gate_exequeue_impl_class
244 {
245 GATE_INTERFACE_VTBL(gate_runnable) const* vtbl;
246 gate_atomic_int_t ref_counter;
247 gate_thread_t threadhandle;
248 gate_syncevent_t syncevent;
249 gate_mutex_t mutex;
250 gate_arraylist_t items;
251 gate_atomic_flag_t running;
252 gate_entrypoint_t thread_init;
253 gate_entrypoint_t thread_shutdown;
254 void* thread_param;
255 } gate_exequeue_impl_t;
256
257 3 static void gate_exequeue_impl_release(void* disp)
258 {
259 3 gate_exequeue_impl_t* ptr = (gate_exequeue_impl_t*)disp;
260
2/2
✓ Branch 1 taken 1 times.
✓ Branch 2 taken 2 times.
3 if (gate_atomic_int_dec(&ptr->ref_counter) == 0)
261 {
262 1 gate_bool_t is_current = false;
263 1 gate_result_t result = gate_thread_is_current(&ptr->threadhandle, &is_current);
264
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (GATE_FAILED(result))
265 {
266 /* assume worst case: we are releasing the object inside the worker thread itself */
267 is_current = true;
268 }
269
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (is_current)
270 {
271 /* detaching the thread handle is the only action we can perform */
272 1 gate_thread_detach(&ptr->threadhandle);
273 }
274 else
275 {
276 /* we are in the main or another controlling thread -> await worker shutdown*/
277 gate_thread_join(&ptr->threadhandle, &result);
278 }
279
280 1 gate_mutex_destroy(&ptr->mutex);
281 1 gate_syncevent_destroy(&ptr->syncevent);
282 1 gate_arraylist_release(ptr->items);
283
284 1 gate_mem_dealloc(ptr);
285 }
286 3 }
287 1 static int gate_exequeue_impl_retain(void* disp)
288 {
289 1 gate_exequeue_impl_t* ptr = (gate_exequeue_impl_t*)disp;
290 1 return (int)gate_atomic_int_inc(&ptr->ref_counter);
291 }
292 static char const* gate_exequeue_impl_get_interface_name(void* disp)
293 {
294 (void)disp;
295 return GATE_INTERFACE_NAME_RUNNABLE;
296 }
297
298 1 static gate_result_t gate_exequeue_impl_run(void* disp)
299 {
300 1 gate_exequeue_impl_t* ptr = (gate_exequeue_impl_t*)disp;
301 1 gate_result_t ret = GATE_RESULT_OK;
302 1 gate_bool_t has_next_item = false;
303 1 gate_exequeue_item_t item = GATE_INIT_EMPTY;
304
305
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (ptr->thread_init)
306 {
307 1 ptr->thread_init(ptr->thread_param);
308 }
309
310
2/2
✓ Branch 1 taken 7 times.
✓ Branch 2 taken 1 times.
8 while (gate_atomic_flag_set(&ptr->running) == true)
311 {
312 7 ret = gate_mutex_acquire(&ptr->mutex);
313
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7 times.
7 if (GATE_FAILED(ret))
314 {
315 break;
316 }
317
318 7 has_next_item = (0 != gate_arraylist_length(ptr->items));
319
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 2 times.
7 if (has_next_item)
320 {
321 5 item = *((gate_exequeue_item_t*)gate_arraylist_get(ptr->items, 0));
322 5 gate_arraylist_remove(ptr->items, 0, 1);
323 }
324
325 7 gate_mutex_release(&ptr->mutex);
326
327
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 2 times.
7 if (has_next_item)
328 {
329
1/2
✓ Branch 0 taken 5 times.
✗ Branch 1 not taken.
5 if (item.func != NULL)
330 {
331 5 item.func(item.param);
332 }
333
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5 times.
5 if (item.param_destructor != NULL)
334 {
335 item.param_destructor(item.param);
336 }
337 }
338 else
339 {
340 2 ret = gate_syncevent_wait(&ptr->syncevent);
341
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (GATE_FAILED(ret))
342 {
343 break;
344 }
345 }
346 }
347
348
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (ptr->thread_shutdown)
349 {
350 1 ptr->thread_shutdown(ptr->thread_param);
351 }
352
353 1 gate_object_release(ptr);
354
355 1 return ret;
356 }
357
358 static GATE_INTERFACE_VTBL(gate_runnable) gate_exequeue_impl_vtbl;
359 1 static void gate_init_exequeue_impl_vtbl()
360 {
361
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (!gate_exequeue_impl_vtbl.get_interface_name)
362 {
363 GATE_INTERFACE_VTBL(gate_runnable) const local_vtbl =
364 {
365 &gate_exequeue_impl_get_interface_name,
366 &gate_exequeue_impl_release,
367 &gate_exequeue_impl_retain,
368
369 &gate_exequeue_impl_run
370 };
371
372 1 gate_exequeue_impl_vtbl = local_vtbl;
373 }
374 1 }
375
376
377 1 gate_result_t gate_exequeue_create(gate_exequeue_t* queue,
378 gate_entrypoint_t init_task,
379 gate_entrypoint_t shutdown_task,
380 void* task_param)
381 {
382 gate_result_t ret;
383 1 gate_exequeue_impl_t* impl = (gate_exequeue_impl_t*)gate_mem_alloc(sizeof(gate_exequeue_impl_t));
384 do
385 {
386
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (impl == NULL)
387 {
388 ret = GATE_RESULT_OUTOFMEMORY;
389 break;
390 }
391 1 gate_init_exequeue_impl_vtbl();
392 1 impl->vtbl = &gate_exequeue_impl_vtbl;
393 1 gate_atomic_flag_set(&impl->running);
394 1 gate_atomic_int_init(&impl->ref_counter, 2);
395
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
1 if (GATE_FAILED(ret = gate_syncevent_create(&impl->syncevent, true)))
396 {
397 gate_mem_dealloc(impl);
398 break;
399 }
400
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
1 if (GATE_FAILED(ret = gate_mutex_create(&impl->mutex)))
401 {
402 gate_syncevent_destroy(&impl->syncevent);
403 gate_mem_dealloc(impl);
404 break;
405 }
406 1 impl->items = gate_arraylist_create(sizeof(gate_exequeue_item_t), NULL, 0, NULL, NULL);
407
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (impl->items == NULL)
408 {
409 gate_mutex_destroy(&impl->mutex);
410 gate_syncevent_destroy(&impl->syncevent);
411 gate_mem_dealloc(impl);
412 ret = GATE_RESULT_OUTOFMEMORY;
413 break;
414 }
415 1 impl->thread_init = init_task;
416 1 impl->thread_shutdown = shutdown_task;
417 1 impl->thread_param = task_param;
418 1 ret = gate_thread_start((gate_runnable_t*)impl, &impl->threadhandle, NULL);
419
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (GATE_FAILED(ret))
420 {
421 gate_arraylist_release(impl->items);
422 gate_mutex_destroy(&impl->mutex);
423 gate_syncevent_destroy(&impl->syncevent);
424 gate_mem_dealloc(impl);
425 break;
426 }
427 1 *queue = impl;
428 } while (0);
429
430 1 return ret;
431 }
432 1 gate_result_t gate_exequeue_destroy(gate_exequeue_t queue)
433 {
434 1 gate_exequeue_impl_t* impl = (gate_exequeue_impl_t*)queue;
435 gate_result_t ret;
436 1 gate_atomic_flag_clear(&impl->running);
437 1 ret = gate_syncevent_set(&impl->syncevent);
438
439 1 gate_object_release(impl);
440 1 return ret;
441 }
442 5 gate_result_t gate_exequeue_push(gate_exequeue_t queue, gate_entrypoint_t functions, void* parameter, gate_mem_dtor_t paramdestructor)
443 {
444 5 gate_exequeue_impl_t* impl = (gate_exequeue_impl_t*)queue;
445 gate_result_t ret;
446 gate_exequeue_item_t item;
447 5 item.func = functions;
448 5 item.param = parameter;
449 5 item.param_destructor = paramdestructor;
450
451 5 ret = gate_mutex_acquire(&impl->mutex);
452
1/2
✓ Branch 0 taken 5 times.
✗ Branch 1 not taken.
5 if (GATE_SUCCEEDED(ret))
453 {
454
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5 times.
5 if (NULL == gate_arraylist_add(impl->items, &item))
455 {
456 ret = GATE_RESULT_OUTOFMEMORY;
457 }
458
459 5 gate_mutex_release(&impl->mutex);
460 }
461
462
1/2
✓ Branch 0 taken 5 times.
✗ Branch 1 not taken.
5 if (GATE_SUCCEEDED(ret))
463 {
464 5 ret = gate_syncevent_set(&impl->syncevent);
465 }
466
467 5 return ret;
468 }
469 1 gate_result_t gate_exequeue_itemcount(gate_exequeue_t queue, gate_size_t* items)
470 {
471 1 gate_exequeue_impl_t* impl = (gate_exequeue_impl_t*)queue;
472 gate_result_t ret;
473
474 1 ret = gate_mutex_acquire(&impl->mutex);
475
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (GATE_SUCCEEDED(ret))
476 {
477
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (items != NULL)
478 {
479 1 *items = gate_arraylist_length(impl->items);
480 }
481 1 gate_mutex_release(&impl->mutex);
482 }
483
484 1 return ret;
485 }
486
487
488 /*******************************
489 ** msgqueue implementation **
490 *******************************/
491
492
493 typedef struct gate_msgqueue_subscription_class
494 {
495 gate_msgqueue_receiver_t receiver;
496 void* user_param;
497 } gate_msgqueue_subscription_t;
498
499 typedef struct gate_msgqueue_target_class
500 {
501 gate_string_t address;
502 gate_map_t subscriptions; /* maps: gate_msgqueue_subscriber_t -> gate_msgqueue_subscription_t */
503
504 } gate_msgqueue_target_t;
505
506 2 static void msgqueue_target_dtor(void* dst)
507 {
508 2 gate_msgqueue_target_t* ptr = (gate_msgqueue_target_t*)dst;
509 2 gate_string_release(&ptr->address);
510 2 gate_map_destroy(&ptr->subscriptions);
511 2 }
512
513 2 static gate_result_t msgqueue_target_cctor(void* dst, void const* src)
514 {
515 2 gate_result_t ret = GATE_RESULT_OUTOFMEMORY;
516 2 gate_msgqueue_target_t* ptr_dst = (gate_msgqueue_target_t*)dst;
517 2 gate_msgqueue_target_t const* ptr_src = (gate_msgqueue_target_t const*)src;
518 do
519 {
520 2 gate_mem_clear(ptr_dst, sizeof(gate_msgqueue_target_t));
521
522
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
2 if (NULL == src)
523 {
524 1 gate_string_create_empty(&ptr_dst->address);
525 1 gate_map_create(&ptr_dst->subscriptions, gate_compare_uintptr,
526 sizeof(gate_msgqueue_subscription_id_t), NULL, NULL,
527 sizeof(gate_msgqueue_subscription_t), NULL, NULL);
528 }
529 else
530 {
531
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
1 if (NULL == gate_string_clone(&ptr_dst->address, &ptr_src->address))
532 {
533 break;
534 }
535
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
1 if (NULL == gate_map_copy(&ptr_dst->subscriptions, &ptr_src->subscriptions))
536 {
537 break;
538 }
539 }
540 2 ret = GATE_RESULT_OK;
541 } while (0);
542
543
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (GATE_FAILED(ret))
544 {
545 msgqueue_target_dtor(ptr_dst);
546 }
547 2 return ret;
548 }
549
550 typedef struct gate_msgqueue_impl
551 {
552 GATE_INTERFACE_VTBL(gate_msgqueue)* vtbl;
553
554 gate_atomic_int_t ref_counter;
555 gate_atomic_int_t state;
556 gate_atomic_int_t next_subscriber_id;
557 gate_mutex_t mutex;
558 gate_map_t targets; /* maps: gate_msgqueue_target_id_t -> gate_msgqueue_target_t */
559 } gate_msgqueue_impl_t;
560
561 1 static gate_result_t msgqueue_init(gate_msgqueue_impl_t* self)
562 {
563 gate_result_t ret;
564 do
565 {
566 1 gate_atomic_int_init(&self->ref_counter, 1);
567 1 gate_atomic_int_init(&self->state, GATE_MSGQUEUE_STATUS_OFFLINE);
568 1 gate_atomic_int_init(&self->next_subscriber_id, 0);
569
570 1 ret = gate_mutex_create(&self->mutex);
571
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 GATE_BREAK_IF_FAILED(ret);
572
573
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
1 if (NULL == gate_map_create(&self->targets, gate_compare_uintptr,
574 sizeof(gate_msgqueue_target_id_t), NULL, NULL,
575 sizeof(gate_msgqueue_target_t), &msgqueue_target_cctor, &msgqueue_target_dtor))
576 {
577 ret = GATE_RESULT_OUTOFMEMORY;
578 break;
579 }
580
581 } while (0);
582 1 return ret;
583 }
584
585 1 static void msgqueue_destroy(gate_msgqueue_impl_t* self)
586 {
587 1 gate_mutex_destroy(&self->mutex);
588 1 gate_map_destroy(&self->targets);
589 1 }
590
591 static char const* msgqueue_get_interface_name(void* obj)
592 {
593 (void)obj;
594 return GATE_INTERFACE_NAME_MSGQUEUE;
595 }
596 1 static void msgqueue_release(void* obj)
597 {
598 1 gate_msgqueue_impl_t* self = (gate_msgqueue_impl_t*)obj;
599
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 if (0 == gate_atomic_int_dec(&self->ref_counter))
600 {
601 1 msgqueue_destroy(self);
602 1 gate_mem_dealloc(self);
603 }
604 1 }
605 static int msgqueue_retain(void* obj)
606 {
607 gate_msgqueue_impl_t* self = (gate_msgqueue_impl_t*)obj;
608 return gate_atomic_int_inc(&self->ref_counter);
609 }
610
611 5 static gate_bool_t update_state(gate_atomic_int_t* atom, gate_int32_t from, gate_int32_t to)
612 {
613 5 return from == gate_atomic_int_xchg_if(atom, from, to);
614 }
615
616 1 static gate_result_t msgqueue_start(void* obj)
617 {
618 1 gate_msgqueue_impl_t* self = (gate_msgqueue_impl_t*)obj;
619
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
1 if (!update_state(&self->state, GATE_MSGQUEUE_STATUS_OFFLINE, GATE_MSGQUEUE_STATUS_STARTING))
620 {
621 return GATE_RESULT_INVALIDSTATE;
622 }
623
624 1 gate_atomic_int_set(&self->state, GATE_MSGQUEUE_STATUS_ONLINE);
625 1 return GATE_RESULT_OK;
626 }
627 1 static gate_result_t msgqueue_stop(void* obj)
628 {
629 1 gate_msgqueue_impl_t* self = (gate_msgqueue_impl_t*)obj;
630
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
1 if (!update_state(&self->state, GATE_MSGQUEUE_STATUS_ONLINE, GATE_MSGQUEUE_STATUS_STOPPING))
631 {
632 return GATE_RESULT_INVALIDSTATE;
633 }
634
635 1 gate_atomic_int_set(&self->state, GATE_MSGQUEUE_STATUS_OFFLINE);
636 1 return GATE_RESULT_OK;
637 }
638 3 static gate_enumint_t msgqueue_get_status(void* obj)
639 {
640 3 gate_msgqueue_impl_t* self = (gate_msgqueue_impl_t*)obj;
641 3 return (gate_enumint_t)gate_atomic_int_get(&self->state);
642 }
643
644 1 static gate_result_t msgqueue_add_target(void* obj, gate_string_t const* target_address, gate_msgqueue_target_id_t* ptr_target_id)
645 {
646 1 gate_msgqueue_impl_t* self = (gate_msgqueue_impl_t*)obj;
647 1 gate_result_t ret = GATE_RESULT_FAILED;
648 gate_msgqueue_target_id_t target_id;
649 gate_map_iterator_t iter;
650 1 gate_msgqueue_target_t target = GATE_INIT_EMPTY;
651
652 1 target_id = gate_string_hash(target_address);
653
654 do
655 {
656 1 ret = gate_mutex_acquire(&self->mutex);
657
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 GATE_BREAK_IF_FAILED(ret);
658 do
659 {
660 1 iter = gate_map_get(&self->targets, &target_id);
661
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
1 if (gate_map_iterator_valid(iter))
662 {
663 ret = GATE_RESULT_ALREADYEXISTS;
664 break;
665 }
666
667 1 ret = msgqueue_target_cctor(&target, NULL);
668
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 GATE_BREAK_IF_FAILED(ret);
669
670
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
1 if (NULL == gate_string_create_copy(&target.address, target_address))
671 {
672 ret = GATE_RESULT_OUTOFMEMORY;
673 break;
674 }
675
676
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
1 if (NULL == gate_map_add(&self->targets, &target_id, &target))
677 {
678 ret = GATE_RESULT_OUTOFMEMORY;
679 break;
680 }
681
682
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (ptr_target_id)
683 {
684 1 *ptr_target_id = target_id;
685 }
686
687 } while (0);
688 1 gate_mutex_release(&self->mutex);
689
690 } while (0);
691
692 1 msgqueue_target_dtor(&target);
693
694 1 return ret;
695
696 }
697 static gate_result_t msgqueue_get_target(void* obj, gate_msgqueue_target_id_t target_id, gate_string_t* ptr_target_address)
698 {
699 gate_msgqueue_impl_t* self = (gate_msgqueue_impl_t*)obj;
700 gate_result_t ret = GATE_RESULT_FAILED;
701 gate_msgqueue_target_t const* ptr_target;
702 do
703 {
704 ret = gate_mutex_acquire(&self->mutex);
705 GATE_BREAK_IF_FAILED(ret);
706 do
707 {
708 ptr_target = (gate_msgqueue_target_t const*)gate_map_get_value(&self->targets, &target_id);
709 if (!ptr_target)
710 {
711 ret = GATE_RESULT_NOMATCH;
712 break;
713 }
714
715 if (ptr_target_address)
716 {
717 if (NULL == gate_string_clone(ptr_target_address, &ptr_target->address))
718 {
719 ret = GATE_RESULT_OUTOFMEMORY;
720 break;
721 }
722 }
723 ret = GATE_RESULT_OK;
724 } while (0);
725 gate_mutex_release(&self->mutex);
726
727 } while (0);
728
729 return ret;
730 }
731 static gate_result_t msgqueue_resolve_target(void* obj, gate_string_t const* target_address, gate_msgqueue_target_id_t* ptr_target_id)
732 {
733 gate_msgqueue_impl_t* self = (gate_msgqueue_impl_t*)obj;
734 gate_result_t ret = GATE_RESULT_FAILED;
735 gate_map_iterator_t iter;
736 gate_msgqueue_target_id_t target_id;
737
738 do
739 {
740 target_id = (gate_msgqueue_target_id_t)gate_hash_generate_string(target_address);
741
742 ret = gate_mutex_acquire(&self->mutex);
743 GATE_BREAK_IF_FAILED(ret);
744 do
745 {
746 iter = gate_map_get(&self->targets, &target_id);
747 if (!gate_map_iterator_valid(iter))
748 {
749 ret = GATE_RESULT_NOMATCH;
750 break;
751 }
752
753 if (ptr_target_id)
754 {
755 *ptr_target_id = target_id;
756 }
757 ret = GATE_RESULT_OK;
758 } while (0);
759 gate_mutex_release(&self->mutex);
760
761 } while (0);
762
763 return ret;
764 }
765 static gate_result_t msgqueue_remove_target(void* obj, gate_msgqueue_target_id_t target_id)
766 {
767 gate_msgqueue_impl_t* self = (gate_msgqueue_impl_t*)obj;
768 gate_result_t ret = GATE_RESULT_FAILED;
769 do
770 {
771 ret = gate_mutex_acquire(&self->mutex);
772 GATE_BREAK_IF_FAILED(ret);
773 do
774 {
775 if (!gate_map_remove(&self->targets, &target_id))
776 {
777 ret = GATE_RESULT_NOMATCH;
778 break;
779 }
780 ret = GATE_RESULT_OK;
781 } while (0);
782 gate_mutex_release(&self->mutex);
783 } while (0);
784 return ret;
785 }
786
787 3 static gate_result_t msgqueue_publish(void* obj, gate_msgqueue_target_id_t target_id, gate_uint32_t message_type, void const* data, gate_size_t length)
788 {
789 3 gate_msgqueue_impl_t* self = (gate_msgqueue_impl_t*)obj;
790 3 gate_result_t ret = GATE_RESULT_FAILED;
791 gate_msgqueue_target_t const* ptr_target;
792 gate_msgqueue_subscription_t subscriptions[64];
793 3 gate_size_t subscriptions_count = 0;
794 gate_size_t ndx;
795 3 gate_size_t const subscriptions_max = sizeof(subscriptions) / sizeof(subscriptions[0]);
796 gate_msgqueue_subscription_t const* ptr_subscription;
797 gate_map_iterator_t iter;
798
799 do
800 {
801
2/2
✓ Branch 1 taken 1 times.
✓ Branch 2 taken 2 times.
3 if (!update_state(&self->state, GATE_MSGQUEUE_STATUS_ONLINE, GATE_MSGQUEUE_STATUS_ONLINE))
802 {
803 1 ret = GATE_RESULT_INVALIDSTATE;
804 1 break;
805 }
806
807 2 ret = gate_mutex_acquire(&self->mutex);
808
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 GATE_BREAK_IF_FAILED(ret);
809 do
810 {
811 2 ptr_target = (gate_msgqueue_target_t const*)gate_map_get_value(&self->targets, &target_id);
812
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (!ptr_target)
813 {
814 ret = GATE_RESULT_INVALIDARG;
815 break;
816 }
817
818
2/2
✓ Branch 3 taken 2 times.
✓ Branch 4 taken 2 times.
4 GATE_MAP_FOREACH(iter, &ptr_target->subscriptions)
819 {
820 2 ptr_subscription = (gate_msgqueue_subscription_t const*)gate_map_iterator_value(iter);
821
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 if (ptr_subscription->receiver)
822 {
823 2 subscriptions[subscriptions_count] = *ptr_subscription;
824 2 ++subscriptions_count;
825
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (subscriptions_count >= subscriptions_max)
826 {
827 break;
828 }
829 }
830 }
831 2 ret = GATE_RESULT_OK;
832 } while (0);
833 2 gate_mutex_release(&self->mutex);
834
835
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 GATE_BREAK_IF_FAILED(ret);
836
837
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 2 times.
4 for (ndx = 0; ndx != subscriptions_count; ++ndx)
838 {
839 2 subscriptions[ndx].receiver((gate_msgqueue_t*)self, target_id, message_type, data, length, subscriptions[ndx].user_param);
840 }
841 } while (0);
842
843 3 return ret;
844 }
845 1 static gate_result_t msgqueue_subscribe(void* obj, gate_msgqueue_target_id_t target_id, gate_msgqueue_receiver_t receiver, void* user_param, gate_msgqueue_subscription_id_t* ptr_subscriber)
846 {
847 1 gate_msgqueue_impl_t* self = (gate_msgqueue_impl_t*)obj;
848 1 gate_result_t ret = GATE_RESULT_FAILED;
849 gate_msgqueue_subscription_id_t subscriber_id;
850 gate_msgqueue_subscription_t subscription;
851 gate_msgqueue_target_t* ptr_target;
852 do
853 {
854 1 ret = gate_mutex_acquire(&self->mutex);
855
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 GATE_BREAK_IF_FAILED(ret);
856 do
857 {
858 1 ptr_target = (gate_msgqueue_target_t*)gate_map_get_value(&self->targets, &target_id);
859
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (!ptr_target)
860 {
861 ret = GATE_RESULT_INVALIDARG;
862 break;
863 }
864
865 1 subscriber_id = gate_atomic_int_inc(&self->next_subscriber_id);
866 1 subscription.receiver = receiver;
867 1 subscription.user_param = user_param;
868
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
1 if (NULL == gate_map_add(&ptr_target->subscriptions, &subscriber_id, &subscription))
869 {
870 ret = GATE_RESULT_OUTOFMEMORY;
871 break;
872 }
873
874
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (ptr_subscriber)
875 {
876 1 *ptr_subscriber = subscriber_id;
877 }
878 1 ret = GATE_RESULT_OK;
879 } while (0);
880 1 gate_mutex_release(&self->mutex);
881 } while (0);
882
883 1 return ret;
884 }
885 static gate_result_t msgqueue_unsubscribe(void* obj, gate_msgqueue_subscription_id_t subscriber)
886 {
887 gate_msgqueue_impl_t* self = (gate_msgqueue_impl_t*)obj;
888 gate_result_t ret = GATE_RESULT_FAILED;
889 gate_map_iterator_t iter;
890 gate_msgqueue_target_t* ptr_target;
891 do
892 {
893 ret = gate_mutex_acquire(&self->mutex);
894 GATE_BREAK_IF_FAILED(ret);
895 do
896 {
897 ret = GATE_RESULT_NOMATCH;
898 GATE_MAP_FOREACH(iter, &self->targets)
899 {
900 ptr_target = (gate_msgqueue_target_t*)gate_map_iterator_value(iter);
901 if (gate_map_remove(&ptr_target->subscriptions, &subscriber))
902 {
903 ret = GATE_RESULT_OK;
904 break;
905 }
906 }
907 } while (0);
908 gate_mutex_release(&self->mutex);
909 } while (0);
910
911 return ret;
912
913 }
914
915 static GATE_INTERFACE_VTBL(gate_msgqueue) gate_msgqueue_vtbl;
916 1 static void gate_init_msgqueue_vtbl()
917 {
918
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (!gate_msgqueue_vtbl.get_interface_name)
919 {
920 GATE_INTERFACE_VTBL(gate_msgqueue) const local_vtbl =
921 {
922 &msgqueue_get_interface_name,
923 &msgqueue_release,
924 &msgqueue_retain,
925
926 &msgqueue_start,
927 &msgqueue_stop,
928 &msgqueue_get_status,
929
930 &msgqueue_add_target,
931 &msgqueue_get_target,
932 &msgqueue_resolve_target,
933 &msgqueue_remove_target,
934
935 &msgqueue_publish,
936 &msgqueue_subscribe,
937 &msgqueue_unsubscribe
938 };
939 1 gate_msgqueue_vtbl = local_vtbl;
940 }
941 1 }
942
943 1 gate_msgqueue_t* gate_memory_msgqueue_create()
944 {
945 1 gate_msgqueue_t* ret = NULL;
946 1 gate_msgqueue_impl_t* impl = NULL;
947 gate_result_t result;
948 do
949 {
950 1 impl = gate_mem_alloc(sizeof(gate_msgqueue_impl_t));
951
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (!impl)
952 {
953 break;
954 }
955 1 gate_mem_clear(impl, sizeof(gate_msgqueue_impl_t));
956 1 gate_init_msgqueue_vtbl();
957 1 impl->vtbl = &gate_msgqueue_vtbl;
958 1 result = msgqueue_init(impl);
959
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 GATE_BREAK_IF_FAILED(result);
960
961 1 ret = (gate_msgqueue_t*)impl;
962 1 impl = NULL;
963 } while (0);
964
965
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (impl != NULL)
966 {
967 msgqueue_destroy(impl);
968 gate_mem_dealloc(impl);
969 }
970 1 return ret;
971 }
972
973
974
975
976
977