Commit ba2fe43

HonahX and Fokko authored

Centralized table properties management (#388)

* add TableProperties and PropertyUtil
* fix lint
* Revert unrelated change

Co-authored-by: Fokko Driesprong <[email protected]>

1 parent ef33b9d · commit ba2fe43

File tree

4 files changed: +80 -40 lines changed

pyiceberg/catalog/hive.py

Lines changed: 2 additions & 2 deletions

@@ -67,7 +67,7 @@
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
 from pyiceberg.schema import Schema, SchemaVisitor, visit
 from pyiceberg.serializers import FromInputFile
-from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata
+from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, TableProperties, update_table_metadata
 from pyiceberg.table.metadata import new_table_metadata
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
 from pyiceberg.typedef import EMPTY_DICT
@@ -155,7 +155,7 @@ def _construct_hive_storage_descriptor(schema: Schema, location: Optional[str])
 PROP_TABLE_TYPE = "table_type"
 PROP_METADATA_LOCATION = "metadata_location"
 PROP_PREVIOUS_METADATA_LOCATION = "previous_metadata_location"
-DEFAULT_PROPERTIES = {'write.parquet.compression-codec': 'zstd'}
+DEFAULT_PROPERTIES = {TableProperties.PARQUET_COMPRESSION: TableProperties.PARQUET_COMPRESSION_DEFAULT}


 def _construct_parameters(metadata_location: str, previous_metadata_location: Optional[str] = None) -> Dict[str, Any]:
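The net effect for the Hive catalog: the default-properties dict is now built from shared constants instead of a hard-coded string pair. A minimal sketch of the equivalence (constant values copied from the `TableProperties` class added below in pyiceberg/table/__init__.py):

```python
class TableProperties:
    # Values copied from the TableProperties class introduced in this commit.
    PARQUET_COMPRESSION = "write.parquet.compression-codec"
    PARQUET_COMPRESSION_DEFAULT = "zstd"


# Identical to the old literal dict, but typo-proof and defined in one place.
DEFAULT_PROPERTIES = {TableProperties.PARQUET_COMPRESSION: TableProperties.PARQUET_COMPRESSION_DEFAULT}
assert DEFAULT_PROPERTIES == {"write.parquet.compression-codec": "zstd"}
```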

pyiceberg/io/pyarrow.py

Lines changed: 33 additions & 34 deletions

@@ -124,7 +124,7 @@
     visit,
     visit_with_partner,
 )
-from pyiceberg.table import WriteTask
+from pyiceberg.table import PropertyUtil, TableProperties, WriteTask
 from pyiceberg.table.name_mapping import NameMapping
 from pyiceberg.transforms import TruncateTransform
 from pyiceberg.typedef import EMPTY_DICT, Properties, Record
@@ -1389,19 +1389,12 @@ class MetricModeTypes(Enum):
     FULL = "full"


-DEFAULT_METRICS_MODE_KEY = "write.metadata.metrics.default"
-COLUMN_METRICS_MODE_KEY_PREFIX = "write.metadata.metrics.column"
-
-
 @dataclass(frozen=True)
 class MetricsMode(Singleton):
     type: MetricModeTypes
     length: Optional[int] = None


-_DEFAULT_METRICS_MODE = MetricsMode(MetricModeTypes.TRUNCATE, DEFAULT_TRUNCATION_LENGTH)
-
-
 def match_metrics_mode(mode: str) -> MetricsMode:
     sanitized_mode = mode.strip().lower()
     if sanitized_mode.startswith("truncate"):
@@ -1435,12 +1428,14 @@ class PyArrowStatisticsCollector(PreOrderSchemaVisitor[List[StatisticsCollector]
     _field_id: int = 0
     _schema: Schema
     _properties: Dict[str, str]
-    _default_mode: Optional[str]
+    _default_mode: str

     def __init__(self, schema: Schema, properties: Dict[str, str]):
         self._schema = schema
         self._properties = properties
-        self._default_mode = self._properties.get(DEFAULT_METRICS_MODE_KEY)
+        self._default_mode = self._properties.get(
+            TableProperties.DEFAULT_WRITE_METRICS_MODE, TableProperties.DEFAULT_WRITE_METRICS_MODE_DEFAULT
+        )

     def schema(self, schema: Schema, struct_result: Callable[[], List[StatisticsCollector]]) -> List[StatisticsCollector]:
         return struct_result()
@@ -1475,12 +1470,9 @@ def primitive(self, primitive: PrimitiveType) -> List[StatisticsCollector]:
         if column_name is None:
             return []

-        metrics_mode = _DEFAULT_METRICS_MODE
-
-        if self._default_mode:
-            metrics_mode = match_metrics_mode(self._default_mode)
+        metrics_mode = match_metrics_mode(self._default_mode)

-        col_mode = self._properties.get(f"{COLUMN_METRICS_MODE_KEY_PREFIX}.{column_name}")
+        col_mode = self._properties.get(f"{TableProperties.METRICS_MODE_COLUMN_CONF_PREFIX}.{column_name}")
         if col_mode:
             metrics_mode = match_metrics_mode(col_mode)

@@ -1767,33 +1759,40 @@ def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
     return iter([data_file])


-def _get_parquet_writer_kwargs(table_properties: Properties) -> Dict[str, Any]:
-    def _get_int(key: str, default: Optional[int] = None) -> Optional[int]:
-        if value := table_properties.get(key):
-            try:
-                return int(value)
-            except ValueError as e:
-                raise ValueError(f"Could not parse table property {key} to an integer: {value}") from e
-        else:
-            return default
+ICEBERG_UNCOMPRESSED_CODEC = "uncompressed"
+PYARROW_UNCOMPRESSED_CODEC = "none"


+def _get_parquet_writer_kwargs(table_properties: Properties) -> Dict[str, Any]:
     for key_pattern in [
-        "write.parquet.row-group-size-bytes",
-        "write.parquet.page-row-limit",
-        "write.parquet.bloom-filter-max-bytes",
-        "write.parquet.bloom-filter-enabled.column.*",
+        TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
+        TableProperties.PARQUET_PAGE_ROW_LIMIT,
+        TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES,
+        f"{TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX}.*",
     ]:
         if unsupported_keys := fnmatch.filter(table_properties, key_pattern):
             raise NotImplementedError(f"Parquet writer option(s) {unsupported_keys} not implemented")

-    compression_codec = table_properties.get("write.parquet.compression-codec", "zstd")
-    compression_level = _get_int("write.parquet.compression-level")
-    if compression_codec == "uncompressed":
-        compression_codec = "none"
+    compression_codec = table_properties.get(TableProperties.PARQUET_COMPRESSION, TableProperties.PARQUET_COMPRESSION_DEFAULT)
+    compression_level = PropertyUtil.property_as_int(
+        properties=table_properties,
+        property_name=TableProperties.PARQUET_COMPRESSION_LEVEL,
+        default=TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT,
+    )
+    if compression_codec == ICEBERG_UNCOMPRESSED_CODEC:
+        compression_codec = PYARROW_UNCOMPRESSED_CODEC

     return {
         "compression": compression_codec,
         "compression_level": compression_level,
-        "data_page_size": _get_int("write.parquet.page-size-bytes"),
-        "dictionary_pagesize_limit": _get_int("write.parquet.dict-size-bytes", default=2 * 1024 * 1024),
+        "data_page_size": PropertyUtil.property_as_int(
+            properties=table_properties,
+            property_name=TableProperties.PARQUET_PAGE_SIZE_BYTES,
+            default=TableProperties.PARQUET_PAGE_SIZE_BYTES_DEFAULT,
+        ),
+        "dictionary_pagesize_limit": PropertyUtil.property_as_int(
+            properties=table_properties,
+            property_name=TableProperties.PARQUET_DICT_SIZE_BYTES,
+            default=TableProperties.PARQUET_DICT_SIZE_BYTES_DEFAULT,
+        ),
     }
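One behavioral detail worth calling out: Iceberg spells the no-compression codec `uncompressed`, while PyArrow expects `none`, and the new module constants make that translation explicit. A small sketch of the resolution logic, with `resolve_codec` as a hypothetical helper name (the real code inlines this in `_get_parquet_writer_kwargs`):

```python
ICEBERG_UNCOMPRESSED_CODEC = "uncompressed"
PYARROW_UNCOMPRESSED_CODEC = "none"


def resolve_codec(table_properties: dict) -> str:
    # Fall back to the table-level default ("zstd") when the property is unset.
    codec = table_properties.get("write.parquet.compression-codec", "zstd")
    # Translate Iceberg's spelling of "no compression" into PyArrow's.
    return PYARROW_UNCOMPRESSED_CODEC if codec == ICEBERG_UNCOMPRESSED_CODEC else codec


assert resolve_codec({}) == "zstd"
assert resolve_codec({"write.parquet.compression-codec": "uncompressed"}) == "none"
```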

pyiceberg/table/__init__.py

Lines changed: 45 additions & 2 deletions

@@ -85,7 +85,6 @@
     TableMetadataUtil,
 )
 from pyiceberg.table.name_mapping import (
-    SCHEMA_NAME_MAPPING_DEFAULT,
     NameMapping,
     create_mapping_from_schema,
     parse_mapping_from_json,
@@ -134,6 +133,50 @@
 _JAVA_LONG_MAX = 9223372036854775807


+class TableProperties:
+    PARQUET_ROW_GROUP_SIZE_BYTES = "write.parquet.row-group-size-bytes"
+    PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT = 128 * 1024 * 1024  # 128 MB
+
+    PARQUET_PAGE_SIZE_BYTES = "write.parquet.page-size-bytes"
+    PARQUET_PAGE_SIZE_BYTES_DEFAULT = 1024 * 1024  # 1 MB
+
+    PARQUET_PAGE_ROW_LIMIT = "write.parquet.page-row-limit"
+    PARQUET_PAGE_ROW_LIMIT_DEFAULT = 20000
+
+    PARQUET_DICT_SIZE_BYTES = "write.parquet.dict-size-bytes"
+    PARQUET_DICT_SIZE_BYTES_DEFAULT = 2 * 1024 * 1024  # 2 MB
+
+    PARQUET_COMPRESSION = "write.parquet.compression-codec"
+    PARQUET_COMPRESSION_DEFAULT = "zstd"
+
+    PARQUET_COMPRESSION_LEVEL = "write.parquet.compression-level"
+    PARQUET_COMPRESSION_LEVEL_DEFAULT = None
+
+    PARQUET_BLOOM_FILTER_MAX_BYTES = "write.parquet.bloom-filter-max-bytes"
+    PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT = 1024 * 1024
+
+    PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX = "write.parquet.bloom-filter-enabled.column"
+
+    DEFAULT_WRITE_METRICS_MODE = "write.metadata.metrics.default"
+    DEFAULT_WRITE_METRICS_MODE_DEFAULT = "truncate(16)"
+
+    METRICS_MODE_COLUMN_CONF_PREFIX = "write.metadata.metrics.column"
+
+    DEFAULT_NAME_MAPPING = "schema.name-mapping.default"
+
+
+class PropertyUtil:
+    @staticmethod
+    def property_as_int(properties: Dict[str, str], property_name: str, default: Optional[int] = None) -> Optional[int]:
+        if value := properties.get(property_name):
+            try:
+                return int(value)
+            except ValueError as e:
+                raise ValueError(f"Could not parse table property {property_name} to an integer: {value}") from e
+        else:
+            return default
+
+
 class Transaction:
     _table: Table
     _updates: Tuple[TableUpdate, ...]
@@ -921,7 +964,7 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive
     def name_mapping(self) -> NameMapping:
         """Return the table's field-id NameMapping."""
-        if name_mapping_json := self.properties.get(SCHEMA_NAME_MAPPING_DEFAULT):
+        if name_mapping_json := self.properties.get(TableProperties.DEFAULT_NAME_MAPPING):
             return parse_mapping_from_json(name_mapping_json)
         else:
             return create_mapping_from_schema(self.schema())
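`PropertyUtil.property_as_int` gives every call site one parsing path for integer properties: set values are parsed, missing keys fall back to the default, and garbage raises. A minimal sketch of that behavior, assuming a pyiceberg build that includes this commit:

```python
from pyiceberg.table import PropertyUtil, TableProperties

props = {TableProperties.PARQUET_PAGE_ROW_LIMIT: "20000"}

# A set value is parsed to an int.
assert PropertyUtil.property_as_int(props, TableProperties.PARQUET_PAGE_ROW_LIMIT) == 20000

# A missing key falls back to the caller-supplied default.
assert PropertyUtil.property_as_int(props, TableProperties.PARQUET_DICT_SIZE_BYTES, default=2 * 1024 * 1024) == 2 * 1024 * 1024

# A non-numeric value raises instead of being silently ignored.
try:
    PropertyUtil.property_as_int({TableProperties.PARQUET_PAGE_ROW_LIMIT: "zstd"}, TableProperties.PARQUET_PAGE_ROW_LIMIT)
except ValueError as e:
    print(e)  # Could not parse table property write.parquet.page-row-limit to an integer: zstd
```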

pyiceberg/table/name_mapping.py

Lines changed: 0 additions & 2 deletions

@@ -34,8 +34,6 @@
 from pyiceberg.typedef import IcebergBaseModel, IcebergRootModel
 from pyiceberg.types import ListType, MapType, NestedField, PrimitiveType, StructType

-SCHEMA_NAME_MAPPING_DEFAULT = "schema.name-mapping.default"
-

 class MappedField(IcebergBaseModel):
     field_id: int = Field(alias="field-id")
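The constant moves rather than disappears: the same key string is now exposed as `TableProperties.DEFAULT_NAME_MAPPING` and consumed by `Table.name_mapping()`. A one-line sketch of the migration for external callers, assuming a pyiceberg build that includes this commit:

```python
# Before: from pyiceberg.table.name_mapping import SCHEMA_NAME_MAPPING_DEFAULT
from pyiceberg.table import TableProperties

assert TableProperties.DEFAULT_NAME_MAPPING == "schema.name-mapping.default"
```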
