Source code for uplink.retry.stop
# Names exported by a star-import of this module. Only the two public
# breaker classes are listed; the module-level NEVER and DISABLE constants
# are not part of this list.
__all__ = ["after_attempt", "after_delay"]
class RetryBreaker(object):
    """Base class for objects that decide when retrying should stop.

    Subclasses in this module override ``__call__`` as a generator
    function. Two breakers can be merged with the ``|`` operator.
    """

    def __or__(self, other):
        """Combine this breaker with ``other`` via ``self | other``.

        Returns ``self`` unchanged when ``other`` is ``None``; otherwise
        returns an ``_Or`` wrapping both operands.

        Raises:
            AssertionError: If ``other`` is neither ``None`` nor a
                ``RetryBreaker`` instance.
        """
        # Guard clause: with nothing on the right-hand side there is
        # nothing to combine.
        if other is None:
            return self

        # NOTE: this is an ``assert``, so the check disappears when Python
        # runs with optimizations enabled (``-O``).
        assert isinstance(other, RetryBreaker), (
            "Both objects should be retry breakers."
        )
        return _Or(self, other)

    def __call__(self):  # pragma: no cover
        """Subclass hook; the base implementation always raises."""
        raise NotImplementedError
class _Or(RetryBreaker):
    """Breaker that reports a stop when either wrapped breaker does."""

    def __init__(self, left, right):
        """Store the two breakers whose verdicts will be combined."""
        self._left, self._right = left, right

    def __call__(self):
        """Return a generator merging the verdicts of both operands.

        Every round has two steps: the generator first pauses on a bare
        ``yield`` to receive a delay through ``send``, then yields
        ``left_verdict or right_verdict`` for that delay.
        """
        first_gen, second_gen = self._left(), self._right()
        while True:
            backoff = yield

            # Advance each child to its own bare ``yield`` so that it is
            # ready to accept the delay.
            next(first_gen)
            next(second_gen)

            # Both children are always consulted, left before right, even
            # when the left one already asks to stop.
            first_verdict = first_gen.send(backoff)
            second_verdict = second_gen.send(backoff)
            yield first_verdict or second_verdict
# noinspection PyPep8Naming
class after_attempt(RetryBreaker):
    """Signals a stop once the given number of ``attempt`` rounds is reached."""

    def __init__(self, attempt):
        """Remember the attempt limit."""
        self._max_attempt = attempt
        # Not read anywhere inside this class; the running count lives in
        # a local variable of the generator created by ``__call__``.
        self._attempt = 0

    def __call__(self):
        """Return a generator that counts rounds against the limit.

        Every round has two steps: a bare ``yield`` (any value sent in is
        ignored), then a yield of ``True`` when the number of rounds so
        far has reached the configured maximum and ``False`` otherwise.
        Each call starts a fresh count from zero.
        """
        rounds_seen = 0
        while True:
            yield
            rounds_seen += 1
            yield self._max_attempt <= rounds_seen
# noinspection PyPep8Naming
class after_delay(RetryBreaker):
    """
    Signals a stop when the backoff it receives is strictly greater
    than the configured ``delay``, in seconds.
    """

    def __init__(self, delay):
        """Remember the largest backoff that is still acceptable."""
        self._max_delay = delay

    def __call__(self):
        """Return a generator that checks each received backoff.

        Every round has two steps: a bare ``yield`` that receives the
        backoff through ``send``, then a yield of ``True`` when that
        backoff exceeds the limit and ``False`` otherwise.
        """
        while True:
            backoff = yield
            # Strict comparison: a backoff equal to the limit does not stop.
            limit_exceeded = self._max_delay < backoff
            yield limit_exceeded
class _NeverStop(RetryBreaker):
    """Breaker whose verdict is always ``False``."""

    def __call__(self):
        """Return a generator that answers ``False`` on every round.

        Every round has two steps: a bare ``yield`` (any value sent in is
        ignored), then a yield of ``False``.
        """
        while True:
            # Step one: pause so a value can be sent in; it is discarded.
            yield
            # Step two: the constant verdict.
            yield False
#: Continuously retry until the server returns a successful response.
#: This is a single module-level ``_NeverStop`` instance.
NEVER = _NeverStop()
# Keep for backwards compatibility with v0.8.0
# TODO: Remove in v1.0.0
# ``DISABLE`` is an alias bound to the very same object as ``NEVER``.
DISABLE = NEVER