forked from xenserver/python-libs
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_httpaccessor.py
More file actions
127 lines (100 loc) · 5.08 KB
/
test_httpaccessor.py
File metadata and controls
127 lines (100 loc) · 5.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
"""Test xcp.accessor.HTTPAccessor using a local pure-Python http(s)server fixture"""
# -*- coding: utf-8 -*-
import base64
import sys
import unittest
from contextlib import contextmanager
import pytest
from xcp.accessor import HTTPAccessor, createAccessor
# Skip the tests in this module if Python <= 3.6 (pytest_httpserver requires Python >= 3.7):
try:
    from http.client import HTTPResponse

    from pytest_httpserver import HTTPServer
    from werkzeug.wrappers import Response
except ImportError:
    # Skip the whole module instead of failing at collection time, and pass a
    # reason so that test reports show why none of these tests were run.
    pytest.skip(
        "HTTPAccessor tests need http.client, pytest_httpserver and werkzeug",
        allow_module_level=True
    )
# Text containing multi-byte UTF-8 characters: test_httpaccessor_open_text() lets the
# test HTTP server respond with it and compares it with what accessor.openText() returns.
UTF8TEXT_LITERAL = "✋Hello accessor from the 🗺, download and verify me! ✅"
class HTTPAccessorTestCase(unittest.TestCase):
    """Exercise xcp.accessor.HTTPAccessor against a local pytest_httpserver.HTTPServer.

    A single server instance is shared by all tests of this class: it is started
    in setUpClass() and stopped in tearDownClass(). The handlers and the recorded
    state of the server are reset after every test, see tearDown().
    """

    # Prefix prepended to the requested file names to find the local files that are served
    document_root = "tests/"
    httpserver = HTTPServer()  # pyright: ignore[reportUnboundVariable]

    @classmethod
    def setUpClass(cls):
        """Start the shared HTTP server before the first test of the class runs."""
        cls.httpserver.start()

    @classmethod
    def tearDownClass(cls):
        """Report pending server-side assertion errors, then stop the shared HTTP server."""
        try:
            cls.httpserver.check_assertions()
        finally:
            # Stop the server even when check_assertions() raises: Otherwise,
            # a failed check would leave the started server behind.
            cls.httpserver.stop()

    def tearDown(self):
        """Check the server-side assertions of the finished test and reset the server.

        Handlers registered using expect_request() stay registered on the shared
        server. Without a reset, the handler that test_access_repo_treeinfo()
        registers for data/repo/.treeinfo is still present when test_basic_auth()
        registers its own handler for the same path and can answer the request
        instead of it, so the authentication handler would not be exercised.
        """
        try:
            self.httpserver.check_assertions()
        finally:
            self.httpserver.clear()

    def test_404(self):
        """Assert that access() returns False and lastError is 404 for a 404 response"""
        self.httpserver.expect_request("/404").respond_with_data("", status=404)
        httpaccessor = createAccessor(self.httpserver.url_for("/"), True)
        self.assertFalse(httpaccessor.access("404"))
        self.httpserver.check_assertions()
        self.assertEqual(httpaccessor.lastError, 404)

    @classmethod
    def serve_a_get_request(cls, testdata_repo_subdir, read_file, error_handler):
        """Expect a GET request and handle it using the local pytest_httpserver.HTTPServer

        :param testdata_repo_subdir: Prefix prepended to read_file to open the local file
        :param read_file: File name, the server expects the request for "/" + read_file
        :param error_handler: None or a callable that is called with the request:
                              If it returns a response, that response is sent
                              instead of the content of the file.
        """

        def handle_get(request):
            """Handle a GET request for the local pytest_httpserver.HTTPServer fixture"""
            if error_handler:
                response = error_handler(request)
                if response:
                    return response
            with open(testdata_repo_subdir + read_file, "rb") as local_testdata_file:
                return Response(local_testdata_file.read())

        cls.httpserver.expect_request("/" + read_file).respond_with_handler(handle_get)

    @contextmanager
    def http_get_request_data(self, url, read_file, error_handler):
        """Serve read_file by the HTTP server and yield an accessor and the reference file

        :param url: Base URL that is passed to createAccessor()
        :param read_file: File name below document_root to serve and to open as reference
        :param error_handler: Passed to serve_a_get_request(), may be None
        :yields: Tuple of the HTTPAccessor created for url and the local file,
                 opened in binary mode. The file is closed when the context exits.
        """
        self.serve_a_get_request(self.document_root, read_file, error_handler)

        httpaccessor = createAccessor(url, True)
        # An exact type check is intended here, not an isinstance() check:
        self.assertIs(type(httpaccessor), HTTPAccessor)

        with open(self.document_root + read_file, "rb") as ref:
            yield httpaccessor, ref

    def assert_http_get_request_data(self, url, read_file, error_handler):
        """Assert that the accessor created for url returns the content of read_file

        :param url: Base URL that is passed to createAccessor()
        :param read_file: File name below document_root to get using the accessor
        :param error_handler: Passed to serve_a_get_request(), may be None
        :returns: The HTTPAccessor that was used to get the file
        """
        with self.http_get_request_data(url, read_file, error_handler) as (httpaccessor, ref):
            http_accessor_filehandle = httpaccessor.openAddress(read_file)
            if sys.version_info >= (3, 0):
                assert isinstance(http_accessor_filehandle, HTTPResponse)

            try:
                self.assertEqual(http_accessor_filehandle.read(), ref.read())
                self.httpserver.check_assertions()
            finally:
                # Close the response also when one of the checks above fails:
                http_accessor_filehandle.close()
        return httpaccessor

    @staticmethod
    def httpserver_basic_auth_handler(login):
        """Return a handler function that requires HTTP Basic authentication for login

        :param login: The expected credentials in the form "user:password"
        :returns: Function that is called with a request: It returns a 401 response
                  with a WWW-Authenticate header unless the Authorization header
                  of the request matches login, and None if it matches.
        """
        # The expected header only depends on login: Create it once, not for each request
        expected_authorization = "Basic " + base64.b64encode(login.encode()).decode()

        def basic_auth_handler_func(request):
            """Return a 401 response for requests without the expected Authorization"""
            authorization = request.headers.get("Authorization", None)
            # When no valid Authorization header is received, tell the client the realm for it:
            if authorization != expected_authorization:
                basic_realm = {"WWW-Authenticate": 'Basic realm="BasicAuthTestRealm"'}
                return Response("not authorized", status=401, headers=basic_realm)
            return None

        return basic_auth_handler_func

    def test_access_repo_treeinfo(self):
        """Assert that the accessor has access to the .treeinfo file of the repo"""
        access = self.assert_http_get_request_data(
            self.httpserver.url_for(""), "data/repo/.treeinfo", None
        )
        self.assertTrue(access.access("data/repo/.treeinfo"))

    def test_basic_auth(self):
        """Assert that a file is returned when the login is part of the accessor URL"""
        login = "user:passwd"
        # Insert the login into the URL for the test server: http://user:passwd@localhost/path
        # Split only at the first "//": It separates the scheme from the address.
        scheme, address = self.httpserver.url_for("").split("//", 1)
        url_with_login = scheme + "//" + login + "@" + address
        # Test that a GET Request with the auth handler returns the served file:
        basic_auth = self.httpserver_basic_auth_handler(login)
        self.assert_http_get_request_data(url_with_login, "data/repo/.treeinfo", basic_auth)

    def test_get_binary(self):
        """Assert that a binary file is returned unchanged by the accessor

        The served file is the __init__ .pyc file below document_root, named
        using the major and minor version of the running interpreter.
        NOTE(review): This needs that .pyc file to exist (CPython naming and
        enabled writing of bytecode files) - confirm that this holds for CI.
        """
        binary = (
            "__pycache__/__init__.cpython-"
            + str(sys.version_info.major)
            + str(sys.version_info.minor)
            + ".pyc"
        )
        self.assert_http_get_request_data(self.httpserver.url_for(""), binary, None)

    def test_httpaccessor_open_text(self):
        """Get text containing UTF-8 and compare the returned decoded string contents"""
        self.httpserver.expect_request("/textfile").respond_with_data(UTF8TEXT_LITERAL)
        accessor = createAccessor(self.httpserver.url_for("/"), True)
        with accessor.openText("textfile") as textfile:
            self.assertEqual(textfile.read(), UTF8TEXT_LITERAL)