[Notes] [Git][BuildStream/buildstream][725-job-cancellation-on-remote-builds] _sandboxremote.py: Add sigterm handler that sends CancelOperation



Title: GitLab

Raoul Hidalgo Charman pushed to branch 725-job-cancellation-on-remote-builds at BuildStream / buildstream

Commits:

1 changed file:

Changes:

  • buildstream/sandbox/_sandboxremote.py
    ... ... @@ -19,15 +19,19 @@
    19 19
     #        Jim MacArthur <jim macarthur codethink co uk>
    
    20 20
     
    
    21 21
     import os
    
    22
    +import signal
    
    22 23
     from urllib.parse import urlparse
    
    24
    +from functools import partial
    
    23 25
     
    
    24 26
     import grpc
    
    25 27
     
    
    26 28
     from . import Sandbox
    
    27 29
     from ..storage._filebaseddirectory import FileBasedDirectory
    
    28 30
     from ..storage._casbaseddirectory import CasBasedDirectory
    
    31
    +from .. import _signals
    
    29 32
     from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    30 33
     from .._protos.google.rpc import code_pb2
    
    34
    +from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc
    
    31 35
     
    
    32 36
     
    
    33 37
     class SandboxError(Exception):
    
    ... ... @@ -54,6 +58,7 @@ class SandboxRemote(Sandbox):
    54 58
                                    "Only plain HTTP is currenlty supported (no HTTPS).")
    
    55 59
     
    
    56 60
             self.server_url = '{}:{}'.format(url.hostname, url.port)
    
    61
    +        self.operation_name = None
    
    57 62
     
    
    58 63
         def run_remote_command(self, command, input_root_digest, working_directory, environment):
    
    59 64
             # Sends an execution request to the remote execution server.
    
    ... ... @@ -104,11 +109,21 @@ class SandboxRemote(Sandbox):
    104 109
                         request = remote_execution_pb2.WaitExecutionRequest(name=running_operation.name)
    
    105 110
                         operation_iterator = stub.WaitExecution(request)
    
    106 111
     
    
    107
    -                for operation in operation_iterator:
    
    108
    -                    if operation.done:
    
    109
    -                        return operation
    
    110
    -                    else:
    
    111
    -                        last_operation = operation
    
    112
    +                # Set up SIGTERM handler, but block for first operation to get name
    
    113
    +                with _signals.terminator(partial(self.cancel_operation, channel)):
    
    114
    +                    with _signals.blocked([signal.SIGTERM], ignore=False):
    
    115
    +                        operation = next(operation_iterator)
    
    116
    +                        self.operation_name = operation.name
    
    117
    +                        if operation.done:
    
    118
    +                            return operation
    
    119
    +                        else:
    
    120
    +                            last_operation = operation
    
    121
    +                    for operation in operation_iterator:
    
    122
    +                        if operation.done:
    
    123
    +                            return operation
    
    124
    +                        else:
    
    125
    +                            last_operation = operation
    
    126
    +
    
    112 127
                 except grpc.RpcError as e:
    
    113 128
                     status_code = e.code()
    
    114 129
                     if status_code == grpc.StatusCode.UNAVAILABLE:
    
    ... ... @@ -135,12 +150,22 @@ class SandboxRemote(Sandbox):
    135 150
                     return None
    
    136 151
                 elif operation.done:
    
    137 152
                     return operation
    
    138
    -
    
    139 153
                 while operation is not None and not operation.done:
    
    140 154
                     operation = __run_remote_command(stub, running_operation=operation)
    
    141 155
     
    
    142 156
             return operation
    
    143 157
     
    
    158
    +    def cancel_operation(self, channel):
    
    159
    +        stub = operations_pb2_grpc.OperationsStub(channel)
    
    160
    +        request = operations_pb2.CancelOperationRequest(
    
    161
    +            name=str(self.operation_name))
    
    162
    +
    
    163
    +        try:
    
    164
    +            stub.CancelOperation(request)
    
    165
    +        except grpc.RpcError as e:
    
    166
    +            if e.code() != grpc.StatusCode.UNIMPLEMENTED:
    
    167
    +                raise SandboxError("{} ({})".format(e.details(), e.code().name))
    
    168
    +
    
    144 169
         def process_job_output(self, output_directories, output_files):
    
    145 170
             # Reads the remote execution server response to an execution request.
    
    146 171
             #
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]