Coverage for backpack/skyline.py: 99%

176 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-30 23:12 +0000

1''' :class:`SkyLine` streams OpenCV frames to a `GStreamer`_ pipeline. 

2 

3:class:`SkyLine` itself is an abstract base class that you can not instantiate directly. Instead, 

4use one of the subclasses derived from :class:`SkyLine` that provide concrete implementation. 

5For example, :class:`~backpack.kvs.KVSSkyLine` sends frames to AWS Kinesis Video Streams service, 

6and :class:`~backpack.rtsp.RTSPSkyLine` streams your frames with a built-in RTSP server. 

7 

8.. _`GStreamer`: https://gstreamer.freedesktop.org 

9''' 

10 

11import os 

12import subprocess 

13import logging 

14from collections import OrderedDict 

15import datetime 

16from enum import Enum 

17from typing import Optional 

18from abc import ABC, abstractmethod 

19 

20try: 

21 import cv2 

22except ImportError as e: 

23 raise ImportError( 

24 'OpenCV installation is not found. You must manually install OpenCV with GStreamer ' 

25 'support to use SkyLine. Please refer to installation instructions for details.' 

26 ) from e 

27 

28from dotenv import find_dotenv, dotenv_values 

29 

30from .timepiece import Ticker 

31 

32USE_LAST_VALUE = -999 

33''' Using this value for dynamic streaming attributes like fps, width and height 

34will cause to use the values from the last streaming session. ''' 

35 

36class SkyLine(ABC): 

37 

38 ''' Abstract base class for sending OpenCV frames to a remote service using GStreamer. 

39 

40 :class:`SkyLine` can be used to create programmatically a video stream and send it to 

41 an external video ingestion service supported by GStreamer. Once the :class:`SkyLine` 

42 instances is configured and the streaming pipeline was opened by calling 

43 :meth:`start_streaming`, successive frames can be passed to the :meth:`put` method of the 

44 instance in OpenCV image format (:class:`numpy.ndarray` with a shape of ``(height, width, 3)``, 

45 BGR channel order, :class:`numpy.uint8` type). 

46 

47 The frequency of frames (frames per second) as well as the image 

48 dimensions (width, height) are static during the streaming. You can either 

49 specify these properties upfront in the constructor, or let SkyLine figure out 

50 these values. In the later case, up to :attr:`FPS_METER_WARMUP_FRAMES` 

51 frames (by default 100) will be discarded at the beginning of the streaming 

52 and during this period the value of the stream fps, width and height will 

53 be determined automatically. In all cases you are expected to call the :meth:`put` 

54 method with the frequency of the :attr:`video_fps` property, and send images 

55 of (:attr:`video_width`, :attr:`video_height`) size. 

56 

57 You should also specify ``GST_PLUGIN_PATH`` variable to the folder 

58 where the kvssink plugin binaries were built, and add the open-source 

59 dependency folder of kvssink to the ``LD_LIBRARY_PATH`` variable. 

60 You can define these variables as environment variables, or in a file 

61 called ``.env`` in the same folder where this file can be found. 

62 

63 Args: 

64 parent_logger: If you want to connect the logger of KVS to a parent, 

65 specify it here. 

66 gst_log_file: If you want to redirect GStreamer logs to a file, specify 

67 the full path of the file in this parameter. 

68 gst_log_level: If you want to override GStreamer log level configuration, 

69 specify in this parameter. 

70 dotenv_path: The path of the .env configuration file. If left to None, 

71 SkyLine will use the default search mechanism of the python-dotenv library 

72 to search for the .env file (searching in the current and parent folders). 

73 

74 Attributes: 

75 video_width (int): The width of the frames in the video stream. 

76 video_height (int): The height of the frames in the video stream. 

77 video_fps (float): The number of frames per second sent to the video stream. 

78 logger (logging.Logger): The logger instance. 

79 ''' 

80 

81 # pylint: disable=too-many-instance-attributes 

82 # We could group the stream parameters like fps, width and height into a structure, but why? 

83 

84 _FRAME_LOG_FREQUENCY = datetime.timedelta(seconds=60) 

85 

86 # Wait for so many put() requests before starting streaming. 

87 # During this period the average FPS will be measured and the 

88 # stream will be initialized with this FPS 

89 FPS_METER_WARMUP_FRAMES = 100 

90 

91 class State(Enum): 

92 ''' States of the :class:`SkyLine`. 

93 

94 Attributes: 

95 STOPPED: The :class:`SkyLine` instance is stopped. 

96 START_WARMUP: The :class:`SkyLine` instance is about to start the warmup period. 

97 WARMUP: The :class:`SkyLine` instance is measuring frame rate and frame size during 

98 the warmup period. 

99 STREAMING: The :class:`SkyLine` instance is streaming. The 

100 :meth:`~SkyLine.put` method should be called regularly. 

101 ERROR: The :class:`SkyLine` instance is encountered an error. 

102 ''' 

103 STOPPED = 0 

104 START_WARMUP = 1 

105 WARMUP = 2 

106 STREAMING = 3 

107 ERROR = -1 

108 

109 def __init__(self, 

110 parent_logger: Optional[logging.Logger] = None, 

111 gst_log_file: Optional[str] = None, 

112 gst_log_level: Optional[str] = None, 

113 dotenv_path: Optional[str] = None 

114 ): 

115 self.video_width = None 

116 self.video_height = None 

117 self.video_fps = None 

118 self.logger = ( 

119 logging.getLogger(self.__class__.__name__) if parent_logger is None else 

120 parent_logger.getChild(self.__class__.__name__) 

121 ) 

122 if gst_log_file is not None: 

123 os.environ['GST_DEBUG_FILE'] = gst_log_file 

124 if gst_log_level is not None: 

125 os.environ['GST_DEBUG'] = gst_log_level 

126 self._config_env(dotenv_path) 

127 self._check_env() 

128 self._last_log = datetime.datetime.min 

129 self._fps_meter_warmup_cnt = 0 

130 self._fps_meter_ticker = Ticker(self.FPS_METER_WARMUP_FRAMES + 1) 

131 self._last_fps = None 

132 self._last_width = None 

133 self._last_height = None 

134 self._video_writer = None 

135 self.state = SkyLine.State.STOPPED 

136 

137 def _check_gst_plugin(self, plugin_name: str) -> bool: 

138 ''' Checks if a given GStreamer plugin can be correctly loaded. 

139 

140 Args: 

141 plugin_name (str): The name of the GStreamer plugin 

142 

143 Returns: 

144 True if the plugin can be loaded by the GStreamer system. 

145 ''' 

146 try: 

147 cmd = f'gst-inspect-1.0 {plugin_name} --plugin' 

148 env = os.environ.copy() 

149 env['GST_DEBUG'] = '0' 

150 subprocess.check_output(cmd.split(' '), stderr=subprocess.STDOUT, env=env) 

151 self.logger.info('"%s" returned no error', cmd) 

152 return True 

153 except subprocess.CalledProcessError as error: 

154 self.logger.warning( 

155 '"%s" returned error code=%d, output:\n%s', 

156 cmd, error.returncode, error.output.decode() 

157 ) 

158 return False 

159 

160 @abstractmethod 

161 def _get_pipeline(self, fps: float, width: int, height: int) -> str: 

162 ''' Returns to GStreamer pipeline definition. 

163 

164 Implement this method in subclasses and return the GStreamer pipeline 

165 definition.''' 

166 

167 def _put_frame(self, frame): 

168 size = (self.video_width, self.video_height) 

169 resized = cv2.resize(frame, size, interpolation=cv2.INTER_LINEAR) 

170 if not self._video_writer or not self._video_writer.isOpened(): 

171 self._frame_log(lambda: self.logger.warning( 

172 'Tried to write to cv2.VideoWriter but it is not opened' 

173 )) 

174 return False 

175 self._video_writer.write(resized) 

176 return True 

177 

178 def _config_env(self, dotenv_path=None): 

179 os.environ['GST_DEBUG_NO_COLOR'] = '1' 

180 dotenv_path = dotenv_path or find_dotenv() 

181 self.logger.info('Loading config variables from %s', dotenv_path) 

182 if not dotenv_path or not os.path.isfile(dotenv_path): 

183 self.logger.warning('dotenv configuration file was not found at path: %s', dotenv_path) 

184 return 

185 cfg = dotenv_values(dotenv_path=dotenv_path) 

186 self.logger.info('Loaded env config structure: %s', cfg) 

187 if 'GST_PLUGIN_PATH' in cfg: 

188 os.environ['GST_PLUGIN_PATH'] = cfg['GST_PLUGIN_PATH'] 

189 path_elems = [] 

190 if 'LD_LIBRARY_PATH' in os.environ: 

191 path_elems.extend(os.environ['LD_LIBRARY_PATH'].split(':')) 

192 if 'LD_LIBRARY_PATH' in cfg: 

193 path_elems.append(cfg['LD_LIBRARY_PATH']) 

194 path_elems = list(OrderedDict.fromkeys(path_elems)) # remove duplicates 

195 path = ':'.join(path_elems) 

196 path = path.replace('::', ':') 

197 os.environ['LD_LIBRARY_PATH'] = path 

198 if 'LD_PRELOAD' in cfg: 

199 os.environ['LD_PRELOAD'] = cfg['LD_PRELOAD'] 

200 

201 def _check_env(self): 

202 def _check_var(var_name, warn=True): 

203 val = os.environ.get(var_name) 

204 if val: 

205 self.logger.info('%s=%s', var_name, val) 

206 elif warn: 

207 self.logger.warning('%s environment variable is not defined', var_name) 

208 

209 _check_var('GST_PLUGIN_PATH') 

210 _check_var('LD_LIBRARY_PATH') 

211 _check_var('GST_DEBUG', warn=False) 

212 _check_var('GST_DEBUG_FILE', warn=False) 

213 

214 self.logger.info('Local time on host: %s', datetime.datetime.now().isoformat()) 

215 self.logger.info('UTC time on host: %s', datetime.datetime.utcnow().isoformat()) 

216 

217 def _open_stream(self, fps, width, height): 

218 pipeline = self._get_pipeline(fps, width, height) 

219 self.logger.info('Opening streaming pipeline') 

220 self._video_writer = cv2.VideoWriter( 

221 pipeline, cv2.CAP_GSTREAMER, 0, fps, (width, height) 

222 ) 

223 self.video_fps = fps 

224 self.video_width = width 

225 self.video_height = height 

226 self._last_fps = fps 

227 self._last_width = width 

228 self._last_height = height 

229 return self._video_writer.isOpened() 

230 

231 def _close_stream(self): 

232 if self._video_writer: 

233 self._video_writer.release() 

234 self._video_writer = None 

235 

236 def _frame_log(self, log_fn): 

237 now = datetime.datetime.now() 

238 if now - self._last_log > self._FRAME_LOG_FREQUENCY: 

239 log_fn() 

240 self._last_log = now 

241 

242 def _start_warmup(self): 

243 self._fps_meter_ticker.reset() 

244 self._fps_meter_warmup_cnt = self.FPS_METER_WARMUP_FRAMES 

245 

246 def _finish_warmup(self, frame): 

247 fps = self.video_fps or round(self._fps_meter_ticker.freq()) 

248 width = self.video_width or frame.shape[1] 

249 height = self.video_height or frame.shape[0] 

250 return fps, width, height 

251 

252 def _try_open_stream(self, fps, width, height): 

253 if not self._open_stream(fps, width, height): 

254 self.logger.warning('Could not open cv2.VideoWriter') 

255 self.state = SkyLine.State.ERROR 

256 return False 

257 self.state = SkyLine.State.STREAMING 

258 return True 

259 

260 @property 

261 def state(self) -> 'SkyLine.State': 

262 ''' State of the SkyLine. ''' 

263 return self._state 

264 

265 @state.setter 

266 def state(self, state: 'SkyLine.State') -> None: 

267 ''' Set the state of the SkyLine. ''' 

268 self.logger.info('state = %s', state) 

269 self._state = state 

270 

271 # Events 

272 

273 def start_streaming( 

274 self, 

275 fps: float = USE_LAST_VALUE, 

276 width: int = USE_LAST_VALUE, 

277 height: int = USE_LAST_VALUE 

278 ) -> None: 

279 ''' Start the streaming. 

280 

281 After calling this method, you are expected to call the :meth:`put` method at 

282 regular intervals. The streaming can be stopped and restarted arbitrary times on 

283 the same :class:`SkyLine` instance. 

284 

285 You should specify the desired frame rate and frame dimensions. Using :attr:`USE_LAST_VALUE` 

286 for any of theses attributes will use the value from the last streaming session. If no 

287 values are found (or the values are explicitly set to ``None``), a warmup session will be 

288 started. 

289 

290 Args: 

291 fps: The declared frame per seconds of the video. Set this to ``None`` to determine 

292 this value automatically, or :attr:`USE_LAST_VALUE` to use the value from 

293 the last streaming session. 

294 width: The declared width of the video. Set this to ``None`` to determine 

295 this value automatically, or :attr:`USE_LAST_VALUE` to use the value from 

296 the last streaming session. 

297 height: The declared height of the video. Set this to ``None`` to determine 

298 this value automatically, or :attr:`USE_LAST_VALUE` to use the value from 

299 the last streaming session. 

300 ''' 

301 self.video_fps = self._last_fps if fps == USE_LAST_VALUE else fps 

302 self.video_width = self._last_width if width == USE_LAST_VALUE else width 

303 self.video_height = self._last_height if width == USE_LAST_VALUE else height 

304 if self.state in [SkyLine.State.WARMUP, SkyLine.State.STREAMING]: 

305 self._close_stream() 

306 if any(p is None for p in (self.video_fps, self.video_width, self.video_height)): 

307 self.logger.info('Starting FPS meter warmup.') 

308 self.state = SkyLine.State.START_WARMUP 

309 else: 

310 self.logger.info('No FPS meter warmup needed.') 

311 self._try_open_stream(self.video_fps, self.video_width, self.video_height) 

312 

313 def put( 

314 self, 

315 frame: 'numpy.ndarray', 

316 ) -> bool: 

317 ''' Put a frame to the video stream. 

318 

319 Args: 

320 frame: A numpy array of ``(height, width, 3)`` shape and of :class:`numpy.uint8` type. 

321 

322 Returns: 

323 ``True`` if the frame was effectively put on the downstream pipeline. 

324 ''' 

325 

326 if self.state == SkyLine.State.STOPPED: 

327 # Streamer paused 

328 return False 

329 

330 if self.state == SkyLine.State.ERROR: 

331 self._frame_log( 

332 lambda: self.logger.warning( 

333 '%s.put() was called in %s state', self.__class__.__name__, self.state 

334 ) 

335 ) 

336 return False 

337 

338 if self.state == SkyLine.State.START_WARMUP: 

339 self._start_warmup() 

340 self.state = SkyLine.State.WARMUP 

341 return False 

342 

343 if self.state == SkyLine.State.WARMUP: 

344 self._fps_meter_ticker.tick() 

345 self._fps_meter_warmup_cnt -= 1 

346 if self._fps_meter_warmup_cnt <= 0: 

347 fps, width, height = self._finish_warmup(frame) 

348 self.logger.info( 

349 'Finished FPS meter warmup. Determined fps=%f, width=%d, height=%d', 

350 fps, width, height 

351 ) 

352 if self._try_open_stream(fps, width, height): 

353 return self._put_frame(frame) 

354 

355 return False 

356 

357 if self.state == SkyLine.State.STREAMING: 

358 return self._put_frame(frame) 

359 

360 # Should not arrive here 

361 assert False, f'Unhandled SkyLine state {self.state}' # pragma: no cover 

362 

363 def stop_streaming(self) -> None: 

364 ''' Stops the streaming. 

365 

366 Successive calls to :meth:`put` method will silently discard the frames. 

367 ''' 

368 self._close_stream() 

369 self.state = SkyLine.State.STOPPED