openfl.component.director.director.Director
- class openfl.component.director.director.Director(*, tls=True, root_certificate=None, private_key=None, certificate=None, sample_shape=None, target_shape=None, review_plan_callback=None, envoy_health_check_period=60, install_requirements=False)
Bases: object
Director class. The Director is the central node of the federation (Director-Based Workflow).
- Parameters:
tls (bool)
root_certificate (Path | str)
private_key (Path | str)
certificate (Path | str)
sample_shape (list)
target_shape (list)
review_plan_callback (None | Callable)
envoy_health_check_period (int)
install_requirements (bool)
- tls
A flag indicating if TLS should be used for connections.
- Type:
bool
- root_certificate
The path to the root certificate for TLS.
- Type:
Union[Path, str]
- private_key
The path to the private key for TLS.
- Type:
Union[Path, str]
- certificate
The path to the certificate for TLS.
- Type:
Union[Path, str]
- sample_shape
The shape of the sample data.
- Type:
list
- target_shape
The shape of the target data.
- Type:
list
- review_plan_callback
A callback function for reviewing the plan.
- Type:
Union[None, Callable]
- envoy_health_check_period
The period between envoy health checks, in seconds.
- Type:
int
- install_requirements
A flag indicating if the requirements should be installed.
- Type:
bool
- _shard_registry
A dictionary to store the shard registry.
- Type:
dict
- experiments_registry
An ExperimentsRegistry object that stores the experiments.
- Type:
ExperimentsRegistry
- col_exp_queues
A defaultdict to store the experiment queues for collaborators.
- Type:
defaultdict
- col_exp
A dictionary to store the experiments for collaborators.
- Type:
dict
- logger
A logger for logging activities.
- Type:
Logger
- __init__(*, tls=True, root_certificate=None, private_key=None, certificate=None, sample_shape=None, target_shape=None, review_plan_callback=None, envoy_health_check_period=60, install_requirements=False)
Initialize the Director object.
- Parameters:
tls (bool, optional) – A flag indicating if TLS should be used for connections. Defaults to True.
root_certificate (Union[Path, str], optional) – The path to the root certificate for TLS. Defaults to None.
private_key (Union[Path, str], optional) – The path to the private key for TLS. Defaults to None.
certificate (Union[Path, str], optional) – The path to the certificate for TLS. Defaults to None.
sample_shape (list, optional) – The shape of the sample data. Defaults to None.
target_shape (list, optional) – The shape of the target data. Defaults to None.
review_plan_callback (Union[None, Callable], optional) – A callback function for reviewing the plan. Defaults to None.
envoy_health_check_period (int, optional) – The period between envoy health checks, in seconds. Defaults to 60.
install_requirements (bool, optional) – A flag indicating if the requirements should be installed. Defaults to False.
- Return type:
None
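Every constructor parameter follows the bare `*` in the signature, so all of them are keyword-only. The sketch below does not import OpenFL; it uses a hypothetical stand-in, `make_director_config`, with the same signature to show that the arguments must be passed by name. The certificate paths and shapes are placeholder assumptions.

```python
from pathlib import Path
from typing import Callable, Optional, Union


def make_director_config(
    *,
    tls: bool = True,
    root_certificate: Optional[Union[Path, str]] = None,
    private_key: Optional[Union[Path, str]] = None,
    certificate: Optional[Union[Path, str]] = None,
    sample_shape: Optional[list] = None,
    target_shape: Optional[list] = None,
    review_plan_callback: Optional[Callable] = None,
    envoy_health_check_period: int = 60,
    install_requirements: bool = False,
) -> dict:
    """Hypothetical stand-in mirroring Director.__init__'s keyword-only signature."""
    return {
        "tls": tls,
        "root_certificate": root_certificate,
        "private_key": private_key,
        "certificate": certificate,
        "sample_shape": sample_shape,
        "target_shape": target_shape,
        "review_plan_callback": review_plan_callback,
        "envoy_health_check_period": envoy_health_check_period,
        "install_requirements": install_requirements,
    }


# Keyword arguments work; the paths are placeholders.
config = make_director_config(
    root_certificate="cert/root_ca.crt",
    private_key="cert/director.key",
    certificate="cert/director.crt",
    sample_shape=[28, 28, 1],
    target_shape=[1],
)

# Positional arguments are rejected because of the bare `*`.
positional_rejected = False
try:
    make_director_config(True)
except TypeError:
    positional_rejected = True
print(positional_rejected)
# → True
```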
Methods
__init__(*[, tls, root_certificate, ...]) – Initialize the Director object.
acknowledge_shard(shard_info) – Save shard info to the shard registry if it is acceptable.
Get dataset info.
get_envoys() – Get status information about envoys.
get_experiment_data(experiment_name) – Get experiment data.
get_experiment_description(caller, name) – Get experiment information by name for a specific user.
get_experiment_status(experiment_name, caller) – Get experiment status.
get_experiments_list(caller) – Get the experiments list for a specific user.
Get registered shard info.
get_trained_model(experiment_name, caller, ...) – Get the trained model.
remove_experiment_data(experiment_name, caller) – Remove experiment data from the stash.
set_experiment_failed(*, experiment_name, ...) – Mark an experiment as failed (RPC called by envoys).
set_new_experiment(*, experiment_name, ...) – Set a new experiment.
Run task to monitor and run experiments.
stream_metrics(experiment_name, caller) – Stream metrics from the aggregator.
update_envoy_status(*, envoy_name, ...[, ...]) – Accept a health check from an envoy.
wait_experiment(envoy_name) – Wait for an experiment.
- acknowledge_shard(shard_info)
Save shard info to the shard registry if it is acceptable.
- Parameters:
shard_info (dict) – The shard info dictionary to store in the shard registry.
- Returns:
is_accepted – True if the shard info was accepted and added to the registry, False if it was denied.
- Return type:
bool
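A minimal sketch of the accept/deny decision. This section does not state the acceptance criterion, so the sketch assumes a shard is accepted only when its sample_shape and target_shape match the Director's, and that the envoy name sits under a hypothetical shard_info["node_info"]["name"] key. `ShardRegistrySketch` is a hypothetical class, not OpenFL's implementation.

```python
class ShardRegistrySketch:
    """Hypothetical stand-in illustrating acknowledge_shard's accept/deny logic."""

    def __init__(self, sample_shape: list, target_shape: list) -> None:
        self.sample_shape = sample_shape
        self.target_shape = target_shape
        self._shard_registry: dict = {}

    def acknowledge_shard(self, shard_info: dict) -> bool:
        # Assumption: deny shards whose data shapes differ from the Director's.
        if (shard_info.get("sample_shape") != self.sample_shape
                or shard_info.get("target_shape") != self.target_shape):
            return False
        # Assumption: the envoy name is stored under shard_info["node_info"]["name"].
        envoy_name = shard_info["node_info"]["name"]
        self._shard_registry[envoy_name] = shard_info
        return True


registry = ShardRegistrySketch(sample_shape=[28, 28], target_shape=[1])
ok = registry.acknowledge_shard(
    {"node_info": {"name": "envoy_one"}, "sample_shape": [28, 28], "target_shape": [1]}
)
bad = registry.acknowledge_shard(
    {"node_info": {"name": "envoy_two"}, "sample_shape": [32, 32], "target_shape": [1]}
)
print(ok, bad)
# → True False
```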
- get_envoys()
Get status information about envoys.
- Returns:
List with the status information about envoys.
- Return type:
list
- get_experiment_data(experiment_name)
Get experiment data.
- Parameters:
experiment_name (str) – String id for experiment.
- Returns:
Path to the experiment archive.
- Return type:
str
- get_experiment_description(caller, name)
Get experiment information by name for a specific user.
- Parameters:
caller (str) – String id for experiment owner.
name (str) – String id for experiment name.
- Returns:
Dictionary with the info from the experiment.
- Return type:
dict
- async get_experiment_status(experiment_name, caller)
Get experiment status.
- Parameters:
experiment_name (str) – String id for experiment.
caller (str) – String id for experiment owner.
- Returns:
- The status of the experiment can be one of the following:
PENDING = ‘pending’
FINISHED = ‘finished’
IN_PROGRESS = ‘in_progress’
FAILED = ‘failed’
REJECTED = ‘rejected’
- Return type:
str
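Because get_experiment_status returns one of these plain strings, a client might map them onto an enumeration. The `Status` class below mirrors the values listed above; the `is_terminal` helper, and the assumption that finished, failed and rejected experiments do not change status again, are illustrative only.

```python
from enum import Enum


class Status(str, Enum):
    """Experiment status values as listed for get_experiment_status."""

    PENDING = "pending"
    FINISHED = "finished"
    IN_PROGRESS = "in_progress"
    FAILED = "failed"
    REJECTED = "rejected"


def is_terminal(status: str) -> bool:
    # Assumption: these three states are final.
    return Status(status) in {Status.FINISHED, Status.FAILED, Status.REJECTED}


print(Status("in_progress").name, is_terminal("failed"), is_terminal("pending"))
# → IN_PROGRESS True False
```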
- get_experiments_list(caller)
Get the experiments list for a specific user.
- Parameters:
caller (str) – String id for experiment owner.
- Returns:
List with information about each experiment of the specific user.
- Return type:
list
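get_experiments_list and get_experiment_description only expose experiments that belong to `caller`. A minimal sketch of that per-user filtering, assuming each registered experiment records the set of users allowed to see it; the `ExperimentRecord` dataclass, its fields and the `experiments_for_caller` helper are hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class ExperimentRecord:
    """Hypothetical, simplified experiment entry."""

    name: str
    status: str
    users: set = field(default_factory=set)


def experiments_for_caller(registry: dict, caller: str) -> list:
    # Keep only experiments the caller is registered as a user of.
    return [
        {"name": exp.name, "status": exp.status}
        for exp in registry.values()
        if caller in exp.users
    ]


registry = {
    "mnist": ExperimentRecord("mnist", "in_progress", {"alice"}),
    "cifar": ExperimentRecord("cifar", "pending", {"bob"}),
}
print(experiments_for_caller(registry, "alice"))
# → [{'name': 'mnist', 'status': 'in_progress'}]
```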
- get_trained_model(experiment_name, caller, model_type)
Get the trained model.
- Parameters:
experiment_name (str) – String id for experiment.
caller (str) – String id for experiment owner.
model_type (str) – The type of model to return, either 'best' or 'last'.
- Returns:
- dict: Dictionary of tensors from the aggregator when the model type is 'best' or 'last'.
- None: If there is no experiment data in the stash, the aggregator has no aggregated model to return, or an unknown model type is requested.
- Return type:
dict | None
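The model selection can be sketched as a small dispatch on `model_type`. This assumes the aggregator keeps a "best" and a "last" tensor dictionary and that "no aggregated model" means the last one is still unset; `select_trained_model` is a hypothetical helper mirroring the documented return cases, not the Director's code.

```python
from typing import Optional


def select_trained_model(
    best_tensor_dict: Optional[dict],
    last_tensor_dict: Optional[dict],
    model_type: str,
) -> Optional[dict]:
    """Hypothetical helper mirroring the documented return cases of get_trained_model."""
    # No aggregated model yet: nothing to return.
    if last_tensor_dict is None:
        return None
    if model_type == "best":
        return best_tensor_dict
    if model_type == "last":
        return last_tensor_dict
    # Unknown model type.
    return None


print(select_trained_model({"w": 1}, {"w": 2}, "best"))
# → {'w': 1}
```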
- remove_experiment_data(experiment_name, caller)
Remove experiment data from the stash.
- Parameters:
experiment_name (str) – String id for experiment.
caller (str) – String id for experiment owner.
- set_experiment_failed(*, experiment_name, collaborator_name)
Mark an experiment as failed (RPC called by envoys).
- Parameters:
experiment_name (str) – String id for experiment.
collaborator_name (str) – String id for collaborator.
- Returns:
None
- async set_new_experiment(*, experiment_name, sender_name, tensor_dict, collaborator_names, experiment_archive_path)
Set a new experiment.
- Parameters:
experiment_name (str) – String id for experiment.
sender_name (str) – The name of the sender.
tensor_dict (dict) – Dictionary of tensors.
collaborator_names (Iterable[str]) – Names of collaborators.
experiment_archive_path (Path) – Path to the experiment archive.
- Returns:
True if the experiment was registered successfully.
- Return type:
bool
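set_new_experiment is a coroutine with keyword-only parameters, so callers must await it and pass every argument by name. The sketch below uses a hypothetical `DirectorStub` with the same signature instead of a real Director; the stub only stores the arguments, and the experiment name, collaborator names and archive path are placeholders.

```python
import asyncio
from pathlib import Path
from typing import Iterable


class DirectorStub:
    """Hypothetical stub with the same keyword-only coroutine signature."""

    def __init__(self) -> None:
        self.experiments: dict = {}

    async def set_new_experiment(
        self,
        *,
        experiment_name: str,
        sender_name: str,
        tensor_dict: dict,
        collaborator_names: Iterable[str],
        experiment_archive_path: Path,
    ) -> bool:
        # This stub only stores the arguments.
        self.experiments[experiment_name] = {
            "sender": sender_name,
            "collaborators": list(collaborator_names),
            "archive": experiment_archive_path,
            "tensors": tensor_dict,
        }
        return True


async def main(director: DirectorStub) -> bool:
    return await director.set_new_experiment(
        experiment_name="mnist",
        sender_name="alice",
        tensor_dict={},
        collaborator_names=["envoy_one", "envoy_two"],
        experiment_archive_path=Path("mnist.zip"),
    )


stub = DirectorStub()
accepted = asyncio.run(main(stub))
print(accepted)
# → True
```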
- async stream_metrics(experiment_name, caller)
Stream metrics from the aggregator.
This method takes the next metric dictionary from the aggregator's queue and returns it to the caller.
- Parameters:
experiment_name (str) – String id for experiment.
caller (str) – String id for experiment owner.
- Returns:
- metric_dict: A dictionary with the keys 'metric_origin', 'task_name', 'metric_name', 'metric_value' and 'round', if the queue is not empty.
- None: If the queue is empty but the experiment is still running.
- Return type:
dict | None
- Raises:
StopIteration – If the experiment is finished and there are no more metrics to report.
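Given the documented contract (a metric dictionary when one is queued, None while the experiment is still running with nothing queued, and termination once it finishes), a client loop can skip None values and back off briefly. The sketch assumes stream_metrics is consumed as an asynchronous iterator; `fake_metric_stream` is a hypothetical stand-in that yields canned values, and `collect_metrics` and its `poll_delay` are illustrative names.

```python
import asyncio
from typing import AsyncIterator, Optional


async def fake_metric_stream() -> AsyncIterator[Optional[dict]]:
    """Hypothetical stand-in: two metrics with an empty-queue gap, then the experiment ends."""
    yield {"metric_origin": "aggregator", "task_name": "validate",
           "metric_name": "acc", "metric_value": 0.91, "round": 0}
    yield None  # queue empty, experiment still running
    yield {"metric_origin": "aggregator", "task_name": "validate",
           "metric_name": "acc", "metric_value": 0.94, "round": 1}


async def collect_metrics(
    stream: AsyncIterator[Optional[dict]], poll_delay: float = 0.01
) -> list:
    collected = []
    # The loop ends when the stream is exhausted (experiment finished).
    async for metric in stream:
        if metric is None:
            await asyncio.sleep(poll_delay)  # nothing queued yet; back off briefly
            continue
        collected.append(metric)
    return collected


metrics = asyncio.run(collect_metrics(fake_metric_stream()))
print([(m["round"], m["metric_value"]) for m in metrics])
# → [(0, 0.91), (1, 0.94)]
```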
- update_envoy_status(*, envoy_name, is_experiment_running, cuda_devices_status=None)
Accept a health check from an envoy.
- Parameters:
envoy_name (str) – String id for envoy.
is_experiment_running (bool) – Whether an experiment is currently running on the envoy.
cuda_devices_status (list, optional) – List of CUDA devices and their status. Defaults to None.
- Raises:
ShardNotFoundError – If the shard for envoy_name is unknown.
- Returns:
Value of the envoy_health_check_period.
- Return type:
int
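update_envoy_status returns envoy_health_check_period, which suggests the envoy waits that many seconds before its next check. A sketch of such a loop under that assumption: `run_health_checks` and `send_status` are hypothetical, with `send_status` standing in for the RPC that ends in update_envoy_status. The loop is bounded by `max_checks` and `sleep` is injectable, so it can be exercised without real delays; a real envoy would presumably loop until shutdown.

```python
import time
from typing import Callable


def run_health_checks(
    send_status: Callable[[], int],
    max_checks: int,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Hypothetical envoy-side loop: report status, then wait the returned period."""
    checks = 0
    while checks < max_checks:
        period = send_status()  # e.g. the int returned by update_envoy_status
        checks += 1
        sleep(period)
    return checks


# Record the requested sleeps instead of actually sleeping.
recorded_sleeps = []
count = run_health_checks(lambda: 60, max_checks=3, sleep=recorded_sleeps.append)
print(count, recorded_sleeps)
# → 3 [60, 60, 60]
```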