openfl.component.director.director.Director#

class openfl.component.director.director.Director(*, tls=True, root_certificate=None, private_key=None, certificate=None, sample_shape=None, target_shape=None, review_plan_callback=None, envoy_health_check_period=60, install_requirements=False)[source]#

Bases: object

Director class. The Director is the central node of the federation (Director-Based Workflow).

Parameters:
  • tls (bool)

  • root_certificate (Path | str)

  • private_key (Path | str)

  • certificate (Path | str)

  • sample_shape (list)

  • target_shape (list)

  • review_plan_callback (None | Callable)

  • envoy_health_check_period (int)

  • install_requirements (bool)

tls#

A flag indicating if TLS should be used for connections.

Type:

bool

root_certificate#

The path to the root certificate for TLS.

Type:

Union[Path, str]

private_key#

The path to the private key for TLS.

Type:

Union[Path, str]

certificate#

The path to the certificate for TLS.

Type:

Union[Path, str]

sample_shape#

The shape of the sample data.

Type:

list

target_shape#

The shape of the target data.

Type:

list

review_plan_callback#

A callback function for reviewing the plan.

Type:

Union[None, Callable]

envoy_health_check_period#

The period for health check of envoys in seconds.

Type:

int

install_requirements#

A flag indicating if the requirements should be installed.

Type:

bool

_shard_registry#

A dictionary to store the shard registry.

Type:

dict

experiments_registry#

An object of ExperimentsRegistry to store the experiments.

Type:

ExperimentsRegistry

col_exp_queues#

A defaultdict to store the experiment queues for collaborators.

Type:

defaultdict

col_exp#

A dictionary to store the experiments for collaborators.

Type:

dict

logger#

A logger for logging activities.

Type:

Logger

__init__(*, tls=True, root_certificate=None, private_key=None, certificate=None, sample_shape=None, target_shape=None, review_plan_callback=None, envoy_health_check_period=60, install_requirements=False)[source]#

Initialize the Director object.

Parameters:
  • tls (bool, optional) – A flag indicating if TLS should be used for connections. Defaults to True.

  • root_certificate (Union[Path, str], optional) – The path to the root certificate for TLS. Defaults to None.

  • private_key (Union[Path, str], optional) – The path to the private key for TLS. Defaults to None.

  • certificate (Union[Path, str], optional) – The path to the certificate for TLS. Defaults to None.

  • sample_shape (list, optional) – The shape of the sample data. Defaults to None.

  • target_shape (list, optional) – The shape of the target data. Defaults to None.

  • review_plan_callback (Union[None, Callable], optional) – A callback function for reviewing the plan. Defaults to None.

  • envoy_health_check_period (int, optional) – The period for health check of envoys in seconds. Defaults to 60.

  • install_requirements (bool, optional) – A flag indicating if the requirements should be installed. Defaults to False.

Return type:

None
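As a minimal sketch of how the keyword-only arguments above fit together, the helper below assembles a kwargs dictionary for the constructor. The certificate file names, the example shapes, and the rule that all three TLS paths are supplied when `tls=True` are assumptions made for illustration, not requirements stated by this reference.

```python
from pathlib import Path
from typing import Any, Dict, Optional


def director_kwargs(cert_dir: Optional[Path] = None, *, tls: bool = True) -> Dict[str, Any]:
    """Build keyword arguments matching the documented __init__ signature."""
    kwargs: Dict[str, Any] = {
        "tls": tls,
        "sample_shape": [28, 28, 1],  # hypothetical sample shape
        "target_shape": [1],          # hypothetical target shape
        "envoy_health_check_period": 60,
        "install_requirements": False,
    }
    if tls:
        if cert_dir is None:
            # Assumed rule: TLS needs certificate material to be useful.
            raise ValueError("cert_dir is required when tls=True")
        # Hypothetical file names; use whatever your PKI setup produced.
        kwargs["root_certificate"] = cert_dir / "root_ca.crt"
        kwargs["private_key"] = cert_dir / "director.key"
        kwargs["certificate"] = cert_dir / "director.crt"
    return kwargs


# Usage, assuming openfl is installed:
# from openfl.component.director.director import Director
# director = Director(**director_kwargs(Path("cert")))
```

Because every parameter is keyword-only, unpacking a dictionary like this keeps the call site explicit about which options are being set.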

Methods

__init__(*[, tls, root_certificate, ...])

Initialize the Director object.

acknowledge_shard(shard_info)

Save shard info to shard registry if it's acceptable.

get_dataset_info()

Get dataset info.

get_envoys()

Get status information about envoys.

get_experiment_data(experiment_name)

Get experiment data.

get_experiment_description(caller, name)

Get experiment information by name for a specific user.

get_experiment_status(experiment_name, caller)

Get experiment status.

get_experiments_list(caller)

Get the experiments list for a specific user.

get_registered_shards()

Get registered shard infos.

get_trained_model(experiment_name, caller, ...)

Get trained model.

remove_experiment_data(experiment_name, caller)

Remove experiment data from stash.

set_experiment_failed(*, experiment_name, ...)

Mark an experiment as failed (RPC called by envoys).

set_new_experiment(*, experiment_name, ...)

Set new experiment.

start_experiment_execution_loop()

Run task to monitor and run experiments.

stream_metrics(experiment_name, caller)

Stream metrics from the aggregator.

update_envoy_status(*, envoy_name, ...[, ...])

Accept health check from envoy.

wait_experiment(envoy_name)

Wait for an experiment.

acknowledge_shard(shard_info)[source]#

Save shard info to shard registry if it’s acceptable.

Parameters:

shard_info (dict) – Dictionary describing the shard to be registered.

Returns:

True if the shard info is accepted and added to the registry, False otherwise.

Return type:

bool
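The accept-or-deny behaviour can be pictured with the stand-in class below. This reference does not state what makes a shard acceptable, so the rule used here (the shard's shapes must equal the director's `sample_shape` and `target_shape`) and the `node_name` key are assumptions for illustration only.

```python
from typing import Dict, List


class ShardRegistrySketch:
    """Hypothetical stand-in, not the real Director class."""

    def __init__(self, sample_shape: List[int], target_shape: List[int]) -> None:
        self.sample_shape = sample_shape
        self.target_shape = target_shape
        self._shard_registry: Dict[str, dict] = {}

    def acknowledge_shard(self, shard_info: dict) -> bool:
        # Assumed rule: accept only shards whose shapes match the director's.
        is_accepted = (
            shard_info["sample_shape"] == self.sample_shape
            and shard_info["target_shape"] == self.target_shape
        )
        if is_accepted:
            # Assumed key: the shard is stored under its node name.
            self._shard_registry[shard_info["node_name"]] = shard_info
        return is_accepted
```

A rejected shard leaves the registry untouched, so callers only need to check the boolean result.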

get_dataset_info()[source]#

Get dataset info.

get_envoys()[source]#

Get status information about envoys.

Returns:

List with the status information about envoys.

Return type:

list

get_experiment_data(experiment_name)[source]#

Get experiment data.

Parameters:

experiment_name (str) – String id for experiment.

Returns:

Path of archive.

Return type:

str

get_experiment_description(caller, name)[source]#

Get experiment information by name for a specific user.

Parameters:
  • caller (str) – String id for experiment owner.

  • name (str) – String id for experiment name.

Returns:

Dictionary with the info from the experiment.

Return type:

dict

async get_experiment_status(experiment_name, caller)[source]#

Get experiment status.

Parameters:
  • experiment_name (str) – String id for experiment.

  • caller (str) – String id for experiment owner.

Returns:

The status of the experiment can be one of the following:
  • PENDING = 'pending'

  • FINISHED = 'finished'

  • IN_PROGRESS = 'in_progress'

  • FAILED = 'failed'

  • REJECTED = 'rejected'

Return type:

str

get_experiments_list(caller)[source]#

Get the experiments list for a specific user.

Parameters:

caller (str) – String id for experiment owner.

Returns:

List with the info of each experiment for the specified user.

Return type:

list

get_registered_shards()[source]#

Get registered shard infos.

Return type:

list

get_trained_model(experiment_name, caller, model_type)[source]#

Get trained model.

Parameters:
  • experiment_name (str) – String id for experiment.

  • caller (str) – String id for experiment owner.

  • model_type (str) – The type of the model.

Returns:

None in any of the following cases: there is no experiment data in the stash, the aggregator has no aggregated model to return, or an unknown model type was requested. Otherwise, a dictionary of tensors from the aggregator when the model type is 'best' or 'last'.

Return type:

Union[dict, None]

remove_experiment_data(experiment_name, caller)[source]#

Remove experiment data from stash.

Parameters:
  • experiment_name (str) – String id for experiment.

  • caller (str) – String id for experiment owner.

set_experiment_failed(*, experiment_name, collaborator_name)[source]#

Mark an experiment as failed (RPC called by envoys).

Parameters:
  • experiment_name (str) – String id for experiment.

  • collaborator_name (str) – String id for collaborator.

Returns:

None

async set_new_experiment(*, experiment_name, sender_name, tensor_dict, collaborator_names, experiment_archive_path)[source]#

Set new experiment.

Parameters:
  • experiment_name (str) – String id for experiment.

  • sender_name (str) – The name of the sender.

  • tensor_dict (dict) – Dictionary of tensors.

  • collaborator_names (Iterable[str]) – Names of collaborators.

  • experiment_archive_path (Path) – Path of the experiment.

Returns:

True if the experiment was registered successfully.

Return type:

bool

async start_experiment_execution_loop()[source]#

Run task to monitor and run experiments.

async stream_metrics(experiment_name, caller)[source]#

Stream metrics from the aggregator.

This method takes the next metric dictionary from the aggregator's queue and returns it to the caller.

Parameters:
  • experiment_name (str) – String id for experiment.

  • caller (str) – String id for experiment owner.

Returns:

A dictionary with the keys 'metric_origin', 'task_name', 'metric_name', 'metric_value', and 'round' if the queue is not empty, or None if the queue is empty but the experiment is still running.

Return type:

Union[dict, None]

Raises:

StopIteration – if the experiment is finished and there are no more metrics to report.
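A consumer of these metric values has to cope with both a full dictionary and None. The helper below is a small sketch of that handling; `format_metric` and the output layout are hypothetical, and only the five key names come from the description above.

```python
from typing import Optional

# Key names as listed in the Returns description above.
METRIC_KEYS = ("metric_origin", "task_name", "metric_name", "metric_value", "round")


def format_metric(metric: Optional[dict]) -> Optional[str]:
    """Turn one streamed metric into a log line; pass None through unchanged."""
    if metric is None:
        # Queue was empty but the experiment is still running.
        return None
    missing = [key for key in METRIC_KEYS if key not in metric]
    if missing:
        raise KeyError(f"metric dictionary is missing keys: {missing}")
    return (
        f"round {metric['round']}: {metric['metric_origin']} "
        f"{metric['task_name']} {metric['metric_name']}={metric['metric_value']}"
    )
```

Returning None for an empty poll lets the caller decide whether to wait and retry or simply skip the entry.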

update_envoy_status(*, envoy_name, is_experiment_running, cuda_devices_status=None)[source]#

Accept health check from envoy.

Parameters:
  • envoy_name (str) – String id for envoy.

  • is_experiment_running (bool) – Boolean value for the status of the experiment.

  • cuda_devices_status (list, optional) – List of cuda devices and status. Defaults to None.

Raises:

ShardNotFoundError – If no shard is registered under envoy_name.

Returns:

Value of the envoy_health_check_period.

Return type:

int
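The contract described above (keyword-only arguments, an error for unknown shards, and the health check period as the return value) can be mimicked with a stand-in class. `HealthCheckSketch`, the local `ShardNotFoundError`, and the fields written into the registry entry are assumptions for illustration; the real implementation may store different data.

```python
from typing import Dict, List, Optional


class ShardNotFoundError(Exception):
    """Local stand-in for the exception named in this reference."""


class HealthCheckSketch:
    """Hypothetical stand-in, not the real Director class."""

    def __init__(self, envoy_health_check_period: int = 60) -> None:
        self.envoy_health_check_period = envoy_health_check_period
        self._shard_registry: Dict[str, dict] = {}

    def update_envoy_status(self, *, envoy_name: str, is_experiment_running: bool,
                            cuda_devices_status: Optional[List] = None) -> int:
        shard = self._shard_registry.get(envoy_name)
        if shard is None:
            raise ShardNotFoundError(f"Unknown shard {envoy_name}")
        # Assumed bookkeeping: remember the latest reported state.
        shard["is_experiment_running"] = is_experiment_running
        shard["cuda_devices_status"] = cuda_devices_status or []
        # The caller learns how long to wait before the next health check.
        return self.envoy_health_check_period
```

An envoy-side loop could sleep for the returned number of seconds before reporting again, although that usage is also an assumption here.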

async wait_experiment(envoy_name)[source]#

Wait for an experiment.

Parameters:

envoy_name (str) – The name of the envoy.

Returns:

The name of the experiment on the queue.

Return type:

str