diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/memory/SpillBuf.scala b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/memory/SpillBuf.scala index e3aa3262e..597c37112 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/memory/SpillBuf.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/memory/SpillBuf.scala @@ -108,6 +108,7 @@ class FileBasedSpillBuf( with Logging { private var readPosition: Long = 0 + private var closedFileDiskUsage: Long = 0 override def write(buf: ByteBuffer): Unit = { val startTimeNs = System.nanoTime() @@ -126,12 +127,21 @@ class FileBasedSpillBuf( } override val memUsed: Long = 0 - override def diskUsed: Long = fileChannel.size() + override def diskUsed: Long = { + if (fileChannel.isOpen) { + fileChannel.size() + } else { + closedFileDiskUsage + } + } override def diskIOTime: Long = diskIOTimeNs override def size: Long = numWrittenBytes override def release(): Unit = { - fileChannel.close() + if (fileChannel.isOpen) { + closedFileDiskUsage = fileChannel.size() + fileChannel.close() + } if (!file.delete()) { logWarning(s"Was unable to delete spill file: ${file.getAbsolutePath}") }