[SPARK-51955] Adding release() to ReadStateStore interface and reusing ReadStore for Streaming Aggregations #50742

Status: Open · wants to merge 20 commits into master
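In short: this PR adds a release() lifecycle method to ReadStateStore and lets a provider upgrade an already-loaded read store into a write store, so a streaming aggregation can reuse one store load across its read and write phases. A minimal sketch of the touched interfaces, abbreviated from the diff below (the real traits carry many more members):

trait ReadStateStore {
  def id: StateStoreId
  def version: Long

  // New in this PR: free the resources held by a read-only store without
  // committing or aborting a version.
  def release(): Unit
}

trait StateStoreProvider {
  // New in this PR: reuse an already-loaded read store as a writable store
  // for the same version instead of loading that version a second time.
  def upgradeReadStoreToWriteStore(
      readStore: ReadStateStore,
      version: Long,
      uniqueId: Option[String] = None): StateStore
}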
@@ -204,7 +204,6 @@ class StatePartitionReader(
}

override def close(): Unit = {
-  store.abort()
super.close()
}
}
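A plausible reading of this deletion, in light of the listener changes later in the diff: cleanup moves out of StatePartitionReader.close() and into task-scoped listeners that the store registers for itself, along these lines (sketch only; the full listener bodies appear in the provider diffs below):

Option(TaskContext.get()).foreach { ctxt =>
  ctxt.addTaskCompletionListener[Unit] { _ =>
    // Abort (write store) or release (read-only store) anything still UPDATING
    // once the task finishes, instead of relying on the reader's close().
    if (state == UPDATING) abort()
  }
}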
@@ -31,7 +31,7 @@ import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

- import org.apache.spark.{SparkConf, SparkEnv, SparkException}
import org.apache.spark.{SparkConf, SparkEnv, SparkException, TaskContext}
import org.apache.spark.internal.{Logging, LogKeys, MDC, MessageWithContext}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
@@ -89,6 +89,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with

override def abort(): Unit = {}
Contributor: I thought we were removing the abort API in the read store?

micheal-o (Contributor), May 8, 2025: OK, I think I remember a conversation we had about leaving it for now. But with this new change, can you check whether there is still any non-test usage of calling abort on the read store?


override def release(): Unit = {}

override def toString(): String = {
s"HDFSReadStateStore[id=(op=${id.operatorId},part=${id.partitionId}),dir=$baseDir]"
}
@@ -112,6 +114,16 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
case object UPDATING extends STATE
case object COMMITTED extends STATE
case object ABORTED extends STATE
case object RELEASED extends STATE


Option(TaskContext.get()).foreach { ctxt =>
ctxt.addTaskCompletionListener[Unit](ctx => {
if (state == UPDATING) {
abort()
}
})
}

private val newVersion = version + 1
@volatile private var state: STATE = UPDATING
@@ -194,6 +206,10 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
log"for ${MDC(LogKeys.STATE_STORE_PROVIDER, this)}")
}

override def release(): Unit = {
Contributor: release should only be allowed on the read store and shouldn't be allowed on the state store; a state store should only allow abort or commit.

state = RELEASED
}

/**
* Get an iterator of all the store data.
* This can be called only after committing all the updates made in the current thread.
@@ -953,7 +969,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
* @param endVersion checkpoint version to end with
* @return [[HDFSBackedStateStore]]
*/
- override def replayStateFromSnapshot(snapshotVersion: Long, endVersion: Long): StateStore = {
override def replayStateFromSnapshot(
snapshotVersion: Long, endVersion: Long, readOnly: Boolean): StateStore = {
val newMap = replayLoadedMapFromSnapshot(snapshotVersion, endVersion)
logInfo(log"Retrieved snapshot at version " +
log"${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} and apply delta files to version " +
@@ -1013,6 +1013,12 @@ class RocksDB(
}
}

def release(): Unit = {
if (db != null) {
release(LoadStore)
}
}

/**
* Commit all the updates made as a version to DFS. The steps it needs to do to commits are:
* - Flush all changes to disk
@@ -26,7 +26,7 @@ import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

- import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.{SparkConf, SparkEnv, TaskContext}
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.io.CompressionCodec
@@ -43,12 +43,13 @@ private[sql] class RocksDBStateStoreProvider
with SupportsFineGrainedReplay {
import RocksDBStateStoreProvider._

- class RocksDBStateStore(lastVersion: Long) extends StateStore {
class RocksDBStateStore(lastVersion: Long, var readOnly: Boolean) extends StateStore {
/** Trait and classes representing the internal state of the store */
trait STATE
case object UPDATING extends STATE
case object COMMITTED extends STATE
case object ABORTED extends STATE
case object RELEASED extends STATE

@volatile private var state: STATE = UPDATING
@volatile private var isValidated = false
@@ -57,6 +58,30 @@ private[sql] class RocksDBStateStoreProvider

override def version: Long = lastVersion

Option(TaskContext.get()).foreach { ctxt =>
ctxt.addTaskCompletionListener[Unit]( ctx => {
try {
if (state == UPDATING) {
if (readOnly) {
release() // Only release, do not throw an error because we rely on
// CompletionListener to release for read-only store in
// mapPartitionsWithReadStateStore.
} else {
abort() // Abort since this is an error if stateful task completes
}
}
} catch {
case NonFatal(e) =>
logWarning("Failed to abort state store", e)
}
})

ctxt.addTaskFailureListener( (_, _) => {
abort() // Either the store is already aborted (this is a no-op) or
// we need to abort it.
})
}

override def createColFamilyIfAbsent(
colFamilyName: String,
keySchema: StructType,
@@ -365,6 +390,19 @@
}
result
}

override def release(): Unit = {
assert(readOnly, "Release can only be called on a read-only store")
if (state != RELEASED) {
logInfo(log"Releasing ${MDC(VERSION_NUM, version + 1)} " +
Contributor: Why is it version + 1 here?

Author: Hm, not sure, but we log a similar line for abort here.

log"for ${MDC(STATE_STORE_ID, id)}")
rocksDB.release()
state = RELEASED
} else {
// Optionally log at DEBUG level that it's already released
logDebug(log"State store already released")
}
}
}
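For reference, the store lifecycle implied by the state objects and listeners above (a summary of this diff, not code from it):

// UPDATING --commit()--> COMMITTED   write store, normal path
// UPDATING --abort()---> ABORTED     write store, failure or task abort
// UPDATING --release()-> RELEASED    read-only store, normal completion
//
// release() asserts readOnly, so a write store can only commit or abort
// (matching the reviewer's note above), and calling release() on an already
// RELEASED store is a logged no-op.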

// Test-visible method to fetch the internal RocksDBStateStore class
@@ -446,38 +484,52 @@ private[sql] class RocksDBStateStoreProvider

override def stateStoreId: StateStoreId = stateStoreId_

- override def getStore(version: Long, uniqueId: Option[String] = None): StateStore = {
-   try {
-     if (version < 0) {
-       throw QueryExecutionErrors.unexpectedStateStoreVersion(version)
-     }
-     rocksDB.load(
-       version,
-       stateStoreCkptId = if (storeConf.enableStateStoreCheckpointIds) uniqueId else None)
-     new RocksDBStateStore(version)
-   }
-   catch {
-     case e: OutOfMemoryError =>
-       throw QueryExecutionErrors.notEnoughMemoryToLoadStore(
-         stateStoreId.toString,
-         "ROCKSDB_STORE_PROVIDER",
-         e)
-     case e: Throwable => throw StateStoreErrors.cannotLoadStore(e)
-   }
- }
-
- override def getReadStore(version: Long, uniqueId: Option[String] = None): StateStore = {
-   try {
-     if (version < 0) {
-       throw QueryExecutionErrors.unexpectedStateStoreVersion(version)
-     }
-     rocksDB.load(
-       version,
-       stateStoreCkptId = if (storeConf.enableStateStoreCheckpointIds) uniqueId else None,
-       readOnly = true)
-     new RocksDBStateStore(version)
-   }
-   catch {
/**
 * Creates and returns a state store with the specified parameters.
 *
 * @param version The version of the state store to load
 * @param uniqueId Optional unique identifier for checkpoint
 * @param readOnly Whether to open the store in read-only mode
 * @param existingStore Optional existing store to reuse instead of creating a new one
 * @return The loaded state store
 */
private def loadStateStore(
    version: Long,
    uniqueId: Option[String],
    readOnly: Boolean,
    existingStore: Option[ReadStateStore] = None): StateStore = {
  try {
    if (version < 0) {
      throw QueryExecutionErrors.unexpectedStateStoreVersion(version)
    }

    // Early validation of the existing store type before loading RocksDB
    existingStore.foreach { store =>
      if (!store.isInstanceOf[RocksDBStateStore]) {
        throw new IllegalArgumentException(
          s"Existing store must be a RocksDBStateStore, but got ${store.getClass.getSimpleName}")
      }
    }

    // Load RocksDB store
    rocksDB.load(
      version,
      stateStoreCkptId = if (storeConf.enableStateStoreCheckpointIds) uniqueId else None,
      readOnly = readOnly)

    // Create or reuse store instance
    existingStore match {
      case Some(store: ReadStateStore) if store.isInstanceOf[RocksDBStateStore] =>
        // Mark store as being used for write operations
        val rocksDBStateStore = store.asInstanceOf[RocksDBStateStore]
        rocksDBStateStore.readOnly = readOnly
        rocksDBStateStore.asInstanceOf[StateStore]
      case None =>
        // Create new store instance
        new RocksDBStateStore(version, readOnly)
      case _ => null // No need for error case here since we validated earlier
    }
  } catch {
case e: OutOfMemoryError =>
throw QueryExecutionErrors.notEnoughMemoryToLoadStore(
stateStoreId.toString,
@@ -487,6 +539,26 @@
}
}

override def getStore(version: Long, uniqueId: Option[String] = None): StateStore = {
loadStateStore(version, uniqueId, readOnly = false)
}

override def upgradeReadStoreToWriteStore(
readStore: ReadStateStore,
version: Long,
uniqueId: Option[String] = None): StateStore = {
assert(version == readStore.version,
s"Can only upgrade readStore to writeStore with the same version," +
s" readStoreVersion: ${readStore.version}, writeStoreVersion: ${version}")
assert(this.stateStoreId == readStore.id, "Can only upgrade readStore to writeStore with" +
" the same stateStoreId")
loadStateStore(version, uniqueId, readOnly = false, existingStore = Some(readStore))
}

override def getReadStore(version: Long, uniqueId: Option[String] = None): StateStore = {
loadStateStore(version, uniqueId, readOnly = true)
}

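To see how the pieces compose, here is a hypothetical task-side flow for a streaming aggregation; provider, version, keyRow, and valueRow are placeholders rather than names from this diff:

// Phase 1: the read pass loads the store in read-only mode.
val readStore = provider.getReadStore(version)
val existing = readStore.get(keyRow)

// Phase 2: the write pass upgrades the already-loaded store in place
// instead of calling getStore(version) and loading the version again.
val writeStore = provider.upgradeReadStoreToWriteStore(readStore, version)
writeStore.put(keyRow, valueRow)
writeStore.commit()

// If a task finishes while the store is still read-only, the task completion
// listener calls release() on it, so no explicit abort() is required.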
override def doMaintenance(): Unit = {
try {
rocksDB.doMaintenance()
@@ -572,7 +644,8 @@
* @param endVersion checkpoint version to end with
* @return [[StateStore]]
*/
- override def replayStateFromSnapshot(snapshotVersion: Long, endVersion: Long): StateStore = {
override def replayStateFromSnapshot(
snapshotVersion: Long, endVersion: Long, readOnly: Boolean): StateStore = {
try {
if (snapshotVersion < 1) {
throw QueryExecutionErrors.unexpectedStateStoreVersion(snapshotVersion)
@@ -581,7 +654,7 @@
throw QueryExecutionErrors.unexpectedStateStoreVersion(endVersion)
}
rocksDB.loadFromSnapshot(snapshotVersion, endVersion)
- new RocksDBStateStore(endVersion)
new RocksDBStateStore(endVersion, readOnly)
}
catch {
case e: OutOfMemoryError =>