Line | Branch | Exec | Source |
---|---|---|---|
1 | /* GATE PROJECT LICENSE: | ||
2 | +----------------------------------------------------------------------------+ | ||
3 | | Copyright(c) 2018-2025, Stefan Meislinger <sm@opengate.at> | | ||
4 | | All rights reserved. | | ||
5 | | | | ||
6 | | Redistribution and use in source and binary forms, with or without | | ||
7 | | modification, are permitted provided that the following conditions are met:| | ||
8 | | | | ||
9 | | 1. Redistributions of source code must retain the above copyright notice, | | ||
10 | | this list of conditions and the following disclaimer. | | ||
11 | | 2. Redistributions in binary form must reproduce the above copyright | | ||
12 | | notice, this list of conditions and the following disclaimer in the | | ||
13 | | documentation and/or other materials provided with the distribution. | | ||
14 | | | | ||
15 | | THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"| | ||
16 | | AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE | | ||
17 | | IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE | | ||
18 | | ARE DISCLAIMED.IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE | | ||
19 | | LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR | | ||
20 | | CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF | | ||
21 | | SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS | | ||
22 | | INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN | | ||
23 | | CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) | | ||
24 | | ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF | | ||
25 | | THE POSSIBILITY OF SUCH DAMAGE. | | ||
26 | +----------------------------------------------------------------------------+ | ||
27 | */ | ||
28 | #include "gate/threadpools.h" | ||
29 | #include "gate/synchronization.h" | ||
30 | #include "gate/arrays.h" | ||
31 | #include "gate/results.h" | ||
32 | |||
33 | struct gate_threadpool_class | ||
34 | { | ||
35 | gate_mutex_t lock; | ||
36 | gate_synccondition_t cond; | ||
37 | gate_atomic_int_t state; | ||
38 | gate_uint32_t max_pending_tasks; | ||
39 | gate_uint32_t max_threads; | ||
40 | gate_arraylist_t tasks; | ||
41 | gate_thread_t threads[1]; | ||
42 | }; | ||
43 | |||
44 | |||
45 | #define GATE_THREADPOOL_STATE_OFFLINE 0 | ||
46 | #define GATE_THREADPOOL_STATE_STARTING 1 | ||
47 | #define GATE_THREADPOOL_STATE_ONLINE 2 | ||
48 | #define GATE_THREADPOOL_STATE_STOPPING 3 | ||
49 | |||
50 | 1 | static gate_bool_t is_starting(gate_threadpool_t pool) | |
51 | { | ||
52 | 1 | return gate_atomic_int_get(&pool->state) == GATE_THREADPOOL_STATE_STARTING; | |
53 | } | ||
54 | |||
55 | 7 | static gate_bool_t is_online(gate_threadpool_t pool) | |
56 | { | ||
57 | 7 | return gate_atomic_int_get(&pool->state) == GATE_THREADPOOL_STATE_ONLINE; | |
58 | } | ||
59 | |||
60 | 1 | static gate_result_t gate_threadpool_code(void* param) | |
61 | { | ||
62 | 1 | gate_result_t ret = GATE_RESULT_OK; | |
63 | 1 | gate_threadpool_t pool = (gate_threadpool_t)param; | |
64 | void* ptr_entry; | ||
65 | gate_runnable_t* task; | ||
66 | |||
67 | { | ||
68 | 1 | ret = gate_mutex_acquire(&pool->lock); | |
69 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (GATE_FAILED(ret)) |
70 | { | ||
71 | ✗ | return ret; | |
72 | } | ||
73 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
|
1 | while (is_starting(pool)) |
74 | { | ||
75 | ✗ | gate_synccondition_timed_wait(&pool->cond, &pool->lock, 200); | |
76 | } | ||
77 | 1 | gate_mutex_release(&pool->lock); | |
78 | } | ||
79 | |||
80 |
2/2✓ Branch 1 taken 2 times.
✓ Branch 2 taken 1 times.
|
3 | while (is_online(pool)) |
81 | { | ||
82 | 2 | task = NULL; | |
83 | |||
84 | { | ||
85 | 2 | ret = gate_mutex_acquire(&pool->lock); | |
86 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
|
2 | if (GATE_FAILED(ret)) |
87 | { | ||
88 | ✗ | break; | |
89 | } | ||
90 | |||
91 | do | ||
92 | { | ||
93 |
4/4✓ Branch 1 taken 2 times.
✓ Branch 2 taken 1 times.
✓ Branch 4 taken 1 times.
✓ Branch 5 taken 1 times.
|
3 | while ((gate_arraylist_length(pool->tasks) == 0) && is_online(pool)) |
94 | { | ||
95 | 1 | gate_synccondition_timed_wait(&pool->cond, &pool->lock, 1000); | |
96 | } | ||
97 | |||
98 |
2/2✓ Branch 1 taken 1 times.
✓ Branch 2 taken 1 times.
|
2 | if (!is_online(pool)) |
99 | { | ||
100 | 1 | break; | |
101 | } | ||
102 | |||
103 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | if (gate_arraylist_length(pool->tasks) > 0) |
104 | { | ||
105 | 1 | ptr_entry = gate_arraylist_get(pool->tasks, 0); | |
106 | } | ||
107 | else | ||
108 | { | ||
109 | ✗ | ptr_entry = NULL; | |
110 | } | ||
111 | |||
112 |
1/2✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
|
1 | if (ptr_entry != NULL) |
113 | { | ||
114 | 1 | task = *(gate_runnable_t**)ptr_entry; | |
115 |
1/2✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
|
1 | if (task != NULL) |
116 | { | ||
117 | 1 | gate_object_retain(task); | |
118 | } | ||
119 | 1 | gate_arraylist_remove(pool->tasks, 0, 1); | |
120 | } | ||
121 | } while (0); | ||
122 | 2 | gate_mutex_release(&pool->lock); | |
123 | } | ||
124 | |||
125 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
|
2 | if (task != NULL) |
126 | { | ||
127 | 1 | gate_runnable_run(task); | |
128 | 1 | gate_object_release(task); | |
129 | } | ||
130 | } | ||
131 | |||
132 | 1 | return ret; | |
133 | } | ||
134 | |||
135 | 1 | gate_result_t gate_threadpool_create(gate_threadpool_t* ptr_pool, gate_uint32_t thread_count, gate_uint32_t max_tasks) | |
136 | { | ||
137 | 1 | gate_result_t ret = GATE_RESULT_FAILED; | |
138 | 1 | gate_threadpool_t pool = NULL; | |
139 | do | ||
140 | { | ||
141 |
1/2✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
|
1 | if (thread_count == 0) |
142 | { | ||
143 | 1 | thread_count = 1; | |
144 | } | ||
145 |
1/2✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
|
1 | if (max_tasks == 0) |
146 | { | ||
147 | 1 | max_tasks = 1; | |
148 | } | ||
149 | |||
150 | 1 | pool = gate_mem_alloc(sizeof(struct gate_threadpool_class) + sizeof(gate_thread_t) * thread_count); | |
151 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (pool == NULL) break; |
152 | 1 | gate_mem_clear(pool, sizeof(struct gate_threadpool_class) + sizeof(gate_thread_t) * thread_count); | |
153 | |||
154 | 1 | gate_atomic_int_set(&pool->state, GATE_THREADPOOL_STATE_OFFLINE); | |
155 | |||
156 | 1 | ret = gate_mutex_create(&pool->lock); | |
157 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (GATE_FAILED(ret)) |
158 | { | ||
159 | ✗ | gate_mem_dealloc(pool); | |
160 | ✗ | pool = NULL; | |
161 | ✗ | break; | |
162 | } | ||
163 | |||
164 | 1 | ret = gate_synccondition_create(&pool->cond); | |
165 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | GATE_BREAK_IF_FAILED(ret); |
166 | |||
167 | 1 | pool->tasks = gate_arraylist_create(sizeof(gate_runnable_t*), NULL, 16, &gate_object_ptr_copyctor, &gate_object_ptr_dtor); | |
168 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (pool->tasks == NULL) |
169 | { | ||
170 | ✗ | break; | |
171 | } | ||
172 | |||
173 | 1 | pool->max_threads = thread_count; | |
174 | 1 | pool->max_pending_tasks = max_tasks; | |
175 | |||
176 | 1 | *ptr_pool = pool; | |
177 | 1 | pool = NULL; | |
178 | 1 | ret = GATE_RESULT_OK; | |
179 | } while (0); | ||
180 | |||
181 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (pool != NULL) |
182 | { | ||
183 | ✗ | if (pool->tasks != NULL) | |
184 | { | ||
185 | ✗ | gate_arraylist_release(pool->tasks); | |
186 | } | ||
187 | ✗ | gate_synccondition_destroy(&pool->cond); | |
188 | ✗ | gate_mutex_destroy(&pool->lock); | |
189 | } | ||
190 | |||
191 | 1 | return ret; | |
192 | } | ||
193 | |||
194 | 1 | gate_result_t gate_threadpool_destroy(gate_threadpool_t pool) | |
195 | { | ||
196 | 1 | gate_result_t ret = GATE_RESULT_OK; | |
197 | gate_result_t result; | ||
198 | 1 | gate_threadpool_stop(pool); | |
199 | |||
200 | 1 | gate_arraylist_release(pool->tasks); | |
201 | |||
202 | 1 | result = gate_synccondition_destroy(&pool->cond); | |
203 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (GATE_FAILED(result)) ret = result; |
204 | |||
205 | 1 | result = gate_mutex_destroy(&pool->lock); | |
206 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (GATE_FAILED(result)) ret = result; |
207 | |||
208 | 1 | gate_mem_dealloc(pool); | |
209 | |||
210 | 1 | return ret; | |
211 | } | ||
212 | |||
213 | 1 | gate_result_t gate_threadpool_start(gate_threadpool_t pool) | |
214 | { | ||
215 | 1 | gate_result_t ret = GATE_RESULT_FAILED; | |
216 | 1 | gate_size_t ndx = 0; | |
217 | |||
218 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
|
1 | if (GATE_THREADPOOL_STATE_OFFLINE != gate_atomic_int_xchg_if( |
219 | &pool->state, GATE_THREADPOOL_STATE_OFFLINE, GATE_THREADPOOL_STATE_STARTING)) | ||
220 | { | ||
221 | ✗ | ret = GATE_RESULT_INVALIDSTATE; | |
222 | } | ||
223 | else | ||
224 | { | ||
225 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
|
2 | for (ndx = 0; ndx != pool->max_threads; ++ndx) |
226 | { | ||
227 | 1 | ret = gate_thread_start_code(&gate_threadpool_code, pool, &pool->threads[ndx], NULL); | |
228 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (GATE_FAILED(ret)) |
229 | { | ||
230 | ✗ | gate_atomic_int_set(&pool->state, GATE_THREADPOOL_STATE_STOPPING); | |
231 | ✗ | gate_synccondition_signal_all(&pool->cond); | |
232 | ✗ | while (ndx-- != 0) | |
233 | { | ||
234 | ✗ | gate_thread_join(&pool->threads[ndx], NULL); | |
235 | } | ||
236 | ✗ | break; | |
237 | } | ||
238 | } | ||
239 |
1/2✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
|
1 | if (GATE_SUCCEEDED(ret)) |
240 | { | ||
241 | 1 | gate_atomic_int_set(&pool->state, GATE_THREADPOOL_STATE_ONLINE); | |
242 | 1 | gate_synccondition_signal_all(&pool->cond); | |
243 | } | ||
244 | else | ||
245 | { | ||
246 | ✗ | gate_atomic_int_set(&pool->state, GATE_THREADPOOL_STATE_OFFLINE); | |
247 | } | ||
248 | } | ||
249 | 1 | return ret; | |
250 | } | ||
251 | |||
252 | 2 | gate_result_t gate_threadpool_stop(gate_threadpool_t pool) | |
253 | { | ||
254 | 2 | gate_result_t ret = GATE_RESULT_FAILED; | |
255 | gate_size_t ndx; | ||
256 | |||
257 |
2/2✓ Branch 1 taken 1 times.
✓ Branch 2 taken 1 times.
|
2 | if (GATE_THREADPOOL_STATE_ONLINE != gate_atomic_int_xchg_if( |
258 | &pool->state, GATE_THREADPOOL_STATE_ONLINE, GATE_THREADPOOL_STATE_STOPPING)) | ||
259 | { | ||
260 | 1 | ret = GATE_RESULT_INVALIDSTATE; | |
261 | } | ||
262 | else | ||
263 | { | ||
264 | 1 | gate_synccondition_signal_all(&pool->cond); | |
265 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
|
2 | for (ndx = 0; ndx != pool->max_threads; ++ndx) |
266 | { | ||
267 | 1 | gate_thread_join(&pool->threads[ndx], NULL); | |
268 | } | ||
269 | 1 | gate_atomic_int_set(&pool->state, GATE_THREADPOOL_STATE_OFFLINE); | |
270 | 1 | ret = GATE_RESULT_OK; | |
271 | } | ||
272 | 2 | return ret; | |
273 | } | ||
274 | 2 | gate_result_t gate_threadpool_add_task(gate_threadpool_t pool, gate_runnable_t* task) | |
275 | { | ||
276 | 2 | gate_result_t ret = GATE_RESULT_FAILED; | |
277 | do | ||
278 | { | ||
279 | 2 | ret = gate_mutex_acquire(&pool->lock); | |
280 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
|
2 | GATE_BREAK_IF_FAILED(ret); |
281 | |||
282 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
|
2 | if (gate_arraylist_length(pool->tasks) >= pool->max_pending_tasks) |
283 | { | ||
284 | ✗ | ret = GATE_RESULT_OUTOFBOUNDS; | |
285 | } | ||
286 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | else if (NULL != gate_arraylist_add(pool->tasks, &task)) |
287 | { | ||
288 | 2 | ret = GATE_RESULT_OK; | |
289 | } | ||
290 | else | ||
291 | { | ||
292 | ✗ | ret = GATE_RESULT_FAILED; | |
293 | } | ||
294 | 2 | gate_mutex_release(&pool->lock); | |
295 | 2 | gate_synccondition_signal_all(&pool->cond); | |
296 | } while (0); | ||
297 | 2 | return ret; | |
298 | } | ||
299 | |||
300 | 1 | static gate_bool_t gate_threadpool_remove_task_condition(void const* item, void* param) | |
301 | { | ||
302 | 1 | gate_runnable_t* const* src = (gate_runnable_t* const*)item; | |
303 | 1 | gate_runnable_t* dst = (gate_runnable_t*)param; | |
304 | 1 | return *src == dst; | |
305 | } | ||
306 | |||
307 | 1 | gate_result_t gate_threadpool_remove_task(gate_threadpool_t pool, gate_runnable_t* task) | |
308 | { | ||
309 | 1 | gate_result_t ret = GATE_RESULT_FAILED; | |
310 | gate_size_t count1, count2; | ||
311 | do | ||
312 | { | ||
313 | 1 | ret = gate_mutex_acquire(&pool->lock); | |
314 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | GATE_BREAK_IF_FAILED(ret); |
315 | 1 | count1 = gate_arraylist_length(pool->tasks); | |
316 | 1 | ret = gate_arraylist_remove_if(pool->tasks, gate_threadpool_remove_task_condition, task); | |
317 |
1/2✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
|
1 | if (GATE_SUCCEEDED(ret)) |
318 | { | ||
319 | 1 | count2 = gate_arraylist_length(pool->tasks); | |
320 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (count2 == count1) |
321 | { | ||
322 | /* nothing was removed, signal "unchanged" OK-state */ | ||
323 | ✗ | ret = GATE_RESULT_OK_UNCHANGED; | |
324 | } | ||
325 | } | ||
326 | 1 | gate_mutex_release(&pool->lock); | |
327 | } while (0); | ||
328 | 1 | return ret; | |
329 | } | ||
330 | 4 | gate_size_t gate_threadpool_pending_tasks(gate_threadpool_t pool) | |
331 | { | ||
332 | 4 | gate_size_t ret = 0; | |
333 | 4 | gate_result_t result = gate_mutex_acquire(&pool->lock); | |
334 |
1/2✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
|
4 | if (GATE_SUCCEEDED(result)) |
335 | { | ||
336 | 4 | ret = gate_arraylist_length(pool->tasks); | |
337 | 4 | gate_mutex_release(&pool->lock); | |
338 | } | ||
339 | 4 | return ret; | |
340 | } | ||
341 |