Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
6da0719
feat: MergeAppend — auto-compact manifests at commit time
sriv-e6x Jun 1, 2026
bd7461c
feat(transaction): FastAppendAction.with_snapshot_id() + public gener…
Jun 2, 2026
fc85a11
feat(transaction): mirror with_snapshot_id() on ReplaceDataFilesAction
Jun 2, 2026
6d3124e
feat(transaction): mirror with_snapshot_id() on RewriteManifestsAction
Jun 2, 2026
cd5cc1d
feat: root manifest format for v4 single-file commits (Phase 1)
sriv-e6x Jun 5, 2026
a04caa0
feat: add FormatVersion::V4 with V3 fallback across all match arms
sriv-e6x Jun 5, 2026
303de34
feat: V4 single-file commit path via root manifest (Phase 2)
sriv-e6x Jun 5, 2026
8f632c2
feat: manifest delete vectors for V4 compaction (Phase 3)
sriv-e6x Jun 5, 2026
ecb322c
feat: background rebalancing for V4 root manifests (Phase 4)
sriv-e6x Jun 5, 2026
5cd7502
test: add MDV comprehensive and remove_inline_files tests
sriv-e6x Jun 5, 2026
c57b29b
perf: cache-forward entries, MDV index, summary fix for V4
sriv-e6x Jun 5, 2026
b94e1c5
perf: two-section Parquet layout + adaptive inline flush for V4
sriv-e6x Jun 5, 2026
ecae928
perf: partition bounds on inline entries for query pruning
sriv-e6x Jun 5, 2026
5906380
fix: CAS cache validation, content type separation, partition bounds …
sriv-e6x Jun 5, 2026
95b06ae
fix: ref-merging, cache edge case, row lineage, V4 integration test
sriv-e6x Jun 5, 2026
cf22a43
v4 root manifest: scan path, correctness, and performance fixes
sriv-e6x Jun 5, 2026
8f6a35f
feat: opt into V4 via e6.actual-format-version property (Lakekeeper-f…
Jun 5, 2026
07caac7
fix(spec): route load_manifest_list through effective_format_version
Jun 5, 2026
7d042a2
fix(snapshot): commit_v4 child manifests must be Parquet to match .pa…
Jun 5, 2026
e84ed18
fix(scan): don't await the delete-process spawn -- deadlocks V4 inlin…
Jun 6, 2026
760990d
fix(scan): drop inline_delete_tx before inline-data loop to break V4 …
Jun 6, 2026
275434b
feat(parquet): expose parquet_to_data_file_builder as pub
Jun 10, 2026
45d8491
fix(spec): key TableMetadata.statistics on (snapshot_id, path)
Jun 13, 2026
fa3dcf6
feat(io): bump opendal 0.55→0.57 and drop CustomAwsCredentialLoader p…
Jun 21, 2026
76663e5
feat(azdls): inject WI bearer token via http-client wrap
Jun 21, 2026
244c451
fix(azdls): always overwrite Authorization in WI bearer wrap
Jun 21, 2026
83aecf9
fix(transaction): disable retry for ReplaceDataFiles commits
sriv-e6x Jun 24, 2026
8df5199
fix(snapshot): use current_snapshot for summary cumulative totals
sriv-e6x Jun 24, 2026
c59f15e
test: add OCC concurrency test for concurrent commit behavior
sriv-e6x Jun 24, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
989 changes: 793 additions & 196 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ motore-macros = "0.4.3"
murmur3 = "0.5.2"
num-bigint = "0.4.6"
once_cell = "1.20"
opendal = "0.55.0"
opendal = "0.57.0"
ordered-float = "4"
parquet = "57.0"
pilota = "0.11.10"
Expand Down
5 changes: 3 additions & 2 deletions crates/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ storage-fs = ["opendal/services-fs"]
storage-gcs = ["opendal/services-gcs"]
storage-memory = ["opendal/services-memory"]
storage-oss = ["opendal/services-oss"]
storage-s3 = ["opendal/services-s3", "reqsign"]
storage-s3 = ["opendal/services-s3"]


[dependencies]
Expand All @@ -57,13 +57,15 @@ async-trait = { workspace = true }
backon = { workspace = true }
base64 = { workspace = true }
bimap = { workspace = true }
log = "0.4"
bytes = { workspace = true }
chrono = { workspace = true }
derive_builder = { workspace = true }
expect-test = { workspace = true }
flate2 = { workspace = true }
fnv = { workspace = true }
futures = { workspace = true }
http = { workspace = true }
itertools = { workspace = true }
moka = { version = "0.12.10", features = ["future"] }
murmur3 = { workspace = true }
Expand All @@ -73,7 +75,6 @@ opendal = { workspace = true }
ordered-float = { workspace = true }
parquet = { workspace = true, features = ["async"] }
rand = { workspace = true }
reqsign = { version = "0.16.3", optional = true, default-features = false }
reqwest = { workspace = true }
roaring = { workspace = true }
rust_decimal = { workspace = true }
Expand Down
231 changes: 231 additions & 0 deletions crates/iceberg/src/io/azdls_wi_layer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Azure Workload-Identity bearer-token injection for opendal-azdls.
//!
//! `opendal-service-azdls` 0.57's `AzdlsBuilder` constructs its credential
//! chain with a `StaticEnv` that contains only the explicitly-set
//! `adls.client-id` / `adls.tenant-id` / `adls.authority-host` properties —
//! it never propagates `AZURE_FEDERATED_TOKEN_FILE` from the OS env, which
//! reqsign's `WorkloadIdentityCredentialProvider` requires. The result on
//! AKS is that opendal's IMDS provider returns a node-VM identity (wrong
//! principal), and ADLS rejects writes with 403
//! `AuthorizationPermissionMismatch`.
//!
//! Rather than patch upstream opendal, this module performs the federated
//! → AAD token exchange ourselves and wraps the operator's `HttpClient` so
//! every outgoing request carries a fresh `Authorization: Bearer <token>`
//! header for the WI managed identity. We hook in via the public
//! `Operator::inner().info().update_http_client(..)` surface, so nothing in
//! the opendal stack needs to change.

use std::sync::Arc;
use std::time::{Duration, Instant};

use http::header::AUTHORIZATION;
use http::{HeaderValue, Request, Response};
use opendal::Buffer;
use opendal::raw::{HttpBody, HttpClient, HttpFetch};
use opendal::{Error as ODError, ErrorKind as ODErrorKind, Result as ODResult};
use serde::Deserialize;
use tokio::sync::Mutex;

use crate::{Error, ErrorKind, Result};

const AAD_SCOPE: &str = "https://storage.azure.com/.default";
const DEFAULT_AUTHORITY_HOST: &str = "https://login.microsoftonline.com";
const REFRESH_SKEW: Duration = Duration::from_secs(120);

/// Environment-derived WI settings the federated exchange needs.
///
/// Populated by `read_wi_env` from the process environment; every field is
/// a non-empty string once the struct exists.
#[derive(Debug, Clone)]
pub(crate) struct WiEnv {
/// Value of `AZURE_TENANT_ID`; becomes the tenant path segment of the
/// token endpoint URL.
pub tenant_id: String,
/// Value of `AZURE_CLIENT_ID`; sent as the `client_id` form field.
pub client_id: String,
/// Value of `AZURE_FEDERATED_TOKEN_FILE`: path of the file whose trimmed
/// contents are sent as the `client_assertion`.
pub federated_token_file: String,
/// Value of `AZURE_AUTHORITY_HOST`, or `DEFAULT_AUTHORITY_HOST` when that
/// variable is unset or empty.
pub authority_host: String,
}

/// Collects the Workload-Identity settings from the process environment.
///
/// `AZURE_TENANT_ID`, `AZURE_CLIENT_ID` and `AZURE_FEDERATED_TOKEN_FILE` are
/// mandatory: if any of them is unset or empty the function yields `None`.
/// `AZURE_AUTHORITY_HOST` is optional and falls back to
/// `DEFAULT_AUTHORITY_HOST`.
pub(crate) fn read_wi_env() -> Option<WiEnv> {
    // Field initialisers run top to bottom, so the variables are consulted
    // in the same order as before and the first missing one short-circuits.
    Some(WiEnv {
        tenant_id: non_empty_env("AZURE_TENANT_ID")?,
        client_id: non_empty_env("AZURE_CLIENT_ID")?,
        federated_token_file: non_empty_env("AZURE_FEDERATED_TOKEN_FILE")?,
        authority_host: match non_empty_env("AZURE_AUTHORITY_HOST") {
            Some(host) => host,
            None => DEFAULT_AUTHORITY_HOST.to_string(),
        },
    })
}

/// Looks up the environment variable `name`.
///
/// Yields `Some(value)` only when the variable is set, is valid Unicode and
/// is not the empty string; every other case collapses to `None`.
fn non_empty_env(name: &str) -> Option<String> {
    match std::env::var(name) {
        Ok(value) if !value.is_empty() => Some(value),
        _ => None,
    }
}

/// The subset of the AAD token endpoint's JSON response that this module
/// reads; other fields in the payload are not declared here.
///
/// NOTE(review): the derived `Debug` prints `access_token` verbatim, so
/// values of this type should not be formatted into logs or errors.
#[derive(Debug, Deserialize)]
struct TokenResponse {
/// Bearer token to send in the `Authorization` header.
access_token: String,
/// Token lifetime, passed to `Duration::from_secs` by the caller.
expires_in: u64,
}

/// A bearer token together with the instant after which it must be refreshed.
///
/// `Debug` is implemented by hand rather than derived: the token is a live
/// credential, and this type is reachable from the `Debug` output of the
/// fetcher that owns it, so a derived impl would print the secret whenever
/// an enclosing value is logged with `{:?}`.
struct CachedToken {
    token: String,
    /// Instant after which we should refresh (= AAD-expiry minus REFRESH_SKEW).
    refresh_after: Instant,
}

impl std::fmt::Debug for CachedToken {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CachedToken")
            // Never emit the credential itself.
            .field("token", &"<redacted>")
            .field("refresh_after", &self.refresh_after)
            .finish()
    }
}

/// Caches a WI access token and refreshes on demand using the federated
/// assertion flow.
///
/// The cache lives behind an async mutex that `get_token` keeps locked for
/// the duration of a refresh, so callers that arrive during a refresh wait
/// for it and then read the freshly cached token.
#[derive(Debug)]
pub(crate) struct WiTokenFetcher {
/// Tenant, client id, token-file path and authority host for the exchange.
env: WiEnv,
/// Client used to POST to the AAD token endpoint.
http: reqwest::Client,
/// Most recently obtained token; `None` until the first successful exchange.
cached: Mutex<Option<CachedToken>>,
}

impl WiTokenFetcher {
    /// Creates a fetcher with an empty cache and a default `reqwest` client.
    pub(crate) fn new(env: WiEnv) -> Self {
        Self {
            env,
            http: reqwest::Client::new(),
            cached: Mutex::new(None),
        }
    }

    /// Returns a valid bearer token, refreshing the cached one if needed.
    ///
    /// The cache lock is held across the whole refresh, so concurrent callers
    /// wait for a single exchange instead of each hitting the token endpoint.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::Unexpected` when the federated token file cannot
    /// be read, the POST to the token endpoint fails, the endpoint answers
    /// with a non-success status, or the response cannot be read or parsed.
    pub(crate) async fn get_token(&self) -> Result<String> {
        let mut guard = self.cached.lock().await;
        if let Some(cached) = guard.as_ref() {
            if Instant::now() < cached.refresh_after {
                return Ok(cached.token.clone());
            }
        }

        // The projected service-account token is re-read on every refresh
        // rather than cached, so a rotated file is picked up automatically.
        let assertion = tokio::fs::read_to_string(&self.env.federated_token_file)
            .await
            .map_err(|e| {
                Error::new(
                    ErrorKind::Unexpected,
                    format!(
                        "WI: read AZURE_FEDERATED_TOKEN_FILE {}: {e}",
                        self.env.federated_token_file
                    ),
                )
            })?;
        let assertion = assertion.trim();

        let url = format!(
            "{}/{}/oauth2/v2.0/token",
            self.env.authority_host.trim_end_matches('/'),
            self.env.tenant_id
        );
        let form = [
            ("client_id", self.env.client_id.as_str()),
            ("scope", AAD_SCOPE),
            ("grant_type", "client_credentials"),
            (
                "client_assertion_type",
                "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
            ),
            ("client_assertion", assertion),
        ];
        let resp = self
            .http
            .post(&url)
            .form(&form)
            .send()
            .await
            .map_err(|e| Error::new(ErrorKind::Unexpected, format!("WI: AAD POST {url}: {e}")))?;

        let status = resp.status();
        if !status.is_success() {
            // Error payloads describe the failure and carry no access token,
            // so they are safe (and useful) to surface.
            let body = resp
                .text()
                .await
                .unwrap_or_else(|_| "<unreadable>".to_string());
            return Err(Error::new(
                ErrorKind::Unexpected,
                format!("WI: AAD token endpoint returned {status}: {body}"),
            ));
        }

        let body = resp.text().await.map_err(|e| {
            Error::new(
                ErrorKind::Unexpected,
                format!("WI: read AAD response body: {e}"),
            )
        })?;
        // A success body contains the access token. It must never be echoed
        // into an error message, so only the parser's diagnostic is reported.
        let tr: TokenResponse = serde_json::from_str(&body).map_err(|e| {
            Error::new(
                ErrorKind::Unexpected,
                format!("WI: parse AAD response: {e}"),
            )
        })?;

        // `expires_in` is server-supplied: compute the deadline without any
        // arithmetic that can panic. A lifetime shorter than the skew, or one
        // too large to represent, yields a deadline of "now", which simply
        // forces a refresh on the next call.
        let ttl = Duration::from_secs(tr.expires_in).saturating_sub(REFRESH_SKEW);
        let now = Instant::now();
        let refresh_after = now.checked_add(ttl).unwrap_or(now);

        let token = tr.access_token;
        *guard = Some(CachedToken {
            token: token.clone(),
            refresh_after,
        });
        Ok(token)
    }
}

/// `HttpFetch` wrapper that sets the WI bearer token on every outgoing
/// request, replacing any `Authorization` header the request already
/// carries.
pub(crate) struct WiHttpFetch {
/// Client the request is forwarded to once the header has been set.
inner: HttpClient,
/// Shared source of the (cached) bearer token.
fetcher: Arc<WiTokenFetcher>,
}

impl WiHttpFetch {
pub(crate) fn new(inner: HttpClient, fetcher: Arc<WiTokenFetcher>) -> Self {
Self { inner, fetcher }
}
}

impl HttpFetch for WiHttpFetch {
    /// Sets `Authorization: Bearer <token>` on `req` and forwards it to the
    /// wrapped client.
    ///
    /// Fails with an `Unexpected` opendal error if no token can be obtained
    /// or if the token is not representable as a header value.
    async fn fetch(&self, mut req: Request<Buffer>) -> ODResult<Response<HttpBody>> {
        // The header is replaced unconditionally. reqsign's default chain
        // runs an IMDS provider (the AKS node-VM identity) before this wrap
        // sees the request; inserting only when AUTHORIZATION is absent
        // would forward that wrong-principal token to ADLS and produce 403
        // AuthorizationPermissionMismatch. Overwriting guarantees every
        // ADLS call carries the federated UAMI we want.
        let bearer = match self.fetcher.get_token().await {
            Ok(token) => format!("Bearer {token}"),
            Err(cause) => {
                return Err(
                    ODError::new(ODErrorKind::Unexpected, "WI: fetch bearer token failed")
                        .set_source(cause),
                );
            }
        };

        let header = match HeaderValue::from_str(&bearer) {
            Ok(header) => header,
            Err(cause) => {
                return Err(ODError::new(
                    ODErrorKind::Unexpected,
                    "WI: bearer token has invalid header bytes",
                )
                .set_source(cause));
            }
        };

        req.headers_mut().insert(AUTHORIZATION, header);
        self.inner.fetch(req).await
    }
}

/// Wraps the operator's existing `HttpClient` with WI bearer injection.
///
/// The returned `HttpClient` is backed by a `WiHttpFetch`, which sets
/// (replacing any existing value) `Authorization: Bearer <token>` on every
/// request before handing it to `inner`.
pub(crate) fn wrap_http_client(inner: HttpClient, fetcher: Arc<WiTokenFetcher>) -> HttpClient {
    let injecting_fetch = WiHttpFetch::new(inner, fetcher);
    HttpClient::with(injecting_fetch)
}

// No-op that is never called; the body is empty and the arguments are
// discarded.
// NOTE(review): presumably kept only so the `Request`, `Response` and
// `Buffer` imports always have a use site. All three are already used by
// the `HttpFetch` impl in this file, so this looks removable; confirm.
#[allow(dead_code)]
fn _force_link<B>(_: Request<Buffer>, _: Response<B>) {}
2 changes: 2 additions & 0 deletions crates/iceberg/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ mod storage;
pub use file_io::*;
pub(crate) mod object_cache;

#[cfg(feature = "storage-azdls")]
mod azdls_wi_layer;
#[cfg(feature = "storage-azdls")]
mod storage_azdls;
#[cfg(feature = "storage-fs")]
Expand Down
6 changes: 3 additions & 3 deletions crates/iceberg/src/io/object_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::mem::size_of_val;

use std::sync::Arc;

use crate::io::FileIO;
Expand Down Expand Up @@ -62,8 +62,8 @@ impl ObjectCache {
Self {
cache: moka::future::Cache::builder()
.weigher(|_, val: &CachedItem| match val {
CachedItem::ManifestList(item) => size_of_val(item.as_ref()),
CachedItem::Manifest(item) => size_of_val(item.as_ref()),
CachedItem::ManifestList(item) => item.estimated_size(),
CachedItem::Manifest(item) => item.estimated_size(),
} as u32)
.max_capacity(cache_size_bytes)
.build(),
Expand Down
Loading
Loading