Skip to main content

dfir_pipes/pull/
collect.rs

1use core::pin::Pin;
2use core::task::Poll;
3
4use pin_project_lite::pin_project;
5
6use crate::Context;
7use crate::pull::{Pull, PullStep};
8
9pin_project! {
10    /// Future that collects all items from a pull into a collection.
11    #[must_use = "futures do nothing unless polled"]
12    #[derive(Clone, Debug, Default)]
13    pub struct Collect<Prev, C> {
14        #[pin]
15        prev: Prev,
16        collect: C,
17    }
18}
19
20impl<Prev, C> Collect<Prev, C>
21where
22    Self: Future,
23    C: Default,
24{
25    pub(crate) fn new(prev: Prev) -> Self {
26        Self {
27            prev,
28            collect: C::default(),
29        }
30    }
31}
32
33impl<Prev, C> Future for Collect<Prev, C>
34where
35    Prev: Pull,
36    for<'ctx> Prev::Ctx<'ctx>: Context<'ctx>,
37    C: Default + Extend<Prev::Item>,
38{
39    type Output = C;
40
41    fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
42        let mut this = self.project();
43        let ctx = <Prev::Ctx<'_> as Context<'_>>::from_task(cx);
44
45        #[cfg(nightly)]
46        this.collect.extend_reserve(this.prev.size_hint().0);
47
48        loop {
49            return match this.prev.as_mut().pull(ctx) {
50                PullStep::Ready(item, _meta) => {
51                    #[cfg(nightly)]
52                    this.collect.extend_one(item);
53                    #[cfg(not(nightly))]
54                    this.collect.extend(core::iter::once(item));
55
56                    continue;
57                }
58                PullStep::Pending(_) => Poll::Pending,
59                PullStep::Ended(_) => Poll::Ready(core::mem::take(this.collect)),
60            };
61        }
62    }
63}