[SPARK-51955] Adding release() to ReadStateStore interface and reusing ReadStore for Streaming Aggregations #50742

Open: wants to merge 20 commits into base: master

Conversation

@ericm-db ericm-db commented Apr 28, 2025

What changes were proposed in this pull request?

Adding a release() method to the ReadStateStore interface to properly close read stores without aborting them
Implementing a getWriteStore() method that allows converting a read-only store to a writable store
Creating a StateStoreRDDProvider interface for tracking state stores by partition ID
Enhancing StateStoreRDD to find and reuse existing state stores through RDD lineage
Improving task completion handling with proper cleanup listeners

Why are the changes needed?

Currently, stateful operations like aggregations follow a pattern where both read and write stores are opened simultaneously:
readStore.acquire()
writeStore.acquire()
writeStore.commit()
readStore.abort()
This pattern creates inefficiency because:

The abort() call on the read store unnecessarily invalidates the store's state, causing subsequent operations to reload the entire state store from scratch
Having two stores open simultaneously increases memory usage and can create contention issues
The upcoming lock hardening changes will only allow one state store to be open at a time, making this pattern incompatible

With the new approach, the usage paradigm becomes:
readStore = getReadStore()
writeStore = getWriteStore(readStore)
writeStore.commit()
This new paradigm allows us to reuse an existing read store by converting it to a write store using getWriteStore(), and properly clean up resources using release() instead of abort() when operations complete successfully. This avoids the unnecessary reloading of state data and improves performance while being compatible with future lock hardening changes.
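
The difference between the two paradigms can be sketched with a toy model. Everything in it (`ToyReadStore`, `ToyWriteStore`, `ToyStore`, `ToyProvider`, the `loads` counter) is a hypothetical stand-in invented for illustration, not Spark's actual `ReadStateStore`, `StateStore`, or `StateStoreProvider`; it only counts how many times state is loaded when the stores are opened.

```scala
object ParadigmSketch {
  // Simplified stand-ins, NOT Spark's real ReadStateStore / StateStore traits.
  trait ToyReadStore {
    def version: Long
    def get(key: String): Option[Int]
    def release(): Unit // finish reading without discarding anything
    def abort(): Unit
  }

  trait ToyWriteStore extends ToyReadStore {
    def put(key: String, value: Int): Unit
    def commit(): Long // returns the newly committed version
  }

  final class ToyStore(
      val version: Long,
      initial: Map[String, Int],
      onCommit: Map[String, Int] => Long) extends ToyWriteStore {
    private var data = initial
    def get(key: String): Option[Int] = data.get(key)
    def put(key: String, value: Int): Unit = { data = data.updated(key, value) }
    def commit(): Long = onCommit(data)
    def release(): Unit = ()
    def abort(): Unit = ()
  }

  // Hypothetical provider that counts how many times state is loaded.
  final class ToyProvider {
    var loads = 0
    private var committed = Map.empty[String, Int]
    private var latestVersion = 0L

    private def load(version: Long): ToyStore = {
      require(version == latestVersion, s"Unknown version $version")
      loads += 1
      new ToyStore(version, committed, data => {
        committed = data
        latestVersion = version + 1
        latestVersion
      })
    }

    def getReadStore(version: Long): ToyReadStore = load(version)
    def getStore(version: Long): ToyWriteStore = load(version)

    // Reuses the already-loaded read store instead of loading again.
    def getWriteStore(readStore: ToyReadStore, version: Long): ToyWriteStore = {
      require(version == readStore.version,
        s"Requested version $version but read store is at ${readStore.version}")
      readStore match {
        case s: ToyStore => s
        case other =>
          throw new IllegalArgumentException(s"Cannot upgrade ${other.getClass.getName}")
      }
    }
  }

  // Old pattern: separate read and write stores, so state is loaded twice.
  def oldPatternLoads(): Int = {
    val provider = new ToyProvider
    val readStore = provider.getReadStore(0L)
    val writeStore = provider.getStore(0L)
    writeStore.put("count", 1)
    writeStore.commit()
    readStore.abort()
    provider.loads
  }

  // New pattern: the read store is upgraded in place, so state is loaded once.
  def newPatternLoads(): Int = {
    val provider = new ToyProvider
    val readStore = provider.getReadStore(0L)
    val writeStore = provider.getWriteStore(readStore, 0L)
    writeStore.put("count", 1)
    writeStore.commit()
    provider.loads
  }
}
```

In this toy, `ParadigmSketch.oldPatternLoads()` performs two loads while `ParadigmSketch.newPatternLoads()` performs one. The sketch does not model the reload that follows an `abort()` in the real providers.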

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit tests

Was this patch authored or co-authored using generative AI tooling?

No

@ericm-db ericm-db force-pushed the read-store-changes branch from 14565a8 to c4c6c07 on April 29, 2025 21:06
@ericm-db ericm-db changed the title from "[WIP] Adding release() to ReadStateStore interface" to "Adding release() to ReadStateStore interface and reusing ReadStore for Streaming Aggregations" on Apr 29, 2025
@ericm-db ericm-db changed the title from "Adding release() to ReadStateStore interface and reusing ReadStore for Streaming Aggregations" to "[SPARK-51955] Adding release() to ReadStateStore interface and reusing ReadStore for Streaming Aggregations" on Apr 29, 2025
loadStateStore(version, uniqueId, readOnly = false)
}

override def getWriteStore(
Contributor:
nit: rename to getWriteStoreFromReadStore?

readStore: ReadStateStore,
version: Long,
uniqueId: Option[String] = None): StateStore = {
assert(version == readStore.version)
Contributor:
Can you leave a comment or more informative error msg here?

Contributor:
+1
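
One way to act on this request is to replace the bare `assert` with a check that carries a message. The sketch below is only an illustration under assumptions: `ToyReadStore` and `checkUpgradeVersion` are hypothetical names, and the wording of the message is invented here rather than taken from the PR.

```scala
object VersionCheckSketch {
  // Hypothetical stand-in for a read store; only the version matters here.
  final case class ToyReadStore(version: Long)

  // require throws IllegalArgumentException, prefixing the message
  // with "requirement failed: ".
  def checkUpgradeVersion(readStore: ToyReadStore, version: Long): Unit = {
    require(
      version == readStore.version,
      s"Cannot create a write store for version $version from a read store " +
        s"loaded at version ${readStore.version}; the versions must match")
  }
}
```

With a message like this, a mismatch reports both versions instead of a bare assertion failure.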

@@ -565,6 +582,11 @@ trait StateStoreProvider {
version: Long,
stateStoreCkptId: Option[String] = None): StateStore

def getWriteStore(
Contributor:
add docs comment?

if (version < 0) {
throw QueryExecutionErrors.unexpectedStateStoreVersion(version)
}
hadoopConf.set(StreamExecution.RUN_ID_KEY, storeProviderId.queryRunId.toString)
Contributor:
Why are we setting this twice? Can you add more comments about what is going on here?

Contributor:
Yeah, we probably don't need to set it multiple times.

partitionStores.put(partitionId, (store, false))

// Register a cleanup callback to be executed when the task completes
ctxt.addTaskCompletionListener[Unit](_ => {
Contributor:
Why are we adding the listeners here? Is it different from the one in mapPartitionsWithReadStateStore?

Contributor:
@ericm-db - I guess we did not register any listener before in the ReadStoreRDD path?

@anishshri-db
Contributor

@ericm-db - please also update the PR description to say what the new usage paradigm will look like.

@liviazhu-db
Contributor

liviazhu-db commented Apr 29, 2025

@ericm-db Can you update the PR description to be more specific about the inefficiency we are addressing here? Basically, in the current implementation we always abort the read store, which triggers an unnecessary reload of the state store.

@ericm-db
Contributor Author

@anishshri-db @liviazhu-db Sure yeah sounds good

* @param uniqueId Optional unique identifier for checkpointing
* @return A writable StateStore instance that can be used to update and commit changes
*/
def getWriteStoreFromReadStore(
Contributor:
nit: should we rename this to upgradeReadStoreToWriteStore?

@@ -34,7 +59,7 @@ abstract class BaseStateStoreRDD[T: ClassTag, U: ClassTag](
operatorId: Long,
sessionState: SessionState,
@transient private val storeCoordinator: Option[StateStoreCoordinatorRef],
- extraOptions: Map[String, String] = Map.empty) extends RDD[U](dataRDD) {
+ extraOptions: Map[String, String] = Map.empty) extends RDD[U](dataRDD) with Logging {
Contributor:
Intentional?


val inputIter = dataRDD.iterator(partition, ctxt)
val store = StateStore.getReadOnly(
storeProviderId, keySchema, valueSchema, keyStateEncoderSpec, storeVersion,
- stateStoreCkptIds.map(_.apply(partition.index).head),
+ stateStoreCkptIds.map(_.apply(partitionId).head),
Contributor:
Was this a bug before?

Contributor Author (@ericm-db, Apr 30, 2025):
No, these are equivalent. I'll change it back.

TaskContext.get().addTaskCompletionListener[Unit](_ => {
store.abort()
val taskContext = TaskContext.get()
taskContext.addTaskCompletionListener[Unit](_ => {
Contributor:
Why do we need it again here?

@ericm-db
Contributor Author

cc @cloud-fan

@anishshri-db
Contributor

cc @cloud-fan - could you PTAL too, especially around the RDD interactions? Thanks.

@ericm-db ericm-db force-pushed the read-store-changes branch from 0c913e2 to f7d0e70 on April 30, 2025 22:48
Contributor @liviazhu-db left a comment:

Looks good! Could you add a test in StateStoreRDDSuite to check the ThreadLocal logic correctly passes the readstore to the writestore too?

@ericm-db
Contributor Author

ericm-db commented May 1, 2025

> Looks good! Could you add a test in StateStoreRDDSuite to check the ThreadLocal logic correctly passes the readstore to the writestore too?

Yup, working on that rn!

* This allows a ReadStateStore to be reused by a subsequent StateStore operation.
*/
object StateStoreThreadLocalTracker {
private val readStore: ThreadLocal[ReadStateStore] = new ThreadLocal[ReadStateStore]
Contributor:
Can we combine these into a single thread local? Just make them members of a case class?

Contributor Author:
Sure yeah

*/
object StateStoreThreadLocalTracker {
/** Case class to hold both the store and its usage state */
case class StoreInfo(store: ReadStateStore, usedForWriteStore: Boolean = false)
Contributor:
nit: maybe move each member to its own line?
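
The single-ThreadLocal shape discussed in this thread might look roughly like the following sketch. Only the `StoreInfo(store, usedForWriteStore)` case class and the `isUsedForWriteStore` accessor are visible in the excerpts; `ToyReadStore`, `ToyTracker`, and the other method names are assumptions made for illustration.

```scala
object TrackerSketch {
  // Hypothetical stand-in for ReadStateStore.
  trait ToyReadStore { def version: Long }

  /** Holds both the store and its usage flag, one member per line. */
  final case class StoreInfo(
      store: ToyReadStore,
      usedForWriteStore: Boolean = false)

  object ToyTracker {
    // One ThreadLocal for everything; its value is null until set on a thread.
    private val storeInfo = new ThreadLocal[StoreInfo]

    def setStore(store: ToyReadStore): Unit = storeInfo.set(StoreInfo(store))

    def getStore: Option[ToyReadStore] = Option(storeInfo.get()).map(_.store)

    def setUsedForWriteStore(used: Boolean): Unit =
      Option(storeInfo.get()).foreach { info =>
        storeInfo.set(info.copy(usedForWriteStore = used))
      }

    def isUsedForWriteStore: Boolean =
      Option(storeInfo.get()).exists(_.usedForWriteStore)

    def clearStore(): Unit = storeInfo.remove()
  }
}
```

Keeping both fields in one immutable value means the store and its flag are always set and cleared together, which is the main appeal of the suggestion.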

@@ -194,6 +196,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
log"for ${MDC(LogKeys.STATE_STORE_PROVIDER, this)}")
}

override def release(): Unit = {}
Contributor (@liviazhu-db, May 2, 2025):
Can you add a new state and update the state here?

@@ -89,6 +89,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with

override def abort(): Unit = {}
Contributor:
I thought we were removing the abort API from the read store?

Contributor (@micheal-o, May 8, 2025):
OK, I think I remember a conversation we had about leaving it for now. But with this new change, can you check whether there is still any non-test code that calls abort on a read store?

@@ -194,6 +197,10 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
log"for ${MDC(LogKeys.STATE_STORE_PROVIDER, this)}")
}

override def release(): Unit = {
Contributor:
release() should only be allowed on a read store and shouldn't be allowed on a StateStore. A StateStore should only allow abort or commit.


override def release(): Unit = {
if (state != RELEASED) {
logInfo(log"Releasing ${MDC(VERSION_NUM, version + 1)} " +
Contributor:
Why is it version + 1 here?

Contributor Author:
Hm, not sure but we log a similar line for abort here
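
The requests in the preceding threads (track a released state, make `release()` a no-op when already released, and allow `release()` only on read stores) can be combined in a small state-machine sketch. The state names and the `ToyStore` class below are hypothetical; the source only shows a `RELEASED` state being compared against, and the exceptions chosen here are an assumption.

```scala
object ReleaseStateSketch {
  // Hypothetical lifecycle states; only a RELEASED state appears in the source.
  sealed trait StoreState
  case object Updating extends StoreState
  case object Committed extends StoreState
  case object Aborted extends StoreState
  case object Released extends StoreState

  final class ToyStore(val version: Long, readOnly: Boolean) {
    private var state: StoreState = Updating
    def currentState: StoreState = state

    // Allowed only on read-only stores; calling it twice is harmless.
    def release(): Unit = {
      if (!readOnly) {
        throw new UnsupportedOperationException(
          "release() is only valid on a store opened read-only; use commit() or abort()")
      }
      if (state != Released) {
        state = Released
      }
    }

    // Allowed only on writable stores that are still being updated.
    def commit(): Long = {
      if (readOnly) {
        throw new UnsupportedOperationException("Cannot commit a read-only store")
      }
      if (state != Updating) {
        throw new IllegalStateException(s"Cannot commit a store in state $state")
      }
      state = Committed
      version + 1
    }

    // Aborting after commit or release leaves the state unchanged.
    def abort(): Unit = {
      if (state == Updating) {
        state = Aborted
      }
    }
  }
}
```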

existingStore match {
// We need to match like this as opposed to case Some(ss: RocksDBStateStore)
// because of how the tests create the class in StateStoreRDDSuite
case Some(stateStore: ReadStateStore) if stateStore.isInstanceOf[RocksDBStateStore] =>
Contributor:
This check isn't sufficient, because it would succeed if I passed in a RocksDBStateStore that wasn't created as read-only. Instead, should we check whether the RocksDBStateStore passed in was created as read-only?

case Some(stateStore: ReadStateStore) if stateStore.isInstanceOf[RocksDBStateStore] =>
stateStore.asInstanceOf[StateStore]
case Some(other) =>
throw new IllegalArgumentException(s"Existing store must be a RocksDBStateStore," +
Contributor:
Should we throw the error earlier, before we do the load, if a bad store was passed in?
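
Both review points (check that the existing store was opened read-only, and reject a bad store before doing any load) can be illustrated with a toy provider. All names below (`ToyRocksStore`, `OtherStore`, `ToyProvider`, `loadCalls`) are hypothetical, and upgrading the store by flipping a `readOnly` flag is an assumption of this sketch, not a description of how `RocksDBStateStore` works.

```scala
object UpgradeCheckSketch {
  // Hypothetical stand-ins; not Spark's RocksDBStateStore.
  trait ToyReadStore { def version: Long }
  final class ToyRocksStore(val version: Long, var readOnly: Boolean) extends ToyReadStore
  final class OtherStore(val version: Long) extends ToyReadStore

  final class ToyProvider {
    var loadCalls = 0 // counts the (pretend) expensive load step

    def getWriteStore(existing: Option[ToyReadStore], version: Long): ToyRocksStore = {
      // Validate first, so a bad argument never triggers a load.
      val reusable: Option[ToyRocksStore] = existing.map {
        case s: ToyRocksStore if s.readOnly => s
        case _: ToyRocksStore =>
          throw new IllegalArgumentException(
            "Existing store must have been opened read-only")
        case other =>
          throw new IllegalArgumentException(
            s"Existing store must be a ToyRocksStore, got ${other.getClass.getName}")
      }
      reusable.foreach { s =>
        require(s.version == version,
          s"Requested version $version but existing store is at ${s.version}")
      }

      // Only now do the expensive part.
      loadCalls += 1
      reusable match {
        case Some(s) =>
          s.readOnly = false // upgrade the same instance in place
          s
        case None =>
          new ToyRocksStore(version, readOnly = false)
      }
    }
  }
}
```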

@@ -950,6 +989,29 @@ object StateStore extends Logging {
storeProvider.getReadStore(version, stateStoreCkptId)
}

def getWriteStore(
Contributor:
nit: add func comment

if (!store.hasCommitted) store.abort()
})
ctxt.addTaskFailureListener(new TaskFailureListener {
Contributor:
Why the failure listener?

store.abort()
val ctxt = TaskContext.get()
ctxt.addTaskCompletionListener[Unit](_ => {
if (!StateStoreThreadLocalTracker.isUsedForWriteStore) {
Contributor:
add comment
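
As one possible reading of the cleanup logic under review, the sketch below shows commented completion listeners in which the read path releases the store only if it was not handed over to a write store, and the write path aborts only if nothing was committed. `ToyTaskContext` and `ToyStore` are hypothetical stand-ins; this does not use Spark's `TaskContext`, and calling `release()` in the read-path listener is an assumption, since the excerpt cuts off before the listener body.

```scala
object CleanupListenerSketch {
  // Hypothetical minimal task context that runs callbacks at task completion.
  final class ToyTaskContext {
    private val listeners = scala.collection.mutable.ArrayBuffer.empty[() => Unit]
    def addCompletionListener(f: () => Unit): Unit = { listeners += f }
    def markTaskCompleted(): Unit = listeners.foreach(f => f())
  }

  // Hypothetical store that records which cleanup call it received.
  final class ToyStore {
    var released = false
    var aborted = false
    var hasCommitted = false
    var usedForWriteStore = false
    def release(): Unit = { released = true }
    def abort(): Unit = { aborted = true }
    def commit(): Unit = { hasCommitted = true }
  }

  def registerReadCleanup(ctx: ToyTaskContext, store: ToyStore): Unit =
    ctx.addCompletionListener { () =>
      // If the read store was upgraded to a write store, the write path owns
      // cleanup (commit or abort), so releasing it here would be wrong.
      if (!store.usedForWriteStore) store.release()
    }

  def registerWriteCleanup(ctx: ToyTaskContext, store: ToyStore): Unit =
    ctx.addCompletionListener { () =>
      // A task that ends without committing must not leave the store open.
      if (!store.hasCommitted) store.abort()
    }
}
```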

4 participants