Skip to main content

dfir_pipes/
lib.rs

1//! Pull and push-based stream combinators for dataflow pipelines.
2//!
3//! This crate provides a [`pull::Pull`] trait and a [`push::Push`] trait, along with collections
4//! of composable operators for building pull-based and push-based data pipelines.
5//! Operators are chained via trait methods on [`pull::Pull`] (same as iterator adapters) or
6//! module functions in [`push`].
7#![no_std]
8#![cfg_attr(nightly, feature(extend_one))]
9#![cfg_attr(docsrs, feature(doc_cfg))]
10#![warn(missing_docs, clippy::missing_const_for_fn)]
11
12#[cfg(any(test, feature = "alloc"))]
13extern crate alloc;
14#[cfg(any(test, feature = "std"))]
15extern crate std;
16
17/// Type-level `false` for [`Toggle`].
18///
19/// Indicates that a capability is absent (e.g., the pull cannot pend or cannot end).
20///
21/// A type alias for `core::convert::Infallible`, representing a type that can never be constructed.
22///
23/// Used in `Step` variants that are statically impossible (e.g., `Pending` when `CanPend = No`).
24pub use core::convert::Infallible as No;
25
26pub use futures_core::stream::{FusedStream, Stream};
27pub use futures_sink::Sink;
28pub use itertools::{self, Either, EitherOrBoth};
29use sealed::sealed;
30
31/// Pull-based stream combinators.
32pub mod pull;
33
34/// Push-based stream combinators.
35pub mod push;
36
37/// A sealed trait for type-level booleans used to track pull capabilities.
38///
39/// `Toggle` is used to statically encode whether a pull can pend (`CanPend`) or end (`CanEnd`).
40/// This enables compile-time guarantees about pull behavior and allows the type system to
41/// optimize away impossible code paths.
42#[sealed]
43pub trait Toggle: Sized {
44    /// Attempts to create this type, returning `Err(())` if `Self` is `No`.
45    fn try_create() -> Option<Self>;
46
47    /// Attempts to create this type, panicking if `Self` is `No`.
48    fn create() -> Self {
49        Self::try_create().unwrap()
50    }
51
52    /// The result of OR-ing two toggles. `Yes.or(T) = Yes`, `No.or(T) = T`.
53    type Or<T: Toggle>: Toggle;
54    /// The result of AND-ing two toggles. `Yes.and(T) = T`, `No.and(T) = No`.
55    type And<T: Toggle>: Toggle;
56}
57
58/// Type-level `true` for [`Toggle`].
59///
60/// Indicates that a capability is present (e.g., the pull can pend or can end).
61#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
62pub struct Yes;
63
64#[sealed]
65impl Toggle for Yes {
66    fn try_create() -> Option<Self> {
67        Some(Yes)
68    }
69
70    type Or<T: Toggle> = Yes;
71    type And<T: Toggle> = T;
72}
73
74#[sealed]
75impl Toggle for No {
76    fn try_create() -> Option<Self> {
77        None
78    }
79
80    type Or<T: Toggle> = T;
81    type And<T: Toggle> = No;
82}
83
84const fn mut_unit<'a>() -> &'a mut () {
85    // SAFETY: `UNIT` is a zero-sized type (ZST), so its pointer cannot dangle.
86    // https://doc.rust-lang.org/reference/behavior-considered-undefined.html#r-undefined.dangling.zero-size
87    unsafe { core::ptr::NonNull::dangling().as_mut() }
88}
89
90/// Context trait for pull-based streams, allowing operators to be generic over
91/// synchronous (`()`) and asynchronous ([`core::task::Context`]) execution contexts.
92#[sealed]
93pub trait Context<'ctx>: Sized {
94    /// The merged context type when combining two pulls.
95    type Merged<Other: Context<'ctx>>: Context<'ctx>;
96
97    /// Creates a context reference from a [`core::task::Context`].
98    fn from_task<'s>(task_ctx: &'s mut core::task::Context<'ctx>) -> &'s mut Self;
99
100    /// Extracts the self-side context from a merged context.
101    fn unmerge_self<'s, Other: Context<'ctx>>(merged: &'s mut Self::Merged<Other>) -> &'s mut Self;
102    /// Extracts the other-side context from a merged context.
103    fn unmerge_other<'s, Other: Context<'ctx>>(
104        merged: &'s mut Self::Merged<Other>,
105    ) -> &'s mut Other;
106}
107
108#[sealed]
109impl<'ctx> Context<'ctx> for () {
110    type Merged<Other: Context<'ctx>> = Other;
111
112    fn from_task<'s>(_task_ctx: &'s mut core::task::Context<'ctx>) -> &'s mut Self {
113        mut_unit()
114    }
115
116    fn unmerge_self<'s, Other: Context<'ctx>>(
117        _merged: &'s mut Self::Merged<Other>,
118    ) -> &'s mut Self {
119        mut_unit()
120    }
121    fn unmerge_other<'s, Other: Context<'ctx>>(
122        merged: &'s mut Self::Merged<Other>,
123    ) -> &'s mut Other {
124        merged
125    }
126}
127
128#[sealed]
129impl<'ctx> Context<'ctx> for core::task::Context<'ctx> {
130    type Merged<Other: Context<'ctx>> = core::task::Context<'ctx>;
131
132    fn from_task<'s>(task_ctx: &'s mut core::task::Context<'ctx>) -> &'s mut Self {
133        task_ctx
134    }
135
136    fn unmerge_self<'s, Other: Context<'ctx>>(merged: &'s mut Self::Merged<Other>) -> &'s mut Self {
137        merged
138    }
139    fn unmerge_other<'s, Other: Context<'ctx>>(
140        merged: &'s mut Self::Merged<Other>,
141    ) -> &'s mut Other {
142        Other::from_task(merged)
143    }
144}