|
8 | 8 | (:require [clojure.tools.logging :as log] |
9 | 9 | [xtdb.api :as xt] |
10 | 10 | [xtdb.datasets.tpch :as tpch] |
11 | | - [xtdb.node :as xtn])) |
| 11 | + [xtdb.node :as xtn] |
| 12 | + [xtdb.db-catalog :as db-catalog]) |
| 13 | + (:import [xtdb.aws.s3 S3Configurator] |
| 14 | + [software.amazon.awssdk.regions Region])) |
12 | 15 |
|
(defn generate-dataset
  "Generates the TPC-H dataset at the given scale factor and stores it in an S3 bucket.

  Starts an XTDB node whose remote storage is the S3 bucket `bucket-name`
  (objects go under the prefix \"tpch-sf<scale-factor>/\"), submits the TPC-H
  data with `tpch/submit-dml!`, sleeps for a while, then asks the primary
  database's log processor to finish the current block before the node is
  closed by `with-open`.

  Arguments:
    bucket-name  - name of the target S3 bucket
    scale-factor - TPC-H scale factor (e.g. 0.01)
    region       - AWS region id string (e.g. \"eu-west-1\"), passed to `Region/of`
    opts         - optional map:
                     :disk-cache-path - local directory used for the node's disk
                                        cache (default \"/tmp/xtdb-tpch-cache\")
                     :ingest-wait-ms  - milliseconds to sleep between submitting
                                        the data and finishing the block
                                        (default 5000)

  Returns nil. Exceptions from node start-up, submission or the S3 client
  propagate to the caller."
  ([bucket-name scale-factor region]
   (generate-dataset bucket-name scale-factor region {}))
  ([bucket-name scale-factor region {:keys [disk-cache-path ingest-wait-ms]
                                     :or {disk-cache-path "/tmp/xtdb-tpch-cache"
                                          ingest-wait-ms 5000}}]
   (log/info "Generating TPC-H dataset at scale factor" scale-factor "to bucket" bucket-name "in region" region)

   (let [;; Pins the S3 client to the requested region instead of relying on
         ;; whatever region the environment would otherwise resolve.
         configurator (reify S3Configurator
                        (configureClient [_ builder]
                          (.region builder (Region/of region))))
         node-config {:storage [:remote {:object-store [:s3 {:bucket bucket-name
                                                             :prefix (str "tpch-sf" scale-factor "/")
                                                             :configurator configurator}]}]
                      :disk-cache {:path disk-cache-path}}]

     (log/info "Starting XTDB node with S3 storage in region:" region)

     (with-open [node (xtn/start-node node-config)]
       (log/info "Node started, generating TPC-H data...")

       ;; Generate TPC-H dataset using DML (INSERT statements)
       (tpch/submit-dml! node scale-factor)

       (log/info "TPC-H data submitted, waiting for ingest...")

       ;; Wait for data to be ingested.
       ;; NOTE(review): a fixed sleep does not by itself prove ingestion has
       ;; completed - confirm whether submit-dml! already blocks until the data
       ;; is indexed, and raise :ingest-wait-ms for larger scale factors.
       (Thread/sleep (long ingest-wait-ms))

       ;; Finish block to ensure all data is written
       (log/info "Calling finish-block to ensure data is persisted...")
       (.finishBlock (.getLogProcessor (db-catalog/primary-db node)))

       (log/info "Dataset generation complete!")))))
46 | 46 |
|
47 | 47 | (defn -main [& args] |
48 | | - (when (not= (count args) 2) |
49 | | - (println "Usage: clojure -M:generate-tpch <bucket-name> <scale-factor>") |
50 | | - (println "Example: clojure -M:generate-tpch xtdb-play-datasets 0.01") |
| 48 | + (when (not= (count args) 3) |
| 49 | + (println "Usage: clojure -M:generate-tpch <bucket-name> <scale-factor> <region>") |
| 50 | + (println "Example: clojure -M:generate-tpch xtdb-play-datasets 0.01 eu-west-1") |
51 | 51 | (System/exit 1)) |
52 | 52 |
|
53 | 53 | (let [bucket-name (first args) |
54 | | - scale-factor (Double/parseDouble (second args))] |
| 54 | + scale-factor (Double/parseDouble (second args)) |
| 55 | + region (nth args 2)] |
55 | 56 |
|
56 | 57 | (println "=" (repeat 60 "=")) |
57 | 58 | (println "TPC-H Dataset Generator") |
58 | 59 | (println "=" (repeat 60 "=")) |
59 | 60 | (println "Bucket:" bucket-name) |
60 | 61 | (println "Scale Factor:" scale-factor) |
| 62 | + (println "Region:" region) |
61 | 63 | (println) |
62 | 64 |
|
63 | 65 | (try |
64 | | - (generate-dataset bucket-name scale-factor) |
| 66 | + (generate-dataset bucket-name scale-factor region) |
65 | 67 | (println) |
66 | 68 | (println "✓ Success! Dataset is now available in S3:") |
67 | | - (println " s3://" bucket-name "/tpch-sf" scale-factor "/") |
| 69 | + (println (str " s3://" bucket-name "/tpch-sf" scale-factor "/")) |
68 | 70 | (catch Exception e |
69 | 71 | (println "✗ Error generating dataset:") |
70 | 72 | (println (.getMessage e)) |
|
0 commit comments