... |
... |
@@ -71,7 +71,10 @@ class BuildGridServer: |
71
|
71
|
self.__logging_formatter = logging.Formatter(fmt=LOG_RECORD_FORMAT)
|
72
|
72
|
self.__print_log_records = True
|
73
|
73
|
|
|
74
|
+ self.__build_metadata_queues = None
|
|
75
|
+
|
74
|
76
|
self.__state_monitoring_task = None
|
|
77
|
+ self.__build_monitoring_tasks = None
|
75
|
78
|
self.__logging_task = None
|
76
|
79
|
|
77
|
80
|
# We always want a capabilities service
|
... |
... |
@@ -95,6 +98,8 @@ class BuildGridServer: |
95
|
98
|
self.__main_loop, endpoint_type=MonitoringOutputType.STDOUT,
|
96
|
99
|
serialisation_format=MonitoringOutputFormat.JSON)
|
97
|
100
|
|
|
101
|
+ self.__build_monitoring_tasks = []
|
|
102
|
+
|
98
|
103
|
# Setup the main logging handler:
|
99
|
104
|
root_logger = logging.getLogger()
|
100
|
105
|
|
... |
... |
@@ -119,6 +124,18 @@ class BuildGridServer: |
119
|
124
|
self._state_monitoring_worker(period=MONITORING_PERIOD),
|
120
|
125
|
loop=self.__main_loop)
|
121
|
126
|
|
|
127
|
+ self.__build_monitoring_tasks.clear()
|
|
128
|
+ for instance_name, scheduler in self._schedulers.items():
|
|
129
|
+ if not scheduler.is_instrumented:
|
|
130
|
+ continue
|
|
131
|
+
|
|
132
|
+ message_queue = janus.Queue(loop=self.__main_loop)
|
|
133
|
+ scheduler.register_build_metadata_watcher(message_queue.sync_q)
|
|
134
|
+
|
|
135
|
+ self.__build_monitoring_tasks.append(asyncio.ensure_future(
|
|
136
|
+ self._build_monitoring_worker(instance_name, message_queue),
|
|
137
|
+ loop=self.__main_loop))
|
|
138
|
+
|
122
|
139
|
self.__logging_task = asyncio.ensure_future(
|
123
|
140
|
self._logging_worker(), loop=self.__main_loop)
|
124
|
141
|
|
... |
... |
@@ -132,6 +149,10 @@ class BuildGridServer: |
132
|
149
|
if self.__state_monitoring_task is not None:
|
133
|
150
|
self.__state_monitoring_task.cancel()
|
134
|
151
|
|
|
152
|
+ for build_monitoring_task in self.__build_monitoring_tasks:
|
|
153
|
+ build_monitoring_task.cancel()
|
|
154
|
+ self.__build_monitoring_tasks.clear()
|
|
155
|
+
|
135
|
156
|
self.__monitoring_bus.stop()
|
136
|
157
|
|
137
|
158
|
if self.__logging_task is not None:
|
... |
... |
@@ -352,6 +373,60 @@ class BuildGridServer: |
352
|
373
|
|
353
|
374
|
return log_record
|
354
|
375
|
|
|
376
|
+ async def _build_monitoring_worker(self, instance_name, message_queue):
|
|
377
|
+ """Publishes builds metadata to the monitoring bus."""
|
|
378
|
+ async def __build_monitoring_worker():
|
|
379
|
+ metadata, context = await message_queue.async_q.get()
|
|
380
|
+
|
|
381
|
+ context.update({'instance-name': instance_name or ''})
|
|
382
|
+
|
|
383
|
+ # Emit build inputs fetching time record:
|
|
384
|
+ fetch_start = metadata.input_fetch_start_timestamp.ToDatetime()
|
|
385
|
+ fetch_completed = metadata.input_fetch_completed_timestamp.ToDatetime()
|
|
386
|
+ input_fetch_time = fetch_completed - fetch_start
|
|
387
|
+ timer_record = self._forge_timer_metric_record(
|
|
388
|
+ MetricRecordDomain.BUILD, 'inputs-fetching-time', input_fetch_time,
|
|
389
|
+ metadata=context)
|
|
390
|
+
|
|
391
|
+ await self.__monitoring_bus.send_record(timer_record)
|
|
392
|
+
|
|
393
|
+ # Emit build execution time record:
|
|
394
|
+ execution_start = metadata.execution_start_timestamp.ToDatetime()
|
|
395
|
+ execution_completed = metadata.execution_completed_timestamp.ToDatetime()
|
|
396
|
+ execution_time = execution_completed - execution_start
|
|
397
|
+ timer_record = self._forge_timer_metric_record(
|
|
398
|
+ MetricRecordDomain.BUILD, 'execution-time', execution_time,
|
|
399
|
+ metadata=context)
|
|
400
|
+
|
|
401
|
+ await self.__monitoring_bus.send_record(timer_record)
|
|
402
|
+
|
|
403
|
+ # Emit build outputs uploading time record:
|
|
404
|
+ upload_start = metadata.output_upload_start_timestamp.ToDatetime()
|
|
405
|
+ upload_completed = metadata.output_upload_completed_timestamp.ToDatetime()
|
|
406
|
+ output_upload_time = upload_completed - upload_start
|
|
407
|
+ timer_record = self._forge_timer_metric_record(
|
|
408
|
+ MetricRecordDomain.BUILD, 'outputs-uploading-time', output_upload_time,
|
|
409
|
+ metadata=context)
|
|
410
|
+
|
|
411
|
+ await self.__monitoring_bus.send_record(timer_record)
|
|
412
|
+
|
|
413
|
+ # Emit total build handling time record:
|
|
414
|
+ queued = metadata.queued_timestamp.ToDatetime()
|
|
415
|
+ worker_completed = metadata.worker_completed_timestamp.ToDatetime()
|
|
416
|
+ total_handling_time = worker_completed - queued
|
|
417
|
+ timer_record = self._forge_timer_metric_record(
|
|
418
|
+ MetricRecordDomain.BUILD, 'total-handling-time', total_handling_time,
|
|
419
|
+ metadata=context)
|
|
420
|
+
|
|
421
|
+ await self.__monitoring_bus.send_record(timer_record)
|
|
422
|
+
|
|
423
|
+ try:
|
|
424
|
+ while True:
|
|
425
|
+ await __build_monitoring_worker()
|
|
426
|
+
|
|
427
|
+ except asyncio.CancelledError:
|
|
428
|
+ pass
|
|
429
|
+
|
355
|
430
|
async def _state_monitoring_worker(self, period=1.0):
|
356
|
431
|
"""Periodically publishes state metrics to the monitoring bus."""
|
357
|
432
|
async def __state_monitoring_worker():
|
... |
... |
@@ -463,7 +538,7 @@ class BuildGridServer: |
463
|
538
|
n_clients = self._execution_service.query_n_clients_for_instance(instance_name)
|
464
|
539
|
gauge_record = self._forge_gauge_metric_record(
|
465
|
540
|
MetricRecordDomain.STATE, 'clients-count', n_clients,
|
466
|
|
- metadata={'instance-name': instance_name or 'void'})
|
|
541
|
+ metadata={'instance-name': instance_name or ''})
|
467
|
542
|
|
468
|
543
|
return n_clients, gauge_record
|
469
|
544
|
|
... |
... |
@@ -480,7 +555,7 @@ class BuildGridServer: |
480
|
555
|
n_bots = self._bots_service.query_n_bots_for_instance(instance_name)
|
481
|
556
|
gauge_record = self._forge_gauge_metric_record(
|
482
|
557
|
MetricRecordDomain.STATE, 'bots-count', n_bots,
|
483
|
|
- metadata={'instance-name': instance_name or 'void'})
|
|
558
|
+ metadata={'instance-name': instance_name or ''})
|
484
|
559
|
|
485
|
560
|
return n_bots, gauge_record
|
486
|
561
|
|
... |
... |
@@ -498,6 +573,6 @@ class BuildGridServer: |
498
|
573
|
am_queue_time = self._schedulers[instance_name].query_am_queue_time()
|
499
|
574
|
timer_record = self._forge_timer_metric_record(
|
500
|
575
|
MetricRecordDomain.STATE, 'average-queue-time', am_queue_time,
|
501
|
|
- metadata={'instance-name': instance_name or 'void'})
|
|
576
|
+ metadata={'instance-name': instance_name or ''})
|
502
|
577
|
|
503
|
578
|
return am_queue_time, timer_record
|