Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions packages/brick_offline_first_with_supabase/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 2.1.0

- Use `SupabaseProvider#subscribeToRealtime` to generate the channel used by `OfflineFirstWithSupabaseRepository#subscribeToRealtime`
- **Breaking Change** protected method `OfflineFirstWithSupabaseRepository#queryToPostgresChangeFilter` has been moved to `SupabaseProvider#queryToPostgresChangeFilter`. Implementations should override this method in `SupabaseProvider` instead.

## 2.0.0

- Dart minimum SDK is updated to `3.4.0`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,30 +194,6 @@ abstract class OfflineFirstWithSupabaseRepository<
);
}

/// Convert a query to a [PostgresChangeFilter] for use with [subscribeToRealtime].
@protected
@visibleForTesting
@visibleForOverriding
PostgresChangeFilter? queryToPostgresChangeFilter<TModel extends TRepositoryModel>(
Query query,
) {
final adapter = remoteProvider.modelDictionary.adapterFor[TModel]!;
if (query.where?.isEmpty ?? true) return null;
final condition = query.where!.first;
final column = adapter.fieldsToSupabaseColumns[condition.evaluatedField]?.columnName;

if (column == null) return null;

final type = _compareToFilterParam(condition.compare);
if (type == null) return null;

return PostgresChangeFilter(
type: type,
column: column,
value: condition.value,
);
}

@override
Future<void> reset() async {
await super.reset();
Expand Down Expand Up @@ -254,8 +230,7 @@ abstract class OfflineFirstWithSupabaseRepository<
///
/// [query] is an optional query to filter the data. The query **must be** one level -
/// `Query.where('user', Query.exact('name', 'Tom'))` is invalid but `Query.where('name', 'Tom')`
/// is valid. The [Compare] operator is limited to a [PostgresChangeFilterType] equivalent.
/// See [_compareToFilterParam] for a precise breakdown.
/// is valid. The [Compare] operator is limited to a [PostgresChangeFilterType] equivalent. See [SupabaseProvider.queryToPostgresChangeFilter] for more details.
Stream<List<TModel>> subscribeToRealtime<TModel extends TRepositoryModel>({
PostgresChangeEvent eventType = PostgresChangeEvent.all,
OfflineFirstGetPolicy policy = OfflineFirstGetPolicy.alwaysHydrate,
Expand All @@ -274,80 +249,76 @@ abstract class OfflineFirstWithSupabaseRepository<
return subscribe<TModel>(policy: policy, query: query);
}

final channel = remoteProvider.client
.channel(adapter.supabaseTableName)
.onPostgresChanges(
event: eventType,
schema: schema,
table: adapter.supabaseTableName,
filter: queryToPostgresChangeFilter<TModel>(query),
callback: (payload) async {
switch (payload.eventType) {
// This code path is likely never hit; `PostgresChangeEvent.all` is used
// to listen to changes but as far as can be determined is not delivered within
// the payload of the callback.
//
// It's handled just in case this behavior changes.
case PostgresChangeEvent.all:
final localResults = await sqliteProvider.get<TModel>(repository: this);
final remoteResults =
await get<TModel>(query: query, policy: OfflineFirstGetPolicy.awaitRemote);
final toDelete = localResults.where((r) => !remoteResults.contains(r));

for (final deletableModel in toDelete) {
await sqliteProvider.delete<TModel>(deletableModel, repository: this);
memoryCacheProvider.delete<TModel>(deletableModel, repository: this);
}

case PostgresChangeEvent.delete:
final query = queryFromSupabaseDeletePayload(
payload.oldRecord,
supabaseDefinitions: adapter.fieldsToSupabaseColumns,
);

if (query.where?.isEmpty ?? true) return;

final results = await get<TModel>(
query: query,
policy: OfflineFirstGetPolicy.localOnly,
seedOnly: true,
);
if (results.isEmpty) return;

await sqliteProvider.delete<TModel>(results.first, repository: this);
memoryCacheProvider.delete<TModel>(results.first, repository: this);

case PostgresChangeEvent.insert || PostgresChangeEvent.update:
// The supabase payload is not configurable and will not supply associations.
// For models that have associations, an additional network call must be
// made to retrieve all scoped data.
final modelHasAssociations = adapter.fieldsToSupabaseColumns.entries
.any((entry) => entry.value.association && !entry.value.associationIsNullable);

if (modelHasAssociations) {
await get<TModel>(
query: query,
policy: OfflineFirstGetPolicy.alwaysHydrate,
seedOnly: true,
);

return;
}

final instance = await adapter.fromSupabase(
payload.newRecord,
provider: remoteProvider,
repository: this,
);

await sqliteProvider.upsert<TModel>(instance as TModel, repository: this);
memoryCacheProvider.upsert<TModel>(instance, repository: this);
final channel = remoteProvider.subscribeToRealtime<TModel>(
eventType: eventType,
query: query,
schema: schema,
callback: (payload) async {
switch (payload.eventType) {
// This code path is likely never hit; `PostgresChangeEvent.all` is used
// to listen to changes but as far as can be determined is not delivered within
// the payload of the callback.
//
// It's handled just in case this behavior changes.
case PostgresChangeEvent.all:
final localResults = await sqliteProvider.get<TModel>(repository: this);
final remoteResults =
await get<TModel>(query: query, policy: OfflineFirstGetPolicy.awaitRemote);
final toDelete = localResults.where((r) => !remoteResults.contains(r));

for (final deletableModel in toDelete) {
await sqliteProvider.delete<TModel>(deletableModel, repository: this);
memoryCacheProvider.delete<TModel>(deletableModel, repository: this);
}

await notifySubscriptionsWithLocalData<TModel>();
},
)
.subscribe();
case PostgresChangeEvent.delete:
final query = queryFromSupabaseDeletePayload(
payload.oldRecord,
supabaseDefinitions: adapter.fieldsToSupabaseColumns,
);

if (query.where?.isEmpty ?? true) return;

final results = await get<TModel>(
query: query,
policy: OfflineFirstGetPolicy.localOnly,
seedOnly: true,
);
if (results.isEmpty) return;

await sqliteProvider.delete<TModel>(results.first, repository: this);
memoryCacheProvider.delete<TModel>(results.first, repository: this);

case PostgresChangeEvent.insert || PostgresChangeEvent.update:
// The supabase payload is not configurable and will not supply associations.
// For models that have associations, an additional network call must be
// made to retrieve all scoped data.
final modelHasAssociations = adapter.fieldsToSupabaseColumns.entries
.any((entry) => entry.value.association && !entry.value.associationIsNullable);

if (modelHasAssociations) {
await get<TModel>(
query: query,
policy: OfflineFirstGetPolicy.alwaysHydrate,
seedOnly: true,
);

return;
}

final instance = await adapter.fromSupabase(
payload.newRecord,
provider: remoteProvider,
repository: this,
);

await sqliteProvider.upsert<TModel>(instance as TModel, repository: this);
memoryCacheProvider.upsert<TModel>(instance, repository: this);
}

await notifySubscriptionsWithLocalData<TModel>();
},
);

final controller = StreamController<List<TModel>>(
onCancel: () async {
Expand Down Expand Up @@ -396,29 +367,6 @@ abstract class OfflineFirstWithSupabaseRepository<
}
}

PostgresChangeFilterType? _compareToFilterParam(Compare compare) {
switch (compare) {
case Compare.exact:
return PostgresChangeFilterType.eq;
case Compare.contains:
return PostgresChangeFilterType.inFilter;
case Compare.greaterThan:
return PostgresChangeFilterType.gt;
case Compare.greaterThanOrEqualTo:
return PostgresChangeFilterType.gte;
case Compare.lessThan:
return PostgresChangeFilterType.lt;
case Compare.lessThanOrEqualTo:
return PostgresChangeFilterType.lte;
case Compare.notEqual:
return PostgresChangeFilterType.neq;
case Compare.between:
return null;
case Compare.doesNotContain:
return null;
}
}

/// This is a convenience method to create the basic offline client and queue.
/// The client is used to add offline capabilities to [SupabaseProvider];
/// the queue is used to add offline to the repository.
Expand Down
4 changes: 2 additions & 2 deletions packages/brick_offline_first_with_supabase/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ homepage: https://github.com/GetDutchie/brick/tree/main/packages/brick_offline_f
issue_tracker: https://github.com/GetDutchie/brick/issues
repository: https://github.com/GetDutchie/brick

version: 2.0.0
version: 2.1.0

environment:
sdk: ">=3.4.0 <4.0.0"
Expand All @@ -15,7 +15,7 @@ dependencies:
brick_offline_first: ">=4.0.0 <5.0.0"
brick_offline_first_with_rest: ">=4.0.0 <5.0.0"
brick_sqlite: ">=4.0.0 <5.0.0"
brick_supabase: ">=2.0.0 <3.0.0"
brick_supabase: ">=2.1.0 <3.0.0"
http: ">=1.0.0 <2.0.0"
logging: ">=1.0.0 <2.0.0"
meta: ">=1.3.0 <2.0.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,106 +211,6 @@ void main() {
});
});

group('#queryToPostgresChangeFilter', () {
group('returns null', () {
test('for complex queries', () {
final query = Query.where('pizza', const Where.exact('id', 2));
expect(repository.queryToPostgresChangeFilter<Customer>(query), isNull);
});

test('for empty queries', () {
const query = Query();
expect(repository.queryToPostgresChangeFilter<Customer>(query), isNull);
});

test('for missing columns', () {
final query = Query.where('unknown', 1);
expect(repository.queryToPostgresChangeFilter<Customer>(query), isNull);
});
});

group('Compare', () {
test('.between', () {
final query = Query(where: [const Where('firstName').isBetween(1, 2)]);
final filter = repository.queryToPostgresChangeFilter<Customer>(query);
expect(filter, isNull);
});

test('.doesNotContain', () {
final query = Query(where: [const Where('firstName').doesNotContain('Thomas')]);
final filter = repository.queryToPostgresChangeFilter<Customer>(query);
expect(filter, isNull);
});

test('.exact', () {
const query = Query(where: [Where.exact('firstName', 'Thomas')]);
final filter = repository.queryToPostgresChangeFilter<Customer>(query);

expect(filter!.type, PostgresChangeFilterType.eq);
expect(filter.column, 'first_name');
expect(filter.value, 'Thomas');
});

test('.greaterThan', () {
final query = Query(where: [const Where('firstName').isGreaterThan('Thomas')]);
final filter = repository.queryToPostgresChangeFilter<Customer>(query);

expect(filter!.type, PostgresChangeFilterType.gt);
expect(filter.column, 'first_name');
expect(filter.value, 'Thomas');
});

test('.greaterThanOrEqualTo', () {
final query = Query(
where: [const Where('firstName').isGreaterThanOrEqualTo('Thomas')],
);
final filter = repository.queryToPostgresChangeFilter<Customer>(query);

expect(filter!.type, PostgresChangeFilterType.gte);
expect(filter.column, 'first_name');
expect(filter.value, 'Thomas');
});

test('.lessThan', () {
final query = Query(where: [const Where('firstName').isLessThan('Thomas')]);
final filter = repository.queryToPostgresChangeFilter<Customer>(query);

expect(filter!.type, PostgresChangeFilterType.lt);
expect(filter.column, 'first_name');
expect(filter.value, 'Thomas');
});

test('.lessThanOrEqualTo', () {
final query = Query(
where: [const Where('firstName').isLessThanOrEqualTo('Thomas')],
);
final filter = repository.queryToPostgresChangeFilter<Customer>(query);

expect(filter!.type, PostgresChangeFilterType.lte);
expect(filter.column, 'first_name');
expect(filter.value, 'Thomas');
});

test('.notEqual', () {
final query = Query(where: [const Where('firstName').isNot('Thomas')]);
final filter = repository.queryToPostgresChangeFilter<Customer>(query);

expect(filter!.type, PostgresChangeFilterType.neq);
expect(filter.column, 'first_name');
expect(filter.value, 'Thomas');
});

test('.contains', () {
final query = Query(where: [const Where('firstName').contains('Thomas')]);
final filter = repository.queryToPostgresChangeFilter<Customer>(query);

expect(filter!.type, PostgresChangeFilterType.inFilter);
expect(filter.column, 'first_name');
expect(filter.value, 'Thomas');
});
});
});

group('#supabaseRealtimeSubscriptions', () {
test('adds controller and query to #supabaseRealtimeSubscriptions', () async {
expect(repository.supabaseRealtimeSubscriptions, hasLength(0));
Expand Down
5 changes: 5 additions & 0 deletions packages/brick_supabase/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 2.1.0

- Add `SupabaseProvider#subscribeToRealtime` to subscribe to [Supabase channels](https://supabase.com/docs/guides/realtime?queryGroups=language&language=dart).
- Add `SupabaseProvider#queryToPostgresChangeFilter` to convert `Query`s for Supabase subscriptions

## 2.0.0

- **BREAKING CHANGE** `Query(providerArgs:)` is no longer supported; see [1.2.0](#1.2.0) for migration steps
Expand Down
Loading