diff --git a/dbms/src/Flash/Coprocessor/DAGUtils.cpp b/dbms/src/Flash/Coprocessor/DAGUtils.cpp
index 1e779e2b56b..25f87563eee 100644
--- a/dbms/src/Flash/Coprocessor/DAGUtils.cpp
+++ b/dbms/src/Flash/Coprocessor/DAGUtils.cpp
@@ -472,7 +472,7 @@ const std::unordered_map<tipb::ScalarFuncSig, String> scalar_func_map({
     //{tipb::ScalarFuncSig::JsonReplaceSig, "cast"},
     //{tipb::ScalarFuncSig::JsonRemoveSig, "cast"},
     //{tipb::ScalarFuncSig::JsonMergeSig, "cast"},
-    //{tipb::ScalarFuncSig::JsonObjectSig, "cast"},
+    {tipb::ScalarFuncSig::JsonObjectSig, "json_object"},
     {tipb::ScalarFuncSig::JsonArraySig, "json_array"},
     {tipb::ScalarFuncSig::JsonValidJsonSig, "json_valid_json"},
     {tipb::ScalarFuncSig::JsonValidOthersSig, "json_valid_others"},
diff --git a/dbms/src/Functions/FunctionsJson.cpp b/dbms/src/Functions/FunctionsJson.cpp
index 15b12b3178c..fb1106e6256 100644
--- a/dbms/src/Functions/FunctionsJson.cpp
+++ b/dbms/src/Functions/FunctionsJson.cpp
@@ -24,6 +24,7 @@ void registerFunctionsJson(FunctionFactory & factory)
     factory.registerFunction();
     factory.registerFunction();
     factory.registerFunction();
+    factory.registerFunction<FunctionJsonObject>();
     factory.registerFunction();
     factory.registerFunction();
     factory.registerFunction();
diff --git a/dbms/src/Functions/FunctionsJson.h b/dbms/src/Functions/FunctionsJson.h
index 32db69dcb4e..f680e4be8b7 100644
--- a/dbms/src/Functions/FunctionsJson.h
+++ b/dbms/src/Functions/FunctionsJson.h
@@ -41,7 +41,9 @@
 #include
 #include
 
+#include
 #include
+#include
 #include
 #include
 #include
@@ -63,6 +65,8 @@ namespace DB
 
 namespace ErrorCodes
 {
+extern const int ARGUMENT_OUT_OF_BOUND;
+extern const int BAD_ARGUMENTS;
 extern const int ILLEGAL_COLUMN;
 extern const int UNKNOWN_TYPE;
 } // namespace ErrorCodes
@@ -976,6 +980,233 @@ class FunctionJsonArray : public IFunction
 };
 
 
+/// Implements `json_object(key1, value1, key2, value2, ...)`: builds one binary JSON object per row.
+///  - Keys are strings of at most 65535 bytes. A NULL key raises BAD_ARGUMENTS.
+///  - Values are serialized JsonBinary: the first byte is the type code, the rest is the payload.
+///    A NULL value is stored as the JSON `null` literal.
+///  - Keys are written in ascending order; when a key repeats, the last pair in argument order wins.
+class FunctionJsonObject : public IFunction
+{
+public:
+    static constexpr auto name = "json_object";
+    static FunctionPtr create(const Context &) { return std::make_shared<FunctionJsonObject>(); }
+
+    String getName() const override { return name; }
+
+    size_t getNumberOfArguments() const override { return 0; }
+
+    bool isVariadic() const override { return true; }
+
+    bool useDefaultImplementationForNulls() const override { return false; }
+    bool useDefaultImplementationForConstants() const override { return true; }
+    /// Requires an even number of arguments, rejects only-NULL key arguments, and requires every
+    /// argument that is not only-NULL to be String/FixedString once Nullable is removed.
+    DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
+    {
+        if (unlikely(arguments.size() % 2 != 0))
+        {
+            throw Exception(
+                fmt::format("Incorrect parameter count in the call to native function '{}'", getName()),
+                ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
+        }
+        for (const auto arg_idx : ext::range(0, arguments.size()))
+        {
+            if (arg_idx % 2 == 0 && arguments[arg_idx]->onlyNull())
+                throw Exception("JSON documents may not contain NULL member names.", ErrorCodes::BAD_ARGUMENTS);
+
+            if (!arguments[arg_idx]->onlyNull())
+            {
+                const auto * arg = removeNullable(arguments[arg_idx]).get();
+                if (!arg->isStringOrFixedString())
+                    throw Exception(
+                        ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
+                        "Illegal type {} of argument {} of function {}",
+                        arg->getName(),
+                        arg_idx + 1,
+                        getName());
+            }
+        }
+        return std::make_shared<DataTypeString>();
+    }
+
+    /// Builds the result column. Without arguments the result is a constant column holding the
+    /// empty object (zero elements, total size 8); otherwise each row is assembled by doExecuteImpl.
+    void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result) const override
+    {
+        if (arguments.empty())
+        {
+            // clang-format off
+            const UInt8 empty_object_json_value[] = {
+                JsonBinary::TYPE_CODE_OBJECT, // object_type
+                0x0, 0x0, 0x0, 0x0, // element_count
+                0x8, 0x0, 0x0, 0x0}; // total_size
+            // clang-format on
+            auto empty_object_json = ColumnString::create();
+            empty_object_json->insertData(
+                reinterpret_cast<const char *>(empty_object_json_value),
+                sizeof(empty_object_json_value) / sizeof(UInt8));
+            block.getByPosition(result).column = ColumnConst::create(std::move(empty_object_json), block.rows());
+            return;
+        }
+
+        auto nested_block = createBlockWithNestedColumns(block, arguments);
+        StringSources sources;
+        // An only-NULL argument has no string source; it is represented by nullptr.
+        bool has_only_null_argument = false;
+        for (auto column_number : arguments)
+        {
+            const bool is_only_null = block.getByPosition(column_number).column->onlyNull();
+            has_only_null_argument = has_only_null_argument || is_only_null;
+            sources.push_back(
+                is_only_null
+                    ? nullptr
+                    : createDynamicStringSource(*nested_block.getByPosition(column_number).column));
+        }
+
+        auto rows = block.rows();
+        auto col_to = ColumnString::create();
+        auto & data_to = col_to->getChars();
+        auto & offsets_to = col_to->getOffsets();
+        offsets_to.resize(rows);
+
+        std::vector<const NullMap *> nullmaps;
+        nullmaps.reserve(sources.size());
+        bool is_input_nullable = false;
+        for (auto column_number : arguments)
+        {
+            const auto & col = block.getByPosition(column_number).column;
+            if (col->isColumnNullable())
+            {
+                const auto & column_nullable = static_cast<const ColumnNullable &>(*col);
+                nullmaps.push_back(&(column_nullable.getNullMapData()));
+                is_input_nullable = true;
+            }
+            else
+            {
+                nullmaps.push_back(nullptr);
+            }
+        }
+
+        // Only the null-aware instantiation tolerates a nullptr source, so it must also be chosen
+        // when an only-NULL argument is present but no argument is a ColumnNullable.
+        // NOTE(review): whether that combination can reach this point depends on how constant
+        // NULLs are represented by the caller - confirm; the extra condition is harmless otherwise.
+        if (is_input_nullable || has_only_null_argument)
+            doExecuteImpl<true>(sources, rows, data_to, offsets_to, nullmaps);
+        else
+            doExecuteImpl<false>(sources, rows, data_to, offsets_to, nullmaps);
+
+        block.getByPosition(result).column = std::move(col_to);
+    }
+
+private:
+    /// Assembles one object per row into `data_to` / `offsets_to`.
+    /// `sources[k]` is nullptr for an only-NULL argument and `nullmaps[k]` is nullptr for an
+    /// argument that is not a ColumnNullable. When `is_input_nullable` is false every source is
+    /// assumed to be present and the per-row NULL checks are compiled out.
+    template <bool is_input_nullable>
+    static void doExecuteImpl(
+        StringSources & sources,
+        size_t rows,
+        ColumnString::Chars_t & data_to,
+        ColumnString::Offsets & offsets_to,
+        const std::vector<const NullMap *> & nullmaps)
+    {
+        struct JsonObjectEntry
+        {
+            StringRef key;
+            JsonBinary value;
+            size_t input_order;
+        };
+
+        const size_t pair_count = sources.size() / 2;
+        size_t reserve_size = rows * (1 + pair_count * 16);
+        for (const auto & source : sources)
+            reserve_size += source ? source->getSizeForReserve() : rows;
+        JsonBinary::JsonBinaryWriteBuffer write_buffer(data_to, reserve_size);
+
+        std::vector<JsonObjectEntry> entries;
+        std::vector<StringRef> keys;
+        std::vector<JsonBinary> values;
+        entries.reserve(pair_count);
+        keys.reserve(pair_count);
+        values.reserve(pair_count);
+
+        for (size_t i = 0; i < rows; ++i)
+        {
+            entries.clear();
+            for (size_t col = 0; col < sources.size(); col += 2)
+            {
+                if constexpr (is_input_nullable)
+                {
+                    const auto * key_nullmap = nullmaps[col];
+                    if (!sources[col] || (key_nullmap && (*key_nullmap)[i]))
+                        throw Exception("JSON documents may not contain NULL member names.", ErrorCodes::BAD_ARGUMENTS);
+                }
+
+                assert(sources[col]);
+                const auto & key_from = sources[col]->getWhole();
+                if (unlikely(key_from.size > std::numeric_limits<UInt16>::max()))
+                    throw Exception(
+                        "TiDB/TiFlash does not yet support JSON objects with the key length >= 65536",
+                        ErrorCodes::ARGUMENT_OUT_OF_BOUND);
+                StringRef key{key_from.data, key_from.size};
+
+                // Default to the JSON null literal; replaced below when the value is not NULL.
+                JsonBinary value(JsonBinary::TYPE_CODE_LITERAL, StringRef(&JsonBinary::LITERAL_NIL, 1));
+                if constexpr (is_input_nullable)
+                {
+                    const auto * value_nullmap = nullmaps[col + 1];
+                    if (sources[col + 1] && !(value_nullmap && (*value_nullmap)[i]))
+                    {
+                        const auto & data_from = sources[col + 1]->getWhole();
+                        value = JsonBinary(data_from.data[0], StringRef(&data_from.data[1], data_from.size - 1));
+                    }
+                }
+                else
+                {
+                    assert(sources[col + 1]);
+                    const auto & data_from = sources[col + 1]->getWhole();
+                    value = JsonBinary(data_from.data[0], StringRef(&data_from.data[1], data_from.size - 1));
+                }
+
+                entries.push_back({key, value, col >> 1});
+            }
+
+            // Order by key; equal keys keep argument order so the last occurrence can be picked below.
+            std::sort(entries.begin(), entries.end(), [](const auto & lhs, const auto & rhs) {
+                return lhs.key == rhs.key ? lhs.input_order < rhs.input_order : lhs.key < rhs.key;
+            });
+
+            keys.clear();
+            values.clear();
+            for (size_t entry_idx = 0; entry_idx < entries.size();)
+            {
+                // Skip to the last entry of a run of equal keys: the last pair in argument order wins.
+                size_t last_idx = entry_idx;
+                while (last_idx + 1 < entries.size() && entries[last_idx + 1].key == entries[entry_idx].key)
+                    ++last_idx;
+
+                keys.push_back(entries[last_idx].key);
+                values.push_back(entries[last_idx].value);
+                entry_idx = last_idx + 1;
+            }
+
+            JsonBinary::buildBinaryJsonObjectInBuffer(keys, values, write_buffer);
+            // Terminate the row with a zero byte before recording its end offset.
+            writeChar(0, write_buffer);
+            offsets_to[i] = write_buffer.count();
+
+            for (const auto & source : sources)
+            {
+                if (source)
+                    source->next();
+            }
+        }
+    }
+};
+
+
 class FunctionCastJsonAsJson : public IFunction
 {
 public:
diff --git a/dbms/src/Functions/tests/gtest_json_object.cpp b/dbms/src/Functions/tests/gtest_json_object.cpp
new file mode 100644
index 00000000000..2e56b109588
--- /dev/null
+++ b/dbms/src/Functions/tests/gtest_json_object.cpp
@@ -0,0 +1,131 @@
+// Copyright 2026 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include
+#include
+#include
+#include
+
+#include
+
+namespace DB::ErrorCodes
+{
+extern const int ARGUMENT_OUT_OF_BOUND;
+extern const int BAD_ARGUMENTS;
+} // namespace DB::ErrorCodes
+
+namespace DB::tests
+{
+class TestJsonObject : public DB::tests::FunctionTest
+{
+public:
+    ColumnWithTypeAndName castStringToJson(const ColumnWithTypeAndName & column)
+    {
+        assert(removeNullable(column.type)->isString());
+        ColumnsWithTypeAndName inputs{column};
+        return executeFunction("cast_string_as_json", inputs, nullptr, true);
+    }
+
+    ColumnWithTypeAndName executeFunctionWithCast(
+        const ColumnNumbers & argument_column_numbers,
+        const ColumnsWithTypeAndName & columns)
+    {
+        auto json_column = executeFunction("json_object", argument_column_numbers, columns);
+        tipb::FieldType field_type;
+        field_type.set_flen(-1);
+        field_type.set_collate(TiDB::ITiDBCollator::BINARY);
+        field_type.set_tp(TiDB::TypeString);
+        return executeCastJsonAsStringFunction(json_column, field_type);
+    }
+};
+
+template <typename Fn>
+void assertThrowsCode(Fn && fn, int expected_code)
+{
+    try
+    {
+        fn();
+        FAIL() << "Expected DB::Exception to be thrown";
+    }
+    catch (const Exception & e)
+    {
+        ASSERT_EQ(expected_code, e.code()) << e.displayText();
+    }
+}
+
+TEST_F(TestJsonObject, TestBasicSemantics)
+try
+{
+    constexpr size_t rows_count = 2;
+
+    {
+        ColumnsWithTypeAndName inputs{createColumn<String>({"placeholder", "placeholder"})};
+        auto res = executeFunctionWithCast({}, inputs);
+        ASSERT_COLUMN_EQ(createConstColumn<Nullable<String>>(rows_count, "{}"), res);
+    }
+
+    {
+        ColumnsWithTypeAndName inputs{
+            createColumn<String>({"b", "b"}),
+            castStringToJson(createColumn<String>({"1", "1"})),
+            createColumn<String>({"a", "a"}),
+            castStringToJson(createColumn<Nullable<String>>({{}, "\"x\""})),
+        };
+        auto res = executeFunctionWithCast({0, 1, 2, 3}, inputs);
+        auto expect = createColumn<Nullable<String>>({R"({"a": null, "b": 1})", R"({"a": "x", "b": 1})"});
+        ASSERT_COLUMN_EQ(expect, res);
+    }
+
+    {
+        ColumnsWithTypeAndName inputs{
+            createConstColumn<String>(rows_count, "dup"),
+            castStringToJson(createConstColumn<String>(rows_count, "1")),
+            createConstColumn<String>(rows_count, "dup"),
+            castStringToJson(createColumn<String>({"2", "3"})),
+        };
+        auto res = executeFunctionWithCast({0, 1, 2, 3}, inputs);
+        auto expect = createColumn<Nullable<String>>({R"({"dup": 2})", R"({"dup": 3})"});
+        ASSERT_COLUMN_EQ(expect, res);
+    }
+}
+CATCH
+
+TEST_F(TestJsonObject, TestErrors)
+try
+{
+    ASSERT_THROW(executeFunction("json_object", {createColumn<String>({"a"})}), Exception);
+
+    auto value = castStringToJson(createColumn<String>({"1"}));
+    assertThrowsCode(
+        [&] {
+            executeFunction("json_object", {createOnlyNullColumn(1), value});
+        },
+        ErrorCodes::BAD_ARGUMENTS);
+
+    assertThrowsCode(
+        [&] {
+            executeFunction("json_object", {createColumn<Nullable<String>>({{}}), value});
+        },
+        ErrorCodes::BAD_ARGUMENTS);
+
+    String too_long_key(std::numeric_limits<UInt16>::max() + 1, 'a');
+    assertThrowsCode(
+        [&] {
+            executeFunction("json_object", {createColumn<String>({too_long_key}), value});
+        },
+        ErrorCodes::ARGUMENT_OUT_OF_BOUND);
+}
+CATCH
+
+} // namespace DB::tests
diff --git a/dbms/src/TiDB/Decode/JsonBinary.cpp b/dbms/src/TiDB/Decode/JsonBinary.cpp
index 355aa487b66..5a8ded089f3 100644
--- a/dbms/src/TiDB/Decode/JsonBinary.cpp
+++ b/dbms/src/TiDB/Decode/JsonBinary.cpp
@@ -34,6 +34,7 @@ namespace DB
 {
 namespace ErrorCodes
 {
+extern const int ARGUMENT_OUT_OF_BOUND;
 extern const int LOGICAL_ERROR;
 extern const int UNKNOWN_TYPE;
 } // namespace ErrorCodes
@@ -264,7 +265,9 @@ inline UInt64 appendValueOfSIMDJsonElem(
             encodeNumeric(write_buffer, data_offset);
             data_offset += key.size();
             if (unlikely(key.size() > std::numeric_limits<UInt16>::max()))
-                throw Exception("TiDB/TiFlash does not yet support JSON objects with the key length >= 65536");
+                throw Exception(
+                    "TiDB/TiFlash does not yet support JSON objects with the key length >= 65536",
+                    ErrorCodes::ARGUMENT_OUT_OF_BOUND);
             UInt16 key_len = key.size();
             encodeNumeric(write_buffer, key_len);
         }
@@ -1098,6 +1101,92 @@ void JsonBinary::buildBinaryJsonArrayInBuffer(
     buildBinaryJsonElementsInBuffer(json_binary_vec, write_buffer);
 }
 
+/// Appends one JSON object to `write_buffer`: the object type code followed by the payload.
+///
+/// Payload layout (offsets are relative to the first byte after the type code):
+///   element_count | total_size | key entries | value entries | key bytes | value bytes
+/// A key entry holds the key's offset and its 16-bit length. A value entry holds the value's type
+/// code followed by either the inlined literal plus zero padding, or the value's offset.
+///
+/// `keys` and `values` must have equal length and are written in the given order; this function
+/// does not sort or de-duplicate keys. Throws ARGUMENT_OUT_OF_BOUND for keys longer than 65535 bytes.
+void JsonBinary::buildBinaryJsonObjectInBuffer(
+    const std::vector<StringRef> & keys,
+    const std::vector<JsonBinary> & values,
+    JsonBinaryWriteBuffer & write_buffer)
+{
+    RUNTIME_CHECK(keys.size() == values.size());
+
+    write_buffer.write(TYPE_CODE_OBJECT);
+
+    UInt32 buffer_start_pos = write_buffer.offset();
+
+    UInt32 element_count = keys.size();
+    encodeNumeric(write_buffer, element_count);
+
+    // Reserve 4 bytes for total_size; filled in at the end once the size is known.
+    auto total_size_pos = write_buffer.offset();
+    write_buffer.advance(4);
+
+    // Key entries; key bytes are laid out back to back right after all key and value entries.
+    UInt32 data_offset_start = HEADER_SIZE + element_count * (KEY_ENTRY_SIZE + VALUE_ENTRY_SIZE);
+    UInt32 data_offset = data_offset_start;
+    for (const auto & key : keys)
+    {
+        encodeNumeric(write_buffer, data_offset);
+        if (unlikely(key.size > std::numeric_limits<UInt16>::max()))
+            throw Exception(
+                "TiDB/TiFlash does not yet support JSON objects with the key length >= 65536",
+                ErrorCodes::ARGUMENT_OUT_OF_BOUND);
+        UInt16 key_len = key.size;
+        encodeNumeric(write_buffer, key_len);
+        data_offset += key.size;
+    }
+
+    UInt32 value_entry_start_pos = write_buffer.offset();
+
+    // Write the key bytes into the data area, then return to fill in the value entries.
+    write_buffer.setOffset(buffer_start_pos + data_offset_start);
+    for (const auto & key : keys)
+        write_buffer.write(key.data, key.size);
+
+    write_buffer.setOffset(value_entry_start_pos);
+    UInt64 max_child_depth = 0;
+    for (const auto & value : values)
+    {
+        write_buffer.write(value.type);
+        if (value.type == TYPE_CODE_LITERAL)
+        {
+            // Literal is inlined in the entry; pad the remaining 3 bytes with zeros.
+            write_buffer.write(value.data.data[0]);
+            write_buffer.write(0);
+            write_buffer.write(0);
+            write_buffer.write(0);
+        }
+        else
+        {
+            // Append the value after the data written so far and record its offset in the entry.
+            encodeNumeric(write_buffer, data_offset);
+            auto tmp_entry_pos = write_buffer.offset();
+
+            write_buffer.setOffset(buffer_start_pos + data_offset);
+            write_buffer.write(value.data.data, value.data.size);
+            data_offset = write_buffer.offset() - buffer_start_pos;
+
+            write_buffer.setOffset(tmp_entry_pos);
+        }
+        max_child_depth = std::max(max_child_depth, value.getDepth());
+    }
+
+    UInt64 depth = max_child_depth + 1;
+    JsonBinary::assertJsonDepth(depth);
+
+    UInt32 total_size = data_offset;
+    write_buffer.setOffset(total_size_pos);
+    encodeNumeric(write_buffer, total_size);
+    write_buffer.setOffset(buffer_start_pos + data_offset);
+}
+
 void JsonBinary::buildKeyArrayInBuffer(const std::vector<StringRef> & keys, JsonBinaryWriteBuffer & write_buffer)
 {
     write_buffer.write(TYPE_CODE_ARRAY);
diff --git a/dbms/src/TiDB/Decode/JsonBinary.h b/dbms/src/TiDB/Decode/JsonBinary.h
index c65a68723ec..d13ca157db6 100644
--- a/dbms/src/TiDB/Decode/JsonBinary.h
+++ b/dbms/src/TiDB/Decode/JsonBinary.h
@@ -173,6 +173,10 @@ class JsonBinary
     static void buildBinaryJsonArrayInBuffer(
         const std::vector<JsonBinary> & json_binary_vec,
         JsonBinaryWriteBuffer & write_buffer);
+    static void buildBinaryJsonObjectInBuffer(
+        const std::vector<StringRef> & keys,
+        const std::vector<JsonBinary> & values,
+        JsonBinaryWriteBuffer & write_buffer);
     static void buildKeyArrayInBuffer(const std::vector<StringRef> & keys, JsonBinaryWriteBuffer & write_buffer);
     static void appendNumber(JsonBinaryWriteBuffer & write_buffer, bool value);
 
@@ -327,4 +331,4 @@ void JsonBinary::unquoteJsonStringInBuffer(const StringRef & ref, WriteBuffer &
         }
     }
 }
-} // namespace DB
\ No newline at end of file
+} // namespace DB
diff --git a/tests/fullstack-test/expr/json_object.test b/tests/fullstack-test/expr/json_object.test
new file mode 100644
index 00000000000..a7c1092b0be
--- /dev/null
+++ b/tests/fullstack-test/expr/json_object.test
@@ -0,0 +1,74 @@
+# Copyright 2023 PingCAP, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and +# limitations under the License. + +mysql> drop table if exists test.t_json_object; +mysql> create table test.t_json_object(id int, k1 varchar(20), k2 varchar(20), v_int int, v_str varchar(20), v_json json); +mysql> alter table test.t_json_object set tiflash replica 1; +mysql> insert into test.t_json_object values + (1, 'b', 'a', 1, 'x', '{\"nested\":1}'), + (2, 'dup', 'dup', 2, 'last', '[1,2]'), + (3, 'c', 'b', null, null, '[]'), + (4, null, 'a', 4, 'boom', '{}'); #NO_UNESCAPE + +func> wait_table test t_json_object + +# TODO: re-enable this explain check after the TiDB-side explain change is merged. +# mysql> set tidb_allow_mpp=1; set tidb_enforce_mpp=1; set tidb_isolation_read_engines='tiflash'; desc format='brief' select id, json_object(k1, v_int, k2, v_str) as res from test.t_json_object where id in (1, 2) order by id; +# {#REGEXP}id\s+estRows\s+task\s+access object\s+operator info +# {#REGEXP}Sort\s+\d+\.\d+\s+root\s+.*test\.t_json_object\.id +# {#REGEXP}└─TableReader\s+\d+\.\d+\s+root\s+.*MppVersion: 2, data:ExchangeSender +# {#REGEXP}.*ExchangeSender\s+\d+\.\d+\s+mpp\[tiflash\]\s+.*ExchangeType: PassThrough +# {#REGEXP}.*Projection\s+\d+\.\d+\s+mpp\[tiflash\]\s+.*json_object\(test\.t_json_object\.k1, cast\(test\.t_json_object\.v_int, json BINARY\), test\.t_json_object\.k2, cast\(test\.t_json_object\.v_str, json BINARY\)\)->Column#\d+ +# {#REGEXP}.*TableFullScan\s+\d+\.\d+\s+mpp\[tiflash\]\s+table:t_json_object\s+pushed down filter:in\(test\.t_json_object\.id, 1, 2\), keep order:false, stats:pseudo + +# empty object +mysql> set tidb_allow_mpp=1; set tidb_enforce_mpp=1; set tidb_isolation_read_engines='tiflash'; select json_object() from test.t_json_object where id = 1; ++---------------+ +| json_object() | ++---------------+ +| {} | ++---------------+ + +# mixed value types and key sorting +mysql> set tidb_allow_mpp=1; set tidb_enforce_mpp=1; set tidb_isolation_read_engines='tiflash'; 
select id, json_object('b', v_int, 'a', v_str, 'c', v_json) as res from test.t_json_object where id in (1, 3) order by id; ++----+--------------------------------------+ +| id | res | ++----+--------------------------------------+ +| 1 | {"a": "x", "b": 1, "c": {"nested": 1}} | +| 3 | {"a": null, "b": null, "c": []} | ++----+--------------------------------------+ + +# dynamic key columns and duplicate keys +mysql> set tidb_allow_mpp=1; set tidb_enforce_mpp=1; set tidb_isolation_read_engines='tiflash'; select id, json_object(k1, v_int, k2, v_str) as res from test.t_json_object where id in (1, 2) order by id; ++----+--------------------+ +| id | res | ++----+--------------------+ +| 1 | {"a": "x", "b": 1} | +| 2 | {"dup": "last"} | ++----+--------------------+ + +# SQL NULL value becomes JSON null +mysql> set tidb_allow_mpp=1; set tidb_enforce_mpp=1; set tidb_isolation_read_engines='tiflash'; select json_object('obj', v_json, 'nil', null) as res from test.t_json_object where id = 1; ++-----------------------------------+ +| res | ++-----------------------------------+ +| {"nil": null, "obj": {"nested": 1}} | ++-----------------------------------+ + +# NULL key should fail at execution time +mysql> set tidb_allow_mpp=1; set tidb_enforce_mpp=1; set tidb_isolation_read_engines='tiflash'; select json_object(k1, v_int, k2, v_str) from test.t_json_object where id = 4; +{#REGEXP}.*NULL member names.* + +# Clean up. +mysql> drop table if exists test.t_json_object;