Skip to content

Commit a0cf56c

Browse files
ASiegeLion and liupeiyue authored
Fix OrcScan reads missing data column (#716)
* fix OrcScan reads missing data columns * fix OrcScan reads missing data columns * fix OrcScan reads missing data columns * fix OrcScan reads missing data columns * fix OrcScan reads missing data columns * fix OrcScan reads missing data columns --------- Co-authored-by: liupeiyue <liupeiyue@yy.com>
1 parent 43f4e58 commit a0cf56c

2 files changed

Lines changed: 47 additions & 12 deletions

File tree

native-engine/datafusion-ext-plans/src/orc_exec.rs

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use bytes::Bytes;
2323
use datafusion::{
2424
datasource::{
2525
physical_plan::{FileMeta, FileOpenFuture, FileOpener, FileScanConfig, FileStream},
26-
schema_adapter::SchemaAdapter,
2726
},
2827
error::Result,
2928
execution::context::TaskContext,
@@ -34,17 +33,19 @@ use datafusion::{
3433
PlanProperties, SendableRecordBatchStream, Statistics,
3534
},
3635
};
36+
use datafusion::datasource::schema_adapter::SchemaMapper;
3737
use datafusion_ext_commons::{batch_size, df_execution_err, hadoop_fs::FsProvider};
3838
use futures::{future::BoxFuture, FutureExt, StreamExt};
3939
use futures_util::TryStreamExt;
4040
use once_cell::sync::OnceCell;
4141
use orc_rust::{
4242
arrow_reader::ArrowReaderBuilder, projection::ProjectionMask, reader::AsyncChunkReader,
43+
reader::metadata::FileMetadata,
4344
};
4445

4546
use crate::{
4647
common::execution_context::ExecutionContext,
47-
scan::{internal_file_reader::InternalFileReader, BlazeSchemaAdapter},
48+
scan::{internal_file_reader::InternalFileReader, BlazeSchemaMapping},
4849
};
4950

5051
/// Execution plan for scanning one or more Orc partitions
@@ -208,7 +209,8 @@ impl FileOpener for OrcOpener {
208209
let batch_size = self.batch_size;
209210
let projection = self.projection.clone();
210211
let projected_schema = SchemaRef::from(self.table_schema.project(&projection)?);
211-
let schema_adapter = BlazeSchemaAdapter::new(projected_schema);
212+
213+
let schema_adapter = SchemaAdapter::new(projected_schema, projection);
212214

213215
Ok(Box::pin(async move {
214216
let mut builder = ArrowReaderBuilder::try_new_async(reader)
@@ -218,15 +220,9 @@ impl FileOpener for OrcOpener {
218220
let range = range.start as usize..range.end as usize;
219221
builder = builder.with_file_byte_range(range);
220222
}
221-
let file_schema = builder.schema();
222-
let (schema_mapping, adapted_projections) =
223-
schema_adapter.map_schema(file_schema.as_ref())?;
224-
225-
// Offset by 1 since index 0 is the root
226-
let projection = adapted_projections
227-
.iter()
228-
.map(|i| i + 1)
229-
.collect::<Vec<_>>();
223+
224+
let (schema_mapping, projection) = schema_adapter.map_schema(builder.file_metadata())?;
225+
230226
let projection_mask =
231227
ProjectionMask::roots(builder.file_metadata().root_data_type(), projection);
232228
let stream = builder
@@ -264,3 +260,36 @@ impl AsyncChunkReader for OrcFileReaderRef {
264260
async move { self.0.read_fully(range).map_err(|e| e.into()) }.boxed()
265261
}
266262
}
263+
264+
struct SchemaAdapter {
265+
table_schema: SchemaRef,
266+
projection: Vec<usize>,
267+
}
268+
269+
impl SchemaAdapter {
270+
pub fn new(table_schema: SchemaRef, projection: Vec<usize>) -> Self {
271+
Self { table_schema, projection }
272+
}
273+
274+
fn map_schema(&self, orc_file_meta: &FileMetadata) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
275+
let projected_schema = SchemaRef::from(self.table_schema.project(&self.projection)?);
276+
277+
let mut projection = Vec::with_capacity(projected_schema.fields().len());
278+
let mut field_mappings = vec![None; self.table_schema.fields().len()];
279+
280+
for nameColumn in orc_file_meta.root_data_type().children() {
281+
if let Some((table_idx, _table_field)) =
282+
projected_schema.fields().find(nameColumn.name()) {
283+
field_mappings[table_idx] = Some(projection.len());
284+
projection.push(nameColumn.data_type().column_index());
285+
}
286+
}
287+
288+
Ok((
289+
Arc::new(BlazeSchemaMapping::new(self.table_schema.clone(),
290+
field_mappings,
291+
)),
292+
projection,
293+
))
294+
}
295+
}

native-engine/datafusion-ext-plans/src/scan/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,12 @@ pub struct BlazeSchemaMapping {
8585
field_mappings: Vec<Option<usize>>,
8686
}
8787

88+
impl BlazeSchemaMapping {
89+
pub fn new(table_schema: SchemaRef, field_mappings: Vec<Option<usize>>) -> Self {
90+
Self { table_schema, field_mappings }
91+
}
92+
}
93+
8894
impl SchemaMapper for BlazeSchemaMapping {
8995
fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
9096
let batch_rows = batch.num_rows();

0 commit comments

Comments
 (0)