openfl.component.director.director.Director
- class openfl.component.director.director.Director(*, tls: bool = True, root_certificate: Path | str | None = None, private_key: Path | str | None = None, certificate: Path | str | None = None, sample_shape: list | None = None, target_shape: list | None = None, review_plan_callback: None | Callable = None, envoy_health_check_period: int = 60, install_requirements: bool = False)
Bases: object
Director class. The Director is the central node of the federation (Director-Based Workflow).
- Class Attributes:
tls (bool) – A flag indicating if TLS should be used for connections.
root_certificate (Union[Path, str]) – The path to the root certificate for TLS.
private_key (Union[Path, str]) – The path to the private key for TLS.
certificate (Union[Path, str]) – The path to the certificate for TLS.
sample_shape (list) – The shape of the sample data.
target_shape (list) – The shape of the target data.
review_plan_callback (Union[None, Callable]) – A callback function for reviewing the plan.
envoy_health_check_period (int) – The period for health check of envoys in seconds.
install_requirements (bool) – A flag indicating if the requirements should be installed.
_shard_registry (dict) – A dictionary to store the shard registry.
experiments_registry (ExperimentsRegistry) – An object of ExperimentsRegistry to store the experiments.
col_exp_queues (defaultdict) – A defaultdict to store the experiment queues for collaborators.
col_exp (dict) – A dictionary to store the experiments for collaborators.
logger (Logger) – A logger for logging activities.
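As a rough illustration of the keyword-only constructor signature above, the sketch below assembles the arguments in a plain dictionary. The helper name `build_director_kwargs`, the certificate file names, and the shapes are hypothetical placeholders. Supplying the three TLS paths only when `tls` is true is an assumption of this sketch, not documented behavior.

```python
from pathlib import Path


def build_director_kwargs(tls: bool = True, cert_dir: str = "cert") -> dict:
    """Assemble keyword arguments matching the Director signature above.

    Hypothetical helper: the file names under cert_dir are placeholders.
    """
    kwargs = {
        "tls": tls,
        "sample_shape": [28, 28, 1],         # placeholder shape
        "target_shape": [1],                 # placeholder shape
        "envoy_health_check_period": 60,     # documented default, in seconds
        "install_requirements": False,       # documented default
    }
    if tls:
        base = Path(cert_dir)
        kwargs.update(
            root_certificate=base / "root_ca.crt",
            private_key=base / "director.key",
            certificate=base / "director.crt",
        )
    return kwargs


kwargs = build_director_kwargs(tls=False)
# With OpenFL installed, the arguments would be passed by keyword:
# from openfl.component.director.director import Director
# director = Director(**kwargs)
```

Because the signature starts with `*`, every argument must be passed by name, which is why a dictionary unpacked with `**` is a convenient form.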
Methods
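acknowledge_shard() – Save shard info to shard registry if it's acceptable.
get_dataset_info() – Get dataset info.
get_envoys() – Get status information about envoys.
get_experiment_data() – Get experiment data.
get_experiment_description() – Get experiment information by name for a specific user.
get_experiment_status() – Get experiment status.
get_experiments_list() – Get the list of experiments for a specific user.
get_registered_shards() – Get registered shard infos.
get_trained_model() – Get trained model.
remove_experiment_data() – Remove experiment data from stash.
set_experiment_failed() – RPC used by envoys to mark an experiment as failed.
set_new_experiment() – Set new experiment.
start_experiment_execution_loop() – Run task to monitor and run experiments.
stream_metrics() – Stream metrics from the aggregator.
update_envoy_status() – Accept health check from envoy.
wait_experiment() – Wait for an experiment.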
- acknowledge_shard(shard_info: dict) bool
Save shard info to shard registry if it’s acceptable.
- Parameters:
shard_info (dict) – The shard info dictionary should be able to store registries.
- Returns:
is_accepted (bool) – Whether the shard info was accepted (True) or denied (False).
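The accept-or-deny contract can be pictured with a small stand-in function. This is a sketch only: it assumes a shard is acceptable when its shapes match the Director's `sample_shape` and `target_shape`, and the `shard_info` keys used here (`name`, `sample_shape`, `target_shape`) are hypothetical, since the reference above does not list them.

```python
def acknowledge_shard_sketch(registry: dict, expected_sample: list,
                             expected_target: list, shard_info: dict) -> bool:
    """Store shard_info in registry if its shapes match; return is_accepted."""
    # Assumed acceptance rule: both shapes must equal the expected ones.
    is_accepted = (
        shard_info.get("sample_shape") == expected_sample
        and shard_info.get("target_shape") == expected_target
    )
    if is_accepted:
        # Hypothetical key: the shard is indexed by its "name" entry.
        registry[shard_info["name"]] = shard_info
    return is_accepted


shard_registry = {}
good = {"name": "envoy_one", "sample_shape": [28, 28, 1], "target_shape": [1]}
bad = {"name": "envoy_two", "sample_shape": [32, 32, 3], "target_shape": [1]}

accepted = acknowledge_shard_sketch(shard_registry, [28, 28, 1], [1], good)
rejected = acknowledge_shard_sketch(shard_registry, [28, 28, 1], [1], bad)
```

Only the accepted shard ends up in the registry; a denied shard leaves it unchanged.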
- get_dataset_info()
Get dataset info.
- get_envoys() list
Get status information about envoys.
- Returns:
list – List with the status information about envoys.
- get_experiment_data(experiment_name: str) Path
Get experiment data.
- Parameters:
experiment_name (str) – String id for experiment.
- Returns:
Path – Path of archive.
- get_experiment_description(caller: str, name: str) dict
Get experiment information by name for a specific user.
- Parameters:
caller (str) – String id for experiment owner.
name (str) – String id for experiment name.
- Returns:
dict – Dictionary with the info from the experiment.
- async get_experiment_status(experiment_name: str, caller: str)
Get experiment status.
- Parameters:
experiment_name (str) – String id for experiment.
caller (str) – String id for experiment owner.
- Returns:
str – The status of the experiment, one of the following:
- PENDING = 'pending'
- FINISHED = 'finished'
- IN_PROGRESS = 'in_progress'
- FAILED = 'failed'
- REJECTED = 'rejected'
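A caller that needs to wait for an experiment could poll the status string until it stops changing. The sketch below mirrors the listed values in a local enum and polls a stand-in coroutine. The names `Status`, `TERMINAL`, and `wait_until_done` are hypothetical, and treating 'finished', 'failed', and 'rejected' as terminal is an assumption.

```python
import asyncio
from enum import Enum


class Status(str, Enum):
    """Local mirror of the status strings listed above."""
    PENDING = "pending"
    FINISHED = "finished"
    IN_PROGRESS = "in_progress"
    FAILED = "failed"
    REJECTED = "rejected"


# Assumption: these statuses will not change any further.
TERMINAL = {Status.FINISHED, Status.FAILED, Status.REJECTED}


async def wait_until_done(get_status, poll_seconds: float = 0.01) -> Status:
    """Poll an async status getter until a terminal status is returned."""
    while True:
        status = Status(await get_status())
        if status in TERMINAL:
            return status
        await asyncio.sleep(poll_seconds)


async def _status_demo() -> Status:
    # Stand-in for awaiting director.get_experiment_status(...).
    replies = iter(["pending", "in_progress", "finished"])

    async def fake_get_status() -> str:
        return next(replies)

    return await wait_until_done(fake_get_status)


final_status = asyncio.run(_status_demo())
```

Converting the returned string with `Status(...)` raises `ValueError` on an unexpected value, which surfaces a mismatch early instead of looping forever.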
- get_experiments_list(caller: str) list
Get the list of experiments for a specific user.
- Parameters:
caller (str) – String id for experiment owner.
- Returns:
list – List with information about each experiment of the specified user.
- get_registered_shards() list
Get registered shard infos.
- get_trained_model(experiment_name: str, caller: str, model_type: str)
Get trained model.
- Parameters:
experiment_name (str) – String id for experiment.
caller (str) – String id for experiment owner.
model_type (str) – The type of the model.
- Returns:
None – One of the following: [No experiment data in the stash] or [Aggregator have no aggregated model to return] or [Unknown model type required].
dict – Dictionary of tensors from the aggregator when the model type is ‘best’ or ‘last’.
- remove_experiment_data(experiment_name: str, caller: str)
Remove experiment data from stash.
- Parameters:
experiment_name (str) – String id for experiment.
caller (str) – String id for experiment owner.
- set_experiment_failed(*, experiment_name: str, collaborator_name: str)
RPC used by envoys to mark an experiment as failed.
- Parameters:
experiment_name (str) – String id for experiment.
collaborator_name (str) – String id for collaborator.
- Returns:
None
- async set_new_experiment(*, experiment_name: str, sender_name: str, tensor_dict: dict, collaborator_names: Iterable[str], experiment_archive_path: Path) bool
Set new experiment.
- Parameters:
experiment_name (str) – String id for experiment.
sender_name (str) – The name of the sender.
tensor_dict (dict) – Dictionary of tensors.
collaborator_names (Iterable[str]) – Names of collaborators.
experiment_archive_path (Path) – Path of the experiment.
- Returns:
bool – True if the experiment was registered successfully.
- async start_experiment_execution_loop()
Run task to monitor and run experiments.
- async stream_metrics(experiment_name: str, caller: str)
Stream metrics from the aggregator.
This method takes the next metric dictionary from the aggregator's queue and returns it to the caller.
- Parameters:
experiment_name (str) – String id for experiment.
caller (str) – String id for experiment owner.
- Returns:
metric_dict (dict) – Dictionary with the keys 'metric_origin', 'task_name', 'metric_name', 'metric_value' and 'round', if the queue is not empty.
None – queue is empty but the experiment is still running.
- Raises:
StopIteration – if the experiment is finished and there are no more metrics to report.
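The three outcomes described for `stream_metrics` (a metric dictionary, None, or end of stream) suggest a consumer loop along the following lines. This is a sketch under the assumption that the method can be consumed as an async iterator. `fake_stream_metrics` is a hypothetical stand-in, and the metric values are made up for illustration.

```python
import asyncio


async def fake_stream_metrics():
    """Stand-in for stream_metrics, modelled here as an async generator."""
    yield {"metric_origin": "col_one", "task_name": "train",
           "metric_name": "loss", "metric_value": 0.5, "round": 0}
    yield None  # queue is empty but the experiment is still running
    yield {"metric_origin": "col_one", "task_name": "train",
           "metric_name": "loss", "metric_value": 0.4, "round": 1}
    # Falling off the end signals that the experiment is finished.


async def collect_metrics(stream) -> list:
    """Gather metric dictionaries, skipping the None placeholders."""
    collected = []
    async for metric in stream:
        if metric is None:
            await asyncio.sleep(0)  # give other tasks a chance to run
            continue
        collected.append(metric)
    return collected


metrics = asyncio.run(collect_metrics(fake_stream_metrics()))
```

With `async for`, the end of the stream terminates the loop on its own, so the consumer does not need an explicit `try`/`except` around each step.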
- update_envoy_status(*, envoy_name: str, is_experiment_running: bool, cuda_devices_status: list | None = None) int
Accept health check from envoy.
- Parameters:
envoy_name (str) – String id for envoy.
is_experiment_running (bool) – Boolean value for the status of the experiment.
cuda_devices_status (list, optional) – List of cuda devices and status. Defaults to None.
- Raises:
ShardNotFoundError – If no shard is registered for envoy_name (message: Unknown shard {envoy_name}).
- Returns:
int – Value of the envoy_health_check_period.
- async wait_experiment(envoy_name: str) str
Wait for an experiment.
- Parameters:
envoy_name (str) – The name of the envoy.
- Returns:
str – The name of the experiment on the queue.
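Waiting on a per-envoy queue can be illustrated as follows. The attribute list above documents `col_exp_queues` as a defaultdict. That its values are `asyncio.Queue` objects is an assumption of this sketch, and the envoy and experiment names are placeholders.

```python
import asyncio
from collections import defaultdict


async def _queue_demo() -> str:
    # One queue per envoy, created lazily on first access (assumed layout).
    col_exp_queues = defaultdict(asyncio.Queue)

    async def wait_experiment(envoy_name: str) -> str:
        """Block until an experiment name is queued for this envoy."""
        return await col_exp_queues[envoy_name].get()

    waiter = asyncio.create_task(wait_experiment("envoy_one"))
    await asyncio.sleep(0)  # let the waiter start and block on the empty queue
    await col_exp_queues["envoy_one"].put("experiment_a")
    return await waiter


received = asyncio.run(_queue_demo())
```

The waiting coroutine suspends without busy-polling and resumes as soon as a name is put on its queue.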