Refactor BQ to expose all beam's configurations#5456
Refactor BQ to expose all beam's configurations#5456RustedBones wants to merge 4 commits intomainfrom
Conversation
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #5456 +/- ##
==========================================
+ Coverage 61.29% 61.47% +0.18%
==========================================
Files 314 315 +1
Lines 11250 11269 +19
Branches 793 823 +30
==========================================
+ Hits 6896 6928 +32
+ Misses 4354 4341 -13 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
| import org.apache.beam.sdk.values.{PCollection, PCollectionTuple, TupleTag} | ||
|
|
||
| /** | ||
| * A sink for error records. |
There was a problem hiding this comment.
bit more explanation on error records could be helpful, maybe:
| * A sink for error records. | |
| * A sink for error records. | |
| * | |
| * An error record is produced by certain PTransforms that catch processing exceptions and transform the resulting (element, exception) pair into a [[BadRecord]] instance. | |
| * When an ErrorSink is configured (via ScioContext#errorSink), these BadRecords can be accessed as an SCollection by invoking the ErrorSink#sink method. | |
| * An ErrorSink is useful if you'd like to set up special handling of exceptions (incrementing Counters, logging the exceptions in a database, etc). |
| * Once the [[sink]] is materialized, the [[handler]] must not be used anymore. | ||
| */ | ||
| sealed trait ErrorSink { | ||
| def handler: ErrorHandler[BadRecord, _] |
There was a problem hiding this comment.
could def handler be private[scio]? not sure when a user would need to access this
There was a problem hiding this comment.
This is the API exposed by beam. As mentioned in the description we do not pass the ErrorSink directly.
sc.bigQueryStorageFormat[MyType](
table,
format,
errorHandler = errorSink.handler
)I was thinking of adding to the ScioContext a beam java like API too
def registerBadRecordErrorHandler[T](handler: PTransform[PCollection[BadRecord], T] sinkTransform): BadRecordErrorHandler[OutputT]f9c02ac to
2d66440
Compare
2d66440 to
cfd83f4
Compare
cfd83f4 to
37b545f
Compare
|
Intended to fix #5530 |
Here are the main changes:
the BQ
Tablesource has a single normalized definition, with multiple constrictors (form string spec orTableReference). It nows includes an optionalTable.Filterthat can be used is the storage read API to project and filter.read API changes with
FormatAPI take aBigqueryIO.Formatobject allowing to convert either fromGenericRecord(this should be prefered) orTableRowStorageApi allow to pass anErrorHandler(Fix Add ErrorHandling in BigQuery #5530). In order to preserve a flat structureScioContext.errorSink(): ErrorSinkhas been added. This allow to do the followingThe
handlercan be passed to multiple IOs beforesinkis materialized. The sink will flatten the errors from the IOs.