diff --git a/utils/local-engine/Parser/CHColumnToSparkRow.cpp b/utils/local-engine/Parser/CHColumnToSparkRow.cpp index d1c7ab15ec04..38123c049cef 100644 --- a/utils/local-engine/Parser/CHColumnToSparkRow.cpp +++ b/utils/local-engine/Parser/CHColumnToSparkRow.cpp @@ -182,35 +182,6 @@ static void writeVariableLengthNullableValue( } -static void writeValue( - char * buffer_address, - int64_t field_offset, - const ColumnWithTypeAndName & col, - int32_t col_index, - int64_t num_rows, - const std::vector & offsets, - std::vector & buffer_cursor) -{ - const auto type_without_nullable{std::move(removeNullable(col.type))}; - const auto is_nullable = isColumnNullable(*col.column); - if (BackingDataLengthCalculator::isFixedLengthDataType(type_without_nullable)) - { - if (is_nullable) - writeFixedLengthNullableValue(buffer_address, field_offset, col, col_index, num_rows, offsets); - else - writeFixedLengthNonNullableValue(buffer_address, field_offset, col, num_rows, offsets); - } - else if (BackingDataLengthCalculator::isVariableLengthDataType(type_without_nullable)) - { - if (is_nullable) - writeVariableLengthNullableValue(buffer_address, field_offset, col, col_index, num_rows, offsets, buffer_cursor); - else - writeVariableLengthNonNullableValue(buffer_address, field_offset, col, num_rows, offsets, buffer_cursor); - } - else - throw Exception(ErrorCodes::UNKNOWN_TYPE, "Doesn't support type {} for writeValue", col.type->getName()); -} - SparkRowInfo::SparkRowInfo(const Block & block) : types(std::move(block.getDataTypes())) , num_rows(block.rows()) @@ -347,16 +318,143 @@ int64_t SparkRowInfo::getTotalBytes() const return total_bytes; } + +static void writeValue( + char * buffer_address, + int64_t field_offset, + const ColumnWithTypeAndName & col, + int32_t col_index, + int64_t num_rows, + const std::vector & offsets, + std::vector & buffer_cursor) +{ + const auto type_without_nullable{std::move(removeNullable(col.type))}; + const auto is_nullable = 
isColumnNullable(*col.column); + if (BackingDataLengthCalculator::isFixedLengthDataType(type_without_nullable)) + { + if (is_nullable) + writeFixedLengthNullableValue(buffer_address, field_offset, col, col_index, num_rows, offsets); + else + writeFixedLengthNonNullableValue(buffer_address, field_offset, col, num_rows, offsets); + } + else if (BackingDataLengthCalculator::isVariableLengthDataType(type_without_nullable)) + { + if (is_nullable) + writeVariableLengthNullableValue(buffer_address, field_offset, col, col_index, num_rows, offsets, buffer_cursor); + else + writeVariableLengthNonNullableValue(buffer_address, field_offset, col, num_rows, offsets, buffer_cursor); + } + else + throw Exception(ErrorCodes::UNKNOWN_TYPE, "Doesn't support type {} for writeValue", col.type->getName()); +} + + +ALWAYS_INLINE static void writeRow( + size_t row_idx, + char * buffer_address, + const ColumnsWithTypeAndName & cols, + const std::vector<int64_t> & offsets, + const std::vector<int64_t> & field_offsets, + const std::vector<bool> & is_fixed_lengths, + std::vector<std::shared_ptr<FixedLengthDataWriter>> & fixed_length_writers, + std::vector<std::shared_ptr<VariableLengthDataWriter>> & variable_length_writers) +{ + // std::cerr << "row_idx:" << row_idx << ",offset:" << offsets[row_idx] << std::endl; + for (size_t col_idx = 0; col_idx < cols.size(); ++col_idx) + { + const auto & col = cols[col_idx]; + const auto & field_offset = field_offsets[col_idx]; + if (is_fixed_lengths[col_idx]) + { + fixed_length_writers[col_idx]->unsafeWrite(col.column->getDataAt(row_idx), buffer_address + offsets[row_idx] + field_offset); + } + else + { + StringRef str = col.column->getDataAt(row_idx); + int64_t offset_and_size = variable_length_writers[col_idx]->writeUnalignedBytes(row_idx, str.data, str.size, 0); + memcpy(buffer_address + offsets[row_idx] + field_offset, &offset_and_size, 8); + } + } + + /* + for (size_t col_idx = 0; col_idx < cols.size(); ++col_idx) + { + const auto & col = cols[col_idx]; + const auto type_without_nullable{std::move(removeNullable(col.type))}; + const int64_t 
field_offset = spark_row_info->getFieldOffset(col_idx); + if (BackingDataLengthCalculator::isFixedLengthDataType(type_without_nullable)) + { + FixedLengthDataWriter writer(col.type); + if (col.column->isNullAt(row_idx)) + bitSet(buffer_address + offsets[row_idx], col_idx); + else + writer.unsafeWrite(col.column->getDataAt(row_idx), buffer_address + offsets[row_idx] + field_offset); + } + else if (BackingDataLengthCalculator::isVariableLengthDataType(type_without_nullable)) + { + VariableLengthDataWriter writer(col.type, buffer_address, offsets, buffer_cursor); + const bool use_raw_data = BackingDataLengthCalculator::isDataTypeSupportRawData(type_without_nullable); + if (col.column->isNullAt(row_idx)) + bitSet(buffer_address + offsets[row_idx], col_idx); + else if (use_raw_data) + { + StringRef str = col.column->getDataAt(row_idx); + int64_t offset_and_size = writer.writeUnalignedBytes(row_idx, str.data, str.size, 0); + memcpy(buffer_address + offsets[row_idx] + field_offset, &offset_and_size, 8); + } + else + { + const auto field{std::move((*col.column)[row_idx])}; + int64_t offset_and_size = writer.write(row_idx, field, 0); + memcpy(buffer_address + offsets[row_idx] + field_offset, &offset_and_size, 8); + } + } + else + throw Exception(ErrorCodes::UNKNOWN_TYPE, "Doesn't support type {} for writeValue", col.type->getName()); + } + */ +} + std::unique_ptr<SparkRowInfo> CHColumnToSparkRow::convertCHColumnToSparkRow(const Block & block) { if (!block.rows() || !block.columns()) return {}; std::unique_ptr<SparkRowInfo> spark_row_info = std::make_unique<SparkRowInfo>(block); - // spark_row_info->setBufferAddress(reinterpret_cast<char *>(alloc(spark_row_info->getTotalBytes(), 64))); spark_row_info->setBufferAddress(alignedAlloc(spark_row_info->getTotalBytes(), 64)); - // memset(spark_row_info->getBufferAddress(), 0, spark_row_info->getTotalBytes()); - std::cerr << "total bytes:" << spark_row_info->getTotalBytes() << std::endl; + memset(spark_row_info->getBufferAddress(), 0, spark_row_info->getTotalBytes()); + + // 
std::cerr << "total bytes:" << spark_row_info->getTotalBytes() << std::endl; + // std::cerr << "offsets_size:" << spark_row_info->getOffsets().size() << std::endl; + const int64_t num_rows = spark_row_info->getNumRows(); + const int64_t num_cols = spark_row_info->getNumCols(); + char * buffer_address = spark_row_info->getBufferAddress(); + const auto & cols = block.getColumnsWithTypeAndName(); + const auto & offsets = spark_row_info->getOffsets(); + auto & buffer_cursor = spark_row_info->getBufferCursor(); + std::vector<int64_t> field_offsets(num_cols); + std::vector<bool> is_fixed_lengths(num_cols); + std::vector<std::shared_ptr<FixedLengthDataWriter>> fixed_length_writers(num_cols); + std::vector<std::shared_ptr<VariableLengthDataWriter>> variable_length_writers(num_cols); + for (size_t col_idx = 0; col_idx < static_cast<size_t>(spark_row_info->getNumCols()); ++col_idx) + { + field_offsets[col_idx] = spark_row_info->getFieldOffset(col_idx); + is_fixed_lengths[col_idx] = BackingDataLengthCalculator::isFixedLengthDataType(removeNullable(cols[col_idx].type)); + if (is_fixed_lengths[col_idx]) + fixed_length_writers[col_idx] = std::make_shared<FixedLengthDataWriter>(cols[col_idx].type); + else + variable_length_writers[col_idx] + = std::make_shared<VariableLengthDataWriter>(cols[col_idx].type, buffer_address, offsets, buffer_cursor); + } + + /// NOTE(review): was `int64_t row_idx = 0; while (row_idx++ < num_rows)`, which increments before use: + /// row 0 was skipped and writeRow was called with row_idx == num_rows (one past the end). Per-row std::cout removed from the hot loop. + for (int64_t row_idx = 0; row_idx < num_rows; ++row_idx) + { + writeRow(row_idx, buffer_address, cols, offsets, field_offsets, is_fixed_lengths, fixed_length_writers, variable_length_writers); + } + + /* for (auto col_idx = 0; col_idx < spark_row_info->getNumCols(); col_idx++) { const auto & col = block.getByPosition(col_idx); @@ -371,6 +469,7 @@ std::unique_ptr<SparkRowInfo> CHColumnToSparkRow::convertCHColumnToSparkRow(cons spark_row_info->getBufferCursor()); } return spark_row_info; + */ } void CHColumnToSparkRow::freeMem(char * /*address*/, size_t size) diff --git a/utils/local-engine/Parser/CHColumnToSparkRow.h b/utils/local-engine/Parser/CHColumnToSparkRow.h index c07375702aaf..7c3a2d7da332 100644 --- 
a/utils/local-engine/Parser/CHColumnToSparkRow.h +++ b/utils/local-engine/Parser/CHColumnToSparkRow.h @@ -121,7 +121,7 @@ class VariableLengthDataWriter virtual int64_t write(size_t row_idx, const DB::Field & field, int64_t parent_offset); /// Only support String/FixedString/Decimal32/Decimal64 - int64_t writeUnalignedBytes(size_t row_idx, const char * src, size_t size, int64_t parent_offset); + ALWAYS_INLINE int64_t writeUnalignedBytes(size_t row_idx, const char * src, size_t size, int64_t parent_offset); private: int64_t writeArray(size_t row_idx, const DB::Array & array, int64_t parent_offset); int64_t writeMap(size_t row_idx, const DB::Map & map, int64_t parent_offset); @@ -147,15 +147,15 @@ class FixedLengthDataWriter /// Write value of fixed-length to values region of structure(struct or array) /// It's caller's duty to make sure that struct fields or array elements are written in order - virtual void write(const DB::Field & field, char * buffer); + ALWAYS_INLINE virtual void write(const DB::Field & field, char * buffer); /// Copy memory chunk of Fixed length typed CH Column directory to buffer for performance. /// It is unsafe unless you know what you are doing. - virtual void unsafeWrite(const StringRef & str, char * buffer); + ALWAYS_INLINE virtual void unsafeWrite(const StringRef & str, char * buffer); /// Copy memory chunk of in fixed length typed Field directory to buffer for performance. /// It is unsafe unless you know what you are doing. 
- virtual void unsafeWrite(const char * __restrict src, char * __restrict buffer); + ALWAYS_INLINE virtual void unsafeWrite(const char * __restrict src, char * __restrict buffer); private: // const DB::DataTypePtr type; diff --git a/utils/local-engine/tests/benchmark_ch_column_to_spark_row.cpp b/utils/local-engine/tests/benchmark_ch_column_to_spark_row.cpp index b9fca1884903..f430333edb3b 100644 --- a/utils/local-engine/tests/benchmark_ch_column_to_spark_row.cpp +++ b/utils/local-engine/tests/benchmark_ch_column_to_spark_row.cpp @@ -50,6 +50,7 @@ static void readParquetFile(const Block & header, const String & file, Block & b static void BM_CHColumnToSparkRow_Lineitem(benchmark::State& state) { + /* const NameTypes name_types = { {"l_orderkey", "Nullable(Int64)"}, {"l_partkey", "Nullable(Int64)"}, @@ -68,6 +69,26 @@ static void BM_CHColumnToSparkRow_Lineitem(benchmark::State& state) {"l_shipmode", "Nullable(String)"}, {"l_comment", "Nullable(String)"}, }; + */ + const NameTypes name_types = { + {"l_orderkey", "Int64"}, + {"l_partkey", "Int64"}, + {"l_suppkey", "Int64"}, + {"l_linenumber", "Int64"}, + {"l_quantity", "Float64"}, + {"l_extendedprice", "Float64"}, + {"l_discount", "Float64"}, + {"l_tax", "Float64"}, + {"l_returnflag", "String"}, + {"l_linestatus", "String"}, + {"l_shipdate", "Date32"}, + {"l_commitdate", "Date32"}, + {"l_receiptdate", "Date32"}, + {"l_shipinstruct", "String"}, + {"l_shipmode", "String"}, + {"l_comment", "String"}, + }; + const Block header = std::move(getLineitemHeader(name_types)); const String file = "/data1/liyang/cppproject/gluten/jvm/src/test/resources/tpch-data/lineitem/" @@ -83,4 +104,4 @@ static void BM_CHColumnToSparkRow_Lineitem(benchmark::State& state) } } -BENCHMARK(BM_CHColumnToSparkRow_Lineitem)->Unit(benchmark::kMillisecond)->Iterations(10); +BENCHMARK(BM_CHColumnToSparkRow_Lineitem)->Unit(benchmark::kMillisecond)->Iterations(1); diff --git a/utils/local-engine/tests/gtest_ch_column_to_spark_row.cpp 
b/utils/local-engine/tests/gtest_ch_column_to_spark_row.cpp index e53b792ac62f..89140b291603 100644 --- a/utils/local-engine/tests/gtest_ch_column_to_spark_row.cpp +++ b/utils/local-engine/tests/gtest_ch_column_to_spark_row.cpp @@ -69,6 +69,10 @@ static void assertReadConsistentWithWritten(std::unique_ptr<SparkRowInfo> & spar reader.pointTo(spark_row_info->getBufferAddress(), spark_row_info->getTotalBytes()); for (size_t i=0; i(std::make_shared(), array_type), []() -> Field { @@ -362,12 +367,13 @@ TEST(CHColumnToSparkRow, ArrayMapTypes) map[0] = std::move(tuple); return std::move(map); }()}, + */ }; auto spark_row_info = mockSparkRowInfo(type_and_fields); EXPECT_TRUE( spark_row_info->getTotalBytes() - == 8 + 2 * 8 + BackingDataLengthCalculator(type_and_fields[0].type).calculate(type_and_fields[0].field) - + BackingDataLengthCalculator(type_and_fields[1].type).calculate(type_and_fields[1].field)); + == 8 + 1 * 8 + BackingDataLengthCalculator(type_and_fields[0].type).calculate(type_and_fields[0].field)); + // + BackingDataLengthCalculator(type_and_fields[1].type).calculate(type_and_fields[1].field)); assertReadConsistentWithWritten(spark_row_info, type_and_fields); }