Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ config_namespace! {
/// Should DataFusion collect statistics after listing files
pub collect_statistics: bool, default = false

/// Enables parallel file scanning. Currently supported only for Parquet format
pub parallel_file_scan: bool, default = false

/// Number of partitions for query execution. Increasing partitions can increase
/// concurrency. Defaults to the number of cpu cores on the system
pub target_partitions: usize, default = num_cpus::get()
Expand Down
11 changes: 0 additions & 11 deletions datafusion/core/src/datasource/file_format/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use crate::execution::context::SessionState;
use crate::logical_expr::Expr;
use crate::physical_plan::file_format::{AvroExec, FileScanConfig};
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;

/// The default file extension of avro files
pub const DEFAULT_AVRO_EXTENSION: &str = ".avro";
Expand Down Expand Up @@ -68,16 +67,6 @@ impl FileFormat for AvroFormat {
Ok(Arc::new(merged_schema))
}

/// Statistics hook for Avro files.
///
/// None of the arguments are consulted (all are `_`-prefixed); the result
/// is always `Statistics::default()`.
async fn infer_stats(
    &self,
    _state: &SessionState,
    _store: &Arc<dyn ObjectStore>,
    _table_schema: SchemaRef,
    _object: &ObjectMeta,
) -> Result<Statistics> {
    let default_stats = Statistics::default();
    Ok(default_stats)
}

async fn create_physical_plan(
&self,
_state: &SessionState,
Expand Down
11 changes: 0 additions & 11 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ use crate::physical_plan::file_format::{
newline_delimited_stream, CsvExec, FileScanConfig,
};
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;

/// The default file extension of csv files
pub const DEFAULT_CSV_EXTENSION: &str = ".csv";
Expand Down Expand Up @@ -142,16 +141,6 @@ impl FileFormat for CsvFormat {
Ok(Arc::new(merged_schema))
}

/// Statistics hook for CSV files.
///
/// None of the arguments are consulted (all are `_`-prefixed); the result
/// is always `Statistics::default()`.
async fn infer_stats(
    &self,
    _state: &SessionState,
    _store: &Arc<dyn ObjectStore>,
    _table_schema: SchemaRef,
    _object: &ObjectMeta,
) -> Result<Statistics> {
    let default_stats = Statistics::default();
    Ok(default_stats)
}

async fn create_physical_plan(
&self,
_state: &SessionState,
Expand Down
11 changes: 0 additions & 11 deletions datafusion/core/src/datasource/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ use crate::execution::context::SessionState;
use crate::logical_expr::Expr;
use crate::physical_plan::file_format::NdJsonExec;
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;

/// The default file extension of json files
pub const DEFAULT_JSON_EXTENSION: &str = ".json";
Expand Down Expand Up @@ -129,16 +128,6 @@ impl FileFormat for JsonFormat {
Ok(Arc::new(schema))
}

/// Statistics hook for JSON files.
///
/// None of the arguments are consulted (all are `_`-prefixed); the result
/// is always `Statistics::default()`.
async fn infer_stats(
    &self,
    _state: &SessionState,
    _store: &Arc<dyn ObjectStore>,
    _table_schema: SchemaRef,
    _object: &ObjectMeta,
) -> Result<Statistics> {
    let default_stats = Statistics::default();
    Ok(default_stats)
}

async fn create_physical_plan(
&self,
_state: &SessionState,
Expand Down
55 changes: 49 additions & 6 deletions datafusion/core/src/datasource/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use std::fmt;
use std::sync::Arc;

use crate::arrow::datatypes::SchemaRef;
use crate::datasource::listing::FileRanges;
use crate::error::Result;
use crate::logical_expr::Expr;
use crate::physical_plan::file_format::FileScanConfig;
Expand All @@ -40,6 +41,36 @@ use crate::execution::context::SessionState;
use async_trait::async_trait;
use object_store::{ObjectMeta, ObjectStore};

/// Format-specific file metadata used for Scan operation planning.
///
/// Bundles the two pieces of per-file information a format can report:
/// its statistics and its file ranges. This is the value returned by
/// `FileFormat::fetch_format_scan_metadata`.
///
/// The `Default` value carries `Statistics::default()` and a `file_ranges`
/// vector holding a single `None`.
#[derive(Clone)]
pub struct FormatScanMetadata {
/// Statistics reported for the file.
pub statistics: Statistics,
/// File ranges if available.
///
/// NOTE(review): presumably these describe how one file can be split for
/// parallel scanning — confirm against `crate::datasource::listing::FileRanges`.
pub file_ranges: FileRanges,
}

impl FormatScanMetadata {
    /// Consumes `self` and returns it with `statistics` replaced by the
    /// given value; `file_ranges` is carried over unchanged.
    fn with_statistics(self, statistics: Statistics) -> Self {
        Self { statistics, ..self }
    }

    /// Consumes `self` and returns it with `file_ranges` replaced by the
    /// given value; `statistics` is carried over unchanged.
    fn with_file_ranges(self, file_ranges: FileRanges) -> Self {
        Self { file_ranges, ..self }
    }
}

impl Default for FormatScanMetadata {
    /// Builds metadata holding `Statistics::default()` and a `file_ranges`
    /// vector with exactly one `None` entry.
    fn default() -> Self {
        let statistics = Statistics::default();
        let file_ranges = vec![None];
        Self {
            statistics,
            file_ranges,
        }
    }
}

/// This trait abstracts all the file format specific implementations
/// from the `TableProvider`. This helps code re-utilization across
/// providers that support the same file formats.
Expand All @@ -60,20 +91,24 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
objects: &[ObjectMeta],
) -> Result<SchemaRef>;

/// Infer the statistics for the provided object. The cost and accuracy of the
/// estimated statistics might vary greatly between file formats.
/// Fetch format-specific metadata used for Scan operation planning
///
/// `table_schema` is the (combined) schema of the overall table
/// and may be a superset of the schema contained in this file.
///
/// TODO: should the file source return statistics for only columns referred to in the table schema?
async fn infer_stats(
#[allow(unused_variables)]
async fn fetch_format_scan_metadata(
&self,
state: &SessionState,
store: &Arc<dyn ObjectStore>,
table_schema: SchemaRef,
object: &ObjectMeta,
) -> Result<Statistics>;
collect_statistics: bool,
collect_ranges: bool,
) -> Result<FormatScanMetadata> {
Ok(FormatScanMetadata::default())
}

/// Take a list of files and convert it to the appropriate executor
/// according to this file format.
Expand Down Expand Up @@ -115,9 +150,17 @@ pub(crate) mod test_util {

let file_schema = format.infer_schema(state, &store, &[meta.clone()]).await?;

let statistics = format
.infer_stats(state, &store, file_schema.clone(), &meta)
let format_scan_metadata = format
.fetch_format_scan_metadata(
state,
&store,
file_schema.clone(),
&meta,
true,
true,
)
.await?;
let statistics = format_scan_metadata.statistics;

let file_groups = vec![vec![PartitionedFile {
object_meta: meta,
Expand Down
Loading