GCC Code Coverage Report


Directory: src/gate/
File: src/gate/net/sockettools.c
Date: 2025-09-14 13:10:38
Exec Total Coverage
Lines: 559 981 57.0%
Functions: 51 85 60.0%
Branches: 178 466 38.2%

Line Branch Exec Source
1 /* GATE PROJECT LICENSE:
2 +----------------------------------------------------------------------------+
3 | Copyright(c) 2018-2025, Stefan Meislinger <sm@opengate.at> |
4 | All rights reserved. |
5 | |
6 | Redistribution and use in source and binary forms, with or without |
7 | modification, are permitted provided that the following conditions are met:|
8 | |
9 | 1. Redistributions of source code must retain the above copyright notice, |
10 | this list of conditions and the following disclaimer. |
11 | 2. Redistributions in binary form must reproduce the above copyright |
12 | notice, this list of conditions and the following disclaimer in the |
13 | documentation and/or other materials provided with the distribution. |
14 | |
15 | THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"|
16 | AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
17 | IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
18 | ARE DISCLAIMED.IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
19 | LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
20 | CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
21 | SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
22 | INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
23 | CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
24 | ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF |
25 | THE POSSIBILITY OF SUCH DAMAGE. |
26 +----------------------------------------------------------------------------+
27 */
28 #include "gate/net/sockettools.h"
29 #include "gate/net/nameresolvers.h"
30 #include "gate/threading.h"
31 #include "gate/synchronization.h"
32 #include "gate/arrays.h"
33 #include "gate/debugging.h"
34 #include "gate/maps.h"
35 #include "gate/platforms.h"
36
37 #define GATE_SOCKETGROUP_LIMIT 512
38
39
40 #if !defined(GATE_SYS_POSIX)
41
42 gate_result_t gate_socketselector_create(gate_socketselector_t* selector)
43 {
44 gate_result_t ret;
45 gate_socket_t sock;
46 do
47 {
48 gate_mem_clear(selector, sizeof(gate_socketselector_t));
49 ret = gate_socket_create_ex(GATE_SOCKET_FAMILY_INET4, GATE_SOCKET_MSGTYPE_STREAM, GATE_SOCKET_PROTOCOL_TCP, &sock);
50 } while (0);
51 selector->handles[0] = (void*)sock;
52 return ret;
53 }
54 gate_result_t gate_socketselector_destroy(gate_socketselector_t* selector)
55 {
56 gate_result_t ret = GATE_RESULT_OK;
57 gate_socket_t sock = (gate_socket_t)(gate_intptr_t)selector->handles[0];
58 gate_socket_close(sock);
59 return ret;
60 }
61 gate_result_t gate_socketselector_interrupt(gate_socketselector_t* selector)
62 {
63 gate_result_t ret;
64 gate_socket_t oldsock = (gate_socket_t)(gate_intptr_t)selector->handles[0];
65 gate_socket_t newsock;
66 do
67 {
68 ret = gate_socket_create_ex(GATE_SOCKET_FAMILY_INET4, GATE_SOCKET_MSGTYPE_STREAM, GATE_SOCKET_PROTOCOL_TCP, &newsock);
69 GATE_BREAK_IF_FAILED(ret);
70
71 selector->handles[0] = (void*)newsock;
72 gate_socket_close(oldsock);
73 } while (0);
74 return ret;
75 }
76 gate_result_t gate_socketselector_select(gate_socketselector_t* selector, gate_socket_t const* socks, gate_size_t sockcount, gate_uint8_t* statusflags, gate_uint32_t timeout)
77 {
78 gate_result_t ret;
79 gate_socket_t local_sockets[GATE_SOCKETGROUP_LIMIT];
80 gate_uint8_t local_flags[GATE_SOCKETGROUP_LIMIT];
81 gate_size_t index;
82 gate_size_t local_sockcount = sockcount;
83 gate_socket_t interruptor = (gate_socket_t)(gate_intptr_t)selector->handles[0];
84
85 if (local_sockcount >= GATE_SOCKETGROUP_LIMIT)
86 {
87 local_sockcount = GATE_SOCKETGROUP_LIMIT - 1;
88 }
89
90 for (index = 0; index != local_sockcount; ++index)
91 {
92 local_sockets[index] = socks[index];
93 local_flags[index] = statusflags[index];
94 }
95 local_sockets[local_sockcount] = interruptor;
96 local_flags[local_sockcount] = GATE_SOCKET_SELECT_FLAG_ERROR;
97
98 ret = gate_socket_select(local_sockets, local_sockcount + 1, local_flags, timeout);
99
100 index = 0;
101 if (GATE_SUCCEEDED(ret))
102 {
103 for (; index != local_sockcount; ++index)
104 {
105 statusflags[index] = local_flags[index];
106 }
107 }
108
109 /* notify NO-ACTION on all non-processed sockets */
110 for (; index < sockcount; ++index)
111 {
112 statusflags[index] = 0;
113 }
114
115 return ret;
116 }
117
118 #else
119
120 #include <unistd.h>
121
122 1 gate_result_t gate_socketselector_create(gate_socketselector_t* selector)
123 {
124 int pipe_fd[2];
125 1 int result = pipe(pipe_fd);
126
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (result == -1)
127 {
128 return GATE_RESULT_FAILED;
129 }
130 else
131 {
132 1 selector->handles[0] = (void*)(gate_intptr_t)pipe_fd[0];
133 1 selector->handles[1] = (void*)(gate_intptr_t)pipe_fd[1];
134 1 return GATE_RESULT_OK;
135 }
136 }
137 1 gate_result_t gate_socketselector_destroy(gate_socketselector_t* selector)
138 {
139 1 int read_pipe = (int)(gate_intptr_t)selector->handles[0];
140 1 int write_pipe = (int)(gate_intptr_t)selector->handles[1];
141 1 gate_posix_close(read_pipe);
142 1 gate_posix_close(write_pipe);
143 1 return GATE_RESULT_OK;
144 }
145 10 gate_result_t gate_socketselector_interrupt(gate_socketselector_t* selector)
146 {
147 10 int write_pipe = (int)(gate_intptr_t)selector->handles[1];
148 static char const ch = 'I';
149 10 gate_posix_write(write_pipe, &ch, 1);
150 10 return GATE_RESULT_OK;
151 }
152 13 gate_result_t gate_socketselector_select(gate_socketselector_t* selector, gate_socket_t const* socks, gate_size_t sockcount, gate_uint8_t* statusflags, gate_uint32_t timeout)
153 {
154 gate_result_t ret;
155 gate_socket_t local_sockets[GATE_SOCKETGROUP_LIMIT];
156 gate_uint8_t local_flags[GATE_SOCKETGROUP_LIMIT];
157 gate_size_t index;
158 13 gate_size_t local_sockcount = sockcount;
159 13 gate_socket_t interruptor = (gate_socket_t)(gate_intptr_t)selector->handles[0];
160 char buf[1];
161
162
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13 times.
13 if (local_sockcount >= GATE_SOCKETGROUP_LIMIT)
163 {
164 local_sockcount = GATE_SOCKETGROUP_LIMIT - 1;
165 }
166
167
2/2
✓ Branch 0 taken 20 times.
✓ Branch 1 taken 13 times.
33 for (index = 0; index != local_sockcount; ++index)
168 {
169 20 local_sockets[index] = socks[index];
170 20 local_flags[index] = statusflags[index];
171 }
172 13 local_sockets[local_sockcount] = interruptor;
173 13 local_flags[local_sockcount] = GATE_SOCKET_SELECT_FLAG_RECEIVE | GATE_SOCKET_SELECT_FLAG_ERROR;
174
175 13 ret = gate_socket_select(local_sockets, local_sockcount + 1, local_flags, timeout);
176
177 13 index = 0;
178
1/2
✓ Branch 0 taken 13 times.
✗ Branch 1 not taken.
13 if (GATE_SUCCEEDED(ret))
179 {
180
2/2
✓ Branch 0 taken 10 times.
✓ Branch 1 taken 3 times.
13 if (GATE_FLAG_ENABLED(local_flags[local_sockcount], GATE_SOCKET_SELECT_FLAG_RECEIVE))
181 {
182 10 gate_posix_read((int)interruptor, buf, 1);
183 }
184
2/2
✓ Branch 0 taken 20 times.
✓ Branch 1 taken 13 times.
33 for (; index != local_sockcount; ++index)
185 {
186 20 statusflags[index] = local_flags[index];
187 }
188 }
189
190 /* notify NO-ACTION on all non-processed sockets */
191
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13 times.
13 for (; index < sockcount; ++index)
192 {
193 statusflags[index] = 0;
194 }
195
196 13 return ret;
197 }
198
199
200 #endif
201
202
203
204 typedef struct gate_socketgroup_task
205 {
206 gate_socket_t sock;
207 int command;
208 gate_size_t length;
209 gate_size_t offset;
210 void* param;
211 gate_intptr_t info;
212 gate_bool_t completed;
213 char data[1];
214
215 } gate_socketgroup_task_t;
216
217 10 static gate_socketgroup_task_t* gate_socketgroup_task_create(int command, gate_size_t datalength, gate_size_t allocatelength, void* param)
218 {
219 10 gate_socketgroup_task_t* ret = (gate_socketgroup_task_t*)gate_mem_alloc(sizeof(gate_socketgroup_task_t) + allocatelength);
220
1/2
✓ Branch 0 taken 10 times.
✗ Branch 1 not taken.
10 if (ret)
221 {
222 10 gate_mem_clear(ret, sizeof(gate_socketgroup_task_t) + allocatelength);
223 10 ret->command = command;
224 10 ret->offset = 0;
225 10 ret->length = datalength;
226 10 ret->param = param;
227 10 ret->completed = false;
228 }
229 10 return ret;
230 }
231
232 10 static void gate_socketgroup_task_destroy(gate_socketgroup_task_t* task)
233 {
234 10 gate_mem_dealloc(task);
235 10 }
236
237
238 1 gate_result_t gate_socketgroup_create(gate_socketgroup_t* group, void* user_tag)
239 {
240 1 gate_result_t ret = GATE_RESULT_OK;
241 do
242 {
243 1 gate_mem_clear(group, sizeof(gate_socketgroup_t));
244 1 gate_atomic_int_set(&group->running, 0);
245
246 1 ret = gate_socketselector_create(&group->selector);
247
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 GATE_BREAK_IF_FAILED(ret);
248
249 1 group->tasks = gate_arraylist_create(sizeof(gate_socketgroup_task_t*), NULL, 0, NULL, NULL);
250
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (!group->tasks)
251 {
252 gate_socketselector_destroy(&group->selector);
253 ret = GATE_RESULT_OUTOFMEMORY;
254 break;
255 }
256 1 ret = gate_mutex_create(&group->mutex);
257
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (GATE_FAILED(ret))
258 {
259 gate_arraylist_release(group->tasks);
260 gate_socketselector_destroy(&group->selector);
261 break;
262 }
263 1 group->interval_ms = 500;
264 1 group->user_tag = user_tag;
265 } while (0);
266 1 return ret;
267 }
268
269 1 gate_result_t gate_socketgroup_destroy(gate_socketgroup_t* group)
270 {
271 1 gate_result_t ret = GATE_RESULT_OK;
272 gate_result_t result;
273 1 gate_socketgroup_quit(group);
274
275 1 result = gate_socketgroup_clear(group);
276
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (GATE_FAILED(result)) ret = result;
277
278 1 gate_arraylist_release(group->tasks);
279 1 gate_socketselector_destroy(&group->selector);
280
281 1 result = gate_mutex_destroy(&group->mutex);
282
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (GATE_FAILED(result)) ret = result;
283
284 1 return ret;
285 }
286 2 gate_result_t gate_socketgroup_clear(gate_socketgroup_t* group)
287 {
288 2 gate_result_t ret = GATE_RESULT_FAILED;
289 gate_size_t index;
290 gate_size_t sockcount;
291 gate_socketgroup_task_t** ptr_task;
292 do
293 {
294 2 ret = gate_mutex_acquire(&group->mutex);
295
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 GATE_BREAK_IF_FAILED(ret);
296
297 do
298 {
299 2 sockcount = gate_arraylist_length(group->tasks);
300
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 for (index = 0; index != sockcount; ++index)
301 {
302 ptr_task = (gate_socketgroup_task_t**)gate_arraylist_get(group->tasks, index);
303 if (ptr_task && *ptr_task)
304 {
305 gate_socketgroup_task_destroy(*ptr_task);
306 }
307 }
308 2 gate_arraylist_clear(group->tasks);
309
310 } while (0);
311 2 gate_mutex_release(&group->mutex);
312
313 } while (0);
314 2 return ret;
315 }
316
317 5 static gate_bool_t gate_socketgroup_remove_condition(void const* item, void* param)
318 {
319 5 gate_socketgroup_task_t* const* task = (gate_socketgroup_task_t* const*)item;
320 5 gate_socket_t* sock = (gate_socket_t*)param;
321
5/6
✓ Branch 0 taken 5 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
✓ Branch 3 taken 2 times.
✓ Branch 4 taken 1 times.
✓ Branch 5 taken 2 times.
5 if (task && (*task) && ((*task)->sock == *sock))
322 {
323 /* delete task pointer and return true to remove it from arraylist */
324 1 gate_socketgroup_task_destroy(*task);
325 1 return true;
326 }
327 4 return false;
328 }
329
330 10 static gate_bool_t gate_socketgroup_integrate_task(gate_socketgroup_t* group, gate_socketgroup_task_t* task)
331 {
332 10 gate_size_t len = gate_arraylist_length(group->tasks);
333 gate_socketgroup_task_t** tasks;
334
2/2
✓ Branch 0 taken 9 times.
✓ Branch 1 taken 1 times.
10 if (len != 0)
335 {
336 /* find empty slot to overwrite: */
337 9 tasks = (gate_socketgroup_task_t**)gate_arraylist_get(group->tasks, 0);
338
2/2
✓ Branch 0 taken 11 times.
✓ Branch 1 taken 4 times.
15 for (; len != 0; --len, ++tasks)
339 {
340
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 6 times.
11 if (*tasks == NULL)
341 {
342 5 *tasks = task;
343 5 return true;
344 }
345 }
346 }
347 /* add new entry */
348 5 return NULL != gate_arraylist_add(group->tasks, &task);
349 }
350
351 3 gate_result_t gate_socketgroup_remove(gate_socketgroup_t* group, gate_socket_t sock)
352 {
353 3 gate_result_t ret = GATE_RESULT_FAILED;
354 do
355 {
356 3 ret = gate_mutex_acquire(&group->mutex);
357
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 GATE_BREAK_IF_FAILED(ret);
358
359 do
360 {
361 3 ret = gate_arraylist_remove_if(group->tasks, &gate_socketgroup_remove_condition, &sock);
362 } while (0);
363 3 gate_mutex_release(&group->mutex);
364
365 } while (0);
366 3 return ret;
367 }
368
369 gate_result_t gate_socketgroup_connect(gate_socketgroup_t* group, gate_socket_t sock, gate_socket_endpoint_t const* endpoint, void* param)
370 {
371 gate_result_t ret = GATE_RESULT_OK;
372 gate_socketgroup_task_t* task;
373 do
374 {
375 task = gate_socketgroup_task_create(GATE_SOCKETGROUP_OPERATION_CONNECT, 0, 0, param);
376 if (!task)
377 {
378 ret = GATE_RESULT_OUTOFMEMORY;
379 break;
380 }
381 ret = gate_socket_connect(sock, endpoint);
382
383 if (GATE_FAILED(ret))
384 {
385 gate_socketgroup_task_destroy(task);
386 }
387 else
388 {
389 task->sock = sock;
390 ret = gate_mutex_acquire(&group->mutex);
391 if (GATE_SUCCEEDED(ret))
392 {
393 if (!gate_socketgroup_integrate_task(group, task))
394 {
395 gate_socketgroup_task_destroy(task);
396 ret = GATE_RESULT_OUTOFMEMORY;
397 }
398 gate_mutex_release(&group->mutex);
399 }
400 gate_socketselector_interrupt(&group->selector);
401 }
402 } while (0);
403 return ret;
404 }
405 3 gate_result_t gate_socketgroup_accept(gate_socketgroup_t* group, gate_socket_t sock, void* param)
406 {
407 3 gate_result_t ret = GATE_RESULT_OK;
408 gate_socketgroup_task_t* task;
409 do
410 {
411 3 task = gate_socketgroup_task_create(GATE_SOCKETGROUP_OPERATION_ACCEPT, 0, 0, param);
412
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (!task)
413 {
414 ret = GATE_RESULT_OUTOFMEMORY;
415 break;
416 }
417 3 task->sock = sock;
418 {
419 3 ret = gate_mutex_acquire(&group->mutex);
420
1/2
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
3 if (GATE_SUCCEEDED(ret))
421 {
422
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (!gate_socketgroup_integrate_task(group, task))
423 {
424 gate_socketgroup_task_destroy(task);
425 ret = GATE_RESULT_OUTOFMEMORY;
426 }
427 3 gate_mutex_release(&group->mutex);
428 }
429 3 gate_socketselector_interrupt(&group->selector);
430 }
431 } while (0);
432 3 return ret;
433 }
434 3 gate_result_t gate_socketgroup_read(gate_socketgroup_t* group, gate_socket_t sock, gate_size_t length, void* param)
435 {
436 3 gate_result_t ret = GATE_RESULT_OK;
437 gate_socketgroup_task_t* task;
438 do
439 {
440 3 task = gate_socketgroup_task_create(GATE_SOCKETGROUP_OPERATION_READ, 0, 0, param);
441
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (!task)
442 {
443 ret = GATE_RESULT_OUTOFMEMORY;
444 break;
445 }
446 3 task->length = length;
447 3 task->sock = sock;
448 {
449 3 ret = gate_mutex_acquire(&group->mutex);
450
1/2
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
3 if (GATE_SUCCEEDED(ret))
451 {
452
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (!gate_socketgroup_integrate_task(group, task))
453 {
454 gate_socketgroup_task_destroy(task);
455 ret = GATE_RESULT_OUTOFMEMORY;
456 }
457 3 gate_mutex_release(&group->mutex);
458 }
459 3 gate_socketselector_interrupt(&group->selector);
460 }
461 } while (0);
462 3 return ret;
463 }
464 4 gate_result_t gate_socketgroup_write(gate_socketgroup_t* group, gate_socket_t sock, char const* data, gate_size_t length, void* param)
465 {
466 4 gate_result_t ret = GATE_RESULT_OK;
467 gate_socketgroup_task_t* task;
468 do
469 {
470 4 task = gate_socketgroup_task_create(GATE_SOCKETGROUP_OPERATION_WRITE, length, length, param);
471
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4 times.
4 if (!task)
472 {
473 ret = GATE_RESULT_OUTOFMEMORY;
474 break;
475 }
476
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
4 if (length != 0)
477 {
478 4 gate_mem_copy(task->data, data, length);
479 }
480 4 task->sock = sock;
481 {
482 4 ret = gate_mutex_acquire(&group->mutex);
483
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
4 if (GATE_SUCCEEDED(ret))
484 {
485
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (!gate_socketgroup_integrate_task(group, task))
486 {
487 gate_socketgroup_task_destroy(task);
488 ret = GATE_RESULT_OUTOFMEMORY;
489 }
490 4 gate_mutex_release(&group->mutex);
491 }
492 4 gate_socketselector_interrupt(&group->selector);
493 }
494 } while (0);
495 4 return ret;
496 }
497 gate_result_t gate_socketgroup_shutdown_write(gate_socketgroup_t* group, gate_socket_t sock, void* param)
498 {
499 gate_result_t ret = GATE_RESULT_OK;
500 gate_socketgroup_task_t* task;
501 do
502 {
503 task = gate_socketgroup_task_create(GATE_SOCKETGROUP_OPERATION_SHUTDOWN_WRITE, 0, 0, param);
504 if (!task)
505 {
506 ret = GATE_RESULT_OUTOFMEMORY;
507 break;
508 }
509 task->sock = sock;
510 {
511 ret = gate_mutex_acquire(&group->mutex);
512 if (GATE_SUCCEEDED(ret))
513 {
514 if (!gate_socketgroup_integrate_task(group, task))
515 {
516 gate_socketgroup_task_destroy(task);
517 ret = GATE_RESULT_OUTOFMEMORY;
518 }
519 gate_mutex_release(&group->mutex);
520 }
521 gate_socketselector_interrupt(&group->selector);
522 }
523 } while (0);
524 return ret;
525 }
526
527
528 22 static gate_size_t gate_socketgroup_find_socket(gate_socket_t* sockets, gate_size_t socketcount, gate_socket_t target)
529 {
530 22 gate_size_t sockindex = 0;
531
2/2
✓ Branch 0 taken 9 times.
✓ Branch 1 taken 20 times.
29 while (socketcount-- != 0)
532 {
533
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 7 times.
9 if (*sockets == target)
534 {
535 2 return sockindex;
536 }
537 7 ++sockets;
538 7 ++sockindex;
539 }
540 20 return GATE_INVALID_SIZE;
541 }
542
543 13 static gate_size_t gate_socketgroup_load_sockets(gate_socketgroup_task_t* const* tasks, gate_size_t taskcount,
544 gate_socket_t* sockets, gate_uint8_t* flags, gate_size_t maxsockets)
545 {
546 13 gate_size_t sockets_used = 0;
547 gate_size_t sockindex;
548 gate_socketgroup_task_t* current_task;
549
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13 times.
13 if (taskcount > maxsockets)
550 {
551 taskcount = maxsockets;
552 }
553
554
1/2
✓ Branch 0 taken 22 times.
✗ Branch 1 not taken.
35 while ((taskcount-- != 0) && (sockets_used < maxsockets))
555 {
556 22 current_task = *tasks;
557 22 ++tasks;
558
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 22 times.
22 if (current_task == NULL)
559 {
560 /* skip empty tasks */
561 continue;
562 }
563 22 sockindex = gate_socketgroup_find_socket(sockets, sockets_used, current_task->sock);
564
2/2
✓ Branch 0 taken 20 times.
✓ Branch 1 taken 2 times.
22 if (sockindex == GATE_INVALID_SIZE)
565 {
566 20 sockindex = sockets_used++;
567 20 sockets[sockindex] = current_task->sock;
568 20 flags[sockindex] = GATE_SOCKET_SELECT_FLAG_ERROR;
569 }
570
571
2/3
✓ Branch 0 taken 16 times.
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
22 switch (current_task->command)
572 {
573 16 case GATE_SOCKETGROUP_OPERATION_ACCEPT:
574 case GATE_SOCKETGROUP_OPERATION_READ:
575 {
576 16 flags[sockindex] |= GATE_SOCKET_SELECT_FLAG_RECEIVE;
577 16 break;
578 }
579 6 case GATE_SOCKETGROUP_OPERATION_WRITE:
580 case GATE_SOCKETGROUP_OPERATION_SHUTDOWN_WRITE:
581 case GATE_SOCKETGROUP_OPERATION_CONNECT:
582 {
583 6 flags[sockindex] |= GATE_SOCKET_SELECT_FLAG_SEND;
584 6 break;
585 }
586 }
587
2/2
✓ Branch 0 taken 22 times.
✓ Branch 1 taken 13 times.
35 }
588 13 return sockets_used;
589 }
590
591 9 static gate_size_t gate_socketgroup_find_task(gate_socketgroup_task_t* const* tasks, gate_size_t taskcount,
592 gate_socket_t target, gate_uint8_t flag)
593 {
594 9 gate_size_t task_index = 0;
595
1/2
✓ Branch 0 taken 11 times.
✗ Branch 1 not taken.
11 while (taskcount-- != 0)
596 {
597
3/4
✓ Branch 0 taken 11 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 9 times.
✓ Branch 3 taken 2 times.
11 if ((*tasks) && (*tasks)->sock == target)
598 {
599
2/3
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
9 switch ((*tasks)->command)
600 {
601 4 case GATE_SOCKETGROUP_OPERATION_CONNECT:
602 case GATE_SOCKETGROUP_OPERATION_WRITE:
603 case GATE_SOCKETGROUP_OPERATION_SHUTDOWN_WRITE:
604 {
605
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
4 if (GATE_FLAG_ENABLED(flag, GATE_SOCKET_SELECT_FLAG_SEND))
606 {
607 4 return task_index;
608 }
609 break;
610 }
611 5 case GATE_SOCKETGROUP_OPERATION_ACCEPT:
612 case GATE_SOCKETGROUP_OPERATION_READ:
613 {
614
1/2
✓ Branch 0 taken 5 times.
✗ Branch 1 not taken.
5 if (GATE_FLAG_ENABLED(flag, GATE_SOCKET_SELECT_FLAG_RECEIVE))
615 {
616 5 return task_index;
617 }
618 break;
619 }
620 }
621
622 if (GATE_FLAG_ENABLED(flag, GATE_SOCKET_SELECT_FLAG_ERROR))
623 {
624 /* errors are always communicated */
625 return task_index;
626 }
627 }
628 2 ++task_index;
629 2 ++tasks;
630 }
631 return GATE_INVALID_SIZE;
632 }
633
634 13 static gate_size_t gate_socketgroup_load_tasks(gate_socket_t const* sockets, gate_uint8_t* flags, gate_size_t socketcount,
635 gate_socketgroup_task_t** tasks, gate_size_t taskcount,
636 gate_socketgroup_task_t** localtasks, gate_size_t max_localtasks)
637 {
638 13 gate_size_t localtasks_used = 0;
639 gate_size_t taskindex;
640 gate_size_t sockindex;
641
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13 times.
13 if (taskcount > max_localtasks)
642 {
643 taskcount = max_localtasks;
644 }
645
646
2/2
✓ Branch 0 taken 20 times.
✓ Branch 1 taken 13 times.
33 for (sockindex = 0; sockindex != socketcount; ++sockindex)
647 {
648
2/2
✓ Branch 0 taken 11 times.
✓ Branch 1 taken 9 times.
20 if (flags[sockindex] == 0)
649 {
650 /* skip socket with no activity */
651 11 continue;
652 }
653
654 9 taskindex = gate_socketgroup_find_task(tasks, taskcount, sockets[sockindex], flags[sockindex]);
655
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9 times.
9 if (taskindex == GATE_INVALID_SIZE)
656 {
657 /* skip sockets with no associated task */
658 continue;
659 }
660 /* put task into local-tasks buffer */
661
1/2
✓ Branch 0 taken 9 times.
✗ Branch 1 not taken.
9 if (tasks[taskindex])
662 {
663 9 localtasks[localtasks_used] = tasks[taskindex];
664 9 localtasks[localtasks_used]->info = flags[sockindex];
665 9 tasks[taskindex] = NULL; /* remove task pointer from array */
666 9 ++localtasks_used;
667 }
668 }
669
670 13 return localtasks_used;
671 }
672 26 static gate_bool_t gate_socketgroup_remove_completed_task(void const* item, void* param)
673 {
674 26 gate_socketgroup_task_t* const* ptr_task = (gate_socketgroup_task_t* const*)item;
675 (void)param;
676
1/2
✓ Branch 0 taken 26 times.
✗ Branch 1 not taken.
26 if (ptr_task != NULL)
677 {
678
2/2
✓ Branch 0 taken 22 times.
✓ Branch 1 taken 4 times.
26 if (*ptr_task != NULL)
679 {
680
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 22 times.
22 if ((*ptr_task)->completed)
681 {
682 gate_socketgroup_task_destroy(*ptr_task);
683 return true;
684 }
685 else
686 {
687 22 return false;
688 }
689 }
690 4 return true;
691 }
692 GATE_DEBUG_BREAKPOINT;
693 return false;
694 }
695
696 1 gate_result_t gate_socketgroup_run(gate_socketgroup_t* group, gate_socketgroup_callback_t callback)
697 {
698 gate_result_t ret;
699 1 gate_int32_t prev_state = gate_atomic_int_xchg_if(&group->running, 0, 1);
700 gate_socket_t sockets[GATE_SOCKETGROUP_LIMIT];
701 gate_uint8_t flags[GATE_SOCKETGROUP_LIMIT];
702 gate_size_t sockets_used;
703 gate_socketgroup_task_t* localtasks[GATE_SOCKETGROUP_LIMIT];
704 gate_size_t localtasks_used;
705 gate_size_t localtask_index;
706
707 gate_socketgroup_task_t** tasks;
708 gate_size_t task_count;
709 gate_socketgroup_task_t* current_task;
710 gate_bool_t error_detected;
711 char buffer[1024 * 6 + 1];
712 gate_size_t buffer_len;
713 gate_size_t buffer_used;
714 gate_socket_t new_sock;
715
716 do
717 {
718
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (prev_state != 0)
719 {
720 ret = GATE_RESULT_INVALIDSTATE;
721 break;
722 }
723
724 1 ret = GATE_RESULT_OK;
725
726
2/2
✓ Branch 1 taken 13 times.
✓ Branch 2 taken 1 times.
14 while (gate_atomic_int_get(&group->running) == 1)
727 {
728 {
729 /* idle event */
730 13 callback(group, GATE_SOCKETGROUP_OPERATION_PREPARE, GATE_SOCKET_INVALID, GATE_RESULT_OK, NULL, NULL, 0);
731 }
732
733 13 sockets_used = 0;
734 { /* mutex section */
735 13 ret = gate_mutex_acquire(&group->mutex);
736
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13 times.
13 GATE_BREAK_IF_FAILED(ret);
737
738 13 task_count = gate_arraylist_length(group->tasks);
739
1/2
✓ Branch 0 taken 13 times.
✗ Branch 1 not taken.
13 if (task_count != 0)
740 {
741 13 tasks = (gate_socketgroup_task_t**)gate_arraylist_get(group->tasks, 0);
742
1/2
✓ Branch 0 taken 13 times.
✗ Branch 1 not taken.
13 if (tasks != NULL)
743 {
744 13 sockets_used = gate_socketgroup_load_sockets(tasks, task_count, sockets, flags, GATE_SOCKETGROUP_LIMIT);
745 }
746 }
747 13 gate_mutex_release(&group->mutex);
748 }
749
750 13 ret = gate_socketselector_select(&group->selector, sockets, sockets_used, flags, group->interval_ms);
751
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13 times.
13 GATE_BREAK_IF_FAILED(ret);
752
753 13 localtasks_used = 0;
754 { /* mutex section */
755 13 ret = gate_mutex_acquire(&group->mutex);
756
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13 times.
13 GATE_BREAK_IF_FAILED(ret);
757
758 13 task_count = gate_arraylist_length(group->tasks);
759
1/2
✓ Branch 0 taken 13 times.
✗ Branch 1 not taken.
13 if (task_count != 0)
760 {
761 13 tasks = (gate_socketgroup_task_t**)gate_arraylist_get(group->tasks, 0);
762
763 13 localtasks_used = gate_socketgroup_load_tasks(sockets, flags, sockets_used, tasks, task_count, localtasks, GATE_SOCKETGROUP_LIMIT);
764 }
765
766 13 gate_mutex_release(&group->mutex);
767 }
768
769
2/2
✓ Branch 0 taken 9 times.
✓ Branch 1 taken 13 times.
22 for (localtask_index = 0; localtask_index != localtasks_used; ++localtask_index)
770 {
771 9 current_task = localtasks[localtask_index];
772 9 error_detected = GATE_FLAG_ENABLED(current_task->info, GATE_SOCKET_SELECT_FLAG_ERROR);
773
774
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9 times.
9 if (error_detected)
775 {
776 current_task->completed = true;
777 callback(group, current_task->command, current_task->sock, GATE_RESULT_CRITICALERROR, current_task->param, NULL, 0);
778 }
779 else
780 {
781
3/6
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 3 times.
✓ Branch 3 taken 4 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
9 switch (current_task->command)
782 {
783 case GATE_SOCKETGROUP_OPERATION_CONNECT:
784 {
785 current_task->completed = true;
786 callback(group, GATE_SOCKETGROUP_OPERATION_CONNECT, current_task->sock, GATE_RESULT_OK, current_task->param,
787 NULL, 0);
788 break;
789 }
790 2 case GATE_SOCKETGROUP_OPERATION_ACCEPT:
791 {
792 2 current_task->completed = true;
793 2 ret = gate_socket_accept(current_task->sock, &new_sock);
794
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (GATE_FAILED(ret))
795 {
796 callback(group, GATE_SOCKETGROUP_OPERATION_ACCEPT, current_task->sock, ret, current_task->param,
797 NULL, 0);
798 }
799 else
800 {
801 2 callback(group, GATE_SOCKETGROUP_OPERATION_ACCEPT, new_sock, GATE_RESULT_OK, current_task->param,
802 2 (char const*)&current_task->sock, sizeof(current_task->sock));
803 }
804 2 break;
805 }
806 3 case GATE_SOCKETGROUP_OPERATION_READ:
807 {
808 3 current_task->completed = true;
809 3 buffer_len = sizeof(buffer) - 1;
810
1/2
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
3 if (buffer_len > current_task->length)
811 {
812 3 buffer_len = current_task->length;
813 }
814 3 ret = gate_socket_receive(current_task->sock, buffer, buffer_len, &buffer_used);
815
1/2
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
3 if (GATE_SUCCEEDED(ret))
816 {
817 3 buffer[buffer_used] = 0;
818 }
819
2/4
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
✗ Branch 3 not taken.
3 callback(group, GATE_SOCKETGROUP_OPERATION_READ, current_task->sock, ret, current_task->param,
820 GATE_FAILED(ret) ? NULL : buffer, GATE_FAILED(ret) ? 0 : buffer_used);
821 3 break;
822 }
823 4 case GATE_SOCKETGROUP_OPERATION_WRITE:
824 {
825
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
4 if (current_task->offset < current_task->length)
826 {
827 4 ret = gate_socket_send(current_task->sock, &current_task->data[current_task->offset],
828 4 current_task->length - current_task->offset, &buffer_used);
829
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4 times.
4 if (GATE_FAILED(ret))
830 {
831 current_task->completed = true;
832 callback(group, GATE_SOCKETGROUP_OPERATION_WRITE, current_task->sock, ret, current_task->param,
833 current_task->data, current_task->length);
834 break;
835 }
836 4 current_task->offset += buffer_used;
837 }
838
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
4 if (current_task->offset >= current_task->length)
839 {
840 4 current_task->completed = true;
841 8 callback(group, GATE_SOCKETGROUP_OPERATION_WRITE, current_task->sock, GATE_RESULT_OK, current_task->param,
842 4 current_task->data, current_task->length);
843 }
844
845 4 break;
846 }
847 case GATE_SOCKETGROUP_OPERATION_SHUTDOWN_WRITE:
848 {
849 ret = gate_socket_shutdown(current_task->sock, false, true);
850 current_task->completed = true;
851 callback(group, GATE_SOCKETGROUP_OPERATION_SHUTDOWN_WRITE, current_task->sock, ret, current_task->param,
852 NULL, 0);
853 break;
854 }
855 }
856 9 }
857
858 } /* for(all localtask */
859
860 { /* mutex section */
861 13 ret = gate_mutex_acquire(&group->mutex);
862
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13 times.
13 GATE_BREAK_IF_FAILED(ret);
863
864
2/2
✓ Branch 0 taken 9 times.
✓ Branch 1 taken 13 times.
22 for (localtask_index = 0; localtask_index != localtasks_used; ++localtask_index)
865 {
866 9 current_task = localtasks[localtask_index];
867
1/2
✓ Branch 0 taken 9 times.
✗ Branch 1 not taken.
9 if (current_task->completed)
868 {
869 9 gate_socketgroup_task_destroy(current_task);
870 }
871 else
872 {
873 if (!gate_socketgroup_integrate_task(group, current_task))
874 {
875 GATE_DEBUG_BREAKPOINT;
876 }
877 }
878 }
879
880 13 ret = gate_arraylist_remove_if(group->tasks, &gate_socketgroup_remove_completed_task, NULL);
881
882 13 gate_mutex_release(&group->mutex);
883
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13 times.
13 GATE_BREAK_IF_FAILED(ret);
884 }
885
886 } /* while running */
887
888 1 gate_atomic_int_set(&group->running, 0);
889
890 } while (0);
891
892 1 return ret;
893 }
894 2 gate_result_t gate_socketgroup_quit(gate_socketgroup_t* group)
895 {
896 2 gate_atomic_int_set(&group->running, 0);
897 2 return GATE_RESULT_OK;
898 }
899
900
901
902
903
904
905
906
907
908
909
910 /*
911 typedef struct gate_socket_request_class
912 {
913 gate_dataqueue_status_t callback;
914 gate_size_t length;
915 gate_size_t used;
916 char buffer[1];
917 } gate_socket_request_t;
918
919
920 static gate_socket_request_t* gate_socket_request_create(gate_dataqueue_status_t callback, gate_size_t length, char const* data)
921 {
922 gate_socket_request_t* ret = (gate_socket_request_t*)gate_mem_alloc(sizeof(gate_socket_request_t) + length);
923 if(ret != NULL)
924 {
925 ret->callback = callback;
926 ret->used = 0;
927 ret->length = length;
928 if(data != NULL)
929 {
930 gate_mem_copy(&ret->buffer[0], data, length);
931 ret->buffer[length] = 0;
932 }
933 else
934 {
935 ret->buffer[0] = 0;
936 }
937 }
938 return ret;
939 }
940 static void gate_socket_request_release(gate_socket_request_t* request)
941 {
942 gate_mem_dealloc(request);
943 }
944
945 typedef struct gate_socket_channel_class
946 {
947 gate_channel_id_t id;
948 gate_socket_t sock;
949 gate_arraylist_t read_requests;
950 gate_arraylist_t write_requests;
951
952 } gate_socket_channel_t;
953
954 static gate_socket_channel_t* gate_socket_channel_create()
955 {
956 gate_socket_channel_t* channel = gate_mem_alloc(sizeof(gate_socket_channel_t));
957 gate_mem_clear(channel, sizeof(gate_socket_channel_t));
958 channel->sock = GATE_SOCKET_INVALID;
959 return channel;
960 }
961 static void gate_socket_channel_release(gate_socket_channel_t* channel)
962 {
963 if(channel->sock != GATE_SOCKET_INVALID)
964 {
965 gate_socket_close(channel->sock);
966 }
967 gate_mem_dealloc(channel);
968 }
969 */
970
971 #define GATE_SOCKETQUEUE_STATE_OFFLINE 0
972 #define GATE_SOCKETQUEUE_STATE_STARTING 1
973 #define GATE_SOCKETQUEUE_STATE_ONLINE 2
974 #define GATE_SOCKETQUEUE_STATE_STOPPING 3
975
976 typedef struct gate_socketqueue_impl_class
977 {
978 GATE_INTERFACE_VTBL(gate_socketqueue) const* vtbl;
979
980 gate_atomic_int_t ref_counter;
981 gate_atomic_int_t state;
982 gate_atomic_int_t max_channel_id;
983 gate_thread_t thread;
984 gate_mutex_t lock;
985 gate_map_t channel_2_sock;
986 gate_map_t sock_2_channel;
987 gate_socketgroup_t sockets;
988 gate_dataqueue_status_t next_callback;
989 void* next_callback_data;
990 gate_dataqueue_status_t callback;
991 void* callback_data;
992
993 } gate_socketqueue_impl_t;
994
995 9 static gate_bool_t gate_socketqueue_impl_new_state(gate_socketqueue_impl_t* impl, gate_int32_t from_state, gate_int32_t to_state)
996 {
997 9 gate_int32_t old_state = gate_atomic_int_xchg_if(&impl->state, from_state, to_state);
998 9 return old_state == from_state;
999 }
1000 1 static gate_int32_t gate_socketqueue_impl_set_state(gate_socketqueue_impl_t* impl, gate_int32_t to_state)
1001 {
1002 1 gate_int32_t old_state = gate_atomic_int_set(&impl->state, to_state);
1003 1 return old_state;
1004 }
1005
1006 7 static gate_result_t gate_socketqueue_impl_resolve_socket(gate_socketqueue_impl_t* impl, gate_channel_id_t id, gate_socket_t* ptr_sock)
1007 {
1008 gate_result_t ret;
1009 gate_map_iterator_t iter;
1010 do
1011 {
1012 7 iter = gate_map_get(&impl->channel_2_sock, &id);
1013
1/2
✓ Branch 1 taken 7 times.
✗ Branch 2 not taken.
7 if (gate_map_iterator_valid(iter))
1014 {
1015 7 *ptr_sock = GATE_MAP_ITER_VALUE(gate_socket_t, iter);
1016 7 ret = GATE_RESULT_OK;
1017 }
1018 else
1019 {
1020 ret = GATE_RESULT_NOMATCH;
1021 }
1022 } while (0);
1023 7 return ret;
1024 }
1025 9 static gate_result_t gate_socketqueue_impl_resolve_channel(gate_socketqueue_impl_t* impl, gate_socket_t sock, gate_channel_id_t* ptr_id)
1026 {
1027 gate_result_t ret;
1028 gate_map_iterator_t iter;
1029 do
1030 {
1031 9 iter = gate_map_get(&impl->sock_2_channel, &sock);
1032
1/2
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
9 if (gate_map_iterator_valid(iter))
1033 {
1034 9 *ptr_id = GATE_MAP_ITER_VALUE(gate_channel_id_t, iter);
1035 9 ret = GATE_RESULT_OK;
1036 }
1037 else
1038 {
1039 ret = GATE_RESULT_NOMATCH;
1040 }
1041 } while (0);
1042 9 return ret;
1043 }
1044 3 static gate_result_t gate_socketqueue_impl_register(gate_socketqueue_impl_t* impl, gate_socket_t sock, gate_channel_id_t channel_id)
1045 {
1046 gate_result_t ret;
1047 do
1048 {
1049
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 3 times.
3 if (!gate_map_iterator_valid(gate_map_add(&impl->sock_2_channel, &sock, &channel_id)))
1050 {
1051 ret = GATE_RESULT_OUTOFMEMORY;
1052 break;
1053 }
1054
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 3 times.
3 if (!gate_map_iterator_valid(gate_map_add(&impl->channel_2_sock, &channel_id, &sock)))
1055 {
1056 ret = GATE_RESULT_OUTOFMEMORY;
1057 gate_map_remove(&impl->sock_2_channel, &sock);
1058 break;
1059 }
1060 3 ret = GATE_RESULT_OK;
1061 } while (0);
1062 3 return ret;
1063 }
1064
1065 22 static void gate_socketqueue_group_callback(gate_socketgroup_t* group, gate_uint32_t operation, gate_socket_t sock,
1066 gate_result_t result, void* user_param, char const* data, gate_size_t data_len)
1067 {
1068 gate_result_t res;
1069 22 gate_socketqueue_impl_t* impl = (gate_socketqueue_impl_t*)group->user_tag;
1070 22 gate_channel_id_t channel_id = GATE_QUEUE_INVALID_ID;
1071 22 gate_socket_t parent_socket = GATE_SOCKET_INVALID;
1072 gate_channel_id_t parent_channel_id;
1073
1074 {
1075
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 22 times.
22 if (GATE_FAILED(res = gate_mutex_acquire(&impl->lock)))
1076 {
1077 gate_socket_close(sock);
1078 return;
1079 }
1080
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 20 times.
22 if (operation == GATE_SOCKETGROUP_OPERATION_ACCEPT)
1081 {
1082 2 channel_id = gate_atomic_int_inc(&impl->max_channel_id);
1083 2 res = gate_socketqueue_impl_register(impl, sock, channel_id);
1084
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (GATE_FAILED(res))
1085 {
1086 gate_socket_close(sock);
1087 result = res;
1088 }
1089 else
1090 {
1091 2 parent_channel_id = 0;
1092 2 parent_socket = *(gate_socket_t const*)data;
1093 2 gate_socketqueue_impl_resolve_channel(impl, parent_socket, &parent_channel_id);
1094 }
1095 }
1096
2/2
✓ Branch 0 taken 7 times.
✓ Branch 1 taken 13 times.
20 else if (operation != GATE_SOCKETGROUP_OPERATION_PREPARE)
1097 {
1098 7 res = gate_socketqueue_impl_resolve_channel(impl, sock, &channel_id);
1099 }
1100 22 gate_mutex_release(&impl->lock);
1101 }
1102
1103
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 22 times.
22 if (GATE_FAILED(res))
1104 {
1105 impl->callback(impl->callback_data, GATE_DATAQUEUE_RESULT_ERROR, 0, res, NULL, 0, user_param);
1106 }
1107 else
1108 {
1109
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 22 times.
22 if (GATE_FAILED(result))
1110 {
1111 impl->callback(impl->callback_data, GATE_DATAQUEUE_RESULT_ERROR, channel_id, result, NULL, 0, user_param);
1112 }
1113 else
1114 {
1115
4/8
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
✓ Branch 3 taken 4 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 13 times.
✗ Branch 7 not taken.
22 switch (operation)
1116 {
1117 2 case GATE_SOCKETGROUP_OPERATION_ACCEPT:
1118 {
1119 2 impl->callback(impl->callback_data, GATE_DATAQUEUE_RESULT_OPENNEW, channel_id, result,
1120 (char const*)&parent_channel_id, sizeof(parent_channel_id), user_param);
1121 2 gate_socketgroup_accept(&impl->sockets, parent_socket, user_param);
1122 2 break;
1123 /* accepts are automatically repeated until server socket is closed */
1124 }
1125 case GATE_SOCKETGROUP_OPERATION_CONNECT:
1126 {
1127 impl->callback(impl->callback_data, GATE_DATAQUEUE_RESULT_OPEN, channel_id, result, NULL, 0, user_param);
1128 break;
1129 }
1130 3 case GATE_SOCKETGROUP_OPERATION_READ:
1131 {
1132
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 2 times.
3 if (data_len == 0)
1133 {
1134 1 impl->callback(impl->callback_data, GATE_DATAQUEUE_RESULT_CLOSE, channel_id, result, NULL, 0, user_param);
1135 }
1136 else
1137 {
1138 2 impl->callback(impl->callback_data, GATE_DATAQUEUE_RESULT_READ, channel_id, result, data, data_len, user_param);
1139 }
1140 3 break;
1141 }
1142 4 case GATE_SOCKETGROUP_OPERATION_WRITE:
1143 {
1144 4 impl->callback(impl->callback_data, GATE_DATAQUEUE_RESULT_WRITE, channel_id, result, data, data_len, user_param);
1145 4 break;
1146 }
1147 case GATE_SOCKETGROUP_OPERATION_SHUTDOWN_WRITE:
1148 {
1149 impl->callback(impl->callback_data, GATE_DATAQUEUE_RESULT_WRITE, channel_id, result, NULL, 0, user_param);
1150 break;
1151 }
1152 case GATE_SOCKETGROUP_OPERATION_ERROR:
1153 {
1154 impl->callback(impl->callback_data, GATE_DATAQUEUE_RESULT_ERROR, channel_id, result, data, data_len, user_param);
1155 break;
1156 }
1157 13 case GATE_SOCKETGROUP_OPERATION_PREPARE:
1158 {
1159 13 impl->callback(impl->callback_data, GATE_DATAQUEUE_RESULT_HEARTBEAT, GATE_QUEUE_INVALID_ID, result, NULL, 0, user_param);
1160 13 break;
1161 }
1162 }
1163 22 }
1164 }
1165 }
1166
1167 1 static gate_result_t gate_socketqueue_impl_worker(void* data)
1168 {
1169 1 gate_result_t ret = GATE_RESULT_OK;
1170 1 gate_socketqueue_impl_t* impl = (gate_socketqueue_impl_t*)data;
1171
1172
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
1 if (!gate_socketqueue_impl_new_state(impl, GATE_SOCKETQUEUE_STATE_STARTING, GATE_SOCKETQUEUE_STATE_ONLINE))
1173 {
1174 return GATE_RESULT_INVALIDSTATE;
1175 }
1176
1177 do
1178 {
1179 1 ret = gate_socketgroup_run(&impl->sockets, &gate_socketqueue_group_callback);
1180 if (GATE_FAILED(ret))
1181 {
1182 /* TODO */
1183 }
1184
1185
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
1 } while (gate_socketqueue_impl_new_state(impl, GATE_SOCKETQUEUE_STATE_ONLINE, GATE_SOCKETQUEUE_STATE_ONLINE));
1186 1 return ret;
1187 }
1188
1189
1190 2 static int gate_socketqueue_impl_retain(void* obj)
1191 {
1192 2 gate_socketqueue_impl_t* impl = (gate_socketqueue_impl_t*)obj;
1193 2 return gate_atomic_int_inc(&impl->ref_counter);
1194 }
1195 static char const* gate_socketqueue_impl_get_interface_name(void* obj)
1196 {
1197 (void)obj;
1198 return GATE_INTERFACE_NAME_SOCKETQUEUE;
1199 }
1200
1201
1202 1 static gate_result_t gate_socketqueue_impl_start(void* obj)
1203 {
1204 1 gate_result_t ret = GATE_RESULT_FAILED;
1205 1 gate_socketqueue_impl_t* impl = (gate_socketqueue_impl_t*)obj;
1206
1207 do
1208 {
1209
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
1 if (!gate_socketqueue_impl_new_state(impl, GATE_SOCKETQUEUE_STATE_OFFLINE, GATE_SOCKETQUEUE_STATE_STARTING))
1210 {
1211 ret = GATE_RESULT_INVALIDSTATE;
1212 break;
1213 }
1214
1215 1 impl->callback = impl->next_callback;
1216 1 impl->callback_data = impl->next_callback_data;
1217
1218 1 ret = gate_thread_start_code(&gate_socketqueue_impl_worker, obj, &impl->thread, NULL);
1219
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (GATE_FAILED(ret))
1220 {
1221 gate_socketqueue_impl_new_state(impl, GATE_SOCKETQUEUE_STATE_STARTING, GATE_SOCKETQUEUE_STATE_OFFLINE);
1222 }
1223 } while (0);
1224
1225 1 return ret;
1226 }
1227 3 static gate_result_t gate_socketqueue_impl_stop(void* obj)
1228 {
1229 3 gate_result_t ret = GATE_RESULT_FAILED;
1230 3 gate_socketqueue_impl_t* impl = (gate_socketqueue_impl_t*)obj;
1231 gate_result_t thread_result;
1232 3 gate_bool_t is_current_thread = false;
1233 3 gate_bool_t join_thread = true;
1234
1235 do
1236 {
1237
1/2
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
3 if (!gate_socketqueue_impl_new_state(impl, GATE_SOCKETQUEUE_STATE_STARTING, GATE_SOCKETQUEUE_STATE_STOPPING))
1238 {
1239
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 1 times.
3 if (!gate_socketqueue_impl_new_state(impl, GATE_SOCKETQUEUE_STATE_ONLINE, GATE_SOCKETQUEUE_STATE_STOPPING))
1240 {
1241 2 ret = GATE_RESULT_INVALIDSTATE;
1242 2 break;
1243 }
1244 }
1245
1246 1 ret = gate_socketgroup_quit(&impl->sockets);
1247 1 GATE_DEBUG_ASSERT(GATE_SUCCEEDED(ret));
1248
1249 1 ret = gate_thread_is_current(&impl->thread, &is_current_thread);
1250 1 GATE_DEBUG_ASSERT(GATE_SUCCEEDED(ret));
1251
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (GATE_SUCCEEDED(ret))
1252 {
1253 1 join_thread = !is_current_thread;
1254 }
1255
1256
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (join_thread)
1257 {
1258 1 ret = gate_thread_join(&impl->thread, &thread_result);
1259 1 GATE_DEBUG_ASSERT(GATE_SUCCEEDED(ret));
1260 }
1261 else
1262 {
1263 ret = gate_thread_detach(&impl->thread);
1264 GATE_DEBUG_ASSERT(GATE_SUCCEEDED(ret));
1265 }
1266
1267 1 gate_socketqueue_impl_set_state(impl, GATE_SOCKETQUEUE_STATE_OFFLINE);
1268 } while (0);
1269
1270 3 return ret;
1271 }
1272
1273
1274 static gate_enumint_t gate_socketqueue_impl_get_status(void* obj)
1275 {
1276 gate_socketqueue_impl_t* impl = (gate_socketqueue_impl_t*)obj;
1277 return (gate_enumint_t)gate_atomic_int_get(&impl->state);
1278 }
1279
1280 1 gate_result_t gate_socketqueue_impl_set_callback(void* obj, gate_dataqueue_status_t callback_func, void* callback_data)
1281 {
1282 1 gate_socketqueue_impl_t* impl = (gate_socketqueue_impl_t*)obj;
1283 1 impl->next_callback = callback_func;
1284 1 impl->next_callback_data = callback_data;
1285 1 return GATE_RESULT_OK;
1286 }
1287
1288
1289 1 static gate_result_t gate_socketqueue_impl_open(void* obj, gate_string_t const* address, gate_enumint_t flags, void* user_param, gate_channel_id_t* channel_id)
1290 {
1291 1 gate_socketqueue_impl_t* impl = (gate_socketqueue_impl_t*)obj;
1292 1 gate_result_t ret = GATE_RESULT_NOTIMPLEMENTED;
1293 gate_socket_endpoint_t endpoint;
1294 1 gate_socket_t new_sock = GATE_SOCKET_INVALID;
1295 1 gate_channel_id_t new_channel = 0;
1296 1 gate_bool_t server_socket = GATE_FLAG_ENABLED(flags, GATE_SOCKETQUEUE_OPEN_SERVER);
1297
1298 do
1299 {
1300 1 ret = gate_socket_parse_endpoint(address, &endpoint);
1301
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 GATE_BREAK_IF_FAILED(ret);
1302
1303
1/3
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
1 switch (endpoint.family)
1304 {
1305 1 case GATE_SOCKET_FAMILY_INET4:
1306 {
1307 1 ret = gate_socket_create_ex(endpoint.family, GATE_SOCKET_MSGTYPE_STREAM, GATE_SOCKET_PROTOCOL_IP, &new_sock);
1308 1 break;
1309 }
1310 case GATE_SOCKET_FAMILY_INET6:
1311 {
1312 ret = gate_socket_create_ex(endpoint.family, GATE_SOCKET_MSGTYPE_STREAM, GATE_SOCKET_PROTOCOL_IP6, &new_sock);
1313 }
1314 default:
1315 {
1316 ret = GATE_RESULT_NOTSUPPORTED;
1317 break;
1318 }
1319 }
1320
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 GATE_BREAK_IF_FAILED(ret);
1321
1322
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (server_socket)
1323 {
1324 1 ret = gate_socket_bind(new_sock, &endpoint);
1325
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 GATE_BREAK_IF_FAILED_TRACE(ret, "gate_socket_bind() failed");
1326 1 ret = gate_socket_listen(new_sock, 0);
1327
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 GATE_BREAK_IF_FAILED_TRACE(ret, "gate_socket_listen() failed");
1328 1 ret = gate_socketgroup_accept(&impl->sockets, new_sock, user_param);
1329
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 GATE_BREAK_IF_FAILED_TRACE(ret, "gate_socketgroup_accept() failed");
1330 }
1331 else
1332 {
1333 ret = gate_socket_set(new_sock, GATE_SOCKET_OPTION_BLOCKING, 0);
1334 GATE_BREAK_IF_FAILED_TRACE(ret, "gate_socket_set() failed");
1335
1336 ret = gate_socketgroup_connect(&impl->sockets, new_sock, &endpoint, user_param);
1337 GATE_BREAK_IF_FAILED_TRACE(ret, "gate_socketgroup_connect() failed");
1338
1339 ret = gate_socket_set(new_sock, GATE_SOCKET_OPTION_BLOCKING, 1);
1340 GATE_BREAK_IF_FAILED_TRACE(ret, "gate_socket_set() failed");
1341 }
1342
1343
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 if (GATE_SUCCEEDED(ret = gate_mutex_acquire(&impl->lock)))
1344 {
1345 1 new_channel = gate_atomic_int_inc(&impl->max_channel_id);
1346 1 ret = gate_socketqueue_impl_register(impl, new_sock, new_channel);
1347 1 gate_mutex_release(&impl->lock);
1348 1 *channel_id = new_channel;
1349 }
1350
1351 } while (0);
1352
1353
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (GATE_FAILED(ret))
1354 {
1355 gate_socket_close(new_sock);
1356 }
1357
1358 1 return ret;
1359 }
1360 3 static gate_result_t gate_socketqueue_impl_close(void* obj, gate_channel_id_t channel_id)
1361 {
1362 3 gate_socketqueue_impl_t* impl = (gate_socketqueue_impl_t*)obj;
1363 3 gate_result_t ret = GATE_RESULT_FAILED;
1364 gate_map_iterator_t iter;
1365 3 gate_socket_t sock = GATE_SOCKET_INVALID;
1366
1367
1/2
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
3 if (GATE_SUCCEEDED(ret = gate_mutex_acquire(&impl->lock)))
1368 {
1369 do
1370 {
1371
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 3 times.
3 if (!gate_map_iterator_valid(iter = gate_map_get(&impl->channel_2_sock, &channel_id)))
1372 {
1373 ret = GATE_RESULT_NOMATCH;
1374 break;
1375 }
1376 3 sock = GATE_MAP_ITER_VALUE(gate_socket_t, iter);
1377 3 gate_map_remove(&impl->sock_2_channel, &sock);
1378 3 gate_map_remove(&impl->channel_2_sock, &channel_id);
1379
1380 3 gate_socketgroup_remove(&impl->sockets, sock);
1381 3 gate_socket_close(sock);
1382
1383 3 ret = GATE_RESULT_OK;
1384 } while (0);
1385 3 gate_mutex_release(&impl->lock);
1386 }
1387 3 return ret;
1388 }
1389 3 static gate_result_t gate_socketqueue_impl_begin_read(void* obj, gate_channel_id_t channel_id, gate_size_t size, void* user_param)
1390 {
1391 3 gate_socketqueue_impl_t* impl = (gate_socketqueue_impl_t*)obj;
1392 gate_result_t ret;
1393 3 gate_socket_t sock = GATE_SOCKET_INVALID;
1394
1395
1/2
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
3 if (GATE_SUCCEEDED(ret = gate_mutex_acquire(&impl->lock)))
1396 {
1397 3 ret = gate_socketqueue_impl_resolve_socket(impl, channel_id, &sock);
1398 3 gate_mutex_release(&impl->lock);
1399 }
1400
1/2
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
3 if (GATE_SUCCEEDED(ret))
1401 {
1402 3 ret = gate_socketgroup_read(&impl->sockets, sock, size, user_param);
1403 }
1404
1405 3 return ret;
1406 }
1407 4 static gate_result_t gate_socketqueue_impl_begin_write(void* obj, gate_channel_id_t channel_id, char const* buffer, gate_size_t buffer_size, void* user_param)
1408 {
1409 4 gate_socketqueue_impl_t* impl = (gate_socketqueue_impl_t*)obj;
1410 gate_result_t ret;
1411 4 gate_socket_t sock = GATE_SOCKET_INVALID;
1412
1413
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 if (GATE_SUCCEEDED(ret = gate_mutex_acquire(&impl->lock)))
1414 {
1415 4 ret = gate_socketqueue_impl_resolve_socket(impl, channel_id, &sock);
1416 4 gate_mutex_release(&impl->lock);
1417 }
1418
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
4 if (GATE_SUCCEEDED(ret))
1419 {
1420
2/4
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 4 times.
4 if ((buffer_size == 0) || (buffer == NULL))
1421 {
1422 /* sending empty buffers is interpreted as socket shutdown */
1423 ret = gate_socketgroup_shutdown_write(&impl->sockets, sock, user_param);
1424 }
1425 else
1426 {
1427 4 ret = gate_socketgroup_write(&impl->sockets, sock, buffer, buffer_size, user_param);
1428 }
1429 }
1430 4 return ret;
1431 }
1432
1433 1 static gate_result_t gate_socketqueue_impl_close_all(void* obj)
1434 {
1435 gate_result_t ret;
1436 1 gate_socketqueue_impl_t* impl = (gate_socketqueue_impl_t*)obj;
1437
1438
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 if (GATE_SUCCEEDED(ret = gate_mutex_acquire(&impl->lock)))
1439 {
1440 1 ret = gate_socketgroup_clear(&impl->sockets);
1441 1 gate_map_clear(&impl->channel_2_sock);
1442 1 gate_map_clear(&impl->sock_2_channel);
1443 1 gate_mutex_release(&impl->lock);
1444 }
1445 1 return ret;
1446 }
1447
1448 3 static void gate_socketqueue_impl_release(void* obj)
1449 {
1450 3 gate_socketqueue_impl_t* impl = (gate_socketqueue_impl_t*)obj;
1451
1452
2/2
✓ Branch 1 taken 1 times.
✓ Branch 2 taken 2 times.
3 if (gate_atomic_int_dec(&impl->ref_counter) == 0)
1453 {
1454 1 gate_socketqueue_impl_stop(impl);
1455
1456 1 gate_socketgroup_destroy(&impl->sockets);
1457 1 gate_map_destroy(&impl->sock_2_channel);
1458 1 gate_map_destroy(&impl->channel_2_sock);
1459 1 gate_mutex_destroy(&impl->lock);
1460 1 gate_mem_dealloc(impl);
1461 }
1462 3 }
1463
1464 static GATE_INTERFACE_VTBL(gate_socketqueue) gate_socketqueue_vtbl;
1465 1 static void gate_init_socketqueue_vtbl()
1466 {
1467
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (!gate_socketqueue_vtbl.get_interface_name)
1468 {
1469 GATE_INTERFACE_VTBL(gate_socketqueue) const local_vtbl =
1470 {
1471 &gate_socketqueue_impl_get_interface_name,
1472 &gate_socketqueue_impl_release,
1473 &gate_socketqueue_impl_retain,
1474
1475 &gate_socketqueue_impl_start,
1476 &gate_socketqueue_impl_stop,
1477 &gate_socketqueue_impl_get_status,
1478
1479 &gate_socketqueue_impl_set_callback,
1480 &gate_socketqueue_impl_open,
1481 &gate_socketqueue_impl_close,
1482 &gate_socketqueue_impl_begin_read,
1483 &gate_socketqueue_impl_begin_write,
1484
1485 &gate_socketqueue_impl_close_all
1486 };
1487 1 gate_socketqueue_vtbl = local_vtbl;
1488 }
1489 1 }
1490
1491 1 gate_result_t gate_socketqueue_create(gate_socketqueue_t** ptr_queue, gate_uint32_t idle_interval_ms)
1492 {
1493 1 gate_result_t ret = GATE_RESULT_FAILED;
1494 gate_socketqueue_impl_t* impl;
1495
1496 do
1497 {
1498
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (idle_interval_ms < 1)
1499 {
1500 idle_interval_ms = 1;
1501 }
1502
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 else if (idle_interval_ms > 60000)
1503 {
1504 idle_interval_ms = 60000;
1505 }
1506
1507 1 impl = gate_mem_alloc(sizeof(gate_socketqueue_impl_t));
1508
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (impl == NULL)
1509 {
1510 ret = GATE_RESULT_OUTOFMEMORY;
1511 break;
1512 }
1513 1 gate_mem_clear(impl, sizeof(gate_socketqueue_impl_t));
1514 1 gate_init_socketqueue_vtbl();
1515 1 impl->vtbl = &gate_socketqueue_vtbl;
1516 1 gate_atomic_int_init(&impl->ref_counter, 1);
1517 1 gate_atomic_int_init(&impl->state, GATE_SOCKETQUEUE_STATE_OFFLINE);
1518 1 gate_atomic_int_init(&impl->max_channel_id, 0);
1519
1520 1 ret = gate_mutex_create(&impl->lock);
1521
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 GATE_BREAK_IF_FAILED(ret);
1522
1523
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
1 if (NULL == gate_map_create(&impl->sock_2_channel, &gate_compare_socket, sizeof(gate_socket_t), NULL, NULL, sizeof(gate_channel_id_t), NULL, NULL))
1524 {
1525 ret = GATE_RESULT_OUTOFMEMORY;
1526 break;
1527 }
1528
1529
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
1 if (NULL == gate_map_create(&impl->channel_2_sock, &gate_compare_channel_id, sizeof(gate_channel_id_t), NULL, NULL, sizeof(gate_socket_t), NULL, NULL))
1530 {
1531 ret = GATE_RESULT_OUTOFMEMORY;
1532 break;
1533 }
1534
1535 1 ret = gate_socketgroup_create(&impl->sockets, impl);
1536
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 GATE_BREAK_IF_FAILED(ret);
1537 1 impl->sockets.interval_ms = idle_interval_ms;
1538
1539 1 ret = GATE_RESULT_OK;
1540 1 *ptr_queue = (gate_socketqueue_t*)impl;
1541 1 impl = NULL;
1542 } while (0);
1543
1544
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (GATE_FAILED(ret))
1545 {
1546 if (impl)
1547 {
1548 gate_map_destroy(&impl->sock_2_channel);
1549 gate_map_destroy(&impl->channel_2_sock);
1550 gate_mutex_destroy(&impl->lock);
1551 gate_mem_dealloc(impl);
1552 }
1553 }
1554
1555 1 return ret;
1556 }
1557
1558
1559
1560 /**********************************
1561 * SOCKET STREAM implementation *
1562 **********************************/
1563
1564 typedef struct gate_socketstream_class
1565 {
1566 GATE_INTERFACE_VTBL(gate_controlstream) const* vtbl;
1567
1568 gate_atomic_int_t ref_counter;
1569 gate_socket_t sock;
1570 gate_bool_t connected;
1571 gate_socket_endpoint_t ep;
1572 gate_uint64_t bytes_received;
1573 gate_uint64_t bytes_sent;
1574 } gate_socketstream_t;
1575
1576 2 static gate_result_t socketstream_create_socket(gate_socket_endpoint_t const* ptr_ep, gate_socket_t* ptr_sock)
1577 {
1578 gate_int16_t socktype;
1579
1/3
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
2 switch (ptr_ep->family)
1580 {
1581 2 case GATE_SOCKET_FAMILY_INET4: socktype = GATE_SOCKET_TYPE_TCP4; break;
1582 case GATE_SOCKET_FAMILY_INET6: socktype = GATE_SOCKET_TYPE_TCP6; break;
1583 default: socktype = 0;
1584 }
1585
1586
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (socktype == 0)
1587 {
1588 return GATE_RESULT_INVALIDARG;
1589 }
1590
1591 2 return gate_socket_create(socktype, ptr_sock);
1592 }
1593
1594 2 static gate_result_t socketstream_connect_socket(gate_socketstream_t* ptr_sockstream)
1595 {
1596 2 gate_result_t ret = gate_socket_connect(ptr_sockstream->sock, &ptr_sockstream->ep);
1597
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 if (GATE_SUCCEEDED(ret))
1598 {
1599 2 ptr_sockstream->connected = true;
1600 }
1601 2 return ret;
1602 }
1603
1604
1605 static char const* socketstream_get_interface_name(void* obj)
1606 {
1607 GATE_UNUSED_ARG(obj);
1608 return GATE_INTERFACE_NAME_CONTROLSTREAM;
1609 }
1610 6 static void socketstream_release(void* obj)
1611 {
1612 6 gate_socketstream_t* self = (gate_socketstream_t*)obj;
1613
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 4 times.
6 if (0 == gate_atomic_int_dec(&self->ref_counter))
1614 {
1615 2 gate_socket_close(self->sock);
1616 2 gate_mem_dealloc(self);
1617 }
1618 6 }
1619 4 static int socketstream_retain(void* obj)
1620 {
1621 4 gate_socketstream_t* self = (gate_socketstream_t*)obj;
1622 4 return gate_atomic_int_inc(&self->ref_counter);
1623 }
1624
1625 78 static gate_result_t socketstream_read(void* obj, char* buffer, gate_size_t bufferlength, gate_size_t* returned)
1626 {
1627 gate_result_t ret;
1628 78 gate_size_t received = 0;
1629 78 gate_socketstream_t* self = (gate_socketstream_t*)obj;
1630
1631
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 78 times.
78 if (!self->connected)
1632 {
1633 ret = socketstream_connect_socket(self);
1634 GATE_RETURN_IF_FAILED(ret);
1635 }
1636
1637 78 ret = gate_socket_receive(self->sock, buffer, bufferlength, &received);
1638
1/2
✓ Branch 0 taken 78 times.
✗ Branch 1 not taken.
78 if (GATE_SUCCEEDED(ret))
1639 {
1640
1/2
✓ Branch 0 taken 78 times.
✗ Branch 1 not taken.
78 if (returned)
1641 {
1642 78 *returned = received;
1643 }
1644 78 self->bytes_received += received;
1645 }
1646 78 return ret;
1647 }
1648 static gate_result_t socketstream_peek(void* obj, char* buffer, gate_size_t bufferlength, gate_size_t* returned)
1649 {
1650 gate_socketstream_t* self = (gate_socketstream_t*)obj;
1651 gate_result_t ret;
1652 gate_uint8_t sel_states = GATE_SOCKET_SELECT_FLAG_RECEIVE;
1653
1654 do
1655 {
1656 if (!self->connected)
1657 {
1658 ret = socketstream_connect_socket(self);
1659 GATE_BREAK_IF_FAILED(ret);
1660 }
1661
1662 ret = gate_socket_select(&self->sock, 1, &sel_states, 0);
1663 GATE_BREAK_IF_FAILED(ret);
1664
1665 if (GATE_FLAG_ENABLED(sel_states, GATE_SOCKET_SELECT_FLAG_RECEIVE))
1666 {
1667 /* there are bytes in receive buffer: */
1668 ret = gate_socket_receive_from(self->sock, NULL, buffer, bufferlength, returned, GATE_SOCKET_FLAG_PEEK);
1669 }
1670 else
1671 {
1672 /* no bytes in receive buffer */
1673 if (returned)
1674 {
1675 *returned = 0;
1676 }
1677 ret = GATE_RESULT_OK;
1678 }
1679 } while (0);
1680
1681 return ret;
1682 }
1683 3 static gate_result_t socketstream_write(void* obj, char const* buffer, gate_size_t bufferlength, gate_size_t* written)
1684 {
1685 gate_result_t ret;
1686 3 gate_size_t sent = 0;
1687 3 gate_socketstream_t* self = (gate_socketstream_t*)obj;
1688
1689
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (!self->connected)
1690 {
1691 ret = socketstream_connect_socket(self);
1692 GATE_RETURN_IF_FAILED(ret);
1693 }
1694
1695 3 ret = gate_socket_send(self->sock, buffer, bufferlength, &sent);
1696
1/2
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
3 if (GATE_SUCCEEDED(ret))
1697 {
1698
1/2
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
3 if (written)
1699 {
1700 3 *written = sent;
1701 }
1702 3 self->bytes_sent += sent;
1703 }
1704 3 return ret;
1705 }
1706 static gate_result_t socketstream_flush(void* obj)
1707 {
1708 gate_socketstream_t* self = (gate_socketstream_t*)obj;
1709 if (!self->connected)
1710 {
1711 return socketstream_connect_socket(self);
1712 }
1713 return GATE_RESULT_OK;
1714 }
1715
1716 static gate_result_t socketstream_get_resource(void* obj, gate_enumint_t resource_type, gate_uintptr_t* resource)
1717 {
1718 gate_socketstream_t* self = (gate_socketstream_t*)obj;
1719 switch (resource_type)
1720 {
1721 case GATE_STREAM_RESOURCE_INPUT:
1722 case GATE_STREAM_RESOURCE_OUTPUT:
1723 if (resource)
1724 {
1725 *resource = (gate_uintptr_t)self->sock;
1726 }
1727 return GATE_RESULT_OK;
1728 default:
1729 return GATE_RESULT_NOTSUPPORTED;
1730 }
1731 }
1732
1733 static gate_result_t socketstream_can_read(void* obj, gate_bool_t* return_value)
1734 {
1735 gate_socketstream_t* self = (gate_socketstream_t*)obj;
1736 gate_uint8_t status = GATE_SOCKET_SELECT_FLAG_RECEIVE;
1737 gate_result_t ret;
1738
1739 if (!self->connected)
1740 {
1741 ret = socketstream_connect_socket(self);
1742 GATE_RETURN_IF_FAILED(ret);
1743 }
1744
1745 ret = gate_socket_select(&self->sock, 1, &status, 10);
1746 if (return_value)
1747 {
1748 *return_value = GATE_FLAG_ENABLED(status, GATE_SOCKET_SELECT_FLAG_RECEIVE);
1749 }
1750 return ret;
1751 }
1752
1753 static gate_result_t socketstream_can_write(void* obj, gate_bool_t* return_value)
1754 {
1755 gate_socketstream_t* self = (gate_socketstream_t*)obj;
1756 gate_uint8_t status = GATE_SOCKET_SELECT_FLAG_SEND;
1757 gate_result_t ret;
1758
1759 if (!self->connected)
1760 {
1761 ret = socketstream_connect_socket(self);
1762 GATE_RETURN_IF_FAILED(ret);
1763 }
1764
1765 ret = gate_socket_select(&self->sock, 1, &status, 10);
1766 if (return_value)
1767 {
1768 *return_value = GATE_FLAG_ENABLED(status, GATE_SOCKET_SELECT_FLAG_SEND);
1769 }
1770 return ret;
1771 }
1772
1773 static gate_result_t socketstream_get_size(void* obj, gate_int64_t* return_value)
1774 {
1775 gate_socketstream_t* self = (gate_socketstream_t*)obj;
1776 if (return_value)
1777 {
1778 *return_value = (gate_int64_t)self->bytes_received;
1779 }
1780 return GATE_RESULT_NOTAVAILABLE;
1781 }
1782
1783 static gate_result_t socketstream_get_available(void* obj, gate_int64_t* return_value)
1784 {
1785 gate_socketstream_t* self = (gate_socketstream_t*)obj;
1786 gate_uint8_t status = GATE_SOCKET_SELECT_FLAG_RECEIVE;
1787 gate_result_t ret = gate_socket_select(&self->sock, 1, &status, 0);
1788 if (return_value)
1789 {
1790 *return_value = GATE_FLAG_ENABLED(status, GATE_SOCKET_SELECT_FLAG_RECEIVE) ? 1 : 0;
1791 }
1792 return ret;
1793 }
1794
1795 static gate_result_t socketstream_seek(void* obj, gate_int64_t position, gate_enumint_t origin, gate_int64_t* final_position)
1796 {
1797 gate_socketstream_t* self = (gate_socketstream_t*)obj;
1798 if (((origin == GATE_STREAM_SEEK_CURRENT) || (origin == GATE_STREAM_SEEK_END)) && (position == 0))
1799 {
1800 /* just return current 'write' position */
1801 if (final_position)
1802 {
1803 *final_position = (gate_int64_t)self->bytes_sent;
1804 }
1805 return GATE_RESULT_OK;
1806 }
1807 return GATE_RESULT_NOTSUPPORTED;
1808 }
1809 static gate_result_t socketstream_reset(void* obj)
1810 {
1811 gate_socketstream_t* self = (gate_socketstream_t*)obj;
1812
1813 if (self->connected)
1814 {
1815 gate_socket_close(self->sock);
1816 self->connected = false;
1817 }
1818
1819 return socketstream_connect_socket(self);
1820 }
1821 2 static gate_result_t socketstream_close(void* obj, gate_enumint_t close_type)
1822 {
1823 2 gate_socketstream_t* self = (gate_socketstream_t*)obj;
1824
1825
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (!self->connected)
1826 {
1827 return GATE_RESULT_OK;
1828 }
1829
1830
1/3
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 switch (close_type)
1831 {
1832 case GATE_STREAM_CLOSE_INPUT:
1833 return gate_socket_shutdown(self->sock, true, false);
1834 2 case GATE_STREAM_CLOSE_OUTPUT:
1835 2 return gate_socket_shutdown(self->sock, false, true);
1836 }
1837 return GATE_RESULT_NOTSUPPORTED;
1838 }
1839
1840 static GATE_INTERFACE_VTBL(gate_controlstream) sockstream_vtbl;
1841
1842 2 static void gate_init_socketstream_vtbl()
1843 {
1844
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
2 if (!sockstream_vtbl.get_interface_name)
1845 {
1846 1 GATE_INTERFACE_VTBL(gate_controlstream) local_vtbl =
1847 {
1848 &socketstream_get_interface_name,
1849 &socketstream_release,
1850 &socketstream_retain,
1851
1852 &socketstream_read,
1853 &socketstream_peek,
1854 &socketstream_write,
1855 &socketstream_flush,
1856
1857 &socketstream_get_resource,
1858
1859 &socketstream_can_read,
1860 &socketstream_can_write,
1861 &socketstream_get_size,
1862 &socketstream_get_available,
1863 &socketstream_seek,
1864 &socketstream_reset,
1865 &socketstream_close
1866 };
1867 1 sockstream_vtbl = local_vtbl;
1868 }
1869 2 }
1870
1871
1872 2 static gate_result_t gate_socketstream_create_internal(
1873 gate_socket_t sock, gate_socket_endpoint_t const* ep, gate_bool_t delay_connect, gate_controlstream_t** ptr_stream)
1874 {
1875 2 gate_result_t ret = GATE_RESULT_FAILED;
1876 2 gate_socketstream_t* ptr_sockstream = NULL;
1877 do
1878 {
1879 2 ptr_sockstream = (gate_socketstream_t*)gate_mem_alloc(sizeof(gate_socketstream_t));
1880
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (NULL == ptr_sockstream)
1881 {
1882 ret = GATE_RESULT_OUTOFMEMORY;
1883 break;
1884 }
1885 2 gate_atomic_int_init(&ptr_sockstream->ref_counter, 1);
1886 2 gate_init_socketstream_vtbl();
1887 2 ptr_sockstream->vtbl = &sockstream_vtbl;
1888 2 ptr_sockstream->bytes_sent = 0;
1889 2 ptr_sockstream->bytes_received = 0;
1890 2 ptr_sockstream->sock = sock;
1891
1892
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 if (sock == GATE_SOCKET_INVALID)
1893 {
1894 2 ptr_sockstream->sock = GATE_SOCKET_INVALID;
1895 2 ptr_sockstream->connected = false;
1896 2 gate_mem_copy(&ptr_sockstream->ep, ep, sizeof(ptr_sockstream->ep));
1897
1898 2 ret = socketstream_create_socket(&ptr_sockstream->ep, &ptr_sockstream->sock);
1899
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 GATE_BREAK_IF_FAILED(ret);
1900
1901
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 if (!delay_connect)
1902 {
1903 2 ret = socketstream_connect_socket(ptr_sockstream);
1904
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 GATE_BREAK_IF_FAILED(ret);
1905 }
1906 }
1907 else
1908 {
1909 ptr_sockstream->connected = true;
1910 gate_mem_clear(&ptr_sockstream->ep, sizeof(ptr_sockstream->ep));
1911 }
1912
1913
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 if (ptr_stream)
1914 {
1915 2 *ptr_stream = (gate_controlstream_t*)ptr_sockstream;
1916 2 ptr_sockstream = NULL;
1917 }
1918 2 ret = GATE_RESULT_OK;
1919 } while (0);
1920
1921
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (ptr_sockstream)
1922 {
1923 gate_object_release(ptr_sockstream);
1924 }
1925
1926 2 return ret;
1927 }
1928
1929 gate_result_t gate_socketstream_create(gate_socket_t sock, gate_controlstream_t** ptr_stream)
1930 {
1931 return gate_socketstream_create_internal(sock, NULL, false, ptr_stream);
1932 }
1933
1934 2 gate_result_t gate_socketstream_create_endpoint(gate_socket_endpoint_t const* ep, gate_controlstream_t** ptr_stream, gate_bool_t delay_connect)
1935 {
1936 2 return gate_socketstream_create_internal(GATE_SOCKET_INVALID, ep, delay_connect, ptr_stream);
1937 }
1938
1939 gate_result_t gate_socketstream_create_address(gate_string_t const* addr, gate_controlstream_t** ptr_stream, gate_bool_t delay_connect)
1940 {
1941 gate_result_t ret;
1942 gate_socket_endpoint_t ep;
1943 do
1944 {
1945 ret = gate_socket_parse_endpoint(addr, &ep);
1946 if (GATE_FAILED(ret))
1947 {
1948 gate_string_t host = GATE_STRING_INIT_EMPTY;
1949 gate_uint16_t port = 0;
1950 gate_int16_t family = 0;
1951 gate_size_t ep_count = 1;
1952 do
1953 {
1954 ret = gate_socket_parse_address(addr, &host, &port, &family);
1955 GATE_BREAK_IF_FAILED(ret);
1956
1957 ret = gate_net_resolve_host(&host, family, &ep, &ep_count);
1958 switch (ep.family)
1959 {
1960 case GATE_SOCKET_FAMILY_INET4: ep.ip4.port = port; break;
1961 case GATE_SOCKET_FAMILY_INET6: ep.ip6.port = port; break;
1962 }
1963 } while (0);
1964 gate_string_release(&host);
1965 }
1966 GATE_BREAK_IF_FAILED(ret);
1967
1968 ret = gate_socketstream_create_internal(GATE_SOCKET_INVALID, &ep, delay_connect, ptr_stream);
1969 } while (0);
1970 return ret;
1971 }
1972
1973
1974
1975 typedef struct gate_socketserverstream_class
1976 {
1977 GATE_INTERFACE_VTBL(gate_controlstream) const* vtbl;
1978
1979 gate_atomic_int_t ref_counter;
1980 gate_socket_t server_sock;
1981 gate_socket_endpoint_t ep;
1982 gate_socket_t connection_sock;
1983 gate_bool_t connected;
1984 gate_uint64_t bytes_received;
1985 gate_uint64_t bytes_sent;
1986 } gate_socketserverstream_t;
1987
1988
1989 static char const* socketserverstream_get_interface_name(void* obj)
1990 {
1991 GATE_UNUSED_ARG(obj);
1992 return GATE_INTERFACE_NAME_CONTROLSTREAM;
1993 }
1994 static void socketserverstream_release(void* obj)
1995 {
1996 gate_socketserverstream_t* self = (gate_socketserverstream_t*)obj;
1997 if (0 == gate_atomic_int_dec(&self->ref_counter))
1998 {
1999 if (self->connection_sock != GATE_SOCKET_INVALID)
2000 {
2001 gate_socket_close(self->connection_sock);
2002 }
2003 if (self->server_sock != GATE_SOCKET_INVALID)
2004 {
2005 gate_socket_close(self->server_sock);
2006 }
2007 gate_mem_dealloc(self);
2008 }
2009 }
2010 static int socketserverstream_retain(void* obj)
2011 {
2012 gate_socketserverstream_t* self = (gate_socketserverstream_t*)obj;
2013 return gate_atomic_int_inc(&self->ref_counter);
2014 }
2015
2016 static gate_result_t socketserverstream_accept(gate_socketserverstream_t* self)
2017 {
2018 gate_result_t ret = GATE_RESULT_OK;
2019 if (!self->connected)
2020 {
2021 ret = gate_socket_accept(self->server_sock, &self->connection_sock);
2022 if (GATE_SUCCEEDED(ret))
2023 {
2024 self->connected = true;
2025 }
2026 }
2027 return ret;
2028 }
2029
2030 static gate_result_t socketserverstream_read(void* obj, char* buffer, gate_size_t bufferlength, gate_size_t* returned)
2031 {
2032 gate_result_t ret;
2033 gate_size_t received = 0;
2034 gate_socketserverstream_t* self = (gate_socketserverstream_t*)obj;
2035
2036 ret = socketserverstream_accept(self);
2037 GATE_RETURN_IF_FAILED(ret);
2038
2039 ret = gate_socket_receive(self->connection_sock, buffer, bufferlength, &received);
2040 if (GATE_SUCCEEDED(ret))
2041 {
2042 if (returned)
2043 {
2044 *returned = received;
2045 }
2046 self->bytes_received += received;
2047 }
2048 return ret;
2049 }
2050
2051 static gate_result_t socketserverstream_peek(void* obj, char* buffer, gate_size_t bufferlength, gate_size_t* returned)
2052 {
2053 gate_socketserverstream_t* self = (gate_socketserverstream_t*)obj;
2054 gate_result_t ret;
2055
2056 ret = socketserverstream_accept(self);
2057 GATE_RETURN_IF_FAILED(ret);
2058
2059 return gate_socket_receive_from(self->connection_sock, NULL, buffer, bufferlength, returned, GATE_SOCKET_FLAG_PEEK);
2060 }
2061
2062 static gate_result_t socketserverstream_write(void* obj, char const* buffer, gate_size_t bufferlength, gate_size_t* written)
2063 {
2064 gate_result_t ret;
2065 gate_size_t sent = 0;
2066 gate_socketserverstream_t* self = (gate_socketserverstream_t*)obj;
2067
2068 ret = socketserverstream_accept(self);
2069 GATE_RETURN_IF_FAILED(ret);
2070
2071 ret = gate_socket_send(self->connection_sock, buffer, bufferlength, &sent);
2072 if (GATE_SUCCEEDED(ret))
2073 {
2074 if (written)
2075 {
2076 *written = sent;
2077 }
2078 self->bytes_sent += sent;
2079 }
2080 return ret;
2081 }
2082 static gate_result_t socketserverstream_flush(void* obj)
2083 {
2084 gate_socketserverstream_t* self = (gate_socketserverstream_t*)obj;
2085
2086 return socketserverstream_accept(self);
2087 }
2088
2089 static gate_result_t socketserverstream_get_resource(void* obj, gate_enumint_t resource_type, gate_uintptr_t* resource)
2090 {
2091 gate_socketserverstream_t* self = (gate_socketserverstream_t*)obj;
2092 switch (resource_type)
2093 {
2094 case GATE_STREAM_RESOURCE_DEFAULT:
2095 if (resource)
2096 {
2097 *resource = (gate_uintptr_t)self->server_sock;
2098 }
2099 return GATE_RESULT_OK;
2100
2101 case GATE_STREAM_RESOURCE_INPUT:
2102 case GATE_STREAM_RESOURCE_OUTPUT:
2103 if (resource)
2104 {
2105 *resource = (gate_uintptr_t)self->connection_sock;
2106 }
2107 return GATE_RESULT_OK;
2108 default:
2109 return GATE_RESULT_NOTSUPPORTED;
2110 }
2111 }
2112
2113 static gate_result_t socketserverstream_can_read(void* obj, gate_bool_t* return_value)
2114 {
2115 gate_socketserverstream_t* self = (gate_socketserverstream_t*)obj;
2116 gate_uint8_t status = GATE_SOCKET_SELECT_FLAG_RECEIVE;
2117 gate_result_t ret;
2118 gate_bool_t can_read = false;
2119
2120 if (self->connected)
2121 {
2122 ret = gate_socket_select(&self->connection_sock, 1, &status, 10);
2123 can_read = GATE_FLAG_ENABLED(status, GATE_SOCKET_SELECT_FLAG_RECEIVE);
2124 }
2125
2126 if (return_value)
2127 {
2128 *return_value = can_read;
2129 }
2130
2131 return ret;
2132 }
2133
2134 static gate_result_t socketserverstream_can_write(void* obj, gate_bool_t* return_value)
2135 {
2136 gate_socketserverstream_t* self = (gate_socketserverstream_t*)obj;
2137 gate_uint8_t status = GATE_SOCKET_SELECT_FLAG_SEND;
2138 gate_result_t ret;
2139 gate_bool_t can_read = false;
2140
2141 if (self->connected)
2142 {
2143 ret = gate_socket_select(&self->connection_sock, 1, &status, 10);
2144 can_read = GATE_FLAG_ENABLED(status, GATE_SOCKET_SELECT_FLAG_SEND);
2145 }
2146
2147 if (return_value)
2148 {
2149 *return_value = can_read;
2150 }
2151
2152 return ret;
2153 }
2154
2155 static gate_result_t socketserverstream_get_size(void* obj, gate_int64_t* return_value)
2156 {
2157 gate_socketserverstream_t* self = (gate_socketserverstream_t*)obj;
2158 if (return_value)
2159 {
2160 *return_value = (gate_int64_t)self->bytes_received;
2161 }
2162 return GATE_RESULT_NOTAVAILABLE;
2163 }
2164
2165 static gate_result_t socketserverstream_get_available(void* obj, gate_int64_t* return_value)
2166 {
2167 gate_socketserverstream_t* self = (gate_socketserverstream_t*)obj;
2168 gate_uint8_t status = GATE_SOCKET_SELECT_FLAG_RECEIVE;
2169 gate_result_t ret = GATE_RESULT_OK;
2170 unsigned available = 0;
2171
2172 if (self->connected)
2173 {
2174 ret = gate_socket_select(&self->connection_sock, 1, &status, 0);
2175 available = GATE_FLAG_ENABLED(status, GATE_SOCKET_SELECT_FLAG_RECEIVE) ? 1 : 0;
2176 }
2177
2178 if (return_value)
2179 {
2180 *return_value = (gate_int64_t)available;
2181 }
2182 return ret;
2183 }
2184
2185 static gate_result_t socketserverstream_seek(void* obj, gate_int64_t position, gate_enumint_t origin, gate_int64_t* final_position)
2186 {
2187 gate_socketserverstream_t* self = (gate_socketserverstream_t*)obj;
2188 if (((origin == GATE_STREAM_SEEK_CURRENT) || (origin == GATE_STREAM_SEEK_END)) && (position == 0))
2189 {
2190 /* just return current 'write' position */
2191 if (final_position)
2192 {
2193 *final_position = (gate_int64_t)self->bytes_sent;
2194 }
2195 return GATE_RESULT_OK;
2196 }
2197 return GATE_RESULT_NOTSUPPORTED;
2198 }
2199 static gate_result_t socketserverstream_reset(void* obj)
2200 {
2201 gate_socketserverstream_t* self = (gate_socketserverstream_t*)obj;
2202
2203 if (self->connected)
2204 {
2205 gate_socket_close(self->connection_sock);
2206 self->connected = false;
2207 }
2208
2209 return socketserverstream_accept(self);
2210 }
2211
2212 static gate_result_t socketserverstream_close(void* obj, gate_enumint_t close_type)
2213 {
2214 gate_socketserverstream_t* self = (gate_socketserverstream_t*)obj;
2215
2216 if (!self->connected)
2217 {
2218 return GATE_RESULT_OK;
2219 }
2220
2221 switch (close_type)
2222 {
2223 case GATE_STREAM_CLOSE_INPUT:
2224 return gate_socket_shutdown(self->connected, true, false);
2225 case GATE_STREAM_CLOSE_OUTPUT:
2226 return gate_socket_shutdown(self->connection_sock, false, true);
2227 }
2228 return GATE_RESULT_NOTSUPPORTED;
2229 }
2230
2231 static GATE_INTERFACE_VTBL(gate_controlstream) socketserverstream_vtbl;
2232
2233 static void gate_init_socketserverstream_vtbl()
2234 {
2235 if (!sockstream_vtbl.get_interface_name)
2236 {
2237 GATE_INTERFACE_VTBL(gate_controlstream) local_vtbl =
2238 {
2239 &socketserverstream_get_interface_name,
2240 &socketserverstream_release,
2241 &socketserverstream_retain,
2242
2243 &socketserverstream_read,
2244 &socketserverstream_peek,
2245 &socketserverstream_write,
2246 &socketserverstream_flush,
2247
2248 &socketserverstream_get_resource,
2249
2250 &socketserverstream_can_read,
2251 &socketserverstream_can_write,
2252 &socketserverstream_get_size,
2253 &socketserverstream_get_available,
2254 &socketserverstream_seek,
2255 &socketserverstream_reset,
2256 &socketserverstream_close
2257 };
2258 socketserverstream_vtbl = local_vtbl;
2259 }
2260 }
2261
2262
2263 gate_result_t gate_socketstream_create_server(gate_socket_endpoint_t const* bind_ep, gate_controlstream_t** ptr_stream)
2264 {
2265 gate_result_t ret = GATE_RESULT_FAILED;
2266 gate_socketserverstream_t* ptr_sockstream = NULL;
2267
2268 do
2269 {
2270 if (bind_ep == NULL)
2271 {
2272 ret = GATE_RESULT_INVALIDARG;
2273 break;
2274 }
2275 ptr_sockstream = (gate_socketserverstream_t*)gate_mem_alloc(sizeof(gate_socketserverstream_t));
2276 if (NULL == ptr_sockstream)
2277 {
2278 ret = GATE_RESULT_OUTOFMEMORY;
2279 break;
2280 }
2281 gate_atomic_int_init(&ptr_sockstream->ref_counter, 1);
2282 gate_init_socketstream_vtbl();
2283 ptr_sockstream->vtbl = &socketserverstream_vtbl;
2284 ptr_sockstream->bytes_sent = 0;
2285 ptr_sockstream->bytes_received = 0;
2286 ptr_sockstream->server_sock = GATE_SOCKET_INVALID;
2287 ptr_sockstream->connection_sock = GATE_SOCKET_INVALID;
2288 ptr_sockstream->connected = false;
2289
2290 gate_mem_copy(&ptr_sockstream->ep, bind_ep, sizeof(ptr_sockstream->ep));
2291
2292 ret = socketstream_create_socket(&ptr_sockstream->ep, &ptr_sockstream->server_sock);
2293 GATE_BREAK_IF_FAILED(ret);
2294
2295 ret = gate_socket_bind(ptr_sockstream->server_sock, &ptr_sockstream->ep);
2296 GATE_BREAK_IF_FAILED(ret);
2297
2298 ret = gate_socket_listen(ptr_sockstream->server_sock, 1);
2299 GATE_BREAK_IF_FAILED(ret);
2300
2301 if (ptr_stream)
2302 {
2303 *ptr_stream = (gate_controlstream_t*)ptr_sockstream;
2304 ptr_sockstream = NULL;
2305 }
2306 ret = GATE_RESULT_OK;
2307 } while (0);
2308
2309 if (ptr_sockstream)
2310 {
2311 gate_object_release(ptr_sockstream);
2312 }
2313
2314 return ret;
2315 }
2316
2317