Coverage for backpack/kvs.py: 100%
116 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-30 23:12 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-30 23:12 +0000
1''' :class:`~backpack.kvs.KVSSkyLine` is a :class:`~backpack.skyline.SkyLine` implementation
2that streams the output of the AWS Panorama application to AWS Kinesis Video Streams service.
4To use this class you MUST have the following dependencies correctly configured on your system:
6 - `GStreamer 1.0`_ installed with standard plugins pack, libav, tools and development libraries
7 - `OpenCV 4.2.0`_, compiled with GStreamer support and Python bindings
8 - `Amazon Kinesis Video Streams (KVS) Producer SDK`_ compiled with `kvssink GStreamer plugin
9 support`_
10 - Environment variable ``GST_PLUGIN_PATH`` configured to point to the directory where the compiled
11 binaries of the KVS Producer SDK GStreamer plugin are placed
12 - Environment variable ``LD_LIBRARY_PATH`` including the open source third party dependencies
13 compiled by KVS Producer SDK
14 - `numpy`_
15 - `boto3`_
17These dependencies cannot be easily specified by a ``requirements.txt`` or a Conda environment.
18See the example ``Dockerfile`` on how to install these dependencies on your system.
20.. _`GStreamer 1.0`: https://gstreamer.freedesktop.org
21.. _`OpenCV 4.2.0`: https://opencv.org/opencv-4-2-0/
22.. _`Amazon Kinesis Video Streams (KVS) Producer SDK`:
23 https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/producer-sdk-cpp.html
24.. _`kvssink GStreamer plugin support`:
25 https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/producer-sdk-cpp-gstreamer.html
26.. _`numpy`: https://numpy.org
27.. _`boto3`: https://boto3.amazonaws.com/v1/documentation/api/latest/index.html
28'''
30import re
31import os
32import datetime
33import logging
34from concurrent.futures import Executor, ThreadPoolExecutor
35from typing import List, Dict, Any
36from abc import ABC, abstractmethod
38import boto3
39from botocore.credentials import Credentials, RefreshableCredentials
41from .skyline import SkyLine
42from .timepiece import AtSchedule, Callback, local_now
def _is_refreshable(credentials):
    ''' Tells whether ``credentials`` is a botocore ``RefreshableCredentials`` instance. '''
    refreshable = isinstance(credentials, RefreshableCredentials)
    return refreshable
47def _format_time(datetime_):
48 return datetime_.strftime('%Y-%m-%dT%H:%M:%SZ')
class KVSSkyLine(SkyLine):

    ''' Streams OpenCV frames to AWS Kinesis Video Streams.

    Use :class:`KVSSkyLine` to programmatically create a video stream and push
    it to the AWS Kinesis Video Streams service through the ``kvssink``
    GStreamer plugin.

    The AWS credentials used for streaming are supplied by a
    :class:`KVSCredentialsHandler` instance. Its subclasses implement different
    ways of handing the credentials over to the underlying Kinesis Video Stream
    Producer. In most cases :class:`KVSFileCredentialsHandler` with the default
    arguments should work well, as long as your AWS user or the assumed IAM Role
    has a policy that allows putting data in KVS.

    Frame width, height and fps auto-detection can be configured as described
    in the :class:`~backpack.skyline.SkyLine` class documentation.

    Args:
        stream_region: The AWS region of the Kinesis Video Stream
        stream_name: The name of the Kinesis Video Stream
        credentials_handler: The credentials handler
        args: Positional arguments to be passed to SkyLine initializer.
        kwargs: Keyword arguments to be passed to SkyLine initializer.

    Raises:
        RuntimeError: If the ``kvssink`` GStreamer plugin can not be found.
    '''

    def __init__(self,
        stream_region: str,
        stream_name: str,
        credentials_handler: 'KVSCredentialsHandler',
        *args: List[Any],
        **kwargs: Dict[str, Any]
    ):
        super().__init__(*args, **kwargs)
        self.stream_region = stream_region
        self.stream_name = stream_name
        self.credentials_handler = credentials_handler
        if self._check_gst_plugin('kvssink'):
            return
        # Without the sink plugin there is nothing this class could stream to.
        raise RuntimeError(
            'kvssink GSTreamer plugin was not found. Please check you have installed it '
            'correctly and set the path to it in the GST_PLUGIN_PATH environment variable '
            '(or in .env file)'
        )

    def _get_pipeline(self, fps, width, height):
        ''' Assembles the GStreamer pipeline definition that ends in ``kvssink``.

        The pipeline is logged with the credentials masked by the credentials
        handler; the returned string is the unmasked definition.
        '''
        handler = self.credentials_handler
        sink_options = [
            'storage-size=512',
            f'stream-name="{self.stream_name}"',
            f'aws-region="{self.stream_region}"',
            f'framerate={fps}',
            handler.plugin_config(),
        ]
        kvs_config_str = ' '.join(sink_options)
        stages = [
            'appsrc',
            'videoconvert',
            f'video/x-raw,format=I420,width={width},height={height},framerate={fps}/1',
            'x264enc bframes=0 key-int-max=45 bitrate=500',
            'video/x-h264,stream-format=avc,alignment=au,profile=baseline',
            f'kvssink {kvs_config_str}',
        ]
        pipeline = ' ! '.join(stages)
        # Never log the raw pipeline: it may embed credentials.
        pipeline_safe = handler.plugin_config_mask(pipeline)
        self.logger.info(f'GStreamer pipeline definition:\n{pipeline_safe}')
        return pipeline

    def _put_frame(self, *args, **kwargs):
        ''' Forwards the frame to the parent class, then ticks the credentials refresh. '''
        outcome = super()._put_frame(*args, **kwargs)
        self.credentials_handler.check_refresh()
        return outcome
class KVSCredentialsHandler(ABC):
    ''' Abstract base class for credential providers.

    Credential providers provide AWS credentials to Kinesis Video Stream Producer library.

    If no static credentials are provided in the init method, the Credentials
    Handler will figure out the AWS credentials from the boto3 configuration.

    Use of static credentials is recommended at most for testing purposes. If you
    instantiate this class with custom credentials in the init method arguments, or
    call the init method from a workstation where you've configured AWS user
    credentials, you will get static credentials. If the caller code assumed an IAM
    role, you will use dynamic credentials.

    When using dynamic credentials, you are expected to call the :meth:`check_refresh`
    method periodically to control the expiration of the dynamic credentials. Ideally
    you would call this method each time you want to send a new frame to Kinesis
    Video Streams Producer. If the credentials are not expired, this method should add
    almost no overhead.

    You should not instantiate this class: use one of its subclasses instead.

    Args:
        aws_access_key_id: If you want to use custom static credentials, specify your
            AWS access key ID here. This method is not recommended for production builds.
        aws_secret_access_key: If you want to use custom static credentials, specify your
            AWS secret access key here. This method is not recommended for production builds.
        executor: Credential refresh operation will be dispatched to this executor.
            If not specified, a single-worker thread pool is created for this instance.
        parent_logger: Connect the logger of this class to a parent logger.
    '''

    REFRESH_BEFORE_EXPIRATION = datetime.timedelta(seconds=2 * 60)
    ''' Refresh credentials so many seconds before expiration. '''

    def __init__(
        self,
        aws_access_key_id: str=None,
        aws_secret_access_key: str=None,
        executor: Executor=None,
        parent_logger: logging.Logger=None
    ):
        self.logger = (
            logging.getLogger(self.__class__.__name__) if parent_logger is None else
            parent_logger.getChild(self.__class__.__name__)
        )
        if executor is None:
            # Created here rather than as a default argument value: a default is
            # evaluated once at definition time, which would build the pool as an
            # import side effect and share it between every handler instance.
            executor = ThreadPoolExecutor(max_workers=1)
        if aws_access_key_id or aws_secret_access_key:
            self.session = boto3.Session(
                aws_access_key_id=aws_access_key_id,
                aws_secret_access_key=aws_secret_access_key
            )
        else:
            self.session = boto3.Session()

        self.caller_arn = self.session.client('sts').get_caller_identity()['Arn']
        self.credentials = self.session.get_credentials()
        self.schedule = AtSchedule(
            at=None,
            callback=Callback(cb=self._refresh, executor=executor)
        )
        # Save the initial credentials and arm the schedule for the first refresh.
        self._refresh()

    def _refresh(self):
        ''' Refreshes the credentials if they are dynamic, saves them and reschedules.

        Dynamic (refreshable) credentials are force-refreshed, saved through
        :meth:`_save_credentials` and the next refresh is scheduled
        :attr:`REFRESH_BEFORE_EXPIRATION` before their expiry time. Static
        credentials are saved as they are and no further refresh is scheduled.
        '''
        # pylint: disable=protected-access
        # since botocore credentials do not give access to expiry date,
        # we've to use protected members
        if _is_refreshable(self.credentials):
            self.logger.info(f'Refreshing credentials using {self.caller_arn}')
            # This will refresh the credentials if needed
            advisory = self.credentials.refresh_needed(self.credentials._advisory_refresh_timeout)
            mandatory = self.credentials.refresh_needed(self.credentials._mandatory_refresh_timeout)
            self.logger.info(f'Refresh needed: advisory={advisory}, mandatory={mandatory}')
            self.logger.info(f'Forcing refresh credentials with method '
                             f'{self.credentials._refresh_using}')
            self.credentials._protected_refresh(is_mandatory=True)
            frozen_credentials = self.credentials.get_frozen_credentials()
            expiry_time = self.credentials._expiry_time
            # Only the first characters of each secret are logged.
            self.logger.info('Got credentials: '
                f'access_key={frozen_credentials.access_key[:5]}..., '
                f'secret_key={frozen_credentials.secret_key[:5]}..., '
                f'token={frozen_credentials.token[:5]}..., '
                f'expiry={_format_time(expiry_time)}'
            )

            # Schedule next refresh when we're in the mandatory refresh timeout
            # of the underlying credentials
            #
            #                                  token expiry_time+
            #                                                   |
            #          advisory_refresh_timeout+                |
            #                                  |<---- 15m ----->|
            #                                  |                |
            #              mandatory_refresh_timeout+           |
            #  now                                  |<-- 10m -->|
            #   v                              v    v           v
            #  -------------------------------------------------|
            #                                               ^
            #                                 update_timeout+
            #
            next_update = expiry_time - self.REFRESH_BEFORE_EXPIRATION
            credentials = frozen_credentials
            self.logger.info(f'Next update: {_format_time(next_update)}')
            now = local_now()
            if next_update < now:
                msg = ('Next update time is in the past! '
                    f'current_time={_format_time(now)}, '
                    f'next_update={_format_time(next_update)}, '
                    f'expiry={_format_time(expiry_time)}'
                )
                self.logger.warning(msg)
        else:
            self.logger.info('Credentials are static')
            next_update = None
            credentials = self.credentials
        self._save_credentials(credentials, next_update)
        self.schedule.at = next_update

    def check_refresh(self) -> None:
        ''' Call this method periodically to refresh credentials. '''
        self.schedule.tick()

    @abstractmethod
    def _save_credentials(
        self,
        credentials: Credentials,
        next_update: datetime.datetime
    ) -> None:
        ''' Saves the credentials for Kinesis Video Stream Producer component.

        This method should be implemented in subclasses.

        Args:
            credentials (Credentials): The credentials to be saved.
            next_update (datetime.datetime): When the next credentials update is expected,
                or ``None`` if the credentials are static.
        '''

    def plugin_config(self) -> str: # pylint: disable=no-self-use
        ''' Returns a string that should be included in the ``kvssink`` GStreamer plugin config.'''
        return ''

    def plugin_config_mask(self, plugin_config: str) -> str: # pylint: disable=no-self-use
        ''' Masks credentials for printing in logs. '''
        return plugin_config
class KVSInlineCredentialsHandler(KVSCredentialsHandler):
    ''' Passes AWS credentials inline in the ``kvssink`` plugin config.

    Only static credentials are supported by this handler: once the credentials
    were handed over to KVS Producer in the plugin configuration, there is no
    way to refresh them.

    Args:
        args: Positional arguments to be passed to :class:`KVSCredentialsHandler` initializer.
        kwargs: Keyword arguments to be passed to :class:`KVSCredentialsHandler` initializer.

    Raises:
        RuntimeError: If the resolved credentials are refreshable.
    '''

    def __init__(self, *args: List[Any], **kwargs: Dict[str, Any]):
        super().__init__(*args, **kwargs)
        if not _is_refreshable(self.credentials):
            return
        raise RuntimeError(
            'InlineCredentialsHandler must not be used with refreshable '
            'credentials as they will not be refreshed once passed to '
            'kvssink plugin configuration.'
        )

    def plugin_config(self) -> str:
        ''' Returns the ``access-key`` and ``secret-key`` options of ``kvssink``. '''
        creds = self.credentials
        access_key_option = f'access-key="{creds.access_key}"'
        secret_key_option = f'secret-key="{creds.secret_key}"'
        return ' '.join([access_key_option, secret_key_option])

    def plugin_config_mask(self, plugin_config: str) -> str:
        ''' Masks credentials for printing in logs.

        Args:
            plugin_config: The original plugin config string.

        Returns:
            The config string with the secret key and access key values replaced by stars.
        '''
        without_secret = re.sub(r'secret-key="([^"]*)"', 'secret-key="*****"', plugin_config)
        return re.sub(r'access-key="([^"]*)"', 'access-key="*****"', without_secret)

    def _save_credentials(self, *args, **kwargs) -> None:
        ''' Does nothing: the credentials are passed in the plugin config. '''
class KVSEnvironmentCredentialsHandler(KVSCredentialsHandler):
    ''' Saves AWS credentials in environment variables.

    Experience shows that the kvssink plugin does not check periodically the
    environment variables for updated dynamic credentials, so there are chances
    that your stream will stop once the original dynamic credentials expire.

    For this reason, it is recommended that you use this credentials handler
    only with static credentials and do not use it in production setting.

    Args:
        args: Positional arguments to be passed to :class:`KVSCredentialsHandler` initializer.
        kwargs: Keyword arguments to be passed to :class:`KVSCredentialsHandler` initializer.
    '''
    def __init__(self, *args: List[Any], **kwargs: Dict[str, Any]):
        super().__init__(*args, **kwargs)

    def _save_credentials(
        self,
        credentials: Credentials,
        _: datetime.datetime
    ):
        ''' Exports the credentials to the ``AWS_*`` environment variables.

        Args:
            credentials: The credentials to be exported.
            _: The time of the next credentials update (not used by this handler).
        '''
        # The base class passes the *frozen* credentials when the credentials are
        # dynamic, so a type check against RefreshableCredentials would never match
        # here. Decide on the presence of the session token itself instead:
        # temporary access keys are not usable without it.
        token = getattr(credentials, 'token', None)
        if token:
            os.environ['AWS_SESSION_TOKEN'] = token
        os.environ['AWS_ACCESS_KEY_ID'] = credentials.access_key
        os.environ['AWS_SECRET_ACCESS_KEY'] = credentials.secret_key
class KVSFileCredentialsHandler(KVSCredentialsHandler):
    ''' Saves AWS variables in a text file compatible with `credential-path`
    parameter of kvssink.

    Using this credentials handler establishes the most robust communication
    between the handler and the Kinesis Video Stream Producer. The credentials
    are written into a text file with a predefined format. This handler can be
    used both with static and dynamic credentials: the handler will ensure that
    the refreshed credentials are updated also in the text file, before the
    declared file expiration.

    The credentials file is created with owner-only (``0600``) permissions as
    it contains secrets in plain text.

    Args:
        credentials_path: The path of the credentials file.
        args: Positional arguments to be passed to :class:`KVSCredentialsHandler` initializer.
        kwargs: Keyword arguments to be passed to :class:`KVSCredentialsHandler` initializer.
    '''

    FILE_REFRESH_GRACE_PERIOD = datetime.timedelta(seconds=60)
    ''' Give some time (60s) to boto3 credentials handler to effectively
    refresh the credentials, and declare the expiration of the KVS
    credentials file after this waiting time, so that KVS will find the
    new tokens in the file. '''

    def __init__(
        self, credentials_path: str='/tmp/credentials.txt',
        *args: List[Any],
        **kwargs: Dict[str, Any]
    ):
        # Must be set before calling the parent initializer: it refreshes the
        # credentials right away, which ends up in _save_credentials.
        self.credentials_path = credentials_path
        super().__init__(*args, **kwargs)

    @staticmethod
    def _mask(secret: str) -> str:
        ''' Returns the first characters of a secret followed by an ellipsis, for logging. '''
        return f'{secret[:5]}...'

    def _save_credentials(self, credentials, next_update):
        ''' Writes the credentials to the credentials file.

        Args:
            credentials: The credentials to be saved.
            next_update: When the next credentials update is expected, or ``None``
                for static credentials. For dynamic credentials the file is declared
                to expire :attr:`FILE_REFRESH_GRACE_PERIOD` after this time.
        '''
        if next_update is not None:
            # Dynamic credentials: the file also carries its expiration and the token.
            file_expire = next_update + self.FILE_REFRESH_GRACE_PERIOD
            file_expire_str = _format_time(file_expire)
            self.logger.info(f'Credentials file expiration: {file_expire_str}')
            fields = [
                'CREDENTIALS',
                credentials.access_key,
                file_expire_str,
                credentials.secret_key,
                credentials.token
            ]
            masked_fields = [
                'CREDENTIALS',
                self._mask(credentials.access_key),
                file_expire_str,
                self._mask(credentials.secret_key),
                self._mask(credentials.token),
            ]
        else:
            # Static credentials: only the key pair is written.
            fields = [
                'CREDENTIALS',
                credentials.access_key,
                credentials.secret_key
            ]
            masked_fields = [
                'CREDENTIALS',
                self._mask(credentials.access_key),
                self._mask(credentials.secret_key),
            ]

        credentials_str = '\t'.join(fields)
        self.logger.info(f'Updated {self.credentials_path}:\n' + '\t'.join(masked_fields))

        # The file holds plain text secrets: when it has to be created, create it
        # readable and writable by the owner only instead of relying on the umask.
        file_descriptor = os.open(
            self.credentials_path,
            os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
            0o600
        )
        with os.fdopen(file_descriptor, 'w', encoding='utf-8') as credentials_file:
            credentials_file.write(credentials_str)

    def plugin_config(self) -> str:
        ''' Returns the ``credential-path`` option of ``kvssink`` pointing to the file. '''
        return f'credential-path="{self.credentials_path}"'