
Pyspark base64#274

Open
dhruvgupta-meesho wants to merge 7 commits into develop from pyspark-base64

Conversation

@dhruvgupta-meesho
Contributor

@dhruvgupta-meesho dhruvgupta-meesho commented Feb 4, 2026

🔁 Pull Request Template – BharatMLStack

Please fill out the following sections to help us review your changes efficiently.


📌 Summary

e.g., Optimizes Redis fetch latency in online-feature-store, or improves search UI responsiveness in trufflebox-ui.


📂 Modules Affected

  • horizon (Real-time systems / networking)
  • online-feature-store (Feature serving infra)
  • trufflebox-ui (Admin panel / UI)
  • infra (Docker, CI/CD, GCP/AWS setup)
  • docs (Documentation updates)
  • Other: ___________

✅ Type of Change

  • Feature addition
  • Bug fix
  • Infra / build system change
  • Performance improvement
  • Refactor
  • Documentation
  • Other: ___________

📊 Benchmark / Metrics (if applicable)

Summary by CodeRabbit

  • New Features

    • Switched processing from pandas to Spark for scalable decoding
    • Added Arrow and Parquet feature decoding and Spark-aware float formatting
    • Enhanced CLI with Spark options and CSV/JSON output
    • Added schema cache clearing and expanded metadata/format info in decode outputs
  • Documentation

    • Comprehensive README overhaul with examples, API reference, and troubleshooting
  • Chores

    • Version bumped to 0.2.4; replaced pandas dependency with PySpark

@coderabbitai

coderabbitai bot commented Feb 4, 2026

Walkthrough

Core library migrated from pandas to Spark: decoding functions now require a SparkSession and return Spark DataFrames. CLI updated to create/use SparkSession and emit Spark-backed outputs. New Arrow/Parquet feature decoders and schema-cache clearing added. Project dependency switched from pandas to pyspark; docs expanded.

Changes

Cohort / File(s) Summary
Core Decoding API
py-sdk/inference_logging_client/inference_logging_client/__init__.py
Reworked decode_mplog and decode_mplog_dataframe to accept a SparkSession and return Spark DataFrames; added Spark-based row/schema construction; expanded exports to include format-specific decoders and utilities.
CLI Enhancement
py-sdk/inference_logging_client/inference_logging_client/cli.py
Added SparkSession creation/config (spark-master), richer CLI args (model-proxy-id, version, format, inference-host, hex/base64, decompress, output CSV/JSON), Spark-based decode flow, float formatting, metadata summary, and ensured SparkSession teardown.
Format-Specific Decoders
py-sdk/inference_logging_client/inference_logging_client/formats.py
Added decode_arrow_features and decode_parquet_features to decode single-entity Arrow IPC and Parquet payloads into feature dicts with robust NULL/missing handling and FormatError wrapping.
Schema and Caching
py-sdk/inference_logging_client/inference_logging_client/io.py
Added clear_schema_cache() to invalidate in-memory schema cache; minor formatting and user-agent update.
Data Types
py-sdk/inference_logging_client/inference_logging_client/types.py
Extended DecodedMPLog dataclass with fields: entities, parent_entity, metadata_byte, compression_enabled, version, and format_type.
Utilities
py-sdk/inference_logging_client/inference_logging_client/utils.py
format_dataframe_floats rewritten to operate on Spark DataFrames; format_float now consistently rounds to 6 decimals; reorganized scalar type mappings and type groups.
Exceptions
py-sdk/inference_logging_client/inference_logging_client/exceptions.py
Added explicit pass statements in several exception classes (InferenceLoggingError, SchemaFetchError, SchemaNotFoundError, DecodeError, FormatError, ProtobufError).
Decoder Styling
py-sdk/inference_logging_client/inference_logging_client/decoder.py
Cosmetic reformatting and minor normalization of struct/unpacking code; no behavioral changes.
Project Configuration
py-sdk/inference_logging_client/pyproject.toml
Bumped version to 0.2.4; replaced pandas>=1.3.0 dependency with pyspark>=3.3.0.
Documentation
py-sdk/inference_logging_client/readme.md
Comprehensive README overhaul with installation, Quick Start (bytes and Spark DataFrame examples), API reference, CLI docs, architecture, troubleshooting, and development guidance.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant CLI/API
    participant SparkSession as Spark\nSession
    participant Decoder
    participant FormatDecoder as Format\nDecoder
    participant Output

    Client->>CLI/API: Provide encoded bytes or Spark DataFrame + config
    CLI/API->>SparkSession: Create SparkSession
    CLI/API->>Decoder: Call decode_mplog / decode_mplog_dataframe with SparkSession

    Decoder->>Decoder: Decompress (if enabled)
    Decoder->>Decoder: Extract metadata byte and detect format

    loop Per entity/row
        Decoder->>FormatDecoder: Decode features for detected format
        FormatDecoder-->>Decoder: Return feature dict
        Decoder->>Decoder: Build Spark row (include metadata columns)
    end

    Decoder->>SparkSession: Construct Spark DataFrame from rows
    Decoder-->>CLI/API: Return Spark DataFrame

    CLI/API->>CLI/API: Format floats (6 decimals)
    CLI/API->>Output: Write to CSV/JSON or display
    CLI/API->>SparkSession: Stop SparkSession
    Output-->>Client: Deliver formatted output
🚥 Pre-merge checks | ✅ 1
✅ Passed checks (1 passed)
Check name Status Explanation
Dynamic Configuration Validation ✅ Passed PR contains no modifications to dynamic configuration files matching application-dyn-*.yml pattern; all changes isolated to py-sdk/inference_logging_client module.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 6

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
py-sdk/inference_logging_client/inference_logging_client/cli.py (1)

77-82: ⚠️ Potential issue | 🟡 Minor

Enable strict base64 validation to detect malformed input.

base64.b64decode without the validate=True flag silently ignores invalid characters instead of raising an error. The current error handler cannot catch this silent failure. Normalize whitespace and enable validation to provide clear feedback to users:

🛠️ Suggested change
-            data = base64.b64decode(data)
+            normalized = b"".join(data.split())
+            data = base64.b64decode(normalized, validate=True)
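The behavioral difference is easy to reproduce with the standard library alone; a minimal sketch:

```python
import base64

# Whitespace inside otherwise-valid base64: strip it, then validate strictly.
payload = b"aGVs bG8=\n"
normalized = b"".join(payload.split())
print(base64.b64decode(normalized, validate=True))  # b'hello'

# A character outside the base64 alphabet: the default mode silently drops it...
garbage = b"aGVs*bG8="
print(base64.b64decode(garbage))  # b'hello' -- the '*' is ignored

# ...while validate=True surfaces the problem to the caller.
try:
    base64.b64decode(garbage, validate=True)
except ValueError as exc:  # binascii.Error subclasses ValueError
    print(f"rejected: {exc}")
```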
🤖 Fix all issues with AI agents
In `@py-sdk/inference_logging_client/inference_logging_client/__init__.py`:
- Around line 350-363: The metadata preservation and column-ordering arrays use
the hard-coded string "mp_config_id" causing custom mp_config_id_column values
to be dropped; update the code to use the mp_config_id_column parameter instead
of the literal by replacing occurrences of "mp_config_id" in the
row_metadata_columns list and any column-ordering lists/operations (and the
metadata extraction logic that references that key) with the mp_config_id_column
variable (ensure mp_config_id_column has a sensible default like "mp_config_id"
if None so behavior is unchanged). Use the existing symbols
row_metadata_columns, mp_config_id_column, and the functions/blocks that perform
metadata extraction and column reordering to locate and change the hard-coded
string to the parameter.
- Around line 287-319: The _extract_metadata_byte function must be extended to
handle raw binary types and integer-list metadata: update _extract_metadata_byte
to (1) check for bytes/bytearray/memoryview at the top-level and return the
first byte if present, (2) when handling list formats in the parsed and
already-parsed branches, treat a first_item that is an int (0-255) as the
metadata byte and return it, and (3) treat list first_item values that are
bytes-like (bytes/bytearray/memoryview) by decoding or taking the first byte
similarly; ensure existing JSON/string parsing still occurs and keep all
existing exception handling.

In `@py-sdk/inference_logging_client/inference_logging_client/cli.py`:
- Around line 53-55: Update the misleading help and success text to reflect that
Spark writes directories of part-* files and that JSON output is supported:
change the parser.add_argument("--output", "-o", help=...) help to say "Output
directory (Spark creates part-* files; CSV by default or JSON with --json)" and
update the script's success message (the message printed after writing,
referenced near the write/finish logic) to state "Wrote output to directory
'<output>' as CSV" or "as JSON" depending on the --json flag, mentioning the
directory contains Spark part-* files rather than a single file.

In `@py-sdk/inference_logging_client/inference_logging_client/io.py`:
- Around line 48-54: The User-Agent string in the HTTP request construction
(where req = urllib.request.Request(...)) is hard-coded to
"inference-logging-client/0.1.0" and must be updated to the package's current
version; change that header to "inference-logging-client/0.2.0" (or derive it
from the package version constant if one exists) in the Request headers so the
User-Agent matches the package release.
- Around line 31-77: The _fetch_schema_with_retry function can raise TypeError
when max_retries <= 0 because the loop never runs and last_exception remains
None; add an upfront guard in _fetch_schema_with_retry to validate max_retries
(e.g., if max_retries < 1) and either set it to a sane default or raise a
SchemaFetchError/ValueError with a clear message, and as an extra safety ensure
that before the final raise you raise a concrete SchemaFetchError (not None)
with a descriptive message if last_exception is still None so callers of
_fetch_schema_with_retry never get a TypeError.

In `@py-sdk/inference_logging_client/readme.md`:
- Around line 628-639: The fenced code block showing the ASCII table uses triple
backticks with no language (MD040); update that block to include an appropriate
language tag such as text (i.e., change ``` to ```text) so markdownlint passes
and readability improves—search for the ASCII box starting with
"┌─────────────────────────────────────────────────────────────┐" and update its
opening fence; apply the same pattern to any other unlabeled fenced blocks (use
bash for shell snippets, python for code examples).
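The retry-loop guard prescribed for io.py above is small enough to sketch in isolation. The function and exception names follow the prompt; the real SDK signatures may differ:

```python
import time


class SchemaFetchError(Exception):
    """Stand-in for the SDK's exception type."""


def fetch_schema_with_retry(fetch, max_retries: int = 3, backoff_seconds: float = 0.0):
    """Retry wrapper that can never end in `raise None` (sketch, assumed signature)."""
    if max_retries < 1:
        raise ValueError(f"max_retries must be >= 1, got {max_retries}")
    last_exception = None
    for attempt in range(max_retries):
        try:
            return fetch()
        except Exception as exc:
            last_exception = exc
            if backoff_seconds and attempt < max_retries - 1:
                time.sleep(backoff_seconds * (attempt + 1))
    # last_exception is set because the loop ran at least once, but raise a
    # concrete error either way rather than re-raising a possible None.
    raise SchemaFetchError(
        f"schema fetch failed after {max_retries} attempts"
    ) from last_exception
```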
🧹 Nitpick comments (2)
py-sdk/inference_logging_client/inference_logging_client/formats.py (1)

31-35: Close Arrow stream readers after reading the table.

pa.ipc.open_stream returns a reader that should be closed to avoid resource leakage in long-lived processes. A context manager (or explicit close) makes this safer.

♻️ Suggested change
-    try:
-        reader = pa.ipc.open_stream(io.BytesIO(encoded_bytes))
-        table = reader.read_all()
-    except Exception as e:
+    try:
+        with pa.ipc.open_stream(io.BytesIO(encoded_bytes)) as reader:
+            table = reader.read_all()
+    except Exception as e:
         raise FormatError(f"Failed to read Arrow IPC data: {e}")
py-sdk/inference_logging_client/inference_logging_client/__init__.py (1)

266-280: Avoid an extra Spark action before collect.

df.count() triggers a full job and df.collect() immediately triggers another. You can collect once and check emptiness.

♻️ Suggested change
-    if df.count() == 0:
-        from pyspark.sql.types import StructType
-        return spark.createDataFrame([], StructType([]))
-
     # Collect to driver for processing
     # Note: For large datasets, consider using mapInPandas or processing in partitions
     rows = df.collect()
+    if not rows:
+        from pyspark.sql.types import StructType
+        return spark.createDataFrame([], StructType([]))

coderabbitai[bot]
coderabbitai bot previously approved these changes Feb 4, 2026
@readytoreview

readytoreview bot commented Mar 10, 2026

PR Validation Failed

PR description validation failed

Issues found:

  • no PR type selected
  • no test selection made (yes/no/need help)
  • missing KRD section

Please fill out the form to fix this: Open Form


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
py-sdk/inference_logging_client/inference_logging_client/utils.py (1)

138-151: ⚠️ Potential issue | 🟠 Major

Keep float rounding out of the core decode path.

py-sdk/inference_logging_client/inference_logging_client/decoder.py uses this helper for FP16/32/64 scalar and vector decoding, so the new round(value, 6) changes the decoded payload itself, not just its presentation. That makes the library lossy before callers opt into any output formatting.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@py-sdk/inference_logging_client/inference_logging_client/utils.py` around
lines 138 - 151, The helper format_float currently mutates decoded numeric
values by rounding to 6 decimals; revert format_float to return the original
float unchanged (remove round(value, 6) and special-case handling should still
return inf/nan as-is) so decoding paths in decoder.py (used for FP16/32/64
scalar and vector decoding) remain lossless; if human-readable output is needed,
add a separate function (e.g., format_float_str or format_float_for_display)
that accepts a float and returns a 6-decimal string representation without
scientific notation, and update any presentation-only callers to use that new
function instead of format_float.
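The split the prompt describes can be sketched as follows; `format_float_for_display` is the prompt's suggested name, not necessarily the SDK's actual API:

```python
import math


def format_float(value: float) -> float:
    """Decode path: pass the value through unchanged so decoding stays lossless."""
    return value


def format_float_for_display(value: float) -> str:
    """Presentation only: fixed 6-decimal string without scientific notation."""
    if math.isnan(value) or math.isinf(value):
        return str(value)
    return f"{value:.6f}"
```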
♻️ Duplicate comments (1)
py-sdk/inference_logging_client/inference_logging_client/__init__.py (1)

287-319: ⚠️ Potential issue | 🟠 Major

Handle binary/list metadata before falling back to 0.

_extract_metadata_byte() still only understands JSON strings and lists of base64 strings. In the Spark path, metadata can already arrive as bytes/bytearray/memoryview or as a list of ints, and those cases currently return 0, which forces version/format detection to 0/proto and sends the row through the wrong schema/decoder path.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@py-sdk/inference_logging_client/inference_logging_client/__init__.py` around
lines 287 - 319, _extract_metadata_byte currently only handles JSON strings and
lists of base64 strings and falls back to 0 for bytes-like inputs or lists of
ints; update this function to detect and handle bytes/bytearray/memoryview by
returning the first byte (decoded[0]) and handle already-parsed lists of
integers by returning the first element (ensuring it's in 0-255), placing those
checks before the final return 0 so Spark-provided binary or int-list metadata
is correctly interpreted instead of forcing the proto fallback.
🧹 Nitpick comments (2)
py-sdk/inference_logging_client/pyproject.toml (1)

7-7: Consider documenting the breaking API change.

Replacing pandas with PySpark is a breaking change for existing users. While a minor version bump is acceptable under semver for pre-1.0.0 releases, ensure release notes or a CHANGELOG clearly communicate this migration so users can prepare accordingly.

Also, the version jumps directly from 0.1.0 to 0.2.3—verify this is intentional (e.g., aligning with an internal versioning scheme) rather than an oversight.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@py-sdk/inference_logging_client/pyproject.toml` at line 7, The project has a
breaking API change swapping pandas for PySpark and an unexpected version jump
in pyproject.toml (version = "0.2.3"); add a clear entry to your CHANGELOG.md or
release notes that states the breaking change (explicitly mention removal of
pandas and addition of pyspark and any API surface differences in
functions/classes that consume DataFrame inputs), link to migration
guidance/examples, and confirm the version in pyproject.toml is intentional (or
correct it) so release metadata matches your intended semantic/versioning
policy; update any packaging/dependency lists and README migration notes to
reference the specific change and the affected public symbols (the
pyproject.toml version field and any modules that consumed pandas DataFrame
types).
py-sdk/inference_logging_client/inference_logging_client/types.py (1)

37-49: Declare _encoded_features on DecodedMPLog instead of attaching it later.

py-sdk/inference_logging_client/inference_logging_client/io.py mutates the dataclass with result._encoded_features, and readers already need getattr(..., "_encoded_features", []) to stay safe. Modeling that private field here keeps the internal contract typed and removes the hidden mutation.

Suggested change
 `@dataclass`
 class DecodedMPLog:
     """Container for decoded MPLog data."""
 
     user_id: str = ""
     tracking_id: str = ""
     model_proxy_config_id: str = ""
     entities: list[str] = field(default_factory=list)
     parent_entity: list[str] = field(default_factory=list)
     metadata_byte: int = 0
     compression_enabled: bool = False
     version: int = 0
     format_type: int = 0  # 0=proto, 1=arrow, 2=parquet
+    _encoded_features: list[bytes] = field(default_factory=list, repr=False)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@py-sdk/inference_logging_client/inference_logging_client/types.py` around
lines 37 - 49, The DecodedMPLog dataclass is being mutated elsewhere with
result._encoded_features and readers defensively use getattr(...,
"_encoded_features", []); declare that private attribute on DecodedMPLog to make
the contract explicit and typed. Add a field named _encoded_features to the
dataclass (type list[bytes] or list[Any]) with a default_factory=list and set
repr=False and init=False so it’s not part of the public init/representation;
keep using the existing dataclasses.field import and update type hints
accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@py-sdk/inference_logging_client/inference_logging_client/__init__.py`:
- Around line 441-458: The parent_entity logic collapses multi-value parent
lists into a single string and then reuses that same value for every decoded
row; update the parent_entity handling so that when parent_val is a list you
align elements to each emitted entity row instead of flattening—if parent_val is
a list and its length equals the number of emitted entities, select the element
corresponding to the current row/index (use the same index used for entity_id
decoding), otherwise fall back to the existing single-value or stringified-list
behavior; modify the code that assigns parent_entity_val (the block handling
parent_val) and the subsequent write that emits parent_entity into decoded rows
so it uses the per-row parent_entity_val rather than a single shared value.

In `@py-sdk/inference_logging_client/inference_logging_client/formats.py`:
- Around line 170-186: The block handling sized features currently treats size
== 0 as missing and sets result[feature.name] = None; instead remove that
special-case so zero-length sized fields are decoded as valid empty values. Keep
the existing checks for remaining bytes (<2 and <size), but delete the if size
== 0 branch and let value_bytes = reader.read(size) and result[feature.name] =
decode_vector_or_string(value_bytes, feature.feature_type) handle empty
payloads; reference: is_sized_type, reader.read_uint16, reader.read(size),
decode_vector_or_string, and feature.feature_type.

In `@py-sdk/inference_logging_client/pyproject.toml`:
- Line 31: The pyproject dependency "pyspark>=3.3.0" conflicts with declared
Python 3.11–3.12 support; update pyproject.toml to use Python-version-specific
markers instead of a single broad requirement: keep "pyspark>=3.3.0" for python
<3.11, require "pyspark>=3.4" for python >=3.11 and <3.12, and require
"pyspark>=4.0" for python >=3.12 (use environment markers in the dependencies
section to express these constraints so declared Python classifiers remain
accurate).

In `@py-sdk/inference_logging_client/readme.md`:
- Around line 112-116: The example output comments for DataFrame.show() are
using pandas-style rows with indices; update them to actual PySpark show() ASCII
table format for both occurrences (the first df.show() example and the later
decoded_df.show() example), i.e., replace the commented pandas-style block (with
indices like "0  prod_123 ...") with an ASCII table header and rows matching
Spark's show() output (columns: entity_id, feature_price, feature_category,
embedding_vector) so the examples reflect real Spark DataFrame.show() rendering.
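For reference, Spark's `DataFrame.show()` renders an ASCII grid rather than pandas-style indexed rows. With the columns named in the prompt, the expected output would look roughly like this (row values are placeholders):

```text
+---------+-------------+----------------+----------------+
|entity_id|feature_price|feature_category|embedding_vector|
+---------+-------------+----------------+----------------+
| prod_123|       499.99|        clothing|  [0.1, 0.2,...]|
| prod_456|       899.00|     electronics|  [0.3, 0.4,...]|
+---------+-------------+----------------+----------------+
```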

---

Outside diff comments:
In `@py-sdk/inference_logging_client/inference_logging_client/utils.py`:
- Around line 138-151: The helper format_float currently mutates decoded numeric
values by rounding to 6 decimals; revert format_float to return the original
float unchanged (remove round(value, 6) and special-case handling should still
return inf/nan as-is) so decoding paths in decoder.py (used for FP16/32/64
scalar and vector decoding) remain lossless; if human-readable output is needed,
add a separate function (e.g., format_float_str or format_float_for_display)
that accepts a float and returns a 6-decimal string representation without
scientific notation, and update any presentation-only callers to use that new
function instead of format_float.

---

Duplicate comments:
In `@py-sdk/inference_logging_client/inference_logging_client/__init__.py`:
- Around line 287-319: _extract_metadata_byte currently only handles JSON
strings and lists of base64 strings and falls back to 0 for bytes-like inputs or
lists of ints; update this function to detect and handle
bytes/bytearray/memoryview by returning the first byte (decoded[0]) and handle
already-parsed lists of integers by returning the first element (ensuring it's
in 0-255), placing those checks before the final return 0 so Spark-provided
binary or int-list metadata is correctly interpreted instead of forcing the
proto fallback.

---

Nitpick comments:
In `@py-sdk/inference_logging_client/inference_logging_client/types.py`:
- Around line 37-49: The DecodedMPLog dataclass is being mutated elsewhere with
result._encoded_features and readers defensively use getattr(...,
"_encoded_features", []); declare that private attribute on DecodedMPLog to make
the contract explicit and typed. Add a field named _encoded_features to the
dataclass (type list[bytes] or list[Any]) with a default_factory=list and set
repr=False and init=False so it’s not part of the public init/representation;
keep using the existing dataclasses.field import and update type hints
accordingly.

In `@py-sdk/inference_logging_client/pyproject.toml`:
- Line 7: The project has a breaking API change swapping pandas for PySpark and
an unexpected version jump in pyproject.toml (version = "0.2.3"); add a clear
entry to your CHANGELOG.md or release notes that states the breaking change
(explicitly mention removal of pandas and addition of pyspark and any API
surface differences in functions/classes that consume DataFrame inputs), link to
migration guidance/examples, and confirm the version in pyproject.toml is
intentional (or correct it) so release metadata matches your intended
semantic/versioning policy; update any packaging/dependency lists and README
migration notes to reference the specific change and the affected public symbols
(the pyproject.toml version field and any modules that consumed pandas DataFrame
types).

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 07c945a1-6bb7-4cf0-80b3-cb4ed974996a

📥 Commits

Reviewing files that changed from the base of the PR and between 2991580 and 7c6e3b6.

📒 Files selected for processing (10)
  • py-sdk/inference_logging_client/inference_logging_client/__init__.py
  • py-sdk/inference_logging_client/inference_logging_client/cli.py
  • py-sdk/inference_logging_client/inference_logging_client/decoder.py
  • py-sdk/inference_logging_client/inference_logging_client/exceptions.py
  • py-sdk/inference_logging_client/inference_logging_client/formats.py
  • py-sdk/inference_logging_client/inference_logging_client/io.py
  • py-sdk/inference_logging_client/inference_logging_client/types.py
  • py-sdk/inference_logging_client/inference_logging_client/utils.py
  • py-sdk/inference_logging_client/pyproject.toml
  • py-sdk/inference_logging_client/readme.md

Comment on lines +441 to +458
# Process parent_entity
parent_entity_val = None
if "parent_entity" in df_columns and row["parent_entity"] is not None:
parent_val = row["parent_entity"]
if isinstance(parent_val, str):
try:
parent_val = json.loads(parent_val)
except (json.JSONDecodeError, ValueError):
parent_val = [parent_val]
if isinstance(parent_val, list):
if len(parent_val) == 1:
parent_entity_val = parent_val[0]
elif len(parent_val) > 1:
parent_entity_val = str(parent_val)
else:
parent_entity_val = None
else:
parent_entity_val = parent_val

⚠️ Potential issue | 🟠 Major

Keep parent_entity aligned with each emitted entity row.

Lines 441-458 collapse a multi-value parent_entity payload into a single string, and Lines 504-506 then write that same value to every decoded row. For multi-entity logs, the entity_id → parent_entity mapping is no longer correct.

Also applies to: 504-506

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@py-sdk/inference_logging_client/inference_logging_client/__init__.py` around
lines 441 - 458, The parent_entity logic collapses multi-value parent lists into
a single string and then reuses that same value for every decoded row; update
the parent_entity handling so that when parent_val is a list you align elements
to each emitted entity row instead of flattening—if parent_val is a list and its
length equals the number of emitted entities, select the element corresponding
to the current row/index (use the same index used for entity_id decoding),
otherwise fall back to the existing single-value or stringified-list behavior;
modify the code that assigns parent_entity_val (the block handling parent_val)
and the subsequent write that emits parent_entity into decoded rows so it uses
the per-row parent_entity_val rather than a single shared value.
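The per-row alignment described above can be sketched as a small helper; the fallback semantics are assumed from the original block:

```python
import json


def parent_for_row(parent_val, num_entities: int, row_index: int):
    """Pick the parent_entity for one emitted entity row (illustrative sketch)."""
    if isinstance(parent_val, str):
        try:
            parent_val = json.loads(parent_val)
        except (json.JSONDecodeError, ValueError):
            parent_val = [parent_val]
    if isinstance(parent_val, list):
        if len(parent_val) == num_entities:
            return parent_val[row_index]  # one parent per entity row
        if len(parent_val) == 1:
            return parent_val[0]          # single shared parent
        return str(parent_val) if parent_val else None
    return parent_val
```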

Comment on lines +461 to +508
for i, feature_item in enumerate(features_list):
# Get entity_id from entities array or generate synthetic
entity_id = f"entity_{i}"
if entities_val and i < len(entities_val):
entity_id = str(entities_val[i])

# Get and decode base64 encoded_features
encoded_features_b64 = feature_item.get("encoded_features", "")
if not encoded_features_b64:
continue

try:
encoded_bytes = base64.b64decode(encoded_features_b64)
except (ValueError, TypeError):
continue

if len(encoded_bytes) == 0:
continue

# Attempt decompression if enabled
working_data = encoded_bytes
if decompress:
working_data = _decompress_zstd(encoded_bytes)

# Decode features based on format type
if detected_format == Format.ARROW:
decoded_features = decode_arrow_features(working_data, feature_schema)
elif detected_format == Format.PARQUET:
decoded_features = decode_parquet_features(working_data, feature_schema)
else:
# Default to proto format
decoded_features = decode_proto_features(working_data, feature_schema)

result_row = {"entity_id": entity_id}
result_row.update(
{k: v for k, v in decoded_features.items() if k not in _reserved_columns}
)

# Add metadata columns
for col in row_metadata_columns:
if col in df_columns:
result_row[col] = row[col]

# Set parent_entity
if parent_entity_val is not None:
result_row["parent_entity"] = parent_entity_val

all_decoded_rows.append(result_row)

⚠️ Potential issue | 🟠 Major

Arrow/Parquet decoding here drops rows when a blob contains multiple entities.

py-sdk/inference_logging_client/inference_logging_client/formats.py already treats Arrow/Parquet encoded_features as blobs that can contain multiple rows, but this loop decodes each item with the single-entity helpers and emits exactly one result_row. If features holds one multi-row Arrow/Parquet blob, every entity after the first is lost.

Comment on lines 170 to 186
if is_sized_type(feature.feature_type):
# Read 2-byte size prefix
if reader.remaining() < 2:
result[feature.name] = None
continue

size = reader.read_uint16()
if size == 0:
result[feature.name] = None
continue

if reader.remaining() < size:
result[feature.name] = None
continue

value_bytes = reader.read(size)
result[feature.name] = decode_vector_or_string(value_bytes, feature.feature_type)

⚠️ Potential issue | 🟠 Major

Do not collapse zero-length sized fields to None.

Lines 176-186 return None for every sized field with size == 0, but decode_vector_or_string() already handles empty payloads as valid empty values. That makes empty strings/vectors indistinguishable from missing features in proto logs.

Suggested change
             if is_sized_type(feature.feature_type):
                 # Read 2-byte size prefix
                 if reader.remaining() < 2:
                     result[feature.name] = None
                     continue
 
                 size = reader.read_uint16()
-                if size == 0:
-                    result[feature.name] = None
-                    continue
-
                 if reader.remaining() < size:
                     result[feature.name] = None
                     continue
 
-                value_bytes = reader.read(size)
+                value_bytes = reader.read(size) if size > 0 else b""
                 result[feature.name] = decode_vector_or_string(value_bytes, feature.feature_type)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@py-sdk/inference_logging_client/inference_logging_client/formats.py` around
lines 170 - 186, The block handling sized features currently treats size == 0 as
missing and sets result[feature.name] = None; instead remove that special-case
so zero-length sized fields are decoded as valid empty values. Keep the existing
checks for remaining bytes (<2 and <size), but delete the if size == 0 branch
and let value_bytes = reader.read(size) and result[feature.name] =
decode_vector_or_string(value_bytes, feature.feature_type) handle empty
payloads; reference: is_sized_type, reader.read_uint16, reader.read(size),
decode_vector_or_string, and feature.feature_type.
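The distinction the fix draws — truncation (missing data) versus a legitimately empty value — can be sketched in isolation. The following is a hypothetical stand-alone decoder, not the SDK's `formats.py` (`decode_sized_values` is a made-up name), and it assumes big-endian uint16 size prefixes:

```python
import struct

def decode_sized_values(buf: bytes, count: int):
    """Decode `count` length-prefixed values from `buf`.

    Each value is framed as a 2-byte big-endian size followed by that many
    payload bytes. A zero size yields b"" (a valid empty value); only a
    truncated prefix or payload produces None.
    """
    values, offset = [], 0
    for _ in range(count):
        if len(buf) - offset < 2:          # truncated: no room for the size prefix
            values.append(None)
            break
        (size,) = struct.unpack_from(">H", buf, offset)
        offset += 2
        if len(buf) - offset < size:       # truncated: payload shorter than declared
            values.append(None)
            break
        values.append(buf[offset:offset + size])  # b"" when size == 0
        offset += size
    return values

# An empty value (size 0) followed by "hi" (size 2 + payload):
print(decode_sized_values(b"\x00\x00\x00\x02hi", 2))  # [b'', b'hi']
```

Treating `b""` as a value rather than `None` is what lets downstream code distinguish "this feature was logged as empty" from "this row was cut off".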


```toml
dependencies = [
    "pandas>=1.3.0",
    "pyspark>=3.3.0",
```
⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🌐 Web query:

PySpark 3.3 3.4 3.5 Python version support

💡 Result:

PySpark (Apache Spark) Python version support:

| Spark / PySpark version | Docs: "Python Versions Supported" |
| --- | --- |
| 3.3.x | Python 3.7+ (apache.uib.no) |
| 3.4.x | Python 3.7+, but Python 3.7 support is deprecated starting in Spark 3.4.0 (apache.uib.no) |
| 3.5.x | Python 3.8+ (spark.apache.org) |

Implication: Python 3.7 is supported in Spark 3.3 and 3.4 (deprecated in 3.4.0) and is effectively dropped starting Spark 3.5, where the minimum becomes 3.8. (apache.uib.no)


🌐 Web query:

PySpark 3.3.0 Python 3.11 3.12 compatibility support matrix

💡 Result:

PySpark 3.3.0 vs Python 3.11 / 3.12 (compatibility matrix)

PySpark version Python 3.11 Python 3.12
3.3.0 Not supported (3.11 support was added in Spark 3.4.0) Not supported (Spark 3.x support typically tops out at Python 3.11; Python 3.12 support is associated with Spark 4.x)

Sources: Spark 3.3.0 docs state “Python 3.7+” for that release line, but Spark’s explicit Python 3.11 support appears in the Spark 3.4.0 release notes (“Support Python 3.11”). [1], [2] Spark 3.x support capping at Python 3.11 (i.e., no 3.12) is reflected in vendor support matrices (example: Cloudera’s Spark 3.5.0 max Python 3.11) and Spark 4’s stated move to support Python 3.12+. [3], [4]

References

Citations:


🌐 Web query:

PySpark version compatibility chart Python 3.11 3.12

💡 Result:

PySpark ↔ Python compatibility (focus: Python 3.11 / 3.12):

| PySpark (Spark) line | Official Python support (docs / PyPI metadata) | Python 3.11 | Python 3.12 |
| --- | --- | --- | --- |
| 3.5.x | Docs say Python ≥ 3.8; PyPI classifiers list 3.8–3.11 ("Requires: Python ≥3.8") [1] | Yes [1] | No (not listed/declared on PyPI for 3.5.7) [1] |
| 4.0.x | Docs say Python ≥ 3.9; PyPI classifiers list 3.9–3.13 ("Requires: Python ≥3.9") [2][3] | Yes [2] | Yes [2] |
| 4.1.x | Upgrade guide notes Spark 4.0 dropped Python 3.8 and Spark 4.1 dropped Python 3.9, so 3.11/3.12 remain supported [4] | Yes [4] | Yes [4] |

Practical takeaway: if you need Python 3.12, use PySpark 4.x; for Python 3.11, PySpark 3.5.x or 4.x both work (officially). [1][2]

Sources
[1] PyPI – pyspark 3.5.7 (Requires-Python & classifiers) (pypi.org)
[2] PyPI – pyspark 4.0.0 (Requires-Python & classifiers incl. Python 3.12) (pypi.org)
[3] Spark docs – PySpark 4.0.0 Installation ("Python 3.9 and above") (spark.apache.org)
[4] Spark docs – PySpark upgrade guide (4.0 drops Py 3.8; 4.1 drops Py 3.9) (spark.apache.org)


Fix PySpark version constraint to match declared Python 3.11–3.12 support.

The project declares support for Python 3.8–3.12, but pyspark>=3.3.0 has compatibility issues:

  • PySpark 3.3.0 supports only Python 3.7–3.10 (no 3.11 or 3.12)
  • Python 3.11 requires PySpark ≥3.4
  • Python 3.12 requires PySpark ≥4.0 (not 3.5)

Either:

  1. Bump the minimum to pyspark>=4.0 to ensure Python 3.12 support, or
  2. Remove Python 3.11–3.12 from classifiers if staying on Spark 3.x, or
  3. Use environment markers to constrain versions per Python version.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@py-sdk/inference_logging_client/pyproject.toml` at line 31, The pyproject
dependency "pyspark>=3.3.0" conflicts with declared Python 3.11–3.12 support;
update pyproject.toml to use Python-version-specific markers instead of a single
broad requirement: keep "pyspark>=3.3.0" for python <3.11, require
"pyspark>=3.4" for python >=3.11 and <3.12, and require "pyspark>=4.0" for
python >=3.12 (use environment markers in the dependencies section to express
these constraints so declared Python classifiers remain accurate).
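Option 3 above — environment markers — might look like this in `pyproject.toml`. This is a sketch, not the project's actual dependency block; the version bounds follow the compatibility matrix quoted earlier in this thread:

```toml
# Hypothetical dependency block using PEP 508 environment markers so the
# installed PySpark matches the running interpreter. Adjust the bounds if
# the project's supported-Python floor changes.
dependencies = [
    "pyspark>=3.3.0; python_version < '3.11'",
    "pyspark>=3.4.0; python_version == '3.11'",
    "pyspark>=4.0.0; python_version >= '3.12'",
]
```

With markers like these, the declared Python 3.8–3.12 classifiers stay accurate without forcing every user onto PySpark 4.x.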

Comment on lines +112 to +116
```python
# View the results
df.show()
#    entity_id  feature_price  feature_category  embedding_vector
# 0  prod_123          29.99                 5   [0.1, 0.2, ...]
# 1  prod_456          49.99                 3   [0.3, 0.4, ...]
```
⚠️ Potential issue | 🟡 Minor

Use actual PySpark show() output in these examples.

These snippets call DataFrame.show(), but the commented output is pandas-style with row indices. In Spark, show() renders an ASCII table, so the current examples are misleading.

📝 Suggested doc fix

```diff
-#    entity_id  feature_price  feature_category  embedding_vector
-# 0  prod_123          29.99                 5  [0.1, 0.2, ...]
-# 1  prod_456          49.99                 3  [0.3, 0.4, ...]
+# +---------+-------------+----------------+-------------------+
+# |entity_id|feature_price|feature_category|embedding_vector   |
+# +---------+-------------+----------------+-------------------+
+# |prod_123 |29.99        |5               |[0.1, 0.2, ...]    |
+# |prod_456 |49.99        |3               |[0.3, 0.4, ...]    |
+# +---------+-------------+----------------+-------------------+
```

Apply the same adjustment to the later decoded_df.show() example as well.

Also applies to: 143-146

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@py-sdk/inference_logging_client/readme.md` around lines 112 - 116, The
example output comments for DataFrame.show() are using pandas-style rows with
indices; update them to actual PySpark show() ASCII table format for both
occurrences (the first df.show() example and the later decoded_df.show()
example), i.e., replace the commented pandas-style block (with indices like "0 
prod_123 ...") with an ASCII table header and rows matching Spark's show()
output (columns: entity_id, feature_price, feature_category, embedding_vector)
so the examples reflect real Spark DataFrame.show() rendering.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (1)
py-sdk/inference_logging_client/inference_logging_client/__init__.py (1)

515-517: Empty DataFrame loses schema information.

When no rows are decoded, the function returns an empty DataFrame with StructType([]). This differs from decode_mplog which builds a proper schema (lines 192-199). Consider preserving at least the metadata columns schema for consistency with non-empty results.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@py-sdk/inference_logging_client/inference_logging_client/__init__.py` around
lines 515 - 517, The empty-DataFrame branch returns StructType([]), losing
expected schema; change the branch that checks all_decoded_rows to construct and
use the same schema as non-empty results (the metadata/columns schema created in
decode_mplog at lines ~192-199) instead of StructType([]). Concretely, reuse or
replicate the schema-building logic (e.g., the metadata_columns or schema
construction code used by decode_mplog) to create a StructType matching the
expected columns, then return spark.createDataFrame([], that_schema) so empty
results preserve the same column types and metadata.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@py-sdk/inference_logging_client/inference_logging_client/__init__.py`:
- Line 51: Update the module-level __version__ variable to match the package
version declared in pyproject.toml (change __version__ from "0.1.0" to "0.2.4")
so the value in inference_logging_client.__init__.py and pyproject.toml stay
synchronized; locate the __version__ assignment in the file and set it to
"0.2.4".
- Line 285: The type annotation for schema_cache uses Python 3.9+ builtin
generics (dict[tuple[str, int], list[FeatureInfo]]) which will fail on Python
3.8; update the file to use a 3.8-compatible approach by either adding "from
__future__ import annotations" at the top of this module or changing the
annotation to typing.Dict[typing.Tuple[str, int], typing.List[FeatureInfo]]
(referencing the variable name schema_cache and the FeatureInfo type) so the
code runs under Python 3.8.

---

Nitpick comments:
In `@py-sdk/inference_logging_client/inference_logging_client/__init__.py`:
- Around line 515-517: The empty-DataFrame branch returns StructType([]), losing
expected schema; change the branch that checks all_decoded_rows to construct and
use the same schema as non-empty results (the metadata/columns schema created in
decode_mplog at lines ~192-199) instead of StructType([]). Concretely, reuse or
replicate the schema-building logic (e.g., the metadata_columns or schema
construction code used by decode_mplog) to create a StructType matching the
expected columns, then return spark.createDataFrame([], that_schema) so empty
results preserve the same column types and metadata.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: bb6b4cf0-9b8b-4bc4-b7d1-1592179c4bed

📥 Commits

Reviewing files that changed from the base of the PR and between 7c6e3b6 and feb90b1.

📒 Files selected for processing (2)
  • py-sdk/inference_logging_client/inference_logging_client/__init__.py
  • py-sdk/inference_logging_client/pyproject.toml

```python
from .types import FORMAT_TYPE_MAP, DecodedMPLog, FeatureInfo, Format
from .utils import format_dataframe_floats, get_format_name, unpack_metadata_byte

__version__ = "0.1.0"
```
⚠️ Potential issue | 🟡 Minor

Version mismatch with pyproject.toml.

__version__ is "0.1.0" but pyproject.toml declares version = "0.2.4". Update to keep them synchronized.

```diff
-__version__ = "0.1.0"
+__version__ = "0.2.4"
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@py-sdk/inference_logging_client/inference_logging_client/__init__.py` at line
51, Update the module-level __version__ variable to match the package version
declared in pyproject.toml (change __version__ from "0.1.0" to "0.2.4") so the
value in inference_logging_client.__init__.py and pyproject.toml stay
synchronized; locate the __version__ assignment in the file and set it to
"0.2.4".

```python
# Pre-fetch schemas for unique (mp_config_id, version) combinations to avoid
# redundant HTTP requests during row iteration.
# Key: (mp_config_id, version) only - host/path intentionally excluded as schemas are canonical
schema_cache: dict[tuple[str, int], list[FeatureInfo]] = {}
```
⚠️ Potential issue | 🔴 Critical

Type annotation syntax incompatible with Python 3.8.

The dict[tuple[str, int], list[FeatureInfo]] syntax requires Python 3.9+. Since the project declares requires-python = ">=3.8", this will raise TypeError at runtime on Python 3.8.

Proposed fix

Either add at the top of the file:

```python
from __future__ import annotations
```

Or use typing module imports:

```diff
-from typing import TYPE_CHECKING, Optional
+from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
 ...
-    schema_cache: dict[tuple[str, int], list[FeatureInfo]] = {}
+    schema_cache: Dict[Tuple[str, int], List[FeatureInfo]] = {}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@py-sdk/inference_logging_client/inference_logging_client/__init__.py` at line
285, The type annotation for schema_cache uses Python 3.9+ builtin generics
(dict[tuple[str, int], list[FeatureInfo]]) which will fail on Python 3.8; update
the file to use a 3.8-compatible approach by either adding "from __future__
import annotations" at the top of this module or changing the annotation to
typing.Dict[typing.Tuple[str, int], typing.List[FeatureInfo]] (referencing the
variable name schema_cache and the FeatureInfo type) so the code runs under
Python 3.8.
