diff --git a/ingestion/src/main/java/feast/source/kafka/deserializer/FeatureRowDeserializer.java b/ingestion/src/main/java/feast/source/kafka/FeatureRowDeserializer.java similarity index 97% rename from ingestion/src/main/java/feast/source/kafka/deserializer/FeatureRowDeserializer.java rename to ingestion/src/main/java/feast/source/kafka/FeatureRowDeserializer.java index e75a17e64f8..d62b959adea 100644 --- a/ingestion/src/main/java/feast/source/kafka/deserializer/FeatureRowDeserializer.java +++ b/ingestion/src/main/java/feast/source/kafka/FeatureRowDeserializer.java @@ -15,7 +15,7 @@ * */ -package feast.source.kafka.deserializer; +package feast.source.kafka; import com.google.protobuf.InvalidProtocolBufferException; import feast.types.FeatureRowProto.FeatureRow; diff --git a/ingestion/src/main/java/feast/source/kafka/KafkaFeatureSource.java b/ingestion/src/main/java/feast/source/kafka/KafkaFeatureSource.java index a91add6924a..5c8ca713fda 100644 --- a/ingestion/src/main/java/feast/source/kafka/KafkaFeatureSource.java +++ b/ingestion/src/main/java/feast/source/kafka/KafkaFeatureSource.java @@ -21,17 +21,15 @@ import com.google.auto.service.AutoService; import com.google.common.base.Strings; +import feast.ingestion.options.JobOptions; import feast.ingestion.transform.fn.FilterFeatureRowDoFn; import feast.options.Options; import feast.options.OptionsParser; import feast.source.FeatureSource; import feast.source.FeatureSourceFactory; -import feast.source.kafka.deserializer.FeatureRowDeserializer; -import feast.source.kafka.deserializer.FeatureRowKeyDeserializer; import feast.specs.ImportSpecProto.Field; import feast.specs.ImportSpecProto.ImportSpec; import feast.types.FeatureRowProto.FeatureRow; -import feast.types.FeatureRowProto.FeatureRowKey; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -43,6 +41,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PInput; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; /** * Transform for reading {@link feast.types.FeatureRowProto.FeatureRow FeatureRow} proto messages @@ -60,29 +59,35 @@ public PCollection expand(PInput input) { KafkaReadOptions options = OptionsParser.parse(importSpec.getSourceOptionsMap(), KafkaReadOptions.class); + JobOptions jobOptions = OptionsParser.parse(importSpec.getJobOptionsMap(), JobOptions.class); List topicsList = new ArrayList<>(Arrays.asList(options.topics.split(","))); - PCollection> featureRowRecord = - input.getPipeline().apply( KafkaIO.read() - .withBootstrapServers(options.server) - .withTopics(topicsList) - .withKeyDeserializer(FeatureRowKeyDeserializer.class) - .withValueDeserializer(FeatureRowDeserializer.class)); + KafkaIO.Read read = KafkaIO.read() + .withBootstrapServers(options.server) + .withTopics(topicsList) + .withKeyDeserializer(ByteArrayDeserializer.class) + .withValueDeserializer(FeatureRowDeserializer.class); + if (jobOptions.getSampleLimit() > 0) { + read = read.withMaxNumRecords(jobOptions.getSampleLimit()); + } + + PCollection> featureRowRecord = + input.getPipeline().apply(read); - PCollection featureRow = featureRowRecord.apply( + PCollection featureRow = featureRowRecord.apply( ParDo.of( - new DoFn, FeatureRow>() { + new DoFn, FeatureRow>() { @ProcessElement public void processElement(ProcessContext processContext) { - KafkaRecord record = processContext.element(); + KafkaRecord record = processContext.element(); processContext.output(record.getKV().getValue()); } })); if (options.discardUnknownFeatures) { List featureIds = new ArrayList<>(); - for(Field field: importSpec.getSchema().getFieldsList()) { + for (Field field : importSpec.getSchema().getFieldsList()) { String featureId = field.getFeatureId(); if (!Strings.isNullOrEmpty(featureId)) { featureIds.add(featureId); @@ -94,8 +99,11 @@ public void processElement(ProcessContext processContext) { } public static class KafkaReadOptions implements Options { - @NotEmpty public String server; - @NotEmpty public String topics; + + @NotEmpty + public String server; + @NotEmpty + public String topics; public boolean discardUnknownFeatures = false; } diff --git a/ingestion/src/main/java/feast/source/kafka/deserializer/FeatureRowKeyDeserializer.java b/ingestion/src/main/java/feast/source/kafka/deserializer/FeatureRowKeyDeserializer.java deleted file mode 100644 index 4e77d7c99da..00000000000 --- a/ingestion/src/main/java/feast/source/kafka/deserializer/FeatureRowKeyDeserializer.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright 2018 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package feast.source.kafka.deserializer; - -import com.google.protobuf.InvalidProtocolBufferException; -import feast.types.FeatureRowProto.FeatureRowKey; -import java.util.Map; -import org.apache.kafka.common.errors.SerializationException; -import org.apache.kafka.common.serialization.Deserializer; - -/** - * Deserializer for Kafka to deserialize Protocol Buffers messages - * - * @param Protobuf message type - */ -public class FeatureRowKeyDeserializer implements Deserializer { - - @Override - public void configure(Map configs, boolean isKey) {} - - @Override - public FeatureRowKey deserialize(String topic, byte[] data) { - try { - return FeatureRowKey.parseFrom(data); - } catch (InvalidProtocolBufferException e) { - throw new SerializationException( - "Error deserializing FeatureRowKey from Protobuf message", e); - } - } - - @Override - public void close() {} -} diff --git a/ingestion/src/test/java/feast/source/kafka/KafkaFeatureSourceTest.java b/ingestion/src/test/java/feast/source/kafka/KafkaFeatureSourceTest.java new file mode 100644 index 00000000000..50ffa1e87fa --- /dev/null +++ b/ingestion/src/test/java/feast/source/kafka/KafkaFeatureSourceTest.java @@ -0,0 +1,125 @@ +package feast.source.kafka; + +import feast.specs.ImportSpecProto.ImportSpec; +import feast.types.FeatureRowProto.FeatureRow; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.test.rule.EmbeddedKafkaRule; +import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit4.SpringRunner; + +@Slf4j +@RunWith(SpringRunner.class) +@SpringBootTest +@DirtiesContext +public class KafkaFeatureSourceTest { + + @ClassRule + public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, "TEST_TOPIC"); + @Rule + public TestPipeline pipeline = TestPipeline.create(); + @Autowired + private KafkaTemplate template; + + + public void send(FeatureRow... rows) { + for (FeatureRow row : rows) { + try { + log.info("Sent: " + template.send("TEST_TOPIC", row).get().toString()); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (ExecutionException e) { + e.printStackTrace(); + } + } + } + + @Test + public void testFoo() throws ExecutionException, InterruptedException { + String server = embeddedKafka.getEmbeddedKafka().getBrokerAddresses()[0].toString(); + ImportSpec importSpec = ImportSpec.newBuilder().setType("kafka") + .addEntities("testEntity") + .putSourceOptions("topics", "TEST_TOPIC") + .putSourceOptions("server", server) + .putJobOptions("sample.limit", "1") + .build(); + FeatureRow row = FeatureRow.newBuilder().setEntityKey("key").build(); + ScheduledExecutorService scheduler = + Executors.newScheduledThreadPool(1); + // we keep sending on loop because beam will only start consuming rows that were sent after startup. + scheduler.scheduleAtFixedRate(() -> send(row), 0, 1, TimeUnit.SECONDS); + + PCollection rows = pipeline.apply(new KafkaFeatureSource(importSpec)); + Assert.assertEquals(IsBounded.BOUNDED, rows.isBounded()); + PAssert.that(rows).containsInAnyOrder(row); + pipeline.run(); + } + + public Producer getProducer() { + Map producerProps = + KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka()); + return new DefaultKafkaProducerFactory<>(producerProps, new ByteArraySerializer(), + new FeatureRowSerializer()).createProducer(); + } + + + public static class FeatureRowSerializer implements Serializer { + + @Override + public void configure(Map configs, boolean isKey) { + + } + + @Override + public byte[] serialize(String topic, FeatureRow data) { + return data.toByteArray(); + } + + @Override + public void close() { + + } + } + + @Configuration + static class ContextConfiguration { + + @Bean + ProducerFactory producerFactory() { + Map producerProps = + KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka()); + + return new DefaultKafkaProducerFactory<>( + producerProps, new ByteArraySerializer(), new FeatureRowSerializer()); + } + + @Bean + KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory(), true); + } + } +} \ No newline at end of file diff --git a/ingestion/src/test/java/feast/source/kafka/deserializer/KafkaFeatureRowDeserializerTest.java b/ingestion/src/test/java/feast/source/kafka/deserializer/KafkaFeatureRowDeserializerTest.java deleted file mode 100644 index 3436b8d8b7a..00000000000 --- a/ingestion/src/test/java/feast/source/kafka/deserializer/KafkaFeatureRowDeserializerTest.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Copyright 2018 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package feast.source.kafka.deserializer; - -import com.google.protobuf.MessageLite; -import feast.types.FeatureRowProto.FeatureRow; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.LinkedBlockingQueue; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.kafka.common.serialization.Deserializer; -import org.junit.Assert; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.core.ConsumerFactory; -import org.springframework.kafka.core.DefaultKafkaConsumerFactory; -import org.springframework.kafka.core.DefaultKafkaProducerFactory; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.core.ProducerFactory; -import org.springframework.kafka.listener.ContainerProperties; -import org.springframework.kafka.listener.KafkaMessageListenerContainer; -import org.springframework.kafka.listener.MessageListener; -import org.springframework.kafka.listener.MessageListenerContainer; -import org.springframework.kafka.support.SendResult; -import org.springframework.kafka.test.rule.EmbeddedKafkaRule; -import org.springframework.kafka.test.utils.ContainerTestUtils; -import org.springframework.kafka.test.utils.KafkaTestUtils; -import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.context.junit4.SpringRunner; -import org.springframework.util.concurrent.ListenableFuture; - -@RunWith(SpringRunner.class) -@SpringBootTest -@DirtiesContext -public class KafkaFeatureRowDeserializerTest { - - private static final String topic = "TEST_TOPIC"; - - @ClassRule public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, topic); - @Autowired private KafkaTemplate template; - - private void deserialize(MessageType input) { - - Deserializer deserializer = new FeatureRowDeserializer(); - - Map consumerProps = - KafkaTestUtils.consumerProps("testGroup", "false", embeddedKafka.getEmbeddedKafka()); - ConsumerFactory consumerFactory = - new DefaultKafkaConsumerFactory<>(consumerProps, deserializer, deserializer); - - BlockingQueue> records = new LinkedBlockingQueue<>(); - ContainerProperties containerProps = new ContainerProperties(topic); - containerProps.setMessageListener((MessageListener) records::add); - - MessageListenerContainer container = - new KafkaMessageListenerContainer<>(consumerFactory, containerProps); - container.start(); - ContainerTestUtils.waitForAssignment( - container, embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic()); - - byte[] data = input.toByteArray(); - ProducerRecord producerRecord = new ProducerRecord<>(topic, data, data); - ListenableFuture> producerFuture = template.send(producerRecord); - - try { - producerFuture.get(); - } catch (InterruptedException e) { - return; - } catch (ExecutionException e) { - throw new KafkaException("Error sending message to Kafka.", e.getCause()); - } - - ConsumerRecord consumerRecord; - try { - consumerRecord = records.take(); - } catch (InterruptedException e) { - return; - } - - FeatureRow key = consumerRecord.key(); - Assert.assertEquals(key, input); - - FeatureRow value = consumerRecord.value(); - Assert.assertEquals(value, input); - } - - @Test(timeout = 10000) - public void deserializeFeatureRowProto() { - FeatureRow message = FeatureRow.newBuilder().setEntityName("test").build(); - deserialize(message); - } - - @Configuration - static class ContextConfiguration { - @Bean - ProducerFactory producerFactory() { - Map producerProps = - KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka()); - - return new DefaultKafkaProducerFactory<>( - producerProps, new ByteArraySerializer(), new ByteArraySerializer()); - } - - @Bean - KafkaTemplate kafkaTemplate() { - return new KafkaTemplate<>(producerFactory(), true); - } - } -} diff --git a/protos/feast/types/FeatureRow.proto b/protos/feast/types/FeatureRow.proto index b17ad7bbc9b..755fe0f582e 100644 --- a/protos/feast/types/FeatureRow.proto +++ b/protos/feast/types/FeatureRow.proto @@ -25,12 +25,6 @@ option java_package = "feast.types"; option java_outer_classname = "FeatureRowProto"; option go_package = "github.com/gojek/feast/protos/generated/go/feast/types"; -message FeatureRowKey { - string entityKey = 1; - google.protobuf.Timestamp eventTimestamp = 3; - string entityName = 4; -} - message FeatureRow { string entityKey = 1; repeated Feature features = 2;