Creating and Executing Media Pipeline¶
This section describes the steps required to create and execute a media processing pipeline. The steps include:

- Creating a class derived from MediaPipe (e.g. myMediaPipe) and initializing the super class arguments device, prefetch_depth, batch_size, num_threads, and pipe_name.
- Creating the operators required in the pipeline along with their parameters. All supported operators are listed in the Operators table.
- Defining the sequence of media processing in the definegraph() method.
- Building and initializing the pipeline.
- Executing the pipeline.

It also provides a MediaPipe Example Using HPU Device with the above steps implemented.
Note
Before you Start: Make sure to set up your environment as shown in the Installation Guide and On-Premise System Update.
Creating a New Pipeline¶
The Intel Gaudi framework provides a MediaPipe class which includes the necessary functions for defining, building and running a media data processing pipeline.
You can define a new pipeline (for example myMediaPipe) by:
- Creating a derived class from MediaPipe.
- Passing the following parameters to the base class constructor:
Keyword Arguments:

| kwargs | Description |
|---|---|
| device | Device on which MediaPipe is to be executed. When the entire pipeline is executed on CPU, the device should be set to 'cpu'. |
| prefetch_depth | Input batch queue depth for circular buffering. Controls how many image batches can be preloaded by MediaPipe. |
| batch_size | Batch size for each Media Pipeline execution. This should be aligned to the number of examples in a batch on one worker for a neural network model. |
| num_threads | Number of CPU threads for the Media Pipeline. |
| pipe_name | Pipeline name used to identify a specific pipe in the logs. |
Below is an example of a derived class constructor. In this example, additional parameters that are not part of the MediaPipe base class were added to the code: dir, channel, height and width.
These parameters are used when defining operators specific to the myMediaPipe implementation.
class myMediaPipe(MediaPipe):
    def __init__(self, device, dir, queue_depth, batch_size, num_threads, channel, height, width):
        super(myMediaPipe, self).__init__(device,
                                          queue_depth,
                                          batch_size,
                                          num_threads,
                                          self.__class__.__name__)

# Create MediaPipe object
pipe = myMediaPipe('hpu', dir, queue_depth, batch_size, num_threads,
                   channel, height, width)
Supported Device Type Per Operator¶
myMediaPipe supports a number of operators implemented either on HPU or CPU. Some operators are implemented on both HPU and CPU, allowing you to select which device to run on. See Operators for a full list of operators and supported devices.
When defining myMediaPipe, make sure to follow the below guidelines:

- When using device='mixed', each operator in the MediaPipe should have a device type (HPU or CPU) on which it will be executed. See MediaPipe with CPU Operators Only for examples.
- If all operators in a pipeline are CPU, device='cpu' should be set and the device type for operators should be specified as 'cpu'.
- In any type of MediaPipe (CPU/HPU), the reader operator always runs on the CPU.
Note
All CPU operations must be performed prior to HPU operations. See CPU-HPU Operators Ordering Limitation for further details.
Defining Operators and Processing Sequence¶
The MediaPipe is constructed using operators. All operators and their parameter options are available here.
First, a set of operators needs to be created in the pipeline along with their parameters, e.g. ReadImageDatasetFromDir, ImageDecoder, Crop.
Such operations include reading and decoding data; an optional set of augmentation operations, such as image cropping or flipping, can also be added.
After creating the operators, define the sequence of media processing in the definegraph() method.
Internally, MediaPipe converts operators to nodes and builds a graph according to the specification of definegraph.
The graph is then executed on input batches, performing the required operations.
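The node-conversion idea can be illustrated with a minimal pure-Python sketch. The Op class below is hypothetical and is not MediaPipe's implementation; it only mimics how chained calls in a definegraph-style method trace out a graph:

```python
class Op:
    """Toy operator: calling it records which outputs feed it (illustration only)."""
    def __init__(self, name):
        self.name = name

    def __call__(self, *inputs):
        # Each call produces a node describing its producer operators.
        return {"op": self.name, "inputs": [i["op"] for i in inputs]}

reader = Op("reader")
decoder = Op("decoder")

# Mirrors: images = self.input(); images = self.decode(images)
images = reader()
images = decoder(images)
# images -> {"op": "decoder", "inputs": ["reader"]}
```

Walking such node records backwards from the returned outputs yields the execution graph.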
The below example shows the derived class constructor, the set of operators used, and the sequence of media processing defined in the definegraph() method:
class myMediaPipe(MediaPipe):
    def __init__(self, device, dir, queue_depth, batch_size, num_threads, img_h, img_w):
        super(myMediaPipe, self).__init__(device,
                                          queue_depth,
                                          batch_size,
                                          num_threads,
                                          self.__class__.__name__)

        self.input = fn.ReadImageDatasetFromDir(shuffle=False,
                                                dir=dir,
                                                format="jpg")

        self.decode = fn.ImageDecoder(device="hpu",
                                      output_format=it.RGB_I,
                                      resize=[img_w, img_h])

    def definegraph(self):
        images, labels = self.input()
        images = self.decode(images)
        return images, labels
Building and Initializing the Pipeline¶
Once the class is defined with the required nodes and sequence of media processing (using definegraph), the build() and iter_init() methods should be called on the class object to build the MediaPipe and initialize the iterator. The iterator is an internal part of the MediaPipe class which retrieves a sequence of data batches.
pipe.build()
pipe.iter_init()
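Conceptually, iter_init() resets an internal batch iterator and each run() call retrieves the next batch. A rough pure-Python analogy (TinyPipe is hypothetical, not the MediaPipe internals):

```python
class TinyPipe:
    """Toy analogy for the iterator behind iter_init()/run()."""
    def __init__(self, samples, batch_size):
        self.samples = samples
        self.batch_size = batch_size
        self._it = None

    def iter_init(self):
        # Reset the iterator over batch start offsets.
        self._it = iter(range(0, len(self.samples), self.batch_size))

    def run(self):
        # Produce the next batch; raises StopIteration when exhausted.
        start = next(self._it)
        return self.samples[start:start + self.batch_size]

pipe = TinyPipe(list(range(10)), batch_size=4)
pipe.iter_init()
first = pipe.run()    # [0, 1, 2, 3]
second = pipe.run()   # [4, 5, 6, 7]
```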
Executing the Pipeline¶
To produce an output from MediaPipe, the run() method should be called on the class object. The run() method returns one batch of data, as defined by the outputs returned from the definegraph() method.
After the processing is complete, if the pipeline data resides on the device, you can call the as_cpu() method on the device tensor object to obtain a host tensor object, which can be viewed or manipulated on the host. For numpy manipulation, the as_nparray() method of the host tensor object can be called to get a numpy host array.
As shown below, the defined graph returns images and labels, which are also the output of pipe.run().
def definegraph(self):
    images, labels = self.input()
    images = self.decode(images)
    return images, labels

images, labels = pipe.run()
Note that the image batch is not necessarily prepared at the time of calling pipe.run(). Depending on the value of prefetch_depth passed to the MediaPipe constructor, a given number of batches will be prepared ahead of time.
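What prefetch_depth controls can be pictured with a simple pure-Python sketch of a bounded prefetch queue (a hypothetical illustration, not MediaPipe's circular buffer):

```python
from collections import deque

def prefetched(batches, prefetch_depth):
    """Yield batches while keeping up to `prefetch_depth` batches prepared ahead."""
    queue = deque()
    for batch in batches:
        queue.append(batch)          # "prepare" a batch ahead of time
        if len(queue) > prefetch_depth:
            yield queue.popleft()    # consumer takes the oldest prepared batch
    while queue:                     # drain the remaining prepared batches
        yield queue.popleft()

out = list(prefetched(range(5), prefetch_depth=2))   # [0, 1, 2, 3, 4]
```

A larger prefetch_depth hides more data-preparation latency at the cost of more buffered batches in memory.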
MediaPipe Example Using HPU Device¶
The following example shows how to create a MediaPipe with an image decode operation, resizing the images to a fixed height and width after decoding.

- fn.ReadImageDatasetFromDir reads JPEG files and labels from a given directory
- fn.ImageDecoder decodes and resizes JPEG images
from habana_frameworks.mediapipe import fn
from habana_frameworks.mediapipe.mediapipe import MediaPipe
from habana_frameworks.mediapipe.media_types import imgtype as it
import matplotlib.pyplot as plt
import os

g_display_timeout = float(os.getenv("DISPLAY_TIMEOUT") or 5)

# Create MediaPipe derived class
class myMediaPipe(MediaPipe):
    def __init__(self, device, dir, queue_depth, batch_size, num_threads, img_h, img_w):
        super(myMediaPipe, self).__init__(device,
                                          queue_depth,
                                          batch_size,
                                          num_threads,
                                          self.__class__.__name__)

        self.input = fn.ReadImageDatasetFromDir(shuffle=False,
                                                dir=dir,
                                                format="jpg")

        self.decode = fn.ImageDecoder(device="hpu",
                                      output_format=it.RGB_I,
                                      resize=[img_w, img_h])

    def definegraph(self):
        images, labels = self.input()
        images = self.decode(images)
        return images, labels

def display_images(images, labels, batch_size, cols):
    rows = (batch_size + cols - 1) // cols
    plt.figure(figsize=(10, 10))
    for i in range(batch_size):
        plt.subplot(rows, cols, i + 1)
        plt.imshow(images[i])
        plt.title("label:" + str(labels[i]))
        plt.axis("off")
    plt.show(block=False)
    plt.pause(g_display_timeout)
    plt.close()

def main():
    batch_size = 6
    num_threads = 1
    img_width = 200
    img_height = 200
    queue_depth = 2
    base_dir = os.environ['DATASET_DIR']
    dir = base_dir + "/img_data/"
    columns = 3

    # Create MediaPipe object
    pipe = myMediaPipe('mixed', dir, queue_depth, batch_size, num_threads,
                       img_height, img_width)

    # Build MediaPipe
    pipe.build()

    # Initialize MediaPipe iterator
    pipe.iter_init()

    # Run MediaPipe
    images, labels = pipe.run()

    def as_cpu(tensor):
        if callable(getattr(tensor, "as_cpu", None)):
            tensor = tensor.as_cpu()
        return tensor

    # Copy data to host from device as numpy array
    images = as_cpu(images).as_nparray()
    labels = as_cpu(labels).as_nparray()

    del pipe

    # Display images
    display_images(images, labels, batch_size, columns)

if __name__ == "__main__":
    main()
Decoded and Resized Images [1]

[1] Licensed under a CC BY SA 4.0 license. The images used here are taken from https://data.caltech.edu/records/mzrjq-6wc02.
Adding Crop as Augmentation Operators¶
The following code snippet shows augmenting the decoded images with a crop operation:
from habana_frameworks.mediapipe import fn
from habana_frameworks.mediapipe.mediapipe import MediaPipe
from habana_frameworks.mediapipe.media_types import imgtype as it
from habana_frameworks.mediapipe.media_types import dtype as dt
import matplotlib.pyplot as plt
import os

g_display_timeout = float(os.getenv("DISPLAY_TIMEOUT") or 5)

# Create MediaPipe derived class
class myMediaPipe(MediaPipe):
    def __init__(self, device, dir, queue_depth, batch_size, num_threads, img_h, img_w):
        super(myMediaPipe, self).__init__(device,
                                          queue_depth,
                                          batch_size,
                                          num_threads,
                                          self.__class__.__name__)

        self.input = fn.ReadImageDatasetFromDir(shuffle=False,
                                                dir=dir,
                                                format="jpg")

        self.decode = fn.ImageDecoder(device="hpu",
                                      output_format=it.RGB_I,
                                      resize=[img_w, img_h])

        self.crop = fn.Crop(crop_w=150,
                            crop_h=150,
                            dtype=dt.UINT8)

    def definegraph(self):
        images, labels = self.input()
        images = self.decode(images)
        images = self.crop(images)
        return images, labels

def display_images(images, labels, batch_size, cols):
    rows = (batch_size + cols - 1) // cols
    plt.figure(figsize=(10, 10))
    for i in range(batch_size):
        plt.subplot(rows, cols, i + 1)
        plt.imshow(images[i])
        plt.title("label:" + str(labels[i]))
        plt.axis("off")
    plt.show(block=False)
    plt.pause(g_display_timeout)
    plt.close()

def main():
    batch_size = 6
    num_threads = 1
    img_width = 200
    img_height = 200
    queue_depth = 2
    base_dir = os.environ['DATASET_DIR']
    dir = base_dir + "/img_data/"
    columns = 3

    # Create MediaPipe object
    pipe = myMediaPipe('mixed', dir, queue_depth, batch_size, num_threads,
                       img_height, img_width)

    # Build MediaPipe
    pipe.build()

    # Initialize MediaPipe iterator
    pipe.iter_init()

    # Run MediaPipe
    images, labels = pipe.run()

    def as_cpu(tensor):
        if callable(getattr(tensor, "as_cpu", None)):
            tensor = tensor.as_cpu()
        return tensor

    # Copy data to host from device as numpy array
    images = as_cpu(images).as_nparray()
    labels = as_cpu(labels).as_nparray()

    del pipe

    # Display images
    display_images(images, labels, batch_size, columns)

if __name__ == "__main__":
    main()
Decoded, Resized and Cropped Images [2]

[2] Licensed under a CC BY SA 4.0 license. The images used here are taken from https://data.caltech.edu/records/mzrjq-6wc02.
MediaPipe with CPU Operators Only¶
The following code snippet shows a MediaPipe with CPU operations. Note that all operators must be defined with device='cpu'. The 'cpu' device value is also passed to the constructor.
from habana_frameworks.mediapipe import fn
from habana_frameworks.mediapipe.mediapipe import MediaPipe
from habana_frameworks.mediapipe.media_types import dtype as dt
import os

# Create MediaPipe derived class
class myMediaPipe(MediaPipe):
    def __init__(self, device, dir, queue_depth, batch_size, num_threads, patch_size):
        super(myMediaPipe, self).__init__(device,
                                          queue_depth,
                                          batch_size,
                                          num_threads,
                                          self.__class__.__name__)

        self.input = fn.ReadNumpyDatasetFromDir(num_outputs=1,
                                                shuffle=False,
                                                dir=dir,
                                                pattern='case_*_x.npy',
                                                dtype=dt.FLOAT32,
                                                device='cpu')

        self.crop_img = fn.Crop(crop_w=patch_size[0],
                                crop_h=patch_size[1],
                                crop_d=patch_size[2],
                                crop_pos_x=0.5,
                                crop_pos_y=0.5,
                                crop_pos_z=0.5,
                                dtype=dt.FLOAT32,
                                device='cpu')

    def definegraph(self):
        image = self.input()
        image = self.crop_img(image)
        return image

def main():
    batch_size = 2
    queue_depth = 1
    num_threads = 1
    patch_size = [160, 192, 64]
    base_dir = os.environ['DATASET_DIR']
    dir = base_dir + "/npy_data/fp32_4d/"

    # Create MediaPipe object
    pipe = myMediaPipe('cpu', dir, queue_depth,
                       batch_size, num_threads, patch_size)

    # Build MediaPipe
    pipe.build()

    # Initialize MediaPipe iterator
    pipe.iter_init()

    # Run MediaPipe
    image = pipe.run()
    print(image.as_nparray().shape)

if __name__ == "__main__":
    main()
MediaConst and MediaFunc¶
In addition to the above, the Media API allows for defining constant tensors using MediaConst. These tensors can then be used with operators such as CropMirrorNorm (as shown in the example). If tensors need to be created per batch, MediaFunc should be used. It allows, for example, introducing random tensors into the operator sequence.
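The per-batch behavior of MediaFunc can be sketched in plain Python. The RandomCropPos class below is hypothetical and only illustrates the idea of a callable invoked once per batch to produce a fresh tensor:

```python
import numpy as np

class RandomCropPos:
    """Toy per-batch generator: new random (x, y) crop positions on each call."""
    def __init__(self, seed=0):
        self.rng = np.random.default_rng(seed)

    def __call__(self, batch_size):
        # Invoked once per batch; one (x, y) position in [0, 1) per sample.
        return self.rng.random((batch_size, 2))

gen = RandomCropPos(seed=123)
pos_a = gen(batch_size=4)   # shape (4, 2)
pos_b = gen(batch_size=4)   # different values for the next batch
```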
CPU-HPU Operators Ordering Limitation¶
All CPU operations must be performed prior to HPU operations. Therefore, when implementing the sequence of media processing using definegraph, operations need to be processed by the CPU first. If operations are processed on the HPU first, no operations performed on the CPU can be processed afterward.
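The ordering rule amounts to a simple constraint on the device sequence of a graph; a hypothetical helper (not part of the Media API) expresses it:

```python
def valid_device_order(devices):
    """True if no 'cpu' operator appears after an 'hpu' operator in graph order."""
    seen_hpu = False
    for dev in devices:
        if dev == "hpu":
            seen_hpu = True
        elif dev == "cpu" and seen_hpu:
            return False    # a CPU op after an HPU op violates the ordering rule
    return True

ok = valid_device_order(["cpu", "cpu", "hpu"])    # True: all CPU ops come first
bad = valid_device_order(["cpu", "hpu", "cpu"])   # False: CPU op after an HPU op
```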