DDP-based Scaling of Gaudi on PyTorch

mpirun Configuration

mpirun map-by PE attribute value may vary on your setup and should be calculated as: socket:PE = floor((number of physical cores) / (number of gaudi devices per each node)).

This sample code can also be used to calculate the number of physical CPU cores and HPU count to generate the appropriate PE value, shown as MPI_PE below. This can be incorporated into any model:

export PHY_CPU_COUNT=$(lscpu --all --parse=CORE,SOCKET | grep -Ev "^#" | sort -u | wc -l)
export PHY_HPU_COUNT=$(ls /dev/hl? | wc -l)
export MPI_PE=$(($PHY_CPU_COUNT/$PHY_HPU_COUNT))

The PE value in the Model-References examples may be set to a common number to ensure functionality, but depending on the Host CPU, the directions above should be used for optimal system performance.

Scale-up Using Gaudi NICs Within a Server

The below is a simple example, hccl_example.py showing distributed training support. HCCL communication backend is loaded and process group communication backend is initialized as “hccl” using the following script changes.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import os
import torch
import habana_frameworks.torch.core as htcore
import platform

torch.manual_seed(0)
#load hpu backend for PyTorch
device = torch.device('hpu')

def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12340'
    #Import the distributed package for HCCL, set the backend to HCCL
    import habana_frameworks.torch.distributed.hccl
    torch.distributed.init_process_group(backend='hccl', rank=rank, world_size=world_size)

def cleanup():
    torch.distributed.destroy_process_group()

def allReduce(rank):
    _tensor = torch.ones(8).to(device)
    torch.distributed.all_reduce(_tensor)
    _tensor_cpu = _tensor.cpu()

def run_allreduce(rank, world_size):
    setup(rank, world_size)

    for i in range(100):
        allReduce(rank)

    cleanup()

def main():
    #Run Habana's Initialize HPU function to collect the world size and rank
    from habana_frameworks.torch.distributed.hccl import initialize_distributed_hpu
    world_size, rank, local_rank = initialize_distributed_hpu()
    run_allreduce(rank, world_size)

if __name__ == '__main__':
    main()

The code above runs in multiple processes, one for each Gaudi.

In order to launch the distributed training for eight Gaudi devices within one host run the following command:

$ mpirun --allow-run-as-root -np 8 python3 hccl_example.py

Note

Open MPI is required for host communication and launching processes.

Scale-out Across Servers

Scale-out Using Host NICs

To use scale-out over Host NICs, install the OFI Wrapper and libfabric as detailed in Scale-Out via Host-NIC over OFI. The OFI Wrapper and libFabric are installed by default when using Intel Gaudi containers.

For further information on how the network setting is detected, refer to Scale-Out via Host-NIC.

Scale-out over Host NICs is enabled with Gaudi Direct functionality. Follow the below steps to use Gaudi Direct.

  1. Ensure LD_LIBRARY_PATH is set with usr/lib/habanalabs included.

  2. Set the RDMAV_FORK_SAFE=1 and FI_EFA_USE_DEVICE_RDMA=1 variables. These variable are set by default in the Intel Gaudi Docker image.

  3. Run mpirun command with RDMAV_FORK_SAFE=1 and FI_EFA_USE_DEVICE_RDMA=1:

    mpirun --allow-run-as-root  \
       -x RDMAV_FORK_SAFE=1   \
       -x FI_EFA_USE_DEVICE_RDMA=1 \
       ...
    
  1. Ensure LD_LIBRARY_PATH is set with usr/lib/habanalabs included.

  2. Set the RDMAV_FORK_SAFE=1 and MLX5_SCATTER_TO_CQE=0 environment variables.

  3. Ensure PCIe Access Control (ACS) is disabled.

  4. Use Intel Gaudi proprietary libfabric.

  5. When building libfabric, use -with-synapseai configuration option.

  6. Run mpirun command with RDMAV_FORK_SAFE=1 and MLX5_SCATTER_TO_CQE=0:

    mpirun --allow-run-as-root  \
       -x RDMAV_FORK_SAFE=1   \
       -x MLX5_SCATTER_TO_CQE=0 \
       ...
    

Note

Gaudi Direct with Verbs provider is supported on Gaudi 2 only.

Scale-out Using Gaudi NICs

Run the script using the below command. You must append the IP addresses of two servers at the end of the script and the addresses should be separated by commas, similar to the example below:

mpirun --allow-run-as-root  \
    -x LD_LIBRARY_PATH=".."  \
    -x HABANA_LOGS=".."  \
    -x GC_KERNEL_PATH=".."  \
    --prefix /usr/local/openmpi  \
    --mca btl_tcp_if_include 10.211.160.0/16  \
    -x MASTER_ADDR=10.211.160.154  \
    -x MASTER_PORT=12345  \
    --mca plm_rsh_args "-p 3022"  \
    -H 10.211.160.154,10.211.160.52 \
    -n 16  \
    ...
    python3 hccl_example.py

By default, the shell scripts connect to port 3022, however, the port listened by the SSH server may differ between different environments. If your environment requires specifying a different port of the remote SSH server, you can use the SSHD_PORT environment variable.

To change the port, use the below command. Make sure to set the port to 22 as in the below example.

$ /etc/init.d/ssh restart '-p 22'

For further details, refer to the README located in PyTorch Model References GitHub page.

HCCL with Distributed Data Parallel (DDP) Hook

The example below, from PyTorch Documentation, includes the Gaudi-specific modifications to showcase the initialization and usage of HCCL package with PyTorch’s DDP hook, ddp_model = DDP(model).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#reference https://pytorch.org/tutorials/intermediate/ddp_tutorial.html

import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp

from torch.nn.parallel import DistributedDataParallel as DDP

import habana_frameworks.torch.core as htcore

device = torch.device('hpu')

def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    os.environ["ID"] = str(rank)
    #distributed package for HCCL
    import habana_frameworks.torch.distributed.hccl
    dist.init_process_group(backend='hccl', rank=rank, world_size=world_size)

def cleanup():
    dist.destroy_process_group()

class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))


def demo_basic(rank, world_size):
    print(f"Running basic DDP example on rank {rank}.")
    setup(rank, world_size)

    model = ToyModel().to(device)
    ddp_model = DDP(model)

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10).to(device))
    labels = torch.randn(20, 5).to(device)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    cleanup()


def run_demo(demo_fn, world_size):
    mp.spawn(demo_fn,
             args=(world_size,),
             nprocs=world_size,
             join=True)

if __name__ == "__main__":
    world_size = 8
    run_demo(demo_basic, world_size)

Advanced Usage: ZeroRedundancyOptimizer with DDP

The example below, from PyTorch Documentation includes the Gaudi-specific modifications to showcase the initialization and usage of HCCL package with PyTorch’s DDP hook and custom fused optimizer.

The fused optimizer is wrapped around with the functional optimizer class so that its accessible by zero1. For further details on custom optimizers, refer to Custom Optimizers.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#reference https://pytorch.org/tutorials/recipes/zero_redundancy_optimizer.html
#usage: NATIVE=0 OVERLAP=1 python -u testZero1.py

import os
import time
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.optim as optim
from torch.distributed.optim import ZeroRedundancyOptimizer
from torch.nn.parallel import DistributedDataParallel as DDP
from habana_frameworks.torch.hpex.optimizers import FusedAdamW
from torch.distributed.algorithms.ddp_comm_hooks.ddp_zero_hook import (
    hook_with_zero_step,
    hook_with_zero_step_interleaved,
)
from torch.distributed.algorithms.ddp_comm_hooks.default_hooks import (
    allreduce_hook,
)

import habana_frameworks.torch.core as htcore

#To register a functional optimizer, import the optimizer and
#invoke register_functional_optim(key,optimizer) from torch.distributed.optim.utils
#to register the optimizer
from habana_frameworks.torch.hpex.optimizers.distributed import FusedAdamW as FunctionalFusedAdamW
from torch.distributed.optim.utils import register_functional_optim
register_functional_optim(FunctionalFusedAdamW,FunctionalFusedAdamW)

NATIVE=0
use_native = int(os.environ['NATIVE'])
OVERLAP=0
use_overlap = int(os.environ['OVERLAP'])
WARMUP_STEPS=2

device = torch.device('hpu')
torch.manual_seed(0)
input=[torch.randn(2000, 2000),torch.randn(2000, 2000)]
label= [torch.randn(2000, 2000),torch.randn(2000, 2000)]

def example(rank, world_size, use_zero):
    torch.manual_seed(0)
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    # create default process group
    import  habana_frameworks.torch.distributed.hccl
    dist.init_process_group("hccl", rank=rank, world_size=world_size)

    # create local model
    model = nn.Sequential(*[nn.Linear(2000, 2000).to(device) for _ in range(20)])
    model = model.to(device)
    # construct DDP model
    import copy
    ddp_model = DDP(copy.deepcopy(model).to(device), bucket_cap_mb=10000*1024*1024, gradient_as_bucket_view=True)

    # define loss function and optimizer
    loss_fn = nn.MSELoss()
    if use_zero:
        optimizer = ZeroRedundancyOptimizer(
            ddp_model.parameters(),
            optimizer_class=torch.optim.Adam if use_native else FunctionalFusedAdamW,
            lr=0.01,
            overlap_with_ddp=True if use_overlap else False,
            weight_decay=1e-2,
            eps = 1e-8
       )
        if use_overlap:
            print("registering comm hook")
            ddp_model.register_comm_hook(
                None,
                hook_with_zero_step_interleaved(allreduce_hook, ddp_model, optimizer, shard_buckets=True)
            )
    else:
        optimizer = torch.optim.Adam(ddp_model.parameters(), lr=0.01,weight_decay=1e-2,eps = 1e-8) if use_native else FusedAdamW(ddp_model.parameters(), lr=0.01,weight_decay=1e-2,eps = 1e-8)
    i=1
    while i!=10:
        # forward pass
        outputs = ddp_model(input[rank].to(device))
        labels = label[rank].to(device)
        # backward pass
        loss = loss_fn(outputs, labels)
        loss.backward()
        #break the graph
        htcore.mark_step()
        # update parameters
        if (((not use_zero) or (use_zero and not use_overlap)) and (i>WARMUP_STEPS)): #2 warm-up steps
            optimizer.step()
            htcore.mark_step()
        if rank == 0:
            print(" loss for step ",i, " is ", loss.to("cpu"))
        i=i+1

    print(f"params sum is: {sum(model.parameters()).sum()}")

def main():
    world_size = 2
    print("=== Using ZeroRedundancyOptimizer ===")
    start_time = time.time()
    mp.spawn(example,
        args=(world_size, True),
        nprocs=world_size,
        join=True)
    print("Time : ",time.time()-start_time)
    print("=== Not Using ZeroRedundancyOptimizer ===")
    start_time = time.time()
    mp.spawn(example,
        args=(world_size, False),
        nprocs=world_size,
        join=True)
    print("Time : ",time.time()-start_time)

if __name__=="__main__":
    main()

Examples of Real-world Applications

Examples of real-world applications, such as ResNet-50, BERT, and other models, including their performance and results can be found in the PyTorch Models GitHub page with specific README and examples.