// hydro_lang/deploy/maelstrom/mod.rs

use serde::Serialize;
use serde::de::DeserializeOwned;

use crate::forward_handle::ForwardHandle;
use crate::live_collections::KeyedStream;
use crate::live_collections::stream::TotalOrder;
use crate::location::Cluster;
use crate::nondet::nondet;

12#[cfg(stageleft_runtime)]
13#[cfg(feature = "maelstrom")]
14#[cfg_attr(docsrs, doc(cfg(feature = "maelstrom")))]
15pub mod deploy_maelstrom;
16
17pub mod deploy_runtime_maelstrom;
18
19#[expect(clippy::type_complexity, reason = "stream markers")]
37pub fn maelstrom_bidi_clients<'a, C, In: DeserializeOwned, Out: Serialize>(
38 cluster: &Cluster<'a, C>,
39) -> (
40 KeyedStream<String, In, Cluster<'a, C>>,
41 ForwardHandle<'a, KeyedStream<String, Out, Cluster<'a, C>>>,
42) {
43 use stageleft::q;
44
45 use crate::location::Location;
46
47 let meta: stageleft::RuntimeData<&deploy_runtime_maelstrom::MaelstromMeta> =
48 stageleft::RuntimeData::new("__hydro_lang_maelstrom_meta");
49
50 let input: KeyedStream<String, In, Cluster<'a, C>> = cluster
52 .source_stream(q!(deploy_runtime_maelstrom::maelstrom_client_source(meta)))
53 .into_keyed()
54 .map(q!(|b| serde_json::from_value(b).unwrap()));
55
56 let (fwd_handle, output_stream) =
58 cluster.forward_ref::<KeyedStream<String, Out, Cluster<'a, C>>>();
59
60 output_stream
62 .entries()
63 .assume_ordering::<TotalOrder>(nondet!())
64 .for_each(q!(|(client_id, body)| {
65 deploy_runtime_maelstrom::maelstrom_send_response(
66 &meta.node_id,
67 &client_id,
68 serde_json::to_value(body).unwrap(),
69 );
70 }));
71
72 (input, fwd_handle)
73}