Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions httpx/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from ._dispatch.asgi import ASGIDispatch
from ._dispatch.wsgi import WSGIDispatch
from ._exceptions import (
ConnectionClosed,
ConnectTimeout,
CookieConflict,
DecodingError,
Expand All @@ -23,7 +22,6 @@
ResponseClosed,
ResponseNotRead,
StreamConsumed,
TimeoutException,
TooManyRedirects,
WriteTimeout,
)
Expand Down Expand Up @@ -56,7 +54,6 @@
"Timeout",
"ConnectTimeout",
"CookieConflict",
"ConnectionClosed",
"DecodingError",
"HTTPError",
"InvalidURL",
Expand All @@ -79,7 +76,6 @@
"Headers",
"QueryParams",
"Request",
"TimeoutException",
"Response",
"DigestAuth",
"WSGIDispatch",
Expand Down
157 changes: 106 additions & 51 deletions httpx/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from types import TracebackType

import hstspreload
import httpcore

from ._auth import Auth, AuthTypes, BasicAuth, FunctionAuth
from ._config import (
Expand All @@ -14,17 +15,14 @@
PoolLimits,
ProxiesTypes,
Proxy,
SSLConfig,
Timeout,
TimeoutTypes,
UnsetType,
VerifyTypes,
)
from ._content_streams import ContentStream
from ._dispatch.asgi import ASGIDispatch
from ._dispatch.base import AsyncDispatcher, SyncDispatcher
from ._dispatch.connection_pool import ConnectionPool
from ._dispatch.proxy_http import HTTPProxy
from ._dispatch.urllib3 import URLLib3Dispatcher
from ._dispatch.wsgi import WSGIDispatch
from ._exceptions import HTTPError, InvalidURL, RequestBodyUnavailable, TooManyRedirects
from ._models import (
Expand Down Expand Up @@ -96,7 +94,7 @@ def get_proxy_map(
elif isinstance(proxies, (str, URL, Proxy)):
proxy = Proxy(url=proxies) if isinstance(proxies, (str, URL)) else proxies
return {"all": proxy}
elif isinstance(proxies, AsyncDispatcher): # pragma: nocover
elif isinstance(proxies, httpcore.AsyncHTTPTransport): # pragma: nocover
raise RuntimeError(
"Passing a dispatcher instance to 'proxies=' is no longer "
"supported. Use `httpx.Proxy() instead.`"
Expand All @@ -107,7 +105,7 @@ def get_proxy_map(
if isinstance(value, (str, URL, Proxy)):
proxy = Proxy(url=value) if isinstance(value, (str, URL)) else value
new_proxies[str(key)] = proxy
elif isinstance(value, AsyncDispatcher): # pragma: nocover
elif isinstance(value, httpcore.AsyncHTTPTransport): # pragma: nocover
raise RuntimeError(
"Passing a dispatcher instance to 'proxies=' is "
"no longer supported. Use `httpx.Proxy() instead.`"
Expand Down Expand Up @@ -446,7 +444,7 @@ def __init__(
pool_limits: PoolLimits = DEFAULT_POOL_LIMITS,
max_redirects: int = DEFAULT_MAX_REDIRECTS,
base_url: URLTypes = None,
dispatch: SyncDispatcher = None,
dispatch: httpcore.SyncHTTPTransport = None,
app: typing.Callable = None,
trust_env: bool = True,
):
Expand All @@ -471,7 +469,7 @@ def __init__(
app=app,
trust_env=trust_env,
)
self.proxies: typing.Dict[str, SyncDispatcher] = {
self.proxies: typing.Dict[str, httpcore.SyncHTTPTransport] = {
key: self.init_proxy_dispatch(
proxy,
verify=verify,
Expand All @@ -487,18 +485,26 @@ def init_dispatch(
verify: VerifyTypes = True,
cert: CertTypes = None,
pool_limits: PoolLimits = DEFAULT_POOL_LIMITS,
dispatch: SyncDispatcher = None,
dispatch: httpcore.SyncHTTPTransport = None,
app: typing.Callable = None,
trust_env: bool = True,
) -> SyncDispatcher:
) -> httpcore.SyncHTTPTransport:
if dispatch is not None:
return dispatch

if app is not None:
return WSGIDispatch(app=app)

return URLLib3Dispatcher(
verify=verify, cert=cert, pool_limits=pool_limits, trust_env=trust_env,
ssl_context = SSLConfig(
verify=verify, cert=cert, trust_env=trust_env
).ssl_context
max_keepalive = pool_limits.soft_limit
max_connections = pool_limits.hard_limit

return httpcore.SyncConnectionPool(
ssl_context=ssl_context,
max_keepalive=max_keepalive,
max_connections=max_connections,
)

def init_proxy_dispatch(
Expand All @@ -508,18 +514,25 @@ def init_proxy_dispatch(
cert: CertTypes = None,
pool_limits: PoolLimits = DEFAULT_POOL_LIMITS,
trust_env: bool = True,
) -> SyncDispatcher:
return URLLib3Dispatcher(
proxy=proxy,
verify=verify,
cert=cert,
pool_limits=pool_limits,
trust_env=trust_env,
) -> httpcore.SyncHTTPTransport:
ssl_context = SSLConfig(
verify=verify, cert=cert, trust_env=trust_env
).ssl_context
max_keepalive = pool_limits.soft_limit
max_connections = pool_limits.hard_limit

return httpcore.SyncHTTPProxy(
proxy_origin=proxy.url.raw[:3],
proxy_headers=proxy.headers.raw,
proxy_mode=proxy.mode,
ssl_context=ssl_context,
max_keepalive=max_keepalive,
max_connections=max_connections,
)

def dispatcher_for_url(self, url: URL) -> SyncDispatcher:
def dispatcher_for_url(self, url: URL) -> httpcore.SyncHTTPTransport:
"""
Returns the SyncDispatcher instance that should be used for a given URL.
Returns the transport instance that should be used for a given URL.
This will either be the standard connection pool, or a proxy.
"""
if self.proxies and not should_not_be_proxied(url):
Expand Down Expand Up @@ -667,22 +680,41 @@ def send_handling_auth(
request = next_request
history.append(response)

def send_single_request(self, request: Request, timeout: Timeout,) -> Response:
def send_single_request(self, request: Request, timeout: Timeout) -> Response:
"""
Sends a single request, without handling any redirections.
"""

dispatcher = self.dispatcher_for_url(request.url)

try:
response = dispatcher.send(request, timeout=timeout)
(
http_version,
status_code,
reason_phrase,
headers,
stream,
) = dispatcher.request(
request.method.encode(),
request.url.raw,
headers=request.headers.raw,
stream=request.stream,
timeout=timeout.as_dict(),
)
except HTTPError as exc:
# Add the original request to any HTTPError unless
# there'a already a request attached in the case of
# a ProxyError.
if exc.request is None:
exc.request = request
raise
response = Response(
status_code,
http_version=http_version.decode("ascii"),
headers=headers,
stream=stream, # type: ignore
request=request,
)

self.cookies.extract_cookies(response)

Expand Down Expand Up @@ -928,7 +960,6 @@ class AsyncClient(BaseClient):
rather than sending actual network requests.
* **trust_env** - *(optional)* Enables or disables usage of environment
variables for configuration.
* **uds** - *(optional)* A path to a Unix domain socket to connect through.
"""

def __init__(
Expand All @@ -946,10 +977,9 @@ def __init__(
pool_limits: PoolLimits = DEFAULT_POOL_LIMITS,
max_redirects: int = DEFAULT_MAX_REDIRECTS,
base_url: URLTypes = None,
dispatch: AsyncDispatcher = None,
dispatch: httpcore.AsyncHTTPTransport = None,
app: typing.Callable = None,
trust_env: bool = True,
uds: str = None,
):
super().__init__(
auth=auth,
Expand All @@ -972,9 +1002,8 @@ def __init__(
dispatch=dispatch,
app=app,
trust_env=trust_env,
uds=uds,
)
self.proxies: typing.Dict[str, AsyncDispatcher] = {
self.proxies: typing.Dict[str, httpcore.AsyncHTTPTransport] = {
key: self.init_proxy_dispatch(
proxy,
verify=verify,
Expand All @@ -992,24 +1021,27 @@ def init_dispatch(
cert: CertTypes = None,
http2: bool = False,
pool_limits: PoolLimits = DEFAULT_POOL_LIMITS,
dispatch: AsyncDispatcher = None,
dispatch: httpcore.AsyncHTTPTransport = None,
app: typing.Callable = None,
trust_env: bool = True,
uds: str = None,
) -> AsyncDispatcher:
) -> httpcore.AsyncHTTPTransport:
if dispatch is not None:
return dispatch

if app is not None:
return ASGIDispatch(app=app)

return ConnectionPool(
verify=verify,
cert=cert,
ssl_context = SSLConfig(
verify=verify, cert=cert, trust_env=trust_env
).ssl_context
max_keepalive = pool_limits.soft_limit
max_connections = pool_limits.hard_limit

return httpcore.AsyncConnectionPool(
ssl_context=ssl_context,
max_keepalive=max_keepalive,
max_connections=max_connections,
http2=http2,
pool_limits=pool_limits,
trust_env=trust_env,
uds=uds,
)

def init_proxy_dispatch(
Expand All @@ -1020,21 +1052,25 @@ def init_proxy_dispatch(
http2: bool = False,
pool_limits: PoolLimits = DEFAULT_POOL_LIMITS,
trust_env: bool = True,
) -> AsyncDispatcher:
return HTTPProxy(
proxy_url=proxy.url,
proxy_headers=proxy.headers,
) -> httpcore.AsyncHTTPTransport:
ssl_context = SSLConfig(
verify=verify, cert=cert, trust_env=trust_env
).ssl_context
max_keepalive = pool_limits.soft_limit
max_connections = pool_limits.hard_limit

return httpcore.AsyncHTTPProxy(
proxy_origin=proxy.url.raw[:3],
proxy_headers=proxy.headers.raw,
proxy_mode=proxy.mode,
verify=verify,
cert=cert,
http2=http2,
pool_limits=pool_limits,
trust_env=trust_env,
ssl_context=ssl_context,
max_keepalive=max_keepalive,
max_connections=max_connections,
)

def dispatcher_for_url(self, url: URL) -> AsyncDispatcher:
def dispatcher_for_url(self, url: URL) -> httpcore.AsyncHTTPTransport:
"""
Returns the AsyncDispatcher instance that should be used for a given URL.
Returns the transport instance that should be used for a given URL.
This will either be the standard connection pool, or a proxy.
"""
if self.proxies and not should_not_be_proxied(url):
Expand Down Expand Up @@ -1193,14 +1229,33 @@ async def send_single_request(
dispatcher = self.dispatcher_for_url(request.url)

try:
response = await dispatcher.send(request, timeout=timeout)
(
http_version,
status_code,
reason_phrase,
headers,
stream,
) = await dispatcher.request(
request.method.encode(),
request.url.raw,
headers=request.headers.raw,
stream=request.stream,
timeout=timeout.as_dict(),
)
except HTTPError as exc:
# Add the original request to any HTTPError unless
# there'a already a request attached in the case of
# a ProxyError.
if exc.request is None:
exc.request = request
raise
response = Response(
status_code,
http_version=http_version.decode("ascii"),
headers=headers,
stream=stream, # type: ignore
request=request,
)

self.cookies.extract_cookies(response)

Expand Down Expand Up @@ -1383,9 +1438,9 @@ async def delete(
)

async def aclose(self) -> None:
await self.dispatch.close()
await self.dispatch.aclose()
for proxy in self.proxies.values():
await proxy.close()
await proxy.aclose()

async def __aenter__(self) -> "AsyncClient":
return self
Expand Down
8 changes: 8 additions & 0 deletions httpx/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,14 @@ def __init__(
timeout if isinstance(pool_timeout, UnsetType) else pool_timeout
)

def as_dict(self) -> typing.Dict[str, typing.Optional[float]]:
return {
"connect": self.connect_timeout,
"read": self.read_timeout,
"write": self.write_timeout,
"pool": self.pool_timeout,
}

def __eq__(self, other: typing.Any) -> bool:
return (
isinstance(other, self.__class__)
Expand Down
4 changes: 3 additions & 1 deletion httpx/_content_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from pathlib import Path
from urllib.parse import urlencode

import httpcore

from ._exceptions import StreamConsumed
from ._utils import format_form_param

Expand All @@ -33,7 +35,7 @@
]


class ContentStream:
class ContentStream(httpcore.AsyncByteStream, httpcore.SyncByteStream):
def get_headers(self) -> typing.Dict[str, str]:
"""
Return a dictionary of headers that are implied by the encoding.
Expand Down
Loading