
Dataflow Flex templates - Kafka to BigQuery

Open in Cloud Shell

Samples showing how to create and run an Apache Beam template with a custom Docker image on Google Cloud Dataflow.

Before you begin

NOTE: These instructions used to use the Bitnami version of Kafka, but due to a recent untagging event on Docker Hub for all Bitnami images, they have switched over to the Apache Kafka images. They are still pinned to a Zookeeper-compatible version of Kafka for now.

If you are not familiar with Dataflow Flex templates, please see the Streaming Beam SQL sample first.

Follow the Getting started with Google Cloud Dataflow page, and make sure you have a Google Cloud project with billing enabled and a service account JSON key set up in your GOOGLE_APPLICATION_CREDENTIALS environment variable. Additionally, for this sample you need the following:

  1. Enable the APIs: App Engine, Cloud Scheduler, Cloud Build.

  2. Create a Cloud Storage bucket.

    export BUCKET="your-gcs-bucket"
    gcloud storage buckets create gs://$BUCKET
  3. Create a BigQuery dataset.

    export PROJECT="$(gcloud config get-value project)"
    export DATASET="beam_samples"
    export TABLE="kafka_to_bigquery"
    
    bq mk --dataset "$PROJECT:$DATASET"
  4. Select the compute region and zone to use.

    # Select your default compute region, or default to "us-central1".
    export REGION=$(gcloud config get-value compute/region)
    export REGION=${REGION:-"us-central1"}

    # Select your default compute zone, or default to "$REGION-a".
    # Note that the zone *must* be in $REGION.
    export ZONE=$(gcloud config get-value compute/zone)
    export ZONE=${ZONE:-"$REGION-a"}
  5. Clone the java-docs-samples repository.

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
  6. Navigate to the sample code directory.

    cd java-docs-samples/dataflow/flex-templates/kafka_to_bigquery
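
As a quick, optional sanity check before moving on, you can confirm that the variables used throughout this guide are set (a minimal sketch; adjust to your shell):

# Print the variables exported in the steps above.
echo "PROJECT=$PROJECT"
echo "BUCKET=$BUCKET DATASET=$DATASET TABLE=$TABLE"
echo "REGION=$REGION ZONE=$ZONE"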

Kafka to BigQuery sample

This sample shows how to deploy an Apache Beam streaming pipeline that reads JSON encoded messages from Apache Kafka, decodes them, and writes them into a BigQuery table.

For this, we need two parts running:

  1. A Kafka server container accessible through an external IP address. Producers publish messages to a topic on this server.

  2. An Apache Beam streaming pipeline running in Dataflow Flex Templates. This subscribes to a Kafka topic, consumes the messages that are published to that topic, processes them, and writes them into a BigQuery table.

Starting the Kafka server

(Optional) Run the Kafka server locally for development. (Click to expand)

Note that you must have Docker installed on your machine to run the container locally. You do not need Docker installed to run in the cloud; skip this section if you want to go straight to building and deploying in Google Cloud.

# Create a network where containers can communicate.
docker network create kafka-net

# Build the image.
docker image build -t kafka kafka/

# Run a detached container (in the background) using the network we created.
docker run -d --rm \
  --name "kafka" \
  --net "kafka-net" \
  -p 2181:2181 -p 9092:9092 \
  kafka
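
If you want to confirm the local broker is up, here is a minimal smoke test; it assumes the kafka/ image ships the standard Kafka CLI scripts under /opt/kafka/bin (true for images based on apache/kafka):

# List topics from inside the running container; an empty result with no
# error means the broker is answering on localhost:9092.
docker exec kafka /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --list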

Once you are done, you can stop and delete the resources.

# Stop the container.
docker kill kafka

# Delete the Docker network.
docker network rm kafka-net

For more information about creating a Docker application, see Containerizing an application.

The Kafka server must be accessible to external applications, so it needs an external static IP address rather than an internal one.

ℹ️ If you already have a Kafka server running, you can skip this section. Just make sure to store its IP address in an environment variable, for example:

export KAFKA_ADDRESS="123.45.67.89"

# Create a new static IP address for the Kafka server to use.
gcloud compute addresses create --region "$REGION" kafka-address

# Get the static address into a variable.
export KAFKA_ADDRESS=$(gcloud compute addresses describe --region="$REGION" --format='value(address)' kafka-address)

ℹ️ Do not use --global to create the static IP address since the Kafka server must reside in a specific region.

We also need to create a firewall rule to allow incoming messages to the server.

Kafka uses port 9092 and Zookeeper uses port 2181 by default, unless configured differently.

# Create a firewall rule to open the port used by Zookeeper and Kafka.
# Allow connections to ports 2181, 9092 in VMs with the "kafka-server" tag.
gcloud compute firewall-rules create allow-kafka \
  --target-tags "kafka-server" \
  --allow tcp:2181,tcp:9092
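
To double-check the rule, you can list it (a quick, optional verification):

# Confirm the rule exists and allows TCP 2181 and 9092 for the "kafka-server" tag.
gcloud compute firewall-rules list --filter "name=allow-kafka"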

Now we can start a new Compute Engine VM (Virtual Machine) instance for the Kafka server using the Docker image we created in Container Registry.

For this sample, we don't need a high performance VM, so we are using an e2-small machine with shared CPU cores for a more cost-effective option.

To learn more about pricing, see the VM instances pricing page.

export KAFKA_IMAGE="gcr.io/$PROJECT/samples/dataflow/kafka:latest"

# Note: If the project name has `:` in it that signifies a project within an
# organization (e.g. `example.com:project-id`), replace those with `/` so that
# the Kafka image can be found appropriately.

# Build the Kafka server image into Container Registry.
gcloud builds submit --tag $KAFKA_IMAGE kafka/

# If a different topic, address, kafka port, or zookeeper port is desired,
# update the following environment variables before starting the server.
# Otherwise, the default values will be used in the Dockerfile:
export KAFKA_TOPIC=<topic-name>
export KAFKA_ADDRESS=<kafka-address>
export KAFKA_PORT=<kafka-port>
export ZOOKEEPER_PORT=<zookeeper-port>

# Create and start a new instance.
# The --address flag binds the VM's address to the static address we created.
# The --container-env KAFKA_ADDRESS is an environment variable passed to the
# container to configure Kafka to use the static address of the VM.
# The --tags "kafka-server" is used by the firewall rule.
gcloud compute instances create-with-container kafka-vm \
  --zone "$ZONE" \
  --machine-type "e2-small" \
  --address "$KAFKA_ADDRESS" \
  --container-image "$KAFKA_IMAGE" \
  --container-env "KAFKA_ADDRESS=$KAFKA_ADDRESS" \
  --tags "kafka-server"

Note: The Kafka server should be running at this point, but in its current state no messages are being sent to a topic, which will cause the KafkaToBigQuery template to fail.
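
Before sending messages, you may want to confirm the VM started; a quick, optional check (the Kafka container itself can take a minute or two to come up):

# Check that the VM is running.
gcloud compute instances describe kafka-vm --zone "$ZONE" --format='value(status)'

# Optionally, SSH in and verify the Kafka container has started.
gcloud compute ssh kafka-vm --zone "$ZONE" --command "docker ps"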

Sending messages to Kafka server

SSH into the kafka-vm instance created earlier and run the commands below as needed. Messages sent before the template starts are retained and consumed once it is running; messages sent after the template has started are processed as they arrive.

Prerequisite: SSH into the Kafka VM.

    gcloud compute ssh kafka-vm --zone "$ZONE"

  1. Create a topic.

    docker run --rm --network host apache/kafka:3.9.1 \
      /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 \
      --create --topic <topic-name> --partitions 1 --replication-factor 1
  2. Send messages to the topic.

    Run the console producer to send messages. After running the command, type a message and press Enter. You can send multiple messages. Press Ctrl+C to stop the producer.

    Note: You can run this step either before starting the Dataflow template (messages will be ready) or while it's running (messages will be processed as they arrive). For a sample JSON message, see the sketch after this list.

    docker run -i --rm --network host apache/kafka:3.9.1 \
      /opt/kafka/bin/kafka-console-producer.sh \
      --bootstrap-server localhost:9092 --topic <topic-name>
  3. (Optional) Verify the messages.

    You can check that your messages were sent correctly by starting a consumer. This will print all messages from the beginning of the topic. Press Ctrl+C to exit.

    docker run -it --rm --network host apache/kafka:3.9.1 \
      /opt/kafka/bin/kafka-console-consumer.sh \
      --bootstrap-server localhost:9092 --topic <topic-name> --from-beginning
  4. (Optional) Delete the topic.

    docker run --rm --network host apache/kafka:3.9.1 \
      /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 \
      --delete --topic <topic-name>
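
Because the pipeline expects JSON-encoded messages, each line you type into the producer should be a JSON object. As an illustrative sketch, you can also pipe a message in non-interactively; the url and rating field names here mirror the schema example shown later in this guide and may differ from your pipeline's schema:

# Send a single JSON message without an interactive prompt.
echo '{"url": "https://beam.apache.org/", "rating": "positive"}' | \
  docker run -i --rm --network host apache/kafka:3.9.1 \
  /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 --topic <topic-name>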

Creating and running a Flex Template

(Optional) Run the Apache Beam pipeline locally for development. (Click to expand)
# The pipeline connects to localhost if you omit the --bootstrapServer argument,
# so you can leave it out when running the Kafka server locally.
mvn compile exec:java \
  -Dexec.mainClass=org.apache.beam.samples.KafkaToBigQuery \
  -Dexec.args="\
    --project=$PROJECT \
    --outputTable=$PROJECT:$DATASET.$TABLE \
    --bootstrapServer=$KAFKA_ADDRESS:9092"

First, let's build and package the pipeline as an uber-jar; the Flex Template build step below bundles it into a container image.

# Build and package the application as an uber-jar file.
mvn clean package
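
The Flex Template build below expects the uber-jar at target/kafka-to-bigquery-1.0.jar (the path referenced in the --jar flag). As a quick, optional check that packaging produced it:

ls -lh target/kafka-to-bigquery-1.0.jar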

Now we can create the template file.

export TEMPLATE_IMAGE="gcr.io/$PROJECT/samples/dataflow/kafka-to-bigquery-sql:latest"
export TEMPLATE_PATH="gs://$BUCKET/samples/dataflow/templates/kafka-to-bigquery.json"

# Build the Flex Template.
gcloud dataflow flex-template build $TEMPLATE_PATH \
    --image-gcr-path "$TEMPLATE_IMAGE" \
    --sdk-language "JAVA" \
    --flex-template-base-image JAVA11 \
    --metadata-file "metadata.json" \
    --jar "target/kafka-to-bigquery-1.0.jar" \
    --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="org.apache.beam.samples.KafkaToBigQuery"
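
If you want to verify the build, the template spec is just a JSON file in Cloud Storage and can be inspected (optional):

# Confirm the template spec file exists and inspect its contents.
gcloud storage ls $TEMPLATE_PATH
gcloud storage cat $TEMPLATE_PATH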

Finally, run a Dataflow job using the template.

export REGION="us-central1"

# Run the Flex Template.
gcloud dataflow flex-template run "kafka-to-bigquery-`date +%Y%m%d-%H%M%S`" \
    --template-file-gcs-location "$TEMPLATE_PATH" \
    --parameters inputTopic="messages" \
    --parameters outputTable="$PROJECT:$DATASET.$TABLE" \
    --parameters bootstrapServer="$KAFKA_ADDRESS:9092" \
    --region "$REGION"

Note: If one of the parameters is a deeply nested JSON object or dictionary, use the gcloud --flags-file option to pass in a YAML file that lists all the parameters, including the nested dictionary. Passing the dictionary directly on the command line results in a gcloud error. The parameters file can look like this:

--parameters:
  inputTopic: messages
  outputTable: $PROJECT:$DATASET.$TABLE
  bootstrapServer: $KAFKA_ADDRESS:9092
  schema:
    '{type: object, properties: {processing_time: {type: TIMESTAMP}, url: {type: STRING}, rating: {type: STRING}}}'
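
For example, assuming the snippet above is saved as parameters.yaml (a hypothetical filename), the job could be launched with --flags-file instead of repeated --parameters flags. Note that shell variables such as $PROJECT are not expanded inside the YAML file, so write the literal values there.

# Run the Flex Template using a flags file for the parameters.
gcloud dataflow flex-template run "kafka-to-bigquery-`date +%Y%m%d-%H%M%S`" \
    --template-file-gcs-location "$TEMPLATE_PATH" \
    --flags-file "parameters.yaml" \
    --region "$REGION"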

Run the following query to check the results in BigQuery.

bq query --use_legacy_sql=false 'SELECT * FROM `'"$PROJECT.$DATASET.$TABLE"'`'

Cleaning up

After you've finished this tutorial, you can clean up the resources you created on Google Cloud so you won't be billed for them in the future. The following sections describe how to delete or turn off these resources.

Clean up the Flex template resources

  1. Stop the Dataflow pipeline.

    gcloud dataflow jobs list \
        --filter 'NAME:kafka-to-bigquery AND STATE=Running' \
        --format 'value(JOB_ID)' \
      | xargs gcloud dataflow jobs cancel
  2. Delete the template spec file from Cloud Storage.

    gcloud storage rm $TEMPLATE_PATH
  3. Delete the Flex Template container images from Container Registry.

    gcloud container images delete $TEMPLATE_IMAGE --force-delete-tags

Clean up the Kafka server

  1. Delete the Kafka server VM instance.

    gcloud compute instances delete kafka-vm --zone "$ZONE"
  2. Delete the firewall rule. This does not incur any charges.

    gcloud compute firewall-rules delete allow-kafka
  3. Delete the static address.

    gcloud compute addresses delete --region "$REGION" kafka-address
  4. Delete the Kafka container image from Container Registry.

    gcloud container images delete $KAFKA_IMAGE --force-delete-tags

Clean up Google Cloud project resources

  1. Delete the BigQuery table.

    bq rm -f -t $PROJECT:$DATASET.$TABLE
  2. Delete the BigQuery dataset. This alone does not incur any charges.

    ⚠️ The following command also deletes all tables in the dataset. The tables and data cannot be recovered.

    bq rm -r -f -d $PROJECT:$DATASET
  3. Delete the Cloud Storage bucket. This alone does not incur any charges.

    ⚠️ The following command also deletes all objects in the bucket. These objects cannot be recovered.

    gcloud storage rm --recursive gs://$BUCKET