openfl.component.aggregator.aggregator.Aggregator

class openfl.component.aggregator.aggregator.Aggregator(aggregator_uuid, federation_uuid, authorized_cols, init_state_path, best_state_path, last_state_path, assigner, straggler_handling_policy=None, rounds_to_train=256, single_col_cert_common_name=None, compression_pipeline=None, db_store_rounds=1, write_logs=False, log_metric_callback=None, **kwargs)

Bases: object

An Aggregator is the central node in federated learning.

Class Attributes:
  • round_number (int) – Current round number.

  • single_col_cert_common_name (str) – Common name for single collaborator certificate.

  • straggler_handling_policy – Policy for handling stragglers.

  • _end_of_round_check_done (list of bool) – Indicates if end of round check is done for each round.

  • stragglers (list) – List of stragglers.

  • rounds_to_train (int) – Number of rounds to train.

  • authorized_cols (list of str) – IDs of enrolled collaborators.

  • uuid (int) – Aggregator UUID.

  • federation_uuid (str) – Federation UUID.

  • assigner – Object assigning tasks to collaborators.

  • quit_job_sent_to (list) – Collaborators sent a quit job.

  • tensor_db (TensorDB) – Object for tensor database.

  • db_store_rounds* (int) – Rounds to store in TensorDB.

  • logger – Object for logging.

  • write_logs (bool) – Flag to enable log writing.

  • log_metric_callback – Callback for logging metrics.

  • best_model_score (optional) – Score of the best model. Defaults to None.

  • metric_queue (queue.Queue) – Queue for metrics.

  • compression_pipeline – Pipeline for compressing data.

  • tensor_codec (TensorCodec) – Codec for tensor compression.

  • init_state_path* (str) – Initial weight file location.

  • best_state_path* (str) – Where to store the best model weight.

  • last_state_path* (str) – Where to store the latest model weight.

  • best_tensor_dict (dict) – Dict of the best tensors.

  • last_tensor_dict (dict) – Dict of the last tensors.

  • collaborator_tensor_results (dict) – Dict of collaborator tensor results.

  • collaborator_tasks_results (dict) – Dict of collaborator tasks results.

  • collaborator_task_weight (dict) – Dict of col task weight.

  • lock – A threading Lock object used to ensure thread-safe operations.

Note

  • Attributes marked with * are plan settings.

Methods

  • all_quit_jobs_sent – Check whether a quit job has been sent to every collaborator.

  • get_aggregated_tensor – RPC called by a collaborator to request an aggregated tensor.

  • get_tasks – RPC called by a collaborator to determine which tasks to perform.

  • send_local_task_results – RPC called by a collaborator to transmit its task results to the aggregator.

  • stop – Stop aggregator execution.

  • valid_collaborator_cn_and_id – Determine if the collaborator certificate and ID are valid for this federation.

all_quit_jobs_sent()

Check whether a quit job has been sent to every collaborator.

Returns:

bool – True if all quit jobs are sent, False otherwise.
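One way to picture this check is as a set comparison between the collaborators that have already received a quit job and the authorized collaborators. The sketch below assumes exactly that; `all_quit_jobs_sent_sketch` is a hypothetical helper written for illustration, not the OpenFL implementation.

```python
def all_quit_jobs_sent_sketch(quit_job_sent_to, authorized_cols):
    """Return True when every authorized collaborator has been sent a quit job.

    Assumption: both arguments are lists of collaborator names, mirroring the
    `quit_job_sent_to` and `authorized_cols` attributes listed above.
    """
    return set(quit_job_sent_to) == set(authorized_cols)


# Only one of two collaborators has been told to quit.
partial = all_quit_jobs_sent_sketch(["col1"], ["col1", "col2"])
# Both collaborators have been told to quit (order does not matter).
complete = all_quit_jobs_sent_sketch(["col2", "col1"], ["col1", "col2"])
```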

get_aggregated_tensor(collaborator_name, tensor_name, round_number, report, tags, require_lossless)

RPC called by a collaborator to request an aggregated tensor.

Performs local lookup to determine if there is an aggregated tensor available that matches the request.

Parameters:
  • collaborator_name (str) – Name of the collaborator requesting the tensor.

  • tensor_name (str) – Name of the tensor.

  • round_number (int) – Round number of the requested tensor.

  • report (bool) – Whether to report.

  • tags (tuple[str, ...]) – Tags of the requested tensor.

  • require_lossless (bool) – Whether the returned tensor must be compressed losslessly.

Returns:

named_tensor (protobuf) – NamedTensor, the tensor requested by the collaborator.

Raises:

ValueError – If the aggregator does not have an aggregated tensor for the requested tensor key.
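The lookup-or-raise behavior described above can be sketched with a plain dictionary standing in for the tensor database. The key layout `(tensor_name, origin, round_number, report, tags)` and the helper `lookup_aggregated_tensor` are assumptions made for this illustration; the real method works against `tensor_db` and returns a protobuf NamedTensor.

```python
from typing import Dict, List, Tuple

# Hypothetical key layout: (tensor_name, origin, round_number, report, tags).
TensorKey = Tuple[str, str, int, bool, Tuple[str, ...]]


def lookup_aggregated_tensor(store: Dict[TensorKey, List[float]],
                             key: TensorKey) -> List[float]:
    """Return the stored tensor for `key`, or raise ValueError if it is absent."""
    try:
        return store[key]
    except KeyError:
        raise ValueError(
            f"Aggregator does not have an aggregated tensor for {key}"
        ) from None


# A one-entry stand-in for the tensor database.
store = {("conv1.weight", "aggregator", 3, False, ("aggregated",)): [0.1, 0.2]}
found = lookup_aggregated_tensor(
    store, ("conv1.weight", "aggregator", 3, False, ("aggregated",))
)
```

A caller that cannot be sure the tensor exists for a given round would wrap the call in `try`/`except ValueError`.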

get_tasks(collaborator_name)

RPC called by a collaborator to determine which tasks to perform.

Parameters:

collaborator_name (str) – Name of the requesting collaborator.

Returns:
  • tasks (list[str]) – List of tasks to be performed by the requesting collaborator for the current round.

  • round_number (int) – Current round number.

  • sleep_time (int) – Time the collaborator should sleep before requesting tasks again.

  • time_to_quit (bool) – Whether the collaborator should quit.
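The four return values suggest a polling loop on the collaborator side. The sketch below shows one possible loop against a stub; `StubAggregator` and `run_collaborator` are hypothetical names, and the stub's behavior (two fixed tasks per round, one round per call) is an assumption made only so the example runs on its own.

```python
import time


class StubAggregator:
    """Hypothetical stand-in that mimics the documented get_tasks return shape."""

    def __init__(self, rounds_to_train):
        self.round_number = 0
        self.rounds_to_train = rounds_to_train

    def get_tasks(self, collaborator_name):
        # Returns (tasks, round_number, sleep_time, time_to_quit).
        if self.round_number >= self.rounds_to_train:
            return [], self.round_number, 0, True
        current = self.round_number
        self.round_number += 1  # the stub advances one round on every call
        return ["train", "validate"], current, 0, False


def run_collaborator(aggregator, name):
    """Poll for tasks until the aggregator signals that it is time to quit."""
    executed = []
    while True:
        tasks, round_number, sleep_time, time_to_quit = aggregator.get_tasks(name)
        if time_to_quit:
            return executed
        if not tasks:
            time.sleep(sleep_time)  # nothing to do yet; wait before polling again
            continue
        executed.extend((round_number, task) for task in tasks)


history = run_collaborator(StubAggregator(rounds_to_train=2), "col1")
```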

send_local_task_results(collaborator_name, round_number, task_name, data_size, named_tensors)

RPC called by collaborator.

Transmits collaborator’s task results to the aggregator.

Parameters:
  • collaborator_name (str) – Collaborator name.

  • round_number (int) – Round number.

  • task_name (str) – Task name.

  • data_size (int) – Data size.

  • named_tensors (protobuf NamedTensor) – Named tensors.

Returns:

None
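The `data_size` argument, together with the `collaborator_task_weight` attribute, suggests that results may be weighted by how much data each collaborator contributed. The sketch below shows a plain data-size-weighted mean under that assumption; `weighted_average` is a hypothetical helper, and the aggregation function actually used in a federation may differ.

```python
def weighted_average(results):
    """Average per-collaborator values, weighting each by its data size.

    `results` maps collaborator name -> (data_size, list of floats).
    All value lists are assumed to have the same length.
    """
    total = sum(size for size, _ in results.values())
    length = len(next(iter(results.values()))[1])
    averaged = [0.0] * length
    for size, values in results.values():
        weight = size / total  # share of the total data held by this collaborator
        for i, value in enumerate(values):
            averaged[i] += weight * value
    return averaged


# col2 holds three times as much data as col1, so its values dominate.
avg = weighted_average({"col1": (100, [1.0, 2.0]), "col2": (300, [3.0, 6.0])})
```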

stop(failed_collaborator: str | None = None) → None

Stop aggregator execution.

Parameters:

failed_collaborator (str, optional) – Name of the collaborator that failed, if any. Defaults to None.

Returns:

None

valid_collaborator_cn_and_id(cert_common_name, collaborator_common_name)

Determine if the collaborator certificate and ID are valid for this federation.

Parameters:
  • cert_common_name (str) – Common name for security certificate.

  • collaborator_common_name (str) – Common name for collaborator.

Returns:

bool – True if the collaborator common name matches the name in the security certificate, False otherwise.
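A check of this kind can be sketched as two conditions: the certificate name must match the expected name, and the collaborator must be enrolled. The helper `valid_cn_and_id_sketch` below is hypothetical, and its handling of `single_col_cert_common_name` (one shared certificate name for all collaborators) is an assumption based on the attribute list above rather than a statement of the library's logic.

```python
def valid_cn_and_id_sketch(cert_common_name, collaborator_common_name,
                           authorized_cols, single_col_cert_common_name=None):
    """Return True if the certificate name and collaborator ID are acceptable.

    Assumption: when a single shared certificate name is configured, the
    certificate must carry that name; otherwise it must carry the
    collaborator's own name. In both cases the collaborator must be enrolled.
    """
    if single_col_cert_common_name:
        cn_ok = cert_common_name == single_col_cert_common_name
    else:
        cn_ok = cert_common_name == collaborator_common_name
    return cn_ok and collaborator_common_name in authorized_cols


authorized = ["col1", "col2"]
matching = valid_cn_and_id_sketch("col1", "col1", authorized)
mismatched = valid_cn_and_id_sketch("col2", "col1", authorized)
unenrolled = valid_cn_and_id_sketch("col3", "col3", authorized)
shared_cert = valid_cn_and_id_sketch("shared", "col2", authorized,
                                     single_col_cert_common_name="shared")
```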