openfl.transport
- class openfl.transport.AggregatorGRPCClient(agg_addr, agg_port, disable_client_auth, root_certificate, certificate, private_key, tls=True, aggregator_uuid=None, federation_uuid=None, single_col_cert_common_name=None, **kwargs)
Client to the aggregator over gRPC-TLS.
This class implements a gRPC client for communicating with an aggregator over a secure (TLS) connection.
- Class Attributes:
uri (str) – The URI of the aggregator.
tls (bool) – Whether to use TLS for the connection.
disable_client_auth (bool) – Whether to disable client-side authentication.
root_certificate (str) – The path to the root certificate for the TLS connection.
certificate (str) – The path to the client’s certificate for the TLS connection.
private_key (str) – The path to the client’s private key for the TLS connection.
aggregator_uuid (str) – The UUID of the aggregator.
federation_uuid (str) – The UUID of the federation.
single_col_cert_common_name (str) – The common name on the collaborator’s certificate.
- create_insecure_channel(uri)
Set an insecure gRPC channel (i.e. no TLS) if desired.
Warns user that this is not recommended.
- Parameters:
uri (str) – The uniform resource identifier for the insecure channel
- Returns:
grpc.Channel – An insecure gRPC channel object
- create_tls_channel(uri, root_certificate, disable_client_auth, certificate, private_key)
Set a secure gRPC channel (i.e. TLS).
- Parameters:
uri (str) – The uniform resource identifier for the secure channel.
root_certificate (str) – The Certificate Authority filename.
disable_client_auth (bool) – True disables client-side authentication (not recommended, throws warning to user).
certificate (str) – The client certificate filename from the collaborator (signed by the certificate authority).
private_key (str) – The private key filename for the client certificate.
- Returns:
grpc.Channel – A secure gRPC channel object
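As a rough illustration of how the parameters above could map onto the grpc Python package, the sketch below reads the certificate files and builds a channel from them. The helper names `load_tls_material` and `open_tls_channel` are hypothetical and not part of OpenFL; only `grpc.ssl_channel_credentials` and `grpc.secure_channel` are real gRPC calls. It assumes that when `disable_client_auth` is true the client simply omits its own certificate and key.

```python
from pathlib import Path


def load_tls_material(root_certificate, disable_client_auth,
                      certificate=None, private_key=None):
    """Read PEM files and return keyword arguments for
    grpc.ssl_channel_credentials (hypothetical helper, not an OpenFL API)."""
    material = {
        "root_certificates": Path(root_certificate).read_bytes(),
        "private_key": None,
        "certificate_chain": None,
    }
    if not disable_client_auth:
        # Mutual TLS: the client also presents its own certificate and key.
        material["private_key"] = Path(private_key).read_bytes()
        material["certificate_chain"] = Path(certificate).read_bytes()
    return material


def open_tls_channel(uri, root_certificate, disable_client_auth,
                     certificate=None, private_key=None):
    """Build a secure channel; requires the grpcio package to be installed."""
    import grpc  # imported lazily so load_tls_material works without grpcio

    credentials = grpc.ssl_channel_credentials(
        **load_tls_material(root_certificate, disable_client_auth,
                            certificate, private_key)
    )
    return grpc.secure_channel(uri, credentials)
```

Keeping the file reading separate from the channel construction makes the certificate handling easy to check without a running server.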
- disconnect()
Close the gRPC channel.
- reconnect()
Create a new channel with the gRPC server.
- validate_response(reply, collaborator_name)
Validate the aggregator response.
- Parameters:
reply (aggregator_pb2.MessageReply) – The reply from the aggregator.
collaborator_name (str) – The name of the collaborator.
- class openfl.transport.AggregatorGRPCServer(aggregator, agg_port, tls=True, disable_client_auth=False, root_certificate=None, certificate=None, private_key=None, **kwargs)
gRPC server class for the Aggregator.
This class implements a gRPC server for the Aggregator, allowing it to communicate with collaborators.
- Class Attributes:
aggregator (Aggregator) – The aggregator that this server is serving.
uri (str) – The URI that the server is serving on.
tls (bool) – Whether to use TLS for the connection.
disable_client_auth (bool) – Whether to disable client-side authentication.
root_certificate (str) – The path to the root certificate for the TLS connection.
certificate (str) – The path to the server’s certificate for the TLS connection.
private_key (str) – The path to the server’s private key for the TLS connection.
server (grpc.Server) – The gRPC server.
server_credentials (grpc.ServerCredentials) – The server’s credentials.
- GetAggregatedTensor(request, context)
Request an aggregated tensor from the aggregator.
This method handles a request from a collaborator for an aggregated tensor.
- Parameters:
request (aggregator_pb2.GetAggregatedTensorRequest) – The request from the collaborator.
context (grpc.ServicerContext) – The context of the request.
- Returns:
aggregator_pb2.GetAggregatedTensorResponse – The response to the request.
- GetTasks(request, context)
Request a job from aggregator.
This method handles a request from a collaborator for a job.
- Parameters:
request (aggregator_pb2.GetTasksRequest) – The request from the collaborator.
context (grpc.ServicerContext) – The context of the request.
- Returns:
aggregator_pb2.GetTasksResponse – The response to the request.
- SendLocalTaskResults(request, context)
Send local task results to the aggregator.
This method handles a request from a collaborator to send the results of a local task.
- Parameters:
request (aggregator_pb2.SendLocalTaskResultsRequest) – The request from the collaborator.
context (grpc.ServicerContext) – The context of the request.
- Returns:
aggregator_pb2.SendLocalTaskResultsResponse – The response to the request.
- check_request(request)
Validate request header matches expected values.
This method checks that the request is valid and was sent by an authorized collaborator.
- Parameters:
request (aggregator_pb2.MessageHeader) – Request sent from a collaborator that requires validation.
- Raises:
ValueError – If the request is not valid.
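The kind of header check described above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the OpenFL implementation: the `MessageHeader` dataclass is a simplified stand-in for `aggregator_pb2.MessageHeader`, its field names are inferred from the constructor parameters documented on this page, and the standalone `check_request` function with its `authorized_cols` argument is hypothetical.

```python
from dataclasses import dataclass


@dataclass
class MessageHeader:
    """Simplified stand-in for aggregator_pb2.MessageHeader (assumed fields)."""
    sender: str
    receiver: str
    federation_uuid: str
    single_col_cert_common_name: str = ""


def check_request(header, aggregator_uuid, federation_uuid,
                  authorized_cols, single_col_cert_common_name=""):
    """Raise ValueError unless the header matches the expected values."""
    # The sender must be one of the collaborators this aggregator accepts.
    if header.sender not in authorized_cols:
        raise ValueError(f"unauthorized collaborator: {header.sender!r}")
    expected = {
        "receiver": aggregator_uuid,
        "federation_uuid": federation_uuid,
        "single_col_cert_common_name": single_col_cert_common_name,
    }
    # Report the first field that disagrees with what the server expects.
    for field, want in expected.items():
        got = getattr(header, field)
        if got != want:
            raise ValueError(f"{field}: expected {want!r}, got {got!r}")
```

A header that passes returns silently, so a caller only needs to handle the `ValueError` case.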
- get_header(collaborator_name)
Compose and return MessageHeader.
This method creates a MessageHeader for a message to the specified collaborator.
- Parameters:
collaborator_name (str) – The name of the collaborator to send the message to.
- Returns:
aggregator_pb2.MessageHeader – The header for the message.
- get_server()
Return gRPC server.
This method creates a gRPC server if it does not already exist and returns it.
- Returns:
grpc.Server – The gRPC server.
- serve()
Start an aggregator gRPC service.
This method starts the gRPC server and handles requests until all quit jobs have been sent.
- validate_collaborator(request, context)
Validate the collaborator.
This method checks that the collaborator who sent the request is authorized to do so.
- Parameters:
request (aggregator_pb2.MessageHeader) – The request from the collaborator.
context (grpc.ServicerContext) – The context of the request.
- Raises:
grpc.RpcError – If the collaborator or collaborator certificate is not authorized.
- class openfl.transport.DirectorGRPCServer(*, director_cls, tls: bool = True, root_certificate: Path | str | None = None, private_key: Path | str | None = None, certificate: Path | str | None = None, review_plan_callback: None | Callable = None, listen_host: str = '[::]', listen_port: int = 50051, envoy_health_check_period: int = 0, **kwargs)
Director transport class.
This class implements a gRPC server for the Director, allowing it to communicate with collaborators.
- Class Attributes:
director (Director) – The director that this server is serving.
listen_uri (str) – The URI that the server is serving on.
tls (bool) – Whether to use TLS for the connection.
root_certificate (Path) – The path to the root certificate for the TLS connection.
private_key (Path) – The path to the server’s private key for the TLS connection.
certificate (Path) – The path to the server’s certificate for the TLS connection.
server (grpc.Server) – The gRPC server.
- async GetDatasetInfo(request, context)
Request the info about target and sample shapes in the dataset.
- Parameters:
request (director_pb2.GetDatasetInfoRequest) – The request from the collaborator.
context (grpc.ServicerContext) – The context of the request.
- Returns:
director_pb2.GetDatasetInfoResponse – The response to the request.
- async GetEnvoys(request, context)
Get status information about envoys.
- Parameters:
request (director_pb2.GetEnvoysRequest) – The request from the collaborator.
context (grpc.ServicerContext) – The context of the request.
- Returns:
director_pb2.GetEnvoysResponse – The response to the request.
- async GetExperimentData(request, context)
Receive experiment data.
- Parameters:
request (director_pb2.GetExperimentDataRequest) – The request from the collaborator.
context (grpc.ServicerContext) – The context of the request.
- Yields:
director_pb2.ExperimentData – The experiment data.
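A handler documented with "Yields" rather than "Returns" is a server-streaming RPC, which in an asyncio gRPC server is written as an async generator. The sketch below shows only that pattern with plain bytes. The function names and the chunk size are hypothetical, and a real handler would yield `director_pb2.ExperimentData` messages instead of raw byte slices.

```python
import asyncio


async def stream_chunks(payload: bytes, chunk_size: int = 4):
    """Yield the payload in fixed-size pieces, as a streaming handler might."""
    for start in range(0, len(payload), chunk_size):
        yield payload[start:start + chunk_size]
        await asyncio.sleep(0)  # let other coroutines run between chunks


async def collect(payload: bytes) -> bytes:
    """Receiving side: reassemble the streamed pieces in order."""
    parts = [chunk async for chunk in stream_chunks(payload)]
    return b"".join(parts)
```

Running `asyncio.run(collect(data))` consumes the stream and returns the reassembled bytes.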
- async GetExperimentDescription(request, context)
Get an experiment description.
- Parameters:
request (director_pb2.GetExperimentDescriptionRequest) – The request from the collaborator.
context (grpc.ServicerContext) – The context of the request.
- Returns:
director_pb2.GetExperimentDescriptionResponse – The response to the request.
- async GetExperimentStatus(request, context)
Get experiment status and update if experiment was approved.
- Parameters:
request (director_pb2.GetExperimentStatusRequest) – The request from the collaborator.
context (grpc.ServicerContext) – The context of the request.
- Returns:
director_pb2.GetExperimentStatusResponse – The response to the request.
- async GetExperimentsList(request, context)
Get the list of experiment descriptions.
- Parameters:
request (director_pb2.GetExperimentsListRequest) – The request from the collaborator.
context (grpc.ServicerContext) – The context of the request.
- Returns:
director_pb2.GetExperimentsListResponse – The response to the request.
- async GetMetricStream(request, context)
Request to stream metrics from the aggregator to frontend.
- Parameters:
request (director_pb2.GetMetricStreamRequest) – The request from the collaborator.
context (grpc.ServicerContext) – The context of the request.
- Yields:
director_pb2.GetMetricStreamResponse – The metrics.
- async GetTrainedModel(request, context)
RPC for retrieving trained models.
- Parameters:
request (director_pb2.GetTrainedModelRequest) – The request from the collaborator.
context (grpc.ServicerContext) – The context of the request.
- Returns:
director_pb2.TrainedModelResponse – The response to the request.
- async RemoveExperimentData(request, context)
Remove experiment data RPC.
- Parameters:
request (director_pb2.RemoveExperimentRequest) – The request from the collaborator.
context (grpc.ServicerContext) – The context of the request.
- Returns:
response (director_pb2.RemoveExperimentResponse) – The response to the request.
- async SetExperimentFailed(request, context)
Mark the experiment as failed.
- Parameters:
request (director_pb2.SetExperimentFailedRequest) – The request from the collaborator.
context (grpc.ServicerContext) – The context of the request.
- Returns:
response (director_pb2.SetExperimentFailedResponse) – The response to the request.
- async SetNewExperiment(stream, context)
Request to set new experiment.
- Parameters:
stream (grpc.aio._MultiThreadedRendezvous) – The stream of experiment data.
context (grpc.ServicerContext) – The context of the request.
- Returns:
director_pb2.SetNewExperimentResponse – The response to the request.
- async UpdateEnvoyStatus(request, context)
Accept health check from envoy.
- Parameters:
request (director_pb2.UpdateEnvoyStatusRequest) – The request from the envoy.
context (grpc.ServicerContext) – The context of the request.
- Returns:
resp (director_pb2.UpdateEnvoyStatusResponse) – The response to the request.
- async UpdateShardInfo(request, context)
Receive and acknowledge shard info.
- Parameters:
request (director_pb2.UpdateShardInfoRequest) – The request from the shard.
context (grpc.ServicerContext) – The context of the request.
- Returns:
reply (director_pb2.UpdateShardInfoResponse) – The response to the request.
- async WaitExperiment(request, context)
Request to wait for an experiment.
- Parameters:
request (director_pb2.WaitExperimentRequest) – The request from the collaborator.
context (grpc.ServicerContext) – The context of the request.
- Returns:
director_pb2.WaitExperimentResponse – The response to the request.
- get_caller(context)
Get caller name from context.
If tls is True, get the caller name from auth_context.
If tls is False, get the caller name from the context header ‘client_id’.
- Parameters:
context (grpc.ServicerContext) – The context of the request.
- Returns:
str – The name of the caller.
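The two lookup paths can be sketched against a small stand-in for the servicer context. `FakeContext` is hypothetical and exists only so the example runs without a gRPC server; `auth_context()` and `invocation_metadata()` mirror the real `grpc.ServicerContext` methods, and `x509_common_name` is the auth-context property gRPC populates from the peer certificate. Returning `None` when no `client_id` header is present is an assumption, since the behaviour in that case is not documented here.

```python
class FakeContext:
    """Minimal stand-in for grpc.ServicerContext (hypothetical, for illustration)."""

    def __init__(self, auth=None, metadata=()):
        self._auth = auth or {}
        self._metadata = tuple(metadata)

    def auth_context(self):
        return self._auth

    def invocation_metadata(self):
        return self._metadata


def get_caller(context, tls):
    """Return the caller name from the TLS identity or the client_id header."""
    if tls:
        # gRPC exposes peer certificate properties as lists of bytes.
        return context.auth_context()["x509_common_name"][0].decode("utf-8")
    # Without TLS, fall back to the metadata header sent by the client.
    for key, value in context.invocation_metadata():
        if key == "client_id":
            return value
    return None  # assumption: no header means no identifiable caller
```

With TLS the name comes from a certificate the server has verified, whereas the `client_id` header is self-reported by the client and carries no such guarantee.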
- start()
Launch the director gRPC server.