| ... | ... | @@ -15,15 +15,18 @@ | 
| 15 | 15 |  
 | 
| 16 | 16 |  import asyncio
 | 
| 17 | 17 |  from concurrent import futures
 | 
| 18 |  | -from datetime import timedelta
 | 
|  | 18 | +from datetime import datetime, timedelta
 | 
| 19 | 19 |  import logging
 | 
|  | 20 | +import logging.handlers
 | 
| 20 | 21 |  import os
 | 
| 21 | 22 |  import signal
 | 
|  | 23 | +import sys
 | 
| 22 | 24 |  import time
 | 
| 23 | 25 |  
 | 
| 24 | 26 |  import grpc
 | 
|  | 27 | +import janus
 | 
| 25 | 28 |  
 | 
| 26 |  | -from buildgrid._enums import MetricRecordDomain, MetricRecordType
 | 
|  | 29 | +from buildgrid._enums import LogRecordLevel, MetricRecordDomain, MetricRecordType
 | 
| 27 | 30 |  from buildgrid._protos.buildgrid.v2 import monitoring_pb2
 | 
| 28 | 31 |  from buildgrid.server.actioncache.service import ActionCacheService
 | 
| 29 | 32 |  from buildgrid.server.bots.service import BotsService
 | 
| ... | ... | @@ -32,7 +35,7 @@ from buildgrid.server.execution.service import ExecutionService | 
| 32 | 35 |  from buildgrid.server._monitoring import MonitoringBus, MonitoringOutputType, MonitoringOutputFormat
 | 
| 33 | 36 |  from buildgrid.server.operations.service import OperationsService
 | 
| 34 | 37 |  from buildgrid.server.referencestorage.service import ReferenceStorageService
 | 
| 35 |  | -from buildgrid.settings import MONITORING_PERIOD
 | 
|  | 38 | +from buildgrid.settings import LOG_RECORD_FORMAT, MONITORING_PERIOD
 | 
| 36 | 39 |  
 | 
| 37 | 40 |  
 | 
| 38 | 41 |  class BuildGridServer:
 | 
| ... | ... | @@ -58,9 +61,16 @@ class BuildGridServer: | 
| 58 | 61 |          self.__grpc_server = grpc.server(self.__grpc_executor)
 | 
| 59 | 62 |  
 | 
| 60 | 63 |          self.__main_loop = asyncio.get_event_loop()
 | 
|  | 64 | +
 | 
| 61 | 65 |          self.__monitoring_bus = None
 | 
| 62 | 66 |  
 | 
|  | 67 | +        self.__logging_queue = janus.Queue(loop=self.__main_loop)
 | 
|  | 68 | +        self.__logging_handler = logging.handlers.QueueHandler(self.__logging_queue.sync_q)
 | 
|  | 69 | +        self.__logging_formatter = logging.Formatter(fmt=LOG_RECORD_FORMAT)
 | 
|  | 70 | +        self.__print_log_records = True
 | 
|  | 71 | +
 | 
| 63 | 72 |          self.__state_monitoring_task = None
 | 
|  | 73 | +        self.__logging_task = None
 | 
| 64 | 74 |  
 | 
| 65 | 75 |          self._execution_service = None
 | 
| 66 | 76 |          self._bots_service = None
 | 
| ... | ... | @@ -80,6 +90,17 @@ class BuildGridServer: | 
| 80 | 90 |                  self.__main_loop, endpoint_type=MonitoringOutputType.STDOUT,
 | 
| 81 | 91 |                  serialisation_format=MonitoringOutputFormat.JSON)
 | 
| 82 | 92 |  
 | 
|  | 93 | +        # Setup the main logging handler:
 | 
|  | 94 | +        root_logger = logging.getLogger()
 | 
|  | 95 | +
 | 
|  | 96 | +        for log_filter in root_logger.filters[:]:
 | 
|  | 97 | +            self.__logging_handler.addFilter(log_filter)
 | 
|  | 98 | +            root_logger.removeFilter(log_filter)
 | 
|  | 99 | +
 | 
|  | 100 | +        for log_handler in root_logger.handlers[:]:
 | 
|  | 101 | +            root_logger.removeHandler(log_handler)
 | 
|  | 102 | +        root_logger.addHandler(self.__logging_handler)
 | 
|  | 103 | +
 | 
| 83 | 104 |      # --- Public API ---
 | 
| 84 | 105 |  
 | 
| 85 | 106 |      def start(self):
 | 
| ... | ... | @@ -93,6 +114,9 @@ class BuildGridServer: | 
| 93 | 114 |                  self._state_monitoring_worker(period=MONITORING_PERIOD),
 | 
| 94 | 115 |                  loop=self.__main_loop)
 | 
| 95 | 116 |  
 | 
|  | 117 | +        self.__logging_task = asyncio.ensure_future(
 | 
|  | 118 | +            self._logging_worker(), loop=self.__main_loop)
 | 
|  | 119 | +
 | 
| 96 | 120 |          self.__main_loop.add_signal_handler(signal.SIGTERM, self.stop)
 | 
| 97 | 121 |  
 | 
| 98 | 122 |          self.__main_loop.run_forever()
 | 
| ... | ... | @@ -105,6 +129,9 @@ class BuildGridServer: | 
| 105 | 129 |  
 | 
| 106 | 130 |              self.__monitoring_bus.stop()
 | 
| 107 | 131 |  
 | 
|  | 132 | +        if self.__logging_task is not None:
 | 
|  | 133 | +            self.__logging_task.cancel()
 | 
|  | 134 | +
 | 
| 108 | 135 |          self.__main_loop.stop()
 | 
| 109 | 136 |  
 | 
| 110 | 137 |          self.__grpc_server.stop(None)
 | 
| ... | ... | @@ -245,6 +272,53 @@ class BuildGridServer: | 
| 245 | 272 |  
 | 
| 246 | 273 |      # --- Private API ---
 | 
| 247 | 274 |  
 | 
async def _logging_worker(self):
    """Consume queued log records, printing and/or publishing each one.

    Records are pulled from the asynchronous side of
    ``self.__logging_queue``, which the root logger's ``QueueHandler``
    fills. For every record:

    * when ``self.__print_log_records`` is set, the record is rendered with
      ``self.__logging_formatter`` and written to stdout;
    * when the server is instrumented (``self._is_instrumented``), it is
      converted with ``self._forge_log_record()`` and sent on the
      monitoring bus.

    Handling a single record can fail, for example when a numeric level has
    no matching ``LogRecordLevel`` member or the monitoring bus raises.
    Such failures are reported on stderr and the worker moves on to the
    next record. This way one bad record cannot end the task and strand
    every later record in the queue.

    The coroutine runs until its task is cancelled.
    """
    # Naive UTC epoch, used to turn ``LogRecord.created`` (seconds since the
    # epoch) into a naive *UTC* datetime. Timestamp.FromDatetime() treats
    # naive datetimes as UTC. datetime.fromtimestamp() would return local
    # wall-clock time and skew every published timestamp by the host's UTC
    # offset.
    utc_epoch = datetime(1970, 1, 1)

    async def _handle_log_record(log_record):
        """Print and/or publish a single ``logging.LogRecord``."""
        # Print log records to stdout, if required:
        if self.__print_log_records:
            record = self.__logging_formatter.format(log_record)

            # TODO: Investigate if async write would be worth here.
            sys.stdout.write('{}\n'.format(record))
            sys.stdout.flush()

        # Emit a log record if server is instrumented:
        if self._is_instrumented:
            # Stdlib levels are multiples of ten (DEBUG=10, INFO=20, ...) and
            # LogRecordLevel is looked up by levelno / 10.
            # NOTE(review): assumes the enum has a member for every level
            # actually emitted; if not, the error is caught by the loop below.
            log_record_level = LogRecordLevel(int(log_record.levelno / 10))
            log_record_creation_time = utc_epoch + timedelta(
                seconds=log_record.created)
            # logging.LogRecord.extra must be a str to str dict:
            if 'extra' in log_record.__dict__ and log_record.extra:
                log_record_metadata = log_record.extra
            else:
                log_record_metadata = None
            record = self._forge_log_record(
                log_record.name, log_record_level, log_record.message,
                log_record_creation_time, metadata=log_record_metadata)

            await self.__monitoring_bus.send_record(record)

    try:
        while True:
            log_record = await self.__logging_queue.async_q.get()
            try:
                await _handle_log_record(log_record)
            except asyncio.CancelledError:
                # Cancellation must still stop the worker. Before Python 3.8,
                # CancelledError is a subclass of Exception and would
                # otherwise be swallowed by the handler below.
                raise
            except Exception as error:  # pylint: disable=broad-except
                # Reporting through `logging` would push a new record back
                # into this very queue, so write straight to stderr instead.
                sys.stderr.write('Failed to process log record {!r}: {!r}\n'
                                 .format(log_record, error))
                sys.stderr.flush()

    except asyncio.CancelledError:
        pass
 | 
|  | 309 | +
 | 
def _forge_log_record(self, domain, level, message, creation_time, metadata=None):
    """Assemble a ``monitoring_pb2.LogRecord`` protobuf message.

    Args:
        domain (str): value for the ``domain`` field. The logging worker
            passes the originating logger's name.
        level: enum member whose ``.value`` is stored in the ``level`` field
            (the logging worker passes a ``LogRecordLevel``).
        message (str): value for the ``message`` field.
        creation_time (datetime): converted with
            ``Timestamp.FromDatetime()``. Protobuf treats a naive datetime
            as UTC.
        metadata (dict, optional): entries merged into the ``metadata`` map.
            Nothing is merged when this is ``None``.

    Returns:
        monitoring_pb2.LogRecord: the populated message.
    """
    log_entry = monitoring_pb2.LogRecord()

    log_entry.creation_timestamp.FromDatetime(creation_time)
    log_entry.domain = domain
    log_entry.level = level.value
    log_entry.message = message

    # No metadata to attach: the message is already complete.
    if metadata is None:
        return log_entry

    log_entry.metadata.update(metadata)
    return log_entry
 | 
|  | 321 | +
 | 
| 248 | 322 |      async def _state_monitoring_worker(self, period=1.0):
 | 
| 249 | 323 |          """Periodically publishes state metrics to the monitoring bus."""
 | 
| 250 | 324 |          async def __state_monitoring_worker():
 |