@@ -242,7 +242,7 @@ func (twb *txnWriteBuffer) SendLocked(
242
242
return nil , kvpb .NewError (err )
243
243
}
244
244
245
- transformedBa , ts , pErr := twb .applyTransformations (ctx , ba )
245
+ transformedBa , rr , pErr := twb .applyTransformations (ctx , ba )
246
246
if pErr != nil {
247
247
return nil , pErr
248
248
}
@@ -253,8 +253,8 @@ func (twb *txnWriteBuffer) SendLocked(
253
253
// left with an empty batch after applying transformations, eschew sending
254
254
// anything to KV.
255
255
br := ba .CreateReply ()
256
- for i , t := range ts {
257
- br .Responses [i ], pErr = t .toResp (ctx , twb , kvpb.ResponseUnion {}, ba .Txn )
256
+ for i , record := range rr {
257
+ br .Responses [i ], pErr = record .toResp (ctx , twb , kvpb.ResponseUnion {}, ba .Txn )
258
258
if pErr != nil {
259
259
return nil , pErr
260
260
}
@@ -265,10 +265,10 @@ func (twb *txnWriteBuffer) SendLocked(
265
265
266
266
br , pErr := twb .wrapped .SendLocked (ctx , transformedBa )
267
267
if pErr != nil {
268
- return nil , twb .adjustError (ctx , transformedBa , ts , pErr )
268
+ return nil , twb .adjustError (ctx , transformedBa , rr , pErr )
269
269
}
270
270
271
- return twb .mergeResponseWithTransformations (ctx , ts , br )
271
+ return twb .mergeResponseWithRequestRecords (ctx , rr , br )
272
272
}
273
273
274
274
func (twb * txnWriteBuffer ) batchRequiresFlush (ctx context.Context , ba * kvpb.BatchRequest ) bool {
@@ -442,7 +442,7 @@ func (twb *txnWriteBuffer) estimateSize(ba *kvpb.BatchRequest) int64 {
442
442
// adjustError adjusts the provided error based on the transformations made by
443
443
// the txnWriteBuffer to the batch request before sending it to KV.
444
444
func (twb * txnWriteBuffer ) adjustError (
445
- ctx context.Context , ba * kvpb.BatchRequest , ts transformations , pErr * kvpb.Error ,
445
+ ctx context.Context , ba * kvpb.BatchRequest , rr requestRecords , pErr * kvpb.Error ,
446
446
) * kvpb.Error {
447
447
// Fix the error index to hide the impact of any requests that were
448
448
// transformed.
@@ -452,12 +452,12 @@ func (twb *txnWriteBuffer) adjustError(
452
452
// therefore weren't sent to the KV layer. We can then adjust the error
453
453
// index accordingly.
454
454
numStripped := int32 (0 )
455
- numOriginalRequests := len (ba .Requests ) + len (ts )
455
+ numOriginalRequests := len (ba .Requests ) + len (rr )
456
456
baIdx := int32 (0 )
457
457
for i := range numOriginalRequests {
458
- if len (ts ) > 0 && ts [0 ].index == i {
459
- curTs := ts [0 ]
460
- ts = ts [1 :]
458
+ if len (rr ) > 0 && rr [0 ].index == i {
459
+ curTs := rr [0 ]
460
+ rr = rr [1 :]
461
461
if curTs .stripped {
462
462
numStripped ++
463
463
} else {
@@ -639,61 +639,64 @@ func (twb *txnWriteBuffer) closeLocked() {}
639
639
640
640
// applyTransformations applies any applicable transformations to the supplied
641
641
// batch request. In doing so, a new batch request with transformations applied
642
- // along with a list of transformations that were applied is returned. The
643
- // caller must handle these transformations on the response path.
642
+ // along with a list of requestRecords is returned. The caller must handle the
643
+ // transformations on the response path.
644
644
//
645
645
// Some examples of transformations include:
646
646
//
647
- // 1. Blind writes (Put/Delete requests) are buffered locally. When they the
648
- // original request has MustAcquireExclusiveLock set, a locking Get is used to
649
- // acquire the lock.
647
+ // 1. Blind writes (Put/Delete requests) are buffered locally. When the original
648
+ // request has MustAcquireExclusiveLock set, a locking Get is used to acquire
649
+ // the lock.
650
650
// 2. Point reads (Get requests) are served from the buffer and stripped from
651
651
// the batch iff the key has seen a buffered write.
652
652
// 3. Scans are always sent to the KV layer, but if the key span being scanned
653
653
// overlaps with any buffered writes, then the response from the KV layer needs
654
- // to be merged with buffered writes. These are collected as transformations .
654
+ // to be merged with buffered writes. These are collected as requestRecords .
655
655
// 4. ReverseScans, similar to scans, are also always sent to the KV layer and
656
656
// their response needs to be merged with any buffered writes. The only
657
657
// difference is the direction in which the buffer is iterated when doing the
658
- // merge. As a result, they're also collected as tranformations .
658
+ // merge. As a result, they're also collected as requestRecords .
659
659
// 5. Conditional Puts are decomposed into a locking Get followed by a Put. The
660
660
// Put is buffered locally if the condition evaluates successfully using the
661
661
// Get's response. Otherwise, a ConditionFailedError is returned.
662
662
//
663
663
// TODO(arul): Augment this comment as these expand.
664
664
func (twb * txnWriteBuffer ) applyTransformations (
665
665
ctx context.Context , ba * kvpb.BatchRequest ,
666
- ) (* kvpb.BatchRequest , transformations , * kvpb.Error ) {
666
+ ) (* kvpb.BatchRequest , requestRecords , * kvpb.Error ) {
667
667
baRemote := ba .ShallowCopy ()
668
668
// TODO(arul): We could improve performance here by pre-allocating
669
669
// baRemote.Requests to the correct size by counting the number of Puts/Dels
670
- // in ba.Requests. The same for the transformations slice. We could also
670
+ // in ba.Requests. The same for the requestRecords slice. We could also
671
671
// allocate the right number of ResponseUnion, PutResponse, and DeleteResponse
672
672
// objects as well.
673
673
baRemote .Requests = nil
674
674
675
- ts := make (transformations , 0 , len (ba .Requests ))
675
+ rr := make (requestRecords , 0 , len (ba .Requests ))
676
676
for i , ru := range ba .Requests {
677
677
req := ru .GetInner ()
678
- // Track a transformation for the request regardless of the type, and
679
- // regardless of whether it was served from the buffer or not. For truly
678
+ // Track a requestRecord for the request regardless of the type, and
679
+ // regardless of whether it was served from the buffer or not. For
680
680
// transformed requests (e.g. CPut) this is expected. For Gets and Scans, we
681
- // need to track a transformation because we haven't buffered any writes
681
+ // need to track a requestRecord because we haven't buffered any writes
682
682
// from our current batch in the buffer yet, so checking the buffer here, at
683
683
// request time, isn't sufficient to determine whether the request needs to
684
684
// serve a read from the buffer before returning a response or not.
685
685
//
686
686
// Only QueryLocksRequest and LeaseInfoRequest don't require a tracking
687
- // transformation , but it's harmless to add one, and it simplifies the code.
687
+ // requestRecord , but it's harmless to add one, and it simplifies the code.
688
688
//
689
- // The stripped field will be set below for specific stripped requests.
690
- tr := transformation {
689
+ // The stripped and transformed fields will be set below for specific
690
+ // requests.
691
+ record := requestRecord {
691
692
stripped : false ,
693
+ transformed : false ,
692
694
index : i ,
693
695
origRequest : req ,
694
696
}
695
697
switch t := req .(type ) {
696
698
case * kvpb.ConditionalPutRequest :
699
+ record .transformed = true
697
700
// NB: Regardless of whether there is already a buffered write on
698
701
// this key or not, we need to send a locking Get to the KV layer to
699
702
// acquire a lock. However, if we had knowledge of what locks the
@@ -732,7 +735,8 @@ func (twb *txnWriteBuffer) applyTransformations(
732
735
})
733
736
baRemote .Requests = append (baRemote .Requests , getReqU )
734
737
}
735
- tr .stripped = ! t .MustAcquireExclusiveLock
738
+ record .stripped = ! t .MustAcquireExclusiveLock
739
+ record .transformed = t .MustAcquireExclusiveLock
736
740
737
741
case * kvpb.DeleteRequest :
738
742
// If MustAcquireExclusiveLock flag is set on the DeleteRequest,
@@ -752,7 +756,8 @@ func (twb *txnWriteBuffer) applyTransformations(
752
756
})
753
757
baRemote .Requests = append (baRemote .Requests , getReqU )
754
758
}
755
- tr .stripped = ! t .MustAcquireExclusiveLock
759
+ record .stripped = ! t .MustAcquireExclusiveLock
760
+ record .transformed = t .MustAcquireExclusiveLock
756
761
757
762
case * kvpb.GetRequest :
758
763
// If the key is in the buffer, we must serve the read from the buffer.
@@ -783,7 +788,7 @@ func (twb *txnWriteBuffer) applyTransformations(
783
788
// Wasn't served locally; send the request to the KV layer.
784
789
baRemote .Requests = append (baRemote .Requests , ru )
785
790
}
786
- tr .stripped = stripped
791
+ record .stripped = stripped
787
792
788
793
case * kvpb.ScanRequest , * kvpb.ReverseScanRequest :
789
794
// Regardless of whether the scan overlaps with any writes in the buffer
@@ -799,9 +804,9 @@ func (twb *txnWriteBuffer) applyTransformations(
799
804
default :
800
805
return nil , nil , kvpb .NewError (unsupportedMethodError (t .Method ()))
801
806
}
802
- ts = append (ts , tr )
807
+ rr = append (rr , record )
803
808
}
804
- return baRemote , ts , nil
809
+ return baRemote , rr , nil
805
810
}
806
811
807
812
// seekItemForSpan returns a bufferedWrite appropriate for use with a
@@ -1001,47 +1006,47 @@ func (twb *txnWriteBuffer) mergeBufferAndResp(
1001
1006
}
1002
1007
}
1003
1008
1004
- // mergeResponsesWithTransformations merges responses from the KV layer with the
1005
- // transformations that were applied by the txnWriteBuffer before sending the
1006
- // batch request. As a result, interceptors above the txnWriteBuffer remain
1007
- // oblivious to its decision to buffer any writes.
1008
- func (twb * txnWriteBuffer ) mergeResponseWithTransformations (
1009
- ctx context.Context , ts transformations , br * kvpb.BatchResponse ,
1009
+ // mergeResponseWithRequestRecords merges responses from the KV layer with the
1010
+ // requestRecords and potential transformations applied by the txnWriteBuffer
1011
+ // before sending the batch request. As a result, interceptors above the
1012
+ // txnWriteBuffer remain oblivious to its decision to buffer any writes.
1013
+ func (twb * txnWriteBuffer ) mergeResponseWithRequestRecords (
1014
+ ctx context.Context , rr requestRecords , br * kvpb.BatchResponse ,
1010
1015
) (_ * kvpb.BatchResponse , pErr * kvpb.Error ) {
1011
- if ts .Empty () && br == nil {
1016
+ if rr .Empty () && br == nil {
1012
1017
log .Fatal (ctx , "unexpectedly found no transformations and no batch response" )
1013
- } else if ts .Empty () {
1018
+ } else if rr .Empty () {
1014
1019
return br , nil
1015
1020
}
1016
1021
1017
1022
// Figure out the length of the merged responses slice.
1018
1023
mergedRespsLen := len (br .Responses )
1019
- for _ , t := range ts {
1024
+ for _ , t := range rr {
1020
1025
if t .stripped {
1021
1026
mergedRespsLen ++
1022
1027
}
1023
1028
}
1024
1029
mergedResps := make ([]kvpb.ResponseUnion , mergedRespsLen )
1025
1030
for i := range mergedResps {
1026
- if len (ts ) > 0 && ts [0 ].index == i {
1027
- if ! ts [0 ].stripped {
1031
+ if len (rr ) > 0 && rr [0 ].index == i {
1032
+ if ! rr [0 ].stripped {
1028
1033
// If the transformation wasn't stripped from the batch we sent to KV,
1029
1034
// we received a response for it, which then needs to be combined with
1030
1035
// what's in the write buffer.
1031
1036
resp := br .Responses [0 ]
1032
- mergedResps [i ], pErr = ts [0 ].toResp (ctx , twb , resp , br .Txn )
1037
+ mergedResps [i ], pErr = rr [0 ].toResp (ctx , twb , resp , br .Txn )
1033
1038
if pErr != nil {
1034
1039
return nil , pErr
1035
1040
}
1036
1041
br .Responses = br .Responses [1 :]
1037
1042
} else {
1038
- mergedResps [i ], pErr = ts [0 ].toResp (ctx , twb , kvpb.ResponseUnion {}, br .Txn )
1043
+ mergedResps [i ], pErr = rr [0 ].toResp (ctx , twb , kvpb.ResponseUnion {}, br .Txn )
1039
1044
if pErr != nil {
1040
1045
return nil , pErr
1041
1046
}
1042
1047
}
1043
1048
1044
- ts = ts [1 :]
1049
+ rr = rr [1 :]
1045
1050
continue
1046
1051
}
1047
1052
@@ -1053,26 +1058,32 @@ func (twb *txnWriteBuffer) mergeResponseWithTransformations(
1053
1058
return br , nil
1054
1059
}
1055
1060
1056
- // transformation is a modification applied by the txnWriteBuffer on a batch
1057
- // request that needs to be accounted for when returning the response.
1058
- type transformation struct {
1061
+ // requestRecord stores a set of metadata fields about potential transformations
1062
+ // applied by the txnWriteBuffer on a batch request that needs to be accounted
1063
+ // for when returning the response.
1064
+ type requestRecord struct {
1059
1065
// stripped, if true, indicates that the request was stripped from the batch
1060
1066
// and never sent to the KV layer.
1061
1067
stripped bool
1062
- // index of the request in the original batch to which the transformation
1068
+ // transformed, if true, indicates that the request was transformed into a
1069
+ // different request to be sent to the KV layer. If stripped is true, then
1070
+ // transformed is always false; i.e. if the request was completely dropped,
1071
+ // then it's not considered transformed.
1072
+ transformed bool
1073
+ // index of the request in the original batch to which the requestRecord
1063
1074
// applies.
1064
1075
index int
1065
1076
// origRequest is the original request that was transformed.
1066
1077
origRequest kvpb.Request
1067
1078
}
1068
1079
1069
1080
// toResp returns the response that should be added to the batch response as
1070
- // a result of applying the transformation .
1071
- func (t transformation ) toResp (
1081
+ // a result of applying the requestRecord .
1082
+ func (rr requestRecord ) toResp (
1072
1083
ctx context.Context , twb * txnWriteBuffer , br kvpb.ResponseUnion , txn * roachpb.Transaction ,
1073
1084
) (kvpb.ResponseUnion , * kvpb.Error ) {
1074
1085
var ru kvpb.ResponseUnion
1075
- switch req := t .origRequest .(type ) {
1086
+ switch req := rr .origRequest .(type ) {
1076
1087
case * kvpb.ConditionalPutRequest :
1077
1088
// Evaluate the condition.
1078
1089
evalFn := mvcceval .MaybeConditionFailedError
@@ -1098,7 +1109,7 @@ func (t transformation) toResp(
1098
1109
)
1099
1110
if condFailedErr != nil {
1100
1111
pErr := kvpb .NewErrorWithTxn (condFailedErr , txn )
1101
- pErr .SetErrorIndex (int32 (t .index ))
1112
+ pErr .SetErrorIndex (int32 (rr .index ))
1102
1113
return kvpb.ResponseUnion {}, pErr
1103
1114
}
1104
1115
// The condition was satisfied; buffer the write and return a
@@ -1157,13 +1168,13 @@ func (t transformation) toResp(
1157
1168
} else {
1158
1169
// The request wasn't served from the buffer; return the response from the
1159
1170
// KV layer.
1160
- assertTrue (! t .stripped , "we shouldn't be stripping requests that aren't served from the buffer" )
1171
+ assertTrue (! rr .stripped , "we shouldn't be stripping requests that aren't served from the buffer" )
1161
1172
ru = br
1162
1173
}
1163
1174
1164
1175
case * kvpb.ScanRequest :
1165
1176
scanResp , err := twb .mergeWithScanResp (
1166
- t .origRequest .(* kvpb.ScanRequest ), br .GetInner ().(* kvpb.ScanResponse ),
1177
+ rr .origRequest .(* kvpb.ScanRequest ), br .GetInner ().(* kvpb.ScanResponse ),
1167
1178
)
1168
1179
if err != nil {
1169
1180
return kvpb.ResponseUnion {}, kvpb .NewError (err )
@@ -1172,7 +1183,7 @@ func (t transformation) toResp(
1172
1183
1173
1184
case * kvpb.ReverseScanRequest :
1174
1185
reverseScanResp , err := twb .mergeWithReverseScanResp (
1175
- t .origRequest .(* kvpb.ReverseScanRequest ), br .GetInner ().(* kvpb.ReverseScanResponse ),
1186
+ rr .origRequest .(* kvpb.ReverseScanRequest ), br .GetInner ().(* kvpb.ReverseScanResponse ),
1176
1187
)
1177
1188
if err != nil {
1178
1189
return kvpb.ResponseUnion {}, kvpb .NewError (err )
@@ -1190,11 +1201,11 @@ func (t transformation) toResp(
1190
1201
return ru , nil
1191
1202
}
1192
1203
1193
- // transformations is a list of transformations applied by the txnWriteBuffer .
1194
- type transformations []transformation
1204
+ // requestRecords is a slice of requestRecord .
1205
+ type requestRecords []requestRecord
1195
1206
1196
- func (t transformations ) Empty () bool {
1197
- return len (t ) == 0
1207
+ func (rr requestRecords ) Empty () bool {
1208
+ return len (rr ) == 0
1198
1209
}
1199
1210
1200
1211
// addToBuffer adds a write to the given key to the buffer.
0 commit comments