# 3. Distributed Training with TensorFlow

## 3.1. Introduction

Distributed training is becoming increasingly important as training data size and model complexity grow. Industry and academia have proposed many methods, including data parallelism, model parallelism, and hybrid parallelism. This document describes how to implement synchronous data parallel distributed training systems, in which the training process is accelerated by processing the distributed dataset concurrently on multiple workers.

From a hardware perspective, the Habana® Gaudi® HPU supports the RoCEv2 protocol (RDMA over Converged Ethernet). Each Gaudi natively integrates ten 100 Gigabit Ethernet ports supporting RoCEv2. Figure 3.1 shows an HLS-1 server, which features eight Gaudi devices. Within each Gaudi device, seven of the ten NIC ports connect to the other seven Gaudis within the server in an all-to-all configuration for scale-up, and the remaining three are used for scale-out across servers.

Figure 3.1 HLS-1 Block Diagram

From a software perspective, Gaudi scaling with data parallelism in the TensorFlow framework is achieved using two distinct methods:

• By using Habana Horovod (see Figure 3.2).

• By using HPUStrategy integrated with tf.distribute API.

The following versions are supported:

• All supported TensorFlow versions. See the TensorFlow section of the Release Notes for more details.

• Habana Horovod - forked from v0.20.0 of the official Horovod

Figure 3.2 Gaudi Distributed Training Software Stack

The following table summarizes the scale-out topologies described in the sections below:

| Scale Out Topology | Distributed Framework | Scaling | Flags | Notes |
|---|---|---|---|---|
| Gaudi based | Horovod | RDMA based scaling | None | Default mode = RDMA |
| Gaudi based | TensorFlow Distributed | | | |
| Host NIC Scaling | Horovod | RDMA based scaling | HOROVOD_HIERARCHICAL_ALLREDUCE=1 | MPI based Host NIC Scale Out |
| Host NIC Scaling | Horovod | TCP/IP based scaling | HCCL_OVER_TCP=1 | Refer to Scale-Out via Host-NIC over TCP |
| Host NIC Scaling | TensorFlow Distributed | RDMA based scaling | N/A | Not supported in v1.0.1 |

## 3.2. Horovod-based Scaling of Gaudi on TensorFlow

Scale-up within the HLS-1 server is achieved with Gaudi NICs. Scale-out across servers is achieved through Gaudi NICs and Host NICs.

In Figure 3.3 Gaudi NICs are used for both scale-up and scale-out.

Figure 3.3 Gaudi NIC for both Scale-up and Scale-out

In Figure 3.4 Gaudi NICs are used for scale-up and Host NICs are used for scale-out.

Figure 3.4 Gaudi NIC for Scale-up and Host NIC for Scale-out

### 3.2.1. Horovod APIs Support

Habana provides two sets of APIs to support Horovod:

• Habana Collective Communications Library (HCCL)

• Habana Communication Library (HCL)

Note

HCCL is Habana’s implementation of Nvidia NCCL for Gaudi. Internally, HCCL uses the Habana Communication Library (HCL).

The below table presents the API recommendation. An environment variable enables switching between these two APIs during runtime.

Note

By default, the code uses HCCL APIs.

| HW solution | SW solution | Environment variables |
|---|---|---|
| Scaling using Gaudi NIC for both scale-up and scale-out | HCL | export HABANA_NCCL_COMM_API=false |
| Scaling using Gaudi NIC for scale-up and Host NIC for scale-out | HCCL (default API) | |

#### 3.2.1.1. Horovod Integration with Habana Collective Communications Library (HCCL)

HCL provides a set of communication APIs providing support on Habana’s SynapseAI layer. While the HCCL library operates directly on top of the Habana Communication Library (HCL), Horovod HCCL requires not only HCCL but also the synapse_helpers library provided by the TensorFlow integration, in order to synchronize the computation and collective streams.

#### 3.2.1.2. Horovod Integration with Habana Communication Library (HCL)

Horovod HCL uses HCL indirectly, through the synapse_helpers library provided by the TensorFlow integration, in order to synchronize the computation and collective streams.

### 3.2.2. Scale-up Using Gaudi NICs Within a Server

#### 3.2.2.1. Theory of Operation: Scale-up Using Gaudi NICs

Horovod implements the collective Ops used for data parallel distributed training, including AllReduce, Broadcast, and AllGather. The AllReduce Op averages the weight gradients across workers during the back propagation phase of training. The Broadcast Op sends the initial weights to all workers.

In the case of scale-up within one server, each worker is a Gaudi device and AllReduce uses Gaudi NICs for communication. The all-to-all connections across Gaudi devices within the server guarantee the best performance of AllReduce for the best scaling efficiency.

Figure 3.5 AllReduce Within a Server
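The effect of the two collective Ops described above can be illustrated with a small pure-Python simulation. This is only a sketch of the semantics: it does not use Horovod, HCL, or Gaudi hardware, and the function names `allreduce_average` and `broadcast` are hypothetical helpers introduced for illustration.

```python
# Pure-Python simulation of collective semantics; no Horovod or Gaudi involved.

def allreduce_average(per_worker_grads):
    """Return what every worker holds after an averaging AllReduce."""
    num_workers = len(per_worker_grads)
    # Element-wise sum across workers, then divide by the worker count.
    averaged = [sum(values) / num_workers for values in zip(*per_worker_grads)]
    # Each worker receives its own copy of the same averaged gradient.
    return [list(averaged) for _ in range(num_workers)]


def broadcast(per_worker_weights, root_rank=0):
    """Return what every worker holds after a Broadcast from root_rank."""
    root = per_worker_weights[root_rank]
    return [list(root) for _ in per_worker_weights]


# Four hypothetical workers, each with a two-element gradient.
grads = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0], [7.0, 8.0]]
reduced = allreduce_average(grads)

# Workers start with different random weights; rank 0 is the source of truth.
weights = broadcast([[0.1, 0.2], [0.9, 0.8], [0.5, 0.5], [0.3, 0.7]])
```

After the call, every entry of `reduced` is identical, which is what keeps the model replicas in sync after each training step.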

#### 3.2.2.2. Example: Scale-up Within a Server

Below is a simple example of distributed training, based on the single Gaudi training example detailed in the Gaudi Migration Guide. The training model and the corresponding scripts are available in the TensorFlow Hello World Example on GitHub.

The highlighted lines of code are added for distributed training.

    import tensorflow as tf
    from habana_frameworks.tensorflow import load_habana_module
    import horovod.tensorflow.keras as hvd

    tf.compat.v1.disable_eager_execution()
    load_habana_module()

    # Initialization of Horovod.
    hvd.init()

    # Ensure only 1 process downloads the data on each node
    if hvd.local_rank() == 0:
        (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
        hvd.broadcast(0, 0)
    else:
        hvd.broadcast(0, 0)
        (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

    # Data partition for different workers
    num_pics_per_rank = x_train.shape[0] // hvd.size()
    pic_begin = num_pics_per_rank * hvd.rank()
    pic_end = pic_begin + num_pics_per_rank
    x_train = x_train[pic_begin:pic_end,]
    y_train = y_train[pic_begin:pic_end,]

    x_train, x_test = x_train / 255.0, x_test / 255.0

    model = tf.keras.models.Sequential([
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        tf.keras.layers.Dense(10)
    ])
    loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

    # Using hvd.size() (number of workers) to scale learning rate and wrapping
    # optimizer with Distributed optimizer class provided by horovod.
    optimizer = tf.keras.optimizers.SGD(learning_rate=0.01 * hvd.size())
    optimizer = hvd.DistributedOptimizer(optimizer)

    callbacks = [
        # Horovod: broadcast initial variable states from rank0 to all other processes.
        # This is necessary to ensure consistent initialization of all workers when
        # training is started with random weights or restored from a checkpoint.
        hvd.callbacks.BroadcastGlobalVariablesCallback(0),
    ]

    model.compile(optimizer=optimizer, loss=loss, metrics=['accuracy'])
    model.fit(x_train, y_train, epochs=1, batch_size=128, callbacks=callbacks)

    model.evaluate(x_test, y_test)

The code above runs in multiple processes, one for each Gaudi.

1. Create the hcl_config.json file. For example:

    $ cat hcl_config.json
    {
        "HCL_PORT": 5332,
        "HCL_COUNT": 8
    }

2. After creating hcl_config.json, set the HCL_CONFIG_PATH environment variable to the file path. For more details, refer to Habana Communication Library (HCL) API Reference.

3. Run the following command to launch the distributed training for eight Gaudi devices within one host. The command is wrapped in a script called run_hvd_8gaudi.sh. OpenMPI is required for host communication and for launching processes. The recommended OpenMPI version is 4.0.5:

    $ mpirun -np 8 python3 example_hvd.py


Below is an example output:

    7500/7500 [==============================] - 104s 14ms/sample - loss: 0.7289 - accuracy: 0.8361

    7500/7500 [==============================] - 104s 14ms/sample - loss: 0.7916 - accuracy: 0.8051

    7500/7500 [==============================] - 104s 14ms/sample - loss: 0.7939 - accuracy: 0.8053

    7500/7500 [==============================] - 104s 14ms/sample - loss: 0.7928 - accuracy: 0.8093
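The `7500/7500` sample count in the output follows from the data partition in the example script: the 60,000 MNIST training samples are divided evenly among eight ranks. The sketch below reproduces that arithmetic and the learning-rate scaling without TensorFlow or Horovod. The helper name `partition_bounds` is hypothetical; the formulas mirror the slicing and the `0.01 * hvd.size()` expression in the example.

```python
def partition_bounds(num_samples, world_size, rank):
    """Mirror the example's slicing: one equal, contiguous slice per rank."""
    per_rank = num_samples // world_size
    begin = per_rank * rank
    return begin, begin + per_rank


MNIST_TRAIN_SAMPLES = 60000  # size of the MNIST training set
WORLD_SIZE = 8               # one process per Gaudi device in an HLS-1 server

bounds = [partition_bounds(MNIST_TRAIN_SAMPLES, WORLD_SIZE, rank)
          for rank in range(WORLD_SIZE)]
samples_per_rank = bounds[0][1] - bounds[0][0]

# The example scales the base learning rate by the number of workers.
scaled_learning_rate = 0.01 * WORLD_SIZE

for rank, (begin, end) in enumerate(bounds):
    print(f"rank {rank}: samples [{begin}, {end})")
```

Note that integer division drops any remainder, so with a sample count that is not a multiple of the number of workers, the trailing samples would not be assigned to any rank.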


### 3.2.3. Scale-out Using Gaudi NICs Across Servers

Note

This configuration is available only when using Gaudi based fabric for scale out.

#### 3.2.3.1. Theory of Operation: Scale-out Using Gaudi NICs

When training is distributed across more than one server, weight gradients need to be averaged for all workers across all servers. Habana’s implementation of AllReduce supports this transparently.

Internally, a local ReduceScatter within the server is followed by an across-server AllReduce, and finally by a local AllGather. All communication goes through Gaudi NICs, whether it is within the server or across servers.

The figures below show how AllReduce works in a network configuration of four servers and eight ranks in each server:

• Each rank is a Gaudi device.

• The figures below show only the first four ranks in each server for illustration purposes.

Figure 3.6 is the result of a local ReduceScatter within each server. The ReduceScatter is implemented as an all-to-all communication within the server followed by a local reduction within each rank. It is the same as the first two steps outlined in Figure 3.5.

After the local ReduceScatter within each server, there are AllReduce operations for chunks of the data among corresponding ranks across different servers. That is, the AllReduce for the first chunk of data among rank 0/8/16/24, the AllReduce for the second chunk of the data among rank 1/9/17/25, and so on. The scale-out ports of the Gaudi NICs are used for communication across servers.

After the across-server AllReduce, each chunk of data among corresponding ranks across different servers has the same value, as the color indicates (Figure 3.7). The last step is an AllGather within each server. After that, all chunks of the data in all servers have the right values.

The final result is illustrated in Figure 3.8. All ranks have exactly the same set of reduced data.

Figure 3.6 Result of Local ReduceScatter Within Each Server

Figure 3.7 Result of AllReduce for Chunks of Data Among Corresponding Ranks Across Different Servers

Figure 3.8 Result of Local AllGather Within Each Server
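The three stages shown in Figures 3.6 to 3.8 can be traced with a small pure-Python simulation. This is a sketch of the data flow only, not Habana's implementation: the function name `hierarchical_allreduce` is hypothetical, ranks are modeled as plain lists, and the reduction is a sum (averaging would divide by the number of ranks afterwards). For brevity the demo uses two servers with four ranks each.

```python
def hierarchical_allreduce(data, ranks_per_server):
    """Simulate ReduceScatter -> across-server AllReduce -> AllGather.

    data[rank] is that rank's input vector; its length must be divisible
    by ranks_per_server so it can be split into equal chunks.
    """
    world_size = len(data)
    num_servers = world_size // ranks_per_server
    chunk = len(data[0]) // ranks_per_server

    # Step 1: local ReduceScatter. Local rank i ends up owning the sum of
    # chunk i taken over all ranks of its own server.
    owned = {}
    for server in range(num_servers):
        members = range(server * ranks_per_server, (server + 1) * ranks_per_server)
        for local_rank in range(ranks_per_server):
            sl = slice(local_rank * chunk, (local_rank + 1) * chunk)
            owned[(server, local_rank)] = [
                sum(values) for values in zip(*(data[r][sl] for r in members))
            ]

    # Step 2: across-server AllReduce among ranks with the same local rank
    # (for example ranks 0/8/16/24 in the four-server configuration).
    for local_rank in range(ranks_per_server):
        total = [
            sum(values)
            for values in zip(*(owned[(s, local_rank)] for s in range(num_servers)))
        ]
        for server in range(num_servers):
            owned[(server, local_rank)] = total

    # Step 3: local AllGather. Every rank collects all chunks of its server.
    result = []
    for rank in range(world_size):
        server = rank // ranks_per_server
        gathered = []
        for local_rank in range(ranks_per_server):
            gathered.extend(owned[(server, local_rank)])
        result.append(gathered)
    return result


# Two servers, four ranks each; rank r holds [10r, 10r+1, 10r+2, 10r+3].
inputs = [[r * 10 + j for j in range(4)] for r in range(8)]
result = hierarchical_allreduce(inputs, ranks_per_server=4)

# Reference: a flat element-wise sum over all ranks.
expected = [sum(values) for values in zip(*inputs)]
```

Every rank's output equals `expected`, which shows that the hierarchical scheme produces the same result as a flat AllReduce while sending only one chunk per rank across servers.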

#### 3.2.3.2. Example: Scale-out Using Gaudi NICs

Note

This configuration is supported only when using Gaudi based fabric for scale out.

The training model and the corresponding scripts are available in the TensorFlow Hello World Example on GitHub.

Changing the model (see Example: Scale-up Within a Server) to make it run across multiple HLS-1 servers is not required. The script, however, requires some changes.

A new script, run_hvd_16gaudi.sh, is provided as an example for two HLS-1 servers. The scale-out ports of the Gaudi devices in one server are connected to those in another server through a switch. The script automatically generates the HCL config JSON file used for the training.

Run the script using the below command. You must append the IP addresses of the two servers at the end of the command, separated by whitespace, similar to the example below:

    $ ./run_hvd_16gaudi.sh 192.168.0.1 192.168.0.2

The auto-generated HCL config JSON file is located under the /tmp folder. The content is similar to the below example:

    {
        "HCL_PORT": 5332,
        "HCL_RANKS": [
            "192.168.0.1", "192.168.0.1", "192.168.0.1", "192.168.0.1",
            "192.168.0.1", "192.168.0.1", "192.168.0.1", "192.168.0.1",
            "192.168.0.2", "192.168.0.2", "192.168.0.2", "192.168.0.2",
            "192.168.0.2", "192.168.0.2", "192.168.0.2", "192.168.0.2"
        ]
    }

The script also sets the HCL_CONFIG_PATH environment variable to the path of the generated HCL JSON file and eventually invokes a command like the example below:

    $ mpirun --allow-run-as-root \
        --mca btl_tcp_if_include 192.168.0.1/24,192.168.0.2/24 \
        --prefix /usr/local/openmpi/ \
        --host 192.168.0.1,192.168.0.1,192.168.0.1,192.168.0.1,192.168.0.1,192.168.0.1,192.168.0.1,192.168.0.1,192.168.0.2,192.168.0.2,192.168.0.2,192.168.0.2,192.168.0.2,192.168.0.2,192.168.0.2,192.168.0.2 \
        -x GC_KERNEL_PATH \
        -x HABANA_LOGS \
        -x TF_MODULES_RELEASE_BUILD \
        -x OPAL_PREFIX \
        -x PYTHONPATH \
        -x LD_LIBRARY_PATH \
        -x PATH \
        ...
        -x HCL_CONFIG_PATH \
        python3 example_hvd.py
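As an illustration of what the auto-generated file contains, the following sketch builds the same JSON structure from a list of server IP addresses. It is not the logic of run_hvd_16gaudi.sh: the function name `build_hcl_config` and the output path are hypothetical, and the sketch assumes eight Gaudi devices per server, with one `HCL_RANKS` entry per device as in the example above.

```python
import json
import os
import tempfile


def build_hcl_config(server_ips, devices_per_server=8, port=5332):
    """Build an HCL config dict with one HCL_RANKS entry per Gaudi device."""
    ranks = [ip for ip in server_ips for _ in range(devices_per_server)]
    return {"HCL_PORT": port, "HCL_RANKS": ranks}


config = build_hcl_config(["192.168.0.1", "192.168.0.2"])

# Write the file to a temporary directory (hypothetical location) and point
# HCL_CONFIG_PATH at it, as the script is described to do.
config_path = os.path.join(tempfile.mkdtemp(), "hcl_config.json")
with open(config_path, "w") as config_file:
    json.dump(config, config_file, indent=4)
os.environ["HCL_CONFIG_PATH"] = config_path

print(json.dumps(config, indent=4))
```

The order of the entries matters in this sketch: ranks 0 to 7 map to the first server and ranks 8 to 15 to the second, matching the `--host` list passed to mpirun.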


By default, the shell scripts connect to port 3022. However, the port on which the SSH server listens may differ between environments. If your environment requires a different port for the remote SSH server, specify it using the SSHD_PORT environment variable.

The example below uses port 22:

    $ SSHD_PORT=22 ./run_hvd_16gaudi.sh 192.168.0.1 192.168.0.2

To change the port that the SSH server listens on, use the below command. Make sure to set the port to 22 as in the below example:

    $ /etc/init.d/ssh restart '-p 22'


### 3.2.4. Scale-out Using Host NICs Across Servers

There are two ways to implement scale-out through host NICs:

• Without peer-direct RDMA

• With peer-direct RDMA

#### 3.2.4.1. Scale-out Using Host NICs Without Peer-direct RDMA

Note

Currently, this configuration is not supported using Gaudi based fabric for scale out.

Without peer-direct RDMA, exchanging data between two Gaudi devices in two different hosts requires:

• Copying the data from device memory in one Gaudi to the host CPU memory through PCIe.

• Using the host NIC to send the data to the CPU memory in another host.

• Copying the data from the CPU memory to Gaudi device memory through PCIe (Figure 3.9).

This process is added to the AllReduce algorithm described above and implemented as hierarchical AllReduce defined by Horovod (NCCL Hierarchical Allreduce, nccl_operations.cc#L191).

Hierarchical AllReduce is enabled by setting export HOROVOD_HIERARCHICAL_ALLREDUCE=1 during runtime as detailed in Example: Scale-out Using Host NICs Without Peer-direct RDMA. The communication within the server is done through Gaudi NICs and the communication across servers is done through the Host NICs and PCIe ports.

Figure 3.9 Scale-out Using Host NIC Without Peer-direct RDMA

#### 3.2.4.2. Example: Scale-out Using Host NICs Without Peer-direct RDMA

The training model and the corresponding scripts are available in the TensorFlow Hello World Example on GitHub.

Changing the model (see Example: Scale-up Within a Server) is not required. A separate script to run a simple example of scale-out using host NICs (without peer-direct RDMA) is provided:

    $ ./run_hvd_16gaudi_hostnic.sh 192.168.0.1 192.168.0.2

This example sets the environment variable HOROVOD_HIERARCHICAL_ALLREDUCE to 1 and invokes a command similar to the below example:

    $ mpirun --allow-run-as-root \
        --mca btl_tcp_if_include 192.168.0.1/24,192.168.0.2/24 \
        --prefix /usr/local/openmpi/ \
        --host 192.168.0.1,192.168.0.1,192.168.0.1,192.168.0.1,192.168.0.1,192.168.0.1,192.168.0.1,192.168.0.1,192.168.0.2,192.168.0.2,192.168.0.2,192.168.0.2,192.168.0.2,192.168.0.2,192.168.0.2,192.168.0.2 \
        -x GC_KERNEL_PATH \
        -x HABANA_LOGS \
        -x TF_MODULES_RELEASE_BUILD \
        -x OPAL_PREFIX \
        -x PYTHONPATH \
        -x LD_LIBRARY_PATH \
        -x PATH \
        ...
        -x HOROVOD_HIERARCHICAL_ALLREDUCE \
        python3 example_hvd.py
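The long `--host` list in the command above repeats each server address once per Gaudi device. The sketch below shows one way such a command line could be assembled in Python. It is not the content of run_hvd_16gaudi_hostnic.sh: the function name `build_mpirun_command` is hypothetical, only options that appear in the example above are used, and just one forwarded variable is listed for brevity.

```python
import shlex


def build_mpirun_command(server_ips, devices_per_server=8,
                         exported_vars=("HOROVOD_HIERARCHICAL_ALLREDUCE",),
                         script="example_hvd.py"):
    """Assemble an mpirun argument list with one host slot per Gaudi device."""
    hosts = ",".join(ip for ip in server_ips for _ in range(devices_per_server))
    subnets = ",".join(f"{ip}/24" for ip in server_ips)
    command = [
        "mpirun", "--allow-run-as-root",
        "--mca", "btl_tcp_if_include", subnets,
        "--prefix", "/usr/local/openmpi/",
        "--host", hosts,
    ]
    # Each -x option forwards one environment variable to the remote ranks.
    for name in exported_vars:
        command += ["-x", name]
    command += ["python3", script]
    return command


command = build_mpirun_command(["192.168.0.1", "192.168.0.2"])
print(" ".join(shlex.quote(arg) for arg in command))
```

The `-x` option only forwards a variable; HOROVOD_HIERARCHICAL_ALLREDUCE still has to be set to 1 in the launching environment, as the script does.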


The port on which the SSH server listens might be different if the workload is not running inside the container. You can specify the port of the remote SSH server using the SSHD_PORT environment variable:

    $ SSHD_PORT=22 ./run_hvd_16gaudi_hostnic.sh 192.168.0.1 192.168.0.2


#### 3.2.4.3. Scale-out Using Host NICs with Peer-direct RDMA

Peer-direct RDMA is a technology that allows host NICs to perform DMA data transfers directly between the memory of Gaudi devices located on separate servers. On each host, the NIC used must share a PCIe switch with the Gaudi device (Figure 3.10).

Figure 3.10 Scale-out Using Host NIC with Peer-direct RDMA

With peer-direct RDMA enabled, there is no need to use CPU memory to exchange data across servers, which enables operation with lower latencies.

Note

Peer-direct RDMA is currently ‘work in progress’ for Gaudi.

### 3.2.5. Examples of Real-world Applications

Examples of real-world applications, such as ResNet-50, BERT, and other models, including their performance and results are located in TensorFlow Models on GitHub.

#### 3.2.5.1. Keras ResNet Model Example

Below is an example of enabling distributed training in the Keras ResNet model:

1. General sharding of the ImageNet dataset can be found in Tensorflow/computer_vision/common/imagenet_preprocessing.py:

    from TensorFlow.common.horovod_helpers import hvd, horovod_enabled

    if horovod_enabled() and (is_training or use_distributed_eval):
        logging.info(
            'HVD sharding the dataset: input_pipeline_id=%d num_input_pipelines=%d',
            hvd.rank(), hvd.size())
        dataset = dataset.shard(hvd.size(), hvd.rank())

    if input_context:
        logging.info(
            'Sharding the dataset: input_pipeline_id=%d num_input_pipelines=%d',
            input_context.input_pipeline_id, input_context.num_input_pipelines)
        dataset = dataset.shard(input_context.num_input_pipelines,
                                input_context.input_pipeline_id)

2. Define the use_horovod flag located in common.py. The default value is false:

    flags.DEFINE_boolean("use_horovod", default=False, help="Use horovod")

3. Import the basic Habana Horovod functions to be called in file common.py:

    from TensorFlow.common.horovod_helpers import hvd_size, horovod_enabled

4. Calculate the global batch size based on the batch size per card and total card number in common.py:

    if horovod_enabled():

5. Import the basic Habana Horovod functions to be called in resnet_ctl_imagenet_main.py:

    from TensorFlow.common.horovod_helpers import hvd, hvd_init, hvd_size, horovod_enabled, synapse_logger_init

6. Calculate the global batch size based on the batch size per card and total card number, and name the model directory according to the rank ID, in file resnet_ctl_imagenet_main.py:

    if horovod_enabled():
        model_dir = os.path.join(flags_obj.model_dir, "worker_" + str(hvd.rank()))

7. Initialize Habana Horovod in resnet_ctl_imagenet_main.py:

    if flags.FLAGS.use_horovod:
        hvd_init()

8. Import the basic Habana Horovod functions to be called in resnet_runnable.py:

    from TensorFlow.common.horovod_helpers import hvd, horovod_enabled

9. All-reduce the test accuracy across workers in file resnet_runnable.py:

    if self.flags_obj.use_distributed_eval and horovod_enabled():
        test_accuracy = hvd.allreduce(self.test_accuracy.result())
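Two of the steps above, dataset sharding and the global batch size calculation, can be illustrated without TensorFlow. The sketch below is not taken from the ResNet scripts: `shard` and `global_batch_size` are hypothetical helpers. `shard` mimics the documented behavior of `tf.data.Dataset.shard(num_shards, index)`, which keeps the elements whose position modulo `num_shards` equals `index`. `global_batch_size` assumes the global batch is simply the per-card batch multiplied by the number of cards.

```python
def shard(elements, num_shards, index):
    """Keep every num_shards-th element starting at index (like Dataset.shard)."""
    return [item for position, item in enumerate(elements)
            if position % num_shards == index]


def global_batch_size(per_card_batch_size, num_cards, horovod_on=True):
    """Assumed rule: global batch = per-card batch x number of cards."""
    return per_card_batch_size * num_cards if horovod_on else per_card_batch_size


# Ten hypothetical input files spread over four workers.
files = [f"file-{i:02d}" for i in range(10)]
shards = [shard(files, num_shards=4, index=rank) for rank in range(4)]

for rank, assigned in enumerate(shards):
    print(f"rank {rank}: {assigned}")

print(global_batch_size(per_card_batch_size=256, num_cards=8))
```

Because the shards are disjoint and together cover the whole input, each worker processes a distinct subset of the dataset. When the element count is not a multiple of the worker count, some shards hold one element more than others.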


## 3.3. Multi-Worker Training using HPUStrategy

### 3.3.1. Usage

Habana provides a dedicated HPUStrategy class to support distributed training using Gaudi’s high performance RDMA communication:

    from habana_frameworks.tensorflow.distribute import HPUStrategy

    strategy = HPUStrategy()

    # For use with Keras
    with strategy.scope():
        model = ...
        model.compile(...)

    # For use with Estimator
    run_config = tf.estimator.RunConfig(train_distribute=strategy, ...)

    # Manually
    server = tf.distribute.Server(...)
    strategy.extended._std_server_started = True   # To prevent the double-launch of the Server
    with tf.device("/device:HPU:0"):
        y1 = strategy.reduce(...)
        ... = tensorflow.python.ops.collective_ops.all_gather(...)
    with tf.compat.v1.Session(server.target) as sess:
        ... = sess.run(y1)


Warning

Using other TensorFlow built-in distribution strategies is not supported with Habana software stack on Gaudi (MirroredStrategy, MultiWorkerMirroredStrategy, CentralStorageStrategy, ParameterServerStrategy).

Warning

Currently, HPUStrategy does not support multi-HLS training over host NIC.

The HPUStrategy class has the same usage model as MultiWorkerMirroredStrategy: each worker runs in a separate process and acquires a single Gaudi device.

In multi-worker, multi-process training, TensorFlow establishes inter-process communication using gRPC at startup. Workers run the same training script in parallel, effectively having their own replica (copy) of the same computation graph. TensorFlow Keras and Estimator automatically ensure initial broadcast of trainable variables (weights and biases) prior to training start, and reduce the gradients among workers with each step. In addition, TensorFlow provides a mechanism, DistributedDataset, which offers both automatic and manual data sharding, so every worker processes a distinct set of samples from the training dataset.

Unlike Horovod, neither tf.distribute nor HPUStrategy uses or requires OpenMPI at any point. Worker processes can be initialized in any way (for example, using mpirun for convenience, as it offers a process-to-core binding mechanism, e.g. --bind-to core --map-by slot:PE=4).

For a demo of HPUStrategy with multi-worker training, please refer to TensorFlow distribute_with_hpu_strategy Example on GitHub.

### 3.3.2. Migration from TensorFlow Built-in Strategies

If your training script uses MultiWorkerMirroredStrategy, replacing it with HPUStrategy is sufficient. The worker process performing the training must load the Habana Device integration module for TensorFlow. If Estimator is used, a dedicated evaluator job may be designated to evaluate the last checkpoint in parallel to the training process. In such a case, evaluation must be performed on the CPU. That is, the process running the Estimator-based script does not load the Habana Device integration module for TensorFlow.

MirroredStrategy uses an in-graph approach, in which a single process runs a computation graph that targets its operations directly on various accelerator devices (like CPU:0 and multiple GPU:<X>). Since only one Gaudi device can be acquired per process, every worker must be run in a separate process. This is a between-graph approach, in which every worker targets its operations to either CPU:0 or HPU:0 only.

For more information on setting up a multi-process training cluster, refer to the Multi-worker training with Keras TensorFlow documentation.
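In stock TensorFlow, MultiWorkerMirroredStrategy learns about the cluster from the `TF_CONFIG` environment variable, a JSON string with a `cluster` section and a `task` section. The sketch below builds one such value per worker. It assumes that HPUStrategy reads `TF_CONFIG` in the same way, since it shares the MultiWorkerMirroredStrategy usage model. The helper name `make_tf_config` and the `localhost` addresses and ports are hypothetical.

```python
import json


def make_tf_config(worker_addresses, task_index):
    """Return the TF_CONFIG JSON string for one worker process."""
    return json.dumps({
        "cluster": {"worker": list(worker_addresses)},
        "task": {"type": "worker", "index": task_index},
    })


# Eight workers on one host, one per Gaudi device (hypothetical ports).
workers = [f"localhost:{12340 + i}" for i in range(8)]
tf_configs = [make_tf_config(workers, index) for index in range(len(workers))]

# Each worker process would export its own value before creating the strategy,
# for example: os.environ["TF_CONFIG"] = tf_configs[my_index]
print(tf_configs[0])
```

All workers share the same `cluster` section and differ only in `task.index`. The worker with index 0 acts as the cluster leader when no dedicated chief is defined.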

A native MultiWorkerMirroredStrategy class internally uses Cross Device Ops to add collective operations into the computation graph. Those operations are:

• CollectiveReduce and CollectiveReduceV2 – All-Reduce operation applied to every gradient before weight update.

• CollectiveBcastSend and CollectiveBcastSendV2 – Broadcast Send operation for sending the initial weights used only by the cluster leader (worker:0 or chief:0).

• CollectiveBcastRecv and CollectiveBcastRecvV2 – Broadcast Receive operation for receiving the initial weights used by all workers except the cluster leader.

• CollectiveGather and CollectiveGatherV2 – All-Gather operation (not exercised in a basic usage flow).

The HPUStrategy class reuses MultiWorkerMirroredStrategy, ensuring that there is only one accelerator per process and that all collective operations are placed on the "/device:HPU:0" device. During graph pre-processing, the Habana Device integration component for TensorFlow replaces the collective operations with dedicated HPU operations: HpuCollectiveReduce, HpuCollectiveReduceV2, HpuCollectiveGather, HpuCollectiveGatherV2, HpuCollectiveBcastSend, HpuCollectiveBcastSendV2, HpuCollectiveBcastRecv and HpuCollectiveBcastRecvV2. HPU collective operations use Habana’s Communication Library (HCL) internally for collective communication. Neither gRPC nor TensorFlow’s CollectiveExecutor is used at this point. HPU collective operations support exchanging tensors of data types float and bfloat16 (using the TF_BF16_CONVERSION environment variable).

TensorFlow 2.5 introduces a mechanism which automatically concatenates the gradients into a single All-Reduce operation, and then splits the results into multiple outputs. This optimization may noticeably decrease the duration of a single step, in which hundreds of gradients are averaged at once.
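The idea of concatenating gradients into a single All-Reduce and splitting the result afterwards can be sketched in pure Python. This is an illustration of the concept only, not TensorFlow's implementation: the function name `fused_allreduce` is hypothetical, and gradients are modeled as flat lists of floats.

```python
def fused_allreduce(per_worker_grads):
    """Average many gradient tensors with a single simulated All-Reduce.

    per_worker_grads[w] is the list of gradient tensors (flat lists) on worker w.
    Returns the averaged tensors, identical on every worker.
    """
    sizes = [len(grad) for grad in per_worker_grads[0]]

    # 1. Concatenate all gradients of each worker into one buffer.
    packed = [[value for grad in grads for value in grad]
              for grads in per_worker_grads]

    # 2. One All-Reduce over the packed buffers instead of one per gradient.
    num_workers = len(packed)
    reduced = [sum(values) / num_workers for values in zip(*packed)]

    # 3. Split the reduced buffer back into the original tensor shapes.
    outputs, offset = [], 0
    for size in sizes:
        outputs.append(reduced[offset:offset + size])
        offset += size
    return outputs


worker0 = [[1.0, 2.0], [3.0]]   # two gradient tensors on worker 0
worker1 = [[3.0, 4.0], [5.0]]   # the matching tensors on worker 1
averaged = fused_allreduce([worker0, worker1])
```

With hundreds of gradients, this replaces hundreds of small collective calls with one large call, which is where the reduction in step time comes from.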

In the case of TensorFlow 2.4, no such mechanism exists, and an alternative method is provided. Habana’s software stack supports Scoped Allocator Optimization (SAO), performed by TensorFlow’s Grappler. SAO merges multiple pass-through (in-place) ops into a single one, and merges their input tensors into a single memory allocation called a backing tensor (allocated by the _ScopedAllocator op). In that case, HpuCollectiveReduce and HpuCollectiveReduceV2 need to be registered as subjects of the optimization:

    # Letting HPUStrategy do the work
    strategy = HPUStrategy(enable_scoped_allocator=True)

    # With use of the session config
    session_config = ...
    if enable_scoped_allocator:
        rewrite_options = session_config.graph_options.rewrite_options
        from tensorflow.core.protobuf.rewriter_config_pb2 import RewriterConfig
        rewrite_options.scoped_allocator_optimization = RewriterConfig.ON
        rewrite_options.scoped_allocator_opts.enable_op.extend(["HpuCollectiveReduce", "HpuCollectiveReduceV2"])

    # Set globally
    if enable_scoped_allocator:
        tf.config.optimizer.set_experimental_options({"scoped_allocator_optimization": True})
        from tensorflow.python.eager import context
        ctx = context.context()
        ctx._collective_scoped_allocator_enabled_ops = ["HpuCollectiveReduce", "HpuCollectiveReduceV2"]