Timepiece

Timepiece component includes classes that allow you to quickly time profile the video processing pipeline. For detailed information, please check the API documentation.

Ticker

Ticker allows you to calculate statistics of the time intervals between recurring events. You can use a ticker, for example, to get statistics about how much time the application spends to process frames.

Example usage:

import time
import random
from backpack.timepiece import Ticker

ticker = Ticker(max_intervals=5)
for i in range(10):
    ticker.tick()
    time.sleep(random.random() / 10)
print(ticker)

This code returns the time interval (in seconds) between the last five tick() calls, as well as some basic statistics of them:

<Ticker intervals=[0.0899, 0.0632, 0.0543, 0.0713, 0.0681] min=0.0543 mean=0.0694 max=0.0899>

StopWatch

With StopWatch, you can measure the execution time of a code block, even repeatedly, and get statistics about the time spent on different invocations. You can use StopWatch, for example, to profile the inference time of your machine learning model or your preprocessing or postprocessing functions. Stopwatches can be organized in a hierarchy, where the parent watch measures the summary of the time of child watches.

Example usage:

import time
import random

with StopWatch('root') as root:
    with root.child('task1', max_intervals=5) as task1:
        time.sleep(0.01)
        with task1.child('subtask1_1') as subtask1_1:
            time.sleep(0.03)
        with task1.child('subtask1_2'):
            time.sleep(0.07)
        with task1.child('subtask1_3'):
            time.sleep(0.09)
        with subtask1_1:
            time.sleep(0.05)
    for i in range(5)
        with root.child('task2') as task2:
            time.sleep(random.random() / 10)
print(root)

Results:

<StopWatch name=root intervals=[0.5416] min=0.5416 mean=0.5416 max=0.5416 children=[
    <StopWatch name=task1 intervals=[0.2505] min=0.2505 mean=0.2505 max=0.2505 children=[
        <StopWatch name=subtask1_1 intervals=[0.0301, 0.0501] min=0.0301 mean=0.0401 max=0.0501>,
        <StopWatch name=subtask1_2 intervals=[0.0701] min=0.0701 mean=0.0701 max=0.0701>,
        <StopWatch name=subtask1_3 intervals=[0.0901] min=0.0901 mean=0.0901 max=0.0901>
    ]>,
    <StopWatch name=task2 intervals=[0.0275, 0.0825, 0.0334, 0.0843, 0.0633] min=0.0275 mean=0.0582 max=0.0843>
]>

You can access all interval data, as well as the statistical values using StopWatch properties.

Schedules

Schedules allow you to schedule the execution of a function at a later time.

It is important to note that Schedule instances do not intrinsically have an event loop or use kernel-based timing operations. Instead, call regularly the tick() method of the Schedule, and the scheduled function will be executed when the next tick() is called after the scheduled time. When developing Panorama applications, you typically call the tick() function in the frame processing loop.

You can also specify a python executor when creating Schedule objects. If an executor is specified, the scheduled function will be called asynchronously using that executor, the tick() method can immediately return, and the scheduled function will be executed in another thread.

The following Schedules are available to you:

Finally, AlarmClock allows you to handle a collection of Schedule instances with the invocation of a single tick() method.

Example usage:

import time, datetime
from concurrent.futures import ThreadPoolExecutor
from backpack.timepiece import (
    local_now, AtSchedule, IntervalSchedule, AlarmClock, OrdinalSchedule, Callback
)

def get_callback(name, executor):
    return Callback(
        cb=lambda name: print(f'{name} was called at {datetime.datetime.now()}'),
        cbkwargs={'name': name},
        executor=executor
    )

executor = ThreadPoolExecutor()

at = local_now() + datetime.timedelta(seconds=3)
atschedule = AtSchedule(
    at=at,
    callback=get_callback('AtSchedule', executor)
)

iv = datetime.timedelta(seconds=1.35)
ivschedule = IntervalSchedule(
    interval=iv,
    callback=get_callback('IntervalSchedule', executor)
)

ordinalschedule = OrdinalSchedule(
    ordinal=17,
    callback=get_callback('OrdinalSchedule', executor),
)

alarmclock = AlarmClock([atschedule, ivschedule, ordinalschedule])

for i in range(25*5):
    alarmclock.tick()
    time.sleep(1/25)

Results:

IntervalSchedule was called at 2022-02-18 13:08:27.025772+00:00
OrdinalSchedule was called at 2022-02-18 13:08:27.669900+00:00
OrdinalSchedule was called at 2022-02-18 13:08:28.354350+00:00
IntervalSchedule was called at 2022-02-18 13:08:28.395027+00:00
OrdinalSchedule was called at 2022-02-18 13:08:29.039454+00:00
OrdinalSchedule was called at 2022-02-18 13:08:29.724104+00:00
IntervalSchedule was called at 2022-02-18 13:08:29.764723+00:00
AtSchedule was called at 2022-02-18 13:08:30.046814+00:00
OrdinalSchedule was called at 2022-02-18 13:08:30.409206+00:00
IntervalSchedule was called at 2022-02-18 13:08:31.093138+00:00
OrdinalSchedule was called at 2022-02-18 13:08:31.093451+00:00
OrdinalSchedule was called at 2022-02-18 13:08:31.776985+00:00

Tachometer

A BaseTachometer combines an instance of BaseTimer subclass and IntervalSchedule to measure the time interval of a recurring event and periodically report statistics about it. You can use it, for example, to report the frame processing time statistics to an external service. You can specify the reporting interval and a callback function that will be called with the timing statistics. You should consider using an executor, as your reporting callback can take a considerable amount of time to finish, and you might not want to hold up the processing loop synchronously meanwhile.

BaseTachometer is not intended to be instantiated directly. Instead, you should use one of its subclasses, TickerTachometer or StopWatchTachometer. The former wraps the functionality of a Ticker, the later a StopWatch.

Example usage of TickerTachometer:

import datetime, time, random
from concurrent.futures import ThreadPoolExecutor
from backpack.timepiece import TickerTachometer

def stats_callback(timestamp, ticker):
    print('timestamp:', timestamp)
    print(f'min: {ticker.min():.4f}, max: {ticker.max():.4f}, '
          f'sum: {ticker.sum():.4f}, num: {ticker.len()}')

tach = TickerTachometer(
    stats_callback=stats_callback,
    stats_interval=datetime.timedelta(seconds=2),
    executor=ThreadPoolExecutor()
)

for i in range(200):
    tach.tick()
    time.sleep(random.random() / 10)

Results:

timestamp: 2022-02-18 13:08:34.074238+00:00
min: 0.0003, max: 0.0979, sum: 2.0156, num: 36
timestamp: 2022-02-18 13:08:36.102133+00:00
min: 0.0005, max: 0.0998, sum: 2.0279, num: 40
timestamp: 2022-02-18 13:08:38.105702+00:00
min: 0.0005, max: 0.0984, sum: 2.0036, num: 43
timestamp: 2022-02-18 13:08:40.083832+00:00
min: 0.0028, max: 0.0975, sum: 1.9781, num: 39

Example usage of StopWatchTachometer:

def stats_callback(timestamp, ticker):
    print('timestamp:', timestamp)
    print(f'min: {ticker.min():.4f}, max: {ticker.max():.4f}, '
          f'sum: {ticker.sum():.4f}, num: {ticker.len()}')

tach = StopWatchTachometer(
    stats_callback=stats_callback,
    stats_interval=datetime.timedelta(seconds=2)
)

# as a context manager:
for i in range(200):
    with tach:
        time.sleep(random.random() / 10)

# or as a decorator:
@tach.measure
def long_running_func():
    time.sleep(random.random() / 10)

for i in range(200):
    long_running_func()

CloudWatchTimerAdapter

CloudWatchTimerAdapter provides an easy-to use callback implementation that sends frame processing time statistics to AWS CloudWatch Metrics service. You can use this class as a drop-in to your frame processing loop. It will give you detailed statistics about the behavior the timing of your application and you can mount CloudWatch alarms on this metric to receive email or SMS notifications when your application stops processing the video for whatever reason.

To successfully use CloudWatchTimerAdapter, you should grant the execution of the following operations to the Panorama Application IAM Role:

  • cloudwatch:PutMetricData

The following example (a snippet from a Panorama Application implementation) shows you how you can combine together AutoIdentity, TickerTachometer, and CloudWatchTimerAdapter to get frame processing time metrics in the CloudWatch service of your AWS account:

from concurrent.futures import ThreadPoolExecutor
import boto3
from backpack.autoidentity import AutoIdentity
from backpack.cwadapter import CloudWatchTimerAdapter
from backpack.timepiece import TickerTachometer

# You might want to read these values from Panorama application parameters
service_region = 'us-east-1'
device_region = 'us-east-1'

class Application(panoramasdk.node):

    def __init__(self):
        super().__init__()
        self.session = boto3.Session(region_name=service_region)
        self.executor = ThreadPoolExecutor()
        self.auto_identity = AutoIdentity(device_region=device_region)
        self.cw_adapter = CloudWatchTimerAdapter(
            namespace='MyPanoramaMetrics',
            metric_name='frame_processing_time',
            dimensions={
                'application_name': self.auto_identity.application_name or 'unknown',
                'device_id': self.auto_identity.device_id or 'unknown',
                'application_instance_id': self.auto_identity.application_instance_id or 'unknown'
            },
            executor=self.executor,
            boto3_session=self.session
        )
        self.tacho = TickerTachometer(
            stats_callback=self.cw_adapter.send_metrics
        )

    def process_streams(self):

        # call Tachometer
        self.tacho.tick()

        # this  will block until next frame is available
        streams = self.inputs.video_in.get()

        for stream in streams:
            # process stream.image here ...
            pass

        self.outputs.video_out.put(streams)

def main():
    try:
        app = Application()
        while True:
            app.process_streams()
    except Exception:
        print('Exception during processing loop.')

main()

For more information, refer to the timepiece and cwadapter API documentation.