Source code for uplink.ratelimit
# Standard library imports
import contextlib
import math
import threading
import time
import sys
# Local imports
from uplink import decorators, utils
from uplink.clients.io import RequestTemplate, transitions
__all__ = ["ratelimit", "RateLimitExceeded"]
# Use monotonic time if available, otherwise fall back to the system clock.
now = time.monotonic if hasattr(time, "monotonic") else time.time
def _get_host_and_port(base_url):
parsed_url = utils.urlparse.urlparse(base_url)
return parsed_url.hostname, parsed_url.port
[docs]class RateLimitExceeded(RuntimeError):
"""A request failed because it exceeded the client-side rate limit."""
def __init__(self, calls, period):
super(RateLimitExceeded, self).__init__(
"Exceeded rate limit of [%s] calls every [%s] seconds."
% (calls, period)
)
class Limiter(object):
_last_reset = _num_calls = None
def __init__(self, max_calls, period, clock):
self._max_calls = max_calls
self._period = period
self._clock = clock
self._lock = threading.RLock()
self._reset()
@property
def period_remaining(self):
return self._period - (self._clock() - self._last_reset)
@contextlib.contextmanager
def check(self):
with self._lock:
if self.period_remaining <= 0:
self._reset()
yield self._max_calls > self._num_calls
self._num_calls += 1
def _reset(self):
self._num_calls = 0
self._last_reset = self._clock()
class RateLimiterTemplate(RequestTemplate):
def __init__(self, limiter, create_limit_reached_exception):
self._limiter = limiter
self._create_limit_reached_exception = create_limit_reached_exception
def before_request(self, request):
with self._limiter.check() as ok:
if ok:
return # Fallback to default behavior
elif self._create_limit_reached_exception is not None:
raise self._create_limit_reached_exception()
else:
return transitions.sleep(self._limiter.period_remaining)
# noinspection PyPep8Naming
[docs]class ratelimit(decorators.MethodAnnotation):
"""
A decorator that constrains a consumer method or an entire
consumer to making a specified maximum number of requests within a
defined time period (e.g., 15 calls every 15 minutes).
Note:
The rate limit is enforced separately for each host-port
combination. Logically, requests are grouped by host and port,
and the number of requests within a time period are counted and
capped separately for each group.
By default, when the limit is reached, the client will wait until
the current period is over before executing any subsequent
requests. If you'd prefer the client to raise an exception when the
limit is exceeded, set the ``raise_on_limit`` argument.
Args:
calls (int): The maximum number of allowed calls that the
consumer can make within the time period.
period (float): The duration of each time period in seconds.
raise_on_limit (:class:`Exception` or bool, optional): Either an
exception to raise when the client exceeds the rate limit or
a :class:`bool`. If :obj:`True`, a
:class:`~uplink.ratelimit.RateLimitExceeded` exception is
raised.
"""
BY_HOST_AND_PORT = _get_host_and_port
def __init__(
self,
calls=15,
period=900,
raise_on_limit=False,
group_by=BY_HOST_AND_PORT,
clock=now,
):
self._max_calls = max(1, min(sys.maxsize, math.floor(calls)))
self._period = period
self._clock = clock
self._limiter_cache = {}
self._group_by = utils.no_op if group_by is None else group_by
if utils.is_subclass(raise_on_limit, Exception) or isinstance(
raise_on_limit, Exception
):
self._create_limit_reached_exception = raise_on_limit
elif raise_on_limit:
self._create_limit_reached_exception = (
self._create_rate_limit_exceeded
)
else:
self._create_limit_reached_exception = None
def _get_limiter_for_request(self, request_builder):
key = self._group_by(request_builder.base_url)
try:
return self._limiter_cache[key]
except KeyError:
return self._limiter_cache.setdefault(
key, Limiter(self._max_calls, self._period, self._clock)
)
def modify_request(self, request_builder):
limiter = self._get_limiter_for_request(request_builder)
request_builder.add_request_template(
RateLimiterTemplate(limiter, self._create_limit_reached_exception)
)
def _create_rate_limit_exceeded(self):
return RateLimitExceeded(self._max_calls, self._period)