Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#if USE_ARROW || USE_PARQUET

#include <common/DateLUTImpl.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
Expand Down Expand Up @@ -394,6 +396,43 @@ namespace NDB
}
}

static void fillArrowArrayWithDateTime64ColumnData(
const DataTypePtr & type,
ColumnPtr write_column,
const PaddedPODArray<UInt8> * null_bytemap,
const String & format_name,
arrow::ArrayBuilder* array_builder,
size_t start,
size_t end)
{
const auto * datetime64_type = assert_cast<const DataTypeDateTime64 *>(type.get());
const auto & column = assert_cast<const ColumnDecimal<DateTime64> &>(*write_column);
arrow::TimestampBuilder & builder = assert_cast<arrow::TimestampBuilder &>(*array_builder);
arrow::Status status;

auto scale = datetime64_type->getScale();
bool need_rescale = scale % 3;
auto rescale_multiplier = DecimalUtils::scaleMultiplier<DateTime64::NativeType>(3 - scale % 3);
for (size_t value_i = start; value_i < end; ++value_i)
{
if (null_bytemap && (*null_bytemap)[value_i])
{
status = builder.AppendNull();
}
else
{
auto value = static_cast<Int64>(column[value_i].get<DecimalField<DateTime64>>().getValue());
if (need_rescale)
{
if (common::mulOverflow(value, rescale_multiplier, value))
throw Exception(ErrorCodes::DECIMAL_OVERFLOW, "Decimal math overflow");
}
status = builder.Append(value);
}
checkStatus(status, write_column->getName(), format_name);
}
}

static void fillArrowArray(
const String & column_name,
ColumnPtr & column,
Expand Down Expand Up @@ -454,6 +493,10 @@ namespace NDB
DataTypePtr array_type = assert_cast<const DataTypeMap *>(column_type.get())->getNestedType();
fillArrowArrayWithArrayColumnData<arrow::MapBuilder>(column_name, column_array, array_type, null_bytemap, array_builder, format_name, start, end, dictionary_values);
}
else if (isDateTime64(column_type))
{
fillArrowArrayWithDateTime64ColumnData(column_type, column, null_bytemap, format_name, array_builder, start, end);
}
else if (isDecimal(column_type))
{
auto fill_decimal = [&](const auto & types) -> bool
Expand Down Expand Up @@ -548,6 +591,18 @@ namespace NDB
}
}

static arrow::TimeUnit::type getArrowTimeUnit(const DataTypeDateTime64 * type)
{
UInt32 scale = type->getScale();
if (scale == 0)
return arrow::TimeUnit::SECOND;
if (scale > 0 && scale <= 3)
return arrow::TimeUnit::MILLI;
if (scale > 3 && scale <= 6)
return arrow::TimeUnit::MICRO;
return arrow::TimeUnit::NANO;
}

static std::shared_ptr<arrow::DataType> getArrowType(
DataTypePtr column_type, ColumnPtr column, const std::string & column_name, const std::string & format_name, bool * out_is_column_nullable)
{
Expand Down Expand Up @@ -630,6 +685,12 @@ namespace NDB
getArrowType(val_type, columns[1], column_name, format_name, out_is_column_nullable));
}

if (isDateTime64(column_type))
{
const auto * datetime64_type = assert_cast<const DataTypeDateTime64 *>(column_type.get());
return arrow::timestamp(getArrowTimeUnit(datetime64_type), datetime64_type->getTimeZone().getTimeZone());
}

const std::string type_name = column_type->getFamilyName();
if (const auto * arrow_type_it = std::find_if(
internal_type_to_arrow_type.begin(),
Expand Down
36 changes: 36 additions & 0 deletions ydb/tests/fq/s3/canondata/result.json
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@
"test_format_setting.TestS3.test_timestamp_simple_format_insert[v1-common/simple_format/test.json-json_each_row]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_simple_format_insert_v1-common_simple_format_test.json-json_each_row_/timestamp_format_common_simple_format_test.json"
},
"test_format_setting.TestS3.test_timestamp_simple_format_insert[v1-common/simple_format/test.parquet-parquet]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_simple_format_insert_v1-common_simple_format_test.parquet-parquet_/timestamp_format_common_simple_format_test.parquet"
},
"test_format_setting.TestS3.test_timestamp_simple_format_insert[v1-common/simple_format/test.tsv-tsv_with_names]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_simple_format_insert_v1-common_simple_format_test.tsv-tsv_with_names_/timestamp_format_common_simple_format_test.tsv"
},
Expand All @@ -86,6 +89,9 @@
"test_format_setting.TestS3.test_timestamp_simple_format_insert[v2-common/simple_format/test.json-json_each_row]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_simple_format_insert_v2-common_simple_format_test.json-json_each_row_/timestamp_format_common_simple_format_test.json"
},
"test_format_setting.TestS3.test_timestamp_simple_format_insert[v2-common/simple_format/test.parquet-parquet]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_simple_format_insert_v2-common_simple_format_test.parquet-parquet_/timestamp_format_common_simple_format_test.parquet"
},
"test_format_setting.TestS3.test_timestamp_simple_format_insert[v2-common/simple_format/test.tsv-tsv_with_names]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_simple_format_insert_v2-common_simple_format_test.tsv-tsv_with_names_/timestamp_format_common_simple_format_test.tsv"
},
Expand All @@ -95,6 +101,9 @@
"test_format_setting.TestS3.test_timestamp_simple_iso_insert[v1-timestamp/simple_iso/test.json-json_each_row]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_simple_iso_insert_v1-timestamp_simple_iso_test.json-json_each_row_/timestamp_simple_iso_test.json"
},
"test_format_setting.TestS3.test_timestamp_simple_iso_insert[v1-timestamp/simple_iso/test.parquet-parquet]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_simple_iso_insert_v1-timestamp_simple_iso_test.parquet-parquet_/timestamp_simple_iso_test.parquet"
},
"test_format_setting.TestS3.test_timestamp_simple_iso_insert[v1-timestamp/simple_iso/test.tsv-tsv_with_names]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_simple_iso_insert_v1-timestamp_simple_iso_test.tsv-tsv_with_names_/timestamp_simple_iso_test.tsv"
},
Expand All @@ -104,6 +113,9 @@
"test_format_setting.TestS3.test_timestamp_simple_iso_insert[v2-timestamp/simple_iso/test.json-json_each_row]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_simple_iso_insert_v2-timestamp_simple_iso_test.json-json_each_row_/timestamp_simple_iso_test.json"
},
"test_format_setting.TestS3.test_timestamp_simple_iso_insert[v2-timestamp/simple_iso/test.parquet-parquet]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_simple_iso_insert_v2-timestamp_simple_iso_test.parquet-parquet_/timestamp_simple_iso_test.parquet"
},
"test_format_setting.TestS3.test_timestamp_simple_iso_insert[v2-timestamp/simple_iso/test.tsv-tsv_with_names]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_simple_iso_insert_v2-timestamp_simple_iso_test.tsv-tsv_with_names_/timestamp_simple_iso_test.tsv"
},
Expand All @@ -113,6 +125,9 @@
"test_format_setting.TestS3.test_timestamp_simple_posix_insert[v1-common/simple_posix/test.json-json_each_row]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_simple_posix_insert_v1-common_simple_posix_test.json-json_each_row_/common_simple_posix_test.json"
},
"test_format_setting.TestS3.test_timestamp_simple_posix_insert[v1-common/simple_posix/test.parquet-parquet]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_simple_posix_insert_v1-common_simple_posix_test.parquet-parquet_/common_simple_posix_test.parquet"
},
"test_format_setting.TestS3.test_timestamp_simple_posix_insert[v1-common/simple_posix/test.tsv-tsv_with_names]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_simple_posix_insert_v1-common_simple_posix_test.tsv-tsv_with_names_/common_simple_posix_test.tsv"
},
Expand All @@ -122,6 +137,9 @@
"test_format_setting.TestS3.test_timestamp_simple_posix_insert[v2-common/simple_posix/test.json-json_each_row]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_simple_posix_insert_v2-common_simple_posix_test.json-json_each_row_/common_simple_posix_test.json"
},
"test_format_setting.TestS3.test_timestamp_simple_posix_insert[v2-common/simple_posix/test.parquet-parquet]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_simple_posix_insert_v2-common_simple_posix_test.parquet-parquet_/common_simple_posix_test.parquet"
},
"test_format_setting.TestS3.test_timestamp_simple_posix_insert[v2-common/simple_posix/test.tsv-tsv_with_names]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_simple_posix_insert_v2-common_simple_posix_test.tsv-tsv_with_names_/common_simple_posix_test.tsv"
},
Expand All @@ -143,6 +161,15 @@
"test_format_setting.TestS3.test_timestamp_unix_time_insert[v1-timestamp/unix_time/test.json-json_each_row-UNIX_TIME_SECONDS]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_unix_time_insert_v1-timestamp_unix_time_test.json-json_each_row-UNIX_TIME_SECONDS_/UNIX_TIME_SECONDS_timestamp_unix_time_test.json"
},
"test_format_setting.TestS3.test_timestamp_unix_time_insert[v1-timestamp/unix_time/test.parquet-parquet-UNIX_TIME_MICROSECONDS]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_unix_time_insert_v1-timestamp_unix_time_test.parquet-parquet-UNIX_TIME_MICROSECONDS_/UNIX_TIME_MICROSECONDS_timestamp_unix_time_test.parquet"
},
"test_format_setting.TestS3.test_timestamp_unix_time_insert[v1-timestamp/unix_time/test.parquet-parquet-UNIX_TIME_MILLISECONDS]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_unix_time_insert_v1-timestamp_unix_time_test.parquet-parquet-UNIX_TIME_MILLISECONDS_/UNIX_TIME_MILLISECONDS_timestamp_unix_time_test.parquet"
},
"test_format_setting.TestS3.test_timestamp_unix_time_insert[v1-timestamp/unix_time/test.parquet-parquet-UNIX_TIME_SECONDS]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_unix_time_insert_v1-timestamp_unix_time_test.parquet-parquet-UNIX_TIME_SECONDS_/UNIX_TIME_SECONDS_timestamp_unix_time_test.parquet"
},
"test_format_setting.TestS3.test_timestamp_unix_time_insert[v1-timestamp/unix_time/test.tsv-tsv_with_names-UNIX_TIME_MICROSECONDS]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_unix_time_insert_v1-timestamp_unix_time_test.tsv-tsv_with_names-UNIX_TIME_MICROSECONDS_/UNIX_TIME_MICROSECONDS_timestamp_unix_time_test.tsv"
},
Expand Down Expand Up @@ -170,6 +197,15 @@
"test_format_setting.TestS3.test_timestamp_unix_time_insert[v2-timestamp/unix_time/test.json-json_each_row-UNIX_TIME_SECONDS]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_unix_time_insert_v2-timestamp_unix_time_test.json-json_each_row-UNIX_TIME_SECONDS_/UNIX_TIME_SECONDS_timestamp_unix_time_test.json"
},
"test_format_setting.TestS3.test_timestamp_unix_time_insert[v2-timestamp/unix_time/test.parquet-parquet-UNIX_TIME_MICROSECONDS]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_unix_time_insert_v2-timestamp_unix_time_test.parquet-parquet-UNIX_TIME_MICROSECONDS_/UNIX_TIME_MICROSECONDS_timestamp_unix_time_test.parquet"
},
"test_format_setting.TestS3.test_timestamp_unix_time_insert[v2-timestamp/unix_time/test.parquet-parquet-UNIX_TIME_MILLISECONDS]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_unix_time_insert_v2-timestamp_unix_time_test.parquet-parquet-UNIX_TIME_MILLISECONDS_/UNIX_TIME_MILLISECONDS_timestamp_unix_time_test.parquet"
},
"test_format_setting.TestS3.test_timestamp_unix_time_insert[v2-timestamp/unix_time/test.parquet-parquet-UNIX_TIME_SECONDS]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_unix_time_insert_v2-timestamp_unix_time_test.parquet-parquet-UNIX_TIME_SECONDS_/UNIX_TIME_SECONDS_timestamp_unix_time_test.parquet"
},
"test_format_setting.TestS3.test_timestamp_unix_time_insert[v2-timestamp/unix_time/test.tsv-tsv_with_names-UNIX_TIME_MICROSECONDS]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_unix_time_insert_v2-timestamp_unix_time_test.tsv-tsv_with_names-UNIX_TIME_MICROSECONDS_/UNIX_TIME_MICROSECONDS_timestamp_unix_time_test.tsv"
},
Expand Down

This file was deleted.

This file was deleted.

Binary file not shown.

This file was deleted.

This file was deleted.

This file was deleted.

Binary file not shown.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Binary file not shown.
Binary file not shown.

This file was deleted.

This file was deleted.

This file was deleted.

Binary file not shown.
Binary file not shown.

This file was deleted.

This file was deleted.

This file was deleted.

Binary file not shown.
Binary file not shown.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
15 changes: 10 additions & 5 deletions ydb/tests/fq/s3/test_format_setting.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,8 @@ def test_timestamp_simple_iso(self, kikimr, s3, client, filename, type_format):
@pytest.mark.parametrize("filename, type_format", [
("timestamp/simple_iso/test.csv", "csv_with_names"),
("timestamp/simple_iso/test.tsv", "tsv_with_names"),
("timestamp/simple_iso/test.json", "json_each_row")
("timestamp/simple_iso/test.json", "json_each_row"),
("timestamp/simple_iso/test.parquet", "parquet")
])
def test_timestamp_simple_iso_insert(self, kikimr, s3, client, filename, type_format):
self.create_bucket_and_upload_file(filename, s3, kikimr)
Expand Down Expand Up @@ -383,7 +384,8 @@ def test_timestamp_simple_posix(self, kikimr, s3, client, filename, type_format)
@pytest.mark.parametrize("filename, type_format", [
("common/simple_posix/test.csv", "csv_with_names"),
("common/simple_posix/test.tsv", "tsv_with_names"),
("common/simple_posix/test.json", "json_each_row")
("common/simple_posix/test.json", "json_each_row"),
("common/simple_posix/test.parquet", "parquet")
])
def test_timestamp_simple_posix_insert(self, kikimr, s3, client, filename, type_format):
self.create_bucket_and_upload_file(filename, s3, kikimr)
Expand Down Expand Up @@ -432,7 +434,8 @@ def test_date_time_simple_iso(self, kikimr, s3, client, filename, type_format):
@pytest.mark.parametrize("filename, type_format", [
("date_time/simple_iso/test.csv", "csv_with_names"),
("date_time/simple_iso/test.tsv", "tsv_with_names"),
("date_time/simple_iso/test.json", "json_each_row")
("date_time/simple_iso/test.json", "json_each_row"),
("date_time/simple_iso/test.parquet", "parquet")
])
def test_date_time_simple_iso_insert(self, kikimr, s3, client, filename, type_format):
self.create_bucket_and_upload_file(filename, s3, kikimr)
Expand Down Expand Up @@ -507,7 +510,8 @@ def test_date_time_simple_posix_insert(self, kikimr, s3, client, filename, type_
@pytest.mark.parametrize("filename, type_format", [
("timestamp/unix_time/test.csv", "csv_with_names"),
("timestamp/unix_time/test.tsv", "tsv_with_names"),
("timestamp/unix_time/test.json", "json_each_row")
("timestamp/unix_time/test.json", "json_each_row"),
("timestamp/unix_time/test.parquet", "parquet")
])
def test_timestamp_unix_time_insert(self, kikimr, s3, client, filename, type_format, timestamp_format):
self.create_bucket_and_upload_file(filename, s3, kikimr)
Expand All @@ -531,7 +535,8 @@ def test_timestamp_unix_time_insert(self, kikimr, s3, client, filename, type_for
@pytest.mark.parametrize("filename, type_format", [
("common/simple_format/test.csv", "csv_with_names"),
("common/simple_format/test.tsv", "tsv_with_names"),
("common/simple_format/test.json", "json_each_row")
("common/simple_format/test.json", "json_each_row"),
("common/simple_format/test.parquet", "parquet")
])
def test_timestamp_simple_format_insert(self, kikimr, s3, client, filename, type_format):
self.create_bucket_and_upload_file(filename, s3, kikimr)
Expand Down