diff --git a/arrow/array/json_reader.go b/arrow/array/json_reader.go index b0698b3a..a5bc1ba8 100644 --- a/arrow/array/json_reader.go +++ b/arrow/array/json_reader.go @@ -75,7 +75,7 @@ type JSONReader struct { bldr *RecordBuilder refs atomic.Int64 - cur arrow.Record + cur arrow.RecordBatch err error chunk int @@ -124,9 +124,15 @@ func (r *JSONReader) Err() error { return r.err } func (r *JSONReader) Schema() *arrow.Schema { return r.schema } +// RecordBatch returns the last read in record batch. The returned record batch is only valid +// until the next call to Next unless Retain is called on the record batch itself. +func (r *JSONReader) RecordBatch() arrow.RecordBatch { return r.cur } + // Record returns the last read in record. The returned record is only valid // until the next call to Next unless Retain is called on the record itself. -func (r *JSONReader) Record() arrow.Record { return r.cur } +// +// Deprecated: Use [RecordBatch] instead. +func (r *JSONReader) Record() arrow.Record { return r.RecordBatch() } func (r *JSONReader) Retain() { r.refs.Add(1) @@ -144,7 +150,7 @@ func (r *JSONReader) Release() { } } -// Next returns true if it read in a record, which will be available via Record +// Next returns true if it read in a record, which will be available via RecordBatch // and false if there is either an error or the end of the reader. func (r *JSONReader) Next() bool { if r.cur != nil { diff --git a/arrow/array/record.go b/arrow/array/record.go index 69f4cbaf..e71e46e3 100644 --- a/arrow/array/record.go +++ b/arrow/array/record.go @@ -37,6 +37,8 @@ type RecordReader interface { Schema() *arrow.Schema Next() bool + RecordBatch() arrow.RecordBatch + // Deprecated: Use [RecordBatch] instead. Record() arrow.Record Err() error } @@ -46,12 +48,12 @@ type simpleRecords struct { refCount atomic.Int64 schema *arrow.Schema - recs []arrow.Record - cur arrow.Record + recs []arrow.RecordBatch + cur arrow.RecordBatch } // NewRecordReader returns a simple iterator over the given slice of records. -func NewRecordReader(schema *arrow.Schema, recs []arrow.Record) (RecordReader, error) { +func NewRecordReader(schema *arrow.Schema, recs []arrow.RecordBatch) (RecordReader, error) { rs := &simpleRecords{ schema: schema, recs: recs, @@ -96,8 +98,11 @@ func (rs *simpleRecords) Release() { } } -func (rs *simpleRecords) Schema() *arrow.Schema { return rs.schema } -func (rs *simpleRecords) Record() arrow.Record { return rs.cur } +func (rs *simpleRecords) Schema() *arrow.Schema { return rs.schema } +func (rs *simpleRecords) RecordBatch() arrow.RecordBatch { return rs.cur } + +// Deprecated: Use [RecordBatch] instead. +func (rs *simpleRecords) Record() arrow.Record { return rs.RecordBatch() } func (rs *simpleRecords) Next() bool { if len(rs.recs) == 0 { return false @@ -121,11 +126,11 @@ type simpleRecord struct { arrs []arrow.Array } -// NewRecord returns a basic, non-lazy in-memory record batch. +// NewRecordBatch returns a basic, non-lazy in-memory record batch. // -// NewRecord panics if the columns and schema are inconsistent. -// NewRecord panics if rows is larger than the height of the columns. -func NewRecord(schema *arrow.Schema, cols []arrow.Array, nrows int64) arrow.Record { +// NewRecordBatch panics if the columns and schema are inconsistent. +// NewRecordBatch panics if rows is larger than the height of the columns. +func NewRecordBatch(schema *arrow.Schema, cols []arrow.Array, nrows int64) arrow.RecordBatch { rec := &simpleRecord{ schema: schema, rows: nrows, @@ -156,7 +161,12 @@ func NewRecord(schema *arrow.Schema, cols []arrow.Array, nrows int64) arrow.Reco return rec } -func (rec *simpleRecord) SetColumn(i int, arr arrow.Array) (arrow.Record, error) { +// Deprecated: Use [NewRecordBatch] instead. +func NewRecord(schema *arrow.Schema, cols []arrow.Array, nrows int64) arrow.Record { + return NewRecordBatch(schema, cols, nrows) +} + +func (rec *simpleRecord) SetColumn(i int, arr arrow.Array) (arrow.RecordBatch, error) { if i < 0 || i >= len(rec.arrs) { return nil, fmt.Errorf("arrow/array: column index out of range [0, %d): got=%d", len(rec.arrs), i) } @@ -179,7 +189,7 @@ func (rec *simpleRecord) SetColumn(i int, arr arrow.Array) (arrow.Record, error) copy(arrs, rec.arrs) arrs[i] = arr - return NewRecord(rec.schema, arrs, rec.rows), nil + return NewRecordBatch(rec.schema, arrs, rec.rows), nil } func (rec *simpleRecord) validate() error { @@ -242,7 +252,7 @@ func (rec *simpleRecord) ColumnName(i int) string { return rec.schema.Field(i). // // NewSlice panics if the slice is outside the valid range of the record array. // NewSlice panics if j < i. -func (rec *simpleRecord) NewSlice(i, j int64) arrow.Record { +func (rec *simpleRecord) NewSlice(i, j int64) arrow.RecordBatch { arrs := make([]arrow.Array, len(rec.arrs)) for ii, arr := range rec.arrs { arrs[ii] = NewSlice(arr, i, j) @@ -252,7 +262,7 @@ func (rec *simpleRecord) NewSlice(i, j int64) arrow.Record { arr.Release() } }() - return NewRecord(rec.schema, arrs, j-i) + return NewRecordBatch(rec.schema, arrs, j-i) } func (rec *simpleRecord) String() string { @@ -325,13 +335,13 @@ func (b *RecordBuilder) Reserve(size int) { } } -// NewRecord creates a new record from the memory buffers and resets the -// RecordBuilder so it can be used to build a new record. +// NewRecordBatch creates a new record batch from the memory buffers and resets the +// RecordBuilder so it can be used to build a new record batch. // -// The returned Record must be Release()'d after use. +// The returned RecordBatch must be Release()'d after use. // -// NewRecord panics if the fields' builder do not have the same length. -func (b *RecordBuilder) NewRecord() arrow.Record { +// NewRecordBatch panics if the fields' builder do not have the same length. +func (b *RecordBuilder) NewRecordBatch() arrow.RecordBatch { cols := make([]arrow.Array, len(b.fields)) rows := int64(0) @@ -353,7 +363,12 @@ func (b *RecordBuilder) NewRecord() arrow.Record { rows = irow } - return NewRecord(b.schema, cols, rows) + return NewRecordBatch(b.schema, cols, rows) +} + +// Deprecated: Use [NewRecordBatch] instead. +func (b *RecordBuilder) NewRecord() arrow.Record { + return b.NewRecordBatch() } // UnmarshalJSON for record builder will read in a single object and add the values @@ -411,9 +426,9 @@ type iterReader struct { refCount atomic.Int64 schema *arrow.Schema - cur arrow.Record + cur arrow.RecordBatch - next func() (arrow.Record, error, bool) + next func() (arrow.RecordBatch, error, bool) stop func() err error @@ -434,7 +449,10 @@ func (ir *iterReader) Release() { } } -func (ir *iterReader) Record() arrow.Record { return ir.cur } +func (ir *iterReader) RecordBatch() arrow.RecordBatch { return ir.cur } + +// Deprecated: Use [RecordBatch] instead. +func (ir *iterReader) Record() arrow.Record { return ir.RecordBatch() } func (ir *iterReader) Err() error { return ir.err } func (ir *iterReader) Next() bool { @@ -452,9 +470,9 @@ func (ir *iterReader) Next() bool { return ok } -// ReaderFromIter wraps a go iterator for arrow.Record + error into a RecordReader +// ReaderFromIter wraps a go iterator for arrow.RecordBatch + error into a RecordReader // interface object for ease of use. -func ReaderFromIter(schema *arrow.Schema, itr iter.Seq2[arrow.Record, error]) RecordReader { +func ReaderFromIter(schema *arrow.Schema, itr iter.Seq2[arrow.RecordBatch, error]) RecordReader { next, stop := iter.Pull2(itr) rdr := &iterReader{ schema: schema, @@ -469,12 +487,12 @@ func ReaderFromIter(schema *arrow.Schema, itr iter.Seq2[arrow.Record, error]) Re // you can use range on. The semantics are still important, if a record // that is returned is desired to be utilized beyond the scope of an iteration // then Retain must be called on it. -func IterFromReader(rdr RecordReader) iter.Seq2[arrow.Record, error] { +func IterFromReader(rdr RecordReader) iter.Seq2[arrow.RecordBatch, error] { rdr.Retain() return func(yield func(arrow.RecordBatch, error) bool) { defer rdr.Release() for rdr.Next() { - if !yield(rdr.Record(), nil) { + if !yield(rdr.RecordBatch(), nil) { return } } @@ -486,6 +504,6 @@ func IterFromReader(rdr RecordReader) iter.Seq2[arrow.Record, error] { } var ( - _ arrow.Record = (*simpleRecord)(nil) - _ RecordReader = (*simpleRecords)(nil) + _ arrow.RecordBatch = (*simpleRecord)(nil) + _ RecordReader = (*simpleRecords)(nil) ) diff --git a/arrow/array/table.go b/arrow/array/table.go index 6c2f3365..9ba65bf2 100644 --- a/arrow/array/table.go +++ b/arrow/array/table.go @@ -320,8 +320,11 @@ func NewTableReader(tbl arrow.Table, chunkSize int64) *TableReader { return tr } -func (tr *TableReader) Schema() *arrow.Schema { return tr.tbl.Schema() } -func (tr *TableReader) Record() arrow.RecordBatch { return tr.rec } +func (tr *TableReader) Schema() *arrow.Schema { return tr.tbl.Schema() } +func (tr *TableReader) RecordBatch() arrow.RecordBatch { return tr.rec } + +// Deprecated: Use [RecordBatch] instead. +func (tr *TableReader) Record() arrow.Record { return tr.RecordBatch() } func (tr *TableReader) Next() bool { if tr.cur >= tr.max { diff --git a/arrow/array/util.go b/arrow/array/util.go index 9305e4c2..11430d86 100644 --- a/arrow/array/util.go +++ b/arrow/array/util.go @@ -180,7 +180,7 @@ func FromJSON(mem memory.Allocator, dt arrow.DataType, r io.Reader, opts ...From // RecordToStructArray constructs a struct array from the columns of the record batch // by referencing them, zero-copy. -func RecordToStructArray(rec arrow.Record) *Struct { +func RecordToStructArray(rec arrow.RecordBatch) *Struct { cols := make([]arrow.ArrayData, rec.NumCols()) for i, c := range rec.Columns() { cols[i] = c.Data() diff --git a/arrow/avro/reader.go b/arrow/avro/reader.go index bbab9975..db6de627 100644 --- a/arrow/avro/reader.go +++ b/arrow/avro/reader.go @@ -189,10 +189,17 @@ func (r *OCFReader) AvroSchema() string { return r.avroSchema } // Schema returns the converted Arrow schema of the Avro OCF func (r *OCFReader) Schema() *arrow.Schema { return r.schema } +// RecordBatch returns the current record batch that has been extracted from the +// underlying Avro OCF file. +// It is valid until the next call to Next. +func (r *OCFReader) RecordBatch() arrow.RecordBatch { return r.cur } + // Record returns the current record that has been extracted from the // underlying Avro OCF file. // It is valid until the next call to Next. -func (r *OCFReader) Record() arrow.RecordBatch { return r.cur } +// +// Deprecated: Use [RecordBatch] instead. +func (r *OCFReader) Record() arrow.Record { return r.RecordBatch() } // Metrics returns the maximum queue depth of the Avro record read cache and of the // converted Arrow record cache. diff --git a/arrow/cdata/cdata.go b/arrow/cdata/cdata.go index 3e9e5e79..b38ceb5f 100644 --- a/arrow/cdata/cdata.go +++ b/arrow/cdata/cdata.go @@ -959,8 +959,11 @@ type nativeCRecordBatchReader struct { func (n *nativeCRecordBatchReader) Retain() {} func (n *nativeCRecordBatchReader) Release() {} -func (n *nativeCRecordBatchReader) Err() error { return n.err } -func (n *nativeCRecordBatchReader) Record() arrow.RecordBatch { return n.cur } +func (n *nativeCRecordBatchReader) Err() error { return n.err } +func (n *nativeCRecordBatchReader) RecordBatch() arrow.RecordBatch { return n.cur } + +// Deprecated: Use [RecordBatch] instead. +func (n *nativeCRecordBatchReader) Record() arrow.Record { return n.RecordBatch() } func (n *nativeCRecordBatchReader) Next() bool { err := n.next() diff --git a/arrow/cdata/cdata_test.go b/arrow/cdata/cdata_test.go index f06fc743..f504ce99 100644 --- a/arrow/cdata/cdata_test.go +++ b/arrow/cdata/cdata_test.go @@ -980,10 +980,13 @@ func (r *failingReader) Next() bool { r.opCount -= 1 return r.opCount > 0 } -func (r *failingReader) Record() arrow.RecordBatch { +func (r *failingReader) RecordBatch() arrow.RecordBatch { arrdata.Records["primitives"][0].Retain() return arrdata.Records["primitives"][0] } +func (r *failingReader) Record() arrow.Record { + return r.RecordBatch() +} func (r *failingReader) Err() error { if r.opCount == 0 { return fmt.Errorf("Expected error message") diff --git a/arrow/csv/reader.go b/arrow/csv/reader.go index 98b4a971..d83842b9 100644 --- a/arrow/csv/reader.go +++ b/arrow/csv/reader.go @@ -222,10 +222,17 @@ func (r *Reader) Err() error { return r.err } func (r *Reader) Schema() *arrow.Schema { return r.schema } +// RecordBatch returns the current record batch that has been extracted from the +// underlying CSV file. +// It is valid until the next call to Next. +func (r *Reader) RecordBatch() arrow.RecordBatch { return r.cur } + // Record returns the current record that has been extracted from the // underlying CSV file. // It is valid until the next call to Next. -func (r *Reader) Record() arrow.RecordBatch { return r.cur } +// +// Deprecated: Use [RecordBatch] instead. +func (r *Reader) Record() arrow.Record { return r.RecordBatch() } // Next returns whether a Record could be extracted from the underlying CSV file. // diff --git a/arrow/flight/flightsql/client.go b/arrow/flight/flightsql/client.go index f7660e31..cdba3620 100644 --- a/arrow/flight/flightsql/client.go +++ b/arrow/flight/flightsql/client.go @@ -1097,7 +1097,7 @@ type PreparedStatement struct { handle []byte datasetSchema *arrow.Schema paramSchema *arrow.Schema - paramBinding arrow.Record + paramBinding arrow.RecordBatch streamBinding array.RecordReader closed bool } @@ -1373,7 +1373,7 @@ func (p *PreparedStatement) clearParameters() { // from under the statement. Release will be called on a previous binding // record or reader if it existed, and will be called upon calling Close on the // PreparedStatement. -func (p *PreparedStatement) SetParameters(binding arrow.Record) { +func (p *PreparedStatement) SetParameters(binding arrow.RecordBatch) { p.clearParameters() p.paramBinding = binding if p.paramBinding != nil { diff --git a/arrow/flight/flightsql/example/sql_batch_reader.go b/arrow/flight/flightsql/example/sql_batch_reader.go index 74444db3..f3dce1df 100644 --- a/arrow/flight/flightsql/example/sql_batch_reader.go +++ b/arrow/flight/flightsql/example/sql_batch_reader.go @@ -104,7 +104,7 @@ type SqlBatchReader struct { schema *arrow.Schema rows *sql.Rows - record arrow.Record + record arrow.RecordBatch bldr *array.RecordBuilder err error @@ -253,7 +253,10 @@ func (r *SqlBatchReader) Release() { } func (r *SqlBatchReader) Schema() *arrow.Schema { return r.schema } -func (r *SqlBatchReader) Record() arrow.Record { return r.record } +func (r *SqlBatchReader) RecordBatch() arrow.RecordBatch { return r.record } + +// Deprecated: Use [RecordBatch] instead. +func (r *SqlBatchReader) Record() arrow.Record { return r.RecordBatch() } func (r *SqlBatchReader) Err() error { return r.err } diff --git a/arrow/flight/flightsql/example/sqlite_server.go b/arrow/flight/flightsql/example/sqlite_server.go index e4b501b8..d5f61980 100644 --- a/arrow/flight/flightsql/example/sqlite_server.go +++ b/arrow/flight/flightsql/example/sqlite_server.go @@ -363,7 +363,7 @@ func (s *SQLiteFlightSQLServer) GetFlightInfoXdbcTypeInfo(_ context.Context, _ f } func (s *SQLiteFlightSQLServer) DoGetXdbcTypeInfo(_ context.Context, cmd flightsql.GetXdbcTypeInfo) (*arrow.Schema, <-chan flight.StreamChunk, error) { - var batch arrow.Record + var batch arrow.RecordBatch if cmd.GetDataType() == nil { batch = GetTypeInfoResult(s.Alloc) } else { diff --git a/arrow/flight/flightsql/example/sqlite_tables_schema_batch_reader.go b/arrow/flight/flightsql/example/sqlite_tables_schema_batch_reader.go index 17ca3c5f..99840b5d 100644 --- a/arrow/flight/flightsql/example/sqlite_tables_schema_batch_reader.go +++ b/arrow/flight/flightsql/example/sqlite_tables_schema_batch_reader.go @@ -44,7 +44,7 @@ type SqliteTablesSchemaBatchReader struct { rdr array.RecordReader stmt *sql.Stmt schemaBldr *array.BinaryBuilder - record arrow.Record + record arrow.RecordBatch err error } @@ -94,7 +94,10 @@ func (s *SqliteTablesSchemaBatchReader) Schema() *arrow.Schema { return arrow.NewSchema(fields, nil) } -func (s *SqliteTablesSchemaBatchReader) Record() arrow.Record { return s.record } +func (s *SqliteTablesSchemaBatchReader) RecordBatch() arrow.RecordBatch { return s.record } + +// Deprecated: Use [RecordBatch] instead. +func (s *SqliteTablesSchemaBatchReader) Record() arrow.Record { return s.RecordBatch() } func getSqlTypeFromTypeName(sqltype string) int { if sqltype == "" { diff --git a/arrow/flight/flightsql/example/type_info.go b/arrow/flight/flightsql/example/type_info.go index 57734f0c..35ecef65 100644 --- a/arrow/flight/flightsql/example/type_info.go +++ b/arrow/flight/flightsql/example/type_info.go @@ -28,7 +28,7 @@ import ( "github.com/apache/arrow-go/v18/arrow/memory" ) -func GetTypeInfoResult(mem memory.Allocator) arrow.Record { +func GetTypeInfoResult(mem memory.Allocator) arrow.RecordBatch { typeNames, _, _ := array.FromJSON(mem, arrow.BinaryTypes.String, strings.NewReader(`["bit", "tinyint", "bigint", "longvarbinary", "varbinary", "text", "longvarchar", "char", @@ -97,7 +97,7 @@ func GetTypeInfoResult(mem memory.Allocator) arrow.Record { sqlDataType, sqlDateTimeSub, numPrecRadix, intervalPrecision}, 17) } -func GetFilteredTypeInfoResult(mem memory.Allocator, filter int32) arrow.Record { +func GetFilteredTypeInfoResult(mem memory.Allocator, filter int32) arrow.RecordBatch { batch := GetTypeInfoResult(mem) defer batch.Release() diff --git a/arrow/flight/record_batch_reader.go b/arrow/flight/record_batch_reader.go index 30a5bcb4..5f113682 100644 --- a/arrow/flight/record_batch_reader.go +++ b/arrow/flight/record_batch_reader.go @@ -121,7 +121,7 @@ func (r *Reader) Release() { // LatestAppMetadata returns the bytes from the AppMetadata field of the // most recently read FlightData message that was processed by calling // the Next function. The metadata returned would correspond to the record -// retrieved by calling Record(). +// retrieved by calling RecordBatch(). func (r *Reader) LatestAppMetadata() []byte { return r.dmr.lastAppMetadata } @@ -129,7 +129,7 @@ func (r *Reader) LatestAppMetadata() []byte { // LatestFlightDescriptor returns a pointer to the last FlightDescriptor object // that was received in the most recently read FlightData message that was // processed by calling the Next function. The descriptor returned would correspond -// to the record retrieved by calling Record(). +// to the record batch retrieved by calling RecordBatch(). func (r *Reader) LatestFlightDescriptor() *FlightDescriptor { return r.dmr.descr } @@ -188,7 +188,7 @@ func DeserializeSchema(info []byte, mem memory.Allocator) (*arrow.Schema, error) // StreamChunk represents a single chunk of a FlightData stream type StreamChunk struct { - Data arrow.Record + Data arrow.RecordBatch Desc *FlightDescriptor AppMetadata []byte Err error diff --git a/arrow/flight/record_batch_writer.go b/arrow/flight/record_batch_writer.go index 3d291788..9cfb6bd8 100644 --- a/arrow/flight/record_batch_writer.go +++ b/arrow/flight/record_batch_writer.go @@ -74,7 +74,7 @@ func (w *Writer) SetFlightDescriptor(descr *FlightDescriptor) { } // Write writes a recordbatch payload and returns any error, implementing the arrio.Writer interface -func (w *Writer) Write(rec arrow.Record) error { +func (w *Writer) Write(rec arrow.RecordBatch) error { if w.pw.fd.FlightDescriptor != nil { defer func() { w.pw.fd.FlightDescriptor = nil @@ -83,9 +83,9 @@ func (w *Writer) Write(rec arrow.Record) error { return w.Writer.Write(rec) } -// WriteWithAppMetadata will write this record with the supplied application +// WriteWithAppMetadata will write this record batch with the supplied application // metadata attached in the flightData message. -func (w *Writer) WriteWithAppMetadata(rec arrow.Record, appMeta []byte) error { +func (w *Writer) WriteWithAppMetadata(rec arrow.RecordBatch, appMeta []byte) error { w.pw.fd.AppMetadata = appMeta defer func() { w.pw.fd.AppMetadata = nil diff --git a/arrow/internal/flight_integration/scenario.go b/arrow/internal/flight_integration/scenario.go index a640d06b..106669e9 100644 --- a/arrow/internal/flight_integration/scenario.go +++ b/arrow/internal/flight_integration/scenario.go @@ -101,10 +101,10 @@ func initServer(port int, srv flight.Server) int { type integrationDataSet struct { schema *arrow.Schema - chunks []arrow.Record + chunks []arrow.RecordBatch } -func consumeFlightLocation(ctx context.Context, loc *flight.Location, tkt *flight.Ticket, orig []arrow.Record, opts ...grpc.DialOption) error { +func consumeFlightLocation(ctx context.Context, loc *flight.Location, tkt *flight.Ticket, orig []arrow.RecordBatch, opts ...grpc.DialOption) error { client, err := flight.NewClientWithMiddleware(loc.GetUri(), nil, nil, opts...) if err != nil { return err @@ -177,7 +177,7 @@ func (s *defaultIntegrationTester) RunClient(addr string, opts ...grpc.DialOptio } dataSet := integrationDataSet{ - chunks: make([]arrow.Record, 0), + chunks: make([]arrow.RecordBatch, 0), schema: rdr.Schema(), } @@ -332,7 +332,7 @@ func (s *defaultIntegrationTester) DoPut(stream flight.FlightService_DoPutServer key = desc.Path[0] dataset.schema = rdr.Schema() - dataset.chunks = make([]arrow.Record, 0) + dataset.chunks = make([]arrow.RecordBatch, 0) for rdr.Next() { rec := rdr.Record() rec.Retain() @@ -568,7 +568,7 @@ func (o *orderedScenarioTester) RunClient(addr string, opts ...grpc.DialOption) return fmt.Errorf("expected to server return FlightInfo.ordered = true") } - var recs []arrow.Record + var recs []arrow.RecordBatch for _, ep := range info.Endpoint { if len(ep.Location) != 0 { return fmt.Errorf("expected to receive empty locations to use the original service: %s", @@ -931,7 +931,7 @@ func (tester *expirationTimeDoGetScenarioTester) RunClient(addr string, opts ... return err } - var recs []arrow.Record + var recs []arrow.RecordBatch for _, ep := range info.Endpoint { if len(recs) == 0 { if ep.ExpirationTime != nil { @@ -3078,7 +3078,7 @@ func getIngestRecords() array.RecordReader { rec := array.NewRecord(schema, []arrow.Array{arr}, ingestStatementExpectedRows) defer rec.Release() - rdr, _ := array.NewRecordReader(schema, []arrow.Record{rec}) + rdr, _ := array.NewRecordReader(schema, []arrow.RecordBatch{rec}) return rdr } diff --git a/arrow/ipc/file_reader.go b/arrow/ipc/file_reader.go index 9135529d..2d83fcc7 100644 --- a/arrow/ipc/file_reader.go +++ b/arrow/ipc/file_reader.go @@ -159,7 +159,7 @@ type FileReader struct { memo dictutils.Memo schema *arrow.Schema - record arrow.Record + record arrow.RecordBatch irec int // current record index. used for the arrio.Reader interface err error // last error @@ -355,11 +355,11 @@ func (f *FileReader) Close() error { return nil } -// Record returns the i-th record from the file. -// The returned value is valid until the next call to Record. -// Users need to call Retain on that Record to keep it valid for longer. -func (f *FileReader) Record(i int) (arrow.Record, error) { - record, err := f.RecordAt(i) +// RecordBatch returns the i-th record batch from the file. +// The returned value is valid until the next call to RecordBatch. +// Users need to call Retain on that RecordBatch to keep it valid for longer. +func (f *FileReader) RecordBatch(i int) (arrow.RecordBatch, error) { + record, err := f.RecordBatchAt(i) if err != nil { return nil, err } @@ -372,10 +372,19 @@ func (f *FileReader) Record(i int) (arrow.Record, error) { return record, nil } -// Record returns the i-th record from the file. Ownership is transferred to the +// Record returns the i-th record from the file. +// The returned value is valid until the next call to Record. +// Users need to call Retain on that Record to keep it valid for longer. +// +// Deprecated: Use [RecordBatch] instead. +func (f *FileReader) Record(i int) (arrow.Record, error) { + return f.RecordBatch(i) +} + +// RecordBatchAt returns the i-th record batch from the file. Ownership is transferred to the // caller and must call Release() to free the memory. This method is safe to // call concurrently. -func (f *FileReader) RecordAt(i int) (arrow.Record, error) { +func (f *FileReader) RecordBatchAt(i int) (arrow.RecordBatch, error) { if i < 0 || i > f.NumRecords() { panic("arrow/ipc: record index out of bounds") } @@ -400,32 +409,41 @@ func (f *FileReader) RecordAt(i int) (arrow.Record, error) { defer msg.Release() if msg.Type() != MessageRecordBatch { - return nil, fmt.Errorf("arrow/ipc: message %d is not a Record", i) + return nil, fmt.Errorf("arrow/ipc: message %d is not a RecordBatch", i) } - return newRecord(f.schema, &f.memo, msg.meta, msg.body, f.swapEndianness, f.mem), nil + return newRecordBatch(f.schema, &f.memo, msg.meta, msg.body, f.swapEndianness, f.mem), nil } -// Read reads the current record from the underlying stream and an error, if any. +// RecordAt returns the i-th record from the file. Ownership is transferred to the +// caller and must call Release() to free the memory. This method is safe to +// call concurrently. +// +// Deprecated: Use [RecordBatchAt] instead. +func (f *FileReader) RecordAt(i int) (arrow.Record, error) { + return f.RecordBatchAt(i) +} + +// Read reads the current record batch from the underlying stream and an error, if any. // When the Reader reaches the end of the underlying stream, it returns (nil, io.EOF). // -// The returned record value is valid until the next call to Read. -// Users need to call Retain on that Record to keep it valid for longer. -func (f *FileReader) Read() (rec arrow.Record, err error) { +// The returned record batch value is valid until the next call to Read. +// Users need to call Retain on that RecordBatch to keep it valid for longer. +func (f *FileReader) Read() (rec arrow.RecordBatch, err error) { if f.irec == f.NumRecords() { return nil, io.EOF } - rec, f.err = f.Record(f.irec) + rec, f.err = f.RecordBatch(f.irec) f.irec++ return rec, f.err } -// ReadAt reads the i-th record from the underlying stream and an error, if any. -func (f *FileReader) ReadAt(i int64) (arrow.Record, error) { - return f.Record(int(i)) +// ReadAt reads the i-th record batch from the underlying stream and an error, if any. +func (f *FileReader) ReadAt(i int64) (arrow.RecordBatch, error) { + return f.RecordBatch(int(i)) } -func newRecord(schema *arrow.Schema, memo *dictutils.Memo, meta *memory.Buffer, body *memory.Buffer, swapEndianness bool, mem memory.Allocator) arrow.Record { +func newRecordBatch(schema *arrow.Schema, memo *dictutils.Memo, meta *memory.Buffer, body *memory.Buffer, swapEndianness bool, mem memory.Allocator) arrow.RecordBatch { var ( msg = flatbuf.GetRootAsMessage(meta.Bytes(), 0) md flatbuf.RecordBatch diff --git a/arrow/ipc/file_writer.go b/arrow/ipc/file_writer.go index 2aa0c9b4..ff043ec9 100644 --- a/arrow/ipc/file_writer.go +++ b/arrow/ipc/file_writer.go @@ -296,7 +296,7 @@ func (f *FileWriter) Close() error { return nil } -func (f *FileWriter) Write(rec arrow.Record) error { +func (f *FileWriter) Write(rec arrow.RecordBatch) error { schema := rec.Schema() if schema == nil || !schema.Equal(f.schema) { return errInconsistentSchema diff --git a/arrow/ipc/reader.go b/arrow/ipc/reader.go index 1934c719..9d7096c0 100644 --- a/arrow/ipc/reader.go +++ b/arrow/ipc/reader.go @@ -40,7 +40,7 @@ type Reader struct { schema *arrow.Schema refCount atomic.Int64 - rec arrow.Record + rec arrow.RecordBatch err error // types dictTypeMap @@ -164,7 +164,7 @@ func (r *Reader) Release() { } } -// Next returns whether a Record could be extracted from the underlying stream. +// Next returns whether a RecordBatch could be extracted from the underlying stream. func (r *Reader) Next() bool { if r.rec != nil { r.rec.Release() @@ -252,20 +252,29 @@ func (r *Reader) next() bool { return false } - r.rec = newRecord(r.schema, &r.memo, msg.meta, msg.body, r.swapEndianness, r.mem) + r.rec = newRecordBatch(r.schema, &r.memo, msg.meta, msg.body, r.swapEndianness, r.mem) return true } +// RecordBatch returns the current record batch that has been extracted from the +// underlying stream. +// It is valid until the next call to Next. +func (r *Reader) RecordBatch() arrow.RecordBatch { + return r.rec +} + // Record returns the current record that has been extracted from the // underlying stream. // It is valid until the next call to Next. +// +// Deprecated: Use [RecordBatch] instead. func (r *Reader) Record() arrow.Record { - return r.rec + return r.RecordBatch() } -// Read reads the current record from the underlying stream and an error, if any. +// Read reads the current record batch from the underlying stream and an error, if any. // When the Reader reaches the end of the underlying stream, it returns (nil, io.EOF). -func (r *Reader) Read() (arrow.Record, error) { +func (r *Reader) Read() (arrow.RecordBatch, error) { if r.rec != nil { r.rec.Release() r.rec = nil diff --git a/arrow/ipc/writer.go b/arrow/ipc/writer.go index 96f082fb..ab068511 100644 --- a/arrow/ipc/writer.go +++ b/arrow/ipc/writer.go @@ -152,7 +152,7 @@ func (w *Writer) Close() error { return nil } -func (w *Writer) Write(rec arrow.Record) (err error) { +func (w *Writer) Write(rec arrow.RecordBatch) (err error) { defer func() { if pErr := recover(); pErr != nil { err = utils.FormatRecoveredError("arrow/ipc: unknown error while writing", pErr) @@ -204,7 +204,7 @@ func (w *Writer) Write(rec arrow.Record) (err error) { return w.pw.WritePayload(data) } -func writeDictionaryPayloads(mem memory.Allocator, batch arrow.Record, isFileFormat bool, emitDictDeltas bool, mapper *dictutils.Mapper, lastWrittenDicts map[int64]arrow.Array, pw PayloadWriter, encoder *recordEncoder) error { +func writeDictionaryPayloads(mem memory.Allocator, batch arrow.RecordBatch, isFileFormat bool, emitDictDeltas bool, mapper *dictutils.Mapper, lastWrittenDicts map[int64]arrow.Array, pw PayloadWriter, encoder *recordEncoder) error { dictionaries, err := dictutils.CollectDictionaries(batch, mapper) if err != nil { return err @@ -467,7 +467,7 @@ func (w *recordEncoder) compressBodyBuffers(p *Payload) error { return <-errch } -func (w *recordEncoder) encode(p *Payload, rec arrow.Record) error { +func (w *recordEncoder) encode(p *Payload, rec arrow.RecordBatch) error { // perform depth-first traversal of the row-batch for i, col := range rec.Columns() { err := w.visit(p, col) @@ -1033,7 +1033,7 @@ func (w *recordEncoder) rebaseDenseUnionValueOffsets(arr *array.DenseUnion, offs return shiftedOffsetsBuf } -func (w *recordEncoder) Encode(p *Payload, rec arrow.Record) error { +func (w *recordEncoder) Encode(p *Payload, rec arrow.RecordBatch) error { if err := w.encode(p, rec); err != nil { return err } @@ -1087,7 +1087,7 @@ func needTruncate(offset int64, buf *memory.Buffer, minLength int64) bool { // GetRecordBatchPayload produces the ipc payload for a given record batch. // The resulting payload itself must be released by the caller via the Release // method after it is no longer needed. -func GetRecordBatchPayload(batch arrow.Record, opts ...Option) (Payload, error) { +func GetRecordBatchPayload(batch arrow.RecordBatch, opts ...Option) (Payload, error) { cfg := newConfig(opts...) var ( data = Payload{msg: MessageRecordBatch} diff --git a/parquet/pqarrow/file_reader.go b/parquet/pqarrow/file_reader.go index d19177d9..b7f95178 100644 --- a/parquet/pqarrow/file_reader.go +++ b/parquet/pqarrow/file_reader.go @@ -879,7 +879,10 @@ func (r *recordReader) Next() bool { return r.next() } -func (r *recordReader) Record() arrow.Record { return r.cur } +func (r *recordReader) RecordBatch() arrow.RecordBatch { return r.cur } + +// Deprecated: Use [RecordBatch] instead. +func (r *recordReader) Record() arrow.Record { return r.RecordBatch() } func (r *recordReader) Err() error { if r.err == io.EOF {