-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathclient.py
More file actions
245 lines (218 loc) · 9.82 KB
/
client.py
File metadata and controls
245 lines (218 loc) · 9.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
__all__ = ["ArangoClient"]
import asyncio
from typing import Any, Optional, Sequence
from arangoasync.auth import Auth, JwtToken
from arangoasync.compression import CompressionManager
from arangoasync.connection import (
BasicConnection,
Connection,
JwtConnection,
JwtSuperuserConnection,
)
from arangoasync.database import StandardDatabase
from arangoasync.http import DefaultHTTPClient, HTTPClient
from arangoasync.resolver import HostResolver, get_resolver
from arangoasync.serialization import (
DefaultDeserializer,
DefaultSerializer,
Deserializer,
Serializer,
)
from arangoasync.typings import Json, Jsons
from arangoasync.version import __version__
class ArangoClient:
    """ArangoDB client.

    Creates one HTTP session per host at construction time and hands those
    sessions to the connections built by :meth:`db`. The client can be used as
    an async context manager, in which case the sessions are closed on exit.

    Args:
        hosts (str | Sequence[str]): Host URL or list of URLs.
            In case of a cluster, this would be the list of coordinators.
            Which coordinator to use is determined by the `host_resolver`.
        host_resolver (str | HostResolver): Host resolver strategy.
            This determines how the client will choose which server to use.
            Passing a string would configure a resolver with the default settings.
            See :class:`DefaultHostResolver <arangoasync.resolver.DefaultHostResolver>`
            and :func:`get_resolver <arangoasync.resolver.get_resolver>`
            for more information.
            If you need more customization, pass a subclass of
            :class:`HostResolver <arangoasync.resolver.HostResolver>`.
        http_client (HTTPClient | None): HTTP client implementation.
            This is the core component that sends requests to the ArangoDB server.
            Defaults to :class:`DefaultHTTPClient <arangoasync.http.DefaultHTTPClient>`,
            but you can fully customize its parameters or even use a different
            implementation by subclassing
            :class:`HTTPClient <arangoasync.http.HTTPClient>`.
        compression (CompressionManager | None): Disabled by default.
            Used to compress requests to the server or instruct the server to compress
            responses. Enable it by passing an instance of
            :class:`DefaultCompressionManager
            <arangoasync.compression.DefaultCompressionManager>`
            or a custom subclass of :class:`CompressionManager
            <arangoasync.compression.CompressionManager>`.
        serializer (Serializer | None): Custom JSON serializer implementation.
            Leave as `None` to use the default serializer.
            See :class:`DefaultSerializer
            <arangoasync.serialization.DefaultSerializer>`.
            For custom serialization of collection documents, see :class:`Collection
            <arangoasync.collection.Collection>`.
        deserializer (Deserializer | None): Custom JSON deserializer implementation.
            Leave as `None` to use the default deserializer.
            See :class:`DefaultDeserializer
            <arangoasync.serialization.DefaultDeserializer>`.
            For custom deserialization of collection documents, see :class:`Collection
            <arangoasync.collection.Collection>`.

    Raises:
        ValueError: If the `host_resolver` is not supported.
    """

    def __init__(
        self,
        hosts: str | Sequence[str] = "http://127.0.0.1:8529",
        host_resolver: str | HostResolver = "default",
        http_client: Optional[HTTPClient] = None,
        compression: Optional[CompressionManager] = None,
        serializer: Optional[Serializer[Json]] = None,
        deserializer: Optional[Deserializer[Json, Jsons]] = None,
    ) -> None:
        # A single URL is wrapped in a list so the rest of the client can
        # always iterate over hosts.
        self._hosts = [hosts] if isinstance(hosts, str) else hosts
        # A string names a resolver strategy and is turned into a resolver
        # sized to the number of hosts; anything else is used as given.
        self._host_resolver = (
            get_resolver(host_resolver, len(self._hosts))
            if isinstance(host_resolver, str)
            else host_resolver
        )
        self._http_client = http_client or DefaultHTTPClient()
        # One session per host, in the same order as `self._hosts`.
        self._sessions = [
            self._http_client.create_session(host) for host in self._hosts
        ]
        self._compression = compression
        self._serializer: Serializer[Json] = serializer or DefaultSerializer()
        self._deserializer: Deserializer[Json, Jsons] = (
            deserializer or DefaultDeserializer()
        )

    def __repr__(self) -> str:
        return f"<ArangoClient {','.join(self._hosts)}>"

    async def __aenter__(self) -> "ArangoClient":
        return self

    async def __aexit__(self, *exc: Any) -> None:
        # Sessions are closed whether or not the `async with` body raised.
        await self.close()

    @property
    def hosts(self) -> Sequence[str]:
        """Return the list of hosts."""
        return self._hosts

    @property
    def host_resolver(self) -> HostResolver:
        """Return the host resolver."""
        return self._host_resolver

    @property
    def compression(self) -> Optional[CompressionManager]:
        """Return the compression manager."""
        return self._compression

    @property
    def sessions(self) -> Sequence[Any]:
        """Return the list of sessions.

        You may use this to customize sessions on the fly (for example,
        adjust the timeout). Not recommended unless you know what you are doing.

        Warning:
            Modifying only a subset of sessions may lead to unexpected behavior.
            In order to keep the client in a consistent state, you should make sure
            all sessions are configured in the same way.
        """
        return self._sessions

    @property
    def version(self) -> str:
        """Return the version of the client."""
        return __version__

    async def close(self) -> None:
        """Close HTTP sessions.

        All sessions are closed concurrently. A failure while closing one
        session does not stop the client from waiting for the remaining
        sessions to finish closing.

        Raises:
            BaseException: The first error (in session order) raised by the
                HTTP client while closing a session, re-raised once every
                close attempt has completed.
        """
        # With the default `return_exceptions=False`, gather() propagates the
        # first failure immediately and returns control while the other close
        # calls are still in flight. Collecting the outcomes instead ensures
        # every session has been given the chance to close before reporting.
        outcomes = await asyncio.gather(
            *(self._http_client.close_session(session) for session in self._sessions),
            return_exceptions=True,
        )
        for outcome in outcomes:
            if isinstance(outcome, BaseException):
                raise outcome

    async def db(
        self,
        name: str,
        auth_method: str = "basic",
        auth: Optional[Auth | str] = None,
        token: Optional[JwtToken] = None,
        verify: bool = False,
        compression: Optional[CompressionManager] = None,
        serializer: Optional[Serializer[Json]] = None,
        deserializer: Optional[Deserializer[Json, Jsons]] = None,
    ) -> StandardDatabase:
        """Connects to a database and returns an API wrapper.

        Args:
            name (str): Database name.
            auth_method (str): The following methods are supported:

                - "basic": HTTP authentication.
                    Requires the `auth` parameter. The `token` parameter is ignored.
                - "jwt": User JWT authentication.
                    At least one of the `auth` or `token` parameters is required.
                    If `auth` is provided, but the `token` is not, the token will be
                    refreshed automatically. This assumes that the clocks of the server
                    and client are synchronized.
                - "superuser": Superuser JWT authentication.
                    The `token` parameter is required. The `auth` parameter is ignored.
            auth (Auth | str | None): Login information (username and password) or
                access token. A plain string is converted to
                `Auth(password=auth)`.
            token (JwtToken | None): JWT token.
            verify (bool): Verify the connection by sending a test request.
            compression (CompressionManager | None): If set, supersedes the
                client-level compression settings.
            serializer (Serializer | None): If set, supersedes the client-level
                serializer.
            deserializer (Deserializer | None): If set, supersedes the client-level
                deserializer.

        Returns:
            StandardDatabase: Database API wrapper.

        Raises:
            ValueError: If the authentication is invalid.
            ServerConnectionError: If `verify` is `True` and the connection fails.
        """
        if isinstance(auth, str):
            auth = Auth(password=auth)

        # Arguments common to every connection type. Per-call overrides win
        # over the client-level defaults.
        shared: dict[str, Any] = {
            "sessions": self._sessions,
            "host_resolver": self._host_resolver,
            "http_client": self._http_client,
            "db_name": name,
            "compression": compression or self._compression,
            "serializer": serializer or self._serializer,
            "deserializer": deserializer or self._deserializer,
        }

        connection: Connection
        if auth_method == "basic":
            if auth is None:
                raise ValueError("Basic authentication requires the `auth` parameter")
            connection = BasicConnection(**shared, auth=auth)
        elif auth_method == "jwt":
            if auth is None and token is None:
                raise ValueError(
                    "JWT authentication requires the `auth` or `token` parameter"
                )
            connection = JwtConnection(**shared, auth=auth, token=token)
        elif auth_method == "superuser":
            if token is None:
                raise ValueError(
                    "Superuser JWT authentication requires the `token` parameter"
                )
            connection = JwtSuperuserConnection(**shared, token=token)
        else:
            raise ValueError(f"Invalid authentication method: {auth_method}")

        if verify:
            await connection.ping()

        return StandardDatabase(connection)