Conversation
    ): SCollection[Example] = {
      val job = Job.getInstance(conf)
      GcsConnectorUtil.setInputPaths(sc, job, path)
      val filePattern = ScioUtil.filePattern(path, params.suffix)
I am surprised suffix was not used initially. Or was it intentional?
Codecov Report

    @@            Coverage Diff             @@
    ##             main    #5850      +/-   ##
    ==========================================
    + Coverage   61.49%   61.56%   +0.06%
    ==========================================
      Files         317      318       +1
      Lines       11650    11678      +28
      Branches      845      834      -11
    ==========================================
    + Hits         7164     7189      +25
    - Misses       4486     4489       +3
Resolved review threads (outdated) on scio-parquet/src/main/scala/com/spotify/scio/parquet/HadoopParquet.scala
        Some(projectionFn),
        None
      )
      .parDo(new LineageReportDoFn(filePattern))
Isn't this going to result in a new node in the graph? Why are we doing this in sequence with the read if it's not actually using any of the read elements? We should be doing it like the Scio init metrics, which is just its own distinct graph: create impulse -> submit parquet lineage.
I am trying to follow Beam conventions and keep this metric associated with the actual read transform. This way we keep transform-level lineage, which is supported in Beam.
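A minimal sketch of the idea in plain Scala, with hypothetical names (`LineageReportFn`, `report`; the real `LineageReportDoFn` is a Beam `DoFn`): a pass-through step chained after the read that reports the file pattern once per instance, so the lineage metric stays attached to the read transform rather than living in a separate impulse branch.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical sketch: a pass-through step that reports the source file
// pattern exactly once per instance. Chaining it after the read keeps the
// lineage metric associated with the read transform itself.
final class LineageReportFn[T](filePattern: String, report: String => Unit) {
  private val reported = new AtomicBoolean(false)

  def processElement(element: T): T = {
    // compareAndSet ensures the pattern is reported only once,
    // even if processElement is invoked concurrently
    if (reported.compareAndSet(false, true)) report(filePattern)
    element // elements pass through unchanged
  }
}
```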
      tracker.currentRestriction.getFrom,
      if (splitGranularity == SplitGranularity.File) "end" else tracker.currentRestriction().getTo
    )
    FileSystems.reportSourceLineage(file.getMetadata.resourceId())
This is different from the Hadoop one insofar as we report every file here, right? That seems bad/annoying for using the lineage for anything.
Actually, file-level lineage is the default approach in Beam, though we might not need it directly. Both Lineage metric implementations (the legacy one and the new one) handle many files well:
- StringSet truncates internally at 100 entries
- BoundedTrie is a data structure that stores hierarchical data very well
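A rough illustration of the truncation behaviour in plain Scala (`BoundedStringSet` is a made-up name, not Beam's implementation; Beam's actual StringSet metric caps at 100 entries): once the cap is hit, further distinct values are dropped and a truncation flag is set.

```scala
import scala.collection.mutable

// Illustrative sketch (not Beam's code): a string-set metric that caps
// the number of stored entries, as described for StringSet above.
final class BoundedStringSet(maxSize: Int = 100) {
  private val values = mutable.LinkedHashSet.empty[String]
  private var truncated = false

  def add(value: String): Unit =
    if (values.contains(value)) ()           // already tracked, nothing to do
    else if (values.size < maxSize) values += value
    else truncated = true                    // cap reached: drop value, flag truncation

  def size: Int = values.size
  def isTruncated: Boolean = truncated
}
```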
    override def apply(input: Void): java.lang.Boolean = true
    })

    val withSkipClone = skipValueClone.fold(hadoop)(skip => hadoop.withSkipValueClone(skip))
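The `fold` here is the usual Option-driven configuration pattern; a self-contained sketch (`ReadBuilder` and its `withSkipValueClone` are stand-ins for the real Beam builder): `None` leaves the builder untouched, `Some` applies the setting.

```scala
// Stand-in for the real builder: withSkipValueClone returns an updated copy.
final case class ReadBuilder(skipValueClone: Boolean = false) {
  def withSkipValueClone(skip: Boolean): ReadBuilder = copy(skipValueClone = skip)
}

// Option.fold: keep the base builder when the option is None,
// otherwise apply the optional setting to it.
def configure(base: ReadBuilder, skipValueClone: Option[Boolean]): ReadBuilder =
  skipValueClone.fold(base)(skip => base.withSkipValueClone(skip))
```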
    import java.util.concurrent.atomic.AtomicBoolean
    import scala.reflect.ClassTag

    private[parquet] object HadoopParquet {
This is just to reduce duplication or is there a functional change here?
Just to reduce duplication, no new functionality. Except I noticed that in some cases Scio's derived coder was not set on the HadoopFormatIO transform. Beam probably auto-derives the same coder, but it is better to set it explicitly anyway.