Interactive API: MNIST Classification Tutorial

In this tutorial, we will set up a federation and train a basic TensorFlow model on the MNIST dataset using the interactive API. The full tutorial lives in the OpenFL repository under openfl-tutorials/interactive_api/Tensorflow_MNIST.

About the dataset

MNIST is a dataset of small square 28x28 pixel grayscale images of handwritten digits (0 through 9), split into 60,000 training images and 10,000 test images. More information is available on Wikipedia.

Note

This tutorial runs without TLS and is executed locally as a simulation.

Step 0: Installation

  • If you haven’t done so already, create a virtual environment, upgrade pip and install OpenFL (See Install the Package)

Step 1: Set up environment

Open three terminals (one for the director, one for the envoy, and one for the experiment) and activate the virtual environment created in Step 0 in each:

$ source venv/bin/activate

Clone the OpenFL repository:

$ git clone https://github.com/securefederatedai/openfl.git

Navigate to the tutorial:

$ cd openfl/openfl-tutorials/interactive_api/Tensorflow_MNIST

Step 2: Set up the Director

In the first terminal, run the director:

$ cd director
$ ./start_director.sh

Step 3: Set up the Envoy

In the second terminal, run the envoy:

$ cd envoy
$ ./start_envoy.sh env_one envoy_config_one.yaml

Optional: Run a second envoy in an additional terminal:

  • Ensure steps 0 and 1 are complete for this terminal as well.

  • Run the second envoy:

$ cd envoy
$ ./start_envoy.sh env_two envoy_config_two.yaml

Step 4: Run the federation

In the third terminal (or fourth, if you chose to run two envoys), run the Tensorflow_MNIST.ipynb Jupyter notebook:

$ cd workspace
$ jupyter lab Tensorflow_MNIST.ipynb

Notebook walkthrough:

The notebook's contents are reproduced in the walkthrough below.

Install additional dependencies if not already installed:

$ pip install tensorflow==2.8

Import:

import tensorflow as tf
print('TensorFlow', tf.__version__)

Connect to the Federation

Be sure to start Director and Envoy (Steps 2 and 3) before proceeding with this cell.

This cell connects this notebook to the Federation.

from openfl.interface.interactive_api.federation import Federation

# Please use the same identifier that was used in the signed certificate
client_id = 'api'
cert_dir = 'cert'
director_node_fqdn = 'localhost'
director_port = 50051

# Run with TLS disabled (trusted environment)

# Create a Federation
federation = Federation(
    client_id=client_id,
    director_node_fqdn=director_node_fqdn,
    director_port=director_port,
    tls=False
)

Query Datasets from Shard Registry

shard_registry = federation.get_shard_registry()
shard_registry
# First, request a dummy_shard_desc that holds information about the federated dataset
dummy_shard_desc = federation.get_dummy_shard_descriptor(size=10)
dummy_shard_dataset = dummy_shard_desc.get_dataset('train')
sample, target = dummy_shard_dataset[0]
f"Sample shape: {sample.shape}, target shape: {target.shape}"

Describe the FL experiment

from openfl.interface.interactive_api.experiment import TaskInterface
from openfl.interface.interactive_api.experiment import ModelInterface
from openfl.interface.interactive_api.experiment import FLExperiment

Register model

# Define model
model = tf.keras.Sequential([
    tf.keras.layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)),
    tf.keras.layers.MaxPooling2D((2, 2)),
    tf.keras.layers.BatchNormalization(),
    tf.keras.layers.Conv2D(64, (3, 3), activation='relu'),
    tf.keras.layers.MaxPooling2D((2, 2)),
    tf.keras.layers.BatchNormalization(),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(10, activation=None),
], name='simplecnn')
model.summary()

# Define optimizer
optimizer = tf.optimizers.Adam(learning_rate=1e-3)

# Loss and metrics. These will be used later.
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
train_acc_metric = tf.keras.metrics.SparseCategoricalAccuracy()
val_acc_metric = tf.keras.metrics.SparseCategoricalAccuracy()

# Create ModelInterface
framework_adapter = 'openfl.plugins.frameworks_adapters.keras_adapter.FrameworkAdapterPlugin'
MI = ModelInterface(model=model, optimizer=optimizer, framework_plugin=framework_adapter)

Register dataset

import numpy as np
from tensorflow.keras.utils import Sequence

from openfl.interface.interactive_api.experiment import DataInterface


class DataGenerator(Sequence):

    def __init__(self, shard_descriptor, batch_size):
        self.shard_descriptor = shard_descriptor
        self.batch_size = batch_size
        self.indices = np.arange(len(shard_descriptor))
        self.on_epoch_end()

    def __len__(self):
        return len(self.indices) // self.batch_size

    def __getitem__(self, index):
        # Select the (shuffled) sample indices that belong to this batch
        batch = self.indices[index * self.batch_size:(index + 1) * self.batch_size]

        X, y = self.shard_descriptor[batch]
        return X, y

    def on_epoch_end(self):
        np.random.shuffle(self.indices)


class MnistFedDataset(DataInterface):

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    @property
    def shard_descriptor(self):
        return self._shard_descriptor

    @shard_descriptor.setter
    def shard_descriptor(self, shard_descriptor):
        """
        Describe per-collaborator procedures or sharding.

        This method will be called during a collaborator initialization.
        Local shard_descriptor will be set by Envoy.
        """
        self._shard_descriptor = shard_descriptor

        self.train_set = shard_descriptor.get_dataset('train')
        self.valid_set = shard_descriptor.get_dataset('val')

    def __getitem__(self, index):
        return self.shard_descriptor[index]

    def __len__(self):
        return len(self.shard_descriptor)

    def get_train_loader(self):
        """
        Output of this method will be provided to tasks with optimizer in contract
        """
        if self.kwargs.get('train_bs'):
            batch_size = self.kwargs['train_bs']
        else:
            batch_size = 32
        return DataGenerator(self.train_set, batch_size=batch_size)

    def get_valid_loader(self):
        """
        Output of this method will be provided to tasks without optimizer in contract
        """
        if self.kwargs.get('valid_bs'):
            batch_size = self.kwargs['valid_bs']
        else:
            batch_size = 32

        return DataGenerator(self.valid_set, batch_size=batch_size)

    def get_train_data_size(self):
        """
        Information for aggregation
        """

        return len(self.train_set)

    def get_valid_data_size(self):
        """
        Information for aggregation
        """
        return len(self.valid_set)

Create the MNIST federated dataset

fed_dataset = MnistFedDataset(train_bs=64, valid_bs=512)

Define and register FL tasks

import time

TI = TaskInterface()

# Uncomment the following lines to use the Adaptive Federated Optimization
# algorithm (see https://arxiv.org/abs/2003.00295 for details):
# from openfl.interface.aggregation_functions import AdagradAdaptiveAggregation
# agg_fn = AdagradAdaptiveAggregation(model_interface=MI, learning_rate=0.4)
# @TI.set_aggregation_function(agg_fn)

@TI.register_fl_task(model='model', data_loader='train_dataset', device='device', optimizer='optimizer')
def train(model, train_dataset, optimizer, device, loss_fn=loss_fn, warmup=False):
    start_time = time.time()

    # Iterate over the batches of the dataset.
    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
        with tf.GradientTape() as tape:
            logits = model(x_batch_train, training=True)
            loss_value = loss_fn(y_batch_train, logits)
        grads = tape.gradient(loss_value, model.trainable_weights)
        optimizer.apply_gradients(zip(grads, model.trainable_weights))

        # Update training metric.
        train_acc_metric.update_state(y_batch_train, logits)

        # Log every 200 batches.
        if step % 200 == 0:
            print(
                "Training loss (for one batch) at step %d: %.4f"
                % (step, float(loss_value))
            )
            print("Seen so far: %d samples" % ((step + 1) * 64))
        if warmup:
            break

    # Display metrics at the end of each epoch.
    train_acc = train_acc_metric.result()
    print("Training acc over epoch: %.4f" % (float(train_acc),))

    # Reset training metrics at the end of each epoch
    train_acc_metric.reset_states()


    return {'train_acc': train_acc,}


@TI.register_fl_task(model='model', data_loader='val_dataset', device='device')
def validate(model, val_dataset, device):
    # Run a validation loop at the end of each epoch.
    for x_batch_val, y_batch_val in val_dataset:
        val_logits = model(x_batch_val, training=False)
        # Update val metrics
        val_acc_metric.update_state(y_batch_val, val_logits)
    val_acc = val_acc_metric.result()
    val_acc_metric.reset_states()
    print("Validation acc: %.4f" % (float(val_acc),))

    return {'validation_accuracy': val_acc,}

Time to start a federated learning experiment

# Create an experiment in the federation
experiment_name = 'mnist_experiment'
fl_experiment = FLExperiment(
    federation=federation,
    experiment_name=experiment_name,
    serializer_plugin='openfl.plugins.interface_serializer.keras_serializer.KerasSerializer'
)
# print the default federated learning plan
import openfl.native as fx
print(fx.get_plan(fl_plan=fl_experiment.plan))
# The following command zips the workspace and the Python requirements to be transferred to collaborator nodes
fl_experiment.start(model_provider=MI,
                    task_keeper=TI,
                    data_loader=fed_dataset,
                    rounds_to_train=5,
                    opt_treatment='CONTINUE_GLOBAL',
                    override_config={'aggregator.settings.db_store_rounds': 1,
                                     'compression_pipeline.template': 'openfl.pipelines.KCPipeline'})
fl_experiment.stream_metrics()