[Notes] [Git][BuildGrid/buildgrid][finn/async] Added messaging queues for operation updates



Title: GitLab

finnball pushed to branch finn/async at BuildGrid / buildgrid

Commits:

5 changed files:

Changes:

  • buildgrid/server/execution/execution_instance.py
    ... ... @@ -34,12 +34,12 @@ class ExecutionInstance():
    34 34
             self.logger = logging.getLogger(__name__)
    
    35 35
             self._scheduler = scheduler
    
    36 36
     
    
    37
    -    def execute(self, action_digest, skip_cache_lookup):
    
    37
    +    def execute(self, action_digest, skip_cache_lookup, message_queue=None):
    
    38 38
             """ Sends a job for execution.
    
    39 39
             Queues an action and creates an Operation instance to be associated with
    
    40 40
             this action.
    
    41 41
             """
    
    42
    -        job = Job(action_digest)
    
    42
    +        job = Job(action_digest, message_queue)
    
    43 43
             self.logger.info("Operation name: {}".format(job.name))
    
    44 44
     
    
    45 45
             if not skip_cache_lookup:
    
    ... ... @@ -70,3 +70,15 @@ class ExecutionInstance():
    70 70
         def cancel_operation(self, name):
    
    71 71
             # TODO: Cancel leases
    
    72 72
             raise NotImplementedError("Cancelled operations not supported")
    
    73
    +
    
    74
    +    def register_message_client(self, name, queue):
    
    75
    +        try:
    
    76
    +            self._scheduler.register_client(name, queue)
    
    77
    +        except KeyError:
    
    78
    +            raise InvalidArgumentError("Operation name does not exist: {}".format(name))
    
    79
    +
    
    80
    +    def unregister_message_client(self, name, queue):
    
    81
    +        try:
    
    82
    +            self._scheduler.unregister_client(name, queue)
    
    83
    +        except KeyError:
    
    84
    +            raise InvalidArgumentError("Operation name does not exist: {}".format(name))

  • buildgrid/server/execution/execution_service.py
    ... ... @@ -22,10 +22,9 @@ ExecutionService
    22 22
     Serves remote execution requests.
    
    23 23
     """
    
    24 24
     
    
    25
    -import copy
    
    26 25
     import grpc
    
    27 26
     import logging
    
    28
    -import time
    
    27
    +import queue
    
    29 28
     
    
    30 29
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    31 30
     from buildgrid._protos.google.longrunning import operations_pb2_grpc, operations_pb2
    
    ... ... @@ -35,17 +34,23 @@ from ._exceptions import InvalidArgumentError
    35 34
     class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    
    36 35
     
    
    37 36
         def __init__(self, instance):
    
    38
    -        self._instance = instance
    
    39 37
             self.logger = logging.getLogger(__name__)
    
    38
    +        self._instance = instance
    
    40 39
     
    
    41 40
         def Execute(self, request, context):
    
    42 41
             # Ignore request.instance_name for now
    
    43 42
             # Have only one instance
    
    44 43
             try:
    
    44
    +            message_queue = queue.Queue()
    
    45 45
                 operation = self._instance.execute(request.action_digest,
    
    46
    -                                               request.skip_cache_lookup)
    
    46
    +                                               request.skip_cache_lookup,
    
    47
    +                                               message_queue)
    
    47 48
     
    
    48
    -            yield from self._stream_operation_updates(operation.name)
    
    49
    +            remove_client = lambda : self._remove_client(operation.name, message_queue)
    
    50
    +            context.add_callback(remove_client)
    
    51
    +
    
    52
    +            yield from self._stream_operation_updates(message_queue,
    
    53
    +                                                      operation.name)
    
    49 54
     
    
    50 55
             except InvalidArgumentError as e:
    
    51 56
                 self.logger.error(e)
    
    ... ... @@ -59,19 +64,28 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    59 64
     
    
    60 65
         def WaitExecution(self, request, context):
    
    61 66
             try:
    
    62
    -            yield from self._stream_operation_updates(request.name)
    
    67
    +            message_queue = queue.Queue()
    
    68
    +            operation_name = request.name
    
    69
    +
    
    70
    +            self._instance.register_message_client(operation_name, message_queue)
    
    71
    +
    
    72
    +            remove_client = lambda : self._remove_client(operation_name, message_queue)
    
    73
    +            context.add_callback(remove_client)
    
    74
    +
    
    75
    +            yield from self._stream_operation_updates(message_queue,
    
    76
    +                                                      operation_name)
    
    63 77
     
    
    64 78
             except InvalidArgumentError as e:
    
    65 79
                 self.logger.error(e)
    
    66 80
                 context.set_details(str(e))
    
    67 81
                 context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    68 82
     
    
    69
    -    def _stream_operation_updates(self, name):
    
    70
    -        stream_previous = None
    
    71
    -        while True:
    
    72
    -            stream = self._instance.get_operation(name)
    
    73
    -            if stream != stream_previous:
    
    74
    -                yield stream
    
    75
    -                if stream.done == True: break
    
    76
    -                stream_previous = copy.deepcopy(stream)
    
    77
    -            time.sleep(1)
    83
    +    def _remove_client(self, operation_name, message_queue):
    
    84
    +        self._instance.unregister_message_client(operation_name, message_queue)
    
    85
    +
    
    86
    +    def _stream_operation_updates(self, message_queue, operation_name):
    
    87
    +        operation = message_queue.get()
    
    88
    +        while not operation.done:
    
    89
    +            yield operation
    
    90
    +            operation = message_queue.get()
    
    91
    +        yield operation

  • buildgrid/server/job.py
    ... ... @@ -51,9 +51,8 @@ class LeaseState(Enum):
    51 51
     
    
    52 52
     class Job():
    
    53 53
     
    
    54
    -    def __init__(self, action):
    
    55
    -        self.action = action
    
    56
    -        self.bot_status = BotStatus.BOT_STATUS_UNSPECIFIED
    
    54
    +    def __init__(self, action_digest, message_queue=None):
    
    55
    +        self.action_digest = action_digest
    
    57 56
             self.execute_stage = ExecuteStage.UNKNOWN
    
    58 57
             self.lease = None
    
    59 58
             self.logger = logging.getLogger(__name__)
    
    ... ... @@ -62,10 +61,24 @@ class Job():
    62 61
     
    
    63 62
             self._n_tries = 0
    
    64 63
             self._operation = operations_pb2.Operation(name = self.name)
    
    64
    +        self._operation_update_queues = []
    
    65
    +
    
    66
    +        if message_queue is not None:
    
    67
    +            self.register_client(message_queue)
    
    68
    +
    
    69
    +    def check_job_finished(self):
    
    70
    +        if not self._operation_update_queues:
    
    71
    +            return self._operation.done
    
    72
    +        return False
    
    73
    +
    
    74
    +    def register_client(self, queue):
    
    75
    +        self._operation_update_queues.append(queue)
    
    76
    +
    
    77
    +    def unregister_client(self, queue):
    
    78
    +        self._operation_update_queues.remove(queue)
    
    65 79
     
    
    66 80
         def get_operation(self):
    
    67 81
             self._operation.metadata.CopyFrom(self._pack_any(self.get_operation_meta()))
    
    68
    -
    
    69 82
             if self.result is not None:
    
    70 83
                 self._operation.done = True
    
    71 84
                 response = ExecuteResponse()
    
    ... ... @@ -81,10 +94,10 @@ class Job():
    81 94
             return meta
    
    82 95
     
    
    83 96
         def create_lease(self):
    
    84
    -        action = self._pack_any(self.action)
    
    97
    +        action_digest = self._pack_any(self.action_digest)
    
    85 98
     
    
    86 99
             lease = bots_pb2.Lease(id = self.name,
    
    87
    -                               payload = action,
    
    100
    +                               payload = action_digest,
    
    88 101
                                    state = LeaseState.PENDING.value)
    
    89 102
             self.lease = lease
    
    90 103
             return lease
    
    ... ... @@ -92,6 +105,11 @@ class Job():
    92 105
         def get_operations(self):
    
    93 106
             return operations_pb2.ListOperationsResponse(operations = [self.get_operation()])
    
    94 107
     
    
    108
    +    def update_execute_stage(self, stage):
    
    109
    +        self.execute_stage = stage
    
    110
    +        for queue in self._operation_update_queues:
    
    111
    +            queue.put(self.get_operation())
    
    112
    +
    
    95 113
         def _pack_any(self, pack):
    
    96 114
             any = any_pb2.Any()
    
    97 115
             any.Pack(pack)
    

  • buildgrid/server/scheduler.py
    ... ... @@ -35,8 +35,17 @@ class Scheduler():
    35 35
             self.jobs = {}
    
    36 36
             self.queue = deque()
    
    37 37
     
    
    38
    +    def register_client(self, name, queue):
    
    39
    +        self.jobs[name].register_client(queue)
    
    40
    +
    
    41
    +    def unregister_client(self, name, queue):
    
    42
    +        job = self.jobs[name]
    
    43
    +        job.unregister_client(queue)
    
    44
    +        if job.check_job_finished():
    
    45
    +            del self.jobs[name]
    
    46
    +
    
    38 47
         def append_job(self, job):
    
    39
    -        job.execute_stage = ExecuteStage.QUEUED
    
    48
    +        job.update_execute_stage(ExecuteStage.QUEUED)
    
    40 49
             self.jobs[job.name] = job
    
    41 50
             self.queue.append(job)
    
    42 51
     
    
    ... ... @@ -45,9 +54,9 @@ class Scheduler():
    45 54
     
    
    46 55
             if job.n_tries >= self.MAX_N_TRIES:
    
    47 56
                 # TODO: Decide what to do with these jobs
    
    48
    -            job.execute_stage = ExecuteStage.COMPLETED
    
    57
    +            job.update_execute_stage(ExecuteStage.COMPLETED)
    
    49 58
             else:
    
    50
    -            job.execute_stage = ExecuteStage.QUEUED
    
    59
    +            job.update_execute_stage(ExecuteStage.QUEUED)
    
    51 60
                 job.n_tries += 1
    
    52 61
                 self.queue.appendleft(job)
    
    53 62
     
    
    ... ... @@ -56,15 +65,14 @@ class Scheduler():
    56 65
         def create_job(self):
    
    57 66
             if len(self.queue) > 0:
    
    58 67
                 job = self.queue.popleft()
    
    59
    -            job.execute_stage = ExecuteStage.EXECUTING
    
    68
    +            job.update_execute_stage(ExecuteStage.EXECUTING)
    
    60 69
                 self.jobs[job.name] = job
    
    61 70
                 return job
    
    62
    -        return None
    
    63 71
     
    
    64 72
         def job_complete(self, name, result):
    
    65 73
             job = self.jobs[name]
    
    66
    -        job.execute_stage = ExecuteStage.COMPLETED
    
    67 74
             job.result = result
    
    75
    +        job.update_execute_stage(ExecuteStage.COMPLETED)
    
    68 76
             self.jobs[name] = job
    
    69 77
     
    
    70 78
         def get_operations(self):
    
    ... ... @@ -122,3 +130,7 @@ class Scheduler():
    122 130
             if state == LeaseState.PENDING.value or \
    
    123 131
                state == LeaseState.ACTIVE.value:
    
    124 132
                 self.retry_job(name)
    
    133
    +
    
    134
    +    def _update_execute_stage(self, job, stage):
    
    135
    +        job.update_execute_stage(stage)
    
    136
    +        return job

  • tests/integration/execution_service.py
    ... ... @@ -69,17 +69,22 @@ def test_execute(skip_cache_lookup, instance, context):
    69 69
             assert result.done is False
    
    70 70
     
    
    71 71
     def test_wait_execution(instance, context):
    
    72
    +    # TODO: Figure out why next(response) hangs on the .get()
    
    73
    +    # method when running in pytest.
    
    72 74
         action_digest = remote_execution_pb2.Digest()
    
    73 75
         action_digest.hash = 'zhora'
    
    74 76
     
    
    75
    -    execution_request = remote_execution_pb2.ExecuteRequest(instance_name = '',
    
    76
    -                                                            action_digest = action_digest,
    
    77
    -                                                            skip_cache_lookup = True)
    
    78
    -    execution_response = next(instance.Execute(execution_request, context))
    
    77
    +    j = job.Job(action_digest, None)
    
    78
    +    j._operation.done = True
    
    79 79
     
    
    80
    +    request = remote_execution_pb2.WaitExecutionRequest(name=j.name)
    
    80 81
     
    
    81
    -    request = remote_execution_pb2.WaitExecutionRequest(name=execution_response.name)
    
    82
    +    instance._instance._scheduler.jobs[j.name] = j
    
    82 83
     
    
    83
    -    response = next(instance.WaitExecution(request, context))
    
    84
    +    action_result_any = any_pb2.Any()
    
    85
    +    action_result = remote_execution_pb2.ActionResult()
    
    86
    +    action_result_any.Pack(action_result)
    
    84 87
     
    
    85
    -    assert response == execution_response
    88
    +    instance._instance._scheduler._update_execute_stage(j, job.ExecuteStage.COMPLETED)
    
    89
    +
    
    90
    +    response = instance.WaitExecution(request, context)



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]