|
2 | 2 | layout: default |
3 | 3 | title: PostgreSQL |
4 | 4 | parent: Integration |
5 | | -nav_order: 2 |
| 5 | +nav_order: 4 |
6 | 6 | --- |
7 | 7 |
|
8 | | -## PostgreSQL |
| 8 | +# PostgreSQL |
9 | 9 |
|
10 | | -To setup EventFlow PostgreSQL integration, install the NuGet |
11 | | -package [EventFlow.PostgreSql](https://www.nuget.org/packages/EventFlow.PostgreSql) and add this to your EventFlow setup. |
| 10 | +Use the `EventFlow.PostgreSql` integration when you want EventFlow to persist |
| 11 | +events, snapshots, and read models in PostgreSQL. The package wraps the Npgsql |
| 12 | +driver and DbUp migrations, giving you consistent configuration, retries, and |
| 13 | +schema provisioning across the stack. |
| 14 | + |
| 15 | +## Prerequisites |
| 16 | + |
| 17 | +- A .NET application already wired with `EventFlow`. |
| 18 | +- PostgreSQL 12 or later. The bundled scripts rely on `GENERATED ... AS IDENTITY` |
| 19 | + columns and user-defined types. |
| 20 | +- Credentials that can execute `CREATE TABLE`, `CREATE TYPE`, and `CREATE INDEX` |
| 21 | + statements in the target database. |
| 22 | +- Network access for every service that emits commands or processes read |
| 23 | + models. |
| 24 | + |
| 25 | +## Install the NuGet package |
| 26 | + |
| 27 | +Add the PostgreSQL integration to every project that configures EventFlow. |
| 28 | + |
| 29 | +```bash |
| 30 | +dotnet add package EventFlow.PostgreSql |
| 31 | +``` |
| 32 | + |
| 33 | +## Configure EventFlow |
| 34 | + |
| 35 | +Call `ConfigurePostgreSql` once to register the shared connection, migrator, and |
| 36 | +transient retry strategy, then opt into the specific stores you need. |
| 37 | + |
| 38 | +```csharp |
| 39 | +public void ConfigureServices(IServiceCollection services) |
| 40 | +{ |
| 41 | + var postgres = PostgreSqlConfiguration.New |
| 42 | + .SetConnectionString(Configuration.GetConnectionString("eventflow-postgres")) |
| 43 | + .SetTransientRetryCount(3); |
| 44 | + |
| 45 | + services.AddEventFlow(o => o |
| 46 | + .ConfigurePostgreSql(postgres) |
| 47 | + .UsePostgreSqlEventStore() // Events |
| 48 | + .UsePostgreSqlSnapshotStore() // Snapshots (optional) |
| 49 | + .UsePostgreSqlReadModel<UserReadModel>() // Read models |
| 50 | + .UsePostgreSqlReadModel<UserNicknameReadModel, UserNicknameLocator>()); |
| 51 | +} |
| 52 | +``` |
| 53 | + |
| 54 | +`ConfigurePostgreSql` wires up `IPostgreSqlConnection`, the DbUp-based |
| 55 | +`IPostgreSqlDatabaseMigrator`, and the `PostgreSqlErrorRetryStrategy` used by |
| 56 | +the event store and read models. |
| 57 | + |
| 58 | +### Optional tuning |
| 59 | + |
| 60 | +- Call `SetConnectionString("read-models", ...)` when you want read models to |
| 61 | + connect to a different database or replica. |
| 62 | +- Adjust `SetTransientRetryCount` / `SetTransientRetryDelay` to tune retries |
| 63 | + for deadlocks (`SqlState 40P01`) and active-transaction conflicts (`SqlState 25001`). |
| 64 | +- Increase `SetUpgradeExecutionTimeout` when migration batches take longer than |
| 65 | + five minutes. |
| 66 | + |
| 67 | +## Event store |
| 68 | + |
| 69 | +### Enable the PostgreSQL event store |
| 70 | + |
| 71 | +Replace the in-memory default by calling `UsePostgreSqlEventStore()` after |
| 72 | +`ConfigurePostgreSql`. |
| 73 | + |
| 74 | +```csharp |
| 75 | +services.AddEventFlow(o => |
| 76 | + o.ConfigurePostgreSql(postgres) |
| 77 | + .UsePostgreSqlEventStore()); |
| 78 | +``` |
| 79 | + |
| 80 | +### Provision the schema |
| 81 | + |
| 82 | +Run the embedded scripts once per environment to create the `EventFlow` table, |
| 83 | +the `(AggregateId, AggregateSequenceNumber)` unique index, and the |
| 84 | +`eventdatamodel_list_type` composite type used for batch inserts. |
| 85 | + |
| 86 | +```csharp |
| 87 | +await using var scope = services.BuildServiceProvider().CreateAsyncScope(); |
| 88 | +var migrator = scope.ServiceProvider.GetRequiredService<IPostgreSqlDatabaseMigrator>(); |
| 89 | +await EventFlowEventStoresPostgreSql.MigrateDatabaseAsync(migrator, cancellationToken); |
| 90 | +``` |
| 91 | + |
| 92 | +The migrator is idempotent—rerunning it simply ensures the schema is present. |
| 93 | +Lack of `CREATE TYPE` or `CREATE TABLE` permissions causes install-time failures. |
| 94 | + |
| 95 | +### Operational notes |
| 96 | + |
| 97 | +- `PostgreSqlEventPersistence` surfaces duplicate key violations (`SqlState 23505`) |
| 98 | + as `OptimisticConcurrencyException`; investigate aggregate concurrency if you |
| 99 | + see these at runtime. |
| 100 | +- Event batches are appended inside a transaction. Monitor WAL growth and plan |
| 101 | + for appropriate autovacuum settings. |
| 102 | +- The built-in retry strategy only retries deadlocks and active-transaction |
| 103 | + errors; unexpected exceptions bubble immediately. |
| 104 | + |
| 105 | +## Snapshot store |
| 106 | + |
| 107 | +Enable PostgreSQL snapshots with `.UsePostgreSqlSnapshotStore()` and run the |
| 108 | +companion migration to create the `EventFlowSnapshots` table. |
| 109 | + |
| 110 | +```csharp |
| 111 | +services.AddEventFlow(o => |
| 112 | + o.ConfigurePostgreSql(postgres) |
| 113 | + .UsePostgreSqlSnapshotStore()); |
| 114 | + |
| 115 | +await EventFlowSnapshotStoresPostgreSql.MigrateDatabaseAsync(migrator, cancellationToken); |
| 116 | +``` |
| 117 | + |
| 118 | +Snapshots share a single table keyed by `(AggregateName, AggregateId)` and store |
| 119 | +the serialized data plus metadata needed for upgrades. Duplicate writes are |
| 120 | +ignored when a snapshot with the same sequence number already exists. |
| 121 | + |
| 122 | +## Read model store |
| 123 | + |
| 124 | +### Register the store |
| 125 | + |
| 126 | +`UsePostgreSqlReadModel<T>` (or the locator overload) plugs the SQL read-store |
| 127 | +implementation into EventFlow. |
| 128 | + |
| 129 | +```csharp |
| 130 | +services.AddEventFlow(o => |
| 131 | + o.ConfigurePostgreSql(postgres) |
| 132 | + .UsePostgreSqlReadModel<UserReadModel>() |
| 133 | + .UsePostgreSqlReadModel<UserNicknameReadModel, UserNicknameLocator>()); |
| 134 | +``` |
| 135 | + |
| 136 | +### Implement the read model |
| 137 | + |
| 138 | +PostgreSQL read models should implement `IReadModel` and either derive from |
| 139 | +`PostgreSqlReadModel` or decorate key properties with the provided attributes. |
| 140 | + |
| 141 | +```csharp |
| 142 | +[Table("ReadModel-User")] |
| 143 | +public class UserReadModel : PostgreSqlReadModel, |
| 144 | + IAmReadModelFor<UserAggregate, UserId, UserRegistered> |
| 145 | +{ |
| 146 | + public string DisplayName { get; set; } = default!; |
| 147 | + |
| 148 | + public Task ApplyAsync( |
| 149 | + IReadModelContext context, |
| 150 | + IDomainEvent<UserAggregate, UserId, UserRegistered> @event, |
| 151 | + CancellationToken cancellationToken) |
| 152 | + { |
| 153 | + AggregateId = @event.AggregateIdentity.Value; |
| 154 | + DisplayName = @event.AggregateEvent.DisplayName; |
| 155 | + UpdatedTime = DateTimeOffset.UtcNow; |
| 156 | + if (CreateTime == default) |
| 157 | + { |
| 158 | + CreateTime = UpdatedTime; |
| 159 | + } |
| 160 | + return Task.CompletedTask; |
| 161 | + } |
| 162 | +} |
| 163 | +``` |
| 164 | + |
| 165 | +The base class marks `AggregateId` with `[PostgreSqlReadModelIdentityColumn]` and |
| 166 | +`LastAggregateSequenceNumber` with `[PostgreSqlReadModelVersionColumn]`. Use |
| 167 | +`[PostgreSqlReadModelIgnoreColumn]` to skip properties that are not persisted. |
| 168 | + |
| 169 | +### Create the table |
| 170 | + |
| 171 | +EventFlow does not auto-create read model tables. Deploy DDL that matches your |
| 172 | +read model shape—by convention the table name is `ReadModel-[TypeName]`. |
| 173 | + |
| 174 | +```sql |
| 175 | +CREATE TABLE IF NOT EXISTS "ReadModel-User" ( |
| 176 | + Id BIGINT GENERATED BY DEFAULT AS IDENTITY, |
| 177 | + AggregateId VARCHAR(64) NOT NULL, |
| 178 | + CreateTime TIMESTAMPTZ NOT NULL, |
| 179 | + UpdatedTime TIMESTAMPTZ NOT NULL, |
| 180 | + LastAggregateSequenceNumber INT NOT NULL, |
| 181 | + DisplayName TEXT NOT NULL, |
| 182 | + CONSTRAINT "PK_ReadModel-User" PRIMARY KEY (Id) |
| 183 | +); |
| 184 | + |
| 185 | +CREATE INDEX IF NOT EXISTS "IX_ReadModel-User_AggregateId" |
| 186 | + ON "ReadModel-User" (AggregateId); |
| 187 | +``` |
| 188 | + |
| 189 | +At a minimum, keep the identity column, the optimistic concurrency column, and |
| 190 | +the fields mined by your query handlers. Add additional indexes to match your |
| 191 | +query patterns. |
| 192 | + |
| 193 | +### Run read model migrations |
| 194 | + |
| 195 | +Package the DDL alongside your application and execute it with the shared |
| 196 | +`IPostgreSqlDatabaseMigrator`. |
12 | 197 |
|
13 | 198 | ```csharp |
14 | | -// ... |
15 | | -.ConfigurePostgreSql(PostgreSqlConfiguration.New |
16 | | - .SetConnectionString(@"User ID=me;Password=???;Host=localhost;Port=5432;Database=MyApp")) |
17 | | -.UsePostgreSqlEventStore() |
18 | | -.UsePostgreSqlSnapshotStore() |
19 | | -.UsePostgreSqlReadModel<UserReadModel>() |
20 | | -.UsePostgreSqlReadModel<UserNicknameReadModel,UserNicknameReadModelLocator>() |
21 | | -// ... |
22 | | -``` |
23 | | - |
24 | | -This code block configures EventFlow to store events, snapshots and read models in PostgreSQL. It's not mandatory, you |
25 | | -can mix and match, i.e. storing events in PostgreSQL, read models in Elastic search and don't using snapshots at all. |
26 | | - |
27 | | -- Event store. One big table `EventFlow` for all events for all aggregates. |
28 | | -- Read model store. Table `ReadModel-[ClassName]` per read model type. |
29 | | -- Snapshot store. One big table `EventFlowSnapshots` for all aggregates. |
| 199 | +var migrator = scope.ServiceProvider.GetRequiredService<IPostgreSqlDatabaseMigrator>(); |
| 200 | +await migrator.MigrateDatabaseUsingEmbeddedScriptsAsync( |
| 201 | + typeof(Program).Assembly, |
| 202 | + scriptNamespace: "MyCompany.MyApp.SqlScripts", |
| 203 | + cancellationToken); |
| 204 | +``` |
| 205 | + |
| 206 | +The tests in `Source/EventFlow.PostgreSql.Tests` demonstrate this pattern: embed |
| 207 | +versioned SQL files and invoke the migrator during startup or deployment. |
| 208 | + |
| 209 | +## Local development quickstart |
| 210 | + |
| 211 | +Run a disposable PostgreSQL container and point `ConfigurePostgreSql` to it. |
| 212 | + |
| 213 | +```bash |
| 214 | +docker run --rm -p 5432:5432 --name eventflow-postgres \ |
| 215 | + -e POSTGRES_PASSWORD=eventflow \ |
| 216 | + -e POSTGRES_DB=eventflow \ |
| 217 | + postgres:16 |
| 218 | +``` |
| 219 | + |
| 220 | +## Troubleshooting |
| 221 | + |
| 222 | +- **`SqlState 23505` (duplicate key)** – the unique index on |
| 223 | + `(AggregateId, AggregateSequenceNumber)` rejected a reinsert. Inspect aggregate |
| 224 | + concurrency or idempotency guards. |
| 225 | +- **`eventdatamodel_list_type` does not exist** – rerun |
| 226 | + `EventFlowEventStoresPostgreSql.MigrateDatabaseAsync`; the composite type is |
| 227 | + required for batch inserts. |
| 228 | +- **Missing read model rows** – confirm the table exists, the identity column is |
| 229 | + marked with `[PostgreSqlReadModelIdentityColumn]`, and the process has write |
| 230 | + access; otherwise updates are ignored. |
| 231 | +- **Permission errors during migration** – grant `CREATE TABLE`, `CREATE TYPE`, |
| 232 | + and `CREATE INDEX` to the login executing the migrator. |
| 233 | + |
| 234 | +## See also |
| 235 | + |
| 236 | +- [Event stores](event-stores.md#postgresql-event-store) |
| 237 | +- [Read model stores](read-stores.md) |
| 238 | +- [Snapshots](../additional/snapshots.md) |
| 239 | + |
0 commit comments