Skip to content

feat(kinesis): Kinesis Data Streams Library#6516

Draft
ekjotmultani wants to merge 9 commits intomainfrom
feat/kinesis-data-streams-library
Draft

feat(kinesis): Kinesis Data Streams Library#6516
ekjotmultani wants to merge 9 commits intomainfrom
feat/kinesis-data-streams-library

Conversation

@ekjotmultani
Copy link
Member

Issue #, if available:

Description of changes:

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

- Add Auth: fetch session, explicit sign-out, sign-in with credentials
- Add Storage: list files, delete files (cleanup)
- Add API: query Todo by ID, delete Todo (cleanup)
- Add DataStore: query with predicate, delete (cleanup)
- Remove redundant Analytics guest tests
- Add unique test data naming with UUIDs
@ekjotmultani ekjotmultani requested a review from a team as a code owner January 22, 2026 17:21
@ekjotmultani ekjotmultani marked this pull request as draft January 26, 2026 19:13
@ekjotmultani ekjotmultani changed the title Feat/kinesis data streams library feat(kinesis): Kinesis Data Streams Library Feb 3, 2026
int maxBytes = 5 * 1024 * 1024,
}) async {
// Query records sorted by stream_name, partition_key, id
final query = _db.select(_db.kinesisRecords)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we optimize this query to avoid consuming high amounts of RAM by loading data we don't process? See also the Android implementation.

retryableRecordIndices: List.generate(records.length, (i) => i),
);
} on SmithyHttpException catch (e) {
// Check if it's a retryable HTTP error (5xx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about 429 errors?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, addressing

/// {@endtemplate}
class NetworkException extends KinesisDataStreamsException {
/// {@macro aws_kinesis_datastreams.network_exception}
const NetworkException(super.message, {super.underlyingException})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How are we using this exception actually?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now it serves as being here while vNext common error classes are coming, should be thrown when network errors arise in send operations via the kinesis client, will address in commit

final Future<void> Function() _onFlush;
Timer? _timer;
bool _enabled = true;
bool _closed = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the benefit of having both _enabled and _closed?

Copy link
Member Author

@ekjotmultani ekjotmultani Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_closed can serve as a flag for other code to know whether or not the client itself is shut down or if just flushing is enabled or not. In our implementation they do similar things but it is useful for getters to know the internal state of the client I feel. What do you think?

final entry = resultEntries[i];

if (entry.errorCode == null) {
// Success - record has sequenceNumber and shardId
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

record has sequenceNumber and shardId

Is this comment relevant? How are we using shardId?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we arent using them, the comment just documents that we must have been returned these from the sdk call

await client.flush();

// Assert - if no exception thrown, the record was sent successfully
// The flush would throw if the stream doesn't exist or credentials are invalid
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have a more comprehensive assert here?

}
await client.flush();

// Assert - all records sent without error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here

║ 2. Fill in your AWS credentials and stream name ║
║ 3. Set isConfigured = true ║
║ 4. Run: dart test test/e2e/ --tags=e2e ║
╚══════════════════════════════════════════════════════════════════╝
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we actually implement the e2e test before merging?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes before merge these tests will be live

client.enable();
await client.flush();

// Assert - no exception means success (nothing was queued)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here

});
});
});
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we ensure concurrent access to the record storage works via a test?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants