Skip to content

Commit 3c8d751

Browse files
committed
✨(api) add xAPI statement forwarding background task
We want to support xAPI statement forwarding to make ralph able to forward statements it receives to other Learning Record Stores. For now, we only support forwarding statements using HTTP basic auth.
1 parent 500cc49 commit 3c8d751

9 files changed

Lines changed: 408 additions & 8 deletions

File tree

setup.cfg

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ install_requires =
2929
elasticsearch==8.2.3
3030
fastapi==0.78.0
3131
gunicorn==20.1.0
32+
h11==0.12.0
33+
httpx==0.23.0
3234
Jinja2==3.1.2
3335
langcodes==3.3.0
3436
ovh==1.0.0
@@ -70,6 +72,7 @@ dev =
7072
pyfakefs==4.5.6
7173
pylint==2.14.3
7274
pytest==7.1.2
75+
pytest-asyncio==0.18.3
7376
pytest-cov==3.0.0
7477
ci =
7578
twine==4.0.1
@@ -106,6 +109,7 @@ skip_glob=venv
106109
profile=black
107110

108111
[tool:pytest]
112+
asyncio_mode = strict
109113
addopts = -v --cov-report term-missing --cov-config=.coveragerc --cov=src/ralph --hypothesis-show-statistics
110114
python_files =
111115
test_*.py

src/ralph/api/forwarding.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
"""xAPI statement forwarding background task"""
2+
3+
import logging
4+
from typing import Literal
5+
6+
from httpx import AsyncClient, AsyncHTTPTransport, RequestError
7+
from pydantic import AnyUrl, BaseModel, ValidationError
8+
9+
from ..defaults import XAPI_FORWARDING
10+
11+
logger = logging.getLogger(__name__)
12+
13+
14+
class ForwardingConfiguration(BaseModel):
15+
"""Represents an xAPI forwarding configuration item."""
16+
17+
class Config: # pylint: disable=missing-class-docstring
18+
min_anystr_length = 1
19+
20+
url: AnyUrl
21+
is_active: Literal[True]
22+
basic_username: str
23+
basic_password: str
24+
max_retries: int
25+
timeout: float
26+
27+
28+
def get_valid_xapi_forwarding(config: list[dict]) -> list[ForwardingConfiguration]:
29+
"""Returns a list of valid xAPI forwarding configuration values."""
30+
31+
valid_xapi_forwarding = []
32+
for forwarding in config:
33+
try:
34+
valid_xapi_forwarding.append(ForwardingConfiguration.parse_obj(forwarding))
35+
except ValidationError as error:
36+
msg = (
37+
"Invalid RALPH_XAPI_FORWARDING.forwarder configuration value: "
38+
"'%s'. '%s'. It will be ignored."
39+
)
40+
logger.debug(msg, forwarding, error)
41+
42+
return valid_xapi_forwarding
43+
44+
45+
VALID_XAPI_FORWARDING = get_valid_xapi_forwarding(XAPI_FORWARDING)
46+
47+
48+
async def forward_xapi_statements(statements: list[dict]):
49+
"""Forwards xAPI statements."""
50+
51+
for forwarding in VALID_XAPI_FORWARDING:
52+
transport = AsyncHTTPTransport(retries=forwarding.max_retries)
53+
async with AsyncClient(transport=transport) as client:
54+
try:
55+
req = await client.post(
56+
forwarding.url,
57+
json=statements,
58+
auth=(forwarding.basic_username, forwarding.basic_password),
59+
timeout=forwarding.timeout,
60+
)
61+
req.raise_for_status()
62+
msg = "Forwarded %s statements to %s with success."
63+
logger.debug(msg, len(statements), forwarding.url)
64+
except RequestError as error:
65+
logger.debug("Failed to forward xapi statements. %s", error)

src/ralph/api/routers/statements.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,17 @@
66
from uuid import UUID, uuid4
77

88
from elasticsearch.helpers import BulkIndexError
9-
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
9+
from fastapi import (
10+
APIRouter,
11+
BackgroundTasks,
12+
Depends,
13+
HTTPException,
14+
Query,
15+
Request,
16+
status,
17+
)
1018

19+
from ralph.api.forwarding import forward_xapi_statements
1120
from ralph.backends.database.es import ESDatabase
1221

1322
from ...defaults import ES_MAX_SEARCH_HITS_COUNT
@@ -255,7 +264,10 @@ async def get(
255264
},
256265
)
257266
# pylint: disable=unused-argument
258-
async def post(statements: Union[LaxStatement, list[LaxStatement]]):
267+
async def post(
268+
statements: Union[LaxStatement, list[LaxStatement]],
269+
background_tasks: BackgroundTasks,
270+
):
259271
"""
260272
Store a set of statements (or a single statement as a single member of a set).
261273
NB: at this time, using POST to make a GET request, is not supported.
@@ -273,10 +285,11 @@ async def post(statements: Union[LaxStatement, list[LaxStatement]]):
273285
# - generate IDs for statements that are missing them;
274286
# - use the list of keys to perform validations and as a final return value;
275287
# - provide an iterable containing both the statements and generated IDs for bulk.
276-
statements_dict = {
277-
statement.setdefault("id", str(uuid4())): statement
278-
for statement in map(lambda x: x.dict(exclude_unset=True), statements)
279-
}
288+
statements_dict = {}
289+
for statement in map(lambda x: x.dict(exclude_unset=True), statements):
290+
statement_id = str(statement.get("id", uuid4()))
291+
statement["id"] = statement_id
292+
statements_dict[statement_id] = statement
280293

281294
# Requests with duplicate statement IDs are considered invalid
282295
# statements_ids were deduplicated by the dict, statements list was not
@@ -287,6 +300,8 @@ async def post(statements: Union[LaxStatement, list[LaxStatement]]):
287300
detail="Duplicate statement IDs in the list of statements",
288301
)
289302

303+
background_tasks.add_task(forward_xapi_statements, list(statements_dict.values()))
304+
290305
es_response = ES_CLIENT.query({"query": {"terms": {"_id": statements_ids}}})
291306

292307
if len(es_response["hits"]["hits"]) > 0:

src/ralph/defaults.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,19 @@ def load_config(config_file_path):
107107
},
108108
}
109109

110+
DEFAULT_XAPI_FORWARDING_CONFIG = {
111+
"forwarders": [
112+
{
113+
"is_active": False,
114+
"url": "http://localhost:8101",
115+
"basic_username": "john.doe@example.com",
116+
"basic_password": "password",
117+
"max_retries": 1,
118+
"timeout": 3,
119+
}
120+
],
121+
}
122+
110123
APP_DIR = Path(environ.get("RALPH_APP_DIR", get_app_dir("ralph")))
111124
DEFAULT_ENCODING = getattr(io, "LOCALE_ENCODING", "utf8")
112125
CONFIG_FILE = APP_DIR / "config.yml"
@@ -139,3 +152,4 @@ def load_config(config_file_path):
139152
EXECUTION_ENVIRONMENT = config("RALPH_EXECUTION_ENVIRONMENT", "development")
140153
RUNSERVER_HOST = config("RALPH_RUNSERVER_HOST", "0.0.0.0") # nosec
141154
RUNSERVER_PORT = config("RALPH_RUNSERVER_PORT", 8100)
155+
XAPI_FORWARDING = config("RALPH_XAPI_FORWARDING", DEFAULT_XAPI_FORWARDING_CONFIG)

tests/api/test_forwarding.py

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
"""Tests for the xAPI statements forwarding background task"""
2+
3+
import asyncio
4+
import logging
5+
6+
import pytest
7+
from httpx import RequestError
8+
from hypothesis import HealthCheck, settings
9+
from hypothesis import strategies as st
10+
11+
from ralph.api.forwarding import (
12+
ForwardingConfiguration,
13+
forward_xapi_statements,
14+
get_valid_xapi_forwarding,
15+
)
16+
17+
from tests.fixtures.hypothesis_strategies import custom_builds, custom_given
18+
19+
20+
@custom_given(ForwardingConfiguration)
21+
def test_api_forwarding_get_valid_xapi_forwarding_with_valid_values(config):
22+
"""Tests that the `get_valid_xapi_forwarding` function, given valid values, should
23+
return the expected list of valid configurations.
24+
"""
25+
26+
assert len(get_valid_xapi_forwarding([config])) == 1
27+
assert len(get_valid_xapi_forwarding([config, config])) == 2
28+
29+
30+
@pytest.mark.parametrize(
31+
"values,expected",
32+
[
33+
# Empty values should be ignored
34+
([{}], []),
35+
# Non active forwarding should be ignored
36+
(
37+
[
38+
{
39+
"url": "http://localhost",
40+
"is_active": False,
41+
"basic_username": "username",
42+
"basic_password": "password",
43+
"max_retries": 1,
44+
"timeout": 10,
45+
}
46+
],
47+
[],
48+
),
49+
# Incomplete forwarding should be ignored
50+
(
51+
[
52+
# Incomplete
53+
{
54+
"url": "http://localhost",
55+
"is_active": True,
56+
},
57+
# Valid
58+
{
59+
"url": "http://localhost",
60+
"is_active": True,
61+
"basic_username": "username",
62+
"basic_password": "password",
63+
"max_retries": 1,
64+
"timeout": 10,
65+
},
66+
# Incomplete
67+
{
68+
"is_active": True,
69+
"basic_username": "username",
70+
"basic_password": "password",
71+
"max_retries": 1,
72+
},
73+
],
74+
[
75+
ForwardingConfiguration(
76+
url="http://localhost",
77+
is_active=True,
78+
basic_username="username",
79+
basic_password="password",
80+
max_retries=1,
81+
timeout=10,
82+
)
83+
],
84+
),
85+
],
86+
)
87+
def test_api_forwarding_get_valid_xapi_forwarding_with_valid_and_invalid_values(
88+
values, expected, caplog
89+
):
90+
"""Tests that the `get_valid_xapi_forwarding` function, given valid and invalid
91+
values, should return a list containing only valid values.
92+
"""
93+
94+
with caplog.at_level(logging.DEBUG):
95+
assert get_valid_xapi_forwarding(values) == expected
96+
assert "Invalid RALPH_XAPI_FORWARDING.forwarder configuration value" in (
97+
caplog.record_tuples[0][2]
98+
)
99+
100+
101+
@settings(suppress_health_check=(HealthCheck.function_scoped_fixture,))
102+
@pytest.mark.parametrize("statements", [[{}, {"id": 1}]])
103+
@custom_given(custom_builds(ForwardingConfiguration, max_retries=st.just(1)))
104+
def test_api_forwarding_forward_xapi_statements_with_successful_request(
105+
monkeypatch, caplog, statements, config
106+
):
107+
"""Tests the `forward_xapi_statements` function should log the forwarded statements
108+
count if the request was successful.
109+
"""
110+
111+
class MockSuccessfulResponse:
112+
"""Dummy Successful Response."""
113+
114+
@staticmethod
115+
def raise_for_status():
116+
"""Does not raise any exceptions."""
117+
118+
async def post_success(*args, **kwargs): # pylint: disable=unused-argument
119+
"""Returns a MockSuccessfulResponse instance."""
120+
121+
return MockSuccessfulResponse()
122+
123+
monkeypatch.setattr("ralph.api.forwarding.AsyncClient.post", post_success)
124+
monkeypatch.setattr("ralph.api.forwarding.VALID_XAPI_FORWARDING", [config])
125+
126+
with caplog.at_level(logging.DEBUG):
127+
asyncio.get_event_loop().run_until_complete(forward_xapi_statements(statements))
128+
129+
assert [
130+
f"Forwarded {len(statements)} statements to {config.url} with success."
131+
] == [message for source, _, message in caplog.record_tuples if source != "asyncio"]
132+
133+
134+
@settings(suppress_health_check=(HealthCheck.function_scoped_fixture,))
135+
@pytest.mark.parametrize("statements", [[{}, {"id": 1}]])
136+
@custom_given(custom_builds(ForwardingConfiguration, max_retries=st.just(3)))
137+
def test_api_forwarding_forward_xapi_statements_with_unsuccessful_request(
138+
monkeypatch, caplog, statements, config
139+
):
140+
"""Tests the `forward_xapi_statements` function should log the the error if the
141+
request was successful.
142+
"""
143+
144+
class MockUnsuccessfulResponse:
145+
"""Dummy Failing Response."""
146+
147+
@staticmethod
148+
def raise_for_status():
149+
"""Dummy raise_for_status method that is always raising an exception."""
150+
151+
raise RequestError("Something went wrong.")
152+
153+
async def post_fail(*args, **kwargs): # pylint: disable=unused-argument
154+
"""Returns a MockUnsuccessfulResponse instance."""
155+
156+
return MockUnsuccessfulResponse()
157+
158+
monkeypatch.setattr("ralph.api.forwarding.AsyncClient.post", post_fail)
159+
monkeypatch.setattr("ralph.api.forwarding.VALID_XAPI_FORWARDING", [config])
160+
161+
with caplog.at_level(logging.DEBUG):
162+
asyncio.get_event_loop().run_until_complete(forward_xapi_statements(statements))
163+
164+
assert ["Failed to forward xapi statements. Something went wrong."] == [
165+
message for source, _, message in caplog.record_tuples if source != "asyncio"
166+
]

tests/api/test_statements_get.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
import pytest
1111
from elasticsearch import Elasticsearch
12-
from elasticsearch.client import IndicesClient
1312
from elasticsearch.helpers import bulk
1413
from fastapi.testclient import TestClient
1514

@@ -20,7 +19,7 @@
2019
)
2120
ES_TEST_INDEX = "statements"
2221
ES_CLIENT = Elasticsearch(ES_TEST_HOSTS)
23-
ES_INDICES_CLIENT = IndicesClient(ES_CLIENT)
22+
ES_INDICES_CLIENT = ES_CLIENT.indices
2423

2524
client = TestClient(app)
2625

0 commit comments

Comments
 (0)