Class - Collaborator

class openfl.component.collaborator.collaborator.Collaborator(collaborator_name, aggregator_uuid, federation_uuid, client, task_runner, task_config, opt_treatment='RESET', device_assignment_policy='CPU_ONLY', use_delta_updates=False, compression_pipeline=None, db_store_rounds=1, log_memory_usage=False, write_logs=False, callbacks=[], secure_aggregation=False, interop_mode=False)

Bases: object

The Collaborator object class.

Parameters:
  • client (AggregatorClientInterface)

  • callbacks (List | None)

collaborator_name

The common name for the collaborator.

Type:

str

aggregator_uuid

The unique id for the aggregator.

Type:

str

federation_uuid

The unique id for the federation.

Type:

str

client

The client object.

Type:

object

task_runner

The task runner object.

Type:

object

task_config

The task configuration.

Type:

dict

opt_treatment

The optimizer state treatment.

Type:

str

device_assignment_policy

[Deprecated] The device assignment policy.

Type:

str

use_delta_updates

If True, only the model delta is sent. If False, the whole model is sent to the aggregator.

Type:

bool

compression_pipeline

The compression pipeline.

Type:

object

db_store_rounds

The number of rounds to store in the database.

Type:

int

single_col_cert_common_name

The common name for the single collaborator certificate.

Type:

str

Note

* - Plan setting.

__init__(collaborator_name, aggregator_uuid, federation_uuid, client, task_runner, task_config, opt_treatment='RESET', device_assignment_policy='CPU_ONLY', use_delta_updates=False, compression_pipeline=None, db_store_rounds=1, log_memory_usage=False, write_logs=False, callbacks=[], secure_aggregation=False, interop_mode=False)

Initialize the Collaborator object.

Parameters:
  • collaborator_name (str) – The common name for the collaborator.

  • aggregator_uuid (str) – The unique id for the aggregator.

  • federation_uuid (str) – The unique id for the federation.

  • client (object) – The client object.

  • task_runner (object) – The task runner object.

  • task_config (dict) – The task configuration.

  • opt_treatment (str, optional) – The optimizer state treatment. Defaults to 'RESET'.

  • device_assignment_policy (str, optional) – [Deprecated] The device assignment policy. Defaults to 'CPU_ONLY'.

  • use_delta_updates (bool, optional) – If True, only the model delta is sent. If False, the whole model is sent to the aggregator. Defaults to False.

  • compression_pipeline (object, optional) – The compression pipeline. Defaults to None.

  • db_store_rounds (int, optional) – The number of rounds to store in the database. Defaults to 1.

  • callbacks (list, optional) – List of callbacks. Defaults to an empty list.
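As a rough illustration of how the documented parameters fit together, the sketch below assembles constructor keyword arguments without importing OpenFL. The helper names `documented_defaults` and `build_collaborator_kwargs` are hypothetical and not part of the library; the parameter names and default values are copied from the signature above. Because the signature shows the mutable default `callbacks=[]`, the sketch passes a fresh list on every call so that instances never share one list object.

```python
from typing import Any, Dict

# Required parameters, in the order the documented signature lists them.
REQUIRED = (
    "collaborator_name",
    "aggregator_uuid",
    "federation_uuid",
    "client",
    "task_runner",
    "task_config",
)


def documented_defaults() -> Dict[str, Any]:
    """Return a fresh copy of the optional defaults from the signature."""
    return {
        "opt_treatment": "RESET",
        "device_assignment_policy": "CPU_ONLY",
        "use_delta_updates": False,
        "compression_pipeline": None,
        "db_store_rounds": 1,
        "log_memory_usage": False,
        "write_logs": False,
        "callbacks": [],  # new list on every call, never shared
        "secure_aggregation": False,
        "interop_mode": False,
    }


def build_collaborator_kwargs(**overrides: Any) -> Dict[str, Any]:
    """Hypothetical helper: merge caller values with the documented defaults."""
    kwargs = documented_defaults()
    missing = [name for name in REQUIRED if name not in overrides]
    if missing:
        raise TypeError("missing required arguments: " + ", ".join(missing))
    unknown = [n for n in overrides if n not in REQUIRED and n not in kwargs]
    if unknown:
        raise TypeError("unknown arguments: " + ", ".join(unknown))
    kwargs.update(overrides)
    return kwargs


kwargs = build_collaborator_kwargs(
    collaborator_name="col1",
    aggregator_uuid="agg-uuid",
    federation_uuid="fed-uuid",
    client=object(),       # placeholder for an AggregatorClientInterface
    task_runner=object(),  # placeholder for a task runner
    task_config={},
    use_delta_updates=True,
)
# With OpenFL installed, the result could be passed as Collaborator(**kwargs).
```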

Methods

__init__(collaborator_name, aggregator_uuid, ...)

Initialize the Collaborator object.

do_task(task, round_number)

Perform the specified task.

get_aggregated_tensor_from_aggregator(tensor_key)

Return the decompressed tensor associated with the requested tensor key.

get_data_for_tensorkey(tensor_key)

Resolve the tensor corresponding to the requested tensorkey.

named_tensor_to_nparray(named_tensor)

Convert named tensor to a numpy array.

nparray_to_named_tensor(tensor_key, nparray)

Construct the NamedTensor Protobuf.

ping()

Ping the Aggregator.

prepare_interop_server()

Prepare the interoperability server.

run()

Run the collaborator.

send_task_results(tensor_dict, round_number, ...)

Send task results to the aggregator.

do_task(task, round_number)

Perform the specified task.

Parameters:
  • task (list_of_str) – List of tasks.

  • round_number (int) – Current round number.

Returns:

A dictionary of reportable metrics of the current collaborator for the task.

Return type:

dict

get_aggregated_tensor_from_aggregator(tensor_key, require_lossless=False)

Return the decompressed tensor associated with the requested tensor key.

If the key requests a compressed tensor (in the tag), the tensor will be decompressed before returning. If the key specifies an uncompressed tensor (or just omits a compressed tag), the decompression operation will be skipped.

Parameters:
  • tensor_key (namedtuple) – The requested tensor.

  • require_lossless (bool) – If True, lossy compression of the tensor in flight is not allowed. Applying lossy compression to the initial model may affect convergence, and metrics should not be compressed either. Defaults to False.

Returns:

The decompressed tensor associated with the requested tensor key.

Return type:

nparray
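The tag-driven behaviour described above can be sketched in a few lines. Everything here is an illustrative assumption rather than the library's implementation: the `TensorKey` field names, the `"compressed"` tag string, and the use of `zlib` as a stand-in for the configured compression pipeline.

```python
import zlib
from collections import namedtuple

# Hypothetical stand-in for the tensor key; field names are assumptions.
TensorKey = namedtuple(
    "TensorKey", ["tensor_name", "origin", "round_number", "report", "tags"]
)


def resolve_payload(tensor_key: TensorKey, payload: bytes) -> bytes:
    """Decompress only when the key's tags mark the payload as compressed."""
    if "compressed" in tensor_key.tags:
        return zlib.decompress(payload)
    # Uncompressed key (or no compression tag): skip decompression.
    return payload


raw = b"weights" * 10
compressed_key = TensorKey("conv1", "aggregator", 0, False, ("compressed",))
plain_key = TensorKey("conv1", "aggregator", 0, False, ())

from_compressed = resolve_payload(compressed_key, zlib.compress(raw))
from_plain = resolve_payload(plain_key, raw)
```

Both calls yield the original bytes; only the first one runs the decompression step.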

get_data_for_tensorkey(tensor_key)

Resolve the tensor corresponding to the requested tensorkey.

Parameters:
  • tensor_key (namedtuple) – Tensorkey that will be resolved locally or remotely. May be the product of other tensors.

Returns:

The decompressed tensor associated with the requested tensor key.

Return type:

nparray
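One way to picture "resolved locally or remotely" is a local lookup with a remote fallback. The sketch below is a simplification under stated assumptions: `local_store` is a plain dict standing in for the collaborator's tensor database, `fetch_remote` is a hypothetical callable standing in for a request to the aggregator, and the `TensorKey` fields are illustrative.

```python
from collections import namedtuple
from typing import Any, Callable, Dict

# Hypothetical stand-in for the tensor key; field names are assumptions.
TensorKey = namedtuple(
    "TensorKey", ["tensor_name", "origin", "round_number", "report", "tags"]
)


def get_data_for_key(
    tensor_key: TensorKey,
    local_store: Dict[TensorKey, Any],
    fetch_remote: Callable[[TensorKey], Any],
) -> Any:
    """Return the tensor from the local store, fetching remotely on a miss."""
    if tensor_key in local_store:
        return local_store[tensor_key]
    value = fetch_remote(tensor_key)
    local_store[tensor_key] = value  # cache so later lookups stay local
    return value


remote_calls = []


def fake_fetch(key: TensorKey) -> list:
    """Record the request and return a dummy tensor."""
    remote_calls.append(key)
    return [0.1, 0.2, 0.3]


store: Dict[TensorKey, Any] = {}
key = TensorKey("dense1", "aggregator", 3, False, ("model",))
first = get_data_for_key(key, store, fake_fetch)   # remote fetch
second = get_data_for_key(key, store, fake_fetch)  # served locally
```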

named_tensor_to_nparray(named_tensor)

Convert named tensor to a numpy array.

Parameters:
  • named_tensor (protobuf) – The tensor to convert to nparray.

Returns:

decompressed_nparray: The converted nparray.

Return type:

nparray
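The conversion can be thought of as a round trip between a flat byte payload plus shape metadata and an in-memory array. The sketch below uses only the standard library and a toy message class; `ToyNamedTensor` is hypothetical and is not the NamedTensor protobuf, and a list of floats stands in for a numpy array.

```python
import math
from array import array
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class ToyNamedTensor:
    """Toy message: name, raw float64 bytes, and the intended shape."""
    name: str
    data_bytes: bytes
    shape: Tuple[int, ...]


def to_toy_named_tensor(
    name: str, values: List[float], shape: Tuple[int, ...]
) -> ToyNamedTensor:
    """Serialize flat values into the toy message."""
    if math.prod(shape) != len(values):
        raise ValueError("shape does not match the number of values")
    return ToyNamedTensor(name, array("d", values).tobytes(), shape)


def from_toy_named_tensor(message: ToyNamedTensor) -> List[float]:
    """Deserialize the toy message back into flat values."""
    flat = array("d")
    flat.frombytes(message.data_bytes)
    if math.prod(message.shape) != len(flat):
        raise ValueError("payload size does not match the recorded shape")
    return list(flat)


values = [1.0, 2.5, -3.0, 4.0]
message = to_toy_named_tensor("dense1/kernel", values, (2, 2))
restored = from_toy_named_tensor(message)
```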

nparray_to_named_tensor(tensor_key, nparray)

Construct the NamedTensor Protobuf.

Includes logic to create delta, compress tensors with the TensorCodec, etc.

Parameters:
  • tensor_key (namedtuple) – Tensorkey that will be resolved locally or remotely. May be the product of other tensors.

  • nparray – The decompressed tensor associated with the requested tensor key.

Returns:

named_tensor: The tensor constructed from the nparray.

Return type:

protobuf
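The "create delta" step mentioned above amounts to sending the element-wise difference between the newly trained values and a base both sides already hold, so the receiver can rebuild the full tensor. A minimal sketch follows, assuming flat lists of floats instead of numpy arrays; `make_delta` and `apply_delta` are hypothetical names, not TensorCodec methods.

```python
from typing import List


def make_delta(new_values: List[float], base_values: List[float]) -> List[float]:
    """Element-wise difference between the new tensor and the shared base."""
    if len(new_values) != len(base_values):
        raise ValueError("tensors must have the same length")
    return [n - b for n, b in zip(new_values, base_values)]


def apply_delta(base_values: List[float], delta: List[float]) -> List[float]:
    """Rebuild the new tensor on the receiving side from base plus delta."""
    if len(base_values) != len(delta):
        raise ValueError("tensors must have the same length")
    return [b + d for b, d in zip(base_values, delta)]


base = [1.0, 2.0, 3.0]     # values both sides already hold
trained = [1.5, 2.0, 2.5]  # values after local training
delta = make_delta(trained, base)
rebuilt = apply_delta(base, delta)
```

Only `delta` would need to travel over the wire in this scheme, which tends to compress better than the full tensor when updates are small.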

ping()

Ping the Aggregator.

prepare_interop_server()

Prepare the interoperability server.

This function initializes the interoperability server and sets up the callback for receiving messages from the interop server. It also sets the interop server in the task configuration to be used by the Task Runner.

run()

Run the collaborator.

send_task_results(tensor_dict, round_number, task_name)

Send task results to the aggregator.

Parameters:
  • tensor_dict (dict) – Tensor dictionary.

  • round_number (int) – Current round number.

  • task_name (string) – Task name.

Returns:

A dictionary of reportable metrics of the current collaborator for the task.

Return type:

dict