Skip to content

Commit 9f935f0

Browse files
committed
sql: always use cput for ldr primary key writes
This change reworks the SQL layer so that it always uses a CPut with an origin timestamp if the LDR origin timestamp option is set. This change allows the classic and crud SQL writers to correctly implement LWW in the presence of tombstones. Note: the classic SQL writer only depends on the CPut when inserting or upserting over a tombstone. The crud SQL writer relies on the CPut for inserts, updates, and deletes. Release note: none Fixes: cockroachdb#146117
1 parent 80e78d8 commit 9f935f0

12 files changed

+60
-42
lines changed

pkg/crosscluster/logical/batch_handler_test.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ import (
3131
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
3232
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
3333
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
34-
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
3534
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
3635
"github.com/cockroachdb/cockroach/pkg/util/hlc"
3736
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -198,17 +197,13 @@ func TestBatchHandlerExhaustiveSQL(t *testing.T) {
198197
defer leaktest.AfterTest(t)()
199198
defer log.Scope(t).Close(t)
200199

201-
skip.WithIssue(t, 146117)
202-
203200
testBatchHandlerExhaustive(t, newSqlBatchHandler)
204201
}
205202

206203
func TestBatchHandlerExhaustiveCrud(t *testing.T) {
207204
defer leaktest.AfterTest(t)()
208205
defer log.Scope(t).Close(t)
209206

210-
skip.WithIssue(t, 146117)
211-
212207
testBatchHandlerExhaustive(t, newCrudBatchHandler)
213208
}
214209

@@ -369,7 +364,6 @@ func testBatchHandlerExhaustive(t *testing.T, factory batchHandlerFactory) {
369364
for _, localValue := range localValues {
370365
for _, replicationType := range replicationTypes {
371366
for _, winLww := range []bool{true, false} {
372-
373367
if !winLww {
374368
if localValue == localValueNull {
375369
// If there is no tombstone or local row, then it's

pkg/crosscluster/logical/logical_replication_job_test.go

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2790,18 +2790,7 @@ func TestGetWriterType(t *testing.T) {
27902790
require.Equal(t, sqlclustersettings.LDRWriterTypeSQL, wt)
27912791
})
27922792

2793-
t.Run("immediate-mode-pre-25.2", func(t *testing.T) {
2794-
st := cluster.MakeTestingClusterSettingsWithVersions(
2795-
clusterversion.V25_1.Version(),
2796-
clusterversion.V25_1.Version(),
2797-
true, /* initializeVersion */
2798-
)
2799-
wt, err := getWriterType(ctx, jobspb.LogicalReplicationDetails_Immediate, st)
2800-
require.NoError(t, err)
2801-
require.Equal(t, sqlclustersettings.LDRWriterTypeLegacyKV, wt)
2802-
})
2803-
2804-
t.Run("immediate-mode-post-25.2", func(t *testing.T) {
2793+
t.Run("immediate-mode", func(t *testing.T) {
28052794
st := cluster.MakeTestingClusterSettingsWithVersions(
28062795
clusterversion.V25_2.Version(),
28072796
clusterversion.PreviousRelease.Version(),
@@ -2812,9 +2801,9 @@ func TestGetWriterType(t *testing.T) {
28122801
require.NoError(t, err)
28132802
require.Equal(t, sqlclustersettings.LDRWriterTypeSQL, wt)
28142803

2815-
sqlclustersettings.LDRImmediateModeWriter.Override(ctx, &st.SV, string(sqlclustersettings.LDRWriterTypeSQL))
2804+
sqlclustersettings.LDRImmediateModeWriter.Override(ctx, &st.SV, string(sqlclustersettings.LDRWriterTypeCRUD))
28162805
wt, err = getWriterType(ctx, jobspb.LogicalReplicationDetails_Immediate, st)
28172806
require.NoError(t, err)
2818-
require.Equal(t, sqlclustersettings.LDRWriterTypeSQL, wt)
2807+
require.Equal(t, sqlclustersettings.LDRWriterTypeCRUD, wt)
28192808
})
28202809
}

pkg/crosscluster/logical/lww_row_processor.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -897,7 +897,8 @@ DELETE FROM [%d as t] WHERE %s
897897
AND ((t.crdb_internal_mvcc_timestamp < $%[3]d
898898
AND t.crdb_internal_origin_timestamp IS NULL)
899899
OR (t.crdb_internal_origin_timestamp < $%[3]d
900-
AND t.crdb_internal_origin_timestamp IS NOT NULL))`
900+
AND t.crdb_internal_origin_timestamp IS NOT NULL))
901+
RETURNING *`
901902
stmt, err := parser.ParseOne(
902903
fmt.Sprintf(baseQuery, dstTableDescID, whereClause.String(), originTSIdx))
903904
if err != nil {

pkg/crosscluster/logical/replication_statements.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -254,8 +254,11 @@ func newDeleteStatement(
254254
TableID: int64(table.GetID()),
255255
As: tree.AliasClause{Alias: "replication_target"},
256256
},
257-
Where: &tree.Where{Type: tree.AstWhere, Expr: whereClause},
258-
Returning: tree.AbsentReturningClause,
257+
Where: &tree.Where{Type: tree.AstWhere, Expr: whereClause},
258+
// NOTE: we use RETURNING * to ensure that every column in the table is decoded.
259+
// This ensures that the Deleter can reconstruct the previous value when generating
260+
// the cput to update the primary key.
261+
Returning: &tree.ReturningExprs{tree.StarSelectExpr()},
259262
}
260263

261264
return toParsedStatement(delete)

pkg/crosscluster/logical/testdata/ldr_statements

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ UPDATE [104 AS replication_target] SET id = $5::INT8, name = $6::STRING, value =
1818

1919
show-delete table=basic_table
2020
----
21-
DELETE FROM [104 AS replication_target] WHERE (((id = $1::INT8) AND (name IS NOT DISTINCT FROM $2::STRING)) AND (value IS NOT DISTINCT FROM $3::INT8)) AND (data IS NOT DISTINCT FROM $4::BYTES)
21+
DELETE FROM [104 AS replication_target] WHERE (((id = $1::INT8) AND (name IS NOT DISTINCT FROM $2::STRING)) AND (value IS NOT DISTINCT FROM $3::INT8)) AND (data IS NOT DISTINCT FROM $4::BYTES) RETURNING *
2222

2323
show-select table=basic_table
2424
----
@@ -52,7 +52,7 @@ UPDATE [107 AS replication_target] SET id = $7::INT8, title = $8::STRING, descri
5252

5353
show-delete table=tasks
5454
----
55-
DELETE FROM [107 AS replication_target] WHERE (((((id = $1::INT8) AND (title IS NOT DISTINCT FROM $2::STRING)) AND (description IS NOT DISTINCT FROM $3::STRING)) AND (status IS NOT DISTINCT FROM $4::@100105)) AND (priority IS NOT DISTINCT FROM $5::INT8)) AND (created_at IS NOT DISTINCT FROM $6::TIMESTAMP)
55+
DELETE FROM [107 AS replication_target] WHERE (((((id = $1::INT8) AND (title IS NOT DISTINCT FROM $2::STRING)) AND (description IS NOT DISTINCT FROM $3::STRING)) AND (status IS NOT DISTINCT FROM $4::@100105)) AND (priority IS NOT DISTINCT FROM $5::INT8)) AND (created_at IS NOT DISTINCT FROM $6::TIMESTAMP) RETURNING *
5656

5757
show-select table=tasks
5858
----
@@ -86,7 +86,7 @@ UPDATE [108 AS replication_target] SET id = $7::INT8, name = $8::STRING, unit_pr
8686
# NOTE: total_price and discount_price are not included since they are computed.
8787
show-delete table=products
8888
----
89-
DELETE FROM [108 AS replication_target] WHERE ((((id = $1::INT8) AND (name IS NOT DISTINCT FROM $2::STRING)) AND (unit_price IS NOT DISTINCT FROM $3::DECIMAL(10,2))) AND (quantity IS NOT DISTINCT FROM $4::INT8)) AND (last_updated IS NOT DISTINCT FROM $6::TIMESTAMP)
89+
DELETE FROM [108 AS replication_target] WHERE ((((id = $1::INT8) AND (name IS NOT DISTINCT FROM $2::STRING)) AND (unit_price IS NOT DISTINCT FROM $3::DECIMAL(10,2))) AND (quantity IS NOT DISTINCT FROM $4::INT8)) AND (last_updated IS NOT DISTINCT FROM $6::TIMESTAMP) RETURNING *
9090

9191
# NOTE: total_price is not included because it is a computed column, but
9292
# discount_price is included because it's part of the primary key.
@@ -137,7 +137,7 @@ UPDATE [109 AS replication_target] SET id = $8::INT8, first_name = $9::STRING, l
137137

138138
show-delete table=employees
139139
----
140-
DELETE FROM [109 AS replication_target] WHERE ((((((id = $1::INT8) AND (first_name IS NOT DISTINCT FROM $2::STRING)) AND (last_name IS NOT DISTINCT FROM $3::STRING)) AND (email IS NOT DISTINCT FROM $4::STRING)) AND (salary IS NOT DISTINCT FROM $5::DECIMAL(12,2))) AND (department IS NOT DISTINCT FROM $6::STRING)) AND (hire_date IS NOT DISTINCT FROM $7::DATE)
140+
DELETE FROM [109 AS replication_target] WHERE ((((((id = $1::INT8) AND (first_name IS NOT DISTINCT FROM $2::STRING)) AND (last_name IS NOT DISTINCT FROM $3::STRING)) AND (email IS NOT DISTINCT FROM $4::STRING)) AND (salary IS NOT DISTINCT FROM $5::DECIMAL(12,2))) AND (department IS NOT DISTINCT FROM $6::STRING)) AND (hire_date IS NOT DISTINCT FROM $7::DATE) RETURNING *
141141

142142
show-select table=employees
143143
----
@@ -171,7 +171,7 @@ UPDATE [112 AS replication_target] SET id = $7::UUID, user_id = $8::INT8, event_
171171

172172
show-delete table=user_events
173173
----
174-
DELETE FROM [112 AS replication_target] WHERE (((((id = $1::UUID) AND (user_id IS NOT DISTINCT FROM $2::INT8)) AND (event_type IS NOT DISTINCT FROM $3::STRING)) AND (event_data IS NOT DISTINCT FROM $4::JSONB)) AND (created_at IS NOT DISTINCT FROM $5::TIMESTAMP)) AND (region = $6::@100110)
174+
DELETE FROM [112 AS replication_target] WHERE (((((id = $1::UUID) AND (user_id IS NOT DISTINCT FROM $2::INT8)) AND (event_type IS NOT DISTINCT FROM $3::STRING)) AND (event_data IS NOT DISTINCT FROM $4::JSONB)) AND (created_at IS NOT DISTINCT FROM $5::TIMESTAMP)) AND (region = $6::@100110) RETURNING *
175175

176176
show-select table=user_events
177177
----

pkg/sql/create_table.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -614,7 +614,7 @@ func (n *createTableNode) startExec(params runParams) error {
614614
// indexes, partial, vector, or otherwise, to update.
615615
var pm row.PartialIndexUpdateHelper
616616
var vh row.VectorIndexUpdateHelper
617-
if err := ti.row(params.ctx, rowBuffer, pm, vh, params.extendedEvalCtx.Tracing.KVTracingEnabled()); err != nil {
617+
if err := ti.row(params.ctx, rowBuffer, pm, vh, nil, params.extendedEvalCtx.Tracing.KVTracingEnabled()); err != nil {
618618
return err
619619
}
620620
}

pkg/sql/insert.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,8 +242,15 @@ func (r *insertRun) processSourceRow(params runParams, rowVals tree.Datums) erro
242242
return err
243243
}
244244

245+
var oth *row.OriginTimestampCPutHelper
246+
if params.p.SessionData().OriginTimestampForLogicalDataReplication.IsSet() {
247+
oth = &row.OriginTimestampCPutHelper{
248+
OriginTimestamp: params.p.SessionData().OriginTimestampForLogicalDataReplication,
249+
}
250+
}
251+
245252
// Queue the insert in the KV batch.
246-
if err := r.ti.row(params.ctx, insertVals, pm, vh, r.traceKV); err != nil {
253+
if err := r.ti.row(params.ctx, insertVals, pm, vh, oth, r.traceKV); err != nil {
247254
return err
248255
}
249256

pkg/sql/sqlclustersettings/clustersettings.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -129,9 +129,9 @@ var LDRImmediateModeWriter = settings.RegisterStringSetting(
129129
settings.ApplicationLevel,
130130
"logical_replication.consumer.immediate_mode_writer",
131131
"the writer to use when in immediate mode",
132-
// TODO(jeffswenson): make the sql writer the default.
133-
//metamorphic.ConstantWithTestChoice("logical_replication.consumer.immediate_mode_writer", string(LDRWriterTypeSQL), string(LDRWriterTypeLegacyKV), string(LDRWriterTypeCRUD)),
134-
metamorphic.ConstantWithTestChoice("logical_replication.consumer.immediate_mode_writer", string(LDRWriterTypeLegacyKV)),
132+
metamorphic.ConstantWithTestChoice(
133+
"logical_replication.consumer.immediate_mode_writer",
134+
string(LDRWriterTypeSQL), string(LDRWriterTypeLegacyKV), string(LDRWriterTypeCRUD)),
135135
settings.WithValidateString(func(sv *settings.Values, val string) error {
136136
if val != string(LDRWriterTypeSQL) && val != string(LDRWriterTypeLegacyKV) && val != string(LDRWriterTypeCRUD) {
137137
return errors.Newf("immediate mode writer must be either 'sql', 'legacy-kv', or 'crud', got '%s'", val)

pkg/sql/tablewriter_delete.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,13 @@ import (
2121
// tableDeleter handles writing kvs and forming table rows for deletes.
2222
type tableDeleter struct {
2323
tableWriterBase
24-
25-
rd row.Deleter
24+
rd row.Deleter
25+
evalCtx *eval.Context
2626
}
2727

2828
// init initializes the tableDeleter with a Txn.
2929
func (td *tableDeleter) init(_ context.Context, txn *kv.Txn, evalCtx *eval.Context) error {
30+
td.evalCtx = evalCtx
3031
return td.tableWriterBase.init(txn, td.tableDesc(), evalCtx)
3132
}
3233

@@ -59,7 +60,13 @@ func (td *tableDeleter) row(
5960
traceKV bool,
6061
) error {
6162
td.currentBatchSize++
62-
return td.rd.DeleteRow(ctx, td.b, values, pm, vh, nil, mustValidateOldPKValues, traceKV)
63+
var oth *row.OriginTimestampCPutHelper
64+
if td.evalCtx.SessionData().OriginTimestampForLogicalDataReplication.IsSet() {
65+
oth = &row.OriginTimestampCPutHelper{
66+
OriginTimestamp: td.evalCtx.SessionData().OriginTimestampForLogicalDataReplication,
67+
}
68+
}
69+
return td.rd.DeleteRow(ctx, td.b, values, pm, vh, oth, mustValidateOldPKValues, traceKV)
6370
}
6471

6572
// deleteIndex runs the kv operations necessary to delete all kv entries in the

pkg/sql/tablewriter_insert.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,11 @@ func (ti *tableInserter) row(
4848
values tree.Datums,
4949
pm row.PartialIndexUpdateHelper,
5050
vh row.VectorIndexUpdateHelper,
51+
oth *row.OriginTimestampCPutHelper,
5152
traceKV bool,
5253
) error {
5354
ti.currentBatchSize++
54-
return ti.ri.InsertRow(ctx, &ti.putter, values, pm, vh, nil, row.CPutOp, traceKV)
55+
return ti.ri.InsertRow(ctx, &ti.putter, values, pm, vh, oth, row.CPutOp, traceKV)
5556
}
5657

5758
// tableDesc returns the TableDescriptor for the table that the tableInserter

pkg/sql/tablewriter_update.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@ import (
1818
// tableUpdater handles writing kvs and forming table rows for updates.
1919
type tableUpdater struct {
2020
tableWriterBase
21-
ru row.Updater
21+
ru row.Updater
22+
evalCtx *eval.Context
2223
}
2324

2425
// init initializes the tableUpdater with a Txn.
2526
func (tu *tableUpdater) init(_ context.Context, txn *kv.Txn, evalCtx *eval.Context) error {
27+
tu.evalCtx = evalCtx
2628
return tu.tableWriterBase.init(txn, tu.tableDesc(), evalCtx)
2729
}
2830

@@ -55,8 +57,14 @@ func (tu *tableUpdater) rowForUpdate(
5557
traceKV bool,
5658
) (tree.Datums, error) {
5759
tu.currentBatchSize++
60+
var oth *row.OriginTimestampCPutHelper
61+
if tu.evalCtx.SessionData().OriginTimestampForLogicalDataReplication.IsSet() {
62+
oth = &row.OriginTimestampCPutHelper{
63+
OriginTimestamp: tu.evalCtx.SessionData().OriginTimestampForLogicalDataReplication,
64+
}
65+
}
5866
return tu.ru.UpdateRow(
59-
ctx, tu.b, oldValues, updateValues, pm, vh, nil, mustValidateOldPKValues, traceKV,
67+
ctx, tu.b, oldValues, updateValues, pm, vh, oth, mustValidateOldPKValues, traceKV,
6068
)
6169
}
6270

pkg/sql/tablewriter_upsert.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,13 @@ func (tu *tableUpserter) row(
171171
) error {
172172
tu.currentBatchSize++
173173

174+
var oth *row.OriginTimestampCPutHelper
175+
if tu.originTimestamp.IsSet() {
176+
oth = &row.OriginTimestampCPutHelper{
177+
OriginTimestamp: tu.originTimestamp,
178+
}
179+
}
180+
174181
// Consult the canary column to determine whether to insert or update. For
175182
// more details on how canary columns work, see the block comment on
176183
// Builder.buildInsert in opt/optbuilder/insert.go.
@@ -186,11 +193,11 @@ func (tu *tableUpserter) row(
186193
// - if buffered writes are disabled, then the KV layer will write an
187194
// intent which acts as a lock.
188195
kvOp := row.PutMustAcquireExclusiveLockOp
189-
return tu.insertNonConflictingRow(ctx, datums[:insertEnd], pm, vh, kvOp, traceKV)
196+
return tu.insertNonConflictingRow(ctx, datums[:insertEnd], pm, vh, oth, kvOp, traceKV)
190197
}
191198
if datums[tu.canaryOrdinal] == tree.DNull {
192199
// No conflict, so insert a new row.
193-
return tu.insertNonConflictingRow(ctx, datums[:insertEnd], pm, vh, row.CPutOp, traceKV)
200+
return tu.insertNonConflictingRow(ctx, datums[:insertEnd], pm, vh, oth, row.CPutOp, traceKV)
194201
}
195202

196203
// If no columns need to be updated, then possibly collect the unchanged row.
@@ -229,11 +236,12 @@ func (tu *tableUpserter) insertNonConflictingRow(
229236
insertRow tree.Datums,
230237
pm row.PartialIndexUpdateHelper,
231238
vh row.VectorIndexUpdateHelper,
239+
oh *row.OriginTimestampCPutHelper,
232240
kvOp row.KVInsertOp,
233241
traceKV bool,
234242
) error {
235243
// Perform the insert proper.
236-
if err := tu.ri.InsertRow(ctx, &tu.putter, insertRow, pm, vh, nil /* oth */, kvOp, traceKV); err != nil {
244+
if err := tu.ri.InsertRow(ctx, &tu.putter, insertRow, pm, vh, oh, kvOp, traceKV); err != nil {
237245
return err
238246
}
239247

0 commit comments

Comments
 (0)