-
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Expand file tree
/
Copy path_types.py
More file actions
114 lines (93 loc) · 2.9 KB
/
_types.py
File metadata and controls
114 lines (93 loc) · 2.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
"""
Type definitions for type checking purposes.
"""
from http.cookiejar import CookieJar
from typing import (
IO,
TYPE_CHECKING,
Any,
AsyncIterable,
AsyncIterator,
Callable,
Dict,
Iterable,
Iterator,
List,
Mapping,
Optional,
Sequence,
Tuple,
Union,
)
# These imports are evaluated by static type checkers only. At runtime
# ``TYPE_CHECKING`` is False, and the aliases in this module refer to these
# classes through string forward references instead.
if TYPE_CHECKING:  # pragma: no cover
    from ._auth import Auth  # noqa: F401
    from ._config import Proxy, Timeout  # noqa: F401
    from ._models import Cookies, Headers, Request  # noqa: F401
    from ._urls import URL, QueryParams  # noqa: F401
# ---------------------------------------------------------------------------
# Scalars and URLs
# ---------------------------------------------------------------------------

# A single primitive value (str, int, float or bool), or ``None``.
PrimitiveData = Union[str, int, float, bool, None]

# A URL, given either as a ``URL`` instance or as a plain string.
URLTypes = Union["URL", str]

# ---------------------------------------------------------------------------
# Query parameters, headers and cookies
# ---------------------------------------------------------------------------

# Accepted query-parameter inputs: a ``QueryParams`` instance, a mapping from
# name to one primitive or a sequence of primitives, a list or tuple of
# ``(name, value)`` pairs, or a raw ``str`` / ``bytes`` value.
QueryParamTypes = Union[
    "QueryParams",
    Mapping[str, Union[PrimitiveData, Sequence[PrimitiveData]]],
    List[Tuple[str, PrimitiveData]],
    Tuple[Tuple[str, PrimitiveData], ...],
    str,
    bytes,
]

# Accepted header inputs: a ``Headers`` instance, or a mapping / sequence of
# pairs whose keys and values are both ``str`` or both ``bytes``.
HeaderTypes = Union[
    "Headers",
    Mapping[str, str],
    Mapping[bytes, bytes],
    Sequence[Tuple[str, str]],
    Sequence[Tuple[bytes, bytes]],
]

# Accepted cookie inputs: a ``Cookies`` instance, a standard-library
# ``CookieJar``, a ``str -> str`` dict, or a list of ``(str, str)`` pairs.
CookieTypes = Union[
    "Cookies",
    CookieJar,
    Dict[str, str],
    List[Tuple[str, str]],
]

# ---------------------------------------------------------------------------
# Client configuration
# ---------------------------------------------------------------------------

# Accepted timeout inputs: one optional float, a 4-tuple of optional floats,
# or a ``Timeout`` instance.
# NOTE(review): the meaning of each tuple position is not visible here;
# confirm against ``Timeout``.
TimeoutTypes = Union[
    Optional[float],
    Tuple[Optional[float], Optional[float], Optional[float], Optional[float]],
    "Timeout",
]

# Accepted proxy inputs: a ``URL``, a plain string, or a ``Proxy`` instance.
ProxyTypes = Union["URL", str, "Proxy"]

# Accepted certificate inputs: one string, or a 2- or 3-tuple of strings.
# NOTE(review): presumably (cert file, key file[, password]) - confirm.
CertTypes = Union[
    str,
    Tuple[str, str],
    Tuple[str, str, str],
]

# Accepted auth inputs: a pair of ``str`` / ``bytes`` values, a callable that
# takes a ``Request`` and returns a ``Request``, or an ``Auth`` instance.
# NOTE(review): the pair is presumably (username, password) - confirm.
AuthTypes = Union[
    Tuple[Union[str, bytes], Union[str, bytes]],
    Callable[["Request"], "Request"],
    "Auth",
]

# ---------------------------------------------------------------------------
# Request / response bodies and extensions
# ---------------------------------------------------------------------------

# Body content: text, bytes, or a sync / async iterable of byte chunks.
RequestContent = Union[str, bytes, Iterable[bytes], AsyncIterable[bytes]]
ResponseContent = Union[str, bytes, Iterable[bytes], AsyncIterable[bytes]]

# Free-form, string-keyed mappings.
ResponseExtensions = Mapping[str, Any]
RequestData = Mapping[str, Any]

# ---------------------------------------------------------------------------
# File uploads
# ---------------------------------------------------------------------------

# The content of one file: a binary file object, bytes, or text.
FileContent = Union[IO[bytes], bytes, str]

# One file entry, in any of four shapes.
FileTypes = Union[
    # file (or bytes) on its own
    FileContent,
    # (filename, file (or bytes))
    Tuple[Optional[str], FileContent],
    # (filename, file (or bytes), content_type)
    Tuple[Optional[str], FileContent, Optional[str]],
    # (filename, file (or bytes), content_type, headers)
    Tuple[Optional[str], FileContent, Optional[str], Mapping[str, str]],
]

# A collection of file entries keyed by a string: either a mapping or a
# sequence of ``(key, file entry)`` pairs.
RequestFiles = Union[
    Mapping[str, FileTypes],
    Sequence[Tuple[str, FileTypes]],
]

# Free-form, string-keyed mapping.
RequestExtensions = Mapping[str, Any]
# Names exported by a star-import of this module: only the two byte-stream
# base classes. The type aliases defined in this module are not listed here.
__all__ = ["AsyncByteStream", "SyncByteStream"]
class SyncByteStream:
    """
    Base class for a synchronous stream of byte chunks.

    Subclasses are expected to override `__iter__`; the default implementation
    raises `NotImplementedError` as soon as iteration starts.
    """

    def __iter__(self) -> Iterator[bytes]:
        """
        Yield the stream's byte chunks. Must be overridden by subclasses.
        """
        message = "The '__iter__' method must be implemented."
        raise NotImplementedError(message)  # pragma: no cover
        # Never reached. The `yield` keeps this method a generator function,
        # so the error is raised on the first `next()` call rather than when
        # `__iter__` itself is called.
        yield b""  # pragma: no cover

    def close(self) -> None:
        """
        Release any network resources once a request/response cycle is
        complete.

        The default implementation does nothing; subclasses can override it.
        """
        return None
class AsyncByteStream:
    """
    Base class for an asynchronous stream of byte chunks.

    Subclasses are expected to override `__aiter__`; the default
    implementation raises `NotImplementedError` as soon as iteration starts.
    """

    async def __aiter__(self) -> AsyncIterator[bytes]:
        """
        Yield the stream's byte chunks. Must be overridden by subclasses.
        """
        message = "The '__aiter__' method must be implemented."
        raise NotImplementedError(message)  # pragma: no cover
        # Never reached. The `yield` keeps this method an async generator
        # function, so the error is raised when the first item is requested
        # rather than when `__aiter__` itself is called.
        yield b""  # pragma: no cover

    async def aclose(self) -> None:
        """
        Asynchronous close hook. The default implementation does nothing.
        """
        return None