Skip to content

Add a basic flatten operator for data pipelines#1229

Closed
MartinGleize wants to merge 1 commit intoybe/optfrom
mg-flatten
Closed

Add a basic flatten operator for data pipelines#1229
MartinGleize wants to merge 1 commit intoybe/optfrom
mg-flatten

Conversation

@MartinGleize
Copy link
Contributor

What does this PR do? Please describe:
Adds an operator turning a pipeline of lists into a pipeline of their flattened elements.

Does your PR introduce any breaking changes? If yes, please list them:
None

Check list:

  • Was the content of this PR discussed and approved via a GitHub issue? (no need for typos or documentation improvements)
  • Did you read the contributor guideline?
  • Did you make sure that your PR does only one thing instead of bundling different changes together?
  • Did you make sure to update the documentation with your changes? (if necessary)
  • Did you write any new necessary tests?
  • Did you verify new and existing tests pass locally with your changes?
  • Did you update the CHANGELOG? (no need for typos, documentation, or minor internal changes)

@MartinGleize MartinGleize requested a review from cbalioglu as a code owner July 16, 2025 23:20
@meta-cla meta-cla bot added the CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. label Jul 16, 2025
@MartinGleize MartinGleize changed the base branch from main to ybe/opt July 16, 2025 23:20
Copy link
Contributor

@cbalioglu cbalioglu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR! Left a couple comments.

std::unique_ptr<data_source> inner_;
std::optional<std::string> selector_;
std::queue<data> elements_queue_;
//std::exception_ptr exception_ptr_{};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not used?

if (selector_) {
std::string error_msg = "Cannot use selector with flatten.";
std::cerr << error_msg << std::endl;
std::terminate(); // Force immediate program termination
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to raise a "not implemented exception" here, instead of forcibly terminating the process. In data_pipeline.cc:93 we mark the pipeline as broken if we catch an unhandled exception.

// If no selector is provided, assume the example itself is a list
else if (example.is_list()) {
auto example_list = example.as_list();
for (size_t i = 0; i < example_list.size(); ++i) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding when looking at next() implementation, we can actually use move semantics here. So the signature of extract_elements can be:

extract_elements(data &&example);

If that works, you can instead use element.emplace(std::move(example_list[i])).

elements.push(example_list.at(i));
}
}
// If no valid elements were found, the queue remains empty
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean we simply discard/drop the example we just read from the underlying data source?


// If the queue is empty (no elements could be extracted), recursively call next()
if (elements_queue_.empty())
return next();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't be cleaner if we had this:

while (elements_queue_.empty()) {
  std::optional<data> maybe_example = ...
  elements_queue = extract_elements(*maybe_example);

@MartinGleize
Copy link
Contributor Author

I realize it pinged you as a reviewer, I didn't intend this to be ready for review and merged into main. I'll revisit this to make it cleaner overall, this was a rushed change.

@MartinGleize MartinGleize changed the base branch from ybe/opt to main August 1, 2025 18:22
@MartinGleize MartinGleize changed the base branch from main to ybe/opt August 1, 2025 18:23
@artemru
Copy link
Contributor

artemru commented Aug 19, 2025

i think i can be achieved with yield_from pattern ?

@MartinGleize
Copy link
Contributor Author

i think i can be achieved with yield_from pattern ?

I think you're right, I hadn't taken a look at this operator before. I'll close this PR.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants