3. Distributed Training with TensorFlow

3.1. Introduction

Distributed training is becoming increasingly important with the growth of training data size and model complexity. Many methods have been proposed by industry and academia, such as data parallelism, model parallelism, and hybrid parallelism. This document describes how to implement synchronous data-parallel distributed training systems, in which training is accelerated by concurrent processing of a distributed dataset on multiple workers.

From a hardware perspective, Habana Gaudi supports the RoCEv2 protocol (RDMA over Converged Ethernet). Each Gaudi natively integrates ten 100 Gigabit Ethernet ports supporting RoCEv2. Figure 3.1 shows an HLS-1 server, which features eight Gaudi devices. Within each Gaudi device, seven of the ten NIC ports are used to connect to the other seven Gaudis within the server in an all-to-all processor configuration for scale-up, and three are used for scale-out across servers.


Figure 3.1 HLS-1 Block Diagram

From a software perspective, Gaudi scaling with data parallelism in the TensorFlow framework is achieved using two distinct methods:

  • By using Habana Horovod (see Figure 3.2).

  • By using HPUStrategy integrated with tf.distribute API.

The following versions are supported:

  • All supported TensorFlow versions. See the TensorFlow section of the Release Notes for more details.

  • Habana Horovod - forked from v0.20.0 of the official Horovod


Figure 3.2 HLS-1 Software Stack

3.2. Horovod-based Scaling of Gaudi on TensorFlow

Scale-up within the HLS-1 server is achieved with Gaudi NICs. Scale-out across servers is achieved through Gaudi NICs and Host NICs.

In Figure 3.3 Gaudi NICs are used for both scale-up and scale-out.


Figure 3.3 Gaudi NIC for both Scale-up and Scale-out

In Figure 3.4 Gaudi NICs are used for scale-up and Host NICs are used for scale-out.


Figure 3.4 Gaudi NIC for Scale-up and Host NIC for Scale-out

3.2.1. Horovod APIs Support

Habana provides two sets of APIs to support Horovod:

  • HCL APIs – based on the Habana Communication Library.

  • HCCL APIs – based on the Habana Collective Communications Library, Habana’s implementation of NVIDIA NCCL for Gaudi.

The below table presents the API recommendation. An environment variable enables switching between these two APIs at runtime; by default, the code uses the HCCL APIs.

  • HW solution: Scaling using Gaudi NICs for both scale-up and scale-out
    SW solution: HCL-based or HCCL-based (default) Horovod

  • HW solution: Scaling using Gaudi NICs for scale-up and Host NICs for scale-out
    SW solution: HCL-based or HCCL-based (default) Horovod
    Environment variables: HOROVOD_HIERARCHICAL_ALLREDUCE=1 (see Scale-out Using Host NICs Across Servers)

Horovod Integration with Habana Communication Library (HCL)

HCL provides a set of communication APIs supported on Habana’s SynapseAI layer.

The Horovod collective operations implementation provided by Habana does not use HCL directly. In order to leverage all synchronization and optimization done for TensorFlow, HCL is accessed through the synapse_helpers library provided by the TensorFlow integration.

Horovod Integration with Habana Collective Communications Library (HCCL)

HCCL is implemented on top of the Habana TensorFlow integration layer. The API follows the NCCL function declarations for user convenience. With HCCL, the implementation of collective operations provided by NVIDIA (nccl_operations.cc) is used.

Internally, HCCL uses the same synapse_helpers methods that are used in the HCL-based Horovod operations implementation. As such, Horovod collectives can call the HCCL API directly.

3.2.2. Scale-up Using Gaudi NICs Within a Server

Theory of Operation: Scale-up Using Gaudi NICs

Horovod’s collective ops for data-parallel distributed training, including AllReduce, Broadcast, AllGather, and so on, are implemented. The AllReduce op is used to average the weight gradients across workers during the back propagation phase of training. The Broadcast op is used to broadcast the initial weights to all workers.
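The semantics of these two collectives can be sketched in plain Python. This is a simulation for illustration only (it does not call Horovod or run on Gaudi): AllReduce leaves every worker holding the average of all workers' gradients, and Broadcast copies the root worker's initial weights to everyone.

```python
# Simulation of the two collectives used in data-parallel training.
# Plain Python for illustration; real Horovod ops run over Gaudi NICs.

def allreduce_mean(grads_per_worker):
    """Average one gradient value across all workers (AllReduce)."""
    n = len(grads_per_worker)
    mean = sum(grads_per_worker) / n
    # Every worker receives the same averaged gradient.
    return [mean] * n

def broadcast(values_per_worker, root=0):
    """Copy the root worker's value to all workers (Broadcast)."""
    return [values_per_worker[root]] * len(values_per_worker)

# Four workers computed different gradients for the same weight.
print(allreduce_mean([1.0, 2.0, 3.0, 4.0]))  # every rank ends with 2.5
print(broadcast([1.0, 9.9, 9.9, 9.9]))       # every rank gets rank 0's weight
```

After AllReduce, all replicas apply the identical averaged gradient, which keeps their weights in sync step after step.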

In the case of scale-up within one server, each worker is a Gaudi device and AllReduce uses Gaudi NICs for communication. The all-to-all connections across Gaudi devices within the server guarantee the best performance of AllReduce for the best scaling efficiency.


Figure 3.5 AllReduce Within a Server

Example: Scale-up Within a Server

The below is a simple example of distributed training and is based on the single Gaudi training example detailed in the Gaudi Migration Guide. The training model and the corresponding scripts are available in the TensorFlow Hello World Example on GitHub.

The lines of code added for distributed training are marked with comments.

import tensorflow as tf
from TensorFlow.common.library_loader import load_habana_module
import horovod.tensorflow.keras as hvd

load_habana_module()

# Initialization of Horovod.
hvd.init()

# Ensure only 1 process downloads the data on each node
if hvd.local_rank() == 0:
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
    hvd.broadcast(0, 0)
else:
    hvd.broadcast(0, 0)
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

# Data partition for different workers
num_pics_per_rank = x_train.shape[0] // hvd.size()
pic_begin = num_pics_per_rank * hvd.rank()
pic_end = pic_begin + num_pics_per_rank
x_train = x_train[pic_begin:pic_end,]
y_train = y_train[pic_begin:pic_end,]

x_train, x_test = x_train / 255.0, x_test / 255.0
model = tf.keras.models.Sequential([
    tf.keras.layers.Flatten(input_shape=(28, 28)),
    tf.keras.layers.Dense(10),
])
loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# Using hvd.size() (number of workers) to scale learning rate and wrapping
# optimizer with Distributed optimizer class provided by horovod.
optimizer = tf.keras.optimizers.SGD(learning_rate=0.01 * hvd.size())
optimizer = hvd.DistributedOptimizer(optimizer)

callbacks = [
    # Horovod: broadcast initial variable states from rank 0 to all other processes.
    # This is necessary to ensure consistent initialization of all workers when
    # training is started with random weights or restored from a checkpoint.
    hvd.callbacks.BroadcastGlobalVariablesCallback(0),
]
model.compile(optimizer=optimizer, loss=loss, metrics=['accuracy'])
model.fit(x_train, y_train, epochs=1, batch_size=128, callbacks=callbacks)

model.evaluate(x_test, y_test)

The code above runs in multiple processes, one for each Gaudi.
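The contiguous data partition used in the example can be checked in isolation. The sketch below (plain Python; `partition_bounds` is a hypothetical helper, not part of the example) reproduces the slicing logic and shows that the per-rank slices tile the training set without overlap.

```python
def partition_bounds(num_samples, num_ranks):
    """Reproduce the example's slicing: each rank gets a contiguous
    block of num_samples // num_ranks samples."""
    per_rank = num_samples // num_ranks
    return [(r * per_rank, r * per_rank + per_rank) for r in range(num_ranks)]

# 60000 MNIST training images split across 8 Gaudi workers.
bounds = partition_bounds(60000, 8)
print(bounds[0])   # rank 0 slice: (0, 7500)
print(bounds[7])   # rank 7 slice: (52500, 60000)
```

Each of the eight processes therefore trains on its own 7500-image block, which is why the per-worker output below reports 7500 samples.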

1. Create a JSON file with the HCL configuration. The below is an example for running 8-Gaudi training in one HLS-1 server:

$ cat hcl_config.json
{
    "HCL_PORT": 5332,
    "HCL_TYPE": "HLS1",
    "HCL_COUNT": 8
}

2. After creating hcl_config.json, set the HCL_CONFIG_PATH environment variable to the file path. For more details, refer to the Habana Communication Library (HCL) API Reference.
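Steps 1 and 2 can also be done programmatically, for example from a launcher script. The sketch below (an illustrative snippet, not part of Habana's tooling) writes the same 8-Gaudi configuration to a file and points HCL_CONFIG_PATH at it.

```python
import json
import os
import tempfile

# The 8-Gaudi HLS-1 configuration from step 1.
config = {"HCL_PORT": 5332, "HCL_TYPE": "HLS1", "HCL_COUNT": 8}

# Write the JSON config file.
path = os.path.join(tempfile.gettempdir(), "hcl_config.json")
with open(path, "w") as f:
    json.dump(config, f, indent=4)

# Step 2: HCL reads the config file path from this environment variable.
os.environ["HCL_CONFIG_PATH"] = path
```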

3. Run the following command to launch the distributed training for eight Gaudi devices within one host. The command is wrapped in a script called run_hvd_8gaudi.sh. OpenMPI is required for host communication and for launching processes. The recommended OpenMPI version is 4.0.5:

$ mpirun -np 8 python3 example_hvd.py

The below is an example output:

7500/7500 [==============================] - 104s 14ms/sample - loss:
0.7289 - accuracy: 0.8361

7500/7500 [==============================] - 104s 14ms/sample - loss:
0.7916 - accuracy: 0.8051

7500/7500 [==============================] - 104s 14ms/sample - loss:
0.7939 - accuracy: 0.8053

7500/7500 [==============================] - 104s 14ms/sample - loss:
0.7928 - accuracy: 0.8093

3.2.3. Scale-out Using Gaudi NICs Across Servers

Theory of Operation: Scale-out Using Gaudi NICs

When training is distributed across more than one server, weight gradients need to be averaged for all workers across all servers. Habana’s implementation of AllReduce supports this transparently.

Internally, a local ReduceScatter within each server is followed by an across-server AllReduce, and finally a local AllGather. All communication is done through Gaudi NICs, whether it is within the server or across servers.

The figures below show how AllReduce works in a network configuration of four servers with eight ranks in each server:

  • Each rank is a Gaudi device.

  • The figures below show only the first four ranks in each server for illustration purposes.

Figure 3.6 is the result of a local ReduceScatter within each server. The ReduceScatter is implemented as an all-to-all communication within the server followed by a local reduction within each rank. It is the same as the first two steps outlined in Figure 3.5.

After the local ReduceScatter within each server, there are AllReduce operations for chunks of the data among corresponding ranks across different servers. That is, the AllReduce for the first chunk of data among rank 0/8/16/24, the AllReduce for the second chunk of the data among rank 1/9/17/25, and so on. The scale-out ports of the Gaudi NICs are used for communication across servers.

After the across-server AllReduce, each chunk of data among corresponding ranks across different servers has the same value, as the color coding indicates (Figure 3.7). The last step is an AllGather within each server. After that, all chunks of the data in all servers have the right values.

The final result is illustrated in Figure 3.8. All ranks have exactly the same set of reduced data.
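The three phases can be simulated in plain Python. In this sketch (illustration only; a sum reduction over hypothetical sizes, with `data[s][r]` holding one value per chunk for rank r on server s), every rank ends up with the identical fully reduced chunks, matching Figures 3.6 through 3.8.

```python
def hierarchical_allreduce(data):
    """data[s][r] = per-chunk values held by rank r on server s,
    with one chunk per local rank.  Returns the reduced chunks that
    every rank holds after the final AllGather."""
    num_servers = len(data)
    ranks = len(data[0])  # ranks per server == number of chunks

    # Phase 1: local ReduceScatter - rank c on each server ends up
    # owning the sum of chunk c over all local ranks.
    local = [[sum(data[s][r][c] for r in range(ranks)) for c in range(ranks)]
             for s in range(num_servers)]

    # Phase 2: AllReduce of chunk c among corresponding ranks
    # (rank c of every server) over the scale-out ports.
    global_chunks = [sum(local[s][c] for s in range(num_servers))
                     for c in range(ranks)]

    # Phase 3: local AllGather - every rank receives all reduced chunks.
    return global_chunks

# 2 servers x 4 ranks; rank r on server s holds four chunks, each equal
# to its global rank id (s * 4 + r).  Sum over all 8 ranks is 28.
data = [[[s * 4 + r] * 4 for r in range(4)] for s in range(2)]
print(hierarchical_allreduce(data))  # [28, 28, 28, 28]
```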


Figure 3.6 Result of Local ReduceScatter Within Each Server


Figure 3.7 Result of AllReduce for Chunks of Data Among Corresponding Ranks Across Different Servers


Figure 3.8 Result of Local AllGather Within Each Server

Example: Scale-out Using Gaudi NICs

The training model and the corresponding scripts are available in the TensorFlow Hello World Example on GitHub.

Changing the model (see Example: Scale-up Within a Server) to make it run across multiple HLS-1 servers is not required. The script, however, requires some changes.

A new script, run_hvd_16gaudi.sh, is provided as an example for two HLS-1 servers. The scale-out ports of the Gaudi devices in one server are connected to those in the other server through a switch. The script automatically generates the HCL JSON config file.

Run the script using the below command. Append the IP addresses of the two servers at the end of the command, separated by whitespace (shown here as placeholders):

$ ./run_hvd_16gaudi.sh <server-1-IP> <server-2-IP>

The auto-generated HCL config JSON file is located under the /tmp folder. The content is similar to the below example, where each of the 16 ranks lists the IP address of its host server (shown here as placeholders):

{
    "HCL_PORT": 5332,
    "HCL_TYPE": "HLS1",
    "HCL_RANKS": [
        "<server-1-IP>", "<server-1-IP>", "<server-1-IP>", "<server-1-IP>",
        "<server-1-IP>", "<server-1-IP>", "<server-1-IP>", "<server-1-IP>",
        "<server-2-IP>", "<server-2-IP>", "<server-2-IP>", "<server-2-IP>",
        "<server-2-IP>", "<server-2-IP>", "<server-2-IP>", "<server-2-IP>"
    ]
}

The script also sets the path of the generated HCL JSON file to the environment variable HCL_CONFIG_PATH and eventually invokes a command like the example below:

$ mpirun --allow-run-as-root \
    --mca btl_tcp_if_exclude lo,docker \
    -x PATH \
    python3 example_hvd.py

3.2.4. Scale-out Using Host NICs Across Servers

There are two ways to implement scale-out through host NICs:

  • Without peer-direct RDMA

  • With peer-direct RDMA

Scale-out Using Host NICs Without Peer-direct RDMA

Without peer-direct RDMA, exchanging data between two Gaudi devices in two different hosts requires:

  • Copying the data from device memory in one Gaudi to the host CPU memory through PCIe.

  • Using the host NIC to send the data to the CPU memory in another host.

  • Copying the data from the CPU memory to Gaudi device memory through PCIe (Figure 3.9).

This process is added to the AllReduce algorithm described above and is implemented as the hierarchical AllReduce defined by Horovod (NCCL Hierarchical Allreduce, nccl_operations.cc#L191).

Hierarchical AllReduce is enabled by setting the environment variable HOROVOD_HIERARCHICAL_ALLREDUCE=1 at runtime, as detailed in Example: Scale-out Using Host NICs Without Peer-direct RDMA. The communication within the server is done through Gaudi NICs, and the communication across servers is done through the Host NICs and PCIe ports.


Figure 3.9 Scale-out Using Host NIC Without Peer-direct RDMA

Example: Scale-out Using Host NICs Without Peer-direct RDMA

The training model and the corresponding scripts are available in the TensorFlow Hello World Example on GitHub.

Changing the model (see Example: Scale-up Within a Server) is not required. A separate script to run a simple example of scale-out using host NICs (without peer-direct RDMA) is provided:

$ ./run_hvd_16gaudi_hostnic.sh

This script does not create an HCL config JSON file. This example uses the same file prepared in the scale-up example (see Example: Scale-up Within a Server), hcl_config.json. The script sets the path of the hcl_config.json to the environment variable HCL_CONFIG_PATH and also sets the environment variable HOROVOD_HIERARCHICAL_ALLREDUCE to 1. In the end, it invokes a command similar to the below example:

$ mpirun --allow-run-as-root \
    --mca btl_tcp_if_exclude lo,docker \
    --host <16 comma-separated rank IP addresses> \
    -x PATH \
    python3 example_hvd.py

Scale-out Using Host NICs with Peer-direct RDMA

Peer-direct RDMA is a technology that allows using host NICs for DMA data transfers between the memory of Gaudi devices located on separate servers. On each host, the NIC used must share a PCIe switch with the Gaudi device (Figure 3.10).


Figure 3.10 Scale-out Using Host NIC with Peer-direct RDMA

With peer-direct RDMA enabled, there is no need to use CPU memory to exchange data across servers, which enables operation with lower latencies.


Note: Peer-direct RDMA is currently a work in progress for Gaudi.

3.2.5. Examples of Real-world Applications

Examples of real-world applications, such as ResNet-50, BERT, and other models, including their performance and results, are located in TensorFlow Models on GitHub.

Keras ResNet Model Example

The below is an example of enabling distributed training in the Keras ResNet model:

  1. General sharding of the ImageNet dataset can be found in Tensorflow/computer_vision/common/imagenet_preprocessing.py:

from TensorFlow.common.horovod_helpers import hvd, horovod_enabled

if horovod_enabled() and (is_training or use_distributed_eval):
  logging.info(
      'HVD sharding the dataset: input_pipeline_id=%d num_input_pipelines=%d',
      hvd.rank(), hvd.size())
  dataset = dataset.shard(hvd.size(), hvd.rank())
elif input_context:
  logging.info(
      'Sharding the dataset: input_pipeline_id=%d num_input_pipelines=%d',
      input_context.input_pipeline_id, input_context.num_input_pipelines)
  dataset = dataset.shard(input_context.num_input_pipelines,
                          input_context.input_pipeline_id)
  2. Define the use_horovod flag located in common.py. The default value is False:

flags.DEFINE_boolean("use_horovod", default=False, help="Use horovod")
  3. Import basic habana horovod functions to be called in common.py:

from TensorFlow.common.horovod_helpers import hvd_size, horovod_enabled
  4. Calculate the global batch size based on the batch size per card and the total card number in common.py:

if horovod_enabled():
     adjusted_batch_size = batch_size * hvd_size()
  5. Import basic habana horovod functions to be called in resnet_ctl_imagenet_main.py:

from TensorFlow.common.horovod_helpers import hvd, hvd_init, hvd_size, horovod_enabled, synapse_logger_init
  6. Calculate the global batch size based on the batch size per card and the total card number, and name the model directory according to the rank ID in resnet_ctl_imagenet_main.py:

if horovod_enabled():
     batch_size = adjust_batch_size(flags_obj.batch_size)
     model_dir = os.path.join(flags_obj.model_dir, "worker_" + str(hvd.rank()))
  7. Initialize habana horovod in resnet_ctl_imagenet_main.py:

if flags.FLAGS.use_horovod:
  synapse_logger_init()
  hvd_init()
  8. Import basic habana horovod functions to be called in resnet_runnable.py:

from TensorFlow.common.horovod_helpers import hvd, horovod_enabled

  9. Average the evaluation results among all the workers in resnet_runnable.py:

if self.flags_obj.use_distributed_eval and horovod_enabled():
   test_accuracy = hvd.allreduce(self.test_accuracy.result())
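The batch-size adjustment in steps 4 and 6 is simple arithmetic: the global batch size grows linearly with the number of cards, which is also why the learning rate is scaled by the worker count. A minimal sketch (plain Python, with hypothetical values; the real adjust_batch_size lives in the model code):

```python
def adjust_batch_size(batch_size_per_card, num_cards):
    """Global batch size grows linearly with the number of workers,
    matching the hvd.size() learning-rate scaling shown earlier."""
    return batch_size_per_card * num_cards

# A per-card batch of 256 across 8 Gaudi cards yields a global batch of 2048.
print(adjust_batch_size(256, 8))  # 2048
```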

3.3. Multi-Worker Training using HPUStrategy

3.3.1. Usage

Habana provides a dedicated HPUStrategy class to support distributed training using Gaudi’s high performance RDMA communication:

from habana_frameworks.tensorflow.distribute import HPUStrategy

strategy = HPUStrategy()

# For use with Keras
with strategy.scope():
   model = ...

# For use with Estimator
run_config = tf.estimator.RunConfig(train_distribute=strategy, ...)

# Manually
server = tf.distribute.Server(...)
strategy.extended._std_server_started = True   # To prevent the double-launch of the Server
with tf.device("/device:HPU:0"):
   y1 = strategy.reduce(...)
   ... = tensorflow.python.ops.collective_ops.broadcast_send(...)
   ... = tensorflow.python.ops.collective_ops.broadcast_recv(...)
   ... = tensorflow.python.ops.collective_ops.all_gather(...)
with tf.compat.v1.Session(server.target) as sess:
   ... = sess.run(y1)


Using other TensorFlow built-in distribution strategies (MirroredStrategy, MultiWorkerMirroredStrategy, CentralStorageStrategy, ParameterServerStrategy) is not supported with the Habana software stack on Gaudi.

The HPUStrategy class has the same usage model as MultiWorkerMirroredStrategy, in which each worker runs in a separate process and acquires a single Gaudi device.

In multi-worker, multi-process training, TensorFlow establishes inter-process communication using gRPC at startup. Workers run the same training script in parallel, effectively having their own replica (copy) of the same computation graph. TensorFlow Keras and Estimator automatically ensure initial broadcast of trainable variables (weights and biases) prior to training start, and reduce the gradients among workers with each step. In addition, TensorFlow provides a mechanism for both automatic and manual data sharding, so every worker processes a distinct set of samples from the training dataset.
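The manual data sharding mechanism follows the same rule as `tf.data.Dataset.shard(num_shards, index)`: a worker keeps every element whose position modulo num_shards equals its index. A plain-Python sketch of that rule (a simulation only, not the TensorFlow implementation):

```python
def shard(elements, num_shards, index):
    """Mimic tf.data.Dataset.shard: keep every num_shards-th element,
    starting at position index."""
    return [e for i, e in enumerate(elements) if i % num_shards == index]

samples = list(range(8))
# Two workers each see a disjoint half of the dataset.
print(shard(samples, 2, 0))  # [0, 2, 4, 6]
print(shard(samples, 2, 1))  # [1, 3, 5, 7]
```

Because the shards are disjoint and cover the whole dataset, each step's gradients are computed on distinct samples before being reduced among workers.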

Unlike Horovod, neither tf.distribute nor HPUStrategy uses OpenMPI. Worker processes can be initialized in any way (for example, using mpirun).

For a demo of HPUStrategy with multi-worker training, please refer to TensorFlow distribute_with_hpu_strategy Example on GitHub.

3.3.2. Migration from TensorFlow Built-in Strategies

If your training script uses MultiWorkerMirroredStrategy, replacing it with HPUStrategy is sufficient. The worker process performing the training must load the Habana Device integration module for TensorFlow. If Estimator is used, a dedicated evaluator job may be designated to evaluate the last checkpoint in parallel to the training process. In such a case, evaluation must be performed on the CPU. That is, the process running the Estimator-based script does not load the Habana Device integration module for TensorFlow.

MirroredStrategy uses an in-graph approach, in which a single process runs a computation graph that directly targets its operations at various accelerator devices (like CPU:0 and multiple GPU:<X>). Since only one Gaudi device can be acquired per process, every worker must be run in a separate process. This is a between-graph approach, in which every worker targets its operations at either CPU:0 or HPU:0 only.

For more information on setting up a multi-process training cluster, refer to Multi-worker training with Keras TensorFlow documentation.

3.3.3. Advanced Details

A native MultiWorkerMirroredStrategy class internally uses Cross Device Ops to add collective operations into the computation graph. Those operations are:

  • CollectiveReduce – All-Reduce operation applied to every gradient before weight update.

  • CollectiveBcastSend – Broadcast Send operation for sending the initial weights used only by the cluster leader (worker:0 or chief:0).

  • CollectiveBcastRecv – Broadcast Receive operation for receiving the initial weights used by all workers except the cluster leader.

  • CollectiveGather – All-Gather operation (not exercised in a basic usage flow).

The HPUStrategy class reuses MultiWorkerMirroredStrategy, assuring that there is only one accelerator per process and that all collective operations are placed on the "/device:HPU:0" device. During graph pre-processing, the Habana Device integration component for TensorFlow replaces the collective operations with dedicated HPU operations: HpuCollectiveReduce, HpuCollectiveGather, HpuCollectiveBcastSend, and HpuCollectiveBcastRecv. HPU collective operations use Habana’s Communication Library (HCL) internally for collective communication. Neither gRPC nor TensorFlow’s CollectiveExecutor is used at this point. HPU collective operations support exchanging tensors of data types float, int32, and bfloat16 (using the TF_BF16_CONVERSION environment variable).

Habana’s software stack provides support for Scoped Allocator Optimization (SAO) done by TensorFlow’s Grappler. SAO merges multiple pass-through (in-place) ops into a single one, and merges their input tensors into a single memory allocation called backing tensor (allocated by _ScopedAllocator op). When applied to All-Reduce operations, this optimization may noticeably decrease the duration of a single step, in which hundreds of gradients are averaged at once. In that case, HpuCollectiveReduce needs to be registered as a subject for the optimization:

# With use of the session config
session_config = ...
if enable_scoped_allocator:
   rewrite_options = session_config.graph_options.rewrite_options
   from tensorflow.core.protobuf.rewriter_config_pb2 import RewriterConfig
   rewrite_options.scoped_allocator_optimization = RewriterConfig.ON
   # Register the HPU collective as a subject for the optimization.
   rewrite_options.scoped_allocator_opts.enable_op.append("HpuCollectiveReduce")

# Set globally
if enable_scoped_allocator:
   tf.config.optimizer.set_experimental_options({"scoped_allocator_optimization": True})
   from tensorflow.python.eager import context
   ctx = context.context()
   ctx._collective_scoped_allocator_enabled_ops = ["HpuCollectiveReduce"]