Commit 187f78aa authored by Michael Reneer, committed by tensorflow-copybara

Add a compiler function to the `ExecutionContext` object.

This change adds a compiler function to the `ExecutionContext` object. Conceptually a `Context` can be thought of as an "environment" which owns compilation and owns execution for a given computation. Additionally, this change replaces `set_default_executor` with higher level functions in order to simplify how contexts are constructed.

* Added compiler function to the `ExecutionContext`.
* Deprecated `set_default_executor`.
* Removed all usage of `set_default_executor` internally.
* Added convenience high level functions that set an execution context:
  * tff.backends.native.set_local_execution_context
* Updated `set_default_executor` call-sites to either use the convenience high level functions or to manually construct a context and use `set_default_context`.

Note that we should consider creating the following convenience high level functions:
  * tff.backends.native.set_remote_execution_context
  * tff.backends.native.set_sizing_execution_context
  * tff.backends.iree.set_iree_execution_context

PiperOrigin-RevId: 321263709
parent 997350cd
Showing
with 112 additions and 22 deletions
%% Cell type:markdown id: tags:
##### Copyright 2019 The TensorFlow Authors.
%% Cell type:code id: tags:
```
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
```
%% Cell type:markdown id: tags:
# Custom Federated Algorithms, Part 2: Implementing Federated Averaging
%% Cell type:markdown id: tags:
<table class="tfo-notebook-buttons" align="left">
<td>
<a target="_blank" href="https://www.tensorflow.org/federated/tutorials/custom_federated_algorithms_2"><img src="https://www.tensorflow.org/images/tf_logo_32px.png" />View on TensorFlow.org</a>
</td>
<td>
<a target="_blank" href="https://colab.research.google.com/github/tensorflow/federated/blob/v0.14.0/docs/tutorials/custom_federated_algorithms_2.ipynb"><img src="https://www.tensorflow.org/images/colab_logo_32px.png" />Run in Google Colab</a>
</td>
<td>
<a target="_blank" href="https://github.com/tensorflow/federated/blob/v0.14.0/docs/tutorials/custom_federated_algorithms_2.ipynb"><img src="https://www.tensorflow.org/images/GitHub-Mark-32px.png" />View source on GitHub</a>
</td>
</table>
%% Cell type:markdown id: tags:
This tutorial is the second part of a two-part series that demonstrates how to
implement custom types of federated algorithms in TFF using the
[Federated Core (FC)](../federated_core.md), which serves as a foundation for
the [Federated Learning (FL)](../federated_learning.md) layer (`tff.learning`).
We encourage you to first read the
[first part of this series](custom_federated_algorithms_1.ipynb), which
introduces some of the key concepts and programming abstractions used here.
This second part of the series uses the mechanisms introduced in the first part
to implement a simple version of federated training and evaluation algorithms.
We encourage you to review the
[image classification](federated_learning_for_image_classification.ipynb) and
[text generation](federated_learning_for_text_generation.ipynb) tutorials for a
higher-level and more gentle introduction to TFF's Federated Learning APIs, as
they will help you put the concepts we describe here in context.
%% Cell type:markdown id: tags:
## Before we start
Before we start, try to run the following "Hello World" example to make sure
your environment is set up correctly. If it doesn't work, please refer to the
[Installation](../install.md) guide for instructions.
%% Cell type:code id: tags:
```
#@test {"skip": true}
!pip install --quiet --upgrade tensorflow_federated
```
%% Cell type:code id: tags:
```
import collections
import numpy as np
import tensorflow as tf
import tensorflow_federated as tff
# TODO(b/148678573,b/148685415): must use the ReferenceExecutor because it
# supports unbounded references and tff.sequence_* intrinsics.
tff.framework.set_default_context(tff.test.ReferenceExecutor())
```
%% Cell type:code id: tags:
```
@tff.federated_computation
def hello_world():
  return 'Hello, World!'

hello_world()
```
%% Output
'Hello, World!'
%% Cell type:markdown id: tags:
## Implementing Federated Averaging
As in
[Federated Learning for Image Classification](federated_learning_for_image_classification.ipynb),
we are going to use the MNIST example, but since this is intended as a low-level
tutorial, we are going to bypass the Keras API and `tff.simulation`, write raw
model code, and construct a federated data set from scratch.
%% Cell type:markdown id: tags:
### Preparing federated data sets
For the sake of a demonstration, we're going to simulate a scenario in which we
have data from 10 users, and each of the users contributes knowledge of how to
recognize a different digit. This is about as
non-[i.i.d.](https://en.wikipedia.org/wiki/Independent_and_identically_distributed_random_variables)
as it gets.
First, let's load the standard MNIST data:
%% Cell type:code id: tags:
```
mnist_train, mnist_test = tf.keras.datasets.mnist.load_data()
```
%% Output
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
11501568/11490434 [==============================] - 0s 0us/step
%% Cell type:code id: tags:
```
[(x.dtype, x.shape) for x in mnist_train]
```
%% Output
[(dtype('uint8'), (60000, 28, 28)), (dtype('uint8'), (60000,))]
%% Cell type:markdown id: tags:
The data comes as NumPy arrays, one with images and another with digit labels, both
with the first dimension going over the individual examples. Let's write a
helper function that formats it in a way compatible with how we feed federated
sequences into TFF computations, i.e., as a list of lists - the outer list
ranging over the users (digits), the inner ones ranging over batches of data in
each client's sequence. As is customary, we will structure each batch as a pair
of tensors named `x` and `y`, each with the leading batch dimension. While at
it, we'll also flatten each image into a 784-element vector and rescale the
pixels in it into the `0..1` range, so that we don't have to clutter the model
logic with data conversions.
%% Cell type:code id: tags:
```
NUM_EXAMPLES_PER_USER = 1000
BATCH_SIZE = 100

def get_data_for_digit(source, digit):
  output_sequence = []
  all_samples = [i for i, d in enumerate(source[1]) if d == digit]
  for i in range(0, min(len(all_samples), NUM_EXAMPLES_PER_USER), BATCH_SIZE):
    batch_samples = all_samples[i:i + BATCH_SIZE]
    output_sequence.append({
        'x':
            np.array([source[0][i].flatten() / 255.0 for i in batch_samples],
                     dtype=np.float32),
        'y':
            np.array([source[1][i] for i in batch_samples], dtype=np.int32)
    })
  return output_sequence

federated_train_data = [get_data_for_digit(mnist_train, d) for d in range(10)]
federated_test_data = [get_data_for_digit(mnist_test, d) for d in range(10)]
```
%% Cell type:markdown id: tags:
As a quick sanity check, let's look at the `y` tensor in the last batch of data
contributed by the client at index `5` (the one corresponding to the digit `5`).
%% Cell type:code id: tags:
```
federated_train_data[5][-1]['y']
```
%% Output
array([5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5,
5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5,
5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5,
5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5,
5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5], dtype=int32)
%% Cell type:markdown id: tags:
Just to be sure, let's also look at the image corresponding to the last element of that batch.
%% Cell type:code id: tags:
```
from matplotlib import pyplot as plt
plt.imshow(federated_train_data[5][-1]['x'][-1].reshape(28, 28), cmap='gray')
plt.grid(False)
plt.show()
```
%% Output
%% Cell type:markdown id: tags:
### On combining TensorFlow and TFF
In this tutorial, for compactness we immediately decorate functions that
introduce TensorFlow logic with `tff.tf_computation`. However, for more complex
logic, this is not the pattern we recommend. Debugging TensorFlow can already be
a challenge, and debugging TensorFlow after it has been fully serialized and
then re-imported necessarily loses some metadata and limits interactivity,
making debugging even more of a challenge.
Therefore, **we strongly recommend writing complex TF logic as stand-alone
Python functions** (that is, without `tff.tf_computation` decoration). This way
the TensorFlow logic can be developed and tested using TF best practices and
tools (like eager mode), before serializing the computation for TFF (e.g., by invoking `tff.tf_computation` with a Python function as the argument).
%% Cell type:markdown id: tags:
### Defining a loss function
Now that we have the data, let's define a loss function that we can use for
training. First, let's define the type of input as a TFF named tuple. Since the
size of data batches may vary, we set the batch dimension to `None` to indicate
that the size of this dimension is unknown.
%% Cell type:code id: tags:
```
BATCH_SPEC = collections.OrderedDict(
    x=tf.TensorSpec(shape=[None, 784], dtype=tf.float32),
    y=tf.TensorSpec(shape=[None], dtype=tf.int32))
BATCH_TYPE = tff.to_type(BATCH_SPEC)

str(BATCH_TYPE)
```
%% Output
'<x=float32[?,784],y=int32[?]>'
%% Cell type:markdown id: tags:
You may be wondering why we can't just define an ordinary Python type. Recall
the discussion in [part 1](custom_federated_algorithms_1.ipynb), where we
explained that while we can express the logic of TFF computations using Python,
under the hood TFF computations *are not* Python. The symbol `BATCH_TYPE`
defined above represents an abstract TFF type specification. It is important to
distinguish this *abstract* TFF type from concrete Python *representation*
types, e.g., containers such as `dict` or `collections.namedtuple` that may be
used to represent the TFF type in the body of a Python function. Unlike Python,
TFF has a single abstract type constructor `tff.NamedTupleType` for tuple-like
containers, with elements that can be individually named or left unnamed. This
type is also used to model formal parameters of computations, as TFF
computations can formally only declare one parameter and one result - you will
see examples of this shortly.
Let's now define the TFF type of model parameters, again as a TFF named tuple of
*weights* and *bias*.
%% Cell type:code id: tags:
```
MODEL_SPEC = collections.OrderedDict(
    weights=tf.TensorSpec(shape=[784, 10], dtype=tf.float32),
    bias=tf.TensorSpec(shape=[10], dtype=tf.float32))
MODEL_TYPE = tff.to_type(MODEL_SPEC)

print(MODEL_TYPE)
```
%% Output
<weights=float32[784,10],bias=float32[10]>
%% Cell type:markdown id: tags:
With those definitions in place, we can now define the loss for a given model over a single batch. Note the use of the `@tf.function` decorator inside the `@tff.tf_computation` decorator. This allows us to write TF using Python-like semantics even though we're inside a `tf.Graph` context created by the `tff.tf_computation` decorator.
%% Cell type:code id: tags:
```
# NOTE: `forward_pass` is defined separately from `batch_loss` so that it can
# be later called from within another tf.function. Necessary because a
# @tf.function decorated method cannot invoke a @tff.tf_computation.
@tf.function
def forward_pass(model, batch):
  predicted_y = tf.nn.softmax(
      tf.matmul(batch['x'], model['weights']) + model['bias'])
  return -tf.reduce_mean(
      tf.reduce_sum(
          tf.one_hot(batch['y'], 10) * tf.math.log(predicted_y), axis=[1]))

@tff.tf_computation(MODEL_TYPE, BATCH_TYPE)
def batch_loss(model, batch):
  return forward_pass(model, batch)
```
%% Cell type:markdown id: tags:
As expected, computation `batch_loss` returns `float32` loss given the model and
a single data batch. Note how the `MODEL_TYPE` and `BATCH_TYPE` have been lumped
together into a 2-tuple of formal parameters; you can recognize the type of
`batch_loss` as `(<MODEL_TYPE,BATCH_TYPE> -> float32)`.
%% Cell type:code id: tags:
```
str(batch_loss.type_signature)
```
%% Output
'(<<weights=float32[784,10],bias=float32[10]>,<x=float32[?,784],y=int32[?]>> -> float32)'
%% Cell type:markdown id: tags:
As a sanity check, let's construct an initial model filled with zeros and
compute the loss over the batch of data we visualized above.
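
The value to expect from this check can be worked out by hand. With all-zero weights and bias, every logit is zero, so the softmax assigns probability 1/10 to each of the 10 classes and the cross-entropy of every example is `-log(1/10)`, regardless of its label. The sketch below reproduces that arithmetic in plain Python; it is only an illustration of the reasoning and does not use TFF or TensorFlow (the name `true_label` is ours).

```python
import math

NUM_CLASSES = 10

# An all-zero model produces a zero logit for every class.
logits = [0.0] * NUM_CLASSES

# Softmax over equal logits is the uniform distribution.
exps = [math.exp(z) for z in logits]
probs = [e / sum(exps) for e in exps]

# Cross-entropy for one example; the label does not matter here because
# every class has the same probability.
true_label = 5
loss = -math.log(probs[true_label])

print(round(loss, 6))  # prints 2.302585
```

Since each example contributes the same value, the mean over a batch is also `log(10)`, roughly 2.3026.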
%% Cell type:code id: tags:
```
initial_model = collections.OrderedDict(
    weights=np.zeros([784, 10], dtype=np.float32),
    bias=np.zeros([10], dtype=np.float32))

sample_batch = federated_train_data[5][-1]

batch_loss(initial_model, sample_batch)
```
%% Output
2.3025854
%% Cell type:markdown id: tags:
Note that we feed the TFF computation with the initial model defined as a
`dict` (here, a `collections.OrderedDict`), and that the body of the Python
function reads the model parameters as `model['weights']` and `model['bias']`.
Even so, the arguments of the call to `batch_loss` aren't simply passed to the
body of that function.
What happens when we invoke `batch_loss`?
The Python body of `batch_loss` has already been traced and serialized in the above cell where it was defined. TFF acts as the caller to `batch_loss`
at the computation definition time, and as the target of invocation at the time
`batch_loss` is invoked. In both roles, TFF serves as the bridge between TFF's
abstract type system and Python representation types. At the invocation time,
TFF will accept most standard Python container types (`dict`, `list`, `tuple`,
`collections.namedtuple`, etc.) as concrete representations of abstract TFF
tuples. Also, although TFF computations formally only accept a single
parameter, as noted above, you can use the familiar Python call syntax with
positional and/or keyword arguments in cases where the type of the parameter is
a tuple - it works as expected.
%% Cell type:markdown id: tags:
### Gradient descent on a single batch
Now, let's define a computation that uses this loss function to perform a single
step of gradient descent. Note how in defining this function, we reuse
`forward_pass`, the same loss logic that `batch_loss` wraps. You can invoke a
computation constructed with `tff.tf_computation` inside the body of another
computation, though typically this is not necessary - as noted above, because
serialization loses some debugging information, it is often preferable for more
complex computations to write and test all the TensorFlow without the
`tff.tf_computation` decorator.
%% Cell type:code id: tags:
```
@tff.tf_computation(MODEL_TYPE, BATCH_TYPE, tf.float32)
def batch_train(initial_model, batch, learning_rate):
  # Define a group of model variables and set them to `initial_model`. Must
  # be defined outside the @tf.function.
  model_vars = collections.OrderedDict([
      (name, tf.Variable(name=name, initial_value=value))
      for name, value in initial_model.items()
  ])
  optimizer = tf.keras.optimizers.SGD(learning_rate)

  @tf.function
  def _train_on_batch(model_vars, batch):
    # Perform one step of gradient descent using loss from `forward_pass`.
    with tf.GradientTape() as tape:
      loss = forward_pass(model_vars, batch)
    grads = tape.gradient(loss, model_vars)
    optimizer.apply_gradients(
        zip(tf.nest.flatten(grads), tf.nest.flatten(model_vars)))
    return model_vars

  return _train_on_batch(model_vars, batch)
```
%% Cell type:code id: tags:
```
str(batch_train.type_signature)
```
%% Output
'(<<weights=float32[784,10],bias=float32[10]>,<x=float32[?,784],y=int32[?]>,float32> -> <weights=float32[784,10],bias=float32[10]>)'
%% Cell type:markdown id: tags:
When you invoke a Python function decorated with `tff.tf_computation` within the
body of another such function, the logic of the inner TFF computation is
embedded (essentially, inlined) in the logic of the outer one. As noted above,
if you are writing both computations, it is likely preferable to make the inner
function a regular Python function or a `tf.function` (as we did with
`forward_pass`) rather than a `tff.tf_computation`. However, calling one
`tff.tf_computation` inside another works as expected. This may be
necessary if, for example, you do not have the Python code defining
`batch_loss`, but only its serialized TFF representation.
Now, let's apply this function a few times to the initial model to see whether
the loss decreases.
%% Cell type:code id: tags:
```
model = initial_model
losses = []
for _ in range(5):
  model = batch_train(model, sample_batch, 0.1)
  losses.append(batch_loss(model, sample_batch))
```
%% Cell type:code id: tags:
```
losses
```
%% Output
[0.19690022, 0.13176313, 0.10113226, 0.082738124, 0.0703014]
%% Cell type:markdown id: tags:
### Gradient descent on a sequence of local data
Now, since `batch_train` appears to work, let's write a similar training
function `local_train` that consumes the entire sequence of all batches from one
user instead of just a single batch. The new computation will need to now
consume `tff.SequenceType(BATCH_TYPE)` instead of `BATCH_TYPE`.
%% Cell type:code id: tags:
```
LOCAL_DATA_TYPE = tff.SequenceType(BATCH_TYPE)
@tff.federated_computation(MODEL_TYPE, tf.float32, LOCAL_DATA_TYPE)
def local_train(initial_model, learning_rate, all_batches):

  # Mapping function to apply to each batch.
  @tff.federated_computation(MODEL_TYPE, BATCH_TYPE)
  def batch_fn(model, batch):
    return batch_train(model, batch, learning_rate)

  return tff.sequence_reduce(all_batches, initial_model, batch_fn)
```
%% Cell type:code id: tags:
```
str(local_train.type_signature)
```
%% Output
'(<<weights=float32[784,10],bias=float32[10]>,float32,<x=float32[?,784],y=int32[?]>*> -> <weights=float32[784,10],bias=float32[10]>)'
%% Cell type:markdown id: tags:
There are quite a few details buried in this short section of code, so let's go
over them one by one.
First, while we could have implemented this logic entirely in TensorFlow,
relying on `tf.data.Dataset.reduce` to process the sequence similarly to how
we've done it earlier, we've opted this time to express the logic in the glue
language, as a `tff.federated_computation`. We've used the federated operator
`tff.sequence_reduce` to perform the reduction.
The operator `tff.sequence_reduce` is used similarly to
`tf.data.Dataset.reduce`. You can think of it as essentially the same as
`tf.data.Dataset.reduce`, but for use inside federated computations, which as
you may remember, cannot contain TensorFlow code. It is a template operator with
a formal parameter 3-tuple that consists of a *sequence* of `T`-typed elements,
the initial state of the reduction (we'll refer to it abstractly as *zero*) of
some type `U`, and the *reduction operator* of type `(<U,T> -> U)` that alters the
state of the reduction by processing a single element. The result is the final
state of the reduction, after processing all elements in a sequential order. In
our example, the state of the reduction is the model trained on a prefix of the
data, and the elements are data batches.
Second, note that we have again used one computation (`batch_train`) as a
component within another (`local_train`), but not directly. We can't use it as a
reduction operator because it takes an additional parameter - the learning rate.
To resolve this, we define an embedded federated computation `batch_fn` that
binds to the `local_train`'s parameter `learning_rate` in its body. It is
allowed for a child computation defined this way to capture a formal parameter
of its parent as long as the child computation is not invoked outside the body
of its parent. You can think of this pattern as an equivalent of
`functools.partial` in Python.
The practical implication of capturing `learning_rate` this way is, of course,
that the same learning rate value is used across all batches.
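
The reduction pattern described above can be sketched in plain Python, with `functools.reduce` standing in for `tff.sequence_reduce` and `functools.partial` standing in for the embedded `batch_fn`. This is only an analogy under simplifying assumptions: the "model" is a single float, each batch is a list of floats, and the toy `batch_train` below is a hypothetical stand-in, not the TensorFlow computation defined earlier.

```python
import functools


def batch_train(model, batch, learning_rate):
  # Toy stand-in: one gradient step that pulls a scalar model toward the
  # values in the batch (gradient of 0.5 * (model - x)**2, averaged).
  grad = sum(model - x for x in batch) / len(batch)
  return model - learning_rate * grad


def local_train(initial_model, learning_rate, all_batches):
  # Bind the extra `learning_rate` parameter so that the result has the
  # reduction-operator shape (state, element) -> state.
  batch_fn = functools.partial(batch_train, learning_rate=learning_rate)
  # Fold over the batches in order, starting from the initial state ("zero").
  return functools.reduce(batch_fn, all_batches, initial_model)


trained = local_train(0.0, 0.5, [[2.0, 2.0], [2.0]])
print(trained)  # prints 1.5
```

As in the TFF version, the state threaded through the reduction is the model trained on a prefix of the data, and the bound learning rate is the same for every batch.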
Now, let's try the newly defined local training function on the entire sequence
of data from the same user who contributed the sample batch (digit `5`).
%% Cell type:code id: tags:
```
locally_trained_model = local_train(initial_model, 0.1, federated_train_data[5])
```
%% Cell type:markdown id: tags:
Did it work? To answer this question, we need to implement evaluation.
%% Cell type:markdown id: tags:
### Local evaluation
Here's one way to implement local evaluation by adding up the losses across all data
batches (we could have just as well computed the average; we'll leave it as an
exercise for the reader).
%% Cell type:code id: tags:
```
@tff.federated_computation(MODEL_TYPE, LOCAL_DATA_TYPE)
def local_eval(model, all_batches):
  # TODO(b/120157713): Replace with `tff.sequence_average()` once implemented.
  return tff.sequence_sum(
      tff.sequence_map(
          tff.federated_computation(lambda b: batch_loss(model, b), BATCH_TYPE),
          all_batches))
```
%% Cell type:code id: tags:
```
str(local_eval.type_signature)
```
%% Output
'(<<weights=float32[784,10],bias=float32[10]>,<x=float32[?,784],y=int32[?]>*> -> float32)'
%% Cell type:markdown id: tags:
Again, there are a few new elements illustrated by this code, so let's go over
them one by one.
First, we have used two new federated operators for processing sequences:
`tff.sequence_map` that takes a *mapping function* `T->U` and a *sequence* of
`T`, and emits a sequence of `U` obtained by applying the mapping function
pointwise, and `tff.sequence_sum` that just adds all the elements. Here, we map
each data batch to a loss value, and then add the resulting loss values to
compute the total loss.
Note that we could have again used `tff.sequence_reduce`, but this wouldn't be
the best choice - the reduction process is, by definition, sequential, whereas
the mapping and sum can be computed in parallel. When given a choice, it's best
to stick with operators that don't constrain implementation choices, so that
when our TFF computation is compiled in the future to be deployed to a specific
environment, one can take full advantage of all potential opportunities for a
faster, more scalable, more resource-efficient execution.
Second, note that just as in `local_train`, the component function we need
(`batch_loss`) takes more parameters than what the federated operator
(`tff.sequence_map`) expects, so we again define a partial, this time inline by
directly wrapping a `lambda` as a `tff.federated_computation`. Using wrappers
inline with a function as an argument is the recommended way to use
`tff.tf_computation` to embed TensorFlow logic in TFF.
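
To make the comparison between map-then-sum and a sequential reduction concrete, here is a plain-Python analogy. The built-ins `map` and `sum` stand in for `tff.sequence_map` and `tff.sequence_sum`, and `functools.reduce` stands in for `tff.sequence_reduce`. The scalar model and the toy `batch_loss` are hypothetical stand-ins chosen so that the example runs without TFF.

```python
import functools


def batch_loss(model, batch):
  # Toy stand-in: mean squared error of a scalar model on one batch.
  return sum((model - x) ** 2 for x in batch) / len(batch)


def local_eval(model, all_batches):
  # Map each batch to its loss, then add the losses up. The lambda captures
  # `model`, which plays the role of the inline partial in the TFF code.
  return sum(map(lambda b: batch_loss(model, b), all_batches))


def local_eval_with_reduce(model, all_batches):
  # Same total, but expressed as a strictly sequential fold.
  return functools.reduce(
      lambda total, b: total + batch_loss(model, b), all_batches, 0.0)


batches = [[1.0, 3.0], [2.0]]
print(local_eval(2.0, batches))              # prints 1.0
print(local_eval_with_reduce(2.0, batches))  # prints 1.0
```

Both forms give the same answer, but only the first leaves the per-batch losses independent of one another, which is what would allow a runtime to compute them in parallel.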
Now, let's see whether our training worked.
%% Cell type:code id: tags:
```
print('initial_model loss =', local_eval(initial_model,
                                         federated_train_data[5]))
print('locally_trained_model loss =',
      local_eval(locally_trained_model, federated_train_data[5]))
```
%% Output
initial_model loss = 23.025854
locally_trained_model loss = 0.4348469
%% Cell type:markdown id: tags:
Indeed, the loss decreased. But what happens if we evaluate it on another
user's data?
%% Cell type:code id: tags:
```
print('initial_model loss =', local_eval(initial_model,
                                         federated_train_data[0]))
print('locally_trained_model loss =',
      local_eval(locally_trained_model, federated_train_data[0]))
```
%% Output
initial_model loss = 23.025854
locally_trained_model loss = 74.50075
%% Cell type:markdown id: tags:
As expected, things got worse. The model was trained to recognize `5`, and has
never seen a `0`. This raises the question - how did the local training impact
the quality of the model from the global perspective?
%% Cell type:markdown id: tags:
### Federated evaluation
This is the point in our journey where we finally circle back to federated types
and federated computations - the topic that we started with. Here's a pair of
TFF type definitions for the model that originates at the server, and the data
that remains on the clients.
%% Cell type:code id: tags:
```
SERVER_MODEL_TYPE = tff.FederatedType(MODEL_TYPE, tff.SERVER)
CLIENT_DATA_TYPE = tff.FederatedType(LOCAL_DATA_TYPE, tff.CLIENTS)
```
%% Cell type:markdown id: tags:
With all the definitions introduced so far, expressing federated evaluation in
TFF is a one-liner - we distribute the model to clients, let each client invoke
local evaluation on its local portion of data, and then average out the loss.
Here's one way to write this.
%% Cell type:code id: tags:
```
@tff.federated_computation(SERVER_MODEL_TYPE, CLIENT_DATA_TYPE)
def federated_eval(model, data):
  return tff.federated_mean(
      tff.federated_map(local_eval, [tff.federated_broadcast(model), data]))
```
%% Cell type:markdown id: tags:
We've already seen examples of `tff.federated_mean` and `tff.federated_map`
in simpler scenarios, and at the intuitive level, they work as expected, but
there's more in this section of code than meets the eye, so let's go over it
carefully.
First, let's break down the *let each client invoke local evaluation on its
local portion of data* part. As you may recall from the preceding sections,
`local_eval` has a type signature of the form `(<MODEL_TYPE, LOCAL_DATA_TYPE> ->
float32)`.
The federated operator `tff.federated_map` is a template that accepts as a
parameter a 2-tuple that consists of the *mapping function* of some type `T->U`
and a federated value of type `{T}@CLIENTS` (i.e., with member constituents of
the same type as the parameter of the mapping function), and returns a result of
type `{U}@CLIENTS`.
Since we're feeding `local_eval` as a mapping function to apply on a per-client
basis, the second argument should be of a federated type `{<MODEL_TYPE,
LOCAL_DATA_TYPE>}@CLIENTS`, i.e., in the nomenclature of the preceding sections,
it should be a federated tuple. Each client should hold a full set of arguments
for `local_eval` as a member constituent. Instead, we're feeding it a 2-element
Python `list`. What's happening here?
Indeed, this is an example of an *implicit type cast* in TFF, similar to
implicit type casts you may have encountered elsewhere, e.g., when you feed an
`int` to a function that accepts a `float`. Implicit casting is used sparingly
at this point, but we plan to make it more pervasive in TFF as a way to minimize
boilerplate.
The implicit cast that's applied in this case is the equivalence between
federated tuples of the form `{<X,Y>}@Z`, and tuples of federated values
`<{X}@Z,{Y}@Z>`. While formally, these two are different type signatures,
looking at it from the programmer's perspective, each device in `Z` holds two
units of data `X` and `Y`. What happens here is not unlike `zip` in Python, and
indeed, we offer an operator `tff.federated_zip` that allows you to perform such
conversions explicitly. When `tff.federated_map` encounters a tuple as a
second argument, it simply invokes `tff.federated_zip` for you.
Given the above, you should now be able to recognize the expression
`tff.federated_broadcast(model)` as representing a value of TFF type
`{MODEL_TYPE}@CLIENTS`, and `data` as a value of TFF type
`{LOCAL_DATA_TYPE}@CLIENTS` (or simply `CLIENT_DATA_TYPE`), the two getting
combined through an implicit `tff.federated_zip` to form the second
argument to `tff.federated_map`.
The operator `tff.federated_broadcast`, as you'd expect, simply transfers data
from the server to the clients.
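
The data flow described above (broadcast, zip, map, mean) can be mimicked in plain Python by representing a client-placed value as a list with one entry per client. The helper names `broadcast`, `zip_clients`, `map_clients` and `mean_clients` are hypothetical stand-ins invented for this sketch, not TFF APIs, and the toy `local_eval` replaces the real loss computation.

```python
def broadcast(server_value, num_clients):
  # One server-placed value becomes one copy per client.
  return [server_value for _ in range(num_clients)]


def zip_clients(tuple_of_client_values):
  # Turns <{X}@Z, {Y}@Z> (a tuple of per-client lists) into {<X,Y>}@Z
  # (a per-client list of tuples).
  return list(zip(*tuple_of_client_values))


def map_clients(fn, client_values):
  # Applies `fn` independently to each client's tuple of arguments.
  return [fn(*args) for args in client_values]


def mean_clients(client_values):
  # Unweighted average across clients; the result lives "at the server".
  return sum(client_values) / len(client_values)


def local_eval(model, data):
  # Toy stand-in for the per-client loss: total absolute error.
  return sum(abs(model - x) for x in data)


def federated_eval(model, data):
  zipped = zip_clients([broadcast(model, len(data)), data])
  return mean_clients(map_clients(local_eval, zipped))


client_data = [[1.0, 2.0], [3.0]]
result = federated_eval(1.0, client_data)
print(result)  # prints 1.5
```

In the TFF version the zip step is implicit: passing the 2-element list to `tff.federated_map` triggers it automatically.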
Now, let's see how our local training affected the average loss in the system.
%% Cell type:code id: tags:
```
print('initial_model loss =', federated_eval(initial_model,
                                             federated_train_data))
print('locally_trained_model loss =',
      federated_eval(locally_trained_model, federated_train_data))
```
%% Output
initial_model loss = 23.025852
locally_trained_model loss = 54.432625
%% Cell type:markdown id: tags:
Indeed, as expected, the loss has increased. In order to improve the model for
all users, we'll need to train it on everyone's data.
%% Cell type:markdown id: tags:
### Federated training
The simplest way to implement federated training is to locally train, and then
average the models. This uses the same building blocks and patterns we've already
discussed, as you can see below.
%% Cell type:code id: tags:
```
SERVER_FLOAT_TYPE = tff.FederatedType(tf.float32, tff.SERVER)

@tff.federated_computation(SERVER_MODEL_TYPE, SERVER_FLOAT_TYPE,
                           CLIENT_DATA_TYPE)
def federated_train(model, learning_rate, data):
  return tff.federated_mean(
      tff.federated_map(local_train, [
          tff.federated_broadcast(model),
          tff.federated_broadcast(learning_rate), data
      ]))
```
%% Cell type:markdown id: tags:
Note that in the full-featured implementation of Federated Averaging provided by
`tff.learning`, rather than averaging the models, we prefer to average model
deltas, for a number of reasons, e.g., the ability to clip the update norms,
for compression, etc.
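
To illustrate the difference between averaging models and averaging model deltas, here is a small plain-Python sketch. Models are flat lists of floats, the averaging is unweighted, and the helper names (`average`, `clip_by_norm`) are assumptions made for this example. It is not how `tff.learning` is implemented; it only shows that the two approaches agree when nothing is done to the deltas, and where a step such as norm clipping could be applied.

```python
import math


def average(vectors):
  # Element-wise unweighted mean of equally sized vectors.
  return [sum(column) / len(vectors) for column in zip(*vectors)]


def clip_by_norm(vector, max_norm):
  # Scales the vector down so that its L2 norm is at most `max_norm`.
  norm = math.sqrt(sum(v * v for v in vector))
  if norm <= max_norm:
    return list(vector)
  return [v * (max_norm / norm) for v in vector]


server_model = [1.0, 2.0]
client_models = [[2.0, 2.0], [4.0, 6.0]]

# Option 1: average the locally trained models directly.
averaged_models = average(client_models)

# Option 2: average the deltas, then apply the mean delta at the server.
deltas = [[c - s for c, s in zip(m, server_model)] for m in client_models]
mean_delta = average(deltas)
via_deltas = [s + d for s, d in zip(server_model, mean_delta)]

print(averaged_models)  # prints [3.0, 4.0]
print(via_deltas)       # prints [3.0, 4.0]

# With deltas available, each update can be bounded before averaging.
clipped = [clip_by_norm(d, 2.5) for d in deltas]
print(clipped)  # prints [[1.0, 0.0], [1.5, 2.0]]
```

Working with deltas makes per-client processing such as this clipping step straightforward, because each delta measures only how far a client moved away from the shared starting point.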
Let's see whether the training works by running a few rounds of training and
comparing the average loss before and after.
%% Cell type:code id: tags:
```
model = initial_model
learning_rate = 0.1
for round_num in range(5):
  model = federated_train(model, learning_rate, federated_train_data)
  learning_rate = learning_rate * 0.9
  loss = federated_eval(model, federated_train_data)
  print('round {}, loss={}'.format(round_num, loss))
```
%% Output
round 0, loss=21.60552406311035
round 1, loss=20.365678787231445
round 2, loss=19.27480125427246
round 3, loss=18.31110954284668
round 4, loss=17.45725440979004
%% Cell type:markdown id: tags:
For completeness, let's now also run on the test data to confirm that our model
generalizes well.
%% Cell type:code id: tags:
```
print('initial_model test loss =',
      federated_eval(initial_model, federated_test_data))
print('trained_model test loss =', federated_eval(model, federated_test_data))
```
%% Output
initial_model test loss = 22.795593
trained_model test loss = 17.278767
%% Cell type:markdown id: tags:
This concludes our tutorial.
Of course, our simplified example doesn't reflect a number of things you'd need
to do in a more realistic scenario - for example, we haven't computed metrics
other than loss. We encourage you to study
[the implementation](https://github.com/tensorflow/federated/blob/master/tensorflow_federated/python/learning/federated_averaging.py)
of federated averaging in `tff.learning` as a more complete example, and as a
way to demonstrate some of the coding practices we'd like to encourage.
......
%% Cell type:markdown id: tags:
# High-performance Simulation with Kubernetes
This tutorial will describe how to set up high-performance simulation using a
TFF runtime running on Kubernetes. The model is the same as in the previous
tutorial, **High-performance simulations with TFF**. The only difference is that
here we use a worker pool instead of a local executor.
This tutorial refers to Google Cloud's [GKE](https://cloud.google.com/kubernetes-engine/) to create the Kubernetes cluster,
but all the steps after the cluster is created can be used with any Kubernetes
installation.
%% Cell type:markdown id: tags:
## Launch the TFF Workers on GKE
> **Note:** This tutorial assumes the user has an existing GCP project.
### Create a Kubernetes Cluster
The following step only needs to be done once. The cluster can be re-used for future workloads.
Follow the GKE instructions to [create a container cluster](https://cloud.google.com/kubernetes-engine/docs/tutorials/hello-app#step_4_create_a_container_cluster). The rest of this tutorial assumes that the cluster is named `tff-cluster`, but the actual name isn't important.
Stop following the instructions when you get to "*Step 5: Deploy your application*".
### Deploy the TFF Worker Application
The commands to interact with GCP can be run [locally](https://cloud.google.com/kubernetes-engine/docs/tutorials/hello-app#option_b_use_command-line_tools_locally) or in the [Google Cloud Shell](https://cloud.google.com/shell/). We recommend the Google Cloud Shell since it doesn't require additional setup.
1. Run the following command to launch the Kubernetes application.
```
$ kubectl create deployment tff-workers --image=gcr.io/tensorflow-federated/remote-executor-service:{{version}}
```
Replace `{{version}}` with a release of TFF, e.g. `0.11.0` or `latest`.
2. Add a load balancer for the application.
```
$ kubectl expose deployment tff-workers --type=LoadBalancer --port 80 --target-port 8000
```
> **Note:** This exposes your deployment to the internet and is for demo
purposes only. For production use, a firewall and authentication are strongly
recommended.
%% Cell type:markdown id: tags:
Look up the IP address of the load balancer in the Google Cloud Console. You'll need it later to connect the training loop to the worker app.
%% Cell type:markdown id: tags:
### (Alternatively) Launch the Docker Container Locally
```
$ docker run --rm -p 8000:8000 gcr.io/tensorflow-federated/remote-executor-service:{{version}}
```
%% Cell type:markdown id: tags:
## Set Up TFF Environment
%% Cell type:code id: tags:
```
#@test {"skip": true}
!pip install --upgrade tensorflow_federated
```
%% Output
%% Cell type:markdown id: tags:
## Define the Model to Train
%% Cell type:code id: tags:
```
import collections
import time

import tensorflow as tf
import tensorflow_federated as tff

source, _ = tff.simulation.datasets.emnist.load_data()


def map_fn(example):
  return collections.OrderedDict(
      x=tf.reshape(example['pixels'], [-1, 784]), y=example['label'])


def client_data(n):
  ds = source.create_tf_dataset_for_client(source.client_ids[n])
  return ds.repeat(10).batch(20).map(map_fn)


train_data = [client_data(n) for n in range(10)]
input_spec = train_data[0].element_spec


def model_fn():
  model = tf.keras.models.Sequential([
      tf.keras.layers.Input(shape=(784,)),
      tf.keras.layers.Dense(units=10, kernel_initializer='zeros'),
      tf.keras.layers.Softmax(),
  ])
  return tff.learning.from_keras_model(
      model,
      input_spec=input_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])


trainer = tff.learning.build_federated_averaging_process(
    model_fn, client_optimizer_fn=lambda: tf.keras.optimizers.SGD(0.02))


def evaluate(num_rounds=10):
  state = trainer.initialize()
  for round in range(num_rounds):
    t1 = time.time()
    state, metrics = trainer.next(state, train_data)
    t2 = time.time()
    print('Round {}: loss {}, round time {}'.format(round, metrics.loss, t2 - t1))
```
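The `evaluate` loop above times each round with `time.time()`. As a minimal sketch of the same pattern, the snippet below uses `time.perf_counter()`, a monotonic clock that is generally better suited to measuring short intervals. `fake_round` and `timed_rounds` are hypothetical names used only for illustration; `fake_round` stands in for `trainer.next` so the sketch runs without TFF.

```python
import time


def fake_round(state):
  """Hypothetical stand-in for `trainer.next`; returns (new_state, loss)."""
  new_state = state + 1
  return new_state, 1.0 / new_state


def timed_rounds(step_fn, initial_state, num_rounds=3):
  """Runs `step_fn` repeatedly and records (round, loss, seconds) tuples."""
  state = initial_state
  records = []
  for round_num in range(num_rounds):
    start = time.perf_counter()
    state, loss = step_fn(state)
    elapsed = time.perf_counter() - start
    records.append((round_num, loss, elapsed))
  return state, records


final_state, records = timed_rounds(fake_round, initial_state=0)
for round_num, loss, elapsed in records:
  print('Round {}: loss {:.3f}, round time {:.6f}s'.format(
      round_num, loss, elapsed))
```

Collecting the timings in a list, rather than only printing them, makes it easier to compare a local run against a run on the worker pool afterwards.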
%% Cell type:markdown id: tags:
## Set Up the Remote Executors
By default, TFF executes all computations locally. In this step we tell TFF to connect to the Kubernetes services we set up above. Be sure to copy the IP address of your service here.
%% Cell type:code id: tags:
```
import grpc

ip_address = '0.0.0.0' #@param {type:"string"}
port = 80 #@param {type:"integer"}

client_ex = []
for i in range(10):
  channel = grpc.insecure_channel('{}:{}'.format(ip_address, port))
  client_ex.append(tff.framework.RemoteExecutor(channel, rpc_mode='STREAMING'))

-tff.framework.set_default_executor(tff.framework.worker_pool_executor_factory(client_ex))
+factory = tff.framework.worker_pool_executor_factory(client_ex)
+context = tff.framework.ExecutionContext(factory)
+tff.framework.set_default_context(context)
```
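The cell above ships with the placeholder address `0.0.0.0`, which has to be replaced with the load balancer's IP before the channels can connect. The following is a small sketch, using only the standard library, of a check that could run before the channels are created. `build_target` is a hypothetical helper, not part of TFF or gRPC, and it assumes the service is reached by IP address rather than by DNS name.

```python
import ipaddress


def build_target(ip_address, port):
  """Returns a 'host:port' string, rejecting placeholder or malformed input."""
  parsed = ipaddress.ip_address(ip_address)  # Raises ValueError if malformed.
  if parsed.is_unspecified:
    raise ValueError(
        '{} is a placeholder; use the load balancer IP.'.format(ip_address))
  if not 0 < port < 65536:
    raise ValueError('Port {} is out of range.'.format(port))
  if parsed.version == 6:
    # IPv6 literals are bracketed so the port separator is unambiguous.
    return '[{}]:{}'.format(parsed, port)
  return '{}:{}'.format(parsed, port)


# 203.0.113.7 is from a range reserved for documentation examples.
print(build_target('203.0.113.7', 80))  # prints 203.0.113.7:80

try:
  build_target('0.0.0.0', 80)
except ValueError as e:
  print('Rejected:', e)
```

The returned string could then be passed to `grpc.insecure_channel` in place of the inline `'{}:{}'.format(ip_address, port)` expression.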
%% Cell type:markdown id: tags:
## Run Training
%% Cell type:code id: tags:
```
evaluate()
```
%% Output
Round 0: loss 4.370407581329346, round time 4.201097726821899
Round 1: loss 4.1407670974731445, round time 3.3283166885375977
Round 2: loss 3.865147590637207, round time 3.098310947418213
Round 3: loss 3.534019708633423, round time 3.1565616130828857
Round 4: loss 3.272688388824463, round time 3.175067663192749
Round 5: loss 2.935391664505005, round time 3.008434534072876
Round 6: loss 2.7399251461029053, round time 3.31435227394104
Round 7: loss 2.5054931640625, round time 3.4411356449127197
Round 8: loss 2.290508985519409, round time 3.158798933029175
Round 9: loss 2.1194536685943604, round time 3.1348156929016113
......
......@@ -57,4 +57,4 @@ from tensorflow_federated.python.core.impl.executors import default_executor
# Initialize a default executor; which is implicitly executed once the first
# time a module in the `core` package is imported.
-default_executor.initialize_default_executor()
+default_executor.initialize_default_execution_context()
......@@ -76,7 +76,9 @@ py_test(
":value_base",
"//tensorflow_federated/python/common_libs:test",
"//tensorflow_federated/python/core/impl:test",
+"//tensorflow_federated/python/core/impl/context_stack:set_default_context",
"//tensorflow_federated/python/core/impl/executors:default_executor",
+"//tensorflow_federated/python/core/impl/executors:execution_context",
"//tensorflow_federated/python/core/impl/executors:executor_stacks",
"//tensorflow_federated/python/core/impl/executors:executor_test_utils",
"//tensorflow_federated/python/core/impl/types:type_factory",
......
......@@ -32,7 +32,9 @@ from tensorflow_federated.python.core.api import computations
from tensorflow_federated.python.core.api import intrinsics
from tensorflow_federated.python.core.api import value_base
from tensorflow_federated.python.core.impl import test as core_test
+from tensorflow_federated.python.core.impl.context_stack import set_default_context
from tensorflow_federated.python.core.impl.executors import default_executor
+from tensorflow_federated.python.core.impl.executors import execution_context
from tensorflow_federated.python.core.impl.executors import executor_stacks
from tensorflow_federated.python.core.impl.executors import executor_test_utils
from tensorflow_federated.python.core.impl.types import type_factory
......@@ -117,7 +119,8 @@ class TensorFlowComputationsTest(parameterized.TestCase):
      return intrinsics.federated_mean(result_map, count_map)

    factory = executor_stacks.sizing_executor_factory(num_clients=num_clients)
-    default_executor.set_default_executor(factory)
+    context = execution_context.ExecutionContext(factory)
+    set_default_context.set_default_context(context)

    to_float = lambda x: tf.cast(x, tf.float32)
    temperatures = [tf.data.Dataset.range(10).map(to_float)] * num_clients
......@@ -665,5 +668,5 @@ class ComputationsTest(parameterized.TestCase):
if __name__ == '__main__':
-  default_executor.initialize_default_executor()
+  default_executor.initialize_default_execution_context()
  common_test.main()
......@@ -855,5 +855,5 @@ class IntrinsicsTest(parameterized.TestCase):
if __name__ == '__main__':
-  default_executor.initialize_default_executor()
+  default_executor.initialize_default_execution_context()
  common_test.main()
......@@ -13,5 +13,8 @@ py_library(
"//tensorflow_federated:__pkg__",
"//tensorflow_federated/python/core:__pkg__",
],
-deps = ["//tensorflow_federated/python/core/backends/mapreduce"],
+deps = [
+    "//tensorflow_federated/python/core/backends/mapreduce",
+    "//tensorflow_federated/python/core/backends/native",
+],
)
......@@ -14,3 +14,4 @@
"""This directory contains code for interfacing custom types of backends."""
from tensorflow_federated.python.core.backends import mapreduce
+from tensorflow_federated.python.core.backends import native
......@@ -127,7 +127,8 @@ py_test(
":backend_info",
":executor",
"//tensorflow_federated/python/core/api:computations",
-"//tensorflow_federated/python/core/impl/executors:default_executor",
+"//tensorflow_federated/python/core/impl/context_stack:set_default_context",
+"//tensorflow_federated/python/core/impl/executors:execution_context",
"//tensorflow_federated/python/core/impl/executors:executor_factory",
],
)
......
......@@ -20,7 +20,8 @@ import tensorflow as tf
from tensorflow_federated.python.core.api import computations
from tensorflow_federated.python.core.backends.iree import backend_info
from tensorflow_federated.python.core.backends.iree import executor
-from tensorflow_federated.python.core.impl.executors import default_executor
+from tensorflow_federated.python.core.impl.context_stack import set_default_context
+from tensorflow_federated.python.core.impl.executors import execution_context
from tensorflow_federated.python.core.impl.executors import executor_factory
......@@ -91,10 +92,11 @@ class ExecutorTest(tf.test.TestCase):
    result = asyncio.get_event_loop().run_until_complete(result_val.compute())
    self.assertEqual(result, 11.0)

-  def test_as_default_executor(self):
+  def test_as_default_context(self):
    ex = executor.IreeExecutor(backend_info.VULKAN_SPIRV)
-    default_executor.set_default_executor(
-        executor_factory.create_executor_factory(lambda _: ex))
+    factory = executor_factory.create_executor_factory(lambda _: ex)
+    context = execution_context.ExecutionContext(factory)
+    set_default_context.set_default_context(context)

    @computations.tf_computation(tf.float32)
    def comp(x):
......
......@@ -88,7 +88,7 @@ py_test(
"//tensorflow_federated/python/core/impl/compiler:intrinsic_defs",
"//tensorflow_federated/python/core/impl/compiler:tree_analysis",
"//tensorflow_federated/python/core/impl/compiler:tree_transformations",
-"//tensorflow_federated/python/core/impl/executors:default_executor",
+"//tensorflow_federated/python/core/impl/context_stack:set_default_context",
"//tensorflow_federated/python/core/impl/wrappers:computation_wrapper_instances",
"//tensorflow_federated/python/core/templates:iterative_process",
],
......
......@@ -33,7 +33,7 @@ from tensorflow_federated.python.core.impl.compiler import building_blocks
from tensorflow_federated.python.core.impl.compiler import intrinsic_defs
from tensorflow_federated.python.core.impl.compiler import tree_analysis
from tensorflow_federated.python.core.impl.compiler import tree_transformations
-from tensorflow_federated.python.core.impl.executors import default_executor
+from tensorflow_federated.python.core.impl.context_stack import set_default_context
from tensorflow_federated.python.core.impl.wrappers import computation_wrapper_instances
from tensorflow_federated.python.core.templates import iterative_process
......@@ -824,5 +824,5 @@ class GetCanonicalFormForIterativeProcessTest(CanonicalFormTestCase,
if __name__ == '__main__':
  reference_executor = reference_executor.ReferenceExecutor()
-  default_executor.set_default_executor(reference_executor)
+  set_default_context.set_default_context(reference_executor)
  test.main()
......@@ -1030,5 +1030,5 @@ class NormalizedBitTest(absltest.TestCase):
if __name__ == '__main__':
-  default_executor.initialize_default_executor()
+  default_executor.initialize_default_execution_context()
  absltest.main()
package(default_visibility = [
    # Subpackages
    "//tensorflow_federated/python/core/backends/native:__subpackages__",
])

licenses(["notice"])

py_library(
    name = "native",
    srcs = ["__init__.py"],
    srcs_version = "PY3",
    visibility = ["//tensorflow_federated/python/core/backends:__pkg__"],
    deps = [":execution_contexts"],
)

py_library(
    name = "execution_contexts",
    srcs = ["execution_contexts.py"],
    srcs_version = "PY3",
    deps = [
        "//tensorflow_federated/python/core/impl/context_stack:context_stack_impl",
        "//tensorflow_federated/python/core/impl/executors:execution_context",
        "//tensorflow_federated/python/core/impl/executors:executor_stacks",
    ],
)
# Copyright 2020, The TensorFlow Federated Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for interacting with a native backend."""
from tensorflow_federated.python.core.backends.native.execution_contexts import set_local_execution_context
# Copyright 2020, The TensorFlow Federated Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Execution contexts for the native backend."""
from tensorflow_federated.python.core.impl.context_stack import context_stack_impl
from tensorflow_federated.python.core.impl.executors import execution_context
from tensorflow_federated.python.core.impl.executors import executor_stacks
def set_local_execution_context(num_clients=None,
                                max_fanout=100,
                                num_client_executors=32,
                                server_tf_device=None,
                                client_tf_devices=tuple()):
  """Sets an execution context that executes computations locally."""
  factory = executor_stacks.local_executor_factory(
      num_clients=num_clients,
      max_fanout=max_fanout,
      num_client_executors=num_client_executors,
      server_tf_device=server_tf_device,
      client_tf_devices=client_tf_devices)
  context = execution_context.ExecutionContext(factory)
  context_stack_impl.context_stack.set_default_context(context)
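
The commit message describes a context as an "environment" that owns both compilation and execution for a computation. The following is a toy, standard-library-only sketch of that idea, not TFF's implementation: `ToyCompilerPipeline`, `ToyExecutionContext`, `toy_compiler`, and `double` are hypothetical names, and plain Python functions stand in for TFF computations and executors.

```python
import functools


class ToyCompilerPipeline:
  """Caches the result of a user-supplied compilation function."""

  def __init__(self, compilation_fn):
    if not callable(compilation_fn):
      raise TypeError('compilation_fn must be callable.')
    # Per-instance cache keyed by the (hashable) computation.
    self.compile = functools.lru_cache(maxsize=None)(compilation_fn)


class ToyExecutionContext:
  """Owns both the compilation and the execution of a computation."""

  def __init__(self, executor_fn, compiler_fn=None):
    self._executor_fn = executor_fn
    self._pipeline = (
        ToyCompilerPipeline(compiler_fn) if compiler_fn is not None else None)

  def invoke(self, comp, arg):
    # Compile first (if a compiler was supplied), then hand off to execution.
    if self._pipeline is not None:
      comp = self._pipeline.compile(comp)
    return self._executor_fn(comp, arg)


compile_calls = []


def double(x):
  return 2 * x


def toy_compiler(comp):
  # A real compiler would transform `comp`; this one only records the call.
  compile_calls.append(comp.__name__)
  return comp


context = ToyExecutionContext(
    executor_fn=lambda comp, arg: comp(arg), compiler_fn=toy_compiler)
print(context.invoke(double, 21))  # prints 42
print(context.invoke(double, 4))   # prints 8
print(compile_calls)               # prints ['double']
```

In this sketch the compiler runs once per distinct computation, because the second `invoke` is served from the cache. Callers only interact with the context and never with the compiler or the executor directly.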
......@@ -30,6 +30,7 @@ py_library(
"//tensorflow_federated/python/core/impl/executors:caching_executor",
"//tensorflow_federated/python/core/impl/executors:default_executor",
"//tensorflow_federated/python/core/impl/executors:eager_tf_executor",
+"//tensorflow_federated/python/core/impl/executors:execution_context",
"//tensorflow_federated/python/core/impl/executors:executor_base",
"//tensorflow_federated/python/core/impl/executors:executor_factory",
"//tensorflow_federated/python/core/impl/executors:executor_service",
......
......@@ -55,6 +55,7 @@ from tensorflow_federated.python.core.impl.context_stack.set_default_context imp
from tensorflow_federated.python.core.impl.executors.caching_executor import CachingExecutor
from tensorflow_federated.python.core.impl.executors.default_executor import set_default_executor
from tensorflow_federated.python.core.impl.executors.eager_tf_executor import EagerTFExecutor
+from tensorflow_federated.python.core.impl.executors.execution_context import ExecutionContext
from tensorflow_federated.python.core.impl.executors.executor_base import Executor
from tensorflow_federated.python.core.impl.executors.executor_factory import create_executor_factory
from tensorflow_federated.python.core.impl.executors.executor_factory import ExecutorFactory
......
......@@ -32,15 +32,14 @@ class CompilerPipeline(object):
  backend that supports only a map-reduce execution model may accept instances
  of `tff.backends.mapreduce.CanonicalForm`. The TFF representation of such a
  backend takes the form of an instance of `tff.framework.Context`, which would
-  be initialized with a `CompilerPipeline` whose `compilation_function` accepts
+  be initialized with a `CompilerPipeline` whose `compilation_fn` accepts
  `tff.Computations` and returns CanonicalForms.
  """

-  def __init__(self,
-               compilation_function: Callable[[computation_base.Computation],
+  def __init__(self, compilation_fn: Callable[[computation_base.Computation],
                                               Any]):
-    py_typecheck.check_callable(compilation_function)
-    self._compilation_fn = compilation_function
+    py_typecheck.check_callable(compilation_fn)
+    self._compilation_fn = compilation_fn
    self._compiled_computations = {}

  @functools.lru_cache()
......
......@@ -36,5 +36,5 @@ class ComputationSerializationTest(test.TestCase):
if __name__ == '__main__':
-  default_executor.initialize_default_executor()
+  default_executor.initialize_default_execution_context()
  test.main()