openfl.component.director
- class openfl.component.director.Director(*, tls: bool = True, root_certificate: Path | str | None = None, private_key: Path | str | None = None, certificate: Path | str | None = None, sample_shape: list | None = None, target_shape: list | None = None, review_plan_callback: None | Callable = None, envoy_health_check_period: int = 60, install_requirements: bool = False)
Director class. The Director is the central node of the federation (Director-Based Workflow).
- Class Attributes:
tls (bool) – A flag indicating if TLS should be used for connections.
root_certificate (Union[Path, str]) – The path to the root certificate for TLS.
private_key (Union[Path, str]) – The path to the private key for TLS.
certificate (Union[Path, str]) – The path to the certificate for TLS.
sample_shape (list) – The shape of the sample data.
target_shape (list) – The shape of the target data.
review_plan_callback (Union[None, Callable]) – A callback function for reviewing the plan.
envoy_health_check_period (int) – The period for health check of envoys in seconds.
install_requirements (bool) – A flag indicating if the requirements should be installed.
_shard_registry (dict) – A dictionary to store the shard registry.
experiments_registry (ExperimentsRegistry) – An object of ExperimentsRegistry to store the experiments.
col_exp_queues (defaultdict) – A defaultdict to store the experiment queues for collaborators.
col_exp (dict) – A dictionary to store the experiments for collaborators.
logger (Logger) – A logger for logging activities.
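As a rough illustration of the constructor arguments listed above, the sketch below collects them into a keyword dictionary. The helper name `director_kwargs` and the shape values are hypothetical, and the rule that TLS needs all three certificate paths is an assumption about sensible usage rather than documented behaviour.

```python
from pathlib import Path
from typing import Optional, Union

PathLike = Union[Path, str]


def director_kwargs(
    *,
    tls: bool = True,
    root_certificate: Optional[PathLike] = None,
    private_key: Optional[PathLike] = None,
    certificate: Optional[PathLike] = None,
    sample_shape: Optional[list] = None,
    target_shape: Optional[list] = None,
    envoy_health_check_period: int = 60,
) -> dict:
    """Collect keyword arguments for Director (hypothetical helper)."""
    certs = (root_certificate, private_key, certificate)
    # Assumption: with TLS enabled, all three certificate paths are needed.
    if tls and any(c is None for c in certs):
        raise ValueError(
            "tls=True requires root_certificate, private_key and certificate"
        )
    return {
        "tls": tls,
        "root_certificate": root_certificate,
        "private_key": private_key,
        "certificate": certificate,
        "sample_shape": sample_shape,
        "target_shape": target_shape,
        "envoy_health_check_period": envoy_health_check_period,
    }


# Illustrative values only; the shapes depend on the dataset in use.
kwargs = director_kwargs(tls=False, sample_shape=[28, 28, 1], target_shape=[10])
# With OpenFL installed, these could then be passed on as:
#   from openfl.component.director import Director
#   director = Director(**kwargs)
```

Keeping the arguments in a plain dictionary makes it easy to validate a configuration before any network resources are created.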
- acknowledge_shard(shard_info: dict) bool
Save shard info to shard registry if it’s acceptable.
- Parameters:
shard_info (dict) – The shard info dictionary to be stored in the shard registry.
- Returns:
is_accepted (bool) – Whether the shard info was accepted (True) or denied (False).
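The accept-or-deny behaviour can be pictured with a small stand-in class. This is a sketch, not OpenFL code: the acceptance rule (shapes must match the configured ones) and the `shard_info` keys `node_name`, `sample_shape` and `target_shape` are assumptions made for illustration.

```python
from typing import Optional


class ShardRegistrySketch:
    """Stand-in for the Director's shard bookkeeping (not OpenFL code)."""

    def __init__(self, sample_shape: Optional[list] = None,
                 target_shape: Optional[list] = None):
        self.sample_shape = sample_shape
        self.target_shape = target_shape
        self._shard_registry = {}

    def acknowledge_shard(self, shard_info: dict) -> bool:
        # Assumed acceptance rule: shapes must match the configured ones.
        is_accepted = (
            shard_info.get("sample_shape") == self.sample_shape
            and shard_info.get("target_shape") == self.target_shape
        )
        if is_accepted:
            # Assumed key: the shard is stored under its node name.
            self._shard_registry[shard_info["node_name"]] = shard_info
        return is_accepted


registry = ShardRegistrySketch(sample_shape=[28, 28, 1], target_shape=[10])
accepted = registry.acknowledge_shard(
    {"node_name": "envoy_one", "sample_shape": [28, 28, 1], "target_shape": [10]}
)
denied = registry.acknowledge_shard(
    {"node_name": "envoy_two", "sample_shape": [32, 32, 3], "target_shape": [10]}
)
```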
- get_dataset_info()
Get dataset info.
- get_envoys() list
Get status information about envoys.
- Returns:
list – List with the status information about envoys.
- get_experiment_data(experiment_name: str) Path
Get experiment data.
- Parameters:
experiment_name (str) – String id for experiment.
- Returns:
Path – Path of the experiment archive.
- get_experiment_description(caller: str, name: str) dict
Get experiment information by name for a specific user.
- Parameters:
caller (str) – String id for experiment owner.
name (str) – String id for experiment name.
- Returns:
dict – Dictionary with the info from the experiment.
- async get_experiment_status(experiment_name: str, caller: str)
Get experiment status.
- Parameters:
experiment_name (str) – String id for experiment.
caller (str) – String id for experiment owner.
- Returns:
str – The status of the experiment, one of: PENDING = ‘pending’, FINISHED = ‘finished’, IN_PROGRESS = ‘in_progress’, FAILED = ‘failed’, REJECTED = ‘rejected’.
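One way a caller might use these status strings is to poll until the experiment stops changing. The sketch below is only illustrative: `wait_until_done` and `make_fake_status_getter` are hypothetical names, the fake getter stands in for `get_experiment_status`, and treating ‘finished’, ‘failed’ and ‘rejected’ as terminal is an assumption.

```python
import asyncio

# Status strings as listed in the Returns entry above.
PENDING = "pending"
FINISHED = "finished"
IN_PROGRESS = "in_progress"
FAILED = "failed"
REJECTED = "rejected"

# Assumption: these statuses mean the experiment will not change again.
TERMINAL_STATUSES = {FINISHED, FAILED, REJECTED}


async def wait_until_done(get_status, experiment_name, caller, poll_seconds=0.0):
    """Poll an async status getter until a terminal status is returned."""
    while True:
        status = await get_status(experiment_name=experiment_name, caller=caller)
        if status in TERMINAL_STATUSES:
            return status
        await asyncio.sleep(poll_seconds)


def make_fake_status_getter(states):
    """Build a hypothetical stand-in that replays the given statuses in order."""
    remaining = list(states)

    async def get_status(experiment_name, caller):
        return remaining.pop(0)

    return get_status


fake_getter = make_fake_status_getter([PENDING, IN_PROGRESS, FINISHED])
final_status = asyncio.run(wait_until_done(fake_getter, "exp_1", "alice"))
```

In real use a non-zero `poll_seconds` would avoid a busy loop.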
- get_experiments_list(caller: str) list
Get the list of experiments for a specific user.
- Parameters:
caller (str) – String id for experiment owner.
- Returns:
list – List with information about each experiment of the specified user.
- get_registered_shards() list
Get registered shard infos.
- get_trained_model(experiment_name: str, caller: str, model_type: str)
Get trained model.
- Parameters:
experiment_name (str) – String id for experiment.
caller (str) – String id for experiment owner.
model_type (str) – The type of the model.
- Returns:
None – Returned when there is no experiment data in the stash, when the aggregator has no aggregated model to return, or when an unknown model type is requested.
dict – Dictionary of tensors from the aggregator when the model type is ‘best’ or ‘last’.
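The return contract (a tensor dictionary, or None in three distinct cases) can be mimicked with a plain dictionary. This is a sketch under assumptions: `get_trained_model_sketch`, the `stash` layout and the tensor name are hypothetical and do not reflect how the Director stores experiments.

```python
from typing import Optional

VALID_MODEL_TYPES = ("best", "last")


def get_trained_model_sketch(stash: dict, experiment_name: str,
                             model_type: str) -> Optional[dict]:
    """Mimic the documented return contract with a plain dictionary stash."""
    experiment = stash.get(experiment_name)
    if experiment is None:
        return None  # no experiment data in the stash
    if model_type not in VALID_MODEL_TYPES:
        return None  # unknown model type
    # None here means no aggregated model of this type is available yet.
    return experiment.get(model_type)


# Hypothetical stash with a single experiment that only has a 'last' model.
stash = {"exp_1": {"last": {"conv1.weight": [0.1, 0.2]}}}
last_model = get_trained_model_sketch(stash, "exp_1", "last")
best_model = get_trained_model_sketch(stash, "exp_1", "best")
```

Because all three failure cases return None, a caller cannot tell them apart from the return value alone.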
- remove_experiment_data(experiment_name: str, caller: str)
Remove experiment data from stash.
- Parameters:
experiment_name (str) – String id for experiment.
caller (str) – String id for experiment owner.
- set_experiment_failed(*, experiment_name: str, collaborator_name: str)
Handle the envoy's “set experiment failed” RPC.
- Parameters:
experiment_name (str) – String id for experiment.
collaborator_name (str) – String id for collaborator.
- Returns:
None
- async set_new_experiment(*, experiment_name: str, sender_name: str, tensor_dict: dict, collaborator_names: Iterable[str], experiment_archive_path: Path) bool
Set new experiment.
- Parameters:
experiment_name (str) – String id for experiment.
sender_name (str) – The name of the sender.
tensor_dict (dict) – Dictionary of tensors.
collaborator_names (Iterable[str]) – Names of collaborators.
experiment_archive_path (Path) – Path of the experiment.
- Returns:
bool – Whether the experiment was registered successfully.
- async start_experiment_execution_loop()
Run task to monitor and run experiments.
- async stream_metrics(experiment_name: str, caller: str)
Stream metrics from the aggregator.
This method takes the next metric dictionary from the aggregator’s queue and returns it to the caller.
- Parameters:
experiment_name (str) – String id for experiment.
caller (str) – String id for experiment owner.
- Returns:
metric_dict – {‘metric_origin’, ‘task_name’, ‘metric_name’, ‘metric_value’, ‘round’} if the queue is not empty.
None – queue is empty but the experiment is still running.
- Raises:
StopIteration – if the experiment is finished and there are no more metrics to report.
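A consumer of the metric stream has to handle three outcomes: a metric dictionary, a None placeholder, and the end of the stream. The sketch below assumes the stream is consumed as an async iterator whose exhaustion corresponds to the end-of-stream condition; `fake_stream_metrics` is a hypothetical stand-in and the metric values are made up.

```python
import asyncio


async def fake_stream_metrics(items):
    # Hypothetical stand-in: yields metric dicts, or None when the queue is empty.
    for item in items:
        yield item


async def collect_metrics(stream):
    """Gather metric dicts from a stream, skipping None placeholders."""
    collected = []
    async for metric_dict in stream:
        if metric_dict is None:
            # Queue empty but experiment still running: yield control and retry.
            await asyncio.sleep(0)
            continue
        collected.append(metric_dict)
    return collected


sample_metric = {
    "metric_origin": "aggregator",
    "task_name": "validate",
    "metric_name": "acc",
    "metric_value": 0.9,
    "round": 0,
}
metrics = asyncio.run(
    collect_metrics(fake_stream_metrics([None, sample_metric, None]))
)
```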
- update_envoy_status(*, envoy_name: str, is_experiment_running: bool, cuda_devices_status: list | None = None) int
Accept health check from envoy.
- Parameters:
envoy_name (str) – String id for envoy.
is_experiment_running (bool) – Boolean value for the status of the experiment.
cuda_devices_status (list, optional) – List of cuda devices and status. Defaults to None.
- Raises:
ShardNotFoundError – If no shard is registered for the envoy (“Unknown shard {envoy_name}”).
- Returns:
int – Value of the envoy_health_check_period.
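The health-check exchange can be sketched with a stand-in class: look up the envoy, raise if it is unknown, record the reported state, and return the check period. The class `HealthCheckSketch`, the locally defined `ShardNotFoundError` and the bookkeeping fields are assumptions for illustration, not the OpenFL implementation.

```python
import time
from typing import Optional


class ShardNotFoundError(Exception):
    """Local stand-in for the exception named above."""


class HealthCheckSketch:
    """Stand-in for the Director's envoy health tracking (not OpenFL code)."""

    def __init__(self, envoy_health_check_period: int = 60):
        self.envoy_health_check_period = envoy_health_check_period
        self._shard_registry = {}

    def update_envoy_status(self, *, envoy_name: str, is_experiment_running: bool,
                            cuda_devices_status: Optional[list] = None) -> int:
        shard_info = self._shard_registry.get(envoy_name)
        if shard_info is None:
            raise ShardNotFoundError(f"Unknown shard {envoy_name}")
        # Assumed bookkeeping fields; the real registry layout is not shown here.
        shard_info["is_experiment_running"] = is_experiment_running
        shard_info["cuda_devices_status"] = cuda_devices_status
        shard_info["last_updated"] = time.time()
        return self.envoy_health_check_period


tracker = HealthCheckSketch(envoy_health_check_period=30)
tracker._shard_registry["envoy_one"] = {}
period = tracker.update_envoy_status(
    envoy_name="envoy_one", is_experiment_running=False
)
```

Returning the period lets the envoy learn how long to wait before its next health check.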
- async wait_experiment(envoy_name: str) str
Wait for an experiment.
- Parameters:
envoy_name (str) – The name of the envoy.
- Returns:
str – The name of the experiment on the queue.
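Waiting on a per-envoy queue can be sketched with `asyncio.Queue` and a `defaultdict`, mirroring the `col_exp_queues` attribute described earlier. Using `asyncio.Queue` as the queue type is an assumption, and `demo_wait_experiment` and the names `envoy_one` and `exp_1` are hypothetical.

```python
import asyncio
from collections import defaultdict


async def demo_wait_experiment():
    # One queue per envoy, created lazily on first access.
    col_exp_queues = defaultdict(asyncio.Queue)

    async def wait_experiment(envoy_name: str) -> str:
        # Suspends until an experiment name is queued for this envoy.
        return await col_exp_queues[envoy_name].get()

    waiter = asyncio.create_task(wait_experiment("envoy_one"))
    await asyncio.sleep(0)  # let the waiter start and block on the empty queue
    await col_exp_queues["envoy_one"].put("exp_1")
    return await waiter


experiment_name = asyncio.run(demo_wait_experiment())
```

The waiting coroutine does not consume CPU while blocked; it resumes only once a name is put on its queue.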