Lean 4 bindings for Apache Arrow, the universal columnar data format. Provides type-safe access to Arrow arrays, schemas, and streams with Parquet file writing support.
⚠️ Warning: this is a work in progress; it is still incomplete, unstable, and may contain errors.
Parts of this repository were created with assistance from AI-powered coding tools, specifically Claude by Anthropic. Not all generated code has been reviewed, and generated code may have been adapted by the author. Design choices, architectural decisions, and final validation were performed independently by the author.
Arrow-Lean is a pure C implementation of Apache Arrow for Lean 4 - no C++ dependencies required. This provides:
- Simplified Build: Just a C99 compiler, no Apache Arrow C++ library needed
- Easy Deployment: Minimal system dependencies
- High-performance data processing with cache-friendly columnar layout
- Zero-copy interoperability with other Arrow-enabled systems (Python/pandas, Rust, Go, etc.)
- Type-safe data access with Lean's strong type system
- Typed column builders for programmatic Arrow array construction
- Pure C Parquet writer for persistent columnar storage
- IPC serialization for storage and transmission of Arrow data
Apache Arrow is a universal in-memory format for tabular ("columnar") data, enabling zero-copy sharing of Arrow data between independent runtimes and components running in the same process.
See docs/Arrow.md for details on the Arrow memory format. See docs/Architecture.md for the implementation architecture.
| Component | Status |
|---|---|
| Arrow Schema | Implemented (pure C) |
| Arrow Array | Implemented (pure C) |
| Arrow Stream | Implemented (pure C) |
| Data Access | Implemented (Int64, UInt64, Float64, Float32, Int32, String, Bool) |
| Typed Builders | Implemented (Int64, Float64, String, Bool, Timestamp) |
| Buffer Management | Implemented (pure C) |
| ArrowM Monad | Implemented |
| Typed Errors | Implemented |
| IPC Serialization | Implemented (pure C) |
| Parquet Writer | Implemented (pure C, no compression) |
| Parquet Reader | Not implemented |
| ToArrow Typeclasses | Implemented |
arrow-lean/
├── ArrowLean/ # Main library
│ ├── Ops.lean # Core types: ArrowSchema, ArrowArray, ArrowType
│ ├── FFI.lean # Arrow FFI bindings
│ ├── Utils.lean # Type conversions, iterators
│ ├── Memory.lean # Resource management & finalization
│ ├── Error.lean # Typed error handling
│ ├── Monad.lean # ArrowM monad for table operations
│ ├── IPC.lean # IPC serialization
│ ├── BuilderFFI.lean # Typed builder FFI bindings
│ ├── TypedBuilders.lean # High-level builder API
│ ├── ToArrow.lean # ToArrowColumn/ToArrowBatch typeclasses
│ ├── Parquet.lean # Parquet types and options
│ └── ParquetFFI.lean # Parquet FFI bindings
├── ArrowLean.lean # Module exports
├── arrow/ # Pure C implementation
│ ├── arrow_c_abi.h # Arrow C Data Interface structs
│ ├── arrow_builders.h # Typed builder declarations
│ ├── arrow_builders.c # Builder implementations
│ ├── arrow_schema.c # Schema implementation
│ ├── arrow_array.c # Array implementation
│ ├── arrow_stream.c # Stream implementation
│ ├── arrow_data_access.c # Type-specific value extraction
│ ├── arrow_buffer.c # Buffer management
│ ├── arrow_ipc.c # IPC serialization
│ ├── parquet_writer_impl.h # Parquet writer declarations
│ ├── parquet_writer_impl.c # Pure C Parquet writer
│ ├── parquet_reader_writer.c # High-level Parquet API
│ ├── lean_builder_wrapper.c # Builder FFI exports
│ ├── lean_arrow_wrapper.c # Core Arrow FFI exports
│ ├── lean_arrow_ipc.c # IPC FFI exports
│ └── lean_parquet_wrapper.c # Parquet FFI exports
├── Examples/
│ ├── Main.lean
│ ├── ParquetExample.lean
│ └── TypedBuildersExample.lean # Typed builders & ToArrow examples
├── docs/
│ ├── Arrow.md # Arrow format documentation
│ ├── Architecture.md # Implementation architecture
│ └── Parquet.md # Parquet usage guide
├── lakefile.lean
└── lean-toolchain # Lean version (v4.27.0-rc1)
- Lean 4: v4.27.0-rc1 or compatible
- C compiler: gcc or clang with C99 support
- zlog: Logging library
Ubuntu/Debian:
sudo apt-get install libzlog-dev

macOS:

brew install zlog

No C++ libraries or Apache Arrow C++ installation required.
- zlogLean - Structured logging
- Cli - CLI argument parsing
# Build the library
lake build ArrowLean
# Run examples
lake exe examples

Add to your lakefile.lean:
require arrowLean from git
"https://github.com/your-org/arrow-lean" @ "main"The ArrowM monad provides typed error handling and composable table operations:
import ArrowLean
open Arrow
-- Define operations on a table
def computeStats : ArrowM (Float × Nat) := do
let total ← sumFloat64
let count ← countNonNull getFloat64At
return (total, count)
-- Run with automatic error handling
def main : IO Unit := do
let schema ← ArrowSchema.init "g" -- float64
let array ← ArrowArray.init 1000
match ← withTable schema array computeStats with
| .ok (total, count) =>
IO.println s!"Total: {total}, Count: {count}"
| .error e =>
IO.println s!"Error: {e}"
schema.release
array.release

-- Error kinds for precise error handling
inductive Arrow.ErrorKind
| nullAccess -- Attempted to read null value
| typeMismatch -- Column type doesn't match
| indexOutOfBounds -- Array index out of range
| columnNotFound -- Named column doesn't exist
| invalidSchema -- Schema is malformed
| allocationFailed -- Memory allocation failed
| serializationFailed
| ioError
| other
-- Pattern match on errors
match result with
| .error { kind := .indexOutOfBounds, .. } => handleBounds
| .error { kind := .nullAccess, .. } => handleNull
| .ok value => process value

-- Row access with bounds checking
getInt64At : Nat → ArrowM (Option Int64)
getFloat64At : Nat → ArrowM (Option Float)
getStringAt : Nat → ArrowM (Option String)
getBoolAt : Nat → ArrowM (Option Bool)
-- Iteration
mapRows : (Nat → ArrowM α) → ArrowM (Array α)
forEachRow : (Nat → ArrowM Unit) → ArrowM Unit
foldRows : β → (β → Nat → ArrowM β) → ArrowM β
-- Filtering
filterRowIndices : (Nat → ArrowM Bool) → ArrowM (Array Nat)
-- Aggregation
sumInt64 : ArrowM Int64
sumFloat64 : ArrowM Float
avgFloat64 : ArrowM (Option Float)
countNonNull : (Nat → ArrowM (Option α)) → ArrowM Nat
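These operations compose in ordinary `do` blocks. As a sketch (the 100.0 threshold is illustrative, not part of the API), the following collects the indices of rows whose value exceeds a threshold together with the column average; run it with `withTable` exactly as in the example above:

```lean
def expensiveRows : ArrowM (Array Nat × Option Float) := do
  -- indices of rows whose float64 value exceeds the (illustrative) threshold
  let idxs ← filterRowIndices fun i => do
    match ← getFloat64At i with
    | some v => pure (decide (v > 100.0))
    | none   => pure false
  -- average over the whole column
  let avg ← avgFloat64
  return (idxs, avg)
```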
For simple use cases, you can also use the direct IO functions:

import ArrowLean
def processArray (array : ArrowArray) : IO Unit := do
for i in [:array.length.toNat] do
let idx := i.toUSize
match ← array.getFloat64 idx with
| some v => Zlog.debug s!"[{idx}] = {v}"
| none => Zlog.debug s!"[{idx}] = null"
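The per-type `forEach` helpers listed in the API reference further below avoid the manual index loop; a sketch equivalent to the function above:

```lean
def logValues (array : ArrowArray) : IO Unit :=
  array.forEachFloat64 fun idx v =>
    match v with
    | some x => Zlog.debug s!"[{idx}] = {x}"
    | none   => Zlog.debug s!"[{idx}] = null"
```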
def processStream (stream : ArrowArrayStream) : IO Unit := do
stream.forEachArray fun array => do
Zlog.info s!"Processing batch with {array.length} rows"def createSchema : IO ArrowSchema := do
def createSchema : IO ArrowSchema := do
-- Primitive type schema
let priceSchema ← ArrowSchema.forType ArrowType.float64 "price"
-- Struct schema for tables
let tableSchema ← ArrowSchema.struct "trades"
return tableSchema

The typed builder API provides the most efficient way to create Arrow arrays from Lean data:
import ArrowLean
-- Build an Int64 column
let values : Array Int64 := #[100, 200, 300, 400, 500]
match ← buildInt64Column values with
| some arr =>
IO.println s!"Created column with {arr.length} values"
arr.release
| none => IO.println "Failed"
-- Build a String column with nulls
let optStrings : Array (Option String) := #[some "hello", none, some "world"]
match ← buildOptStringColumn optStrings with
| some arr =>
IO.println s!"Created column with {arr.nullCount} nulls"
arr.release
| none => IO.println "Failed"
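The timestamp builder follows the same pattern. A sketch, assuming (as the ToArrow example further below suggests) that `buildTimestampColumn` takes an `Array Int64` of epoch values:

```lean
def buildTimestamps : IO Unit := do
  let epochs : Array Int64 := #[1700000000000000, 1700000060000000]
  match ← buildTimestampColumn epochs with
  | some arr =>
    IO.println s!"Created timestamp column with {arr.length} values"
    arr.release
  | none => IO.println "Failed"
```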
-- Create schema with SchemaBuilder
match ← SchemaBuilder.create 3 with
| some sb =>
let _ ← sb.addInt64 "id" false -- non-nullable
let _ ← sb.addString "name" true -- nullable
let _ ← sb.addFloat64 "score" false -- non-nullable
match ← sb.finish with
| some schema =>
-- Build columns...
let batch ← RecordBatch.create schema columns rowCount
-- Use batch...
| none => ...
| none => ...

Define how your custom types map to Arrow format:
-- Define a custom type
structure Trade where
timestamp : Int64
symbol : Option String
price : Float
quantity : Nat
-- Implement ToArrowBatch
instance : ToArrowBatch Trade where
columnSpecs := #[
ColumnSpec.timestamp "timestamp" "UTC" false,
ColumnSpec.string "symbol" true,
ColumnSpec.float64 "price" false,
ColumnSpec.int64 "quantity" false
]
buildColumns trades := do
let tsCol ← buildTimestampColumn (trades.map (·.timestamp))
let symCol ← buildOptStringColumn (trades.map (·.symbol))
let priceCol ← buildFloat64Column (trades.map (·.price))
let qtyCol ← buildInt64Column (trades.map (·.quantity.toInt64))
match tsCol, symCol, priceCol, qtyCol with
| some ts, some sym, some price, some qty =>
return some #[ts, sym, price, qty]
| _, _, _, _ => return none
-- Use it
let trades : Array Trade := #[...]
-- Convert to RecordBatch
match ← toRecordBatch trades with
| some batch => ...
-- Write directly to Parquet
let ok ← writeRecordsToParquet "trades.parquet" trades
-- Or write to IPC format
let ok ← writeRecordsToIPC "trades.arrow" trades
-- Serialize to bytes
match ← serializeRecordsToIPC trades with
| some bytes => -- Send bytes over network, store in Redis, etc.
| none => ...

| Type | Format | Description |
|---|---|---|
| `null` | `n` | Null values |
| `boolean` | `b` | Boolean |
| `int8`..`int64` | `c`, `s`, `i`, `l` | Signed integers |
| `uint8`..`uint64` | `C`, `S`, `I`, `L` | Unsigned integers |
| `float16`..`float64` | `e`, `f`, `g` | Floating-point |
| `string` | `u` | UTF-8 text |
| `binary` | `z` | Binary data |
| `timestamp` | `ts{unit}:UTC` | Timestamps |
| `date32`, `date64` | `tdD`, `tdm` | Date values |
| `list` | `+l` | List/array |
| `struct` | `+s` | Structured records |
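These format strings are what `ArrowSchema.init` accepts (the ArrowM example earlier passes `"g"` for float64). A sketch using a few of them; whether parameterized formats such as `ts{unit}:UTC` can be passed directly to `init` is an assumption:

```lean
def schemasFromFormats : IO Unit := do
  let intSchema ← ArrowSchema.init "l"        -- int64
  let strSchema ← ArrowSchema.init "u"        -- UTF-8 string
  let tsSchema  ← ArrowSchema.init "tsu:UTC"  -- microsecond timestamp (assumed accepted)
  -- ... use the schemas ...
  intSchema.release
  strSchema.release
  tsSchema.release
```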
-- Data access (Option for nullable values)
ArrowArray.getInt64 : ArrowArray → USize → IO (Option Int64)
ArrowArray.getUInt64 : ArrowArray → USize → IO (Option UInt64)
ArrowArray.getFloat64 : ArrowArray → USize → IO (Option Float)
ArrowArray.getFloat32 : ArrowArray → USize → IO (Option Float)
ArrowArray.getInt32 : ArrowArray → USize → IO (Option Int32)
ArrowArray.getString : ArrowArray → USize → IO (Option String)
ArrowArray.getBool : ArrowArray → USize → IO (Option Bool)
-- Iteration
ArrowArray.forEachInt64 : ArrowArray → (USize → Option Int64 → IO Unit) → IO Unit
ArrowArray.forEachFloat64 : ArrowArray → (USize → Option Float → IO Unit) → IO Unit
ArrowArray.forEachString : ArrowArray → (USize → Option String → IO Unit) → IO Unit
-- Lifecycle
ArrowArray.init : UInt64 → IO ArrowArray
ArrowArray.release : ArrowArray → IO Unit

ArrowSchema.init : String → IO ArrowSchema
ArrowSchema.forType : ArrowType → String → IO ArrowSchema
ArrowSchema.struct : String → IO ArrowSchema
ArrowSchema.release : ArrowSchema → IO Unit

ArrowArrayStream.getNext : ArrowArrayStream → IO (Option ArrowArray)
ArrowArrayStream.getSchema : ArrowArrayStream → IO (Option ArrowSchema)
ArrowArrayStream.forEachArray : ArrowArrayStream → (ArrowArray → IO Unit) → IO Unit
ArrowArrayStream.toArrays : ArrowArrayStream → IO (Array ArrowArray)

ArrowBuffer.allocate : USize → IO ArrowBuffer
ArrowBuffer.resize : ArrowBuffer → USize → IO ArrowBuffer
ArrowBuffer.free : ArrowBuffer → IO Unit
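The buffer API can be exercised on its own; a minimal sketch using the three calls above:

```lean
def bufferDemo : IO Unit := do
  let buf ← ArrowBuffer.allocate 1024   -- allocate a 1 KiB scratch buffer
  let buf ← buf.resize 4096             -- grow it; resize returns the updated buffer
  buf.free
```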
The IPC module enables serializing Arrow data to binary format for storage or transmission.

A RecordBatch bundles a schema with its array data:
import ArrowLean
open ArrowLean.IPC
-- Create a batch from schema and array
let batch : RecordBatch := { schema := mySchema, array := myArray }
-- Access batch properties
let rows := batch.length
let fmt := batch.format

open ArrowLean.IPC
-- Serialize a RecordBatch to ByteArray
let data ← serialize batch
-- Deserialize ByteArray back to RecordBatch
match ← deserialize data with
| some batch =>
IO.println s!"Loaded {batch.length} rows"
| none =>
IO.println "Deserialization failed"-- Serialize just the schema
-- Serialize just the schema
let schemaData ← serializeSchema schema
-- Deserialize schema
match ← deserializeSchema schemaData with
| some schema => IO.println s!"Schema format: {schema.format}"
| none => IO.println "Invalid schema data"-- RecordBatch type
structure RecordBatch where
schema : ArrowSchema
array : ArrowArray
-- Serialization functions
IPC.serialize : RecordBatch → IO ByteArray
IPC.deserialize : ByteArray → IO (Option RecordBatch)
IPC.serializeSchema : ArrowSchema → IO ByteArray
IPC.deserializeSchema : ByteArray → IO (Option ArrowSchema)
IPC.serializeArray : ArrowArray → ArrowSchema → IO ByteArray
IPC.deserializeArray : ByteArray → ArrowSchema → IO (Option ArrowArray)
IPC.serializedSize : RecordBatch → IO UInt64

See docs/Parquet.md for detailed Parquet documentation.
def readParquet (path : String) : IO Unit := do
let reader_opt ← ParquetReader.open path
match reader_opt with
| none => Zlog.error s!"Failed to open: {path}"
| some reader => do
let stream_opt ← reader.readTable
match stream_opt with
| some stream =>
stream.forEachArray fun array => do
Zlog.info s!"Batch: {array.length} rows"
| none =>
Zlog.error "Failed to read table"
reader.close

def writeParquet (path : String) (schema : ArrowSchema) : IO Unit := do
let writer_opt ← ParquetWriter.open path schema
match writer_opt with
| some writer => do
writer.setCompression ParquetCompression.snappy
let array ← ArrowArray.init 100
let _ ← writer.writeBatch array
writer.close
array.release
| none =>
Zlog.error "Failed to create writer"| Compression | Description |
|---|---|
uncompressed |
Maximum read speed |
snappy |
Good balance (default) |
gzip |
Better compression |
lz4 |
Fast compression |
zstd |
Best compression ratio |
brotli |
High compression |
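Selecting a codec from the table is a single call on the writer before writing batches; a sketch (constructor names other than `snappy` are assumed to mirror the table):

```lean
def writeZstd (path : String) (schema : ArrowSchema) (batch : ArrowArray) : IO Unit := do
  match ← ParquetWriter.open path schema with
  | some writer =>
    writer.setCompression ParquetCompression.zstd  -- assumed constructor name
    let _ ← writer.writeBatch batch
    writer.close
  | none =>
    Zlog.error "Failed to create writer"
```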
For storing Arrow data in Redis, see redis-lean which provides the RedisArrow module with:
- Table Storage: Store Arrow schemas and batches in Redis keys
- Stream Micro-Batching: Convert Redis Streams to Arrow RecordBatches
- Arrow Memory Format - Understanding Arrow's columnar layout
- Parquet Support - Reading and writing Parquet files