Creating and Executing Media Pipeline¶
This section describes the steps required to create and execute a media processing pipeline. The steps include:

1. Creating a derived class from MediaPipe (e.g. myMediaPipe) and initializing the super class arguments device, prefetch_depth, batch_size, pipe_name.
2. Creating the operators required in the pipeline along with their parameters. All supported operators are listed in the Operators table.
3. Defining the sequence of media processing in the definegraph() method.
4. Building and initializing the pipeline.
5. Executing the pipeline.
It also provides a Media Pipeline Example Using HPU Device with the above steps implemented.
Note
Before you Start: Make sure to set up your environment as shown in the Installation Guide.
Creating New Pipeline¶
The Habana framework provides a MediaPipe class which includes the necessary functions for defining, building and running a media data processing pipeline.

You can define a new pipeline (for example, myMediaPipe) by:

- Creating a derived class from MediaPipe.
- Passing the following parameters to the base class constructor:
Keyword Arguments:

| kwargs | Description |
|---|---|
| device | Device on which the media pipe is to be executed. Should be set to 'hpu' or 'cpu'. |
| prefetch_depth | Input batch queue depth for circular buffering. Controls how many image batches Media Pipe can preload. |
| batch_size | Batch size for each media pipeline execution. This should match the number of examples per batch on one worker of the neural network model. |
| pipe_name | Media pipeline name, used to identify a specific pipe in the logs. |
Below is an example of a derived class constructor. Note that the example adds parameters which are not part of the media pipe base class: dir, channel, height and width. These parameters are used when defining operators specific to the myMediaPipe implementation.
class myMediaPipe(MediaPipe):
    def __init__(self, device, dir, queue_depth, batch_size, channel, height, width):
        super(myMediaPipe, self).__init__(device,
                                          queue_depth,
                                          batch_size,
                                          self.__class__.__name__)

# Create object of media pipe
pipe = myMediaPipe('hpu', dir, queue_depth, batch_size,
                   channel, height, width)
Supported Device Type Per Operator¶
myMediaPipe supports a number of operators implemented on HPU, on CPU, or on both, allowing you to select which device an operator runs on. See Operators for a full list of operators and supported devices.

When defining myMediaPipe, make sure to follow the below guidelines:

- For operators implemented on HPU only, set device='hpu'.
- For operators implemented on both HPU and CPU, set device='hpu' or device='cpu', depending on where the operator should run.
- For operators implemented on CPU only, set device='cpu'. When using device='cpu', only operators which use CPU are supported, and device for those operators must also be set to 'cpu'. See Media Pipeline with CPU Operators Only for an example.

Note that all CPU operations must be performed prior to HPU operations. See CPU-HPU Operators Ordering Limitation for further details.
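These guidelines amount to a simple validity check per operator. The sketch below is illustrative only: the capability table is hypothetical, and the real per-operator support is listed in the Operators documentation.

```python
# Hypothetical capability table -- actual support per operator is listed in
# the Operators documentation, not here.
SUPPORTED_DEVICES = {
    "ReadImageDatasetFromDir": {"cpu", "hpu"},
    "ImageDecoder": {"hpu"},
    "Crop": {"cpu", "hpu"},
}

def validate_device(op_name, device):
    """Raise if `device` is not a valid choice for the operator."""
    allowed = SUPPORTED_DEVICES[op_name]
    if device not in allowed:
        raise ValueError(
            f"{op_name} cannot run on '{device}'; supported: {sorted(allowed)}")
    return device
```

For example, validate_device("Crop", "cpu") is accepted, while requesting an unsupported device raises a ValueError.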
Defining Operators and Processing Sequence¶
The media pipeline is constructed using operators. All operators and their parameter options are available here.
First, a set of operators needs to be created in the pipeline along with their parameters, e.g. ReadImageDatasetFromDir, ImageDecoder, Crop. Such operations include reading and decoding data; an optional set of augmentation operations, such as image cropping or flipping, can also be added.

After creating the operators, define the sequence of media processing in the definegraph() method. Internally, MediaPipe converts operators to nodes and builds a graph according to the specification of definegraph. The graph is then executed on input batches, performing the required operations.
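The idea of turning operator calls into graph nodes can be shown with a plain-Python sketch. The classes below are illustrative only and are not part of the MediaPipe internals:

```python
class Node:
    """One operator instance recorded in the graph."""
    def __init__(self, name, fn, inputs):
        self.name, self.fn, self.inputs = name, fn, inputs

class Graph:
    """Collects nodes as operators are called, then executes them on demand."""
    def __init__(self):
        self.nodes = []

    def add(self, name, fn, *inputs):
        node = Node(name, fn, inputs)
        self.nodes.append(node)
        return node

    def run(self):
        # Resolve each node's inputs to earlier results, then apply its fn.
        results = {}
        for node in self.nodes:
            args = [results[i.name] if isinstance(i, Node) else i
                    for i in node.inputs]
            results[node.name] = node.fn(*args)
        return results[self.nodes[-1].name]

# A toy "definegraph": read data, then transform it.
g = Graph()
data = g.add("read", lambda: [1, 2, 3])
out = g.add("double", lambda xs: [2 * x for x in xs], data)
```

Calling g.run() walks the recorded nodes in order and returns [2, 4, 6], mirroring how a pipeline executes the graph built from definegraph() on each batch.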
The below example shows the derived class constructor, the set of operators used, and the sequence of media processing defined in the definegraph() method:
class myMediaPipe(MediaPipe):
    def __init__(self, device, dir, queue_depth, batch_size, img_h, img_w):
        super(myMediaPipe, self).__init__(device,
                                          queue_depth,
                                          batch_size,
                                          self.__class__.__name__)

        self.input = fn.ReadImageDatasetFromDir(shuffle=False,
                                                dir=dir,
                                                format="jpg")

        self.decode = fn.ImageDecoder(device="hpu",
                                      output_format=it.RGB_I,
                                      resize=[img_w, img_h])

    def definegraph(self):
        images, labels = self.input()
        images = self.decode(images)
        return images, labels
Building and Initializing the Pipeline¶
Once the class is defined with the required nodes and the sequence of media processing (definegraph), the build() and iter_init() methods should be called on an object of the class to build the media pipe and initialize the iterator. The iterator is an internal part of the media pipe class which retrieves a sequence of data batches.
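The build()/iter_init()/run() lifecycle can be mimicked with a plain-Python sketch. The class below is illustrative only, not the Habana implementation:

```python
class SketchPipe:
    """Mimics the MediaPipe call pattern, with a plain list as the dataset."""
    def __init__(self, dataset, batch_size):
        self.dataset = dataset
        self.batch_size = batch_size
        self.built = False

    def build(self):
        # A real pipeline would compile the graph defined in definegraph() here.
        self.built = True

    def iter_init(self):
        # Reset the internal iterator to the start of the dataset.
        assert self.built, "call build() before iter_init()"
        self._pos = 0

    def run(self):
        # Return the next batch, analogous to pipe.run().
        batch = self.dataset[self._pos:self._pos + self.batch_size]
        self._pos += self.batch_size
        return batch

pipe = SketchPipe(list(range(8)), batch_size=4)
pipe.build()
pipe.iter_init()
```

With this sketch, the first pipe.run() returns [0, 1, 2, 3] and the second returns [4, 5, 6, 7]; calling iter_init() again restarts the sequence.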
Executing the Pipeline¶
To produce an output from the media pipeline, call the run() method on the class object. The run() method returns one batch of data, as defined by the return value of the definegraph() method.

After processing is complete, the pipeline output is present on the device. To view or manipulate tensors on the host, call the as_cpu() method of the device tensor object, which returns a host tensor object. For numpy manipulation, call the as_nparray() method of the host tensor object to get a numpy host array.

As shown below, the defined graph returns images and labels, which are also the output of pipe.run(). Note that the image batch is not necessarily prepared at the time of calling pipe.run(). Depending on the value of prefetch_depth (passed to the MediaPipe constructor), a given number of batches is prepared ahead of time.
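The effect of prefetch_depth can be modeled with a small circular-buffer sketch. This is illustrative only; the real prefetching happens inside MediaPipe:

```python
from collections import deque

class Prefetcher:
    """Keeps up to `depth` batches prepared ahead of consumption."""
    def __init__(self, batches, depth):
        self.source = iter(batches)
        self.depth = depth
        self.queue = deque()

    def _fill(self):
        # Top up the queue so `depth` batches are ready, if available.
        while len(self.queue) < self.depth:
            try:
                self.queue.append(next(self.source))
            except StopIteration:
                break

    def run(self):
        self._fill()
        # Hand out the oldest prepared batch, like pipe.run().
        return self.queue.popleft()

p = Prefetcher([["img0"], ["img1"], ["img2"]], depth=2)
```

With depth=2, the first p.run() returns ["img0"] while ["img1"] is already queued, so the consumer never waits for a batch that was prepared earlier.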
Media Pipeline Example Using HPU Device¶
The following example shows how to create a media pipeline that decodes images and resizes them to a fixed height and width after decoding:

- fn.ReadImageDatasetFromDir reads jpeg files and labels from a given directory.
- fn.ImageDecoder decodes and resizes jpeg images.
from habana_frameworks.mediapipe import fn
from habana_frameworks.mediapipe.mediapipe import MediaPipe
from habana_frameworks.mediapipe.media_types import imgtype as it
import matplotlib.pyplot as plt

# Create media pipeline derived class
class myMediaPipe(MediaPipe):
    def __init__(self, device, dir, queue_depth, batch_size, img_h, img_w):
        super(myMediaPipe, self).__init__(device,
                                          queue_depth,
                                          batch_size,
                                          self.__class__.__name__)

        self.input = fn.ReadImageDatasetFromDir(shuffle=False,
                                                dir=dir,
                                                format="jpg")

        self.decode = fn.ImageDecoder(device="hpu",
                                      output_format=it.RGB_I,
                                      resize=[img_w, img_h])

    def definegraph(self):
        images, labels = self.input()
        images = self.decode(images)
        return images, labels

def display_images(images, batch_size, cols):
    rows = (batch_size + 1) // cols
    plt.figure(figsize=(10, 10))
    for i in range(batch_size):
        ax = plt.subplot(rows, cols, i + 1)
        plt.imshow(images[i])
        plt.axis("off")
    plt.show()

def main():
    batch_size = 6
    img_width = 200
    img_height = 200
    img_dir = "/path/to/images"
    queue_depth = 2
    columns = 3

    # Create media pipeline object
    pipe = myMediaPipe('hpu', img_dir, queue_depth, batch_size,
                       img_height, img_width)

    # Build media pipeline
    pipe.build()

    # Initialize media pipeline iterator
    pipe.iter_init()

    # Run media pipeline
    images, labels = pipe.run()

    # Copy images to host from device as numpy array
    images = images.as_cpu().as_nparray()
    labels = labels.as_cpu().as_nparray()

    # Display images
    display_images(images, batch_size, columns)

if __name__ == "__main__":
    main()
Decoded and Resized Images [1]

[1] Licensed under a CC BY SA 4.0 license. The images used here are taken from https://data.caltech.edu/records/mzrjq-6wc02.
Adding Crop as Augmentation Operators¶
The following code snippet shows augmenting the decoded images with crop operation:
from habana_frameworks.mediapipe import fn
from habana_frameworks.mediapipe.mediapipe import MediaPipe
from habana_frameworks.mediapipe.media_types import imgtype as it
from habana_frameworks.mediapipe.media_types import dtype as dt
import matplotlib.pyplot as plt

class myMediaPipe(MediaPipe):
    def __init__(self, device, dir, queue_depth, batch_size, img_h, img_w):
        super(myMediaPipe, self).__init__(device,
                                          queue_depth,
                                          batch_size,
                                          self.__class__.__name__)

        self.input = fn.ReadImageDatasetFromDir(shuffle=False,
                                                dir=dir,
                                                format="jpg")

        self.decode = fn.ImageDecoder(device="hpu",
                                      output_format=it.RGB_I,
                                      resize=[img_w, img_h])

        self.crop = fn.Crop(crop_w=150,
                            crop_h=150,
                            dtype=dt.UINT8)

    def definegraph(self):
        images, labels = self.input()
        images = self.decode(images)
        images = self.crop(images)
        return images, labels

def display_images(images, batch_size, cols):
    rows = (batch_size + 1) // cols
    plt.figure(figsize=(10, 10))
    for i in range(batch_size):
        ax = plt.subplot(rows, cols, i + 1)
        plt.imshow(images[i])
        plt.axis("off")
    plt.show()

def main():
    batch_size = 6
    img_width = 200
    img_height = 200
    img_dir = "/path/to/images"
    queue_depth = 2
    columns = 3

    # Create media pipeline object
    pipe = myMediaPipe('hpu', img_dir, queue_depth, batch_size,
                       img_height, img_width)

    # Build media pipeline
    pipe.build()

    # Initialize media pipeline iterator
    pipe.iter_init()

    # Run media pipeline
    images, labels = pipe.run()

    # Copy images to host from device as numpy array
    images = images.as_cpu().as_nparray()
    labels = labels.as_cpu().as_nparray()

    # Display images
    display_images(images, batch_size, columns)

if __name__ == "__main__":
    main()
Decoded, Resized and Cropped Images [2]

[2] Licensed under a CC BY SA 4.0 license. The images used here are taken from https://data.caltech.edu/records/mzrjq-6wc02.
Media Pipeline with CPU Operators Only¶
The following code snippet shows a media pipeline with CPU operations only. Note that all operators must be defined with device='cpu'. The 'cpu' device value is also passed to the constructor.
from habana_frameworks.mediapipe import fn
from habana_frameworks.mediapipe.mediapipe import MediaPipe
from habana_frameworks.mediapipe.media_types import dtype as dt

# Create media pipeline derived class
class myMediaPipe(MediaPipe):
    def __init__(self, dir, queue_depth, batch_size, num_threads, patch_size):
        super(myMediaPipe, self).__init__('cpu',
                                          queue_depth,
                                          batch_size,
                                          num_threads,
                                          self.__class__.__name__)

        self.input = fn.ReadNumpyDatasetFromDir(num_outputs=1,
                                                shuffle=False,
                                                dir=dir,
                                                pattern='case_*_x.npy',
                                                dtype=dt.FLOAT32,
                                                device='cpu')

        self.crop_img = fn.Crop(crop_w=patch_size[0],
                                crop_h=patch_size[1],
                                crop_d=patch_size[2],
                                crop_pos_x=0.5,
                                crop_pos_y=0.5,
                                crop_pos_z=0.5,
                                dtype=dt.FLOAT32,
                                device='cpu')

    def definegraph(self):
        image = self.input()
        image = self.crop_img(image)
        return image

def main():
    batch_size = 2
    img_dir = '/path/to/numpy/files/'
    queue_depth = 1
    num_threads = 1
    patch_size = [160, 192, 64]

    # Create media pipeline object
    pipe = myMediaPipe(img_dir, queue_depth,
                       batch_size, num_threads, patch_size)

    # Build media pipeline
    pipe.build()

    # Initialize media pipeline iterator
    pipe.iter_init()

    # Run media pipeline
    image = pipe.run()

if __name__ == "__main__":
    main()
MediaConst and MediaFunc¶
In addition, the Media API allows defining constant tensors using MediaConst. Those tensors can then be used with operators such as CropMirrorNorm (as shown in the example). If tensors need to be created per batch, MediaFunc should be used. It allows, for example, introducing random tensors into the operator sequence.
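The contrast between a constant tensor and a per-batch tensor can be sketched in plain Python. This is illustrative only; MediaConst and MediaFunc have their own APIs:

```python
class ConstTensor:
    """Built once and reused for every batch -- the MediaConst idea."""
    def __init__(self, value):
        self.value = value

    def __call__(self):
        return self.value

class PerBatchTensor:
    """Regenerated on every batch -- the MediaFunc idea, e.g. random params."""
    def __init__(self, generator):
        self.generator = generator

    def __call__(self):
        return self.generator()

# A normalization mean stays fixed; a flip decision changes per batch.
mean = ConstTensor([0.485, 0.456, 0.406])
counter = iter(range(100))
flip = PerBatchTensor(lambda: next(counter) % 2)
```

Here mean() yields the same values for every batch, while flip() produces a new value each time it is called, which is the behavior needed for random augmentations.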
CPU-HPU Operators Ordering Limitation¶
All CPU operations must be performed prior to HPU operations. Therefore, when implementing the sequence of media processing in definegraph, operations need to be processed by the CPU first. Once an operation has been processed on the HPU, no subsequent operation can be processed on the CPU.
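The constraint can be expressed as a simple check over per-node device assignments. This is a sketch; node devices are represented as plain strings here:

```python
def cpu_ops_precede_hpu_ops(devices):
    """Return True if no 'cpu' node appears after an 'hpu' node."""
    seen_hpu = False
    for device in devices:
        if device == "hpu":
            seen_hpu = True
        elif device == "cpu" and seen_hpu:
            return False
    return True
```

For example, the ordering ["cpu", "cpu", "hpu", "hpu"] is valid, while ["hpu", "cpu"] violates the limitation because a CPU operation follows an HPU operation.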