Coverage for backpack/skyline.py: 99%
176 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-30 23:12 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-30 23:12 +0000
1''' :class:`SkyLine` streams OpenCV frames to a `GStreamer`_ pipeline.
3:class:`SkyLine` itself is an abstract base class that you can not instantiate directly. Instead,
4use one of the subclasses derived from :class:`SkyLine` that provide concrete implementation.
5For example, :class:`~backpack.kvs.KVSSkyLine` sends frames to AWS Kinesis Video Streams service,
6and :class:`~backpack.rtsp.RTSPSkyLine` streams your frames with a built-in RTSP server.
8.. _`GStreamer`: https://gstreamer.freedesktop.org
9'''
11import os
12import subprocess
13import logging
14from collections import OrderedDict
15import datetime
16from enum import Enum
17from typing import Optional
18from abc import ABC, abstractmethod
20try:
21 import cv2
22except ImportError as e:
23 raise ImportError(
24 'OpenCV installation is not found. You must manually install OpenCV with GStreamer '
25 'support to use SkyLine. Please refer to installation instructions for details.'
26 ) from e
28from dotenv import find_dotenv, dotenv_values
30from .timepiece import Ticker
32USE_LAST_VALUE = -999
33''' Using this value for dynamic streaming attributes like fps, width and height
34will cause to use the values from the last streaming session. '''
36class SkyLine(ABC):
38 ''' Abstract base class for sending OpenCV frames to a remote service using GStreamer.
40 :class:`SkyLine` can be used to create programmatically a video stream and send it to
41 an external video ingestion service supported by GStreamer. Once the :class:`SkyLine`
42 instances is configured and the streaming pipeline was opened by calling
43 :meth:`start_streaming`, successive frames can be passed to the :meth:`put` method of the
44 instance in OpenCV image format (:class:`numpy.ndarray` with a shape of ``(height, width, 3)``,
45 BGR channel order, :class:`numpy.uint8` type).
47 The frequency of frames (frames per second) as well as the image
48 dimensions (width, height) are static during the streaming. You can either
49 specify these properties upfront in the constructor, or let SkyLine figure out
50 these values. In the later case, up to :attr:`FPS_METER_WARMUP_FRAMES`
51 frames (by default 100) will be discarded at the beginning of the streaming
52 and during this period the value of the stream fps, width and height will
53 be determined automatically. In all cases you are expected to call the :meth:`put`
54 method with the frequency of the :attr:`video_fps` property, and send images
55 of (:attr:`video_width`, :attr:`video_height`) size.
57 You should also specify ``GST_PLUGIN_PATH`` variable to the folder
58 where the kvssink plugin binaries were built, and add the open-source
59 dependency folder of kvssink to the ``LD_LIBRARY_PATH`` variable.
60 You can define these variables as environment variables, or in a file
61 called ``.env`` in the same folder where this file can be found.
63 Args:
64 parent_logger: If you want to connect the logger of KVS to a parent,
65 specify it here.
66 gst_log_file: If you want to redirect GStreamer logs to a file, specify
67 the full path of the file in this parameter.
68 gst_log_level: If you want to override GStreamer log level configuration,
69 specify in this parameter.
70 dotenv_path: The path of the .env configuration file. If left to None,
71 SkyLine will use the default search mechanism of the python-dotenv library
72 to search for the .env file (searching in the current and parent folders).
74 Attributes:
75 video_width (int): The width of the frames in the video stream.
76 video_height (int): The height of the frames in the video stream.
77 video_fps (float): The number of frames per second sent to the video stream.
78 logger (logging.Logger): The logger instance.
79 '''
81 # pylint: disable=too-many-instance-attributes
82 # We could group the stream parameters like fps, width and height into a structure, but why?
84 _FRAME_LOG_FREQUENCY = datetime.timedelta(seconds=60)
86 # Wait for so many put() requests before starting streaming.
87 # During this period the average FPS will be measured and the
88 # stream will be initialized with this FPS
89 FPS_METER_WARMUP_FRAMES = 100
91 class State(Enum):
92 ''' States of the :class:`SkyLine`.
94 Attributes:
95 STOPPED: The :class:`SkyLine` instance is stopped.
96 START_WARMUP: The :class:`SkyLine` instance is about to start the warmup period.
97 WARMUP: The :class:`SkyLine` instance is measuring frame rate and frame size during
98 the warmup period.
99 STREAMING: The :class:`SkyLine` instance is streaming. The
100 :meth:`~SkyLine.put` method should be called regularly.
101 ERROR: The :class:`SkyLine` instance is encountered an error.
102 '''
103 STOPPED = 0
104 START_WARMUP = 1
105 WARMUP = 2
106 STREAMING = 3
107 ERROR = -1
109 def __init__(self,
110 parent_logger: Optional[logging.Logger] = None,
111 gst_log_file: Optional[str] = None,
112 gst_log_level: Optional[str] = None,
113 dotenv_path: Optional[str] = None
114 ):
115 self.video_width = None
116 self.video_height = None
117 self.video_fps = None
118 self.logger = (
119 logging.getLogger(self.__class__.__name__) if parent_logger is None else
120 parent_logger.getChild(self.__class__.__name__)
121 )
122 if gst_log_file is not None:
123 os.environ['GST_DEBUG_FILE'] = gst_log_file
124 if gst_log_level is not None:
125 os.environ['GST_DEBUG'] = gst_log_level
126 self._config_env(dotenv_path)
127 self._check_env()
128 self._last_log = datetime.datetime.min
129 self._fps_meter_warmup_cnt = 0
130 self._fps_meter_ticker = Ticker(self.FPS_METER_WARMUP_FRAMES + 1)
131 self._last_fps = None
132 self._last_width = None
133 self._last_height = None
134 self._video_writer = None
135 self.state = SkyLine.State.STOPPED
137 def _check_gst_plugin(self, plugin_name: str) -> bool:
138 ''' Checks if a given GStreamer plugin can be correctly loaded.
140 Args:
141 plugin_name (str): The name of the GStreamer plugin
143 Returns:
144 True if the plugin can be loaded by the GStreamer system.
145 '''
146 try:
147 cmd = f'gst-inspect-1.0 {plugin_name} --plugin'
148 env = os.environ.copy()
149 env['GST_DEBUG'] = '0'
150 subprocess.check_output(cmd.split(' '), stderr=subprocess.STDOUT, env=env)
151 self.logger.info('"%s" returned no error', cmd)
152 return True
153 except subprocess.CalledProcessError as error:
154 self.logger.warning(
155 '"%s" returned error code=%d, output:\n%s',
156 cmd, error.returncode, error.output.decode()
157 )
158 return False
160 @abstractmethod
161 def _get_pipeline(self, fps: float, width: int, height: int) -> str:
162 ''' Returns to GStreamer pipeline definition.
164 Implement this method in subclasses and return the GStreamer pipeline
165 definition.'''
167 def _put_frame(self, frame):
168 size = (self.video_width, self.video_height)
169 resized = cv2.resize(frame, size, interpolation=cv2.INTER_LINEAR)
170 if not self._video_writer or not self._video_writer.isOpened():
171 self._frame_log(lambda: self.logger.warning(
172 'Tried to write to cv2.VideoWriter but it is not opened'
173 ))
174 return False
175 self._video_writer.write(resized)
176 return True
178 def _config_env(self, dotenv_path=None):
179 os.environ['GST_DEBUG_NO_COLOR'] = '1'
180 dotenv_path = dotenv_path or find_dotenv()
181 self.logger.info('Loading config variables from %s', dotenv_path)
182 if not dotenv_path or not os.path.isfile(dotenv_path):
183 self.logger.warning('dotenv configuration file was not found at path: %s', dotenv_path)
184 return
185 cfg = dotenv_values(dotenv_path=dotenv_path)
186 self.logger.info('Loaded env config structure: %s', cfg)
187 if 'GST_PLUGIN_PATH' in cfg:
188 os.environ['GST_PLUGIN_PATH'] = cfg['GST_PLUGIN_PATH']
189 path_elems = []
190 if 'LD_LIBRARY_PATH' in os.environ:
191 path_elems.extend(os.environ['LD_LIBRARY_PATH'].split(':'))
192 if 'LD_LIBRARY_PATH' in cfg:
193 path_elems.append(cfg['LD_LIBRARY_PATH'])
194 path_elems = list(OrderedDict.fromkeys(path_elems)) # remove duplicates
195 path = ':'.join(path_elems)
196 path = path.replace('::', ':')
197 os.environ['LD_LIBRARY_PATH'] = path
198 if 'LD_PRELOAD' in cfg:
199 os.environ['LD_PRELOAD'] = cfg['LD_PRELOAD']
201 def _check_env(self):
202 def _check_var(var_name, warn=True):
203 val = os.environ.get(var_name)
204 if val:
205 self.logger.info('%s=%s', var_name, val)
206 elif warn:
207 self.logger.warning('%s environment variable is not defined', var_name)
209 _check_var('GST_PLUGIN_PATH')
210 _check_var('LD_LIBRARY_PATH')
211 _check_var('GST_DEBUG', warn=False)
212 _check_var('GST_DEBUG_FILE', warn=False)
214 self.logger.info('Local time on host: %s', datetime.datetime.now().isoformat())
215 self.logger.info('UTC time on host: %s', datetime.datetime.utcnow().isoformat())
217 def _open_stream(self, fps, width, height):
218 pipeline = self._get_pipeline(fps, width, height)
219 self.logger.info('Opening streaming pipeline')
220 self._video_writer = cv2.VideoWriter(
221 pipeline, cv2.CAP_GSTREAMER, 0, fps, (width, height)
222 )
223 self.video_fps = fps
224 self.video_width = width
225 self.video_height = height
226 self._last_fps = fps
227 self._last_width = width
228 self._last_height = height
229 return self._video_writer.isOpened()
231 def _close_stream(self):
232 if self._video_writer:
233 self._video_writer.release()
234 self._video_writer = None
236 def _frame_log(self, log_fn):
237 now = datetime.datetime.now()
238 if now - self._last_log > self._FRAME_LOG_FREQUENCY:
239 log_fn()
240 self._last_log = now
242 def _start_warmup(self):
243 self._fps_meter_ticker.reset()
244 self._fps_meter_warmup_cnt = self.FPS_METER_WARMUP_FRAMES
246 def _finish_warmup(self, frame):
247 fps = self.video_fps or round(self._fps_meter_ticker.freq())
248 width = self.video_width or frame.shape[1]
249 height = self.video_height or frame.shape[0]
250 return fps, width, height
252 def _try_open_stream(self, fps, width, height):
253 if not self._open_stream(fps, width, height):
254 self.logger.warning('Could not open cv2.VideoWriter')
255 self.state = SkyLine.State.ERROR
256 return False
257 self.state = SkyLine.State.STREAMING
258 return True
260 @property
261 def state(self) -> 'SkyLine.State':
262 ''' State of the SkyLine. '''
263 return self._state
265 @state.setter
266 def state(self, state: 'SkyLine.State') -> None:
267 ''' Set the state of the SkyLine. '''
268 self.logger.info('state = %s', state)
269 self._state = state
271 # Events
273 def start_streaming(
274 self,
275 fps: float = USE_LAST_VALUE,
276 width: int = USE_LAST_VALUE,
277 height: int = USE_LAST_VALUE
278 ) -> None:
279 ''' Start the streaming.
281 After calling this method, you are expected to call the :meth:`put` method at
282 regular intervals. The streaming can be stopped and restarted arbitrary times on
283 the same :class:`SkyLine` instance.
285 You should specify the desired frame rate and frame dimensions. Using :attr:`USE_LAST_VALUE`
286 for any of theses attributes will use the value from the last streaming session. If no
287 values are found (or the values are explicitly set to ``None``), a warmup session will be
288 started.
290 Args:
291 fps: The declared frame per seconds of the video. Set this to ``None`` to determine
292 this value automatically, or :attr:`USE_LAST_VALUE` to use the value from
293 the last streaming session.
294 width: The declared width of the video. Set this to ``None`` to determine
295 this value automatically, or :attr:`USE_LAST_VALUE` to use the value from
296 the last streaming session.
297 height: The declared height of the video. Set this to ``None`` to determine
298 this value automatically, or :attr:`USE_LAST_VALUE` to use the value from
299 the last streaming session.
300 '''
301 self.video_fps = self._last_fps if fps == USE_LAST_VALUE else fps
302 self.video_width = self._last_width if width == USE_LAST_VALUE else width
303 self.video_height = self._last_height if width == USE_LAST_VALUE else height
304 if self.state in [SkyLine.State.WARMUP, SkyLine.State.STREAMING]:
305 self._close_stream()
306 if any(p is None for p in (self.video_fps, self.video_width, self.video_height)):
307 self.logger.info('Starting FPS meter warmup.')
308 self.state = SkyLine.State.START_WARMUP
309 else:
310 self.logger.info('No FPS meter warmup needed.')
311 self._try_open_stream(self.video_fps, self.video_width, self.video_height)
313 def put(
314 self,
315 frame: 'numpy.ndarray',
316 ) -> bool:
317 ''' Put a frame to the video stream.
319 Args:
320 frame: A numpy array of ``(height, width, 3)`` shape and of :class:`numpy.uint8` type.
322 Returns:
323 ``True`` if the frame was effectively put on the downstream pipeline.
324 '''
326 if self.state == SkyLine.State.STOPPED:
327 # Streamer paused
328 return False
330 if self.state == SkyLine.State.ERROR:
331 self._frame_log(
332 lambda: self.logger.warning(
333 '%s.put() was called in %s state', self.__class__.__name__, self.state
334 )
335 )
336 return False
338 if self.state == SkyLine.State.START_WARMUP:
339 self._start_warmup()
340 self.state = SkyLine.State.WARMUP
341 return False
343 if self.state == SkyLine.State.WARMUP:
344 self._fps_meter_ticker.tick()
345 self._fps_meter_warmup_cnt -= 1
346 if self._fps_meter_warmup_cnt <= 0:
347 fps, width, height = self._finish_warmup(frame)
348 self.logger.info(
349 'Finished FPS meter warmup. Determined fps=%f, width=%d, height=%d',
350 fps, width, height
351 )
352 if self._try_open_stream(fps, width, height):
353 return self._put_frame(frame)
355 return False
357 if self.state == SkyLine.State.STREAMING:
358 return self._put_frame(frame)
360 # Should not arrive here
361 assert False, f'Unhandled SkyLine state {self.state}' # pragma: no cover
363 def stop_streaming(self) -> None:
364 ''' Stops the streaming.
366 Successive calls to :meth:`put` method will silently discard the frames.
367 '''
368 self._close_stream()
369 self.state = SkyLine.State.STOPPED