Skip to content

Commit dbb9169

Browse files
committed
[VL] adding kfaka read support
Signed-off-by: Yuan <yuanzhou@apache.org>
1 parent 91a818e commit dbb9169

4 files changed

Lines changed: 132 additions & 0 deletions

File tree

backends-velox/pom.xml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -592,5 +592,31 @@
592592
</dependency>
593593
</dependencies>
594594
</profile>
595+
<profile>
596+
<id>kafka</id>
597+
<activation>
598+
<activeByDefault>false</activeByDefault>
599+
</activation>
600+
<dependencies>
601+
<dependency>
602+
<groupId>org.apache.gluten</groupId>
603+
<artifactId>gluten-kafka</artifactId>
604+
<version>${project.version}</version>
605+
</dependency>
606+
<dependency>
607+
<groupId>org.apache.gluten</groupId>
608+
<artifactId>gluten-kafka</artifactId>
609+
<version>${project.version}</version>
610+
<type>test-jar</type>
611+
<scope>test</scope>
612+
</dependency>
613+
<dependency>
614+
<groupId>org.apache.spark</groupId>
615+
<artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
616+
<version>${spark.version}</version>
617+
<scope>provided</scope>
618+
</dependency>
619+
</dependencies>
620+
</profile>
595621
</profiles>
596622
</project>
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
#include "KafkaReader.h"
19+
20+
namespace gluten {
21+
22+
// Implementation placeholder for KafkaReader
23+
// This file will contain the actual implementation of Kafka reading logic
24+
// including:
25+
// - Kafka consumer initialization
26+
// - Message polling and deserialization
27+
// - Offset management
28+
// - Error handling and retry logic
29+
30+
} // namespace gluten
31+
32+
// Made with Bob
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
#pragma once
19+
20+
#include "velox/connectors/Connector.h"
21+
#include "velox/exec/Operator.h"
22+
23+
namespace gluten {
24+
25+
/// Kafka reader operator for streaming Kafka data
26+
/// This is a placeholder implementation that will be extended
27+
/// to support actual Kafka consumption in the Velox backend
28+
class KafkaReader : public facebook::velox::exec::SourceOperator {
29+
public:
30+
KafkaReader(
31+
int32_t operatorId,
32+
facebook::velox::exec::DriverCtx* driverCtx,
33+
const std::shared_ptr<const facebook::velox::core::PlanNode>& planNode)
34+
: SourceOperator(
35+
driverCtx,
36+
planNode->outputType(),
37+
operatorId,
38+
planNode->id(),
39+
"KafkaReader") {}
40+
41+
facebook::velox::RowVectorPtr getOutput() override {
42+
// TODO: Implement actual Kafka reading logic
43+
// This should:
44+
// 1. Connect to Kafka broker
45+
// 2. Read messages from the specified topic/partition
46+
// 3. Convert Kafka messages to RowVector format
47+
// 4. Handle offset management
48+
return nullptr;
49+
}
50+
51+
facebook::velox::BlockingReason isBlocked(
52+
facebook::velox::ContinueFuture* future) override {
53+
return facebook::velox::BlockingReason::kNotBlocked;
54+
}
55+
56+
bool isFinished() override {
57+
return noMoreSplits_ && !hasSplit_;
58+
}
59+
60+
private:
61+
bool noMoreSplits_ = false;
62+
bool hasSplit_ = false;
63+
};
64+
65+
} // namespace gluten
66+
67+
// Made with Bob

cpp/velox/substrait/SubstraitToVeloxPlan.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1682,6 +1682,13 @@ std::string SubstraitToVeloxPlanConverter::findFuncSpec(uint64_t id) {
16821682
}
16831683

16841684
int32_t SubstraitToVeloxPlanConverter::getStreamIndex(const ::substrait::ReadRel& sRead) {
1685+
// Check if this is a Kafka stream
1686+
if (sRead.stream_kafka()) {
1687+
// For Kafka streams, we don't use the iterator pattern
1688+
// Return -1 to indicate this should be handled as a regular scan
1689+
return -1;
1690+
}
1691+
16851692
if (sRead.has_local_files()) {
16861693
const auto& fileList = sRead.local_files().items();
16871694
if (fileList.size() == 0) {

0 commit comments

Comments
 (0)