Skip to main content

dfir_pipes/pull/
stream_ready.rs

1//! [`StreamReady`] - a non-blocking `Pull` that wraps a `Stream`.
2
3use core::pin::Pin;
4use core::task::{Poll, Waker};
5
6use futures_core::stream::Stream;
7use pin_project_lite::pin_project;
8
9use crate::pull::{Pull, PullStep};
10use crate::{No, Yes};
11
12pin_project! {
13    /// A `Pull` implementation that wraps a `Stream` and a `Waker`.
14    ///
15    /// Converts a `Stream` into a non-blocking `Pull` by polling with the provided waker.
16    /// If the stream returns `Pending`, this pull treats it as ended (non-blocking).
17    #[must_use = "`Pull`s do nothing unless polled"]
18    #[derive(Clone, Debug)]
19    pub struct StreamReady<S> {
20        #[pin]
21        stream: S,
22        waker: Waker,
23    }
24}
25
26impl<S> StreamReady<S>
27where
28    Self: Pull,
29{
30    /// Create a new `StreamReady` from the given stream and waker function.
31    pub(crate) const fn new(stream: S, waker: Waker) -> Self {
32        Self { stream, waker }
33    }
34}
35
36/// StreamReady uses its own waker, so it ignores the context parameter.
37/// It implements `Pull` with `Ctx = ()`.
38impl<S> Pull for StreamReady<S>
39where
40    S: Stream,
41{
42    type Ctx<'ctx> = ();
43
44    type Item = S::Item;
45    type Meta = ();
46    type CanPend = No;
47    type CanEnd = Yes;
48
49    fn pull(
50        self: Pin<&mut Self>,
51        _ctx: &mut Self::Ctx<'_>,
52    ) -> PullStep<Self::Item, Self::Meta, Self::CanPend, Self::CanEnd> {
53        let this = self.project();
54        let mut cx = core::task::Context::from_waker(this.waker);
55        match this.stream.poll_next(&mut cx) {
56            Poll::Ready(Some(item)) => PullStep::Ready(item, ()),
57            Poll::Ready(None) => PullStep::Ended(Yes),
58            Poll::Pending => PullStep::Ended(Yes),
59        }
60    }
61
62    fn size_hint(&self) -> (usize, Option<usize>) {
63        self.stream.size_hint()
64    }
65}