GCC Code Coverage Report


Directory: src/gate/
File: src/gate/threadpools.c
Date: 2025-12-12 23:40:09
Exec Total Coverage
Lines: 118 140 84.3%
Functions: 11 11 100.0%
Branches: 41 70 58.6%

Line Branch Exec Source
1 /* GATE PROJECT LICENSE:
2 +----------------------------------------------------------------------------+
3 | Copyright(c) 2018-2025, Stefan Meislinger <sm@opengate.at> |
4 | All rights reserved. |
5 | |
6 | Redistribution and use in source and binary forms, with or without |
7 | modification, are permitted provided that the following conditions are met:|
8 | |
9 | 1. Redistributions of source code must retain the above copyright notice, |
10 | this list of conditions and the following disclaimer. |
11 | 2. Redistributions in binary form must reproduce the above copyright |
12 | notice, this list of conditions and the following disclaimer in the |
13 | documentation and/or other materials provided with the distribution. |
14 | |
15 | THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"|
16 | AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
17 | IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
18 | ARE DISCLAIMED.IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
19 | LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
20 | CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
21 | SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
22 | INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
23 | CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
24 | ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF |
25 | THE POSSIBILITY OF SUCH DAMAGE. |
26 +----------------------------------------------------------------------------+
27 */
28 #include "gate/threadpools.h"
29 #include "gate/synchronization.h"
30 #include "gate/arrays.h"
31 #include "gate/results.h"
32
33 struct gate_threadpool_class
34 {
35 gate_mutex_t lock;
36 gate_synccondition_t cond;
37 gate_atomic_int_t state;
38 gate_uint32_t max_pending_tasks;
39 gate_uint32_t max_threads;
40 gate_arraylist_t tasks;
41 gate_thread_t threads[1];
42 };
43
44
45 #define GATE_THREADPOOL_STATE_OFFLINE 0
46 #define GATE_THREADPOOL_STATE_STARTING 1
47 #define GATE_THREADPOOL_STATE_ONLINE 2
48 #define GATE_THREADPOOL_STATE_STOPPING 3
49
50 1 static gate_bool_t is_starting(gate_threadpool_t pool)
51 {
52 1 return gate_atomic_int_get(&pool->state) == GATE_THREADPOOL_STATE_STARTING;
53 }
54
55 7 static gate_bool_t is_online(gate_threadpool_t pool)
56 {
57 7 return gate_atomic_int_get(&pool->state) == GATE_THREADPOOL_STATE_ONLINE;
58 }
59
60 1 static gate_result_t gate_threadpool_code(void* param)
61 {
62 1 gate_result_t ret = GATE_RESULT_OK;
63 1 gate_threadpool_t pool = (gate_threadpool_t)param;
64 void* ptr_entry;
65
66 {
67 1 ret = gate_mutex_acquire(&pool->lock);
68
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (GATE_FAILED(ret))
69 {
70 return ret;
71 }
72
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
1 while (is_starting(pool))
73 {
74 gate_synccondition_timed_wait(&pool->cond, &pool->lock, 200);
75 }
76 1 gate_mutex_release(&pool->lock);
77 }
78
79
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 1 times.
3 while (is_online(pool))
80 {
81 2 gate_runnable_t* task = NULL;
82
83 {
84 2 ret = gate_mutex_acquire(&pool->lock);
85
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (GATE_FAILED(ret))
86 {
87 break;
88 }
89
90 do
91 {
92
4/4
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 1 times.
✓ Branch 4 taken 1 times.
✓ Branch 5 taken 1 times.
3 while ((gate_arraylist_length(pool->tasks) == 0) && is_online(pool))
93 {
94 1 gate_synccondition_timed_wait(&pool->cond, &pool->lock, 1000);
95 }
96
97
2/2
✓ Branch 1 taken 1 times.
✓ Branch 2 taken 1 times.
2 if (!is_online(pool))
98 {
99 1 break;
100 }
101
102
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 if (gate_arraylist_length(pool->tasks) > 0)
103 {
104 1 ptr_entry = gate_arraylist_get(pool->tasks, 0);
105 }
106 else
107 {
108 ptr_entry = NULL;
109 }
110
111
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (ptr_entry != NULL)
112 {
113 1 task = *(gate_runnable_t**)ptr_entry;
114
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (task != NULL)
115 {
116 1 gate_object_retain(task);
117 }
118 1 gate_arraylist_remove(pool->tasks, 0, 1);
119 }
120 } while (0);
121 2 gate_mutex_release(&pool->lock);
122 }
123
124
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
2 if (task != NULL)
125 {
126 1 gate_runnable_run(task);
127 1 gate_object_release(task);
128 }
129 }
130
131 1 return ret;
132 }
133
134 1 gate_result_t gate_threadpool_create(gate_threadpool_t* ptr_pool, gate_uint32_t thread_count, gate_uint32_t max_tasks)
135 {
136 1 gate_result_t ret = GATE_RESULT_FAILED;
137 1 gate_threadpool_t pool = NULL;
138 do
139 {
140
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (thread_count == 0)
141 {
142 1 thread_count = 1;
143 }
144
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (max_tasks == 0)
145 {
146 1 max_tasks = 1;
147 }
148
149 1 pool = gate_mem_alloc(sizeof(struct gate_threadpool_class) + sizeof(gate_thread_t) * thread_count);
150
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (pool == NULL) break;
151 1 gate_mem_clear(pool, sizeof(struct gate_threadpool_class) + sizeof(gate_thread_t) * thread_count);
152
153 1 gate_atomic_int_set(&pool->state, GATE_THREADPOOL_STATE_OFFLINE);
154
155 1 ret = gate_mutex_create(&pool->lock);
156
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (GATE_FAILED(ret))
157 {
158 gate_mem_dealloc(pool);
159 pool = NULL;
160 break;
161 }
162
163 1 ret = gate_synccondition_create(&pool->cond);
164
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 GATE_BREAK_IF_FAILED(ret);
165
166 1 pool->tasks = gate_arraylist_create(sizeof(gate_runnable_t*), NULL, 16, &gate_object_ptr_copyctor, &gate_object_ptr_dtor);
167
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (pool->tasks == NULL)
168 {
169 break;
170 }
171
172 1 pool->max_threads = thread_count;
173 1 pool->max_pending_tasks = max_tasks;
174
175 1 *ptr_pool = pool;
176 1 pool = NULL;
177 1 ret = GATE_RESULT_OK;
178 } while (0);
179
180
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (pool != NULL)
181 {
182 if (pool->tasks != NULL)
183 {
184 gate_arraylist_release(pool->tasks);
185 }
186 gate_synccondition_destroy(&pool->cond);
187 gate_mutex_destroy(&pool->lock);
188 }
189
190 1 return ret;
191 }
192
193 1 gate_result_t gate_threadpool_destroy(gate_threadpool_t pool)
194 {
195 1 gate_result_t ret = GATE_RESULT_OK;
196 gate_result_t result;
197 1 gate_threadpool_stop(pool);
198
199 1 gate_arraylist_release(pool->tasks);
200
201 1 result = gate_synccondition_destroy(&pool->cond);
202
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (GATE_FAILED(result)) ret = result;
203
204 1 result = gate_mutex_destroy(&pool->lock);
205
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (GATE_FAILED(result)) ret = result;
206
207 1 gate_mem_dealloc(pool);
208
209 1 return ret;
210 }
211
212 1 gate_result_t gate_threadpool_start(gate_threadpool_t pool)
213 {
214 1 gate_result_t ret = GATE_RESULT_FAILED;
215 1 gate_size_t ndx = 0;
216
217
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
1 if (GATE_THREADPOOL_STATE_OFFLINE != gate_atomic_int_xchg_if(
218 &pool->state, GATE_THREADPOOL_STATE_OFFLINE, GATE_THREADPOOL_STATE_STARTING))
219 {
220 ret = GATE_RESULT_INVALIDSTATE;
221 }
222 else
223 {
224
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
2 for (ndx = 0; ndx != pool->max_threads; ++ndx)
225 {
226 1 ret = gate_thread_start_code(&gate_threadpool_code, pool, &pool->threads[ndx], NULL);
227
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (GATE_FAILED(ret))
228 {
229 gate_atomic_int_set(&pool->state, GATE_THREADPOOL_STATE_STOPPING);
230 gate_synccondition_signal_all(&pool->cond);
231 while (ndx-- != 0)
232 {
233 gate_thread_join(&pool->threads[ndx], NULL);
234 }
235 break;
236 }
237 }
238
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (GATE_SUCCEEDED(ret))
239 {
240 1 gate_atomic_int_set(&pool->state, GATE_THREADPOOL_STATE_ONLINE);
241 1 gate_synccondition_signal_all(&pool->cond);
242 }
243 else
244 {
245 gate_atomic_int_set(&pool->state, GATE_THREADPOOL_STATE_OFFLINE);
246 }
247 }
248 1 return ret;
249 }
250
251 2 gate_result_t gate_threadpool_stop(gate_threadpool_t pool)
252 {
253 2 gate_result_t ret = GATE_RESULT_FAILED;
254 gate_size_t ndx;
255
256
2/2
✓ Branch 1 taken 1 times.
✓ Branch 2 taken 1 times.
2 if (GATE_THREADPOOL_STATE_ONLINE != gate_atomic_int_xchg_if(
257 &pool->state, GATE_THREADPOOL_STATE_ONLINE, GATE_THREADPOOL_STATE_STOPPING))
258 {
259 1 ret = GATE_RESULT_INVALIDSTATE;
260 }
261 else
262 {
263 1 gate_synccondition_signal_all(&pool->cond);
264
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
2 for (ndx = 0; ndx != pool->max_threads; ++ndx)
265 {
266 1 gate_thread_join(&pool->threads[ndx], NULL);
267 }
268 1 gate_atomic_int_set(&pool->state, GATE_THREADPOOL_STATE_OFFLINE);
269 1 ret = GATE_RESULT_OK;
270 }
271 2 return ret;
272 }
273 2 gate_result_t gate_threadpool_add_task(gate_threadpool_t pool, gate_runnable_t* task)
274 {
275 2 gate_result_t ret = GATE_RESULT_FAILED;
276 do
277 {
278 2 ret = gate_mutex_acquire(&pool->lock);
279
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 GATE_BREAK_IF_FAILED(ret);
280
281
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
2 if (gate_arraylist_length(pool->tasks) >= pool->max_pending_tasks)
282 {
283 ret = GATE_RESULT_OUTOFBOUNDS;
284 }
285
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 else if (NULL != gate_arraylist_add(pool->tasks, &task))
286 {
287 2 ret = GATE_RESULT_OK;
288 }
289 else
290 {
291 ret = GATE_RESULT_FAILED;
292 }
293 2 gate_mutex_release(&pool->lock);
294 2 gate_synccondition_signal_all(&pool->cond);
295 } while (0);
296 2 return ret;
297 }
298
299 1 static gate_bool_t gate_threadpool_remove_task_condition(void const* item, void* param)
300 {
301 1 gate_runnable_t* const* src = (gate_runnable_t* const*)item;
302 1 gate_runnable_t* dst = (gate_runnable_t*)param;
303 1 return *src == dst;
304 }
305
306 1 gate_result_t gate_threadpool_remove_task(gate_threadpool_t pool, gate_runnable_t* task)
307 {
308 1 gate_result_t ret = GATE_RESULT_FAILED;
309 gate_size_t count1, count2;
310 do
311 {
312 1 ret = gate_mutex_acquire(&pool->lock);
313
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 GATE_BREAK_IF_FAILED(ret);
314 1 count1 = gate_arraylist_length(pool->tasks);
315 1 ret = gate_arraylist_remove_if(pool->tasks, gate_threadpool_remove_task_condition, task);
316
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (GATE_SUCCEEDED(ret))
317 {
318 1 count2 = gate_arraylist_length(pool->tasks);
319
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (count2 == count1)
320 {
321 /* nothing was removed, signal "unchanged" OK-state */
322 ret = GATE_RESULT_OK_UNCHANGED;
323 }
324 }
325 1 gate_mutex_release(&pool->lock);
326 } while (0);
327 1 return ret;
328 }
329 4 gate_size_t gate_threadpool_pending_tasks(gate_threadpool_t pool)
330 {
331 4 gate_size_t ret = 0;
332 4 gate_result_t result = gate_mutex_acquire(&pool->lock);
333
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
4 if (GATE_SUCCEEDED(result))
334 {
335 4 ret = gate_arraylist_length(pool->tasks);
336 4 gate_mutex_release(&pool->lock);
337 }
338 4 return ret;
339 }
340