Skip to content

Commit 035c2dd

Browse files
committed
Implement async JSONL streaming support and add corresponding tests
1 parent 4ba5084 commit 035c2dd

File tree

3 files changed

+146
-9
lines changed

3 files changed

+146
-9
lines changed

.github/product.md

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -206,21 +206,30 @@ Remaining parameterized builtins:
206206
- Destructuring alternative operator `?//`
207207
- SQL-style operators: `INDEX`, `IN`, `JOIN`
208208

209-
### - [ ] Phase 16 — I/O
209+
### - [x] Phase 16 - Streaming
210210

211-
- `input`, `inputs`
212-
- `debug`, `debug(msgs)`, `stderr`
213-
- `input_filename`, `input_line_number`
211+
Implement support for JSONL/NDJSON async streaming
214212

215-
### - [ ] Phase 17 — Streaming
216-
217-
- `tostream`, `fromstream(stream_expression)`, `truncate_stream(stream_expression)`
218-
219-
### - [ ] Phase 18 — Modules
213+
### - [ ] Phase 16 — Modules
220214

221215
- `import RelativePathString as NAME [<metadata>];`
222216
- `include RelativePathString [<metadata>];`
223217
- `import RelativePathString as $NAME [<metadata>];`
224218
- `module <metadata>;`
225219
- `modulemeta`
226220
- Module search paths
221+
222+
## Out of Scope
223+
224+
Features from the jq manual we will not implement.
225+
226+
### I/O
227+
228+
- `input`, `inputs`
229+
- `debug`, `debug(msgs)`, `stderr`
230+
- `input_filename`, `input_line_number`
231+
232+
### Streaming
233+
234+
- `tostream`, `fromstream(stream_expression)`, `truncate_stream(stream_expression)`
235+

src/JQSharp/Jq.cs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System.Runtime.CompilerServices;
12
using System.Text.Json;
23

34
namespace Devlooped;
@@ -35,4 +36,45 @@ public static JqExpression Parse(string expression)
3536
/// <exception cref="JqException">Thrown when the expression is empty, invalid, or causes an error during evaluation.</exception>
3637
public static IEnumerable<JsonElement> Evaluate(string expression, JsonElement input)
3738
=> Parse(expression).Evaluate(input);
39+
40+
/// <summary>
41+
/// Evaluates a jq filter expression against each element in an asynchronous sequence and yields the matching results.
42+
/// </summary>
43+
/// <param name="expression">The jq filter expression to evaluate.</param>
44+
/// <param name="input">The asynchronous sequence of <see cref="JsonElement"/> values to evaluate the filter against.</param>
45+
/// <param name="cancellationToken">A token to cancel the asynchronous iteration.</param>
46+
/// <returns>
47+
/// An asynchronous sequence of <see cref="JsonElement"/> values produced by applying the filter
48+
/// to each element in <paramref name="input"/>.
49+
/// </returns>
50+
/// <exception cref="JqException">Thrown when the expression is empty, invalid, or causes an error during evaluation.</exception>
51+
public static async IAsyncEnumerable<JsonElement> EvaluateAsync(string expression, IAsyncEnumerable<JsonElement> input, [EnumeratorCancellation] CancellationToken cancellationToken = default)
52+
{
53+
var parsed = Parse(expression);
54+
await foreach (var element in EvaluateAsync(parsed, input, cancellationToken))
55+
yield return element;
56+
}
57+
58+
/// <summary>
59+
/// Evaluates a parsed <see cref="JqExpression"/> against each element in an asynchronous sequence and yields the matching results.
60+
/// </summary>
61+
/// <param name="expression">The pre-parsed <see cref="JqExpression"/> to evaluate.</param>
62+
/// <param name="input">The asynchronous sequence of <see cref="JsonElement"/> values to evaluate the expression against.</param>
63+
/// <param name="cancellationToken">A token to cancel the asynchronous iteration.</param>
64+
/// <returns>
65+
/// An asynchronous sequence of <see cref="JsonElement"/> values produced by applying the expression
66+
/// to each element in <paramref name="input"/>.
67+
/// </returns>
68+
/// <remarks>
69+
/// Prefer this overload when evaluating the same expression against multiple streams, as it avoids
70+
/// re-parsing the filter on every call.
71+
/// </remarks>
72+
public static async IAsyncEnumerable<JsonElement> EvaluateAsync(JqExpression expression, IAsyncEnumerable<JsonElement> input, [EnumeratorCancellation] CancellationToken cancellationToken = default)
73+
{
74+
await foreach (var element in input.WithCancellation(cancellationToken))
75+
{
76+
foreach (var result in expression.Evaluate(element))
77+
yield return result;
78+
}
79+
}
3880
}

src/Tests/JqJsonlStreamingTests.cs

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
using System.Runtime.CompilerServices;
2+
using System.Text.Json;
3+
using Devlooped;
4+
5+
namespace Devlooped.Tests;
6+
7+
public class JqJsonlStreamingTests
8+
{
9+
static async IAsyncEnumerable<JsonElement> ParseJsonLines(
10+
string jsonl,
11+
[EnumeratorCancellation] CancellationToken cancellation = default)
12+
{
13+
using var reader = new StringReader(jsonl);
14+
while (await reader.ReadLineAsync(cancellation).ConfigureAwait(false) is { } line)
15+
{
16+
line = line.Trim();
17+
if (line.Length == 0)
18+
continue;
19+
20+
yield return JsonDocument.Parse(line).RootElement;
21+
}
22+
}
23+
24+
[Fact]
25+
public async Task Jsonl_stream_extracts_field_from_each_element()
26+
{
27+
var jsonl = """
28+
{"name":"Alice","age":30}
29+
{"name":"Bob","age":25}
30+
{"name":"Charlie","age":35}
31+
""";
32+
33+
var results = new List<string>();
34+
await foreach (var element in Jq.EvaluateAsync(".name", ParseJsonLines(jsonl)))
35+
results.Add(element.GetString()!);
36+
37+
Assert.Equal(["Alice", "Bob", "Charlie"], results);
38+
}
39+
40+
[Fact]
41+
public async Task Jsonl_stream_transforms_each_element()
42+
{
43+
var jsonl = """
44+
{"x":1}
45+
{"x":2}
46+
{"x":3}
47+
""";
48+
49+
var results = new List<int>();
50+
await foreach (var element in Jq.EvaluateAsync(".x * 2", ParseJsonLines(jsonl)))
51+
results.Add(element.GetInt32());
52+
53+
Assert.Equal([2, 4, 6], results);
54+
}
55+
56+
[Fact]
57+
public async Task Jsonl_stream_expands_array_elements_across_inputs()
58+
{
59+
var jsonl = """
60+
{"items":[1,2]}
61+
{"items":[3,4]}
62+
""";
63+
64+
var results = new List<int>();
65+
await foreach (var element in Jq.EvaluateAsync(".items[]", ParseJsonLines(jsonl)))
66+
results.Add(element.GetInt32());
67+
68+
Assert.Equal([1, 2, 3, 4], results);
69+
}
70+
71+
[Fact]
72+
public async Task Jsonl_stream_filters_elements_with_select()
73+
{
74+
var jsonl = """
75+
{"name":"Alice","active":true}
76+
{"name":"Bob","active":false}
77+
{"name":"Charlie","active":true}
78+
""";
79+
80+
var results = new List<string>();
81+
await foreach (var element in Jq.EvaluateAsync("select(.active) | .name", ParseJsonLines(jsonl)))
82+
results.Add(element.GetString()!);
83+
84+
Assert.Equal(["Alice", "Charlie"], results);
85+
}
86+
}

0 commit comments

Comments
 (0)