diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go index dfe2f81bcd0f..cc1520b5bfa0 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go @@ -242,7 +242,7 @@ func (twb *txnWriteBuffer) SendLocked( return nil, kvpb.NewError(err) } - transformedBa, ts, pErr := twb.applyTransformations(ctx, ba) + transformedBa, rr, pErr := twb.applyTransformations(ctx, ba) if pErr != nil { return nil, pErr } @@ -253,8 +253,8 @@ func (twb *txnWriteBuffer) SendLocked( // left with an empty batch after applying transformations, eschew sending // anything to KV. br := ba.CreateReply() - for i, t := range ts { - br.Responses[i], pErr = t.toResp(ctx, twb, kvpb.ResponseUnion{}, ba.Txn) + for i, record := range rr { + br.Responses[i], pErr = record.toResp(ctx, twb, kvpb.ResponseUnion{}, ba.Txn) if pErr != nil { return nil, pErr } @@ -265,10 +265,10 @@ func (twb *txnWriteBuffer) SendLocked( br, pErr := twb.wrapped.SendLocked(ctx, transformedBa) if pErr != nil { - return nil, twb.adjustError(ctx, transformedBa, ts, pErr) + return nil, twb.adjustError(ctx, transformedBa, rr, pErr) } - return twb.mergeResponseWithTransformations(ctx, ts, br) + return twb.mergeResponseWithRequestRecords(ctx, rr, br) } func (twb *txnWriteBuffer) batchRequiresFlush(ctx context.Context, ba *kvpb.BatchRequest) bool { @@ -442,7 +442,7 @@ func (twb *txnWriteBuffer) estimateSize(ba *kvpb.BatchRequest) int64 { // adjustError adjusts the provided error based on the transformations made by // the txnWriteBuffer to the batch request before sending it to KV. func (twb *txnWriteBuffer) adjustError( - ctx context.Context, ba *kvpb.BatchRequest, ts transformations, pErr *kvpb.Error, + ctx context.Context, ba *kvpb.BatchRequest, rr requestRecords, pErr *kvpb.Error, ) *kvpb.Error { // Fix the error index to hide the impact of any requests that were // transformed. 
@@ -452,12 +452,12 @@ func (twb *txnWriteBuffer) adjustError( // therefore weren't sent to the KV layer. We can then adjust the error // index accordingly. numStripped := int32(0) - numOriginalRequests := len(ba.Requests) + len(ts) + numOriginalRequests := len(ba.Requests) + len(rr) baIdx := int32(0) for i := range numOriginalRequests { - if len(ts) > 0 && ts[0].index == i { - curTs := ts[0] - ts = ts[1:] + if len(rr) > 0 && rr[0].index == i { + curTs := rr[0] + rr = rr[1:] if curTs.stripped { numStripped++ } else { @@ -639,23 +639,23 @@ func (twb *txnWriteBuffer) closeLocked() {} // applyTransformations applies any applicable transformations to the supplied // batch request. In doing so, a new batch request with transformations applied -// along with a list of transformations that were applied is returned. The -// caller must handle these transformations on the response path. +// along with a list of requestRecords is returned. The caller must handle the +// transformations on the response path. // // Some examples of transformations include: // -// 1. Blind writes (Put/Delete requests) are buffered locally. When they the -// original request has MustAcquireExclusiveLock set, a locking Get is used to -// acquire the lock. +// 1. Blind writes (Put/Delete requests) are buffered locally. When the original +// request has MustAcquireExclusiveLock set, a locking Get is used to acquire +// the lock. // 2. Point reads (Get requests) are served from the buffer and stripped from // the batch iff the key has seen a buffered write. // 3. Scans are always sent to the KV layer, but if the key span being scanned // overlaps with any buffered writes, then the response from the KV layer needs -// to be merged with buffered writes. These are collected as transformations. +// to be merged with buffered writes. These are collected as requestRecords. // 4. 
ReverseScans, similar to scans, are also always sent to the KV layer and // their response needs to be merged with any buffered writes. The only // difference is the direction in which the buffer is iterated when doing the -// merge. As a result, they're also collected as tranformations. +// merge. As a result, they're also collected as requestRecords. // 5. Conditional Puts are decomposed into a locking Get followed by a Put. The // Put is buffered locally if the condition evaluates successfully using the // Get's response. Otherwise, a ConditionFailedError is returned. @@ -663,25 +663,40 @@ func (twb *txnWriteBuffer) closeLocked() {} // TODO(arul): Augment this comment as these expand. func (twb *txnWriteBuffer) applyTransformations( ctx context.Context, ba *kvpb.BatchRequest, -) (*kvpb.BatchRequest, transformations, *kvpb.Error) { +) (*kvpb.BatchRequest, requestRecords, *kvpb.Error) { baRemote := ba.ShallowCopy() // TODO(arul): We could improve performance here by pre-allocating // baRemote.Requests to the correct size by counting the number of Puts/Dels - // in ba.Requests. The same for the transformations slice. We could also + // in ba.Requests. The requestRecords slice below is already pre-sized. We could also // allocate the right number of ResponseUnion, PutResponse, and DeleteResponse // objects as well. baRemote.Requests = nil - var ts transformations + rr := make(requestRecords, 0, len(ba.Requests)) for i, ru := range ba.Requests { req := ru.GetInner() + // Track a requestRecord for the request regardless of the type, and + // regardless of whether it was served from the buffer or not. For + // transformed requests (e.g. CPut) this is expected. For Gets and Scans, we + // need to track a requestRecord because we haven't buffered any writes + // from our current batch in the buffer yet, so checking the buffer here, at + // request time, isn't sufficient to determine whether the request needs to + // serve a read from the buffer before returning a response or not. 
+ // + // Only QueryLocksRequest and LeaseInfoRequest don't require a tracking + // requestRecord, but it's harmless to add one, and it simplifies the code. + // + // The stripped and transformed fields will be set below for specific + // requests. + record := requestRecord{ + stripped: false, + transformed: false, + index: i, + origRequest: req, + } switch t := req.(type) { case *kvpb.ConditionalPutRequest: - ts = append(ts, transformation{ - stripped: false, - index: i, - origRequest: req, - }) + record.transformed = true // NB: Regardless of whether there is already a buffered write on // this key or not, we need to send a locking Get to the KV layer to // acquire a lock. However, if we had knowledge of what locks the @@ -720,11 +735,8 @@ func (twb *txnWriteBuffer) applyTransformations( }) baRemote.Requests = append(baRemote.Requests, getReqU) } - ts = append(ts, transformation{ - stripped: !t.MustAcquireExclusiveLock, - index: i, - origRequest: req, - }) + record.stripped = !t.MustAcquireExclusiveLock + record.transformed = t.MustAcquireExclusiveLock case *kvpb.DeleteRequest: // If MustAcquireExclusiveLock flag is set on the DeleteRequest, @@ -744,11 +756,8 @@ func (twb *txnWriteBuffer) applyTransformations( }) baRemote.Requests = append(baRemote.Requests, getReqU) } - ts = append(ts, transformation{ - stripped: !t.MustAcquireExclusiveLock, - index: i, - origRequest: req, - }) + record.stripped = !t.MustAcquireExclusiveLock + record.transformed = t.MustAcquireExclusiveLock case *kvpb.GetRequest: // If the key is in the buffer, we must serve the read from the buffer. @@ -779,45 +788,14 @@ func (twb *txnWriteBuffer) applyTransformations( // Wasn't served locally; send the request to the KV layer. baRemote.Requests = append(baRemote.Requests, ru) } - // Even if the request wasn't served from the buffer here, we still track - // a transformation for it. 
That's because we haven't buffered any writes - // from our current batch in the buffer yet, so checking the buffer above - // isn't sufficient to determine whether the request needs to serve a read - // from the buffer before returning a response or not. - ts = append(ts, transformation{ - stripped: stripped, - index: i, - origRequest: req, - }) + record.stripped = stripped - case *kvpb.ScanRequest: - overlaps := twb.scanOverlaps(t.Key, t.EndKey) - if overlaps { - ts = append(ts, transformation{ - stripped: false, - index: i, - origRequest: req, - }) - } + case *kvpb.ScanRequest, *kvpb.ReverseScanRequest: // Regardless of whether the scan overlaps with any writes in the buffer // or not, we must send the request to the KV layer. We can't know for // sure that there's nothing else to read. baRemote.Requests = append(baRemote.Requests, ru) - case *kvpb.ReverseScanRequest: - overlaps := twb.scanOverlaps(t.Key, t.EndKey) - if overlaps { - ts = append(ts, transformation{ - stripped: false, - index: i, - origRequest: req, - }) - } - // Similar to the reasoning above, regardless of whether the reverse - // scan overlaps with any writes in the buffer or not, we must send - // the request to the KV layer. - baRemote.Requests = append(baRemote.Requests, ru) - case *kvpb.QueryLocksRequest, *kvpb.LeaseInfoRequest: // These requests don't interact with buffered writes, so we simply // let them through. @@ -826,8 +804,9 @@ func (twb *txnWriteBuffer) applyTransformations( default: return nil, nil, kvpb.NewError(unsupportedMethodError(t.Method())) } + rr = append(rr, record) } - return baRemote, ts, nil + return baRemote, rr, nil } // seekItemForSpan returns a bufferedWrite appropriate for use with a @@ -873,14 +852,6 @@ func (twb *txnWriteBuffer) maybeServeRead( return nil, false } -// scanOverlaps returns whether the given key range overlaps with any buffered -// write. 
-func (twb *txnWriteBuffer) scanOverlaps(key roachpb.Key, endKey roachpb.Key) bool { - it := twb.buffer.MakeIter() - it.FirstOverlap(twb.seekItemForSpan(key, endKey)) - return it.Valid() -} - // mergeWithScanResp takes a ScanRequest, that was sent to the KV layer, and the // response returned by the KV layer, and merges it with any writes that were // buffered by the transaction to correctly uphold read-your-own-write @@ -1035,47 +1006,47 @@ func (twb *txnWriteBuffer) mergeBufferAndResp( } } -// mergeResponsesWithTransformations merges responses from the KV layer with the -// transformations that were applied by the txnWriteBuffer before sending the -// batch request. As a result, interceptors above the txnWriteBuffer remain -// oblivious to its decision to buffer any writes. -func (twb *txnWriteBuffer) mergeResponseWithTransformations( - ctx context.Context, ts transformations, br *kvpb.BatchResponse, +// mergeResponseWithRequestRecords merges responses from the KV layer with the +// requestRecords and potential transformations applied by the txnWriteBuffer +// before sending the batch request. As a result, interceptors above the +// txnWriteBuffer remain oblivious to its decision to buffer any writes. +func (twb *txnWriteBuffer) mergeResponseWithRequestRecords( + ctx context.Context, rr requestRecords, br *kvpb.BatchResponse, ) (_ *kvpb.BatchResponse, pErr *kvpb.Error) { - if ts.Empty() && br == nil { + if rr.Empty() && br == nil { log.Fatal(ctx, "unexpectedly found no transformations and no batch response") - } else if ts.Empty() { + } else if rr.Empty() { return br, nil } // Figure out the length of the merged responses slice. 
mergedRespsLen := len(br.Responses) - for _, t := range ts { + for _, t := range rr { if t.stripped { mergedRespsLen++ } } mergedResps := make([]kvpb.ResponseUnion, mergedRespsLen) for i := range mergedResps { - if len(ts) > 0 && ts[0].index == i { - if !ts[0].stripped { + if len(rr) > 0 && rr[0].index == i { + if !rr[0].stripped { // If the transformation wasn't stripped from the batch we sent to KV, // we received a response for it, which then needs to be combined with // what's in the write buffer. resp := br.Responses[0] - mergedResps[i], pErr = ts[0].toResp(ctx, twb, resp, br.Txn) + mergedResps[i], pErr = rr[0].toResp(ctx, twb, resp, br.Txn) if pErr != nil { return nil, pErr } br.Responses = br.Responses[1:] } else { - mergedResps[i], pErr = ts[0].toResp(ctx, twb, kvpb.ResponseUnion{}, br.Txn) + mergedResps[i], pErr = rr[0].toResp(ctx, twb, kvpb.ResponseUnion{}, br.Txn) if pErr != nil { return nil, pErr } } - ts = ts[1:] + rr = rr[1:] continue } @@ -1087,13 +1058,19 @@ func (twb *txnWriteBuffer) mergeResponseWithTransformations( return br, nil } -// transformation is a modification applied by the txnWriteBuffer on a batch -// request that needs to be accounted for when returning the response. -type transformation struct { +// requestRecord stores a set of metadata fields about potential transformations +// applied by the txnWriteBuffer on a batch request that needs to be accounted +// for when returning the response. +type requestRecord struct { // stripped, if true, indicates that the request was stripped from the batch // and never sent to the KV layer. stripped bool - // index of the request in the original batch to which the transformation + // transformed, if true, indicates that the request was transformed into a + // different request to be sent to the KV layer. If stripped is true, then + // transformed is always false; i.e. if the request was completely dropped, + // then it's not considered transformed. 
+ transformed bool + // index of the request in the original batch to which the requestRecord // applies. index int // origRequest is the original request that was transformed. @@ -1101,12 +1078,12 @@ type transformation struct { } // toResp returns the response that should be added to the batch response as -// a result of applying the transformation. -func (t transformation) toResp( +// a result of applying the requestRecord. +func (rr requestRecord) toResp( ctx context.Context, twb *txnWriteBuffer, br kvpb.ResponseUnion, txn *roachpb.Transaction, ) (kvpb.ResponseUnion, *kvpb.Error) { var ru kvpb.ResponseUnion - switch req := t.origRequest.(type) { + switch req := rr.origRequest.(type) { case *kvpb.ConditionalPutRequest: // Evaluate the condition. evalFn := mvcceval.MaybeConditionFailedError @@ -1132,7 +1109,7 @@ func (t transformation) toResp( ) if condFailedErr != nil { pErr := kvpb.NewErrorWithTxn(condFailedErr, txn) - pErr.SetErrorIndex(int32(t.index)) + pErr.SetErrorIndex(int32(rr.index)) return kvpb.ResponseUnion{}, pErr } // The condition was satisfied; buffer the write and return a @@ -1191,13 +1168,13 @@ func (t transformation) toResp( } else { // The request wasn't served from the buffer; return the response from the // KV layer. 
- assertTrue(!t.stripped, "we shouldn't be stripping requests that aren't served from the buffer") + assertTrue(!rr.stripped, "we shouldn't be stripping requests that aren't served from the buffer") ru = br } case *kvpb.ScanRequest: scanResp, err := twb.mergeWithScanResp( - t.origRequest.(*kvpb.ScanRequest), br.GetInner().(*kvpb.ScanResponse), + rr.origRequest.(*kvpb.ScanRequest), br.GetInner().(*kvpb.ScanResponse), ) if err != nil { return kvpb.ResponseUnion{}, kvpb.NewError(err) @@ -1206,7 +1183,7 @@ func (t transformation) toResp( case *kvpb.ReverseScanRequest: reverseScanResp, err := twb.mergeWithReverseScanResp( - t.origRequest.(*kvpb.ReverseScanRequest), br.GetInner().(*kvpb.ReverseScanResponse), + rr.origRequest.(*kvpb.ReverseScanRequest), br.GetInner().(*kvpb.ReverseScanResponse), ) if err != nil { return kvpb.ResponseUnion{}, kvpb.NewError(err) @@ -1224,11 +1201,11 @@ func (t transformation) toResp( return ru, nil } -// transformations is a list of transformations applied by the txnWriteBuffer. -type transformations []transformation +// requestRecords is a slice of requestRecord. +type requestRecords []requestRecord -func (t transformations) Empty() bool { - return len(t) == 0 +func (rr requestRecords) Empty() bool { + return len(rr) == 0 } // addToBuffer adds a write to the given key to the buffer. diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_test.go index bd2b5b8a23ad..bd37327e592b 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_test.go @@ -292,10 +292,10 @@ func TestTxnWriteBufferBlindWritesIncludingOtherRequests(t *testing.T) { // Expect 4 responses, even though only 2 KV requests were sent. Moreover, // ensure that the responses are in the correct order. 
require.Len(t, br.Responses, 4) - require.Equal(t, br.Responses[0].GetInner(), &kvpb.PutResponse{}) - require.Equal(t, br.Responses[1].GetInner(), &kvpb.GetResponse{}) - require.Equal(t, br.Responses[2].GetInner(), &kvpb.DeleteResponse{}) - require.Equal(t, br.Responses[3].GetInner(), &kvpb.ScanResponse{}) + require.IsType(t, &kvpb.PutResponse{}, br.Responses[0].GetInner()) + require.IsType(t, &kvpb.GetResponse{}, br.Responses[1].GetInner()) + require.IsType(t, &kvpb.DeleteResponse{}, br.Responses[2].GetInner()) + require.IsType(t, &kvpb.ScanResponse{}, br.Responses[3].GetInner()) // Verify the writes were buffered correctly. expBufferedWrites := []bufferedWrite{ diff --git a/pkg/kv/kvclient/kvcoord/txn_test.go b/pkg/kv/kvclient/kvcoord/txn_test.go index 4bbed6520990..6b7700c962c7 100644 --- a/pkg/kv/kvclient/kvcoord/txn_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_test.go @@ -42,14 +42,6 @@ import ( "github.com/stretchr/testify/require" ) -func checkGetResults(t *testing.T, expected map[string][]byte, results ...kv.Result) { - for _, result := range results { - require.Equal(t, 1, len(result.Rows)) - require.Equal(t, expected[string(result.Rows[0].Key)], result.Rows[0].ValueBytes()) - } - require.Len(t, expected, len(results)) -} - // TestTxnDBBasics verifies that a simple transaction can be run and // either committed or aborted. On commit, mutations are visible; on // abort, mutations are never visible. During the txn, verify that @@ -2235,66 +2227,110 @@ func TestTxnBufferedWriteReadYourOwnWrites(t *testing.T) { keyA := []byte("keyA") keyB := []byte("keyB") keyC := []byte("keyC") + keyD := []byte("keyD") // Before the test begins, write a value to keyC. txn := kv.NewTxn(ctx, s.DB, 0 /* gatewayNodeID */) require.NoError(t, txn.Put(ctx, keyC, value3)) require.NoError(t, txn.Commit(ctx)) - err := s.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - txn.SetBufferedWritesEnabled(true) - - // Put transactional value at keyA. 
- if err := txn.Put(ctx, keyA, value1); err != nil { - return err - } - - // Construct a batch that contains two Gets -- one on keyA, which will be - // served from the buffer, and another on keyC, which will be served by - // the server. - b := txn.NewBatch() - b.Get(keyA) - b.Get(keyC) - if err := txn.Run(ctx, b); err != nil { - return err - } - expected := map[string][]byte{ - "keyA": value1, - "keyC": value3, - } - checkGetResults(t, expected, b.Results...) - - // Next, construct a batch that contains both Puts and Gets to keyB. The Get - // should see the value written by the Put preceding it in the batch. - b = txn.NewBatch() - b.Get(keyB) - b.Put(keyB, value21) - b.Get(keyB) - b.Put(keyB, value22) - b.Get(keyB) - - if err := txn.Run(ctx, b); err != nil { - return err - } - checkGetResults(t, map[string][]byte{ - "keyB": nil, + for _, tc := range []struct { + makeBatch func() *kv.Batch + // A map from a result index to the expected results, represented as a map + // of keys and values. + expected map[int32]map[string][]byte + }{ + // Before each test case's batch runs, we have: + // - keyC <- value3 (from a previous transaction), and + // - keyA <- value1 (from the same transaction). + // Then we run the batch from each test case. + { + makeBatch: func() *kv.Batch { + b := txn.NewBatch() + b.Get(keyA) + b.Get(keyC) + return b + }, + // The Get on keyA should be served from the buffer, and the Get on keyC + // should be served by the server. + expected: map[int32]map[string][]byte{ + 0: {"keyA": value1}, + 1: {"keyC": value3}, + }, }, - b.Results[0], - ) - checkGetResults(t, map[string][]byte{ - "keyB": value21, + { + makeBatch: func() *kv.Batch { + b := txn.NewBatch() + b.Scan(keyA, keyC) + b.Scan(keyB, keyD) + b.Scan(keyA, keyD) + return b + }, + // The first Scan should be served from the buffer, the second Scan should + // be served by the server, and the third Scan should be served by both. 
+ expected: map[int32]map[string][]byte{ + 0: {"keyA": value1}, + 1: {"keyC": value3}, + 2: {"keyA": value1, "keyC": value3}, + }, }, - b.Results[2], - ) - checkGetResults(t, map[string][]byte{ - "keyB": value22, + { + makeBatch: func() *kv.Batch { + b := txn.NewBatch() + b.Get(keyB) + b.Put(keyB, value21) + b.Get(keyB) + b.Put(keyB, value22) + b.Get(keyB) + return b + }, + // The Gets should see the values written earlier in the same batch. + expected: map[int32]map[string][]byte{ + 0: {"keyB": nil}, + 2: {"keyB": value21}, + 4: {"keyB": value22}, + }, + }, + { + makeBatch: func() *kv.Batch { + b := txn.NewBatch() + b.Scan(keyB, keyC) + b.Put(keyB, value3) + b.Scan(keyB, keyC) + return b + }, + // The Scans should see the values written earlier in the same batch. + expected: map[int32]map[string][]byte{ + 0: {"keyB": value22}, + 2: {"keyB": value3}, + }, }, - b.Results[4], - ) + // TODO(mira): See if we need more test coverage for other request types + // (e.g. deletes, reverse scans). + } { + err := s.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetBufferedWritesEnabled(true) - return nil - }) - require.NoError(t, err) + // Put transactional value at keyA. + if err := txn.Put(ctx, keyA, value1); err != nil { + return err + } + + b := tc.makeBatch() + if err := txn.Run(ctx, b); err != nil { + return err + } + for i, expected := range tc.expected { + require.Equal(t, len(expected), len(b.Results[i].Rows)) + for _, row := range b.Results[i].Rows { + require.Equal(t, expected[string(row.Key)], row.ValueBytes()) + } + } + + return nil + }) + require.NoError(t, err) + } } // TestLeafTransactionAdmissionHeader tests that the admission control header is