"""OpenCV windows that will display the arrays."""
import warnings
from threading import Thread
from typing import List, Union, Callable, Any, Dict, Iterable, Optional
import cv2
import numpy as np
from localpubsub import NoData
from displayarray.callbacks import global_cv_display_callback
from displayarray._uid import uid_for_source
from displayarray.frame import subscriber_dictionary
from displayarray.frame.frame_updater import FrameCallable
from displayarray.frame.frame_updater import FrameUpdater
from displayarray.input import MouseEvent
from displayarray.window import window_commands
from displayarray._util import WeakMethod
from displayarray.effects.select_channels import SelectChannels
[docs]class SubscriberWindows(object):
"""Windows that subscribe to updates to cameras, videos, and arrays."""
FRAME_DICT: Dict[str, np.ndarray] = {}
ESC_KEY_CODES = [27] # ESC key on most keyboards
def __init__(
self,
window_names: Iterable[str] = ("displayarray",),
video_sources: Iterable[Union[str, int]] = (0,),
callbacks: Optional[List[Callable[[np.ndarray], Any]]] = None,
silent: bool = False,
):
"""Create the array displaying window."""
self.source_names: List[Union[str, int]] = []
self.close_threads: Optional[List[Thread]] = []
self.frames: List[np.ndarray] = []
self.input_vid_global_names: List[str] = []
self.window_names: List[str] = []
self.input_cams: List[str] = []
self.exited = False
self.silent = silent
if callbacks is None:
callbacks = []
for name in video_sources:
self.add_source(name)
self.callbacks = callbacks
if not self.silent:
for name in window_names:
self.add_window(name)
self.update()
def __bool__(self):
self.update()
return not self.exited
def __iter__(self):
while not self.exited:
self.update()
yield self.frames
[docs] def block(self):
"""Update the window continuously while blocking the outer program."""
self.loop()
for ct in self.close_threads:
ct.join()
[docs] def add_source(self, name):
"""Add another source for this class to display."""
uid = uid_for_source(name)
self.source_names.append(uid)
self.input_vid_global_names.append(uid)
self.input_cams.append(name)
return self
[docs] def add_window(self, name):
"""Add another window for this class to display sources with. The name will be the title."""
self.window_names.append(name)
cv2.namedWindow(name + " (press ESC to quit)")
m = WeakMethod(self.handle_mouse)
cv2.setMouseCallback(name + " (press ESC to quit)", m)
return self
[docs] def add_callback(self, callback):
"""Add a callback for this class to apply to videos."""
self.callbacks.append(callback)
return self
def __stop_all_cams(self):
for c in self.source_names:
subscriber_dictionary.stop_cam(c)
[docs] def handle_keys(self, key_input: int):
"""Capture key input for the escape function and passing to key control subscriber threads."""
if key_input in self.ESC_KEY_CODES:
for name in self.window_names:
cv2.destroyWindow(name + " (press ESC to quit)")
self.exited = True
window_commands.quit()
self.__stop_all_cams()
return "quit"
elif key_input not in [-1, 0]:
try:
window_commands.key_pub.publish(chr(key_input))
except ValueError:
warnings.warn(
RuntimeWarning(
f"Unknown key code: [{key_input}]. Please report to the displayarray issue page."
)
)
[docs] def handle_mouse(self, event, x, y, flags, param):
"""Capture mouse input for mouse control subscriber threads."""
mousey = MouseEvent(event, x, y, flags, param)
window_commands.mouse_pub.publish(mousey)
def display_frames(self, frames, win_num=0, ids=None):
if isinstance(frames, Exception):
raise frames
for f in range(len(frames)):
# detect nested:
if (
isinstance(frames[f], (list, tuple))
or frames[f].dtype.num == 17
or (
len(frames[f].shape) != 2
and (len(frames[f].shape) != 3 or frames[f].shape[-1] != 3)
)
):
win_num = self.display_frames(frames[f], win_num, ids)
else:
if len(self.window_names) <= win_num:
self.add_window(str(win_num))
cv2.imshow(
self.window_names[win_num] + " (press ESC to quit)", frames[f]
)
win_num += 1
return win_num
def __check_too_many_channels(self):
for f in range(len(self.frames)):
if isinstance(self.frames[f], Exception):
raise self.frames[f]
if (
self.frames[f].shape[-1] not in [1, 3]
and len(self.frames[f].shape) != 2
):
print(
f"Too many channels in output. (Got {self.frames[f].shape[-1]} instead of 1 or 3.) "
f"Frame selection callback added."
)
print(
"Ctrl+scroll to change first channel.\n"
"Shift+scroll to change second channel.\n"
"Alt+scroll to change third channel."
)
sel = SelectChannels()
sel.enable_mouse_control()
sel.mouse_print_channels = True
self.callbacks.append(sel)
for fr in range(len(self.frames)):
self.frames[fr] = self.callbacks[-1](self.frames[fr])
break
[docs] def update_frames(self):
"""Update the windows with the newest data for all frames."""
self.frames = []
for i in range(len(self.input_vid_global_names)):
if self.input_vid_global_names[i] in self.FRAME_DICT and not isinstance(
self.FRAME_DICT[self.input_vid_global_names[i]], NoData
):
self.frames.append(self.FRAME_DICT[self.input_vid_global_names[i]])
if isinstance(self.frames, np.ndarray) and len(self.frames.shape) <= 3:
self.frames = [self.frames]
if len(self.callbacks) > 0:
for c in self.callbacks:
frame = c(self.frames[-1])
if frame is not None:
self.frames[-1] = frame
if not self.silent:
self.__check_too_many_channels()
self.FRAME_DICT[self.input_vid_global_names[i]] = NoData()
if not self.silent:
self.display_frames(self.frames)
[docs] def update(self, arr: np.ndarray = None, id: str = None):
"""Update window frames once. Optionally add a new input and input id."""
if arr is not None and id is not None:
global_cv_display_callback(arr, id)
if id not in self.input_cams:
self.add_source(id)
if not self.silent:
self.add_window(id)
sub_cmd = window_commands.win_cmd_sub()
self.update_frames()
msg_cmd = sub_cmd.get()
key = self.handle_keys(cv2.waitKey(1))
return msg_cmd, key
[docs] def wait_for_init(self):
"""Update window frames in a loop until they're actually updated. Useful for waiting for cameras to init."""
msg_cmd = ""
key = ""
while msg_cmd != "quit" and key != "quit" and len(self.frames) == 0:
msg_cmd, key = self.update()
return self
[docs] def end(self):
"""Close all threads. Should be used with non-blocking mode."""
window_commands.quit(force_all_read=False)
self.__stop_all_cams()
for t in self.close_threads:
t.join()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.end()
def __del__(self):
self.end()
def __delete__(self, instance):
self.end()
[docs] def loop(self):
"""Continually update window frame. OpenCV only allows this in the main thread."""
sub_cmd = window_commands.win_cmd_sub()
msg_cmd = ""
key = ""
while msg_cmd != "quit" and key != "quit":
msg_cmd, key = self.update()
sub_cmd.release()
window_commands.quit(force_all_read=False)
self.__stop_all_cams()
def _get_video_callback_dict_threads(
*vids,
callbacks: Optional[Dict[Any, Union[FrameCallable, List[FrameCallable]]]] = None,
fps=float("inf"),
size=(-1, -1),
):
assert callbacks is not None
vid_threads = []
for v in vids:
v_name = uid_for_source(v)
v_callbacks: List[Callable[[np.ndarray], Any]] = []
if v_name in callbacks:
if isinstance(callbacks[v_name], List):
v_callbacks.extend(callbacks[v_name]) # type: ignore
elif callable(callbacks[v_name]):
v_callbacks.append(callbacks[v_name]) # type: ignore
if v in callbacks:
if isinstance(callbacks[v], List):
v_callbacks.extend(callbacks[v]) # type: ignore
elif callable(callbacks[v]):
v_callbacks.append(callbacks[v]) # type: ignore
vid_threads.append(
FrameUpdater(v, callbacks=v_callbacks, fps_limit=fps, request_size=size)
)
return vid_threads
def _get_video_threads(
*vids,
callbacks: Optional[
Union[
Dict[Any, Union[FrameCallable, List[FrameCallable]]],
List[FrameCallable],
FrameCallable,
]
] = None,
fps=float("inf"),
size=(-1, -1),
):
vid_threads: List[Thread] = []
if isinstance(callbacks, Dict):
vid_threads = _get_video_callback_dict_threads(
*vids, callbacks=callbacks, fps=fps, size=size
)
elif isinstance(callbacks, List):
for v in vids:
vid_threads.append(
FrameUpdater(v, callbacks=callbacks, fps_limit=fps, request_size=size)
)
elif callable(callbacks):
for v in vids:
vid_threads.append(
FrameUpdater(v, callbacks=[callbacks], fps_limit=fps, request_size=size)
)
else:
for v in vids:
if v is not None:
vid_threads.append(FrameUpdater(v, fps_limit=fps, request_size=size))
return vid_threads
[docs]def display(
*vids,
callbacks: Optional[
Union[
Dict[Any, Union[FrameCallable, List[FrameCallable]]],
List[FrameCallable],
FrameCallable,
]
] = None,
window_names=None,
blocking=False,
fps_limit=float("inf"),
size=(-1, -1),
silent=False,
):
"""
Display all the arrays, cameras, and videos passed in.
callbacks can be a dictionary linking functions to videos, or a list of function or functions operating on the video
data before displaying.
Window names end up becoming the title of the windows
"""
vid_threads = _get_video_threads(
*vids, callbacks=callbacks, fps=fps_limit, size=size
)
for v in vid_threads:
v.start()
if window_names is None:
window_names = ["window {}".format(i) for i in range(len(vids))]
if blocking:
SubscriberWindows(
window_names=window_names, video_sources=vids, silent=silent
).loop()
for vt in vid_threads:
vt.join()
else:
s = SubscriberWindows(
window_names=window_names, video_sources=vids, silent=silent
)
s.close_threads = vid_threads
return s
[docs]def breakpoint_display(*args, **kwargs):
"""Display all the arrays, cameras, and videos passed in. Stops code execution until the window is closed."""
return display(*args, **kwargs, blocking=True)
[docs]def read_updates(*args, **kwargs):
"""Read back all frame updates and yield a list of frames. List is empty if no frames were read."""
return display(*args, **kwargs, silent=True)