Class - Aggregator

class openfl.component.aggregator.aggregator.Aggregator(aggregator_uuid, federation_uuid, authorized_cols, init_state_path, best_state_path, last_state_path, assigner, connector=None, use_delta_updates=True, straggler_handling_policy=<class 'openfl.component.aggregator.straggler_handling.WaitForAllPolicy'>, rounds_to_train=256, single_col_cert_common_name=None, compression_pipeline=None, db_store_rounds=1, initial_tensor_dict=None, log_memory_usage=False, write_logs=False, save_native_model=False, callbacks=[], persist_checkpoint=True, persistent_db_path=None, secure_aggregation=False)

Bases: object

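An Aggregator is the central node in federated learning: it assigns tasks to collaborators, receives their task results, and serves the aggregated tensors back to them.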

Attributes:
round_number

Current round number.

Type:

int

single_col_cert_common_name

Common name for single collaborator certificate.

Type:

str

straggler_handling_policy

Policy for handling stragglers.

_end_of_round_check_done

Indicates, for each round, whether the end-of-round check has been done.

Type:

list of bool

stragglers

List of stragglers.

Type:

list

rounds_to_train

Number of rounds to train.

Type:

int

authorized_cols

IDs of enrolled collaborators.

Type:

list of str

uuid

Aggregator UUID.

Type:

int

federation_uuid

Federation UUID.

Type:

str

assigner

Object assigning tasks to collaborators.

connector

Object responsible for managing interoperability with other frameworks. Defaults to None.

Type:

optional

quit_job_sent_to

Collaborators that have been sent a quit job.

Type:

list

tensor_db

Object for tensor database.

Type:

TensorDB

db_store_rounds*

Rounds to store in TensorDB.

Type:

int

logger

Object for logging.

write_logs

Flag to enable the metric writer callback.

Type:

bool

save_native_model

Flag to save model in native format.

Type:

bool

best_model_score

Score of the best model. Defaults to None.

Type:

optional

metric_queue

Queue for metrics.

Type:

queue.Queue

compression_pipeline

Pipeline for compressing data.

tensor_codec

Codec for tensor compression.

Type:

TensorCodec

init_state_path*

Initial weight file location.

Type:

str

best_state_path*

File location in which to store the best model weights.

Type:

str

last_state_path*

File location in which to store the latest model weights.

Type:

str

best_tensor_dict

Dict of the best tensors.

Type:

dict

last_tensor_dict

Dict of the last tensors.

Type:

dict

collaborator_tensor_results

Dict of collaborator tensor results.

Type:

dict

collaborator_tasks_results

Dict of collaborator task results.

Type:

dict

collaborator_task_weight

Dict of collaborator task weights.

Type:

dict

lock

A threading Lock object used to ensure thread-safe operations.

Note

Attributes marked with * are plan settings.

__init__(aggregator_uuid, federation_uuid, authorized_cols, init_state_path, best_state_path, last_state_path, assigner, connector=None, use_delta_updates=True, straggler_handling_policy=<class 'openfl.component.aggregator.straggler_handling.WaitForAllPolicy'>, rounds_to_train=256, single_col_cert_common_name=None, compression_pipeline=None, db_store_rounds=1, initial_tensor_dict=None, log_memory_usage=False, write_logs=False, save_native_model=False, callbacks=[], persist_checkpoint=True, persistent_db_path=None, secure_aggregation=False)

Initializes the Aggregator.

Parameters:
  • aggregator_uuid (int) – Aggregator ID.

  • federation_uuid (str) – Federation ID.

  • authorized_cols (list of str) – The list of IDs of enrolled collaborators.

  • init_state_path (str) – The location of the initial weight file.

  • best_state_path (str) – The file location in which to store the weights of the best model.

  • last_state_path (str) – The file location in which to store the latest weights.

  • assigner – Assigner object.

  • straggler_handling_policy (optional) – Straggler handling policy. Defaults to WaitForAllPolicy.

  • rounds_to_train (int, optional) – Number of rounds to train. Defaults to 256.

  • single_col_cert_common_name (str, optional) – Common name for single collaborator certificate. Defaults to None.

  • compression_pipeline (optional) – Compression pipeline. Defaults to NoCompressionPipeline.

  • db_store_rounds (int, optional) – Rounds to store in TensorDB. Defaults to 1.

  • initial_tensor_dict (dict, optional) – Initial tensor dictionary. Defaults to None.

  • callbacks (List | None) – List of callbacks to be used during the experiment.
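As a rough illustration of db_store_rounds, the sketch below models the tensor store as a plain dictionary keyed by (tensor_name, round_number) and keeps only the most recent db_store_rounds rounds. The dictionary layout and the helper prune_old_rounds are hypothetical; OpenFL's TensorDB has its own schema and clean-up logic, which may differ in detail (for example, in exactly which boundary round is kept).

```python
from typing import Dict, List, Tuple

# Hypothetical stand-in for TensorDB: maps (tensor_name, round_number) -> values.
TensorStore = Dict[Tuple[str, int], List[float]]


def prune_old_rounds(store: TensorStore, current_round: int, db_store_rounds: int) -> TensorStore:
    """Return a copy of `store` holding only the latest `db_store_rounds` rounds."""
    oldest_kept = current_round - db_store_rounds + 1
    return {key: value for key, value in store.items() if key[1] >= oldest_kept}


# Five rounds (0..4) of a single hypothetical tensor.
store: TensorStore = {("conv1.weight", r): [float(r)] for r in range(5)}

# With the default db_store_rounds=1, only the current round survives.
print(sorted(prune_old_rounds(store, current_round=4, db_store_rounds=1)))
# [('conv1.weight', 4)]
```

Raising db_store_rounds trades memory for the ability to look up tensors from earlier rounds.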

Methods

__init__(aggregator_uuid, federation_uuid, ...)

Initializes the Aggregator.

all_quit_jobs_sent()

Check whether all quit jobs have been sent to collaborators.

get_aggregated_tensor(tensor_name, ...)

RPC called by a collaborator to fetch an aggregated tensor.

get_tasks(collaborator_name)

RPC called by a collaborator to determine which tasks to perform.

process_task_results(collaborator_name, ...)

save_analytics_result()

Save analytics results to a JSON file.

save_persistent_db(collaborator_name, ...)

send_local_task_results(collaborator_name, ...)

RPC called by a collaborator to send its task results to the aggregator.

stop([failed_collaborator])

Stop aggregator execution.

valid_collaborator_cn_and_id(...)

Determine if the collaborator certificate and ID are valid for this federation.

all_quit_jobs_sent()

Check whether all quit jobs have been sent to collaborators.

Returns:

True if all quit jobs are sent, False otherwise.

Return type:

bool
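The check can be pictured as a set comparison between the authorized_cols and quit_job_sent_to attributes described above, so the order in which collaborators received their quit jobs does not matter. This is a standalone function written for illustration, not the method's actual source.

```python
from typing import Iterable


def all_quit_jobs_sent(authorized_cols: Iterable[str], quit_job_sent_to: Iterable[str]) -> bool:
    """True once every authorized collaborator appears in quit_job_sent_to."""
    return set(quit_job_sent_to) == set(authorized_cols)


authorized = ["col_a", "col_b"]
print(all_quit_jobs_sent(authorized, ["col_a"]))           # False
print(all_quit_jobs_sent(authorized, ["col_b", "col_a"]))  # True
```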

get_aggregated_tensor(tensor_name, round_number, report, tags, require_lossless, requested_by)

RPC called by a collaborator.

Performs local lookup to determine if there is an aggregated tensor available that matches the request.

Parameters:
  • tensor_name (str) – Name of the tensor.

  • round_number (int) – Round number of the requested tensor.

  • report (bool) – Whether to report.

  • tags (tuple[str, ...]) – Tags of the requested tensor.

  • require_lossless (bool) – Whether the tensor must be transmitted with lossless compression.

  • requested_by (str) – Request originator name.

Returns:

NamedTensor, the tensor requested by the collaborator.

Return type:

named_tensor (protobuf)

Raises:

ValueError – If the Aggregator does not have an aggregated tensor for the requested tensor key.
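The local lookup can be sketched as a dictionary access keyed on the request parameters, raising ValueError when nothing matches. The TensorKey tuple, its origin field, and the default origin value "aggregator" are assumptions made for this illustration; the sketch also leaves out require_lossless and requested_by, which do not form part of the lookup key here.

```python
from typing import Dict, List, NamedTuple, Tuple


class TensorKey(NamedTuple):
    """Hypothetical lookup key built from the request parameters."""
    tensor_name: str
    origin: str
    round_number: int
    report: bool
    tags: Tuple[str, ...]


def lookup_aggregated_tensor(
    db: Dict[TensorKey, List[float]],
    tensor_name: str,
    round_number: int,
    report: bool,
    tags: Tuple[str, ...],
    origin: str = "aggregator",  # assumed name under which aggregated tensors are stored
) -> List[float]:
    """Return the stored aggregated tensor, or raise ValueError if absent."""
    tensor_key = TensorKey(tensor_name, origin, round_number, report, tags)
    if tensor_key not in db:
        raise ValueError(f"Aggregator does not have an aggregated tensor for {tensor_key}")
    return db[tensor_key]


db = {TensorKey("dense.bias", "aggregator", 2, False, ("model",)): [0.1, -0.2]}
print(lookup_aggregated_tensor(db, "dense.bias", 2, False, ("model",)))  # [0.1, -0.2]
```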

get_tasks(collaborator_name)

RPC called by a collaborator to determine which tasks to perform.

Parameters:

collaborator_name (str) – Requested collaborator name.

Returns:

A tuple containing:

  • tasks (list[str]) – List of tasks to be performed by the requesting collaborator for the current round.

  • round_number (int) – Current round number.

  • sleep_time (int) – Sleep time.

  • time_to_quit (bool) – Whether it is time to quit.

Return type:

tuple
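On the collaborator side, the four returned values can be unpacked as a tuple. The sketch below uses a hypothetical stub, fake_get_tasks, in place of the real RPC, and assumes that sleep_time is in seconds and that a collaborator should wait and ask again when it is positive; the actual OpenFL collaborator loop may handle these cases differently.

```python
import time
from typing import List, Tuple


def fake_get_tasks(collaborator_name: str) -> Tuple[List[str], int, int, bool]:
    """Hypothetical stub returning (tasks, round_number, sleep_time, time_to_quit)."""
    return ["train", "validate"], 3, 0, False


def poll_once(collaborator_name: str) -> Tuple[int, List[str]]:
    """One polling step: returns (round_number, tasks to run now)."""
    tasks, round_number, sleep_time, time_to_quit = fake_get_tasks(collaborator_name)
    if time_to_quit:
        return round_number, []  # the federation is finished; run nothing
    if sleep_time > 0:
        time.sleep(sleep_time)  # assumption: sleep_time is in seconds
        return round_number, []  # nothing to do yet; ask again later
    return round_number, tasks


print(poll_once("col_a"))  # (3, ['train', 'validate'])
```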

save_analytics_result()

Save analytics results to a JSON file.

This method retrieves tensors tagged with “analytics” for the current round from the tensor database and saves them as a JSON file at the path specified by self.last_state_path. Tensor values that are NumPy arrays are converted to lists. The saved JSON file contains a dictionary whose keys are tensor names and whose values are the corresponding tensor data. The saved analytics result is also logged for reference.

Returns:

None
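The serialization step can be sketched as follows. Here a hypothetical dictionary, analytics, stands in for the analytics-tagged tensors retrieved from the tensor database, and the output goes to a temporary file rather than to self.last_state_path; the helper save_analytics_json is illustrative only.

```python
import json
import os
import tempfile
from typing import Any, Dict

import numpy as np


def save_analytics_json(analytics: Dict[str, Any], path: str) -> None:
    """Write {tensor_name: data} to `path` as JSON, converting NumPy arrays to lists."""
    serializable = {
        name: value.tolist() if isinstance(value, np.ndarray) else value
        for name, value in analytics.items()
    }
    with open(path, "w") as f:
        json.dump(serializable, f)


path = os.path.join(tempfile.mkdtemp(), "analytics.json")
save_analytics_json({"histogram": np.array([1, 2, 3]), "count": 6}, path)
with open(path) as f:
    print(json.load(f))  # {'histogram': [1, 2, 3], 'count': 6}
```

The list conversion is needed because json.dump cannot serialize NumPy arrays directly.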

send_local_task_results(collaborator_name, round_number, task_name, data_size, named_tensors)

RPC called by a collaborator.

Transmits the collaborator’s task results to the aggregator.

Parameters:
  • collaborator_name (str) – Collaborator name.

  • round_number (int) – Round number.

  • task_name (str) – Task name.

  • data_size (int) – Data size.

  • named_tensors (protobuf NamedTensor) – Named tensors.

Returns:

None
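The data_size argument is what lets the aggregator weight collaborators differently (compare the collaborator_task_weight attribute). The sketch below assumes plain data-size-weighted averaging in the style of federated averaging; the aggregation function OpenFL actually applies is configurable and may differ. Tensors are plain Python lists, and weighted_average is a hypothetical helper.

```python
from typing import Dict, List


def weighted_average(results: Dict[str, List[float]], data_sizes: Dict[str, int]) -> List[float]:
    """Average per-collaborator tensors, weighting each by its share of the total data size."""
    total = sum(data_sizes[name] for name in results)
    weights = {name: data_sizes[name] / total for name in results}
    length = len(next(iter(results.values())))
    return [sum(weights[name] * results[name][i] for name in results) for i in range(length)]


results = {"col_a": [1.0, 2.0], "col_b": [3.0, 4.0]}
data_sizes = {"col_a": 100, "col_b": 300}
print(weighted_average(results, data_sizes))  # [2.5, 3.5]
```

With weights of 0.25 and 0.75, the collaborator holding three times as much data pulls the average three times as hard.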

stop(failed_collaborator=None)

Stop aggregator execution.

Parameters:

failed_collaborator (str, optional) – Failed collaborator. Defaults to None.

Returns:

None

Return type:

None

valid_collaborator_cn_and_id(cert_common_name, collaborator_common_name)

Determine if the collaborator certificate and ID are valid for this federation.

Parameters:
  • cert_common_name (str) – Common name for security certificate.

  • collaborator_common_name (str) – Common name for collaborator.

Returns:

True if the collaborator common name matches the name in the security certificate.

Return type:

bool
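A sketch of this check, assuming that the collaborator must appear in authorized_cols and that, when single_col_cert_common_name is set, every collaborator presents a certificate with that shared common name (as the attribute's name suggests); otherwise each certificate's common name must equal the collaborator's own name. This standalone function is for illustration and is not the method's source.

```python
from typing import Optional, Sequence


def valid_collaborator_cn_and_id(
    cert_common_name: str,
    collaborator_common_name: str,
    authorized_cols: Sequence[str],
    single_col_cert_common_name: Optional[str] = None,
) -> bool:
    """Return True if the certificate CN and collaborator name are acceptable."""
    if collaborator_common_name not in authorized_cols:
        return False  # not enrolled in this federation
    if single_col_cert_common_name is None:
        # One certificate per collaborator: its CN must be the collaborator's name.
        return cert_common_name == collaborator_common_name
    # Assumed shared-certificate mode: every collaborator presents the configured CN.
    return cert_common_name == single_col_cert_common_name


print(valid_collaborator_cn_and_id("col_a", "col_a", ["col_a", "col_b"]))  # True
print(valid_collaborator_cn_and_id("col_b", "col_a", ["col_a", "col_b"]))  # False
```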