Coverage for backpack/kvs.py: 100%

116 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-30 23:12 +0000

1''' :class:`~backpack.kvs.KVSSkyLine` is a :class:`~backpack.skyline.SkyLine` implementation 

2that streams the output of the AWS Panorama application to AWS Kinesis Video Streams service. 

3 

4To use this class you MUST have the following dependencies correctly configured on your system: 

5 

6 - `GStreamer 1.0`_ installed with standard plugins pack, libav, tools and development libraries 

7 - `OpenCV 4.2.0`_, compiled with GStreamer support and Python bindings 

8 - `Amazon Kinesis Video Streams (KVS) Producer SDK`_ compiled with `kvssink GStreamer plugin 

9 support`_ 

10 - Environment variable ``GST_PLUGIN_PATH`` configured to point to the directory where the compiled 

11 binaries of KVS Producer SDK GStreamer plugin are placed 

12 - Environment variable ``LD_LIBRARY_PATH`` including the open source third party dependencies 

13 compiled by KVS Producer SDK 

14 - `numpy`_ 

15 - `boto3`_ 

16 

17These dependencies cannot be easily specified by a ``requirements.txt`` or a Conda environment. 

18See the example ``Dockerfile`` on how to install these dependencies on your system. 

19 

20.. _`GStreamer 1.0`: https://gstreamer.freedesktop.org 

21.. _`OpenCV 4.2.0`: https://opencv.org/opencv-4-2-0/ 

22.. _`Amazon Kinesis Video Streams (KVS) Producer SDK`: 

23 https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/producer-sdk-cpp.html 

24.. _`kvssink GStreamer plugin support`: 

25 https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/producer-sdk-cpp-gstreamer.html 

26.. _`numpy`: https://numpy.org 

27.. _`boto3`: https://boto3.amazonaws.com/v1/documentation/api/latest/index.html 

28''' 

29 

30import re 

31import os 

32import datetime 

33import logging 

34from concurrent.futures import Executor, ThreadPoolExecutor 

35from typing import List, Dict, Any 

36from abc import ABC, abstractmethod 

37 

38import boto3 

39from botocore.credentials import Credentials, RefreshableCredentials 

40 

41from .skyline import SkyLine 

42from .timepiece import AtSchedule, Callback, local_now 

43 

def _is_refreshable(credentials):
    ''' Tells whether ``credentials`` is a botocore ``RefreshableCredentials`` instance. '''
    refreshable = isinstance(credentials, RefreshableCredentials)
    return refreshable

46 

47def _format_time(datetime_): 

48 return datetime_.strftime('%Y-%m-%dT%H:%M:%SZ') 

49 

class KVSSkyLine(SkyLine):

    ''' Streams OpenCV frames to AWS Kinesis Video Streams.

    :class:`KVSSkyLine` builds a GStreamer pipeline that ends in the ``kvssink``
    plugin, so the video stream created programmatically by your application is
    delivered to the AWS Kinesis Video Streams service.

    The AWS credentials used for streaming are supplied by a
    :class:`KVSCredentialsHandler` instance. Its subclasses implement different
    ways of handing the credentials over to the underlying Kinesis Video Stream
    Producer. In most cases :class:`KVSFileCredentialsHandler` with the default
    arguments should work well, as long as your AWS user or the assumed IAM Role
    has a policy that allows putting data in KVS.

    Frame width, height and fps auto-detection are configured as described in the
    :class:`~backpack.skyline.SkyLine` class documentation.

    Args:
        stream_region: The AWS region of the Kinesis Video Stream
        stream_name: The name of the Kinesis Video Stream
        credentials_handler: The credentials handler
        args: Positional arguments to be passed to SkyLine initializer.
        kwargs: Keyword arguments to be passed to SkyLine initializer.

    Raises:
        RuntimeError: If the ``kvssink`` GStreamer plugin can not be found.
    '''

    def __init__(self,
        stream_region: str,
        stream_name: str,
        credentials_handler: 'KVSCredentialsHandler',
        *args: List[Any],
        **kwargs: Dict[str, Any]
    ):
        super().__init__(*args, **kwargs)
        self.stream_region = stream_region
        self.stream_name = stream_name
        self.credentials_handler = credentials_handler
        plugin_found = self._check_gst_plugin('kvssink')
        if plugin_found:
            return
        message = (
            'kvssink GSTreamer plugin was not found. Please check you have installed it '
            'correctly and set the path to it in the GST_PLUGIN_PATH environment variable '
            '(or in .env file)'
        )
        raise RuntimeError(message)

    def _get_pipeline(self, fps, width, height):
        ''' Assembles the GStreamer pipeline definition ending in ``kvssink``.

        The pipeline is logged with the credentials masked by the credentials
        handler; the returned string contains the unmasked configuration.
        '''
        handler = self.credentials_handler
        credentials_options = handler.plugin_config()
        sink_options = ' '.join((
            'storage-size=512',
            f'stream-name="{self.stream_name}"',
            f'aws-region="{self.stream_region}"',
            f'framerate={fps}',
            credentials_options,
        ))
        raw_caps = f'video/x-raw,format=I420,width={width},height={height},framerate={fps}/1'
        stages = (
            'appsrc',
            'videoconvert',
            raw_caps,
            'x264enc bframes=0 key-int-max=45 bitrate=500',
            'video/x-h264,stream-format=avc,alignment=au,profile=baseline',
            f'kvssink {sink_options}',
        )
        pipeline = ' ! '.join(stages)
        # Never write the raw pipeline to the logs: it may embed credentials
        masked_pipeline = handler.plugin_config_mask(pipeline)
        self.logger.info(f'GStreamer pipeline definition:\n{masked_pipeline}')
        return pipeline

    def _put_frame(self, *args, **kwargs):
        ''' Sends the frame with the parent implementation, then gives the
        credentials handler the chance to refresh the credentials. '''
        outcome = super()._put_frame(*args, **kwargs)
        self.credentials_handler.check_refresh()
        return outcome

118 

119 

class KVSCredentialsHandler(ABC):
    ''' Abstract base class for credential providers.

    Credential providers hand AWS credentials over to the Kinesis Video Stream
    Producer library.

    When no static credentials are passed to the initializer, the handler
    resolves the AWS credentials from the boto3 configuration.

    Static credentials are recommended at most for testing purposes. You get
    static credentials if you pass custom credentials to the initializer, or if
    you create the handler on a workstation where the credentials of an AWS user
    are configured. If the caller code assumed an IAM role, dynamic credentials
    will be used.

    With dynamic credentials you are expected to call :meth:`check_refresh`
    periodically, so the expiration of the credentials can be controlled. Ideally
    you call it each time you send a new frame to Kinesis Video Streams Producer.
    As long as the credentials are not expired, the call should add almost no
    overhead.

    You should not instantiate this class: use one of its subclasses instead.

    Args:
        aws_access_key_id: If you want to use custom static credentials, specify your
            AWS access key ID here. This method is not recommended for production builds.
        aws_secret_access_key: If you want to use custom static credentials, specify your
            AWS secret access key here. This method is not recommended for production builds.
        executor: Credential refresh operator will be dispatched to this executor.
        parent_logger: Connect the logger of this class to a parent logger.
    '''

    REFRESH_BEFORE_EXPIRATION = datetime.timedelta(minutes=2)
    ''' Refresh credentials so many seconds before expiration. '''

    def __init__(
        self,
        aws_access_key_id: str=None,
        aws_secret_access_key: str=None,
        executor: Executor=ThreadPoolExecutor(max_workers=1),
        parent_logger: logging.Logger=None
    ):
        logger_name = self.__class__.__name__
        if parent_logger is None:
            self.logger = logging.getLogger(logger_name)
        else:
            self.logger = parent_logger.getChild(logger_name)

        # Custom keys are forwarded only if at least one of them was specified,
        # otherwise boto3 resolves the credentials from its own configuration
        session_kwargs = {}
        if aws_access_key_id or aws_secret_access_key:
            session_kwargs.update(
                aws_access_key_id=aws_access_key_id,
                aws_secret_access_key=aws_secret_access_key
            )
        self.session = boto3.Session(**session_kwargs)

        sts_client = self.session.client('sts')
        self.caller_arn = sts_client.get_caller_identity()['Arn']
        self.credentials = self.session.get_credentials()
        refresh_callback = Callback(cb=self._refresh, executor=executor)
        self.schedule = AtSchedule(at=None, callback=refresh_callback)
        self._refresh()

    def _refresh(self):
        ''' Refreshes the credentials if they are dynamic, saves them and
        schedules the next refresh. Static credentials are saved as they are
        and no refresh is scheduled. '''
        # pylint: disable=protected-access
        # botocore credentials do not expose the expiry date publicly,
        # so protected members have to be used
        current = self.credentials
        if not _is_refreshable(current):
            self.logger.info('Credentials are static')
            self._save_credentials(current, None)
            self.schedule.at = None
            return

        self.logger.info(f'Refreshing credentials using {self.caller_arn}')
        # Report what botocore thinks about the need of a refresh
        advisory = current.refresh_needed(current._advisory_refresh_timeout)
        mandatory = current.refresh_needed(current._mandatory_refresh_timeout)
        self.logger.info(f'Refresh needed: advisory={advisory}, mandatory={mandatory}')
        refresh_method = current._refresh_using
        self.logger.info(f'Forcing refresh credentials with method {refresh_method}')
        current._protected_refresh(is_mandatory=True)
        frozen = current.get_frozen_credentials()
        expiry_time = current._expiry_time
        summary = ', '.join([
            f'access_key={frozen.access_key[:5]}...',
            f'secret_key={frozen.secret_key[:5]}...',
            f'token={frozen.token[:5]}...',
            f'expiry={_format_time(expiry_time)}',
        ])
        self.logger.info('Got credentials: ' + summary)

        # The next refresh is scheduled REFRESH_BEFORE_EXPIRATION before the
        # token expiry time. This moment falls inside the mandatory refresh
        # timeout of the underlying credentials (10 minutes before the expiry,
        # while the advisory refresh timeout starts 15 minutes before it).
        next_update = expiry_time - self.REFRESH_BEFORE_EXPIRATION
        self.logger.info(f'Next update: {_format_time(next_update)}')
        now = local_now()
        if next_update < now:
            details = ', '.join([
                f'current_time={_format_time(now)}',
                f'next_update={_format_time(next_update)}',
                f'expiry={_format_time(expiry_time)}',
            ])
            self.logger.warning('Next update time is in the past! ' + details)
        self._save_credentials(frozen, next_update)
        self.schedule.at = next_update

    def check_refresh(self) -> None:
        ''' Call this method periodically to refresh credentials. '''
        self.schedule.tick()

    @abstractmethod
    def _save_credentials(
        self,
        credentials: Credentials,
        next_update: datetime.datetime
    ) -> None:
        ''' Saves the credentials for Kinesis Video Stream Producer component.

        Subclasses must implement this method.

        Args:
            credentials (Credentials): The credentials to be saved.
            next_update (datetime.datetime): When the next credentials update is
                expected, or None if no update is scheduled.
        '''

    def plugin_config(self) -> str: # pylint: disable=no-self-use
        ''' Returns a string that should be included in the ``kvssink`` GStreamer plugin config.'''
        return ''

    def plugin_config_mask(self, plugin_config: str) -> str: # pylint: disable=no-self-use
        ''' Masks credentials for printing in logs. '''
        return plugin_config

262 

263 

class KVSInlineCredentialsHandler(KVSCredentialsHandler):
    ''' Passes the AWS credentials inline, in the ``kvssink`` plugin config.

    Only static credentials are supported: once the credentials were passed to
    KVS Producer, there is no way to refresh them.

    Args:
        args: Positional arguments to be passed to :class:`KVSCredentialsHandler` initializer.
        kwargs: Keyword arguments to be passed to :class:`KVSCredentialsHandler` initializer.

    Raises:
        RuntimeError: If the resolved credentials are refreshable.
    '''

    def __init__(self, *args: List[Any], **kwargs: Dict[str, Any]):
        super().__init__(*args, **kwargs)
        if not _is_refreshable(self.credentials):
            return
        message = (
            'InlineCredentialsHandler must not be used with refreshable '
            'credentials as they will not be refreshed once passed to '
            'kvssink plugin configuration.'
        )
        raise RuntimeError(message)

    def plugin_config(self) -> str:
        ''' Returns the ``access-key`` and ``secret-key`` options of ``kvssink``. '''
        access_key = self.credentials.access_key
        secret_key = self.credentials.secret_key
        return f'access-key="{access_key}" secret-key="{secret_key}"'

    def plugin_config_mask(self, plugin_config: str) -> str:
        ''' Masks credentials for printing in logs.

        Args:
            plugin_config: The original plugin config string.

        Returns:
            The plugin config with the values of the key options replaced by asterisks.
        '''
        masked = plugin_config
        for option in ('secret-key', 'access-key'):
            masked = re.sub(option + r'="([^"]*)"', f'{option}="*****"', masked)
        return masked

    def _save_credentials(self, *args, **kwargs) -> None:
        ''' Does nothing: the credentials are passed in the plugin config. '''

304 

305 

class KVSEnvironmentCredentialsHandler(KVSCredentialsHandler):
    ''' Saves AWS credentials in environment variables.

    Experience shows that the kvssink plugin does not check periodically the
    environment variables for updated dynamic credentials, so there are chances
    that your stream will stop once the original dynamic credentials expire.

    For this reason, it is recommended that you use this credentials handler
    only with static credentials and do not use it in production setting.

    Args:
        args: Positional arguments to be passed to :class:`KVSCredentialsHandler` initializer.
        kwargs: Keyword arguments to be passed to :class:`KVSCredentialsHandler` initializer.
    '''
    def __init__(self, *args: List[Any], **kwargs: Dict[str, Any]):
        super().__init__(*args, **kwargs)

    def _save_credentials(
        self,
        credentials: Credentials,
        _: datetime.datetime
    ):
        ''' Exports the credentials to the ``AWS_*`` environment variables.

        Args:
            credentials (Credentials): The credentials to be exported.
            _ (datetime.datetime): The time of the next update, not used.
        '''
        # Decide on the token itself instead of on the type of the credentials
        # object: for dynamic credentials the base class passes the frozen
        # snapshot returned by get_frozen_credentials(), which is not a
        # RefreshableCredentials instance, so a type check would never export
        # the session token and the temporary keys would be unusable.
        token = getattr(credentials, 'token', None)
        if token:
            os.environ['AWS_SESSION_TOKEN'] = token
        else:
            # A token left over in the environment does not belong to these
            # keys: remove it to keep the exported credential set consistent.
            os.environ.pop('AWS_SESSION_TOKEN', None)
        os.environ['AWS_ACCESS_KEY_ID'] = credentials.access_key
        os.environ['AWS_SECRET_ACCESS_KEY'] = credentials.secret_key

332 

333 

class KVSFileCredentialsHandler(KVSCredentialsHandler):
    ''' Saves AWS variables in a text file compatible with `credential-path`
    parameter of kvssink.

    Using this credentials handler establishes the most robust communication
    between the handler and the Kinesis Video Stream Producer. The credentials
    are written into a text file with a predefined format. This handler can be
    used both with static and dynamic credentials: the handler will ensure that
    the refreshed credentials are updated also in the text file, before the
    declared file expiration.

    Args:
        credentials_path: The path of the credentials file.
        args: Positional arguments to be passed to :class:`KVSCredentialsHandler` initializer.
        kwargs: Keyword arguments to be passed to :class:`KVSCredentialsHandler` initializer.
    '''

    FILE_REFRESH_GRACE_PERIOD = datetime.timedelta(seconds=60)
    ''' Give some time (60s) to boto3 credentials handler to effectively
    refresh the credentials, and declare the expiration of the KVS
    credentials file after waiting this time, so KVS will find the
    new tokens in the file. '''

    def __init__(
        self, credentials_path: str='/tmp/credentials.txt',
        *args: List[Any],
        **kwargs: Dict[str, Any]
    ):
        # The path must be set before the base initializer runs, as that
        # already saves the credentials.
        self.credentials_path = credentials_path
        super().__init__(*args, **kwargs)

    @staticmethod
    def _mask(secret: str) -> str:
        ''' Returns the abbreviated form of a secret that is printed in logs. '''
        return f'{secret[:5]}...'

    def _write_credentials_file(self, credentials_str: str) -> None:
        ''' Writes the credentials record, creating the file as owner-only. '''
        # The file contains secrets: when it has to be created, do not let
        # other users of the system read it. The mode does not affect a file
        # that already exists.
        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
        file_descriptor = os.open(self.credentials_path, flags, 0o600)
        with open(file_descriptor, 'w', encoding='utf-8') as credentials_file:
            credentials_file.write(credentials_str)

    def _save_credentials(self, credentials, next_update):
        ''' Writes the credentials to the credentials file.

        Dynamic credentials are saved as a tab separated record of the access
        key, the file expiration, the secret key and the session token. Static
        credentials (``next_update`` is None) are saved as the access key and
        the secret key only.

        Args:
            credentials: The credentials to be saved.
            next_update: When the next credentials update is expected, or None
                if the credentials are static.
        '''
        masked_access_key = self._mask(credentials.access_key)
        masked_secret_key = self._mask(credentials.secret_key)
        if next_update is not None:
            # Declare the file expired only after the scheduled refresh had
            # the time to complete
            file_expire = next_update + self.FILE_REFRESH_GRACE_PERIOD
            file_expire_str = _format_time(file_expire)
            self.logger.info(f'Credentials file expiration: {file_expire_str}')
            fields = [
                credentials.access_key,
                file_expire_str,
                credentials.secret_key,
                credentials.token,
            ]
            masked_fields = [
                masked_access_key,
                file_expire_str,
                masked_secret_key,
                self._mask(credentials.token),
            ]
        else:
            fields = [credentials.access_key, credentials.secret_key]
            masked_fields = [masked_access_key, masked_secret_key]

        self._write_credentials_file('\t'.join(['CREDENTIALS'] + fields))
        # Report the update only once the file was effectively written
        self.logger.info(
            f'Updated {self.credentials_path}:\n' + '\t'.join(['CREDENTIALS'] + masked_fields)
        )

    def plugin_config(self) -> str:
        ''' Returns the ``credential-path`` option of the ``kvssink`` plugin. '''
        return f'credential-path="{self.credentials_path}"'