Skip to content

kvcoord: track a write buffer transformation for all requests #146112

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 83 additions & 106 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func (twb *txnWriteBuffer) SendLocked(
return nil, kvpb.NewError(err)
}

transformedBa, ts, pErr := twb.applyTransformations(ctx, ba)
transformedBa, rr, pErr := twb.applyTransformations(ctx, ba)
if pErr != nil {
return nil, pErr
}
Expand All @@ -253,8 +253,8 @@ func (twb *txnWriteBuffer) SendLocked(
// left with an empty batch after applying transformations, eschew sending
// anything to KV.
br := ba.CreateReply()
for i, t := range ts {
br.Responses[i], pErr = t.toResp(ctx, twb, kvpb.ResponseUnion{}, ba.Txn)
for i, record := range rr {
br.Responses[i], pErr = record.toResp(ctx, twb, kvpb.ResponseUnion{}, ba.Txn)
if pErr != nil {
return nil, pErr
}
Expand All @@ -265,10 +265,10 @@ func (twb *txnWriteBuffer) SendLocked(

br, pErr := twb.wrapped.SendLocked(ctx, transformedBa)
if pErr != nil {
return nil, twb.adjustError(ctx, transformedBa, ts, pErr)
return nil, twb.adjustError(ctx, transformedBa, rr, pErr)
}

return twb.mergeResponseWithTransformations(ctx, ts, br)
return twb.mergeResponseWithRequestRecords(ctx, rr, br)
}

func (twb *txnWriteBuffer) batchRequiresFlush(ctx context.Context, ba *kvpb.BatchRequest) bool {
Expand Down Expand Up @@ -442,7 +442,7 @@ func (twb *txnWriteBuffer) estimateSize(ba *kvpb.BatchRequest) int64 {
// adjustError adjusts the provided error based on the transformations made by
// the txnWriteBuffer to the batch request before sending it to KV.
func (twb *txnWriteBuffer) adjustError(
ctx context.Context, ba *kvpb.BatchRequest, ts transformations, pErr *kvpb.Error,
ctx context.Context, ba *kvpb.BatchRequest, rr requestRecords, pErr *kvpb.Error,
) *kvpb.Error {
// Fix the error index to hide the impact of any requests that were
// transformed.
Expand All @@ -452,12 +452,12 @@ func (twb *txnWriteBuffer) adjustError(
// therefore weren't sent to the KV layer. We can then adjust the error
// index accordingly.
numStripped := int32(0)
numOriginalRequests := len(ba.Requests) + len(ts)
numOriginalRequests := len(ba.Requests) + len(rr)
baIdx := int32(0)
for i := range numOriginalRequests {
if len(ts) > 0 && ts[0].index == i {
curTs := ts[0]
ts = ts[1:]
if len(rr) > 0 && rr[0].index == i {
curTs := rr[0]
rr = rr[1:]
if curTs.stripped {
numStripped++
} else {
Expand Down Expand Up @@ -639,49 +639,64 @@ func (twb *txnWriteBuffer) closeLocked() {}

// applyTransformations applies any applicable transformations to the supplied
// batch request. In doing so, a new batch request with transformations applied
// along with a list of transformations that were applied is returned. The
// caller must handle these transformations on the response path.
// along with a list of requestRecords is returned. The caller must handle the
// transformations on the response path.
//
// Some examples of transformations include:
//
// 1. Blind writes (Put/Delete requests) are buffered locally. When they the
// original request has MustAcquireExclusiveLock set, a locking Get is used to
// acquire the lock.
// 1. Blind writes (Put/Delete requests) are buffered locally. When the original
// request has MustAcquireExclusiveLock set, a locking Get is used to acquire
// the lock.
// 2. Point reads (Get requests) are served from the buffer and stripped from
// the batch iff the key has seen a buffered write.
// 3. Scans are always sent to the KV layer, but if the key span being scanned
// overlaps with any buffered writes, then the response from the KV layer needs
// to be merged with buffered writes. These are collected as transformations.
// to be merged with buffered writes. These are collected as requestRecords.
// 4. ReverseScans, similar to scans, are also always sent to the KV layer and
// their response needs to be merged with any buffered writes. The only
// difference is the direction in which the buffer is iterated when doing the
// merge. As a result, they're also collected as tranformations.
// merge. As a result, they're also collected as requestRecords.
// 5. Conditional Puts are decomposed into a locking Get followed by a Put. The
// Put is buffered locally if the condition evaluates successfully using the
// Get's response. Otherwise, a ConditionFailedError is returned.
//
// TODO(arul): Augment this comment as these expand.
func (twb *txnWriteBuffer) applyTransformations(
ctx context.Context, ba *kvpb.BatchRequest,
) (*kvpb.BatchRequest, transformations, *kvpb.Error) {
) (*kvpb.BatchRequest, requestRecords, *kvpb.Error) {
baRemote := ba.ShallowCopy()
// TODO(arul): We could improve performance here by pre-allocating
// baRemote.Requests to the correct size by counting the number of Puts/Dels
// in ba.Requests. The same for the transformations slice. We could also
// in ba.Requests. The same for the requestRecords slice. We could also
// allocate the right number of ResponseUnion, PutResponse, and DeleteResponse
// objects as well.
baRemote.Requests = nil

var ts transformations
rr := make(requestRecords, 0, len(ba.Requests))
for i, ru := range ba.Requests {
req := ru.GetInner()
// Track a requestRecord for the request regardless of the type, and
// regardless of whether it was served from the buffer or not. For
// transformed requests (e.g. CPut) this is expected. For Gets and Scans, we
// need to track a requestRecord because we haven't buffered any writes
// from our current batch in the buffer yet, so checking the buffer here, at
// request time, isn't sufficient to determine whether the request needs to
// serve a read from the buffer before returning a response or not.
//
// Only QueryLocksRequest and LeaseInfoRequest don't require a tracking
// requestRecord, but it's harmless to add one, and it simplifies the code.
//
// The stripped and transformed fields will be set below for specific
// requests.
record := requestRecord{
stripped: false,
transformed: false,
index: i,
origRequest: req,
}
switch t := req.(type) {
case *kvpb.ConditionalPutRequest:
ts = append(ts, transformation{
stripped: false,
index: i,
origRequest: req,
})
record.transformed = true
// NB: Regardless of whether there is already a buffered write on
// this key or not, we need to send a locking Get to the KV layer to
// acquire a lock. However, if we had knowledge of what locks the
Expand Down Expand Up @@ -720,11 +735,8 @@ func (twb *txnWriteBuffer) applyTransformations(
})
baRemote.Requests = append(baRemote.Requests, getReqU)
}
ts = append(ts, transformation{
stripped: !t.MustAcquireExclusiveLock,
index: i,
origRequest: req,
})
record.stripped = !t.MustAcquireExclusiveLock
record.transformed = t.MustAcquireExclusiveLock

case *kvpb.DeleteRequest:
// If MustAcquireExclusiveLock flag is set on the DeleteRequest,
Expand All @@ -744,11 +756,8 @@ func (twb *txnWriteBuffer) applyTransformations(
})
baRemote.Requests = append(baRemote.Requests, getReqU)
}
ts = append(ts, transformation{
stripped: !t.MustAcquireExclusiveLock,
index: i,
origRequest: req,
})
record.stripped = !t.MustAcquireExclusiveLock
record.transformed = t.MustAcquireExclusiveLock

case *kvpb.GetRequest:
// If the key is in the buffer, we must serve the read from the buffer.
Expand Down Expand Up @@ -779,45 +788,14 @@ func (twb *txnWriteBuffer) applyTransformations(
// Wasn't served locally; send the request to the KV layer.
baRemote.Requests = append(baRemote.Requests, ru)
}
// Even if the request wasn't served from the buffer here, we still track
// a transformation for it. That's because we haven't buffered any writes
// from our current batch in the buffer yet, so checking the buffer above
// isn't sufficient to determine whether the request needs to serve a read
// from the buffer before returning a response or not.
ts = append(ts, transformation{
stripped: stripped,
index: i,
origRequest: req,
})
record.stripped = stripped

case *kvpb.ScanRequest:
overlaps := twb.scanOverlaps(t.Key, t.EndKey)
if overlaps {
ts = append(ts, transformation{
stripped: false,
index: i,
origRequest: req,
})
}
case *kvpb.ScanRequest, *kvpb.ReverseScanRequest:
// Regardless of whether the scan overlaps with any writes in the buffer
// or not, we must send the request to the KV layer. We can't know for
// sure that there's nothing else to read.
baRemote.Requests = append(baRemote.Requests, ru)

case *kvpb.ReverseScanRequest:
overlaps := twb.scanOverlaps(t.Key, t.EndKey)
if overlaps {
ts = append(ts, transformation{
stripped: false,
index: i,
origRequest: req,
})
}
// Similar to the reasoning above, regardless of whether the reverse
// scan overlaps with any writes in the buffer or not, we must send
// the request to the KV layer.
baRemote.Requests = append(baRemote.Requests, ru)

case *kvpb.QueryLocksRequest, *kvpb.LeaseInfoRequest:
// These requests don't interact with buffered writes, so we simply
// let them through.
Expand All @@ -826,8 +804,9 @@ func (twb *txnWriteBuffer) applyTransformations(
default:
return nil, nil, kvpb.NewError(unsupportedMethodError(t.Method()))
}
rr = append(rr, record)
}
return baRemote, ts, nil
return baRemote, rr, nil
}

// seekItemForSpan returns a bufferedWrite appropriate for use with a
Expand Down Expand Up @@ -873,14 +852,6 @@ func (twb *txnWriteBuffer) maybeServeRead(
return nil, false
}

// scanOverlaps returns whether the given key range overlaps with any buffered
// write.
func (twb *txnWriteBuffer) scanOverlaps(key roachpb.Key, endKey roachpb.Key) bool {
it := twb.buffer.MakeIter()
it.FirstOverlap(twb.seekItemForSpan(key, endKey))
return it.Valid()
}

// mergeWithScanResp takes a ScanRequest, that was sent to the KV layer, and the
// response returned by the KV layer, and merges it with any writes that were
// buffered by the transaction to correctly uphold read-your-own-write
Expand Down Expand Up @@ -1035,47 +1006,47 @@ func (twb *txnWriteBuffer) mergeBufferAndResp(
}
}

// mergeResponsesWithTransformations merges responses from the KV layer with the
// transformations that were applied by the txnWriteBuffer before sending the
// batch request. As a result, interceptors above the txnWriteBuffer remain
// oblivious to its decision to buffer any writes.
func (twb *txnWriteBuffer) mergeResponseWithTransformations(
ctx context.Context, ts transformations, br *kvpb.BatchResponse,
// mergeResponseWithRequestRecords merges responses from the KV layer with the
// requestRecords and potential transformations applied by the txnWriteBuffer
// before sending the batch request. As a result, interceptors above the
// txnWriteBuffer remain oblivious to its decision to buffer any writes.
func (twb *txnWriteBuffer) mergeResponseWithRequestRecords(
ctx context.Context, rr requestRecords, br *kvpb.BatchResponse,
) (_ *kvpb.BatchResponse, pErr *kvpb.Error) {
if ts.Empty() && br == nil {
if rr.Empty() && br == nil {
log.Fatal(ctx, "unexpectedly found no transformations and no batch response")
} else if ts.Empty() {
} else if rr.Empty() {
return br, nil
}

// Figure out the length of the merged responses slice.
mergedRespsLen := len(br.Responses)
for _, t := range ts {
for _, t := range rr {
if t.stripped {
mergedRespsLen++
}
}
mergedResps := make([]kvpb.ResponseUnion, mergedRespsLen)
for i := range mergedResps {
if len(ts) > 0 && ts[0].index == i {
if !ts[0].stripped {
if len(rr) > 0 && rr[0].index == i {
if !rr[0].stripped {
// If the transformation wasn't stripped from the batch we sent to KV,
// we received a response for it, which then needs to be combined with
// what's in the write buffer.
resp := br.Responses[0]
mergedResps[i], pErr = ts[0].toResp(ctx, twb, resp, br.Txn)
mergedResps[i], pErr = rr[0].toResp(ctx, twb, resp, br.Txn)
if pErr != nil {
return nil, pErr
}
br.Responses = br.Responses[1:]
} else {
mergedResps[i], pErr = ts[0].toResp(ctx, twb, kvpb.ResponseUnion{}, br.Txn)
mergedResps[i], pErr = rr[0].toResp(ctx, twb, kvpb.ResponseUnion{}, br.Txn)
if pErr != nil {
return nil, pErr
}
}

ts = ts[1:]
rr = rr[1:]
continue
}

Expand All @@ -1087,26 +1058,32 @@ func (twb *txnWriteBuffer) mergeResponseWithTransformations(
return br, nil
}

// transformation is a modification applied by the txnWriteBuffer on a batch
// request that needs to be accounted for when returning the response.
type transformation struct {
// requestRecord stores a set of metadata fields about potential transformations
// applied by the txnWriteBuffer on a batch request that needs to be accounted
// for when returning the response.
type requestRecord struct {
// stripped, if true, indicates that the request was stripped from the batch
// and never sent to the KV layer.
stripped bool
// index of the request in the original batch to which the transformation
// transformed, if true, indicates that the request was transformed into a
// different request to be sent to the KV layer. If stripped is true, then
// transformed is always false; i.e. if the request was completely dropped,
// then it's not considered transformed.
transformed bool
// index of the request in the original batch to which the requestRecord
// applies.
index int
// origRequest is the original request that was transformed.
origRequest kvpb.Request
}

// toResp returns the response that should be added to the batch response as
// a result of applying the transformation.
func (t transformation) toResp(
// a result of applying the requestRecord.
func (rr requestRecord) toResp(
ctx context.Context, twb *txnWriteBuffer, br kvpb.ResponseUnion, txn *roachpb.Transaction,
) (kvpb.ResponseUnion, *kvpb.Error) {
var ru kvpb.ResponseUnion
switch req := t.origRequest.(type) {
switch req := rr.origRequest.(type) {
case *kvpb.ConditionalPutRequest:
// Evaluate the condition.
evalFn := mvcceval.MaybeConditionFailedError
Expand All @@ -1132,7 +1109,7 @@ func (t transformation) toResp(
)
if condFailedErr != nil {
pErr := kvpb.NewErrorWithTxn(condFailedErr, txn)
pErr.SetErrorIndex(int32(t.index))
pErr.SetErrorIndex(int32(rr.index))
return kvpb.ResponseUnion{}, pErr
}
// The condition was satisfied; buffer the write and return a
Expand Down Expand Up @@ -1191,13 +1168,13 @@ func (t transformation) toResp(
} else {
// The request wasn't served from the buffer; return the response from the
// KV layer.
assertTrue(!t.stripped, "we shouldn't be stripping requests that aren't served from the buffer")
assertTrue(!rr.stripped, "we shouldn't be stripping requests that aren't served from the buffer")
ru = br
}

case *kvpb.ScanRequest:
scanResp, err := twb.mergeWithScanResp(
t.origRequest.(*kvpb.ScanRequest), br.GetInner().(*kvpb.ScanResponse),
rr.origRequest.(*kvpb.ScanRequest), br.GetInner().(*kvpb.ScanResponse),
)
if err != nil {
return kvpb.ResponseUnion{}, kvpb.NewError(err)
Expand All @@ -1206,7 +1183,7 @@ func (t transformation) toResp(

case *kvpb.ReverseScanRequest:
reverseScanResp, err := twb.mergeWithReverseScanResp(
t.origRequest.(*kvpb.ReverseScanRequest), br.GetInner().(*kvpb.ReverseScanResponse),
rr.origRequest.(*kvpb.ReverseScanRequest), br.GetInner().(*kvpb.ReverseScanResponse),
)
if err != nil {
return kvpb.ResponseUnion{}, kvpb.NewError(err)
Expand All @@ -1224,11 +1201,11 @@ func (t transformation) toResp(
return ru, nil
}

// transformations is a list of transformations applied by the txnWriteBuffer.
type transformations []transformation
// requestRecords is a slice of requestRecord.
type requestRecords []requestRecord

func (t transformations) Empty() bool {
return len(t) == 0
func (rr requestRecords) Empty() bool {
return len(rr) == 0
}

// addToBuffer adds a write to the given key to the buffer.
Expand Down
Loading
Loading