GCC Code Coverage Report


Directory: src/gate/
File: src/gate/tech/gateservices.c
Date: 2025-12-12 23:40:09
Exec Total Coverage
Lines: 0 788 0.0%
Functions: 0 59 0.0%
Branches: 0 367 0.0%

Line Branch Exec Source
1 /* GATE PROJECT LICENSE:
2 +----------------------------------------------------------------------------+
3 | Copyright(c) 2018-2025, Stefan Meislinger |
4 | All rights reserved. |
5 | |
6 | Redistribution and use in source and binary forms, with or without |
7 | modification, are permitted provided that the following conditions are met:|
8 | |
9 | 1. Redistributions of source code must retain the above copyright notice, |
10 | this list of conditions and the following disclaimer. |
11 | 2. Redistributions in binary form must reproduce the above copyright |
12 | notice, this list of conditions and the following disclaimer in the |
13 | documentation and/or other materials provided with the distribution. |
14 | |
15 | THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"|
16 | AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
17 | IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
18 | ARE DISCLAIMED.IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
19 | LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
20 | CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
21 | SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
22 | INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
23 | CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
24 | ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF |
25 | THE POSSIBILITY OF SUCH DAMAGE. |
26 +----------------------------------------------------------------------------+
27 */
28
29 #include "gate/tech/gateservices.h"
30 #include "gate/results.h"
31 #include "gate/utilities.h"
32 #include "gate/synchronization.h"
33 #include "gate/structs.h"
34 #include "gate/tech/microservices/webserver_service.h"
35 #include "gate/tech/microservices/timer_service.h"
36 #include "gate/tech/microservices/procrunner_service.h"
37 #include "gate/io/logging.h"
38 #include "gate/encode/yaml.h"
39 #include "gate/encode/json.h"
40
41
42
43 #define GATE_STRUCT_SERVICEHOST_ROOT_NAME "servicehost"
44
45 #define GATE_SERVICEHOST_CONFIG_SERVICE_NAME GATE_STRUCT_ROOT_NAME GATE_STRUCT_SEPARATOR GATE_STRUCT_SERVICEHOST_ROOT_NAME GATE_STRUCT_SEPARATOR "config_service"
46
47 static gate_struct_item_t const config_service_members[] =
48 {
49 GATE_STRUCT_ITEM_EX(gate_servicehost_config_service_t, GATE_TYPE_STRING, name, "Service class name", GATE_STRUCT_FLAG_DEFAULT),
50 GATE_STRUCT_ITEM_EX(gate_servicehost_config_service_t, GATE_TYPE_STRING, instance, "Service instance identifier", GATE_STRUCT_FLAG_DEFAULT),
51 GATE_STRUCT_ITEM_EX(gate_servicehost_config_service_t, GATE_TYPE_ARRAYLIST_STRING, msg_subscriptions, "Message subscriptions", GATE_STRUCT_FLAG_DEFAULT),
52 GATE_STRUCT_ITEM_EX(gate_servicehost_config_service_t, GATE_TYPE_ARRAYLIST_STRING, obj_subscriptions, "Object subscriptions", GATE_STRUCT_FLAG_DEFAULT),
53 GATE_STRUCT_ITEM_EX(gate_servicehost_config_service_t, GATE_TYPE_STRING, service_group, "Service execution group name", GATE_STRUCT_FLAG_DEFAULT),
54 GATE_STRUCT_ITEM_EX(gate_servicehost_config_service_t, GATE_TYPE_BOOL, autostart, "Automatic start of service", GATE_STRUCT_FLAG_DEFAULT),
55 GATE_STRUCT_ITEM_EX(gate_servicehost_config_service_t, GATE_TYPE_UI32, startorder, "Start order of service", GATE_STRUCT_FLAG_DEFAULT),
56 GATE_STRUCT_ITEM_EX(gate_servicehost_config_service_t, GATE_TYPE_PROPERTY, parameters, "Service internal parameters", GATE_STRUCT_FLAG_DEFAULT)
57 };
58
59 static gate_struct_descriptor_t const config_service_descriptor =
60 {
61 GATE_SERVICEHOST_CONFIG_SERVICE_NAME,
62 config_service_members,
63 sizeof(config_service_members) / sizeof(config_service_members[0]),
64 sizeof(gate_servicehost_config_service_t)
65 };
66
67 gate_result_t gate_servicehost_config_service_copy_constructor(void* dest, void const* src)
68 {
69 gate_result_t result;
70 gate_servicehost_config_service_t* ptr = (gate_servicehost_config_service_t*)dest;
71 gate_struct_t const* ptr_src = (gate_struct_t const*)src;
72
73 gate_mem_clear(&ptr->struct_base, sizeof(gate_servicehost_config_service_t));
74 if (ptr_src)
75 {
76 result = gate_struct_copy(&ptr->struct_base, ptr_src);
77 }
78 else
79 {
80 result = gate_struct_init(&ptr->struct_base, &config_service_descriptor);
81 }
82 return result;
83 }
84
85 gate_result_t gate_servicehost_config_service_init(gate_struct_t* ptr)
86 {
87 gate_result_t result = gate_servicehost_config_service_copy_constructor(ptr, NULL);
88 return result;
89 }
90
91
92
93 #define GATE_STRUCT_GSVC_CONFIG_NAME GATE_STRUCT_ROOT_NAME GATE_STRUCT_SEPARATOR GATE_STRUCT_SERVICEHOST_ROOT_NAME GATE_STRUCT_SEPARATOR "config"
94
95 static gate_struct_item_t const config_members[] =
96 {
97 GATE_STRUCT_ITEM(gate_servicehost_config_t, GATE_TYPE_STRING, id),
98 GATE_STRUCT_ITEM(gate_servicehost_config_t, GATE_TYPE_STRING, version),
99 GATE_STRUCT_ITEM(gate_servicehost_config_t, GATE_TYPE_ARRAYLIST_STRUCT, services)
100 };
101
102 static gate_struct_descriptor_t const config_descriptor =
103 {
104 GATE_STRUCT_GSVC_CONFIG_NAME,
105 config_members,
106 sizeof(config_members) / sizeof(config_members[0]),
107 sizeof(gate_servicehost_config_t)
108 };
109
110
111 void* gate_servicehost_config_copy_constructor(void* dest, void const* src)
112 {
113 void* ret = NULL;
114 gate_result_t result;
115 gate_servicehost_config_t* ptr = (gate_servicehost_config_t*)dest;
116 gate_mem_clear(ptr, sizeof(gate_servicehost_config_t));
117
118 if (src)
119 {
120 result = gate_struct_copy(&ptr->struct_base, (gate_struct_t const*)src);
121 }
122 else
123 {
124 result = gate_struct_init(&ptr->struct_base, &config_descriptor);
125 ptr->services = gate_arraylist_create(sizeof(gate_servicehost_config_service_t), NULL, 0,
126 &gate_servicehost_config_service_copy_constructor, &gate_struct_destructor);
127 }
128 if (GATE_SUCCEEDED(result))
129 {
130 ret = dest;
131 }
132 return ret;
133 }
134
135 gate_result_t gate_servicehost_config_init(gate_struct_t* ptr)
136 {
137 if (NULL == gate_servicehost_config_copy_constructor(ptr, NULL))
138 {
139 return GATE_RESULT_FAILED;
140 }
141 else
142 {
143 return GATE_RESULT_OK;
144 }
145 }
146
147
148
149
150 typedef struct gate_essentials_impl_class
151 {
152 GATE_INTERFACE_VTBL(gate_microfactory) const* vtbl;
153
154 gate_atomic_int_t ref_counter;
155 } gate_essentials_impl_t;
156
157
158 static void essentials_destroy(gate_essentials_impl_t* impl)
159 {
160 (void)impl;
161 /* no deallocation, static object */
162 }
163
164 static char const* essentials_get_interface_name(void* thisptr)
165 {
166 gate_essentials_impl_t* self = (gate_essentials_impl_t*)thisptr;
167 (void)self;
168 return GATE_INTERFACE_NAME_MICROFACTORY;
169 }
170 static void essentials_release(void* thisptr)
171 {
172 gate_essentials_impl_t* self = (gate_essentials_impl_t*)thisptr;
173 if (0 == gate_atomic_int_dec(&self->ref_counter))
174 {
175 essentials_destroy(self);
176 }
177 }
178 static int essentials_retain(void* thisptr)
179 {
180 gate_essentials_impl_t* self = (gate_essentials_impl_t*)thisptr;
181 return gate_atomic_int_inc(&self->ref_counter);
182 }
183
184 static gate_string_t name_webserver = GATE_STRING_INIT_STATIC(GATE_MICROSERVICE_NAME_WEBSERVER);
185 static gate_string_t name_timer = GATE_STRING_INIT_STATIC(GATE_MICROSERVICE_NAME_TIMER);
186 static gate_string_t name_procrunner = GATE_STRING_INIT_STATIC(GATE_MICROSERVICE_NAME_PROCRUNNER);
187
188 typedef struct factory_mapping_class
189 {
190 gate_string_t const* name;
191 gate_microservice_creator_t creator;
192 } factory_mapping_t;
193
194 static factory_mapping_t essential_mappings[] =
195 {
196 { &name_webserver, &gate_microservice_webserver_create },
197 { &name_timer, &gate_microservice_timer_create },
198 { &name_procrunner, &gate_microservice_procrunner_create }
199 };
200
201
202 static gate_result_t essentials_supported_service_names(void* thisptr, gate_array_t* ptr_output_names)
203 {
204 gate_result_t ret = GATE_RESULT_FAILED;
205 gate_essentials_impl_t* self = (gate_essentials_impl_t*)thisptr;
206 gate_size_t index;
207 gate_size_t count;
208 gate_arraylist_t names = NULL;
209
210 (void)self;
211 do
212 {
213 count = sizeof(essential_mappings) / sizeof(essential_mappings[0]);
214
215 names = gate_util_stringarray_create_duplicate(count);
216 if (names == NULL)
217 {
218 ret = GATE_RESULT_OUTOFMEMORY;
219 break;
220 }
221
222 for (index = 0; index != count; ++index)
223 {
224 gate_arraylist_add(names, essential_mappings[index].name);
225 }
226
227 if (NULL == gate_array_create(ptr_output_names, names))
228 {
229 ret = GATE_RESULT_OUTOFMEMORY;
230 }
231 else
232 {
233 ret = GATE_RESULT_OK;
234 }
235 } while (0);
236
237 if (names)
238 {
239 gate_arraylist_release(names);
240 }
241
242 return ret;
243 }
244 static gate_result_t essentials_create_service(void* thisptr, gate_string_t const* service_name, gate_microservice_t** ptr_service)
245 {
246 gate_result_t ret = GATE_RESULT_NOMATCH;
247 gate_size_t index, count;
248 gate_essentials_impl_t* self = (gate_essentials_impl_t*)thisptr;
249 (void)self;
250
251 do
252 {
253 count = sizeof(essential_mappings) / sizeof(essential_mappings[0]);
254
255 for (index = 0; index != count; ++index)
256 {
257 if (gate_string_equals(service_name, essential_mappings[index].name))
258 {
259 ret = GATE_RESULT_OK;
260 if (ptr_service != NULL)
261 {
262 *ptr_service = essential_mappings[index].creator();
263 if (*ptr_service == NULL)
264 {
265 ret = GATE_RESULT_FAILED;
266 }
267 }
268 break;
269 }
270 }
271 } while (0);
272
273 return ret;
274 }
275
276
277 static GATE_INTERFACE_VTBL(gate_microfactory) gate_essentials_microfactory_vtbl;
278 static void gate_init_essentials_microfactory_vtbl()
279 {
280 if (!gate_essentials_microfactory_vtbl.get_interface_name)
281 {
282 GATE_INTERFACE_VTBL(gate_microfactory) const local_vtbl =
283 {
284 &essentials_get_interface_name,
285 &essentials_release,
286 &essentials_retain,
287
288 &essentials_supported_service_names,
289 &essentials_create_service
290 };
291 gate_essentials_microfactory_vtbl = local_vtbl;
292 }
293 }
294
295 static gate_essentials_impl_t global_essentials_factory =
296 {
297 &gate_essentials_microfactory_vtbl,
298 0
299 };
300
301
302
303 GATE_TECH_API gate_microfactory_t* gate_servicehost_essentials_factory()
304 {
305 gate_microfactory_t* factory = (gate_microfactory_t*)&global_essentials_factory;
306
307 gate_init_essentials_microfactory_vtbl();
308
309 gate_atomic_int_inc(&global_essentials_factory.ref_counter);
310 return factory;
311 }
312
313
314
315
316
317
318
319
320 typedef struct subscription_class
321 {
322 gate_string_t source_address;
323 gate_string_t destination_address;
324 gate_string_t msg_id;
325 void* user_param;
326 gate_microservice_t* service;
327 } subscription_t;
328
329 static void subscription_destroy(subscription_t* subscr)
330 {
331 gate_string_release(&subscr->source_address);
332 gate_string_release(&subscr->destination_address);
333 gate_string_release(&subscr->msg_id);
334 subscr->user_param = NULL;
335 if (subscr->service != NULL)
336 {
337 gate_object_release(subscr->service);
338 }
339 }
340
341 static subscription_t* subscription_create(
342 subscription_t* subscr, gate_string_t const* source, gate_string_t const* dest, gate_string_t const* msg_id,
343 void* user_param, gate_microservice_t* service)
344 {
345 subscription_t* ret = NULL;
346
347 do
348 {
349 gate_mem_clear(subscr, sizeof(subscription_t));
350
351 if (NULL == gate_string_clone(&subscr->source_address, source))
352 {
353 break;
354 }
355 if (NULL == gate_string_clone(&subscr->destination_address, dest))
356 {
357 break;
358 }
359 if (NULL == gate_string_clone(&subscr->msg_id, msg_id))
360 {
361 break;
362 }
363 subscr->user_param = user_param;
364 subscr->service = service;
365 if (subscr->service != NULL)
366 {
367 gate_object_retain(subscr->service);
368 }
369
370 ret = subscr;
371 } while (0);
372
373 if (!ret)
374 {
375 subscription_destroy(subscr);
376 }
377
378 return ret;
379 }
380
381 static gate_result_t subscription_copy_ctor(void* dest, void const* source)
382 {
383 subscription_t const* src_ptr = (subscription_t const*)source;
384 subscription_t* dst_ptr = subscription_create((subscription_t*)dest,
385 &src_ptr->source_address, &src_ptr->destination_address,
386 &src_ptr->msg_id, src_ptr->user_param, src_ptr->service);
387 if (NULL == dst_ptr)
388 {
389 return GATE_RESULT_OUTOFMEMORY;
390 }
391 else
392 {
393 return GATE_RESULT_OK;
394 }
395 }
396
397 static void subscription_dtor(void* dest)
398 {
399 subscription_destroy((subscription_t*)dest);
400 }
401
402
403 typedef struct gate_servicehost_impl_class
404 {
405 GATE_INTERFACE_VTBL(gate_servicehost) const* vtbl;
406
407 gate_atomic_int_t ref_counter;
408
409 gate_mutex_t lock;
410 gate_map_t factories; /* map: (string)name -> (object)gate_microfactory_t */
411 gate_map_t services; /* map: (string)name -> (object)gate_microservice_t */
412 gate_map_t services_configs; /* map: (string)name -> [map: (ui32)id -> (value)value] */
413 gate_arraylist_t msg_subscriptions; /* subscription_t[] */
414 gate_arraylist_t obj_subscriptions; /* subscription_t[] */
415 gate_logger_t const* logger;
416
417
418 } gate_servicehost_impl_t;
419
420 static gate_result_t gsvchost_init_members(gate_servicehost_impl_t* impl)
421 {
422 gate_result_t ret = GATE_RESULT_FAILED;
423
424 do
425 {
426 gate_atomic_int_init(&impl->ref_counter, 1);
427
428 ret = gate_mutex_create(&impl->lock);
429 GATE_BREAK_IF_FAILED(ret);
430
431 if (NULL == gate_map_create(&impl->factories, &gate_compare_string,
432 sizeof(gate_string_t), &gate_string_copy_constructor, &gate_string_destructor,
433 sizeof(gate_object_t*), &gate_object_ptr_copyctor, &gate_object_ptr_dtor))
434 {
435 ret = GATE_RESULT_OUTOFMEMORY;
436 break;
437 }
438
439 if (NULL == gate_map_create(&impl->services, &gate_compare_string,
440 sizeof(gate_string_t), &gate_string_copy_constructor, &gate_string_destructor,
441 sizeof(gate_object_t*), &gate_object_ptr_copyctor, &gate_object_ptr_dtor))
442 {
443 ret = GATE_RESULT_OUTOFMEMORY;
444 break;
445 }
446
447 if (NULL == gate_map_create(&impl->services_configs, &gate_compare_string,
448 sizeof(gate_string_t), &gate_string_copy_constructor, &gate_string_destructor,
449 sizeof(gate_map_t), &gate_map_copy_constructor, &gate_map_destructor))
450 {
451 ret = GATE_RESULT_OUTOFMEMORY;
452 break;
453 }
454
455 impl->msg_subscriptions = gate_arraylist_create(sizeof(subscription_t), NULL, 0, &subscription_copy_ctor, &subscription_dtor);
456 if (NULL == impl->msg_subscriptions)
457 {
458 ret = GATE_RESULT_OUTOFMEMORY;
459 break;
460 }
461
462 impl->obj_subscriptions = gate_arraylist_create(sizeof(subscription_t), NULL, 0, &subscription_copy_ctor, &subscription_dtor);
463 if (NULL == impl->obj_subscriptions)
464 {
465 ret = GATE_RESULT_OUTOFMEMORY;
466 break;
467 }
468
469 impl->logger = gate_logger_get();
470
471 } while (0);
472
473 return ret;
474 }
475
476
477 static void gsvchost_destroy(gate_servicehost_impl_t* impl)
478 {
479 gate_arraylist_release(impl->obj_subscriptions);
480 gate_arraylist_release(impl->msg_subscriptions);
481 gate_map_destroy(&impl->services_configs);
482 gate_map_destroy(&impl->services);
483 gate_map_destroy(&impl->factories);
484 gate_mutex_destroy(&impl->lock);
485
486 gate_mem_dealloc(impl);
487 }
488
489 static char const* gsvchost_get_interface_name(void* this_ptr)
490 {
491 (void)this_ptr;
492 return GATE_INTERFACE_NAME_GATESERVICEHOST;
493 }
494 static void gsvchost_release(void* this_ptr)
495 {
496 gate_servicehost_impl_t* impl = (gate_servicehost_impl_t*)this_ptr;
497 if (0 == gate_atomic_int_dec(&impl->ref_counter))
498 {
499 gsvchost_destroy(impl);
500 }
501 }
502 static int gsvchost_retain(void* this_ptr)
503 {
504 gate_servicehost_impl_t* impl = (gate_servicehost_impl_t*)this_ptr;
505 return gate_atomic_int_inc(&impl->ref_counter);
506 }
507
508 static gate_size_t extract_subscriptions(gate_arraylist_t source, subscription_t* subscrs, gate_size_t max_subscrs,
509 gate_string_t const* source_address, gate_string_t const* destination_address,
510 gate_string_t const* msg_id)
511 {
512 gate_size_t ret = 0;
513 gate_size_t index;
514 gate_size_t count = gate_arraylist_length(source);
515 subscription_t const* ptr;
516 gate_result_t result;
517
518 for (index = 0; (max_subscrs != 0) && (index != count); ++index)
519 {
520 ptr = (subscription_t const*)gate_arraylist_get(source, index);
521 if (ptr != NULL)
522 {
523 if (!gate_string_is_empty(source_address) && !gate_string_is_empty(&ptr->source_address))
524 {
525 if (!gate_string_like(source_address, &ptr->source_address))
526 {
527 continue;
528 }
529 }
530
531 if (!gate_string_is_empty(destination_address) && !gate_string_is_empty(&ptr->destination_address))
532 {
533 if (!gate_string_like(destination_address, &ptr->destination_address))
534 {
535 continue;
536 }
537 }
538
539 if (!gate_string_is_empty(msg_id) && !gate_string_is_empty(&ptr->msg_id))
540 {
541 if (!gate_string_equals(msg_id, &ptr->msg_id))
542 {
543 continue;
544 }
545 }
546
547 /* copy this subscription */
548 result = subscription_copy_ctor(subscrs, ptr);
549 if (GATE_SUCCEEDED(result))
550 {
551 ++ret;
552 ++subscrs;
553 --max_subscrs;
554 }
555 }
556 }
557 return ret;
558 }
559 static void release_subscriptions(subscription_t* subscrs, gate_size_t count)
560 {
561 while (count-- != 0)
562 {
563 subscription_destroy(subscrs);
564 ++subscrs;
565 }
566 }
567
568 static gate_result_t gsvchost_publish_message(void* this_ptr,
569 gate_string_t const* source_address, gate_string_t const* destination_address,
570 gate_string_t const* msg_id, gate_string_t const* ptr_message)
571 {
572 gate_result_t ret = GATE_RESULT_NOTIMPLEMENTED;
573 gate_servicehost_impl_t* impl = (gate_servicehost_impl_t*)this_ptr;
574 subscription_t subscriptions[128];
575 gate_size_t subscriptions_extracted = 0;
576 gate_size_t index = 0;
577 gate_result_t msg_result;
578 static gate_string_t log_msg_origin = GATE_STRING_INIT_STATIC("microservice::publish_message");
579 gate_strbuilder_t log_builder = GATE_INIT_EMPTY;
580 char log_buffer[4096];
581 gate_string_t log_text = GATE_STRING_INIT_EMPTY;
582 gate_uint32_t svc_status = 0;
583
584 do
585 {
586 {
587 gate_strbuilder_create_static(&log_builder, log_buffer, sizeof(log_buffer), 0);
588 gate_strbuilder_append(&log_builder,
589 GATE_PRINT_CSTR, "Publish message ID '",
590 GATE_PRINT_STRING, msg_id,
591 GATE_PRINT_CSTR, "' from '",
592 GATE_PRINT_STRING, source_address,
593 GATE_PRINT_CSTR, "' to '",
594 GATE_PRINT_STRING, destination_address,
595 GATE_PRINT_CSTR, "'",
596 GATE_PRINT_END);
597 gate_strbuilder_to_string(&log_builder, &log_text);
598 gate_microhost_log(impl, GATE_LOG_TYPE_INFO, 0, 0, &log_msg_origin, &log_text);
599 gate_string_release(&log_text);
600 gate_strbuilder_release(&log_builder);
601 }
602
603 gate_mem_clear(subscriptions, sizeof(subscriptions));
604
605 ret = gate_mutex_acquire(&impl->lock);
606 GATE_BREAK_IF_FAILED(ret);
607 do
608 {
609 subscriptions_extracted = extract_subscriptions(impl->msg_subscriptions,
610 subscriptions, sizeof(subscriptions) / sizeof(subscriptions[0]),
611 source_address, destination_address, msg_id);
612 } while (0);
613 gate_mutex_release(&impl->lock);
614
615 ret = GATE_RESULT_OK;
616 for (index = 0; index != subscriptions_extracted; ++index)
617 {
618 if (subscriptions[index].service)
619 {
620 svc_status = gate_microservice_get_status(subscriptions[index].service);
621
622 if (svc_status == GATE_MICROSERVICE_STATUS_ONLINE)
623 {
624 {
625 gate_strbuilder_create_static(&log_builder, log_buffer, sizeof(log_buffer), 0);
626 gate_strbuilder_append(&log_builder,
627 GATE_PRINT_CSTR, "Delivering published message ID '",
628 GATE_PRINT_STRING, msg_id,
629 GATE_PRINT_CSTR, "' from '",
630 GATE_PRINT_STRING, source_address,
631 GATE_PRINT_CSTR, "' to '",
632 GATE_PRINT_STRING, &subscriptions[index].destination_address,
633 GATE_PRINT_CSTR, "'",
634 GATE_PRINT_END);
635 gate_strbuilder_to_string(&log_builder, &log_text);
636 gate_microhost_log(impl, GATE_LOG_TYPE_INFO, 0, 0, &log_msg_origin, &log_text);
637 gate_string_release(&log_text);
638 gate_strbuilder_release(&log_builder);
639 }
640
641 msg_result = gate_microservice_process_message(subscriptions[index].service, source_address, destination_address,
642 msg_id, ptr_message, subscriptions[index].user_param);
643 if (GATE_FAILED(msg_result))
644 {
645 gate_strbuilder_create_static(&log_builder, log_buffer, sizeof(log_buffer), 0);
646 gate_strbuilder_append(&log_builder,
647 GATE_PRINT_CSTR, "Delivery of published message ID '",
648 GATE_PRINT_STRING, msg_id,
649 GATE_PRINT_CSTR, "' from '",
650 GATE_PRINT_STRING, source_address,
651 GATE_PRINT_CSTR, "' to '",
652 GATE_PRINT_STRING, &subscriptions[index].destination_address,
653 GATE_PRINT_CSTR, "' has failed",
654 GATE_PRINT_END);
655 gate_strbuilder_to_string(&log_builder, &log_text);
656 gate_microhost_log(impl, GATE_LOG_TYPE_WARN, msg_result, 0, &log_msg_origin, &log_text);
657 gate_string_release(&log_text);
658 gate_strbuilder_release(&log_builder);
659 }
660 else
661 {
662 gate_strbuilder_create_static(&log_builder, log_buffer, sizeof(log_buffer), 0);
663 gate_strbuilder_append(&log_builder,
664 GATE_PRINT_CSTR, "Delivery of published message ID '",
665 GATE_PRINT_STRING, msg_id,
666 GATE_PRINT_CSTR, "' from '",
667 GATE_PRINT_STRING, source_address,
668 GATE_PRINT_CSTR, "' to '",
669 GATE_PRINT_STRING, &subscriptions[index].destination_address,
670 GATE_PRINT_CSTR, "' succeeded",
671 GATE_PRINT_END);
672 gate_strbuilder_to_string(&log_builder, &log_text);
673 gate_microhost_log(impl, GATE_LOG_TYPE_INFO, 0, 0, &log_msg_origin, &log_text);
674 gate_string_release(&log_text);
675 gate_strbuilder_release(&log_builder);
676 }
677 }
678 else
679 {
680 {
681 gate_strbuilder_create_static(&log_builder, log_buffer, sizeof(log_buffer), 0);
682 gate_strbuilder_append(&log_builder,
683 GATE_PRINT_CSTR, "Subscription target service '",
684 GATE_PRINT_STRING, &subscriptions[index].destination_address,
685 GATE_PRINT_CSTR, "' has invalid state #",
686 GATE_PRINT_UI32, svc_status,
687 GATE_PRINT_END);
688 gate_strbuilder_to_string(&log_builder, &log_text);
689 gate_microhost_log(impl, GATE_LOG_TYPE_WARN, 0, 0, &log_msg_origin, &log_text);
690 gate_string_release(&log_text);
691 gate_strbuilder_release(&log_builder);
692 }
693
694 }
695
696
697 }
698 }
699 } while (0);
700
701 release_subscriptions(subscriptions, subscriptions_extracted);
702
703 return ret;
704 }
705
706 static gate_result_t gsvchost_publish_object(void* this_ptr,
707 gate_string_t const* source_address, gate_string_t const* destination_address,
708 gate_string_t const* obj_id, gate_object_t* ptr_object)
709 {
710 gate_result_t ret = GATE_RESULT_NOTIMPLEMENTED;
711 gate_servicehost_impl_t* impl = (gate_servicehost_impl_t*)this_ptr;
712 subscription_t subscriptions[128];
713 gate_size_t subscriptions_extracted = 0;
714 gate_size_t index = 0;
715 gate_result_t msg_result;
716
717 do
718 {
719 gate_mem_clear(subscriptions, sizeof(subscriptions));
720
721 ret = gate_mutex_acquire(&impl->lock);
722 GATE_BREAK_IF_FAILED(ret);
723 do
724 {
725 subscriptions_extracted = extract_subscriptions(impl->obj_subscriptions,
726 subscriptions, sizeof(subscriptions) / sizeof(subscriptions[0]),
727 source_address, destination_address, NULL);
728 } while (0);
729 gate_mutex_release(&impl->lock);
730
731 ret = GATE_RESULT_OK;
732 for (index = 0; index != subscriptions_extracted; ++index)
733 {
734 if (subscriptions[index].service)
735 {
736 msg_result = gate_microservice_process_object(subscriptions[index].service, source_address, destination_address,
737 obj_id, ptr_object, subscriptions[index].user_param);
738 if (GATE_FAILED(msg_result))
739 {
740 /* TODO error logging */
741 }
742 }
743 }
744 } while (0);
745
746 release_subscriptions(subscriptions, subscriptions_extracted);
747
748 return ret;
749 }
750
751 static gate_result_t gsvchost_add_subscription(gate_servicehost_impl_t* impl, gate_arraylist_t subscription_list,
752 gate_string_t const* source_address, gate_string_t const* destination_address,
753 gate_string_t const* msg_id, gate_object_t* subscriber, void* user_data)
754 {
755 gate_result_t ret = GATE_RESULT_NOTIMPLEMENTED;
756 subscription_t subscr;
757 gate_microservice_t* microservice = NULL;
758
759 do
760 {
761 gate_mem_clear(&subscr, sizeof(subscr));
762
763 if (subscriber != NULL)
764 {
765 if (!gate_object_implements_interface(subscriber, GATE_INTERFACE_NAME_MICROSERVICE))
766 {
767 ret = GATE_RESULT_INVALIDARG;
768 break;
769 }
770 microservice = (gate_microservice_t*)subscriber;
771 }
772
773 if (NULL == subscription_create(&subscr, source_address, destination_address, msg_id, user_data, microservice))
774 {
775 ret = GATE_RESULT_OUTOFMEMORY;
776 break;
777 }
778
779 ret = gate_mutex_acquire(&impl->lock);
780 GATE_BREAK_IF_FAILED(ret);
781 do
782 {
783 if (NULL == gate_arraylist_add(subscription_list, &subscr))
784 {
785 ret = GATE_RESULT_OUTOFMEMORY;
786 }
787 else
788 {
789 ret = GATE_RESULT_OK;
790 }
791 } while (0);
792 gate_mutex_release(&impl->lock);
793
794 } while (0);
795
796 subscription_destroy(&subscr);
797 return ret;
798 }
799
800 static gate_bool_t gsvchost_remove_subscription_filter(void const* item, void* param)
801 {
802 subscription_t const* ptr = (subscription_t const*)item;
803 subscription_t const* pattern = (subscription_t const*)param;
804 return (ptr->service == pattern->service)
805 || (((pattern->service == NULL) || (ptr->service == NULL))
806 && gate_string_equals(&ptr->source_address, &pattern->source_address)
807 && gate_string_equals(&ptr->destination_address, &pattern->destination_address)
808 && gate_string_equals(&ptr->msg_id, &pattern->msg_id)
809 );
810 }
811
812 static gate_result_t gsvchost_remove_subscription(gate_servicehost_impl_t* impl, gate_arraylist_t subscription_list,
813 gate_string_t const* source_address, gate_string_t const* destination_address,
814 gate_string_t const* msg_id, gate_object_t* subscriber, void* user_data)
815 {
816 gate_result_t ret = GATE_RESULT_NOTIMPLEMENTED;
817 subscription_t subscr;
818 gate_microservice_t* microservice = NULL;
819
820 do
821 {
822 gate_mem_clear(&subscr, sizeof(subscr));
823
824 if (subscriber != NULL)
825 {
826 if (!gate_object_implements_interface(subscriber, GATE_INTERFACE_NAME_MICROSERVICE))
827 {
828 ret = GATE_RESULT_INVALIDARG;
829 break;
830 }
831 microservice = (gate_microservice_t*)subscriber;
832 }
833
834 if (NULL == subscription_create(&subscr, source_address, destination_address, msg_id, user_data, microservice))
835 {
836 ret = GATE_RESULT_OUTOFMEMORY;
837 break;
838 }
839
840 ret = gate_mutex_acquire(&impl->lock);
841 GATE_BREAK_IF_FAILED(ret);
842 do
843 {
844 ret = gate_arraylist_remove_if(subscription_list, &gsvchost_remove_subscription_filter, &subscr);
845 } while (0);
846 gate_mutex_release(&impl->lock);
847
848 } while (0);
849
850 subscription_destroy(&subscr);
851 return ret;
852 }
853
854
855 static gate_result_t gsvchost_subscribe_messages(
856 void* this_ptr, gate_string_t const* source_address, gate_string_t const* destination_address,
857 gate_string_t const* msg_id, gate_object_t* subscriber, void* user_data)
858 {
859 gate_servicehost_impl_t* impl = (gate_servicehost_impl_t*)this_ptr;
860 return gsvchost_add_subscription(impl, impl->msg_subscriptions, source_address, destination_address, msg_id, subscriber, user_data);
861 }
862 static gate_result_t gsvchost_unsubscribe_messages(
863 void* this_ptr, gate_string_t const* source_address, gate_string_t const* destination_address,
864 gate_string_t const* msg_id, gate_object_t* subscriber, void* user_data)
865 {
866 gate_servicehost_impl_t* impl = (gate_servicehost_impl_t*)this_ptr;
867 return gsvchost_remove_subscription(impl, impl->msg_subscriptions, source_address, destination_address, msg_id, subscriber, user_data);
868 }
869
870 static gate_result_t gsvchost_subscribe_objects(
871 void* this_ptr, gate_string_t const* source_address, gate_string_t const* destination_address,
872 gate_string_t const* obj_id, gate_object_t* subscriber, void* user_data)
873 {
874 gate_servicehost_impl_t* impl = (gate_servicehost_impl_t*)this_ptr;
875 return gsvchost_add_subscription(impl, impl->obj_subscriptions, source_address, destination_address, obj_id, subscriber, user_data);
876 }
877 static gate_result_t gsvchost_unsubscribe_objects(
878 void* this_ptr, gate_string_t const* source_address, gate_string_t const* destination_address,
879 gate_string_t const* obj_id, gate_object_t* subscriber, void* user_data)
880 {
881 gate_servicehost_impl_t* impl = (gate_servicehost_impl_t*)this_ptr;
882 return gsvchost_remove_subscription(impl, impl->obj_subscriptions, source_address, destination_address, obj_id, subscriber, user_data);
883 }
884
885 static gate_result_t gsvchost_remote_invoke(void* this_ptr, gate_string_t const* destination_address, gate_string_t const* method,
886 gate_struct_t const* input_data, gate_struct_t* output_data)
887 {
888 gate_result_t ret = GATE_RESULT_NOTIMPLEMENTED;
889 gate_servicehost_impl_t* impl = (gate_servicehost_impl_t*)this_ptr;
890 gate_microservice_t* service = NULL;
891
892 do
893 {
894 ret = gate_servicehost_get_service(impl, destination_address, &service);
895 GATE_BREAK_IF_FAILED(ret);
896
897 ret = gate_microservice_invoke(service, method, input_data, output_data);
898 } while (0);
899
900 if (service != NULL)
901 {
902 gate_object_release(service);
903 }
904 return ret;
905 }
906
907 static gate_result_t gsvchost_log(void* this_ptr, gate_uint32_t svc_log_type, gate_result_t result_code, gate_int32_t native_code,
908 gate_string_t const* origin, gate_string_t const* message)
909 {
910 gate_result_t ret = GATE_RESULT_NOTIMPLEMENTED;
911 gate_servicehost_impl_t* impl = (gate_servicehost_impl_t*)this_ptr;
912 gate_uint32_t logger_log_type = GATE_LOG_TYPE_INFO;
913 do
914 {
915 if (impl->logger != NULL)
916 {
917 switch (svc_log_type)
918 {
919
920 case GATE_MICROSERVICE_LOGTYPE_DEBUG: logger_log_type = GATE_LOG_TYPE_DEBUG; break;
921 case GATE_MICROSERVICE_LOGTYPE_INFO: logger_log_type = GATE_LOG_TYPE_INFO; break;
922 case GATE_MICROSERVICE_LOGTYPE_STATUS: logger_log_type = GATE_LOG_TYPE_STATUS; break;
923 case GATE_MICROSERVICE_LOGTYPE_WARNING: logger_log_type = GATE_LOG_TYPE_WARN; break;
924 case GATE_MICROSERVICE_LOGTYPE_ERROR: logger_log_type = GATE_LOG_TYPE_ERROR; break;
925 case GATE_MICROSERVICE_LOGTYPE_ALERT: logger_log_type = GATE_LOG_TYPE_FATAL; break;
926 }
927
928 ret = gate_log(impl->logger, logger_log_type, result_code, native_code, origin, message);
929 }
930 else
931 {
932 ret = GATE_RESULT_NOTREADY;
933 }
934
935 } while (0);
936
937 return ret;
938 }
939
940 static gate_result_t gsvchost_add_factory(void* this_ptr, gate_microfactory_t* factory, gate_string_t const* factory_name)
941 {
942 gate_result_t ret = GATE_RESULT_FAILED;
943 gate_servicehost_impl_t* impl = (gate_servicehost_impl_t*)this_ptr;
944 do
945 {
946 ret = gate_mutex_acquire(&impl->lock);
947 GATE_BREAK_IF_FAILED(ret);
948
949 if (NULL == gate_map_add(&impl->factories, factory_name, &factory))
950 {
951 ret = GATE_RESULT_OUTOFMEMORY;
952 }
953 gate_mutex_release(&impl->lock);
954 } while (0);
955
956 return ret;
957 }
958 static gate_result_t gsvchost_remove_factory(void* this_ptr, gate_string_t const* factory_name)
959 {
960 gate_result_t ret = GATE_RESULT_FAILED;
961 gate_servicehost_impl_t* impl = (gate_servicehost_impl_t*)this_ptr;
962 do
963 {
964 ret = gate_mutex_acquire(&impl->lock);
965 GATE_BREAK_IF_FAILED(ret);
966 gate_map_remove(&impl->factories, factory_name);
967 gate_mutex_release(&impl->lock);
968 } while (0);
969
970 return ret;
971 }
972
973 static gate_result_t gsvchost_get_factory_service_names(void* this_ptr, gate_array_t* names)
974 {
975 gate_result_t ret = GATE_RESULT_FAILED;
976 gate_servicehost_impl_t* impl = (gate_servicehost_impl_t*)this_ptr;
977 gate_arraylist_t arr = NULL;
978
979 do
980 {
981 arr = gate_util_map_export_keys(&impl->factories);
982 if (!arr)
983 {
984 ret = GATE_RESULT_OUTOFMEMORY;
985 break;
986 }
987 if (names)
988 {
989 if (NULL == gate_array_create(names, arr))
990 {
991 ret = GATE_RESULT_OUTOFMEMORY;
992 break;
993 }
994 arr = NULL;
995 }
996 ret = GATE_RESULT_OK;
997 } while (0);
998
999 gate_arraylist_release(arr);
1000 return ret;
1001 }
1002
1003 static gate_result_t gsvchost_create_service(void* this_ptr,
1004 gate_string_t const* service_name, gate_string_t const* service_instance, gate_string_t* service_address)
1005 {
1006 gate_result_t ret = GATE_RESULT_NOMATCH;
1007 gate_servicehost_impl_t* impl = (gate_servicehost_impl_t*)this_ptr;
1008 gate_microfactory_t* const* ptr_factory = NULL;
1009 gate_microfactory_t* factory = NULL;
1010 gate_microservice_t* service = NULL;
1011 gate_string_t address = GATE_STRING_INIT_EMPTY;
1012 gate_map_iterator_t iter;
1013
1014 do
1015 {
1016 ret = gate_microservice_encode_address(service_name, service_instance, &address);
1017 GATE_BREAK_IF_FAILED(ret);
1018
1019 if (NULL != gate_map_get(&impl->services, &address))
1020 {
1021 ret = GATE_RESULT_ALREADYEXISTS;
1022 break;
1023 }
1024
1025 ret = GATE_RESULT_NOMATCH;
1026 iter = gate_map_first(&impl->factories);
1027 while (gate_map_iterator_valid(iter))
1028 {
1029 ptr_factory = (gate_microfactory_t* const*)gate_map_iterator_value(iter);
1030 if (ptr_factory)
1031 {
1032 factory = *ptr_factory;
1033 if (factory)
1034 {
1035 ret = gate_microfactory_create_service(factory, service_name, &service);
1036 if (GATE_SUCCEEDED(ret))
1037 {
1038 break;
1039 }
1040 }
1041 }
1042 iter = gate_map_iterator_next(iter);
1043 }
1044 GATE_BREAK_IF_FAILED(ret);
1045
1046 if (!gate_map_add(&impl->services, &address, &service))
1047 {
1048 ret = GATE_RESULT_OUTOFMEMORY;
1049 break;
1050 }
1051
1052 ret = GATE_RESULT_OK;
1053 if (service_address)
1054 {
1055 gate_string_clone(service_address, &address);
1056 }
1057 } while (0);
1058
1059 gate_string_release(&address);
1060 if (service)
1061 {
1062 gate_object_release(service);
1063 }
1064 return ret;
1065 }
1066
1067 static gate_result_t gsvchost_get_service_addresses(void* this_ptr, gate_array_t* addresses)
1068 {
1069 gate_result_t ret = GATE_RESULT_FAILED;
1070 gate_servicehost_impl_t* impl = (gate_servicehost_impl_t*)this_ptr;
1071 gate_arraylist_t arr = NULL;
1072
1073 do
1074 {
1075 arr = gate_util_map_export_keys(&impl->services);
1076 if (!arr)
1077 {
1078 ret = GATE_RESULT_OUTOFMEMORY;
1079 break;
1080 }
1081 if (addresses)
1082 {
1083 if (NULL == gate_array_create(addresses, arr))
1084 {
1085 ret = GATE_RESULT_OUTOFMEMORY;
1086 break;
1087 }
1088 }
1089 ret = GATE_RESULT_OK;
1090 } while (0);
1091
1092 gate_arraylist_release(arr);
1093 return ret;
1094
1095 }
1096
1097 static gate_result_t gsvchost_get_service(void* this_ptr, gate_string_t const* service_address, gate_microservice_t** ptr_service)
1098 {
1099 gate_result_t ret = GATE_RESULT_NOTIMPLEMENTED;
1100 gate_servicehost_impl_t* impl = (gate_servicehost_impl_t*)this_ptr;
1101 gate_microservice_t* microsvc = NULL;
1102 gate_microservice_t** ptr_microsvc = NULL;
1103 do
1104 {
1105
1106 ret = gate_mutex_acquire(&impl->lock);
1107 GATE_BREAK_IF_FAILED(ret);
1108 do
1109 {
1110 ptr_microsvc = gate_map_get_value(&impl->services, service_address);
1111 if (NULL == ptr_microsvc)
1112 {
1113 ret = GATE_RESULT_NOMATCH;
1114 }
1115 else
1116 {
1117 microsvc = *ptr_microsvc;
1118 if (microsvc != NULL)
1119 {
1120 gate_object_retain(microsvc);
1121 ret = GATE_RESULT_OK;
1122 }
1123 else
1124 {
1125 ret = GATE_RESULT_NULLPOINTER;
1126 }
1127 }
1128 } while (0);
1129 gate_mutex_release(&impl->lock);
1130
1131 GATE_BREAK_IF_FAILED(ret);
1132
1133 if (ptr_service != NULL)
1134 {
1135 *ptr_service = microsvc;
1136 microsvc = NULL;
1137 }
1138 } while (0);
1139
1140 if (microsvc != NULL)
1141 {
1142 gate_object_release(microsvc);
1143 }
1144
1145 return ret;
1146 }
1147
1148 static gate_result_t gsvchost_get_service_config(void* this_ptr, gate_string_t const* service_address, gate_uint32_t config_type, gate_value_t* value)
1149 {
1150 gate_result_t ret = GATE_RESULT_FAILED;
1151 gate_servicehost_impl_t* impl = (gate_servicehost_impl_t*)this_ptr;
1152
1153 do
1154 {
1155 gate_map_t const* ptr_map;
1156 gate_value_t const* ptr_value;
1157 gate_map_iterator_t iter = gate_map_get(&impl->services_configs, service_address);
1158 if (!gate_map_iterator_valid(iter))
1159 {
1160 break;
1161 }
1162 ptr_map = (gate_map_t const*)gate_map_iterator_value(iter);
1163 if (!ptr_map)
1164 {
1165 break;
1166 }
1167 iter = gate_map_get(ptr_map, &config_type);
1168 if (!gate_map_iterator_valid(iter))
1169 {
1170 break;
1171 }
1172 ptr_value = (gate_value_t const*)gate_map_iterator_value(iter);
1173 if (!ptr_value)
1174 {
1175 break;
1176 }
1177 if (NULL == gate_value_clone(ptr_value, value))
1178 {
1179 ret = GATE_RESULT_OUTOFMEMORY;
1180 break;
1181 }
1182 ret = GATE_RESULT_OK;
1183 } while (0);
1184
1185 return ret;
1186 }
1187
1188 static gate_result_t gsvchost_set_service_config(void* this_ptr, gate_string_t const* service_address, gate_uint32_t config_type, gate_value_t const* value)
1189 {
1190 gate_result_t ret = GATE_RESULT_FAILED;
1191 gate_servicehost_impl_t* impl = (gate_servicehost_impl_t*)this_ptr;
1192
1193
1194
1195 do
1196 {
1197 gate_map_t tmp = GATE_INIT_EMPTY;
1198 gate_map_t* ptr_map = NULL;
1199 gate_map_iterator_t iter = gate_map_get(&impl->services_configs, service_address);
1200 if (!gate_map_iterator_valid(iter))
1201 {
1202 gate_map_create(&tmp, gate_compare_uint32, sizeof(gate_uint32_t), NULL, NULL,
1203 sizeof(gate_value_t), &gate_value_copy_constructor, &gate_value_destructor);
1204 iter = gate_map_add(&tmp, &config_type, value);
1205 if (!gate_map_iterator_valid(iter))
1206 {
1207 gate_map_destroy(&tmp);
1208 break;
1209 }
1210 iter = gate_map_add(&impl->services_configs, service_address, &tmp);
1211 gate_map_destroy(&tmp);
1212 if (gate_map_iterator_valid(iter))
1213 {
1214 ret = GATE_RESULT_OK;
1215 }
1216 }
1217 else
1218 {
1219 ptr_map = (gate_map_t*)gate_map_iterator_value(iter);
1220 if (!ptr_map)
1221 {
1222 break;
1223 }
1224 iter = gate_map_add(ptr_map, &config_type, value);
1225 if (gate_map_iterator_valid(iter))
1226 {
1227 ret = GATE_RESULT_OK;
1228 }
1229 }
1230
1231 } while (0);
1232
1233 return ret;
1234 }
1235
1236
1237 static gate_result_t gsvchost_start_service(void* this_ptr, gate_string_t const* service_address)
1238 {
1239 gate_result_t ret = GATE_RESULT_NOTIMPLEMENTED;
1240 gate_servicehost_impl_t* const impl = (gate_servicehost_impl_t*)this_ptr;
1241 gate_microservice_t* service = NULL;
1242
1243 do
1244 {
1245
1246 ret = gsvchost_get_service(this_ptr, service_address, &service);
1247 GATE_BREAK_IF_FAILED(ret);
1248 ret = gate_microservice_setup(service, service_address, ((gate_microhost_t*)impl));
1249 GATE_BREAK_IF_FAILED(ret);
1250 ret = gate_microservice_start(service);
1251 } while (0);
1252
1253 if (service != NULL)
1254 {
1255 gate_object_release(service);
1256 }
1257
1258 return ret;
1259 }
1260 static gate_result_t gsvchost_stop_service(void* this_ptr, gate_string_t const* service_address)
1261 {
1262 gate_result_t ret = GATE_RESULT_NOTIMPLEMENTED;
1263 gate_microservice_t* service = NULL;
1264
1265 do
1266 {
1267 ret = gsvchost_get_service(this_ptr, service_address, &service);
1268 GATE_BREAK_IF_FAILED(ret);
1269 ret = gate_microservice_stop(service);
1270 } while (0);
1271
1272 if (service != NULL)
1273 {
1274 gate_object_release(service);
1275 }
1276
1277 return ret;
1278 }
1279
1280
1281 static gate_bool_t gsvchost_remove_subscription_by_pointer(void const* item, void* param)
1282 {
1283 subscription_t const* subscr = (subscription_t const*)item;
1284 gate_microservice_t* service = (gate_microservice_t*)param;
1285
1286 return subscr->service == service;
1287 }
1288
1289 static gate_bool_t gsvchost_remove_subscription_by_name(void const* item, void* param)
1290 {
1291 subscription_t const* subscr = (subscription_t const*)item;
1292 gate_string_t const* name = (gate_string_t const*)param;
1293
1294 return gate_string_equals(&subscr->destination_address, name);
1295 }
1296
1297
1298 static gate_result_t gsvchost_remove_service(void* this_ptr, gate_string_t const* service_address)
1299 {
1300 gate_result_t ret = GATE_RESULT_NOTIMPLEMENTED;
1301 gate_servicehost_impl_t* const impl = (gate_servicehost_impl_t*)this_ptr;
1302 gate_microservice_t* microsvc = NULL;
1303 do
1304 {
1305
1306 ret = gate_mutex_acquire(&impl->lock);
1307 GATE_BREAK_IF_FAILED(ret);
1308 do
1309 {
1310 gate_microservice_t** ptr_microsvc = (gate_microservice_t**)gate_map_get_value(&impl->services, service_address);
1311 if (NULL == ptr_microsvc)
1312 {
1313 ret = GATE_RESULT_NOMATCH;
1314 }
1315 else
1316 {
1317 microsvc = *ptr_microsvc;
1318 if (microsvc != NULL)
1319 {
1320 gate_object_retain(microsvc);
1321 ret = GATE_RESULT_OK;
1322
1323 gate_arraylist_remove_if(impl->msg_subscriptions, &gsvchost_remove_subscription_by_pointer, microsvc);
1324 gate_arraylist_remove_if(impl->msg_subscriptions, &gsvchost_remove_subscription_by_name, (void*)service_address);
1325 gate_arraylist_remove_if(impl->obj_subscriptions, &gsvchost_remove_subscription_by_pointer, microsvc);
1326 gate_arraylist_remove_if(impl->obj_subscriptions, &gsvchost_remove_subscription_by_name, (void*)service_address);
1327 }
1328 else
1329 {
1330 ret = GATE_RESULT_NULLPOINTER;
1331 }
1332 gate_map_remove(&impl->services, service_address);
1333 }
1334 } while (0);
1335 gate_mutex_release(&impl->lock);
1336
1337 GATE_BREAK_IF_FAILED(ret);
1338
1339 } while (0);
1340
1341 if (microsvc != NULL)
1342 {
1343 gate_microservice_stop(microsvc);
1344 gate_object_release(microsvc);
1345 }
1346
1347 return ret;
1348 }
1349
1350 static gate_size_t gsvchost_start_services(gate_servicehost_impl_t* host,
1351 gate_string_t const* const* addresses, gate_size_t addresses_count)
1352 {
1353 gate_size_t started_counter = 0;
1354
1355 while (addresses_count-- != 0)
1356 {
1357 gate_microservice_t* ptr_service;
1358 gate_result_t result = gate_servicehost_get_service(host, *addresses, &ptr_service);
1359 if (GATE_SUCCEEDED(result))
1360 {
1361 result = gate_microservice_setup(ptr_service, *addresses, (gate_microhost_t*)host);
1362 if (GATE_SUCCEEDED(result))
1363 {
1364 result = gate_microservice_start(ptr_service);
1365 if (GATE_SUCCEEDED(result))
1366 {
1367 ++started_counter;
1368 }
1369 }
1370 gate_object_release(ptr_service);
1371 }
1372 ++addresses;
1373 }
1374 return started_counter;
1375 }
1376
1377 static gate_size_t gsvchost_stop_services(gate_servicehost_impl_t* host,
1378 gate_string_t const* const* addresses, gate_size_t addresses_count)
1379 {
1380 gate_size_t started_counter = 0;
1381
1382 while (addresses_count-- != 0)
1383 {
1384 gate_microservice_t* ptr_service;
1385 gate_result_t result = gate_servicehost_get_service(host, *addresses, &ptr_service);
1386 if (GATE_SUCCEEDED(result))
1387 {
1388 result = gate_microservice_stop(ptr_service);
1389 if (GATE_SUCCEEDED(result))
1390 {
1391 ++started_counter;
1392 }
1393 gate_object_release(ptr_service);
1394 }
1395 ++addresses;
1396 }
1397 return started_counter;
1398 }
1399
1400 static void gsvchost_release_services(gate_servicehost_impl_t* host,
1401 gate_string_t const* const* addresses, gate_size_t addresses_count)
1402 {
1403 while (addresses_count-- != 0)
1404 {
1405 gate_servicehost_release_service(host, *addresses);
1406 ++addresses;
1407 }
1408 }
1409
1410
1411 static gate_result_t gsvchost_start_up(void* this_ptr)
1412 {
1413 gate_result_t ret = GATE_RESULT_NOTIMPLEMENTED;
1414 gate_servicehost_impl_t* const impl = (gate_servicehost_impl_t*)this_ptr;
1415 gate_array_t addresses = GATE_INIT_EMPTY;
1416
1417 do
1418 {
1419 gate_size_t index;
1420 gate_size_t length;
1421 gate_string_t const* addresses_to_start[1024];
1422 gate_size_t const addresses_max = sizeof(addresses_to_start) / sizeof(addresses_to_start[0]);
1423 gate_size_t addresses_to_start_count = 0;
1424 gate_value_t value_autostart_enabled = GATE_INIT_EMPTY;
1425
1426 ret = gate_servicehost_get_service_addresses(impl, &addresses);
1427 GATE_BREAK_IF_FAILED(ret);
1428
1429 length = gate_array_length(&addresses);
1430 for (index = 0; index != length; ++index)
1431 {
1432 gate_string_t const* ptr_address = (gate_string_t const*)gate_array_get(&addresses, index);
1433 if (ptr_address)
1434 {
1435 gate_bool_t autostart_enabled = false;
1436 gate_result_t result = gate_servicehost_get_service_config(impl, ptr_address,
1437 GATE_SERVICEHOST_CONFIG_AUTOSTART,
1438 &value_autostart_enabled);
1439 if (GATE_SUCCEEDED(result))
1440 {
1441 gate_value_get(&value_autostart_enabled, &autostart_enabled, sizeof(autostart_enabled));
1442 }
1443
1444 if (autostart_enabled)
1445 {
1446 addresses_to_start[addresses_to_start_count++] = ptr_address;
1447 if (addresses_to_start_count >= addresses_max)
1448 {
1449 break;
1450 }
1451 }
1452 }
1453 }
1454 gsvchost_start_services(impl, addresses_to_start, addresses_to_start_count);
1455 } while (0);
1456
1457 gate_array_release(&addresses);
1458
1459 return ret;
1460 }
1461 static gate_result_t gsvchost_shut_down(void* this_ptr, gate_bool_t release_services)
1462 {
1463 gate_result_t ret = GATE_RESULT_NOTIMPLEMENTED;
1464 gate_servicehost_impl_t* const impl = (gate_servicehost_impl_t*)this_ptr;
1465 gate_array_t addresses = GATE_INIT_EMPTY;
1466
1467 do
1468 {
1469 gate_size_t index;
1470 gate_size_t length;
1471 gate_string_t const* addresses_to_stop[512];
1472 gate_size_t const addresses_max = sizeof(addresses_to_stop) / sizeof(addresses_to_stop[0]);
1473 gate_size_t addresses_to_stop_count = 0;
1474
1475 ret = gate_servicehost_get_service_addresses(impl, &addresses);
1476 GATE_BREAK_IF_FAILED(ret);
1477
1478 length = gate_array_length(&addresses);
1479 for (index = 0; index != length; ++index)
1480 {
1481 gate_string_t const* ptr_address = (gate_string_t const*)gate_array_get(&addresses, index);
1482 if (ptr_address)
1483 {
1484 addresses_to_stop[addresses_to_stop_count++] = ptr_address;
1485 if (addresses_to_stop_count >= addresses_max)
1486 {
1487 break;
1488 }
1489 }
1490 }
1491 gsvchost_stop_services(impl, addresses_to_stop, addresses_to_stop_count);
1492 if (release_services)
1493 {
1494 gsvchost_release_services(impl, addresses_to_stop, addresses_to_stop_count);
1495 }
1496 } while (0);
1497
1498 gate_array_release(&addresses);
1499
1500 return ret;
1501 }
1502
1503
1504 static GATE_INTERFACE_VTBL(gate_servicehost) gate_servicehost_vtbl;
1505 static void gate_init_servicehost_vtbl()
1506 {
1507 if (!gate_servicehost_vtbl.get_interface_name)
1508 {
1509 GATE_INTERFACE_VTBL(gate_servicehost) const local_vtbl =
1510 {
1511 &gsvchost_get_interface_name,
1512 &gsvchost_release,
1513 &gsvchost_retain,
1514
1515 &gsvchost_publish_message,
1516 &gsvchost_publish_object,
1517
1518 &gsvchost_subscribe_messages,
1519 &gsvchost_unsubscribe_messages,
1520
1521 &gsvchost_subscribe_objects,
1522 &gsvchost_unsubscribe_objects,
1523
1524 &gsvchost_remote_invoke,
1525
1526 &gsvchost_log,
1527
1528 &gsvchost_add_factory,
1529 &gsvchost_remove_factory,
1530 &gsvchost_get_factory_service_names,
1531
1532 &gsvchost_create_service,
1533 &gsvchost_get_service_addresses,
1534 &gsvchost_get_service,
1535 &gsvchost_get_service_config,
1536 &gsvchost_set_service_config,
1537 &gsvchost_start_service,
1538 &gsvchost_stop_service,
1539 &gsvchost_remove_service,
1540
1541 &gsvchost_start_up,
1542 &gsvchost_shut_down
1543 };
1544 gate_servicehost_vtbl = local_vtbl;
1545 }
1546 }
1547
1548
1549 gate_servicehost_t* gate_servicehost_create()
1550 {
1551 gate_servicehost_t* ret = NULL;
1552 gate_servicehost_impl_t* impl = NULL;
1553 do
1554 {
1555 impl = (gate_servicehost_impl_t*)gate_mem_alloc(sizeof(gate_servicehost_impl_t));
1556 if (impl == NULL)
1557 {
1558 break;
1559 }
1560
1561 if (GATE_SUCCEEDED(gsvchost_init_members(impl)))
1562 {
1563 gate_init_servicehost_vtbl();
1564 impl->vtbl = &gate_servicehost_vtbl;
1565 ret = (gate_servicehost_t*)impl;
1566 impl = NULL;
1567 }
1568
1569 } while (0);
1570
1571 if (impl != NULL)
1572 {
1573 gsvchost_destroy(impl);
1574 }
1575
1576 return ret;
1577 }
1578
1579
1580 gate_result_t gate_servicehost_load_config(gate_stream_t* srcstream, gate_uint32_t format, gate_servicehost_config_t* ptr_config)
1581 {
1582 gate_result_t ret = GATE_RESULT_FAILED;
1583 gate_property_t prop = GATE_INIT_EMPTY;
1584
1585 do
1586 {
1587 switch(format)
1588 {
1589 case GATE_SERVICEHOST_CONFIG_FORMAT_YAML:
1590 {
1591 gate_yaml_result_t yaml_result = GATE_INIT_EMPTY;
1592 ret = gate_yaml_parse(srcstream, &prop, &yaml_result);
1593 GATE_BREAK_IF_FAILED(ret);
1594 if (!yaml_result.succeeded)
1595 {
1596 ret = GATE_RESULT_INVALIDCONTENT;
1597 }
1598 break;
1599 }
1600 case GATE_SERVICEHOST_CONFIG_FORMAT_JSON:
1601 {
1602 gate_json_result_t json_result = GATE_INIT_EMPTY;
1603 ret = gate_json_parse(srcstream, &prop, &json_result);
1604 GATE_BREAK_IF_FAILED(ret);
1605 if (!json_result.succeeded)
1606 {
1607 ret = GATE_RESULT_INVALIDCONTENT;
1608 }
1609 break;
1610 }
1611 default:
1612 {
1613 ret = GATE_RESULT_INVALIDARG;
1614 break;
1615 }
1616 }
1617 GATE_BREAK_IF_FAILED(ret);
1618
1619 if (gate_property_get_type(&prop) != GATE_PROPERTY_TYPE_OBJECT)
1620 {
1621 ret = GATE_RESULT_INVALIDDATA;
1622 break;
1623 }
1624
1625 ret = gate_servicehost_config_init(&ptr_config->struct_base);
1626 GATE_BREAK_IF_FAILED(ret);
1627
1628 ret = gate_property_export(&prop, GATE_TYPE_STRUCT, &ptr_config->struct_base);
1629 } while (0);
1630
1631 gate_property_destroy(&prop);
1632
1633 return ret;
1634 }
1635 gate_result_t gate_servicehost_save_config(gate_servicehost_config_t const* config, gate_uint32_t format, gate_stream_t* deststream)
1636 {
1637 gate_result_t ret = GATE_RESULT_FAILED;
1638 gate_property_t prop = GATE_INIT_EMPTY;
1639
1640 do
1641 {
1642 ret = gate_property_import(&prop, GATE_TYPE_STRUCT, &config->struct_base);
1643 GATE_BREAK_IF_FAILED(ret);
1644
1645 switch(format)
1646 {
1647 case GATE_SERVICEHOST_CONFIG_FORMAT_YAML:
1648 ret = gate_yaml_build(&prop, deststream);
1649 break;
1650 case GATE_SERVICEHOST_CONFIG_FORMAT_JSON:
1651 ret = gate_json_build(&prop, deststream, 2);
1652 break;
1653 default:
1654 ret = GATE_RESULT_INVALIDARG;
1655 break;
1656 }
1657 GATE_BREAK_IF_FAILED(ret);
1658
1659 } while (0);
1660
1661 gate_property_destroy(&prop);
1662
1663 return ret;
1664 }
1665
1666 gate_result_t gate_servicehost_import_config(gate_servicehost_t* host, gate_servicehost_config_t const* config)
1667 {
1668 gate_result_t ret = GATE_RESULT_FAILED;
1669 gate_size_t index;
1670 gate_size_t count = gate_arraylist_length(config->services);
1671 gate_string_t address = GATE_STRING_INIT_EMPTY;
1672
1673 for (index = 0; index != count; ++index)
1674 {
1675 gate_microservice_t* ptr_microsvc = NULL;
1676 gate_servicehost_config_service_t const* ptr_svc =
1677 (gate_servicehost_config_service_t const*)gate_arraylist_get(config->services, index);
1678 if(!ptr_svc)
1679 {
1680 continue;
1681 }
1682 ret = gate_servicehost_create_service(host, &ptr_svc->name, &ptr_svc->instance, &address);
1683 GATE_BREAK_IF_FAILED(ret);
1684 ret = gate_servicehost_get_service(host, &address, &ptr_microsvc);
1685 if (GATE_SUCCEEDED(ret))
1686 {
1687 gate_size_t subscr_count;
1688 gate_size_t subscr_index;
1689 gate_value_t cfg_value;
1690
1691 if (gate_property_get_type(&ptr_svc->parameters) == GATE_PROPERTY_TYPE_OBJECT)
1692 {
1693 gate_size_t name_ndx, name_count;
1694 gate_array_t param_names = GATE_INIT_EMPTY;
1695
1696 if (NULL == gate_property_member_names(&ptr_svc->parameters, &param_names))
1697 {
1698 ret = GATE_RESULT_OUTOFMEMORY;
1699 break;
1700 }
1701
1702 name_count = gate_array_length(&param_names);
1703 for (name_ndx = 0; name_ndx != name_count; ++name_ndx)
1704 {
1705 gate_string_t const* const ptr_name = (gate_string_t const*)gate_array_get(&param_names, name_ndx);
1706 if (ptr_name)
1707 {
1708 gate_property_t const* ptr_param_prop = gate_property_member_get(&ptr_svc->parameters, ptr_name);
1709 if (ptr_param_prop)
1710 {
1711 ret = gate_microservice_set_parameter(ptr_microsvc, ptr_name, ptr_param_prop);
1712 }
1713 }
1714 }
1715 gate_array_release(&param_names);
1716 }
1717 subscr_count = gate_arraylist_length(ptr_svc->msg_subscriptions);
1718 for (subscr_index = 0; subscr_index != subscr_count; ++subscr_index)
1719 {
1720 gate_string_t const* const subscr_str = (gate_string_t const*)gate_arraylist_get(ptr_svc->msg_subscriptions, subscr_index);
1721 if (subscr_str)
1722 {
1723 gate_size_t subscr_pos = gate_string_char_pos(subscr_str, ':', 0);
1724 if (subscr_pos == GATE_STR_NPOS)
1725 {
1726 gate_servicehost_subscribe_messages(host, subscr_str, &address, NULL, ptr_microsvc, NULL);
1727 }
1728 else
1729 {
1730 gate_string_t subscr_id = GATE_STRING_INIT_EMPTY;
1731 gate_string_t subscr_addr = GATE_STRING_INIT_EMPTY;
1732 gate_string_substr(&subscr_addr, subscr_str, 0, subscr_pos);
1733 gate_string_substr(&subscr_id, subscr_str, subscr_pos + 1, GATE_STR_NPOS);
1734 gate_servicehost_subscribe_messages(host, &subscr_addr, &address, &subscr_id, ptr_microsvc, NULL);
1735 gate_string_release(&subscr_id);
1736 gate_string_release(&subscr_addr);
1737 }
1738 }
1739 }
1740
1741 /* inject advanced configuration values */
1742 if (gate_value_create(GATE_TYPE_BOOL, &ptr_svc->autostart, &cfg_value))
1743 {
1744 gate_servicehost_set_service_config(host, &address, GATE_SERVICEHOST_CONFIG_AUTOSTART, &cfg_value);
1745 gate_value_release(&cfg_value);
1746 }
1747 if (gate_value_create(GATE_TYPE_UI32, &ptr_svc->startorder, &cfg_value))
1748 {
1749 gate_servicehost_set_service_config(host, &address, GATE_SERVICEHOST_CONFIG_STARTORDER, &cfg_value);
1750 gate_value_release(&cfg_value);
1751 }
1752 if (gate_value_create(GATE_TYPE_STRING, &ptr_svc->service_group, &cfg_value))
1753 {
1754 gate_servicehost_set_service_config(host, &address, GATE_SERVICEHOST_CONFIG_SERVICEGROUP, &cfg_value);
1755 gate_value_release(&cfg_value);
1756 }
1757 }
1758 if (NULL != ptr_microsvc)
1759 {
1760 gate_object_release(ptr_microsvc);
1761 }
1762
1763 gate_string_release(&address);
1764 ret = GATE_RESULT_OK;
1765 }
1766
1767 gate_string_release(&address);
1768 return ret;
1769 }
1770
1771 static const gate_string_t default_servicehost_config_id = GATE_STRING_INIT_STATIC("gate/microservices/config");
1772 static const gate_string_t default_servicehost_config_version = GATE_STRING_INIT_STATIC("1.0");
1773
1774 gate_result_t gate_servicehost_export_config(gate_servicehost_t* host, gate_servicehost_config_t* config)
1775 {
1776 gate_result_t ret = GATE_RESULT_FAILED;
1777 gate_array_t addresses = GATE_INIT_EMPTY;
1778 gate_microservice_t* ptr_svc = NULL;
1779 do
1780 {
1781 gate_size_t ndx, length;
1782 gate_property_t prop;
1783 gate_size_t name_index, name_count;
1784
1785 ret = gate_servicehost_get_service_addresses(host, &addresses);
1786 GATE_BREAK_IF_FAILED(ret);
1787
1788 ret = gate_servicehost_config_init(&config->struct_base);
1789 GATE_BREAK_IF_FAILED(ret);
1790
1791 gate_string_duplicate(&config->id, &default_servicehost_config_id);
1792 gate_string_duplicate(&config->version, &default_servicehost_config_version);
1793
1794 length = gate_array_length(&addresses);
1795 for (ndx = 0; ndx != length; ++ndx)
1796 {
1797 gate_servicehost_config_service_t cfg_svc;
1798 gate_string_t const* ptr_str;
1799 if (ptr_svc != NULL)
1800 {
1801 gate_object_release(ptr_svc);
1802 ptr_svc = NULL;
1803 }
1804
1805 ptr_str = (gate_string_t const*)gate_array_get(&addresses, ndx);
1806 if (!ptr_str)
1807 {
1808 continue;
1809 }
1810 ret = gate_servicehost_get_service(host, ptr_str, &ptr_svc);
1811 if (GATE_FAILED(ret))
1812 {
1813 continue;
1814 }
1815
1816 ret = gate_servicehost_config_service_init(&cfg_svc.struct_base);
1817
1818 if (GATE_SUCCEEDED(ret))
1819 {
1820 gate_array_t param_names = GATE_INIT_EMPTY;
1821 gate_string_t const str_name = GATE_STRING_INIT_EMPTY;
1822 gate_string_t const str_instance = GATE_STRING_INIT_EMPTY;
1823 gate_string_duplicate(&cfg_svc.name, &str_name);
1824 gate_string_duplicate(&cfg_svc.instance, &str_instance);
1825
1826 ret = gate_microservice_get_parameter_names(ptr_svc, &param_names);
1827 if (GATE_SUCCEEDED(ret))
1828 {
1829 gate_property_t service_params;
1830 if (NULL != gate_property_create_object(&service_params))
1831 {
1832 name_count = gate_array_length(&param_names);
1833 for (name_index = 0; name_index != name_count; ++name_index)
1834 {
1835 gate_string_t const* const ptr_param_name = (gate_string_t const*)gate_array_get(&param_names, name_index);
1836 if (ptr_param_name)
1837 {
1838 ret = gate_microservice_get_parameter(ptr_svc, ptr_param_name, &prop);
1839 if (GATE_FAILED(ret))
1840 {
1841 continue;
1842 }
1843 gate_property_member_add(&service_params, ptr_param_name, &prop);
1844 gate_property_destroy(&prop);
1845 }
1846 }
1847 gate_mem_copy(&cfg_svc.parameters, &service_params, sizeof(gate_property_t));
1848 }
1849 gate_array_release(&param_names);
1850 }
1851 gate_arraylist_add(config->services, &cfg_svc.struct_base);
1852 gate_struct_release(&cfg_svc.struct_base);
1853 }
1854 }
1855
1856 } while (0);
1857
1858 if (ptr_svc != NULL)
1859 {
1860 gate_object_release(ptr_svc);
1861 ptr_svc = NULL;
1862 }
1863
1864 gate_array_release(&addresses);
1865 return ret;
1866 }
1867
1868 gate_result_t gate_servicehost_release_services(gate_servicehost_t* host)
1869 {
1870 gate_result_t result;
1871 gate_array_t addrs = GATE_INIT_EMPTY;
1872
1873 do
1874 {
1875 gate_enumerator_t addrs_enum;
1876 gate_string_t const* ptr_addr;
1877 result = gate_servicehost_get_service_addresses(host, &addrs);
1878 GATE_BREAK_IF_FAILED(result);
1879
1880 if (NULL == gate_array_enumerate(&addrs, &addrs_enum))
1881 {
1882 result = GATE_RESULT_OUTOFMEMORY;
1883 break;
1884 }
1885
1886 GATE_FOR_ENUMERATOR(gate_string_t, ptr_addr, &addrs_enum)
1887 {
1888 gate_servicehost_release_service(host, ptr_addr);
1889 }
1890
1891 } while (0);
1892
1893 gate_array_release(&addrs);
1894
1895 return result;
1896 }
1897