"""
This module defines an :py:class:`aiohttp.ClientSession` adapter
that returns awaitable responses.
"""
# Standard library imports
import asyncio
import collections
import inspect
import threading
from concurrent import futures
# Third-party imports
try:
import aiohttp
except ImportError: # pragma: no cover
aiohttp = None
# Local imports
from uplink.clients import exceptions, io, interfaces, register
def threaded_callback(callback):
async def new_callback(response):
if isinstance(response, aiohttp.ClientResponse):
await response.text()
response = ThreadedResponse(response)
response = callback(response)
if isinstance(response, ThreadedResponse):
return response.unwrap()
else:
return response
return new_callback
[docs]class AiohttpClient(interfaces.HttpClientAdapter):
"""
An :py:mod:`aiohttp` client that creates awaitable responses.
Note:
This client is an optional feature and requires the :py:mod:`aiohttp`
package. For example, here's how to install this extra using pip::
$ pip install uplink[aiohttp]
Args:
session (:py:class:`aiohttp.ClientSession`, optional):
The session that should handle sending requests. If this
argument is omitted or set to :py:obj:`None`, a new session
will be created.
"""
exceptions = exceptions.Exceptions()
# TODO: Update docstrings to include aiohttp constructor parameters.
__ARG_SPEC = collections.namedtuple("__ARG_SPEC", "args kwargs")
def __init__(self, session=None, **kwargs):
if aiohttp is None:
raise NotImplementedError("aiohttp is not installed.")
self._auto_created_session = False
if session is None:
session = self._create_session(**kwargs)
self._session = session
self._sync_callback_adapter = threaded_callback
def __del__(self):
# TODO: Consider replacing this with a close method
if self._auto_created_session:
# aiohttp v3.0 has made ClientSession.close a coroutine,
# so we check whether it is one here and register it
# to run appropriately at exit
if asyncio.iscoroutinefunction(self._session.close):
asyncio.get_event_loop().run_until_complete(
self._session.close()
)
else:
self._session.close()
async def session(self):
"""Returns the underlying `aiohttp.ClientSession`."""
if isinstance(self._session, self.__ARG_SPEC):
args, kwargs = self._session
self._session = aiohttp.ClientSession(*args, **kwargs)
self._auto_created_session = True
return self._session
def wrap_callback(self, callback):
func = inspect.unwrap(callback)
if not asyncio.iscoroutinefunction(func):
callback = self._sync_callback_adapter(callback)
return callback
@staticmethod
@register.handler
def with_session(session, *args, **kwargs):
"""
Builds a client instance if the first argument is a
:py:class:`aiohttp.ClientSession`. Otherwise, return :py:obj:`None`.
"""
if isinstance(session, aiohttp.ClientSession):
return AiohttpClient(session, *args, **kwargs)
@classmethod
def _create_session(cls, *args, **kwargs):
return cls.__ARG_SPEC(args, kwargs)
@classmethod
def create(cls, *args, **kwargs):
"""
Builds a client instance with
:py:class:`aiohttp.ClientSession` arguments.
Instead of directly initializing this class with a
:py:class:`aiohttp.ClientSession`, use this method to have the
client lazily construct a session when sending the first
request. Hence, this method guarantees that the creation of the
underlying session happens inside of a coroutine.
Args:
*args: positional arguments that
:py:class:`aiohttp.ClientSession` takes.
**kwargs: keyword arguments that
:py:class:`aiohttp.ClientSession` takes.
"""
session_build_args = cls._create_session(*args, **kwargs)
return AiohttpClient(session=session_build_args)
async def send(self, request):
method, url, extras = request
session = await self.session()
response = await session.request(method, url, **extras)
# Make `aiohttp` response "quack" like a `requests` response
response.status_code = response.status
return response
def apply_callback(self, callback, response):
return self.wrap_callback(callback)(response)
@staticmethod
def io():
return io.AsyncioStrategy()
class ThreadedCoroutine(object):
def __init__(self, coroutine):
self.__coroutine = coroutine
def __call__(self, *args, **kwargs):
with AsyncioExecutor() as executor:
future = executor.submit(self.__coroutine, *args, **kwargs)
result = future.result()
return result
class ThreadedResponse(object):
def __init__(self, response):
self.__response = response
def __getattr__(self, item):
value = getattr(self.__response, item)
if asyncio.iscoroutinefunction(value):
return ThreadedCoroutine(value)
return value
def unwrap(self):
return self.__response
class AsyncioExecutor(futures.Executor):
"""
Executor that runs asyncio coroutines in a shadow thread.
Credit to Vincent Michel, who wrote the original implementation:
https://gist.github.com/vxgmichel/d16e66d1107a369877f6ef7e646ac2e5
"""
def __init__(self):
self._loop = asyncio.new_event_loop()
self._thread = threading.Thread(target=self._target)
self._thread.start()
def _target(self):
asyncio.set_event_loop(self._loop)
self._loop.run_forever()
def submit(self, fn, *args, **kwargs):
coro = fn(*args, **kwargs)
return asyncio.run_coroutine_threadsafe(coro, self._loop)
def shutdown(self, wait=True):
self._loop.call_soon_threadsafe(self._loop.stop)
if wait: # pragma: no cover
self._thread.join()
# === Register client exceptions === #
if aiohttp is not None: # pragma: no cover
AiohttpClient.exceptions.BaseClientException = aiohttp.ClientError
AiohttpClient.exceptions.ConnectionError = aiohttp.ClientConnectionError
AiohttpClient.exceptions.ConnectionTimeout = aiohttp.ClientConnectorError
AiohttpClient.exceptions.ServerTimeout = aiohttp.ServerTimeoutError
AiohttpClient.exceptions.SSLError = aiohttp.ClientSSLError
AiohttpClient.exceptions.InvalidURL = aiohttp.InvalidURL