-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-51955] Adding release() to ReadStateStore interface and reusing ReadStore for Streaming Aggregations #50742
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
b3c584b
63573db
f7d0e70
560c5c7
81c0eed
0465454
93f014a
226f99b
a892142
768346a
55664c4
8d28ea2
807a3c1
7530d54
95db839
8f15229
32e5545
01b315c
18793fd
3270b84
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -204,7 +204,6 @@ class StatePartitionReader( | |
} | ||
|
||
override def close(): Unit = { | ||
store.abort() | ||
super.close() | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,7 +31,7 @@ import org.apache.commons.io.IOUtils | |
import org.apache.hadoop.conf.Configuration | ||
import org.apache.hadoop.fs._ | ||
|
||
import org.apache.spark.{SparkConf, SparkEnv, SparkException} | ||
import org.apache.spark.{SparkConf, SparkEnv, SparkException, TaskContext} | ||
import org.apache.spark.internal.{Logging, LogKeys, MDC, MessageWithContext} | ||
import org.apache.spark.io.CompressionCodec | ||
import org.apache.spark.sql.catalyst.expressions.UnsafeRow | ||
|
@@ -89,6 +89,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with | |
|
||
override def abort(): Unit = {} | ||
|
||
override def release(): Unit = {} | ||
|
||
override def toString(): String = { | ||
s"HDFSReadStateStore[id=(op=${id.operatorId},part=${id.partitionId}),dir=$baseDir]" | ||
} | ||
|
@@ -112,6 +114,16 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with | |
case object UPDATING extends STATE | ||
case object COMMITTED extends STATE | ||
case object ABORTED extends STATE | ||
case object RELEASED extends STATE | ||
|
||
|
||
Option(TaskContext.get()).foreach { ctxt => | ||
ctxt.addTaskCompletionListener[Unit](ctx => { | ||
if (state == UPDATING) { | ||
abort() | ||
} | ||
}) | ||
} | ||
|
||
private val newVersion = version + 1 | ||
@volatile private var state: STATE = UPDATING | ||
|
@@ -194,6 +206,10 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with | |
log"for ${MDC(LogKeys.STATE_STORE_PROVIDER, this)}") | ||
} | ||
|
||
override def release(): Unit = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. release should only be allowed on readstore and shouldn't be allowed on statestore. State store should only allow |
||
state = RELEASED | ||
} | ||
|
||
/** | ||
* Get an iterator of all the store data. | ||
* This can be called only after committing all the updates made in the current thread. | ||
|
@@ -953,7 +969,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with | |
* @param endVersion checkpoint version to end with | ||
* @return [[HDFSBackedStateStore]] | ||
*/ | ||
override def replayStateFromSnapshot(snapshotVersion: Long, endVersion: Long): StateStore = { | ||
override def replayStateFromSnapshot( | ||
snapshotVersion: Long, endVersion: Long, readOnly: Boolean): StateStore = { | ||
val newMap = replayLoadedMapFromSnapshot(snapshotVersion, endVersion) | ||
logInfo(log"Retrieved snapshot at version " + | ||
log"${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} and apply delta files to version " + | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,7 +26,7 @@ import scala.util.control.NonFatal | |
import org.apache.hadoop.conf.Configuration | ||
import org.apache.hadoop.fs.Path | ||
|
||
import org.apache.spark.{SparkConf, SparkEnv} | ||
import org.apache.spark.{SparkConf, SparkEnv, TaskContext} | ||
import org.apache.spark.internal.{Logging, MDC} | ||
import org.apache.spark.internal.LogKeys._ | ||
import org.apache.spark.io.CompressionCodec | ||
|
@@ -43,12 +43,13 @@ private[sql] class RocksDBStateStoreProvider | |
with SupportsFineGrainedReplay { | ||
import RocksDBStateStoreProvider._ | ||
|
||
class RocksDBStateStore(lastVersion: Long) extends StateStore { | ||
class RocksDBStateStore(lastVersion: Long, var readOnly: Boolean) extends StateStore { | ||
/** Trait and classes representing the internal state of the store */ | ||
trait STATE | ||
case object UPDATING extends STATE | ||
case object COMMITTED extends STATE | ||
case object ABORTED extends STATE | ||
case object RELEASED extends STATE | ||
|
||
@volatile private var state: STATE = UPDATING | ||
@volatile private var isValidated = false | ||
|
@@ -57,6 +58,30 @@ private[sql] class RocksDBStateStoreProvider | |
|
||
override def version: Long = lastVersion | ||
|
||
Option(TaskContext.get()).foreach { ctxt => | ||
ctxt.addTaskCompletionListener[Unit]( ctx => { | ||
try { | ||
if (state == UPDATING) { | ||
if (readOnly) { | ||
release() // Only release, do not throw an error because we rely on | ||
// CompletionListener to release for read-only store in | ||
// mapPartitionsWithReadStateStore. | ||
} else { | ||
abort() // Abort since this is an error if stateful task completes | ||
} | ||
} | ||
} catch { | ||
case NonFatal(e) => | ||
logWarning("Failed to abort state store", e) | ||
} | ||
}) | ||
|
||
ctxt.addTaskFailureListener( (_, _) => { | ||
abort() // Either the store is already aborted (this is a no-op) or | ||
// we need to abort it. | ||
}) | ||
} | ||
|
||
override def createColFamilyIfAbsent( | ||
colFamilyName: String, | ||
keySchema: StructType, | ||
|
@@ -365,6 +390,19 @@ private[sql] class RocksDBStateStoreProvider | |
} | ||
result | ||
} | ||
|
||
override def release(): Unit = { | ||
assert(readOnly, "Release can only be called on a read-only store") | ||
if (state != RELEASED) { | ||
ericm-db marked this conversation as resolved.
Show resolved
Hide resolved
|
||
logInfo(log"Releasing ${MDC(VERSION_NUM, version + 1)} " + | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why is it version + 1 here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hm, not sure but we log a similar line for abort here |
||
log"for ${MDC(STATE_STORE_ID, id)}") | ||
rocksDB.release() | ||
state = RELEASED | ||
} else { | ||
// Optionally log at DEBUG level that it's already released | ||
logDebug(log"State store already released") | ||
} | ||
} | ||
} | ||
|
||
// Test-visible method to fetch the internal RocksDBStateStore class | ||
|
@@ -446,38 +484,52 @@ private[sql] class RocksDBStateStoreProvider | |
|
||
override def stateStoreId: StateStoreId = stateStoreId_ | ||
|
||
override def getStore(version: Long, uniqueId: Option[String] = None): StateStore = { | ||
/** | ||
* Creates and returns a state store with the specified parameters. | ||
* | ||
* @param version The version of the state store to load | ||
* @param uniqueId Optional unique identifier for checkpoint | ||
* @param readOnly Whether to open the store in read-only mode | ||
* @param existingStore Optional existing store to reuse instead of creating a new one | ||
* @return The loaded state store | ||
*/ | ||
private def loadStateStore( | ||
version: Long, | ||
uniqueId: Option[String], | ||
readOnly: Boolean, | ||
existingStore: Option[ReadStateStore] = None): StateStore = { | ||
try { | ||
if (version < 0) { | ||
throw QueryExecutionErrors.unexpectedStateStoreVersion(version) | ||
} | ||
rocksDB.load( | ||
version, | ||
stateStoreCkptId = if (storeConf.enableStateStoreCheckpointIds) uniqueId else None) | ||
new RocksDBStateStore(version) | ||
} | ||
catch { | ||
case e: OutOfMemoryError => | ||
throw QueryExecutionErrors.notEnoughMemoryToLoadStore( | ||
stateStoreId.toString, | ||
"ROCKSDB_STORE_PROVIDER", | ||
e) | ||
case e: Throwable => throw StateStoreErrors.cannotLoadStore(e) | ||
} | ||
} | ||
|
||
override def getReadStore(version: Long, uniqueId: Option[String] = None): StateStore = { | ||
try { | ||
if (version < 0) { | ||
throw QueryExecutionErrors.unexpectedStateStoreVersion(version) | ||
// Early validation of the existing store type before loading RocksDB | ||
existingStore.foreach { store => | ||
if (!store.isInstanceOf[RocksDBStateStore]) { | ||
throw new IllegalArgumentException( | ||
s"Existing store must be a RocksDBStateStore, but got ${store.getClass.getSimpleName}") | ||
} | ||
} | ||
|
||
// Load RocksDB store | ||
rocksDB.load( | ||
version, | ||
stateStoreCkptId = if (storeConf.enableStateStoreCheckpointIds) uniqueId else None, | ||
readOnly = true) | ||
new RocksDBStateStore(version) | ||
} | ||
catch { | ||
readOnly = readOnly) | ||
|
||
// Create or reuse store instance | ||
existingStore match { | ||
case Some(store: ReadStateStore) if store.isInstanceOf[RocksDBStateStore] => | ||
// Mark store as being used for write operations | ||
val rocksDBStateStore = store.asInstanceOf[RocksDBStateStore] | ||
rocksDBStateStore.readOnly = readOnly | ||
rocksDBStateStore.asInstanceOf[StateStore] | ||
case None => | ||
// Create new store instance | ||
new RocksDBStateStore(version, readOnly) | ||
case _ => null // No need for error case here since we validated earlier | ||
} | ||
} catch { | ||
case e: OutOfMemoryError => | ||
throw QueryExecutionErrors.notEnoughMemoryToLoadStore( | ||
stateStoreId.toString, | ||
|
@@ -487,6 +539,26 @@ private[sql] class RocksDBStateStoreProvider | |
} | ||
} | ||
|
||
override def getStore(version: Long, uniqueId: Option[String] = None): StateStore = { | ||
loadStateStore(version, uniqueId, readOnly = false) | ||
} | ||
|
||
override def upgradeReadStoreToWriteStore( | ||
readStore: ReadStateStore, | ||
version: Long, | ||
uniqueId: Option[String] = None): StateStore = { | ||
assert(version == readStore.version, | ||
s"Can only upgrade readStore to writeStore with the same version," + | ||
s" readStoreVersion: ${readStore.version}, writeStoreVersion: ${version}") | ||
assert(this.stateStoreId == readStore.id, "Can only upgrade readStore to writeStore with" + | ||
" the same stateStoreId") | ||
loadStateStore(version, uniqueId, readOnly = false, existingStore = Some(readStore)) | ||
ericm-db marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
override def getReadStore(version: Long, uniqueId: Option[String] = None): StateStore = { | ||
loadStateStore(version, uniqueId, readOnly = true) | ||
} | ||
|
||
override def doMaintenance(): Unit = { | ||
try { | ||
rocksDB.doMaintenance() | ||
|
@@ -572,7 +644,8 @@ private[sql] class RocksDBStateStoreProvider | |
* @param endVersion checkpoint version to end with | ||
* @return [[StateStore]] | ||
*/ | ||
override def replayStateFromSnapshot(snapshotVersion: Long, endVersion: Long): StateStore = { | ||
override def replayStateFromSnapshot( | ||
snapshotVersion: Long, endVersion: Long, readOnly: Boolean): StateStore = { | ||
try { | ||
if (snapshotVersion < 1) { | ||
throw QueryExecutionErrors.unexpectedStateStoreVersion(snapshotVersion) | ||
|
@@ -581,7 +654,7 @@ private[sql] class RocksDBStateStoreProvider | |
throw QueryExecutionErrors.unexpectedStateStoreVersion(endVersion) | ||
} | ||
rocksDB.loadFromSnapshot(snapshotVersion, endVersion) | ||
new RocksDBStateStore(endVersion) | ||
new RocksDBStateStore(endVersion, readOnly) | ||
} | ||
catch { | ||
case e: OutOfMemoryError => | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought we are removing
abort
api in readstore?Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, I think I remember a conversation we had about leaving it for now. But with this new change, can you check if there is still any non-test usage of calling abort on readstore?