openfl.interface.director

Director CLI.

openfl.interface.director.ClickPath

alias of Path

class openfl.interface.director.Director(*, tls: bool = True, root_certificate: Path | str | None = None, private_key: Path | str | None = None, certificate: Path | str | None = None, sample_shape: list | None = None, target_shape: list | None = None, review_plan_callback: None | Callable = None, envoy_health_check_period: int = 60, install_requirements: bool = False)

Director class. The Director is the central node of the federation (Director-Based Workflow).

Class Attributes:
  • tls (bool) – A flag indicating if TLS should be used for connections.

  • root_certificate (Union[Path, str]) – The path to the root certificate for TLS.

  • private_key (Union[Path, str]) – The path to the private key for TLS.

  • certificate (Union[Path, str]) – The path to the certificate for TLS.

  • sample_shape (list) – The shape of the sample data.

  • target_shape (list) – The shape of the target data.

  • review_plan_callback (Union[None, Callable]) – A callback function for reviewing the plan.

  • envoy_health_check_period (int) – The period for health check of envoys in seconds.

  • install_requirements (bool) – A flag indicating if the requirements should be installed.

  • _shard_registry (dict) – A dictionary to store the shard registry.

  • experiments_registry (ExperimentsRegistry) – An object of ExperimentsRegistry to store the experiments.

  • col_exp_queues (defaultdict) – A defaultdict to store the experiment queues for collaborators.

  • col_exp (dict) – A dictionary to store the experiments for collaborators.

  • logger (Logger) – A logger for logging activities.

acknowledge_shard(shard_info: dict) bool

Save shard info to shard registry if it’s acceptable.

Parameters:

shard_info (dict) – The shard info dictionary should be able to store registries.

Returns:

is_accepted (bool) – Bool value to accept o deny the addition of the shard info.

get_dataset_info()

Get dataset info.

get_envoys() list

Get a status information about envoys.

Returns:

list – List with the status information about envoys.

get_experiment_data(experiment_name: str) Path

Get experiment data.

Parameters:

experiment_name (str) – String id for experiment.

Returns:

str – Path of archive.

get_experiment_description(caller: str, name: str) dict

Get a experiment information by name for specific user.

Parameters:
  • caller (str) – String id for experiment owner.

  • name (str) – String id for experiment name.

Returns:

dict – Dictionary with the info from the experiment.

async get_experiment_status(experiment_name: str, caller: str)

Get experiment status.

Parameters:
  • experiment_name (str) – String id for experiment.

  • caller (str) – String id for experiment owner.

Returns:

str – The status of the experiment can be one of the following: - PENDING = ‘pending’ - FINISHED = ‘finished’ - IN_PROGRESS = ‘in_progress’ - FAILED = ‘failed’ - REJECTED = ‘rejected’

get_experiments_list(caller: str) list

Get experiments list for specific user.

Parameters:

caller (str) – String id for experiment owner.

Returns:

list – List with the info of the experiment for specific user.

get_registered_shards() list

Get registered shard infos.

get_trained_model(experiment_name: str, caller: str, model_type: str)

Get trained model.

Parameters:
  • experiment_name (str) – String id for experiment.

  • caller (str) – String id for experiment owner.

  • model_type (str) – The type of the model.

Returns:
  • None – One of the following: [No experiment data in the stash] or [Aggregator have no aggregated model to return] or [Unknown model type required].

  • dict – Dictionary of tensors from the aggregator when the model type is ‘best’ or ‘last’.

remove_experiment_data(experiment_name: str, caller: str)

Remove experiment data from stash.

Parameters:
  • experiment_name (str) – String id for experiment.

  • caller (str) – String id for experiment owner.

set_experiment_failed(*, experiment_name: str, collaborator_name: str)

Envoys Set experiment failed RPC.

Parameters:
  • experiment_name (str) – String id for experiment.

  • collaborator_name (str) – String id for collaborator.

Returns:

None

async set_new_experiment(*, experiment_name: str, sender_name: str, tensor_dict: dict, collaborator_names: Iterable[str], experiment_archive_path: Path) bool

Set new experiment.

Parameters:
  • experiment_name (str) – String id for experiment.

  • sender_name (str) – The name of the sender.

  • tensor_dict (dict) – Dictionary of tensors.

  • collaborator_names (Iterable[str]) – Names of collaborators.

  • experiment_archive_path (Path) – Path of the experiment.

Returns:

bool – Boolean returned if the experiment register was successful.

async start_experiment_execution_loop()

Run task to monitor and run experiments.

async stream_metrics(experiment_name: str, caller: str)

Stream metrics from the aggregator.

This method takes next metric dictionary from the aggregator’s queue and returns it to the caller.

Parameters:
  • experiment_name (str) – String id for experiment.

  • caller (str) – String id for experiment owner.

Returns:
  • metric_dict – {‘metric_origin’,’task_name’,’metric_name’,’metric_value’,’round’} if the queue is not empty.

  • None – queue is empty but the experiment is still running.

Raises:

StopIteration – if the experiment is finished and there is no more metrics to report.

update_envoy_status(*, envoy_name: str, is_experiment_running: bool, cuda_devices_status: list | None = None) int

Accept health check from envoy.

Parameters:
  • envoy_name (str) – String id for envoy.

  • is_experiment_running (bool) – Boolean value for the status of the experiment.

  • cuda_devices_status (list, optional) – List of cuda devices and status. Defaults to None.

Raises:

ShardNotFoundError – When Unknown shard {envoy_name}.

Returns:

int – Value of the envoy_health_check_period.

async wait_experiment(envoy_name: str) str

Wait an experiment.

Parameters:

envoy_name (str) – The name of the envoy.

Returns:

str – The name of the experiment on the queue.

class openfl.interface.director.DirectorGRPCServer(*, director_cls, tls: bool = True, root_certificate: Path | str | None = None, private_key: Path | str | None = None, certificate: Path | str | None = None, review_plan_callback: None | Callable = None, listen_host: str = '[::]', listen_port: int = 50051, envoy_health_check_period: int = 0, **kwargs)

Director transport class.

This class implements a gRPC server for the Director, allowing it to communicate with collaborators.

Class Attributes:
  • director (Director) – The director that this server is serving.

  • listen_uri (str) – The URI that the server is serving on.

  • tls (bool) – Whether to use TLS for the connection.

  • root_certificate (Path) – The path to the root certificate for the TLS connection.

  • private_key (Path) – The path to the server’s private key for the TLS connection.

  • certificate (Path) – The path to the server’s certificate for the TLS connection.

  • server (grpc.Server) – The gRPC server.

async GetDatasetInfo(request, context)

Request the info about target and sample shapes in the dataset.

Parameters:
Returns:

director_pb2.GetDatasetInfoResponse – The response to the request.

async GetEnvoys(request, context)

Get a status information about envoys.

Parameters:
Returns:

director_pb2.GetEnvoysResponse – The response to the request.

async GetExperimentData(request, context)

Receive experiment data.

Parameters:
Yields:

director_pb2.ExperimentData – The experiment data.

async GetExperimentDescription(request, context)

Get an experiment description.

Parameters:
Returns:

director_pb2.GetExperimentDescriptionResponse – The response to the request.

async GetExperimentStatus(request, context)

Get experiment status and update if experiment was approved.

Parameters:
Returns:

director_pb2.GetExperimentStatusResponse – The response to the request.

async GetExperimentsList(request, context)

Get list of experiments description.

Parameters:
Returns:

director_pb2.GetExperimentsListResponse – The response to the request.

async GetMetricStream(request, context)

Request to stream metrics from the aggregator to frontend.

Parameters:
Yields:

director_pb2.GetMetricStreamResponse – The metrics.

async GetTrainedModel(request, context)

RPC for retrieving trained models.

Parameters:
Returns:

director_pb2.TrainedModelResponse – The response to the request.

async RemoveExperimentData(request, context)

Remove experiment data RPC.

Parameters:
Returns:

response (director_pb2.RemoveExperimentResponse) – The response to the request.

async SetExperimentFailed(request, context)

Set the experiment failed.

Parameters:
Returns:

response (director_pb2.SetExperimentFailedResponse) – The response to the request.

async SetNewExperiment(stream, context)

Request to set new experiment.

Parameters:
  • stream (grpc.aio._MultiThreadedRendezvous) – The stream of experiment data.

  • context (grpc.ServicerContext) – The context of the request.

Returns:

director_pb2.SetNewExperimentResponse – The response to the request.

async UpdateEnvoyStatus(request, context)

Accept health check from envoy.

Parameters:
Returns:

resp (director_pb2.UpdateEnvoyStatusResponse) – The response to the request.

async UpdateShardInfo(request, context)

Receive acknowledge shard info.

Parameters:
Returns:

reply (director_pb2.UpdateShardInfoResponse) – The response to the request.

async WaitExperiment(request, context)

Request for wait an experiment.

Parameters:
Returns:

director_pb2.WaitExperimentResponse – The response to the request.

get_caller(context)

Get caller name from context.

if tls == True: get caller name from auth_context if tls == False: get caller name from context header ‘client_id’

Parameters:

context (grpc.ServicerContext) – The context of the request.

Returns:

str – The name of the caller.

start()

Launch the director GRPC server.

class openfl.interface.director.Path(*args, **kwargs)

PurePath subclass that can make system calls.

Path represents a filesystem path but unlike PurePath, also offers methods to do system calls on path objects. Depending on your system, instantiating a Path will return either a PosixPath or a WindowsPath object. You can also instantiate a PosixPath or WindowsPath directly, but cannot instantiate a WindowsPath on a POSIX system or vice versa.

absolute()

Return an absolute version of this path. This function works even if the path doesn’t point to anything.

No normalization is done, i.e. all ‘.’ and ‘..’ will be kept along. Use resolve() to get the canonical path to a file.

chmod(mode, *, follow_symlinks=True)

Change the permissions of the path, like os.chmod().

classmethod cwd()

Return a new path pointing to the current working directory (as returned by os.getcwd()).

exists()

Whether this path exists.

expanduser()

Return a new path with expanded ~ and ~user constructs (as returned by os.path.expanduser)

glob(pattern)

Iterate over this subtree and yield all existing files (of any kind, including directories) matching the given relative pattern.

group()

Return the group name of the file gid.

Make this path a hard link pointing to the same file as target.

Note the order of arguments (self, target) is the reverse of os.link’s.

classmethod home()

Return a new path pointing to the user’s home directory (as returned by os.path.expanduser(‘~’)).

is_block_device()

Whether this path is a block device.

is_char_device()

Whether this path is a character device.

is_dir()

Whether this path is a directory.

is_fifo()

Whether this path is a FIFO.

is_file()

Whether this path is a regular file (also True for symlinks pointing to regular files).

is_mount()

Check if this path is a POSIX mount point

is_socket()

Whether this path is a socket.

Whether this path is a symbolic link.

iterdir()

Iterate over the files in this directory. Does not yield any result for the special paths ‘.’ and ‘..’.

lchmod(mode)

Like chmod(), except if the path points to a symlink, the symlink’s permissions are changed, rather than its target’s.

Make the target path a hard link pointing to this path.

Note this function does not make this path a hard link to target, despite the implication of the function and argument names. The order of arguments (target, link) is the reverse of Path.symlink_to, but matches that of os.link.

Deprecated since Python 3.10 and scheduled for removal in Python 3.12. Use hardlink_to() instead.

lstat()

Like stat(), except if the path points to a symlink, the symlink’s status information is returned, rather than its target’s.

mkdir(mode=511, parents=False, exist_ok=False)

Create a new directory at this given path.

open(mode='r', buffering=-1, encoding=None, errors=None, newline=None)

Open the file pointed by this path and return a file object, as the built-in open() function does.

owner()

Return the login name of the file owner.

read_bytes()

Open the file in bytes mode, read it, and close the file.

read_text(encoding=None, errors=None)

Open the file in text mode, read it, and close the file.

Return the path to which the symbolic link points.

rename(target)

Rename this path to the target path.

The target path may be absolute or relative. Relative paths are interpreted relative to the current working directory, not the directory of the Path object.

Returns the new Path instance pointing to the target path.

replace(target)

Rename this path to the target path, overwriting if that path exists.

The target path may be absolute or relative. Relative paths are interpreted relative to the current working directory, not the directory of the Path object.

Returns the new Path instance pointing to the target path.

resolve(strict=False)

Make the path absolute, resolving all symlinks on the way and also normalizing it (for example turning slashes into backslashes under Windows).

rglob(pattern)

Recursively yield all existing files (of any kind, including directories) matching the given relative pattern, anywhere in this subtree.

rmdir()

Remove this directory. The directory must be empty.

samefile(other_path)

Return whether other_path is the same or not as this file (as returned by os.path.samefile()).

stat(*, follow_symlinks=True)

Return the result of the stat() system call on this path, like os.stat() does.

Make this path a symlink pointing to the target path. Note the order of arguments (link, target) is the reverse of os.symlink.

touch(mode=438, exist_ok=True)

Create this file with the given access mode, if it doesn’t exist.

Remove this file or link. If the path is a directory, use rmdir() instead.

write_bytes(data)

Open the file in bytes mode, write to it, and close the file.

write_text(data, encoding=None, errors=None, newline=None)

Open the file in text mode, write to it, and close the file.

class openfl.interface.director.Validator(*names: str, must_exist: bool | None = None, required: bool | None = None, condition: ~typing.Callable[[~typing.Any], bool] | None = None, when: ~dynaconf.validator.Validator | None = None, env: str | ~typing.Sequence[str] | None = None, messages: dict[str, str] | None = None, cast: ~typing.Callable[[~typing.Any], ~typing.Any] | None = None, default: ~typing.Any | ~typing.Callable[[~typing.Any, ~dynaconf.validator.Validator], ~typing.Any] | None = <dynaconf.utils.functional.Empty object>, description: str | None = None, apply_default_on_none: bool | None = False, **operations: ~typing.Any)

Validators are conditions attached to settings variables names or patterns:

Validator('MESSAGE', must_exist=True, eq='Hello World')

The above ensure MESSAGE is available in default env and is equal to ‘Hello World’

names are a one (or more) names or patterns:

Validator('NAME')
Validator('NAME', 'OTHER_NAME', 'EVEN_OTHER')
Validator(r'^NAME', r'OTHER./*')

The operations are:

eq: value == other
ne: value != other
gt: value > other
lt: value < other
gte: value >= other
lte: value <= other
is_type_of: isinstance(value, type)
is_in:  value in sequence
is_not_in: value not in sequence
identity: value is other
cont: contain value in
len_eq: len(value) == other
len_ne: len(value) != other
len_min: len(value) > other
len_max: len(value) < other

env is which env to be checked, can be a list or default is used.

when holds a validator and its return decides if validator runs or not:

Validator('NAME', must_exist=True, when=Validator('OTHER', eq=2))
# NAME is required only if OTHER eq to 2
# When the very first thing to be performed when passed.
# if no env is passed to `when` it is inherited

must_exist is alias to required requirement. (executed after when):

settings.get(value, empty) returns non empty

condition is a callable to be executed and return boolean:

Validator('NAME', condition=lambda x: x == 1)
# it is executed before operations.
validate(settings: Settings, only: str | Sequence | None = None, exclude: str | Sequence | None = None, only_current_env: bool = False) None

Raise ValidationError if invalid

openfl.interface.director.group(name: str | Callable[[...], Any] | None = None, cls: Type[GrpType] | None = None, **attrs: Any) Group | Callable[[Callable[[...], Any]], Group | GrpType]

Creates a new Group with a function as callback. This works otherwise the same as command() just that the cls parameter is set to Group.

Changed in version 8.1: This decorator can be applied without parentheses.

openfl.interface.director.is_directory_traversal(directory: str | Path) bool

Check for directory traversal.

This function checks if the provided directory is a subdirectory of the current working directory. It returns True if the directory is not a subdirectory (i.e., it is a directory traversal), and False otherwise.

Parameters:

directory (Union[str, Path]) – The directory to check.

Returns:

boolTrue if the directory is a directory traversal, False otherwise.

openfl.interface.director.merge_configs(overwrite_dict: dict | None = None, value_transform: List[Tuple[str, Callable]] | None = None, **kwargs) LazySettings

Create Dynaconf settings, merge its with overwrite_dict and validate result.

This function creates a Dynaconf settings object, merges it with an optional dictionary, applies an optional value transformation, and validates the result.

Parameters:
  • overwrite_dict (Optional[dict], optional) – A dictionary to merge with the settings. Defaults to None.

  • value_transform (Optional[List[Tuple[str, Callable]]], optional) – A list of tuples, each containing a key and a function to apply to the value of that key. Defaults to None.

  • **kwargs – Additional keyword arguments to pass to the Dynaconf constructor.

Returns:

Dynaconf – The merged and validated settings.

openfl.interface.director.option(*param_decls: str, cls: Type[Option] | None = None, **attrs: Any) Callable[[FC], FC]

Attaches an option to the command. All positional arguments are passed as parameter declarations to Option; all keyword arguments are forwarded unchanged (except cls). This is equivalent to creating an Option instance manually and attaching it to the Command.params list.

For the default option class, refer to Option and Parameter for descriptions of parameters.

Parameters:
  • cls – the option class to instantiate. This defaults to Option.

  • param_decls – Passed as positional arguments to the constructor of cls.

  • attrs – Passed as keyword arguments to the constructor of cls.

openfl.interface.director.pass_context(f: t.Callable[te.Concatenate[Context, P], R]) t.Callable[P, R]

Marks a callback as wanting to receive the current context object as first argument.

openfl.interface.director.review_plan_callback(file_name, file_path)

Review plan callback for Director and Envoy.

Parameters:
  • file_name (str) – Name of the file to review.

  • file_path (str) – Path of the file to review.

Returns:

bool – True if the file is accepted, False otherwise.