diff --git a/packages/brick_offline_first_with_supabase/CHANGELOG.md b/packages/brick_offline_first_with_supabase/CHANGELOG.md index 6a7da4ab..f5f62ec3 100644 --- a/packages/brick_offline_first_with_supabase/CHANGELOG.md +++ b/packages/brick_offline_first_with_supabase/CHANGELOG.md @@ -1,3 +1,8 @@ +## 2.1.0 + +- Use `SupabaseProvider#subscribeToRealtime` to generate the channel used by `OfflineFirstWithSupabaseRepository#subscribeToRealtime` +- **Breaking Change** protected method `OfflineFirstWithSupabaseRepository#queryToPostgresChangeFilter` has been moved to `SupabaseProvider#queryToPostgresChangeFilter`. Implementations should override this method in `SupabaseProvider` instead. + ## 2.0.0 - Dart minimum SDK is updated to `3.4.0` diff --git a/packages/brick_offline_first_with_supabase/lib/src/offline_first_with_supabase_repository.dart b/packages/brick_offline_first_with_supabase/lib/src/offline_first_with_supabase_repository.dart index 1d81df8d..32baac0a 100644 --- a/packages/brick_offline_first_with_supabase/lib/src/offline_first_with_supabase_repository.dart +++ b/packages/brick_offline_first_with_supabase/lib/src/offline_first_with_supabase_repository.dart @@ -194,30 +194,6 @@ abstract class OfflineFirstWithSupabaseRepository< ); } - /// Convert a query to a [PostgresChangeFilter] for use with [subscribeToRealtime]. - @protected - @visibleForTesting - @visibleForOverriding - PostgresChangeFilter? queryToPostgresChangeFilter( - Query query, - ) { - final adapter = remoteProvider.modelDictionary.adapterFor[TModel]!; - if (query.where?.isEmpty ?? true) return null; - final condition = query.where!.first; - final column = adapter.fieldsToSupabaseColumns[condition.evaluatedField]?.columnName; - - if (column == null) return null; - - final type = _compareToFilterParam(condition.compare); - if (type == null) return null; - - return PostgresChangeFilter( - type: type, - column: column, - value: condition.value, - ); - } - @override Future reset() async { await super.reset(); @@ -254,8 +230,7 @@ abstract class OfflineFirstWithSupabaseRepository< /// /// [query] is an optional query to filter the data. The query **must be** one level - /// `Query.where('user', Query.exact('name', 'Tom'))` is invalid but `Query.where('name', 'Tom')` - /// is valid. The [Compare] operator is limited to a [PostgresChangeFilterType] equivalent. - /// See [_compareToFilterParam] for a precise breakdown. + /// is valid. The [Compare] operator is limited to a [PostgresChangeFilterType] equivalent. See [SupabaseProvider.queryToPostgresChangeFilter] for more details. Stream> subscribeToRealtime({ PostgresChangeEvent eventType = PostgresChangeEvent.all, OfflineFirstGetPolicy policy = OfflineFirstGetPolicy.alwaysHydrate, @@ -274,80 +249,76 @@ abstract class OfflineFirstWithSupabaseRepository< return subscribe(policy: policy, query: query); } - final channel = remoteProvider.client - .channel(adapter.supabaseTableName) - .onPostgresChanges( - event: eventType, - schema: schema, - table: adapter.supabaseTableName, - filter: queryToPostgresChangeFilter(query), - callback: (payload) async { - switch (payload.eventType) { - // This code path is likely never hit; `PostgresChangeEvent.all` is used - // to listen to changes but as far as can be determined is not delivered within - // the payload of the callback. - // - // It's handled just in case this behavior changes. - case PostgresChangeEvent.all: - final localResults = await sqliteProvider.get(repository: this); - final remoteResults = - await get(query: query, policy: OfflineFirstGetPolicy.awaitRemote); - final toDelete = localResults.where((r) => !remoteResults.contains(r)); - - for (final deletableModel in toDelete) { - await sqliteProvider.delete(deletableModel, repository: this); - memoryCacheProvider.delete(deletableModel, repository: this); - } - - case PostgresChangeEvent.delete: - final query = queryFromSupabaseDeletePayload( - payload.oldRecord, - supabaseDefinitions: adapter.fieldsToSupabaseColumns, - ); - - if (query.where?.isEmpty ?? true) return; - - final results = await get( - query: query, - policy: OfflineFirstGetPolicy.localOnly, - seedOnly: true, - ); - if (results.isEmpty) return; - - await sqliteProvider.delete(results.first, repository: this); - memoryCacheProvider.delete(results.first, repository: this); - - case PostgresChangeEvent.insert || PostgresChangeEvent.update: - // The supabase payload is not configurable and will not supply associations. - // For models that have associations, an additional network call must be - // made to retrieve all scoped data. - final modelHasAssociations = adapter.fieldsToSupabaseColumns.entries - .any((entry) => entry.value.association && !entry.value.associationIsNullable); - - if (modelHasAssociations) { - await get( - query: query, - policy: OfflineFirstGetPolicy.alwaysHydrate, - seedOnly: true, - ); - - return; - } - - final instance = await adapter.fromSupabase( - payload.newRecord, - provider: remoteProvider, - repository: this, - ); - - await sqliteProvider.upsert(instance as TModel, repository: this); - memoryCacheProvider.upsert(instance, repository: this); + final channel = remoteProvider.subscribeToRealtime( + eventType: eventType, + query: query, + schema: schema, + callback: (payload) async { + switch (payload.eventType) { + // This code path is likely never hit; `PostgresChangeEvent.all` is used + // to listen to changes but as far as can be determined is not delivered within + // the payload of the callback. + // + // It's handled just in case this behavior changes. + case PostgresChangeEvent.all: + final localResults = await sqliteProvider.get(repository: this); + final remoteResults = + await get(query: query, policy: OfflineFirstGetPolicy.awaitRemote); + final toDelete = localResults.where((r) => !remoteResults.contains(r)); + + for (final deletableModel in toDelete) { + await sqliteProvider.delete(deletableModel, repository: this); + memoryCacheProvider.delete(deletableModel, repository: this); } - await notifySubscriptionsWithLocalData(); - }, - ) - .subscribe(); + case PostgresChangeEvent.delete: + final query = queryFromSupabaseDeletePayload( + payload.oldRecord, + supabaseDefinitions: adapter.fieldsToSupabaseColumns, + ); + + if (query.where?.isEmpty ?? true) return; + + final results = await get( + query: query, + policy: OfflineFirstGetPolicy.localOnly, + seedOnly: true, + ); + if (results.isEmpty) return; + + await sqliteProvider.delete(results.first, repository: this); + memoryCacheProvider.delete(results.first, repository: this); + + case PostgresChangeEvent.insert || PostgresChangeEvent.update: + // The supabase payload is not configurable and will not supply associations. + // For models that have associations, an additional network call must be + // made to retrieve all scoped data. + final modelHasAssociations = adapter.fieldsToSupabaseColumns.entries + .any((entry) => entry.value.association && !entry.value.associationIsNullable); + + if (modelHasAssociations) { + await get( + query: query, + policy: OfflineFirstGetPolicy.alwaysHydrate, + seedOnly: true, + ); + + return; + } + + final instance = await adapter.fromSupabase( + payload.newRecord, + provider: remoteProvider, + repository: this, + ); + + await sqliteProvider.upsert(instance as TModel, repository: this); + memoryCacheProvider.upsert(instance, repository: this); + } + + await notifySubscriptionsWithLocalData(); + }, + ); final controller = StreamController>( onCancel: () async { @@ -396,29 +367,6 @@ abstract class OfflineFirstWithSupabaseRepository< } } - PostgresChangeFilterType? _compareToFilterParam(Compare compare) { - switch (compare) { - case Compare.exact: - return PostgresChangeFilterType.eq; - case Compare.contains: - return PostgresChangeFilterType.inFilter; - case Compare.greaterThan: - return PostgresChangeFilterType.gt; - case Compare.greaterThanOrEqualTo: - return PostgresChangeFilterType.gte; - case Compare.lessThan: - return PostgresChangeFilterType.lt; - case Compare.lessThanOrEqualTo: - return PostgresChangeFilterType.lte; - case Compare.notEqual: - return PostgresChangeFilterType.neq; - case Compare.between: - return null; - case Compare.doesNotContain: - return null; - } - } - /// This is a convenience method to create the basic offline client and queue. /// The client is used to add offline capabilities to [SupabaseProvider]; /// the queue is used to add offline to the repository. diff --git a/packages/brick_offline_first_with_supabase/pubspec.yaml b/packages/brick_offline_first_with_supabase/pubspec.yaml index e5610af7..efc7eca6 100644 --- a/packages/brick_offline_first_with_supabase/pubspec.yaml +++ b/packages/brick_offline_first_with_supabase/pubspec.yaml @@ -5,7 +5,7 @@ homepage: https://github.com/GetDutchie/brick/tree/main/packages/brick_offline_f issue_tracker: https://github.com/GetDutchie/brick/issues repository: https://github.com/GetDutchie/brick -version: 2.0.0 +version: 2.1.0 environment: sdk: ">=3.4.0 <4.0.0" @@ -15,7 +15,7 @@ dependencies: brick_offline_first: ">=4.0.0 <5.0.0" brick_offline_first_with_rest: ">=4.0.0 <5.0.0" brick_sqlite: ">=4.0.0 <5.0.0" - brick_supabase: ">=2.0.0 <3.0.0" + brick_supabase: ">=2.1.0 <3.0.0" http: ">=1.0.0 <2.0.0" logging: ">=1.0.0 <2.0.0" meta: ">=1.3.0 <2.0.0" diff --git a/packages/brick_offline_first_with_supabase/test/offline_first_with_supabase_repository_test.dart b/packages/brick_offline_first_with_supabase/test/offline_first_with_supabase_repository_test.dart index 2db9f11e..a5b25f51 100644 --- a/packages/brick_offline_first_with_supabase/test/offline_first_with_supabase_repository_test.dart +++ b/packages/brick_offline_first_with_supabase/test/offline_first_with_supabase_repository_test.dart @@ -211,106 +211,6 @@ void main() { }); }); - group('#queryToPostgresChangeFilter', () { - group('returns null', () { - test('for complex queries', () { - final query = Query.where('pizza', const Where.exact('id', 2)); - expect(repository.queryToPostgresChangeFilter(query), isNull); - }); - - test('for empty queries', () { - const query = Query(); - expect(repository.queryToPostgresChangeFilter(query), isNull); - }); - - test('for missing columns', () { - final query = Query.where('unknown', 1); - expect(repository.queryToPostgresChangeFilter(query), isNull); - }); - }); - - group('Compare', () { - test('.between', () { - final query = Query(where: [const Where('firstName').isBetween(1, 2)]); - final filter = repository.queryToPostgresChangeFilter(query); - expect(filter, isNull); - }); - - test('.doesNotContain', () { - final query = Query(where: [const Where('firstName').doesNotContain('Thomas')]); - final filter = repository.queryToPostgresChangeFilter(query); - expect(filter, isNull); - }); - - test('.exact', () { - const query = Query(where: [Where.exact('firstName', 'Thomas')]); - final filter = repository.queryToPostgresChangeFilter(query); - - expect(filter!.type, PostgresChangeFilterType.eq); - expect(filter.column, 'first_name'); - expect(filter.value, 'Thomas'); - }); - - test('.greaterThan', () { - final query = Query(where: [const Where('firstName').isGreaterThan('Thomas')]); - final filter = repository.queryToPostgresChangeFilter(query); - - expect(filter!.type, PostgresChangeFilterType.gt); - expect(filter.column, 'first_name'); - expect(filter.value, 'Thomas'); - }); - - test('.greaterThanOrEqualTo', () { - final query = Query( - where: [const Where('firstName').isGreaterThanOrEqualTo('Thomas')], - ); - final filter = repository.queryToPostgresChangeFilter(query); - - expect(filter!.type, PostgresChangeFilterType.gte); - expect(filter.column, 'first_name'); - expect(filter.value, 'Thomas'); - }); - - test('.lessThan', () { - final query = Query(where: [const Where('firstName').isLessThan('Thomas')]); - final filter = repository.queryToPostgresChangeFilter(query); - - expect(filter!.type, PostgresChangeFilterType.lt); - expect(filter.column, 'first_name'); - expect(filter.value, 'Thomas'); - }); - - test('.lessThanOrEqualTo', () { - final query = Query( - where: [const Where('firstName').isLessThanOrEqualTo('Thomas')], - ); - final filter = repository.queryToPostgresChangeFilter(query); - - expect(filter!.type, PostgresChangeFilterType.lte); - expect(filter.column, 'first_name'); - expect(filter.value, 'Thomas'); - }); - - test('.notEqual', () { - final query = Query(where: [const Where('firstName').isNot('Thomas')]); - final filter = repository.queryToPostgresChangeFilter(query); - - expect(filter!.type, PostgresChangeFilterType.neq); - expect(filter.column, 'first_name'); - expect(filter.value, 'Thomas'); - }); - - test('.contains', () { - final query = Query(where: [const Where('firstName').contains('Thomas')]); - final filter = repository.queryToPostgresChangeFilter(query); - - expect(filter!.type, PostgresChangeFilterType.inFilter); - expect(filter.column, 'first_name'); - expect(filter.value, 'Thomas'); - }); - }); - }); - group('#supabaseRealtimeSubscriptions', () { test('adds controller and query to #supabaseRealtimeSubscriptions', () async { expect(repository.supabaseRealtimeSubscriptions, hasLength(0)); diff --git a/packages/brick_supabase/CHANGELOG.md b/packages/brick_supabase/CHANGELOG.md index 263307f3..3b77a7df 100644 --- a/packages/brick_supabase/CHANGELOG.md +++ b/packages/brick_supabase/CHANGELOG.md @@ -1,3 +1,8 @@ +## 2.1.0 + +- Add `SupabaseProvider#subscribeToRealtime` to subscribe to [Supabase channels](https://supabase.com/docs/guides/realtime?queryGroups=language&language=dart). +- Add `SupabaseProvider#queryToPostgresChangeFilter` to convert `Query`s for Supabase subscriptions + ## 2.0.0 - **BREAKING CHANGE** `Query(providerArgs:)` is no longer supported; see [1.2.0](#1.2.0) for migration steps diff --git a/packages/brick_supabase/lib/src/supabase_provider.dart b/packages/brick_supabase/lib/src/supabase_provider.dart index b7ff1ecb..4ed7e432 100644 --- a/packages/brick_supabase/lib/src/supabase_provider.dart +++ b/packages/brick_supabase/lib/src/supabase_provider.dart @@ -41,6 +41,29 @@ class SupabaseProvider implements Provider { required this.modelDictionary, }) : logger = Logger('SupabaseProvider'); + PostgresChangeFilterType? _compareToFilterParam(Compare compare) { + switch (compare) { + case Compare.exact: + return PostgresChangeFilterType.eq; + case Compare.contains: + return PostgresChangeFilterType.inFilter; + case Compare.greaterThan: + return PostgresChangeFilterType.gt; + case Compare.greaterThanOrEqualTo: + return PostgresChangeFilterType.gte; + case Compare.lessThan: + return PostgresChangeFilterType.lt; + case Compare.lessThanOrEqualTo: + return PostgresChangeFilterType.lte; + case Compare.notEqual: + return PostgresChangeFilterType.neq; + case Compare.between: + return null; + case Compare.doesNotContain: + return null; + } + } + /// Sends a DELETE request method to the endpoint @override Future delete( @@ -117,6 +140,67 @@ class SupabaseProvider implements Provider { ) as TModel; } + /// Convert a query to a [PostgresChangeFilter] for use with [subscribeToRealtime]. + PostgresChangeFilter? queryToPostgresChangeFilter(Query query) { + final adapter = modelDictionary.adapterFor[TModel]!; + if (query.where?.isEmpty ?? true) return null; + final condition = query.where!.first; + + final definition = adapter.fieldsToSupabaseColumns[condition.evaluatedField]; + if (definition == null) return null; + if (definition.association) return null; + + final type = _compareToFilterParam(condition.compare); + if (type == null) return null; + + return PostgresChangeFilter( + type: type, + column: definition.columnName, + value: condition.value, + ); + } + + /// Subscribes to realtime updates using + /// [Supabase channels](https://supabase.com/docs/guides/realtime?queryGroups=language&language=dart). + /// **This will only work if your Supabase table has realtime enabled.** + /// Follow [Supabase's documentation](https://supabase.com/docs/guides/realtime?queryGroups=language&language=dart#realtime-api) + /// to setup your table. + /// + /// The resulting stream will also notify for locally-made changes. In an online state, this + /// will result in duplicate events on the stream - the local copy is updated and notifies + /// the caller, then the Supabase realtime event is received and notifies the caller again. + /// + /// Supabase's channels can + /// [become expensive quickly](https://supabase.com/docs/guides/realtime/quotas); + /// please consider scale when utilizing this method. + /// + /// [eventType] is the triggering remote event. + /// + /// [query] is an optional query to filter the data. The query **must be** one level - + /// `Query.where('user', Query.exact('name', 'Tom'))` is invalid but `Query.where('name', 'Tom')` + /// is valid. The [Compare] operator is limited to a [PostgresChangeFilterType] equivalent. + /// See [_compareToFilterParam] for a precise breakdown. + /// + /// [RealtimeChannel.subscribe] is invoked before the [RealtimeChannel] is returned to the caller. + RealtimeChannel subscribeToRealtime({ + required void Function(PostgresChangePayload payload) callback, + PostgresChangeEvent eventType = PostgresChangeEvent.all, + Query? query, + String schema = 'public', + }) { + final adapter = modelDictionary.adapterFor[TModel]!; + return client + .channel(adapter.supabaseTableName) + .onPostgresChanges( + event: eventType, + schema: schema, + table: adapter.supabaseTableName, + filter: queryToPostgresChangeFilter(query ?? const Query()), + callback: callback, + ) + .subscribe(); + } + /// In almost all cases, use [upsert]. This method is provided for cases when a table's /// policy permits updates without inserts. Future update( diff --git a/packages/brick_supabase/pubspec.yaml b/packages/brick_supabase/pubspec.yaml index 48ee6f04..5e0e1c68 100644 --- a/packages/brick_supabase/pubspec.yaml +++ b/packages/brick_supabase/pubspec.yaml @@ -4,7 +4,7 @@ homepage: https://github.com/GetDutchie/brick/tree/main/packages/brick_supabase issue_tracker: https://github.com/GetDutchie/brick/issues repository: https://github.com/GetDutchie/brick -version: 2.0.0 +version: 2.1.0 environment: sdk: ">=3.4.0 <4.0.0" diff --git a/packages/brick_supabase/test/supabase_provider_test.dart b/packages/brick_supabase/test/supabase_provider_test.dart index f571b878..d43ce003 100644 --- a/packages/brick_supabase/test/supabase_provider_test.dart +++ b/packages/brick_supabase/test/supabase_provider_test.dart @@ -4,6 +4,7 @@ import 'package:brick_core/query.dart'; import 'package:brick_supabase/src/supabase_provider.dart'; import 'package:brick_supabase/src/supabase_provider_query.dart'; import 'package:brick_supabase/testing.dart'; +import 'package:supabase/supabase.dart'; import 'package:test/test.dart'; import '__mocks__.dart'; @@ -79,6 +80,122 @@ void main() { expect(inserted.name, instance.name); }); + group('#queryToPostgresChangeFilter', () { + late SupabaseProvider provider; + + setUp(() { + provider = SupabaseProvider(mock.client, modelDictionary: supabaseModelDictionary); + }); + + group('returns null', () { + test('for complex queries', () { + final query = Query.where('assoc', const Where.exact('id', 2)); + expect(provider.queryToPostgresChangeFilter(query), isNull); + }); + + test('for empty queries', () { + const query = Query(); + expect(provider.queryToPostgresChangeFilter(query), isNull); + }); + + test('for missing columns', () { + final query = Query.where('unknown', 1); + expect(provider.queryToPostgresChangeFilter(query), isNull); + }); + }); + + group('Compare', () { + test('.between', () { + final query = Query(where: [const Where('id').isBetween(1, 2)]); + final filter = provider.queryToPostgresChangeFilter(query); + expect(filter, isNull); + }); + + test('.doesNotContain', () { + final query = Query(where: [const Where('name').doesNotContain('Thomas')]); + final filter = provider.queryToPostgresChangeFilter(query); + expect(filter, isNull); + }); + + test('.exact', () { + const query = Query(where: [Where.exact('name', 'Thomas')]); + final filter = provider.queryToPostgresChangeFilter(query); + + expect(filter!.type, PostgresChangeFilterType.eq); + expect(filter.column, 'name'); + expect(filter.value, 'Thomas'); + }); + + test('.greaterThan', () { + final query = Query(where: [const Where('age').isGreaterThan(5)]); + final filter = provider.queryToPostgresChangeFilter(query); + + expect(filter!.type, PostgresChangeFilterType.gt); + expect(filter.column, 'age'); + expect(filter.value, 5); + }); + + test('.greaterThanOrEqualTo', () { + final query = Query(where: [const Where('age').isGreaterThanOrEqualTo(5)]); + final filter = provider.queryToPostgresChangeFilter(query); + + expect(filter!.type, PostgresChangeFilterType.gte); + expect(filter.column, 'age'); + expect(filter.value, 5); + }); + + test('.lessThan', () { + final query = Query(where: [const Where('age').isLessThan(5)]); + final filter = provider.queryToPostgresChangeFilter(query); + + expect(filter!.type, PostgresChangeFilterType.lt); + expect(filter.column, 'age'); + expect(filter.value, 5); + }); + + test('.lessThanOrEqualTo', () { + final query = Query(where: [const Where('age').isLessThanOrEqualTo(5)]); + final filter = provider.queryToPostgresChangeFilter(query); + + expect(filter!.type, PostgresChangeFilterType.lte); + expect(filter.column, 'age'); + expect(filter.value, 5); + }); + + test('.notEqual', () { + final query = Query(where: [const Where('name').isNot('Thomas')]); + final filter = provider.queryToPostgresChangeFilter(query); + + expect(filter!.type, PostgresChangeFilterType.neq); + expect(filter.column, 'name'); + expect(filter.value, 'Thomas'); + }); + + test('.contains', () { + final query = Query(where: [const Where('name').contains('Thomas')]); + final filter = provider.queryToPostgresChangeFilter(query); + + expect(filter!.type, PostgresChangeFilterType.inFilter); + expect(filter.column, 'name'); + expect(filter.value, 'Thomas'); + }); + }); + }); + + test('#subscribeToRealtime', () { + final provider = SupabaseProvider(mock.client, modelDictionary: supabaseModelDictionary); + final stream = provider.subscribeToRealtime(callback: (payload) {}); + + expect(stream, isNotNull); + // ignore: invalid_use_of_internal_member + expect(stream.joinRef, isNotNull); + expect( + // ignore: invalid_use_of_internal_member + stream.topic, + 'realtime:${supabaseModelDictionary.adapterFor[Demo]!.supabaseTableName}', + ); + }); + test('#update', () async { const req = SupabaseRequest( requestMethod: 'PATCH',