Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions paimon-python/pypaimon/globalindex/range.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,22 @@ def exclude(self, ranges: List['Range']) -> List['Range']:

return result

@staticmethod
def to_ranges(values: List[int]) -> List['Range']:
    """Collapse a list of row ids into consecutive ``Range`` spans.

    The ids are sorted first; every maximal run of consecutive ids
    becomes one Range. Duplicates are NOT deduplicated: a repeated id
    fails the ``+ 1`` consecutiveness check and therefore starts a new
    span, e.g. [1, 1, 2] -> [Range(1, 1), Range(1, 2)].
    """
    if not values:
        return []
    ordered = sorted(values)
    spans: List['Range'] = []
    start = end = ordered[0]
    for value in ordered[1:]:
        if value == end + 1:
            # Still consecutive: extend the current span.
            end = value
        else:
            # Gap (or duplicate): close the span and open a new one.
            spans.append(Range(start, end))
            start = end = value
    spans.append(Range(start, end))
    return spans

@staticmethod
def intersect(start1: int, end1: int, start2: int, end2: int) -> bool:
"""Check if two ranges intersect."""
Expand Down
36 changes: 36 additions & 0 deletions paimon-python/pypaimon/read/push_down_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,39 @@ def _get_all_fields(predicate: Predicate) -> Set[str]:
for sub_predicate in predicate.literals:
involved_fields.update(_get_all_fields(sub_predicate))
return involved_fields


def remove_row_id_filter(predicate: Predicate) -> Optional[Predicate]:
    """Return *predicate* with every _ROW_ID-only part stripped out.

    Returns ``None`` when nothing remains after stripping (i.e. the
    predicate constrained only _ROW_ID), which callers treat as
    "no filter".
    """
    from pypaimon.table.special_fields import SpecialFields

    row_id_field = SpecialFields.ROW_ID.name
    if not predicate:
        return None
    if predicate.field == row_id_field:
        return None
    if predicate.method == "and":
        # Drop conjuncts that reference only _ROW_ID, then recurse into
        # the survivors.
        kept = [
            part for part in _split_and(predicate)
            if _get_all_fields(part) != {row_id_field}
        ]
        if not kept:
            return None
        rewritten = [remove_row_id_filter(part) for part in kept]
        if any(child is None for child in rewritten):
            return None
        return PredicateBuilder.and_predicates(rewritten)
    if predicate.method == "or":
        survivors = []
        for child in predicate.literals or []:
            stripped = remove_row_id_filter(child)
            if stripped is not None:
                survivors.append(stripped)
        if not survivors:
            return None
        if len(survivors) == 1:
            return survivors[0]
        return PredicateBuilder.or_predicates(survivors)
    return predicate
87 changes: 85 additions & 2 deletions paimon-python/pypaimon/read/scanner/file_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@
from pypaimon.manifest.schema.manifest_entry import ManifestEntry
from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta
from pypaimon.read.plan import Plan
from pypaimon.read.push_down_utils import (trim_and_transform_predicate)
from pypaimon.read.push_down_utils import (
remove_row_id_filter,
trim_and_transform_predicate,
)
from pypaimon.read.scanner.append_table_split_generator import AppendTableSplitGenerator
from pypaimon.read.scanner.data_evolution_split_generator import DataEvolutionSplitGenerator
from pypaimon.read.scanner.primary_key_table_split_generator import PrimaryKeyTableSplitGenerator
Expand All @@ -37,6 +40,53 @@
from pypaimon.manifest.simple_stats_evolutions import SimpleStatsEvolutions


def _row_ranges_from_predicate(predicate: Optional[Predicate]) -> Optional[List]:
    """Extract row-id ``Range`` spans implied by a predicate over _ROW_ID.

    Returns ``None`` when the predicate does not bound _ROW_ID at all,
    an empty list when no row can match, otherwise a list of Ranges.
    """
    from pypaimon.globalindex.range import Range
    from pypaimon.table.special_fields import SpecialFields

    if predicate is None:
        return None

    row_id_field = SpecialFields.ROW_ID.name

    def visit(node: Predicate):
        if node.method == 'and':
            # Intersect the ranges of every bounded child; unbounded
            # children (None) do not constrain the result.
            combined = None
            for child in node.literals:
                child_ranges = visit(child)
                if child_ranges is None:
                    continue
                combined = (child_ranges if combined is None
                            else Range.and_(combined, child_ranges))
                if not combined:
                    # Intersection already empty: nothing can match.
                    return combined
            return combined
        if node.method == 'or':
            union = []
            for child in node.literals:
                child_ranges = visit(child)
                if child_ranges is None:
                    # One unbounded branch makes the whole OR unbounded.
                    return None
                union.extend(child_ranges)
            if not union:
                return []
            return Range.sort_and_merge_overlap(union, merge=True, adjacent=True)
        # Leaf predicate: only _ROW_ID leaves contribute ranges.
        if node.field != row_id_field:
            return None
        literals = node.literals
        if node.method == 'equal':
            # Empty literal list means "matches nothing".
            return Range.to_ranges([int(literals[0])]) if literals else []
        if node.method == 'in':
            return Range.to_ranges([int(v) for v in literals]) if literals else []
        if node.method == 'between':
            if not literals or len(literals) < 2:
                return []
            return [Range(int(literals[0]), int(literals[1]))]
        return None

    return visit(predicate)


def _filter_manifest_files_by_row_ranges(
manifest_files: List[ManifestFileMeta],
row_ranges: List) -> List[ManifestFileMeta]:
Expand Down Expand Up @@ -85,6 +135,31 @@ def _filter_manifest_files_by_row_ranges(
return filtered_files


def _filter_manifest_entries_by_row_ranges(
        entries: List[ManifestEntry],
        row_ranges: List) -> List[ManifestEntry]:
    """Keep only entries whose row-id span overlaps some requested range.

    An empty ``row_ranges`` selects no rows, so no entry is kept.
    Entries without a ``first_row_id`` are kept conservatively, since
    their row-id span is unknown.
    """
    from pypaimon.globalindex.range import Range

    if not row_ranges:
        return []

    kept = []
    for entry in entries:
        start = entry.file.first_row_id
        if start is None:
            kept.append(entry)
            continue
        span = Range(start, start + entry.file.row_count - 1)
        if any(span.overlaps(requested) for requested in row_ranges):
            kept.append(entry)
    return kept


class FileScanner:
def __init__(
self,
Expand All @@ -99,6 +174,7 @@ def __init__(
self.table: FileStoreTable = table
self.manifest_scanner = manifest_scanner
self.predicate = predicate
self.predicate_for_stats = remove_row_id_filter(predicate) if predicate else None
self.limit = limit
self.vector_search = vector_search

Expand Down Expand Up @@ -186,6 +262,8 @@ def _create_data_evolution_split_generator(self):
row_ranges = global_index_result.results().to_range_list()
if isinstance(global_index_result, ScoredGlobalIndexResult):
score_getter = global_index_result.score_getter()
if row_ranges is None and self.predicate is not None:
row_ranges = _row_ranges_from_predicate(self.predicate)

manifest_files = self.manifest_scanner()

Expand All @@ -195,6 +273,9 @@ def _create_data_evolution_split_generator(self):

entries = self.read_manifest_entries(manifest_files)

if row_ranges is not None:
entries = _filter_manifest_entries_by_row_ranges(entries, row_ranges)

return entries, DataEvolutionSplitGenerator(
self.table,
self.target_split_size,
Expand Down Expand Up @@ -343,6 +424,8 @@ def _filter_manifest_entry(self, entry: ManifestEntry) -> bool:
else:
if not self.predicate:
return True
if self.predicate_for_stats is None:
return True
if entry.file.value_stats_cols is None and entry.file.write_cols is not None:
stats_fields = entry.file.write_cols
else:
Expand All @@ -352,7 +435,7 @@ def _filter_manifest_entry(self, entry: ManifestEntry) -> bool:
entry.file.row_count,
stats_fields
)
return self.predicate.test_by_simple_stats(
return self.predicate_for_stats.test_by_simple_stats(
evolved_stats,
entry.file.row_count
)
Expand Down
88 changes: 88 additions & 0 deletions paimon-python/pypaimon/tests/data_evolution_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import os
import tempfile
import unittest
from types import SimpleNamespace

import pyarrow as pa

Expand Down Expand Up @@ -587,6 +588,93 @@ def test_multiple_appends_different_first_row_ids(self):
'_SEQUENCE_NUMBER must be non-nullable per SpecialFields',
)

rb_with_row_id = table.new_read_builder().with_projection(['f0', 'f1', 'f2', '_ROW_ID'])
pb = rb_with_row_id.new_predicate_builder()
rb_eq0 = table.new_read_builder().with_filter(pb.equal('_ROW_ID', 0))
result_eq0 = rb_eq0.new_read().to_arrow(rb_eq0.new_scan().plan().splits())
self.assertEqual(result_eq0, pa.Table.from_pydict(
{'f0': [1], 'f1': ['a'], 'f2': ['b']}, schema=simple_pa_schema))
rb_eq1 = table.new_read_builder().with_filter(pb.equal('_ROW_ID', 1))
result_eq1 = rb_eq1.new_read().to_arrow(rb_eq1.new_scan().plan().splits())
self.assertEqual(result_eq1, pa.Table.from_pydict(
{'f0': [2], 'f1': ['c'], 'f2': ['d']}, schema=simple_pa_schema))
rb_in = table.new_read_builder().with_filter(pb.is_in('_ROW_ID', [0, 1]))
result_in = rb_in.new_read().to_arrow(rb_in.new_scan().plan().splits())
self.assertEqual(result_in, expect)

def test_filter_by_row_id(self):
    """_ROW_ID predicates (equal / AND / OR) must prune and select rows
    correctly across two commits with known first_row_id values."""
    simple_pa_schema = pa.schema([('f0', pa.int32())])
    schema = Schema.from_pyarrow_schema(
        simple_pa_schema,
        options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'},
    )
    self.catalog.create_table('default.test_row_id_filter_empty_and_or', schema, False)
    table = self.catalog.get_table('default.test_row_id_filter_empty_and_or')
    write_builder = table.new_batch_write_builder()

    def _commit_with_first_row_id(values, first_row_id):
        # Write one f0 batch and pin its first_row_id so the _ROW_ID
        # layout of the committed files is deterministic.
        w = write_builder.new_write().with_write_type(['f0'])
        c = write_builder.new_commit()
        w.write_arrow(pa.Table.from_pydict(
            {'f0': values}, schema=pa.schema([('f0', pa.int32())])))
        cmts = w.prepare_commit()
        for msg in cmts:
            for nf in msg.new_files:
                nf.first_row_id = first_row_id
        c.commit(cmts)
        w.close()
        c.close()

    # Commit 1: _ROW_ID 0, 1 with f0=1, 2
    _commit_with_first_row_id([1, 2], 0)
    # Commit 2: _ROW_ID 2, 3 with f0=101, 102
    _commit_with_first_row_id([101, 102], 2)

    rb_with_row_id = table.new_read_builder().with_projection(['f0', '_ROW_ID'])
    pb = rb_with_row_id.new_predicate_builder()

    # 1. Non-existent _ROW_ID -> empty
    rb_eq999 = table.new_read_builder().with_filter(pb.equal('_ROW_ID', 999))
    result_eq999 = rb_eq999.new_read().to_arrow(rb_eq999.new_scan().plan().splits())
    self.assertEqual(len(result_eq999), 0, "Non-existent _ROW_ID should return empty")

    # 2. AND: _ROW_ID=0 AND f0=1 -> 1 row
    rb_and = table.new_read_builder().with_filter(
        pb.and_predicates([pb.equal('_ROW_ID', 0), pb.equal('f0', 1)])
    )
    result_and = rb_and.new_read().to_arrow(rb_and.new_scan().plan().splits())
    self.assertEqual(len(result_and), 1)
    self.assertEqual(result_and['f0'][0].as_py(), 1)

    # 3. OR: _ROW_ID=0 OR f0>100 -> at least row with _ROW_ID=0 and all f0>100
    rb_or = table.new_read_builder().with_filter(
        pb.or_predicates([pb.equal('_ROW_ID', 0), pb.greater_than('f0', 100)])
    )
    result_or = rb_or.new_read().to_arrow(rb_or.new_scan().plan().splits())
    f0_vals = set(result_or['f0'][i].as_py() for i in range(len(result_or)))
    self.assertGreaterEqual(len(result_or), 3, "OR should return _ROW_ID=0 row and f0>100 rows")
    self.assertIn(1, f0_vals, "_ROW_ID=0 row has f0=1")
    self.assertIn(101, f0_vals)
    self.assertIn(102, f0_vals)

def test_filter_manifest_entries_by_row_ranges(self):
    """An empty row_ranges list must filter out every entry."""
    from pypaimon.read.scanner.file_scanner import _filter_manifest_entries_by_row_ranges

    entry = SimpleNamespace(file=SimpleNamespace(first_row_id=0, row_count=1))
    result = _filter_manifest_entries_by_row_ranges([entry], [])
    self.assertEqual(result, [], "empty row_ranges must return no entries, not all entries")

def test_more_data(self):
simple_pa_schema = pa.schema([
('f0', pa.int32()),
Expand Down
86 changes: 86 additions & 0 deletions paimon-python/pypaimon/tests/range_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import unittest

from pypaimon.common.predicate import Predicate
from pypaimon.globalindex.range import Range
from pypaimon.read.scanner.file_scanner import _row_ranges_from_predicate
from pypaimon.table.special_fields import SpecialFields


class RangeTest(unittest.TestCase):
    """Tests for Range.to_ranges and _row_ranges_from_predicate.

    Uses unittest assertion methods (not bare ``assert``) so failures
    report the actual values and the checks survive ``python -O``.
    """

    def test_to_ranges(self):
        self.assertEqual(Range.to_ranges([]), [])
        self.assertEqual(Range.to_ranges([5]), [Range(5, 5)])
        self.assertEqual(Range.to_ranges([1, 2, 3]), [Range(1, 3)])
        self.assertEqual(
            Range.to_ranges([1, 3, 5]),
            [Range(1, 1), Range(3, 3), Range(5, 5)])
        # NOTE(review): duplicates are not deduplicated and produce
        # overlapping ranges -- confirm this is the intended contract.
        self.assertEqual(Range.to_ranges([1, 1, 2]), [Range(1, 1), Range(1, 2)])

    def test_row_ranges_from_predicate(self):
        self.assertIsNone(_row_ranges_from_predicate(None))

        pred_eq = Predicate('equal', 0, SpecialFields.ROW_ID.name, [5])
        self.assertEqual(_row_ranges_from_predicate(pred_eq), [Range(5, 5)])

        pred_in = Predicate('in', 0, SpecialFields.ROW_ID.name, [10])
        self.assertEqual(_row_ranges_from_predicate(pred_in), [Range(10, 10)])
        pred_in_multi = Predicate('in', 0, SpecialFields.ROW_ID.name, [1, 2, 3, 5, 6])
        self.assertEqual(
            _row_ranges_from_predicate(pred_in_multi), [Range(1, 3), Range(5, 6)])

        pred_between = Predicate('between', 0, SpecialFields.ROW_ID.name, [10, 20])
        self.assertEqual(_row_ranges_from_predicate(pred_between), [Range(10, 20)])

        # Predicates not on _ROW_ID, or with unsupported methods, give None.
        pred_other = Predicate('equal', 0, 'other_field', [5])
        self.assertIsNone(_row_ranges_from_predicate(pred_other))
        pred_gt = Predicate('greaterThan', 0, SpecialFields.ROW_ID.name, [10])
        self.assertIsNone(_row_ranges_from_predicate(pred_gt))

        # Empty / short literal lists mean "matches nothing".
        self.assertEqual(_row_ranges_from_predicate(
            Predicate('equal', 0, SpecialFields.ROW_ID.name, [])), [])
        self.assertEqual(_row_ranges_from_predicate(
            Predicate('in', 0, SpecialFields.ROW_ID.name, [])), [])
        self.assertEqual(_row_ranges_from_predicate(
            Predicate('between', 0, SpecialFields.ROW_ID.name, [10])), [])

        # AND intersects child ranges.
        pred_eq5 = Predicate('equal', 0, SpecialFields.ROW_ID.name, [5])
        pred_between_1_10 = Predicate('between', 0, SpecialFields.ROW_ID.name, [1, 10])
        pred_and = Predicate('and', None, None, [pred_eq5, pred_between_1_10])
        self.assertEqual(_row_ranges_from_predicate(pred_and), [Range(5, 5)])
        pred_eq3 = Predicate('equal', 0, SpecialFields.ROW_ID.name, [3])
        pred_and_no = Predicate('and', None, None, [pred_eq5, pred_eq3])
        self.assertEqual(_row_ranges_from_predicate(pred_and_no), [])

        # OR unions and merges overlapping child ranges.
        pred_or = Predicate('or', None, None, [pred_eq5, pred_eq3])
        self.assertEqual(
            _row_ranges_from_predicate(pred_or), [Range(3, 3), Range(5, 5)])
        pred_between_1_5 = Predicate('between', 0, SpecialFields.ROW_ID.name, [1, 5])
        pred_between_3_7 = Predicate('between', 0, SpecialFields.ROW_ID.name, [3, 7])
        pred_or_overlap = Predicate('or', None, None, [pred_between_1_5, pred_between_3_7])
        self.assertEqual(_row_ranges_from_predicate(pred_or_overlap), [Range(1, 7)])


# Allow running this test module directly (e.g. `python range_test.py`).
if __name__ == '__main__':
    unittest.main()
Loading