Coverage for backpack/timepiece.py: 93%
240 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-30 23:12 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-30 23:12 +0000
1''' This module contains time-related utility methods for measuring code execution time
2and scheduling tasks in an external event loop. '''
4import time
5import datetime
6import threading
7from collections import deque
8from itertools import islice
9from typing import List, Deque, Optional, Iterator, Dict, Any, Callable, Tuple
10from abc import ABC, abstractmethod
11import concurrent.futures
12import functools
14from dateutil.tz import tzlocal
def local_now() -> datetime.datetime:
    ''' Gets the current time as a timezone aware datetime.

    Returns:
        A timezone aware datetime instance whose tzinfo is the local time zone
        object created by `dateutil.tz.tzlocal()`.
    '''
    local_zone = tzlocal()
    return datetime.datetime.now(tz=local_zone)
def local_dt(dt: datetime.datetime) -> datetime.datetime:
    ''' Expresses the supplied datetime in the local time zone.

    The conversion is delegated to `datetime.astimezone()`; per the standard library
    documentation a naive instance is presumed to represent system local time.

    Args:
        dt: The datetime instance to convert (typically a naive one).

    Returns:
        A timezone aware datetime instance in the local time zone.
    '''
    local_zone = tzlocal()
    return dt.astimezone(tz=local_zone)
def panorama_timestamp_to_datetime(panorama_ts: Tuple[int, int]) -> datetime.datetime:
    ''' Builds a python datetime from a panoramasdk.media.time_stamp tuple.

    Args:
        panorama_ts: The Panorama timestamp, a (seconds, microseconds) tuple.

    Returns:
        The datetime instance produced by `datetime.fromtimestamp()` for the
        combined timestamp. No time zone is passed, so the result is naive.
    '''
    seconds, microseconds = panorama_ts
    # Fold both parts into a single fractional POSIX timestamp.
    epoch_seconds = seconds + microseconds / 1e6
    return datetime.datetime.fromtimestamp(epoch_seconds)
class BaseTimer(ABC):
    ''' Shared statistics for timers that measure code execution time.

    Measured durations (in seconds) are stored in `intervals`, a bounded deque
    that only keeps the most recent `max_intervals` entries. All statistics
    return 0.0 while no interval has been recorded.

    Args:
        max_intervals: Maximum number of intervals to remember.
    '''

    # Print at most this many intervals in __repr__
    MAX_REPR_INTERVALS = 5

    def __init__(self, max_intervals:int=10):
        # Once the deque is full, appending a new interval drops the oldest one.
        self.intervals: Deque[float] = deque(maxlen=max_intervals)

    def min(self) -> float:
        ''' Returns the shortest recorded time interval (in seconds). '''
        if not self.intervals:
            return 0.0
        return min(self.intervals)

    def max(self) -> float:
        ''' Returns the longest recorded time interval (in seconds). '''
        if not self.intervals:
            return 0.0
        return max(self.intervals)

    def mean(self) -> float:
        ''' Returns the mean of the recorded time intervals (in seconds). '''
        if self.len() > 0:
            return self.sum() / self.len()
        return 0.0

    def sum(self) -> float:
        ''' Returns the total of the recorded time intervals (in seconds). '''
        if not self.intervals:
            # Keep the result a float even when there is nothing to add up.
            return 0.0
        return sum(self.intervals)

    def len(self) -> int:
        ''' Returns the number of recorded intervals. '''
        return len(self.intervals)

    def freq(self) -> float:
        ''' Returns the mean frequency of the events (in Hertz).

        The frequency is the reciprocal of `mean()`; 0.0 is returned when the
        mean is not positive.
        '''
        mean_interval = self.mean()
        if mean_interval > 0:
            return 1 / mean_interval
        return 0.0

    def reset(self) -> None:
        ''' Forgets all recorded intervals. '''
        self.intervals.clear()

    def _repr_intervals(self, intervals):
        # Format the leading intervals only and mark the truncation with an ellipsis.
        shown = [format(iv, '.4f') for iv in islice(intervals, self.MAX_REPR_INTERVALS)]
        if len(intervals) > self.MAX_REPR_INTERVALS:
            shown.append('...')
        return '[' + ', '.join(shown) + ']'

    def _repr_props(self) -> Iterator[str]:
        # Nothing is reported until at least one interval has been recorded.
        if not self.intervals:
            return
        yield f'intervals={self._repr_intervals(self.intervals)}'
        yield f'min={self.min():.4f}'
        yield f'mean={self.mean():.4f}'
        yield f'max={self.max():.4f}'

    def __repr__(self) -> str:
        parts = [self.__class__.__name__, *self._repr_props()]
        return '<' + ' '.join(parts) + '>'
class Ticker(BaseTimer):

    ''' A performance profiler that measures the time elapsed between successive
    calls of `tick()`.

    The statistics methods inherited from `BaseTimer` summarize the recorded intervals.

    Example usage::

        ticker = Ticker(max_intervals=5)
        for i in range(10):
            ticker.tick()
            time.sleep(random.random() / 10)
        print(ticker)

    Results::

        <Ticker intervals=[0.0899, 0.0632, 0.0543, 0.0713, 0.0681] min=0.0543 mean=0.0694 max=0.0899>

    Args:
        max_intervals: Maximum number of time intervals to be recorded.
            Only the last max_intervals number of intervals will be kept.

    Attributes:
        intervals: The recorded intervals in seconds between the successive events.
    '''

    def __init__(self, max_intervals:int=10):
        super().__init__(max_intervals=max_intervals)
        # perf_counter reading of the previous tick; None until the first tick.
        self._last_tick: Optional[float] = None

    def tick(self) -> None:
        ''' Registers a tick in this Ticker.

        The very first tick only stores the reference time; every later tick
        records the time elapsed since the previous one.
        '''
        previous, self._last_tick = self._last_tick, time.perf_counter()
        if previous is not None:
            self.intervals.append(self._last_tick - previous)
class StopWatch(BaseTimer):

    ''' A simple performance profiler usable as a context manager or as a decorator.

    Each time a `with` block guarded by a StopWatch exits, the elapsed time of the
    block is appended to `intervals`. The statistics methods inherited from
    `BaseTimer` (`min()`, `mean()`, `max()`, ...) therefore describe the recorded runs
    of the measured code.

    StopWatches can be arranged in a tree with `child()`, which makes it possible to
    measure nested and serial execution.

    Example usage::

        import time
        with StopWatch('root') as root:
            with root.child('task1', max_intervals=5) as task:
                time.sleep(0.01)
                with task.child('subtask1.1'):
                    time.sleep(0.03)
                with task.child('subtask1.2'):
                    time.sleep(0.07)
                with task.child('subtask1.3'):
                    time.sleep(0.09)
            with root.child('task2') as task:
                time.sleep(0.17)
        print(root)

    Printing a StopWatch shows its name, the recorded intervals together with their
    min / mean / max and, recursively, the same information for each child.

    Args:
        name: The name of this StopWatch
        max_intervals: Maximum number of intervals to be recorded.

    Attributes:
        name (str): The name of this StopWatch
        parent (Optional['StopWatch']): The parent StopWatch
        children (Dict[str, 'StopWatch']): The name-indexed dictionary of the children StopWatches
    '''

    def __init__(self, name:str, max_intervals:int=10):
        super().__init__(max_intervals=max_intervals)
        self.name: str = name
        self.parent: Optional['StopWatch'] = None
        self.children: Dict[str, 'StopWatch'] = {}
        # perf_counter reading taken in __enter__; None while no measurement is running.
        self._start: Optional[float] = None

    def child(self, name:str, max_intervals:Optional[int]=None) -> 'StopWatch':
        ''' Creates a new or returns an existing child of this StopWatch.

        Args:
            name: Name of the child StopWatch.
            max_intervals: Maximum number of intervals to be recorded in the
                child. If None, max_intervals of the parent (this object) will be used.
                Ignored when a child called `name` already exists.

        Returns:
            The child StopWatch registered under `name`.
        '''
        if name in self.children:
            return self.children[name]
        if max_intervals is None:
            max_intervals = self.intervals.maxlen
        # Use the runtime class so that children of a subclass are of the same subclass.
        child = self.__class__(name, max_intervals=max_intervals)
        child.parent = self
        self.children[name] = child
        return child

    def parents(self) -> Iterator['StopWatch']:
        ''' Returns a generator of all parents of this StopWatch, nearest first. '''
        current = self.parent
        while current:
            yield current
            current = current.parent

    def full_name(self) -> str:
        ''' Returns the fully qualified name of this StopWatch, including
        all parents' name, joined by dots with the outermost parent first. '''
        family = [self.name]
        family.extend(p.name for p in self.parents())
        return '.'.join(reversed(family))

    def __enter__(self) -> 'StopWatch':
        ''' Context manager entry point: starts the measurement and returns self.'''
        self._start = time.perf_counter()
        return self

    def __exit__(self, *_) -> None:
        ''' Context manager exit: records the time elapsed since `__enter__`.

        Nothing is returned, so an exception raised inside the `with` block is
        not suppressed.
        '''
        self.intervals.append(time.perf_counter() - self._start)
        self._start = None

    def measure(self, fun):
        ''' Use stopwatch as decorator.

        Each call of the decorated function is timed with this StopWatch. The
        interval is recorded even if the function raises, and the return value
        of the function is passed through to the caller.

        Usage::

            import time
            from backpack.timepiece import StopWatch

            stopwatch = StopWatch('stopwatch')

            @stopwatch.measure
            def long_running_func():
                time.sleep(10)

        Args:
            fun: The callable to be measured.

        Returns:
            The wrapped callable.
        '''
        @functools.wraps(fun)
        def wrapper(*args, **kwds):
            # `with` guarantees that __exit__ runs on exceptions as well.
            with self:
                return fun(*args, **kwds)
        return wrapper

    @property
    def level(self) -> int:
        ''' Returns the number of parents. '''
        return sum(1 for _ in self.parents())

    def _repr_props(self) -> Iterator[str]:
        yield f'name={self.name}'
        yield from super()._repr_props()

    def __repr__(self) -> str:
        ''' Indented string representation of this StopWatch.'''
        lvl = self.level
        # NOTE(review): the indent unit is a single space in this copy of the file;
        # confirm the intended width against the original source.
        indent = " " * lvl
        newline = '\n'
        props = list(self._repr_props())
        if self.children:
            # Each child starts its own line (see the newline prefix below).
            children = (repr(c) for c in self.children.values())
            props.append(f'children=[{", ".join(children)}\n{indent}]')
        return f'{newline if lvl > 0 else ""}{indent}<{self.__class__.__name__} {" ".join(props)}>'
class Callback:
    ''' Bundles a callback function with the arguments it should be called with.

    Calling the Callback instance invokes the function, either directly or,
    if an executor was supplied, by submitting it to that executor.

    Args:
        cb (Callable): The callback function to be called
        cbargs (Optional[List[Any]]): Positional arguments of the callback
        cbkwargs (Optional[Dict[str, Any]]): Keyword arguments of the callback
        executor (Optional[concurrent.futures.Executor]): If specified, the callback function will
            be called asynchronously using this executor.
    '''

    # pylint: disable=invalid-name,too-few-public-methods
    # invalid-name disabled for the `cb` parameter. callable is also taken.
    # too-few-public-methods disabled because this is a wrapper class around a callback function.
    def __init__(
        self,
        cb: Callable,
        cbargs: Optional[List[Any]] = None,
        cbkwargs: Optional[Dict[str, Any]] = None,
        executor: Optional[concurrent.futures.Executor] = None
    ):
        self.cb = cb
        # Missing (or empty) argument containers are replaced with fresh empty ones.
        self.cbargs = cbargs or []
        self.cbkwargs = cbkwargs or {}
        self.executor = executor

    def __call__(self):
        ''' Invokes the callback.

        Returns:
            The return value of the callback when it is called directly, or None
            when the call was submitted to the executor.
        '''
        if not self.executor:
            return self.cb(*self.cbargs, **self.cbkwargs)
        # The future returned by submit() is not kept.
        self.executor.submit(self.cb, *self.cbargs, **self.cbkwargs)
        return None
class Schedule(ABC):
    ''' Schedules a task to be called later with the help of an external
    scheduler.

    The external scheduler is expected to call the `tick()` method periodically,
    most likely from an event-loop.

    Args:
        repeating (bool): If this schedule fires repeatedly
        callback (Callback): The callback to be called when the scheduler fires
    '''

    def __init__(
        self,
        repeating: bool,
        callback: Callback
    ):
        self.repeating = repeating
        self.callback = callback

    def fire(self) -> Any:
        ''' Fires the schedule calling the callback.

        Returns:
            Whatever calling `self.callback` returns. For a `Callback` instance this
            is the return value of the wrapped function when it is called
            synchronously, and None when an executor is used.
        '''
        return self.callback()

    @abstractmethod
    def tick(self) -> Tuple[bool, Any]: # pylint: disable=no-self-use
        ''' The heartbeat of the schedule to be called periodically.

        Returns:
            A tuple of (True, callback_return_value) if the schedule was fired,
            otherwise (False, None)
        '''
class AtSchedule(Schedule):
    ''' One-shot schedule that executes a task once a given point in time has passed.

    The task will be executed at the next tick after the specified datetime.
    Assigning a new value to `at` re-arms the schedule.

    Args:
        at (datetime.datetime): When to execute the task. It is compared with the
            timezone aware `local_now()`, so it is presumably expected to be
            timezone aware as well.
        callback (Callback): The callback to be called when the scheduler fires
    '''

    # pylint: disable=invalid-name
    # Disabled for the `at` parameter

    def __init__(self, at: datetime.datetime, callback: Callback):
        super().__init__(repeating=False, callback=callback)
        # The `at` setter acquires `at_lock`, so the lock has to exist before
        # the assignment at the end of this method.
        self.at_lock = threading.Lock()
        self.fire_lock = threading.Lock()
        self.at = at

    @property
    def at(self) -> datetime.datetime:
        ''' The point in time after which the schedule fires. '''
        return self._at

    @at.setter
    def at(self, val: datetime.datetime):
        ''' Stores the new firing time and marks the schedule as not yet fired. '''
        with self.at_lock:
            self._at = val
            self._fired = False

    def _do_tick(self):
        now = local_now()
        due = not self._fired and self.at is not None and now > self.at
        if not due:
            return (False, None)
        result = self.fire()
        # Only flagged after fire() returned, so a raising callback leaves the schedule armed.
        self._fired = True
        return (True, result)

    def tick(self) -> Tuple[bool, Any]:
        ''' Fires the callback if the scheduled time has passed and it was not fired yet.

        When the callback uses an executor, the check runs while holding `fire_lock`.

        Returns:
            A tuple of (True, callback_return_value) if the schedule was fired,
            otherwise (False, None)
        '''
        if not self.callback.executor:
            return self._do_tick()
        with self.fire_lock:
            return self._do_tick()
class IntervalSchedule(Schedule):
    ''' Schedules a task to be executed at regular time intervals.

    The task will be executed at the first tick and at each tick after
    the specified time interval has passed.

    Args:
        interval (datetime.timedelta): The time interval of the executions.
            Must be greater than zero.
        callback (Callback): The callback to be called when the scheduler fires

    Raises:
        ValueError: If `interval` is zero or negative.
    '''

    def __init__(self, interval: datetime.timedelta, callback: Callback):
        super().__init__(repeating=True, callback=callback)
        # A non-positive interval would never move `_next_fire` past the current
        # time, turning the loop in `_set_next_fire` into an endless one.
        if interval <= datetime.timedelta(0):
            raise ValueError('interval must be greater than zero')
        self.interval = interval
        # Time of the next execution; None until the first tick.
        self._next_fire = None

    def _set_next_fire(self, now):
        # Skip every firing time that has already passed, so missed intervals
        # do not cause a burst of executions.
        while self._next_fire <= now:
            self._next_fire += self.interval

    def tick(self) -> Tuple[bool, Any]:
        ''' Fires the callback if the next scheduled firing time has been reached.

        Returns:
            A tuple of (True, callback_return_value) if the schedule was fired,
            otherwise (False, None)
        '''
        now = local_now()
        if self._next_fire is None:
            # First tick: due immediately.
            self._next_fire = now
        if now >= self._next_fire:
            res = (True, self.fire())
        else:
            res = (False, None)
        self._set_next_fire(now)
        return res
class OrdinalSchedule(Schedule):
    ''' Schedules a task to be executed at each nth tick.

    Ticks are counted from the first call of `tick()`; the task is executed on
    every tick where the count reaches `ordinal`, after which counting restarts.

    Args:
        ordinal (int): Execute the task once in every ordinal number of ticks. An OrdinalSchedule
            with zero ordinal will never fire.
        callback (Callback): The callback to be called when the scheduler fires

    Raises:
        ValueError: If `ordinal` is negative.
    '''

    def __init__(self, ordinal: int, callback: Callback):
        super().__init__(repeating=True, callback=callback)
        if ordinal < 0:
            raise ValueError('ordinal must be greater or equal than zero')
        self.ordinal = ordinal
        # Number of ticks seen since the last execution.
        self._counter = 0

    def tick(self) -> Tuple[bool, Any]:
        ''' Counts a tick and fires the callback when the count reaches `ordinal`.

        Returns:
            A tuple of (True, callback_return_value) if the schedule was fired,
            otherwise (False, None)
        '''
        if self.ordinal == 0:
            return (False, None)
        self._counter += 1
        if self._counter != self.ordinal:
            return (False, None)
        outcome = (True, self.fire())
        # Restart counting only after the callback returned.
        self._counter = 0
        return outcome
class AlarmClock:
    ''' Bundles several schedules so that all of them can be ticked with a single call.

    Args:
        schedules (List[Schedule]): The list of the schedules.
    '''

    # pylint: disable=too-few-public-methods
    # It happens that an alarm clock should only tick:
    # real functionality is implemented by wrapped schedules

    def __init__(self, schedules: List[Schedule]=None):
        self.schedules = schedules or []

    def tick(self) -> None:
        ''' The heartbeat of the alarm clock to be called periodically.

        Forwards the tick to every registered schedule, then drops the
        non-repeating schedules that fired during this tick.
        '''
        def _is_spent(schedule):
            fired, _ = schedule.tick()
            return fired and not schedule.repeating

        # Collect first and remove afterwards: the list must not shrink while
        # it is being iterated.
        spent = [schedule for schedule in self.schedules if _is_spent(schedule)]
        for schedule in spent:
            self.schedules.remove(schedule)
class BaseTachometer(ABC):
    ''' Abstract base class for tachometers.

    A Tachometer can be used to measure the frequency of recurring events, and periodically report
    statistics about it by calling a callback function with the following signature::

        def stats_callback(timestamp: datetime.datetime, timer: BaseTimer):
            pass

    passing the timestamp taken when the statistics are reported, as well as the `BaseTimer`
    instance that collected the events. You can access the `min()`, `max()`, `sum()` (total
    processing time) and the `len()` (number of measurements) methods of the timer. The recorded
    intervals of the timer are cleared after the callback has returned.

    This class is not intended to be instantiated. Use one of the subclasses instead.

    Args:
        timer (BaseTimer): Instance of the BaseTimer subclass that will be used to report events.
        stats_callback (Callable[[datetime.datetime, BaseTimer]): A callable with the above
            signature that will be called when new statistics is available.
        stats_interval (datetime.timedelta): The interval of the statistics calculation. Defaults
            to one minute.
        executor (Optional[concurrent.futures.Executor]): If specified, callback will be called
            asynchronously using this executor
    '''

    # pylint: disable=too-few-public-methods
    # It happens that a Tachometer should only tick and sometimes call the callback.

    EXPECTED_MAX_FPS = 100

    def __init__(
        self,
        timer: BaseTimer,
        stats_callback: Callable[[datetime.datetime, BaseTimer], None],
        stats_interval: datetime.timedelta = datetime.timedelta(seconds=60),
        executor: Optional[concurrent.futures.Executor] = None
    ):
        self.timer = timer
        self.stats_callback = stats_callback
        # The schedule decides when `_calculate_stats` runs; subclasses tick it.
        report = Callback(cb=self._calculate_stats, executor=executor)
        self.schedule = IntervalSchedule(interval=stats_interval, callback=report)

    def _calculate_stats(self):
        # Returns (True, callback_return_value) if stats_callback was called,
        # or (False, None) when there was nothing to report.
        timestamp = local_now()
        if not self.timer.intervals:
            return (False, None)
        outcome = self.stats_callback(timestamp, self.timer)
        # Start the next reporting period with an empty set of measurements.
        self.timer.intervals.clear()
        return (True, outcome)
class TickerTachometer(BaseTachometer):
    ''' TickerTachometer is a combination of a Ticker and a Tachometer.

    It reports statistics about the call frequency of the `tick()` method.

    Call the `tick()` method of the tachometer each time an atomic event happens.
    For example, if you are interested in the statistics of the frame processing
    time of your application, call `tick` method each time you process a new
    frame.

    Args:
        stats_callback (Callable[[datetime.datetime, Ticker]): A callable that will be called
            when new statistics is available.
        stats_interval (datetime.timedelta): The interval of the statistics calculation. Defaults
            to one minute.
        executor (Optional[concurrent.futures.Executor]): If specified, callback will be called
            asynchronously using this executor
    '''

    def __init__(self,
        stats_callback: Callable[[datetime.datetime, Ticker], None],
        stats_interval: datetime.timedelta = datetime.timedelta(seconds=60),
        executor: Optional[concurrent.futures.Executor] = None
    ):
        # Size the ticker so that one reporting period fits at EXPECTED_MAX_FPS events per second.
        ticker_intervals = int(self.EXPECTED_MAX_FPS * stats_interval.total_seconds())
        self.ticker = Ticker(max_intervals=ticker_intervals)
        super().__init__(self.ticker, stats_callback, stats_interval, executor)

    def tick(self) -> Tuple[bool, Any]:
        ''' Call this method when a recurring event happens.

        Returns:
            Tuple[bool, Any]: A tuple with these elements:

            - bool: True, if stats_callback was called. When an executor is used, True
              means that the statistics calculation was handed over to the executor.
            - the return value of stats_callback if it was called synchronously, else None
        '''
        self.ticker.tick()
        fired, outcome = self.schedule.tick()
        if not fired:
            return (False, None)
        if outcome is None:
            # Asynchronous case: the calculation was submitted to the executor,
            # so its result is not available here.
            return (True, None)
        # Synchronous case: `outcome` is the (called, return_value) tuple produced by
        # `_calculate_stats`. Return it as is instead of nesting it in another tuple.
        return outcome
class StopWatchTachometer(BaseTachometer):
    ''' `StopWatchTachometer` is a combination of a StopWatch and a Tachometer.

    It reports statistics about the elapsed time intervals.

    Much like StopWatch, you can use `StopWatchTachometer` as a context manager or as a function
    decorator.

    For example::

        swt = StopWatchTachometer(
            stats_callback=lambda dt, timer: print('Mean interval:', timer.mean()),
            stats_interval=datetime.timedelta(seconds=10)
        )

        for i in range(11):
            with swt:
                print('tick')
                time.sleep(2)

        @swt.measure
        def long_running_func():
            print('tack')
            time.sleep(1.5)

        for i in range(15):
            long_running_func()

    Args:
        stats_callback (Callable[[datetime.datetime, StopWatch]): A callable that will be called
            when new statistics is available. The timer it receives is the internal
            StopWatch of this tachometer.
        stats_interval (datetime.timedelta): The interval of the statistics calculation. Defaults
            to one minute.
        executor (Optional[concurrent.futures.Executor]): If specified, callback will be called
            asynchronously using this executor
    '''

    # NOTE: the `Ticker` in the stats_callback annotation is kept unchanged for
    # compatibility; at runtime the callback receives `self.stopwatch`.
    def __init__(self,
        stats_callback: Callable[[datetime.datetime, Ticker], None],
        stats_interval: datetime.timedelta = datetime.timedelta(seconds=60),
        executor: Optional[concurrent.futures.Executor] = None
    ):
        # Size the stopwatch so that one reporting period fits at EXPECTED_MAX_FPS events per second.
        ticker_intervals = int(self.EXPECTED_MAX_FPS * stats_interval.total_seconds())
        self.stopwatch = StopWatch('tachometer', max_intervals=ticker_intervals)
        super().__init__(self.stopwatch, stats_callback, stats_interval, executor)

    def __enter__(self) -> 'StopWatchTachometer':
        ''' Context manager entry point: starts the stopwatch and returns self.'''
        self.stopwatch.__enter__()
        return self

    def __exit__(self, *_) -> None:
        ''' Context manager exit: records the elapsed interval, then ticks the
        statistics schedule so that due statistics get reported. '''
        self.stopwatch.__exit__()
        self.schedule.tick()

    def measure(self, fun):
        ''' Use the tachometer as decorator.

        Each call of the decorated function is measured as if it ran inside a
        `with` block of this tachometer. The interval is recorded even if the
        function raises, and the return value of the function is passed through.

        Args:
            fun: The callable to be measured.

        Returns:
            The wrapped callable.
        '''
        @functools.wraps(fun)
        def wrapper(*args, **kwds):
            # `with` guarantees that __exit__ runs on exceptions as well.
            with self:
                return fun(*args, **kwds)
        return wrapper