Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit df6efbf

Browse files
committed
add streaming_exec tests
1 parent e0edb21 commit df6efbf

11 files changed

+968
-101
lines changed

sqld/src/connection/libsql.rs

+24-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::sync::Arc;
55

66
use parking_lot::{Mutex, RwLock};
77
use rusqlite::{DatabaseName, ErrorCode, OpenFlags, StatementStatus, TransactionState};
8-
use sqld_libsql_bindings::wal_hook::{TransparentMethods, WalMethodsHook};
8+
use sqld_libsql_bindings::wal_hook::{TransparentMethods, WalMethodsHook, };
99
use tokio::sync::{watch, Notify};
1010
use tokio::time::{Duration, Instant};
1111

@@ -237,6 +237,29 @@ where
237237
}
238238
}
239239

240+
#[cfg(test)]
241+
impl LibSqlConnection<TransparentMethods> {
242+
pub fn new_test(path: &Path) -> Self {
243+
let (_snd, rcv) = watch::channel(None);
244+
let conn = Connection::new(
245+
path,
246+
Arc::new([]),
247+
&crate::libsql_bindings::wal_hook::TRANSPARENT_METHODS,
248+
(),
249+
Default::default(),
250+
DatabaseConfigStore::new_test().into(),
251+
QueryBuilderConfig::default(),
252+
rcv,
253+
Default::default(),
254+
)
255+
.unwrap();
256+
257+
Self {
258+
inner: Arc::new(Mutex::new(conn)),
259+
}
260+
}
261+
}
262+
240263
struct Connection<W: WalHook = TransparentMethods> {
241264
conn: sqld_libsql_bindings::Connection<W>,
242265
stats: Arc<Stats>,

sqld/src/connection/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,7 @@ impl<DB: Connection> Connection for TrackedConnection<DB> {
353353
}
354354

355355
#[cfg(test)]
356-
mod test {
356+
pub mod test {
357357
use super::*;
358358

359359
#[derive(Debug)]

sqld/src/connection/write_proxy.rs

+41-86
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use std::sync::Arc;
33

44
use futures_core::future::BoxFuture;
55
use parking_lot::Mutex as PMutex;
6-
use rusqlite::types::ValueRef;
76
use sqld_libsql_bindings::wal_hook::{TransparentMethods, TRANSPARENT_METHODS};
87
use tokio::sync::{mpsc, watch, Mutex};
98
use tokio_stream::StreamExt;
@@ -17,15 +16,10 @@ use crate::connection::program::{DescribeCol, DescribeParam};
1716
use crate::error::Error;
1817
use crate::namespace::NamespaceName;
1918
use crate::query_analysis::TxnStatus;
20-
use crate::query_result_builder::{Column, QueryBuilderConfig, QueryResultBuilder};
19+
use crate::query_result_builder::{QueryBuilderConfig, QueryResultBuilder};
2120
use crate::replication::FrameNo;
2221
use crate::rpc::proxy::rpc::proxy_client::ProxyClient;
23-
use crate::rpc::proxy::rpc::resp_step::Step;
24-
use crate::rpc::proxy::rpc::row_value::Value;
25-
use crate::rpc::proxy::rpc::{
26-
self, exec_req, exec_resp, AddRowValue, ColsDescription, DisconnectMessage, ExecReq, ExecResp,
27-
Finish, FinishStep, RowValue, StepError,
28-
};
22+
use crate::rpc::proxy::rpc::{self, exec_req, exec_resp, DisconnectMessage, ExecReq, ExecResp};
2923
use crate::rpc::NAMESPACE_METADATA_KEY;
3024
use crate::stats::Stats;
3125
use crate::{Result, DEFAULT_AUTO_CHECKPOINT};
@@ -297,7 +291,7 @@ impl RemoteConnection {
297291
Ok(resp) => {
298292
// there was an interuption, and we moved to the next query
299293
if resp.request_id > request_id {
300-
return Err(Error::PrimaryStreamInterupted)
294+
return Err(Error::PrimaryStreamInterupted);
301295
}
302296

303297
// we can ignore response for previously interupted requests
@@ -327,58 +321,20 @@ impl RemoteConnection {
327321
let mut txn_status = TxnStatus::Invalid;
328322
let mut new_frame_no = None;
329323
let builder_config = self.builder_config;
330-
let cb = |response: exec_resp::Response| {
331-
match response {
332-
exec_resp::Response::ProgramResp(resp) => {
333-
for step in resp.steps {
334-
let Some(step) = step.step else { return Err(Error::PrimaryStreamMisuse) };
335-
match step {
336-
Step::Init(_) => builder.init(&builder_config)?,
337-
Step::BeginStep(_) => builder.begin_step()?,
338-
Step::FinishStep(FinishStep {
339-
affected_row_count,
340-
last_insert_rowid,
341-
}) => builder.finish_step(affected_row_count, last_insert_rowid)?,
342-
Step::StepError(StepError { error: Some(err) }) => builder
343-
.step_error(crate::error::Error::RpcQueryError(err))?,
344-
Step::ColsDescription(ColsDescription { columns }) => {
345-
let cols = columns.iter().map(|c| Column {
346-
name: &c.name,
347-
decl_ty: c.decltype.as_deref(),
348-
});
349-
builder.cols_description(cols)?
350-
}
351-
Step::BeginRows(_) => builder.begin_rows()?,
352-
Step::BeginRow(_) => builder.begin_row()?,
353-
Step::AddRowValue(AddRowValue {
354-
val: Some(RowValue { value: Some(val) }),
355-
}) => {
356-
let val = match &val {
357-
Value::Text(s) => ValueRef::Text(s.as_bytes()),
358-
Value::Integer(i) => ValueRef::Integer(*i),
359-
Value::Real(x) => ValueRef::Real(*x),
360-
Value::Blob(b) => ValueRef::Blob(b.as_slice()),
361-
Value::Null(_) => ValueRef::Null,
362-
};
363-
builder.add_row_value(val)?;
364-
}
365-
Step::FinishRow(_) => builder.finish_row()?,
366-
Step::FinishRows(_) => builder.finish_rows()?,
367-
Step::Finish(f @ Finish { last_frame_no, .. }) => {
368-
txn_status = TxnStatus::from(f.state());
369-
new_frame_no = last_frame_no;
370-
builder.finish(last_frame_no, txn_status)?;
371-
return Ok(false);
372-
}
373-
_ => return Err(Error::PrimaryStreamMisuse),
374-
}
375-
}
376-
}
377-
exec_resp::Response::DescribeResp(_) => return Err(Error::PrimaryStreamMisuse),
378-
exec_resp::Response::Error(e) => return Err(Error::RpcQueryError(e)),
324+
let cb = |response: exec_resp::Response| match response {
325+
exec_resp::Response::ProgramResp(resp) => {
326+
crate::rpc::streaming_exec::apply_program_resp_to_builder(
327+
&builder_config,
328+
&mut builder,
329+
resp,
330+
|last_frame_no, status| {
331+
txn_status = status;
332+
new_frame_no = last_frame_no;
333+
},
334+
)
379335
}
380-
381-
Ok(true)
336+
exec_resp::Response::DescribeResp(_) => Err(Error::PrimaryStreamMisuse),
337+
exec_resp::Response::Error(e) => Err(Error::RpcQueryError(e)),
382338
};
383339

384340
self.make_request(
@@ -395,32 +351,30 @@ impl RemoteConnection {
395351
#[allow(dead_code)] // reference implementation
396352
async fn describe(&mut self, stmt: String) -> crate::Result<DescribeResponse> {
397353
let mut out = None;
398-
let cb = |response: exec_resp::Response| {
399-
match response {
400-
exec_resp::Response::DescribeResp(resp) => {
401-
out = Some(DescribeResponse {
402-
params: resp
403-
.params
404-
.into_iter()
405-
.map(|p| DescribeParam { name: p.name })
406-
.collect(),
407-
cols: resp
408-
.cols
409-
.into_iter()
410-
.map(|c| DescribeCol {
411-
name: c.name,
412-
decltype: c.decltype,
413-
})
414-
.collect(),
415-
is_explain: resp.is_explain,
416-
is_readonly: resp.is_readonly,
417-
});
418-
419-
Ok(false)
420-
}
421-
exec_resp::Response::Error(e) => Err(Error::RpcQueryError(e)),
422-
exec_resp::Response::ProgramResp(_) => Err(Error::PrimaryStreamMisuse),
354+
let cb = |response: exec_resp::Response| match response {
355+
exec_resp::Response::DescribeResp(resp) => {
356+
out = Some(DescribeResponse {
357+
params: resp
358+
.params
359+
.into_iter()
360+
.map(|p| DescribeParam { name: p.name })
361+
.collect(),
362+
cols: resp
363+
.cols
364+
.into_iter()
365+
.map(|c| DescribeCol {
366+
name: c.name,
367+
decltype: c.decltype,
368+
})
369+
.collect(),
370+
is_explain: resp.is_explain,
371+
is_readonly: resp.is_readonly,
372+
});
373+
374+
Ok(false)
423375
}
376+
exec_resp::Response::Error(e) => Err(Error::RpcQueryError(e)),
377+
exec_resp::Response::ProgramResp(_) => Err(Error::PrimaryStreamMisuse),
424378
};
425379

426380
self.make_request(
@@ -520,10 +474,11 @@ pub mod test {
520474
use arbitrary::{Arbitrary, Unstructured};
521475
use bytes::Bytes;
522476
use rand::Fill;
477+
use rusqlite::types::ValueRef;
523478

524479
use super::*;
525480
use crate::{
526-
query_result_builder::{test::test_driver, QueryResultBuilderError},
481+
query_result_builder::{test::test_driver, QueryResultBuilderError, Column},
527482
rpc::proxy::rpc::{query_result::RowResult, ExecuteResults},
528483
};
529484

sqld/src/rpc/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ pub mod proxy;
1919
pub mod replica_proxy;
2020
pub mod replication_log;
2121
pub mod replication_log_proxy;
22-
mod streaming_exec;
22+
pub mod streaming_exec;
2323

2424
/// A tonic error code to signify that a namespace doesn't exist.
2525
pub const NAMESPACE_DOESNT_EXIST: &str = "NAMESPACE_DOESNT_EXIST";

sqld/src/rpc/proxy.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use self::rpc::proxy_server::Proxy;
2323
use self::rpc::query_result::RowResult;
2424
use self::rpc::{
2525
describe_result, Ack, DescribeRequest, DescribeResult, Description, DisconnectMessage, ExecReq,
26-
ExecuteResults, QueryResult, ResultRows, Row, ExecResp,
26+
ExecResp, ExecuteResults, QueryResult, ResultRows, Row,
2727
};
2828
use super::NAMESPACE_DOESNT_EXIST;
2929

@@ -475,7 +475,7 @@ impl Proxy for ProxyService {
475475
&self,
476476
req: tonic::Request<tonic::Streaming<ExecReq>>,
477477
) -> Result<tonic::Response<Self::StreamExecStream>, tonic::Status> {
478-
let auth= if let Some(auth) = &self.auth {
478+
let auth = if let Some(auth) = &self.auth {
479479
auth.authenticate_grpc(&req, self.disable_namespaces)?
480480
} else {
481481
Authenticated::from_proxy_grpc_request(&req, self.disable_namespaces)?

0 commit comments

Comments
 (0)