This repository was archived by the owner on Nov 11, 2022. It is now read-only.
Merged
Changes from 4 commits
38 changes: 38 additions & 0 deletions contrib/kafka/README.md
@@ -0,0 +1,38 @@
# KafkaIO : Dataflow Unbounded Source and Sink for Kafka Topics

KafkaIO provides unbounded sources and sinks for [Kafka](https://kafka.apache.org/)
topics. Kafka versions 0.9 and above are supported.
Contributor: Is this correct, or are code changes needed to support Kafka 0.10 or 0.11?

Author: Works with 0.10, but I haven't tested with 0.11 (Kafka master?). We can remove the line or mention the specific versions tested.


## Basic Usage

* Read from a topic with 8-byte long keys and string values:
```java
PCollection<KV<Long, String>> kafkaRecords =
    pipeline
        .apply(KafkaIO.read()
            .withBootstrapServers("broker_1:9092,broker_2:9092")
            .withTopics(ImmutableList.of("topic_a"))
            .withKeyCoder(BigEndianLongCoder.of())
            .withValueCoder(StringUtf8Coder.of())
            .withoutMetadata()
        );
```

Contributor (on the line originally written as `.applY(KafkaIO.read()`): lowercase y

Author: Done

* Write the same PCollection to a Kafka topic:
```java
kafkaRecords.apply(KafkaIO.write()
    .withBootstrapServers("broker_1:9092,broker_2:9092")
    .withTopic("results")
    .withKeyCoder(BigEndianLongCoder.of())
    .withValueCoder(StringUtf8Coder.of())
);
```

Contributor (on the end of the write example, which originally lacked the closing `);`): missing last line?

Author: Done.
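
To make "8-byte long keys" in the read example concrete: a key coder such as `BigEndianLongCoder` is understood here to serialize each `Long` as exactly 8 bytes, most significant byte first. The sketch below reproduces that byte layout with plain `java.nio.ByteBuffer` instead of the Dataflow SDK. The class and method names (`LongKeyBytes`, `encode`, `decode`) are hypothetical and exist only for this illustration; this is not the SDK's coder.

```java
import java.nio.ByteBuffer;

/** Illustration only: big-endian 8-byte encoding of a long key (not the SDK's coder). */
public class LongKeyBytes {

    /** Encodes a long as 8 bytes, most significant byte first. */
    public static byte[] encode(long key) {
        // ByteBuffer uses big-endian byte order by default.
        return ByteBuffer.allocate(Long.BYTES).putLong(key).array();
    }

    /** Decodes 8 big-endian bytes back into a long. */
    public static long decode(byte[] bytes) {
        if (bytes.length != Long.BYTES) {
            throw new IllegalArgumentException("expected 8 bytes, got " + bytes.length);
        }
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        byte[] encoded = encode(258L); // 258 is 0x0102
        System.out.println(encoded.length);
        System.out.println(decode(encoded));
    }
}
```

A producer that writes keys in some other layout (for example, as decimal strings) would probably need a different key coder on the read side.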

Please see JavaDoc for KafkaIO in
[KafkaIO.java](https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java#L100)
for complete documentation and a more descriptive usage example.

## Release Notes
* **0.2.0** : Assign one split to each Kafka topic partition. This makes a Dataflow
[Update](https://cloud.google.com/dataflow/pipelines/updating-a-pipeline)
from the previous version incompatible.
* **0.1.0** : KafkaIO with support for Unbounded Source and Sink.
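
As a rough picture of the 0.2.0 note, the sketch below assigns every topic partition its own split. It is plain Java, not KafkaIO's actual code: `PartitionSplits`, `Partition`, and `onePerPartition` are hypothetical names used only for illustration. How 0.1.0 grouped partitions is not described here, so only the new assignment is shown.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration only: one split per topic partition. Not KafkaIO's actual code. */
public class PartitionSplits {

    /** Hypothetical stand-in for a Kafka topic partition. */
    public static final class Partition {
        public final String topic;
        public final int id;

        public Partition(String topic, int id) {
            this.topic = topic;
            this.id = id;
        }

        @Override
        public String toString() {
            return topic + "-" + id;
        }
    }

    /** Returns one single-partition split for every partition in the input. */
    public static List<List<Partition>> onePerPartition(List<Partition> partitions) {
        List<List<Partition>> splits = new ArrayList<>();
        for (Partition p : partitions) {
            List<Partition> split = new ArrayList<>();
            split.add(p);
            splits.add(split);
        }
        return splits;
    }

    public static void main(String[] args) {
        List<Partition> partitions = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            partitions.add(new Partition("topic_a", i));
        }
        // Three partitions yield three splits, each holding exactly one partition.
        System.out.println(onePerPartition(partitions));
    }
}
```

One plausible reading of the incompatibility is that the number and contents of the splits differ from those of a pipeline started on the earlier version, so its saved state would no longer line up after an Update.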
2 changes: 1 addition & 1 deletion contrib/kafka/pom.xml
@@ -25,7 +25,7 @@
<artifactId>google-cloud-dataflow-java-contrib-kafka</artifactId>
<name>Google Cloud Dataflow Kafka IO Library</name>
<description>Library to read Kafka topics.</description>
-<version>0.1.0-SNAPSHOT</version>
+<version>0.2.0-SNAPSHOT</version>

<properties>
<dataflow.version>[1.6.0, 2.0.0)</dataflow.version>