Skip to main content

dfir_pipes/pull/
mod.rs

1//! Pull-based stream combinators for dataflow pipelines.
2//!
3//! This module provides pull-based operators that can be chained via method calls
4//! on [`pull::Pull`], similar to iterator adapters.
5
6use core::pin::Pin;
7use core::task::{Poll, Waker};
8
9use crate::{Context, No, Toggle, Yes};
10
11#[cfg(feature = "std")]
12mod accumulator;
13mod chain;
14mod collect;
15mod cross_singleton;
16mod either;
17mod empty;
18mod enumerate;
19mod filter;
20mod filter_map;
21mod flat_map;
22mod flatten;
23mod flatten_stream;
24mod for_each;
25mod from_fn;
26mod fuse;
27#[cfg(feature = "std")]
28#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
29pub mod half_join_state;
30mod inspect;
31mod iter;
32mod map;
33mod next;
34mod once;
35mod pending;
36mod poll_fn;
37mod repeat;
38mod send_push;
39mod send_sink;
40mod skip;
41mod skip_while;
42mod stream;
43mod stream_compat;
44mod stream_ready;
45#[cfg(feature = "std")]
46mod symmetric_hash_join;
47mod take;
48mod take_while;
49#[cfg(test)]
50pub(crate) mod test_utils;
51mod zip;
52mod zip_longest;
53
54#[cfg(feature = "std")]
55#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
56pub use accumulator::{AccumulateAll, Accumulator, Fold, FoldFrom, Reduce, accumulate_all};
57pub use chain::Chain;
58pub use collect::Collect;
59pub use cross_singleton::CrossSingleton;
60pub use empty::Empty;
61pub use enumerate::Enumerate;
62pub use filter::Filter;
63pub use filter_map::FilterMap;
64pub use flat_map::FlatMap;
65pub use flatten::Flatten;
66pub use flatten_stream::FlattenStream;
67pub use for_each::ForEach;
68pub use from_fn::FromFn;
69pub use fuse::Fuse;
70#[cfg(feature = "std")]
71#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
72pub use half_join_state::{HalfJoinState, HalfMultisetJoinState, HalfSetJoinState};
73pub use inspect::Inspect;
74pub use iter::Iter;
75pub use map::Map;
76pub use next::Next;
77pub use once::Once;
78pub use pending::Pending;
79pub use poll_fn::PollFn;
80pub use repeat::Repeat;
81pub use send_push::SendPush;
82pub use send_sink::SendSink;
83pub use skip::Skip;
84pub use skip_while::SkipWhile;
85pub use stream::Stream;
86pub use stream_compat::StreamCompat;
87pub use stream_ready::StreamReady;
88#[cfg(feature = "std")]
89#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
90pub use symmetric_hash_join::{
91    NewTickJoinIter, SymmetricHashJoin, SymmetricHashJoinEither, symmetric_hash_join,
92};
93pub use take::Take;
94pub use take_while::TakeWhile;
95pub use zip::Zip;
96pub use zip_longest::ZipLongest;
97
98/// The result of polling a [`Pull`].
99///
100/// `PullStep` represents the three possible outcomes when pulling from a stream:
101/// - `Ready(item, meta)`: An item is available along with associated metadata.
102/// - `Pending(can_pend)`: No item is available yet, but more may come (async).
103/// - `Ended(can_end)`: The stream has terminated and will produce no more items.
104///
105/// The `CanPend` and `CanEnd` type parameters use [`Toggle`] to statically encode
106/// which variants are possible. When a variant is impossible (e.g., `CanPend = No`),
107/// its payload type becomes [`No`], making it a compile error to construct.
108#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
109pub enum PullStep<Item, Meta, CanPend: Toggle, CanEnd: Toggle> {
110    /// An item is ready with associated metadata.
111    Ready(Item, Meta),
112    /// The pull is not ready yet (only possible when `CanPend = Yes`).
113    Pending(CanPend),
114    /// The pull has ended (only possible when `CanEnd = Yes`).
115    Ended(CanEnd),
116}
117
118impl<Item, Meta, CanPend: Toggle, CanEnd: Toggle> PullStep<Item, Meta, CanPend, CanEnd> {
119    /// Creates a new `PullStep::Ended`, or panics if `CanEnd = No`.
120    pub fn ended() -> Self {
121        PullStep::Ended(Toggle::create())
122    }
123
124    /// Creates a new `PullStep::Pending`, or panics if `CanPend = No`.
125    pub fn pending() -> Self {
126        PullStep::Pending(Toggle::create())
127    }
128
129    /// Returns `true` if the step is a [`PullStep::Ready`].
130    pub const fn is_ready(&self) -> bool {
131        matches!(self, PullStep::Ready(_, _))
132    }
133
134    /// Returns `true` if the step is a [`PullStep::Pending`].
135    pub const fn is_pending(&self) -> bool {
136        matches!(self, PullStep::Pending(_))
137    }
138
139    /// Returns `true` if the step is a [`PullStep::Ended`].
140    pub const fn is_ended(&self) -> bool {
141        matches!(self, PullStep::Ended(_))
142    }
143
144    /// Tries to convert the `CanPend` and `CanEnd` type parameters, returning `None` if the conversion is invalid.
145    pub fn try_convert_into<NewPend: Toggle, NewEnd: Toggle>(
146        self,
147    ) -> Option<PullStep<Item, Meta, NewPend, NewEnd>> {
148        Some(match self {
149            Self::Ready(item, meta) => PullStep::Ready(item, meta),
150            Self::Pending(_) => PullStep::Pending(Toggle::try_create()?),
151            Self::Ended(_) => PullStep::Ended(Toggle::try_create()?),
152        })
153    }
154
155    /// Converts the `CanPend` and `CanEnd` type parameters, panicking if the conversion is invalid.
156    pub fn convert_into<NewPend: Toggle, NewEnd: Toggle>(
157        self,
158    ) -> PullStep<Item, Meta, NewPend, NewEnd> {
159        match self {
160            Self::Ready(item, meta) => PullStep::Ready(item, meta),
161            Self::Pending(_) => PullStep::pending(),
162            Self::Ended(_) => PullStep::ended(),
163        }
164    }
165
166    /// Converts this `PullStep` into a [`Poll`]`<Option<(Item, Meta)>>`.
167    pub fn into_poll(self) -> Poll<Option<(Item, Meta)>> {
168        match self {
169            PullStep::Ready(item, meta) => Poll::Ready(Some((item, meta))),
170            PullStep::Pending(_) => Poll::Pending,
171            PullStep::Ended(_) => Poll::Ready(None),
172        }
173    }
174}
175
176/// The `Pull` trait represents a pull-based stream that can be polled for items.
177///
178/// The `Ctx` type parameter allows operators to be generic over the context type.
179/// Most operators don't use the context and just forward it to their predecessor,
180/// so they can be generic over `Ctx`. Operators that need `std::task::Context`
181/// (like `StreamReady`) will use `Ctx = &mut Context<'_>`.
182///
183/// Setting `Ctx = ()` allows most pull pipelines to be used without any context.
184pub trait Pull {
185    /// The context type required to poll this pull.
186    type Ctx<'ctx>: Context<'ctx>;
187
188    /// The type of items yielded by this pull.
189    type Item;
190    /// The metadata type associated with each item.
191    type Meta: Copy;
192    /// Whether this pull can return [`PullStep::Pending`].
193    type CanPend: Toggle;
194    /// Whether this pull can return [`PullStep::Ended`].
195    type CanEnd: Toggle;
196
197    /// Attempts to pull the next item from this stream.
198    fn pull(
199        self: Pin<&mut Self>,
200        ctx: &mut Self::Ctx<'_>,
201    ) -> PullStep<Self::Item, Self::Meta, Self::CanPend, Self::CanEnd>;
202
203    /// Returns the bounds on the remaining length of the pull.
204    ///
205    /// Specifically, `size_hint()` returns a tuple where the first element
206    /// is the lower bound, and the second element is the upper bound.
207    ///
208    /// The second half of the tuple that is returned is an [`Option`]`<`[`usize`]`>`.
209    /// A [`None`] here means that either there is no known upper bound, or the
210    /// upper bound is larger than [`usize`].
211    ///
212    /// # Implementation notes
213    ///
214    /// It is not enforced that a pull implementation yields the declared
215    /// number of elements. A buggy pull may yield less than the lower bound
216    /// or more than the upper bound of elements.
217    ///
218    /// `size_hint()` is primarily intended to be used for optimizations such as
219    /// reserving space for the elements of the pull, but must not be trusted
220    /// to e.g., omit bounds checks in unsafe code. An incorrect implementation
221    /// of `size_hint()` should not lead to memory safety violations.
222    ///
223    /// That said, the implementation should provide a correct estimation,
224    /// because otherwise it would be a violation of the trait's protocol.
225    ///
226    /// A default implementation should return `(0, None)` which is correct for any
227    /// pull. However this is not provided, to prevent oversight.
228    fn size_hint(&self) -> (usize, Option<usize>);
229
230    /// Borrows this pull, allowing it to be used by reference.
231    fn by_ref(&mut self) -> &mut Self {
232        self
233    }
234
235    /// Takes two pulls and creates a new pull over both in sequence.
236    ///
237    /// `chain()` will return a new pull which will first iterate over
238    /// values from the first pull and then over values from the second pull.
239    ///
240    /// The first pull must be finite (`CanEnd = Yes`) and fused ([`FusedPull`]).
241    fn chain<U>(self, other: U) -> Chain<Self, U>
242    where
243        Self: Sized + FusedPull<CanEnd = Yes>,
244        U: Pull<Item = Self::Item, Meta = Self::Meta>,
245    {
246        Chain::new(self, other)
247    }
248
249    /// Creates a pull which gives the current iteration count as well as the next value.
250    ///
251    /// The pull returned yields pairs `(i, val)`, where `i` is the current index
252    /// of iteration and `val` is the value returned by the pull.
253    fn enumerate(self) -> Enumerate<Self>
254    where
255        Self: Sized,
256    {
257        Enumerate::new(self)
258    }
259
260    /// Creates a pull which uses a closure to determine if an element should be yielded.
261    ///
262    /// Given an element the closure must return `true` or `false`. The returned pull
263    /// will yield only the elements for which the closure returns `true`.
264    fn filter<P>(self, predicate: P) -> Filter<Self, P>
265    where
266        Self: Sized,
267        P: FnMut(&Self::Item) -> bool,
268    {
269        Filter::new(self, predicate)
270    }
271
272    /// Creates a pull that both filters and maps.
273    ///
274    /// The returned pull yields only the values for which the supplied closure
275    /// returns `Some(value)`.
276    fn filter_map<B, F>(self, f: F) -> FilterMap<Self, F>
277    where
278        Self: Sized,
279        F: FnMut(Self::Item) -> Option<B>,
280    {
281        FilterMap::new(self, f)
282    }
283
284    /// Creates a pull that works like map, but flattens nested structure.
285    ///
286    /// The `flat_map()` method is useful when you have a pull of items, and you
287    /// want to apply a function that returns an iterator for each item, then
288    /// flatten all those iterators into a single pull.
289    fn flat_map<U, F>(self, f: F) -> FlatMap<Self, F, U::IntoIter, Self::Meta>
290    where
291        Self: Sized,
292        F: FnMut(Self::Item) -> U,
293        U: IntoIterator,
294    {
295        FlatMap::new(self, f)
296    }
297
298    /// Creates a pull that flattens nested structure.
299    ///
300    /// This is useful when you have a pull of iterables, and you want to
301    /// flatten them into a single pull.
302    fn flatten(self) -> Flatten<Self, <Self::Item as IntoIterator>::IntoIter, Self::Meta>
303    where
304        Self: Sized,
305        Self::Item: IntoIterator,
306    {
307        Flatten::new(self)
308    }
309
310    /// Creates a pull that flattens items that are streams by polling each inner stream.
311    ///
312    /// This is useful when you have a pull of streams, and you want to
313    /// flatten them into a single pull. Requires an async context since
314    /// inner streams are polled.
315    fn flatten_stream(self) -> FlattenStream<Self, Self::Item, Self::Meta>
316    where
317        Self: Sized,
318        Self::Item: futures_core::Stream,
319    {
320        FlattenStream::new(self)
321    }
322
323    /// Creates a future which runs the given function on each element of a pull.
324    fn for_each<F>(self, f: F) -> ForEach<Self, F>
325    where
326        Self: Sized,
327        F: FnMut(Self::Item),
328    {
329        ForEach::new(self, f)
330    }
331
332    /// Creates a future which collects all elements of a pull into a collection.
333    ///
334    /// The collection type `C` must implement `Default` and `Extend<Item>`.
335    fn collect<C>(self) -> Collect<Self, C>
336    where
337        Self: Sized,
338        C: Default + Extend<Self::Item>,
339    {
340        Collect::new(self)
341    }
342
343    /// Creates a pull that ends after the first `None`.
344    ///
345    /// After a pull returns `Ended` for the first time, the behavior of calling
346    /// `pull` again is implementation-defined. `fuse()` adapts any pull,
347    /// ensuring that after `Ended` is given once, it will always return `Ended`
348    /// forever.
349    ///
350    /// Usually this method will simply return `Fuse<Self>`, but it may be
351    /// overridden for optimization.
352    fn fuse(
353        self,
354    ) -> impl for<'ctx> FusedPull<
355        Ctx<'ctx> = Self::Ctx<'ctx>,
356        Item = Self::Item,
357        Meta = Self::Meta,
358        CanPend = Self::CanPend,
359        CanEnd = Self::CanEnd,
360    >
361    where
362        Self: Sized,
363    {
364        Fuse::new(self)
365    }
366
367    /// Does something with each element of a pull, passing the value on.
368    ///
369    /// When using pulls, you'll often chain several of them together.
370    /// While working on such code, you might want to check out what's
371    /// happening at various parts in the pipeline. To do that, insert
372    /// a call to `inspect()`.
373    fn inspect<F>(self, f: F) -> Inspect<Self, F>
374    where
375        Self: Sized,
376        F: FnMut(&Self::Item),
377    {
378        Inspect::new(self, f)
379    }
380
381    /// Takes a closure and creates a pull that calls that closure on each element.
382    ///
383    /// `map()` transforms one pull into another, by means of its argument: something
384    /// that implements `FnMut`. It produces a new pull which calls this closure on
385    /// each element of the original pull.
386    fn map<B, F>(self, f: F) -> Map<Self, F>
387    where
388        Self: Sized,
389        F: FnMut(Self::Item) -> B,
390    {
391        Map::new(self, f)
392    }
393
394    /// Creates a future that pulls all items and sends them into a [`crate::Sink`].
395    fn send_sink<Push>(self, push: Push) -> SendSink<Self, Push>
396    where
397        Self: Sized,
398        Push: futures_sink::Sink<Self::Item>,
399    {
400        SendSink::new(self, push)
401    }
402
403    /// Creates a future that pulls all items and pushes them into a [`crate::push::Push`].
404    fn send_push<Psh>(self, push: Psh) -> SendPush<Self, Psh>
405    where
406        Self: Sized,
407        Psh: crate::push::Push<Self::Item, Self::Meta>,
408    {
409        SendPush::new(self, push)
410    }
411
412    /// Creates a pull that skips the first `n` elements.
413    fn skip(self, n: usize) -> Skip<Self>
414    where
415        Self: Sized,
416    {
417        Skip::new(self, n)
418    }
419
420    /// Creates a pull that skips elements based on a predicate.
421    ///
422    /// `skip_while()` takes a closure as an argument. It will call this closure
423    /// on each element of the pull, and ignore elements until it returns `false`.
424    fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>
425    where
426        Self: Sized,
427        P: FnMut(&Self::Item) -> bool,
428    {
429        SkipWhile::new(self, predicate)
430    }
431
432    /// Creates a pull that yields the first `n` elements, or fewer if the
433    /// underlying pull ends sooner.
434    fn take(self, n: usize) -> Take<Self>
435    where
436        Self: Sized,
437    {
438        Take::new(self, n)
439    }
440
441    /// Creates a pull that yields elements based on a predicate.
442    ///
443    /// `take_while()` takes a closure as an argument. It will call this closure
444    /// on each element of the pull, and yield elements while it returns `true`.
445    fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
446    where
447        Self: Sized,
448        P: FnMut(&Self::Item) -> bool,
449    {
450        TakeWhile::new(self, predicate)
451    }
452
453    /// Zips two pulls together, stopping once either is exhausted.
454    fn zip<U>(self, other: U) -> Zip<Self, U>
455    where
456        Self: Sized,
457        U: Pull<Meta = Self::Meta>,
458    {
459        Zip::new(self, other)
460    }
461
462    /// Zips two pulls together, continuing until both are exhausted.
463    ///
464    /// Unlike a regular zip which ends when either pull ends, `zip_longest`
465    /// continues until both pulls have ended, yielding [`crate::EitherOrBoth`]
466    /// values to indicate which pulls yielded items.
467    ///
468    /// Both pulls must be fused ([`FusedPull`]) to ensure correct behavior
469    /// after one pull ends.
470    fn zip_longest<U>(self, other: U) -> ZipLongest<Self, U>
471    where
472        Self: Sized + FusedPull,
473        U: FusedPull<Meta = Self::Meta>,
474    {
475        ZipLongest::new(self, other)
476    }
477
478    /// Creates a future that resolves with the next item from this pull.
479    ///
480    /// This is the `Pull` equivalent of the `StreamExt::next()` future.
481    fn next(self) -> Next<Self>
482    where
483        Self: Sized,
484    {
485        Next::new(self)
486    }
487
488    /// Crosses each item from this pull with a singleton value from another pull.
489    ///
490    /// The singleton value is obtained from the first item of `singleton_pull` and cached.
491    /// All subsequent items from this pull are paired with this cached singleton value.
492    ///
493    /// If `singleton_pull` ends before yielding any items, the entire combinator ends immediately.
494    fn cross_singleton<SinglePull>(
495        self,
496        singleton_pull: SinglePull,
497    ) -> CrossSingleton<Self, SinglePull, Option<SinglePull::Item>>
498    where
499        Self: Sized,
500        SinglePull: Pull,
501        SinglePull::Item: Clone,
502    {
503        CrossSingleton::new(self, singleton_pull, None)
504    }
505
506    /// [Self::cross_singleton] with external state.
507    fn cross_singleton_state<SinglePull>(
508        self,
509        singleton_pull: SinglePull,
510        singleton_state: &mut Option<SinglePull::Item>,
511    ) -> CrossSingleton<Self, SinglePull, &mut Option<SinglePull::Item>>
512    where
513        Self: Sized,
514        SinglePull: Pull,
515        SinglePull::Item: Clone,
516    {
517        CrossSingleton::new(self, singleton_pull, singleton_state)
518    }
519
520    /// Performs a symmetric hash join with another pull.
521    ///
522    /// Joins items from this pull with items from `rhs` based on a common key.
523    /// Both pulls must yield `(Key, Value)` tuples. The result is a pull of
524    /// `(Key, (V1, V2))` tuples for each matching pair.
525    ///
526    /// The `lhs_state` and `rhs_state` parameters store the join state and must
527    /// implement [`HalfJoinState`].
528    #[cfg(feature = "std")]
529    #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
530    fn symmetric_hash_join<Key, V1, Rhs, V2, LhsState, RhsState>(
531        self,
532        rhs: Rhs,
533        lhs_state: LhsState,
534        rhs_state: RhsState,
535    ) -> SymmetricHashJoin<Self, Rhs, LhsState, RhsState, LhsState, RhsState>
536    where
537        Self: Sized + FusedPull<Item = (Key, V1), Meta = ()>,
538        Key: Eq + std::hash::Hash + Clone,
539        V1: Clone,
540        V2: Clone,
541        Rhs: FusedPull<Item = (Key, V2), Meta = ()>,
542        LhsState: HalfJoinState<Key, V1, V2>,
543        RhsState: HalfJoinState<Key, V2, V1>,
544    {
545        SymmetricHashJoin::new(self, rhs, lhs_state, rhs_state)
546    }
547
548    /// [Self::symmetric_hash_join] with external state.
549    #[cfg(feature = "std")]
550    #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
551    fn symmetric_hash_join_state<'a, Key, V1, Rhs, V2, LhsState, RhsState>(
552        self,
553        rhs: Rhs,
554        lhs_state: &'a mut LhsState,
555        rhs_state: &'a mut RhsState,
556    ) -> SymmetricHashJoin<Self, Rhs, &'a mut LhsState, &'a mut RhsState, LhsState, RhsState>
557    where
558        Self: Sized + FusedPull<Item = (Key, V1), Meta = ()>,
559        Key: Eq + std::hash::Hash + Clone,
560        V1: Clone,
561        V2: Clone,
562        Rhs: FusedPull<Item = (Key, V2), Meta = ()>,
563        LhsState: HalfJoinState<Key, V1, V2>,
564        RhsState: HalfJoinState<Key, V2, V1>,
565    {
566        SymmetricHashJoin::new(self, rhs, lhs_state, rhs_state)
567    }
568}
569
570impl<P> Pull for &mut P
571where
572    P: Pull + Unpin + ?Sized,
573{
574    type Ctx<'ctx> = P::Ctx<'ctx>;
575
576    type Item = P::Item;
577    type Meta = P::Meta;
578    type CanPend = P::CanPend;
579    type CanEnd = P::CanEnd;
580
581    fn pull(
582        mut self: Pin<&mut Self>,
583        ctx: &mut Self::Ctx<'_>,
584    ) -> PullStep<Self::Item, Self::Meta, Self::CanPend, Self::CanEnd> {
585        Pin::new(&mut **self).pull(ctx)
586    }
587
588    fn size_hint(&self) -> (usize, Option<usize>) {
589        (**self).size_hint()
590    }
591}
592
593/// A marker trait for pulls that are "fused".
594///
595/// A fused pull guarantees that once it returns [`PullStep::Ended`], all subsequent
596/// calls to [`Pull::pull`] will also return [`PullStep::Ended`]. This property allows
597/// downstream operators like [`Pull::chain`] to avoid tracking whether
598/// the upstream has ended.
599///
600/// Implementors should ensure this invariant is upheld. The [`Pull::fuse`]
601/// adapter can be used to make any pull fused.
602pub trait FusedPull: Pull {}
603
604impl<P> FusedPull for &mut P where P: FusedPull + Unpin + ?Sized {}
605
606/// Creates a pull from an iterator.
607///
608/// This is the primary way to create a pull from synchronous data.
609/// The resulting pull will never pend and will end when the iterator is exhausted.
610pub fn iter<I: IntoIterator>(iter: I) -> Iter<I::IntoIter> {
611    Iter::new(iter.into_iter())
612}
613
614/// Creates a pull from a `futures::Stream`.
615///
616/// The resulting pull requires `&mut Context<'_>` to be polled and can both
617/// pend and end.
618pub const fn stream<S: futures_core::stream::Stream>(stream: S) -> Stream<S> {
619    Stream::new(stream)
620}
621
622/// Creates a [`StreamCompat`] adapter that wraps a [`Pull`] and implements [`futures_core::stream::Stream`].
623pub const fn stream_compat<Pul: Pull>(pull: Pul) -> StreamCompat<Pul> {
624    StreamCompat::new(pull)
625}
626
627/// Creates a pull from a `futures::Stream` with a custom waker.
628///
629/// This variant uses a provided waker function instead of requiring a context.
630/// When the stream returns `Pending`, this pull treats it as ended (non-blocking).
631pub const fn stream_ready<S>(stream: S, waker: Waker) -> StreamReady<S>
632where
633    S: futures_core::stream::Stream,
634{
635    StreamReady::new(stream, waker)
636}
637
638/// Creates a synchronous pull from a closure.
639///
640/// The closure is called each time the pull is polled and should return a `PullStep`.
641pub fn from_fn<F, Item, Meta, CanEnd>(func: F) -> FromFn<F, Item, Meta, CanEnd>
642where
643    F: FnMut() -> PullStep<Item, Meta, No, CanEnd>,
644    Meta: Copy,
645    CanEnd: Toggle,
646{
647    FromFn::new(func)
648}
649
650/// Creates an asynchronous or synchronous pull from a closure.
651///
652/// The closure is called each time the pull is polled and should return a `PullStep`.
653pub fn poll_fn<F, Item, Meta, CanPend, CanEnd>(func: F) -> PollFn<F, Item, Meta, CanPend, CanEnd>
654where
655    F: FnMut(&mut core::task::Context<'_>) -> PullStep<Item, Meta, CanPend, CanEnd>,
656    Meta: Copy,
657    CanPend: Toggle,
658    CanEnd: Toggle,
659{
660    PollFn::new(func)
661}
662
663/// Creates an empty pull that immediately ends.
664pub fn empty<Item>() -> Empty<Item> {
665    Empty::default()
666}
667
668/// Creates a pull that yields a single item.
669pub const fn once<Item>(item: Item) -> Once<Item> {
670    Once::new(item)
671}
672
673/// Creates a pull that yields clones of the given item forever.
674pub const fn repeat<Item>(item: Item) -> Repeat<Item>
675where
676    Item: Clone,
677{
678    Repeat::new(item)
679}
680
681/// Creates a pull that is always pending and never yields items or ends.
682pub const fn pending<Item>() -> Pending<Item> {
683    Pending::new()
684}
685
686/// A macro to override [`Pull::fuse`] for pulls that are already fused.
687///
688/// This macro should be used in the `impl` block for a pull type that is already fused.
689/// It provides a default implementation of `fuse` that simply returns `self`.
690macro_rules! fuse_self {
691    () => {
692        fn fuse(
693            self,
694        ) -> impl for<'ctx> FusedPull<
695            Ctx<'ctx> = Self::Ctx<'ctx>,
696            Item = Self::Item,
697            Meta = Self::Meta,
698            CanPend = Self::CanPend,
699            CanEnd = Self::CanEnd,
700        >
701        where
702            Self: Sized,
703        {
704            self
705        }
706    };
707}
708use fuse_self;