-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexample.py
More file actions
117 lines (95 loc) · 2.88 KB
/
example.py
File metadata and controls
117 lines (95 loc) · 2.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
from nimbus import Nimbus, ConnInfo
import psycopg2
# Connection parameters shared by the examples: handed to Nimbus(...) in each
# example function and used via conn_info.connect() when creating the
# Iceberg table in prepare().
conn_info = ConnInfo(
    host="localhost",
    port=4566,
    user="root",
    password="",
    dbname="dev",
)
# Settings for the upstream Postgres instance. Passed as `config` to
# bus.postgres_source(); prepare() also reads the port, credentials and
# database name from it (but connects through localhost, not "hostname").
pg_source_config = {
    "hostname": "postgres",
    "port": 5432,
    "username": "postgres",
    "password": "postgres",
    "database.name": "postgres",
}

# Name of the Iceberg-engine table that both example streams write into.
ICEBERG_TABLE_NAME = "my_iceberg_table"
def example_ingest():
    """Push ten JSON records straight into the Iceberg table.

    Builds a JSON ingestor, connects it to ``ICEBERG_TABLE_NAME`` through a
    stream named ``direct_ingest``, deploys that stream, sends records with
    ids 0 through 9, and flushes. The ingestor is closed even if deployment
    or ingestion raises.
    """
    client = Nimbus(conn_info)
    ingestor = client.json_ingestor()
    pipeline = client.create_stream(
        name="direct_ingest",
        destination=client.table(ICEBERG_TABLE_NAME),
        source=ingestor,
    )

    # Lazy generator: each record is built right before it is ingested.
    records = ({"id": n, "name": f"name_{n}"} for n in range(10))

    try:
        pipeline.deploy()
        for record in records:
            ingestor.ingest_record(record)
        ingestor.flush()
    finally:
        ingestor.close()
def example_postgres_cdc():
    """Deploy a stream feeding ``public.my_pg_table`` into the Iceberg table.

    Registers a Postgres source named ``my_pg_source`` from
    ``pg_source_config``, wraps it with ``postgres_cdc`` for the table
    ``public.my_pg_table`` (options: ``snapshot=True``,
    ``backfill.parallelism=4``), and deploys the resulting stream
    ``pg_cdc_ingest`` targeting ``ICEBERG_TABLE_NAME``.
    """
    client = Nimbus(conn_info)
    upstream = client.postgres_source(
        name="my_pg_source",
        config=pg_source_config,
    )

    sink_table = client.table(ICEBERG_TABLE_NAME)
    cdc_options = {
        "snapshot": True,
        "backfill.parallelism": 4,
    }
    cdc_feed = client.postgres_cdc(
        postgres_source=upstream,
        postgres_table_name="public.my_pg_table",
        config=cdc_options,
    )

    pipeline = client.create_stream(
        name="pg_cdc_ingest",
        destination=sink_table,
        source=cdc_feed,
    )
    pipeline.deploy()
def prepare():
    """Create the Postgres source table and the Iceberg destination table.

    Step 1 connects to Postgres on ``localhost`` using the port, credentials
    and database name from ``pg_source_config`` and creates ``my_pg_table``
    if it does not exist.

    Step 2 opens a connection via ``conn_info.connect()`` and issues the SQL
    that creates the ``lakekeeper_catalog_conn`` connection, selects it as
    the Iceberg engine connection, and creates ``ICEBERG_TABLE_NAME``.

    Raises:
        Whatever ``psycopg2`` or the ``conn_info`` connection raises when a
        connection or statement fails; nothing is caught here.
    """
    # create postgres table
    c = pg_source_config
    # Keyword arguments instead of an interpolated URI, so credentials that
    # contain characters such as '@', ':' or '/' cannot corrupt the DSN.
    pg = psycopg2.connect(
        host="localhost",
        port=c["port"],
        user=c["username"],
        password=c["password"],
        dbname=c["database.name"],
    )
    try:
        with pg.cursor() as cur:
            cur.execute(
                """
                CREATE TABLE IF NOT EXISTS my_pg_table (
                id INTEGER PRIMARY KEY,
                name VARCHAR
                );
                """
            )
        pg.commit()
    finally:
        # psycopg2's `with connection:` only ends the transaction; the
        # connection has to be closed explicitly or it leaks.
        pg.close()

    # create iceberg table
    with conn_info.connect() as conn:
        conn.exec(
            f"""
            CREATE CONNECTION IF NOT EXISTS lakekeeper_catalog_conn WITH (
            type = 'iceberg',
            warehouse.path = 'risingwave-warehouse',
            s3.region = 'us-east-1',
            s3.access.key = 'minioadmin',
            s3.secret.key = 'minioadmin',
            s3.endpoint = 'http://minio:9301',
            catalog.type = 'rest',
            catalog.uri = 'http://lakekeeper:8181/catalog/',
            s3.path.style.access = true
            );
            SET iceberg_engine_connection = 'public.lakekeeper_catalog_conn';
            CREATE TABLE IF NOT EXISTS {ICEBERG_TABLE_NAME} (
            id INT PRIMARY KEY,
            name VARCHAR
            ) WITH (
            commit_checkpoint_interval = 1
            ) ENGINE = iceberg;
            """
        )
if __name__ == "__main__":
    # Set up both tables first, then run each example in order.
    for step in (prepare, example_ingest, example_postgres_cdc):
        step()