Skip to content
This repository was archived by the owner on Jun 23, 2022. It is now read-only.

feat: forbid large-size-value writes to Pegasus #414

Merged
merged 20 commits into from
Mar 12, 2020
Merged
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/dist/replication/lib/replica_2pc.cpp
Original file line number Diff line number Diff line change
@@ -29,6 +29,7 @@
#include "mutation_log.h"
#include "replica_stub.h"
#include <dsn/dist/replication/replication_app_base.h>
#include <dsn/dist/fmt_logging.h>

namespace dsn {
namespace replication {
@@ -44,6 +45,18 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling)
return;
}

if (dsn_unlikely(_stub->_max_allowed_write_size &&
request->body_size() > _stub->_max_allowed_write_size)) {
dwarn_replica("client from {} write request body size exceed threshold, request_body_size "
"= {}, max_allowed_write_size = {}, it will be rejected!",
request->header->from_address.to_string(),
request->body_size(),
_stub->_max_allowed_write_size);
_stub->_counter_recent_write_size_exceed_threshold_count->increment();
response_client_write(request, ERR_INVALID_DATA);
return;
}

task_spec *spec = task_spec::get(request->rpc_code());
if (!_options->allow_non_idempotent_write && !spec->rpc_request_is_write_idempotent) {
response_client_write(request, ERR_OPERATION_DISABLED);
13 changes: 13 additions & 0 deletions src/dist/replication/lib/replica_stub.cpp
Original file line number Diff line number Diff line change
@@ -85,6 +85,13 @@ replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/,
_log = nullptr;
_primary_address_str[0] = '\0';
install_perf_counters();

_max_allowed_write_size = dsn_config_get_value_uint64("replication",
"max_allowed_write_size",
1 << 20,
"write operation exceed this "
"threshold will be logged and reject, "
"default is 1MB, 0 means no check");
}

replica_stub::~replica_stub(void) { close(); }
@@ -322,6 +329,12 @@ void replica_stub::install_perf_counters()
"recent.write.busy.count",
COUNTER_TYPE_VOLATILE_NUMBER,
"write busy count in the recent period");

_counter_recent_write_size_exceed_threshold_count.init_app_counter(
"eon_replica_stub",
"recent_write_size_exceed_threshold_count",
COUNTER_TYPE_VOLATILE_NUMBER,
"write size exceed threshold count in the recent period");
}

void replica_stub::initialize(bool clear /* = false*/)
6 changes: 6 additions & 0 deletions src/dist/replication/lib/replica_stub.h
Original file line number Diff line number Diff line change
@@ -273,6 +273,7 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
friend class duplication_sync_timer;
friend class duplication_sync_timer_test;
friend class replica_duplicator_manager_test;
friend class replica_test;

typedef std::unordered_map<gpid, ::dsn::task_ptr> opening_replicas;
typedef std::unordered_map<gpid, std::tuple<task_ptr, replica_ptr, app_info, replica_info>>
@@ -343,6 +344,9 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
// cli service
std::unique_ptr<dsn::cli_service> _cli_service;

// write body size exceed this threshold will be logged and reject, 0 means no check
uint64_t _max_allowed_write_size;

// performance counters
perf_counter_wrapper _counter_replicas_count;
perf_counter_wrapper _counter_replicas_opening_count;
@@ -402,6 +406,8 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
perf_counter_wrapper _counter_recent_read_busy_count;
perf_counter_wrapper _counter_recent_write_busy_count;

perf_counter_wrapper _counter_recent_write_size_exceed_threshold_count;

dsn::task_tracker _tracker;
};
} // namespace replication
63 changes: 63 additions & 0 deletions src/dist/replication/test/replica_test/unit_test/replica_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (c) 2017-present, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#include <gtest/gtest.h>

#include <dsn/utility/fail_point.h>
#include "replica_test_base.h"
#include <dsn/utility/defer.h>

namespace dsn {
namespace replication {

class replica_test : public replica_test_base
{
public:
dsn::app_info _app_info;
dsn::gpid pid = gpid(2, 1);

public:
void SetUp() override
{
stub->install_perf_counters();
mock_app_info();
stub->generate_replica(_app_info, pid, partition_status::PS_PRIMARY, 1);
}

int get_write_size_exceed_threshold_count()
{
return stub->_counter_recent_write_size_exceed_threshold_count->get_value();
}

void mock_app_info()
{
_app_info.app_id = 2;
_app_info.app_name = "replica_test";
_app_info.app_type = "replica";
_app_info.is_stateful = true;
_app_info.max_replica_count = 3;
_app_info.partition_count = 8;
}
};

TEST_F(replica_test, write_size_limited)
{
int count = 100;
task_code default_code;
struct dsn::message_header header;
header.body_length = 10000000;

auto write_request = dsn::message_ex::create_request(default_code);
auto cleanup = dsn::defer([=]() { delete write_request; });
write_request->header = &header;

for (int i = 0; i < count; i++) {
stub->on_client_write(pid, write_request);
}

ASSERT_EQ(get_write_size_exceed_threshold_count(), count);
}

} // namespace replication
} // namespace dsn