TensorFlow Distributed based Scaling of Gaudi

Habana provides a dedicated HPUStrategy class to support distributed training using Gaudi’s high performance RDMA communication.

Note

Using other TensorFlow built-in distribution strategies is not supported with Habana software stack on Gaudi (MirroredStrategy, MultiWorkerMirroredStrategy, CentralStorageStrategy, ParameterServerStrategy). They may be still used, but the user must assure that the collective operations are assigned to CPU device and not to /device:HPU:0 device to exchange data by using Host NICs .

HPUStrategy Usage

HPUStrategy class has the same usage model as MultiWorkerMirroredStrategy, in which each worker runs in a separate process, with a single Gaudi device acquired. For a demo of HPUStrategy with multi-worker training, please refer to TensorFlow distribute_with_hpu_strategy Example on GitHub.

from habana_frameworks.tensorflow.distribute import HPUStrategy

# Set TF_CONFIG environment variable or pass cluster_resolver to the strategy
os.environ["TF_CONFIG"] = ...
strategy = HPUStrategy()

# For use with Keras
with strategy.scope():
   model = ...
   model.compile(...)

# For use with Estimator
run_config = tf.estimator.RunConfig(train_distribute=strategy, ...)

# Manually
server = tf.distribute.Server(...)
with tf.device("/device:HPU:0"):
   y1 = strategy.reduce(...)
   ... = tensorflow.python.ops.collective_ops.broadcast_send(...)
   ... = tensorflow.python.ops.collective_ops.broadcast_recv(...)
   ... = tensorflow.python.ops.collective_ops.all_gather(...)

In multi-worker, multi-process training, TensorFlow establishes inter-process communication using gRPC at startup. Workers run the same training script in parallel, effectively having their own replica (copy) of the same computation graph. TensorFlow Keras and Estimator automatically ensure initial broadcast of trainable variables (weights and biases) prior to training start, and reduce the gradients among workers with each step. In addition, TensorFlow provides a mechanism, DistributedDataset, which offers both automatic and manual data sharding, so every worker processes a distinct set of samples from the training dataset.

Unlike Horovod, neither tf.distribute nor HPUStrategy use/require OpenMPI at any point. Worker processes can be initialized in any way (for example, using mpirun for convenience, as it offers process-to-core binding mechanism, e.g. –bind-to core –map-by slot:PE=4).

Setting TF_CONFIG

Similar to MultiWorkerMirroredStrategy, HPUStrategy requires information about the cluster, i.e. all workers participating in the training.

Every worker process must set this information in one of the following ways:

  • Setting the TF_CONFIG environment variable.

  • Passing cluster_resolver to HPUStrategy.

Note

These options are mutually exclusive - in case of no cluster_resolver being passed to HPUStrategy constructor, tf.distribute.cluster_resolver.TFConfigClusterResolver will be instantiated. TFConfigClusterResolver resolves the cluster by reading TF_CONFIG.

cluster_resolver object, passed to constructor of HPUStrategy class, should be an instance of a class that implements tf.distribute.cluster_resolver.ClusterResolver interface. More information about ClusterResolver interface can be found in TensorFlow documentation.

TF_CONFIG is a JSON string which lists all tasks taking part in the training and specifies the current process role in the cluster. As every training process in a cluster has a different role, the value of TF_CONFIG must be different in every process. More information about TF_CONFIG can be found in the TensorFlow documentation: Distributed training with TensorFlow.

The below code example shows how to set TF_CONFIG variable from within the training script:

def set_tf_config(worker_index: int, num_workers: int):
   """ Makes a TensorFlow cluster information and sets it to TF_CONFIG environment variable.
   """
   tf_config = {
      "cluster": {
            "worker": [f"localhost:{BASE_TF_SERVER_PORT + index}" for index in range(num_workers)]
      },
      "task": {"type": "worker", "index": worker_index}
   }
   tf_config_text = json.dumps(tf_config)
   os.environ["TF_CONFIG"] = tf_config_text
   print(f"TF_CONFIG = {tf_config_text}")
   return tf_config_text

The presented code requires two global variables to be set earlier:

  • num_workers - The total number of workers in the cluster.

  • worker_index - A unique worker index (within a range from 0 to num_workers-1 inclusively).

If worker processes are spawned using mpirun as a launcher, those values are easily obtained using mpi4py library:

from mpi4py import MPI
worker_index = MPI.COMM_WORLD.Get_rank()
num_workers = MPI.COMM_WORLD.Get_size()

set_tf_config(worker_index, num_workers)

If you do not want to use Open MPI, you can proceed with python.multiprocessing library. By doing so you can easily spawn multiple sub-processes from the Python script, running the same python script:

def my_run(worker_index: int, num_workers: int):
   # This function shall be ran in child processes.
   set_tf_config(worker_index, num_workers)
   ...

num_workers = 2
processes = [mp.Process(target=my_run, args=(i, num_workers)) for i in range(num_workers)]
 for p in processes:
     p.start()
 for p in processes:
     p.join()

Refer to TensorFlow distribute_with_hpu_strategy Example for full demo codes of both mpirun-based and multiprocessing-based variants.

Migration from TensorFlow Built-in Strategies

If your training script uses MultiWorkerMirroredStrategy, replacing it with HPUStrategy is sufficient. The worker process performing the training must load the Habana Device integration module for TensorFlow. If Estimator is used, a dedicated evaluator job may be designated to evaluate the last checkpoint in parallel to the training process. In such a case, evaluation must be performed on the CPU. That is, the process running the Estimator-based script does not load the Habana Device integration module for TensorFlow.

MirroredStrategy uses an in-graph approach. An in-graph approach uses a single process running a computation graph directly targeting its operations on various accelerator devices (like CPU:0 and multiple GPU:<X>). Since only one Gaudi device can be acquired per process, every worker must be ran in a separate process. This is an between-graph approach, in which every worker targets its operations to either CPU:0 or HPU:0 only.

For more information on setting up a multi-process training cluster, refer to Multi-worker training with Keras TensorFlow documentation.

Advanced Details

A native MultiWorkerMirroredStrategy class internally uses Cross Device Ops to add collective operations into the computation graph. Those operations are:

  • CollectiveReduce, CollectiveReduceV2 and CollectiveReduceV3 – All-Reduce operation applied to every gradient before weight update.

  • CollectiveBcastSend and CollectiveBcastSendV2 – Broadcast Send operation for sending the initial weights used only by the cluster leader (worker:0 or chief:0).

  • CollectiveBcastRecv and CollectiveBcastRecvV2 – Broadcast Receive operation for receiving the initial weights used by all workers except the cluster leader.

  • CollectiveGather and CollectiveGatherV2 – All-Gather operation (not exercised in a basic usage flow).

HPUStrategy class reuses MultiWorkerMirroredStrategy, assuring that there is only one accelerator per process, and all collective operations are placed on the "/device:HPU:0" device.

During graph pre-processing, the Habana Device integration component for TensorFlow modifies some of the collective operations to dedicated HPU operations: HpuCollectiveReduce, HpuCollectiveGather, HpuCollectiveGatherV2, HpuCollectiveBcastSend, HpuCollectiveBcastSendV2, HpuCollectiveBcastRecv and HpuCollectiveBcastRecvV2 (CollectiveReduceV2 and CollectiveReduceV3 are not renamed). CollectiveAllToAllV3 is not supported on "/device:HPU:0".

HPU collective operations use Habana’s Collective Communications Library (HCCL) internally for collective communication. Neither gRPC nor TensorFlow’s CollectiveExecutor is used at this point.

The following table shows which dtypes are supported for HPU collective operations (therefore: executed on Gaudi NICs):

Collective Operation

Supported dtypes on “/device:HPU:0”

CollectiveReduce, CollectiveReduceV2, CollectiveReduceV3

bfloat16, float

CollectiveBcastSend, CollectiveBcastSendV2, CollectiveBcastRecv, CollectiveBcastRecvV2

bfloat16 (only through TF_BF16_CONVERSION JSON recipe), float, int32, int64

CollectiveGather, CollectiveGatherV2

bfloat16 (only through TF_BF16_CONVERSION JSON recipe), float, float16/half, int32, int64

TensorFlow 2.5 introduced a mechanism of gradient tensor packing which automatically concatenates the gradients into a single all-reduce operation, and then splits the results into multiple outputs. This optimization technique supersedes its older alternative - Scoped Allocator Optimization (SAO) - which is no longer accessible to HPUStrategy users since TensorFlow 2.5. However, SAO may be still used with Horovod’s HorovodAllreduce.