openfl.component.aggregator.aggregator.Aggregator
- class openfl.component.aggregator.aggregator.Aggregator(aggregator_uuid, federation_uuid, authorized_cols, init_state_path, best_state_path, last_state_path, assigner, straggler_handling_policy=None, rounds_to_train=256, single_col_cert_common_name=None, compression_pipeline=None, db_store_rounds=1, write_logs=False, log_metric_callback=None, **kwargs)
Bases: object
An Aggregator is the central node in federated learning.
- Class Attributes:
round_number (int) – Current round number.
single_col_cert_common_name (str) – Common name for single collaborator certificate.
straggler_handling_policy – Policy for handling stragglers.
_end_of_round_check_done (list of bool) – Indicates if end of round check is done for each round.
stragglers (list) – List of stragglers.
rounds_to_train (int) – Number of rounds to train.
authorized_cols (list of str) – IDs of enrolled collaborators.
uuid (int) – Aggregator UUID.
federation_uuid (str) – Federation UUID.
assigner – Object assigning tasks to collaborators.
quit_job_sent_to (list) – Collaborators to which a quit job has been sent.
tensor_db (TensorDB) – Object for tensor database.
db_store_rounds* (int) – Rounds to store in TensorDB.
logger – Object for logging.
write_logs (bool) – Flag to enable log writing.
log_metric_callback – Callback for logging metrics.
best_model_score (optional) – Score of the best model. Defaults to None.
metric_queue (queue.Queue) – Queue for metrics.
compression_pipeline – Pipeline for compressing data.
tensor_codec (TensorCodec) – Codec for tensor compression.
init_state_path* (str) – Initial weight file location.
best_state_path* (str) – Where to store the best model weight.
last_state_path* (str) – Where to store the latest model weight.
best_tensor_dict (dict) – Dict of the best tensors.
last_tensor_dict (dict) – Dict of the last tensors.
collaborator_tensor_results (dict) – Dict of collaborator tensor results.
collaborator_tasks_results (dict) – Dict of collaborator tasks results.
collaborator_task_weight (dict) – Dict of collaborator task weights.
lock – A threading Lock object used to ensure thread-safe operations.
Note
Attributes marked with * are plan settings.
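Methods
all_quit_jobs_sent() – Assert all quit jobs are sent to collaborators.
get_aggregated_tensor(collaborator_name, ...) – RPC called by collaborator.
get_tasks(collaborator_name) – RPC called by a collaborator to determine which tasks to perform.
send_local_task_results(collaborator_name, ...) – RPC called by collaborator.
stop([failed_collaborator]) – Stop aggregator execution.
valid_collaborator_cn_and_id(cert_common_name, ...) – Determine if the collaborator certificate and ID are valid for this federation.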
- all_quit_jobs_sent()
Assert all quit jobs are sent to collaborators.
- Returns:
bool – True if all quit jobs are sent, False otherwise.
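The check described above can be pictured as a set comparison between two of the attributes listed on this page, `quit_job_sent_to` and `authorized_cols`. The following is a minimal standalone sketch under that assumption, not the library's code; the function name `all_quit_jobs_sent_sketch` is hypothetical.

```python
def all_quit_jobs_sent_sketch(authorized_cols, quit_job_sent_to):
    """Return True when every authorized collaborator has been sent a quit job.

    Assumption: the check is a plain set comparison, so ordering and
    duplicates in either list do not matter.
    """
    return set(quit_job_sent_to) == set(authorized_cols)


# Hypothetical collaborator names, for illustration only.
authorized = ["col1", "col2"]
partial = all_quit_jobs_sent_sketch(authorized, ["col1"])
complete = all_quit_jobs_sent_sketch(authorized, ["col2", "col1"])
```

Here `partial` is False because `col2` has not yet received a quit job, while `complete` is True.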
- get_aggregated_tensor(collaborator_name, tensor_name, round_number, report, tags, require_lossless)
RPC called by collaborator.
Performs local lookup to determine if there is an aggregated tensor available that matches the request.
- Parameters:
collaborator_name (str) – Name of the collaborator requesting the tensor.
tensor_name (str) – Name of the tensor.
round_number (int) – Actual round number.
report (bool) – Whether to report.
tags (tuple[str, ...]) – Tags.
require_lossless (bool) – Whether to require lossless.
- Returns:
named_tensor (protobuf) – NamedTensor, the tensor requested by the collaborator.
- Raises:
ValueError – if Aggregator does not have an aggregated tensor for {tensor_key}.
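To illustrate the lookup-or-raise behavior described above, the sketch below uses a plain dict in place of the tensor database. Everything here is an assumption made for illustration: the function name `lookup_aggregated_tensor`, the dict store, and the simplified key of (tensor name, round number, tags) are hypothetical and do not reflect how TensorDB keys or stores tensors.

```python
def lookup_aggregated_tensor(store, tensor_name, round_number, tags):
    """Return the stored tensor for the request, or raise ValueError.

    `store` is a hypothetical dict standing in for the tensor database.
    """
    # Simplified, hypothetical key; tags are normalized to a tuple so
    # the key is hashable.
    tensor_key = (tensor_name, round_number, tuple(tags))
    if tensor_key not in store:
        raise ValueError(
            f"Aggregator does not have an aggregated tensor for {tensor_key}"
        )
    return store[tensor_key]


# Hypothetical tensor name, tag, and values.
store = {("conv1.weight", 0, ("aggregated",)): [0.1, 0.2]}
found = lookup_aggregated_tensor(store, "conv1.weight", 0, ["aggregated"])
```

A request for a round that has no stored entry, such as round 1 in this example, raises ValueError instead of returning a placeholder value.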
- get_tasks(collaborator_name)
RPC called by a collaborator to determine which tasks to perform.
- Parameters:
collaborator_name (str) – Name of the requesting collaborator.
- Returns:
tasks (list[str]) – List of tasks to be performed by the requesting collaborator for the current round.
round_number (int) – Actual round number.
sleep_time (int) – Sleep time.
time_to_quit (bool) – Whether it’s time to quit.
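The four return values suggest a polling loop on the collaborator side: run the tasks if any are returned, sleep if none are, and exit once `time_to_quit` is True. The sketch below shows that loop against a stub, under stated assumptions. `StubAggregator`, `run_collaborator`, and the task names `train` and `validate` are hypothetical; the stub advances its round on every call, which is a simplification and not how a real aggregator decides that a round has ended.

```python
import time


class StubAggregator:
    """Hypothetical stand-in that mimics the four-value return of get_tasks."""

    def __init__(self, rounds_to_train):
        self.round_number = 0
        self.rounds_to_train = rounds_to_train

    def get_tasks(self, collaborator_name):
        # Once all rounds are done, tell the caller to quit.
        if self.round_number >= self.rounds_to_train:
            return [], self.round_number, 0, True
        current_round = self.round_number
        self.round_number += 1  # simplification: one call per round
        return ["train", "validate"], current_round, 0, False


def run_collaborator(aggregator, collaborator_name):
    """Poll for tasks until the aggregator signals time_to_quit."""
    executed = []
    while True:
        tasks, round_number, sleep_time, time_to_quit = aggregator.get_tasks(
            collaborator_name
        )
        if time_to_quit:
            break
        if not tasks:
            # Nothing to do yet: wait as instructed, then ask again.
            time.sleep(sleep_time)
            continue
        for task in tasks:
            executed.append((round_number, task))
    return executed


log = run_collaborator(StubAggregator(rounds_to_train=2), "col1")
```

With two rounds configured, `log` records both tasks for round 0 and then both tasks for round 1 before the loop exits.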
- send_local_task_results(collaborator_name, round_number, task_name, data_size, named_tensors)
RPC called by collaborator.
Transmits collaborator’s task results to the aggregator.
- Parameters:
collaborator_name (str) – Collaborator name.
round_number (int) – Round number.
task_name (str) – Task name.
data_size (int) – Data size.
named_tensors (protobuf NamedTensor) – Named tensors.
- Returns:
None
- stop(failed_collaborator: str | None = None) → None
Stop aggregator execution.
- Parameters:
failed_collaborator (str, optional) – Failed collaborator. Defaults to None.
- Returns:
None
- valid_collaborator_cn_and_id(cert_common_name, collaborator_common_name)
Determine if the collaborator certificate and ID are valid for this federation.
- Parameters:
cert_common_name (str) – Common name for security certificate.
collaborator_common_name (str) – Common name for collaborator.
- Returns:
bool – True means the collaborator common name matches the name in the security certificate.
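As a rough sketch of the validation described above, the function below returns True only when the certificate common name equals the collaborator common name. The additional membership test against `authorized_cols` is an assumption based on the phrase "valid for this federation", and the `single_col_cert_common_name` option is not modeled. The function name `is_valid_collaborator` is hypothetical.

```python
def is_valid_collaborator(cert_common_name, collaborator_common_name, authorized_cols):
    """Simplified stand-in for the certificate and ID check.

    Assumptions: the names must match exactly, and the collaborator must
    also appear in the list of enrolled collaborators.
    """
    return (
        cert_common_name == collaborator_common_name
        and collaborator_common_name in authorized_cols
    )


# Hypothetical collaborator names, for illustration only.
authorized = ["col1", "col2"]
ok = is_valid_collaborator("col1", "col1", authorized)
mismatch = is_valid_collaborator("col1", "col2", authorized)
unknown = is_valid_collaborator("col3", "col3", authorized)
```

In this example only `ok` is True: `mismatch` fails because the certificate name differs from the claimed collaborator name, and `unknown` fails because `col3` is not enrolled.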