Skip to main content

dfir_pipes/push/
resolve_futures.rs

1//! [`ResolveFutures`] push operator for resolving futures and pushing their outputs downstream.
2
3use core::borrow::BorrowMut;
4use core::marker::PhantomData;
5use core::pin::Pin;
6use core::task::{Context, Poll, Waker};
7
8use futures_core::stream::{FusedStream, Stream};
9use pin_project_lite::pin_project;
10
11use crate::Yes;
12use crate::push::{Push, PushStep, ready};
13
pin_project! {
    /// Push operator that receives futures, queues them, and pushes their resolved outputs downstream.
    ///
    /// `Queue` is expected to be either [`futures_util::stream::FuturesOrdered`] or [`futures_util::stream::FuturesUnordered`]
    /// (or a mutable reference thereof).
    ///
    /// Type parameters:
    /// - `Psh`: the downstream push that receives each resolved output.
    /// - `Queue`: the storage for in-flight futures; it is accessed as a `QueueInner` through `BorrowMut`.
    /// - `QueueInner`: the underlying queue type that futures are added to and that is polled as a stream.
    #[must_use = "`Push`es do nothing unless items are pushed into them"]
    pub struct ResolveFutures<Psh, Queue, QueueInner> {
        // Downstream push. Marked `#[pin]` so the projection hands it out as `Pin<&mut Psh>`.
        #[pin]
        push: Psh,
        // Holds the futures that have been sent but whose outputs have not yet been forwarded.
        // Not pinned: it is re-pinned with `Pin::new` at each poll, which is why `QueueInner: Unpin`
        // is required by the methods that poll it.
        queue: Queue,
        // If `Some`, this waker will schedule future ticks, so all futures should be driven
        // by it. If `None`, the subgraph execution should block until all futures are resolved.
        subgraph_waker: Option<Waker>,

        // Ties the otherwise-unused `QueueInner` type parameter to the struct.
        _phantom: PhantomData<QueueInner>,
    }
}
31
32impl<Psh, Queue, QueueInner> ResolveFutures<Psh, Queue, QueueInner> {
33    /// Create with the given queue and following push.
34    ///
35    /// If `subgraph_waker` is `Some`, the queue will be polled with this waker.
36    pub(crate) const fn new<Fut>(queue: Queue, subgraph_waker: Option<Waker>, push: Psh) -> Self
37    where
38        Psh: Push<Fut::Output, ()>,
39        Queue: BorrowMut<QueueInner>,
40        QueueInner: Extend<Fut> + FusedStream<Item = Fut::Output> + Unpin,
41        Fut: Future,
42        // for<'ctx> Psh::Ctx<'ctx>: crate::Context<'ctx>,
43    {
44        Self {
45            push,
46            queue,
47            subgraph_waker,
48            _phantom: PhantomData,
49        }
50    }
51
52    /// Empties any ready items from the queue into the following push, and readies for the next send.
53    fn empty_ready<Fut>(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> PushStep<Yes>
54    where
55        Psh: Push<Fut::Output, ()>,
56        Queue: BorrowMut<QueueInner>,
57        QueueInner: Extend<Fut> + FusedStream<Item = Fut::Output> + Unpin,
58        Fut: Future,
59    {
60        let mut this = self.project();
61
62        loop {
63            // Ensure the following push is ready.
64            ready!(
65                this.push
66                    .as_mut()
67                    .poll_ready(crate::Context::from_task(ctx))
68            );
69
70            let poll_result = if let Some(w) = this.subgraph_waker.as_ref() {
71                Stream::poll_next(
72                    Pin::new(this.queue.borrow_mut()),
73                    &mut Context::<'_>::from_waker(w),
74                )
75            } else {
76                Stream::poll_next(Pin::new(this.queue.borrow_mut()), ctx)
77            };
78
79            match poll_result {
80                Poll::Ready(Some(out)) => {
81                    this.push.as_mut().start_send(out, ());
82                }
83                Poll::Ready(None) => {
84                    return PushStep::Done;
85                }
86                Poll::Pending => {
87                    if this.subgraph_waker.is_some() {
88                        return PushStep::Done; // We will be re-woken on a future tick
89                    } else {
90                        // We will pend until the queue is emptied.
91                        // TODO(mingwei): Does this mean only one item may be sent at a time?
92                        return PushStep::Pending(Yes);
93                    }
94                }
95            }
96        }
97    }
98}
99
100// TODO(mingwei): support arbitrary metadata
101impl<Psh, Queue, QueueInner, Fut> Push<Fut, ()> for ResolveFutures<Psh, Queue, QueueInner>
102where
103    Psh: Push<Fut::Output, ()>,
104    Queue: BorrowMut<QueueInner>,
105    QueueInner: Extend<Fut> + FusedStream<Item = Fut::Output> + Unpin,
106    Fut: Future,
107    for<'ctx> Psh::Ctx<'ctx>: crate::Context<'ctx>,
108{
109    type Ctx<'ctx> = Context<'ctx>;
110    type CanPend = Yes;
111
112    fn poll_ready(mut self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
113        self.as_mut().empty_ready(ctx) // TODO(mingwei): see above
114    }
115
116    fn start_send(self: Pin<&mut Self>, item: Fut, _meta: ()) {
117        let this = self.project();
118        this.queue.borrow_mut().extend(core::iter::once(item));
119
120        if let Some(waker) = this.subgraph_waker.as_ref() {
121            // If `subgraph_waker` is `Some`:
122            // We MUST poll the queue stream to ensure that the futures begin.
123            // We use `this.subgraph_waker` to poll the queue stream, which means the futures are driven
124            // by the subgraph's own waker. This allows the subgraph execution to continue without waiting
125            // for the queued futures to complete; the subgraph does not block ("yield") on their readiness.
126            // If we instead used `cx.waker()`, the subgraph execution would yield ("block") until all queued
127            // futures are ready, effectively pausing subgraph progress until completion of those futures.
128            // Choose the waker based on whether you want subgraph execution to proceed independently of
129            // the queued futures, or to wait for them to complete before continuing.
130            if let Poll::Ready(Some(out)) = Stream::poll_next(
131                Pin::new(this.queue.borrow_mut()),
132                &mut Context::from_waker(waker),
133            ) {
134                this.push.start_send(out, ());
135            }
136        }
137    }
138
139    fn poll_flush(mut self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
140        // First drain any ready items from the queue.
141        ready!(self.as_mut().empty_ready(ctx));
142        // Then flush the downstream push.
143        let this = self.project();
144        this.push
145            .poll_flush(crate::Context::from_task(ctx))
146            .convert_into()
147    }
148
149    fn size_hint(self: Pin<&mut Self>, hint: (usize, Option<usize>)) {
150        // Each future input produces one value output.
151        self.project().push.size_hint(hint);
152    }
153}
154
#[cfg(test)]
mod tests {
    use core::pin::Pin;
    use core::task::{Context, Waker};

    use futures_util::stream::FuturesUnordered;

    use super::*;
    use crate::push::Push;
    use crate::push::test_utils::{PushCall, TestPush};

    /// Future type pushed into the operator under test; it resolves immediately.
    type Fut = core::future::Ready<i32>;
    type Queue = FuturesUnordered<Fut>;

    /// Task context backed by the no-op waker.
    fn noop_context() -> Context<'static> {
        Context::from_waker(Waker::noop())
    }

    /// Queue pre-filled with immediately-ready futures, one per value.
    fn queue_of(values: impl IntoIterator<Item = i32>) -> Queue {
        values.into_iter().map(core::future::ready).collect()
    }

    #[test]
    fn test_poll_ready_readies_downstream() {
        let mut cx = noop_context();
        let mut mock = TestPush::no_pend();
        let mut queue = queue_of([1, 2, 3]);

        {
            // The operator borrows `mock`; end that borrow before inspecting the history.
            let mut rf = ResolveFutures::<_, _, Queue>::new(&mut queue, None, &mut mock);
            let step = Push::<Fut, ()>::poll_ready(Pin::new(&mut rf), &mut cx);
            assert!(step.is_done());
        }

        let saw_poll_ready = mock
            .history
            .iter()
            .any(|call| matches!(call, PushCall::PollReady));
        assert!(saw_poll_ready, "downstream poll_ready was not called");

        let saw_item = mock
            .history
            .iter()
            .any(|call| matches!(call, PushCall::SendItem(_)));
        assert!(saw_item, "downstream should have received items");
    }

    #[test]
    fn test_poll_flush_calls_downstream_flush() {
        let mut cx = noop_context();
        let mut mock = TestPush::no_pend();
        let mut queue = Queue::new();

        {
            // The operator borrows `mock`; end that borrow before inspecting the history.
            let mut rf = ResolveFutures::<_, _, Queue>::new(&mut queue, None, &mut mock);
            let step = Push::<Fut, ()>::poll_flush(Pin::new(&mut rf), &mut cx);
            assert!(step.is_done());
        }

        let saw_flush = mock
            .history
            .iter()
            .any(|call| matches!(call, PushCall::PollFlush));
        assert!(saw_flush, "downstream poll_flush was not called");
    }

    #[test]
    fn resolve_futures_readies_downstream_before_each_send() {
        let mut cx = noop_context();
        let mut guard = TestPush::no_pend();
        let mut queue = queue_of([1, 2, 3]);
        let mut rf = ResolveFutures::<_, _, Queue>::new(&mut queue, None, &mut guard);

        // poll_ready drains resolved futures into downstream, each with poll_ready before start_send.
        let drained = Push::<Fut, ()>::poll_ready(Pin::new(&mut rf), &mut cx);
        assert!(drained.is_done());

        // Send a new immediately-resolving future.
        Push::<Fut, ()>::start_send(Pin::new(&mut rf), core::future::ready(4), ());

        // Flush should drain and flush without violating the ready guard.
        let flushed = Push::<Fut, ()>::poll_flush(Pin::new(&mut rf), &mut cx);
        assert!(flushed.is_done());
    }
}