openfl.component.collaborator.collaborator.Collaborator#

class openfl.component.collaborator.collaborator.Collaborator(collaborator_name, aggregator_uuid, federation_uuid, client, task_runner, task_config, opt_treatment='RESET', device_assignment_policy='CPU_ONLY', delta_updates=False, compression_pipeline=None, db_store_rounds=1, log_memory_usage=False, write_logs=False, callbacks=None)[source]#

Bases: object

The Collaborator object class.

Parameters:

callbacks (List | None)

collaborator_name#

The common name for the collaborator.

Type:

str

aggregator_uuid#

The unique id for the client.

Type:

str

federation_uuid#

The unique id for the federation.

Type:

str

client#

The client object.

Type:

object

task_runner#

The task runner object.

Type:

object

task_config#

The task configuration.

Type:

dict

opt_treatment#

The optimizer state treatment.

Type:

str

device_assignment_policy#

The device assignment policy.

Type:

str

delta_updates#

If True, only model delta gets sent. If False, whole model gets sent to collaborator.

Type:

bool

compression_pipeline#

The compression pipeline.

Type:

object

db_store_rounds#

The number of rounds to store in the database.

Type:

int

single_col_cert_common_name#

The common name for the single column certificate.

Type:

str

Note

* - Plan setting.

__init__(collaborator_name, aggregator_uuid, federation_uuid, client, task_runner, task_config, opt_treatment='RESET', device_assignment_policy='CPU_ONLY', delta_updates=False, compression_pipeline=None, db_store_rounds=1, log_memory_usage=False, write_logs=False, callbacks=None)[source]#

Initialize the Collaborator object.

Parameters:
  • collaborator_name (str) – The common name for the collaborator.

  • aggregator_uuid (str) – The unique id for the client.

  • federation_uuid (str) – The unique id for the federation.

  • client (object) – The client object.

  • task_runner (object) – The task runner object.

  • task_config (dict) – The task configuration.

  • opt_treatment (str, optional) – The optimizer state treatment. Defaults to ‘RESET’.

  • device_assignment_policy (str, optional) – The device assignment policy. Defaults to ‘CPU_ONLY’.

  • delta_updates (bool, optional) – If True, only model delta gets sent. If False, whole model gets sent to collaborator. Defaults to False.

  • compression_pipeline (object, optional) – The compression pipeline. Defaults to None.

  • db_store_rounds (int, optional) – The number of rounds to store in the database. Defaults to 1.

  • callbacks (list, optional) – List of callbacks. Defaults to None.

Methods

__init__(collaborator_name, aggregator_uuid, ...)

Initialize the Collaborator object.

do_task(task, round_number)

Perform the specified task.

get_aggregated_tensor_from_aggregator(tensor_key)

Return the decompressed tensor associated with the requested tensor key.

get_data_for_tensorkey(tensor_key)

Resolve the tensor corresponding to the requested tensorkey.

get_numpy_dict_for_tensorkeys(tensor_keys)

Get tensor dictionary for specified tensorkey set.

get_tasks()

Get tasks from the aggregator.

named_tensor_to_nparray(named_tensor)

Convert named tensor to a numpy array.

nparray_to_named_tensor(tensor_key, nparray)

Construct the NamedTensor Protobuf.

run()

Run the collaborator.

run_simulation()

Specific function for the simulation.

send_task_results(tensor_dict, round_number, ...)

Send task results to the aggregator.

set_available_devices([cuda])

Set available CUDA devices.

do_task(task, round_number)[source]#

Perform the specified task.

Parameters:
  • task (list_of_str) – List of tasks.

  • round_number (int) – Actual round number.

Returns:

A dictionary of reportable metrics of the current collaborator for the task.

Return type:

dict

get_aggregated_tensor_from_aggregator(tensor_key, require_lossless=False)[source]#

Return the decompressed tensor associated with the requested tensor key.

If the key requests a compressed tensor (in the tag), the tensor will be decompressed before returning. If the key specifies an uncompressed tensor (or just omits a compressed tag), the decompression operation will be skipped.

Parameters:
  • tensor_key (namedtuple) – The requested tensor.

  • require_lossless (bool) – Should compression of the tensor be allowed in flight? For the initial model, it may affect convergence to apply lossy compression. And metrics shouldn’t be compressed either.

Returns:

The decompressed tensor associated with the requested

tensor key.

Return type:

nparray

get_data_for_tensorkey(tensor_key)[source]#

Resolve the tensor corresponding to the requested tensorkey.

Parameters:
  • tensor_key (namedtuple) – Tensorkey that will be resolved locally or

  • tensors. (remotely. May be the product of other)

Returns:

The decompressed tensor associated with the requested

tensor key.

Return type:

nparray

get_numpy_dict_for_tensorkeys(tensor_keys)[source]#

Get tensor dictionary for specified tensorkey set.

Parameters:

tensor_keys (namedtuple) – Tensorkeys that will be resolved locally or remotely. May be the product of other tensors.

get_tasks()[source]#

Get tasks from the aggregator.

Returns:

List of tasks. round_number (int): Actual round number. sleep_time (int): Sleep time. time_to_quit (bool): bool value for quit.

Return type:

tasks (list_of_str)

named_tensor_to_nparray(named_tensor)[source]#

Convert named tensor to a numpy array.

Parameters:

named_tensor (protobuf) – The tensor to convert to nparray.

Returns:

The nparray converted.

Return type:

decompressed_nparray (nparray)

nparray_to_named_tensor(tensor_key, nparray)[source]#

Construct the NamedTensor Protobuf.

Includes logic to create delta, compress tensors with the TensorCodec, etc.

Parameters:
  • tensor_key (namedtuple) – Tensorkey that will be resolved locally or remotely. May be the product of other tensors.

  • nparray – The decompressed tensor associated with the requested tensor key.

Returns:

The tensor constructed from the nparray.

Return type:

named_tensor (protobuf)

run()[source]#

Run the collaborator.

run_simulation()[source]#

Specific function for the simulation.

After the tasks have been performed for a roundquit, and then the collaborator object will be reinitialized after the next round.

send_task_results(tensor_dict, round_number, task_name)[source]#

Send task results to the aggregator.

Parameters:
  • tensor_dict (dict) – Tensor dictionary.

  • round_number (int) – Actual round number.

  • task_name (string) – Task name.

Returns:

A dictionary of reportable metrics of the current collaborator for the task.

Return type:

dict

set_available_devices(cuda=())[source]#

Set available CUDA devices.

Parameters:

cuda (Tuple[str]) – Tuple containing string indices of available CUDA devices, (‘1’, ‘3’).