3. Distributed Training with TensorFlow

3.1. Introduction

Distributed training is becoming increasingly important with the growth of training data size and model complexity. Many methods have been proposed by industry and academia, such as data parallelism, model parallelism, and hybrid parallelism. This document describes how to implement synchronous data parallel distributed training systems, wherein the training process is accelerated by concurrent processing of the distributed dataset on multiple workers.

From a hardware perspective, the Habana® Gaudi® HPU supports the RoCEv2 protocol (RDMA over Converged Ethernet v2). Each Gaudi natively integrates ten 100 Gigabit Ethernet ports supporting RoCEv2. Figure 3.1 shows an HLS-1 server, which features eight Gaudi devices. Within each Gaudi device, seven of the ten NIC ports are used for connecting to the other seven Gaudis within the server in an all-to-all processor configuration for scale-up, and three are used for scale-out across servers.


Figure 3.1 HLS-1 Block Diagram

From a software perspective, Gaudi scaling with data parallelism in the TensorFlow framework is achieved using two distinct methods:

  • By using Habana Horovod (see Figure 3.2).

  • By using HPUStrategy integrated with tf.distribute API.


Whether to use Habana Horovod or tf.distribute for scaling is the user’s decision, based on preference and previous usage. Users looking to scale from a single card to multi-card or multi-server should start with Habana Horovod, which has broader model coverage and shows similar scaling efficiency when scaling up to 8 Gaudi cards.

The following versions are supported:

  • All supported TensorFlow versions. See the Support Matrix section of the Release Notes for more details.

  • Habana Horovod - forked from v0.22.1 of the official Horovod


Figure 3.2 Gaudi Distributed Training Software Stack

The following table summarizes the scale-out topology further described in the below sections:

Scale-Out Topology | Distributed Framework  | Scaling Method          | Notes
-------------------|------------------------|-------------------------|------------------------------------------
Gaudi based        | Horovod                | RDMA based scaling      | Default mode = RDMA
Gaudi based        | TensorFlow Distributed | RDMA based scaling      |
Host NIC scaling   | Horovod                | RDMA based scaling      | MPI based Host NIC scale-out
Host NIC scaling   | Horovod                | Libfabric based scaling | Refer to Scale-Out via Host-NIC over OFI
Host NIC scaling   | Horovod                | Libfabric based scaling | Refer to Scale-Out via Host-NIC over OFI
Host NIC scaling   | Horovod                | TCP/IP based scaling    | Refer to Scale-Out via Host-NIC over TCP
Host NIC scaling   | TensorFlow Distributed | Libfabric based scaling | Refer to Scale-Out via Host-NIC over OFI
Host NIC scaling   | TensorFlow Distributed | TCP/IP based scaling    | Refer to Scale-Out via Host-NIC over TCP


The behavior of setting both HCCL_OVER_TCP=1 and HOROVOD_HIERARCHICAL_ALLREDUCE=1 is undefined.

3.2. Horovod-based Scaling of Gaudi on TensorFlow

Scale-up within the HLS-1 server is achieved with Gaudi NICs. Scale-out across servers is achieved through either Gaudi NICs or Host NICs.

In Figure 3.3 Gaudi NICs are used for both scale-up and scale-out.


Figure 3.3 Gaudi NIC for both Scale-up and Scale-out

In Figure 3.4 Gaudi NICs are used for scale-up and Host NICs are used for scale-out.


Figure 3.4 Gaudi NIC for Scale-up and Host NIC for Scale-out

3.2.1. Horovod Integration with Habana Collective Communications Library (HCCL)

Habana provides the HCCL API to support communication between devices:


HCCL is Habana’s Gaudi counterpart of NVIDIA’s NCCL.

Habana Collective Communications Library (HCCL) provides a set of collective communication primitives that use both Gaudi NIC and Host NIC communication.

Horovod’s HCCL implementation requires the synapse_helpers library provided by the TensorFlow integration. This library is needed to synchronize between computation and collective streams.

3.2.2. Scale-up Using Gaudi NICs Within a Server

Theory of Operation: Scale-up Using Gaudi NICs

The Collective Ops in Horovod for data parallel distributed training, including AllReduce, Broadcast, AllGather, and so on, are implemented. The AllReduce Op averages the weight gradients across workers during the back propagation phase of training. The Broadcast Op broadcasts the initial weights to all workers.

In the case of scale-up within one server, each worker is a Gaudi device and AllReduce uses Gaudi NICs for communication. The all-to-all connections across Gaudi devices within the server guarantee the best performance of AllReduce for the best scaling efficiency.
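The arithmetic performed by AllReduce on the gradients can be sketched in plain Python. This is a toy model with hypothetical values; the real operation runs over the Gaudi NICs:

```python
# Toy model of AllReduce used as gradient averaging; values are hypothetical.
def allreduce_mean(per_worker_grads):
    """Element-wise average of the gradient vectors of all workers."""
    n = len(per_worker_grads)
    dim = len(per_worker_grads[0])
    avg = [sum(g[i] for g in per_worker_grads) / n for i in range(dim)]
    # Every worker receives the same averaged gradient.
    return [list(avg) for _ in range(n)]

# 8 workers (one per Gaudi), each holding a 4-element gradient
# computed on its own shard of the batch:
grads = [[float(worker + i) for i in range(4)] for worker in range(8)]
reduced = allreduce_mean(grads)
assert all(r == reduced[0] for r in reduced)  # all workers now agree
```

After the reduction, every worker applies the same averaged gradient, which keeps the model replicas identical across steps.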


Figure 3.5 AllReduce Within a Server

Example: Scale-up Within a Server

Below is a simple example of distributed training, based on the single Gaudi training example detailed in the Gaudi Migration Guide. The training model and the corresponding scripts are available in the TensorFlow Hello World Example on GitHub.

The lines of code marked with comments are those added for distributed training.

import tensorflow as tf
from habana_frameworks.tensorflow import load_habana_module
# load_habana_module() must be called before importing horovod
load_habana_module()
import horovod.tensorflow.keras as hvd
# Initialization of Horovod.
hvd.init()

# Ensure only 1 process downloads the data on each node
if hvd.local_rank() == 0:
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
    hvd.broadcast(0, 0)  # signal to the other local ranks that the data is ready
else:
    hvd.broadcast(0, 0)  # wait until local rank 0 has downloaded the data
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

# Data partition for different workers
num_pics_per_rank = x_train.shape[0] // hvd.size()
pic_begin = num_pics_per_rank * hvd.rank()
pic_end = pic_begin + num_pics_per_rank
x_train = x_train[pic_begin:pic_end,]
y_train = y_train[pic_begin:pic_end,]

x_train, x_test = x_train / 255.0, x_test / 255.0
model = tf.keras.models.Sequential([
    tf.keras.layers.Flatten(input_shape=(28, 28)),
    tf.keras.layers.Dense(10),
])
loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# Using hvd.size() (number of workers) to scale the learning rate and wrapping
# the optimizer with the DistributedOptimizer class provided by horovod.
optimizer = tf.keras.optimizers.SGD(learning_rate=0.01 * hvd.size())
optimizer = hvd.DistributedOptimizer(optimizer)

callbacks = [
    # Horovod: broadcast initial variable states from rank 0 to all other processes.
    # This is necessary to ensure consistent initialization of all workers when
    # training is started with random weights or restored from a checkpoint.
    hvd.callbacks.BroadcastGlobalVariablesCallback(0),
]
model.compile(optimizer=optimizer, loss=loss, metrics=['accuracy'])
model.fit(x_train, y_train, epochs=1, batch_size=128, callbacks=callbacks)

model.evaluate(x_test, y_test)

The code above runs in multiple processes, one for each Gaudi.

In order to launch the distributed training for eight Gaudi devices within one host, run the following command. OpenMPI is required for host communication and for launching the processes. The recommended OpenMPI version is 4.0.5:

$ mpirun -np 8 python3 example_hvd.py

Below is an example of the output:

7500/7500 [==============================] - 104s 14ms/sample - loss:
0.7289 - accuracy: 0.8361

7500/7500 [==============================] - 104s 14ms/sample - loss:
0.7916 - accuracy: 0.8051

7500/7500 [==============================] - 104s 14ms/sample - loss:
0.7939 - accuracy: 0.8053

7500/7500 [==============================] - 104s 14ms/sample - loss:
0.7928 - accuracy: 0.8093

3.2.3. Scale-out Using Gaudi NICs Across Servers


This configuration is available only when using Gaudi based fabric for scale out.

Theory of Operation: Scale-out Using Gaudi NICs

When training is distributed across more than one server, weight gradients need to be averaged for all workers across all servers. Habana’s implementation of AllReduce supports this transparently.

Internally, a local ReduceScatter within each server is followed by an across-server AllReduce, and finally a local AllGather. All communication goes through Gaudi NICs, whether it is within the server or across servers.

The figures below show how AllReduce works in a network configuration of four servers with eight ranks in each server:

  • Each rank is a Gaudi device.

  • The figures below show only the first four ranks in each server for illustration purposes.

Figure 3.6 shows the result of the local ReduceScatter within each server. The ReduceScatter is implemented as an all-to-all communication within the server followed by a local reduction within each rank. It is the same as the first two steps outlined in Figure 3.5.

After the local ReduceScatter within each server, there are AllReduce operations for chunks of the data among corresponding ranks across different servers. That is, the AllReduce for the first chunk of data among rank 0/8/16/24, the AllReduce for the second chunk of the data among rank 1/9/17/25, and so on. The scale-out ports of the Gaudi NICs are used for communication across servers.

After the across-server AllReduce, each chunk of data among corresponding ranks across different servers has the same value, as the color coding indicates (Figure 3.7). The last step is an AllGather within each server. After that, all chunks of the data in all servers have the right values.

The final result is illustrated in Figure 3.8. All ranks hold exactly the same set of reduced data.
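The three phases above can be sketched as a toy simulation in plain Python. The server, rank, and chunk counts mirror the figures; the values are illustrative:

```python
# Toy simulation of the three-phase scale-out AllReduce.
NUM_SERVERS, RANKS_PER_SERVER, CHUNKS = 4, 4, 4

def allreduce(data):
    # data[s][r][c]: chunk c of the gradient held by rank r on server s.
    # Phase 1: ReduceScatter within each server (over Gaudi NICs) -
    # rank c of server s ends up holding only the reduced chunk c.
    scattered = [[sum(data[s][r][c] for r in range(RANKS_PER_SERVER))
                  for c in range(CHUNKS)] for s in range(NUM_SERVERS)]
    # Phase 2: AllReduce of each chunk among corresponding ranks across
    # servers (over the scale-out ports of the Gaudi NICs).
    summed = [sum(scattered[s][c] for s in range(NUM_SERVERS))
              for c in range(CHUNKS)]
    # Phase 3: AllGather within each server - every rank gets every chunk.
    return [[list(summed) for _ in range(RANKS_PER_SERVER)]
            for _ in range(NUM_SERVERS)]

# Every rank contributes 1 for every chunk; with 16 ranks in total,
# each reduced chunk equals 16 on every rank.
data = [[[1] * CHUNKS for _ in range(RANKS_PER_SERVER)]
        for _ in range(NUM_SERVERS)]
result = allreduce(data)
assert result[0][0] == [16, 16, 16, 16]
```

The final assertion corresponds to Figure 3.8: every rank ends up with the same fully reduced data.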


Figure 3.6 Result of Local ReduceScatter Within Each Server


Figure 3.7 Result of AllReduce for Chunks of Data Among Corresponding Ranks Across Different Servers


Figure 3.8 Result of Local AllGather Within Each Server

Example: Scale-out Using Gaudi NICs


This configuration is supported only when using Gaudi based fabric for scale out.

The training model and the corresponding scripts are available in the TensorFlow Hello World Example on GitHub.

Changing the model (see Example: Scale-up Within a Server) to make it run across multiple HLS-1 servers is not required. The script, however, requires some changes.

A new script, run_hvd_16gaudi.sh, is provided as an example for two HLS-1 servers. The scale-out ports of the Gaudi devices in one server are connected to those in the other server through a switch. The script automatically generates the HCL config JSON file used for the training.


HCL (Habana Communication Library) is a communication layer, providing Gaudi NIC communication capabilities, that HCCL uses internally. More information about HCL is available at Habana Communication Library (HCL) API Reference


When using the HCCL Horovod backend, generating the HCL config JSON file and setting HCL_CONFIG_PATH is no longer required; however, users can still provide a custom HCL config if a non-standard configuration is needed.

Run the script using the below command. You must append the IP addresses of the two servers at the end of the command, separated by whitespace, similar to the example below (the placeholders stand for your servers’ IP addresses):

$ ./run_hvd_16gaudi.sh <server1-ip> <server2-ip>

The auto-generated HCL config JSON file is located under the /tmp folder. The content is similar to the below example:


{
    "HCL_RANKS": [
        "", "", "", "",
        "", "", "", "",
        "", "", "", "",
        "", "", "", ""
    ]
}

The 16 entries of HCL_RANKS hold the IP addresses of the 16 ranks; they are omitted here.

The script also sets the path of the generated HCL JSON file to the environment variable HCL_CONFIG_PATH and eventually invokes a command like the example below:

$ mpirun --allow-run-as-root \
    --mca btl_tcp_if_include <interface> \
    --prefix /usr/local/openmpi/ \
    -x HCL_CONFIG_PATH \
    -x PATH \
    python3 example_hvd.py

Here, <interface> is a placeholder for the TCP network interface used by OpenMPI; some options of the actual command are elided.

By default, the shell scripts connect to port 3022; however, the port the SSH server listens on may differ between environments. If your environment requires specifying a different port for the remote SSH server, use the SSHD_PORT environment variable.

The example below uses port 22:

$ SSHD_PORT=22 ./run_hvd_16gaudi.sh

To make the SSH server listen on a different port, restart it with the desired port, as in the below example which sets port 22:

$ /etc/init.d/ssh restart '-p 22'

3.2.4. Scale-out Using Host NICs Across Servers

There are two ways to implement scale-out through host NICs:

  • Without peer-direct RDMA

  • With peer-direct RDMA

Scale-out Using Host NICs Without Peer-direct RDMA


Currently, this configuration is not supported using Gaudi based fabric for scale out.

Without peer-direct RDMA, exchanging data between two Gaudi devices in two different hosts requires:

  • Copying the data from device memory in one Gaudi to the host CPU memory through PCIe.

  • Using the host NIC to send the data to the CPU memory in another host.

  • Copying the data from the CPU memory to Gaudi device memory through PCIe (Figure 3.9).

This process is added to the AllReduce algorithm described above and is implemented as the hierarchical AllReduce defined by Horovod (NCCL Hierarchical Allreduce, nccl_operations.cc#L191).

Hierarchical AllReduce is enabled by setting HOROVOD_HIERARCHICAL_ALLREDUCE=1 at runtime, as detailed in Example: Scale-out Using Host NICs Without Peer-direct RDMA. The communication within the server is done through Gaudi NICs, and the communication across servers is done through the Host NICs and PCIe ports.
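A toy plain-Python model of the hierarchical AllReduce can make the phases concrete. The comments mark where the PCIe copies and host-NIC transfers of Figure 3.9 would occur; the values are illustrative:

```python
# Toy model of Horovod's hierarchical AllReduce.
def hierarchical_allreduce(data):
    # data[s][r]: value held by rank r on server s.
    # 1. Local AllReduce within each server over Gaudi NICs.
    local = [sum(ranks) for ranks in data]
    # 2. One rank per server copies its result to CPU memory over PCIe,
    #    exchanges it with the other servers over the host NIC, and
    #    copies the reduced result back to device memory over PCIe.
    total = sum(local)
    # 3. The result is distributed to the other local ranks over Gaudi NICs.
    return [[total] * len(ranks) for ranks in data]

assert hierarchical_allreduce([[1, 2], [3, 4]]) == [[10, 10], [10, 10]]
```

Only step 2 crosses the host NIC, which is why the slower PCIe/host path is traversed once per server rather than once per rank.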


Figure 3.9 Scale-out Using Host NIC Without Peer-direct RDMA Example: Scale-out Using Host NICs Without Peer-direct RDMA

The training model and the corresponding scripts are available in the TensorFlow Hello World Example on GitHub.

Changing the model (see Example: Scale-up Within a Server) is not required. A separate script to run a simple example of scale-out using host NICs (without peer-direct RDMA) is provided:

$ ./run_hvd_16gaudi_hostnic.sh

This example sets the environment variable HOROVOD_HIERARCHICAL_ALLREDUCE to 1 and invokes a command similar to the below example:

$ mpirun --allow-run-as-root \
    --mca btl_tcp_if_include <interface> \
    --prefix /usr/local/openmpi/ \
    --host <rank0-ip>,<rank1-ip>,...,<rank15-ip> \
    -x HOROVOD_HIERARCHICAL_ALLREDUCE \
    -x HABANA_LOGS \
    -x OPAL_PREFIX \
    -x PATH \
    python3 example_hvd.py

Here, <interface> and the <rank*-ip> entries are placeholders for the TCP network interface and the IP addresses of the 16 workers.

The port the SSH server listens on might differ if the workload is not running inside the container. You can specify the port of the remote SSH server using the SSHD_PORT environment variable:

$ SSHD_PORT=22 ./run_hvd_16gaudi_hostnic.sh

Scale-out Using Host NICs with Peer-direct RDMA

Peer-direct RDMA is a technology that allows using host NICs for DMA data transfers between the memories of Gaudi devices located on separate servers. On each host, the NIC used must share a PCIe switch with the Gaudi device (Figure 3.10).


Figure 3.10 Scale-out Using Host NIC with Peer-direct RDMA

With peer-direct RDMA enabled, there is no need to use CPU memory to exchange data across servers, which enables operation with lower latencies.


Peer-direct RDMA is currently a work in progress for Gaudi.

3.2.5. Examples of Real-world Applications

Examples of real-world applications, such as ResNet-50, BERT, and other models, including their performance and results, are located in TensorFlow Models on GitHub.

Keras ResNet Model Example

Below is an example of enabling distributed training in the Keras ResNet model:

  1. Shard the ImageNet dataset. The general sharding logic can be found in Tensorflow/computer_vision/common/imagenet_preprocessing.py:

from TensorFlow.common.horovod_helpers import hvd, horovod_enabled

if horovod_enabled() and (is_training or use_distributed_eval):
  logging.info(
      'HVD sharding the dataset: input_pipeline_id=%d num_input_pipelines=%d',
      hvd.rank(), hvd.size())
  dataset = dataset.shard(hvd.size(), hvd.rank())

if input_context:
  logging.info(
      'Sharding the dataset: input_pipeline_id=%d num_input_pipelines=%d',
      input_context.input_pipeline_id, input_context.num_input_pipelines)
  dataset = dataset.shard(input_context.num_input_pipelines,
                          input_context.input_pipeline_id)

  2. Define the use_horovod flag located in common.py. The default value is False:

flags.DEFINE_boolean("use_horovod", default=False, help="Use horovod")

  3. Import basic Habana Horovod functions to be called in common.py:

from TensorFlow.common.horovod_helpers import hvd_size, horovod_enabled

  4. Calculate the global batch size based on the batch size per card and the total number of cards in common.py:

if horovod_enabled():
    adjusted_batch_size = batch_size * hvd_size()

  5. Import basic Habana Horovod functions to be called in resnet_ctl_imagenet_main.py:

from TensorFlow.common.horovod_helpers import hvd, hvd_init, hvd_size, horovod_enabled, synapse_logger_init

  6. Calculate the global batch size based on the batch size per card and the total number of cards, and name the model directory according to the rank ID, in resnet_ctl_imagenet_main.py:

if horovod_enabled():
    batch_size = adjust_batch_size(flags_obj.batch_size)
    model_dir = os.path.join(flags_obj.model_dir, "worker_" + str(hvd.rank()))

  7. Initialize Habana Horovod in resnet_ctl_imagenet_main.py:

if flags.FLAGS.use_horovod:
    hvd_init()

  8. Import basic Habana Horovod functions to be called in resnet_runnable.py:

from TensorFlow.common.horovod_helpers import hvd, horovod_enabled

  9. Average the distributed evaluation accuracy across workers in resnet_runnable.py:

if self.flags_obj.use_distributed_eval and horovod_enabled():
   test_accuracy = hvd.allreduce(self.test_accuracy.result())

3.3. Multi-Worker Training using HPUStrategy

3.3.1. Usage

Habana provides a dedicated HPUStrategy class to support distributed training using Gaudi’s high performance RDMA communication:

from habana_frameworks.tensorflow.distribute import HPUStrategy

strategy = HPUStrategy()

# For use with Keras
with strategy.scope():
   model = ...

# For use with Estimator
run_config = tf.estimator.RunConfig(train_distribute=strategy, ...)

# Manually
server = tf.distribute.Server(...)
strategy.extended._std_server_started = True   # To prevent the double-launch of the Server
with tf.device("/device:HPU:0"):
   y1 = strategy.reduce(...)
   ... = tensorflow.python.ops.collective_ops.broadcast_send(...)
   ... = tensorflow.python.ops.collective_ops.broadcast_recv(...)
   ... = tensorflow.python.ops.collective_ops.all_gather(...)
with tf.compat.v1.Session(server.target) as sess:
   ... = sess.run(y1)


Using other TensorFlow built-in distribution strategies (MirroredStrategy, MultiWorkerMirroredStrategy, CentralStorageStrategy, and ParameterServerStrategy) is not supported with the Habana software stack on Gaudi.

The HPUStrategy class has the same usage model as MultiWorkerMirroredStrategy: each worker runs in a separate process and acquires a single Gaudi device.

In multi-worker, multi-process training, TensorFlow establishes inter-process communication using gRPC at startup. Workers run the same training script in parallel, effectively having their own replica (copy) of the same computation graph. TensorFlow Keras and Estimator automatically ensure initial broadcast of trainable variables (weights and biases) prior to training start, and reduce the gradients among workers with each step. In addition, TensorFlow provides a mechanism, DistributedDataset, which offers both automatic and manual data sharding, so every worker processes a distinct set of samples from the training dataset.
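The sharding semantics that DistributedDataset relies on, tf.data's Dataset.shard(num_shards, index), can be illustrated in plain Python. shard keeps the index-th element of every consecutive window of num_shards elements:

```python
# The same selection Dataset.shard(num_shards, index) performs, on a list:
def shard(samples, num_shards, index):
    return samples[index::num_shards]

samples = list(range(10))
# The two workers of a 2-worker cluster see disjoint halves of the data:
assert shard(samples, 2, 0) == [0, 2, 4, 6, 8]
assert shard(samples, 2, 1) == [1, 3, 5, 7, 9]
```

Because every worker applies the same deterministic selection with its own index, no two workers process the same sample within an epoch.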

Unlike Horovod, neither tf.distribute nor HPUStrategy uses or requires OpenMPI at any point. Worker processes can be started in any way (for example, using mpirun for convenience, as it offers a process-to-core binding mechanism, e.g. --bind-to core --map-by slot:PE=4).

For a demo of HPUStrategy with multi-worker training, please refer to TensorFlow distribute_with_hpu_strategy Example on GitHub.

3.3.2. Migration from TensorFlow Built-in Strategies

If your training script uses MultiWorkerMirroredStrategy, replacing it with HPUStrategy is sufficient. The worker process performing the training must load the Habana Device integration module for TensorFlow. If Estimator is used, a dedicated evaluator job may be designated to evaluate the last checkpoint in parallel to the training process. In such a case, evaluation must be performed on the CPU. That is, the process running the Estimator-based script does not load the Habana Device integration module for TensorFlow.

MirroredStrategy uses an in-graph approach: a single process runs a computation graph that directly targets its operations at various accelerator devices (such as CPU:0 and multiple GPU:<X>). Since only one Gaudi device can be acquired per process, every worker must run in a separate process. This is a between-graph approach, in which every worker targets its operations at either CPU:0 or HPU:0 only.

For more information on setting up a multi-process training cluster, refer to the Multi-worker training with Keras tutorial in the TensorFlow documentation.
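As with MultiWorkerMirroredStrategy, cluster membership is typically described through the TF_CONFIG environment variable, set before the strategy is constructed. Below is a minimal sketch for a hypothetical two-worker cluster; the host addresses and port are placeholders:

```python
import json
import os

# Hypothetical two-worker cluster; replace hosts and port with real values.
tf_config = {
    "cluster": {"worker": ["10.0.0.1:12345", "10.0.0.2:12345"]},
    # "index" selects which entry of the worker list this process is;
    # it must differ in every worker process (0 on the first host, 1 on
    # the second).
    "task": {"type": "worker", "index": 0},
}
os.environ["TF_CONFIG"] = json.dumps(tf_config)
```

Each worker process sets the same "cluster" dictionary but its own "task" entry; the process with index 0 acts as the cluster leader.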

3.3.3. Advanced Details

A native MultiWorkerMirroredStrategy class internally uses Cross Device Ops to add collective operations into the computation graph. Those operations are:

  • CollectiveReduce and CollectiveReduceV2 – All-Reduce operation applied to every gradient before weight update.

  • CollectiveBcastSend and CollectiveBcastSendV2 – Broadcast Send operation for sending the initial weights used only by the cluster leader (worker:0 or chief:0).

  • CollectiveBcastRecv and CollectiveBcastRecvV2 – Broadcast Receive operation for receiving the initial weights used by all workers except the cluster leader.

  • CollectiveGather and CollectiveGatherV2 – All-Gather operation (not exercised in a basic usage flow).

The HPUStrategy class reuses MultiWorkerMirroredStrategy, ensuring that there is only one accelerator per process and that all collective operations are placed on the "/device:HPU:0" device. During graph pre-processing, the Habana Device integration component for TensorFlow replaces the collective operations with dedicated HPU operations: HpuCollectiveReduce, HpuCollectiveReduceV2, HpuCollectiveGather, HpuCollectiveGatherV2, HpuCollectiveBcastSend, HpuCollectiveBcastSendV2, HpuCollectiveBcastRecv and HpuCollectiveBcastRecvV2. HPU collective operations use Habana’s Communication Library (HCL) internally for collective communication. Neither gRPC nor TensorFlow’s CollectiveExecutor is used at this point. HPU collective operations support exchanging tensors of data types float and bfloat16 (using the TF_BF16_CONVERSION environment variable).

TensorFlow 2.5 introduces a mechanism that automatically concatenates the gradients into a single All-Reduce operation and then splits the results into multiple outputs. This optimization may noticeably decrease the duration of a single step in which hundreds of gradients are averaged at once.
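The idea behind that optimization can be sketched in plain Python: flatten all gradient tensors into one buffer, run a single All-Reduce over it, and split the result back. This is a toy sketch; the real tensors and the collective itself are elided:

```python
# Flatten many gradient tensors into one buffer, so a single All-Reduce
# replaces one collective per tensor; then split the result back.
def flatten(grads):
    flat, sizes = [], []
    for g in grads:
        sizes.append(len(g))
        flat.extend(g)
    return flat, sizes

def split(flat, sizes):
    out, pos = [], 0
    for n in sizes:
        out.append(flat[pos:pos + n])
        pos += n
    return out

grads = [[1.0, 2.0], [3.0], [4.0, 5.0, 6.0]]  # toy per-tensor gradients
flat, sizes = flatten(grads)
# ...a single All-Reduce over `flat` would run here...
assert split(flat, sizes) == grads  # the round trip restores the tensors
```

Launching one collective over the concatenated buffer amortizes the per-operation latency that would otherwise be paid for every gradient tensor.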

In the case of TensorFlow 2.4, no such mechanism exists, and an alternative method is provided. Habana’s software stack supports the Scoped Allocator Optimization (SAO) performed by TensorFlow’s Grappler. SAO merges multiple pass-through (in-place) ops into a single one, and merges their input tensors into a single memory allocation called a backing tensor (allocated by the _ScopedAllocator op). In that case, HpuCollectiveReduce and HpuCollectiveReduceV2 need to be registered as subjects of the optimization:

# Letting `HPUStrategy` do the work
strategy = HPUStrategy(enable_scoped_allocator=True)

# With use of the session config
session_config = ...
if enable_scoped_allocator:
   rewrite_options = session_config.graph_options.rewrite_options
   from tensorflow.core.protobuf.rewriter_config_pb2 import RewriterConfig
   rewrite_options.scoped_allocator_optimization = RewriterConfig.ON
   rewrite_options.scoped_allocator_opts.enable_op.extend(["HpuCollectiveReduce", "HpuCollectiveReduceV2"])

# Set globally
if enable_scoped_allocator:
   tf.config.optimizer.set_experimental_options({"scoped_allocator_optimization": True})
   from tensorflow.python.eager import context
   ctx = context.context()
   ctx._collective_scoped_allocator_enabled_ops = ["HpuCollectiveReduce", "HpuCollectiveReduceV2"]