Skip to content

Commit 7ed98e4

Browse files
authored
Pass pg extensions to dq TaskCommandExecutor (#7548)
1 parent 0a122bc commit 7ed98e4

7 files changed

Lines changed: 79 additions & 1 deletion

File tree

ydb/library/yql/minikql/mkql_node.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -658,7 +658,8 @@ static const THashSet<TStringBuf> PG_SUPPORTED_PRESORT = {
658658
};
659659

660660
TPgType::TPgType(ui32 typeId, const TTypeEnvironment& env)
661-
: TType(EKind::Pg, env.GetTypeOfTypeLazy(), PG_SUPPORTED_PRESORT.contains(NYql::NPg::LookupType(typeId).Name))
661+
: TType(EKind::Pg, env.GetTypeOfTypeLazy(),
662+
NYql::NPg::HasType(typeId) && PG_SUPPORTED_PRESORT.contains(NYql::NPg::LookupType(typeId).Name))
662663
, TypeId(typeId)
663664
{
664665
}

ydb/library/yql/parser/pg_wrapper/comp_factory.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3514,6 +3514,10 @@ TString PgValueToNativeText(const NUdf::TUnboxedValuePod& value, ui32 pgTypeId)
35143514
template <typename F>
35153515
void PgValueToNativeBinaryImpl(const NUdf::TUnboxedValuePod& value, ui32 pgTypeId, bool needCanonizeFp, F f) {
35163516
YQL_ENSURE(value); // null could not be represented as binary
3517+
if (!NPg::HasType(pgTypeId)) {
3518+
f(TStringBuf(value.AsStringRef()));
3519+
return;
3520+
}
35173521

35183522
const bool oldNeedCanonizeFp = NeedCanonizeFp;
35193523
NeedCanonizeFp = needCanonizeFp;
@@ -3748,6 +3752,10 @@ NUdf::TUnboxedValue ReadYsonValueInTableFormatPg(TPgType* type, char cmd, TInput
37483752
}
37493753

37503754
NUdf::TUnboxedValue PgValueFromNativeBinary(const TStringBuf binary, ui32 pgTypeId) {
3755+
if (!NPg::HasType(pgTypeId)) {
3756+
return MakeString(binary);
3757+
}
3758+
37513759
TPAllocScope call;
37523760
StringInfoData stringInfo;
37533761
stringInfo.data = (char*)binary.Data();
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
#include <ydb/library/yql/minikql/mkql_alloc.h>
2+
#include <ydb/library/yql/minikql/mkql_node.h>
3+
#include <ydb/library/yql/minikql/mkql_string_util.h>
4+
#include <ydb/library/yql/minikql/mkql_program_builder.h>
5+
#include <ydb/library/yql/minikql/mkql_function_registry.h>
6+
#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
7+
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
8+
#include <ydb/library/yql/minikql/computation/mkql_computation_node_pack.h>
9+
10+
#include <library/cpp/testing/unittest/registar.h>
11+
12+
namespace NYql {
13+
14+
Y_UNIT_TEST_SUITE(TPGPackTests) {
15+
Y_UNIT_TEST(UnknownTypeAsString) {
16+
using namespace NKikimr::NMiniKQL;
17+
TScopedAlloc alloc(__LOCATION__);
18+
TTypeEnvironment env(alloc);
19+
TIntrusivePtr<IFunctionRegistry> functionRegistry(CreateFunctionRegistry(CreateBuiltinRegistry()));
20+
TProgramBuilder pgmBuilder(env, *functionRegistry);
21+
TMemoryUsageInfo memInfo("Memory");
22+
THolderFactory holderFactory(alloc.Ref(), memInfo, functionRegistry.Get());
23+
24+
auto pgType = pgmBuilder.NewPgType(0xffffffff);
25+
TValuePacker pgPacker(false, pgType);
26+
27+
NUdf::TUnboxedValue s = MakeString(NUdf::TStringRef::Of("foo"));
28+
auto p = pgPacker.Pack(s);
29+
auto u = pgPacker.Unpack(p, holderFactory);
30+
UNIT_ASSERT_VALUES_EQUAL(TStringBuf(u.AsStringRef()), "foo");
31+
}
32+
}
33+
34+
}

ydb/library/yql/parser/pg_wrapper/ut/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ SRCS(
1010
codegen_ut.cpp
1111
error_ut.cpp
1212
memory_ut.cpp
13+
pack_ut.cpp
1314
parser_ut.cpp
1415
proc_ut.cpp
1516
sort_ut.cpp

ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
#include <util/generic/size_literals.h>
5555
#include <util/stream/file.h>
5656
#include <util/string/builder.h>
57+
#include <util/folder/dirut.h>
5758

5859
#include <memory>
5960
#include <vector>
@@ -540,6 +541,23 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
540541
TTypeEnvironment& typeEnv,
541542
TUserDataTable& files) const
542543
{
544+
if (!localRun) {
545+
for (const auto& file : files) {
546+
const auto& fileName = file.first.Alias();
547+
const auto& block = file.second;
548+
if (fileName == NCommon::PgCatalogFileName || block.Usage.Test(EUserDataBlockUsage::PgExt)) {
549+
auto f = IDqGateway::TFileResource();
550+
auto filePath = block.FrozenFile->GetPath().GetPath();
551+
f.SetLocalPath(RealPath(filePath));
552+
f.SetName(fileName);
553+
f.SetObjectId(block.FrozenFile->GetMd5());
554+
f.SetObjectType(IDqGateway::TFileResource::EUSER_FILE);
555+
f.SetSize(block.FrozenFile->GetSize());
556+
uploadList->emplace(f);
557+
}
558+
}
559+
}
560+
543561
if (!State->Settings->_SkipRevisionCheck.Get().GetOrElse(false)) {
544562
if (State->VanillaJobPath.empty()) {
545563
auto f = IDqGateway::TFileResource();

ydb/library/yql/providers/dq/runtime/task_command_executor.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h>
44
#include <ydb/library/yql/providers/dq/counters/task_counters.h>
55
#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>
6+
#include <ydb/library/yql/providers/common/provider/yql_provider.h>
67
#include <ydb/library/yql/providers/dq/api/protos/dqs.pb.h>
78
#include <ydb/library/yql/providers/dq/api/protos/task_command_executor.pb.h>
89
#include <ydb/library/yql/utils/backtrace/backtrace.h>
@@ -19,6 +20,9 @@
1920
#include <ydb/library/yql/utils/log/log.h>
2021
#include <ydb/library/yql/utils/yql_panic.h>
2122

23+
#include <ydb/library/yql/parser/pg_wrapper/interface/context.h>
24+
#include <ydb/library/yql/parser/pg_catalog/catalog.h>
25+
2226
#include <util/system/thread.h>
2327
#include <util/system/fs.h>
2428
#include <util/system/env.h>
@@ -678,6 +682,15 @@ class TTaskCommandExecutor {
678682
Y_ABORT_UNLESS(workingDirectory);
679683
NFs::SetCurrentWorkingDirectory(workingDirectory);
680684

685+
QueryStat.Measure<void>("LoadPgExtensions", [&]()
686+
{
687+
if (TFsPath(NCommon::PgCatalogFileName).Exists()) {
688+
TFileInput file(TString{NCommon::PgCatalogFileName});
689+
NPg::ImportExtensions(file.ReadAll(), false,
690+
NKikimr::NMiniKQL::CreateExtensionLoader().get());
691+
}
692+
});
693+
681694
THashMap<TString, TString> modulesMapping;
682695

683696
QueryStat.Measure<void>("LoadUdfs", [&]()

ydb/library/yql/providers/dq/runtime/ya.make

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,13 @@ PEERDIR(
1010
ydb/library/yql/dq/integration/transform
1111
ydb/library/yql/dq/runtime
1212
ydb/library/yql/providers/common/proto
13+
ydb/library/yql/providers/common/provider
1314
ydb/library/yql/providers/dq/api/protos
1415
ydb/library/yql/providers/dq/common
1516
ydb/library/yql/providers/dq/counters
1617
ydb/library/yql/providers/dq/task_runner
18+
ydb/library/yql/parser/pg_wrapper/interface
19+
ydb/library/yql/parser/pg_catalog
1720
)
1821

1922
YQL_LAST_ABI_VERSION()

0 commit comments

Comments
 (0)