Class - Aggregator
- class openfl.component.aggregator.aggregator.Aggregator(aggregator_uuid, federation_uuid, authorized_cols, init_state_path, best_state_path, last_state_path, assigner, connector=None, use_delta_updates=True, straggler_handling_policy=<class 'openfl.component.aggregator.straggler_handling.WaitForAllPolicy'>, rounds_to_train=256, single_col_cert_common_name=None, compression_pipeline=None, db_store_rounds=1, initial_tensor_dict=None, log_memory_usage=False, write_logs=False, save_native_model=False, callbacks=[], persist_checkpoint=True, persistent_db_path=None, secure_aggregation=False)
Bases: object
An Aggregator is the central node in federated learning.
- Parameters:
straggler_handling_policy (StragglerPolicy)
callbacks (List | None)
- round_number
Current round number.
- Type:
int
- single_col_cert_common_name
Common name for single collaborator certificate.
- Type:
str
- straggler_handling_policy
Policy for handling stragglers.
- _end_of_round_check_done
Whether the end-of-round check has been done, tracked for each round.
- Type:
list of bool
- stragglers
List of stragglers.
- Type:
list
- rounds_to_train
Number of rounds to train.
- Type:
int
- authorized_cols
IDs of enrolled collaborators.
- Type:
list of str
- uuid
Aggregator UUID.
- Type:
int
- federation_uuid
Federation UUID.
- Type:
str
- assigner
Object assigning tasks to collaborators.
- connector
Object responsible for managing interoperability with other frameworks. Defaults to None.
- Type:
optional
- quit_job_sent_to
Collaborators that have been sent a quit job.
- Type:
list
- db_store_rounds
Rounds to store in TensorDB.
- Type:
int
- logger
Object for logging.
- write_logs
Flag to enable the metric writer callback.
- Type:
bool
- save_native_model
Flag to save the model in its native format.
- Type:
bool
- best_model_score
Score of the best model. Defaults to None.
- Type:
optional
- metric_queue
Queue for metrics.
- Type:
queue.Queue
- compression_pipeline
Pipeline for compressing data.
- tensor_codec
Codec for tensor compression.
- Type:
TensorCodec
- init_state_path
Initial weight file location.
- Type:
str
- best_state_path
File location where the weights of the best model are stored.
- Type:
str
- last_state_path
File location where the latest model weights are stored.
- Type:
str
- best_tensor_dict
Dict of the best tensors.
- Type:
dict
- last_tensor_dict
Dict of the last tensors.
- Type:
dict
- collaborator_tensor_results
Dict of collaborator tensor results.
- Type:
dict
- collaborator_tasks_results
Dict of collaborator tasks results.
- Type:
dict
- collaborator_task_weight
Dict of collaborator task weights.
- Type:
dict
- lock
A threading Lock object used to ensure thread-safe operations.
Note
The constructor arguments are typically supplied through the plan settings.
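Taken together, the round_number, authorized_cols, and straggler_handling_policy attributes decide when a round ends. The following minimal sketch models the rule implied by the name of the default WaitForAllPolicy. It assumes that "wait for all" means a round closes only after every enrolled collaborator has reported. RoundTracker and its methods are hypothetical illustrations and are not part of OpenFL's API.

```python
from dataclasses import dataclass, field


# Hypothetical helper illustrating a wait-for-all round rule; not part of OpenFL.
@dataclass
class RoundTracker:
    authorized_cols: list[str]
    round_number: int = 0
    # Collaborators that have reported results in the current round.
    reported: set[str] = field(default_factory=set)

    def record_result(self, collaborator_name: str) -> None:
        # Only enrolled collaborators may contribute to a round.
        if collaborator_name not in self.authorized_cols:
            raise ValueError(f"{collaborator_name} is not an enrolled collaborator")
        self.reported.add(collaborator_name)

    def round_done(self) -> bool:
        # Wait for all: the round is complete only when every enrolled collaborator has reported.
        return self.reported.issuperset(self.authorized_cols)

    def advance_if_done(self) -> bool:
        # Move to the next round and reset the bookkeeping once the round is complete.
        if not self.round_done():
            return False
        self.round_number += 1
        self.reported.clear()
        return True


tracker = RoundTracker(authorized_cols=["col1", "col2"])
tracker.record_result("col1")
print(tracker.advance_if_done())  # False: col2 has not reported yet
tracker.record_result("col2")
print(tracker.advance_if_done())  # True: round 0 is complete
print(tracker.round_number)  # 1
```

A policy that tolerates stragglers would replace the `issuperset` test with a weaker condition, such as a quorum or a timeout.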
- __init__(aggregator_uuid, federation_uuid, authorized_cols, init_state_path, best_state_path, last_state_path, assigner, connector=None, use_delta_updates=True, straggler_handling_policy=<class 'openfl.component.aggregator.straggler_handling.WaitForAllPolicy'>, rounds_to_train=256, single_col_cert_common_name=None, compression_pipeline=None, db_store_rounds=1, initial_tensor_dict=None, log_memory_usage=False, write_logs=False, save_native_model=False, callbacks=[], persist_checkpoint=True, persistent_db_path=None, secure_aggregation=False)
Initializes the Aggregator.
- Parameters:
aggregator_uuid (int) – Aggregator ID.
federation_uuid (str) – Federation ID.
authorized_cols (list of str) – The list of IDs of enrolled collaborators.
init_state_path (str) – The location of the initial weight file.
best_state_path (str) – The file location where the weights of the best model are stored.
last_state_path (str) – The file location where the latest weights are stored.
assigner – Object that assigns tasks to collaborators.
straggler_handling_policy (optional) – Straggler handling policy. Defaults to WaitForAllPolicy.
rounds_to_train (int, optional) – Number of rounds to train. Defaults to 256.
single_col_cert_common_name (str, optional) – Common name for single collaborator certificate. Defaults to None.
compression_pipeline (optional) – Compression pipeline. Defaults to NoCompressionPipeline.
db_store_rounds (int, optional) – Rounds to store in TensorDB. Defaults to 1.
initial_tensor_dict (dict, optional) – Initial tensor dictionary. Defaults to None.
callbacks (List | None) – List of callbacks to be used during the experiment.
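The db_store_rounds parameter limits how much round history the TensorDB keeps. The sketch below models that retention window with a plain dictionary keyed by (tensor_name, round_number). It assumes that entries older than the last db_store_rounds rounds are discarded. prune_rounds is a hypothetical helper and is not OpenFL's TensorDB implementation.

```python
# Hypothetical helper illustrating a retention window; not OpenFL's TensorDB.
def prune_rounds(
    tensor_db: dict[tuple[str, int], list[float]],
    current_round: int,
    db_store_rounds: int = 1,
) -> dict[tuple[str, int], list[float]]:
    """Keep only entries from the most recent db_store_rounds rounds."""
    if db_store_rounds < 1:
        raise ValueError("db_store_rounds must be at least 1")
    # With the default db_store_rounds=1, only the current round survives.
    oldest_kept = current_round - db_store_rounds + 1
    return {key: value for key, value in tensor_db.items() if key[1] >= oldest_kept}


db = {("conv1.weight", 0): [0.1], ("conv1.weight", 1): [0.2], ("conv1.weight", 2): [0.3]}
print(sorted(prune_rounds(db, current_round=2)))  # [('conv1.weight', 2)]
print(sorted(prune_rounds(db, current_round=2, db_store_rounds=2)))  # rounds 1 and 2
```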
Methods
__init__(aggregator_uuid, federation_uuid, ...) – Initializes the Aggregator.
all_quit_jobs_sent() – Check whether all quit jobs have been sent to collaborators.
get_aggregated_tensor(tensor_name, ...) – RPC called by a collaborator.
get_tasks(collaborator_name) – RPC called by a collaborator to determine which tasks to perform.
process_task_results(collaborator_name, ...)
save_analytics_result() – Save analytics results to a JSON file.
save_persistent_db(collaborator_name, ...)
send_local_task_results(collaborator_name, ...) – RPC called by a collaborator.
stop([failed_collaborator]) – Stop aggregator execution.
valid_collaborator_cn_and_id(cert_common_name, ...) – Determine if the collaborator certificate and ID are valid for this federation.
- all_quit_jobs_sent()
Check whether all quit jobs have been sent to collaborators.
- Returns:
True if all quit jobs are sent, False otherwise.
- Return type:
bool
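Given the quit_job_sent_to and authorized_cols attributes, the check can be sketched as a set comparison. This hypothetical standalone function assumes that "all" means every collaborator listed in authorized_cols. It is not the library's implementation.

```python
# Hypothetical standalone version of the check; not OpenFL's implementation.
def all_quit_jobs_sent(quit_job_sent_to: list[str], authorized_cols: list[str]) -> bool:
    """Return True once every enrolled collaborator has been sent a quit job."""
    # Order and duplicates are irrelevant, so compare collaborator names as sets.
    return set(authorized_cols).issubset(quit_job_sent_to)


print(all_quit_jobs_sent(["col1"], ["col1", "col2"]))  # False
print(all_quit_jobs_sent(["col2", "col1"], ["col1", "col2"]))  # True
```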
- get_aggregated_tensor(tensor_name, round_number, report, tags, require_lossless, requested_by)
RPC called by a collaborator.
Performs local lookup to determine if there is an aggregated tensor available that matches the request.
- Parameters:
tensor_name (str) – Name of the tensor.
round_number (int) – Round number of the requested tensor.
report (bool) – Whether the tensor is flagged for reporting.
tags (tuple[str, ...]) – Tags identifying the tensor.
require_lossless (bool) – Whether the tensor must be compressed losslessly.
requested_by (str) – Request originator name.
- Returns:
The NamedTensor requested by the collaborator.
- Return type:
NamedTensor (protobuf)
- Raises:
ValueError – If the Aggregator does not have an aggregated tensor for the requested tensor key.
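The lookup described above can be sketched as a dictionary access keyed by the request parameters, raising ValueError on a miss. This is a minimal sketch under stated assumptions. LookupKey is hypothetical and is not OpenFL's TensorKey. A list of floats stands in for the NamedTensor protobuf. require_lossless and requested_by are omitted because they do not change which tensor is found.

```python
from typing import NamedTuple


class LookupKey(NamedTuple):
    """Hypothetical key built from the request parameters; not OpenFL's TensorKey."""

    tensor_name: str
    round_number: int
    report: bool
    tags: tuple[str, ...]


def get_aggregated_tensor(
    store: dict[LookupKey, list[float]],
    tensor_name: str,
    round_number: int,
    report: bool,
    tags: tuple[str, ...],
) -> list[float]:
    key = LookupKey(tensor_name, round_number, report, tags)
    # Purely local lookup: only tensors that were already aggregated can be served.
    if key not in store:
        raise ValueError(f"Aggregator does not have an aggregated tensor for {key}")
    return store[key]


store = {LookupKey("conv1.weight", 3, False, ("aggregated",)): [0.5, 0.25]}
print(get_aggregated_tensor(store, "conv1.weight", 3, False, ("aggregated",)))  # [0.5, 0.25]
```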
- get_tasks(collaborator_name)
RPC called by a collaborator to determine which tasks to perform.
- Parameters:
collaborator_name (str) – Requested collaborator name.
- Returns:
tasks (list[str]): List of tasks to be performed by the requesting collaborator for the current round.
round_number (int): Current round number.
sleep_time (int): Sleep time.
time_to_quit (bool): Whether it’s time to quit.
- Return type:
tuple
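The four return values can be illustrated with a hypothetical stand-in that returns the same tuple shape. In this sketch, a plain `assignments` dictionary replaces the assigner object. The default `sleep_time=10` is arbitrary, since the documentation does not state a value or unit. The branching between quitting, sleeping, and working is an assumption, not OpenFL's logic.

```python
# Hypothetical sketch of the (tasks, round_number, sleep_time, time_to_quit) contract.
def get_tasks(
    collaborator_name: str,
    round_number: int,
    rounds_to_train: int,
    assignments: dict[str, list[str]],
    sleep_time: int = 10,
) -> tuple[list[str], int, int, bool]:
    # Training has finished: no tasks, and tell the collaborator to quit.
    if round_number >= rounds_to_train:
        return [], round_number, 0, True
    # `assignments` stands in for the assigner object.
    tasks = assignments.get(collaborator_name, [])
    # Nothing to do yet: ask the collaborator to sleep and poll again.
    if not tasks:
        return [], round_number, sleep_time, False
    return tasks, round_number, 0, False


tasks, round_number, sleep_time, time_to_quit = get_tasks(
    "col1", round_number=0, rounds_to_train=2, assignments={"col1": ["train", "validate"]}
)
print(tasks, round_number, sleep_time, time_to_quit)  # ['train', 'validate'] 0 0 False
```

A caller unpacks the tuple in one statement, as shown, and loops until `time_to_quit` is True.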
- save_analytics_result()
Save analytics results to a JSON file.
This method retrieves tensors tagged with “analytics” for the current round from the tensor database and saves them as a JSON file at the path specified by self.last_state_path. The tensor values are converted to lists if they are NumPy arrays. The saved JSON file contains a dictionary where the keys are tensor names and the values are the corresponding tensor data. Logs the saved analytics result for reference.
- Returns:
None
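The serialization step described above can be sketched with the standard `json` module. This sketch makes three assumptions. The caller supplies a plain `analytics` dictionary in place of the tensor-database query. `path` plays the role of self.last_state_path. Arrays are detected by the presence of a `tolist()` method, which NumPy arrays provide. The function is hypothetical and is not OpenFL's implementation.

```python
import json
from typing import Any


# Hypothetical sketch; `path` plays the role of self.last_state_path.
def save_analytics_result(analytics: dict[str, Any], path: str) -> dict[str, Any]:
    """Write analytics tensors to a JSON file and return what was written."""
    serializable = {}
    for name, value in analytics.items():
        # NumPy arrays expose tolist(); json cannot encode them directly.
        serializable[name] = value.tolist() if hasattr(value, "tolist") else value
    with open(path, "w", encoding="utf-8") as f:
        json.dump(serializable, f, indent=2)
    return serializable
```

Duck-typing on `tolist()` keeps the sketch free of a hard NumPy dependency. Any other array type that offers the same method is converted the same way.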
- send_local_task_results(collaborator_name, round_number, task_name, data_size, named_tensors)
RPC called by a collaborator.
Transmits collaborator’s task results to the aggregator.
- Parameters:
collaborator_name (str) – Collaborator name.
round_number (int) – Round number.
task_name (str) – Task name.
data_size (int) – Size of the local data used to perform the task.
named_tensors (protobuf NamedTensor) – Named tensors.
- Returns:
None
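The following sketch shows how reported results and their data sizes could be stored and then combined. It assumes that data_size serves as the aggregation weight, as in federated averaging. This assumption is consistent with the collaborator_task_weight attribute but is not confirmed by this page. ResultStore is hypothetical, and a single float stands in for named_tensors.

```python
from collections import defaultdict


# Hypothetical result store; not OpenFL's API. A single float stands in for named_tensors.
class ResultStore:
    def __init__(self) -> None:
        # (round_number, task_name) -> {collaborator_name: (data_size, value)}
        self.results: dict[tuple[int, str], dict[str, tuple[int, float]]] = defaultdict(dict)

    def send_local_task_results(
        self, collaborator_name: str, round_number: int, task_name: str,
        data_size: int, value: float,
    ) -> None:
        # Keep each collaborator's latest result together with its data size.
        self.results[(round_number, task_name)][collaborator_name] = (data_size, value)

    def weighted_average(self, round_number: int, task_name: str) -> float:
        # Assumption: results are weighted by data_size, as in federated averaging.
        entries = list(self.results.get((round_number, task_name), {}).values())
        total = sum(size for size, _ in entries)
        if total == 0:
            raise ValueError(f"no data reported for task {task_name!r} in round {round_number}")
        return sum(size * value for size, value in entries) / total


store = ResultStore()
store.send_local_task_results("col1", 0, "train", data_size=100, value=1.0)
store.send_local_task_results("col2", 0, "train", data_size=300, value=2.0)
print(store.weighted_average(0, "train"))  # 1.75
```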
- stop(failed_collaborator=None)
Stop aggregator execution.
- Parameters:
failed_collaborator (str, optional) – Failed collaborator. Defaults to None.
- Returns:
None
- Return type:
None
- valid_collaborator_cn_and_id(cert_common_name, collaborator_common_name)
Determine if the collaborator certificate and ID are valid for this federation.
- Parameters:
cert_common_name (str) – Common name in the security certificate.
collaborator_common_name (str) – Common name of the collaborator.
- Returns:
True means the collaborator common name matches the name in the security certificate.
- Return type:
bool
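The check can be sketched using the authorized_cols and single_col_cert_common_name attributes. This is a hypothetical standalone function that makes two assumptions. First, the collaborator must appear in authorized_cols. Second, when single_col_cert_common_name is set, it names a certificate shared by the collaborators, so the certificate is compared against that name instead. It is not OpenFL's implementation.

```python
from typing import Optional


# Hypothetical standalone check; not OpenFL's implementation.
def valid_collaborator_cn_and_id(
    cert_common_name: str,
    collaborator_common_name: str,
    authorized_cols: list[str],
    single_col_cert_common_name: Optional[str] = None,
) -> bool:
    # The collaborator must be enrolled in this federation.
    if collaborator_common_name not in authorized_cols:
        return False
    if single_col_cert_common_name is None:
        # One certificate per collaborator: the certificate CN must equal the collaborator name.
        return cert_common_name == collaborator_common_name
    # Assumed shared-certificate mode: the certificate CN must equal the configured name.
    return cert_common_name == single_col_cert_common_name


print(valid_collaborator_cn_and_id("col1", "col1", ["col1", "col2"]))  # True
print(valid_collaborator_cn_and_id("col2", "col1", ["col1", "col2"]))  # False
```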