Welcome to the Open Federated Learning (OpenFL) Documentation!

Open Federated Learning (OpenFL) is a Python* 3 library for federated learning that enables organizations to collaboratively train a model without sharing sensitive information.

OpenFL is Deep Learning framework-agnostic. Training of statistical models may be done with any deep learning framework, such as TensorFlow* or PyTorch*, via a plugin mechanism.

OpenFL is a community supported project, originally developed by Intel Labs and the Intel Internet of Things Group. The team would like to encourage any contributions, notes, or requests to improve the documentation.

Manual

What is Open Federated Learning (OpenFL):

Establish a federation with OpenFL:

Customize the federation:

Get familiar with the APIs:

Explore new and experimental features:

Overview

Note

This project is continually being developed and improved. Expect changes to this manual, the project code, and the project design.

Open Federated Learning (OpenFL) is a Python* 3 project developed by Intel Internet of Things Group (IOTG) and Intel Labs.

_images/ct_vs_fl.png

Federated Learning

What is Federated Learning?

Federated learning is a distributed machine learning approach that enables collaboration on machine learning projects without sharing sensitive data, such as patient records, financial data, or classified secrets (McMahan, 2016; Sheller, Reina, Edwards, Martin, & Bakas, 2019; Yang, Liu, Chen, & Tong, 2019; Sheller et al., 2020). In federated learning, the model moves to meet the data rather than the data moving to meet the model. The only data that moves across the federation is the model parameters and their updates.

_images/diagram_fl_new.png

Federated Learning

Definitions and Conventions

Federated learning brings in a few more components to the traditional data science training pipeline:

Collaborator

A collaborator is a client in the federation that has access to the local training, validation, and test datasets. By design, the collaborator is the only component of the federation with access to the local data. The local dataset should never leave the collaborator.

Aggregator

A parameter server sends a global model to the collaborators. Parameter servers are often combined with aggregators on the same compute node. An aggregator receives locally tuned models from collaborators and combines them into a new global model. Typically, federated averaging (a weighted average) is the algorithm used to combine the locally tuned models.
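
To make the weighted average concrete, here is a minimal, framework-agnostic sketch of federated averaging over NumPy weight arrays (an illustration only, not OpenFL's internal implementation):

import numpy as np

def federated_average(local_weights, local_sample_counts):
    """Combine locally tuned weight arrays into a new global array,
    weighting each collaborator by its local dataset size."""
    total = sum(local_sample_counts)
    return sum(w * (n / total) for w, n in zip(local_weights, local_sample_counts))

# Two collaborators with different dataset sizes
new_global = federated_average(
    [np.array([0.2, 0.4]), np.array([0.6, 0.8])],
    [100, 300],
)
# new_global is now array([0.5, 0.7])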

Round

A federation round is defined as the interval (typically defined in terms of training steps) where an aggregation is performed. Collaborators may perform local training on the model for multiple epochs (or even partial epochs) within a single training round.

Installation

Depending on how you want to set up OpenFL, choose one of the following installation procedures.

Install the Package

Follow this procedure to prepare the environment and install the OpenFL package. Perform this procedure on every node in the federation.

  1. Install a Python 3.8 (>=3.6, <3.9) virtual environment using venv.

See the Venv installation guide for details.

  2. Create a new virtual environment for the project.

    python3 -m venv venv
    
  3. Activate the virtual environment.

    source venv/bin/activate
    
  4. Install the OpenFL package.

    1. Installation from PyPI:

      python -m pip install openfl
      
    2. Installation from source:

      1. Clone the OpenFL repository:

        git clone https://github.com/intel/openfl.git
        
      2. Install the build tools before installing OpenFL:

        python -m pip install -U pip setuptools wheel
        cd openfl/
        python -m pip install .
        
  5. Run the fx command in the virtual environment to confirm OpenFL is installed.

    _images/fx_help.png

Output of the fx Command

OpenFL with Docker*

Follow this procedure to download or build a Docker* image of OpenFL, which you can use to run your federation in an isolated environment.

Note

The Docker* version of OpenFL provides an isolated environment complete with the prerequisites to run a federation. When the execution is over, the container can be destroyed, and the results of the computation will be available in a directory on the local host.

  1. Install Docker on all nodes in the federation.

See the Docker installation guide for details.

  2. Check that Docker is running properly with the Hello World command:

    $ docker run hello-world
    Hello from Docker!
    This message shows that your installation appears to be working correctly.
    ...
    ...
    ...
    
  3. Build an image from the latest official OpenFL release:

    docker pull intel/openfl
    

    If you prefer to build an image from a specific commit or branch, perform the following commands:

    git clone https://github.com/intel/openfl.git
    cd openfl
    docker build -f openfl-docker/Dockerfile.base .
    

Run the Federation

OpenFL currently offers two ways to set up and run experiments with a federation: the Director-based workflow and the Aggregator-based workflow. The Director-based workflow introduces a new and more convenient way to set up a federation and brings “long-lived” components (the Director and the Envoy) into the federation, while the Aggregator-based workflow is advised for scenarios where the workload needs to be verified prior to execution.

Director-Based Workflow

Set up long-lived components to run many experiments in series. Recommended for FL research when many changes to the model, dataloader, or hyperparameters are expected.

Aggregator-Based Workflow

Define an experiment and distribute it manually. All participants can verify the model code and the FL plan (https://openfl.readthedocs.io/en/latest/running_the_federation.html#federated-learning-plan-fl-plan-settings) prior to execution. The federation is terminated when the experiment is finished.

Director-Based Workflow

A director-based workflow uses long-lived components in a federation. These components continue to be available to distribute more experiments in the federation.

  • The Director is the central node of the federation. This component starts an Aggregator for each experiment, sends data to connected collaborator nodes, and provides updates on the status.

  • The Envoy runs on collaborator nodes connected to the Director. When the Director starts an experiment, the Envoy starts the Collaborator to train the global model.

The director-based workflow comprises the following roles and their tasks:

Follow the procedure in the director-based workflow to become familiar with the setup required and APIs provided for each role in the federation: Experiment manager (Data scientist), Director manager, and Collaborator manager.

  • Experiment manager (or Data scientist) is a person or group of people using OpenFL.

  • Director manager is the ML model creator’s representative who controls the Director.

  • Collaborator manager is the data owner’s representative who controls an Envoy.

Note

The Open Federated Learning (OpenFL) interactive Python API enables the Experiment manager (data scientists) to define and start a federated learning experiment from a single entry point: a Jupyter* notebook or a Python* script.

See Interactive Python API (Beta) for details.

An overview of this workflow is shown below.

_images/director_workflow.svg

Overview of the Director-Based Workflow

Director Manager: Set Up the Director

The Director manager sets up the Director, which is the central node of the federation.

OPTIONAL STEP: Plan Agreement

In order to carry out a secure federation, the Director must approve the FL plan before starting the experiment. This check can be enforced with the setting review_experiment: True in the Director config file. Refer to the director_config_review_exp.yaml file under the PyTorch_Histology interactive API example. After the Director approves the experiment, it starts the aggregator and sends the experiment archive to all the participating Envoys for review. If the Director rejects the experiment, the experiment is aborted right away: no aggregator is started, and the Envoys do not receive the experiment archive at all.

OPTIONAL STEP: Create PKI Certificates Using Step-CA

The use of mutual Transport Layer Security (mTLS) is recommended for deployments in untrusted environments to establish participant identity and to encrypt communication. You may either import certificates provided by your organization or generate certificates with the semi-automatic PKI provided by OpenFL.

STEP 1: Install Open Federated Learning (OpenFL)

Install OpenFL in a virtual Python* environment. See Install the Package for details.

STEP 2: Start the Director

Start the Director on a node with at least two open ports. See Long-Lived Components to learn more about the Director entity.

  1. Create a Director workspace with a default config file.

    fx director create-workspace -p path/to/director_workspace_dir
    

This workspace will contain received experiments and supplementary files (Director config file and certificates).

  2. Modify the Director config file according to your federation setup.

The default config file contains the Director node FQDN, an open port, the path to certificates, and the sample_shape and target_shape fields with a string representation of the unified data interface in the federation.

  3. Start the Director.

If mTLS protection is not set up, run this command.

fx director start --disable-tls -c director_config.yaml

If you have a federation with PKI certificates, run this command.

fx director start -c director_config.yaml \
     -rc cert/root_ca.crt \
     -pk cert/priv.key \
     -oc cert/open.crt
Collaborator Manager: Set Up the Envoy

The Collaborator manager sets up the Envoys, which are long-lived components on collaborator nodes. When started, Envoys will try to connect to the Director. Envoys receive an experiment archive and provide access to local data.

OPTIONAL STEP: Plan Agreement

In order to carry out a secure federation, each of the Envoys must approve the experiment before it is started, after the Director’s approval. This check can be enforced with the parameter review_experiment: True in the Envoy config file. Refer to the envoy_config_review_exp.yaml file under the PyTorch_Histology interactive API example. If any of the Envoys rejects the experiment, a set_experiment_failed request is sent to the Director to stop the aggregator.

OPTIONAL STEP: Sign PKI Certificates

The use of mTLS is recommended for deployments in untrusted environments to establish participant identity and to encrypt communication. You may either import certificates provided by your organization or use the semi-automatic PKI certificate provided by OpenFL.

STEP 1: Install OpenFL

Install OpenFL in a Python* virtual environment. See Install the Package for details.

STEP 2: Start the Envoy
  1. Create an Envoy workspace with a default config file and shard descriptor Python* script.

    fx envoy create-workspace -p path/to/envoy_workspace_dir
    
  2. Modify the Envoy config file and local shard descriptor template.

    • Provide the settings field with the arbitrary settings required to initialize the shard descriptor.

    • Complete the shard descriptor template field with the address of the local shard descriptor class.

    Note

    The shard descriptor is an object that provides a unified data interface for FL experiments. The shard descriptor implements the get_dataset() method as well as several additional methods to access the sample shape, target shape, and shard description that may be used to identify participants during experiment definition and execution.

    The get_dataset() method accepts a dataset_type (for instance train, validation, query, gallery) and returns an iterable object with samples and targets.

    Your implementation of ShardDescriptor should inherit from openfl.interface.interactive_api.shard_descriptor.ShardDescriptor. It should implement the get_dataset, sample_shape, and target_shape methods to describe how data samples and labels will be loaded from disk during training (see the sketch after this procedure).

  3. Start the Envoy.

If mTLS protection is not set up, run this command.

ENVOY_NAME=envoy_example_name

fx envoy start \
    -n "$ENVOY_NAME" \
    --disable-tls \
    --envoy-config-path envoy_config.yaml \
    -dh director_fqdn \
    -dp port

If you have a federation with PKI certificates, run this command.

ENVOY_NAME=envoy_example_name

fx envoy start \
    -n "$ENVOY_NAME" \
    --envoy-config-path envoy_config.yaml \
    -dh director_fqdn \
    -dp port \
    -rc cert/root_ca.crt \
    -pk cert/"$ENVOY_NAME".key \
    -oc cert/"$ENVOY_NAME".crt
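
The note in step 2 above describes the shard descriptor interface. The following is a minimal sketch of a ShardDescriptor subclass that serves randomly generated data; the settings names (rank, worldsize) and the data itself are hypothetical, and the exact base-class requirements may differ between OpenFL versions:

import numpy as np
from openfl.interface.interactive_api.shard_descriptor import ShardDescriptor

class RandomShardDescriptor(ShardDescriptor):
    """Illustrative shard descriptor serving random data instead of real files."""

    def __init__(self, rank: int = 1, worldsize: int = 1, **kwargs):
        # rank/worldsize are hypothetical settings taken from the Envoy config
        self.rank = rank
        self.worldsize = worldsize
        rng = np.random.default_rng(self.rank)
        self.samples = rng.normal(size=(64, 28, 28)).astype(np.float32)
        self.targets = rng.integers(0, 10, size=(64, 1))

    def get_dataset(self, dataset_type: str = 'train'):
        # A real implementation would switch on dataset_type
        # ('train', 'validation', 'query', 'gallery', ...)
        return list(zip(self.samples, self.targets))

    @property
    def sample_shape(self):
        return ['28', '28']

    @property
    def target_shape(self):
        return ['1']

    @property
    def dataset_description(self):
        # Shard description used to identify this participant
        return f'Random shard {self.rank} of {self.worldsize}'

In the Envoy config file, the shard descriptor template field would then point to this class (for example, shard_descriptor.RandomShardDescriptor if the sketch lives in shard_descriptor.py), and the settings field would supply the rank and worldsize constructor arguments.
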
Experiment Manager: Describe an Experiment

The process of defining an experiment is decoupled from the process of establishing a federation. The Experiment manager (or data scientist) is able to prepare an experiment in a Python environment. Then the Experiment manager registers the experiment in the federation using the Interactive Python API (Beta), which communicates with the Director through a gRPC client.

Interactive Python API (Beta)

The Open Federated Learning (OpenFL) interactive Python API enables the Experiment manager (data scientists) to define and start a federated learning experiment from a single entry point: a Jupyter* notebook or a Python script.

Prerequisites

The Experiment manager requires the following:

Python Interpreter

Create a virtual Python environment with packages required for conducting the experiment. The Python environment is replicated on collaborator nodes.

A Local Experiment Workspace

Initialize a workspace by creating an empty directory and placing a Jupyter* notebook or a Python script inside it.

Items in the workspace may include:

  • source code of objects imported into the notebook from local modules

  • local test data stored in a data directory

  • certificates stored in a cert directory

Note

This workspace will be archived and transferred to collaborator nodes. Ensure only relevant source code or resources are stored in the workspace.

data and cert directories will not be included in the archive.

Define a Federated Learning Experiment

The definition process of a federated learning experiment uses the interactive Python API to set up several interface entities and experiment parameters.

The following interactive Python API entities are used to define an experiment:

Note

Each federation is bound to a specific machine learning problem, in the sense that all collaborator dataset shards must allow solving the same data science problem. For example, object detection and semantic segmentation problems should be solved in different federations.

Federation API

The Federation entity is designed to be a bridge between a notebook and Director.

  1. Import the Federation class from openfl package

    from openfl.interface.interactive_api.federation import Federation
    
  2. Initialize the Federation object with the Director node network address and encryption settings.

    federation = Federation(
        client_id: str, director_node_fqdn: str, director_port: str,
        tls: bool, cert_chain: str, api_cert: str, api_private_key: str)
    

    Note

    You may disable mTLS in trusted environments or enable mTLS by providing paths to the certificate chain of the API authority, aggregator certificate, and a private key.

Note

Methods available in the Federation API:

  • get_dummy_shard_descriptor: creates a dummy shard descriptor for debugging the experiment pipeline

  • get_shard_registry: returns information about the Envoys connected to the Director and their shard descriptors
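
A minimal usage sketch, assuming a Director reachable at a hypothetical FQDN with mTLS disabled (the constructor arguments follow the signature above):

from openfl.interface.interactive_api.federation import Federation

federation = Federation(
    client_id='api',                            # identifier of this API user
    director_node_fqdn='director.example.com',  # hypothetical Director FQDN
    director_port='50051',                      # hypothetical open Director port
    tls=False,                                  # set True and pass cert paths for mTLS
)

# Inspect the Envoys currently connected to the Director
print(federation.get_shard_registry())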

Experiment API

The Experiment entity registers training-related objects, federated learning (FL) tasks, and settings.

  1. Import the FLExperiment class from openfl package

    from openfl.interface.interactive_api.experiment import FLExperiment
    
  2. Initialize the experiment with the following parameters: a federation object and a unique experiment name.

    fl_experiment = FLExperiment(federation: Federation, experiment_name: str)
    
  3. Import these supplementary interface classes: TaskInterface, DataInterface, and ModelInterface.

    from openfl.interface.interactive_api.experiment import TaskInterface, DataInterface, ModelInterface
    

Register the Model and Optimizer ( ModelInterface )

Instantiate and initialize a model and optimizer in your preferred deep learning framework.

from openfl.interface.interactive_api.experiment import ModelInterface
MI = ModelInterface(model, optimizer, framework_plugin: str)

The initialized model and optimizer objects should be passed to the ModelInterface along with the path to the correct Framework Adapter plugin inside the OpenFL package or in the local workspace.

Note

The OpenFL interactive API supports the TensorFlow and PyTorch frameworks via existing plugins. Users can add support for other deep learning frameworks via the plugin interface by pointing to their own implementation of a framework_plugin in ModelInterface.
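
For example, with PyTorch the adapter path used in OpenFL's interactive API tutorials looks like the following (verify the exact plugin path against your OpenFL version; the toy model and optimizer are only placeholders):

import torch
from openfl.interface.interactive_api.experiment import ModelInterface

model = torch.nn.Linear(784, 10)                           # toy model for illustration
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

framework_adapter = 'openfl.plugins.frameworks_adapters.pytorch_adapter.FrameworkAdapterPlugin'
MI = ModelInterface(model=model, optimizer=optimizer, framework_plugin=framework_adapter)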

Register FL Tasks ( TaskInterface )

An FL task accepts the following objects:

  • model - will be rebuilt with relevant weights for every task by TaskRunner

  • data_loader - data loader that will provide local data

  • device - a device to be used for execution on collaborator machines

  • optimizer (optional) - model optimizer; only for training tasks

Register an FL task and accompanying information.

TI = TaskInterface()

task_settings = {
    'batch_size': 32,
    'some_arg': 228,
}
@TI.add_kwargs(**task_settings)
@TI.register_fl_task(model='my_model', data_loader='train_loader',
        device='device', optimizer='my_Adam_opt')
def foo(my_model, train_loader, my_Adam_opt, device, batch_size, some_arg=356):
    # training or validation logic
    ...

FL tasks return a dictionary object with metrics: {metric name: metric value for this task}.

Note

The OpenFL interactive API currently allows registering only standalone functions defined in the main module or imported from other modules inside the workspace.

The TaskInterface class must be instantiated before you can use its methods to register FL tasks.

  • @TI.register_fl_task() needs the task argument names for model, data_loader, device, and optimizer (optional) that constitute the task contract. This method adds the callable and the task contract to the task registry.

  • @TI.add_kwargs() should be used to set up arguments that are not included in the contract.

Register Federated Data Loader ( DataInterface )

A shard descriptor defines how to read and format the local data, so the data loader only contains the batching and augmentation logic, which is common to all collaborators.

Subclass DataInterface and implement the following methods.

class CustomDataLoader(DataInterface):
    def __init__(self, **kwargs):
        # Initialize superclass with kwargs: this array will be passed
        # to get_data_loader methods
        super().__init__(**kwargs)
        # Set up augmentation, save required parameters,
        # use it as your regular dataset class
        validation_fraction = kwargs.get('validation_fraction', 0.5)
        ...

    @property
    def shard_descriptor(self):
        return self._shard_descriptor

    @shard_descriptor.setter
    def shard_descriptor(self, shard_descriptor):
        self._shard_descriptor = shard_descriptor
        # You can implement data splitting logic here
        # Or update your data set according to local Shard Descriptor attributes if required

    def get_train_loader(self, **kwargs):
        # these are the same kwargs you provided to __init__,
        # But passed on a collaborator machine
        bs = kwargs.get('train_batch_size', 32)
        return foo_loader()

    # so on, see the full list of methods below

The following are the DataInterface methods, including the shard descriptor setter and getter:

  • shard_descriptor(self, shard_descriptor) is called during the Collaborator initialization procedure with the local shard descriptor. Include in this method any logic that is triggered with the shard descriptor replacement.

  • get_train_loader(self, **kwargs) is called before the execution of training tasks. This method returns the data loader that is passed to a training task as its data_loader contract argument. The kwargs dict contains the same information that was provided during the DataInterface initialization.

  • get_valid_loader(self, **kwargs) is called before the execution of validation tasks. This method returns the data loader that is passed to a validation task as its data_loader contract argument. The kwargs dict contains the same information that was provided during the DataInterface initialization.

  • get_train_data_size(self) returns the number of samples in the local dataset for training. Use the information provided by the shard descriptor to determine how to split your training and validation tasks.

  • get_valid_data_size(self) returns the number of samples in the local dataset for validation.

Note

  • The user dataset class should be instantiated and then passed to the Experiment object.

  • Dummy shard descriptor (or a custom local one) may be set up to test the augmentation or batching pipeline.

  • Keyword arguments used during initialization on the frontend node may be used during dataloaders construction on collaborator machines.
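
Putting these pieces together, a complete (if simplistic) DataInterface implementation might look like the following. It assumes the shard descriptor's get_dataset() returns an object compatible with torch.utils.data (an assumption for illustration, not an OpenFL requirement), and the class and kwargs names are hypothetical:

import torch
from torch.utils.data import DataLoader, random_split
from openfl.interface.interactive_api.experiment import DataInterface

class ShardFedDataset(DataInterface):
    """Illustrative DataInterface built on top of a local shard descriptor."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.validation_fraction = kwargs.get('validation_fraction', 0.2)
        self.train_bs = kwargs.get('train_batch_size', 32)
        self.valid_bs = kwargs.get('valid_batch_size', 32)

    @property
    def shard_descriptor(self):
        return self._shard_descriptor

    @shard_descriptor.setter
    def shard_descriptor(self, shard_descriptor):
        self._shard_descriptor = shard_descriptor
        # Split the local shard into train/validation subsets (illustrative logic)
        dataset = shard_descriptor.get_dataset('train')
        valid_size = int(len(dataset) * self.validation_fraction)
        generator = torch.Generator().manual_seed(0)
        self.train_set, self.valid_set = random_split(
            dataset, [len(dataset) - valid_size, valid_size], generator=generator)

    def get_train_loader(self, **kwargs):
        return DataLoader(self.train_set, batch_size=self.train_bs, shuffle=True)

    def get_valid_loader(self, **kwargs):
        return DataLoader(self.valid_set, batch_size=self.valid_bs, shuffle=False)

    def get_train_data_size(self):
        return len(self.train_set)

    def get_valid_data_size(self):
        return len(self.valid_set)

# Instantiate the dataset object to pass to FLExperiment.start() later
fed_dataset = ShardFedDataset(validation_fraction=0.2, train_batch_size=64)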

Start an FL Experiment

Use the Experiment API to prepare a workspace archive to transfer to the Director.

FLExperiment.start()

Note

Instances of interface classes (TaskInterface, DataInterface, ModelInterface) must be passed to FLExperiment.start() method along with other parameters.

This method:

  • Compiles all provided settings into a Plan object. The Plan is the central place where all actors in the federation look up their parameters.

  • Saves plan.yaml to the plan folder inside the workspace.

  • Serializes interface objects on the disk.

  • Prepares requirements.txt for remote Python environment setup.

  • Compresses the whole workspace to an archive.

  • Sends the experiment archive to the Director so it may distribute the archive across the federation and start the Aggregator.

FLExperiment start() Method Parameters

The following are parameters of the start() method in FLExperiment:

model_provider

This parameter is defined earlier by the ModelInterface object.

task_keeper

This parameter is defined earlier by the TaskInterface object.

data_loader

This parameter is defined earlier by the DataInterface object.

task_assigner

This parameter is optional. You can pass a Custom task assigner function.

rounds_to_train

This parameter defines the number of aggregation rounds to conduct before the experiment is considered finished.

delta_updates

This parameter sets up the aggregation to use calculated gradients instead of model checkpoints.

opt_treatment

This parameter defines the optimizer state treatment in the federation. The following are available values:

  • RESET: the optimizer state is initialized each round from noise

  • CONTINUE_LOCAL: the optimizer state will be reused locally by every collaborator

  • CONTINUE_GLOBAL: the optimizer’s state will be aggregated

device_assignment_policy

The following are available values:

  • CPU_ONLY: the device parameter (which is a part of a task contract) that is passed to an FL task each round will be cpu

  • CUDA_PREFERRED: the device parameter will be cuda:{index} if CUDA devices are enabled in the Envoy config and cpu otherwise.
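
Putting these parameters together, a typical call might look like the following (the variable names refer to the interface objects created earlier in this section; the values are illustrative):

fl_experiment.start(
    model_provider=MI,            # ModelInterface instance
    task_keeper=TI,               # TaskInterface instance
    data_loader=fed_dataset,      # DataInterface implementation
    rounds_to_train=10,
    delta_updates=False,
    opt_treatment='CONTINUE_GLOBAL',
    device_assignment_policy='CUDA_PREFERRED',
)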

Observe the Experiment Execution

If the experiment was accepted by the Director, you can oversee its execution with the FLExperiment.stream_metrics() method. This method prints metrics from the FL tasks (and saves TensorBoard logs).

Get Experiment Status

You can get the current experiment status with the FLExperiment.get_experiment_status() method. The status can be pending, in progress, finished, rejected, or failed.

Complete the Experiment

When the experiment has completed:

  • retrieve trained models in the native format using FLExperiment.get_best_model() and FLExperiment.get_last_model().

  • erase experiment artifacts from the Director with FLExperiment.remove_experiment_data().

You may use the same federation object to register another experiment or even schedule several experiments that will be executed in series.
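
A short sketch that strings together the monitoring and clean-up calls described above:

# Stream task metrics while the experiment runs (also writes TensorBoard logs)
fl_experiment.stream_metrics()

# Poll the overall experiment status if needed
print(fl_experiment.get_experiment_status())

# Once finished, retrieve the trained models in the native format
best_model = fl_experiment.get_best_model()
last_model = fl_experiment.get_last_model()

# Erase experiment artifacts from the Director
fl_experiment.remove_experiment_data()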

Custom task assigner function

OpenFL has an entity named Task Assigner, which is responsible for assigning tasks to collaborators. There are three default tasks: train, locally_tuned_model_validate, and aggregated_model_validate. When you register a train function and pass an optimizer, it generates a train task:

task_keeper = TaskInterface()


@task_keeper.register_fl_task(model='net_model', data_loader='train_loader',
                              device='device', optimizer='optimizer')
def train(net_model, train_loader, optimizer, device, loss_fn=cross_entropy, some_parameter=None):
    torch.manual_seed(0)
    ...

When you register a validate function, it generates two tasks: locally_tuned_model_validate and aggregated_model_validate. locally_tuned_model_validate is applied by a collaborator to its locally trained model, while aggregated_model_validate is applied to the globally aggregated model. If there is no train task, only aggregated_model_validate is generated.
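
For example, registering a validation function (no optimizer in the contract) could look like the sketch below; the PyTorch classification logic is purely illustrative:

import torch

@task_keeper.register_fl_task(model='net_model', data_loader='val_loader', device='device')
def validate(net_model, val_loader, device):
    net_model.to(device)
    net_model.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for data, target in val_loader:
            data, target = data.to(device), target.to(device)
            pred = net_model(data).argmax(dim=1)
            correct += (pred == target).sum().item()
            total += target.numel()
    return {'accuracy': correct / max(total, 1)}

This single registration produces both the locally_tuned_model_validate and aggregated_model_validate tasks.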

Since version 1.3, it is possible to create a custom task assigner function to implement your own task assignment logic. You can get the registered tasks from task_keeper by calling the get_registered_tasks method:

tasks = task_keeper.get_registered_tasks()

And then implement your own assigner function:

def random_assigner(collaborators, round_number, **kwargs):
    """Assigning task groups randomly while ensuring target distribution"""
    import random
    random.shuffle(collaborators)
    collaborator_task_map = {}
    for idx, col in enumerate(collaborators):
        # the first 70% of collaborators get all tasks; the remaining 30% only run aggregated model validation
        if (idx+1)/len(collaborators) <= 0.7:
            collaborator_task_map[col] = tasks.values()  # all three tasks
        else:
            collaborator_task_map[col] = [tasks['aggregated_model_validate']]
    return collaborator_task_map

Then pass that function to the fl_experiment.start() method:

fl_experiment.start(
    model_provider=model_interface,
    task_keeper=task_keeper,
    data_loader=fed_dataset,
    task_assigner=random_assigner,
    rounds_to_train=50,
    opt_treatment='CONTINUE_GLOBAL',
    device_assignment_policy='CUDA_PREFERRED'
)

This function will be passed to the assigner, and tasks will be assigned to collaborators according to its logic.

Another example: if you only want to exclude some collaborators from the experiment, you can define the following assigner function:

def filter_assigner(collaborators, round_number, **kwargs):
    collaborator_task_map = {}
    exclude_collaborators = ['env_two', 'env_three']
    for collaborator_name in collaborators:
        if collaborator_name in exclude_collaborators:
            continue
        collaborator_task_map[collaborator_name] = [
            tasks['train'],
            tasks['locally_tuned_model_validate'],
            tasks['aggregated_model_validate']
        ]
    return collaborator_task_map

You can also use static shard information to exclude any collaborators without CUDA devices from training:

shard_registry = federation.get_shard_registry()
def filter_by_shard_registry_assigner(collaborators, round_number, **kwargs):
    collaborator_task_map = {}
    for collaborator in collaborators:
        col_status = shard_registry.get(collaborator)
        if not col_status or not col_status['is_online']:
            continue
        node_info = col_status['shard_info'].node_info
        # Assign the train task if the collaborator has a GPU with more than 8 GB of total memory
        if len(node_info.cuda_devices) > 0 and node_info.cuda_devices[0].memory_total > 8 * 1024**3:
            collaborator_task_map[collaborator] = [
                tasks['train'],
                tasks['locally_tuned_model_validate'],
                tasks['aggregated_model_validate'],
            ]
        else:
            collaborator_task_map[collaborator] = [
                tasks['aggregated_model_validate'],
            ]
    return collaborator_task_map

Assigner with additional validation round:

rounds_to_train = 3
total_rounds = rounds_to_train + 1 # use fl_experiment.start(..., rounds_to_train=total_rounds,...)

def assigner_with_last_round_validation(collaborators, round_number, **kwargs):
    collaborator_task_map = {}
    for collaborator in collaborators:
        if round_number == total_rounds - 1:
            collaborator_task_map[collaborator] = [
                tasks['aggregated_model_validate'],
            ]
        else:
            collaborator_task_map[collaborator] = [
                tasks['train'],
                tasks['locally_tuned_model_validate'],
                tasks['aggregated_model_validate']
            ]
    return collaborator_task_map

Aggregator-Based Workflow

An overview of this workflow is shown below.

_images/openfl_flow.png

Overview of the Aggregator-Based Workflow

There are two ways to run a federation without a Director: the Bare Metal Approach and the Docker Approach, both described below.

This workflow uses short-lived components in a federation, which is terminated when the experiment is finished. The components are as follows:

  • The Collaborator uses a local dataset to train a global model and the Aggregator receives model updates from Collaborators and aggregates them to create the new global model.

  • The Aggregator is framework-agnostic, while the Collaborator can use any deep learning framework, such as TensorFlow* or PyTorch*.

For this workflow, you modify the federation workspace to your requirements by editing the Federated Learning plan (FL plan) along with the Python* code that defines the model and the data loader. The FL plan is a YAML file that defines the collaborators, aggregator, connections, models, data, and any other parameters that describe the training.

Federated Learning Plan (FL Plan) Settings

Note

Use the Federated Learning plan (FL plan) to modify the federation workspace to your requirements in an aggregator-based workflow.

The FL plan is described by the plan.yaml file located in the plan directory of the workspace.

Each YAML top-level section contains the following subsections:

  • template: The name of the class, including top-level package names. An instance of this class is created when the plan gets initialized.

  • settings: The arguments that are passed to the class constructor.

  • defaults: The file that contains default settings for this subsection. Any setting from the defaults file can be overridden in the plan.yaml file.

The following is an example of a plan.yaml:

# Copyright (C) 2020-2021 Intel Corporation
# Licensed subject to the terms of the separately executed evaluation license agreement between Intel Corporation and you.

aggregator :
  defaults : plan/defaults/aggregator.yaml
  template : openfl.component.Aggregator
  settings :
    init_state_path     : save/torch_cnn_mnist_init.pbuf
    best_state_path     : save/torch_cnn_mnist_best.pbuf
    last_state_path     : save/torch_cnn_mnist_last.pbuf
    rounds_to_train     : 10
    log_metric_callback :
      template : src.mnist_utils.write_metric


collaborator :
  defaults : plan/defaults/collaborator.yaml
  template : openfl.component.Collaborator
  settings :
    delta_updates    : false
    opt_treatment    : RESET

data_loader :
  defaults : plan/defaults/data_loader.yaml
  template : src.ptmnist_inmemory.PyTorchMNISTInMemory
  settings :
    collaborator_count : 2
    data_group_name    : mnist
    batch_size         : 256

task_runner :
  defaults : plan/defaults/task_runner.yaml
  template : src.pt_cnn.PyTorchCNN

network :
  defaults : plan/defaults/network.yaml

assigner :
  defaults : plan/defaults/assigner.yaml
  
tasks :
  defaults : plan/defaults/tasks_torch.yaml

compression_pipeline :
  defaults : plan/defaults/compression_pipeline.yaml
Configurable Settings
Tasks

Each task subsection contains the following:

  • function: The name of the function to call. The function must be defined in the TaskRunner class.

  • kwargs: kwargs passed to the function.

Note

See an example of the TaskRunner class for details.

Bare Metal Approach

Note

Ensure you have installed the OpenFL package on every node (aggregator and collaborators) in the federation.

See Install the Package for details.

You can use the “Hello Federation” bash script to quickly create a federation (an aggregator node and two collaborator nodes) to test the project pipeline.

However, continue with the following procedure for details on creating a federation with an aggregator-based workflow.

STEP 1: Create a Workspace on the Aggregator

  • Creates a federated learning workspace on one of the nodes.

STEP 2: Configure the Federation

  • Ensures each node in the federation has a valid public key infrastructure (PKI) certificate.

  • Distributes the workspace from the aggregator node to the other collaborator nodes.

STEP 3: Start the Federation

STEP 1: Create a Workspace
  1. Start a Python 3.8 (>=3.6, <3.11) virtual environment and confirm OpenFL is available.

    fx
    
  2. This example uses the keras_cnn_mnist template.

    Set the environment variables to use keras_cnn_mnist as the template and ${HOME}/my_federation as the path to the workspace directory.

export WORKSPACE_TEMPLATE=keras_cnn_mnist
export WORKSPACE_PATH=${HOME}/my_federation
  3. Decide on a workspace template. Templates are end-to-end federated learning training demonstrations. The following is a sample of available templates:

  • keras_cnn_mnist: a workspace with a simple Keras CNN model that will download the MNIST dataset and train in a federation.

  • tf_2dunet: a workspace with a simple TensorFlow CNN model that will use the BraTS dataset and train in a federation.

  • tf_cnn_histology: a workspace with a simple TensorFlow CNN model that will download the Colorectal Histology dataset and train in a federation.

  • torch_cnn_histology: a workspace with a simple PyTorch CNN model that will download the Colorectal Histology dataset and train in a federation.

  • torch_cnn_mnist: a workspace with a simple PyTorch CNN model that will download the MNIST dataset and train in a federation.

See the complete list of available templates by running the following command:

fx workspace create --prefix ${WORKSPACE_PATH}
  4. Create a workspace directory for the new federation project.

    fx workspace create --prefix ${WORKSPACE_PATH} --template ${WORKSPACE_TEMPLATE}
    

    Note

    You can use your own models by overwriting the Python scripts in the src subdirectory in the workspace directory.

  5. Change to the workspace directory.

    cd ${WORKSPACE_PATH}
    
  6. Install the workspace requirements:

    pip install -r requirements.txt
    
  7. Create an initial set of random model weights.

Note

While models can be trained from scratch, in many cases the federation performs fine-tuning of a previously trained model. For this reason, pre-trained weights for the model are stored in protobuf files on the aggregator node and passed to collaborator nodes during initialization.

The protobuf file with the initial weights is found in ${WORKSPACE_TEMPLATE}_init.pbuf.

fx plan initialize

This command initializes the FL plan and auto populates the fully qualified domain name (FQDN) of the aggregator node. This FQDN is embedded within the FL plan so the collaborator nodes know the address of the externally accessible aggregator server to connect to.

If you have connection issues with the auto populated FQDN in the FL plan, you can do one of the following:

  • OPTION 1: override the auto populated FQDN value with the -a flag.

    fx plan initialize -a aggregator-hostname.internal-domain.com
    
  • OPTION 2: override the apparent FQDN of the system by setting an FQDN environment variable.

    export FQDN=x.x.x.x
    

    and initializing the FL plan

    fx plan initialize
    

Note

Each workspace may have multiple FL plans and multiple collaborator lists associated with it. Therefore, fx plan initialize has the following optional parameters.

Optional Parameters          Description

-p, --plan_config PATH       Federated Learning plan [default = plan/plan.yaml]
-c, --cols_config PATH       Authorized collaborator list [default = plan/cols.yaml]
-d, --data_config PATH       The data set/shard configuration file

STEP 2: Configure the Federation

The objectives in this step:

  • Ensure each node in the federation has a valid public key infrastructure (PKI) certificate. See OpenFL Public Key Infrastructure (PKI) Solutions for details on available workflows.

  • Distribute the workspace from the aggregator node to the other collaborator nodes.

On the Aggregator Node:

Setting Up the Certificate Authority

  1. Change to the path of your workspace:

    cd WORKSPACE_PATH
    
  2. Set up the aggregator node as the certificate authority for the federation.

All certificates will be signed by the aggregator node. Follow the instructions and enter the information as prompted. The command will create a simple database file to keep track of all issued certificates.

fx workspace certify
  3. Run the aggregator certificate creation command, replacing AFQDN with the actual fully qualified domain name (FQDN) for the aggregator node.

    fx aggregator generate-cert-request --fqdn AFQDN
    

    Note

    On Linux*, you can discover the FQDN with this command:

    hostname --all-fqdns | awk '{print $1}'
    

    Note

    You can override the apparent FQDN of the system by setting an FQDN environment variable before creating the certificate.

    export FQDN=x.x.x.x
    fx aggregator generate-cert-request
    

    If you omit the --fqdn parameter, then fx will automatically use the FQDN of the current node, assuming the node has been correctly set up with a static address.

    fx aggregator generate-cert-request
    
  4. Run the aggregator certificate signing command, replacing AFQDN with the actual fully qualified domain name (FQDN) for the aggregator node.

    fx aggregator certify --fqdn AFQDN
    

    Note

    You can override the apparent FQDN of the system by setting an FQDN environment variable (export FQDN=x.x.x.x) before signing the certificate.

    export FQDN=x.x.x.x
    fx aggregator certify
    
  5. This node now has a signed security certificate as the aggregator for this new federation. You should have the following files.

    File Type                 Filename

    Certificate chain         WORKSPACE.PATH/cert/cert_chain.crt
    Aggregator certificate    WORKSPACE.PATH/cert/server/agg_{AFQDN}.crt
    Aggregator key            WORKSPACE.PATH/cert/server/agg_{AFQDN}.key

    where AFQDN is the fully-qualified domain name of the aggregator node.

Exporting the Workspace

  1. Export the workspace so that it can be imported to the collaborator nodes.

    fx workspace export
    

    The export command will archive the current workspace (with a zip file extension) and create a requirements.txt of the current Python* packages in the virtual environment.

  2. The next step is to transfer this workspace archive to each collaborator node.

On the Collaborator Node:

Importing the Workspace

  1. Copy the workspace archive from the aggregator node to the collaborator nodes.

  2. Import the workspace archive.

    fx workspace import --archive WORKSPACE.zip
    

where WORKSPACE.zip is the name of the workspace archive. This will unzip the workspace to the current directory and install the required Python packages within the current virtual environment.

  3. For each test machine you want to run as a collaborator node, create a collaborator certificate request to be signed by the certificate authority.

Replace COL_LABEL with the label you assigned to the collaborator. This label does not have to be the FQDN; it can be any unique alphanumeric label.

fx collaborator generate-cert-request -n {COL_LABEL}

The creation script will also ask you to specify the path to the data. For this example, enter the integer that represents which MNIST shard to use on this collaborator node. For the first collaborator node enter 1. For the second collaborator node enter 2.

This will create the following files:

File Type                   Filename

Collaborator CSR            WORKSPACE.PATH/cert/client/col_{COL_LABEL}.csr
Collaborator key            WORKSPACE.PATH/cert/client/col_{COL_LABEL}.key
Collaborator CSR Package    WORKSPACE.PATH/col_{COL_LABEL}_to_agg_cert_request.zip

  4. On the aggregator node (i.e., the certificate authority in this example), sign the Collaborator CSR Package from the collaborator nodes.

    fx collaborator certify --request-pkg /PATH/TO/col_{COL_LABEL}_to_agg_cert_request.zip
    

    where /PATH/TO/col_{COL_LABEL}_to_agg_cert_request.zip is the path to the Collaborator CSR Package containing the .csr file from the collaborator node. The certificate authority will sign this certificate for use in the federation.

    The command packages the signed collaborator certificate, along with the cert_chain.crt file needed to verify certificate signatures, for transport back to the collaborator node:

    File Type                      Filename

    Certificate and Chain Package  WORKSPACE.PATH/agg_to_col_{COL_LABEL}_signed_cert.zip

  5. On the collaborator node, import the signed certificate and certificate chain into your workspace.

    fx collaborator certify --import /PATH/TO/agg_to_col_{COL_LABEL}_signed_cert.zip
    
STEP 3: Start the Federation

On the Aggregator Node:

  1. Start the Aggregator.

    fx aggregator start
    

Now, the Aggregator is running and waiting for Collaborators to connect.

On the Collaborator Nodes:

  1. Open a new terminal, change the directory to the workspace, and activate the virtual environment.

  2. Run the Collaborator.

    fx collaborator start -n {COLLABORATOR_LABEL}
    

    where COLLABORATOR_LABEL is the label for this Collaborator.

    Note

    Each workspace may have multiple FL plans and multiple collaborator lists associated with it. Therefore, fx collaborator start has the following optional parameters.

    Optional Parameters          Description

    -p, --plan_config PATH       Federated Learning plan [default = plan/plan.yaml]
    -d, --data_config PATH       The data set/shard configuration file

  3. Repeat the earlier steps for each collaborator node in the federation.

When all of the Collaborators connect, the Aggregator starts training. You will see log messages describing the progress of the federated training.

When the last round of training is completed, the Aggregator stores the final weights in the protobuf file that was specified in the YAML file, which in this example is located at save/${WORKSPACE_TEMPLATE}_latest.pbuf.

Post Experiment

Experiment owners may access the final model in its native format. Among other training artifacts, the aggregator creates the last and best aggregated (highest validation score) model snapshots. One may convert a snapshot to the native format and save the model to disk by calling the following command from the workspace:

fx model save -i model_protobuf_path.pth -o save_model_path

In order for this command to succeed, the TaskRunner used in the experiment must implement a save_native() method.

Another way to access the trained model is by calling the API command directly from a Python script:

from openfl import get_model
model = get_model(plan_config, cols_config, data_config, model_protobuf_path)

In fact, the get_model() method returns a TaskRunner object loaded with the chosen model snapshot. Users may utilize the linked model as a regular Python object.
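
A usage sketch with the default plan paths of an aggregator-based workspace (the snapshot file name is illustrative):

from openfl import get_model

plan_config = 'plan/plan.yaml'      # default FL plan
cols_config = 'plan/cols.yaml'      # authorized collaborator list
data_config = 'plan/data.yaml'      # data set/shard configuration
model_protobuf_path = 'save/torch_cnn_mnist_last.pbuf'   # illustrative snapshot

task_runner = get_model(plan_config, cols_config, data_config, model_protobuf_path)
# task_runner is a TaskRunner object loaded with the chosen snapshot;
# use it like a regular Python object (for example, for inference).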

Docker Approach

There are two ways you can run OpenFL with Docker*.

Option 1: Deploy a Federation in a Docker Container

Note

You have to build an OpenFL image. See OpenFL with Docker* for details.

  1. Run the OpenFL image.

    docker run -it --network host openfl
    

You can now experiment with OpenFL in the container. For example, you can test the project pipeline with the “Hello Federation” bash script.

Option 2: Deploy Your Workspace in a Docker Container

Note

You have to set up a TaskRunner and run fx plan initialize in the workspace directory. See STEP 1: Create a Workspace on the Aggregator for details.

  1. Build an image with the workspace you created.

    fx workspace dockerize
    

    By default, the image is saved as WORKSPACE_NAME_image.tar in the workspace directory.

  2. The image can be distributed and run on other nodes without any environment preparation.

    docker run -it --rm \
        --network host \
        -v user_data_folder:/home/user/workspace/data \
        ${WORKSPACE_IMAGE_NAME} \
        bash

    Note

    The FL plan should be initialized with the FQDN of the node where the aggregator container will be running.

  3. Generate public key infrastructure (PKI) certificates for all collaborators and the aggregator. See OpenFL Public Key Infrastructure (PKI) Solutions for details.

  4. Start the federation as described in STEP 3: Start the Federation.

Open Federated Learning (OpenFL) Utilities

The following are utilities available in Open Federated Learning (OpenFL).

OpenFL Public Key Infrastructure (PKI) Solutions

Use the Public Key Infrastructure (PKI) solution workflows to certify the nodes in your federation.

Dataset Splitters

Split your data to run your federation from a single dataset.

OpenFL Component Timeouts

Decorate methods to enforce a timeout on their execution.

OpenFL Public Key Infrastructure (PKI) Solutions

Overview

Transport Layer Security (TLS) encryption is used for network connections in federated learning. Therefore, security keys and certificates will need to be created for the aggregator and collaborators to negotiate the connection securely.

If you have trusted workspaces and connections, you can start your experiment with the disable_tls option.

Otherwise, you can certify nodes with your own PKI solution or use the PKI solution workflows provided by OpenFL.

Note

The OpenFL PKI solution is based on the step-ca server and the step client utilities. They are downloaded during the workspace setup.

Note

Different certificates can be created for each project workspace.

Sequence diagram (Collaborator Certificate Signing Flow): Alice runs fx collaborator generate-cert-request on her collaborator node to create a .key and .csr file, and the PKI script outputs a hash to the screen. Alice sends the .csr to Bob, who moves it to his signing system with fx collaborator certify --request-pkg. Bob calls Alice to confirm the hash; this call is the root of trust and ensures Bob is signing the same .csr Alice generated. Bob then runs the script to sign the .csr, confirming the hash as input and creating the .crt file, and sends the .crt back to Alice. Alice copies the signed certificate to her collaborator node.

Manual certificate generation and signing

Sequence diagram (Certificate signing with Step-CA): 1. Create the CA: step ca init --password-file pass_file. 2. Start the HTTPS CA server: step_ca ca_config.json. 3. Generate a JWK pair: step crypto jwk create pub.json priv.json --password-file pass_file. 4. Get a JWT for the aggregator: step ca token localhost --key priv.json --password-file pass_file --ca-url ca_url. 5. Copy the JWT to the aggregator. 6. Certify the aggregator node: step ca certificate localhost agg.crt agg.key --token AbC1d2E.. (produces agg.crt). 7. Get a JWT for the collaborator: step ca token col_name --key priv.json --password-file pass_file --ca-url ca_url. 8. Copy the JWT to the collaborator. 9. Certify the collaborator node: step ca certificate col_name col_name.crt col_name.key --token AbC1d2E.. (produces col_name.crt). 10. Copy root_ca.crt to the aggregator. 11. Copy root_ca.crt to the collaborator (steps 10 and 11 could be done together with the tokens in steps 5 and 8).

Step-ca certificate generation and signing

Semi-Automatic PKI Workflow

The OpenFL PKI pipeline involves creating a local certificate authority (CA) on an HTTPS server that listens for signing requests. Certificates from each client are signed by the CA via a token. The token must be copied to clients in a secure manner.

  1. Create the CA.

    fx pki install -p </path/to/ca/dir> --ca-url <host:port>
    
    where
    -p defines the path to the directory that contains CA files, and
    --ca-url defines the host and port that the CA server will listen on; if not specified, --ca-url defaults to “localhost:9123”

    When executing this command, you will be prompted for a password and password confirmation. The password will encrypt some CA files. This command will also download step-ca and step binaries.

  2. Run the CA server.

    fx pki run -p </path/to/ca/dir>
    
    where
    -p defines the path to the directory that contains CA files.
  3. Create a token for client.

    fx pki get-token -n <subject> --ca-path </path/to/ca/dir> --ca-url <host:port>
    
    where
    -n defines the subject name, FQDN for director, collaborator name for envoy, or API name for the API-layer node.
    --ca-path defines the path to the directory that contains CA files.
    --ca-url defines the host and port that the CA server will listen on; if not specified, --ca-url defaults to “localhost:9123”

    Run this command from the CA directory on the CA server. The output is a token which contains a JWT (JSON web token) from the CA server and the CA root certificate concatenated together. This JWT is valid for 24 hours.

  4. Copy the token to the clients (director or envoy) via a secure channel, and certify the token.

    cd <path/to/subject/folder>
    fx pki certify -n <subject> -t <generated token for subject>
    
    where
    -n defines the subject name, FQDN for director, collaborator name for envoy, or API name for the API-layer node.
    -t defines the output token from the previous command.

    With this command, the client connects to the CA server over HTTPS, which is provided by the root certificate which was copied together with the JWT. The CA server authenticates the client via the JWT, and the client authenticates the server via the root certificate.

The signed certificate and private key are stored on each node in the federation. The signed certificate is valid for one year. You should certify all nodes that will participate in the federation: the director, all envoys, and API-layer nodes.

Manual PKI Workflow

This solution is embedded into the aggregator-based workflow. See Configure the Federation for details.

Dataset Splitters

OpenFL allows you to specify custom data splits for simulation runs on a single dataset.

You may apply data splitters differently depending on the OpenFL workflow that you follow.

OPTION 1: Use Native Python API (Aggregator-Based Workflow) Functions to Split the Data

The predefined OpenFL data splitter functions are as follows:

  • openfl.utilities.data_splitters.EqualNumPyDataSplitter (default)

  • openfl.utilities.data_splitters.RandomNumPyDataSplitter

  • openfl.interface.aggregation_functions.LogNormalNumPyDataSplitter, which assumes the data argument as np.ndarray of integers (labels)

  • openfl.interface.aggregation_functions.DirichletNumPyDataSplitter, which assumes the data argument as np.ndarray of integers (labels)

Alternatively, you can create an implementation of openfl.plugins.data_splitters.NumPyDataSplitter and pass it to the FederatedDataset function as either the train_splitter or valid_splitter keyword argument.
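
A minimal sketch of a custom splitter is shown below. It implements the single split(data, num_collaborators) method described under OPTION 2; note the docs reference the base class both as openfl.plugins.data_splitters and openfl.utilities.data_splitters — the sketch uses the latter, matching the predefined splitters listed above, and the commented-out usage line is hypothetical:

import numpy as np
from openfl.utilities.data_splitters import NumPyDataSplitter

class SequentialNumPyDataSplitter(NumPyDataSplitter):
    """Illustrative splitter: contiguous, roughly equal index chunks."""

    def split(self, data, num_collaborators):
        idx = np.arange(len(data))
        # Return a list of index arrays, one per collaborator
        return np.array_split(idx, num_collaborators)

# Hypothetical usage with the Native Python API:
# fed_dataset = FederatedDataset(..., train_splitter=SequentialNumPyDataSplitter())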

OPTION 2: Use Dataset Splitters in your Shard Descriptor

Apply one of the previously mentioned splitting functions to your data to perform a simulation.

NumPyDataSplitter requires implementing a single split function. The split function returns a list of index groups, one per collaborator.

This function receives data, a NumPy array used to build the subsets of data indices. It could be the whole dataset, only the labels, or anything else.

X_train, y_train = ... # train set
X_valid, y_valid = ... # valid set
train_splitter = RandomNumPyDataSplitter()
valid_splitter = RandomNumPyDataSplitter()
# collaborator_count value is passed to DataLoader constructor
# shard_num can be evaluated from data_path
train_idx = train_splitter.split(y_train, collaborator_count)[shard_num]
valid_idx = valid_splitter.split(y_valid, collaborator_count)[shard_num]
X_train_shard = X_train[train_idx]
X_valid_shard = X_valid[valid_idx]

Note

By default, the data is shuffled and split equally. See an example of openfl.utilities.data_splitters.EqualNumPyDataSplitter for details.

OpenFL Component Timeouts

Overview

This feature allows decorating any arbitrary synchronous or asynchronous function with @fedtiming(timeout=<seconds>). The decorated function is then monitored and terminated as soon as its execution time exceeds the user-specified or default timeout value.

openfl.utilities.fed_timer.py

Note

The fedtiming class, the SyncAsyncTaskDecoFactory factory class, and custom synchronous and asynchronous execution of decorated functions are in place. The end-to-end implementation of the OpenFL component timeouts feature is still in beta and will undergo design and implementation changes before the complete feature is made available. Feedback and issue reports are appreciated.

Class Diagram

An overview of this workflow is shown below.

_images/timeout_design.png

Overview of the Component Timeout Class Diagram

Flow of Execution
  1. [Step A] Decorate any sync or async function @fedtiming(timeout=<seconds>) to monitor its execution time and terminate after timeout=<seconds> value.

    @fedtiming(timeout=5)
    def some_sync_function():
        pass
    
    This decorated function execution gets terminated after 5 seconds.
    @fedtiming(timeout=10)
    async def some_async_function():
        await some_long_running_operation()
    
    This decorated function execution gets terminated after 10 seconds.
  2. [Step B] Concrete fedtiming class:

    During Compile time: Decorated functions are evaluated like below.

    Synchronous Example:

    some_sync_function = fedtiming(timeout=5)(some_sync_function)
    
    then
    
    some_sync_function() *is equivalent to* sync_wrapper().
    

    inside the sync_wrapper: the decorated function some_sync_function and the timeout value are stored as closure variables.

    Asynchronous Example:

    some_async_function = fedtiming(timeout=5)(some_async_function)
    
    then
    
    some_async_function() *is equivalent to* async_wrapper().
    

    inside the async_wrapper: the decorated function some_async_function and the timeout value are stored as closure variables.

  3. [Step C] SyncAsyncTaskDecoFactory class

    fedtiming(some_sync_function) internally calls the parent class SyncAsyncTaskDecoFactory __call__(some_sync_function) method.

    The __call__() method immediately returns either the sync_wrapper or async_wrapper depending on whether the sync or async method was decorated.

    During Runtime:

    The prepared some_sync_function or some_async_function is then called internally with its respective parameters:

    some_sync_function(*args, **kwargs) -> sync_wrapper(*args, **kwargs)
    some_async_function(*args, **kwargs) -> async_wrapper(*args, **kwargs)
    
  4. [Step D] PrepareTask class

    Delegates the decorated sync or async function to be executed synchronously or asynchronously using CustomThread or asyncio.

    Contains the definition of the sync_execute and async_execute functions.

  5. [Step E] Execution of delegated methods:

    The delegated function is executed synchronously or asynchronously and the result is returned back in the call chain. The final output from the thread or asyncio task is returned as a result of a decorated function execution.

    In this CustomThread or asyncio.wait_for() execution, the timeout is enforced: the running function is terminated after the set period of time and an exception is raised that propagates back to the caller.

Upcoming Changes

The design above reflects the current implementation.

Upcoming changes include:

  1. Dynamic updates of the timeout parameter for all decorated functions at runtime, and removal of the explicit timeout parameter from @fedtiming(timeout=<?>).

  2. Add a callback parameter that defines post-timeout teardown logic and a way to gracefully terminate the executing function.

Advanced Topics

General

Speed up activating Open Federated Learning (OpenFL) commands:

Aggregator-Based Workflow

Learn to manage multiple Federated Learning plans (FL plans) in the same workspace:

Reduce the amount of data transferred in a federation through compression pipelines available in OpenFL:

Customize the aggregation function for each task:

Customize straggler handling function:

Director-Based Workflow

Customize the logging function for each task:

Update plan settings:

Activate Bash Autocomplete

Get faster access to available commands by activating bash completion in CLI mode.

STEP 1: Preparation

Make sure you are inside a virtual environment with Open Federated Learning (OpenFL) installed. See Install the Package for details.

STEP 2: Create the fx-autocomplete.sh Script

Note

Perform this procedure if you don’t have a ~/.fx-autocomplete.sh script or if the existing ~/.fx-autocomplete.sh script is corrupted.

  1. Create the script.

    _FX_COMPLETE=bash_source fx > ~/.fx-autocomplete.sh
    
  2. Check that the script was created properly.

    cat ~/.fx-autocomplete.sh
    

The output should look like the example below (Click==8.0.1), but could differ depending on the Click version:

_fx_completion() {
    local IFS=$'\n'
    local response

    response=$(env COMP_WORDS="${COMP_WORDS[*]}" COMP_CWORD=$COMP_CWORD _FX_COMPLETE=bash_complete $1)

    for completion in $response; do
        IFS=',' read type value <<< "$completion"

        if [[ $type == 'dir' ]]; then
            COMPREPLY=()
            compopt -o dirnames
        elif [[ $type == 'file' ]]; then
            COMPREPLY=()
            compopt -o default
        elif [[ $type == 'plain' ]]; then
            COMPREPLY+=($value)
        fi
    done

    return 0
}

_fx_completion_setup() {
    complete -o nosort -F _fx_completion fx
}

_fx_completion_setup;
STEP 3: Activate the Autocomplete Feature

Perform this command every time you open a new terminal window.

source ~/.fx-autocomplete.sh

To save time, add the script into .bashrc so the script is activated when you log in.

  1. Edit the .bashrc file. The nano command line editor is used in this example.

    nano ~/.bashrc
    
  2. Add the script.

    . ~/.fx-autocomplete.sh
    
  3. Save your changes.

  4. Open a new terminal to use the updated bash shell.

Manage Multiple Plans

With the aggregator-based workflow, you can use multiple Federated Learning plans (FL plans) for the same workspace. All FL plans are located in the WORKSPACE.FOLDER/plan/plans directory.

The following are the fx commands to manage your FL plans:

Create a New FL Plan

All workspaces begin with a default FL plan. See Create a Workspace on the Aggregator for details.

Save a New FL Plan

When you are working on an FL plan, you can save it for future use.

fx plan save -n NEW.PLAN.NAME

where NEW.PLAN.NAME is the name of the new FL plan for your workspace. This command also switches the workspace to the NEW.PLAN.NAME plan.

Switch FL Plans

To switch to a different FL plan, run the following command from the workspace directory.

fx plan switch -n PLAN.NAME

where PLAN.NAME is the FL plan to which you want to switch.

Note

If you have changed the plan.yaml file, you should save the FL plan before switching. Otherwise, any changes will be lost.

Remove FL Plans

To remove an FL plan, run the following command from the workspace directory.

fx plan remove -n PLAN.NAME

where PLAN.NAME is the FL plan you wish to remove.

Apply Compression Settings

The Open Federated Learning (OpenFL) framework supports lossless and lossy compression pipelines. Federated learning enables a large number of participants to work together on the same model. Without a compression pipeline, this scalability results in increased communication cost. Furthermore, large models exacerbate this problem.

Note

In general, the weights of a model are typically not robust to information loss, so no compression is applied by default to the model weights sent bidirectionally; however, the deltas between the model weights for each round are inherently more sparse and better suited for lossy compression.

The following are the compression pipelines supported in OpenFL:

NoCompressionPipeline

The default option applied to model weights

RandomShiftPipeline

A lossless pipeline that randomly shifts the weights during transport

STCPipeline

A lossy pipeline consisting of three transformations:

  • Sparsity Transform (p_sparsity=0.1), which by default retains only the (p*100)% of values with the greatest absolute magnitude.

  • Ternary Transform, which discretizes the sparse array into three buckets

  • GZIP Transform

SKCPipeline

A lossy pipeline consisting of three transformations:

  • Sparsity Transform (p=0.1), which by default retains only the (p*100)% of values with the greatest absolute magnitude.

  • KMeans Transform (k=6), which applies the KMeans algorithm to the sparse array with k centroids

  • GZIP Transform

KCPipeline

A lossy pipeline consisting of two transformations:

  • KMeans Transform (k=6), which applies the KMeans algorithm to the original weight array with k centroids

  • GZIP Transform

Demonstration of a Compression Pipeline

The example template, keras_cnn_with_compression, uses the KCPipeline with six centroids for KMeans. To gain a better understanding of how experiments perform with greater or fewer centroids, you can modify the n_clusters parameter in the template plan.yaml:

compression_pipeline :
  defaults : plan/defaults/compression_pipeline.yaml
  template : openfl.pipelines.KCPipeline
  settings :
    n_clusters : 6
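
If you drive the experiment from the Python API instead, the same settings can be overridden at runtime through override_config. The following is a minimal sketch; the experiment_collaborators dictionary and the n_clusters value of 10 are illustrative and assume an experiment set up as in the aggregator-based workflow tutorial.

import openfl.native as fx

final_model = fx.run_experiment(
    experiment_collaborators,
    override_config={
        'compression_pipeline.template': 'openfl.pipelines.KCPipeline',
        'compression_pipeline.settings.n_clusters': 10,
    })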

Override Aggregation Function

With the aggregator-based workflow, you can use custom aggregation functions for each task via Python* API or command line interface.

Python API
  1. Create an implementation of openfl.interface.aggregation_functions.core.AggregationFunction.

  2. In the override_config keyword argument of the openfl.native.run_experiment() native function, pass the implementation as a tasks.{task_name}.aggregation_type parameter.

Note

See Federated PyTorch MNIST Tutorial for an example of the custom aggregation function.
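
The following is a minimal sketch of this approach, assuming an experiment set up as in the aggregator-based workflow tutorial. The task name train, the experiment_collaborators dictionary, and the MedianAggregation class are illustrative, not part of OpenFL.

import numpy as np

import openfl.native as fx
from openfl.interface.aggregation_functions.core import AggregationFunction


class MedianAggregation(AggregationFunction):
    """Aggregate each tensor as the element-wise median of the local tensors."""

    def call(self, local_tensors, db_iterator, tensor_name, fl_round, tags):
        return np.median([t.tensor for t in local_tensors], axis=0)


final_model = fx.run_experiment(
    experiment_collaborators,
    override_config={'tasks.train.aggregation_type': MedianAggregation()})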

Command Line Interface
Predefined Aggregation Functions

Choose from the following predefined aggregation functions:

  • openfl.interface.aggregation_functions.WeightedAverage (default)

  • openfl.interface.aggregation_functions.Median

  • openfl.interface.aggregation_functions.GeometricMedian

  • openfl.interface.aggregation_functions.AdagradAdaptiveAggregation

  • openfl.interface.aggregation_functions.AdamAdaptiveAggregation

  • openfl.interface.aggregation_functions.YogiAdaptiveAggregation

Adaptive Aggregation Functions

Note

To create adaptive aggregation functions, the user must specify parameters for the aggregation optimizer (NumPyAdagrad, NumPyAdam, or NumPyYogi) that will aggregate the global model. These parameters are passed via keyword arguments.

In addition, the user must pass one of the following arguments: params - model parameters (a dictionary of named model parameters in the form of numpy arrays), or model_interface - an instance of the ModelInterface class. If both params and model_interface are passed, the optimizer parameters are initialized via params and the model_interface argument is ignored.

See the AdagradAdaptiveAggregation definition for details.

The approach follows the original Adaptive Federated Optimization paper.

AdagradAdaptiveAggregation usage example:

from openfl.interface.interactive_api.experiment import TaskInterface, ModelInterface
from openfl.interface.aggregation_functions import AdagradAdaptiveAggregation

TI = TaskInterface()
MI = ModelInterface(model=model,
                    optimizer=optimizer,
                    framework_plugin=framework_adapter)
...

# Creating aggregation function
agg_fn = AdagradAdaptiveAggregation(model_interface=MI,
                                    learning_rate=0.4)

# Define training task
@TI.register_fl_task(model='model', data_loader='train_loader', \
                        device='device', optimizer='optimizer')
@TI.set_aggregation_function(agg_fn)
def train(...):
...

You can define your own numpy-based optimizer, which will be used for global model aggregation:

from typing import Any, Dict, Optional

import numpy as np

from openfl.utilities.optimizers.numpy.base_optimizer import Optimizer

class MyOpt(Optimizer):
    """My optimizer implementation."""

    def __init__(
        self,
        *,
        params: Optional[Dict[str, np.ndarray]] = None,
        model_interface=None,
        learning_rate: float = 0.001,
        param1: Any = None,
        param2: Any = None
    ) -> None:
        """Initialize.

        Args:
            params: Parameters to be stored for optimization.
            model_interface: Model interface instance to provide parameters.
            learning_rate: Tuning parameter that determines
                the step size at each iteration.
            param1: My own defined parameter.
            param2: My own defined parameter.
        """
        super().__init__()
        pass # Your code here!

    def step(self, gradients: Dict[str, np.ndarray]) -> None:
        """
        Perform a single step for parameter update.

        Implement your own optimizer weights update rule.

        Args:
            gradients: Partial derivatives with respect to optimized parameters.
        """
        pass # Your code here!
...

from openfl.interface.aggregation_functions import WeightedAverage
from openfl.interface.aggregation_functions.core import AdaptiveAggregation

# Creating your implemented optimizer instance based on numpy:
my_own_optimizer = MyOpt(model_interface=MI, learning_rate=0.01)

# Creating aggregation function
agg_fn = AdaptiveAggregation(optimizer=my_own_optimizer,
                             agg_func=WeightedAverage()) # WeightedAverage() is used for aggregating
                                                         # parameters that are not inside the given optimizer.

# Define training task
@TI.register_fl_task(model='model', data_loader='train_loader', \
                        device='device', optimizer='optimizer')
@TI.set_aggregation_function(agg_fn)
def train(...):
...

Note

If you are unsure how to write your own numpy-based optimizer, see the NumPyAdagrad and AdaptiveAggregation definitions for details.

Custom Aggregation Functions

OpenFL provides interfaces to support your own custom aggregation functions. You can create your own implementation of openfl.interface.aggregation_functions.core.AggregationFunction. See the example below for details.

  1. Define the behavior of the aggregation.

  2. Include the implementation in the plan.yaml file in the plan directory of your workspace.

  3. In the tasks section, pick a task for which you want to change the aggregation and insert an aggregation_type section with a single template key that defines the module path to your class.

The following is an example of a plan.yaml with a modified aggregation function:

# ...
# other top-level sections
# ...
tasks:
  aggregated_model_validation:
    function: validate
    kwargs:
      apply: global
      metrics:
      - acc
  defaults: plan/defaults/tasks_torch.yaml
  locally_tuned_model_validation:
    function: validate
    kwargs:
      apply: local
      metrics:
      - acc
  settings: {}
  train:
    function: train_batches
    aggregation_type:
      template: openfl.interface.aggregation_functions.Median
    kwargs:
      metrics:
      - loss
Interactive API

You can override the aggregation function that will be used for the task it corresponds to. To do this, call the set_aggregation_function decorator method of TaskInterface and pass an AggregationFunction subclass instance as a parameter. For example, you can try:

from openfl.interface.aggregation_functions import Median
TI = TaskInterface()
agg_fn = Median()
@TI.register_fl_task(model='model', data_loader='train_loader', \
                     device='device', optimizer='optimizer')
@TI.set_aggregation_function(agg_fn)
def train(...):
...

Warning

All tasks with the same type of aggregation use the same class instance. If an AggregationFunction implementation has its own state, this state will be shared across tasks.

AggregationFunction requires a single call function. This function receives tensors for a single parameter from multiple collaborators with additional metadata (see definition of openfl.interface.aggregation_functions.core.AggregationFunction.call()) and returns a single tensor that represents the result of aggregation.

Note

See the definition of openfl.interface.aggregation_functions.core.AggregationFunction.call for details.

Example of a Custom Aggregation Function

This is an example of a custom clipped-update aggregation function. For each collaborator, it scales the delta from the previous aggregated tensor by a given ratio (for example, 0.3), then produces the resulting global tensor as a weighted average of the clipped tensors, with weights proportional to each collaborator's share of the data.

from openfl.interface.aggregation_functions import AggregationFunction
import numpy as np

class ClippedAveraging(AggregationFunction):
    def __init__(self, ratio):
        self.ratio = ratio

    def call(self,
            local_tensors,
            db_iterator,
            tensor_name,
            fl_round,
            *__):
        """Aggregate tensors.

        Args:
            local_tensors(list[openfl.utilities.LocalTensor]): List of local tensors to aggregate.
            db_iterator: iterator over history of all tensors. Columns:
                - 'tensor_name': name of the tensor.
                    Examples for `torch.nn.Module`s: 'conv1.weight', 'fc2.bias'.
                - 'round': 0-based number of round corresponding to this tensor.
                - 'tags': tuple of tensor tags. Tags that can appear:
                    - 'model' indicates that the tensor is a model parameter.
                    - 'trained' indicates that tensor is a part of a training result.
                        These tensors are passed to the aggregator node after local learning.
                    - 'aggregated' indicates that tensor is a result of aggregation.
                        These tensors are sent to collaborators for the next round.
                    - 'delta' indicates that value is a difference between rounds
                        for a specific tensor.
                    also one of the tags is a collaborator name
                    if it corresponds to a result of a local task.

                - 'nparray': value of the tensor.
            tensor_name: name of the tensor
            fl_round: round number
            tags: tuple of tags for this tensor
        Returns:
            np.ndarray: aggregated tensor
        """
        clipped_tensors = []
        previous_tensor_value = None
        for record in db_iterator:
            if (
                record['round'] == (fl_round - 1)
                and record['tensor_name'] == tensor_name
                and 'aggregated' in record['tags']
                and 'delta' not in record['tags']
            ):
                previous_tensor_value = record['nparray']
        weights = []
        for local_tensor in local_tensors:
            prev_tensor = previous_tensor_value if previous_tensor_value is not None else local_tensor.tensor
            delta = local_tensor.tensor - prev_tensor
            new_tensor = prev_tensor + delta * self.ratio
            clipped_tensors.append(new_tensor)
            weights.append(local_tensor.weight)

        return np.average(clipped_tensors, weights=weights, axis=0)

A full implementation can be found at Federated_Pytorch_MNIST_custom_aggregation_Tutorial.ipynb

Example of a Privileged Aggregation Function

Most of the time the AggregationFunction interface is sufficient to implement custom methods, but in certain scenarios users may want to store additional information inside the TensorDB DataFrame beyond the aggregated tensor. The openfl.interface.aggregation_functions.experimental.PrivilegedAggregationFunction interface is provided for this use and gives the user direct access to the aggregator’s TensorDB DataFrame (notice that the tensor_db parameter in the call function replaces the db_iterator from the standard AggregationFunction interface). As the name suggests, this interface is called privileged because with great power comes great responsibility: modifying the TensorDB DataFrame directly can lead to unexpected behavior and experiment failures if entries are arbitrarily deleted.

import numpy as np
import pandas as pd

import openfl.native as fx  # provides fx.logger, used in the logging calls below
from openfl.interface.aggregation_functions.experimental import PrivilegedAggregationFunction

class PrioritizeLeastImproved(PrivilegedAggregationFunction):
    """
        Give collaborator with the least improvement in validation accuracy more influence over future weights

    """

    def call(self,
             local_tensors,
             tensor_db,
             tensor_name,
             fl_round,
             tags):
        """Aggregate tensors.

        Args:
            local_tensors(list[openfl.utilities.LocalTensor]): List of local tensors to aggregate.
            tensor_db: Aggregator's TensorDB [writable]. Columns:
                - 'tensor_name': name of the tensor.
                    Examples for `torch.nn.Module`s: 'conv1.weight', 'fc2.bias'.
                - 'round': 0-based number of round corresponding to this tensor.
                - 'tags': tuple of tensor tags. Tags that can appear:
                    - 'model' indicates that the tensor is a model parameter.
                    - 'trained' indicates that tensor is a part of a training result.
                        These tensors are passed to the aggregator node after local learning.
                    - 'aggregated' indicates that tensor is a result of aggregation.
                        These tensors are sent to collaborators for the next round.
                    - 'delta' indicates that value is a difference between rounds
                        for a specific tensor.
                    also one of the tags is a collaborator name
                    if it corresponds to a result of a local task.

                - 'nparray': value of the tensor.
            tensor_name: name of the tensor
            fl_round: round number
            tags: tuple of tags for this tensor
        Returns:
            np.ndarray: aggregated tensor
        """
        from openfl.utilities import change_tags

        tensors, weights, collaborators = zip(*[(x.tensor, x.weight, x.col_name) for idx,x in enumerate(local_tensors)])
        tensors, weights, collaborators = np.array(tensors), np.array(weights), collaborators

        if fl_round > 0:
            metric_tags = ('metric','validate_agg')
            collaborator_accuracy = {}
            previous_col_accuracy = {}
            change_in_accuracy = {}
            for col in collaborators:
                col_metric_tag = change_tags(metric_tags,add_field=col)
                collaborator_accuracy[col] = float(tensor_db[(tensor_db['tensor_name'] == 'acc') &
                                                       (tensor_db['round'] == fl_round) &
                                                       (tensor_db['tags'] == col_metric_tag)]['nparray'])
                previous_col_accuracy[col] = float(tensor_db[(tensor_db['tensor_name'] == 'acc') &
                                                       (tensor_db['round'] == fl_round - 1) &
                                                       (tensor_db['tags'] == col_metric_tag)]['nparray'])
                change_in_accuracy[col] = collaborator_accuracy[col] - previous_col_accuracy[col]


            least_improved_collaborator = min(change_in_accuracy,key=change_in_accuracy.get)

            # Dont add least improved collaborator more than once
            if len(tensor_db[(tensor_db['tags'] == ('least_improved',)) &
                         (tensor_db['round'] == fl_round)]) == 0:
                tensor_db.loc[tensor_db.shape[0]] = \
                        ['_','_',fl_round,True,('least_improved',),np.array(least_improved_collaborator)]
                fx.logger.info(f'Least improved collaborator = {least_improved_collaborator}')
                fx.logger.info(f"Least improved = {tensor_db[(tensor_db['tags'] == ('least_improved',)) & (tensor_db['nparray'] == np.array(least_improved_collaborator))]}")
                fx.logger.info(f'Collaborator accuracy = {collaborator_accuracy}')
                fx.logger.info(f'Change in accuracy {change_in_accuracy}')
            least_improved_weight_factor = 0.1 * len(tensor_db[(tensor_db['tags'] == ('least_improved',)) &
                                                               (tensor_db['nparray'] == np.array(least_improved_collaborator))])
            weights[collaborators.index(least_improved_collaborator)] += least_improved_weight_factor
            weights = weights / np.sum(weights)

        return np.average(tensors, weights=weights, axis=0)

A full implementation can be found at Federated_Pytorch_MNIST_custom_aggregation_Tutorial.ipynb

Metric Logging Callback

By default, both the director-based flow and the task runner API support TensorBoard for logging metrics. Once the experiment is over, the logs can be viewed from the workspace with tensorboard --logdir logs. The metrics that are logged by default are:

  • Aggregated model validation accuracy (Aggregator/aggregated_model_validate/acc, validate_agg/aggregated_model_validate/acc)

  • Locally tuned model validation accuracy (Aggregator/locally_tuned_model_validate/acc, validate_local/locally_tuned_model_validate/acc)

  • Train loss (Aggregator/train/train_loss, trained/train/train_loss)

You can also use a custom metric logging function for each task via the Python* API or the command line interface. This function is called on the aggregator node.

Python API

For logging metrics through TensorBoard, once fl_experiment.stream_metrics() is called from the frontend API, the logs are saved in the TensorBoard format. After the experiment has finished, the logs can be viewed from the workspace with tensorboard --logdir logs.

You can also add your own custom metric logging function by defining a function with the following signature:

def callback_name(node_name, task_name, metric_name, metric, round_number):
    """
    Write metric callback.

    Args:
        node_name (str): Name of the node that generated the metric
        task_name (str): Name of the task
        metric_name (str): Name of the metric
        metric (np.ndarray): Metric value
        round_number (int): Round number
    """
    pass  # Your code here
Example of a Metric Callback

This example shows how to use MLFlow logger to log metrics:

import mlflow

def callback_name(node_name, task_name, metric_name, metric, round_number):
    """
    Write metric callback

    Args:
        node_name (str): Name of the node that generated the metric
        task_name (str): Name of the task
        metric_name (str): Name of the metric
        metric (np.ndarray): Metric value
        round_number (int): Round number
    """
    mlflow.log_metrics({f'{node_name}/{task_name}/{metric_name}': float(metric), 'round_number': round_number})

You can view the log results either interactively through the UI by typing mlflow ui or through the use of MlflowClient. By default, only the last logged value of a metric is returned. If you want to retrieve all the values of a given metric, use the MlflowClient.get_metric_history method.

import mlflow
client = mlflow.tracking.MlflowClient()
print(client.get_metric_history("<RUN ID>", "validate_local/locally_tuned_model_validation/accuracy"))
Command Line Interface

For logging through TensorBoard, enable the parameter write_logs : true in the aggregator's plan settings:

aggregator :
  template : openfl.component.Aggregator
  settings :
      write_logs : true

Follow the steps below to write your custom callback function instead. As an example, a full implementation can be found at Federated_Pytorch_MNIST_Tutorial.ipynb and in the torch_cnn_mnist workspace.

  1. Define the callback function, as you did for the Python API, in the src directory of your workspace.

  2. Point to your function with the log_metric_callback key in the aggregator section of the plan.yaml file in your workspace.

aggregator :
  defaults : plan/defaults/aggregator.yaml
  template : openfl.component.Aggregator
  settings :
    init_state_path     : save/torch_cnn_mnist_init.pbuf
    best_state_path     : save/torch_cnn_mnist_best.pbuf
    last_state_path     : save/torch_cnn_mnist_last.pbuf
    rounds_to_train     : 10
    write_logs          : true
    log_metric_callback :
      template : src.mnist_utils.callback_name
Example of a Metric Callback

The following is an example of a log metric callback that writes metric values to TensorBoard.

from torch.utils.tensorboard import SummaryWriter

writer = SummaryWriter('./logs/cnn_mnist', flush_secs=5)


def write_metric(node_name, task_name, metric_name, metric, round_number):
    writer.add_scalar("{}/{}/{}".format(node_name, task_name, metric_name),
                    metric, round_number)

Supported aggregation algorithms

FedAvg

The default aggregation algorithm in OpenFL. It multiplies each collaborator's local model weights by the collaborator's relative data size and averages the results.
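
The following is a minimal numpy illustration of this weighted average. It is not the OpenFL implementation; the helper name fedavg and the sample counts are purely illustrative.

import numpy as np


def fedavg(local_weights, num_samples):
    """Average local model tensors, weighted by relative dataset size."""
    weights = np.array(num_samples) / np.sum(num_samples)
    return np.average(np.array(local_weights), weights=weights, axis=0)


# Two collaborators holding 100 and 300 samples respectively
global_tensor = fedavg([np.ones(3), np.zeros(3)], [100, 300])  # -> [0.25, 0.25, 0.25]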

FedProx

Paper: https://arxiv.org/abs/1812.06127

FedProx in OpenFL is implemented as a custom optimizer for PyTorch/TensorFlow. In order to use FedProx, do the following:

  1. PyTorch:

  • Replace your optimizer with the SGD-based openfl.utilities.optimizers.torch.FedProxOptimizer or the Adam-based openfl.utilities.optimizers.torch.FedProxAdam. You should also save the model weights for the next round by calling the .set_old_weights() method of the optimizer before the training epoch.

  2. TensorFlow:

  • Replace your optimizer with the SGD-based openfl.utilities.optimizers.keras.FedProxOptimizer.

For more details, see openfl-tutorials/Federated_FedProx_*_MNIST_Tutorial.ipynb where * is the framework name.
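
The following is a minimal sketch of wiring the PyTorch FedProx optimizer into a local training loop. It assumes that a model and train_loader already exist; the constructor arguments (lr, mu) and the argument passed to set_old_weights() are illustrative assumptions and may differ from the actual optimizer signature, so refer to the tutorial above for the exact usage.

import torch.nn.functional as F
from openfl.utilities.optimizers.torch import FedProxOptimizer

# Illustrative hyperparameters; mu controls the proximal term from the FedProx paper.
optimizer = FedProxOptimizer(model.parameters(), lr=0.01, mu=0.1)


def train_epoch(model, train_loader, optimizer):
    # Remember the round-start (global) weights before local training,
    # as required for the proximal term.
    optimizer.set_old_weights(list(model.parameters()))
    model.train()
    for data, target in train_loader:
        optimizer.zero_grad()
        loss = F.nll_loss(model(data), target)
        loss.backward()
        optimizer.step()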

FedOpt

Paper: https://arxiv.org/abs/2003.00295

FedOpt in OpenFL: Adaptive Aggregation Functions

FedCurv

Paper: https://arxiv.org/abs/1910.07796

Requires PyTorch >= 1.9.0. Other frameworks are not supported yet.

Use openfl.utilities.fedcurv.torch.FedCurv to override train function using .get_penalty(), .on_train_begin(), and .on_train_end() methods. In addition, you should override default AggregationFunction of the train task with openfl.interface.aggregation_functions.FedCurvWeightedAverage. See PyTorch_Histology_FedCurv tutorial in openfl-tutorials/interactive_api directory for more details.
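
As a minimal sketch of the aggregation override only (assuming the interactive API setup from the earlier examples in this section), the train task's default aggregation can be replaced with FedCurvWeightedAverage via the set_aggregation_function decorator. The no-argument constructor and the task body are illustrative assumptions; the FedCurv-specific training changes (get_penalty(), on_train_begin(), on_train_end()) are omitted here, so see the tutorial above for the full workflow.

from openfl.interface.aggregation_functions import FedCurvWeightedAverage
from openfl.interface.interactive_api.experiment import TaskInterface

TI = TaskInterface()

@TI.register_fl_task(model='model', data_loader='train_loader',
                     device='device', optimizer='optimizer')
@TI.set_aggregation_function(FedCurvWeightedAverage())
def train(model, train_loader, device, optimizer):
    ...  # standard local training loop plus the FedCurv penalty term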

Updating plan settings

With the director-based workflow, you can customize plan settings before starting the experiment. Changing plan settings from the command line interface is as straightforward as modifying plan.yaml. When using the Python API or the Director-Envoy based interactive API, override_config can be used to update plan settings.

Python API

Modify the plan settings:

final_fl_model = fx.run_experiment(collaborators, override_config={
    'aggregator.settings.rounds_to_train': 5,
    'aggregator.settings.log_metric_callback': write_metric,
})
Director Envoy Based Interactive API Interface

Once you create an FLExperiment object, a basic federated learning plan with default settings is created. To check the default plan settings, print the plan as shown below:

fl_experiment = FLExperiment(federation=federation, experiment_name=experiment_name)
import openfl.native as fx
print(fx.get_plan(fl_plan=fl_experiment.plan))

Here is an example of the default plan settings that get displayed:

"aggregator.settings.best_state_path": "save/best.pbuf",
"aggregator.settings.db_store_rounds": 2,
"aggregator.settings.init_state_path": "save/init.pbuf",
"aggregator.settings.last_state_path": "save/last.pbuf",
"aggregator.settings.rounds_to_train": 10,
"aggregator.settings.write_logs": true,
"aggregator.template": "openfl.component.Aggregator",
"assigner.settings.task_groups.0.name": "train_and_validate",
"assigner.settings.task_groups.0.percentage": 1.0,
"assigner.settings.task_groups.0.tasks.0": "aggregated_model_validation",
"assigner.settings.task_groups.0.tasks.1": "train",
"assigner.settings.task_groups.0.tasks.2": "locally_tuned_model_validation",
"assigner.template": "openfl.component.RandomGroupedAssigner",
"collaborator.settings.db_store_rounds": 1,
"collaborator.settings.delta_updates": false,
"collaborator.settings.opt_treatment": "RESET",
"collaborator.template": "openfl.component.Collaborator",
"compression_pipeline.settings": {},
"compression_pipeline.template": "openfl.pipelines.NoCompressionPipeline",
"data_loader.settings": {},
"data_loader.template": "openfl.federated.DataLoader",
"network.settings.agg_addr": "auto",
"network.settings.agg_port": "auto",
"network.settings.cert_folder": "cert",
"network.settings.client_reconnect_interval": 5,
"network.settings.disable_client_auth": false,
"network.settings.hash_salt": "auto",
"network.settings.tls": true,
"network.template": "openfl.federation.Network",
"task_runner.settings": {},
"task_runner.template": "openfl.federated.task.task_runner.CoreTaskRunner",
"tasks.settings": {}

Use override_config with FLExperiment.start() to make any changes to the default plan settings. It is essentially a dictionary with keys corresponding to plan parameters and the corresponding values (or list of values). Any new key entry will be added to the plan, and for any existing key in the plan, the value will be overridden.

fl_experiment.start(model_provider=MI,
                    task_keeper=TI,
                    data_loader=fed_dataset,
                    rounds_to_train=5,
                    opt_treatment='CONTINUE_GLOBAL',
                    override_config={'aggregator.settings.db_store_rounds': 1,
                                     'compression_pipeline.template': 'openfl.pipelines.KCPipeline',
                                     'compression_pipeline.settings.n_clusters': 2})

Since the ‘aggregator.settings.db_store_rounds’ and ‘compression_pipeline.template’ fields are already present in the plan, the values of these fields get replaced. The field ‘compression_pipeline.settings.n_clusters’ is a new entry that gets added to the plan:

INFO     Updating aggregator.settings.db_store_rounds to 1...                                                                           native.py:102

INFO     Updating compression_pipeline.template to openfl.pipelines.KCPipeline...                                                       native.py:102

INFO     Did not find compression_pipeline.settings.n_clusters in config. Make sure it should exist. Creating...                        native.py:105

A full implementation can be found at Federated_Pytorch_MNIST_Tutorial.ipynb and at Tensorflow_MNIST.ipynb.

Open Federated Learning (OpenFL) Tutorials

These tutorials use the Jupyter* Lab server to help you become familiar with the APIs used in Open Federated Learning (OpenFL).

Aggregator-Based Workflow Tutorial

Use this tutorial to become familiar with the APIs of the short-lived components (Aggregator and Collaborator).

Aggregator-Based Workflow Tutorial

You will start a Jupyter* lab server and receive a URL you can use to access the tutorials. Jupyter notebooks are provided for PyTorch* and TensorFlow* that simulate a federation on a local machine.

Note

Follow the procedure to become familiar with the APIs used in the aggregator-based workflow and conventions such as FL Plans, Aggregators, and Collaborators.

Start the Tutorials
  1. Start a Python* 3.8 (>=3.6, <3.9) virtual environment and confirm OpenFL is available.

    fx
    

    You should see a list of available commands.

  2. Start a Jupyter server. This returns a URL to access available tutorials.

    fx tutorial start
    
  3. Open the URL (including the token) in your browser.

  4. Choose a tutorial from which to start. Each tutorial is a demonstration of simulated federated learning. The following are examples of available tutorials:

  • Federated Keras MNIST Tutorial: workspace with a simple Keras CNN model that will download the MNIST dataset and train in a federation.

  • Federated Pytorch MNIST Tutorial: workspace with a simple PyTorch CNN model that will download the MNIST dataset and train in a federation.

  • Federated PyTorch UNET Tutorial: workspace with a UNET PyTorch model that will download the Hyper-Kvasir dataset and train in a federation.

  • Federated PyTorch TinyImageNet: workspace with a MobileNet-V2 PyTorch model that will download the Tiny-ImageNet dataset and train in a federation.

Familiarize with the API Concepts in an Aggregator-Based Workflow
Step 1: Enable the OpenFL Python API

Add the following lines to your Python script.

import openfl.native as fx
from openfl.federated import FederatedModel, FederatedDataSet

This loads the OpenFL package and imports wrappers that adapt your existing data and models to a (simulated) federated context.

Step 2: Set Up the Experiment

For a basic experiment, run the following command.

fx.init()

This creates a workspace directory containing default FL plan values for your experiments, and sets up an experiment with two collaborators (the collaborators are creatively named one and two).

For an experiment with more collaborators, run the following command.

collaborator_list = [str(i) for i in range(NUM_COLLABORATORS)]
fx.init('keras_cnn_mnist', col_names=collaborator_list)

Note

The following are template recommendations for training models:

  • For Keras models, run fx.init('keras_cnn_mnist') to start with the keras_cnn_mnist template.

  • For PyTorch models, run fx.init('torch_cnn_mnist') to start with the torch_cnn_mnist template.

Step 3: Customize the Federated Learning Plan (FL Plan)

For this example, the experiment is set up with the keras_cnn_mnist template.

fx.init('keras_cnn_mnist')

See the FL plan values that can be set with the fx.get_plan() command.

print(fx.get_plan())

{
  "aggregator.settings.best_state_path": "save/keras_cnn_mnist_best.pbuf",
  "aggregator.settings.init_state_path": "save/keras_cnn_mnist_init.pbuf",
  "aggregator.settings.last_state_path": "save/keras_cnn_mnist_last.pbuf",
  "aggregator.settings.rounds_to_train": 10,
  "aggregator.template": "openfl.component.Aggregator",
  ...
}

Based on these plan values, the experiment will run for 10 rounds. You can customize the experiment to run for 20 rounds either at runtime or ahead of time.

Set the value at runtime with the override_config parameter of fx.run_experiment.

#set values at experiment runtime
fx.run_experiment(experiment_collaborators, override_config={"aggregator.settings.rounds_to_train": 20})

Set the value ahead of time with fx.update_plan().

#Set values ahead of time with fx.update_plan()
fx.update_plan({"aggregator.settings.rounds_to_train": 20})
Step 4: Wrap the Data and Model

Use the FederatedDataSet function to wrap in-memory numpy datasets and split the data into N mutually-exclusive chunks for each collaborator participating in the experiment.

fl_data = FederatedDataSet(train_images, train_labels, valid_images, valid_labels, batch_size=32, num_classes=classes)

Similarly, the FederatedModel function takes as an argument your model definition. For the first example, you can wrap a Keras model in a function that outputs the compiled model.

Example 1:

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense


def build_model(feature_shape,classes):
    #Defines the MNIST model
    model = Sequential()
    model.add(Dense(64, input_shape=feature_shape, activation='relu'))
    model.add(Dense(64, activation='relu'))
    model.add(Dense(classes, activation='softmax'))

    model.compile(optimizer='adam', loss='categorical_crossentropy',metrics=['accuracy'])
    return model

fl_model = FederatedModel(build_model, data_loader=fl_data)

For the second example with a PyTorch model, the FederatedModel function takes the following parameters:

  • The class that defines the network definition and associated forward function

  • The lambda optimizer method that can be set to a newly instantiated network

  • The loss function

Example 2:

import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 16, 3)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(16, 32, 3)
        self.fc1 = nn.Linear(32 * 5 * 5, 32)
        self.fc2 = nn.Linear(32, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(x.size(0),-1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return F.log_softmax(x, dim=1)

optimizer = lambda x: optim.Adam(x, lr=1e-4)

def cross_entropy(output, target):
    """Binary cross-entropy metric
    """
    return F.binary_cross_entropy_with_logits(input=output,target=target)

fl_model = FederatedModel(build_model=Net, optimizer=optimizer, loss_fn=cross_entropy, data_loader=fl_data)
Step 5: Define the Collaborators

Define the collaborators taking part in the experiment. The example below uses the collaborator list created earlier with the fx.init() command.

experiment_collaborators = {col_name:col_model for col_name, col_model \
                                 in zip(collaborator_list, fl_model.setup(len(collaborator_list)))}

This command creates a model for each collaborator with their data shard.

Note

In production deployments of OpenFL, each collaborator will have the data on premise. Splitting data into shards is not necessary.

Step 6: Run the Experiment

Run the experiment for five rounds and return the final model once completed.

final_fl_model = fx.run_experiment(experiment_collaborators, override_config={"aggregator.settings.rounds_to_train": 5})

Experimental Features

This section includes a set of experimental features that our team wants feedback on before adding them into core OpenFL. Experimental features are not ready for production. These features are under active development and intended to make their way into core OpenFL, but there are several key considerations to make when building on top of these:

  1. Backward compatibility is not guaranteed - Our goal is to maintain backward compatibility whenever possible, but user feedback (and our own internal research) may result in necessary changes to the APIs.

Workflow Interface

Learn how to:
  • Chain a series of tasks that run on the aggregator or collaborators.

  • Filter out information that should stay local

  • Use Metaflow tools to analyze and debug experiments

Workflow Interface

Important Note

The OpenFL workflow interface is experimental, subject to change, and is currently limited to single node execution. To set up and launch a real federation, see Run the Federation.

What is it?

A new OpenFL interface that gives significantly more flexibility to researchers in the construction of federated learning experiments. It is heavily influenced by the interface and design of Metaflow, the popular framework for data scientists originally developed at Netflix. There are several reasons we converged on Metaflow as inspiration for our work:

  1. Clean expression of task sequence. Flows start with a start task, and end with end. The next task in the sequence is called by self.next.

  2. Easy selection of what should be sent between tasks using include or exclude

  3. Excellent tooling ecosystem: the metaflow client gives easy access to prior runs, tasks, and data artifacts generated by an experiment.

There are several modifications we make in our reimagined version of this interface that are necessary for federated learning:

  1. Placement: Metaflow’s @step decorator is replaced by placement decorators that specify where a task will run. In horizontal federated learning, there are server (or aggregator) and client (or collaborator) nodes. Tasks decorated by @aggregator will run on the aggregator node, and @collaborator will run on the collaborator node. These placement decorators are interpreted by Runtime implementations: these do the heavy lifting of figuring out how to get the state of the current task to another process or node.

  2. Runtime: Each flow has a .runtime attribute. The runtime encapsulates the details of the infrastructure where the flow will run. In this experimental release, we support only a LocalRuntime single node implementation, but as this work matures, we will extend to a FederatedRuntime that implements distributed operation across remote infrastructure.

  3. Conditional branches: Perform different tasks if a criterion is met

  4. Loops: Internal loops are supported within a flow; this is necessary to support rounds of training, where the same sequence of tasks is performed repeatedly.

How to use it?

Let’s start with the basics. A flow is intended to define the entirety of a federated learning experiment. Following the conventions set by Metaflow, every flow begins with the start task and concludes with the end task. At each step in the flow, attributes can be defined, modified, or deleted. Attributes get passed forward to the next step in the flow, which is defined by the name of the task passed to the next function. In the line before each task, there is a placement decorator, which defines where that task will be run.

In the following example, the aggregator begins with an optionally passed in model and optimizer. The aggregator begins the flow with the start task, where the list of collaborators is extracted from the runtime (self.collaborators = self.runtime.collaborators) and is then used as the list of participants to run the task listed in self.next, aggregated_model_validation. The model, optimizer, and anything that is not explicitly excluded from the next function will be passed from the start function on the aggregator to the aggregated_model_validation task on the collaborator. Where the tasks run is determined by the placement decorator that precedes each task definition (@aggregator or @collaborator). Once each of the collaborators (defined in the runtime) completes the aggregated_model_validation task, they pass their current state on to the train task, from train to local_model_validation, and then finally to join at the aggregator. It is in join that an average is taken of the model weights, and the next round can begin.

class FederatedFlow(FLSpec):

    def __init__(self, model = None, optimizer = None, rounds=3, **kwargs):
        super().__init__(**kwargs)
        if model is not None:
            self.model = model
            self.optimizer = optimizer
        else:
            self.model = Net()
            self.optimizer = optim.SGD(self.model.parameters(), lr=learning_rate,
                                   momentum=momentum)
        self.rounds = rounds

    @aggregator
    def start(self):
        print(f'Performing initialization for model')
        self.collaborators = self.runtime.collaborators
        self.private = 10
        self.current_round = 0
        self.next(self.aggregated_model_validation,foreach='collaborators',exclude=['private'])

    @collaborator
    def aggregated_model_validation(self):
        print(f'Performing aggregated model validation for collaborator {self.input}')
        self.agg_validation_score = inference(self.model,self.test_loader)
        print(f'{self.input} value of {self.agg_validation_score}')
        self.next(self.train)

    @collaborator
    def train(self):
        self.model.train()
        self.optimizer = optim.SGD(self.model.parameters(), lr=learning_rate,
                                   momentum=momentum)
        train_losses = []
        for batch_idx, (data, target) in enumerate(self.train_loader):
          self.optimizer.zero_grad()
          output = self.model(data)
          loss = F.nll_loss(output, target)
          loss.backward()
          self.optimizer.step()
          if batch_idx % log_interval == 0:
            print('Train Epoch: 1 [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
               batch_idx * len(data), len(self.train_loader.dataset),
              100. * batch_idx / len(self.train_loader), loss.item()))
            self.loss = loss.item()
            torch.save(self.model.state_dict(), 'model.pth')
            torch.save(self.optimizer.state_dict(), 'optimizer.pth')
        self.training_completed = True
        self.next(self.local_model_validation)

    @collaborator
    def local_model_validation(self):
        self.local_validation_score = inference(self.model,self.test_loader)
        print(f'Doing local model validation for collaborator {self.input}: {self.local_validation_score}')
        self.next(self.join, exclude=['training_completed'])

    @aggregator
    def join(self,inputs):
        self.average_loss = sum(input.loss for input in inputs)/len(inputs)
        self.aggregated_model_accuracy = sum(input.agg_validation_score for input in inputs)/len(inputs)
        self.local_model_accuracy = sum(input.local_validation_score for input in inputs)/len(inputs)
        print(f'Average aggregated model validation values = {self.aggregated_model_accuracy}')
        print(f'Average training loss = {self.average_loss}')
        print(f'Average local model validation values = {self.local_model_accuracy}')
        self.model = FedAvg([input.model for input in inputs])
        self.optimizer = [input.optimizer for input in inputs][0]
        self.current_round += 1
        if self.current_round < self.rounds:
            self.next(self.aggregated_model_validation, foreach='collaborators', exclude=['private'])
        else:
            self.next(self.end)

    @aggregator
    def end(self):
        print(f'This is the end of the flow')
Background

Prior interfaces in OpenFL support the standard horizontal FL training workflow:

  1. The collaborator downloads the latest model from the aggregator

  2. The collaborator performs validation with their local validation dataset on the aggregated model, and sends these metrics to the aggregator (aggregated_model_validation task)

  3. The collaborator trains the model on their local training data set, and sends the local model weights and metrics to the aggregator (train task)

  4. The collaborator performs validation with their local validation dataset on their locally trained model, and sends their validation metrics to the aggregator (locally_tuned_model_validation task)

  5. The aggregator applies an aggregation function (weighted average, FedCurv, FedProx, etc.) to the model weights, and reports the aggregate metrics.

The Task Assigner determines the list of collaborator tasks to be performed, and in both the task runner API and the interactive API these tasks can be modified (to varying degrees). For example, to perform federated evaluation of a model, only the aggregated_model_validation task would be selected for the assigner’s block of the federated plan. Equivalently for the interactive API, this can be done by registering only a single validation task. But there are many other types of workflows that can’t be easily represented purely by training / validation tasks performed on a collaborator with a single model. An example is training a Federated Generative Adversarial Network (GAN); because this may be represented by separate generator and discriminator models, and could leak information about a collaborator’s dataset, the interface we provide should allow for better control over what gets sent over the network and how. Another common request we get is for validation with an aggregator’s dataset after training. Prior to OpenFL 1.5, there has not been a great way to support this in OpenFL.

Goals
  1. Simplify the federated workflow representation

  2. Clean separation of workflow from runtime infrastructure

  3. Help users better understand the steps in federated learning (weight extraction, tensor compression, etc.)

  4. Interface makes it clear what is sent across the network

  5. The placement of tasks and how they connect should be straightforward

  6. Don’t reinvent unless absolutely necessary

Workflow Interface API

The workflow interface formulates the experiment as a series of tasks, or a flow. Every flow begins with the start task and concludes with end.

Runtimes

A Runtime defines where the flow will be executed, who the participants are in the experiment, and the private information that each participant has access to. In this experimental release, single node execution is supported using the LocalRuntime. Let’s see how a LocalRuntime is created:

# Setup participants
aggregator = Aggregator()
aggregator.private_attributes = {}

# Setup collaborators with private attributes
collaborator_names = ['Portland', 'Seattle', 'Chandler','Bangalore']
collaborators = [Collaborator(name=name) for name in collaborator_names]
for idx, collaborator in enumerate(collaborators):
    local_train = deepcopy(mnist_train)
    local_test = deepcopy(mnist_test)
    local_train.data = mnist_train.data[idx::len(collaborators)]
    local_train.targets = mnist_train.targets[idx::len(collaborators)]
    local_test.data = mnist_test.data[idx::len(collaborators)]
    local_test.targets = mnist_test.targets[idx::len(collaborators)]
    collaborator.private_attributes = {
            'train_loader': torch.utils.data.DataLoader(local_train,batch_size=batch_size_train, shuffle=True),
            'test_loader': torch.utils.data.DataLoader(local_test,batch_size=batch_size_train, shuffle=True)
    }

# This is equivalent to:
# local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators, backend='single_process')
local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators)

Let’s break this down, starting with the Aggregator and Collaborator placeholders. These placeholders represent the nodes where tasks will be executed. Each participant placeholder has its own set of private_attributes: a dictionary where the key is the name of the attribute and the value is the object. In the above example, each of the four collaborators (‘Portland’, ‘Seattle’, ‘Chandler’, and ‘Bangalore’) has a train_loader and a test_loader that it can access. These private attributes can be named anything and do not necessarily need to be the same across participants.

Now let’s see how the runtime for a flow is assigned, and the flow gets run:

flow = FederatedFlow()
flow.runtime = local_runtime
flow.run()

And that’s it! This will run an instance of the FederatedFlow on a single node in a single process.

Runtime Backends

The Runtime defines where code will run, and it has a Backend that defines the underlying implementation of how the flow will be executed. 'single_process' is the default in the LocalRuntime: it executes all code sequentially within a single Python process and is well suited to both high spec and low spec hardware. For users with large servers or multiple GPUs they wish to take advantage of, we also provide a Ray (https://github.com/ray-project/ray) backend. The Ray backend enables parallel task execution for collaborators and optionally allows users to request dedicated GPUs for collaborator tasks in the placement decorator, as follows:

class ExampleDedicatedGPUFlow(FLSpec):
    ...
    # We request one dedicated GPU for this task
    @collaborator(num_gpus=1)
    def training(self):
        print(f'CUDA_VISIBLE_DEVICES: {os.environ["CUDA_VISIBLE_DEVICES"]}')
        self.loss = train_func(self.model, self.train_loader)
        self.next(self.validation)
    ...

# The Ray Backend will now be used for local execution
local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators, backend='ray')
Debugging with the Metaflow Client

Federated learning is difficult to debug. A common example of this difficulty comes in the form of mislabeled datasets. Even one mislabeled dataset in a collaborator’s training set in a large federation can delay model convergence and lower aggregate accuracy. Wouldn’t it be better to pinpoint these problems early instead of after the full experiment has taken place?

To improve debugging of federated learning experiments, we are reusing Metaflow’s interfaces to (optionally) save all of the attributes generated by each participant, every task’s stdout / stderr, and provide a visual representation of the workflow graph.

Capturing this information requires just a one line change to the Flow object initialization by setting checkpoint=True:

flow = FederatedFlow(..., checkpoint=True)

After the flow has started running, you can use the Metaflow Client to get intermediate information from any of the participants’ tasks:

from metaflow import Flow, Run, Task, Step

# The name of the flow is the name of the class
flow = Flow('FederatedFlow')
run = flow.latest_run
list(run)
> [Step('FederatedFlow/1671152854447797/end'),
   Step('FederatedFlow/1671152854447797/join'),
   Step('FederatedFlow/1671152854447797/local_model_validation'),
   Step('FederatedFlow/1671152854447797/train'),
   Step('FederatedFlow/1671152854447797/aggregated_model_validation'),
   Step('FederatedFlow/1671152854447797/start')]
step = Step('FederatedFlow/1671152854447797/aggregated_model_validation')
for task in step:
    if task.data.input == 'Portland':
        print(task.data)
        portland_task = task
        model = task.data.model
> <MetaflowData: train_loader, collaborators, loss, optimizer, model, input, rounds, agg_validation_score, current_round, test_loader, training_completed>
print(model)
> Net(
   (conv1): Conv2d(1, 10, kernel_size=(5, 5), stride=(1, 1))
   (conv2): Conv2d(10, 20, kernel_size=(5, 5), stride=(1, 1))
   (conv2_drop): Dropout2d(p=0.5, inplace=False)
   (fc1): Linear(in_features=320, out_features=50, bias=True)
   (fc2): Linear(in_features=50, out_features=10, bias=True)
 )

And if you want to get the log or error messages for that task, you can just run:

print(portland_task.stdout)
> Train Epoch: 1 [0/15000 (0%)]      Loss: 2.295608
  Train Epoch: 1 [640/15000 (4%)]    Loss: 2.311402
  Train Epoch: 1 [1280/15000 (9%)]   Loss: 2.281983
  Train Epoch: 1 [1920/15000 (13%)]  Loss: 2.269565
  Train Epoch: 1 [2560/15000 (17%)]  Loss: 2.261440
  ...
print(portland_task.stderr)
> [No output]

While this information is useful for debugging, depending on your workflow it may require significant disk space. For this reason, checkpoint is disabled by default.

Runtimes: Future Plans

Our goal is to make it a one line change to configure where and how a flow is executed. While we only support single node execution with the LocalRuntime today, our aim in future releases is to make going from one to multiple nodes as easy as:

flow = FederatedFlow()
# Run on a single node first
local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators)
flow.runtime = local_runtime
flow.run()

# A future example of how the same flow could be run on distributed infrastructure
federated_runtime = FederatedRuntime(...)
flow.runtime = federated_runtime
flow.run()

OpenFL Structure

Learn about the short-lived and long-lived components that compose Open Federated Learning (OpenFL):

Understand the procedure calls to the Director service.

Learn about the plugin framework that makes OpenFL flexible and extensible for your use:

Core Components

Open Federated Learning (OpenFL) has the following components:

Short-Lived Components

These components are terminated when the experiment is finished.

  • The Aggregator which receives model updates from Collaborators and combines them to form the global model.

  • The Collaborator which uses a local dataset to train a global model.

The Aggregator is framework-agnostic, as it operates on tensors in the OpenFL inner representation, while the Collaborator can use deep learning frameworks as its computational backend, such as TensorFlow* or PyTorch*.

Aggregator

The Aggregator is a short-lived entity, which means that its lifespan is limited by the experiment execution time. It orchestrates Collaborators according to the FL plan, performs model aggregation at the end of each round, and acts as a parameter server for collaborators.

Model weight aggregation logic may be customized via a plugin mechanism.

The Aggregator is spawned by the Director when a new experiment is submitted.

Collaborator

The Collaborator is a short-lived entity that manages training the model on local data, which includes

  • executing assigned tasks,

  • converting deep learning framework-specific tensor objects to OpenFL inner representation, and

  • exchanging model parameters with the Aggregator.

The Collaborator is created by the Envoy when a new experiment is submitted in the Director-based workflow. The Collaborator should be started from the CLI if a user follows the Aggregator-based workflow.

Every Collaborator is a unique service. Its data loader is fed by a local shard descriptor in order to perform the tasks included in an FL experiment. At the end of a training task, weight tensors are extracted and sent to the central node, where they are aggregated.

Converting tensor objects is handled by framework adapter plugins. Included in OpenFL are framework adapters for PyTorch and TensorFlow 2.x. The list of framework adapters is extensible. Users can contribute new framework adapters for deep learning frameworks they would like to see supported in OpenFL.

Long-Lived Components

These components were introduced to support the Director-based workflow.

  • The Director is the central node of the federation. This component starts an Aggregator for each experiment, broadcasts the experiment archive to connected collaborator nodes, and provides updates on the status of the experiment.

  • The Envoy runs on collaborator nodes and is always connected to the Director. When the Director starts an experiment, the Envoy starts the Collaborator to train the global model.

These components stay available to run several experiments in the federation.

Director

The Director is a long-lived entity and is the central node of the federation. It accepts connections from:

  • Frontend clients (data scientists using Interactive Python API (Beta))

  • Envoys, if their Shard Descriptors are compliant with the same data interface

The Director supports concurrent frontend connections. While the Director may take in several experiments, the experiments are executed in series.

When an experiment is submitted, the Director starts an Aggregator and sends the experiment data to the involved Envoys. While an experiment is running, the Director oversees the Aggregator and delivers updates on the status of the experiment, which include trained model snapshots and metrics, on request.

Envoy

The Envoy is a long-lived entity that runs on collaborator nodes connected to the Director.

Every Envoy is matched to one shard descriptor in order to run. When the Director starts an experiment, the Envoy accepts the experiment workspace, prepares the environment, and starts a Collaborator.

The Envoy is also responsible for sending heartbeat messages to the Director. These messages may also include information regarding collaborator machine resource utilization. Refer to the device monitor plugin for details.

Static Diagram

_images/director_workflow.svg

Director Service Communication Diagrams

The following diagrams depict existing procedure calls to the Director service. Included are interactions with the Director’s inner representations to better understand their signatures.

Director-Envoy Communication

The following diagram depicts a typical process of establishing a Federation and registering an experiment.

sequenceDiagram
    participant D as Director
    participant E as Envoy
    rect rgb(0, 255, 0,.1)
        Note over D,E: A Federation startup process
        D->D: Starts
        E->E: Starts and loads local Shard Descriptor
        E-->>D: Connects using FQDN and certificate
        E-->>+D: Communicates dataset info
        D-->D: Ensures unified data interface
        D-->>-E: Approves
        D-->D: Keeps a list of connected Envoys
    end
    Note over D,E: We consider a Federation set up
    rect rgb(0, 0, 255,.05)
        Note over D,E: An Experiment's start
        D->D: Registers a model, FL tasks and an FL plan
        D->D: Starts an Aggregator with a toy plan
        Note left of D: An Envoy with dummy data is <br>started locally
        opt Test run failed
            D-->D: Notifies user that <br>the experiment is inconsistent
            D-->D: The experiment ends
        end
        D->D: Reviews the FL plan
        D->D: Starts an Aggregator
        D-->>E: Sends the experiment archive <br>and the FL plan to envoys
        E-->E: Reviews the FL Plan
        E-->E: Starts a Collaborator
        D-->D: Fills the model with the final weights <br>and returns to user
    end
    Note over D,E: The Experiment ended. <br> The Federation keeps existing.

Basic Scenario of Director-Envoy Communication

Plugin Components

Open Federated Learning (OpenFL) is designed to be a flexible and extensible framework. Plugins are interchangeable parts of OpenFL components. Different plugins support varying usage scenarios. A plugin may be required or optional.

You can provide your own implementations of OpenFL plugins to achieve a desired behavior. Technically, a plugin is just a class that implements some interface. You may enable a plugin by putting its import path and initialization parameters into the config file of the corresponding OpenFL component, or by passing them to the frontend Python API. See openfl-tutorials for more details.

Framework Adapter

The Framework Adapter plugin enables OpenFL to support deep learning frameworks in FL experiments. It is a required plugin for the frontend API component and the Envoy. All framework-specific operations on model weights are isolated in this plugin so that OpenFL can be framework-agnostic.

The Framework Adapter plugin interface has two required methods for extracting tensors from, and loading tensors into, a model and an optimizer:

  • get_tensor_dict

  • set_tensor_dict

The get_tensor_dict method accepts a model and, optionally, an optimizer. It should return a dictionary {tensor_name : ndarray} that maps tensor names to tensors in the NumPy representation.

@staticmethod
def get_tensor_dict(model, optimizer=None) -> dict:

The set_tensor_dict method accepts a tensor dictionary, a model, and optionally an optimizer. It loads weights from the tensor dictionary into the model in place. Tensor names in the dictionary must match the corresponding names set in get_tensor_dict.

@staticmethod
def set_tensor_dict(model, tensor_dict, optimizer=None, device='cpu') -> None:

If your new framework model cannot be directly serialized with pickle-type libraries, you can optionally implement the serialization_setup method to prepare the model object for serialization.

def serialization_setup():
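Putting the two required methods together, a minimal PyTorch-flavored adapter might look like the sketch below. The class name MyFrameworkAdapter is hypothetical, the optimizer state is ignored for brevity, and the adapters actually shipped with OpenFL are more involved.

import torch

class MyFrameworkAdapter:
    """Hypothetical framework adapter sketch for a PyTorch model."""

    @staticmethod
    def get_tensor_dict(model, optimizer=None) -> dict:
        # Export model weights as {tensor_name: ndarray} in NumPy representation
        return {name: tensor.detach().cpu().numpy()
                for name, tensor in model.state_dict().items()}

    @staticmethod
    def set_tensor_dict(model, tensor_dict, optimizer=None, device='cpu') -> None:
        # Load weights from the tensor dictionary back into the model in place
        state = {name: torch.as_tensor(array, device=device)
                 for name, array in tensor_dict.items()}
        model.load_state_dict(state)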

Experiment Serializer

The Serializer plugin is used on the frontend Python API to serialize the Experiment components and then on Envoys to deserialize them. Currently, the default serializer plugin is based on pickling. It is a required plugin.

The serializer plugin must implement the serialize method, which writes a Python object's representation to disk.

@staticmethod
def serialize(object_, filename: str) -> None:

The plugin must also implement the restore_object method, which loads a previously serialized object from disk.

@staticmethod
def restore_object(filename: str):
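For illustration, a minimal plugin satisfying both methods with the standard pickle module could look like the sketch below; the class name PickleSerializer is hypothetical.

import pickle

class PickleSerializer:
    """Hypothetical serializer plugin sketch based on the standard pickle module."""

    @staticmethod
    def serialize(object_, filename: str) -> None:
        # Write the Python object's representation to disk
        with open(filename, 'wb') as f:
            pickle.dump(object_, f)

    @staticmethod
    def restore_object(filename: str):
        # Load a previously serialized object back from disk
        with open(filename, 'rb') as f:
            return pickle.load(f)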

CUDA Device Monitor

The CUDA Device Monitor plugin is an optional plugin for Envoys that can gather status information about GPU devices. This information may be used by Envoys and included in a healthcheck message that is sent to the Director. Therefore, you can query this Envoy Registry information from the Director to determine the status of CUDA devices.

The CUDA Device Monitor plugin must implement the following interface:

class CUDADeviceMonitor:

   def get_driver_version(self) -> str:
      ...

   def get_device_memory_total(self, index: int) -> int:
      ...

   def get_device_memory_utilized(self, index: int) -> int:
      ...

   def get_device_utilization(self, index: int) -> str:
      """It is just a general method that returns a string that may be shown to the frontend user."""
      ...
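As an example, an implementation of this interface backed by the third-party pynvml package might look roughly like the sketch below. This is a sketch only, assuming pynvml is installed and an NVIDIA driver is available; it is not necessarily the plugin shipped with OpenFL.

import pynvml

class PynvmlCUDADeviceMonitor:
    """Sketch of a CUDA Device Monitor implemented with NVML via pynvml."""

    def __init__(self):
        pynvml.nvmlInit()

    def get_driver_version(self) -> str:
        version = pynvml.nvmlSystemGetDriverVersion()
        # Older pynvml releases return bytes rather than str
        return version.decode() if isinstance(version, bytes) else version

    def get_device_memory_total(self, index: int) -> int:
        handle = pynvml.nvmlDeviceGetHandleByIndex(index)
        return pynvml.nvmlDeviceGetMemoryInfo(handle).total

    def get_device_memory_utilized(self, index: int) -> int:
        handle = pynvml.nvmlDeviceGetHandleByIndex(index)
        return pynvml.nvmlDeviceGetMemoryInfo(handle).used

    def get_device_utilization(self, index: int) -> str:
        # A human-readable string that may be shown to the frontend user
        handle = pynvml.nvmlDeviceGetHandleByIndex(index)
        return f'{pynvml.nvmlDeviceGetUtilizationRates(handle).gpu}% GPU utilization'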

OpenFL Troubleshooting

The following is a list of commonly reported issues in Open Federated Learning (OpenFL). If you don't see your issue reported here, please submit a GitHub issue or contact us directly on Slack.

  1. I see the error Cannot import name TensorFlowDataLoader from openfl.federated

    OpenFL currently uses conditional imports to attempt to be framework agnostic. If your task runner is derived from KerasTaskRunner or TensorflowTaskRunner, this error can come up if TensorFlow* was not installed in your collaborator's virtual environment. If running a multi-node experiment, we recommend using the fx workspace export and fx workspace import commands, as this will ensure consistent modules between the aggregator and collaborators.

  2. None of the collaborators can connect to my aggregator node

    There are a few reasons this can happen, but the most common is that the aggregator node's FQDN (fully qualified domain name) was incorrectly specified in the plan. By default, fx plan initialize will attempt to resolve the FQDN for you (this should look something like hostname.domain.com), but it can sometimes produce an incorrect domain name.

    If you face this issue, look at agg_addr in plan/plan.yaml and verify that you can access this address externally (a quick connectivity check is sketched after this list). If the address is externally accessible and you are running OpenFL in an enterprise environment, verify that the aggregator's listening port is not blocked. In such cases, agg_port should be manually specified in the FL plan and then redistributed to all participants.

  3. After starting the collaborator, I see the error Handshake failed with fatal error SSL_ERROR_SSL

    This error likely results from a bad certificate presented by the collaborator. Steps for regenerating the collaborator certificate can be found here.

  4. I am seeing some other error while running the experiment. Is there more verbose logging available so I can investigate this on my own?

    Yes! You can turn on verbose logging with fx -l DEBUG collaborator start or fx -l DEBUG aggregator start. This will give verbose information related to gRPC, bidirectional tensor transfer, and compression.

  5. Silent failures resulting from Out of Memory errors

    Observations:
    • The fx envoy command terminates abruptly during the execution of the training or validation loop due to a SIGKILL signal issued by the kernel.

    • The OOM error is captured in the kernel trace but not at the user program level.

    • The failure is likely due to non-optimal memory resource utilization in prior PyTorch versions 1.3.1 and 1.4.0.

    Solution:
    • Recent versions of PyTorch handle memory utilization better at runtime. Upgrade PyTorch to version 1.11.0 or later.
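For issue 2 above, a quick way to sanity-check name resolution and reachability from a collaborator node is a few lines of standard-library Python, as sketched below. The agg_addr and agg_port values are placeholders; substitute the values from your plan/plan.yaml and run the check while the aggregator is listening.

import socket

# Placeholders: substitute the values from plan/plan.yaml
agg_addr = 'hostname.domain.com'
agg_port = 50051

# The FQDN this machine resolves for itself (compare with agg_addr on the aggregator node)
print('Resolved FQDN of this machine:', socket.getfqdn())

# Try to open a TCP connection to the aggregator's address and port
try:
    with socket.create_connection((agg_addr, agg_port), timeout=5):
        print(f'Reached {agg_addr}:{agg_port}')
except OSError as err:
    print(f'Could not reach {agg_addr}:{agg_port}: {err}')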

Notices and Disclaimers

© Intel Corporation. Intel, the Intel logo, and other Intel marks are trademarks of Intel Corporation or its subsidiaries. Other names and brands may be claimed as the property of others.

Intel technologies may require enabled hardware, software or service activation.

No product or component can be absolutely secure.

Your costs and results may vary.

No license (express or implied, by estoppel or otherwise) to any intellectual property rights is granted by this document.