feat: adding wasm compatibility, windows support and removing protoc dependency #182

Merged: 26 commits, Mar 30, 2024
1,291 changes: 604 additions & 687 deletions Cargo.lock

Large diffs are not rendered by default.

34 changes: 23 additions & 11 deletions Cargo.toml
@@ -10,11 +10,11 @@ license = "MIT"
keywords = ["async_graphql", "async", "graphql", "apollo", "studio"]
categories = ["network-programming", "asynchronous"]
edition = "2021"
resolver = "2"

[features]
default = ["tokio-comp", "compression"]
default = ["compression"]
compression = ["libflate"]
tokio-comp = ["tokio"]

[dependencies]
anyhow = "1"
@@ -25,21 +25,33 @@ cfg-if = "1"
derive_builder = "0.13"
futures = "0.3"
futures-locks = "0.7"
prost = "0.12"
prost-types = "0.12"
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }
serde_json = "1"
serde = { version = "1", features = ["derive"] }
sha2 = "0.10"
tonic = "0.10"
serde-json-wasm = "1.0.1"
tracing = "0.1"
tracing-futures = { version = "0.2.5", default-features = false, features = ["tokio", "futures-03", "std"] }
uuid = { version = "1.7", features = ["v4"] } # A library to generate and parse UUIDs.
uuid = { version = "1.7", features = ["v4", "js"] } # A library to generate and parse UUIDs.
wasm-bindgen-futures = "0.4.18"
protobuf = "3.4.0"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tokio = { version = "1", features = ["full"] }

# Non-feature optional dependencies
libflate = { version = "2", optional = true }
tokio = { version = "1", features = ["full"], optional = true }

[target.'cfg(all(not(target_arch = "wasm32"), not(target_os = "windows")))'.dependencies]
uname = "0.1.1"

[build-dependencies]
reqwest = { version = "0.11", default-features = false, features = ["blocking", "rustls-tls"] }
protobuf-codegen = "3.4.0"
async-std = { version = "1", default-features = false, features = ["default", "tokio1"] }
cfg-if = "1.0.0"
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"] }
prost-build = "0.12.3"
protox = "0.6.0"
tonic-build = "0.10"

[target.'cfg(not(target_arch = "wasm32"))'.build-dependencies]
tokio = { version = "1", features = ["full"] }
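
Note on the new dependency layout: `tokio` is now optional and only declared for non-wasm targets, and `uname` is only declared for targets that are neither wasm32 nor Windows, so any code touching those crates has to be gated the same way. A minimal sketch of that pattern, assuming a hypothetical `platform_name` helper that is not part of this PR:

// Hypothetical helper (not from this PR): mirror the Cargo.toml target tables
// with cfg guards so the crate still builds on wasm32 and Windows.
#[cfg(all(not(target_arch = "wasm32"), not(target_os = "windows")))]
pub fn platform_name() -> String {
    // `uname` is only a dependency on these targets.
    uname::uname()
        .map(|info| info.sysname)
        .unwrap_or_else(|_| "unknown".to_string())
}

#[cfg(any(target_arch = "wasm32", target_os = "windows"))]
pub fn platform_name() -> String {
    "unknown".to_string()
}
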
54 changes: 40 additions & 14 deletions build.rs
@@ -1,4 +1,7 @@
// Derived from https://github.com/pellizzetti/router/blob/cc0ebcaf1d68184e1fe06f16534fddff76286b40/apollo-spaceport/build.rs
use protobuf_codegen::Customize;
use std::io::Write;
use std::path::Path;
use std::{
error::Error,
fs::File,
@@ -11,8 +14,18 @@ fn main() -> Result<(), Box<dyn Error>> {
} else {
// Retrieve a live version of the reports.proto file
let proto_url = "https://usage-reporting.api.apollographql.com/proto/reports.proto";
let response = reqwest::blocking::get(proto_url)?;
let mut content = response.text()?;
let fut = reqwest::get(proto_url);

cfg_if::cfg_if! {
if #[cfg(not(target_arch = "wasm32"))] {
let rt = tokio::runtime::Runtime::new().unwrap();
let response = rt.block_on(fut)?;
let mut content = rt.block_on(response.text())?;
} else {
let response = async_std::task::block_on(fut)?;
let mut content = async_std::task::block_on(response.text())?;
}
}

// Process the retrieved content to:
// - Insert a package Report; line after the import lines (currently only one) and before the first message definition
@@ -45,19 +58,32 @@ fn main() -> Result<(), Box<dyn Error>> {
}

// Process the proto files
let proto_files = vec!["proto/agents.proto", "proto/reports.proto"];
let proto_files = vec!["proto/reports.proto"];

protobuf_codegen::Codegen::new()
.pure()
.cargo_out_dir("proto")
.inputs(&proto_files)
.include(".")
.customize(Customize::default().gen_mod_rs(false))
.run_from_script();

let out_dir = std::env::var("OUT_DIR")?;
let path = Path::new(&out_dir).join("proto").join("reports.rs");
let content = std::fs::read_to_string(&path)?;

let content = content
.lines()
.filter(|line| !(line.contains("#![") || line.contains("//!")))
.fold(String::new(), |mut content, line| {
content.push_str(line);
content.push('\n');
content
});

tonic_build::configure()
.type_attribute("ContextualizedStats", "#[derive(serde::Serialize)]")
.type_attribute("StatsContext", "#[derive(serde::Serialize)]")
.type_attribute("QueryLatencyStats", "#[derive(serde::Serialize)]")
.type_attribute("TypeStat", "#[derive(serde::Serialize)]")
.type_attribute("PathErrorStats", "#[derive(serde::Serialize)]")
.type_attribute("FieldStat", "#[derive(serde::Serialize)]")
.type_attribute("ReferencedFieldsForType", "#[derive(serde::Serialize)]")
.type_attribute("StatsContext", "#[derive(Eq, Hash)]")
.build_server(true)
.compile(&proto_files, &["."])?;
std::fs::remove_file(&path)?;
let mut file = std::fs::File::create(&path)?;
file.write_all(content.as_bytes())?;

for file in proto_files {
println!("cargo:rerun-if-changed={}", file);
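
With protoc removed, build.rs now generates pure-Rust protobuf code via protobuf-codegen into OUT_DIR/proto/reports.rs, then strips inner attributes (`#![...]`) and module doc comments (`//!`) from the generated file, since those are rejected when the file is pulled in with include!() inside a module body. A sketch of how the crate would typically include the result; the exact layout of src/proto.rs is not shown in this diff, so this is an assumption:

// Hypothetical src/proto.rs (not shown in this PR): include the file that
// protobuf-codegen wrote to OUT_DIR via cargo_out_dir("proto").
pub mod reports {
    include!(concat!(env!("OUT_DIR"), "/proto/reports.rs"));
}
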
8 changes: 4 additions & 4 deletions src/compression.rs
@@ -1,10 +1,10 @@
#[cfg(feature = "compression")]
#[cfg(all(feature = "compression", not(target_arch = "wasm32")))]
use libflate::gzip;

#[cfg(feature = "compression")]
#[cfg(all(feature = "compression", not(target_arch = "wasm32")))]
const TARGET_LOG_COMPRESSION: &str = "apollo-studio-extension-compression";

#[cfg(feature = "compression")]
#[cfg(all(feature = "compression", not(target_arch = "wasm32")))]
pub fn compress(msg: Vec<u8>) -> Result<Vec<u8>, std::io::Error> {
let mut encoder = gzip::Encoder::new(Vec::new()).unwrap();
let mut msg = std::io::Cursor::new(msg);
@@ -20,7 +20,7 @@ pub fn compress(msg: Vec<u8>) -> Result<Vec<u8>, std::io::Error> {
encoder.finish().into_result()
}

#[cfg(not(feature = "compression"))]
#[cfg(any(not(feature = "compression"), target_arch = "wasm32"))]
pub fn compress(msg: Vec<u8>) -> Result<Vec<u8>, std::io::Error> {
Ok::<Vec<u8>, std::io::Error>(msg)
}
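
Both cfg variants of compress() keep the same signature, so callers need no cfg guards of their own; on wasm32, or without the `compression` feature, the bytes simply pass through. An illustrative caller (this wrapper is not part of the crate):

// Illustrative only: the fallback variant returns the input unchanged, so the
// call site is identical whether gzip support is compiled in or not.
fn encode_report(bytes: Vec<u8>) -> Result<Vec<u8>, std::io::Error> {
    crate::compression::compress(bytes)
}
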
79 changes: 44 additions & 35 deletions src/lib.rs
@@ -26,23 +26,25 @@
//!
//! * `compression` - To enable GZIP Compression when sending traces to Apollo Studio.
mod compression;
mod packages;
mod proto;
pub mod register;
mod report_aggregator;

mod runtime;
mod packages;

use futures::SinkExt;
use prost_types::Timestamp;
use protobuf::{well_known_types::timestamp::Timestamp, EnumOrUnknown, MessageField};
use report_aggregator::ReportAggregator;
use runtime::spawn;
use packages::serde_json;

#[macro_use]
extern crate tracing;

use futures_locks::RwLock;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::RwLock;

use async_graphql::QueryPathSegment;
use chrono::{DateTime, Utc};
@@ -55,13 +57,13 @@ use async_graphql::extensions::{
};
use async_graphql::parser::types::{ExecutableDocument, OperationType, Selection};
use async_graphql::{Response, ServerResult, Value, Variables};
use proto::report::{
use proto::reports::{
trace::{self, node, Node},
Trace,
};
use std::convert::TryInto;

pub use proto::report::trace::http::Method;
pub use proto::reports::trace::http::Method;

/// Apollo Tracing Extension to send traces to Apollo Studio
/// The extension to include to your `async_graphql` instance to connect with Apollo Studio.
@@ -185,7 +187,7 @@ impl Extension for ApolloTracingExtension {
.any(|(_, operation)| operation.node.selection_set.node.items.iter().any(|selection| matches!(&selection.node, Selection::Field(field) if field.node.name.node == "__schema")));
if !is_schema {
let result: String =
ctx.stringify_execute_doc(&document, &Variables::from_json(serde_json::json!({})));
ctx.stringify_execute_doc(&document, &Variables::from_json(serde_json::from_str("{}").unwrap()));
let name = document
.operations
.iter()
@@ -194,7 +196,7 @@
.map(|x| x.as_str())
.unwrap_or("no_name");
let query_type = format!("# {name}\n {query}", name = name, query = result);
*self.operation_name.write().await = query_type;
*self.operation_name.write().unwrap() = query_type;
}
Ok(document)
}
@@ -227,7 +229,9 @@
let client_version = tracing_extension
.client_version
.unwrap_or_else(|| "no client version".to_string());
let method = tracing_extension.method.unwrap_or(Method::Unknown);
let method = tracing_extension
.method
.or(<Method as protobuf::Enum>::from_str("UNKNOWN"));
let status_code = tracing_extension.status_code.unwrap_or(0);

let mut trace: Trace = Trace {
@@ -245,34 +249,39 @@
.map(|x| x.to_string())
.unwrap_or_else(|| "no operation".to_string()),
..Default::default()
});
})
.into();

trace.http = Some(trace::Http {
method: method.into(),
trace.http = Some(trace::HTTP {
method: EnumOrUnknown::new(method.unwrap()),
status_code,
..Default::default()
});
})
.into();

trace.end_time = Some(Timestamp {
trace.end_time = MessageField::some(Timestamp {
nanos: inner.end_time.timestamp_subsec_nanos().try_into().unwrap(),
seconds: inner.end_time.timestamp(),
special_fields: Default::default(),
});

trace.start_time = Some(Timestamp {
nanos: inner
.start_time
.timestamp_subsec_nanos()
.try_into()
.unwrap(),
seconds: inner.start_time.timestamp(),
});
trace.start_time =
protobuf::MessageField::some(protobuf::well_known_types::timestamp::Timestamp {
nanos: inner
.start_time
.timestamp_subsec_nanos()
.try_into()
.unwrap(),
seconds: inner.start_time.timestamp(),
special_fields: Default::default(),
});

let root_node = self.root_node.read().await;
trace.root = Some(root_node.clone());
let root_node = self.root_node.read().unwrap();
trace.root = Some(root_node.clone()).into();

let mut sender = self.report.sender();

let operation_name = self.operation_name.read().await.clone();
let operation_name = self.operation_name.read().unwrap().clone();

let _handle = spawn(async move {
if let Err(e) = sender.send((operation_name, trace)).await {
@@ -295,7 +304,7 @@
let path = info.path_node.to_string_vec().join(".");
let field_name = info.path_node.field_name().to_string();
let parent_type = info.parent_type.to_string();
let return_type = info.return_type.to_string();
let _return_type = info.return_type.to_string();
let start_time = Utc::now() - self.inner.lock().await.start_time;
let path_node = info.path_node;

@@ -320,12 +329,11 @@
},
parent_type: parent_type.to_string(),
original_field_name: field_name,
r#type: return_type,
..Default::default()
};

let node = Arc::new(RwLock::new(node));
self.nodes.write().await.insert(path, node.clone());
self.nodes.write().unwrap().insert(path, node.clone());
let parent_node = path_node.parent.map(|x| x.to_string_vec().join("."));
// Use the path to create a new node
// https://github.com/apollographql/apollo-server/blob/291c17e255122d4733b23177500188d68fac55ce/packages/apollo-server-core/src/plugin/traceTreeBuilder.ts
@@ -334,7 +342,7 @@
Err(e) => {
let json = match serde_json::to_string(&e) {
Ok(content) => content,
Err(e) => serde_json::json!({ "error": format!("{:?}", e) }).to_string(),
Err(e) => format!("{{ \"error\": \"{e:?}\" }}"),
};
let error = trace::Error {
message: e.message.clone(),
@@ -345,19 +353,20 @@
.map(|x| trace::Location {
line: x.line as u32,
column: x.column as u32,
special_fields: protobuf::SpecialFields::default(),
})
.collect(),
json,
..Default::default()
};

node.write().await.error = vec![error];
node.write().unwrap().error = vec![error];
Err(e)
}
};
let end_time = Utc::now() - self.inner.lock().await.start_time;

node.write().await.end_time = match end_time
node.write().unwrap().end_time = match end_time
.num_nanoseconds()
.and_then(|x| u64::try_from(x).ok())
{
@@ -371,19 +380,19 @@

match parent_node {
None => {
let mut root_node = self.root_node.write().await;
let mut root_node = self.root_node.write().unwrap();
let child = &mut root_node.child;
let node = node.read().await;
let node = node.read().unwrap();
// Can't copy or pass a ref to Protobuf
// So we clone
child.push(node.clone());
}
Some(parent) => {
let nodes = self.nodes.read().await;
let nodes = self.nodes.read().unwrap();
let node_read = nodes.get(&parent).unwrap();
let mut parent = node_read.write().await;
let mut parent = node_read.write().unwrap();
let child = &mut parent.child;
let node = node.read().await;
let node = node.read().unwrap();
// Can't copy or pass a ref to Protobuf
// So we clone
child.push(node.clone());
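
The lib.rs changes swap the prost/tonic generated types for rust-protobuf ones: optional message fields become MessageField instead of Option, enum fields become EnumOrUnknown, and every generated struct carries a special_fields member. A minimal sketch of that conversion pattern for the trace timestamps; the helper itself is illustrative, not code from the PR:

use protobuf::{well_known_types::timestamp::Timestamp, MessageField};

// Illustrative helper (not from the PR): build a rust-protobuf Timestamp from
// a chrono DateTime, wrapping it in MessageField as the generated Trace
// struct expects.
fn to_proto_timestamp(ts: chrono::DateTime<chrono::Utc>) -> MessageField<Timestamp> {
    MessageField::some(Timestamp {
        seconds: ts.timestamp(),
        nanos: ts.timestamp_subsec_nanos() as i32,
        special_fields: Default::default(),
    })
}
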
4 changes: 3 additions & 1 deletion src/packages/mod.rs
@@ -1 +1,3 @@
pub mod uname;

pub mod serde_json;
pub mod uname;
5 changes: 5 additions & 0 deletions src/packages/serde_json.rs
@@ -0,0 +1,5 @@

#[cfg(not(target_arch = "wasm32"))]
pub use serde_json::*;
#[cfg(target_arch = "wasm32")]
pub use serde_json_wasm::*;
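
This shim is what lets the rest of the crate keep familiar serde_json call sites on both targets: lib.rs now imports packages::serde_json instead of serde_json directly, and the wasm32 build transparently uses serde-json-wasm. An illustrative call site (not from the PR):

// Illustrative only: the shim re-exports whichever backend matches the target,
// so the same code compiles natively and on wasm32.
use crate::packages::serde_json;

fn debug_json<T: serde::Serialize>(value: &T) -> String {
    serde_json::to_string(value).unwrap_or_default()
}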