openfl.transport

class openfl.transport.AggregatorGRPCClient(agg_addr, agg_port, disable_client_auth, root_certificate, certificate, private_key, tls=True, aggregator_uuid=None, federation_uuid=None, single_col_cert_common_name=None, **kwargs)

Client to the aggregator over gRPC-TLS.

This class implements a gRPC client for communicating with an aggregator over a secure (TLS) connection.

Class Attributes:
  • uri (str) – The URI of the aggregator.

  • tls (bool) – Whether to use TLS for the connection.

  • disable_client_auth (bool) – Whether to disable client-side authentication.

  • root_certificate (str) – The path to the root certificate for the TLS connection.

  • certificate (str) – The path to the client’s certificate for the TLS connection.

  • private_key (str) – The path to the client’s private key for the TLS connection.

  • aggregator_uuid (str) – The UUID of the aggregator.

  • federation_uuid (str) – The UUID of the federation.

  • single_col_cert_common_name (str) – The common name on the collaborator’s certificate.

create_insecure_channel(uri)

Set an insecure gRPC channel (i.e. no TLS) if desired.

Warns user that this is not recommended.

Parameters:

uri (str) – The uniform resource identifier for the insecure channel

Returns:

grpc.Channel – An insecure gRPC channel object

create_tls_channel(uri, root_certificate, disable_client_auth, certificate, private_key)

Set a secure gRPC channel (i.e. TLS).

Parameters:
  • uri (str) – The uniform resource identifier for the secure channel.

  • root_certificate (str) – The Certificate Authority filename.

  • disable_client_auth (bool) – True disables client-side authentication (not recommended; a warning is issued to the user).

  • certificate (str) – The client certificate filename from the collaborator (signed by the certificate authority).

  • private_key (str) – The private key filename for the client certificate.

Returns:

grpc.Channel – A secure gRPC channel object
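The way the parameters above interact can be sketched as follows. This is a minimal illustration rather than the OpenFL implementation: `load_tls_material` is a hypothetical helper, and the closing comment only indicates where the standard `grpc.ssl_channel_credentials` and `grpc.secure_channel` calls would fit.

```python
import warnings
from pathlib import Path


def load_tls_material(root_certificate, disable_client_auth,
                      certificate=None, private_key=None):
    """Read the PEM files needed for a TLS channel (hypothetical helper).

    Returns (root_bytes, key_bytes, cert_bytes); the key and certificate
    are None when client-side authentication is disabled.
    """
    root_bytes = Path(root_certificate).read_bytes()
    if disable_client_auth:
        # Only the server side is authenticated in this mode.
        warnings.warn("Client-side authentication is disabled.")
        return root_bytes, None, None
    key_bytes = Path(private_key).read_bytes()
    cert_bytes = Path(certificate).read_bytes()
    return root_bytes, key_bytes, cert_bytes


# With grpcio installed, the result could feed the standard gRPC calls:
#   creds = grpc.ssl_channel_credentials(root_bytes, key_bytes, cert_bytes)
#   channel = grpc.secure_channel(uri, creds)
```

Keeping the file handling separate from channel creation makes the "client auth disabled" branch easy to check without a running server.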

disconnect()

Close the gRPC channel.

reconnect()

Create a new channel with the gRPC server.
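A minimal sketch of the disconnect/reconnect pattern, assuming that reconnecting closes the old channel before opening a new one to the same URI. `ReconnectingClient` and `FakeChannel` are hypothetical names used only for illustration; a real client would pass a factory such as `grpc.insecure_channel`.

```python
class FakeChannel:
    """Stand-in for grpc.Channel that only records whether it was closed."""

    def __init__(self, uri):
        self.uri = uri
        self.closed = False

    def close(self):
        self.closed = True


class ReconnectingClient:
    """Toy client that owns one channel at a time (hypothetical class)."""

    def __init__(self, uri, channel_factory):
        self.uri = uri
        self._channel_factory = channel_factory  # e.g. grpc.insecure_channel
        self.channel = channel_factory(uri)

    def disconnect(self):
        # grpc.Channel exposes close(); FakeChannel mirrors that method.
        self.channel.close()

    def reconnect(self):
        # Close the old channel, then open a fresh one to the same URI.
        self.disconnect()
        self.channel = self._channel_factory(self.uri)
```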

validate_response(reply, collaborator_name)

Validate the aggregator response.

Parameters:
  • reply (aggregator_pb2.MessageReply) – The reply from the aggregator.

  • collaborator_name (str) – The name of the collaborator.

class openfl.transport.AggregatorGRPCServer(aggregator, agg_port, tls=True, disable_client_auth=False, root_certificate=None, certificate=None, private_key=None, **kwargs)

gRPC server class for the Aggregator.

This class implements a gRPC server for the Aggregator, allowing it to communicate with collaborators.

Class Attributes:
  • aggregator (Aggregator) – The aggregator that this server is serving.

  • uri (str) – The URI that the server is serving on.

  • tls (bool) – Whether to use TLS for the connection.

  • disable_client_auth (bool) – Whether to disable client-side authentication.

  • root_certificate (str) – The path to the root certificate for the TLS connection.

  • certificate (str) – The path to the server’s certificate for the TLS connection.

  • private_key (str) – The path to the server’s private key for the TLS connection.

  • server (grpc.Server) – The gRPC server.

  • server_credentials (grpc.ServerCredentials) – The server’s credentials.

GetAggregatedTensor(request, context)

Request an aggregated tensor from the aggregator.

This method handles a request from a collaborator for an aggregated tensor.

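Parameters:
  • request – The request from the collaborator.

  • context (grpc.ServicerContext) – The context of the request.

Returns: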

aggregator_pb2.GetAggregatedTensorResponse – The response to the request.

GetTasks(request, context)

Request a job from aggregator.

This method handles a request from a collaborator for a job.

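Parameters:
  • request – The request from the collaborator.

  • context (grpc.ServicerContext) – The context of the request.

Returns: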

aggregator_pb2.GetTasksResponse – The response to the request.

SendLocalTaskResults(request, context)

Send local task results to the aggregator.

This method handles a request from a collaborator to send the results of a local task.

Parameters:
  • request (aggregator_pb2.SendLocalTaskResultsRequest) – The request from the collaborator.

  • context (grpc.ServicerContext) – The context of the request.

Returns:

aggregator_pb2.SendLocalTaskResultsResponse – The response to the request.

check_request(request)

Validate request header matches expected values.

This method checks that the request is valid and was sent by an authorized collaborator.

Parameters:

request (aggregator_pb2.MessageHeader) – Request sent from a collaborator that requires validation.

Raises:

ValueError – If the request is not valid.
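The validation described above can be sketched as a few equality checks that raise `ValueError` on the first mismatch. This is an illustration under stated assumptions, not the OpenFL code: the `Header` dataclass stands in for `aggregator_pb2.MessageHeader`, and its field names and the function's extra arguments are assumed.

```python
from dataclasses import dataclass


@dataclass
class Header:
    """Stand-in for aggregator_pb2.MessageHeader; field names are assumed."""
    sender: str
    receiver: str
    federation_uuid: str


def check_request(header, authorized_cols, aggregator_uuid, federation_uuid):
    """Raise ValueError unless the header matches the expected values."""
    if header.sender not in authorized_cols:
        raise ValueError(f"Unauthorized collaborator: {header.sender}")
    if header.receiver != aggregator_uuid:
        raise ValueError(f"Unexpected receiver: {header.receiver}")
    if header.federation_uuid != federation_uuid:
        raise ValueError(f"Unexpected federation: {header.federation_uuid}")
```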

get_header(collaborator_name)

Compose and return MessageHeader.

This method creates a MessageHeader for a message to the specified collaborator.

Parameters:

collaborator_name (str) – The name of the collaborator to send the message to.

Returns:

aggregator_pb2.MessageHeader – The header for the message.

get_server()

Return gRPC server.

This method creates a gRPC server if it does not already exist and returns it.

Returns:

grpc.Server – The gRPC server.
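The "create if it does not already exist" behaviour is a lazy-initialization pattern. A minimal sketch, with `LazyServerHolder` and its `factory` argument as hypothetical names (a real factory would build and configure a `grpc.Server`):

```python
class LazyServerHolder:
    """Create the server on first access and reuse it afterwards."""

    def __init__(self, factory):
        self._factory = factory  # callable that builds the server object
        self.server = None

    def get_server(self):
        # Build the server only once; later calls return the same object.
        if self.server is None:
            self.server = self._factory()
        return self.server
```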

serve()

Start an aggregator gRPC service.

This method starts the gRPC server and handles requests until all quit jobs have been sent.
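The serve loop can be sketched as "start, poll, stop". This is a hedged illustration, not the OpenFL implementation: the `all_quit_jobs_sent` method name and the polling interval are assumptions, and `server` is any object with the `start()` and `stop(grace)` methods that `grpc.Server` provides.

```python
import time


def serve(server, aggregator, poll_seconds=5.0):
    """Run `server` until the aggregator reports all quit jobs were sent."""
    server.start()
    try:
        # Poll the aggregator; request handling happens in server threads.
        while not aggregator.all_quit_jobs_sent():
            time.sleep(poll_seconds)
    finally:
        # grpc.Server.stop takes a grace period in seconds; 0 stops at once.
        server.stop(0)
```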

validate_collaborator(request, context)

Validate the collaborator.

This method checks that the collaborator who sent the request is authorized to do so.

Parameters:
  • request (aggregator_pb2.MessageHeader) – The request from the collaborator.

  • context (grpc.ServicerContext) – The context of the request.

Raises:

grpc.RpcError – If the collaborator or collaborator certificate is not authorized.

class openfl.transport.DirectorGRPCServer(*, director_cls, tls: bool = True, root_certificate: Path | str | None = None, private_key: Path | str | None = None, certificate: Path | str | None = None, review_plan_callback: None | Callable = None, listen_host: str = '[::]', listen_port: int = 50051, envoy_health_check_period: int = 0, **kwargs)

Director transport class.

This class implements a gRPC server for the Director, allowing it to communicate with collaborators.

Class Attributes:
  • director (Director) – The director that this server is serving.

  • listen_uri (str) – The URI that the server is serving on.

  • tls (bool) – Whether to use TLS for the connection.

  • root_certificate (Path) – The path to the root certificate for the TLS connection.

  • private_key (Path) – The path to the server’s private key for the TLS connection.

  • certificate (Path) – The path to the server’s certificate for the TLS connection.

  • server (grpc.Server) – The gRPC server.

async GetDatasetInfo(request, context)

Request information about the target and sample shapes in the dataset.

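Parameters:
  • request – The request from the caller.

  • context (grpc.ServicerContext) – The context of the request.

Returns: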

director_pb2.GetDatasetInfoResponse – The response to the request.

async GetEnvoys(request, context)

Get status information about envoys.

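Parameters:
  • request – The request from the caller.

  • context (grpc.ServicerContext) – The context of the request.

Returns: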

director_pb2.GetEnvoysResponse – The response to the request.

async GetExperimentData(request, context)

Receive experiment data.

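Parameters:
  • request – The request from the caller.

  • context (grpc.ServicerContext) – The context of the request.

Yields: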

director_pb2.ExperimentData – The experiment data.
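A method documented with "Yields" is a server-streaming RPC, which in asyncio-based gRPC servers can be written as an async generator. The sketch below shows only the chunking idea with plain `bytes`; `stream_chunks`, `collect`, and the chunk size are hypothetical, and a real handler would yield `director_pb2.ExperimentData` messages instead.

```python
import asyncio


async def stream_chunks(payload: bytes, chunk_size: int = 4):
    """Yield `payload` in fixed-size pieces, as a streaming handler might."""
    for start in range(0, len(payload), chunk_size):
        yield payload[start:start + chunk_size]


async def collect(payload: bytes, chunk_size: int = 4):
    """Consume the stream the way a client would, gathering every chunk."""
    return [chunk async for chunk in stream_chunks(payload, chunk_size)]
```

For example, `asyncio.run(collect(b"abcdefghij", 4))` gathers the payload in pieces of at most four bytes.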

async GetExperimentDescription(request, context)

Get an experiment description.

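Parameters:
  • request – The request from the caller.

  • context (grpc.ServicerContext) – The context of the request.

Returns: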

director_pb2.GetExperimentDescriptionResponse – The response to the request.

async GetExperimentStatus(request, context)

Get experiment status and update if experiment was approved.

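Parameters:
  • request – The request from the caller.

  • context (grpc.ServicerContext) – The context of the request.

Returns: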

director_pb2.GetExperimentStatusResponse – The response to the request.

async GetExperimentsList(request, context)

Get a list of experiment descriptions.

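Parameters:
  • request – The request from the caller.

  • context (grpc.ServicerContext) – The context of the request.

Returns: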

director_pb2.GetExperimentsListResponse – The response to the request.

async GetMetricStream(request, context)

Request to stream metrics from the aggregator to the frontend.

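Parameters:
  • request – The request from the caller.

  • context (grpc.ServicerContext) – The context of the request.

Yields: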

director_pb2.GetMetricStreamResponse – The metrics.

async GetTrainedModel(request, context)

RPC for retrieving trained models.

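Parameters:
  • request – The request from the caller.

  • context (grpc.ServicerContext) – The context of the request.

Returns: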

director_pb2.TrainedModelResponse – The response to the request.

async RemoveExperimentData(request, context)

Remove experiment data RPC.

Parameters:
  • request – The request from the caller.

  • context (grpc.ServicerContext) – The context of the request.

Returns:

director_pb2.RemoveExperimentResponse – The response to the request.

async SetExperimentFailed(request, context)

Mark the experiment as failed.

Parameters:
  • request – The request from the caller.

  • context (grpc.ServicerContext) – The context of the request.

Returns:

director_pb2.SetExperimentFailedResponse – The response to the request.

async SetNewExperiment(stream, context)

Request to set new experiment.

Parameters:
  • stream (grpc.aio._MultiThreadedRendezvous) – The stream of experiment data.

  • context (grpc.ServicerContext) – The context of the request.

Returns:

director_pb2.SetNewExperimentResponse – The response to the request.
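Because this method receives a stream rather than a single request, the handler has to iterate over the incoming messages before it can reply. A minimal sketch of that client-streaming pattern, assuming each message carries a raw `bytes` chunk; `read_stream` and `demo_stream` are hypothetical names, and real messages would be protobuf objects rather than bare bytes.

```python
import asyncio


async def read_stream(stream) -> bytes:
    """Accumulate every chunk from an async iterator into one payload."""
    buffer = bytearray()
    async for chunk in stream:
        buffer.extend(chunk)
    return bytes(buffer)


async def demo_stream():
    """Stand-in for the request stream a gRPC client would send."""
    for part in (b"exp", b"eri", b"ment"):
        yield part
```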

async UpdateEnvoyStatus(request, context)

Accept health check from envoy.

Parameters:
  • request – The request from the caller.

  • context (grpc.ServicerContext) – The context of the request.

Returns:

director_pb2.UpdateEnvoyStatusResponse – The response to the request.

async UpdateShardInfo(request, context)

Receive and acknowledge shard info.

Parameters:
  • request – The request from the caller.

  • context (grpc.ServicerContext) – The context of the request.

Returns:

director_pb2.UpdateShardInfoResponse – The response to the request.

async WaitExperiment(request, context)

Request to wait for an experiment.

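Parameters:
  • request – The request from the caller.

  • context (grpc.ServicerContext) – The context of the request.

Returns: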

director_pb2.WaitExperimentResponse – The response to the request.

get_caller(context)

Get caller name from context.

  • If tls is True, the caller name is read from the auth_context.

  • If tls is False, the caller name is read from the context header ‘client_id’.

Parameters:

context (grpc.ServicerContext) – The context of the request.

Returns:

str – The name of the caller.
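The two lookup paths can be sketched against the `grpc.ServicerContext` methods `auth_context()` and `invocation_metadata()`. This is an illustration under assumptions, not the OpenFL code: the `x509_common_name` key is the property gRPC exposes for a peer certificate's common name, while the `tls` argument and the `DEFAULT_CLIENT_ID` fallback are assumed here.

```python
DEFAULT_CLIENT_ID = "__default__"  # assumed fallback, not taken from the docs


def get_caller(context, tls):
    """Return the caller name from a grpc.ServicerContext-like object."""
    if tls:
        # auth_context() maps property names to lists of bytes values.
        return context.auth_context()["x509_common_name"][0].decode("utf-8")
    # invocation_metadata() yields (key, value) pairs sent by the client.
    headers = dict(context.invocation_metadata())
    return headers.get("client_id", DEFAULT_CLIENT_ID)
```

Without TLS the name is whatever the client chose to send in the header, so it identifies the caller but does not authenticate it.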

start()

Launch the director gRPC server.

grpc