Coverage for backpack/timepiece.py: 93%

240 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-30 23:12 +0000

1''' This module contains time-related utility methods for measuring code execution time 

2and scheduling tasks in an external event loop. ''' 

3 

4import time 

5import datetime 

6import threading 

7from collections import deque 

8from itertools import islice 

9from typing import List, Deque, Optional, Iterator, Dict, Any, Callable, Tuple 

10from abc import ABC, abstractmethod 

11import concurrent.futures 

12import functools 

13 

14from dateutil.tz import tzlocal 

15 

def local_now() -> datetime.datetime:
    ''' Gets the present moment expressed in the local time zone.

    Returns:
        A timezone aware datetime instance whose tzinfo is `tzlocal()`.
    '''
    zone = tzlocal()
    return datetime.datetime.now(tz=zone)

23 

def local_dt(dt: datetime.datetime) -> datetime.datetime:
    ''' Makes the supplied naive datetime time zone aware in the local time zone.

    The conversion itself is delegated to `datetime.astimezone()`.

    Args:
        dt: The naive datetime instance.

    Returns:
        A timezone aware datetime instance in the local time zone.
    '''
    zone = tzlocal()
    return dt.astimezone(tz=zone)

34 

def panorama_timestamp_to_datetime(panorama_ts: Tuple[int, int]) -> datetime.datetime:
    ''' Turns a panoramasdk.media.time_stamp tuple into a python datetime.

    Args:
        panorama_ts: The Panorama timestamp, a (seconds, microseconds) tuple.

    Returns:
        A python datetime instance built with `datetime.fromtimestamp()`.
    '''
    seconds, microseconds = panorama_ts
    # Fold the two parts into a single floating point epoch value
    epoch_seconds = seconds + microseconds / 1000000.0
    return datetime.datetime.fromtimestamp(epoch_seconds)

47 

48 

class BaseTimer(ABC):
    ''' Common functionality of the timers that measure code execution time.

    The measured intervals are kept in a bounded deque and simple statistics
    can be computed over them.

    Args:
        max_intervals: Maximum number of intervals to remember.
    '''

    # Upper bound on the number of intervals shown by __repr__
    MAX_REPR_INTERVALS = 5

    def __init__(self, max_intervals:int=10):
        self.intervals: Deque[float] = deque(maxlen=max_intervals)

    def min(self) -> float:
        ''' Returns the shortest recorded interval (in seconds), 0.0 if there is none. '''
        if len(self.intervals) > 0:
            return min(self.intervals)
        return 0.0

    def max(self) -> float:
        ''' Returns the longest recorded interval (in seconds), 0.0 if there is none. '''
        if len(self.intervals) > 0:
            return max(self.intervals)
        return 0.0

    def mean(self) -> float:
        ''' Returns the mean of the recorded intervals (in seconds), 0.0 if there is none. '''
        count = self.len()
        if count > 0:
            return self.sum() / count
        return 0.0

    def sum(self) -> float:
        ''' Returns the total of the recorded intervals (in seconds), 0.0 if there is none. '''
        if len(self.intervals) > 0:
            return sum(self.intervals)
        return 0.0

    def len(self) -> int:
        ''' Returns the number of recorded intervals. '''
        return len(self.intervals)

    def freq(self) -> float:
        ''' Returns the mean frequency of the events (in Hertz), 0.0 if the mean is not positive. '''
        mean_interval = self.mean()
        if mean_interval > 0:
            return 1 / mean_interval
        return 0.0

    def reset(self) -> None:
        ''' Forgets all recorded intervals. '''
        self.intervals.clear()

    def _repr_intervals(self, intervals):
        # Format only the first MAX_REPR_INTERVALS items, and mark the truncation
        limit = self.MAX_REPR_INTERVALS
        shown = [f'{iv:.4f}' for iv in islice(intervals, limit)]
        if len(intervals) > limit:
            shown.append('...')
        return '[' + ', '.join(shown) + ']'

    def _repr_props(self) -> Iterator[str]:
        # Nothing to report until at least one interval was recorded
        if not self.intervals:
            return
        yield f'intervals={self._repr_intervals(self.intervals)}'
        yield f'min={self.min():.4f}'
        yield f'mean={self.mean():.4f}'
        yield f'max={self.max():.4f}'

    def __repr__(self) -> str:
        parts = [self.__class__.__name__]
        parts.extend(self._repr_props())
        return f'<{" ".join(parts)}>'

107 

108 

class Ticker(BaseTimer):

    ''' A performance profiler measuring the time elapsed between successive
    calls of `tick()`.

    The statistics of the base timer are available on the recorded intervals.

    Example usage::

        ticker = Ticker(max_intervals=5)
        for i in range(10):
            ticker.tick()
            time.sleep(random.random() / 10)
        print(ticker)

    Results::

        <Ticker intervals=[0.0899, 0.0632, 0.0543, 0.0713, 0.0681] min=0.0543 mean=0.0694 max=0.0899>

    :ivar max_intervals: Maximum number of time intervals to be recorded.
        Only the last max_intervals number of intervals will be kept.
    :ivar intervals: The recorded intervals in seconds between the successive events.
    '''

    def __init__(self, max_intervals:int=10):
        super().__init__(max_intervals=max_intervals)
        # perf_counter reading of the previous tick; None until the first tick
        self._last_tick: Optional[float] = None

    def tick(self) -> None:
        ''' Registers a tick in this Ticker.

        The very first tick only stores the time reference; each later tick
        records the time elapsed since the previous one.
        '''
        current = time.perf_counter()
        previous, self._last_tick = self._last_tick, current
        if previous is not None:
            self.intervals.append(current - previous)

143 

144 

class StopWatch(BaseTimer):

    ''' A simple performance profiler usable as a context manager or as a decorator.

    Each time the `with` block (or a function wrapped with `measure()`) finishes,
    the elapsed time is appended to `intervals`, so the statistics inherited from
    the base timer (`min()`, `max()`, `mean()`, ...) describe the recorded
    executions. StopWatches can be nested with `child()` to measure nested and
    serial execution.

    Example usage::

        import time
        with StopWatch('root') as root:
            with root.child('task1', max_intervals=5) as task:
                time.sleep(0.01)
                with task.child('subtask1.1'):
                    time.sleep(0.03)
                with task.child('subtask1.2'):
                    time.sleep(0.07)
                with task.child('subtask1.3'):
                    time.sleep(0.09)
            with root.child('task2') as task:
                time.sleep(0.17)
        print(root)

    Printing a StopWatch shows its name, the recorded intervals together with
    their min / mean / max, and the representation of each child on its own
    indented line.

    Args:
        name: The name of this StopWatch
        max_intervals: Maximum number of intervals to be recorded.

    Attributes:
        name (str): The name of this StopWatch
        parent (Optional['StopWatch']): The parent StopWatch
        children (Dict[str, 'StopWatch']): The name-indexed dictionary of the children StopWatches
    '''

    def __init__(self, name:str, max_intervals:int=10):
        super().__init__(max_intervals=max_intervals)
        self.name: str = name
        self.parent: Optional['StopWatch'] = None
        self.children: Dict[str, 'StopWatch'] = {}
        # perf_counter reading taken in __enter__; None while not measuring
        self._start: Optional[float] = None

    def child(self, name:str, max_intervals:Optional[int]=None) -> 'StopWatch':
        ''' Creates a new or returns an existing child of this StopWatch.

        Args:
            name: Name of the child StopWatch.
            max_intervals: Maximum number of intervals to be recorded in the
                child. If None, max_intervals of the parent (this object) will be used.
                Ignored when a child with this name already exists.

        Returns:
            The child StopWatch registered under `name`.
        '''
        if name in self.children:
            return self.children[name]
        if max_intervals is None:
            max_intervals = self.intervals.maxlen
        child = self.__class__(name, max_intervals=max_intervals)
        child.parent = self
        self.children[name] = child
        return child

    def parents(self) -> Iterator['StopWatch']:
        ''' Returns a generator of all parents of this StopWatch, nearest first. '''
        current = self.parent
        while current is not None:
            yield current
            current = current.parent

    def full_name(self) -> str:
        ''' Returns the fully qualified, dot separated name of this StopWatch,
        including all parents' name. '''
        family = [self.name]
        family.extend(p.name for p in self.parents())
        return '.'.join(reversed(family))

    def __enter__(self) -> 'StopWatch':
        ''' Context manager entry point returning self.'''
        self._start = time.perf_counter()
        return self

    def __exit__(self, *_) -> None:
        ''' Context manager exit: records the time elapsed since `__enter__`.

        Exceptions raised in the managed block are not suppressed.
        '''
        self.intervals.append(time.perf_counter() - self._start)
        self._start = None

    def measure(self, fun):
        ''' Use stopwatch as decorator.

        The elapsed time of each call is recorded, even if the wrapped function
        raises, and the return value of the wrapped function is passed through
        to the caller.

        Usage::

            import time
            from backpack.timepiece import StopWatch

            stopwatch = StopWatch('stopwatch')

            @stopwatch.measure
            def long_running_func():
                time.sleep(10)

        Args:
            fun: The callable to be measured.

        Returns:
            The wrapped callable.
        '''
        @functools.wraps(fun)
        def wrapper(*args, **kwds):
            # `with` guarantees that __exit__ runs on the error path as well
            with self:
                return fun(*args, **kwds)
        return wrapper

    @property
    def level(self) -> int:
        ''' Returns the number of parents. '''
        return sum(1 for _ in self.parents())

    def _repr_props(self) -> Iterator[str]:
        yield f'name={self.name}'
        yield from super()._repr_props()

    def __repr__(self) -> str:
        ''' Indented string representation of this StopWatch.'''
        lvl = self.level
        # NOTE(review): the indent unit is reproduced as it appears in the
        # listing this was taken from; confirm its width against the original file.
        indent = " " * lvl
        newline = '\n'
        props = list(self._repr_props())
        if self.children:
            children = (repr(c) for c in self.children.values())
            props.append(f'children=[{", ".join(children)}\n{indent}]')
        # Non-root stopwatches start on a new line so that the tree is readable
        return f'{newline if lvl > 0 else ""}{indent}<{self.__class__.__name__} {" ".join(props)}>'

296 

297 

class Callback:
    ''' Bundles a callable together with the arguments it should be invoked with.

    Calling the Callback instance invokes the wrapped function, either directly
    or through an executor.

    Args:
        cb (Callable): The callback function to be called
        cbargs (Optional[List[Any]]): Positional arguments of the callback
        cbkwargs (Optional[Dict[str, Any]]): Keyword arguments of the callback
        executor (Optional[concurrent.futures.Executor]): If specified, the callback function will
            be submitted to this executor instead of being called directly.
    '''

    # pylint: disable=invalid-name,too-few-public-methods
    # invalid-name disabled for the `cb` parameter. callable is also taken.
    # too-few-public-methods disabled because this is a wrapper class around a callback function.
    def __init__(
        self,
        cb: Callable,
        cbargs: Optional[List[Any]] = None,
        cbkwargs: Optional[Dict[str, Any]] = None,
        executor: Optional[concurrent.futures.Executor] = None
    ):
        self.cb = cb
        # Falsy argument containers are replaced by fresh empty ones
        self.cbargs = cbargs if cbargs else []
        self.cbkwargs = cbkwargs if cbkwargs else {}
        self.executor = executor

    def __call__(self):
        ''' Invokes the callback.

        Returns:
            The return value of the callback when it is called directly, or
            None when the call was handed over to the executor.
        '''
        if not self.executor:
            return self.cb(*self.cbargs, **self.cbkwargs)
        self.executor.submit(self.cb, *self.cbargs, **self.cbkwargs)
        return None

331 

332 

class Schedule(ABC):
    ''' Schedules a task to be called later with the help of an external
    scheduler.

    The external scheduler is expected to call the `tick()` method periodically,
    most likely from an event-loop.

    Args:
        repeating (bool): If this schedule fires repeatedly
        callback (Callback): The callback to be called when the scheduler fires
    '''

    def __init__(
        self,
        repeating: bool,
        callback: Callback
    ):
        self.repeating = repeating
        self.callback = callback

    def fire(self) -> Any:
        ''' Fires the schedule calling the callback.

        Returns:
            If not using an executor (the callback is called synchronously), `fire()`
            returns the return value of the callback. Otherwise it returns None.
        '''
        # The return annotation is Any because the callback's value is passed through
        return self.callback()

    @abstractmethod
    def tick(self) -> Tuple[bool, Any]: # pylint: disable=no-self-use
        ''' The heartbeat of the schedule to be called periodically.

        Returns:
            A tuple of (True, callback_return_value) if the schedule was fired,
            otherwise (False, None)
        '''

370 

371 

class AtSchedule(Schedule):
    ''' A one-shot schedule that fires once a given moment has passed.

    The task is executed at the first tick that happens strictly after the
    specified datetime. Assigning a new value to `at` re-arms the schedule.

    Args:
        at (datetime.datetime): When to execute the task. It is compared against
            `local_now()`, which is timezone aware.
        callback (Callback): The callback to be called when the scheduler fires
    '''

    # pylint: disable=invalid-name
    # Disabled for the `at` parameter

    def __init__(self, at: datetime.datetime, callback: Callback):
        super().__init__(repeating=False, callback=callback)
        self.at_lock = threading.Lock()
        self.fire_lock = threading.Lock()
        # Goes through the property setter, which also initializes _fired
        self.at = at

    @property
    def at(self) -> datetime.datetime:
        ''' The moment after which the schedule fires. '''
        return self._at

    @at.setter
    def at(self, val: datetime.datetime):
        ''' Sets the firing time and marks the schedule as not yet fired. '''
        with self.at_lock:
            self._at = val
            self._fired = False

    def _do_tick(self):
        now = local_now()
        is_due = not self._fired and self.at is not None and now > self.at
        if not is_due:
            return (False, None)
        result = self.fire()
        # Marked as fired only once the callback has returned
        self._fired = True
        return (True, result)

    def tick(self) -> Tuple[bool, Any]:
        ''' Fires the callback if the scheduled time has passed and it has not fired yet.

        Returns:
            (True, value returned by `fire()`) if the schedule fired at this
            tick, otherwise (False, None).
        '''
        if not self.callback.executor:
            return self._do_tick()
        # When an executor is configured the tick is serialized with fire_lock
        with self.fire_lock:
            return self._do_tick()

416 

417 

class IntervalSchedule(Schedule):
    ''' Schedules a task to be executed at regular time intervals.

    The task will be executed at the first tick and at each tick after
    the specified time interval has passed.

    Args:
        interval (datetime.timedelta): The time interval of the executions. Must be
            greater than zero.
        callback (Callback): The callback to be called when the scheduler fires

    Raises:
        ValueError: If interval is zero or negative.
    '''

    def __init__(self, interval: datetime.timedelta, callback: Callback):
        super().__init__(repeating=True, callback=callback)
        # A non-positive interval could never move the next firing time past
        # the current time.
        if interval <= datetime.timedelta(0):
            raise ValueError('interval must be greater than zero')
        self.interval = interval
        # Time of the next firing; None until the first tick
        self._next_fire: Optional[datetime.datetime] = None

    def _set_next_fire(self, now):
        # Advances _next_fire to the first grid point strictly after `now`
        if self._next_fire > now:
            return
        # Number of whole intervals to skip, computed arithmetically so that a
        # long pause between ticks does not cost one iteration per missed interval
        steps = (now - self._next_fire) // self.interval + 1
        self._next_fire += steps * self.interval

    def tick(self) -> Tuple[bool, Any]:
        ''' Fires the callback if the next firing time has been reached.

        Returns:
            (True, value returned by `fire()`) if the schedule fired at this
            tick, otherwise (False, None).
        '''
        now = local_now()
        if self._next_fire is None:
            # The first tick always fires
            self._next_fire = now
        if now >= self._next_fire:
            res = (True, self.fire())
        else:
            res = (False, None)
        self._set_next_fire(now)
        return res

448 

449 

class OrdinalSchedule(Schedule):
    ''' Schedules a task to be executed at each nth tick.

    Ticks are counted from one, so the task first fires at tick number
    `ordinal`: with an ordinal greater than one the first tick does not
    execute the task, with an ordinal of one every tick does.

    Args:
        ordinal (int): Execute the task once in every ordinal number of ticks. An OrdinalSchedule
            with zero ordinal will never fire.
        callback (Callback): The callback to be called when the scheduler fires

    Raises:
        ValueError: If ordinal is negative.
    '''

    def __init__(self, ordinal: int, callback: Callback):
        super().__init__(repeating=True, callback=callback)
        if ordinal < 0:
            raise ValueError('ordinal must be greater or equal than zero')
        self.ordinal = ordinal
        # Number of ticks seen since the last firing
        self._counter = 0

    def tick(self) -> Tuple[bool, Any]:
        ''' Counts a tick and fires the callback at every `ordinal`-th one.

        Returns:
            (True, value returned by `fire()`) if the schedule fired at this
            tick, otherwise (False, None).
        '''
        if self.ordinal == 0:
            return (False, None)
        self._counter += 1
        if self._counter < self.ordinal:
            return (False, None)
        # Reset before firing: if the callback raises, the counter must not be
        # left beyond `ordinal`, otherwise the schedule would never fire again.
        self._counter = 0
        return (True, self.fire())

477 

478 

class AlarmClock:
    ''' Bundles several schedules so that a single `tick()` call drives all of them.

    Args:
        schedules (List[Schedule]): The list of the schedules.
    '''

    # pylint: disable=too-few-public-methods
    # It happens that an alarm clock should only tick:
    # real functionality is implemented by wrapped schedules

    def __init__(self, schedules: List[Schedule]=None):
        self.schedules = schedules or []

    def tick(self) -> None:
        ''' The heartbeat of the alarm clock to be called periodically.

        Forwards the tick to every registered schedule, then drops the
        non-repeating schedules that fired during this tick.
        '''
        finished = []
        for sched in self.schedules:
            fired, _ = sched.tick()
            if not fired or sched.repeating:
                continue
            finished.append(sched)
        # Removal is deferred so that the list is not modified while iterated
        for sched in finished:
            self.schedules.remove(sched)

506 

507 

class BaseTachometer(ABC):
    ''' Abstract base class for tachometers.

    A Tachometer measures the frequency of recurring events and periodically
    reports statistics about them by calling a callback function with the
    following signature::

        def stats_callback(timestamp: datetime.datetime, timer: BaseTimer):
            pass

    The callback receives the time of the report and the `BaseTimer` instance
    that collected the events. You can access the `min()`, `max()`, `sum()`
    (total processing time) and the `len()` (number of measurements) methods of
    the timer. The recorded intervals are cleared after each report.

    This class is not intended to be instantiated. Use one of the subclasses instead.

    Args:
        timer (BaseTimer): Instance of the BaseTimer subclass that will be used to report events.
        stats_callback (Callable[[datetime.datetime, BaseTimer]): A callable with the above
            signature that will be called when new statistics is available.
        stats_interval (datetime.timedelta): The interval of the statistics calculation. Defaults
            to one minute.
        executor (Optional[concurrent.futures.Executor]): If specified, callback will be called
            asynchronously using this executor
    '''

    # pylint: disable=too-few-public-methods
    # It happens that a Tachometer should only tick and sometimes call the callback.

    EXPECTED_MAX_FPS = 100

    def __init__(
        self,
        timer: BaseTimer,
        stats_callback: Callable[[datetime.datetime, BaseTimer], None],
        stats_interval: datetime.timedelta = datetime.timedelta(seconds=60),
        executor: Optional[concurrent.futures.Executor] = None
    ):
        self.stats_callback = stats_callback
        # The statistics are computed each time this schedule fires
        report = Callback(cb=self._calculate_stats, executor=executor)
        self.schedule = IntervalSchedule(interval=stats_interval, callback=report)
        self.timer = timer

    def _calculate_stats(self):
        timestamp = local_now()
        if len(self.timer.intervals) != 0:
            result = self.stats_callback(timestamp, self.timer)
            # Start collecting afresh for the next reporting period
            self.timer.intervals.clear()
            return (True, result)
        # Nothing was measured, so there is nothing to report
        return (False, None)

559 

560 

class TickerTachometer(BaseTachometer):
    ''' TickerTachometer is a combination of a Ticker and a Tachometer.

    It reports statistics about the call frequency of the `tick()` method.

    Call the `tick()` method of the tachometer each time an atomic event happens.
    For example, if you are interested in the statistics of the frame processing
    time of your application, call `tick` method each time you process a new
    frame.

    Args:
        stats_callback (Callable[[datetime.datetime, Ticker]): A callable that will be called
            when new statistics is available.
        stats_interval (datetime.timedelta): The interval of the statistics calculation. Defaults
            to one minute.
        executor (Optional[concurrent.futures.Executor]): If specified, callback will be called
            asynchronously using this executor
    '''

    def __init__(self,
        stats_callback: Callable[[datetime.datetime, Ticker], None],
        stats_interval: datetime.timedelta = datetime.timedelta(seconds=60),
        executor: Optional[concurrent.futures.Executor] = None
    ):
        # Room for every tick of one reporting period at the expected maximum rate
        capacity = int(self.EXPECTED_MAX_FPS * stats_interval.total_seconds())
        self.ticker = Ticker(max_intervals=capacity)
        super().__init__(
            timer=self.ticker,
            stats_callback=stats_callback,
            stats_interval=stats_interval,
            executor=executor
        )

    def tick(self) -> Tuple[bool, Any]:
        ''' Call this method when a recurring event happens.

        Registers the event in the ticker, then forwards the tick to the
        statistics schedule.

        Returns:
            Tuple[bool, Any]: The tuple returned by the `tick()` method of the
            statistics schedule:

            - bool: True, if the schedule fired at this tick
            - the value produced by firing the schedule if it fired, else None
        '''
        self.ticker.tick()
        outcome = self.schedule.tick()
        return outcome

600 

601 

class StopWatchTachometer(BaseTachometer):
    ''' `StopWatchTachometer` is a combination of a StopWatch and a Tachometer.

    It reports statistics about the elapsed time intervals.

    Much like StopWatch, you can use `StopWatchTachometer` as a context manager or as a function
    decorator.

    For example::

        swt = StopWatchTachometer(
            stats_callback=lambda dt, timer: print('Mean interval:', timer.mean()),
            stats_interval=datetime.timedelta(seconds=10)
        )

        for i in range(11):
            with swt:
                print('tick')
                time.sleep(2)

        @swt.measure
        def long_running_func():
            print('tack')
            time.sleep(1.5)

        for i in range(15):
            long_running_func()

    Args:
        stats_callback (Callable[[datetime.datetime, Ticker]): A callable that will be called
            when new statistics is available.
        stats_interval (datetime.timedelta): The interval of the statistics calculation. Defaults
            to one minute.
        executor (Optional[concurrent.futures.Executor]): If specified, callback will be called
            asynchronously using this executor
    '''

    def __init__(self,
        stats_callback: Callable[[datetime.datetime, Ticker], None],
        stats_interval: datetime.timedelta = datetime.timedelta(seconds=60),
        executor: Optional[concurrent.futures.Executor] = None
    ):
        # Room for every measurement of one reporting period at the expected maximum rate
        ticker_intervals = int(self.EXPECTED_MAX_FPS * stats_interval.total_seconds())
        self.stopwatch = StopWatch('tachometer', max_intervals=ticker_intervals)
        super().__init__(self.stopwatch, stats_callback, stats_interval, executor)

    def __enter__(self) -> 'StopWatchTachometer':
        ''' Context manager entry point returning self.'''
        self.stopwatch.__enter__()
        return self

    def __exit__(self, *_) -> None:
        ''' Context manager exit: records the elapsed time, then ticks the
        statistics schedule. Exceptions of the managed block are not suppressed. '''
        self.stopwatch.__exit__()
        self.schedule.tick()

    def measure(self, fun):
        ''' Use the tachometer as a decorator.

        The elapsed time of each call is recorded, even if the wrapped function
        raises, and the return value of the wrapped function is passed through
        to the caller.

        Args:
            fun: The callable to be measured.

        Returns:
            The wrapped callable.
        '''
        @functools.wraps(fun)
        def wrapper(*args, **kwds):
            # `with` guarantees that __exit__ runs on the error path as well
            with self:
                return fun(*args, **kwds)
        return wrapper