Skip to main content

dfir_pipes/pull/half_join_state/
multiset.rs

1use std::borrow::Cow;
2use std::collections::VecDeque;
3use std::collections::hash_map::Entry;
4
5use rustc_hash::FxHashMap;
6use smallvec::{SmallVec, smallvec};
7
8use super::HalfJoinState;
9
10/// [`HalfJoinState`] with multiset semantics.
11///
12/// Allows duplicate key-value pairs to be stored.
13#[derive(Debug)]
14pub struct HalfMultisetJoinState<Key, ValBuild, ValProbe> {
15    // Here a smallvec with inline storage of 1 is chosen.
16    // The rationale for this decision is that, I speculate, that joins possibly have a bimodal distribution with regards to how much key contention they have.
17    // That is, I think that there are many joins that have 1 value per key on LHS/RHS, and there are also a large category of joins that have multiple values per key.
18    // For the category of joins that have multiple values per key, it's not clear why they would only have 2, 3, 4, or N specific number of values per key. So there's no good number to set the smallvec storage to.
19    // Instead we can just focus on the first group of joins that have 1 value per key and get benefit there without hurting the other group too much with excessive memory usage.
20    /// Table to probe, vec val contains all matches.
21    table: FxHashMap<Key, SmallVec<[ValBuild; 1]>>,
22    /// Not-yet emitted matches.
23    current_matches: VecDeque<(Key, ValProbe, ValBuild)>,
24    len: usize,
25}
26
27impl<Key, ValBuild, ValProbe> Default for HalfMultisetJoinState<Key, ValBuild, ValProbe> {
28    fn default() -> Self {
29        Self {
30            table: FxHashMap::default(),
31            current_matches: VecDeque::default(),
32            len: 0,
33        }
34    }
35}
36
37impl<Key, ValBuild, ValProbe> HalfJoinState<Key, ValBuild, ValProbe>
38    for HalfMultisetJoinState<Key, ValBuild, ValProbe>
39where
40    Key: Clone + Eq + std::hash::Hash,
41    ValBuild: Clone,
42    ValProbe: Clone,
43{
44    fn build(&mut self, k: Key, v: Cow<'_, ValBuild>) -> bool {
45        let entry = self.table.entry(k);
46
47        match entry {
48            Entry::Occupied(mut e) => {
49                e.get_mut().push(v.into_owned());
50                self.len += 1;
51            }
52            Entry::Vacant(e) => {
53                e.insert(smallvec![v.into_owned()]);
54                self.len += 1;
55            }
56        };
57
58        true
59    }
60
61    fn probe(&mut self, k: &Key, v: &ValProbe) -> Option<(Key, ValProbe, ValBuild)> {
62        // TODO: We currently don't free/shrink the self.current_matches vecdeque to save time.
63        // This mean it will grow to eventually become the largest number of matches in a single probe call.
64        // Maybe we should clear this memory at the beginning of every tick/periodically?
65        let mut iter = self
66            .table
67            .get(k)?
68            .iter()
69            .map(|valbuild| (k.clone(), v.clone(), valbuild.clone()));
70
71        let first = iter.next();
72        self.current_matches.extend(iter);
73        first
74    }
75
76    fn full_probe(&self, k: &Key) -> std::slice::Iter<'_, ValBuild> {
77        self.table.get(k).map(|sv| sv.iter()).unwrap_or_default()
78    }
79
80    fn pop_match(&mut self) -> Option<(Key, ValProbe, ValBuild)> {
81        self.current_matches.pop_front()
82    }
83
84    fn len(&self) -> usize {
85        self.len
86    }
87
88    fn iter(&self) -> std::collections::hash_map::Iter<'_, Key, SmallVec<[ValBuild; 1]>> {
89        #[expect(clippy::disallowed_methods, reason = "FxHasher is deterministic")]
90        self.table.iter()
91    }
92
93    fn clear(&mut self) {
94        self.table.clear();
95        self.current_matches.clear();
96        self.len = 0;
97    }
98}