A reliable event publishing and consumption system using PostgreSQL and gRPC, following the CloudEvents specification.
- Event Publishing:
- CloudEvents specification compliance
- PostgreSQL persistence
- Debezium change data capture integration
- Event Consumption:
- In-process and remote consumers (via gRPC)
- Ordered event delivery by subject
- Automatic catch-up for missed events
- Historical event replay
- Java 21 or higher
- PostgreSQL 13 or higher
- Docker (optional)
implementation 'com.p14n:postevent:1.0.0'<dependency>
<groupId>com.p14n</groupId>
<artifactId>postevent</artifactId>
<version>1.0.0</version>
</dependency>com.p14n/postevent {:mvn/version "1.0.0"}Postevent can be used in both in-process and client/server modes, depending on your architecture. Use LocalPersistentConsumer for in-process, and ConsumerServer/RemotePersistentConsumer for client/server.
var config = new ConfigData("myapp", Set.of("orders"), "dbhost", 5432,
"dbuser", "dbpassword", "dbname", 100);
// Create a local consumer
try (LocalPersistentConsumer consumer = new LocalPersistentConsumer(config, OpenTelemetry.noop(), 10)) {
consumer.start();
consumer.subscribe("orders", message -> {
System.out.println("Received: " + message);
});
}
// Publish an event
Publisher.publish(Event.create(...), dataSource, "orders");// Start a consumer server
var config = new ConfigData("myapp", Set.of("orders"), "dbhost", 5432,
"dbuser", "dbpassword", "dbname", 100);
var server = new ConsumerServer(config, OpenTelemetry.noop());
server.start(8080);
// Create a remote consumer
try (RemotePersistentConsumer client = new RemotePersistentConsumer(OpenTelemetry.noop(), 10)) {
client.start(Set.of("orders"), dataSource, "consumerhost", 8080);
client.subscribe("orders", message -> {
System.out.println("Received: " + message);
});
}
// Publish an event
Publisher.publish(Event.create(...), dataSource, "orders");See src/main/java/com/p14n/postevent/example for more examples.
| Parameter | Description | Default | Required |
|---|---|---|---|
| affinity | Unique identifier for the application (ie node of cluster) | - | Yes |
| topics | Set of topic names to consume | - | Yes |
| dbHost | PostgreSQL host | - | Yes |
| dbPort | PostgreSQL port | - | Yes |
| dbUser | PostgreSQL username | - | Yes |
| dbPassword | PostgreSQL password | - | Yes |
| dbName | PostgreSQL database name | - | Yes |
| overrideProps | Additional properties for Debezium | - | No |
| startupTimeoutSeconds | Timeout for Debezium startup | 30 | No |
| pollInterval | Poll interval for Debezium | 500 | No |
Tables are created on startup in the postevent schema. The tables are named after the topic names. When a new row/event is added to the table, Debezium will notify Postevent. The following steps are then taken:
- The PeristentBroker will copy the event to the
messagestable. If the event is already in themessagestable, it will be ignored. - The TransactionalBroker starts a database transaction that spans the status of the and publishing to your subscriber.
- The OrderedProcessor ensures that all prior events (by subject) have been processed before processing the current event.
Messages are copied to the destination server as fast as possible. Under normal conditions the process above is followed. However, if messages arrive out of order, or are missing (for example if the server wasn't connected when they were published), the CatupService will be triggered:
- The CatchupService will check for gaps in the sequence of events. If a gap is found, it will fetch the missing events from the source server and copy them to the destination server. This process will continue until all gaps are filled.
- Once the CatchupService has filled all gaps, it will trigger the UnprocessedSubmitter to process the events.
The contiguous high water mark (HWM) is used to track the last consistent event that arrived. This is used to determine if there are any gaps in the sequence of events. The HWM is updated after each event is copied to the destination server, only if the event is contiguous (ie all previous events have arrived). It is safe to delete all rows in the messages table before the HWM for that topic. This may be done automatically in the future.
A ConsumerServer can be configured to consume multiple topics. Each topic will have its own table in the postevent schema. Each client can subscribe to any subset of these topics. The client will receive events for all topics it is subscribed to.
Tested on fargate 0.5vCPU/1GB with RDS db.t4g.micro
- Throughput: 200 events/second
- Latency: < 50ms ms from topic publish to remote client receive in transaction
- Fork the repository
- Create your feature branch (
git checkout -b feature/amazing-feature) - Commit your changes (
git commit -m 'Add amazing feature') - Push to the branch (
git push origin feature/amazing-feature) - Open a Pull Request
# Clone the repository
git clone https://github.com/p14n/postevent.git
# Start PostgreSQL
docker run --name postevent -v ./app/postgres:/docker-entrypoint-initdb.d \
-e POSTGRES_PASSWORD=postgres -e POSTGRES_USER=postgres \
--publish 5432:5432 -d postgres
# Build the project
./gradlew build./gradlew testWe use SemVer for versioning. For available versions, see the releases page.
This project is licensed under the MIT License - see the LICENSE file for details.
- Create an Issue
- Debezium for change data capture
- CloudEvents for event specification
- gRPC for remote communication
Active development.