Skip to main content

dfir_pipes/pull/
stream.rs

1use core::pin::Pin;
2use core::task::Context;
3
4use futures_core::FusedStream;
5use pin_project_lite::pin_project;
6
7use crate::Yes;
8use crate::pull::{FusedPull, Pull, PullStep};
9
10pin_project! {
11    /// A pull that wraps a [`futures::Stream`](futures_core::stream::Stream).
12    #[must_use = "`Pull`s do nothing unless polled"]
13    #[derive(Clone, Debug, Default)]
14    pub struct Stream<St> {
15        #[pin]
16        stream: St,
17    }
18}
19
20impl<St> Stream<St>
21where
22    Self: Pull,
23{
24    pub(crate) const fn new(stream: St) -> Self {
25        Self { stream }
26    }
27}
28
29impl<St> Pull for Stream<St>
30where
31    St: futures_core::stream::Stream,
32{
33    type Ctx<'ctx> = Context<'ctx>;
34
35    type Item = St::Item;
36    type Meta = ();
37    type CanPend = Yes;
38    type CanEnd = Yes;
39
40    fn pull(
41        self: Pin<&mut Self>,
42        ctx: &mut Self::Ctx<'_>,
43    ) -> PullStep<Self::Item, Self::Meta, Self::CanPend, Self::CanEnd> {
44        let this = self.project();
45        match futures_core::stream::Stream::poll_next(this.stream, ctx) {
46            core::task::Poll::Ready(Some(item)) => PullStep::Ready(item, ()),
47            core::task::Poll::Ready(None) => PullStep::Ended(Yes),
48            core::task::Poll::Pending => PullStep::Pending(Yes),
49        }
50    }
51
52    fn size_hint(&self) -> (usize, Option<usize>) {
53        self.stream.size_hint()
54    }
55}
56
57impl<St> FusedPull for Stream<St> where St: FusedStream {}