Do not review: do not serialize bucket in replication transaction flow #5103
Open · wants to merge 2 commits into main
src/server/rdb_load.cc (2 changes: 1 addition & 1 deletion)
@@ -2614,7 +2614,7 @@ void RdbLoader::CreateObjectOnShard(const DbContext& db_cntx, const Item* item,
   }

   if (!override_existing_keys_ && !res.is_new) {
-    LOG(WARNING) << "RDB has duplicated key '" << item->key << "' in DB " << db_ind;
+    // LOG(WARNING) << "RDB has duplicated key '" << item->key << "' in DB " << db_ind;
   }

   if (auto* ts = db_slice->shard_owner()->tiered_storage(); ts)
src/server/snapshot.cc (58 changes: 35 additions & 23 deletions)
@@ -70,7 +70,9 @@ void SliceSnapshot::Start(bool stream_journal, SnapshotFlush allow_flush) {
     OnDbChange(db_index, req);
   };

-  snapshot_version_ = db_slice_->RegisterOnChange(std::move(db_cb));
+  if (!stream_journal) {
+    snapshot_version_ = db_slice_->RegisterOnChange(std::move(db_cb));
+  }

   if (stream_journal) {
     auto* journal = db_slice_->shard_owner()->journal();
@@ -93,12 +95,15 @@ void SliceSnapshot::Start(bool stream_journal, SnapshotFlush allow_flush) {
   }
   serializer_ = std::make_unique<RdbSerializer>(compression_mode_, flush_fun);

-  VLOG(1) << "DbSaver::Start - saving entries with version less than " << snapshot_version_;
+  VLOG(1) << "DbSaver::Start - saving entries with version less than "
+          << snapshot_version_.value_or(0);

   string fb_name = absl::StrCat("SliceSnapshot-", ProactorBase::me()->GetPoolIndex());
   snapshot_fb_ = fb2::Fiber(fb_name, [this, stream_journal] {
     this->IterateBucketsFb(stream_journal);
-    db_slice_->UnregisterOnChange(snapshot_version_);
+    if (!stream_journal) {
+      db_slice_->UnregisterOnChange(snapshot_version_.value());
+    }
     consumer_->Finalize();
   });
 }
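Reviewer note on the Start() hunks above: the OnChange callback and snapshot_version_ are now engaged together, and only on the non-journal (RDB save) path; the unregister call in the fiber is guarded the same way. A minimal sketch of that pairing, using illustrative stand-ins rather than the real DbSlice API:

```cpp
#include <cstdint>
#include <iostream>
#include <optional>

struct FakeSlice {  // hypothetical stand-in for DbSlice
  uint64_t RegisterOnChange() { return 42; }  // pretend this hands out a version
  void UnregisterOnChange(uint64_t v) { std::cout << "unregistered " << v << "\n"; }
};

int main() {
  FakeSlice slice;
  std::optional<uint64_t> snapshot_version;

  bool stream_journal = false;  // RDB-save path: version-based isolation is needed
  if (!stream_journal)
    snapshot_version = slice.RegisterOnChange();

  // ... bucket iteration would run here ...

  if (!stream_journal)
    slice.UnregisterOnChange(snapshot_version.value());  // engaged iff registered
}
```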
@@ -194,7 +199,8 @@ void SliceSnapshot::IterateBucketsFb(bool send_full_sync_cut) {

   // serialized + side_saved must be equal to the total saved.
   VLOG(1) << "Exit SnapshotSerializer loop_serialized: " << stats_.loop_serialized
-          << ", side_saved " << stats_.side_saved << ", cbcalls " << stats_.savecb_calls;
+          << ", side_saved " << stats_.side_saved << ", cbcalls " << stats_.savecb_calls
+          << ", journal_saved " << stats_.jounal_changes;
 }

 void SliceSnapshot::SwitchIncrementalFb(LSN lsn) {
@@ -242,15 +248,18 @@ bool SliceSnapshot::BucketSaveCb(DbIndex db_index, PrimeTable::bucket_iterator i

   ++stats_.savecb_calls;

-  if (it.GetVersion() >= snapshot_version_) {
-    // either has been already serialized or added after snapshotting started.
-    DVLOG(3) << "Skipped " << it.segment_id() << ":" << it.bucket_id() << " at " << it.GetVersion();
-    ++stats_.skipped;
-    return false;
-  }
+  if (snapshot_version_) {
+    if (it.GetVersion() >= *snapshot_version_) {
+      // either has been already serialized or added after snapshotting started.
+      DVLOG(3) << "Skipped " << it.segment_id() << ":" << it.bucket_id() << " at "
+               << it.GetVersion();
+      ++stats_.skipped;
+      return false;
+    }

-  db_slice_->FlushChangeToEarlierCallbacks(db_index, DbSlice::Iterator::FromPrime(it),
-                                           snapshot_version_);
+    db_slice_->FlushChangeToEarlierCallbacks(db_index, DbSlice::Iterator::FromPrime(it),
+                                             *snapshot_version_);
+  }

   auto* latch = db_slice_->GetLatch();

@@ -265,11 +274,14 @@ bool SliceSnapshot::BucketSaveCb(DbIndex db_index, PrimeTable::bucket_iterator i
 }

 unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_iterator it) {
-  DCHECK_LT(it.GetVersion(), snapshot_version_);
+  if (snapshot_version_) {
+    DCHECK_LT(it.GetVersion(), *snapshot_version_);
+    it.SetVersion(*snapshot_version_);
+  }

   // traverse physical bucket and write it into string file.
   serialize_bucket_running_ = true;
-  it.SetVersion(snapshot_version_);

   unsigned result = 0;

   for (it.AdvanceIfNotOccupied(); !it.is_done(); ++it) {
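With these two hunks, all version bookkeeping in BucketSaveCb and SerializeBucket becomes conditional on the optional being engaged: the journal-streaming path neither skips nor stamps buckets. A small sketch of the resulting decision, with a toy type standing in for PrimeTable::bucket_iterator:

```cpp
#include <cstdint>
#include <optional>

struct Bucket {  // toy stand-in for PrimeTable::bucket_iterator
  uint64_t version = 0;
};

// True when the bucket should still be serialized under the current mode.
bool NeedsSave(const Bucket& b, const std::optional<uint64_t>& snapshot_version) {
  if (!snapshot_version)  // journal-streaming path: no version bookkeeping,
    return true;          // so every visited bucket is serialized
  return b.version < *snapshot_version;  // RDB path: skip already-stamped buckets
}

int main() {
  Bucket b;
  bool rdb_saves = NeedsSave(b, std::optional<uint64_t>{1});  // true: 0 < 1
  bool journal_saves = NeedsSave(b, std::nullopt);            // true: unconditional
  return (rdb_saves && journal_saves) ? 0 : 1;
}
```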
@@ -391,14 +403,15 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req)
   PrimeTable* table = db_slice_->GetTables(db_index).first;
   const PrimeTable::bucket_iterator* bit = req.update();

+  DCHECK(snapshot_version_.has_value());
   if (bit) {
-    if (!bit->is_done() && bit->GetVersion() < snapshot_version_) {
+    if (!bit->is_done() && bit->GetVersion() < *snapshot_version_) {
       stats_.side_saved += SerializeBucket(db_index, *bit);
     }
   } else {
     string_view key = get<string_view>(req.change);
-    table->CVCUponInsert(snapshot_version_, key, [this, db_index](PrimeTable::bucket_iterator it) {
-      DCHECK_LT(it.GetVersion(), snapshot_version_);
+    table->CVCUponInsert(*snapshot_version_, key, [this, db_index](PrimeTable::bucket_iterator it) {
+      DCHECK_LT(it.GetVersion(), *snapshot_version_);
       stats_.side_saved += SerializeBucket(db_index, it);
     });
   }
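The new DCHECK rests on the invariant that OnDbChange is only reachable through the callback registered in Start(), and that registration is exactly what engages snapshot_version_. A toy model of that coupling (illustrative names, not the Dragonfly API):

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <optional>

struct Watcher {
  std::optional<uint64_t> snapshot_version;
  std::function<void()> on_change;

  void Register() {
    snapshot_version = 1;  // engaged together with the callback
    on_change = [this] { OnDbChange(); };
  }

  void OnDbChange() {
    assert(snapshot_version.has_value());  // mirrors DCHECK(snapshot_version_.has_value())
  }
};

int main() {
  Watcher w;
  w.Register();
  w.on_change();  // can only fire after Register(), so the assert holds
}
```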
@@ -409,12 +422,11 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req)
 // no database switch can be performed between those two calls, because they are part of one
 // transaction.
 void SliceSnapshot::ConsumeJournalChange(const journal::JournalItem& item) {
-  {
-    // We grab the lock in case we are in the middle of serializing a bucket, so it serves as a
-    // barrier here for atomic serialization.
-    std::lock_guard barrier(big_value_mu_);
-    std::ignore = serializer_->WriteJournalEntry(item.data);
-  }
+  // We grab the lock in case we are in the middle of serializing a bucket, so it serves as a
+  // barrier here for atomic serialization.
+  std::lock_guard barrier(big_value_mu_);
+  std::ignore = serializer_->WriteJournalEntry(item.data);
+  ++stats_.jounal_changes;
 }

 void SliceSnapshot::ThrottleIfNeeded() {
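In ConsumeJournalChange the inner scope is gone, so the lock_guard now spans the whole function body and the new counter increment happens under the same barrier. A toy before/after of the scope change (plain std::mutex instead of ThreadLocalMutex):

```cpp
#include <mutex>

std::mutex big_value_mu;
int journal_changes = 0;

void Before() {
  {
    std::lock_guard<std::mutex> barrier(big_value_mu);
    // write the journal entry ...
  }  // lock released at the end of the inner block
}

void After() {
  std::lock_guard<std::mutex> barrier(big_value_mu);
  // write the journal entry ...
  ++journal_changes;  // counted while still holding the lock
}  // lock released at function exit

int main() {
  Before();
  After();
}
```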
src/server/snapshot.h (7 changes: 2 additions & 5 deletions)
@@ -89,10 +89,6 @@ class SliceSnapshot : public journal::JournalConsumerInterface {
     snapshot_fb_.JoinIfNeeded();
   }

-  uint64_t snapshot_version() const {
-    return snapshot_version_;
-  }
-
   const RdbTypeFreqMap& freq_map() const {
     return type_freq_map_;
   }
@@ -166,7 +162,7 @@ class SliceSnapshot : public journal::JournalConsumerInterface {
   RdbTypeFreqMap type_freq_map_;

   // version upper bound for entries that should be saved (not included).
-  uint64_t snapshot_version_ = 0;
+  std::optional<uint64_t> snapshot_version_;
   uint32_t journal_cb_id_ = 0;

   uint64_t rec_id_ = 1, last_pushed_id_ = 0;
@@ -177,6 +173,7 @@ class SliceSnapshot : public journal::JournalConsumerInterface {
     size_t side_saved = 0;
     size_t savecb_calls = 0;
     size_t keys_total = 0;
+    size_t jounal_changes = 0;
   } stats_;

   ThreadLocalMutex big_value_mu_;
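The header change swaps a plain uint64_t, where 0 had to double as "not registered", for std::optional<uint64_t>, which makes the disengaged state explicit and keeps 0 representable as an ordinary version. A minimal illustration:

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

int main() {
  std::optional<uint64_t> v;  // journal-streaming path: disengaged
  assert(!v.has_value());

  v = 0;  // a version of 0 is now distinguishable from "disengaged"
  assert(v.has_value() && *v == 0);
}
```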
tests/dragonfly/replication_test.py (11 changes: 10 additions & 1 deletion)
@@ -43,7 +43,7 @@ async def wait_for_replicas_state(*clients, state="online", node_role="slave", t
     "t_master, t_replicas, seeder_config, stream_target",
     [
         # Quick general test that replication is working
-        (1, 3 * [1], dict(key_target=1_000), 500),
+        (1, [1], dict(key_target=1_000), 500),
         # A lot of huge values
         (2, 2 * [1], dict(key_target=5_000, huge_value_target=30), 500),
         (4, [4, 4], dict(key_target=10_000), 1_000),
@@ -144,6 +144,15 @@ async def check():
     # it's usually close to 1% but there are some that are close to 3.
     assert preemptions <= (key_capacity * 0.03)

+    master.stop()
+    lines = master.find_in_logs("Exit SnapshotSerializer")
+    assert len(lines) > 0
+    for line in lines:
+        # We test the full sync journal path of command execution
+        journal_saved = extract_int_after_prefix("journal_saved ", line)
+        logging.debug(f"Journal saves {journal_saved}")
+        assert journal_saved > 0
+

 async def check_replica_finished_exec(c_replica: aioredis.Redis, m_offset):
     role = await c_replica.role()