dfir_pipes/pull/
stream_ready.rs1use core::pin::Pin;
4use core::task::{Poll, Waker};
5
6use futures_core::stream::Stream;
7use pin_project_lite::pin_project;
8
9use crate::pull::{Pull, PullStep};
10use crate::{No, Yes};
11
12pin_project! {
13 #[must_use = "`Pull`s do nothing unless polled"]
18 #[derive(Clone, Debug)]
19 pub struct StreamReady<S> {
20 #[pin]
21 stream: S,
22 waker: Waker,
23 }
24}
25
26impl<S> StreamReady<S>
27where
28 Self: Pull,
29{
30 pub(crate) const fn new(stream: S, waker: Waker) -> Self {
32 Self { stream, waker }
33 }
34}
35
36impl<S> Pull for StreamReady<S>
39where
40 S: Stream,
41{
42 type Ctx<'ctx> = ();
43
44 type Item = S::Item;
45 type Meta = ();
46 type CanPend = No;
47 type CanEnd = Yes;
48
49 fn pull(
50 self: Pin<&mut Self>,
51 _ctx: &mut Self::Ctx<'_>,
52 ) -> PullStep<Self::Item, Self::Meta, Self::CanPend, Self::CanEnd> {
53 let this = self.project();
54 let mut cx = core::task::Context::from_waker(this.waker);
55 match this.stream.poll_next(&mut cx) {
56 Poll::Ready(Some(item)) => PullStep::Ready(item, ()),
57 Poll::Ready(None) => PullStep::Ended(Yes),
58 Poll::Pending => PullStep::Ended(Yes),
59 }
60 }
61
62 fn size_hint(&self) -> (usize, Option<usize>) {
63 self.stream.size_hint()
64 }
65}