openfl.component.director.director.Director

class openfl.component.director.director.Director(*, tls: bool = True, root_certificate: Path | str | None = None, private_key: Path | str | None = None, certificate: Path | str | None = None, sample_shape: list | None = None, target_shape: list | None = None, review_plan_callback: None | Callable = None, envoy_health_check_period: int = 60, install_requirements: bool = False)

Bases: object

Director class. The Director is the central node of the federation (Director-Based Workflow).

Class Attributes:
  • tls (bool) – A flag indicating if TLS should be used for connections.

  • root_certificate (Union[Path, str]) – The path to the root certificate for TLS.

  • private_key (Union[Path, str]) – The path to the private key for TLS.

  • certificate (Union[Path, str]) – The path to the certificate for TLS.

  • sample_shape (list) – The shape of the sample data.

  • target_shape (list) – The shape of the target data.

  • review_plan_callback (Union[None, Callable]) – A callback function for reviewing the plan.

  • envoy_health_check_period (int) – The interval, in seconds, between envoy health checks.

  • install_requirements (bool) – A flag indicating if the requirements should be installed.

  • _shard_registry (dict) – A dictionary to store the shard registry.

  • experiments_registry (ExperimentsRegistry) – An ExperimentsRegistry instance that stores the experiments.

  • col_exp_queues (defaultdict) – A defaultdict to store the experiment queues for collaborators.

  • col_exp (dict) – A dictionary to store the experiments for collaborators.

  • logger (Logger) – A logger for logging activities.

Methods

  • acknowledge_shard – Save shard info to the shard registry if it is acceptable.

  • get_dataset_info – Get dataset info.

  • get_envoys – Get status information about envoys.

  • get_experiment_data – Get experiment data.

  • get_experiment_description – Get experiment information by name for a specific user.

  • get_experiment_status – Get experiment status.

  • get_experiments_list – Get the list of experiments for a specific user.

  • get_registered_shards – Get information about the registered shards.

  • get_trained_model – Get the trained model.

  • remove_experiment_data – Remove experiment data from the stash.

  • set_experiment_failed – Handle the RPC from an envoy that marks an experiment as failed.

  • set_new_experiment – Set a new experiment.

  • start_experiment_execution_loop – Run a task that monitors and runs experiments.

  • stream_metrics – Stream metrics from the aggregator.

  • update_envoy_status – Accept a health check from an envoy.

  • wait_experiment – Wait for an experiment.

acknowledge_shard(shard_info: dict) → bool

Save the shard info to the shard registry if it is acceptable.

Parameters:

shard_info (dict) – The shard information dictionary to add to the shard registry.

Returns:

is_accepted (bool) – True if the shard info was accepted and added to the registry, False otherwise.
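The criterion for an "acceptable" shard is not stated above. A minimal sketch follows, assuming a shard is accepted only when its sample_shape and target_shape equal the shapes the Director was constructed with, and assuming shard_info carries those shapes plus the envoy name under node_info. ShardRegistrySketch and these dictionary keys are hypothetical illustrations, not the OpenFL implementation.

```python
from typing import Optional


class ShardRegistrySketch:
    """Hypothetical stand-in for the Director's shard bookkeeping."""

    def __init__(self, sample_shape: Optional[list] = None,
                 target_shape: Optional[list] = None) -> None:
        self.sample_shape = sample_shape
        self.target_shape = target_shape
        self._shard_registry: dict = {}

    def acknowledge_shard(self, shard_info: dict) -> bool:
        # Assumption: a shard is acceptable only if both shapes match.
        if (shard_info.get("sample_shape") != self.sample_shape
                or shard_info.get("target_shape") != self.target_shape):
            return False
        # Assumption (hypothetical key layout): the envoy name sits in node_info.
        name = shard_info["node_info"]["name"]
        self._shard_registry[name] = {"shard_info": shard_info, "is_online": True}
        return True


registry = ShardRegistrySketch(sample_shape=["28", "28"], target_shape=["1"])
ok = registry.acknowledge_shard(
    {"node_info": {"name": "env_one"}, "sample_shape": ["28", "28"], "target_shape": ["1"]}
)
bad = registry.acknowledge_shard(
    {"node_info": {"name": "env_two"}, "sample_shape": ["32", "32"], "target_shape": ["1"]}
)
print(ok, bad)  # True False
```

Rejecting mismatched shapes at registration time keeps an incompatible envoy out of the registry before any experiment is scheduled on it.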

get_dataset_info()

Get dataset info.

get_envoys() → list

Get status information about envoys.

Returns:

list – List with the status information about envoys.

get_experiment_data(experiment_name: str) → Path

Get experiment data.

Parameters:

experiment_name (str) – String id for experiment.

Returns:

Path – Path of the experiment archive.

get_experiment_description(caller: str, name: str) → dict

Get experiment information by name for a specific user.

Parameters:
  • caller (str) – String id for experiment owner.

  • name (str) – String id for experiment name.

Returns:

dict – Dictionary with the info from the experiment.

async get_experiment_status(experiment_name: str, caller: str)

Get experiment status.

Parameters:
  • experiment_name (str) – String id for experiment.

  • caller (str) – String id for experiment owner.

Returns:

str – The status of the experiment, which is one of the following:
  • PENDING = ‘pending’
  • FINISHED = ‘finished’
  • IN_PROGRESS = ‘in_progress’
  • FAILED = ‘failed’
  • REJECTED = ‘rejected’
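Because get_experiment_status is a coroutine that returns one of the status strings listed above, a client can poll it until the experiment settles. The sketch below mirrors those documented values in an Enum and awaits a status callable until a final state appears. Treating FINISHED, FAILED, and REJECTED as final is an assumption, and ExperimentStatus, TERMINAL, poll_until_done, and fake_status are hypothetical names; fake_status stands in for an actual awaited call to get_experiment_status.

```python
import asyncio
from enum import Enum
from typing import Awaitable, Callable


class ExperimentStatus(str, Enum):
    # Values copied from the documented status list.
    PENDING = "pending"
    FINISHED = "finished"
    IN_PROGRESS = "in_progress"
    FAILED = "failed"
    REJECTED = "rejected"


# Assumption: once an experiment reaches one of these, its status no longer changes.
TERMINAL = {ExperimentStatus.FINISHED, ExperimentStatus.FAILED, ExperimentStatus.REJECTED}


async def poll_until_done(get_status: Callable[[], Awaitable[str]],
                          interval: float = 1.0) -> ExperimentStatus:
    """Await get_status() until it reports a terminal status, then return it."""
    while True:
        status = ExperimentStatus(await get_status())
        if status in TERMINAL:
            return status
        await asyncio.sleep(interval)


async def main() -> ExperimentStatus:
    replies = iter(["pending", "in_progress", "finished"])

    async def fake_status() -> str:
        # Hypothetical stand-in for: await director.get_experiment_status(name, caller)
        return next(replies)

    return await poll_until_done(fake_status, interval=0)


result = asyncio.run(main())
print(result.value)  # finished
```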

get_experiments_list(caller: str) → list

Get the list of experiments for a specific user.

Parameters:

caller (str) – String id for experiment owner.

Returns:

list – A list with information about each of the user's experiments.

get_registered_shards() → list

Get information about the registered shards.

get_trained_model(experiment_name: str, caller: str, model_type: str)

Get trained model.

Parameters:
  • experiment_name (str) – String id for experiment.

  • caller (str) – String id for experiment owner.

  • model_type (str) – The type of model to return, either ‘best’ or ‘last’.

Returns:
  • None – If there is no experiment data in the stash, the aggregator has no aggregated model to return, or an unknown model type was requested.

  • dict – Dictionary of tensors from the aggregator when the model type is ‘best’ or ‘last’.
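Since get_trained_model signals three different failure cases with the same None, calling code may prefer to validate model_type up front and turn a None result into an explicit error. This is a minimal sketch under that design choice; ModelSource, fetch_model, VALID_MODEL_TYPES, and FakeDirector are hypothetical names, and FakeDirector only imitates the documented call signature.

```python
from typing import Optional, Protocol

VALID_MODEL_TYPES = ("best", "last")


class ModelSource(Protocol):
    """Hypothetical protocol matching the documented get_trained_model signature."""

    def get_trained_model(self, experiment_name: str, caller: str,
                          model_type: str) -> Optional[dict]: ...


def fetch_model(source: ModelSource, experiment_name: str, caller: str,
                model_type: str = "best") -> dict:
    """Return the tensor dictionary, or raise instead of passing None along."""
    if model_type not in VALID_MODEL_TYPES:
        # Reject unknown types locally rather than receiving None for them.
        raise ValueError(f"model_type must be one of {VALID_MODEL_TYPES}, got {model_type!r}")
    tensors = source.get_trained_model(experiment_name, caller, model_type)
    if tensors is None:
        # Remaining None cases: no stash entry, or no aggregated model yet.
        raise LookupError(f"no {model_type!r} model available for {experiment_name!r}")
    return tensors


class FakeDirector:
    """Hypothetical stand-in that only has a model for experiment 'exp1'."""

    def get_trained_model(self, experiment_name: str, caller: str,
                          model_type: str) -> Optional[dict]:
        return {"dense/kernel": [0.1, 0.2]} if experiment_name == "exp1" else None


model = fetch_model(FakeDirector(), "exp1", "alice", "last")
print(sorted(model))  # ['dense/kernel']
```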

remove_experiment_data(experiment_name: str, caller: str)

Remove experiment data from stash.

Parameters:
  • experiment_name (str) – String id for experiment.

  • caller (str) – String id for experiment owner.

set_experiment_failed(*, experiment_name: str, collaborator_name: str)

Handle the RPC from an envoy that marks an experiment as failed.

Parameters:
  • experiment_name (str) – String id for experiment.

  • collaborator_name (str) – String id for collaborator.

Returns:

None

async set_new_experiment(*, experiment_name: str, sender_name: str, tensor_dict: dict, collaborator_names: Iterable[str], experiment_archive_path: Path) → bool

Set new experiment.

Parameters:
  • experiment_name (str) – String id for experiment.

  • sender_name (str) – The name of the sender.

  • tensor_dict (dict) – Dictionary of tensors.

  • collaborator_names (Iterable[str]) – Names of collaborators.

  • experiment_archive_path (Path) – Path of the experiment archive.

Returns:

bool – True if the experiment was registered successfully.

async start_experiment_execution_loop()

Run a task that monitors and runs experiments.

async stream_metrics(experiment_name: str, caller: str)

Stream metrics from the aggregator.

This method takes the next metric dictionary from the aggregator’s queue and returns it to the caller.

Parameters:
  • experiment_name (str) – String id for experiment.

  • caller (str) – String id for experiment owner.

Returns:
  • metric_dict – A dictionary with the keys ‘metric_origin’, ‘task_name’, ‘metric_name’, ‘metric_value’, and ‘round’, if the queue is not empty.

  • None – If the queue is empty but the experiment is still running.

Raises:

StopIteration – If the experiment is finished and there are no more metrics to report.
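The three outcomes above (a metric dictionary, None, or StopIteration) map naturally onto a consumer loop that yields metrics, waits on None, and stops on StopIteration. For brevity this sketch is synchronous and drives a plain callable; the documented method is a coroutine, so real use would await it inside an async loop. iter_metrics, fake_next_metric, and the trimmed event dictionaries are hypothetical illustrations.

```python
import time
from typing import Callable, Iterator, Optional


def iter_metrics(next_metric: Callable[[], Optional[dict]],
                 poll_interval: float = 0.5) -> Iterator[dict]:
    """Yield metric dictionaries until next_metric() raises StopIteration."""
    while True:
        try:
            metric = next_metric()
        except StopIteration:
            # Experiment finished and no metrics remain: end the stream.
            return
        if metric is None:
            # Queue is empty but the experiment is still running: wait, then retry.
            time.sleep(poll_interval)
            continue
        yield metric


# Hypothetical queued events, trimmed to three of the documented keys.
events = [
    {"metric_name": "loss", "metric_value": 0.9, "round": 0},
    None,
    {"metric_name": "loss", "metric_value": 0.7, "round": 1},
]


def fake_next_metric() -> Optional[dict]:
    # Hypothetical stand-in for stream_metrics(): hands out events, then signals the end.
    if not events:
        raise StopIteration
    return events.pop(0)


values = [m["metric_value"] for m in iter_metrics(fake_next_metric, poll_interval=0)]
print(values)  # [0.9, 0.7]
```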

update_envoy_status(*, envoy_name: str, is_experiment_running: bool, cuda_devices_status: list | None = None) → int

Accept health check from envoy.

Parameters:
  • envoy_name (str) – String id for envoy.

  • is_experiment_running (bool) – Whether an experiment is currently running on the envoy.

  • cuda_devices_status (list, optional) – List of CUDA devices and their status. Defaults to None.

Raises:

ShardNotFoundError – If envoy_name does not match any registered shard (message: Unknown shard {envoy_name}).

Returns:

int – Value of the envoy_health_check_period.
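Because update_envoy_status returns envoy_health_check_period, an envoy can use that value as the delay before its next health check. The sketch below models both sides under that assumption: a toy registry that raises for unknown envoys and refreshes known ones, plus an envoy loop that sleeps for the returned period. HealthRegistrySketch, envoy_health_loop, the registry entry keys, and the local ShardNotFoundError class are all hypothetical and not imported from OpenFL.

```python
import asyncio
import time
from typing import Optional


class ShardNotFoundError(Exception):
    """Hypothetical local stand-in for the documented error."""


class HealthRegistrySketch:
    """Hypothetical model of the Director's health-check bookkeeping."""

    def __init__(self, envoy_health_check_period: int = 60) -> None:
        self.envoy_health_check_period = envoy_health_check_period
        self._shard_registry: dict = {}

    def update_envoy_status(self, *, envoy_name: str, is_experiment_running: bool,
                            cuda_devices_status: Optional[list] = None) -> int:
        shard = self._shard_registry.get(envoy_name)
        if shard is None:
            raise ShardNotFoundError(f"Unknown shard {envoy_name}")
        # Hypothetical entry keys: refresh liveness and running state.
        shard.update(is_online=True, is_experiment_running=is_experiment_running,
                     cuda_devices_status=cuda_devices_status, last_updated=time.time())
        return self.envoy_health_check_period


async def envoy_health_loop(director: HealthRegistrySketch, name: str, beats: int) -> int:
    """Send `beats` health checks, sleeping for the period the director returns."""
    sent = 0
    for _ in range(beats):
        period = director.update_envoy_status(envoy_name=name, is_experiment_running=False)
        sent += 1
        # Assumption: the returned period is the delay before the next check.
        await asyncio.sleep(period)
    return sent


director = HealthRegistrySketch(envoy_health_check_period=0)
director._shard_registry["env_one"] = {}
sent = asyncio.run(envoy_health_loop(director, "env_one", beats=3))
print(sent)  # 3
```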

async wait_experiment(envoy_name: str) → str

Wait for an experiment.

Parameters:

envoy_name (str) – The name of the envoy.

Returns:

str – The name of the experiment taken from the envoy's queue.
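Reading the col_exp_queues attribute together with wait_experiment suggests a producer/consumer arrangement: registering an experiment places its name on each collaborator's queue, and an envoy awaits its own queue. This is a minimal sketch assuming one asyncio.Queue per collaborator in a defaultdict; ExperimentQueueSketch and enqueue are hypothetical names, not the Director's API. The queues are created inside the running event loop so the sketch behaves the same on older Python versions.

```python
import asyncio
from collections import defaultdict
from typing import DefaultDict, Iterable, List


class ExperimentQueueSketch:
    """Hypothetical model of per-collaborator experiment queues."""

    def __init__(self) -> None:
        # One asyncio.Queue per collaborator, created on first access.
        self.col_exp_queues: DefaultDict[str, asyncio.Queue] = defaultdict(asyncio.Queue)

    def enqueue(self, experiment_name: str, collaborator_names: Iterable[str]) -> None:
        # Assumption: every listed collaborator receives the experiment name.
        for name in collaborator_names:
            self.col_exp_queues[name].put_nowait(experiment_name)

    async def wait_experiment(self, envoy_name: str) -> str:
        # Suspends until an experiment name is available for this envoy.
        return await self.col_exp_queues[envoy_name].get()


async def main() -> List[str]:
    queues = ExperimentQueueSketch()
    waiter = asyncio.create_task(queues.wait_experiment("env_one"))
    await asyncio.sleep(0)  # let the waiter block on the empty queue
    queues.enqueue("mnist_exp", ["env_one", "env_two"])
    first = await waiter
    second = await queues.wait_experiment("env_two")
    return [first, second]


result = asyncio.run(main())
print(result)  # ['mnist_exp', 'mnist_exp']
```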