dfir_pipes/pull/
stream.rs1use core::pin::Pin;
2use core::task::Context;
3
4use futures_core::FusedStream;
5use pin_project_lite::pin_project;
6
7use crate::Yes;
8use crate::pull::{FusedPull, Pull, PullStep};
9
10pin_project! {
11 #[must_use = "`Pull`s do nothing unless polled"]
13 #[derive(Clone, Debug, Default)]
14 pub struct Stream<St> {
15 #[pin]
16 stream: St,
17 }
18}
19
20impl<St> Stream<St>
21where
22 Self: Pull,
23{
24 pub(crate) const fn new(stream: St) -> Self {
25 Self { stream }
26 }
27}
28
29impl<St> Pull for Stream<St>
30where
31 St: futures_core::stream::Stream,
32{
33 type Ctx<'ctx> = Context<'ctx>;
34
35 type Item = St::Item;
36 type Meta = ();
37 type CanPend = Yes;
38 type CanEnd = Yes;
39
40 fn pull(
41 self: Pin<&mut Self>,
42 ctx: &mut Self::Ctx<'_>,
43 ) -> PullStep<Self::Item, Self::Meta, Self::CanPend, Self::CanEnd> {
44 let this = self.project();
45 match futures_core::stream::Stream::poll_next(this.stream, ctx) {
46 core::task::Poll::Ready(Some(item)) => PullStep::Ready(item, ()),
47 core::task::Poll::Ready(None) => PullStep::Ended(Yes),
48 core::task::Poll::Pending => PullStep::Pending(Yes),
49 }
50 }
51
52 fn size_hint(&self) -> (usize, Option<usize>) {
53 self.stream.size_hint()
54 }
55}
56
57impl<St> FusedPull for Stream<St> where St: FusedStream {}