Creating and Executing Media Pipeline

This section describes the steps required to create and execute a media processing pipeline. The steps include:

  • Creating a derived class from MediaPipe, e.g. myMediaPipe, and initializing the base class arguments device, prefetch_depth, batch_size and pipe_name.

  • Creating the operators required in the pipeline along with their parameters. All supported operators are listed in the Operators table.

  • Defining the sequence of media processing in the definegraph() method.

  • Building and initializing the pipeline.

  • Executing the pipeline.

It also provides a Media Pipeline Example Using HPU Device with the above steps implemented.

Note

Before you Start: Make sure to set up your environment as shown in the Installation Guide and On-Premise System Update.

Creating New Pipeline

The Intel Gaudi framework provides a MediaPipe class which includes the necessary functions for defining, building and running a media data processing pipeline. You can define a new pipeline (for example, myMediaPipe) by:

  • Creating a derived class from MediaPipe.

  • Passing the following parameters to the base class constructor:

Keyword Arguments:

device

Device on which the media pipe is executed. Must be set to ‘hpu’ or ‘cpu’.

  • Type: str

  • Default: None

prefetch_depth

Input batch queue depth used for circular buffering. Controls how many image batches the media pipe can preload.

  • Type: int

  • Optional: yes

  • Default: 2

batch_size

Batch size for each media pipeline execution. This should match the number of examples in a batch on one worker of the neural network model.

  • Type: int

  • Optional: yes

  • Default: 1

pipe_name

Media pipeline name, used to identify the specific pipe in the logs.

  • Type: str

  • Default: None

Below is an example of a derived class constructor. Note that it includes additional parameters that are not part of the media pipe base class: dir, channel, height and width. These parameters are used when defining operators specific to the myMediaPipe implementation.

class myMediaPipe(MediaPipe):
    def __init__(self, device, dir, queue_depth, batch_size, channel, height, width):
        super().__init__(device,
                         queue_depth,
                         batch_size,
                         self.__class__.__name__)

# Create object of media pipe
pipe = myMediaPipe('hpu', dir, queue_depth, batch_size,
                   channel, height, width)

Supported Device Type Per Operator

myMediaPipe supports a number of operators implemented on HPU or CPU. Some operators are implemented on both HPU and CPU, allowing you to select which device to run them on. See Operators for a full list of operators and their supported devices.

When defining myMediaPipe, make sure to follow the guidelines below:

  • When using device='cpu', each operator in the media pipe should specify the device type (HPU or CPU) on which it will be executed. See Media Pipeline with CPU Operators Only for examples.

  • If all operators in the pipeline run on HPU, set device='hpu' and do not specify a device type for the individual operators.

  • In any type of media pipe (CPU/HPU), the reader operator always runs on the CPU.

Note that all CPU operations must be performed prior to HPU operations. See CPU-HPU Operators Ordering Limitation for further details.

Defining Operators and Processing Sequence

The media pipeline is constructed using operators. All operators and their parameter options are available here. First, create the set of operators required in the pipeline along with their parameters, e.g. ReadImageDatasetFromDir, ImageDecoder and Crop. These include operators for reading and decoding data, plus optional augmentation operators such as image cropping or flipping. After creating the operators, define the sequence of media processing in the definegraph() method.

Internally, MediaPipe converts the operators to nodes and builds a graph according to the sequence specified in definegraph(). The graph is then executed on input batches, performing the required operations.
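This deferred-execution pattern can be illustrated with a small pure-Python sketch. Note that ToyOp, execute() and the lambdas below are illustrative stand-ins, not part of the media pipe API: calling an operator while the graph is being defined only records a node, and the recorded chain is executed later on each batch.

```python
class ToyOp:
    """Toy operator: calling it records a graph node instead of running it."""
    def __init__(self, name, func):
        self.name, self.func = name, func

    def __call__(self, upstream):
        # Building phase: return a (op, upstream) node, no data processed yet.
        return (self, upstream)

def execute(node, batch):
    """Execution phase: walk the recorded node chain from source to sink."""
    if node is None:
        return batch
    op, upstream = node
    return op.func(execute(upstream, batch))

# Hypothetical stand-ins for decode/crop operators.
decode = ToyOp("decode", lambda xs: [x * 2 for x in xs])
crop   = ToyOp("crop",   lambda xs: xs[:2])

# "definegraph": these calls only wire up nodes.
graph = crop(decode(None))

print(execute(graph, [1, 2, 3]))   # -> [2, 4]
```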

The below example shows the derived class constructor, the set of operators used and the sequence of media processing defined in definegraph() method:

class myMediaPipe(MediaPipe):
    def __init__(self, device, dir, queue_depth, batch_size, img_h, img_w):
        super().__init__(device,
                         queue_depth,
                         batch_size,
                         self.__class__.__name__)

        self.input = fn.ReadImageDatasetFromDir(shuffle=False,
                                                dir=dir,
                                                format="jpg")

        self.decode = fn.ImageDecoder(device="hpu",
                                      output_format=it.RGB_I,
                                      resize=[img_w, img_h])

    def definegraph(self):
        images, labels = self.input()
        images = self.decode(images)
        return images, labels

Building and Initializing the Pipeline

Once the class is defined with the required nodes and the sequence of media processing (definegraph), call the build() and iter_init() methods on the class object to build the media pipe and initialize the iterator. The iterator is an internal part of the media pipe class that retrieves a sequence of data batches.

pipe.build()
pipe.iter_init()
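The build/iter_init/run lifecycle can be mimicked with a toy class (ToyPipe is an illustrative analogy, not the real MediaPipe API):

```python
# Toy lifecycle analogy: build() freezes the pipeline, iter_init() resets
# the batch iterator, run() returns the next batch.
class ToyPipe:
    def __init__(self, data, batch_size):
        self.data, self.batch_size = data, batch_size
        self.built = False
        self._it = None

    def build(self):
        self.built = True          # the real build() compiles the operator graph

    def iter_init(self):
        assert self.built, "call build() before iter_init()"
        self._it = iter(range(0, len(self.data), self.batch_size))

    def run(self):
        start = next(self._it)     # raises StopIteration when exhausted
        return self.data[start:start + self.batch_size]

pipe = ToyPipe(list(range(6)), batch_size=2)
pipe.build()
pipe.iter_init()
print(pipe.run())   # [0, 1]
print(pipe.run())   # [2, 3]
```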

Executing the Pipeline

To produce output from the media pipeline, call the run() method on the class object. The run() method returns one batch of data, as defined by the return values of the definegraph() method. After processing completes, the pipeline data resides on the device. To view or manipulate the tensors on the host, call the as_cpu() method of the device tensor object, which returns a host tensor object. For numpy manipulation, call the as_nparray() method of the host tensor object to obtain a numpy host array.

As shown below, the defined graph returns images and labels which are also the output of pipe.run().

def definegraph(self):
    images, labels = self.input()
    images = self.decode(images)
    return images, labels

images, labels = pipe.run()
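The device-to-host conversion chain can be sketched with toy stand-ins (ToyDeviceTensor and ToyHostTensor are illustrative only; the real as_nparray() returns a numpy array, represented here by a plain list):

```python
class ToyHostTensor:
    """Toy stand-in for a host tensor object."""
    def __init__(self, data):
        self._data = data

    def as_nparray(self):
        # The real API returns a numpy array; a plain list stands in here.
        return list(self._data)

class ToyDeviceTensor:
    """Toy stand-in for a device tensor object."""
    def __init__(self, data):
        self._data = data

    def as_cpu(self):
        # Copy device data to the host and wrap it in a host tensor object.
        return ToyHostTensor(self._data)

images = ToyDeviceTensor([10, 20, 30])
host_array = images.as_cpu().as_nparray()   # device -> host -> array
print(host_array)                           # [10, 20, 30]
```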

Note that the image batch is not necessarily prepared at the time pipe.run() is called. Depending on the value of prefetch_depth (passed to the MediaPipe constructor), a given number of batches is prepared ahead of time.
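As an analogy for how prefetch_depth works, the following self-contained sketch keeps up to prefetch_depth batches queued ahead of the consumer (a single-threaded simplification; the real media pipe prepares batches asynchronously on the device):

```python
from collections import deque

def prefetching_runner(batch_source, prefetch_depth=2):
    """Yield batches while keeping up to `prefetch_depth` batches prepared.

    Illustrative analogy of circular-buffer prefetching, not MediaPipe internals.
    """
    queue = deque()
    source = iter(batch_source)
    while True:
        # Top up the queue so `prefetch_depth` batches are ready ahead of time.
        while len(queue) < prefetch_depth:
            try:
                queue.append(next(source))
            except StopIteration:
                break
        if not queue:
            return
        yield queue.popleft()

batches = [[1, 2], [3, 4], [5, 6]]
for batch in prefetching_runner(batches, prefetch_depth=2):
    print(batch)   # [1, 2] then [3, 4] then [5, 6]
```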

Media Pipeline Example Using HPU Device

The following example shows how to create a media pipeline that decodes images and resizes them to a fixed height and width after decoding.

  • fn.ReadImageDatasetFromDir reads jpeg files and labels from a given directory

  • fn.ImageDecoder decodes and resizes jpeg images

from habana_frameworks.mediapipe import fn
from habana_frameworks.mediapipe.mediapipe import MediaPipe
from habana_frameworks.mediapipe.media_types import imgtype as it
import matplotlib.pyplot as plt

# Create media pipeline derived class
class myMediaPipe(MediaPipe):
    def __init__(self, device, dir, queue_depth, batch_size, img_h, img_w):
        super().__init__(device,
                         queue_depth,
                         batch_size,
                         self.__class__.__name__)

        self.input = fn.ReadImageDatasetFromDir(shuffle=False,
                                                dir=dir,
                                                format="jpg")

        self.decode = fn.ImageDecoder(device="hpu",
                                      output_format=it.RGB_I,
                                      resize=[img_w, img_h])

    def definegraph(self):
        images, labels = self.input()
        images = self.decode(images)
        return images, labels

def display_images(images, batch_size, cols):
    rows = (batch_size + cols - 1) // cols  # ceiling division
    plt.figure(figsize=(10, 10))
    for i in range(batch_size):
        plt.subplot(rows, cols, i + 1)
        plt.imshow(images[i])
        plt.axis("off")
    plt.show()


def main():
    batch_size = 6
    img_width = 200
    img_height = 200
    img_dir = "/path/to/images"
    queue_depth = 2
    columns = 3

    # Create media pipeline object
    pipe = myMediaPipe('hpu', img_dir, queue_depth, batch_size,
                       img_height, img_width)

    # Build media pipeline
    pipe.build()

    # Initialize media pipeline iterator
    pipe.iter_init()

    # Run media pipeline
    images, labels = pipe.run()

    # Copy images to host from device as numpy array
    images = images.as_cpu().as_nparray()
    labels = labels.as_cpu().as_nparray()

    # Display images
    display_images(images, batch_size, columns)

if __name__ == "__main__":
    main()

Decoded and Resized Images

[Figure: six decoded and resized images from the batch]

Licensed under a CC BY SA 4.0 license. The images used here are taken from https://data.caltech.edu/records/mzrjq-6wc02.

Adding Crop as Augmentation Operators

The following code snippet shows how to augment the decoded images with a crop operation:

from habana_frameworks.mediapipe import fn
from habana_frameworks.mediapipe.mediapipe import MediaPipe
from habana_frameworks.mediapipe.media_types import imgtype as it
from habana_frameworks.mediapipe.media_types import dtype as dt
import matplotlib.pyplot as plt

class myMediaPipe(MediaPipe):
    def __init__(self, device, dir, queue_depth, batch_size, img_h, img_w):
        super().__init__(device,
                         queue_depth,
                         batch_size,
                         self.__class__.__name__)

        self.input = fn.ReadImageDatasetFromDir(shuffle=False,
                                                dir=dir,
                                                format="jpg")

        self.decode = fn.ImageDecoder(device="hpu",
                                      output_format=it.RGB_I,
                                      resize=[img_w, img_h])

        self.crop = fn.Crop(crop_w=150,
                            crop_h=150,
                            dtype=dt.UINT8)

    def definegraph(self):
        images, labels = self.input()
        images = self.decode(images)
        images = self.crop(images)
        return images, labels

def display_images(images, batch_size, cols):
    rows = (batch_size + cols - 1) // cols  # ceiling division
    plt.figure(figsize=(10, 10))
    for i in range(batch_size):
        plt.subplot(rows, cols, i + 1)
        plt.imshow(images[i])
        plt.axis("off")
    plt.show()

def main():
    batch_size = 6
    img_width = 200
    img_height = 200
    img_dir = "/path/to/images"
    queue_depth = 2
    columns = 3

    # Create media pipeline object
    pipe = myMediaPipe('hpu', img_dir, queue_depth, batch_size,
                       img_height, img_width)

    # Build media pipeline
    pipe.build()

    # Initialize media pipeline iterator
    pipe.iter_init()

    # Run media pipeline
    images, labels = pipe.run()

    # Copy images to host from device as numpy array
    images = images.as_cpu().as_nparray()
    labels = labels.as_cpu().as_nparray()

    # Display images
    display_images(images, batch_size, columns)

if __name__ == "__main__":
    main()

Decoded, Resized and Cropped Images

[Figure: six decoded, resized and cropped images from the batch]

Licensed under a CC BY SA 4.0 license. The images used here are taken from https://data.caltech.edu/records/mzrjq-6wc02.

Media Pipeline with CPU Operators Only

The following code snippet shows a media pipeline with CPU operations only. Note that all operators must be defined with device='cpu', and the ‘cpu’ device value must also be passed to the constructor.

from habana_frameworks.mediapipe import fn
from habana_frameworks.mediapipe.mediapipe import MediaPipe
from habana_frameworks.mediapipe.media_types import dtype as dt

# Create media pipeline derived class
class myMediaPipe(MediaPipe):
    def __init__(self, dir, queue_depth, batch_size, num_threads, patch_size):
        super().__init__('cpu',
                         queue_depth,
                         batch_size,
                         num_threads,
                         self.__class__.__name__)

        self.input = fn.ReadNumpyDatasetFromDir(num_outputs=1,
                                                shuffle=False,
                                                dir=dir,
                                                pattern='case_*_x.npy',
                                                dtype=dt.FLOAT32,
                                                device='cpu')

        self.crop_img = fn.Crop(crop_w=patch_size[0],
                                crop_h=patch_size[1],
                                crop_d=patch_size[2],
                                crop_pos_x=0.5,
                                crop_pos_y=0.5,
                                crop_pos_z=0.5,
                                dtype=dt.FLOAT32,
                                device='cpu')

    def definegraph(self):
        image = self.input()
        image = self.crop_img(image)
        return image


def main():
    batch_size = 2
    img_dir = '/path/to/numpy/files/'
    queue_depth = 1
    num_threads = 1
    patch_size = [160, 192, 64]

    # Create media pipeline object
    pipe = myMediaPipe(img_dir, queue_depth,
                       batch_size, num_threads, patch_size)

    # Build media pipeline
    pipe.build()

    # Initialize media pipeline iterator
    pipe.iter_init()

    # Run media pipeline
    image = pipe.run()


if __name__ == "__main__":
    main()

MediaConst and MediaFunc

In addition, the Media API allows defining constant tensors using MediaConst. Those tensors can then be used with operators such as CropMirrorNorm (as shown in the example). If tensors need to be created per batch, use MediaFunc instead; it allows, for example, introducing random tensors into the operator sequence.
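The constant-versus-per-batch distinction can be illustrated with a small pure-Python analogy (CONSTANT_MEAN and random_crop_position are hypothetical names, not part of the Media API): a MediaConst-like value is fixed once, while a MediaFunc-like callable produces a fresh value for every batch.

```python
import random

# Fixed value, created once -- analogous to a MediaConst tensor.
CONSTANT_MEAN = [0.485, 0.456, 0.406]

def random_crop_position(rng):
    """Return a new normalized (x, y) crop position for each batch.

    Analogous to a MediaFunc: called once per batch to produce a fresh tensor.
    """
    return (rng.random(), rng.random())

rng = random.Random(0)                      # seeded for reproducibility
for _ in range(3):
    print(random_crop_position(rng))        # a different position per batch
```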

CPU-HPU Operators Ordering Limitation

All CPU operations must be performed prior to HPU operations. Therefore, when implementing the sequence of media processing in definegraph(), CPU operations must come first. Once an operation runs on the HPU, no CPU operation can follow it.
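As a sanity check, the ordering rule can be expressed as a small helper (check_device_ordering is a hypothetical illustration, not part of the media pipe API): once an 'hpu' operator appears in the sequence, no 'cpu' operator may follow it.

```python
def check_device_ordering(ops):
    """Check the CPU-before-HPU rule for a sequence of (name, device) ops."""
    seen_hpu = False
    for name, device in ops:
        if device == "hpu":
            seen_hpu = True
        elif device == "cpu" and seen_hpu:
            return False    # CPU op after an HPU op: invalid ordering
    return True

valid   = [("reader", "cpu"), ("crop", "cpu"), ("decode", "hpu")]
invalid = [("reader", "cpu"), ("decode", "hpu"), ("crop", "cpu")]
print(check_device_ordering(valid))    # True
print(check_device_ordering(invalid))  # False
```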