Source code for detectron2.utils.video_visualizer

# Copyright (c) Facebook, Inc. and its affiliates.
import numpy as np
from typing import List
import pycocotools.mask as mask_util

from detectron2.structures import Instances
from detectron2.utils.visualizer import (
    ColorMode,
    Visualizer,
    _create_text_labels,
    _PanopticPrediction,
)

from .colormap import random_color, random_colors


class _DetectedInstance:
    """
    Used to store data about detected objects in video frame,
    in order to transfer color to objects in the future frames.

    Attributes:
        label (int):
        bbox (tuple[float]):
        mask_rle (dict):
        color (tuple[float]): RGB colors in range (0, 1)
        ttl (int): time-to-live for the instance. For example, if ttl=2,
            the instance color can be transferred to objects in the next two frames.
    """

    __slots__ = ["label", "bbox", "mask_rle", "color", "ttl"]

    def __init__(self, label, bbox, mask_rle, color, ttl):
        self.label = label
        self.bbox = bbox
        self.mask_rle = mask_rle
        self.color = color
        self.ttl = ttl


[docs]class VideoVisualizer:
[docs] def __init__(self, metadata, instance_mode=ColorMode.IMAGE): """ Args: metadata (MetadataCatalog): image metadata. """ self.metadata = metadata self._old_instances = [] assert instance_mode in [ ColorMode.IMAGE, ColorMode.IMAGE_BW, ], "Other mode not supported yet." self._instance_mode = instance_mode self._max_num_instances = self.metadata.get("max_num_instances", 74) self._assigned_colors = {} self._color_pool = random_colors(self._max_num_instances, rgb=True, maximum=1) self._color_idx_set = set(range(len(self._color_pool)))
[docs] def draw_instance_predictions(self, frame, predictions): """ Draw instance-level prediction results on an image. Args: frame (ndarray): an RGB image of shape (H, W, C), in the range [0, 255]. predictions (Instances): the output of an instance detection/segmentation model. Following fields will be used to draw: "pred_boxes", "pred_classes", "scores", "pred_masks" (or "pred_masks_rle"). Returns: output (VisImage): image object with visualizations. """ frame_visualizer = Visualizer(frame, self.metadata) num_instances = len(predictions) if num_instances == 0: return frame_visualizer.output boxes = predictions.pred_boxes.tensor.numpy() if predictions.has("pred_boxes") else None scores = predictions.scores if predictions.has("scores") else None classes = predictions.pred_classes.numpy() if predictions.has("pred_classes") else None keypoints = predictions.pred_keypoints if predictions.has("pred_keypoints") else None colors = predictions.COLOR if predictions.has("COLOR") else [None] * len(predictions) periods = predictions.ID_period if predictions.has("ID_period") else None period_threshold = self.metadata.get("period_threshold", 0) visibilities = ( [True] * len(predictions) if periods is None else [x > period_threshold for x in periods] ) if predictions.has("pred_masks"): masks = predictions.pred_masks # mask IOU is not yet enabled # masks_rles = mask_util.encode(np.asarray(masks.permute(1, 2, 0), order="F")) # assert len(masks_rles) == num_instances else: masks = None if not predictions.has("COLOR"): if predictions.has("ID"): colors = self._assign_colors_by_id(predictions) else: # ToDo: clean old assign color method and use a default tracker to assign id detected = [ _DetectedInstance(classes[i], boxes[i], mask_rle=None, color=colors[i], ttl=8) for i in range(num_instances) ] colors = self._assign_colors(detected) labels = _create_text_labels(classes, scores, self.metadata.get("thing_classes", None)) if self._instance_mode == ColorMode.IMAGE_BW: # any() returns uint8 tensor frame_visualizer.output.reset_image( frame_visualizer._create_grayscale_image( (masks.any(dim=0) > 0).numpy() if masks is not None else None ) ) alpha = 0.3 else: alpha = 0.5 labels = ( None if labels is None else [y[0] for y in filter(lambda x: x[1], zip(labels, visibilities))] ) # noqa assigned_colors = ( None if colors is None else [y[0] for y in filter(lambda x: x[1], zip(colors, visibilities))] ) # noqa frame_visualizer.overlay_instances( boxes=None if masks is not None else boxes[visibilities], # boxes are a bit distracting masks=None if masks is None else masks[visibilities], labels=labels, keypoints=None if keypoints is None else keypoints[visibilities], assigned_colors=assigned_colors, alpha=alpha, ) return frame_visualizer.output
[docs] def draw_sem_seg(self, frame, sem_seg, area_threshold=None): """ Args: sem_seg (ndarray or Tensor): semantic segmentation of shape (H, W), each value is the integer label. area_threshold (Optional[int]): only draw segmentations larger than the threshold """ # don't need to do anything special frame_visualizer = Visualizer(frame, self.metadata) frame_visualizer.draw_sem_seg(sem_seg, area_threshold=None) return frame_visualizer.output
def draw_panoptic_seg_predictions( self, frame, panoptic_seg, segments_info, area_threshold=None, alpha=0.5 ): frame_visualizer = Visualizer(frame, self.metadata) pred = _PanopticPrediction(panoptic_seg, segments_info, self.metadata) if self._instance_mode == ColorMode.IMAGE_BW: frame_visualizer.output.reset_image( frame_visualizer._create_grayscale_image(pred.non_empty_mask()) ) # draw mask for all semantic segments first i.e. "stuff" for mask, sinfo in pred.semantic_masks(): category_idx = sinfo["category_id"] try: mask_color = [x / 255 for x in self.metadata.stuff_colors[category_idx]] except AttributeError: mask_color = None frame_visualizer.draw_binary_mask( mask, color=mask_color, text=self.metadata.stuff_classes[category_idx], alpha=alpha, area_threshold=area_threshold, ) all_instances = list(pred.instance_masks()) if len(all_instances) == 0: return frame_visualizer.output # draw mask for all instances second masks, sinfo = list(zip(*all_instances)) num_instances = len(masks) masks_rles = mask_util.encode( np.asarray(np.asarray(masks).transpose(1, 2, 0), dtype=np.uint8, order="F") ) assert len(masks_rles) == num_instances category_ids = [x["category_id"] for x in sinfo] detected = [ _DetectedInstance(category_ids[i], bbox=None, mask_rle=masks_rles[i], color=None, ttl=8) for i in range(num_instances) ] colors = self._assign_colors(detected) labels = [self.metadata.thing_classes[k] for k in category_ids] frame_visualizer.overlay_instances( boxes=None, masks=masks, labels=labels, keypoints=None, assigned_colors=colors, alpha=alpha, ) return frame_visualizer.output def _assign_colors(self, instances): """ Naive tracking heuristics to assign same color to the same instance, will update the internal state of tracked instances. Returns: list[tuple[float]]: list of colors. """ # Compute iou with either boxes or masks: is_crowd = np.zeros((len(instances),), dtype=bool) if instances[0].bbox is None: assert instances[0].mask_rle is not None # use mask iou only when box iou is None # because box seems good enough rles_old = [x.mask_rle for x in self._old_instances] rles_new = [x.mask_rle for x in instances] ious = mask_util.iou(rles_old, rles_new, is_crowd) threshold = 0.5 else: boxes_old = [x.bbox for x in self._old_instances] boxes_new = [x.bbox for x in instances] ious = mask_util.iou(boxes_old, boxes_new, is_crowd) threshold = 0.6 if len(ious) == 0: ious = np.zeros((len(self._old_instances), len(instances)), dtype="float32") # Only allow matching instances of the same label: for old_idx, old in enumerate(self._old_instances): for new_idx, new in enumerate(instances): if old.label != new.label: ious[old_idx, new_idx] = 0 matched_new_per_old = np.asarray(ious).argmax(axis=1) max_iou_per_old = np.asarray(ious).max(axis=1) # Try to find match for each old instance: extra_instances = [] for idx, inst in enumerate(self._old_instances): if max_iou_per_old[idx] > threshold: newidx = matched_new_per_old[idx] if instances[newidx].color is None: instances[newidx].color = inst.color continue # If an old instance does not match any new instances, # keep it for the next frame in case it is just missed by the detector inst.ttl -= 1 if inst.ttl > 0: extra_instances.append(inst) # Assign random color to newly-detected instances: for inst in instances: if inst.color is None: inst.color = random_color(rgb=True, maximum=1) self._old_instances = instances[:] + extra_instances return [d.color for d in instances] def _assign_colors_by_id(self, instances: Instances) -> List: colors = [] untracked_ids = set(self._assigned_colors.keys()) for id in instances.ID: if id in self._assigned_colors: colors.append(self._color_pool[self._assigned_colors[id]]) untracked_ids.remove(id) else: assert ( len(self._color_idx_set) >= 1 ), f"Number of id exceeded maximum, \ max = {self._max_num_instances}" idx = self._color_idx_set.pop() color = self._color_pool[idx] self._assigned_colors[id] = idx colors.append(color) for id in untracked_ids: self._color_idx_set.add(self._assigned_colors[id]) del self._assigned_colors[id] return colors