habana_frameworks.mediapipe.fn.MediaExtReaderOp

Class:
  • habana_frameworks.mediapipe.fn.MediaExtReaderOp(**kwargs)

Define graph call:
  • __call__()

Parameter:
  • None

Description:

MediaExtReaderOp provides a way to integrate an external data reader with MediaPipe.

The external data reader must inherit from the abstract base class habana_frameworks.mediapipe.operators.reader_nodes.reader_nodes.media_ext_reader_op_impl and implement all of its abstract methods.

For every batch, the external data reader should output a list of file paths and a list of the corresponding metadata.

Supported backend:
  • CPU

Keyword Arguments:

kwargs

Description

impl

Class that inherits from media_ext_reader_op_impl.

  • Type: class

  • Optional: no

  • Note: The class must implement all the abstract methods of media_ext_reader_op_impl.

num_outputs

Number of NumPy arrays returned by the external reader.

  • Type: int

  • Optional: no

  • Note: In the most common case, where the external reader returns an image list and a label list, num_outputs=2.

seed

Seed that the external reader implementation can use for randomization.

  • Type: int

  • Optional: yes

priv_params

Dictionary of parameters required by the user-defined external reader.

  • Type: dict

  • Optional: no

  • Note: These priv_params are available in the __init__ function of the custom media reader as params['priv_params'].

class my_media_reader(media_ext_reader_op_impl):
    def __init__(self, params):
        my_params = params['priv_params']

media_ext_reader_op_impl

Abstract base class representing an external reader node. An external reader should inherit from media_ext_reader_op_impl and implement the functions below.

  • def __init__(self, params): Abstract constructor method. All parameters of the custom reader should be initialized here. The priv_params passed to MediaExtReaderOp are available in this function as part of params.

  • def __iter__(self): Abstract method to initialize the iterator. It is called before every epoch, so the iterator should be initialized or reset here.

  • def __next__(self): Abstract method to get one batch of dataset output from the iterator. It should return a list of file paths and a list of the corresponding metadata, and raise a StopIteration exception once the complete dataset has been consumed.

  • def __len__(self): Abstract method. It should return the number of batches.

  • def set_params(self, params): Abstract setter method to set MediaPipe-specific params in the external reader. The reader generally needs batch_size from MediaPipe, which is passed as part of the input params. The input params are an object of class media_ext_reader_op_params, which MediaPipe populates and passes to the reader.

  • def gen_output_info(self): Abstract method to generate output type information. It should return a list containing one media_ext_reader_op_tensor_info object per output. The data type, shape, and layout of every output must be specified through these objects.

  • def get_largest_file(self): Abstract method to get the largest media file in the dataset. It is used to pre-allocate memory.

  • def get_media_output_type(self): Abstract method to report the reader output type. MediaExtReaderOp can output a FILE_LIST (a list of file names) or a BUFFER_LIST (a list of image/JPEG buffers). This method should return FILE_LIST or BUFFER_LIST as defined in habana_frameworks.mediapipe.media_types.readerOutType.

class media_ext_reader_op_impl(ABC):
    """
    Abstract class representing external reader node.
    """
    @abstractmethod
    def __init__(self, params):
        """
        Abstract constructor method.
        :params params: private params of this node
        """
        pass

    @abstractmethod
    def __iter__(self):
        """
        Abstract method to initialize iterator. It will be called before every epoch.
        """
        pass

    @abstractmethod
    def __next__(self):
        """
        Abstract method to get one batch of dataset output from iterator.
        """
        pass

    @abstractmethod
    def __len__(self):
        """
        Abstract method to get dataset length.
        returns: length of dataset in units of batch_size.
        """
        pass

    @abstractmethod
    def set_params(self, params):
        """
        Abstract setter method to set mediapipe specific params.
        :params params: mediapipe params of type "media_ext_reader_op_params".
        """
        pass

    @abstractmethod
    def gen_output_info(self):
        """
        Abstract method to generate output type information.
        :returns : output tensor information of type "media_ext_reader_op_tensor_info".
        """
        pass

    @abstractmethod
    def get_largest_file(self):
        """
        Abstract method to get largest media in the dataset.
        """
        pass

    @abstractmethod
    def get_media_output_type(self):
        pass
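
The iteration contract described above (reset in __iter__, one batch per __next__, StopIteration at the end of the dataset, __len__ counted in batches) can be tried out without MediaPipe. The sketch below is illustrative only: ReaderBase is a local stand-in that mirrors just the iterator-related abstract methods, and ToyReader, its file names, and its labels are hypothetical. A real reader must inherit from media_ext_reader_op_impl and implement all of its abstract methods.

```python
from abc import ABC, abstractmethod

import numpy as np


class ReaderBase(ABC):
    """Local stand-in for the iterator part of media_ext_reader_op_impl."""

    @abstractmethod
    def __iter__(self):
        pass

    @abstractmethod
    def __next__(self):
        pass

    @abstractmethod
    def __len__(self):
        pass


class ToyReader(ReaderBase):
    """Hypothetical reader that serves full batches of (paths, labels)."""

    def __init__(self, paths, labels, batch_size):
        self.paths = np.array(paths)
        self.labels = np.array(labels, dtype=np.uint32)
        self.batch_size = batch_size
        self.num_batches = len(self.paths) // batch_size
        self.iter_loc = 0

    def __iter__(self):
        # Called before every epoch: rewind to the first batch.
        self.iter_loc = 0
        return self

    def __next__(self):
        start = self.iter_loc
        end = start + self.batch_size
        if end > len(self.paths):
            # Not enough samples left for a full batch: end the epoch.
            raise StopIteration
        self.iter_loc = end
        return self.paths[start:end], self.labels[start:end]

    def __len__(self):
        return self.num_batches


reader = ToyReader(["a.jpg", "b.jpg", "c.jpg", "d.jpg", "e.jpg"],
                   [0, 0, 1, 1, 1], batch_size=2)
# Iterating twice works because __iter__ resets the position each time.
epoch_1 = [(p.tolist(), l.tolist()) for p, l in reader]
epoch_2 = [(p.tolist(), l.tolist()) for p, l in reader]
```

In this sketch the fifth sample is dropped because it cannot fill a batch, so the number of batches produced per epoch agrees with len(reader). Dropping the remainder is one possible policy, not a requirement stated by this page.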

media_ext_reader_op_params

Class defining the parameter information sent to the external reader class. An object of this class is passed as params to the def set_params(self, params) function of the external reader. Currently, only the batch_size set for the entire MediaPipe is passed to the external reader.

class media_ext_reader_op_params(object):
    """
    Class defining param information sent to external reader op class.
    """
    def __init__(self, batch_size):
        """
        Constructor method.
        :params batch_size: Batch size.
        """
        self.batch_size = batch_size
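
As a rough illustration of how a reader might consume this object, the sketch below derives the number of full batches from batch_size. BatchCounter and the sample count of 20 are hypothetical, and the params class is copied from the listing above so the snippet runs on its own. Dropping the incomplete final batch is an assumption of this sketch.

```python
class media_ext_reader_op_params(object):
    """Copy of the class shown above, repeated so the sketch runs standalone."""

    def __init__(self, batch_size):
        self.batch_size = batch_size


class BatchCounter:
    """Hypothetical fragment of a reader: only the set_params logic."""

    def __init__(self, num_samples):
        self.num_samples = num_samples
        self.batch_size = None
        self.num_batches = 0

    def set_params(self, params):
        # MediaPipe supplies batch_size; derive the number of full batches.
        self.batch_size = params.batch_size
        self.num_batches = self.num_samples // self.batch_size


counter = BatchCounter(num_samples=20)
counter.set_params(media_ext_reader_op_params(batch_size=6))
print(counter.num_batches)  # 20 // 6 gives 3 full batches, 2 samples left over
```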

media_ext_reader_op_tensor_info

Class defining the data type, shape, and layout of each NumPy array returned by the external reader.

class media_ext_reader_op_tensor_info(object):
    """
    Class defining return numpy tensor information of external cpu op class.
    """
    def __init__(self, dtype, shape, layout):
        """
        Constructor method.

        :params dtype: output data type.
        :params shape: output shape.
        :params layout: output layout.
        """
        self.dtype = dtype
        self.shape = shape
        self.layout = layout
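
A minimal sketch of how a gen_output_info implementation might assemble its return value for a reader with two outputs (file paths and labels). The class is copied from the listing above so the snippet is self-contained. build_output_info is a hypothetical helper, and the strings "NDT" and "UINT32" are placeholders for the corresponding constants in habana_frameworks.mediapipe.media_types.dtype.

```python
import numpy as np


class media_ext_reader_op_tensor_info(object):
    """Copy of the class shown above, repeated so the sketch runs standalone."""

    def __init__(self, dtype, shape, layout):
        self.dtype = dtype
        self.shape = shape
        self.layout = layout


def build_output_info(batch_size, path_dtype, label_dtype):
    """Describe two outputs: a batch of file paths and a batch of labels."""
    shape = np.array([batch_size], dtype=np.uint32)
    # One entry per output, in the same order as __next__ returns them.
    return [
        media_ext_reader_op_tensor_info(path_dtype, shape, ""),
        media_ext_reader_op_tensor_info(label_dtype, shape.copy(), ""),
    ]


# Placeholder dtype strings; a real reader would pass media_types.dtype constants.
info = build_output_info(batch_size=6, path_dtype="NDT", label_dtype="UINT32")
```

The length of the returned list is expected to agree with the num_outputs value given to MediaExtReaderOp.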

readerOutType

The external reader should return readerOutType.FILE_LIST or readerOutType.BUFFER_LIST from def get_media_output_type(self).

class readerOutType:
    """
    Class defining media reader output type.

    """
    FILE_LIST = 0
    BUFFER_LIST = 1

Example #1: Use of MediaExtReaderOp

This example shows the integration of an external JPEG reader in MediaPipe. Sample implementations of the following helper functions are provided:

  • def gen_image_list(dir, format): generates the list of images.

  • def gen_class_list(data_dir): generates the list of unique class labels.

  • def gen_label_list(file_list, class_names): creates the list of integer labels, one for every image.

  • def get_max_file(img_list): finds the path of the largest image file.

Users can supply their own implementations of these helper functions.
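
As one possible alternative implementation, the sketch below builds the labels with a dictionary lookup. The sample gen_label_list in this example advances a single index across the whole file list, so it depends on the files being ordered consistently with class_names. This variant does not. The function name gen_label_list_by_lookup and the file paths are hypothetical.

```python
import os


def gen_label_list_by_lookup(file_list, class_names):
    """Map each file to the index of its parent directory in class_names."""
    class_to_idx = {name: idx for idx, name in enumerate(class_names)}
    labels = []
    for path in file_list:
        cls_name = os.path.basename(os.path.dirname(path))
        # Raises KeyError if a file sits in a directory that is not a known class.
        labels.append(class_to_idx[cls_name])
    return labels


files = ["data/dog/1.jpg", "data/cat/7.jpg", "data/dog/2.jpg"]
labels = gen_label_list_by_lookup(files, ["cat", "dog"])
print(labels)  # [1, 0, 1]
```

The sketch returns a plain Python list. To match the sample helper, the result could be wrapped as np.array(labels, dtype=np.uint32).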

In the code below, external_reader inherits from the abstract class media_ext_reader_op_impl and implements the following functions:

  • def __init__(self, params): external_reader receives “dir”, “format”, “enable_shuffle” and “label_dtype” as part of params['priv_params']. These priv_params are set in the __init__ function of myMediaPipe. external_reader also calls the helper functions above to build the list of all images, find the largest image, and build the class label list.

  • def __iter__(self): This method is called before every epoch, so it resets iter_loc to zero to start batching from the beginning of the image and label lists. It also shuffles the image and label lists if required. This function must return self.

  • def __next__(self): Takes a slice of batch_size elements out of the image and label lists and returns it.

  • def __len__(self): Returns the total number of batches.

  • def set_params(self, params): Here external_reader receives the MediaPipe-specific parameter, i.e. batch_size, and computes the number of batches.

  • def gen_output_info(self): Because external_reader returns two outputs from the __next__ method, img_list and lbl_list, gen_output_info should return a list of two media_ext_reader_op_tensor_info objects giving the data type, shape, and layout of both outputs.

  • def get_largest_file(self): Returns the path of the largest image file.

  • def get_media_output_type(self): Because external_reader returns a list of image file names, this function should return habana_frameworks.mediapipe.media_types.readerOutType.FILE_LIST.

from habana_frameworks.mediapipe import fn
from habana_frameworks.mediapipe.mediapipe import MediaPipe
from habana_frameworks.mediapipe.media_types import dtype as dt
from habana_frameworks.mediapipe.media_types import imgtype as it
from habana_frameworks.mediapipe.media_types import readerOutType as ro
from habana_frameworks.mediapipe.operators.reader_nodes.reader_nodes import media_ext_reader_op_impl
from habana_frameworks.mediapipe.operators.reader_nodes.reader_nodes import media_ext_reader_op_tensor_info
import os
import glob
import time
import pathlib
import numpy as np
import matplotlib.pyplot as plt


# Seconds to keep the preview window open; os.getenv returns a string, so convert it.
g_display_timeout = float(os.getenv("DISPLAY_TIMEOUT") or 5)

def gen_label_list(file_list, class_names):
    label_list = np.array([])
    idx = 0
    for f in file_list:
        cls_name = os.path.basename(os.path.dirname((f)))
        while (idx < len(class_names)):
            if not (cls_name == class_names[idx]):
                idx = idx + 1
            else:
                break
        label_list = np.append(label_list, idx)
    label_list = np.array(label_list, dtype=np.uint32)
    return label_list


def gen_class_list(data_dir):
    data_dir = pathlib.Path(data_dir)
    return np.array(sorted(
        [item.name for item in data_dir.glob('*') if item.is_dir()]))


def gen_image_list(dir, format):
    return np.array(sorted(glob.glob(dir + "/*/*."+format)))


def get_max_file(img_list):
    return max(img_list, key=lambda x: os.stat(x).st_size)


class external_reader(media_ext_reader_op_impl):
    def __init__(self, params, fw_params):
        params = params['priv_params']
        self.data_dir = params['dir']
        self.format = params['format']
        self.shuffle = params['enable_shuffle']
        self.metadata_dtype = params['label_dtype']
        self.img_list = gen_image_list(self.data_dir, self.format)
        self.max_file = get_max_file(self.img_list)
        self.class_list = gen_class_list(self.data_dir)
        self.lbl_list = gen_label_list(
            self.img_list, self.class_list)
        self.num_imgs = len(self.img_list)
        print("Total images/labels {} classes {}".format(self.num_imgs,
                                                        len(self.class_list)))
        self.shuffle_idx = np.arange(self.num_imgs)
        self.num_imgs_slice = self.num_imgs
        self.img_list_slice = self.img_list
        self.lbl_list_slice = self.lbl_list
        self.iter_loc = 0
        self.batch_size = fw_params.batch_size
        self.num_batches = int(self.num_imgs / self.batch_size)

    def __iter__(self):
        if self.shuffle:
            np.random.shuffle(self.shuffle_idx)
            self.img_list_slice = self.img_list_slice[self.shuffle_idx]
            self.lbl_list_slice = self.lbl_list_slice[self.shuffle_idx]
        self.iter_loc = 0
        return self

    def __len__(self):
        return self.num_batches

    def __next__(self):
        # Stop once a full batch is no longer available, matching __len__.
        if self.iter_loc + self.batch_size > self.num_imgs_slice:
            raise StopIteration
        start = self.iter_loc
        end = self.iter_loc + self.batch_size
        img_list = self.img_list_slice[start:end]
        lbl_list = self.lbl_list_slice[start:end]
        self.iter_loc = self.iter_loc + self.batch_size
        return img_list, lbl_list

    def get_media_output_type(self):
        return ro.FILE_LIST

    def get_largest_file(self):
        print("max_file:", self.max_file)
        return self.max_file

    def gen_output_info(self):
        out_info = []
        o = media_ext_reader_op_tensor_info(
            dt.NDT, np.array([self.batch_size], dtype=np.uint32), "")
        out_info.append(o)
        o = media_ext_reader_op_tensor_info(
            self.metadata_dtype, np.array([self.batch_size], dtype=np.uint32), "")
        out_info.append(o)
        return out_info


class myMediaPipe(MediaPipe):
    def __init__(self, device, queue_depth, batch_size,
                num_threads, op_device, dir,
                img_height, img_width):
        super(
            myMediaPipe,
            self).__init__(
            device,
            queue_depth,
            batch_size,
            num_threads,
            self.__class__.__name__)
        mediapipe_seed = int(time.time_ns() % (2**31 - 1))
        priv_params = {}
        priv_params['dir'] = dir
        priv_params['format'] = "jpg"
        priv_params['enable_shuffle'] = False
        priv_params['label_dtype'] = dt.UINT32

        self.input = fn.MediaExtReaderOp(impl=external_reader,
                                        num_outputs=2,
                                        priv_params=priv_params,
                                        seed=mediapipe_seed,
                                        device=op_device)
        self.decode = fn.ImageDecoder(
            device="hpu", output_format=it.RGB_I, resize=[img_width, img_height], dtype=dt.UINT8)

    def definegraph(self):
        images, labels = self.input()
        images = self.decode(images)
        return images, labels


def display_images(images, labels, batch_size, cols):
    rows = (batch_size + cols - 1) // cols  # ceiling division
    plt.figure(figsize=(10, 10))
    for i in range(batch_size):
        ax = plt.subplot(rows, cols, i + 1)
        plt.imshow(images[i])
        plt.title("label:"+str(labels[i]))
        plt.axis("off")
    plt.show(block=False)
    plt.pause(g_display_timeout)
    plt.close()


def run(device, op_device):
    batch_size = 6
    queue_depth = 2
    num_threads = 1
    img_width = 200
    img_height = 200
    base_dir = os.environ['DATASET_DIR']
    dir = base_dir + "/img_data/"
    pipe = myMediaPipe(device, queue_depth, batch_size,
                    num_threads, op_device, dir,
                    img_height, img_width)
    pipe.build()
    pipe.iter_init()
    bcnt = 0
    while (bcnt < 1):
        try:
            images, labels = pipe.run()
        except StopIteration:
            break
        images = images.as_cpu().as_nparray()
        labels = labels.as_cpu().as_nparray()
        print(images.shape)
        display_images(images, labels, batch_size, cols=3)
        bcnt = bcnt + 1


if __name__ == "__main__":
    dev_opdev = {'legacy': ['cpu']}

    for dev in dev_opdev.keys():
        for op_dev in dev_opdev[dev]:
            run(dev, op_dev)
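
The __iter__ method in the example shuffles with NumPy's global random state. Where reproducible ordering is wanted, a seeded generator could be used instead. The sketch below is an assumption-laden illustration: shuffled_epoch is a hypothetical helper, the seed value 1234 is arbitrary, and how the seed given to MediaExtReaderOp reaches the reader is not covered here, so the generator is simply passed in as an argument.

```python
import numpy as np


def shuffled_epoch(img_list, lbl_list, rng):
    """Return both arrays reordered by one shared random permutation."""
    order = rng.permutation(len(img_list))
    # Indexing both arrays with the same order keeps each image with its label.
    return img_list[order], lbl_list[order]


imgs = np.array(["a.jpg", "b.jpg", "c.jpg", "d.jpg"])
lbls = np.array([0, 0, 1, 1], dtype=np.uint32)
pairs_before = set(zip(imgs.tolist(), lbls.tolist()))

# Two generators built from the same seed produce the same order.
out_a = shuffled_epoch(imgs, lbls, np.random.default_rng(1234))
out_b = shuffled_epoch(imgs, lbls, np.random.default_rng(1234))
pairs_after = set(zip(out_a[0].tolist(), out_a[1].tolist()))
```

Using a single permutation for both arrays is the same idea as the shared shuffle_idx in the example. Only the source of randomness differs.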

Example #1: Output Images [1]

[Figure: six output images, labeled 0, 0, 0, 1, 1, 1.]

[1] Licensed under a CC BY SA 4.0 license. The images used here are taken from https://data.caltech.edu/records/mzrjq-6wc02.