Martin Blanchard pushed to branch mablanch/23-new-logging at BuildGrid / buildgrid
Commits:
- 329787fb by Martin Blanchard at 2018-11-26T16:22:26Z
- eba2aedb by Martin Blanchard at 2018-11-26T16:22:26Z
- 6cd1d1ac by Martin Blanchard at 2018-11-26T16:22:26Z
- d3888608 by Martin Blanchard at 2018-11-26T16:22:26Z
5 changed files:
- .gitlab-ci.yml
- buildgrid/_app/cli.py
- buildgrid/_app/commands/cmd_bot.py
- buildgrid/_app/commands/cmd_server.py
- buildgrid/server/instance.py
Changes:
| ... | ... | @@ -2,7 +2,7 @@ | 
| 2 | 2 |  image: python:3.5-stretch
 | 
| 3 | 3 |  | 
| 4 | 4 |  variables:
 | 
| 5 | -  BGD: bgd --verbose
 | |
| 5 | +  BGD: bgd
 | |
| 6 | 6 |  | 
| 7 | 7 |  stages:
 | 
| 8 | 8 |    - test
 | 
| ... | ... | @@ -23,10 +23,12 @@ will be attempted to be imported. | 
| 23 | 23 |  | 
| 24 | 24 |  import logging
 | 
| 25 | 25 |  import os
 | 
| 26 | +import sys
 | |
| 26 | 27 |  | 
| 27 | 28 |  import click
 | 
| 28 | 29 |  import grpc
 | 
| 29 | 30 |  | 
| 31 | +from buildgrid.settings import LOG_RECORD_FORMAT
 | |
| 30 | 32 |  from buildgrid.utils import read_file
 | 
| 31 | 33 |  | 
| 32 | 34 |  CONTEXT_SETTINGS = dict(auto_envvar_prefix='BUILDGRID')
 | 
| ... | ... | @@ -138,28 +140,70 @@ class BuildGridCLI(click.MultiCommand): | 
| 138 | 140 |          return mod.cli
 | 
| 139 | 141 |  | 
| 140 | 142 |  | 
| 143 | +class DebugFilter(logging.Filter):
 | |
| 144 | + | |
| 145 | +    def __init__(self, debug_domains, name=''):
 | |
| 146 | +        super().__init__(name=name)
 | |
| 147 | +        self.__domains_tree = {}
 | |
| 148 | + | |
| 149 | +        for domain in debug_domains.split(','):
 | |
| 150 | +            domains_tree = self.__domains_tree
 | |
| 151 | +            for label in domain.split('.'):
 | |
| 152 | +                if label not in domains_tree:
 | |
| 153 | +                    domains_tree[label] = {}
 | |
| 154 | +                domains_tree = domains_tree[label]
 | |
| 155 | + | |
| 156 | +    def filter(self, record):
 | |
| 157 | +        domains_tree = self.__domains_tree
 | |
| 158 | +        for label in record.name.split('.'):
 | |
| 159 | +            if '*' in domains_tree:
 | |
| 160 | +                return True
 | |
| 161 | +            if label not in domains_tree:
 | |
| 162 | +                return False
 | |
| 163 | +            domains_tree = domains_tree[label]
 | |
| 164 | +        return True
 | |
| 165 | + | |
| 166 | + | |
| 167 | +def setup_logging(verbosity=0, debug_mode=False):
 | |
| 168 | +    """Deals with loggers verbosity"""
 | |
| 169 | +    asyncio_logger = logging.getLogger('asyncio')
 | |
| 170 | +    root_logger = logging.getLogger()
 | |
| 171 | + | |
| 172 | +    log_handler = logging.StreamHandler(stream=sys.stdout)
 | |
| 173 | +    for log_filter in root_logger.filters:
 | |
| 174 | +        log_handler.addFilter(log_filter)
 | |
| 175 | + | |
| 176 | +    logging.basicConfig(format=LOG_RECORD_FORMAT, handlers=[log_handler])
 | |
| 177 | + | |
| 178 | +    if verbosity == 1:
 | |
| 179 | +        root_logger.setLevel(logging.WARNING)
 | |
| 180 | +    elif verbosity == 2:
 | |
| 181 | +        root_logger.setLevel(logging.INFO)
 | |
| 182 | +    elif verbosity >= 3:
 | |
| 183 | +        root_logger.setLevel(logging.DEBUG)
 | |
| 184 | +    else:
 | |
| 185 | +        root_logger.setLevel(logging.ERROR)
 | |
| 186 | + | |
| 187 | +    if not debug_mode:
 | |
| 188 | +        asyncio_logger.setLevel(logging.CRITICAL)
 | |
| 189 | +    else:
 | |
| 190 | +        asyncio_logger.setLevel(logging.DEBUG)
 | |
| 191 | +        root_logger.setLevel(logging.DEBUG)
 | |
| 192 | + | |
| 193 | + | |
| 141 | 194 |  @click.command(cls=BuildGridCLI, context_settings=CONTEXT_SETTINGS)
 | 
| 142 | -@click.option('-v', '--verbose', count=True,
 | |
| 143 | -              help='Increase log verbosity level.')
 | |
| 144 | 195 |  @pass_context
 | 
| 145 | -def cli(context, verbose):
 | |
| 196 | +def cli(context):
 | |
| 146 | 197 |      """BuildGrid App"""
 | 
| 147 | -    logger = logging.getLogger()
 | |
| 198 | +    root_logger = logging.getLogger()
 | |
| 148 | 199 |  | 
| 149 | 200 |      # Clean-up root logger for any pre-configuration:
 | 
| 150 | -    for log_handler in logger.handlers[:]:
 | |
| 151 | -        logger.removeHandler(log_handler)
 | |
| 152 | -    for log_filter in logger.filters[:]:
 | |
| 153 | -        logger.removeFilter(log_filter)
 | |
| 154 | - | |
| 155 | -    logging.basicConfig(
 | |
| 156 | -        format='%(asctime)s:%(name)32.32s][%(levelname)5.5s]: %(message)s')
 | |
| 157 | - | |
| 158 | -    if verbose == 1:
 | |
| 159 | -        logger.setLevel(logging.WARNING)
 | |
| 160 | -    elif verbose == 2:
 | |
| 161 | -        logger.setLevel(logging.INFO)
 | |
| 162 | -    elif verbose >= 3:
 | |
| 163 | -        logger.setLevel(logging.DEBUG)
 | |
| 164 | -    else:
 | |
| 165 | -        logger.setLevel(logging.ERROR) | |
| 201 | +    for log_handler in root_logger.handlers[:]:
 | |
| 202 | +        root_logger.removeHandler(log_handler)
 | |
| 203 | +    for log_filter in root_logger.filters[:]:
 | |
| 204 | +        root_logger.removeFilter(log_filter)
 | |
| 205 | + | |
| 206 | +    # Filter debug messages using BGD_MESSAGE_DEBUG value:
 | |
| 207 | +    debug_domains = os.environ.get('BGD_MESSAGE_DEBUG', None)
 | |
| 208 | +    if debug_domains:
 | |
| 209 | +        root_logger.addFilter(DebugFilter(debug_domains)) | 
| ... | ... | @@ -34,7 +34,7 @@ from buildgrid.bot.hardware.worker import Worker | 
| 34 | 34 |  | 
| 35 | 35 |  | 
| 36 | 36 |  from ..bots import buildbox, dummy, host
 | 
| 37 | -from ..cli import pass_context
 | |
| 37 | +from ..cli import pass_context, setup_logging
 | |
| 38 | 38 |  | 
| 39 | 39 |  | 
| 40 | 40 |  @click.group(name='bot', short_help="Create and register bot clients.")
 | 
| ... | ... | @@ -58,9 +58,12 @@ from ..cli import pass_context | 
| 58 | 58 |                help="Time period for bot updates to the server in seconds.")
 | 
| 59 | 59 |  @click.option('--parent', type=click.STRING, default='main', show_default=True,
 | 
| 60 | 60 |                help="Targeted farm resource.")
 | 
| 61 | +@click.option('-v', '--verbose', count=True,
 | |
| 62 | +              help='Increase log verbosity level.')
 | |
| 61 | 63 |  @pass_context
 | 
| 62 | 64 |  def cli(context, parent, update_period, remote, client_key, client_cert, server_cert,
 | 
| 63 | -        remote_cas, cas_client_key, cas_client_cert, cas_server_cert):
 | |
| 65 | +        remote_cas, cas_client_key, cas_client_cert, cas_server_cert, verbose):
 | |
| 66 | +    setup_logging(verbosity=verbose)
 | |
| 64 | 67 |      # Setup the remote execution server channel:
 | 
| 65 | 68 |      url = urlparse(remote)
 | 
| 66 | 69 |  | 
| ... | ... | @@ -122,9 +125,8 @@ def cli(context, parent, update_period, remote, client_key, client_cert, server_ | 
| 122 | 125 |          context.cas_client_cert = context.client_cert
 | 
| 123 | 126 |          context.cas_server_cert = context.server_cert
 | 
| 124 | 127 |  | 
| 125 | -    click.echo("Starting for remote=[{}]".format(context.remote))
 | |
| 126 | - | |
| 127 | 128 |      bot_interface = interface.BotInterface(context.channel)
 | 
| 129 | + | |
| 128 | 130 |      worker = Worker()
 | 
| 129 | 131 |      worker.add_device(Device())
 | 
| 130 | 132 |      hardware_interface = HardwareInterface(worker)
 | 
| ... | ... | @@ -26,7 +26,7 @@ import click | 
| 26 | 26 |  | 
| 27 | 27 |  from buildgrid.server.instance import BuildGridServer
 | 
| 28 | 28 |  | 
| 29 | -from ..cli import pass_context
 | |
| 29 | +from ..cli import pass_context, setup_logging
 | |
| 30 | 30 |  from ..settings import parser
 | 
| 31 | 31 |  | 
| 32 | 32 |  | 
| ... | ... | @@ -37,9 +37,14 @@ def cli(context): | 
| 37 | 37 |  | 
| 38 | 38 |  | 
| 39 | 39 |  @cli.command('start', short_help="Setup a new server instance.")
 | 
| 40 | -@click.argument('CONFIG', type=click.Path(file_okay=True, dir_okay=False, writable=False))
 | |
| 40 | +@click.argument('CONFIG',
 | |
| 41 | +                type=click.Path(file_okay=True, dir_okay=False, writable=False))
 | |
| 42 | +@click.option('-v', '--verbose', count=True,
 | |
| 43 | +              help='Increase log verbosity level.')
 | |
| 41 | 44 |  @pass_context
 | 
| 42 | -def start(context, config):
 | |
| 45 | +def start(context, config, verbose):
 | |
| 46 | +    setup_logging(verbosity=verbose)
 | |
| 47 | + | |
| 43 | 48 |      with open(config) as f:
 | 
| 44 | 49 |          settings = parser.get_parser().safe_load(f)
 | 
| 45 | 50 |  | 
| ... | ... | @@ -15,15 +15,18 @@ | 
| 15 | 15 |  | 
| 16 | 16 |  import asyncio
 | 
| 17 | 17 |  from concurrent import futures
 | 
| 18 | -from datetime import timedelta
 | |
| 18 | +from datetime import datetime, timedelta
 | |
| 19 | 19 |  import logging
 | 
| 20 | +import logging.handlers
 | |
| 20 | 21 |  import os
 | 
| 21 | 22 |  import signal
 | 
| 23 | +import sys
 | |
| 22 | 24 |  import time
 | 
| 23 | 25 |  | 
| 24 | 26 |  import grpc
 | 
| 27 | +import janus
 | |
| 25 | 28 |  | 
| 26 | -from buildgrid._enums import BotStatus, MetricRecordDomain, MetricRecordType
 | |
| 29 | +from buildgrid._enums import BotStatus, LogRecordLevel, MetricRecordDomain, MetricRecordType
 | |
| 27 | 30 |  from buildgrid._protos.buildgrid.v2 import monitoring_pb2
 | 
| 28 | 31 |  from buildgrid.server.actioncache.service import ActionCacheService
 | 
| 29 | 32 |  from buildgrid.server.bots.service import BotsService
 | 
| ... | ... | @@ -32,7 +35,7 @@ from buildgrid.server.execution.service import ExecutionService | 
| 32 | 35 |  from buildgrid.server._monitoring import MonitoringBus, MonitoringOutputType, MonitoringOutputFormat
 | 
| 33 | 36 |  from buildgrid.server.operations.service import OperationsService
 | 
| 34 | 37 |  from buildgrid.server.referencestorage.service import ReferenceStorageService
 | 
| 35 | -from buildgrid.settings import MONITORING_PERIOD
 | |
| 38 | +from buildgrid.settings import LOG_RECORD_FORMAT, MONITORING_PERIOD
 | |
| 36 | 39 |  | 
| 37 | 40 |  | 
| 38 | 41 |  class BuildGridServer:
 | 
| ... | ... | @@ -58,9 +61,16 @@ class BuildGridServer: | 
| 58 | 61 |          self.__grpc_server = grpc.server(self.__grpc_executor)
 | 
| 59 | 62 |  | 
| 60 | 63 |          self.__main_loop = asyncio.get_event_loop()
 | 
| 64 | + | |
| 61 | 65 |          self.__monitoring_bus = None
 | 
| 62 | 66 |  | 
| 67 | +        self.__logging_queue = janus.Queue(loop=self.__main_loop)
 | |
| 68 | +        self.__logging_handler = logging.handlers.QueueHandler(self.__logging_queue.sync_q)
 | |
| 69 | +        self.__logging_formatter = logging.Formatter(fmt=LOG_RECORD_FORMAT)
 | |
| 70 | +        self.__print_log_records = True
 | |
| 71 | + | |
| 63 | 72 |          self.__state_monitoring_task = None
 | 
| 73 | +        self.__logging_task = None
 | |
| 64 | 74 |  | 
| 65 | 75 |          self._execution_service = None
 | 
| 66 | 76 |          self._bots_service = None
 | 
| ... | ... | @@ -80,6 +90,17 @@ class BuildGridServer: | 
| 80 | 90 |                  self.__main_loop, endpoint_type=MonitoringOutputType.STDOUT,
 | 
| 81 | 91 |                  serialisation_format=MonitoringOutputFormat.JSON)
 | 
| 82 | 92 |  | 
| 93 | +        # Setup the main logging handler:
 | |
| 94 | +        root_logger = logging.getLogger()
 | |
| 95 | + | |
| 96 | +        for log_filter in root_logger.filters[:]:
 | |
| 97 | +            self.__logging_handler.addFilter(log_filter)
 | |
| 98 | +            root_logger.removeFilter(log_filter)
 | |
| 99 | + | |
| 100 | +        for log_handler in root_logger.handlers[:]:
 | |
| 101 | +            root_logger.removeHandler(log_handler)
 | |
| 102 | +        root_logger.addHandler(self.__logging_handler)
 | |
| 103 | + | |
| 83 | 104 |      # --- Public API ---
 | 
| 84 | 105 |  | 
| 85 | 106 |      def start(self):
 | 
| ... | ... | @@ -93,6 +114,9 @@ class BuildGridServer: | 
| 93 | 114 |                  self._state_monitoring_worker(period=MONITORING_PERIOD),
 | 
| 94 | 115 |                  loop=self.__main_loop)
 | 
| 95 | 116 |  | 
| 117 | +        self.__logging_task = asyncio.ensure_future(
 | |
| 118 | +            self._logging_worker(), loop=self.__main_loop)
 | |
| 119 | + | |
| 96 | 120 |          self.__main_loop.add_signal_handler(signal.SIGTERM, self.stop)
 | 
| 97 | 121 |  | 
| 98 | 122 |          self.__main_loop.run_forever()
 | 
| ... | ... | @@ -105,6 +129,9 @@ class BuildGridServer: | 
| 105 | 129 |  | 
| 106 | 130 |              self.__monitoring_bus.stop()
 | 
| 107 | 131 |  | 
| 132 | +        if self.__logging_task is not None:
 | |
| 133 | +            self.__logging_task.cancel()
 | |
| 134 | + | |
| 108 | 135 |          self.__main_loop.stop()
 | 
| 109 | 136 |  | 
| 110 | 137 |          self.__grpc_server.stop(None)
 | 
| ... | ... | @@ -245,6 +272,53 @@ class BuildGridServer: | 
| 245 | 272 |  | 
| 246 | 273 |      # --- Private API ---
 | 
| 247 | 274 |  | 
| 275 | +    async def _logging_worker(self):
 | |
| 276 | +        """Publishes log records to the monitoring bus."""
 | |
| 277 | +        async def __logging_worker():
 | |
| 278 | +            log_record = await self.__logging_queue.async_q.get()
 | |
| 279 | + | |
| 280 | +            # Print log records to stdout, if required:
 | |
| 281 | +            if self.__print_log_records:
 | |
| 282 | +                record = self.__logging_formatter.format(log_record)
 | |
| 283 | + | |
| 284 | +                # TODO: Investigate if async write would be worth here.
 | |
| 285 | +                sys.stdout.write('{}\n'.format(record))
 | |
| 286 | +                sys.stdout.flush()
 | |
| 287 | + | |
| 288 | +            # Emit a log record if server is instrumented:
 | |
| 289 | +            if self._is_instrumented:
 | |
| 290 | +                log_record_level = LogRecordLevel(int(log_record.levelno / 10))
 | |
| 291 | +                log_record_creation_time = datetime.fromtimestamp(log_record.created)
 | |
| 292 | +                # logging.LogRecord.extra must be a str to str dict:
 | |
| 293 | +                if 'extra' in log_record.__dict__ and log_record.extra:
 | |
| 294 | +                    log_record_metadata = log_record.extra
 | |
| 295 | +                else:
 | |
| 296 | +                    log_record_metadata = None
 | |
| 297 | +                record = self._forge_log_record(
 | |
| 298 | +                    log_record.name, log_record_level, log_record.message,
 | |
| 299 | +                    log_record_creation_time, metadata=log_record_metadata)
 | |
| 300 | + | |
| 301 | +                await self.__monitoring_bus.send_record(record)
 | |
| 302 | + | |
| 303 | +        try:
 | |
| 304 | +            while True:
 | |
| 305 | +                await __logging_worker()
 | |
| 306 | + | |
| 307 | +        except asyncio.CancelledError:
 | |
| 308 | +            pass
 | |
| 309 | + | |
| 310 | +    def _forge_log_record(self, domain, level, message, creation_time, metadata=None):
 | |
| 311 | +        log_record = monitoring_pb2.LogRecord()
 | |
| 312 | + | |
| 313 | +        log_record.creation_timestamp.FromDatetime(creation_time)
 | |
| 314 | +        log_record.domain = domain
 | |
| 315 | +        log_record.level = level.value
 | |
| 316 | +        log_record.message = message
 | |
| 317 | +        if metadata is not None:
 | |
| 318 | +            log_record.metadata.update(metadata)
 | |
| 319 | + | |
| 320 | +        return log_record
 | |
| 321 | + | |
| 248 | 322 |      async def _state_monitoring_worker(self, period=1.0):
 | 
| 249 | 323 |          """Periodically publishes state metrics to the monitoring bus."""
 | 
| 250 | 324 |          async def __state_monitoring_worker():
 | 
