Skip to content

Commit 14bafff

Browse files
committed
logical: switch back to kv writer
25.2 was going to use the sql writer by default. But cockroachdb#146117 requires us to roll back cockroachdb#143100, which means the sql writer no longer implements LWW correctly in the presence of tombstones. This also includes a change to fix the cput origin time validation. If the cput is allowed to include origin time in the batch header and the cput request iff the times match. This behavior will be required by the sql writer to fix tombstone handling. This removes the version check in `getWriterType` since the SQL writer no longer changes it behavior based on the version of the KV server. Release note: none Part of: cockroachdb#146117 Informs: cockroachdb#146217
1 parent 3bf5ec4 commit 14bafff

File tree

5 files changed

+16
-10
lines changed

5 files changed

+16
-10
lines changed

pkg/crosscluster/logical/logical_replication_writer_processor.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -754,12 +754,7 @@ func getWriterType(
754754
) (sqlclustersettings.LDRWriterType, error) {
755755
switch mode {
756756
case jobspb.LogicalReplicationDetails_Immediate:
757-
// Require v25.2 to use the sql writer by default to ensure that the
758-
// KV origin timestamp validation is available on all nodes.
759-
if settings.Version.IsActive(ctx, clusterversion.V25_2) {
760-
return sqlclustersettings.LDRWriterType(sqlclustersettings.LDRImmediateModeWriter.Get(&settings.SV)), nil
761-
}
762-
return sqlclustersettings.LDRWriterTypeSQL, nil
757+
return sqlclustersettings.LDRWriterType(sqlclustersettings.LDRImmediateModeWriter.Get(&settings.SV)), nil
763758
case jobspb.LogicalReplicationDetails_Validated:
764759
return sqlclustersettings.LDRWriterTypeSQL, nil
765760
default:

pkg/crosscluster/logical/lww_row_processor_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,11 @@ func TestLWWConflictResolution(t *testing.T) {
467467
runner.CheckQueryResults(t, fmt.Sprintf("SELECT * from %s", tableNameDst), expectedRows)
468468
})
469469
t.Run("cross-cluster-local-delete", func(t *testing.T) {
470+
if !useKVProc {
471+
// TODO(jeffswenson): re-enable this test
472+
skip.WithIssue(t, 146217, "crud writer does not correctly implement lww")
473+
}
474+
470475
tableNameDst, rp, encoder := setup(t, useKVProc)
471476

472477
runner.Exec(t, fmt.Sprintf("INSERT INTO %s VALUES ($1, $2)", tableNameDst), row1...)

pkg/crosscluster/logical/table_batch_handler_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
2020
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
2121
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
22+
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
2223
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
2324
"github.com/cockroachdb/cockroach/pkg/util/hlc"
2425
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -199,6 +200,9 @@ func TestTableHandlerExhaustive(t *testing.T) {
199200
defer leaktest.AfterTest(t)()
200201
defer log.Scope(t).Close(t)
201202

203+
// TODO(jeffswenson): re-enable this test
204+
skip.WithIssue(t, 146217, "crud writer does not correctly implement lww")
205+
202206
// This test is an "exhaustive" test of the table handler. It tries to test
203207
// cross product of every possible (replication event type, local value,
204208
// previous value, lww win).

pkg/kv/kvserver/batcheval/cmd_conditional_put.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,9 @@ func ConditionalPut(
6161
originTimestampForValueHeader := h.WriteOptions.GetOriginTimestamp()
6262
if args.OriginTimestamp.IsSet() {
6363
originTimestampForValueHeader = args.OriginTimestamp
64-
}
65-
if args.OriginTimestamp.IsSet() && h.WriteOptions.GetOriginTimestamp().IsSet() {
66-
return result.Result{}, errors.AssertionFailedf("OriginTimestamp cannot be passed via CPut arg and in request header")
64+
if h.WriteOptions.GetOriginTimestamp().IsSet() && args.OriginTimestamp != h.WriteOptions.GetOriginTimestamp() {
65+
return result.Result{}, errors.AssertionFailedf("OriginTimestamp passed via CPut arg must match the origin timestamp in the request header")
66+
}
6767
}
6868

6969
opts := storage.ConditionalPutWriteOptions{

pkg/sql/sqlclustersettings/clustersettings.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,9 @@ var LDRImmediateModeWriter = settings.RegisterStringSetting(
129129
settings.ApplicationLevel,
130130
"logical_replication.consumer.immediate_mode_writer",
131131
"the writer to use when in immediate mode",
132-
metamorphic.ConstantWithTestChoice("logical_replication.consumer.immediate_mode_writer", string(LDRWriterTypeSQL), string(LDRWriterTypeLegacyKV), string(LDRWriterTypeCRUD)),
132+
// TODO(jeffswenson): make the sql writer the default.
133+
//metamorphic.ConstantWithTestChoice("logical_replication.consumer.immediate_mode_writer", string(LDRWriterTypeSQL), string(LDRWriterTypeLegacyKV), string(LDRWriterTypeCRUD)),
134+
metamorphic.ConstantWithTestChoice("logical_replication.consumer.immediate_mode_writer", string(LDRWriterTypeLegacyKV)),
133135
settings.WithValidateString(func(sv *settings.Values, val string) error {
134136
if val != string(LDRWriterTypeSQL) && val != string(LDRWriterTypeLegacyKV) && val != string(LDRWriterTypeCRUD) {
135137
return errors.Newf("immediate mode writer must be either 'sql', 'legacy-kv', or 'crud', got '%s'", val)

0 commit comments

Comments
 (0)