Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions arrow/arrio/arrio.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,18 @@ import (
type Reader interface {
// Read reads the current record from the underlying stream and an error, if any.
// When the Reader reaches the end of the underlying stream, it returns (nil, io.EOF).
Read() (arrow.Record, error)
Read() (arrow.RecordBatch, error)
}

// ReaderAt is the interface that wraps the ReadAt method.
type ReaderAt interface {
// ReadAt reads the i-th record from the underlying stream and an error, if any.
ReadAt(i int64) (arrow.Record, error)
ReadAt(i int64) (arrow.RecordBatch, error)
}

// Writer is the interface that wraps the Write method.
type Writer interface {
Write(rec arrow.Record) error
Write(rec arrow.RecordBatch) error
}

// Copy copies all the records available from src to dst.
Expand Down
4 changes: 2 additions & 2 deletions arrow/arrio/arrio_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const (
streamKind
)

func (k copyKind) write(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Schema, recs []arrow.Record) {
func (k copyKind) write(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Schema, recs []arrow.RecordBatch) {
t.Helper()

switch k {
Expand All @@ -49,7 +49,7 @@ func (k copyKind) write(t *testing.T, f *os.File, mem memory.Allocator, schema *
}
}

func (k copyKind) check(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Schema, recs []arrow.Record) {
func (k copyKind) check(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Schema, recs []arrow.RecordBatch) {
t.Helper()

switch k {
Expand Down
14 changes: 7 additions & 7 deletions arrow/avro/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type schemaEdit struct {
value any
}

// Reader wraps goavro/OCFReader and creates array.Records from a schema.
// Reader wraps goavro/OCFReader and creates array.RecordBatches from a schema.
type OCFReader struct {
r *ocf.Decoder
avroSchema string
Expand All @@ -58,7 +58,7 @@ type OCFReader struct {
bld *array.RecordBuilder
bldMap *fieldPos
ldr *dataLoader
cur arrow.Record
cur arrow.RecordBatch
err error

primed bool
Expand All @@ -70,7 +70,7 @@ type OCFReader struct {
avroChan chan any
avroDatumCount int64
avroChanSize int
recChan chan arrow.Record
recChan chan arrow.RecordBatch

bldDone chan struct{}

Expand All @@ -80,7 +80,7 @@ type OCFReader struct {
}

// NewReader returns a reader that reads from an Avro OCF file and creates
// arrow.Records from the converted avro data.
// arrow.RecordBatches from the converted avro data.
func NewOCFReader(r io.Reader, opts ...Option) (*OCFReader, error) {
ocfr, err := ocf.NewDecoder(r)
if err != nil {
Expand All @@ -100,7 +100,7 @@ func NewOCFReader(r io.Reader, opts ...Option) (*OCFReader, error) {
}

rr.avroChan = make(chan any, rr.avroChanSize)
rr.recChan = make(chan arrow.Record, rr.recChanSize)
rr.recChan = make(chan arrow.RecordBatch, rr.recChanSize)
rr.bldDone = make(chan struct{})
schema, err := avro.Parse(string(ocfr.Metadata()["avro.schema"]))
if err != nil {
Expand Down Expand Up @@ -170,7 +170,7 @@ func (rr *OCFReader) Reuse(r io.Reader, opts ...Option) error {
rr.primed = false

rr.avroChan = make(chan any, rr.avroChanSize)
rr.recChan = make(chan arrow.Record, rr.recChanSize)
rr.recChan = make(chan arrow.RecordBatch, rr.recChanSize)
rr.bldDone = make(chan struct{})

rr.readerCtx, rr.readCancel = context.WithCancel(context.Background())
Expand All @@ -192,7 +192,7 @@ func (r *OCFReader) Schema() *arrow.Schema { return r.schema }
// Record returns the current record that has been extracted from the
// underlying Avro OCF file.
// It is valid until the next call to Next.
func (r *OCFReader) Record() arrow.Record { return r.cur }
func (r *OCFReader) Record() arrow.RecordBatch { return r.cur }

// Metrics returns the maximum queue depth of the Avro record read cache and of the
// converted Arrow record cache.
Expand Down
8 changes: 4 additions & 4 deletions arrow/cdata/cdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -950,7 +950,7 @@ type nativeCRecordBatchReader struct {
arr *CArrowArray
schema *arrow.Schema

cur arrow.Record
cur arrow.RecordBatch
err error
}

Expand All @@ -959,8 +959,8 @@ type nativeCRecordBatchReader struct {
func (n *nativeCRecordBatchReader) Retain() {}
func (n *nativeCRecordBatchReader) Release() {}

func (n *nativeCRecordBatchReader) Err() error { return n.err }
func (n *nativeCRecordBatchReader) Record() arrow.Record { return n.cur }
func (n *nativeCRecordBatchReader) Err() error { return n.err }
func (n *nativeCRecordBatchReader) Record() arrow.RecordBatch { return n.cur }

func (n *nativeCRecordBatchReader) Next() bool {
err := n.next()
Expand Down Expand Up @@ -1021,7 +1021,7 @@ func (n *nativeCRecordBatchReader) getError(errno int) error {
return fmt.Errorf("%w: %s", syscall.Errno(errno), C.GoString(C.stream_get_last_error(n.stream)))
}

func (n *nativeCRecordBatchReader) Read() (arrow.Record, error) {
func (n *nativeCRecordBatchReader) Read() (arrow.RecordBatch, error) {
if err := n.next(); err != nil {
n.err = err
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions arrow/cdata/cdata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -844,7 +844,7 @@ func TestExportRecordReaderStreamLifetime(t *testing.T) {
rec := array.NewRecord(schema, []arrow.Array{arr}, 0)
defer rec.Release()

rdr, _ := array.NewRecordReader(schema, []arrow.Record{rec})
rdr, _ := array.NewRecordReader(schema, []arrow.RecordBatch{rec})
defer rdr.Release()

out := createTestStreamObj()
Expand Down Expand Up @@ -980,7 +980,7 @@ func (r *failingReader) Next() bool {
r.opCount -= 1
return r.opCount > 0
}
func (r *failingReader) Record() arrow.Record {
func (r *failingReader) Record() arrow.RecordBatch {
arrdata.Records["primitives"][0].Retain()
return arrdata.Records["primitives"][0]
}
Expand Down
2 changes: 1 addition & 1 deletion arrow/cdata/exports.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func asyncProducerCancel(producer *CArrowAsyncProducer) {
//export asyncTaskExtract
func asyncTaskExtract(task *CArrowAsyncTask, out *CArrowDeviceArray) C.int {
h := getHandle(task.private_data)
rec := h.Value().(arrow.Record)
rec := h.Value().(arrow.RecordBatch)
defer rec.Release()

out.device_id, out.device_type = C.int64_t(-1), C.ARROW_DEVICE_CPU
Expand Down
8 changes: 4 additions & 4 deletions arrow/cdata/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func ImportCArray(arr *CArrowArray, schema *CArrowSchema) (arrow.Field, arrow.Ar
//
// NOTE: The array takes ownership of the underlying memory buffers via ArrowArrayMove,
// it does not take ownership of the actual arr object itself.
func ImportCRecordBatchWithSchema(arr *CArrowArray, sc *arrow.Schema) (arrow.Record, error) {
func ImportCRecordBatchWithSchema(arr *CArrowArray, sc *arrow.Schema) (arrow.RecordBatch, error) {
imp, err := importCArrayAsType(arr, arrow.StructOf(sc.Fields()...))
if err != nil {
return nil, err
Expand Down Expand Up @@ -144,7 +144,7 @@ func ImportCRecordBatchWithSchema(arr *CArrowArray, sc *arrow.Schema) (arrow.Rec
//
// NOTE: The array takes ownership of the underlying memory buffers via ArrowArrayMove,
// it does not take ownership of the actual arr object itself.
func ImportCRecordBatch(arr *CArrowArray, sc *CArrowSchema) (arrow.Record, error) {
func ImportCRecordBatch(arr *CArrowArray, sc *CArrowSchema) (arrow.RecordBatch, error) {
field, err := importSchema(sc)
if err != nil {
return nil, err
Expand Down Expand Up @@ -232,7 +232,7 @@ func ExportArrowSchema(schema *arrow.Schema, out *CArrowSchema) {
// may error at runtime, due to CGO rules ("the current implementation may sometimes
// cause a runtime error if the contents of the C memory appear to be a Go pointer").
// You have been warned!
func ExportArrowRecordBatch(rb arrow.Record, out *CArrowArray, outSchema *CArrowSchema) {
func ExportArrowRecordBatch(rb arrow.RecordBatch, out *CArrowArray, outSchema *CArrowSchema) {
children := make([]arrow.ArrayData, rb.NumCols())
for i := range rb.Columns() {
children[i] = rb.Column(i).Data()
Expand Down Expand Up @@ -291,7 +291,7 @@ func ReleaseCArrowArrayStream(stream *CArrowArrayStream) { releaseStream(stream)
// RecordMessage is a simple container for a record batch channel to stream for
// using the Async C Data Interface via ExportAsyncRecordBatchStream.
type RecordMessage struct {
Record arrow.Record
Record arrow.RecordBatch
AdditionalMetadata arrow.Metadata
Err error
}
Expand Down
2 changes: 1 addition & 1 deletion arrow/cdata/test/test_cimport.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func makeSchema() *arrow.Schema {
}, &meta)
}

func makeBatch() arrow.Record {
func makeBatch() arrow.RecordBatch {
bldr := array.NewRecordBuilder(alloc, makeSchema())
defer bldr.Release()

Expand Down
8 changes: 4 additions & 4 deletions arrow/csv/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ import (
"github.com/apache/arrow-go/v18/arrow/memory"
)

// Reader wraps encoding/csv.Reader and creates array.Records from a schema.
// Reader wraps encoding/csv.Reader and creates array.RecordBatches from a schema.
type Reader struct {
r *csv.Reader
schema *arrow.Schema

refs atomic.Int64
bld *array.RecordBuilder
cur arrow.Record
cur arrow.RecordBatch
err error

chunk int
Expand Down Expand Up @@ -101,7 +101,7 @@ func NewInferringReader(r io.Reader, opts ...Option) *Reader {
}

// NewReader returns a reader that reads from the CSV file and creates
// arrow.Records from the given schema.
// arrow.RecordBatches from the given schema.
//
// NewReader panics if the given schema contains fields that have types that are not
// primitive types.
Expand Down Expand Up @@ -225,7 +225,7 @@ func (r *Reader) Schema() *arrow.Schema { return r.schema }
// Record returns the current record that has been extracted from the
// underlying CSV file.
// It is valid until the next call to Next.
func (r *Reader) Record() arrow.Record { return r.cur }
func (r *Reader) Record() arrow.RecordBatch { return r.cur }

// Next returns whether a Record could be extracted from the underlying CSV file.
//
Expand Down
4 changes: 2 additions & 2 deletions arrow/csv/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func TestCSVReaderParseError(t *testing.T) {

n := 0
lines := 0
var rec arrow.Record
var rec arrow.RecordBatch
for r.Next() {
if rec != nil {
rec.Release()
Expand Down Expand Up @@ -891,7 +891,7 @@ func TestInferringSchema(t *testing.T) {
]`))
defer exp.Release()

assertRowEqual := func(expected, actual arrow.Record, row int) {
assertRowEqual := func(expected, actual arrow.RecordBatch, row int) {
ex := expected.NewSlice(int64(row), int64(row+1))
defer ex.Release()
assert.Truef(t, array.RecordEqual(ex, actual), "expected: %s\ngot: %s", ex, actual)
Expand Down
6 changes: 3 additions & 3 deletions arrow/csv/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/apache/arrow-go/v18/arrow"
)

// Writer wraps encoding/csv.Writer and writes arrow.Record based on a schema.
// Writer wraps encoding/csv.Writer and writes arrow.RecordBatch based on a schema.
type Writer struct {
boolFormatter func(bool) string
header bool
Expand All @@ -37,7 +37,7 @@ type Writer struct {
w *csv.Writer
}

// NewWriter returns a writer that writes arrow.Records to the CSV file
// NewWriter returns a writer that writes arrow.RecordBatches to the CSV file
// with the given schema.
//
// NewWriter panics if the given schema contains fields that have types that are not
Expand All @@ -63,7 +63,7 @@ func NewWriter(w io.Writer, schema *arrow.Schema, opts ...Option) *Writer {
func (w *Writer) Schema() *arrow.Schema { return w.schema }

// Write writes a single Record as one row to the CSV file
func (w *Writer) Write(record arrow.Record) error {
func (w *Writer) Write(record arrow.RecordBatch) error {
if !record.Schema().Equal(w.schema) {
return ErrMismatchFields
}
Expand Down
2 changes: 1 addition & 1 deletion arrow/extensions/bool8_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func TestBool8TypeBatchIPCRoundTrip(t *testing.T) {
[]arrow.Array{arr}, -1)
defer batch.Release()

var written arrow.Record
var written arrow.RecordBatch
{
var buf bytes.Buffer
wr := ipc.NewWriter(&buf, ipc.WithSchema(batch.Schema()))
Expand Down
2 changes: 1 addition & 1 deletion arrow/extensions/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func TestJSONTypeBatchIPCRoundTrip(t *testing.T) {
[]arrow.Array{arr}, -1)
defer batch.Release()

var written arrow.Record
var written arrow.RecordBatch
{
var buf bytes.Buffer
wr := ipc.NewWriter(&buf, ipc.WithSchema(batch.Schema()))
Expand Down
2 changes: 1 addition & 1 deletion arrow/extensions/opaque_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func TestOpaqueTypeBatchRoundTrip(t *testing.T) {
[]arrow.Array{arr}, -1)
defer batch.Release()

var written arrow.Record
var written arrow.RecordBatch
{
var buf bytes.Buffer
wr := ipc.NewWriter(&buf, ipc.WithSchema(batch.Schema()))
Expand Down
2 changes: 1 addition & 1 deletion arrow/extensions/uuid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func TestUUIDTypeBatchIPCRoundTrip(t *testing.T) {
[]arrow.Array{arr}, -1)
defer batch.Release()

var written arrow.Record
var written arrow.RecordBatch
{
var buf bytes.Buffer
wr := ipc.NewWriter(&buf, ipc.WithSchema(batch.Schema()))
Expand Down