dfir_pipes/pull/mod.rs
1//! Pull-based stream combinators for dataflow pipelines.
2//!
3//! This module provides pull-based operators that can be chained via method calls
4//! on [`pull::Pull`], similar to iterator adapters.
5
6use core::pin::Pin;
7use core::task::{Poll, Waker};
8
9use crate::{Context, No, Toggle, Yes};
10
11#[cfg(feature = "std")]
12mod accumulator;
13mod chain;
14mod collect;
15mod cross_singleton;
16mod either;
17mod empty;
18mod enumerate;
19mod filter;
20mod filter_map;
21mod flat_map;
22mod flatten;
23mod flatten_stream;
24mod for_each;
25mod from_fn;
26mod fuse;
27#[cfg(feature = "std")]
28#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
29pub mod half_join_state;
30mod inspect;
31mod iter;
32mod map;
33mod next;
34mod once;
35mod pending;
36mod poll_fn;
37mod repeat;
38mod send_push;
39mod send_sink;
40mod skip;
41mod skip_while;
42mod stream;
43mod stream_compat;
44mod stream_ready;
45#[cfg(feature = "std")]
46mod symmetric_hash_join;
47mod take;
48mod take_while;
49#[cfg(test)]
50pub(crate) mod test_utils;
51mod zip;
52mod zip_longest;
53
54#[cfg(feature = "std")]
55#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
56pub use accumulator::{AccumulateAll, Accumulator, Fold, FoldFrom, Reduce, accumulate_all};
57pub use chain::Chain;
58pub use collect::Collect;
59pub use cross_singleton::CrossSingleton;
60pub use empty::Empty;
61pub use enumerate::Enumerate;
62pub use filter::Filter;
63pub use filter_map::FilterMap;
64pub use flat_map::FlatMap;
65pub use flatten::Flatten;
66pub use flatten_stream::FlattenStream;
67pub use for_each::ForEach;
68pub use from_fn::FromFn;
69pub use fuse::Fuse;
70#[cfg(feature = "std")]
71#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
72pub use half_join_state::{HalfJoinState, HalfMultisetJoinState, HalfSetJoinState};
73pub use inspect::Inspect;
74pub use iter::Iter;
75pub use map::Map;
76pub use next::Next;
77pub use once::Once;
78pub use pending::Pending;
79pub use poll_fn::PollFn;
80pub use repeat::Repeat;
81pub use send_push::SendPush;
82pub use send_sink::SendSink;
83pub use skip::Skip;
84pub use skip_while::SkipWhile;
85pub use stream::Stream;
86pub use stream_compat::StreamCompat;
87pub use stream_ready::StreamReady;
88#[cfg(feature = "std")]
89#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
90pub use symmetric_hash_join::{
91 NewTickJoinIter, SymmetricHashJoin, SymmetricHashJoinEither, symmetric_hash_join,
92};
93pub use take::Take;
94pub use take_while::TakeWhile;
95pub use zip::Zip;
96pub use zip_longest::ZipLongest;
97
98/// The result of polling a [`Pull`].
99///
100/// `PullStep` represents the three possible outcomes when pulling from a stream:
101/// - `Ready(item, meta)`: An item is available along with associated metadata.
102/// - `Pending(can_pend)`: No item is available yet, but more may come (async).
103/// - `Ended(can_end)`: The stream has terminated and will produce no more items.
104///
105/// The `CanPend` and `CanEnd` type parameters use [`Toggle`] to statically encode
106/// which variants are possible. When a variant is impossible (e.g., `CanPend = No`),
107/// its payload type becomes [`No`], making it a compile error to construct.
108#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
109pub enum PullStep<Item, Meta, CanPend: Toggle, CanEnd: Toggle> {
110 /// An item is ready with associated metadata.
111 Ready(Item, Meta),
112 /// The pull is not ready yet (only possible when `CanPend = Yes`).
113 Pending(CanPend),
114 /// The pull has ended (only possible when `CanEnd = Yes`).
115 Ended(CanEnd),
116}
117
118impl<Item, Meta, CanPend: Toggle, CanEnd: Toggle> PullStep<Item, Meta, CanPend, CanEnd> {
119 /// Creates a new `PullStep::Ended`, or panics if `CanEnd = No`.
120 pub fn ended() -> Self {
121 PullStep::Ended(Toggle::create())
122 }
123
124 /// Creates a new `PullStep::Pending`, or panics if `CanPend = No`.
125 pub fn pending() -> Self {
126 PullStep::Pending(Toggle::create())
127 }
128
129 /// Returns `true` if the step is a [`PullStep::Ready`].
130 pub const fn is_ready(&self) -> bool {
131 matches!(self, PullStep::Ready(_, _))
132 }
133
134 /// Returns `true` if the step is a [`PullStep::Pending`].
135 pub const fn is_pending(&self) -> bool {
136 matches!(self, PullStep::Pending(_))
137 }
138
139 /// Returns `true` if the step is a [`PullStep::Ended`].
140 pub const fn is_ended(&self) -> bool {
141 matches!(self, PullStep::Ended(_))
142 }
143
144 /// Tries to convert the `CanPend` and `CanEnd` type parameters, returning `None` if the conversion is invalid.
145 pub fn try_convert_into<NewPend: Toggle, NewEnd: Toggle>(
146 self,
147 ) -> Option<PullStep<Item, Meta, NewPend, NewEnd>> {
148 Some(match self {
149 Self::Ready(item, meta) => PullStep::Ready(item, meta),
150 Self::Pending(_) => PullStep::Pending(Toggle::try_create()?),
151 Self::Ended(_) => PullStep::Ended(Toggle::try_create()?),
152 })
153 }
154
155 /// Converts the `CanPend` and `CanEnd` type parameters, panicking if the conversion is invalid.
156 pub fn convert_into<NewPend: Toggle, NewEnd: Toggle>(
157 self,
158 ) -> PullStep<Item, Meta, NewPend, NewEnd> {
159 match self {
160 Self::Ready(item, meta) => PullStep::Ready(item, meta),
161 Self::Pending(_) => PullStep::pending(),
162 Self::Ended(_) => PullStep::ended(),
163 }
164 }
165
166 /// Converts this `PullStep` into a [`Poll`]`<Option<(Item, Meta)>>`.
167 pub fn into_poll(self) -> Poll<Option<(Item, Meta)>> {
168 match self {
169 PullStep::Ready(item, meta) => Poll::Ready(Some((item, meta))),
170 PullStep::Pending(_) => Poll::Pending,
171 PullStep::Ended(_) => Poll::Ready(None),
172 }
173 }
174}
175
176/// The `Pull` trait represents a pull-based stream that can be polled for items.
177///
178/// The `Ctx` type parameter allows operators to be generic over the context type.
179/// Most operators don't use the context and just forward it to their predecessor,
180/// so they can be generic over `Ctx`. Operators that need `std::task::Context`
181/// (like `StreamReady`) will use `Ctx = &mut Context<'_>`.
182///
183/// Setting `Ctx = ()` allows most pull pipelines to be used without any context.
184pub trait Pull {
185 /// The context type required to poll this pull.
186 type Ctx<'ctx>: Context<'ctx>;
187
188 /// The type of items yielded by this pull.
189 type Item;
190 /// The metadata type associated with each item.
191 type Meta: Copy;
192 /// Whether this pull can return [`PullStep::Pending`].
193 type CanPend: Toggle;
194 /// Whether this pull can return [`PullStep::Ended`].
195 type CanEnd: Toggle;
196
197 /// Attempts to pull the next item from this stream.
198 fn pull(
199 self: Pin<&mut Self>,
200 ctx: &mut Self::Ctx<'_>,
201 ) -> PullStep<Self::Item, Self::Meta, Self::CanPend, Self::CanEnd>;
202
203 /// Returns the bounds on the remaining length of the pull.
204 ///
205 /// Specifically, `size_hint()` returns a tuple where the first element
206 /// is the lower bound, and the second element is the upper bound.
207 ///
208 /// The second half of the tuple that is returned is an [`Option`]`<`[`usize`]`>`.
209 /// A [`None`] here means that either there is no known upper bound, or the
210 /// upper bound is larger than [`usize`].
211 ///
212 /// # Implementation notes
213 ///
214 /// It is not enforced that a pull implementation yields the declared
215 /// number of elements. A buggy pull may yield less than the lower bound
216 /// or more than the upper bound of elements.
217 ///
218 /// `size_hint()` is primarily intended to be used for optimizations such as
219 /// reserving space for the elements of the pull, but must not be trusted
220 /// to e.g., omit bounds checks in unsafe code. An incorrect implementation
221 /// of `size_hint()` should not lead to memory safety violations.
222 ///
223 /// That said, the implementation should provide a correct estimation,
224 /// because otherwise it would be a violation of the trait's protocol.
225 ///
226 /// A default implementation should return `(0, None)` which is correct for any
227 /// pull. However this is not provided, to prevent oversight.
228 fn size_hint(&self) -> (usize, Option<usize>);
229
230 /// Borrows this pull, allowing it to be used by reference.
231 fn by_ref(&mut self) -> &mut Self {
232 self
233 }
234
235 /// Takes two pulls and creates a new pull over both in sequence.
236 ///
237 /// `chain()` will return a new pull which will first iterate over
238 /// values from the first pull and then over values from the second pull.
239 ///
240 /// The first pull must be finite (`CanEnd = Yes`) and fused ([`FusedPull`]).
241 fn chain<U>(self, other: U) -> Chain<Self, U>
242 where
243 Self: Sized + FusedPull<CanEnd = Yes>,
244 U: Pull<Item = Self::Item, Meta = Self::Meta>,
245 {
246 Chain::new(self, other)
247 }
248
249 /// Creates a pull which gives the current iteration count as well as the next value.
250 ///
251 /// The pull returned yields pairs `(i, val)`, where `i` is the current index
252 /// of iteration and `val` is the value returned by the pull.
253 fn enumerate(self) -> Enumerate<Self>
254 where
255 Self: Sized,
256 {
257 Enumerate::new(self)
258 }
259
260 /// Creates a pull which uses a closure to determine if an element should be yielded.
261 ///
262 /// Given an element the closure must return `true` or `false`. The returned pull
263 /// will yield only the elements for which the closure returns `true`.
264 fn filter<P>(self, predicate: P) -> Filter<Self, P>
265 where
266 Self: Sized,
267 P: FnMut(&Self::Item) -> bool,
268 {
269 Filter::new(self, predicate)
270 }
271
272 /// Creates a pull that both filters and maps.
273 ///
274 /// The returned pull yields only the values for which the supplied closure
275 /// returns `Some(value)`.
276 fn filter_map<B, F>(self, f: F) -> FilterMap<Self, F>
277 where
278 Self: Sized,
279 F: FnMut(Self::Item) -> Option<B>,
280 {
281 FilterMap::new(self, f)
282 }
283
284 /// Creates a pull that works like map, but flattens nested structure.
285 ///
286 /// The `flat_map()` method is useful when you have a pull of items, and you
287 /// want to apply a function that returns an iterator for each item, then
288 /// flatten all those iterators into a single pull.
289 fn flat_map<U, F>(self, f: F) -> FlatMap<Self, F, U::IntoIter, Self::Meta>
290 where
291 Self: Sized,
292 F: FnMut(Self::Item) -> U,
293 U: IntoIterator,
294 {
295 FlatMap::new(self, f)
296 }
297
298 /// Creates a pull that flattens nested structure.
299 ///
300 /// This is useful when you have a pull of iterables, and you want to
301 /// flatten them into a single pull.
302 fn flatten(self) -> Flatten<Self, <Self::Item as IntoIterator>::IntoIter, Self::Meta>
303 where
304 Self: Sized,
305 Self::Item: IntoIterator,
306 {
307 Flatten::new(self)
308 }
309
310 /// Creates a pull that flattens items that are streams by polling each inner stream.
311 ///
312 /// This is useful when you have a pull of streams, and you want to
313 /// flatten them into a single pull. Requires an async context since
314 /// inner streams are polled.
315 fn flatten_stream(self) -> FlattenStream<Self, Self::Item, Self::Meta>
316 where
317 Self: Sized,
318 Self::Item: futures_core::Stream,
319 {
320 FlattenStream::new(self)
321 }
322
323 /// Creates a future which runs the given function on each element of a pull.
324 fn for_each<F>(self, f: F) -> ForEach<Self, F>
325 where
326 Self: Sized,
327 F: FnMut(Self::Item),
328 {
329 ForEach::new(self, f)
330 }
331
332 /// Creates a future which collects all elements of a pull into a collection.
333 ///
334 /// The collection type `C` must implement `Default` and `Extend<Item>`.
335 fn collect<C>(self) -> Collect<Self, C>
336 where
337 Self: Sized,
338 C: Default + Extend<Self::Item>,
339 {
340 Collect::new(self)
341 }
342
343 /// Creates a pull that ends after the first `None`.
344 ///
345 /// After a pull returns `Ended` for the first time, the behavior of calling
346 /// `pull` again is implementation-defined. `fuse()` adapts any pull,
347 /// ensuring that after `Ended` is given once, it will always return `Ended`
348 /// forever.
349 ///
350 /// Usually this method will simply return `Fuse<Self>`, but it may be
351 /// overridden for optimization.
352 fn fuse(
353 self,
354 ) -> impl for<'ctx> FusedPull<
355 Ctx<'ctx> = Self::Ctx<'ctx>,
356 Item = Self::Item,
357 Meta = Self::Meta,
358 CanPend = Self::CanPend,
359 CanEnd = Self::CanEnd,
360 >
361 where
362 Self: Sized,
363 {
364 Fuse::new(self)
365 }
366
367 /// Does something with each element of a pull, passing the value on.
368 ///
369 /// When using pulls, you'll often chain several of them together.
370 /// While working on such code, you might want to check out what's
371 /// happening at various parts in the pipeline. To do that, insert
372 /// a call to `inspect()`.
373 fn inspect<F>(self, f: F) -> Inspect<Self, F>
374 where
375 Self: Sized,
376 F: FnMut(&Self::Item),
377 {
378 Inspect::new(self, f)
379 }
380
381 /// Takes a closure and creates a pull that calls that closure on each element.
382 ///
383 /// `map()` transforms one pull into another, by means of its argument: something
384 /// that implements `FnMut`. It produces a new pull which calls this closure on
385 /// each element of the original pull.
386 fn map<B, F>(self, f: F) -> Map<Self, F>
387 where
388 Self: Sized,
389 F: FnMut(Self::Item) -> B,
390 {
391 Map::new(self, f)
392 }
393
394 /// Creates a future that pulls all items and sends them into a [`crate::Sink`].
395 fn send_sink<Push>(self, push: Push) -> SendSink<Self, Push>
396 where
397 Self: Sized,
398 Push: futures_sink::Sink<Self::Item>,
399 {
400 SendSink::new(self, push)
401 }
402
403 /// Creates a future that pulls all items and pushes them into a [`crate::push::Push`].
404 fn send_push<Psh>(self, push: Psh) -> SendPush<Self, Psh>
405 where
406 Self: Sized,
407 Psh: crate::push::Push<Self::Item, Self::Meta>,
408 {
409 SendPush::new(self, push)
410 }
411
412 /// Creates a pull that skips the first `n` elements.
413 fn skip(self, n: usize) -> Skip<Self>
414 where
415 Self: Sized,
416 {
417 Skip::new(self, n)
418 }
419
420 /// Creates a pull that skips elements based on a predicate.
421 ///
422 /// `skip_while()` takes a closure as an argument. It will call this closure
423 /// on each element of the pull, and ignore elements until it returns `false`.
424 fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>
425 where
426 Self: Sized,
427 P: FnMut(&Self::Item) -> bool,
428 {
429 SkipWhile::new(self, predicate)
430 }
431
432 /// Creates a pull that yields the first `n` elements, or fewer if the
433 /// underlying pull ends sooner.
434 fn take(self, n: usize) -> Take<Self>
435 where
436 Self: Sized,
437 {
438 Take::new(self, n)
439 }
440
441 /// Creates a pull that yields elements based on a predicate.
442 ///
443 /// `take_while()` takes a closure as an argument. It will call this closure
444 /// on each element of the pull, and yield elements while it returns `true`.
445 fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
446 where
447 Self: Sized,
448 P: FnMut(&Self::Item) -> bool,
449 {
450 TakeWhile::new(self, predicate)
451 }
452
453 /// Zips two pulls together, stopping once either is exhausted.
454 fn zip<U>(self, other: U) -> Zip<Self, U>
455 where
456 Self: Sized,
457 U: Pull<Meta = Self::Meta>,
458 {
459 Zip::new(self, other)
460 }
461
462 /// Zips two pulls together, continuing until both are exhausted.
463 ///
464 /// Unlike a regular zip which ends when either pull ends, `zip_longest`
465 /// continues until both pulls have ended, yielding [`crate::EitherOrBoth`]
466 /// values to indicate which pulls yielded items.
467 ///
468 /// Both pulls must be fused ([`FusedPull`]) to ensure correct behavior
469 /// after one pull ends.
470 fn zip_longest<U>(self, other: U) -> ZipLongest<Self, U>
471 where
472 Self: Sized + FusedPull,
473 U: FusedPull<Meta = Self::Meta>,
474 {
475 ZipLongest::new(self, other)
476 }
477
478 /// Creates a future that resolves with the next item from this pull.
479 ///
480 /// This is the `Pull` equivalent of the `StreamExt::next()` future.
481 fn next(self) -> Next<Self>
482 where
483 Self: Sized,
484 {
485 Next::new(self)
486 }
487
488 /// Crosses each item from this pull with a singleton value from another pull.
489 ///
490 /// The singleton value is obtained from the first item of `singleton_pull` and cached.
491 /// All subsequent items from this pull are paired with this cached singleton value.
492 ///
493 /// If `singleton_pull` ends before yielding any items, the entire combinator ends immediately.
494 fn cross_singleton<SinglePull>(
495 self,
496 singleton_pull: SinglePull,
497 ) -> CrossSingleton<Self, SinglePull, Option<SinglePull::Item>>
498 where
499 Self: Sized,
500 SinglePull: Pull,
501 SinglePull::Item: Clone,
502 {
503 CrossSingleton::new(self, singleton_pull, None)
504 }
505
506 /// [Self::cross_singleton] with external state.
507 fn cross_singleton_state<SinglePull>(
508 self,
509 singleton_pull: SinglePull,
510 singleton_state: &mut Option<SinglePull::Item>,
511 ) -> CrossSingleton<Self, SinglePull, &mut Option<SinglePull::Item>>
512 where
513 Self: Sized,
514 SinglePull: Pull,
515 SinglePull::Item: Clone,
516 {
517 CrossSingleton::new(self, singleton_pull, singleton_state)
518 }
519
520 /// Performs a symmetric hash join with another pull.
521 ///
522 /// Joins items from this pull with items from `rhs` based on a common key.
523 /// Both pulls must yield `(Key, Value)` tuples. The result is a pull of
524 /// `(Key, (V1, V2))` tuples for each matching pair.
525 ///
526 /// The `lhs_state` and `rhs_state` parameters store the join state and must
527 /// implement [`HalfJoinState`].
528 #[cfg(feature = "std")]
529 #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
530 fn symmetric_hash_join<Key, V1, Rhs, V2, LhsState, RhsState>(
531 self,
532 rhs: Rhs,
533 lhs_state: LhsState,
534 rhs_state: RhsState,
535 ) -> SymmetricHashJoin<Self, Rhs, LhsState, RhsState, LhsState, RhsState>
536 where
537 Self: Sized + FusedPull<Item = (Key, V1), Meta = ()>,
538 Key: Eq + std::hash::Hash + Clone,
539 V1: Clone,
540 V2: Clone,
541 Rhs: FusedPull<Item = (Key, V2), Meta = ()>,
542 LhsState: HalfJoinState<Key, V1, V2>,
543 RhsState: HalfJoinState<Key, V2, V1>,
544 {
545 SymmetricHashJoin::new(self, rhs, lhs_state, rhs_state)
546 }
547
548 /// [Self::symmetric_hash_join] with external state.
549 #[cfg(feature = "std")]
550 #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
551 fn symmetric_hash_join_state<'a, Key, V1, Rhs, V2, LhsState, RhsState>(
552 self,
553 rhs: Rhs,
554 lhs_state: &'a mut LhsState,
555 rhs_state: &'a mut RhsState,
556 ) -> SymmetricHashJoin<Self, Rhs, &'a mut LhsState, &'a mut RhsState, LhsState, RhsState>
557 where
558 Self: Sized + FusedPull<Item = (Key, V1), Meta = ()>,
559 Key: Eq + std::hash::Hash + Clone,
560 V1: Clone,
561 V2: Clone,
562 Rhs: FusedPull<Item = (Key, V2), Meta = ()>,
563 LhsState: HalfJoinState<Key, V1, V2>,
564 RhsState: HalfJoinState<Key, V2, V1>,
565 {
566 SymmetricHashJoin::new(self, rhs, lhs_state, rhs_state)
567 }
568}
569
570impl<P> Pull for &mut P
571where
572 P: Pull + Unpin + ?Sized,
573{
574 type Ctx<'ctx> = P::Ctx<'ctx>;
575
576 type Item = P::Item;
577 type Meta = P::Meta;
578 type CanPend = P::CanPend;
579 type CanEnd = P::CanEnd;
580
581 fn pull(
582 mut self: Pin<&mut Self>,
583 ctx: &mut Self::Ctx<'_>,
584 ) -> PullStep<Self::Item, Self::Meta, Self::CanPend, Self::CanEnd> {
585 Pin::new(&mut **self).pull(ctx)
586 }
587
588 fn size_hint(&self) -> (usize, Option<usize>) {
589 (**self).size_hint()
590 }
591}
592
593/// A marker trait for pulls that are "fused".
594///
595/// A fused pull guarantees that once it returns [`PullStep::Ended`], all subsequent
596/// calls to [`Pull::pull`] will also return [`PullStep::Ended`]. This property allows
597/// downstream operators like [`Pull::chain`] to avoid tracking whether
598/// the upstream has ended.
599///
600/// Implementors should ensure this invariant is upheld. The [`Pull::fuse`]
601/// adapter can be used to make any pull fused.
602pub trait FusedPull: Pull {}
603
604impl<P> FusedPull for &mut P where P: FusedPull + Unpin + ?Sized {}
605
606/// Creates a pull from an iterator.
607///
608/// This is the primary way to create a pull from synchronous data.
609/// The resulting pull will never pend and will end when the iterator is exhausted.
610pub fn iter<I: IntoIterator>(iter: I) -> Iter<I::IntoIter> {
611 Iter::new(iter.into_iter())
612}
613
614/// Creates a pull from a `futures::Stream`.
615///
616/// The resulting pull requires `&mut Context<'_>` to be polled and can both
617/// pend and end.
618pub const fn stream<S: futures_core::stream::Stream>(stream: S) -> Stream<S> {
619 Stream::new(stream)
620}
621
622/// Creates a [`StreamCompat`] adapter that wraps a [`Pull`] and implements [`futures_core::stream::Stream`].
623pub const fn stream_compat<Pul: Pull>(pull: Pul) -> StreamCompat<Pul> {
624 StreamCompat::new(pull)
625}
626
627/// Creates a pull from a `futures::Stream` with a custom waker.
628///
629/// This variant uses a provided waker function instead of requiring a context.
630/// When the stream returns `Pending`, this pull treats it as ended (non-blocking).
631pub const fn stream_ready<S>(stream: S, waker: Waker) -> StreamReady<S>
632where
633 S: futures_core::stream::Stream,
634{
635 StreamReady::new(stream, waker)
636}
637
638/// Creates a synchronous pull from a closure.
639///
640/// The closure is called each time the pull is polled and should return a `PullStep`.
641pub fn from_fn<F, Item, Meta, CanEnd>(func: F) -> FromFn<F, Item, Meta, CanEnd>
642where
643 F: FnMut() -> PullStep<Item, Meta, No, CanEnd>,
644 Meta: Copy,
645 CanEnd: Toggle,
646{
647 FromFn::new(func)
648}
649
650/// Creates an asynchronous or synchronous pull from a closure.
651///
652/// The closure is called each time the pull is polled and should return a `PullStep`.
653pub fn poll_fn<F, Item, Meta, CanPend, CanEnd>(func: F) -> PollFn<F, Item, Meta, CanPend, CanEnd>
654where
655 F: FnMut(&mut core::task::Context<'_>) -> PullStep<Item, Meta, CanPend, CanEnd>,
656 Meta: Copy,
657 CanPend: Toggle,
658 CanEnd: Toggle,
659{
660 PollFn::new(func)
661}
662
663/// Creates an empty pull that immediately ends.
664pub fn empty<Item>() -> Empty<Item> {
665 Empty::default()
666}
667
668/// Creates a pull that yields a single item.
669pub const fn once<Item>(item: Item) -> Once<Item> {
670 Once::new(item)
671}
672
673/// Creates a pull that yields clones of the given item forever.
674pub const fn repeat<Item>(item: Item) -> Repeat<Item>
675where
676 Item: Clone,
677{
678 Repeat::new(item)
679}
680
681/// Creates a pull that is always pending and never yields items or ends.
682pub const fn pending<Item>() -> Pending<Item> {
683 Pending::new()
684}
685
686/// A macro to override [`Pull::fuse`] for pulls that are already fused.
687///
688/// This macro should be used in the `impl` block for a pull type that is already fused.
689/// It provides a default implementation of `fuse` that simply returns `self`.
690macro_rules! fuse_self {
691 () => {
692 fn fuse(
693 self,
694 ) -> impl for<'ctx> FusedPull<
695 Ctx<'ctx> = Self::Ctx<'ctx>,
696 Item = Self::Item,
697 Meta = Self::Meta,
698 CanPend = Self::CanPend,
699 CanEnd = Self::CanEnd,
700 >
701 where
702 Self: Sized,
703 {
704 self
705 }
706 };
707}
708use fuse_self;