Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Experiment with secondary backend for ETL #1895

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 15 additions & 12 deletions src/app/ClioApplication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "app/WebHandlers.hpp"
#include "data/AmendmentCenter.hpp"
#include "data/BackendFactory.hpp"
#include "data/LedgerCache.hpp"
#include "etl/ETLService.hpp"
#include "etl/LoadBalancer.hpp"
#include "etl/NetworkValidatedLedgers.hpp"
Expand Down Expand Up @@ -104,20 +105,22 @@ ClioApplication::run(bool const useNgWebServer)
auto sweepHandler = web::dosguard::IntervalSweepHandler{config_, ioc, dosGuard};

// Interface to the database
auto backend = data::makeBackend(config_);
data::LedgerCache sharedCache;
auto etlBackend = data::makeBackend(config_, sharedCache, "etl");
auto otherBackend = data::makeBackend(config_, sharedCache, "normal");

{
auto const migrationInspector = migration::makeMigrationInspector(config_, backend);
auto const migrationInspector = migration::makeMigrationInspector(config_, otherBackend);
// Check if any migration is blocking Clio server starting.
if (migrationInspector->isBlockingClio() and backend->hardFetchLedgerRangeNoThrow()) {
if (migrationInspector->isBlockingClio() and otherBackend->hardFetchLedgerRangeNoThrow()) {
LOG(util::LogService::error())
<< "Existing Migration is blocking Clio, Please complete the database migration first.";
return EXIT_FAILURE;
}
}

// Manages clients subscribed to streams
auto subscriptions = feed::SubscriptionManager::makeSubscriptionManager(config_, backend);
auto subscriptions = feed::SubscriptionManager::makeSubscriptionManager(config_, otherBackend);

// Tracks which ledgers have been validated by the network
auto ledgers = etl::NetworkValidatedLedgers::makeValidatedLedgers();
Expand All @@ -126,24 +129,24 @@ ClioApplication::run(bool const useNgWebServer)
// ETL uses the balancer to extract data.
// The server uses the balancer to forward RPCs to a rippled node.
// The balancer itself publishes to streams (transactions_proposed and accounts_proposed)
auto balancer = etl::LoadBalancer::makeLoadBalancer(config_, ioc, backend, subscriptions, ledgers);
auto balancer = etl::LoadBalancer::makeLoadBalancer(config_, ioc, otherBackend, subscriptions, ledgers);

// ETL is responsible for writing and publishing to streams. In read-only mode, ETL only publishes
auto etl = etl::ETLService::makeETLService(config_, ioc, backend, subscriptions, balancer, ledgers);
auto etl = etl::ETLService::makeETLService(config_, ioc, etlBackend, subscriptions, balancer, ledgers);

auto workQueue = rpc::WorkQueue::makeWorkQueue(config_);
auto counters = rpc::Counters::makeCounters(workQueue);
auto const amendmentCenter = std::make_shared<data::AmendmentCenter const>(backend);
auto const amendmentCenter = std::make_shared<data::AmendmentCenter const>(otherBackend);
auto const handlerProvider = std::make_shared<rpc::impl::ProductionHandlerProvider const>(
config_, backend, subscriptions, balancer, etl, amendmentCenter, counters
config_, otherBackend, subscriptions, balancer, etl, amendmentCenter, counters
);

using RPCEngineType = rpc::RPCEngine<etl::LoadBalancer, rpc::Counters>;
auto const rpcEngine =
RPCEngineType::makeRPCEngine(config_, backend, balancer, dosGuard, workQueue, counters, handlerProvider);
RPCEngineType::makeRPCEngine(config_, otherBackend, balancer, dosGuard, workQueue, counters, handlerProvider);

if (useNgWebServer or config_.get<bool>("server.__ng_web_server")) {
web::ng::RPCServerHandler<RPCEngineType, etl::ETLService> handler{config_, backend, rpcEngine, etl};
web::ng::RPCServerHandler<RPCEngineType, etl::ETLService> handler{config_, otherBackend, rpcEngine, etl};

auto expectedAdminVerifier = web::makeAdminVerificationStrategy(config_);
if (not expectedAdminVerifier.has_value()) {
Expand Down Expand Up @@ -172,7 +175,7 @@ ClioApplication::run(bool const useNgWebServer)
}

appStopper_.setOnStop(
Stopper::makeOnStopCallback(httpServer.value(), *balancer, *etl, *subscriptions, *backend, ioc)
Stopper::makeOnStopCallback(httpServer.value(), *balancer, *etl, *subscriptions, *otherBackend, ioc)
);

// Blocks until stopped.
Expand All @@ -185,7 +188,7 @@ ClioApplication::run(bool const useNgWebServer)

// Init the web server
auto handler =
std::make_shared<web::RPCServerHandler<RPCEngineType, etl::ETLService>>(config_, backend, rpcEngine, etl);
std::make_shared<web::RPCServerHandler<RPCEngineType, etl::ETLService>>(config_, otherBackend, rpcEngine, etl);

auto const httpServer = web::makeHttpServer(config_, ioc, dosGuard, handler);

Expand Down
14 changes: 10 additions & 4 deletions src/data/BackendFactory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@

#include "data/BackendInterface.hpp"
#include "data/CassandraBackend.hpp"
#include "data/LedgerCache.hpp"
#include "data/cassandra/SettingsProvider.hpp"
#include "util/log/Logger.hpp"
#include "util/newconfig/ConfigDefinition.hpp"

#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/predicate.hpp>
#include <fmt/core.h>

#include <memory>
#include <stdexcept>
Expand All @@ -38,22 +40,26 @@ namespace data {
* @brief A factory function that creates the backend based on a config.
*
* @param config The clio config to use
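* @param cache The ledger cache handed to the created backend; it is taken by reference, so it must outlive the
* returned backend
* @param cfgSection Suffix of the config section to read: settings are looked up under "database_<cfgSection>"
* (for example "etl" or "normal")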
* @return A shared_ptr<BackendInterface> with the selected implementation
*/
inline std::shared_ptr<BackendInterface>
makeBackend(util::config::ClioConfigDefinition const& config)
makeBackend(util::config::ClioConfigDefinition const& config, LedgerCache& cache, std::string cfgSection)
{
static util::Logger const log{"Backend"}; // NOLINT(readability-identifier-naming)
LOG(log.info()) << "Constructing BackendInterface";

auto const readOnly = config.get<bool>("read_only");

auto const type = config.get<std::string>("database.type");
auto const type = config.get<std::string>(fmt::format("database_{}.type", cfgSection));
std::shared_ptr<BackendInterface> backend = nullptr;

if (boost::iequals(type, "cassandra")) {
auto const cfg = config.getObject("database." + type);
backend = std::make_shared<data::cassandra::CassandraBackend>(data::cassandra::SettingsProvider{cfg}, readOnly);
auto const cfg = config.getObject(fmt::format("database_{}.{}", cfgSection, type));
backend = std::make_shared<data::cassandra::CassandraBackend>(
data::cassandra::SettingsProvider{cfg}, readOnly, cache
);
}

if (!backend)
Expand Down
11 changes: 9 additions & 2 deletions src/data/BackendInterface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,18 @@
protected:
mutable std::shared_mutex rngMtx_;
std::optional<LedgerRange> range_;
LedgerCache cache_;
LedgerCache& cache_;
std::optional<etl::CorruptionDetector<LedgerCache>> corruptionDetector_;

public:
BackendInterface() = default;
/**
* @brief Constructor
* @param cache The ledger cache used by this backend; it is stored by reference, so it must outlive the backend
*/
BackendInterface(LedgerCache& cache) : cache_(cache)

Check warning on line 150 in src/data/BackendInterface.hpp

View check run for this annotation

Codecov / codecov/patch

src/data/BackendInterface.hpp#L150

Added line #L150 was not covered by tests
{
}

Check warning on line 152 in src/data/BackendInterface.hpp

View check run for this annotation

Codecov / codecov/patch

src/data/BackendInterface.hpp#L152

Added line #L152 was not covered by tests

virtual ~BackendInterface() = default;

// TODO: Remove this hack. Cache should not be exposed thru BackendInterface
Expand Down
7 changes: 5 additions & 2 deletions src/data/CassandraBackend.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "data/BackendInterface.hpp"
#include "data/DBHelpers.hpp"
#include "data/LedgerCache.hpp"
#include "data/Types.hpp"
#include "data/cassandra/Concepts.hpp"
#include "data/cassandra/Handle.hpp"
Expand Down Expand Up @@ -88,9 +89,11 @@ class BasicCassandraBackend : public BackendInterface {
*
* @param settingsProvider The settings provider to use
* @param readOnly Whether the database should be in readonly mode
* @param cache The ledger cache forwarded to the BackendInterface base; it must outlive this backend
*/
BasicCassandraBackend(SettingsProviderType settingsProvider, bool readOnly)
: settingsProvider_{std::move(settingsProvider)}
BasicCassandraBackend(SettingsProviderType settingsProvider, bool readOnly, LedgerCache& cache)
: BackendInterface(cache)
, settingsProvider_{std::move(settingsProvider)}
, schema_{settingsProvider_}
, handle_{settingsProvider_.getSettings()}
, executor_{settingsProvider_.getSettings(), handle_}
Expand Down
2 changes: 1 addition & 1 deletion src/migration/MigrationApplication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ MigratorApplication::MigratorApplication(util::config::ClioConfigDefinition cons
{
PrometheusService::init(config);

auto expectedMigrationManager = migration::impl::makeMigrationManager(config);
auto expectedMigrationManager = migration::impl::makeMigrationManager(config, cache_);

if (not expectedMigrationManager) {
throw std::runtime_error("Failed to create migration manager: " + expectedMigrationManager.error());
Expand Down
2 changes: 2 additions & 0 deletions src/migration/MigrationApplication.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#pragma once

#include "data/LedgerCache.hpp"
#include "migration/MigrationManagerInterface.hpp"
#include "util/newconfig/ConfigDefinition.hpp"

Expand Down Expand Up @@ -74,6 +75,7 @@ struct MigrateSubCmd {
*/
class MigratorApplication {
std::string option_;
data::LedgerCache cache_;
std::shared_ptr<migration::MigrationManagerInterface> migrationManager_;
MigrateSubCmd cmd_;

Expand Down
6 changes: 4 additions & 2 deletions src/migration/cassandra/CassandraMigrationBackend.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#pragma once

#include "data/CassandraBackend.hpp"
#include "data/LedgerCache.hpp"
#include "data/cassandra/SettingsProvider.hpp"
#include "data/cassandra/Types.hpp"
#include "migration/MigratiorStatus.hpp"
Expand Down Expand Up @@ -49,9 +50,10 @@
* @brief Construct a new Cassandra Migration Backend object. The backend is not readonly.
*
* @param settingsProvider The settings provider
* @param cache The ledger cache forwarded to the underlying CassandraBackend; it must outlive this backend
*/
explicit CassandraMigrationBackend(data::cassandra::SettingsProvider settingsProvider)
: data::cassandra::CassandraBackend{auto{settingsProvider}, false /* not readonly */}
explicit CassandraMigrationBackend(data::cassandra::SettingsProvider settingsProvider, data::LedgerCache& cache)
: data::cassandra::CassandraBackend{auto{settingsProvider}, false /* not readonly */, cache}

Check warning on line 56 in src/migration/cassandra/CassandraMigrationBackend.hpp

View check run for this annotation

Codecov / codecov/patch

src/migration/cassandra/CassandraMigrationBackend.hpp#L55-L56

Added lines #L55 - L56 were not covered by tests
, settingsProvider_(std::move(settingsProvider))
, migrationSchema_{settingsProvider_}
{
Expand Down
9 changes: 5 additions & 4 deletions src/migration/impl/MigrationManagerFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "migration/impl/MigrationManagerFactory.hpp"

#include "data/LedgerCache.hpp"
#include "data/cassandra/SettingsProvider.hpp"
#include "migration/MigrationManagerInterface.hpp"
#include "migration/cassandra/CassandraMigrationBackend.hpp"
Expand All @@ -35,24 +36,24 @@
namespace migration::impl {

std::expected<std::shared_ptr<MigrationManagerInterface>, std::string>
makeMigrationManager(util::config::ClioConfigDefinition const& config)
makeMigrationManager(util::config::ClioConfigDefinition const& config, data::LedgerCache& cache)

Check warning on line 39 in src/migration/impl/MigrationManagerFactory.cpp

View check run for this annotation

Codecov / codecov/patch

src/migration/impl/MigrationManagerFactory.cpp#L39

Added line #L39 was not covered by tests
{
static util::Logger const log{"Migration"}; // NOLINT(readability-identifier-naming)
LOG(log.info()) << "Constructing MigrationManager";

auto const type = config.get<std::string>("database.type");
auto const type = config.get<std::string>("database_normal.type");

if (not boost::iequals(type, "cassandra")) {
LOG(log.error()) << "Unknown database type to migrate: " << type;
return std::unexpected(std::string("Invalid database type"));
}

auto const cfg = config.getObject("database." + type);
auto const cfg = config.getObject("database_normal." + type);

auto migrationCfg = config.getObject("migration");

return std::make_shared<cassandra::CassandraMigrationManager>(
std::make_shared<cassandra::CassandraMigrationBackend>(data::cassandra::SettingsProvider{cfg}),
std::make_shared<cassandra::CassandraMigrationBackend>(data::cassandra::SettingsProvider{cfg}, cache),
std::move(migrationCfg)
);
}
Expand Down
3 changes: 2 additions & 1 deletion src/migration/impl/MigrationManagerFactory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#pragma once

#include "data/LedgerCache.hpp"
#include "migration/MigrationManagerInterface.hpp"
#include "util/newconfig/ConfigDefinition.hpp"

Expand All @@ -36,6 +37,6 @@ namespace migration::impl {
* @return A shared pointer to the MigrationManagerInterface if the creation was successful, otherwise an error message
*/
std::expected<std::shared_ptr<MigrationManagerInterface>, std::string>
makeMigrationManager(util::config::ClioConfigDefinition const& config);
makeMigrationManager(util::config::ClioConfigDefinition const& config, data::LedgerCache& cache);

} // namespace migration::impl
74 changes: 53 additions & 21 deletions src/util/newconfig/ConfigDefinition.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,35 +259,67 @@ class ClioConfigDefinition {
* without default values must be present in the user's config file.
*/
static ClioConfigDefinition gClioConfig = ClioConfigDefinition{
{{"database.type", ConfigValue{ConfigType::String}.defaultValue("cassandra").withConstraint(gValidateCassandraName)
},
{"database.cassandra.contact_points", ConfigValue{ConfigType::String}.defaultValue("localhost")},
{"database.cassandra.secure_connect_bundle", ConfigValue{ConfigType::String}.optional()},
{"database.cassandra.port", ConfigValue{ConfigType::Integer}.withConstraint(gValidatePort).optional()},
{"database.cassandra.keyspace", ConfigValue{ConfigType::String}.defaultValue("clio")},
{"database.cassandra.replication_factor",
{{"database_normal.type",
ConfigValue{ConfigType::String}.defaultValue("cassandra").withConstraint(gValidateCassandraName)},
{"database_normal.cassandra.contact_points", ConfigValue{ConfigType::String}.defaultValue("localhost")},
{"database_normal.cassandra.secure_connect_bundle", ConfigValue{ConfigType::String}.optional()},
{"database_normal.cassandra.port", ConfigValue{ConfigType::Integer}.withConstraint(gValidatePort).optional()},
{"database_normal.cassandra.keyspace", ConfigValue{ConfigType::String}.defaultValue("clio")},
{"database_normal.cassandra.replication_factor",
ConfigValue{ConfigType::Integer}.defaultValue(3u).withConstraint(gValidateUint16)},
{"database.cassandra.table_prefix", ConfigValue{ConfigType::String}.optional()},
{"database.cassandra.max_write_requests_outstanding",
{"database_normal.cassandra.table_prefix", ConfigValue{ConfigType::String}.optional()},
{"database_normal.cassandra.max_write_requests_outstanding",
ConfigValue{ConfigType::Integer}.defaultValue(10'000).withConstraint(gValidateUint32)},
{"database.cassandra.max_read_requests_outstanding",
{"database_normal.cassandra.max_read_requests_outstanding",
ConfigValue{ConfigType::Integer}.defaultValue(100'000).withConstraint(gValidateUint32)},
{"database.cassandra.threads",
{"database_normal.cassandra.threads",
ConfigValue{ConfigType::Integer}
.defaultValue(static_cast<uint32_t>(std::thread::hardware_concurrency()))
.withConstraint(gValidateUint32)},
{"database.cassandra.core_connections_per_host",
{"database_normal.cassandra.core_connections_per_host",
ConfigValue{ConfigType::Integer}.defaultValue(1).withConstraint(gValidateUint16)},
{"database.cassandra.queue_size_io", ConfigValue{ConfigType::Integer}.optional().withConstraint(gValidateUint16)},
{"database.cassandra.write_batch_size",
{"database_normal.cassandra.queue_size_io",
ConfigValue{ConfigType::Integer}.optional().withConstraint(gValidateUint16)},
{"database_normal.cassandra.write_batch_size",
ConfigValue{ConfigType::Integer}.defaultValue(20).withConstraint(gValidateUint16)},
{"database.cassandra.connect_timeout", ConfigValue{ConfigType::Integer}.optional().withConstraint(gValidateUint32)
},
{"database.cassandra.request_timeout", ConfigValue{ConfigType::Integer}.optional().withConstraint(gValidateUint32)
},
{"database.cassandra.username", ConfigValue{ConfigType::String}.optional()},
{"database.cassandra.password", ConfigValue{ConfigType::String}.optional()},
{"database.cassandra.certfile", ConfigValue{ConfigType::String}.optional()},
{"database_normal.cassandra.connect_timeout",
ConfigValue{ConfigType::Integer}.optional().withConstraint(gValidateUint32)},
{"database_normal.cassandra.request_timeout",
ConfigValue{ConfigType::Integer}.optional().withConstraint(gValidateUint32)},
{"database_normal.cassandra.username", ConfigValue{ConfigType::String}.optional()},
{"database_normal.cassandra.password", ConfigValue{ConfigType::String}.optional()},
{"database_normal.cassandra.certfile", ConfigValue{ConfigType::String}.optional()},

{"database_etl.type",
ConfigValue{ConfigType::String}.defaultValue("cassandra").withConstraint(gValidateCassandraName)},
{"database_etl.cassandra.contact_points", ConfigValue{ConfigType::String}.defaultValue("localhost")},
{"database_etl.cassandra.secure_connect_bundle", ConfigValue{ConfigType::String}.optional()},
{"database_etl.cassandra.port", ConfigValue{ConfigType::Integer}.withConstraint(gValidatePort).optional()},
{"database_etl.cassandra.keyspace", ConfigValue{ConfigType::String}.defaultValue("clio")},
{"database_etl.cassandra.replication_factor",
ConfigValue{ConfigType::Integer}.defaultValue(3u).withConstraint(gValidateUint16)},
{"database_etl.cassandra.table_prefix", ConfigValue{ConfigType::String}.optional()},
{"database_etl.cassandra.max_write_requests_outstanding",
ConfigValue{ConfigType::Integer}.defaultValue(10'000).withConstraint(gValidateUint32)},
{"database_etl.cassandra.max_read_requests_outstanding",
ConfigValue{ConfigType::Integer}.defaultValue(100'000).withConstraint(gValidateUint32)},
{"database_etl.cassandra.threads",
ConfigValue{ConfigType::Integer}
.defaultValue(static_cast<uint32_t>(std::thread::hardware_concurrency()))
.withConstraint(gValidateUint32)},
{"database_etl.cassandra.core_connections_per_host",
ConfigValue{ConfigType::Integer}.defaultValue(1).withConstraint(gValidateUint16)},
{"database_etl.cassandra.queue_size_io",
ConfigValue{ConfigType::Integer}.optional().withConstraint(gValidateUint16)},
{"database_etl.cassandra.write_batch_size",
ConfigValue{ConfigType::Integer}.defaultValue(20).withConstraint(gValidateUint16)},
{"database_etl.cassandra.connect_timeout",
ConfigValue{ConfigType::Integer}.optional().withConstraint(gValidateUint32)},
{"database_etl.cassandra.request_timeout",
ConfigValue{ConfigType::Integer}.optional().withConstraint(gValidateUint32)},
{"database_etl.cassandra.username", ConfigValue{ConfigType::String}.optional()},
{"database_etl.cassandra.password", ConfigValue{ConfigType::String}.optional()},
{"database_etl.cassandra.certfile", ConfigValue{ConfigType::String}.optional()},

{"allow_no_etl", ConfigValue{ConfigType::Boolean}.defaultValue(false)},

Expand Down
Loading
Loading