Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*
*/

package feast.source.kafka.deserializer;
package feast.source.kafka;

import com.google.protobuf.InvalidProtocolBufferException;
import feast.types.FeatureRowProto.FeatureRow;
Expand Down
38 changes: 23 additions & 15 deletions ingestion/src/main/java/feast/source/kafka/KafkaFeatureSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,15 @@

import com.google.auto.service.AutoService;
import com.google.common.base.Strings;
import feast.ingestion.options.JobOptions;
import feast.ingestion.transform.fn.FilterFeatureRowDoFn;
import feast.options.Options;
import feast.options.OptionsParser;
import feast.source.FeatureSource;
import feast.source.FeatureSourceFactory;
import feast.source.kafka.deserializer.FeatureRowDeserializer;
import feast.source.kafka.deserializer.FeatureRowKeyDeserializer;
import feast.specs.ImportSpecProto.Field;
import feast.specs.ImportSpecProto.ImportSpec;
import feast.types.FeatureRowProto.FeatureRow;
import feast.types.FeatureRowProto.FeatureRowKey;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand All @@ -43,6 +41,7 @@
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

/**
* Transform for reading {@link feast.types.FeatureRowProto.FeatureRow FeatureRow} proto messages
Expand All @@ -60,29 +59,35 @@ public PCollection<FeatureRow> expand(PInput input) {

KafkaReadOptions options =
OptionsParser.parse(importSpec.getSourceOptionsMap(), KafkaReadOptions.class);
JobOptions jobOptions = OptionsParser.parse(importSpec.getJobOptionsMap(), JobOptions.class);

List<String> topicsList = new ArrayList<>(Arrays.asList(options.topics.split(",")));

PCollection<KafkaRecord<FeatureRowKey, FeatureRow>> featureRowRecord =
input.getPipeline().apply( KafkaIO.<FeatureRowKey, FeatureRow>read()
.withBootstrapServers(options.server)
.withTopics(topicsList)
.withKeyDeserializer(FeatureRowKeyDeserializer.class)
.withValueDeserializer(FeatureRowDeserializer.class));
KafkaIO.Read<byte[], FeatureRow> read = KafkaIO.<byte[], FeatureRow>read()
.withBootstrapServers(options.server)
.withTopics(topicsList)
.withKeyDeserializer(ByteArrayDeserializer.class)
.withValueDeserializer(FeatureRowDeserializer.class);
if (jobOptions.getSampleLimit() > 0) {
read = read.withMaxNumRecords(jobOptions.getSampleLimit());
}

PCollection<KafkaRecord<byte[], FeatureRow>> featureRowRecord =
input.getPipeline().apply(read);

PCollection<FeatureRow> featureRow = featureRowRecord.apply(
PCollection<FeatureRow> featureRow = featureRowRecord.apply(
ParDo.of(
new DoFn<KafkaRecord<FeatureRowKey, FeatureRow>, FeatureRow>() {
new DoFn<KafkaRecord<byte[], FeatureRow>, FeatureRow>() {
@ProcessElement
public void processElement(ProcessContext processContext) {
KafkaRecord<FeatureRowKey, FeatureRow> record = processContext.element();
KafkaRecord<byte[], FeatureRow> record = processContext.element();
processContext.output(record.getKV().getValue());
}
}));

if (options.discardUnknownFeatures) {
List<String> featureIds = new ArrayList<>();
for(Field field: importSpec.getSchema().getFieldsList()) {
for (Field field : importSpec.getSchema().getFieldsList()) {
String featureId = field.getFeatureId();
if (!Strings.isNullOrEmpty(featureId)) {
featureIds.add(featureId);
Expand All @@ -94,8 +99,11 @@ public void processElement(ProcessContext processContext) {
}

public static class KafkaReadOptions implements Options {
@NotEmpty public String server;
@NotEmpty public String topics;

@NotEmpty
public String server;
@NotEmpty
public String topics;
public boolean discardUnknownFeatures = false;
}

Expand Down

This file was deleted.

125 changes: 125 additions & 0 deletions ingestion/src/test/java/feast/source/kafka/KafkaFeatureSourceTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package feast.source.kafka;

import feast.specs.ImportSpecProto.ImportSpec;
import feast.types.FeatureRowProto.FeatureRow;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
public class KafkaFeatureSourceTest {

@ClassRule
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, "TEST_TOPIC");
@Rule
public TestPipeline pipeline = TestPipeline.create();
@Autowired
private KafkaTemplate<byte[], FeatureRow> template;


public void send(FeatureRow... rows) {
for (FeatureRow row : rows) {
try {
log.info("Sent: " + template.send("TEST_TOPIC", row).get().toString());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}

@Test
public void testFoo() throws ExecutionException, InterruptedException {
String server = embeddedKafka.getEmbeddedKafka().getBrokerAddresses()[0].toString();
ImportSpec importSpec = ImportSpec.newBuilder().setType("kafka")
.addEntities("testEntity")
.putSourceOptions("topics", "TEST_TOPIC")
.putSourceOptions("server", server)
.putJobOptions("sample.limit", "1")
.build();
FeatureRow row = FeatureRow.newBuilder().setEntityKey("key").build();
ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(1);
// we keep sending on loop because beam will only start consuming rows that were sent after startup.
scheduler.scheduleAtFixedRate(() -> send(row), 0, 1, TimeUnit.SECONDS);

PCollection<FeatureRow> rows = pipeline.apply(new KafkaFeatureSource(importSpec));
Assert.assertEquals(IsBounded.BOUNDED, rows.isBounded());
PAssert.that(rows).containsInAnyOrder(row);
pipeline.run();
}

public Producer<byte[], FeatureRow> getProducer() {
Map<String, Object> producerProps =
KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
return new DefaultKafkaProducerFactory<>(producerProps, new ByteArraySerializer(),
new FeatureRowSerializer()).createProducer();
}


public static class FeatureRowSerializer implements Serializer<FeatureRow> {

@Override
public void configure(Map<String, ?> configs, boolean isKey) {

}

@Override
public byte[] serialize(String topic, FeatureRow data) {
return data.toByteArray();
}

@Override
public void close() {

}
}

@Configuration
static class ContextConfiguration {

@Bean
ProducerFactory<byte[], FeatureRow> producerFactory() {
Map<String, Object> producerProps =
KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());

return new DefaultKafkaProducerFactory<>(
producerProps, new ByteArraySerializer(), new FeatureRowSerializer());
}

@Bean
KafkaTemplate<byte[], FeatureRow> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory(), true);
}
}
}
Loading