Class - Collaborator
- class openfl.component.collaborator.collaborator.Collaborator(collaborator_name, aggregator_uuid, federation_uuid, client, task_runner, task_config, opt_treatment='RESET', device_assignment_policy='CPU_ONLY', use_delta_updates=False, compression_pipeline=None, db_store_rounds=1, log_memory_usage=False, write_logs=False, callbacks=[], secure_aggregation=False)
Bases: object
The Collaborator object class.
- Parameters:
client (AggregatorGRPCClient)
callbacks (List | None)
- collaborator_name
The common name for the collaborator.
- Type:
str
- aggregator_uuid
The unique id for the aggregator.
- Type:
str
- federation_uuid
The unique id for the federation.
- Type:
str
- client
The client object.
- Type:
object
- task_runner
The task runner object.
- Type:
object
- task_config
The task configuration.
- Type:
dict
- opt_treatment
The optimizer state treatment.
- Type:
str
- device_assignment_policy
[Deprecated] The device assignment policy.
- Type:
str
- use_delta_updates
If True, only the model delta is sent. If False, the whole model is sent to the collaborator.
- Type:
bool
- compression_pipeline
The compression pipeline.
- Type:
object
- db_store_rounds
The number of rounds to store in the database.
- Type:
int
- single_col_cert_common_name
The common name for the single collaborator certificate.
- Type:
str
Note
* - Plan setting.
- __init__(collaborator_name, aggregator_uuid, federation_uuid, client, task_runner, task_config, opt_treatment='RESET', device_assignment_policy='CPU_ONLY', use_delta_updates=False, compression_pipeline=None, db_store_rounds=1, log_memory_usage=False, write_logs=False, callbacks=[], secure_aggregation=False)
Initialize the Collaborator object.
- Parameters:
collaborator_name (str) – The common name for the collaborator.
aggregator_uuid (str) – The unique id for the aggregator.
federation_uuid (str) – The unique id for the federation.
client (object) – The client object.
task_runner (object) – The task runner object.
task_config (dict) – The task configuration.
opt_treatment (str, optional) – The optimizer state treatment. Defaults to ‘RESET’.
device_assignment_policy (str, optional) – [Deprecated] The device assignment policy. Defaults to ‘CPU_ONLY’.
use_delta_updates (bool, optional) – If True, only the model delta is sent. If False, the whole model is sent to the collaborator. Defaults to False.
compression_pipeline (object, optional) – The compression pipeline. Defaults to None.
db_store_rounds (int, optional) – The number of rounds to store in the database. Defaults to 1.
callbacks (list, optional) – List of callbacks. Defaults to an empty list.
Methods
__init__(collaborator_name, aggregator_uuid, ...) – Initialize the Collaborator object.
do_task(task, round_number) – Perform the specified task.
get_aggregated_tensor_from_aggregator(tensor_key) – Return the decompressed tensor associated with the requested tensor key.
get_data_for_tensorkey(tensor_key) – Resolve the tensor corresponding to the requested tensorkey.
named_tensor_to_nparray(named_tensor) – Convert named tensor to a numpy array.
nparray_to_named_tensor(tensor_key, nparray) – Construct the NamedTensor Protobuf.
run() – Run the collaborator.
send_task_results(tensor_dict, round_number, ...) – Send task results to the aggregator.
- do_task(task, round_number)
Perform the specified task.
- Parameters:
task (list_of_str) – List of tasks.
round_number (int) – Actual round number.
- Returns:
A dictionary of reportable metrics of the current collaborator for the task.
- Return type:
dict
- get_aggregated_tensor_from_aggregator(tensor_key, require_lossless=False)
Return the decompressed tensor associated with the requested tensor key.
If the key requests a compressed tensor (in the tag), the tensor will be decompressed before returning. If the key specifies an uncompressed tensor (or just omits a compressed tag), the decompression operation will be skipped.
- Parameters:
tensor_key (namedtuple) – The requested tensor.
require_lossless (bool) – Whether the tensor must be transferred without lossy compression in flight. Applying lossy compression to the initial model may affect convergence, and metrics should not be compressed either.
- Returns:
The decompressed tensor associated with the requested tensor key.
- Return type:
nparray
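The tag-driven behaviour described above can be illustrated with a minimal sketch. It is not the library's implementation: the `TensorKey` field names, the `"compressed"` tag string, and the use of `zlib` as the codec are all assumptions chosen so the example runs with the standard library alone.

```python
import zlib
from collections import namedtuple

# Hypothetical stand-in for a tensor key; the field names are assumptions.
TensorKey = namedtuple("TensorKey", ["tensor_name", "origin", "round_number", "tags"])


def maybe_decompress(tensor_key, payload):
    """Return raw bytes, decompressing only when the key carries a 'compressed' tag."""
    if "compressed" in tensor_key.tags:
        return zlib.decompress(payload)
    # No compressed tag: the payload is passed through untouched.
    return payload


raw = b"\x00\x01\x02\x03" * 8
compressed_key = TensorKey("conv1.weight", "aggregator", 0, ("compressed",))
plain_key = TensorKey("conv1.weight", "aggregator", 0, ())

from_compressed = maybe_decompress(compressed_key, zlib.compress(raw))
from_plain = maybe_decompress(plain_key, raw)
```

`zlib` is lossless, so both calls return the original bytes. A lossy pipeline would return an approximation instead, which is the case `require_lossless` is meant to rule out.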
- get_data_for_tensorkey(tensor_key)
Resolve the tensor corresponding to the requested tensorkey.
- Parameters:
tensor_key (namedtuple) – Tensorkey that will be resolved locally or remotely. May be the product of other tensors.
- Returns:
The decompressed tensor associated with the requested tensor key.
- Return type:
nparray
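A "local first, then remote" resolution can be sketched as follows. The `TensorResolver` class, its method names, and the `fetch_remote` callable are hypothetical and exist only for illustration. The case where a tensor is derived from other tensors is left out.

```python
from collections import namedtuple

# Hypothetical stand-in for a tensor key; the field names are assumptions.
TensorKey = namedtuple("TensorKey", ["tensor_name", "origin", "round_number", "tags"])


class TensorResolver:
    """Look a key up in a local store and fall back to a remote fetch on a miss."""

    def __init__(self, fetch_remote):
        self._local = {}
        self._fetch_remote = fetch_remote
        self.remote_calls = 0

    def put(self, key, value):
        self._local[key] = value

    def get(self, key):
        if key in self._local:
            return self._local[key]
        # Local miss: ask the remote side once and cache the answer.
        self.remote_calls += 1
        value = self._fetch_remote(key)
        self._local[key] = value
        return value


def fake_aggregator(key):
    # Stand-in for a network call; always returns zeros.
    return [0.0, 0.0, 0.0]


resolver = TensorResolver(fake_aggregator)
local_key = TensorKey("bias", "col1", 1, ("trained",))
remote_key = TensorKey("bias", "aggregator", 1, ("aggregated",))

resolver.put(local_key, [0.1, 0.2, 0.3])
local_value = resolver.get(local_key)
remote_value = resolver.get(remote_key)
remote_again = resolver.get(remote_key)  # served from the local cache
```

Caching the remote result means a repeated request for the same key does not trigger a second round trip.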
- named_tensor_to_nparray(named_tensor)
Convert named tensor to a numpy array.
- Parameters:
named_tensor (protobuf) – The tensor to convert to nparray.
- Returns:
decompressed_nparray – The converted numpy array.
- Return type:
nparray
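To make the conversion concrete, here is a rough sketch of decoding a serialized tensor into nested Python lists. `NamedTensorStub` is a hypothetical dataclass, not the real protobuf message, and the little-endian float32 row-major layout is an assumption. Only two-dimensional shapes are handled.

```python
import struct
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class NamedTensorStub:
    """Hypothetical stand-in for a serialized tensor message."""
    name: str
    shape: Tuple[int, int]
    data_bytes: bytes  # assumed layout: little-endian float32, row-major


def stub_to_rows(named_tensor: NamedTensorStub) -> List[List[float]]:
    """Decode the byte payload and reshape it into a list of rows."""
    rows, cols = named_tensor.shape
    flat = struct.unpack(f"<{rows * cols}f", named_tensor.data_bytes)
    return [list(flat[r * cols:(r + 1) * cols]) for r in range(rows)]


values = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
stub = NamedTensorStub("fc.weight", (2, 3), struct.pack("<6f", *values))
rows = stub_to_rows(stub)
```

In practice a numpy array would replace the nested lists. The stub only shows that the shape and the raw bytes together are enough to rebuild the array.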
- nparray_to_named_tensor(tensor_key, nparray)
Construct the NamedTensor Protobuf.
Includes logic to create delta, compress tensors with the TensorCodec, etc.
- Parameters:
tensor_key (namedtuple) – Tensorkey that will be resolved locally or remotely. May be the product of other tensors.
nparray – The decompressed tensor associated with the requested tensor key.
- Returns:
named_tensor – The tensor constructed from the nparray.
- Return type:
protobuf
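The idea of sending a delta instead of the whole model, as toggled by `use_delta_updates`, can be sketched with plain lists. The helper names `make_delta` and `apply_delta` are hypothetical, and the real method additionally handles compression and protobuf construction, which this sketch omits.

```python
def make_delta(new_weights, base_weights):
    """Element-wise difference between updated weights and the shared base."""
    return [n - b for n, b in zip(new_weights, base_weights)]


def apply_delta(base_weights, delta):
    """Rebuild the updated weights on the receiving side from base + delta."""
    return [b + d for b, d in zip(base_weights, delta)]


base = [0.5, 1.0, -2.0]      # weights both sides already hold
trained = [0.75, 1.0, -1.5]  # weights after local training

delta = make_delta(trained, base)
restored = apply_delta(base, delta)
```

Because both sides hold the same base weights, only `delta` needs to travel. Deltas are often small or sparse, which tends to make them compress better than full weights.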
- send_task_results(tensor_dict, round_number, task_name)
Send task results to the aggregator.
- Parameters:
tensor_dict (dict) – Tensor dictionary.
round_number (int) – Actual round number.
task_name (string) – Task name.
- Returns:
A dictionary of reportable metrics of the current collaborator for the task.
- Return type:
dict