[Notes] [Git][BuildGrid/buildgrid][mablanch/139-emit-build-metrics] 3 commits: scheduler.py: Allow registering for build metadata updates



Title: GitLab

Martin Blanchard pushed to branch mablanch/139-emit-build-metrics at BuildGrid / buildgrid

Commits:

2 changed files:

Changes:

  • buildgrid/server/instance.py
    ... ... @@ -71,7 +71,10 @@ class BuildGridServer:
    71 71
             self.__logging_formatter = logging.Formatter(fmt=LOG_RECORD_FORMAT)
    
    72 72
             self.__print_log_records = True
    
    73 73
     
    
    74
    +        self.__build_metadata_queues = None
    
    75
    +
    
    74 76
             self.__state_monitoring_task = None
    
    77
    +        self.__build_monitoring_tasks = None
    
    75 78
             self.__logging_task = None
    
    76 79
     
    
    77 80
             # We always want a capabilities service
    
    ... ... @@ -95,6 +98,8 @@ class BuildGridServer:
    95 98
                     self.__main_loop, endpoint_type=MonitoringOutputType.STDOUT,
    
    96 99
                     serialisation_format=MonitoringOutputFormat.JSON)
    
    97 100
     
    
    101
    +            self.__build_monitoring_tasks = []
    
    102
    +
    
    98 103
             # Setup the main logging handler:
    
    99 104
             root_logger = logging.getLogger()
    
    100 105
     
    
    ... ... @@ -119,6 +124,18 @@ class BuildGridServer:
    119 124
                     self._state_monitoring_worker(period=MONITORING_PERIOD),
    
    120 125
                     loop=self.__main_loop)
    
    121 126
     
    
    127
    +            self.__build_monitoring_tasks.clear()
    
    128
    +            for instance_name, scheduler in self._schedulers.items():
    
    129
    +                if not scheduler.is_instrumented:
    
    130
    +                    continue
    
    131
    +
    
    132
    +                message_queue = janus.Queue(loop=self.__main_loop)
    
    133
    +                scheduler.register_build_metadata_watcher(message_queue.sync_q)
    
    134
    +
    
    135
    +                self.__build_monitoring_tasks.append(asyncio.ensure_future(
    
    136
    +                    self._build_monitoring_worker(instance_name, message_queue),
    
    137
    +                    loop=self.__main_loop))
    
    138
    +
    
    122 139
             self.__logging_task = asyncio.ensure_future(
    
    123 140
                 self._logging_worker(), loop=self.__main_loop)
    
    124 141
     
    
    ... ... @@ -132,6 +149,10 @@ class BuildGridServer:
    132 149
                 if self.__state_monitoring_task is not None:
    
    133 150
                     self.__state_monitoring_task.cancel()
    
    134 151
     
    
    152
    +            for build_monitoring_task in self.__build_monitoring_tasks:
    
    153
    +                build_monitoring_task.cancel()
    
    154
    +            self.__build_monitoring_tasks.clear()
    
    155
    +
    
    135 156
                 self.__monitoring_bus.stop()
    
    136 157
     
    
    137 158
             if self.__logging_task is not None:
    
    ... ... @@ -352,6 +373,60 @@ class BuildGridServer:
    352 373
     
    
    353 374
             return log_record
    
    354 375
     
    
    376
    +    async def _build_monitoring_worker(self, instance_name, message_queue):
    
    377
    +        """Publishes build metadata to the monitoring bus."""
    
    378
    +        async def __build_monitoring_worker():
    
    379
    +            metadata, context = await message_queue.async_q.get()
    
    380
    +
    
    381
    +            context.update({'instance-name': instance_name or ''})
    
    382
    +
    
    383
    +            # Emit build inputs fetching time record:
    
    384
    +            fetch_start = metadata.input_fetch_start_timestamp.ToDatetime()
    
    385
    +            fetch_completed = metadata.input_fetch_completed_timestamp.ToDatetime()
    
    386
    +            input_fetch_time = fetch_completed - fetch_start
    
    387
    +            timer_record = self._forge_timer_metric_record(
    
    388
    +                MetricRecordDomain.BUILD, 'inputs-fetching-time', input_fetch_time,
    
    389
    +                metadata=context)
    
    390
    +
    
    391
    +            await self.__monitoring_bus.send_record(timer_record)
    
    392
    +
    
    393
    +            # Emit build execution time record:
    
    394
    +            execution_start = metadata.execution_start_timestamp.ToDatetime()
    
    395
    +            execution_completed = metadata.execution_completed_timestamp.ToDatetime()
    
    396
    +            execution_time = execution_completed - execution_start
    
    397
    +            timer_record = self._forge_timer_metric_record(
    
    398
    +                MetricRecordDomain.BUILD, 'execution-time', execution_time,
    
    399
    +                metadata=context)
    
    400
    +
    
    401
    +            await self.__monitoring_bus.send_record(timer_record)
    
    402
    +
    
    403
    +            # Emit build outputs uploading time record:
    
    404
    +            upload_start = metadata.output_upload_start_timestamp.ToDatetime()
    
    405
    +            upload_completed = metadata.output_upload_completed_timestamp.ToDatetime()
    
    406
    +            output_upload_time = upload_completed - upload_start
    
    407
    +            timer_record = self._forge_timer_metric_record(
    
    408
    +                MetricRecordDomain.BUILD, 'outputs-uploading-time', output_upload_time,
    
    409
    +                metadata=context)
    
    410
    +
    
    411
    +            await self.__monitoring_bus.send_record(timer_record)
    
    412
    +
    
    413
    +            # Emit total build handling time record:
    
    414
    +            queued = metadata.queued_timestamp.ToDatetime()
    
    415
    +            worker_completed = metadata.worker_completed_timestamp.ToDatetime()
    
    416
    +            total_handling_time = worker_completed - queued
    
    417
    +            timer_record = self._forge_timer_metric_record(
    
    418
    +                MetricRecordDomain.BUILD, 'total-handling-time', total_handling_time,
    
    419
    +                metadata=context)
    
    420
    +
    
    421
    +            await self.__monitoring_bus.send_record(timer_record)
    
    422
    +
    
    423
    +        try:
    
    424
    +            while True:
    
    425
    +                await __build_monitoring_worker()
    
    426
    +
    
    427
    +        except asyncio.CancelledError:
    
    428
    +            pass
    
    429
    +
    
    355 430
         async def _state_monitoring_worker(self, period=1.0):
    
    356 431
             """Periodically publishes state metrics to the monitoring bus."""
    
    357 432
             async def __state_monitoring_worker():
    
    ... ... @@ -463,7 +538,7 @@ class BuildGridServer:
    463 538
             n_clients = self._execution_service.query_n_clients_for_instance(instance_name)
    
    464 539
             gauge_record = self._forge_gauge_metric_record(
    
    465 540
                 MetricRecordDomain.STATE, 'clients-count', n_clients,
    
    466
    -            metadata={'instance-name': instance_name or 'void'})
    
    541
    +            metadata={'instance-name': instance_name or ''})
    
    467 542
     
    
    468 543
             return n_clients, gauge_record
    
    469 544
     
    
    ... ... @@ -480,7 +555,7 @@ class BuildGridServer:
    480 555
             n_bots = self._bots_service.query_n_bots_for_instance(instance_name)
    
    481 556
             gauge_record = self._forge_gauge_metric_record(
    
    482 557
                 MetricRecordDomain.STATE, 'bots-count', n_bots,
    
    483
    -            metadata={'instance-name': instance_name or 'void'})
    
    558
    +            metadata={'instance-name': instance_name or ''})
    
    484 559
     
    
    485 560
             return n_bots, gauge_record
    
    486 561
     
    
    ... ... @@ -498,6 +573,6 @@ class BuildGridServer:
    498 573
             am_queue_time = self._schedulers[instance_name].query_am_queue_time()
    
    499 574
             timer_record = self._forge_timer_metric_record(
    
    500 575
                 MetricRecordDomain.STATE, 'average-queue-time', am_queue_time,
    
    501
    -            metadata={'instance-name': instance_name or 'void'})
    
    576
    +            metadata={'instance-name': instance_name or ''})
    
    502 577
     
    
    503 578
             return am_queue_time, timer_record

  • buildgrid/server/scheduler.py
    ... ... @@ -34,6 +34,8 @@ class Scheduler:
    34 34
         def __init__(self, action_cache=None, monitor=False):
    
    35 35
             self.__logger = logging.getLogger(__name__)
    
    36 36
     
    
    37
    +        self.__build_metadata_queues = None
    
    38
    +
    
    37 39
             self.__operations_by_stage = None
    
    38 40
             self.__leases_by_state = None
    
    39 41
             self.__queue_time_average = None
    
    ... ... @@ -46,6 +48,8 @@ class Scheduler:
    46 48
             self._is_instrumented = monitor
    
    47 49
     
    
    48 50
             if self._is_instrumented:
    
    51
    +            self.__build_metadata_queues = []
    
    52
    +
    
    49 53
                 self.__operations_by_stage = {}
    
    50 54
                 self.__leases_by_state = {}
    
    51 55
                 self.__queue_time_average = 0, timedelta()
    
    ... ... @@ -228,6 +232,10 @@ class Scheduler:
    228 232
         def is_instrumented(self):
    
    229 233
             return self._is_instrumented
    
    230 234
     
    
    235
    +    def register_build_metadata_watcher(self, message_queue):
    
    236
    +        if self.__build_metadata_queues is not None:
    
    237
    +            self.__build_metadata_queues.append(message_queue)
    
    238
    +
    
    231 239
         def query_n_jobs(self):
    
    232 240
             return len(self.jobs)
    
    233 241
     
    
    ... ... @@ -319,3 +327,12 @@ class Scheduler:
    319 327
                         average_time = average_time + ((queue_time - average_time) / average_order)
    
    320 328
     
    
    321 329
                     self.__queue_time_average = average_order, average_time
    
    330
    +
    
    331
    +                if not job.holds_cached_action_result:
    
    332
    +                    execution_metadata = job.action_result.execution_metadata
    
    333
    +                    context_metadata = {'job-is': job.name}
    
    334
    +
    
    335
    +                    message = (execution_metadata, context_metadata,)
    
    336
    +
    
    337
    +                    for message_queue in self.__build_metadata_queues:
    
    338
    +                        message_queue.put(message)



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]