Skip to main content

hydro_lang/deploy/maelstrom/
mod.rs

1//! Deployment backend for running correctness tests against Jepsen Maelstrom (<https://github.com/jepsen-io/maelstrom>)
2
3use serde::Serialize;
4use serde::de::DeserializeOwned;
5
6use crate::forward_handle::ForwardHandle;
7use crate::live_collections::KeyedStream;
8use crate::live_collections::stream::TotalOrder;
9use crate::location::Cluster;
10use crate::nondet::nondet;
11
12#[cfg(stageleft_runtime)]
13#[cfg(feature = "maelstrom")]
14#[cfg_attr(docsrs, doc(cfg(feature = "maelstrom")))]
15pub mod deploy_maelstrom;
16
17pub mod deploy_runtime_maelstrom;
18
19/// Sets up bidirectional communication with Maelstrom clients on a cluster.
20///
21/// This function provides a similar API to `bidi_external_many_bytes` but for Maelstrom
22/// client communication. It returns a keyed input stream of client messages and accepts
23/// a keyed output stream of responses.
24///
25/// The key type is `String` (the client ID like "c1", "c2").
26/// The value type is `serde_json::Value` (the message body).
27///
28/// # Example
29/// ```ignore
30/// let (input, output_handle) = maelstrom_bidi_clients(&cluster);
31/// output_handle.complete(input.map(q!(|(client_id, body)| {
32///     // Process and return response
33///     (client_id, response_body)
34/// })));
35/// ```
36#[expect(clippy::type_complexity, reason = "stream markers")]
37pub fn maelstrom_bidi_clients<'a, C, In: DeserializeOwned, Out: Serialize>(
38    cluster: &Cluster<'a, C>,
39) -> (
40    KeyedStream<String, In, Cluster<'a, C>>,
41    ForwardHandle<'a, KeyedStream<String, Out, Cluster<'a, C>>>,
42) {
43    use stageleft::q;
44
45    use crate::location::Location;
46
47    let meta: stageleft::RuntimeData<&deploy_runtime_maelstrom::MaelstromMeta> =
48        stageleft::RuntimeData::new("__hydro_lang_maelstrom_meta");
49
50    // Create the input stream from Maelstrom clients
51    let input: KeyedStream<String, In, Cluster<'a, C>> = cluster
52        .source_stream(q!(deploy_runtime_maelstrom::maelstrom_client_source(meta)))
53        .into_keyed()
54        .map(q!(|b| serde_json::from_value(b).unwrap()));
55
56    // Create a forward reference for the output stream
57    let (fwd_handle, output_stream) =
58        cluster.forward_ref::<KeyedStream<String, Out, Cluster<'a, C>>>();
59
60    // Set up the output sink to send responses back to clients
61    output_stream
62        .entries()
63        .assume_ordering::<TotalOrder>(nondet!(/** maelstrom responses can be sent in any order */))
64        .for_each(q!(|(client_id, body)| {
65            deploy_runtime_maelstrom::maelstrom_send_response(
66                &meta.node_id,
67                &client_id,
68                serde_json::to_value(body).unwrap(),
69            );
70        }));
71
72    (input, fwd_handle)
73}