GCC Code Coverage Report


Directory: src/gate/
File: src/gate/threadpools.c
Date: 2025-09-14 13:10:38
Exec Total Coverage
Lines: 118 140 84.3%
Functions: 11 11 100.0%
Branches: 41 70 58.6%

Line Branch Exec Source
1 /* GATE PROJECT LICENSE:
2 +----------------------------------------------------------------------------+
3 | Copyright(c) 2018-2025, Stefan Meislinger <sm@opengate.at> |
4 | All rights reserved. |
5 | |
6 | Redistribution and use in source and binary forms, with or without |
7 | modification, are permitted provided that the following conditions are met:|
8 | |
9 | 1. Redistributions of source code must retain the above copyright notice, |
10 | this list of conditions and the following disclaimer. |
11 | 2. Redistributions in binary form must reproduce the above copyright |
12 | notice, this list of conditions and the following disclaimer in the |
13 | documentation and/or other materials provided with the distribution. |
14 | |
15 | THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"|
16 | AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
17 | IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
18 | ARE DISCLAIMED.IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
19 | LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
20 | CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
21 | SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
22 | INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
23 | CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
24 | ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF |
25 | THE POSSIBILITY OF SUCH DAMAGE. |
26 +----------------------------------------------------------------------------+
27 */
28 #include "gate/threadpools.h"
29 #include "gate/synchronization.h"
30 #include "gate/arrays.h"
31 #include "gate/results.h"
32
33 struct gate_threadpool_class
34 {
35 gate_mutex_t lock;
36 gate_synccondition_t cond;
37 gate_atomic_int_t state;
38 gate_uint32_t max_pending_tasks;
39 gate_uint32_t max_threads;
40 gate_arraylist_t tasks;
41 gate_thread_t threads[1];
42 };
43
44
45 #define GATE_THREADPOOL_STATE_OFFLINE 0
46 #define GATE_THREADPOOL_STATE_STARTING 1
47 #define GATE_THREADPOOL_STATE_ONLINE 2
48 #define GATE_THREADPOOL_STATE_STOPPING 3
49
50 1 static gate_bool_t is_starting(gate_threadpool_t pool)
51 {
52 1 return gate_atomic_int_get(&pool->state) == GATE_THREADPOOL_STATE_STARTING;
53 }
54
55 7 static gate_bool_t is_online(gate_threadpool_t pool)
56 {
57 7 return gate_atomic_int_get(&pool->state) == GATE_THREADPOOL_STATE_ONLINE;
58 }
59
60 1 static gate_result_t gate_threadpool_code(void* param)
61 {
62 1 gate_result_t ret = GATE_RESULT_OK;
63 1 gate_threadpool_t pool = (gate_threadpool_t)param;
64 void* ptr_entry;
65 gate_runnable_t* task;
66
67 {
68 1 ret = gate_mutex_acquire(&pool->lock);
69
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (GATE_FAILED(ret))
70 {
71 return ret;
72 }
73
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
1 while (is_starting(pool))
74 {
75 gate_synccondition_timed_wait(&pool->cond, &pool->lock, 200);
76 }
77 1 gate_mutex_release(&pool->lock);
78 }
79
80
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 1 times.
3 while (is_online(pool))
81 {
82 2 task = NULL;
83
84 {
85 2 ret = gate_mutex_acquire(&pool->lock);
86
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (GATE_FAILED(ret))
87 {
88 break;
89 }
90
91 do
92 {
93
4/4
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 1 times.
✓ Branch 4 taken 1 times.
✓ Branch 5 taken 1 times.
3 while ((gate_arraylist_length(pool->tasks) == 0) && is_online(pool))
94 {
95 1 gate_synccondition_timed_wait(&pool->cond, &pool->lock, 1000);
96 }
97
98
2/2
✓ Branch 1 taken 1 times.
✓ Branch 2 taken 1 times.
2 if (!is_online(pool))
99 {
100 1 break;
101 }
102
103
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 if (gate_arraylist_length(pool->tasks) > 0)
104 {
105 1 ptr_entry = gate_arraylist_get(pool->tasks, 0);
106 }
107 else
108 {
109 ptr_entry = NULL;
110 }
111
112
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (ptr_entry != NULL)
113 {
114 1 task = *(gate_runnable_t**)ptr_entry;
115
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (task != NULL)
116 {
117 1 gate_object_retain(task);
118 }
119 1 gate_arraylist_remove(pool->tasks, 0, 1);
120 }
121 } while (0);
122 2 gate_mutex_release(&pool->lock);
123 }
124
125
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
2 if (task != NULL)
126 {
127 1 gate_runnable_run(task);
128 1 gate_object_release(task);
129 }
130 }
131
132 1 return ret;
133 }
134
135 1 gate_result_t gate_threadpool_create(gate_threadpool_t* ptr_pool, gate_uint32_t thread_count, gate_uint32_t max_tasks)
136 {
137 1 gate_result_t ret = GATE_RESULT_FAILED;
138 1 gate_threadpool_t pool = NULL;
139 do
140 {
141
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (thread_count == 0)
142 {
143 1 thread_count = 1;
144 }
145
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (max_tasks == 0)
146 {
147 1 max_tasks = 1;
148 }
149
150 1 pool = gate_mem_alloc(sizeof(struct gate_threadpool_class) + sizeof(gate_thread_t) * thread_count);
151
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (pool == NULL) break;
152 1 gate_mem_clear(pool, sizeof(struct gate_threadpool_class) + sizeof(gate_thread_t) * thread_count);
153
154 1 gate_atomic_int_set(&pool->state, GATE_THREADPOOL_STATE_OFFLINE);
155
156 1 ret = gate_mutex_create(&pool->lock);
157
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (GATE_FAILED(ret))
158 {
159 gate_mem_dealloc(pool);
160 pool = NULL;
161 break;
162 }
163
164 1 ret = gate_synccondition_create(&pool->cond);
165
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 GATE_BREAK_IF_FAILED(ret);
166
167 1 pool->tasks = gate_arraylist_create(sizeof(gate_runnable_t*), NULL, 16, &gate_object_ptr_copyctor, &gate_object_ptr_dtor);
168
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (pool->tasks == NULL)
169 {
170 break;
171 }
172
173 1 pool->max_threads = thread_count;
174 1 pool->max_pending_tasks = max_tasks;
175
176 1 *ptr_pool = pool;
177 1 pool = NULL;
178 1 ret = GATE_RESULT_OK;
179 } while (0);
180
181
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (pool != NULL)
182 {
183 if (pool->tasks != NULL)
184 {
185 gate_arraylist_release(pool->tasks);
186 }
187 gate_synccondition_destroy(&pool->cond);
188 gate_mutex_destroy(&pool->lock);
189 }
190
191 1 return ret;
192 }
193
194 1 gate_result_t gate_threadpool_destroy(gate_threadpool_t pool)
195 {
196 1 gate_result_t ret = GATE_RESULT_OK;
197 gate_result_t result;
198 1 gate_threadpool_stop(pool);
199
200 1 gate_arraylist_release(pool->tasks);
201
202 1 result = gate_synccondition_destroy(&pool->cond);
203
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (GATE_FAILED(result)) ret = result;
204
205 1 result = gate_mutex_destroy(&pool->lock);
206
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (GATE_FAILED(result)) ret = result;
207
208 1 gate_mem_dealloc(pool);
209
210 1 return ret;
211 }
212
213 1 gate_result_t gate_threadpool_start(gate_threadpool_t pool)
214 {
215 1 gate_result_t ret = GATE_RESULT_FAILED;
216 1 gate_size_t ndx = 0;
217
218
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
1 if (GATE_THREADPOOL_STATE_OFFLINE != gate_atomic_int_xchg_if(
219 &pool->state, GATE_THREADPOOL_STATE_OFFLINE, GATE_THREADPOOL_STATE_STARTING))
220 {
221 ret = GATE_RESULT_INVALIDSTATE;
222 }
223 else
224 {
225
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
2 for (ndx = 0; ndx != pool->max_threads; ++ndx)
226 {
227 1 ret = gate_thread_start_code(&gate_threadpool_code, pool, &pool->threads[ndx], NULL);
228
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (GATE_FAILED(ret))
229 {
230 gate_atomic_int_set(&pool->state, GATE_THREADPOOL_STATE_STOPPING);
231 gate_synccondition_signal_all(&pool->cond);
232 while (ndx-- != 0)
233 {
234 gate_thread_join(&pool->threads[ndx], NULL);
235 }
236 break;
237 }
238 }
239
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (GATE_SUCCEEDED(ret))
240 {
241 1 gate_atomic_int_set(&pool->state, GATE_THREADPOOL_STATE_ONLINE);
242 1 gate_synccondition_signal_all(&pool->cond);
243 }
244 else
245 {
246 gate_atomic_int_set(&pool->state, GATE_THREADPOOL_STATE_OFFLINE);
247 }
248 }
249 1 return ret;
250 }
251
252 2 gate_result_t gate_threadpool_stop(gate_threadpool_t pool)
253 {
254 2 gate_result_t ret = GATE_RESULT_FAILED;
255 gate_size_t ndx;
256
257
2/2
✓ Branch 1 taken 1 times.
✓ Branch 2 taken 1 times.
2 if (GATE_THREADPOOL_STATE_ONLINE != gate_atomic_int_xchg_if(
258 &pool->state, GATE_THREADPOOL_STATE_ONLINE, GATE_THREADPOOL_STATE_STOPPING))
259 {
260 1 ret = GATE_RESULT_INVALIDSTATE;
261 }
262 else
263 {
264 1 gate_synccondition_signal_all(&pool->cond);
265
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
2 for (ndx = 0; ndx != pool->max_threads; ++ndx)
266 {
267 1 gate_thread_join(&pool->threads[ndx], NULL);
268 }
269 1 gate_atomic_int_set(&pool->state, GATE_THREADPOOL_STATE_OFFLINE);
270 1 ret = GATE_RESULT_OK;
271 }
272 2 return ret;
273 }
274 2 gate_result_t gate_threadpool_add_task(gate_threadpool_t pool, gate_runnable_t* task)
275 {
276 2 gate_result_t ret = GATE_RESULT_FAILED;
277 do
278 {
279 2 ret = gate_mutex_acquire(&pool->lock);
280
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 GATE_BREAK_IF_FAILED(ret);
281
282
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
2 if (gate_arraylist_length(pool->tasks) >= pool->max_pending_tasks)
283 {
284 ret = GATE_RESULT_OUTOFBOUNDS;
285 }
286
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 else if (NULL != gate_arraylist_add(pool->tasks, &task))
287 {
288 2 ret = GATE_RESULT_OK;
289 }
290 else
291 {
292 ret = GATE_RESULT_FAILED;
293 }
294 2 gate_mutex_release(&pool->lock);
295 2 gate_synccondition_signal_all(&pool->cond);
296 } while (0);
297 2 return ret;
298 }
299
300 1 static gate_bool_t gate_threadpool_remove_task_condition(void const* item, void* param)
301 {
302 1 gate_runnable_t* const* src = (gate_runnable_t* const*)item;
303 1 gate_runnable_t* dst = (gate_runnable_t*)param;
304 1 return *src == dst;
305 }
306
307 1 gate_result_t gate_threadpool_remove_task(gate_threadpool_t pool, gate_runnable_t* task)
308 {
309 1 gate_result_t ret = GATE_RESULT_FAILED;
310 gate_size_t count1, count2;
311 do
312 {
313 1 ret = gate_mutex_acquire(&pool->lock);
314
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 GATE_BREAK_IF_FAILED(ret);
315 1 count1 = gate_arraylist_length(pool->tasks);
316 1 ret = gate_arraylist_remove_if(pool->tasks, gate_threadpool_remove_task_condition, task);
317
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (GATE_SUCCEEDED(ret))
318 {
319 1 count2 = gate_arraylist_length(pool->tasks);
320
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (count2 == count1)
321 {
322 /* nothing was removed, signal "unchanged" OK-state */
323 ret = GATE_RESULT_OK_UNCHANGED;
324 }
325 }
326 1 gate_mutex_release(&pool->lock);
327 } while (0);
328 1 return ret;
329 }
330 4 gate_size_t gate_threadpool_pending_tasks(gate_threadpool_t pool)
331 {
332 4 gate_size_t ret = 0;
333 4 gate_result_t result = gate_mutex_acquire(&pool->lock);
334
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
4 if (GATE_SUCCEEDED(result))
335 {
336 4 ret = gate_arraylist_length(pool->tasks);
337 4 gate_mutex_release(&pool->lock);
338 }
339 4 return ret;
340 }
341