Skip to main content

dfir_pipes/push/
mod.rs

1//! Push-based stream combinators for dataflow pipelines.
2//!
3//! This module provides push-based operators that mirror the pull-based operators
4//! in the parent module, but work in the opposite direction: items are pushed into
5//! a pipeline rather than pulled from it.
6use core::pin::Pin;
7use core::task::Waker;
8
9use crate::{Context, Toggle};
10
11mod fanout;
12mod filter;
13mod filter_map;
14mod flat_map;
15mod flat_map_stream;
16mod flatten;
17mod flatten_stream;
18mod for_each;
19mod inspect;
20mod map;
21#[cfg(feature = "alloc")]
22#[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
23mod persist;
24mod resolve_futures;
25mod sink;
26mod sink_compat;
27mod unzip;
28#[cfg(feature = "alloc")]
29#[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
30mod vec_push;
31
32#[cfg(test)]
33pub(crate) mod test_utils;
34
35#[cfg(feature = "variadics")]
36#[cfg_attr(docsrs, doc(cfg(feature = "variadics")))]
37pub mod demux_var;
38
39#[cfg(feature = "variadics")]
40#[cfg_attr(docsrs, doc(cfg(feature = "variadics")))]
41pub use demux_var::{DemuxVar, PushVariadic, demux_var};
42pub use fanout::Fanout;
43pub use filter::Filter;
44pub use filter_map::FilterMap;
45pub use flat_map::FlatMap;
46pub use flat_map_stream::FlatMapStream;
47pub use flatten::Flatten;
48pub use flatten_stream::FlattenStream;
49pub use for_each::ForEach;
50use futures_core::FusedStream;
51pub use inspect::Inspect;
52pub use map::Map;
53#[cfg(feature = "alloc")]
54#[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
55pub use persist::Persist;
56pub use resolve_futures::ResolveFutures;
57pub use sink::Sink;
58pub use sink_compat::SinkCompat;
59pub use unzip::Unzip;
60#[cfg(feature = "alloc")]
61#[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
62pub use vec_push::VecPush;
63
64/// The result of pushing an item into a [`Push`].
65///
66/// `PushStep` represents the two possible outcomes when pushing into a pipeline:
67/// - `Done`: The item was successfully consumed.
68/// - `Pending(can_pend)`: The push could not accept the item yet (async backpressure).
69///
70/// The `CanPend` type parameter uses [`Toggle`] to statically encode whether pending
71/// is possible. When `CanPend = No`, the `Pending` variant cannot be constructed,
72/// and the push is guaranteed to always accept items immediately.
73#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
74pub enum PushStep<CanPend>
75where
76    CanPend: Toggle,
77{
78    /// The item was successfully consumed.
79    Done,
80    /// The push is not ready yet (only possible when `CanPend = Yes`).
81    Pending(CanPend),
82}
83
84impl<CanPend> PushStep<CanPend>
85where
86    CanPend: Toggle,
87{
88    /// Creates a new `PushStep::Pending`, or panics if `CanPend = No`.
89    pub fn pending() -> Self {
90        PushStep::Pending(Toggle::create())
91    }
92
93    /// Returns `true` if the step is [`PushStep::Done`].
94    pub const fn is_done(&self) -> bool {
95        matches!(self, PushStep::Done)
96    }
97
98    /// Returns `true` if the step is [`PushStep::Pending`].
99    pub const fn is_pending(&self) -> bool {
100        matches!(self, PushStep::Pending(_))
101    }
102
103    /// Tries to convert the `CanPend` type parameter, returning `None` if the conversion is invalid.
104    pub fn try_convert_into<NewPend>(self) -> Option<PushStep<NewPend>>
105    where
106        NewPend: Toggle,
107    {
108        Some(match self {
109            PushStep::Done => PushStep::Done,
110            PushStep::Pending(_) => PushStep::Pending(Toggle::try_create()?),
111        })
112    }
113
114    /// Converts the `CanPend` type parameter, panicking if the conversion is invalid.
115    pub fn convert_into<NewPend>(self) -> PushStep<NewPend>
116    where
117        NewPend: Toggle,
118    {
119        match self {
120            PushStep::Done => PushStep::Done,
121            PushStep::Pending(_) => PushStep::pending(),
122        }
123    }
124}
125
126/// The `Push` trait represents a push-based pipeline that items can be sent into.
127///
128/// This is the dual of [`crate::pull::Pull`]: where `Pull` allows you to request items from
129/// a source, `Push` allows you to send items into a sink. Push operators form
130/// chains where each operator transforms items and passes them downstream.
131///
132/// The protocol mirrors [`futures_sink::Sink`]:
133/// 1. Call [`Push::poll_ready`] to check if the push can accept an item.
134/// 2. If ready, call [`Push::start_send`] to send the item.
135/// 3. Call [`Push::poll_flush`] to flush buffered items.
136pub trait Push<Item, Meta>
137where
138    Meta: Copy,
139{
140    /// The context type required to push into this pipeline.
141    type Ctx<'ctx>: Context<'ctx>;
142
143    /// Whether this push can return [`PushStep::Pending`].
144    type CanPend: Toggle;
145
146    /// Check if this push is ready to accept an item.
147    fn poll_ready(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend>;
148
149    /// Send an item into this push pipeline.
150    ///
151    /// Must only be called after [`Push::poll_ready`] returns [`PushStep::Done`].
152    fn start_send(self: Pin<&mut Self>, item: Item, meta: Meta);
153
154    /// Flushes any buffered items in this push pipeline.
155    fn poll_flush(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend>;
156
157    /// Informs this push how many items are about to be sent.
158    ///
159    /// The semantics match [`crate::pull::Pull::size_hint`] / [`Iterator::size_hint`]:
160    /// the first element is a lower bound, the second is an optional upper bound.
161    ///
162    /// Combinators should propagate this downstream, adjusting bounds where
163    /// appropriate (e.g. [`Filter`] sets the lower bound to 0).
164    ///
165    /// Under normal usage expect this to be called once before items are pushed. However
166    /// this may be called multiple times or never at all. It is an error to call this multiple
167    /// times with size hints that conflict--each size hint should match or narrow the estimated
168    /// range (after accounting for already-sent items).
169    fn size_hint(self: Pin<&mut Self>, hint: (usize, Option<usize>));
170}
171
172impl<P, Item, Meta> Push<Item, Meta> for &mut P
173where
174    P: Push<Item, Meta> + Unpin + ?Sized,
175    Meta: Copy,
176{
177    type Ctx<'ctx> = P::Ctx<'ctx>;
178
179    type CanPend = P::CanPend;
180
181    fn poll_ready(mut self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
182        Pin::new(&mut **self).poll_ready(ctx)
183    }
184
185    fn start_send(mut self: Pin<&mut Self>, item: Item, meta: Meta) {
186        Pin::new(&mut **self).start_send(item, meta)
187    }
188
189    fn poll_flush(mut self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
190        Pin::new(&mut **self).poll_flush(ctx)
191    }
192
193    fn size_hint(mut self: Pin<&mut Self>, hint: (usize, Option<usize>)) {
194        Pin::new(&mut **self).size_hint(hint)
195    }
196}
197
/// Evaluates a [`PushStep`] expression and returns [`PushStep::Pending`] from the enclosing
/// function if it is pending; otherwise evaluates to `()`.
/// Analogous to [`core::task::ready!`] but for [`PushStep`].
///
/// `PushStep` is named through `$crate` (path names in `macro_rules!` are resolved at the
/// call site), so callers do not need to have `PushStep` imported for this macro to expand.
macro_rules! ready {
    ($e:expr $(,)?) => {
        match $e {
            $crate::push::PushStep::Done => (),
            // The pending token is recreated rather than forwarded so that the returned
            // step takes the `CanPend` type of the enclosing function's return type.
            $crate::push::PushStep::Pending(_) => return $crate::push::PushStep::pending(),
        }
    };
}
use ready;
209
/// Evaluates two [`PushStep`] expressions and returns [`PushStep::Pending`] from the
/// enclosing function if either one is pending.
///
/// Both expressions are always evaluated, in order, before any early return, so that
/// each side gets the chance to do work and/or register wakers.
macro_rules! ready_both {
    ($a:expr, $b:expr $(,)?) => {{
        // Bind both results first: the second expression must run even if the first
        // one turns out to be pending.
        let first_step = $a;
        let second_step = $b;
        $crate::push::ready!(first_step);
        $crate::push::ready!(second_step);
    }};
}
use ready_both;
221
222/// Creates a [`Fanout`] push that clones each item and sends to both downstream pushes.
223pub const fn fanout<P0, P1>(push0: P0, push1: P1) -> Fanout<P0, P1> {
224    Fanout::new(push0, push1)
225}
226
227/// Creates a [`Filter`] push that filters items based on a predicate.
228pub const fn filter<Func, Item, Next>(func: Func, next: Next) -> Filter<Next, Func>
229where
230    Func: FnMut(&Item) -> bool,
231{
232    Filter::new(func, next)
233}
234
235/// Creates a [`FilterMap`] push that filters and maps items in one step.
236pub const fn filter_map<Func, In, Out, Next>(func: Func, next: Next) -> FilterMap<Next, Func>
237where
238    Func: FnMut(In) -> Option<Out>,
239{
240    FilterMap::new(func, next)
241}
242
243/// Creates a [`FlatMap`] push that maps each item to an iterator and flattens the results.
244pub const fn flat_map<Func, In, IntoIter, Meta, Next>(
245    func: Func,
246    next: Next,
247) -> FlatMap<Next, Func, IntoIter, Meta>
248where
249    Func: FnMut(In) -> IntoIter,
250    IntoIter: IntoIterator,
251    Meta: Copy,
252    Next: Push<IntoIter::Item, Meta>,
253{
254    FlatMap::new(func, next)
255}
256
257/// Creates a [`FlatMapStream`] push that maps each item to a stream and flattens the results.
258pub const fn flat_map_stream<Func, In, St, Meta, Next>(
259    func: Func,
260    next: Next,
261) -> FlatMapStream<Next, Func, St, Meta>
262where
263    Func: FnMut(In) -> St,
264    St: futures_core::Stream,
265    Meta: Copy,
266    Next: Push<St::Item, Meta>,
267{
268    FlatMapStream::new(func, next)
269}
270
271/// Creates a [`Flatten`] push that flattens items that are iterators.
272pub const fn flatten<IntoIter, Meta, Next>(next: Next) -> Flatten<Next, IntoIter, Meta>
273where
274    IntoIter: IntoIterator,
275    Meta: Copy,
276    Next: Push<IntoIter::Item, Meta>,
277{
278    Flatten::new(next)
279}
280
281/// Creates a [`FlattenStream`] push that flattens items that are streams.
282pub const fn flatten_stream<St, Meta, Next>(next: Next) -> FlattenStream<Next, St, Meta>
283where
284    St: futures_core::Stream,
285    Meta: Copy,
286    Next: Push<St::Item, Meta>,
287{
288    FlattenStream::new(next)
289}
290
291/// Creates a [`ForEach`] terminal push that consumes each item with a function.
292pub const fn for_each<Func, Item>(func: Func) -> ForEach<Func>
293where
294    Func: FnMut(Item),
295{
296    ForEach::new(func)
297}
298
299/// Creates an [`Inspect`] push that inspects each item without modifying it.
300pub const fn inspect<Func, Item, Next>(func: Func, next: Next) -> Inspect<Next, Func>
301where
302    Func: FnMut(&Item),
303{
304    Inspect::new(func, next)
305}
306
307/// Creates a [`Map`] push that applies a function to each item.
308pub const fn map<Func, In, Out, Next>(func: Func, next: Next) -> Map<Next, Func>
309where
310    Func: FnMut(In) -> Out,
311{
312    Map::new(func, next)
313}
314
/// Builds a [`Persist`] push that buffers items in an externally owned `Vec`.
///
/// `buf` is the external buffer state and `next` is the push placed after the
/// [`Persist`]. `replay` is forwarded unchanged to [`Persist::new`].
// NOTE(review): the exact effect of `replay` is defined by `Persist`, which is not
// visible from this module; confirm there before relying on it.
#[cfg(feature = "alloc")]
#[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
pub fn persist_state<Item, Next>(
    buf: &mut alloc::vec::Vec<Item>,
    replay: bool,
    next: Next,
) -> Persist<Next, &mut alloc::vec::Vec<Item>>
where
    Item: Clone,
    Next: Push<Item, ()>,
{
    let persist = Persist::new(buf, replay, next);
    persist
}
329
330/// Creates a [`ResolveFutures`] push that resolves futures and sends their outputs.
331///
332/// The futures queue is supplied as external state.
333///
334/// `Queue` is generally expected to be either `futures_util::stream::FuturesUnordered`
335/// or `futures_util::stream::FuturesOrdered`.
336pub const fn resolve_futures_state<Queue, Fut, Next>(
337    queue: &mut Queue,
338    subgraph_waker: Option<Waker>,
339    next: Next,
340) -> ResolveFutures<Next, &mut Queue, Queue>
341where
342    Queue: Default + Extend<Fut> + FusedStream<Item = Fut::Output> + Unpin,
343    Fut: Future,
344    Next: Push<Fut::Output, ()>,
345{
346    ResolveFutures::new(queue, subgraph_waker, next)
347}
348
349/// Creates a [`Sink`] push that wraps a [`futures_sink::Sink`].
350pub const fn sink<Si, Item>(si: Si) -> Sink<Si>
351where
352    Si: futures_sink::Sink<Item>,
353{
354    Sink::new(si)
355}
356
357/// Creates a [`SinkCompat`] adapter that wraps a [`Push`] and implements [`futures_sink::Sink`].
358pub const fn sink_compat<Psh, Item>(push: Psh) -> SinkCompat<Psh>
359where
360    Psh: Push<Item, ()>,
361{
362    SinkCompat::new(push)
363}
364
365/// Creates an [`Unzip`] push that splits tuple items into two separate pushes.
366pub const fn unzip<P0, P1>(push0: P0, push1: P1) -> Unzip<P0, P1> {
367    Unzip::new(push0, push1)
368}
369
/// Builds a [`VecPush`] that pushes items into the caller-provided `Vec` `buf`.
#[cfg(feature = "alloc")]
#[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
pub const fn vec_push<Item>(
    buf: &mut alloc::vec::Vec<Item>,
) -> VecPush<&mut alloc::vec::Vec<Item>> {
    VecPush::<&mut alloc::vec::Vec<Item>>::new(buf)
}