Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

libsqlx server #511

Merged
merged 8 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
604 changes: 525 additions & 79 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ members = [
"sqld-libsql-bindings",
"testing/end-to-end",
"libsqlx",
"libsqlx-server",
]

[workspace.dependencies]
Expand Down
32 changes: 32 additions & 0 deletions libsqlx-server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
[package]
name = "libsqlx-server"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
axum = "0.6.18"
base64 = "0.21.2"
bincode = "1.3.3"
bytes = "1.4.0"
clap = { version = "4.3.11", features = ["derive"] }
color-eyre = "0.6.2"
futures = "0.3.28"
hmac = "0.12.1"
hyper = { version = "0.14.27", features = ["h2", "server"] }
libsqlx = { version = "0.1.0", path = "../libsqlx" }
moka = { version = "0.11.2", features = ["future"] }
parking_lot = "0.12.1"
priority-queue = "1.3.2"
rand = "0.8.5"
regex = "1.9.1"
serde = { version = "1.0.166", features = ["derive", "rc"] }
serde_json = "1.0.100"
sha2 = "0.10.7"
sled = "0.34.7"
thiserror = "1.0.43"
tokio = { version = "1.29.1", features = ["full"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
uuid = { version = "1.4.0", features = ["v4"] }
21 changes: 21 additions & 0 deletions libsqlx-server/src/allocation/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use serde::{Deserialize, Serialize};

/// Structural supertype of AllocConfig, used for checking the meta version. Subsequent version of
/// AllocConfig need to conform to this prototype.
#[derive(Debug, Serialize, Deserialize)]
struct ConfigVersion {
config_version: u32,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct AllocConfig {
pub max_conccurent_connection: u32,
pub id: String,
pub db_config: DbConfig,
}

#[derive(Debug, Serialize, Deserialize)]
pub enum DbConfig {
Primary {},
Replica { primary_node_id: String },
}
191 changes: 191 additions & 0 deletions libsqlx-server/src/allocation/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
use std::path::PathBuf;
use std::sync::Arc;

use libsqlx::libsql::{LibsqlDatabase, LogCompactor, LogFile, PrimaryType};
use libsqlx::Database as _;
use tokio::sync::{mpsc, oneshot};
use tokio::task::{block_in_place, JoinSet};

use crate::hrana;
use crate::hrana::http::handle_pipeline;
use crate::hrana::http::proto::{PipelineRequestBody, PipelineResponseBody};

use self::config::{AllocConfig, DbConfig};

pub mod config;

type ExecFn = Box<dyn FnOnce(&mut dyn libsqlx::Connection) + Send>;

#[derive(Clone)]
pub struct ConnectionId {
id: u32,
close_sender: mpsc::Sender<()>,
}

pub enum AllocationMessage {
NewConnection(oneshot::Sender<ConnectionHandle>),
HranaPipelineReq {
req: PipelineRequestBody,
ret: oneshot::Sender<crate::Result<PipelineResponseBody>>,
}
}

pub enum Database {
Primary(libsqlx::libsql::LibsqlDatabase<PrimaryType>),
}

struct Compactor;

impl LogCompactor for Compactor {
fn should_compact(&self, _log: &LogFile) -> bool {
false
}

fn compact(
&self,
_log: LogFile,
_path: std::path::PathBuf,
_size_after: u32,
) -> Result<(), Box<dyn std::error::Error + Sync + Send + 'static>> {
todo!()
}
}

impl Database {
pub fn from_config(config: &AllocConfig, path: PathBuf) -> Self {
match config.db_config {
DbConfig::Primary {} => {
let db = LibsqlDatabase::new_primary(path, Compactor, false).unwrap();
Self::Primary(db)
}
DbConfig::Replica { .. } => todo!(),
}
}

fn connect(&self) -> Box<dyn libsqlx::Connection + Send> {
match self {
Database::Primary(db) => Box::new(db.connect().unwrap()),
}
}
}

pub struct Allocation {
pub inbox: mpsc::Receiver<AllocationMessage>,
pub database: Database,
/// spawned connection futures, returning their connection id on completion.
pub connections_futs: JoinSet<u32>,
pub next_conn_id: u32,
pub max_concurrent_connections: u32,

pub hrana_server: Arc<hrana::http::Server>,
}

pub struct ConnectionHandle {
exec: mpsc::Sender<ExecFn>,
exit: oneshot::Sender<()>,
}

impl ConnectionHandle {
pub async fn exec<F, R>(&self, f: F) -> crate::Result<R>
where F: for<'a> FnOnce(&'a mut (dyn libsqlx::Connection + 'a)) -> R + Send + 'static,
R: Send + 'static,
{
let (sender, ret) = oneshot::channel();
let cb = move |conn: &mut dyn libsqlx::Connection| {
let res = f(conn);
let _ = sender.send(res);
};

self.exec.send(Box::new(cb)).await.unwrap();

Ok(ret.await?)
}
}

impl Allocation {
pub async fn run(mut self) {
loop {
tokio::select! {
Some(msg) = self.inbox.recv() => {
match msg {
AllocationMessage::NewConnection(ret) => {
let _ =ret.send(self.new_conn().await);
},
AllocationMessage::HranaPipelineReq { req, ret} => {
let res = handle_pipeline(&self.hrana_server.clone(), req, || async {
let conn= self.new_conn().await;
dbg!();
Ok(conn)
}).await;
let _ = ret.send(res);
}
}
},
maybe_id = self.connections_futs.join_next() => {
if let Some(Ok(_id)) = maybe_id {
// self.connections.remove_entry(&id);
}
},
else => break,
}
}
}

async fn new_conn(&mut self) -> ConnectionHandle {
dbg!();
let id = self.next_conn_id();
dbg!();
let conn = block_in_place(|| self.database.connect());
dbg!();
let (close_sender, exit) = oneshot::channel();
let (exec_sender, exec_receiver) = mpsc::channel(1);
let conn = Connection {
id,
conn,
exit,
exec: exec_receiver,
};

dbg!();
self.connections_futs.spawn(conn.run());
dbg!();

ConnectionHandle {
exec: exec_sender,
exit: close_sender,
}

}

fn next_conn_id(&mut self) -> u32 {
loop {
self.next_conn_id = self.next_conn_id.wrapping_add(1);
return self.next_conn_id;
// if !self.connections.contains_key(&self.next_conn_id) {
// return self.next_conn_id;
// }
}
}
}

struct Connection {
id: u32,
conn: Box<dyn libsqlx::Connection + Send>,
exit: oneshot::Receiver<()>,
exec: mpsc::Receiver<ExecFn>,
}

impl Connection {
async fn run(mut self) -> u32 {
loop {
tokio::select! {
_ = &mut self.exit => break,
Some(exec) = self.exec.recv() => {
tokio::task::block_in_place(|| exec(&mut *self.conn));
}
}
}

self.id
}
}
19 changes: 19 additions & 0 deletions libsqlx-server/src/database.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use tokio::sync::{mpsc, oneshot};

use crate::hrana::http::proto::{PipelineRequestBody, PipelineResponseBody};
use crate::allocation::{AllocationMessage, ConnectionHandle};

pub struct Database {
pub sender: mpsc::Sender<AllocationMessage>,
}

impl Database {
pub async fn hrana_pipeline(&self, req: PipelineRequestBody) -> crate::Result<PipelineResponseBody> {
dbg!();
let (sender, ret) = oneshot::channel();
dbg!();
self.sender.send(AllocationMessage::HranaPipelineReq { req, ret: sender }).await.unwrap();
dbg!();
ret.await.unwrap()
}
}
Loading