Skip to main content

dfir_pipes/pull/
stream_compat.rs

1//! [`StreamCompat`] adapter wrapping a [`Pull`] into a [`futures_core::stream::Stream`].
2use core::pin::Pin;
3use core::task::Poll;
4
5use pin_project_lite::pin_project;
6
7use crate::Context;
8use crate::pull::Pull;
9
10pin_project! {
11    /// Adapter that wraps a [`Pull`] to implement the [`Stream`](futures_core::stream::Stream) trait.
12    #[must_use = "`Stream`s do nothing unless polled"]
13    pub struct StreamCompat<Pul> {
14        #[pin]
15        pull: Pul,
16    }
17}
18
19impl<Pul> StreamCompat<Pul> {
20    /// Creates a new [`StreamCompat`] wrapping the given [`Pull`].
21    pub(crate) const fn new(pull: Pul) -> Self {
22        Self { pull }
23    }
24
25    /// Returns the wrapped [`Pull`].
26    pub fn into_inner(self) -> Pul {
27        self.pull
28    }
29
30    /// Returns a pinned mutable reference to the wrapped [`Pull`].
31    pub fn as_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Pul> {
32        self.project().pull
33    }
34
35    /// Returns a pinned reference to the wrapped [`Pull`].
36    pub fn as_pin_ref(self: Pin<&Self>) -> Pin<&Pul> {
37        self.project_ref().pull
38    }
39}
40
41impl<Pul> AsMut<Pul> for StreamCompat<Pul> {
42    fn as_mut(&mut self) -> &mut Pul {
43        &mut self.pull
44    }
45}
46
47impl<Pul> AsRef<Pul> for StreamCompat<Pul> {
48    fn as_ref(&self) -> &Pul {
49        &self.pull
50    }
51}
52
53impl<Pul> futures_core::stream::Stream for StreamCompat<Pul>
54where
55    Pul: Pull,
56    for<'ctx> Pul::Ctx<'ctx>: Context<'ctx>,
57{
58    type Item = Pul::Item;
59
60    fn poll_next(
61        self: Pin<&mut Self>,
62        cx: &mut core::task::Context<'_>,
63    ) -> Poll<Option<Self::Item>> {
64        self.as_pin_mut()
65            .pull(Context::from_task(cx))
66            .into_poll()
67            .map(|opt| opt.map(|(item, _meta)| item))
68    }
69
70    fn size_hint(&self) -> (usize, Option<usize>) {
71        self.pull.size_hint()
72    }
73}