[spark][bugfix] Use pk index for comparator align with SortMergeReader #2987
| Diff line change |
|---|
| `@@ -48,10 +48,15 @@ case class LogChangesIterator(` |
| `     comparator: Comparator[InternalRow]` |
| `   ) extends CloseableIterator[KeyValueRow] {` |
| |
| `+  private val projectRow1 = ProjectedRow.from(pkProjection)` |
| `+  private val projectRow2 = ProjectedRow.from(pkProjection)` |
| `+` |
| `   // Sort the records by primary key and then by offset` |
| `   private val sortedLogRecords = logRecords.sortWith {` |
| `     case (record1, record2) =>` |
| `-      val keyComparison = comparator.compare(record1.getRow, record2.getRow)` |
| `+      val keyComparison = comparator.compare(` |
Contributor
Can you explain why this change is needed?
Contributor (Author)
`SortMergeReader` compares rows by primary key, but this code compared the full rows, so the UT failed when the pk column was not at the beginning.
| Diff line change |
|---|
| `+        projectRow1.replaceRow(record1.getRow),` |
| `+        projectRow2.replaceRow(record2.getRow))` |
| `       if (keyComparison == 0) {` |
| `         record1.logOffset() < record2.logOffset() // For same key, lower offset comes first` |
| `       } else {` |
| `@@ -63,9 +68,6 @@ case class LogChangesIterator(` |
| `     sortedLogRecords.head,` |
| `     CloseableIterator.wrap(sortedLogRecords.tail.toIterator.asJava))` |
| |
| `-  private val projectRow1 = ProjectedRow.from(pkProjection)` |
| `-  private val projectRow2 = ProjectedRow.from(pkProjection)` |
| `-` |
| `   private var currentScanRecord: ScanRecord = _` |
| |
| `   override def hasNext: Boolean = {` |
I think what is needed here is the row type of the input data, not just the row type of the primary key.

Actually, only the pk column is needed; the UT already covers this case.
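
To make the point of this thread concrete, here is a minimal, self-contained sketch of why the comparator has to be fed the projected key rather than the full row. Everything in it is a hypothetical stand-in and not the project's API: `Row` is a plain `Vector[Int]`, `Record` replaces `ScanRecord`, `project` plays the role of `ProjectedRow.from(pkProjection).replaceRow(...)`, and the `else` branch of the sort (not visible in the diff) is assumed to be `keyComparison < 0`.

```scala
import java.util.Comparator

object PkComparatorSketch {
  // Hypothetical stand-ins: a row is a vector of ints, a record is a row plus its log offset.
  type Row = Vector[Int]
  final case class Record(row: Row, logOffset: Long)

  // Assumption: the primary key is column 1, i.e. it is NOT the first column of the full row.
  val pkIndexes: Vector[Int] = Vector(1)

  // A comparator built for key-only rows: it reads position 0 of whatever row it is given.
  val keyComparator: Comparator[Row] = new Comparator[Row] {
    override def compare(a: Row, b: Row): Int = Integer.compare(a(0), b(0))
  }

  // Stand-in for the pk projection: keep only the primary key columns.
  def project(row: Row): Row = pkIndexes.map(i => row(i))

  // Sort by key, then by offset for equal keys (the else branch is an assumption).
  def sortRecords(records: Vector[Record], keyOf: Row => Row): Vector[Record] =
    records.sortWith { (r1, r2) =>
      val keyComparison = keyComparator.compare(keyOf(r1.row), keyOf(r2.row))
      if (keyComparison == 0) r1.logOffset < r2.logOffset
      else keyComparison < 0
    }

  // Full rows laid out as (value, pk).
  val records: Vector[Record] = Vector(
    Record(Vector(9, 1), 0L),
    Record(Vector(1, 2), 1L),
    Record(Vector(5, 1), 2L)
  )

  // Old behaviour: the key comparator sees the full row and compares the value column.
  val byFullRow: Vector[Record] = sortRecords(records, identity)
  // Fixed behaviour: the key comparator sees only the projected pk column.
  val byPk: Vector[Record] = sortRecords(records, project)

  def main(args: Array[String]): Unit = {
    println(byFullRow.map(_.logOffset)) // Vector(1, 2, 0): records with pk 1 are split apart
    println(byPk.map(_.logOffset))      // Vector(0, 2, 1): pk 1 records are adjacent, ordered by offset
  }
}
```

With the full row, the two records that share pk 1 end up separated, which is the kind of mismatch with a pk-based merge that the failing UT exposed when the pk column was not first.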