diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md index e904662871..7978fdc9b4 100644 --- a/mkdocs/docs/api.md +++ b/mkdocs/docs/api.md @@ -474,6 +474,71 @@ lat: [[48.864716],[52.371807],[53.11254],[37.773972]] long: [[2.349014],[4.896029],[6.0989],[-122.431297]] ``` +### Upsert + +PyIceberg supports upsert operations, meaning that it is able to merge an Arrow table into an Iceberg table. Rows are considered the same based on the [identifier field](https://iceberg.apache.org/spec/?column-projection#identifier-field-ids). If a row is already in the table, it will update that row. If a row cannot be found, it will insert that new row. + +Consider the following table, with some data: + +```python +from pyiceberg.schema import Schema +from pyiceberg.types import IntegerType, NestedField, StringType + +import pyarrow as pa + +schema = Schema( + NestedField(1, "city", StringType(), required=True), + NestedField(2, "inhabitants", IntegerType(), required=True), + # Mark City as the identifier field, also known as the primary-key + identifier_field_ids=[1] +) + +tbl = catalog.create_table("default.cities", schema=schema) + +arrow_schema = pa.schema( + [ + pa.field("city", pa.string(), nullable=False), + pa.field("inhabitants", pa.int32(), nullable=False), + ] +) + +# Write some data +df = pa.Table.from_pylist( + [ + {"city": "Amsterdam", "inhabitants": 921402}, + {"city": "San Francisco", "inhabitants": 808988}, + {"city": "Drachten", "inhabitants": 45019}, + {"city": "Paris", "inhabitants": 2103000}, + ], + schema=arrow_schema +) +tbl.append(df) +``` + +Next, we'll upsert a table into the Iceberg table: + +```python +df = pa.Table.from_pylist( + [ + # Will be updated, the inhabitants has been updated + {"city": "Drachten", "inhabitants": 45505}, + + # New row, will be inserted + {"city": "Berlin", "inhabitants": 3432000}, + + # Ignored, already exists in the table + {"city": "Paris", "inhabitants": 2103000}, + ], + schema=arrow_schema +) +upd = tbl.upsert(df) + +assert upd.rows_updated == 1 +assert upd.rows_inserted == 1 +``` + +PyIceberg will automatically detect which rows need to be updated, inserted or can simply be ignored. + ## Inspecting tables To explore the table metadata, tables can be inspected. diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 93edf70f46..a409cd0e44 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1109,7 +1109,7 @@ def name_mapping(self) -> Optional[NameMapping]: def upsert( self, df: pa.Table, - join_cols: list[str], + join_cols: Optional[List[str]] = None, when_matched_update_all: bool = True, when_not_matched_insert_all: bool = True, case_sensitive: bool = True, @@ -1119,11 +1119,13 @@ def upsert( Args: df: The input dataframe to upsert with the table's data. - join_cols: The columns to join on. These are essentially analogous to primary keys + join_cols: Columns to join on, if not provided, it will use the identifier-field-ids. when_matched_update_all: Bool indicating to update rows that are matched but require an update due to a value in a non-key column changing when_not_matched_insert_all: Bool indicating new rows to be inserted that do not match any existing rows in the table case_sensitive: Bool indicating if the match should be case-sensitive + To learn more about the identifier-field-ids: https://iceberg.apache.org/spec/#identifier-field-ids + Example Use Cases: Case 1: Both Parameters = True (Full Upsert) Existing row found → Update it @@ -1148,6 +1150,15 @@ def upsert( """ from pyiceberg.table import upsert_util + if join_cols is None: + join_cols = [] + for field_id in self.schema().identifier_field_ids: + col = self.schema().find_column_name(field_id) + if col is not None: + join_cols.append(col) + else: + raise ValueError(f"Field-ID could not be found: {join_cols}") + if not when_matched_update_all and not when_not_matched_insert_all: raise ValueError("no upsert options selected...exiting") diff --git a/tests/table/test_upsert.py b/tests/table/test_upsert.py index 2535e359d0..0cfb0ba609 100644 --- a/tests/table/test_upsert.py +++ b/tests/table/test_upsert.py @@ -16,13 +16,16 @@ # under the License. from pathlib import PosixPath +import pyarrow as pa import pytest from datafusion import SessionContext from pyarrow import Table as pa_table from pyiceberg.catalog import Catalog from pyiceberg.exceptions import NoSuchTableError +from pyiceberg.schema import Schema from pyiceberg.table import UpsertResult +from pyiceberg.types import IntegerType, NestedField, StringType from tests.catalog.test_base import InMemoryCatalog, Table @@ -314,3 +317,52 @@ def test_key_cols_misaligned(catalog: Catalog) -> None: with pytest.raises(Exception, match=r"""Field ".*" does not exist in schema"""): table.upsert(df=df_src, join_cols=["order_id"]) + + +def test_upsert_with_identifier_fields(catalog: Catalog) -> None: + identifier = "default.test_upsert_with_identifier_fields" + _drop_table(catalog, identifier) + + schema = Schema( + NestedField(1, "city", StringType(), required=True), + NestedField(2, "inhabitants", IntegerType(), required=True), + # Mark City as the identifier field, also known as the primary-key + identifier_field_ids=[1], + ) + + tbl = catalog.create_table(identifier, schema=schema) + + arrow_schema = pa.schema( + [ + pa.field("city", pa.string(), nullable=False), + pa.field("inhabitants", pa.int32(), nullable=False), + ] + ) + + # Write some data + df = pa.Table.from_pylist( + [ + {"city": "Amsterdam", "inhabitants": 921402}, + {"city": "San Francisco", "inhabitants": 808988}, + {"city": "Drachten", "inhabitants": 45019}, + {"city": "Paris", "inhabitants": 2103000}, + ], + schema=arrow_schema, + ) + tbl.append(df) + + df = pa.Table.from_pylist( + [ + # Will be updated, the inhabitants has been updated + {"city": "Drachten", "inhabitants": 45505}, + # New row, will be inserted + {"city": "Berlin", "inhabitants": 3432000}, + # Ignored, already exists in the table + {"city": "Paris", "inhabitants": 2103000}, + ], + schema=arrow_schema, + ) + upd = tbl.upsert(df) + + assert upd.rows_updated == 1 + assert upd.rows_inserted == 1