CSP Notes

Tim Paine edited this page Mar 12, 2025 · 1 revision

Why csp-gateway?

csp users often question the necessity of csp-gateway. csp-gateway provides three key functionalities on top of csp:

Dependency Injection

csp graphs are composed of individual nodes which are called when inputs update. Every node and graph can itself take a collection of ticking and non-ticking arguments. Let's say I want to create two versions of the same graph:

  1. Read from kafka topic abc, perform calculation 1, write to kafka topic ghi
  2. Read from kafka topic def, perform calculation 2, write to kafka topic ghi
graph TB
    subgraph 1
      i1[Kafka topic='abc'] --> c1[Calculation 1]
      c1 --> o1[Kafka topic='ghi']
    end
    subgraph 2
      i2[Kafka topic='def'] --> c2[Calculation 2]
      c2 --> o2[Kafka topic='ghi']
    end

Python code for this might look like:

@csp.graph
def my_graph_1():
    kafka_in = KafkaInput(topic="abc")
    calculated = CalculationOne(kafka_in)
    # Pass the calculated edge to the output so it is actually written
    KafkaOutput(calculated, topic="ghi")

@csp.graph
def my_graph_2():
    kafka_in = KafkaInput(topic="def")
    calculated = CalculationTwo(kafka_in)
    KafkaOutput(calculated, topic="ghi")

Here, our KafkaInput node takes a static argument topic, while our Calculation* nodes take a ticking argument.

csp code connects nodes inside a graph in a point-to-point fashion. As we build more and more csp graphs, avoiding code duplication means we end up writing some form of graph builder logic.

def my_graph_builder(input_topic: str, output_topic: str, calculation: Node):
    kafka_in = KafkaInput(topic=input_topic)
    calculated = calculation(kafka_in)
    KafkaOutput(calculated, topic=output_topic)

my_graph_1 = my_graph_builder("abc", "ghi", CalculationOne)
my_graph_2 = my_graph_builder("def", "ghi", CalculationTwo)

As builders like this grow more complex, it becomes increasingly difficult to configure graphs and nodes nested inside other graphs.

csp-gateway solves this by combining csp with ccflow.

GatewayModule instances are ccflow BaseModel instances. This provides type validation, coercion, and dynamic initialization via Pydantic. Additionally, with ccflow you can overlay a configuration graph onto the initialization of your csp-gateway instances via Hydra / OmegaConf. See this worked example for more information.
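To make the idea of configuration-driven initialization concrete, here is a minimal standard-library sketch. It is not ccflow, Pydantic, or csp-gateway code: `PipelineConfig`, `CALCULATIONS`, `load_pipelines`, and the two calculation functions are hypothetical names invented for illustration, and the hand-written checks in `__post_init__` only stand in for the validation and coercion that Pydantic would perform on a real model.

```python
from dataclasses import dataclass, fields
from typing import Any, Callable, Dict


# Hypothetical stand-ins for CalculationOne / CalculationTwo.
def calculation_one(x: float) -> float:
    return x + 1


def calculation_two(x: float) -> float:
    return x * 2


# Registry so that a configuration can refer to a calculation by name.
CALCULATIONS: Dict[str, Callable[[float], float]] = {
    "one": calculation_one,
    "two": calculation_two,
}


@dataclass(frozen=True)
class PipelineConfig:
    """Declarative description of one pipeline (assumed shape, for illustration)."""

    input_topic: str
    output_topic: str
    calculation: str

    def __post_init__(self) -> None:
        # Minimal hand-rolled validation of the declared fields.
        for f in fields(self):
            if not isinstance(getattr(self, f.name), str):
                raise TypeError(f"{f.name} must be a str")
        if self.calculation not in CALCULATIONS:
            raise ValueError(f"unknown calculation: {self.calculation!r}")

    def build(self) -> Callable[[float], Dict[str, Any]]:
        # Resolve the named dependency once, at build time.
        calc = CALCULATIONS[self.calculation]

        def run(value: float) -> Dict[str, Any]:
            return {"from": self.input_topic, "to": self.output_topic, "value": calc(value)}

        return run


def load_pipelines(config: Dict[str, Dict[str, str]]) -> Dict[str, Callable[[float], Dict[str, Any]]]:
    # Each entry is validated and instantiated from plain data.
    return {name: PipelineConfig(**spec).build() for name, spec in config.items()}


# Plain data, of the kind that could come from a YAML file.
CONFIG = {
    "my_graph_1": {"input_topic": "abc", "output_topic": "ghi", "calculation": "one"},
    "my_graph_2": {"input_topic": "def", "output_topic": "ghi", "calculation": "two"},
}

pipelines = load_pipelines(CONFIG)
```

The two variants now differ only in data, so adding a third means adding a configuration entry rather than another builder call. A misspelled calculation name fails when the configuration is loaded, not when the pipeline runs.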

For more documentation, see Configuration.

Deferred Instantiation

In a normal csp graph, nodes and graphs are wired together "point-to-point". In other words, upstream nodes and graphs must be instantiated first, with subsequent instantiations moving downstream from source to sink. Furthermore, graphs must be acyclic by default.

  • If you want to instantiate in a different order, as you may very well want to do if building something like the above section, you need to use csp.DelayedEdge.
  • If you want a cyclic graph, you must manually insert csp.Feedback.

Both of these are a bit cumbersome and do not fit well with the csp-gateway "Data Bus" oriented approach. Instead, csp-gateway automatically instantiates all channels in a GatewayChannels as DelayedEdge. When a GatewayModule sets them, they are bound with the real edge. If no GatewayModule sets them and they're not required, they are replaced with a null_ts. Feedback instances are automatically inserted where necessary.
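The declare-first, bind-later pattern described above can be illustrated with a small plain-Python analogy. This is not how csp or csp-gateway implement DelayedEdge: the `Channel` class and its `bind`, `finalize`, and `read` methods are hypothetical names for this sketch, and a list of integers stands in for a ticking time series.

```python
from typing import Callable, Dict, List, Optional


class Channel:
    """Placeholder for a stream that consumers can reference before a producer exists."""

    def __init__(self, name: str, required: bool = False) -> None:
        self.name = name
        self.required = required
        self._source: Optional[Callable[[], List[int]]] = None

    def bind(self, source: Callable[[], List[int]]) -> None:
        # A producer attaches the real stream, in any order relative to consumers.
        if self._source is not None:
            raise RuntimeError(f"channel {self.name!r} is already bound")
        self._source = source

    def finalize(self) -> None:
        # Unbound optional channels fall back to an empty ("null") stream.
        if self._source is None:
            if self.required:
                raise RuntimeError(f"required channel {self.name!r} was never set")
            self._source = lambda: []

    def read(self) -> List[int]:
        if self._source is None:
            raise RuntimeError(f"channel {self.name!r} has not been finalized")
        return self._source()


# 1. All channels are declared up front as placeholders.
channels: Dict[str, Channel] = {
    "prices": Channel("prices"),
    "orders": Channel("orders"),
}


# 2. A consumer is defined first, although nothing produces "prices" yet.
def total_price() -> int:
    return sum(channels["prices"].read())


# 3. A producer is attached afterwards.
channels["prices"].bind(lambda: [1, 2, 3])

# 4. Anything still unbound and not required becomes a null stream.
for channel in channels.values():
    channel.finalize()

result = total_price()
unset = channels["orders"].read()
```

Because every consumer holds a placeholder instead of a concrete upstream object, the order in which producers and consumers are created no longer matters.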

REST API/UI

csp graphs can be difficult to interrogate. The built-in mechanisms for doing so are csp.print/csp.log and csp.show_graph (which generates a static graphviz-based graph of every node/edge).

With an automatic REST API and UI, csp-gateway makes it easy to see every tick of data across every relevant data stream in your csp graph. Additionally, the MountChannelsGraph module provides a "30,000 foot view" of the graph, which is useful when the granular csp.show_graph becomes too complex.