Source code for detectron2.evaluation.panoptic_evaluation

# Copyright (c) Facebook, Inc. and its affiliates.
import contextlib
import io
import itertools
import json
import logging
import numpy as np
import os
import tempfile
from collections import OrderedDict
from typing import Optional
from PIL import Image
from tabulate import tabulate

from detectron2.data import MetadataCatalog
from detectron2.utils import comm
from detectron2.utils.file_io import PathManager

from .evaluator import DatasetEvaluator

logger = logging.getLogger(__name__)


[docs]class COCOPanopticEvaluator(DatasetEvaluator): """ Evaluate Panoptic Quality metrics on COCO using PanopticAPI. It saves panoptic segmentation prediction in `output_dir` It contains a synchronize call and has to be called from all workers. """
[docs] def __init__(self, dataset_name: str, output_dir: Optional[str] = None): """ Args: dataset_name: name of the dataset output_dir: output directory to save results for evaluation. """ self._metadata = MetadataCatalog.get(dataset_name) self._thing_contiguous_id_to_dataset_id = { v: k for k, v in self._metadata.thing_dataset_id_to_contiguous_id.items() } self._stuff_contiguous_id_to_dataset_id = { v: k for k, v in self._metadata.stuff_dataset_id_to_contiguous_id.items() } self._output_dir = output_dir if self._output_dir is not None: PathManager.mkdirs(self._output_dir)
[docs] def reset(self): self._predictions = []
def _convert_category_id(self, segment_info): isthing = segment_info.pop("isthing", None) if isthing is None: # the model produces panoptic category id directly. No more conversion needed return segment_info if isthing is True: segment_info["category_id"] = self._thing_contiguous_id_to_dataset_id[ segment_info["category_id"] ] else: segment_info["category_id"] = self._stuff_contiguous_id_to_dataset_id[ segment_info["category_id"] ] return segment_info
[docs] def process(self, inputs, outputs): from panopticapi.utils import id2rgb for input, output in zip(inputs, outputs): panoptic_img, segments_info = output["panoptic_seg"] panoptic_img = panoptic_img.cpu().numpy() if segments_info is None: # If "segments_info" is None, we assume "panoptic_img" is a # H*W int32 image storing the panoptic_id in the format of # category_id * label_divisor + instance_id. We reserve -1 for # VOID label, and add 1 to panoptic_img since the official # evaluation script uses 0 for VOID label. label_divisor = self._metadata.label_divisor segments_info = [] for panoptic_label in np.unique(panoptic_img): if panoptic_label == -1: # VOID region. continue pred_class = panoptic_label // label_divisor isthing = ( pred_class in self._metadata.thing_dataset_id_to_contiguous_id.values() ) segments_info.append( { "id": int(panoptic_label) + 1, "category_id": int(pred_class), "isthing": bool(isthing), } ) # Official evaluation script uses 0 for VOID label. panoptic_img += 1 file_name = os.path.basename(input["file_name"]) file_name_png = os.path.splitext(file_name)[0] + ".png" with io.BytesIO() as out: Image.fromarray(id2rgb(panoptic_img)).save(out, format="PNG") segments_info = [self._convert_category_id(x) for x in segments_info] self._predictions.append( { "image_id": input["image_id"], "file_name": file_name_png, "png_string": out.getvalue(), "segments_info": segments_info, } )
[docs] def evaluate(self): comm.synchronize() self._predictions = comm.gather(self._predictions) self._predictions = list(itertools.chain(*self._predictions)) if not comm.is_main_process(): return # PanopticApi requires local files gt_json = PathManager.get_local_path(self._metadata.panoptic_json) gt_folder = PathManager.get_local_path(self._metadata.panoptic_root) with tempfile.TemporaryDirectory(prefix="panoptic_eval") as pred_dir: logger.info("Writing all panoptic predictions to {} ...".format(pred_dir)) for p in self._predictions: with open(os.path.join(pred_dir, p["file_name"]), "wb") as f: f.write(p.pop("png_string")) with open(gt_json, "r") as f: json_data = json.load(f) json_data["annotations"] = self._predictions output_dir = self._output_dir or pred_dir predictions_json = os.path.join(output_dir, "predictions.json") with PathManager.open(predictions_json, "w") as f: f.write(json.dumps(json_data)) from panopticapi.evaluation import pq_compute with contextlib.redirect_stdout(io.StringIO()): pq_res = pq_compute( gt_json, PathManager.get_local_path(predictions_json), gt_folder=gt_folder, pred_folder=pred_dir, ) res = {} res["PQ"] = 100 * pq_res["All"]["pq"] res["SQ"] = 100 * pq_res["All"]["sq"] res["RQ"] = 100 * pq_res["All"]["rq"] res["PQ_th"] = 100 * pq_res["Things"]["pq"] res["SQ_th"] = 100 * pq_res["Things"]["sq"] res["RQ_th"] = 100 * pq_res["Things"]["rq"] res["PQ_st"] = 100 * pq_res["Stuff"]["pq"] res["SQ_st"] = 100 * pq_res["Stuff"]["sq"] res["RQ_st"] = 100 * pq_res["Stuff"]["rq"] results = OrderedDict({"panoptic_seg": res}) _print_panoptic_results(pq_res) return results
def _print_panoptic_results(pq_res): headers = ["", "PQ", "SQ", "RQ", "#categories"] data = [] for name in ["All", "Things", "Stuff"]: row = [name] + [pq_res[name][k] * 100 for k in ["pq", "sq", "rq"]] + [pq_res[name]["n"]] data.append(row) table = tabulate( data, headers=headers, tablefmt="pipe", floatfmt=".3f", stralign="center", numalign="center" ) logger.info("Panoptic Evaluation Results:\n" + table) if __name__ == "__main__": from detectron2.utils.logger import setup_logger logger = setup_logger() import argparse parser = argparse.ArgumentParser() parser.add_argument("--gt-json") parser.add_argument("--gt-dir") parser.add_argument("--pred-json") parser.add_argument("--pred-dir") args = parser.parse_args() from panopticapi.evaluation import pq_compute with contextlib.redirect_stdout(io.StringIO()): pq_res = pq_compute( args.gt_json, args.pred_json, gt_folder=args.gt_dir, pred_folder=args.pred_dir ) _print_panoptic_results(pq_res)