Skip to content

Commit cb6e493

Browse files
committed
kvcoord: deterministic error ordering
DistSender splits requests up in range order and collects errors in the order requests were dispatched. If there are multiple errors, the highest priority error is returned and ties are broken by dispatch order. Now, the merge logic consults the index of the request that generated the error. This is particularly useful for CPuts as it allows the caller to observe the first cput in the batch that failed. The deterministic order allows LDR, `deleteSwap`, and `updateSwap` to guarantee the cput error is for the primary key. This property is essential because they are using cput to optimistically guess at the value of the row in the database. The batches may also contain cput failures for unique indexes, but those errors should only be raised to the user if the primary key cput succeeded. This is still a bit of a hack. In a perfect world, the cput failure would be treated as a result instead of an error. That would allow the caller to inspect each cput result independently and would make it clear that cput failures do not poison any of the other requests in the batch. Informs: cockroachdb#146117 Release note: fixes an issue with LDR where the presence of a unique index may cause spurious DLQ entries if the unique index has a smaller index id than the primary key index.
1 parent 9b09a33 commit cb6e493

File tree

3 files changed

+33
-3
lines changed

3 files changed

+33
-3
lines changed

pkg/crosscluster/logical/batch_handler_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,8 +183,7 @@ func TestBatchHandlerExhaustive(t *testing.T) {
183183
if uniqueConstraint {
184184
runner.Exec(t, `ALTER TABLE test_table ADD CONSTRAINT unique_value UNIQUE (value)`)
185185
}
186-
// TODO(jeffswenson): enable this metamorphic option after fixing #146465.
187-
if false && addAndRemoveColumn {
186+
if addAndRemoveColumn {
188187
runner.Exec(t, `ALTER TABLE test_table ADD COLUMN temp_col INT`)
189188
runner.Exec(t, `ALTER TABLE test_table DROP COLUMN temp_col`)
190189
}

pkg/kv/kvclient/kvcoord/dist_sender.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1684,9 +1684,19 @@ func maybeSwapErrorIndex(pErr *kvpb.Error, a, b int) {
16841684
}
16851685

16861686
// mergeErrors merges the two errors, combining their transaction state and
1687-
// returning the error with the highest priority.
1687+
// returning the error with the highest priority. If errors have the same
1688+
// priority, the error with the lowest request index is preferred. This allows
1689+
// a caller issuing multiple cputs to control which ConditionFailedError is
1690+
// returned. Specifically, it allows returning a primary key cput failure
1691+
// instead of a unique index conflict.
16881692
func mergeErrors(pErr1, pErr2 *kvpb.Error) *kvpb.Error {
16891693
ret, drop := pErr1, pErr2
1694+
1695+
hasIndex := ret.Index != nil && drop.Index != nil
1696+
if hasIndex && drop.Index.Index < ret.Index.Index {
1697+
ret, drop = drop, ret
1698+
}
1699+
16901700
if kvpb.ErrPriority(drop.GoError()) > kvpb.ErrPriority(ret.GoError()) {
16911701
ret, drop = drop, ret
16921702
}

pkg/kv/kvclient/kvcoord/dist_sender_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4022,6 +4022,27 @@ func TestMultipleErrorsMerged(t *testing.T) {
40224022
}
40234023
}
40244024

4025+
func TestMergeErrorIndex(t *testing.T) {
4026+
// When merging, the error with lower index should be preferred. Create two
4027+
// errors with the same priority but different indices.
4028+
err1 := kvpb.NewError(&kvpb.ConditionFailedError{})
4029+
err1.Index = &kvpb.ErrPosition{Index: 2}
4030+
err2 := kvpb.NewError(&kvpb.ConditionFailedError{})
4031+
err2.Index = &kvpb.ErrPosition{Index: 1}
4032+
4033+
require.Equal(t, int32(1), mergeErrors(err1, err2).Index.Index)
4034+
require.Equal(t, int32(1), mergeErrors(err2, err1).Index.Index)
4035+
4036+
// Test that priority still takes precedence over index
4037+
highPriorityErr := kvpb.NewError(&kvpb.TransactionAbortedError{})
4038+
highPriorityErr.Index = &kvpb.ErrPosition{Index: 5}
4039+
lowPriorityErr := kvpb.NewError(&kvpb.ConditionFailedError{})
4040+
lowPriorityErr.Index = &kvpb.ErrPosition{Index: 3}
4041+
4042+
require.Equal(t, int32(5), mergeErrors(highPriorityErr, lowPriorityErr).Index.Index)
4043+
require.Equal(t, int32(5), mergeErrors(lowPriorityErr, highPriorityErr).Index.Index)
4044+
}
4045+
40254046
// Regression test for #20067.
40264047
// If a batch is partitioned into multiple partial batches, the
40274048
// kvpb.Error.Index of each batch should correspond to its original index in

0 commit comments

Comments
 (0)