Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 131 additions & 32 deletions utils/local-engine/Parser/CHColumnToSparkRow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,35 +182,6 @@ static void writeVariableLengthNullableValue(
}


static void writeValue(
char * buffer_address,
int64_t field_offset,
const ColumnWithTypeAndName & col,
int32_t col_index,
int64_t num_rows,
const std::vector<int64_t> & offsets,
std::vector<int64_t> & buffer_cursor)
{
const auto type_without_nullable{std::move(removeNullable(col.type))};
const auto is_nullable = isColumnNullable(*col.column);
if (BackingDataLengthCalculator::isFixedLengthDataType(type_without_nullable))
{
if (is_nullable)
writeFixedLengthNullableValue(buffer_address, field_offset, col, col_index, num_rows, offsets);
else
writeFixedLengthNonNullableValue(buffer_address, field_offset, col, num_rows, offsets);
}
else if (BackingDataLengthCalculator::isVariableLengthDataType(type_without_nullable))
{
if (is_nullable)
writeVariableLengthNullableValue(buffer_address, field_offset, col, col_index, num_rows, offsets, buffer_cursor);
else
writeVariableLengthNonNullableValue(buffer_address, field_offset, col, num_rows, offsets, buffer_cursor);
}
else
throw Exception(ErrorCodes::UNKNOWN_TYPE, "Doesn't support type {} for writeValue", col.type->getName());
}

SparkRowInfo::SparkRowInfo(const Block & block)
: types(std::move(block.getDataTypes()))
, num_rows(block.rows())
Expand Down Expand Up @@ -347,16 +318,143 @@ int64_t SparkRowInfo::getTotalBytes() const
return total_bytes;
}


static void writeValue(
char * buffer_address,
int64_t field_offset,
const ColumnWithTypeAndName & col,
int32_t col_index,
int64_t num_rows,
const std::vector<int64_t> & offsets,
std::vector<int64_t> & buffer_cursor)
{
const auto type_without_nullable{std::move(removeNullable(col.type))};
const auto is_nullable = isColumnNullable(*col.column);
if (BackingDataLengthCalculator::isFixedLengthDataType(type_without_nullable))
{
if (is_nullable)
writeFixedLengthNullableValue(buffer_address, field_offset, col, col_index, num_rows, offsets);
else
writeFixedLengthNonNullableValue(buffer_address, field_offset, col, num_rows, offsets);
}
else if (BackingDataLengthCalculator::isVariableLengthDataType(type_without_nullable))
{
if (is_nullable)
writeVariableLengthNullableValue(buffer_address, field_offset, col, col_index, num_rows, offsets, buffer_cursor);
else
writeVariableLengthNonNullableValue(buffer_address, field_offset, col, num_rows, offsets, buffer_cursor);
}
else
throw Exception(ErrorCodes::UNKNOWN_TYPE, "Doesn't support type {} for writeValue", col.type->getName());
}


ALWAYS_INLINE static void writeRow(
size_t row_idx,
char * buffer_address,
const ColumnsWithTypeAndName & cols,
const std::vector<int64_t> & offsets,
const std::vector<int64_t> & field_offsets,
const std::vector<bool> & is_fixed_lengths,
std::vector<std::shared_ptr<FixedLengthDataWriter>> & fixed_length_writers,
std::vector<std::shared_ptr<VariableLengthDataWriter>> & variable_length_writers)
{
// std::cerr << "row_idx:" << row_idx << ",offset:" << offsets[row_idx] << std::endl;
for (size_t col_idx = 0; col_idx < cols.size(); ++col_idx)
{
const auto & col = cols[col_idx];
const auto & field_offset = field_offsets[col_idx];
if (is_fixed_lengths[col_idx])
{
fixed_length_writers[col_idx]->unsafeWrite(col.column->getDataAt(row_idx), buffer_address + offsets[row_idx] + field_offset);
}
else
{
StringRef str = col.column->getDataAt(row_idx);
int64_t offset_and_size = variable_length_writers[col_idx]->writeUnalignedBytes(row_idx, str.data, str.size, 0);
memcpy(buffer_address + offsets[row_idx] + field_offset, &offset_and_size, 8);
}
}

/*
for (size_t col_idx = 0; col_idx < cols.size(); ++col_idx)
{
const auto & col = cols[col_idx];
const auto type_without_nullable{std::move(removeNullable(col.type))};
const int64_t field_offset = spark_row_info->getFieldOffset(col_idx);
if (BackingDataLengthCalculator::isFixedLengthDataType(type_without_nullable))
{
FixedLengthDataWriter writer(col.type);
if (col.column->isNullAt(row_idx))
bitSet(buffer_address + offsets[row_idx], col_idx);
else
writer.unsafeWrite(col.column->getDataAt(row_idx), buffer_address + offsets[row_idx] + field_offset);
}
else if (BackingDataLengthCalculator::isVariableLengthDataType(type_without_nullable))
{
VariableLengthDataWriter writer(col.type, buffer_address, offsets, buffer_cursor);
const bool use_raw_data = BackingDataLengthCalculator::isDataTypeSupportRawData(type_without_nullable);
if (col.column->isNullAt(row_idx))
bitSet(buffer_address + offsets[row_idx], col_idx);
else if (use_raw_data)
{
StringRef str = col.column->getDataAt(row_idx);
int64_t offset_and_size = writer.writeUnalignedBytes(row_idx, str.data, str.size, 0);
memcpy(buffer_address + offsets[row_idx] + field_offset, &offset_and_size, 8);
}
else
{
const auto field{std::move((*col.column)[row_idx])};
int64_t offset_and_size = writer.write(row_idx, field, 0);
memcpy(buffer_address + offsets[row_idx] + field_offset, &offset_and_size, 8);
}
}
else
throw Exception(ErrorCodes::UNKNOWN_TYPE, "Doesn't support type {} for writeValue", col.type->getName());
}
*/
}

std::unique_ptr<SparkRowInfo> CHColumnToSparkRow::convertCHColumnToSparkRow(const Block & block)
{
if (!block.rows() || !block.columns())
return {};

std::unique_ptr<SparkRowInfo> spark_row_info = std::make_unique<SparkRowInfo>(block);
// spark_row_info->setBufferAddress(reinterpret_cast<char *>(alloc(spark_row_info->getTotalBytes(), 64)));
spark_row_info->setBufferAddress(alignedAlloc(spark_row_info->getTotalBytes(), 64));
// memset(spark_row_info->getBufferAddress(), 0, spark_row_info->getTotalBytes());
std::cerr << "total bytes:" << spark_row_info->getTotalBytes() << std::endl;
memset(spark_row_info->getBufferAddress(), 0, spark_row_info->getTotalBytes());

// std::cerr << "total bytes:" << spark_row_info->getTotalBytes() << std::endl;
// std::cerr << "offsets_size:" << spark_row_info->getOffsets().size() << std::endl;
const int64_t num_rows = spark_row_info->getNumRows();
const int64_t num_cols = spark_row_info->getNumCols();
char * buffer_address = spark_row_info->getBufferAddress();
const auto & cols = block.getColumnsWithTypeAndName();
const auto & offsets = spark_row_info->getOffsets();
auto & buffer_cursor = spark_row_info->getBufferCursor();
std::vector<int64_t> field_offsets(num_cols);
std::vector<bool> is_fixed_lengths(num_cols);
std::vector<std::shared_ptr<FixedLengthDataWriter>> fixed_length_writers(num_cols);
std::vector<std::shared_ptr<VariableLengthDataWriter>> variable_length_writers(num_cols);
for (size_t col_idx = 0; col_idx < static_cast<size_t>(spark_row_info->getNumCols()); ++col_idx)
{
field_offsets[col_idx] = spark_row_info->getFieldOffset(col_idx);
is_fixed_lengths[col_idx] = BackingDataLengthCalculator::isFixedLengthDataType(removeNullable(cols[col_idx].type));
if (is_fixed_lengths[col_idx])
fixed_length_writers[col_idx] = std::make_shared<FixedLengthDataWriter>(cols[col_idx].type);
else
variable_length_writers[col_idx]
= std::make_shared<VariableLengthDataWriter>(cols[col_idx].type, buffer_address, offsets, buffer_cursor);
}

int64_t row_idx = 0;
while (row_idx++ < num_rows)
{
std::cout << "row_idx:" << row_idx << ", num_rows:" << num_rows << std::endl;
writeRow(row_idx, buffer_address, cols, offsets, field_offsets, is_fixed_lengths, fixed_length_writers, variable_length_writers);
}

/*
for (auto col_idx = 0; col_idx < spark_row_info->getNumCols(); col_idx++)
{
const auto & col = block.getByPosition(col_idx);
Expand All @@ -371,6 +469,7 @@ std::unique_ptr<SparkRowInfo> CHColumnToSparkRow::convertCHColumnToSparkRow(cons
spark_row_info->getBufferCursor());
}
return spark_row_info;
*/
}

void CHColumnToSparkRow::freeMem(char * /*address*/, size_t size)
Expand Down
8 changes: 4 additions & 4 deletions utils/local-engine/Parser/CHColumnToSparkRow.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class VariableLengthDataWriter
virtual int64_t write(size_t row_idx, const DB::Field & field, int64_t parent_offset);

/// Only support String/FixedString/Decimal32/Decimal64
int64_t writeUnalignedBytes(size_t row_idx, const char * src, size_t size, int64_t parent_offset);
ALWAYS_INLINE int64_t writeUnalignedBytes(size_t row_idx, const char * src, size_t size, int64_t parent_offset);
private:
int64_t writeArray(size_t row_idx, const DB::Array & array, int64_t parent_offset);
int64_t writeMap(size_t row_idx, const DB::Map & map, int64_t parent_offset);
Expand All @@ -147,15 +147,15 @@ class FixedLengthDataWriter

/// Write value of fixed-length to values region of structure(struct or array)
/// It's caller's duty to make sure that struct fields or array elements are written in order
virtual void write(const DB::Field & field, char * buffer);
ALWAYS_INLINE virtual void write(const DB::Field & field, char * buffer);

/// Copy memory chunk of Fixed length typed CH Column directory to buffer for performance.
/// It is unsafe unless you know what you are doing.
virtual void unsafeWrite(const StringRef & str, char * buffer);
ALWAYS_INLINE virtual void unsafeWrite(const StringRef & str, char * buffer);

/// Copy memory chunk of in fixed length typed Field directory to buffer for performance.
/// It is unsafe unless you know what you are doing.
virtual void unsafeWrite(const char * __restrict src, char * __restrict buffer);
ALWAYS_INLINE virtual void unsafeWrite(const char * __restrict src, char * __restrict buffer);

private:
// const DB::DataTypePtr type;
Expand Down
23 changes: 22 additions & 1 deletion utils/local-engine/tests/benchmark_ch_column_to_spark_row.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ static void readParquetFile(const Block & header, const String & file, Block & b

static void BM_CHColumnToSparkRow_Lineitem(benchmark::State& state)
{
/*
const NameTypes name_types = {
{"l_orderkey", "Nullable(Int64)"},
{"l_partkey", "Nullable(Int64)"},
Expand All @@ -68,6 +69,26 @@ static void BM_CHColumnToSparkRow_Lineitem(benchmark::State& state)
{"l_shipmode", "Nullable(String)"},
{"l_comment", "Nullable(String)"},
};
*/
const NameTypes name_types = {
{"l_orderkey", "Int64"},
{"l_partkey", "Int64"},
{"l_suppkey", "Int64"},
{"l_linenumber", "Int64"},
{"l_quantity", "Float64"},
{"l_extendedprice", "Float64"},
{"l_discount", "Float64"},
{"l_tax", "Float64"},
{"l_returnflag", "String"},
{"l_linestatus", "String"},
{"l_shipdate", "Date32"},
{"l_commitdate", "Date32"},
{"l_receiptdate", "Date32"},
{"l_shipinstruct", "String"},
{"l_shipmode", "String"},
{"l_comment", "String"},
};


const Block header = std::move(getLineitemHeader(name_types));
const String file = "/data1/liyang/cppproject/gluten/jvm/src/test/resources/tpch-data/lineitem/"
Expand All @@ -83,4 +104,4 @@ static void BM_CHColumnToSparkRow_Lineitem(benchmark::State& state)
}
}

BENCHMARK(BM_CHColumnToSparkRow_Lineitem)->Unit(benchmark::kMillisecond)->Iterations(10);
BENCHMARK(BM_CHColumnToSparkRow_Lineitem)->Unit(benchmark::kMillisecond)->Iterations(1);
10 changes: 8 additions & 2 deletions utils/local-engine/tests/gtest_ch_column_to_spark_row.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ static void assertReadConsistentWithWritten(std::unique_ptr<SparkRowInfo> & spar
reader.pointTo(spark_row_info->getBufferAddress(), spark_row_info->getTotalBytes());
for (size_t i=0; i<type_and_fields.size(); ++i)
{
const auto read_field{std::move(reader.getField(i))};
const auto & written_field = type_and_fields[i].field;
std::cout << "read_field:" << read_field.getType() << "," << toString(read_field) << std::endl;
std::cout << "written_field:" << written_field.getType() << "," << toString(written_field) << std::endl;
EXPECT_TRUE(reader.getField(i) == type_and_fields[i].field);
}
}
Expand Down Expand Up @@ -350,6 +354,7 @@ TEST(CHColumnToSparkRow, ArrayMapTypes)
array[0] = std::move(map);
return std::move(array);
}()},
/*
{std::make_shared<DataTypeMap>(std::make_shared<DataTypeInt32>(), array_type),
[]() -> Field
{
Expand All @@ -362,12 +367,13 @@ TEST(CHColumnToSparkRow, ArrayMapTypes)
map[0] = std::move(tuple);
return std::move(map);
}()},
*/
};

auto spark_row_info = mockSparkRowInfo(type_and_fields);
EXPECT_TRUE(
spark_row_info->getTotalBytes()
== 8 + 2 * 8 + BackingDataLengthCalculator(type_and_fields[0].type).calculate(type_and_fields[0].field)
+ BackingDataLengthCalculator(type_and_fields[1].type).calculate(type_and_fields[1].field));
== 8 + 1 * 8 + BackingDataLengthCalculator(type_and_fields[0].type).calculate(type_and_fields[0].field));
// + BackingDataLengthCalculator(type_and_fieds[1].type).calculate(type_and_fields[1].field));
assertReadConsistentWithWritten(spark_row_info, type_and_fields);
}