TensorFlow Distributed based Scaling of Gaudi¶
Habana provides a dedicated HPUStrategy class to support distributed training using Gaudi's high performance RDMA communication.
Note
Using other TensorFlow built-in distribution strategies (MirroredStrategy, MultiWorkerMirroredStrategy, CentralStorageStrategy, ParameterServerStrategy) is not supported with the Habana software stack on Gaudi.
They may still be used, but the user must ensure that the collective operations are assigned to the CPU device, not to the /device:HPU:0 device, so that data is exchanged through the host NICs.
HPUStrategy Usage¶
The HPUStrategy class has the same usage model as MultiWorkerMirroredStrategy, in which each worker runs in a separate process and acquires a single Gaudi device.
For a demo of HPUStrategy with multi-worker training, refer to the TensorFlow distribute_with_hpu_strategy Example on GitHub.
import os
import tensorflow as tf
from tensorflow.python.ops import collective_ops
from habana_frameworks.tensorflow.distribute import HPUStrategy

# Set the TF_CONFIG environment variable or pass cluster_resolver to the strategy.
os.environ["TF_CONFIG"] = ...
strategy = HPUStrategy()

# For use with Keras:
with strategy.scope():
    model = ...
    model.compile(...)

# For use with Estimator:
run_config = tf.estimator.RunConfig(train_distribute=strategy, ...)

# Manually:
server = tf.distribute.Server(...)
with tf.device("/device:HPU:0"):
    y1 = strategy.reduce(...)
    ... = collective_ops.broadcast_send(...)
    ... = collective_ops.broadcast_recv(...)
    ... = collective_ops.all_gather(...)
In multi-worker, multi-process training, TensorFlow establishes inter-process communication using gRPC at startup. Workers run the same training script in parallel, effectively having their own replica (copy) of the same computation graph. TensorFlow Keras and Estimator automatically ensure initial broadcast of trainable variables (weights and biases) prior to training start, and reduce the gradients among workers with each step. In addition, TensorFlow provides a mechanism, DistributedDataset, which offers both automatic and manual data sharding, so every worker processes a distinct set of samples from the training dataset.
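For example, the following minimal sketch shows automatic data sharding with a DistributedDataset. The toy dataset and GLOBAL_BATCH_SIZE are placeholders introduced here for illustration only, and the sharding call uses the standard tf.distribute experimental_distribute_dataset API rather than anything Habana-specific:

import tensorflow as tf
from habana_frameworks.tensorflow.distribute import HPUStrategy

# Assumes TF_CONFIG has already been set for this worker, as described below.
strategy = HPUStrategy()

# Hypothetical toy input pipeline; replace with a real dataset.
GLOBAL_BATCH_SIZE = 64
features = tf.random.uniform([512, 10])
labels = tf.random.uniform([512], maxval=2, dtype=tf.int32)
dataset = tf.data.Dataset.from_tensor_slices((features, labels)).batch(GLOBAL_BATCH_SIZE)

# Automatic sharding: each worker iterates over a distinct subset of the samples.
dist_dataset = strategy.experimental_distribute_dataset(dataset)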
Unlike Horovod, neither tf.distribute nor HPUStrategy uses or requires OpenMPI at any point.
Worker processes can be started in any way (for example, using mpirun for convenience, as it offers a process-to-core binding mechanism, e.g. --bind-to core --map-by slot:PE=6).
Setting TF_CONFIG¶
Similar to MultiWorkerMirroredStrategy, HPUStrategy requires information about the cluster, i.e. all workers participating in the training.
Every worker process must set this information in one of the following ways:
- Setting the TF_CONFIG environment variable.
- Passing cluster_resolver to HPUStrategy.
Note
These options are mutually exclusive. If no cluster_resolver is passed to the HPUStrategy constructor, a tf.distribute.cluster_resolver.TFConfigClusterResolver is instantiated.
TFConfigClusterResolver resolves the cluster by reading TF_CONFIG.
The cluster_resolver object passed to the HPUStrategy constructor should be an instance of a class that implements the tf.distribute.cluster_resolver.ClusterResolver interface.
More information about the ClusterResolver interface can be found in the TensorFlow documentation.
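As a sketch of the second option, the example below builds a tf.distribute.cluster_resolver.SimpleClusterResolver from an explicit ClusterSpec and passes it to HPUStrategy. The worker addresses, the task_id value, and the exact keyword form of the cluster_resolver argument are assumptions made for illustration:

import tensorflow as tf
from habana_frameworks.tensorflow.distribute import HPUStrategy

# Hypothetical two-worker cluster on localhost; adjust addresses and task_id per process.
cluster_spec = tf.train.ClusterSpec({"worker": ["localhost:7850", "localhost:7851"]})
resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
    cluster_spec, task_type="worker", task_id=0)

strategy = HPUStrategy(cluster_resolver=resolver)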
TF_CONFIG is a JSON string which lists all tasks taking part in the training and specifies the current process's role in the cluster.
As every training process in a cluster has a different role, the value of TF_CONFIG must be different in every process.
More information about TF_CONFIG can be found in the TensorFlow documentation: Distributed training with TensorFlow.
The code example below shows how to set the TF_CONFIG variable from within the training script:
import json
import os

# Base port for the TF_CONFIG worker addresses; any free port range works.
BASE_TF_SERVER_PORT = 7850

def set_tf_config(worker_index: int, num_workers: int):
    """Builds the TensorFlow cluster information and sets it in the TF_CONFIG environment variable."""
    tf_config = {
        "cluster": {
            "worker": [f"localhost:{BASE_TF_SERVER_PORT + index}" for index in range(num_workers)]
        },
        "task": {"type": "worker", "index": worker_index}
    }
    tf_config_text = json.dumps(tf_config)
    os.environ["TF_CONFIG"] = tf_config_text
    print(f"TF_CONFIG = {tf_config_text}")
    return tf_config_text
The presented function requires two values to be determined earlier and passed as arguments:
- num_workers - the total number of workers in the cluster.
- worker_index - a unique worker index (in the range from 0 to num_workers-1 inclusive).
If worker processes are spawned using mpirun as a launcher, those values can easily be obtained using the mpi4py library:
from mpi4py import MPI

worker_index = MPI.COMM_WORLD.Get_rank()
num_workers = MPI.COMM_WORLD.Get_size()
set_tf_config(worker_index, num_workers)
If you do not want to use Open MPI, you can use Python's multiprocessing library instead. It allows you to spawn multiple sub-processes, each running the same training function:
import multiprocessing as mp

def my_run(worker_index: int, num_workers: int):
    # This function is run in each child process.
    set_tf_config(worker_index, num_workers)
    ...

if __name__ == "__main__":
    num_workers = 2
    processes = [mp.Process(target=my_run, args=(i, num_workers)) for i in range(num_workers)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
Refer to the TensorFlow distribute_with_hpu_strategy Example for full demo code of both the mpirun-based and multiprocessing-based variants.
Migration from TensorFlow Built-in Strategies¶
If your training script uses MultiWorkerMirroredStrategy, replacing it with HPUStrategy is sufficient.
The worker process performing the training must load the Habana Device integration module for TensorFlow, as shown in the sketch below.
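A minimal sketch of such a worker follows, assuming the load_habana_module helper exposed by the habana_frameworks.tensorflow package is the intended way to load the integration module (as in Habana's TensorFlow setup guides):

import tensorflow as tf
from habana_frameworks.tensorflow import load_habana_module
from habana_frameworks.tensorflow.distribute import HPUStrategy

# Register the HPU device with TensorFlow before creating the strategy.
load_habana_module()

strategy = HPUStrategy()  # TF_CONFIG is assumed to be set already
with strategy.scope():
    ...  # build and compile the Keras model as usual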
If Estimator is used, a dedicated evaluator job may be designated to evaluate the last checkpoint in parallel to the training process.
In such a case, evaluation must be performed on the CPU. That is, the process running the Estimator-based evaluation script does not load the Habana Device integration module for TensorFlow.
MirroredStrategy uses an in-graph approach, in which a single process runs a computation graph that targets its operations directly at various accelerator devices (such as CPU:0 and multiple GPU:<X> devices).
Since only one Gaudi device can be acquired per process, every worker must run in a separate process.
This is a between-graph approach, in which every worker targets its operations at either CPU:0 or HPU:0 only.
For more information on setting up a multi-process training cluster, refer to the Multi-worker training with Keras TensorFlow documentation.
Advanced Details¶
The native MultiWorkerMirroredStrategy class internally uses Cross Device Ops to add collective operations into the computation graph.
Those operations are:
- CollectiveReduce, CollectiveReduceV2 and CollectiveReduceV3 - All-Reduce operation applied to every gradient before the weight update.
- CollectiveBcastSend and CollectiveBcastSendV2 - Broadcast Send operation for sending the initial weights, used only by the cluster leader (worker:0 or chief:0).
- CollectiveBcastRecv and CollectiveBcastRecvV2 - Broadcast Receive operation for receiving the initial weights, used by all workers except the cluster leader.
- CollectiveGather and CollectiveGatherV2 - All-Gather operation (not exercised in the basic usage flow).
The HPUStrategy class reuses MultiWorkerMirroredStrategy, ensuring that there is only one accelerator per process and that all collective operations are placed on the "/device:HPU:0" device.
During graph pre-processing, the Habana Device integration component for TensorFlow replaces some of the collective operations with dedicated HPU operations: HpuCollectiveReduce, HpuCollectiveGather, HpuCollectiveGatherV2, HpuCollectiveBcastSend, HpuCollectiveBcastSendV2, HpuCollectiveBcastRecv and HpuCollectiveBcastRecvV2 (CollectiveReduceV2 and CollectiveReduceV3 are not renamed).
CollectiveAllToAllV3 is not supported on "/device:HPU:0".
HPU collective operations use Habana’s Collective Communications Library (HCCL) internally for collective communication. Neither gRPC nor TensorFlow’s CollectiveExecutor is used at this point.
The following table shows which dtypes are supported for HPU collective operations (and are therefore executed on Gaudi NICs):

| Collective Operation | Supported dtypes on "/device:HPU:0" |
|---|---|
| CollectiveReduce, CollectiveReduceV2, CollectiveReduceV3 | bfloat16, float |
| CollectiveBcastSend, CollectiveBcastSendV2, CollectiveBcastRecv, CollectiveBcastRecvV2 | bfloat16 (only through TF_BF16_CONVERSION JSON recipe), float, int32, int64 |
| CollectiveGather, CollectiveGatherV2 | bfloat16 (only through TF_BF16_CONVERSION JSON recipe), float, float16/half, int32, int64 |
TensorFlow 2.5 introduced a gradient tensor packing mechanism, which automatically concatenates the gradients into a single all-reduce operation and then splits the result back into multiple outputs.
This optimization technique supersedes its older alternative, Scoped Allocator Optimization (SAO), which is no longer available to HPUStrategy users since TensorFlow 2.5.
However, SAO may still be used with Horovod's HorovodAllreduce.