@@ -85,387 +85,4 @@ TYtPhysicalOptProposalTransformer::TYtPhysicalOptProposalTransformer(TYtState::T
8585#undef HNDL
8686}
8787
88- TMaybe<bool > TYtPhysicalOptProposalTransformer::CanFuseLambdas (const TCoLambda& innerLambda, const TCoLambda& outerLambda, TExprContext& ctx) const {
89- auto maxJobMemoryLimit = State_->Configuration ->MaxExtraJobMemoryToFuseOperations .Get ();
90- auto maxOperationFiles = State_->Configuration ->MaxOperationFiles .Get ().GetOrElse (DEFAULT_MAX_OPERATION_FILES);
91- TMap<TStringBuf, ui64> memUsage;
92-
93- TExprNode::TPtr updatedBody = innerLambda.Body ().Ptr ();
94- if (maxJobMemoryLimit) {
95- auto status = UpdateTableContentMemoryUsage (innerLambda.Body ().Ptr (), updatedBody, State_, ctx);
96- if (status.Level != TStatus::Ok) {
97- return {};
98- }
99- }
100- size_t innerFiles = 1 ; // jobstate. Take into account only once
101- ScanResourceUsage (*updatedBody, *State_->Configuration , State_->Types , maxJobMemoryLimit ? &memUsage : nullptr , nullptr , &innerFiles);
102-
103- auto prevMemory = Accumulate (memUsage.begin (), memUsage.end (), 0ul ,
104- [](ui64 sum, const std::pair<const TStringBuf, ui64>& val) { return sum + val.second ; });
105-
106- updatedBody = outerLambda.Body ().Ptr ();
107- if (maxJobMemoryLimit) {
108- auto status = UpdateTableContentMemoryUsage (outerLambda.Body ().Ptr (), updatedBody, State_, ctx);
109- if (status.Level != TStatus::Ok) {
110- return {};
111- }
112- }
113- size_t outerFiles = 0 ;
114- ScanResourceUsage (*updatedBody, *State_->Configuration , State_->Types , maxJobMemoryLimit ? &memUsage : nullptr , nullptr , &outerFiles);
115-
116- auto currMemory = Accumulate (memUsage.begin (), memUsage.end (), 0ul ,
117- [](ui64 sum, const std::pair<const TStringBuf, ui64>& val) { return sum + val.second ; });
118-
119- if (maxJobMemoryLimit && currMemory != prevMemory && currMemory > *maxJobMemoryLimit) {
120- YQL_CLOG (DEBUG, ProviderYt) << " Memory usage: innerLambda=" << prevMemory
121- << " , joinedLambda=" << currMemory << " , MaxJobMemoryLimit=" << *maxJobMemoryLimit;
122- return false ;
123- }
124- if (innerFiles + outerFiles > maxOperationFiles) {
125- YQL_CLOG (DEBUG, ProviderYt) << " Files usage: innerLambda=" << innerFiles
126- << " , outerLambda=" << outerFiles << " , MaxOperationFiles=" << maxOperationFiles;
127- return false ;
128- }
129-
130- if (auto maxReplcationFactor = State_->Configuration ->MaxReplicationFactorToFuseOperations .Get ()) {
131- double replicationFactor1 = NCommon::GetDataReplicationFactor (innerLambda.Ref (), ctx);
132- double replicationFactor2 = NCommon::GetDataReplicationFactor (outerLambda.Ref (), ctx);
133- YQL_CLOG (DEBUG, ProviderYt) << " Replication factors: innerLambda=" << replicationFactor1
134- << " , outerLambda=" << replicationFactor2 << " , MaxReplicationFactorToFuseOperations=" << *maxReplcationFactor;
135-
136- if (replicationFactor1 > 1.0 && replicationFactor2 > 1.0 && replicationFactor1 * replicationFactor2 > *maxReplcationFactor) {
137- return false ;
138- }
139- }
140- return true ;
141- }
142-
143- template <bool WithList>
144- TCoLambda TYtPhysicalOptProposalTransformer::MakeJobLambda (TCoLambda lambda, bool useFlow, TExprContext& ctx) const {
145- if (useFlow) {
146- if constexpr (WithList) {
147- return Build<TCoLambda>(ctx, lambda.Pos ())
148- .Args ({" flow" })
149- .Body <TCoToFlow>()
150- .Input <TExprApplier>()
151- .Apply (lambda)
152- .With <TCoForwardList>(0 )
153- .Stream (" flow" )
154- .Build ()
155- .Build ()
156- .Build ()
157- .Done ();
158- } else {
159- return Build<TCoLambda>(ctx, lambda.Pos ())
160- .Args ({" flow" })
161- .Body <TCoToFlow>()
162- .Input <TExprApplier>()
163- .Apply (lambda)
164- .With <TCoFromFlow>(0 )
165- .Input (" flow" )
166- .Build ()
167- .Build ()
168- .Build ()
169- .Done ();
170- }
171- } else {
172- if constexpr (WithList) {
173- return Build<TCoLambda>(ctx, lambda.Pos ())
174- .Args ({" stream" })
175- .Body <TCoToStream>()
176- .Input <TExprApplier>()
177- .Apply (lambda)
178- .With <TCoForwardList>(0 )
179- .Stream (" stream" )
180- .Build ()
181- .Build ()
182- .Build ()
183- .Done ();
184- } else {
185- return Build<TCoLambda>(ctx, lambda.Pos ())
186- .Args ({" stream" })
187- .Body <TCoToStream>()
188- .Input <TExprApplier>()
189- .Apply (lambda)
190- .With (0 , " stream" )
191- .Build ()
192- .Build ()
193- .Done ();
194- }
195- }
196- }
197-
198- template <bool IsTop>
199- TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::Sort (TExprBase node, TExprContext& ctx) const {
200- if (State_->Types ->EvaluationInProgress || State_->PassiveExecution ) {
201- return node;
202- }
203-
204- const auto sort = node.Cast <std::conditional_t <IsTop, TCoTopBase, TCoSort>>();
205- if (!IsYtProviderInput (sort.Input ())) {
206- return node;
207- }
208-
209- auto sortDirections = sort.SortDirections ();
210- if (!IsConstExpSortDirections (sortDirections)) {
211- return node;
212- }
213-
214- auto keySelectorLambda = sort.KeySelectorLambda ();
215- auto cluster = TString{GetClusterName (sort.Input ())};
216- TSyncMap syncList;
217- if (!IsYtCompleteIsolatedLambda (keySelectorLambda.Ref (), syncList, cluster, true , false )) {
218- return node;
219- }
220-
221- const TStructExprType* outType = nullptr ;
222- if (auto type = GetSequenceItemType (node, false , ctx)) {
223- outType = type->Cast <TStructExprType>();
224- } else {
225- return {};
226- }
227-
228- TVector<TYtPathInfo::TPtr> inputInfos = GetInputPaths (sort.Input ());
229-
230- TMaybe<NYT::TNode> firstNativeType;
231- if (!inputInfos.empty ()) {
232- firstNativeType = inputInfos.front ()->GetNativeYtType ();
233- }
234- auto maybeReadSettings = sort.Input ().template Maybe <TCoRight>().Input ().template Maybe <TYtReadTable>().Input ().Item (0 ).Settings ();
235- const ui64 nativeTypeFlags = State_->Configuration ->UseNativeYtTypes .Get ().GetOrElse (DEFAULT_USE_NATIVE_YT_TYPES)
236- ? GetNativeYtTypeFlags (*outType)
237- : 0ul ;
238- const bool needMap = (maybeReadSettings && NYql::HasSetting (maybeReadSettings.Ref (), EYtSettingType::SysColumns))
239- || AnyOf (inputInfos, [nativeTypeFlags, firstNativeType] (const TYtPathInfo::TPtr& path) {
240- return path->RequiresRemap ()
241- || nativeTypeFlags != path->GetNativeYtTypeFlags ()
242- || firstNativeType != path->GetNativeYtType ();
243- });
244-
245- bool useExplicitColumns = AnyOf (inputInfos, [] (const TYtPathInfo::TPtr& path) {
246- return !path->Table ->IsTemp || (path->Table ->RowSpec && path->Table ->RowSpec ->HasAuxColumns ());
247- });
248-
249- const bool needMerge = maybeReadSettings && NYql::HasSetting (maybeReadSettings.Ref (), EYtSettingType::Sample);
250-
251- const bool useNativeDescSort = State_->Configuration ->UseNativeDescSort .Get ().GetOrElse (DEFAULT_USE_NATIVE_DESC_SORT);
252-
253- TKeySelectorBuilder builder (node.Pos (), ctx, useNativeDescSort, outType);
254- builder.ProcessKeySelector (keySelectorLambda.Ptr (), sortDirections.Ptr ());
255-
256- TYtOutTableInfo sortOut (outType, nativeTypeFlags);
257- builder.FillRowSpecSort (*sortOut.RowSpec );
258- sortOut.SetUnique (sort.Ref ().template GetConstraint <TDistinctConstraintNode>(), node.Pos (), ctx);
259-
260- TExprBase sortInput = sort.Input ();
261- TExprBase world = TExprBase (ApplySyncListToWorld (GetWorld (sortInput, {}, ctx).Ptr (), syncList, ctx));
262- bool unordered = ctx.IsConstraintEnabled <TSortedConstraintNode>();
263- if (needMap || builder.NeedMap ()) {
264- auto mapper = builder.MakeRemapLambda ();
265-
266- auto mapperClean = CleanupWorld (TCoLambda (mapper), ctx);
267- if (!mapperClean) {
268- return {};
269- }
270-
271- TYtOutTableInfo mapOut (builder.MakeRemapType (), nativeTypeFlags);
272- mapOut.SetUnique (sort.Ref ().template GetConstraint <TDistinctConstraintNode>(), node.Pos (), ctx);
273-
274- sortInput = Build<TYtOutput>(ctx, node.Pos ())
275- .Operation <TYtMap>()
276- .World (world)
277- .DataSink (GetDataSink (sort.Input (), ctx))
278- .Input (ConvertInputTable (sort.Input (), ctx, TConvertInputOpts ().MakeUnordered (unordered)))
279- .Output ()
280- .Add (mapOut.ToExprNode (ctx, node.Pos ()).Cast <TYtOutTable>())
281- .Build ()
282- .Settings (GetFlowSettings (node.Pos (), *State_, ctx))
283- .Mapper (mapperClean.Cast ())
284- .Build ()
285- .OutIndex ()
286- .Value (0U )
287- .Build ()
288- .Done ();
289- world = TExprBase (ctx.NewWorld (node.Pos ()));
290- unordered = false ;
291- }
292- else if (needMerge) {
293- TYtOutTableInfo mergeOut (outType, nativeTypeFlags);
294- mergeOut.SetUnique (sort.Ref ().template GetConstraint <TDistinctConstraintNode>(), node.Pos (), ctx);
295- if (firstNativeType) {
296- mergeOut.RowSpec ->CopyTypeOrders (*firstNativeType);
297- sortOut.RowSpec ->CopyTypeOrders (*firstNativeType);
298- }
299-
300- TConvertInputOpts opts;
301- if (useExplicitColumns) {
302- opts.ExplicitFields (*mergeOut.RowSpec , node.Pos (), ctx);
303- useExplicitColumns = false ;
304- }
305-
306- sortInput = Build<TYtOutput>(ctx, node.Pos ())
307- .Operation <TYtMerge>()
308- .World (world)
309- .DataSink (GetDataSink (sort.Input (), ctx))
310- .Input (ConvertInputTable (sort.Input (), ctx, opts.MakeUnordered (unordered)))
311- .Output ()
312- .Add (mergeOut.ToExprNode (ctx, node.Pos ()).Cast <TYtOutTable>())
313- .Build ()
314- .Settings ()
315- .Add ()
316- .Name ()
317- .Value (ToString (EYtSettingType::ForceTransform), TNodeFlags::Default)
318- .Build ()
319- .Build ()
320- .Build ()
321- .Build ()
322- .OutIndex ()
323- .Value (0U )
324- .Build ()
325- .Done ();
326- world = TExprBase (ctx.NewWorld (node.Pos ()));
327- unordered = false ;
328- } else if (firstNativeType) {
329- sortOut.RowSpec ->CopyTypeOrders (*firstNativeType);
330- }
331-
332- bool canUseMerge = !needMap && !needMerge;
333- if (auto maxTablesForSortedMerge = State_->Configuration ->MaxInputTablesForSortedMerge .Get ()) {
334- if (inputInfos.size () > *maxTablesForSortedMerge) {
335- canUseMerge = false ;
336- }
337- }
338-
339- if (canUseMerge) {
340- TYqlRowSpecInfo commonSorted = *sortOut.RowSpec ;
341- for (auto & pathInfo: inputInfos) {
342- commonSorted.MakeCommonSortness (*pathInfo->Table ->RowSpec );
343- }
344- // input is sorted at least as strictly as output
345- if (!sortOut.RowSpec ->CompareSortness (commonSorted)) {
346- canUseMerge = false ;
347- }
348- }
349-
350- sortOut.RowSpec ->SetConstraints (sort.Ref ().GetConstraintSet ());
351-
352- TConvertInputOpts opts;
353- if (useExplicitColumns) {
354- opts.ExplicitFields (*sortOut.RowSpec , node.Pos (), ctx);
355- }
356-
357- auto res = canUseMerge ?
358- TExprBase (Build<TYtMerge>(ctx, node.Pos ())
359- .World (world)
360- .DataSink (GetDataSink (sortInput, ctx))
361- .Input (ConvertInputTable (sortInput, ctx, opts.ClearUnordered ()))
362- .Output ()
363- .Add (sortOut.ToExprNode (ctx, node.Pos ()).Cast <TYtOutTable>())
364- .Build ()
365- .Settings ()
366- .Add ()
367- .Name ()
368- .Value (ToString (EYtSettingType::KeepSorted), TNodeFlags::Default)
369- .Build ()
370- .Build ()
371- .Build ()
372- .Done ()):
373- TExprBase (Build<TYtSort>(ctx, node.Pos ())
374- .World (world)
375- .DataSink (GetDataSink (sortInput, ctx))
376- .Input (ConvertInputTable (sortInput, ctx, opts.MakeUnordered (unordered)))
377- .Output ()
378- .Add (sortOut.ToExprNode (ctx, node.Pos ()).Cast <TYtOutTable>())
379- .Build ()
380- .Settings ().Build ()
381- .Done ());
382-
383- res = Build<TYtOutput>(ctx, node.Pos ())
384- .Operation (res)
385- .OutIndex ().Value (0U ).Build ()
386- .Done ();
387-
388-
389- if constexpr (IsTop) {
390- res = Build<TCoTake>(ctx, node.Pos ())
391- .Input (res)
392- .Count (sort.Count ())
393- .Done ();
394- }
395-
396- return res;
397- }
398-
399- template <typename TLMapType>
400- TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::LMap (TExprBase node, TExprContext& ctx) const {
401- if (State_->Types ->EvaluationInProgress || State_->PassiveExecution ) {
402- return node;
403- }
404-
405- auto lmap = node.Cast <TLMapType>();
406-
407- if (!IsYtProviderInput (lmap.Input (), true )) {
408- return node;
409- }
410-
411- const auto inItemType = GetSequenceItemType (lmap.Input (), true , ctx);
412- if (!inItemType) {
413- return {};
414- }
415- const auto outItemType = SilentGetSequenceItemType (lmap.Lambda ().Body ().Ref (), true );
416- if (!outItemType || !outItemType->IsPersistable ()) {
417- return node;
418- }
419-
420- auto cluster = TString{GetClusterName (lmap.Input ())};
421- TSyncMap syncList;
422- if (!IsYtCompleteIsolatedLambda (lmap.Lambda ().Ref (), syncList, cluster, true , false )) {
423- return node;
424- }
425-
426- auto cleanup = CleanupWorld (lmap.Lambda (), ctx);
427- if (!cleanup) {
428- return {};
429- }
430-
431- auto mapper = cleanup.Cast ().Ptr ();
432- bool sortedOutput = false ;
433- TVector<TYtOutTable> outTables = ConvertOutTablesWithSortAware (mapper, sortedOutput, lmap.Pos (),
434- outItemType, ctx, State_, lmap.Ref ().GetConstraintSet ());
435-
436- const bool useFlow = State_->Configuration ->UseFlow .Get ().GetOrElse (DEFAULT_USE_FLOW);
437-
438- auto settingsBuilder = Build<TCoNameValueTupleList>(ctx, lmap.Pos ());
439- if (std::is_same<TLMapType, TCoOrderedLMap>::value) {
440- settingsBuilder
441- .Add ()
442- .Name ()
443- .Value (ToString (EYtSettingType::Ordered))
444- .Build ()
445- .Build ();
446- }
447-
448- if (useFlow) {
449- settingsBuilder
450- .Add ()
451- .Name ()
452- .Value (ToString (EYtSettingType::Flow))
453- .Build ()
454- .Build ();
455- }
456-
457- auto map = Build<TYtMap>(ctx, lmap.Pos ())
458- .World (ApplySyncListToWorld (GetWorld (lmap.Input (), {}, ctx).Ptr (), syncList, ctx))
459- .DataSink (GetDataSink (lmap.Input (), ctx))
460- .Input (ConvertInputTable (lmap.Input (), ctx))
461- .Output ()
462- .Add (outTables)
463- .Build ()
464- .Settings (settingsBuilder.Done ())
465- .Mapper (MakeJobLambda<false >(TCoLambda (mapper), useFlow, ctx))
466- .Done ();
467-
468- return WrapOp (map, ctx);
469- }
470-
47188} // namespace NYql
0 commit comments