Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions arrow/array/json_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type JSONReader struct {
bldr *RecordBuilder

refs atomic.Int64
cur arrow.Record
cur arrow.RecordBatch
err error

chunk int
Expand Down Expand Up @@ -124,9 +124,15 @@ func (r *JSONReader) Err() error { return r.err }

func (r *JSONReader) Schema() *arrow.Schema { return r.schema }

// RecordBatch returns the record batch most recently read by Next. The returned
// record batch is only valid until the next call to Next unless Retain is called
// on the record batch itself.
func (r *JSONReader) RecordBatch() arrow.RecordBatch { return r.cur }

// Record returns the last read in record. The returned record is only valid
// until the next call to Next unless Retain is called on the record itself.
func (r *JSONReader) Record() arrow.Record { return r.cur }
//
// Deprecated: Use [JSONReader.RecordBatch] instead.
func (r *JSONReader) Record() arrow.Record { return r.RecordBatch() }

func (r *JSONReader) Retain() {
r.refs.Add(1)
Expand All @@ -144,7 +150,7 @@ func (r *JSONReader) Release() {
}
}

// Next returns true if it read in a record, which will be available via Record
// Next returns true if it read in a record, which will be available via RecordBatch
// and false if there is either an error or the end of the reader.
func (r *JSONReader) Next() bool {
if r.cur != nil {
Expand Down
74 changes: 46 additions & 28 deletions arrow/array/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type RecordReader interface {
Schema() *arrow.Schema

Next() bool
RecordBatch() arrow.RecordBatch
// Deprecated: Use [RecordReader.RecordBatch] instead.
Record() arrow.Record
Err() error
}
Expand All @@ -46,12 +48,12 @@ type simpleRecords struct {
refCount atomic.Int64

schema *arrow.Schema
recs []arrow.Record
cur arrow.Record
recs []arrow.RecordBatch
cur arrow.RecordBatch
}

// NewRecordReader returns a simple iterator over the given slice of records.
func NewRecordReader(schema *arrow.Schema, recs []arrow.Record) (RecordReader, error) {
func NewRecordReader(schema *arrow.Schema, recs []arrow.RecordBatch) (RecordReader, error) {
rs := &simpleRecords{
schema: schema,
recs: recs,
Expand Down Expand Up @@ -96,8 +98,11 @@ func (rs *simpleRecords) Release() {
}
}

func (rs *simpleRecords) Schema() *arrow.Schema { return rs.schema }
func (rs *simpleRecords) Record() arrow.Record { return rs.cur }
func (rs *simpleRecords) Schema() *arrow.Schema { return rs.schema }
func (rs *simpleRecords) RecordBatch() arrow.RecordBatch { return rs.cur }

// Deprecated: Use [RecordBatch] instead.
func (rs *simpleRecords) Record() arrow.Record { return rs.RecordBatch() }
func (rs *simpleRecords) Next() bool {
if len(rs.recs) == 0 {
return false
Expand All @@ -121,11 +126,11 @@ type simpleRecord struct {
arrs []arrow.Array
}

// NewRecord returns a basic, non-lazy in-memory record batch.
// NewRecordBatch returns a basic, non-lazy in-memory record batch.
//
// NewRecord panics if the columns and schema are inconsistent.
// NewRecord panics if rows is larger than the height of the columns.
func NewRecord(schema *arrow.Schema, cols []arrow.Array, nrows int64) arrow.Record {
// NewRecordBatch panics if the columns and schema are inconsistent.
// NewRecordBatch panics if rows is larger than the height of the columns.
func NewRecordBatch(schema *arrow.Schema, cols []arrow.Array, nrows int64) arrow.RecordBatch {
rec := &simpleRecord{
schema: schema,
rows: nrows,
Expand Down Expand Up @@ -156,7 +161,12 @@ func NewRecord(schema *arrow.Schema, cols []arrow.Array, nrows int64) arrow.Reco
return rec
}

func (rec *simpleRecord) SetColumn(i int, arr arrow.Array) (arrow.Record, error) {
// NewRecord forwards its arguments unchanged to NewRecordBatch and returns
// the result.
//
// Deprecated: Use [NewRecordBatch] instead.
func NewRecord(schema *arrow.Schema, cols []arrow.Array, nrows int64) arrow.Record {
return NewRecordBatch(schema, cols, nrows)
}

func (rec *simpleRecord) SetColumn(i int, arr arrow.Array) (arrow.RecordBatch, error) {
if i < 0 || i >= len(rec.arrs) {
return nil, fmt.Errorf("arrow/array: column index out of range [0, %d): got=%d", len(rec.arrs), i)
}
Expand All @@ -179,7 +189,7 @@ func (rec *simpleRecord) SetColumn(i int, arr arrow.Array) (arrow.Record, error)
copy(arrs, rec.arrs)
arrs[i] = arr

return NewRecord(rec.schema, arrs, rec.rows), nil
return NewRecordBatch(rec.schema, arrs, rec.rows), nil
}

func (rec *simpleRecord) validate() error {
Expand Down Expand Up @@ -242,7 +252,7 @@ func (rec *simpleRecord) ColumnName(i int) string { return rec.schema.Field(i).
//
// NewSlice panics if the slice is outside the valid range of the record array.
// NewSlice panics if j < i.
func (rec *simpleRecord) NewSlice(i, j int64) arrow.Record {
func (rec *simpleRecord) NewSlice(i, j int64) arrow.RecordBatch {
arrs := make([]arrow.Array, len(rec.arrs))
for ii, arr := range rec.arrs {
arrs[ii] = NewSlice(arr, i, j)
Expand All @@ -252,7 +262,7 @@ func (rec *simpleRecord) NewSlice(i, j int64) arrow.Record {
arr.Release()
}
}()
return NewRecord(rec.schema, arrs, j-i)
return NewRecordBatch(rec.schema, arrs, j-i)
}

func (rec *simpleRecord) String() string {
Expand Down Expand Up @@ -325,13 +335,13 @@ func (b *RecordBuilder) Reserve(size int) {
}
}

// NewRecord creates a new record from the memory buffers and resets the
// RecordBuilder so it can be used to build a new record.
// NewRecordBatch creates a new record batch from the memory buffers and resets the
// RecordBuilder so it can be used to build a new record batch.
//
// The returned Record must be Release()'d after use.
// The returned RecordBatch must be Release()'d after use.
//
// NewRecord panics if the fields' builder do not have the same length.
func (b *RecordBuilder) NewRecord() arrow.Record {
// NewRecordBatch panics if the fields' builders do not have the same length.
func (b *RecordBuilder) NewRecordBatch() arrow.RecordBatch {
cols := make([]arrow.Array, len(b.fields))
rows := int64(0)

Expand All @@ -353,7 +363,12 @@ func (b *RecordBuilder) NewRecord() arrow.Record {
rows = irow
}

return NewRecord(b.schema, cols, rows)
return NewRecordBatch(b.schema, cols, rows)
}

// NewRecord forwards to the builder's NewRecordBatch method and returns its
// result.
//
// Deprecated: Use [RecordBuilder.NewRecordBatch] instead.
func (b *RecordBuilder) NewRecord() arrow.Record {
return b.NewRecordBatch()
}

// UnmarshalJSON for record builder will read in a single object and add the values
Expand Down Expand Up @@ -411,9 +426,9 @@ type iterReader struct {
refCount atomic.Int64

schema *arrow.Schema
cur arrow.Record
cur arrow.RecordBatch

next func() (arrow.Record, error, bool)
next func() (arrow.RecordBatch, error, bool)
stop func()

err error
Expand All @@ -434,7 +449,10 @@ func (ir *iterReader) Release() {
}
}

func (ir *iterReader) Record() arrow.Record { return ir.cur }
func (ir *iterReader) RecordBatch() arrow.RecordBatch { return ir.cur }

// Deprecated: Use [RecordBatch] instead.
func (ir *iterReader) Record() arrow.Record { return ir.RecordBatch() }
func (ir *iterReader) Err() error { return ir.err }

func (ir *iterReader) Next() bool {
Expand All @@ -452,9 +470,9 @@ func (ir *iterReader) Next() bool {
return ok
}

// ReaderFromIter wraps a go iterator for arrow.Record + error into a RecordReader
// ReaderFromIter wraps a go iterator for arrow.RecordBatch + error into a RecordReader
// interface object for ease of use.
func ReaderFromIter(schema *arrow.Schema, itr iter.Seq2[arrow.Record, error]) RecordReader {
func ReaderFromIter(schema *arrow.Schema, itr iter.Seq2[arrow.RecordBatch, error]) RecordReader {
next, stop := iter.Pull2(itr)
rdr := &iterReader{
schema: schema,
Expand All @@ -469,12 +487,12 @@ func ReaderFromIter(schema *arrow.Schema, itr iter.Seq2[arrow.Record, error]) Re
// you can use range on. The semantics are still important, if a record
// that is returned is desired to be utilized beyond the scope of an iteration
// then Retain must be called on it.
func IterFromReader(rdr RecordReader) iter.Seq2[arrow.Record, error] {
func IterFromReader(rdr RecordReader) iter.Seq2[arrow.RecordBatch, error] {
rdr.Retain()
return func(yield func(arrow.RecordBatch, error) bool) {
defer rdr.Release()
for rdr.Next() {
if !yield(rdr.Record(), nil) {
if !yield(rdr.RecordBatch(), nil) {
return
}
}
Expand All @@ -486,6 +504,6 @@ func IterFromReader(rdr RecordReader) iter.Seq2[arrow.Record, error] {
}

var (
_ arrow.Record = (*simpleRecord)(nil)
_ RecordReader = (*simpleRecords)(nil)
_ arrow.RecordBatch = (*simpleRecord)(nil)
_ RecordReader = (*simpleRecords)(nil)
)
7 changes: 5 additions & 2 deletions arrow/array/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,11 @@ func NewTableReader(tbl arrow.Table, chunkSize int64) *TableReader {
return tr
}

func (tr *TableReader) Schema() *arrow.Schema { return tr.tbl.Schema() }
func (tr *TableReader) Record() arrow.RecordBatch { return tr.rec }
func (tr *TableReader) Schema() *arrow.Schema { return tr.tbl.Schema() }
func (tr *TableReader) RecordBatch() arrow.RecordBatch { return tr.rec }

// Deprecated: Use [TableReader.RecordBatch] instead.
func (tr *TableReader) Record() arrow.Record { return tr.RecordBatch() }

func (tr *TableReader) Next() bool {
if tr.cur >= tr.max {
Expand Down
2 changes: 1 addition & 1 deletion arrow/array/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func FromJSON(mem memory.Allocator, dt arrow.DataType, r io.Reader, opts ...From

// RecordToStructArray constructs a struct array from the columns of the record batch
// by referencing them, zero-copy.
func RecordToStructArray(rec arrow.Record) *Struct {
func RecordToStructArray(rec arrow.RecordBatch) *Struct {
cols := make([]arrow.ArrayData, rec.NumCols())
for i, c := range rec.Columns() {
cols[i] = c.Data()
Expand Down
9 changes: 8 additions & 1 deletion arrow/avro/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,17 @@ func (r *OCFReader) AvroSchema() string { return r.avroSchema }
// Schema returns the converted Arrow schema of the Avro OCF
func (r *OCFReader) Schema() *arrow.Schema { return r.schema }

// RecordBatch returns the current record batch that has been extracted from the
// underlying Avro OCF file.
//
// The returned record batch is valid until the next call to Next.
func (r *OCFReader) RecordBatch() arrow.RecordBatch { return r.cur }

// Record returns the current record that has been extracted from the
// underlying Avro OCF file.
// It is valid until the next call to Next.
func (r *OCFReader) Record() arrow.RecordBatch { return r.cur }
//
// Deprecated: Use [OCFReader.RecordBatch] instead.
func (r *OCFReader) Record() arrow.Record { return r.RecordBatch() }

// Metrics returns the maximum queue depth of the Avro record read cache and of the
// converted Arrow record cache.
Expand Down
7 changes: 5 additions & 2 deletions arrow/cdata/cdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -959,8 +959,11 @@ type nativeCRecordBatchReader struct {
// Retain is a no-op for nativeCRecordBatchReader.
func (n *nativeCRecordBatchReader) Retain() {}

// Release is a no-op for nativeCRecordBatchReader.
func (n *nativeCRecordBatchReader) Release() {}

func (n *nativeCRecordBatchReader) Err() error { return n.err }
func (n *nativeCRecordBatchReader) Record() arrow.RecordBatch { return n.cur }
func (n *nativeCRecordBatchReader) Err() error { return n.err }
func (n *nativeCRecordBatchReader) RecordBatch() arrow.RecordBatch { return n.cur }

// Deprecated: Use [RecordBatch] instead.
func (n *nativeCRecordBatchReader) Record() arrow.Record { return n.RecordBatch() }

func (n *nativeCRecordBatchReader) Next() bool {
err := n.next()
Expand Down
5 changes: 4 additions & 1 deletion arrow/cdata/cdata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -980,10 +980,13 @@ func (r *failingReader) Next() bool {
r.opCount -= 1
return r.opCount > 0
}
func (r *failingReader) Record() arrow.RecordBatch {
// RecordBatch returns the first "primitives" entry of arrdata.Records, calling
// Retain on that entry before handing it back.
func (r *failingReader) RecordBatch() arrow.RecordBatch {
arrdata.Records["primitives"][0].Retain()
return arrdata.Records["primitives"][0]
}
// Record returns whatever RecordBatch returns.
func (r *failingReader) Record() arrow.Record {
return r.RecordBatch()
}
func (r *failingReader) Err() error {
if r.opCount == 0 {
return fmt.Errorf("Expected error message")
Expand Down
9 changes: 8 additions & 1 deletion arrow/csv/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,17 @@ func (r *Reader) Err() error { return r.err }

func (r *Reader) Schema() *arrow.Schema { return r.schema }

// RecordBatch returns the current record batch that has been extracted from the
// underlying CSV file.
//
// The returned record batch is valid until the next call to Next.
func (r *Reader) RecordBatch() arrow.RecordBatch { return r.cur }

// Record returns the current record that has been extracted from the
// underlying CSV file.
// It is valid until the next call to Next.
func (r *Reader) Record() arrow.RecordBatch { return r.cur }
//
// Deprecated: Use [Reader.RecordBatch] instead.
func (r *Reader) Record() arrow.Record { return r.RecordBatch() }

// Next returns whether a Record could be extracted from the underlying CSV file.
//
Expand Down
4 changes: 2 additions & 2 deletions arrow/flight/flightsql/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1097,7 +1097,7 @@ type PreparedStatement struct {
handle []byte
datasetSchema *arrow.Schema
paramSchema *arrow.Schema
paramBinding arrow.Record
paramBinding arrow.RecordBatch
streamBinding array.RecordReader
closed bool
}
Expand Down Expand Up @@ -1373,7 +1373,7 @@ func (p *PreparedStatement) clearParameters() {
// from under the statement. Release will be called on a previous binding
// record or reader if it existed, and will be called upon calling Close on the
// PreparedStatement.
func (p *PreparedStatement) SetParameters(binding arrow.Record) {
func (p *PreparedStatement) SetParameters(binding arrow.RecordBatch) {
p.clearParameters()
p.paramBinding = binding
if p.paramBinding != nil {
Expand Down
7 changes: 5 additions & 2 deletions arrow/flight/flightsql/example/sql_batch_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ type SqlBatchReader struct {

schema *arrow.Schema
rows *sql.Rows
record arrow.Record
record arrow.RecordBatch
bldr *array.RecordBuilder
err error

Expand Down Expand Up @@ -253,7 +253,10 @@ func (r *SqlBatchReader) Release() {
}
func (r *SqlBatchReader) Schema() *arrow.Schema { return r.schema }

func (r *SqlBatchReader) Record() arrow.Record { return r.record }
func (r *SqlBatchReader) RecordBatch() arrow.RecordBatch { return r.record }

// Deprecated: Use [SqlBatchReader.RecordBatch] instead.
func (r *SqlBatchReader) Record() arrow.Record { return r.RecordBatch() }

func (r *SqlBatchReader) Err() error { return r.err }

Expand Down
2 changes: 1 addition & 1 deletion arrow/flight/flightsql/example/sqlite_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ func (s *SQLiteFlightSQLServer) GetFlightInfoXdbcTypeInfo(_ context.Context, _ f
}

func (s *SQLiteFlightSQLServer) DoGetXdbcTypeInfo(_ context.Context, cmd flightsql.GetXdbcTypeInfo) (*arrow.Schema, <-chan flight.StreamChunk, error) {
var batch arrow.Record
var batch arrow.RecordBatch
if cmd.GetDataType() == nil {
batch = GetTypeInfoResult(s.Alloc)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type SqliteTablesSchemaBatchReader struct {
rdr array.RecordReader
stmt *sql.Stmt
schemaBldr *array.BinaryBuilder
record arrow.Record
record arrow.RecordBatch
err error
}

Expand Down Expand Up @@ -94,7 +94,10 @@ func (s *SqliteTablesSchemaBatchReader) Schema() *arrow.Schema {
return arrow.NewSchema(fields, nil)
}

func (s *SqliteTablesSchemaBatchReader) Record() arrow.Record { return s.record }
func (s *SqliteTablesSchemaBatchReader) RecordBatch() arrow.RecordBatch { return s.record }

// Deprecated: Use [SqliteTablesSchemaBatchReader.RecordBatch] instead.
func (s *SqliteTablesSchemaBatchReader) Record() arrow.Record { return s.RecordBatch() }

func getSqlTypeFromTypeName(sqltype string) int {
if sqltype == "" {
Expand Down
Loading
Loading