Source code for pyEdgeEval.edge_tools.mask2edge_mp

#!/usr/bin/env python3

"""
NOTE: not much speed ups probably because allocating memory is slower than
the gain from multiprocessing

Might be beneficial when each worker has more things to do...

TODO:
- add `thin`
"""

from multiprocessing import Pool, RawArray

import numpy as np

from pyEdgeEval.utils import mask2bdry

__all__ = [
    "mp_mask2edge",
    "mp_instance_mask2edge",
]

# HACK: A global dictionary storing the variables passed from the initializer.
var_dict = {}


def _init_worker(data_dict):
    # Using a dictionary is not strictly necessary. You can also
    # use global variables.

    for k, v in data_dict.items():
        var_dict[k] = v


def _worker_func(label):
    mask_shape = var_dict["mask_shape"]
    mask_dtype = var_dict["mask_dtype"]
    edge_shape = var_dict["edge_shape"]
    edge_dtype = var_dict["edge_dtype"]
    ignore_mask_shape = var_dict["ignore_mask_shape"]
    ignore_mask_dtype = var_dict["ignore_mask_dtype"]
    ignore_indices = var_dict["ignore_indices"]
    mask2bdry_kwargs = var_dict["mask2bdry_kwargs"]

    mask = np.frombuffer(var_dict["mask"], dtype=mask_dtype).reshape(mask_shape)
    ignore_mask = np.frombuffer(
        var_dict["ignore_mask"], dtype=ignore_mask_dtype
    ).reshape(ignore_mask_shape)
    edge = np.frombuffer(var_dict["edge"], dtype=edge_dtype).reshape(edge_shape)

    mask = mask[label]

    if label in ignore_indices:
        return None

    # if there are no class labels in the mask
    if not np.count_nonzero(mask):
        return None

    edge[label] = mask2bdry(
        mask=mask,
        ignore_mask=ignore_mask,
        **mask2bdry_kwargs,
    )
    return None


[docs]def mp_mask2edge( mask, ignore_indices, nproc, radius, thin=False, use_cv2=True, quality=0, ): """mask2edge multiprocessing""" if thin: raise NotImplementedError( "thin=True has not been implemented for this function." ) global var_dict assert mask.ndim == 3 num_labels, h, w = mask.shape # make ignore mask ignore_mask = np.zeros((h, w), dtype=np.uint8) for i in ignore_indices: ignore_mask += mask[i] mask2bdry_kwargs = dict( radius=radius, use_cv2=use_cv2, quality=quality, ) edge = np.zeros_like(mask) # create shared arrays mp_mask = RawArray(np.ctypeslib.as_ctypes_type(mask.dtype), mask.size) mp_edge = RawArray(np.ctypeslib.as_ctypes_type(edge.dtype), edge.size) mp_ignore_mask = RawArray( np.ctypeslib.as_ctypes_type(ignore_mask.dtype), ignore_mask.size ) # wrap arrays as an numpy array so we can easily manipulate data np_mask = np.frombuffer(mp_mask, dtype=mask.dtype).reshape(mask.shape) np_edge = np.frombuffer(mp_edge, dtype=edge.dtype).reshape(edge.shape) np_ignore_mask = np.frombuffer( mp_ignore_mask, dtype=ignore_mask.dtype ).reshape(ignore_mask.shape) # copy data to wrapped shared arrays np.copyto(np_mask, mask) np.copyto(np_edge, edge) np.copyto(np_ignore_mask, ignore_mask) args = dict( mask=mp_mask, mask_dtype=mask.dtype, mask_shape=mask.shape, edge=mp_edge, edge_dtype=edge.dtype, edge_shape=edge.shape, ignore_mask=mp_ignore_mask, ignore_mask_dtype=ignore_mask.dtype, ignore_mask_shape=ignore_mask.shape, ignore_indices=ignore_indices, mask2bdry_kwargs=mask2bdry_kwargs, ) with Pool( processes=nproc, initializer=_init_worker, initargs=[args] ) as pool: # order doesn't really matter since we specify which index to compute pool.map(_worker_func, range(num_labels)) edge = np.frombuffer(mp_edge, dtype=edge.dtype).reshape(edge.shape) # HACK: empty global dictionary var_dict = {} return edge
def _instance_worker_func(label): mask_shape = var_dict["mask_shape"] mask_dtype = var_dict["mask_dtype"] inst_mask_shape = var_dict["inst_mask_shape"] inst_mask_dtype = var_dict["inst_mask_dtype"] edge_shape = var_dict["edge_shape"] edge_dtype = var_dict["edge_dtype"] ignore_mask_shape = var_dict["ignore_mask_shape"] ignore_mask_dtype = var_dict["ignore_mask_dtype"] ignore_indices = var_dict["ignore_indices"] label_inst = var_dict["label_inst"] mask2bdry_kwargs = var_dict["mask2bdry_kwargs"] mask = np.frombuffer(var_dict["mask"], dtype=mask_dtype).reshape(mask_shape) inst_mask = np.frombuffer( var_dict["inst_mask"], dtype=inst_mask_dtype ).reshape(inst_mask_shape) ignore_mask = np.frombuffer( var_dict["ignore_mask"], dtype=ignore_mask_dtype ).reshape(ignore_mask_shape) edge = np.frombuffer(var_dict["edge"], dtype=edge_dtype).reshape(edge_shape) mask = mask[label] if label in ignore_indices: return None # if there are no class labels in the mask if not np.count_nonzero(mask): return None dist = mask2bdry( mask=mask, ignore_mask=ignore_mask, **mask2bdry_kwargs, ) # per instance boundaries if label in label_inst.keys(): instances = label_inst[label] # list for instance in instances: iid = int(str(label) + str(instance).zfill(3)) _mask = inst_mask == iid dist = dist | mask2bdry( mask=_mask, ignore_mask=ignore_mask, **mask2bdry_kwargs, ) # set edge edge[label] = dist return None
[docs]def mp_instance_mask2edge( mask, inst_mask, inst_labelIds, ignore_indices, nproc, radius, thin=False, use_cv2=True, quality=0, ): """mask2edge multiprocessing""" if thin: raise NotImplementedError( "thin=True has not been implemented for this function." ) global var_dict assert mask.ndim == 3 num_labels, h, w = mask.shape # make ignore mask ignore_mask = np.zeros((h, w), dtype=np.uint8) for i in ignore_indices: ignore_mask += mask[i] # make sure that instance labels are sorted inst_labels = sorted(inst_labelIds) # create a lookup dictionary {label: [instances]} label_inst = {} _cand_insts = np.unique(inst_mask) for inst_label in _cand_insts: if inst_label < inst_labels[0] * 1000: # 24000 continue _label = int(str(inst_label)[:2]) _inst = int(str(inst_label)[2:]) if _label not in label_inst.keys(): label_inst[_label] = [_inst] else: label_inst[_label].append(_inst) mask2bdry_kwargs = dict( radius=radius, use_cv2=use_cv2, quality=quality, ) edge = np.zeros_like(mask) # create shared arrays mp_mask = RawArray(np.ctypeslib.as_ctypes_type(mask.dtype), mask.size) mp_inst_mask = RawArray( np.ctypeslib.as_ctypes_type(inst_mask.dtype), inst_mask.size ) mp_edge = RawArray(np.ctypeslib.as_ctypes_type(edge.dtype), edge.size) mp_ignore_mask = RawArray( np.ctypeslib.as_ctypes_type(ignore_mask.dtype), ignore_mask.size ) # wrap arrays as an numpy array so we can easily manipulate data np_mask = np.frombuffer(mp_mask, dtype=mask.dtype).reshape(mask.shape) np_inst_mask = np.frombuffer(mp_inst_mask, dtype=inst_mask.dtype).reshape( inst_mask.shape ) np_edge = np.frombuffer(mp_edge, dtype=edge.dtype).reshape(edge.shape) np_ignore_mask = np.frombuffer( mp_ignore_mask, dtype=ignore_mask.dtype ).reshape(ignore_mask.shape) # copy data to wrapped shared arrays np.copyto(np_mask, mask) np.copyto(np_inst_mask, inst_mask) np.copyto(np_edge, edge) np.copyto(np_ignore_mask, ignore_mask) args = dict( mask=mp_mask, mask_dtype=mask.dtype, mask_shape=mask.shape, inst_mask=mp_inst_mask, inst_mask_dtype=inst_mask.dtype, inst_mask_shape=inst_mask.shape, edge=mp_edge, edge_dtype=edge.dtype, edge_shape=edge.shape, ignore_mask=mp_ignore_mask, ignore_mask_dtype=ignore_mask.dtype, ignore_mask_shape=ignore_mask.shape, ignore_indices=ignore_indices, label_inst=label_inst, mask2bdry_kwargs=mask2bdry_kwargs, ) with Pool( processes=nproc, initializer=_init_worker, initargs=[args] ) as pool: # order doesn't really matter since we specify which index to compute pool.map(_instance_worker_func, range(num_labels)) edge = np.frombuffer(mp_edge, dtype=edge.dtype).reshape(edge.shape) # HACK: empty global dictionary var_dict = {} return edge