Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.fluss.record.LogRecord
import org.apache.fluss.row.{encode, InternalRow, KeyValueRow}
import org.apache.fluss.spark.SparkFlussConf
import org.apache.fluss.spark.utils.LogChangesIterator
import org.apache.fluss.types.{DataField, RowType}
import org.apache.fluss.utils.CloseableIterator

import org.apache.spark.internal.Logging
Expand Down Expand Up @@ -65,13 +66,12 @@ class FlussUpsertPartitionReader(
extraProjections ++= projection
pkIndexes.foreach {
pkIndex =>
projection.find(p => p == pkIndex) match {
case Some(index) =>
_pkProjection += index
case _ =>
projection.indexOf(pkIndex) match {
case -1 =>
extraProjections += pkIndex
_pkProjection += projection.length + i
i += 1
case idx => _pkProjection += idx
}
}
(extraProjections.toArray, _pkProjection.toArray)
Expand Down Expand Up @@ -99,9 +99,13 @@ class FlussUpsertPartitionReader(

private def createSortMergeReader(): SortMergeReader = {
// Create key encoder for primary keys
val pkIndexes = tableInfo.getSchema.getPrimaryKeyIndexes
val pkFields = new java.util.ArrayList[DataField]()
pkIndexes.foreach(i => pkFields.add(rowType.getFields.get(i)))
val pkRowType = new RowType(pkFields)
val keyEncoder =
encode.KeyEncoder.ofPrimaryKeyEncoder(
rowType,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think what is needed here is the row type of input data, not just the row type of primary key.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, only the pk columns are needed; the UT already covers this case.

pkRowType,
tableInfo.getPhysicalPrimaryKeys,
tableInfo.getTableConfig,
tableInfo.isDefaultBucketKey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,15 @@ case class LogChangesIterator(
comparator: Comparator[InternalRow]
) extends CloseableIterator[KeyValueRow] {

private val projectRow1 = ProjectedRow.from(pkProjection)
private val projectRow2 = ProjectedRow.from(pkProjection)

// Sort the records by primary key and then by offset
private val sortedLogRecords = logRecords.sortWith {
case (record1, record2) =>
val keyComparison = comparator.compare(record1.getRow, record2.getRow)
val keyComparison = comparator.compare(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain why this change is needed?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SortMergeReader compares rows by pk, but this code compared the full rows, so the UT failed when the pk columns were not at the beginning.

projectRow1.replaceRow(record1.getRow),
projectRow2.replaceRow(record2.getRow))
if (keyComparison == 0) {
record1.logOffset() < record2.logOffset() // For same key, lower offset comes first
} else {
Expand All @@ -63,9 +68,6 @@ case class LogChangesIterator(
sortedLogRecords.head,
CloseableIterator.wrap(sortedLogRecords.tail.toIterator.asJava))

private val projectRow1 = ProjectedRow.from(pkProjection)
private val projectRow2 = ProjectedRow.from(pkProjection)

private var currentScanRecord: ScanRecord = _

override def hasNext: Boolean = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,19 @@ class SparkPrimaryKeyTableReadTest extends FlussSparkTestBase {
}
}

// Reads a primary-key table whose key columns (pk, pk2) are declared AFTER the
// non-key columns (id, name), then queries it with a projection that reorders
// columns and omits one of the key columns.
test("Spark Read: primary key table with random project") {
withTable("t") {
// The composite primary key is (pk, pk2); both sit at the end of the schema.
sql(
"CREATE TABLE t (id int, name string, pk int, pk2 string) TBLPROPERTIES('primary.key'='pk,pk2')")
// A freshly created table must read back as empty.
checkAnswer(sql("SELECT * FROM t"), Nil)
sql("INSERT INTO t VALUES (1, 'a', 10, 'x'), (2, 'b', 20, 'y')")
// Full-row read: every column comes back in schema order.
checkAnswer(
sql("SELECT * FROM t ORDER BY id"),
Row(1, "a", 10, "x") :: Row(2, "b", 20, "y") :: Nil)
// Projected read: pk is selected before id and pk2 is not selected at all.
// NOTE(review): presumably this exercises the reader path that must still
// resolve the full primary key when it is only partially projected - confirm.
checkAnswer(sql("SELECT pk, id FROM t ORDER BY id"), Row(10, 1) :: Row(20, 2) :: Nil)
}
}

private def genInputPartition(
tablePath: TablePath,
partitionName: String): Array[FlussUpsertInputPartition] = {
Expand Down