From 67e6b43a30634ab5d797380296a4aeeacd2aef9c Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Thu, 24 Oct 2024 17:35:35 -0700 Subject: [PATCH] feat: Add non-unique index join iterator --- crates/execution/src/iter.rs | 441 +++++++++++++++++++---------------- 1 file changed, 236 insertions(+), 205 deletions(-) diff --git a/crates/execution/src/iter.rs b/crates/execution/src/iter.rs index cd196f2c21b..ba169bb4195 100644 --- a/crates/execution/src/iter.rs +++ b/crates/execution/src/iter.rs @@ -3,7 +3,7 @@ use std::borrow::Cow; use spacetimedb_lib::{AlgebraicValue, ProductValue}; use spacetimedb_table::{ blob_store::BlobStore, - btree_index::BTreeIndex, + btree_index::{BTreeIndex, BTreeIndexRangeIter}, static_assert_size, table::{IndexScanIter, RowRef, Table, TableScanIter}, }; @@ -15,9 +15,9 @@ pub enum Row<'a> { Ref(&'a ProductValue), } -impl<'a> Row<'a> { +impl Row<'_> { /// Expect a pointer value, panic otherwise - pub fn expect_ptr(&self) -> &'a RowRef { + pub fn expect_ptr(&self) -> &RowRef { match self { Self::Ptr(ptr) => ptr, _ => unreachable!(), @@ -25,7 +25,7 @@ impl<'a> Row<'a> { } /// Expect a product value, panic otherwise - pub fn expect_ref(&self) -> &'a ProductValue { + pub fn expect_ref(&self) -> &ProductValue { match self { Self::Ref(r) => r, _ => unreachable!(), @@ -46,9 +46,9 @@ pub enum Tuple<'a> { static_assert_size!(Tuple, 40); -impl<'a> Tuple<'a> { +impl Tuple<'_> { /// Expect a row from a base table, panic otherwise - pub fn expect_row(&self) -> &'a Row { + pub fn expect_row(&self) -> &Row { match self { Self::Row(row) => row, _ => unreachable!(), @@ -56,7 +56,7 @@ impl<'a> Tuple<'a> { } /// Expect a temporary tuple, panic otherwise - pub fn expect_join(&'a self) -> &'a [Row<'a>] { + pub fn expect_join(&self) -> &[Row] { match self { Self::Join(elems) => elems.as_slice(), _ => unreachable!(), @@ -76,23 +76,11 @@ pub enum Iter<'a> { IndexScan(IndexScanIter<'a>), /// A cross product iterator CrossJoin(CrossJoinIter<'a>), - /// A unique single column index join iterator - UniqueIxJoin(UniqueIxJoin<'a, ProjEvaluator>), - /// A unique multi-column index join iterator - UniqueMultiColIxJoin(UniqueIxJoin<'a, MultiColProjEvaluator<'a>>), - /// A unique single column index semijoin iterator. - /// Returns tuples from the streaming side (lhs). - UniqueIxSemiLhs(UniqueIxSemiLhs<'a, ProjEvaluator>), - /// A unique multi-column index semijoin iterator. - /// Returns tuples from the streaming side (lhs). - UniqueMultiColIxSemiLhs(UniqueIxSemiLhs<'a, MultiColProjEvaluator<'a>>), - /// A unique single column index semijoin iterator. - /// Returns [RowRef]s from the index (rhs). - UniqueIxSemiRhs(UniqueIxSemiRhs<'a, ProjEvaluator>), - /// A unique multi-column index semijoin iterator. - /// Returns [RowRef]s from the index (rhs). - UniqueMultiColIxSemiRhs(UniqueIxSemiRhs<'a, MultiColProjEvaluator<'a>>), - /// A tuple at a time filter + /// A non-unique (constraint) index join iterator + IxJoin(IxJoin, IndexJoin<'a, Concat<'a>>>), + /// A unique (constraint) index join iterator + UniqueIxJoin(IxJoin, UniqueIndexJoin<'a, Concat<'a>>>), + /// A tuple at a time filter iterator Filter(Filter<'a>), } @@ -109,6 +97,20 @@ impl<'a> Iterator for Iter<'a> { // Index scans return row ids iter.next().map(Row::Ptr).map(Tuple::Row) } + Self::IxJoin(iter) => { + // Index joins return tuples. + // Index semijoins return row ids. + iter.next() + } + Self::UniqueIxJoin(iter) => { + // Index joins return tuples. + // Index semijoins return row ids. + iter.next() + } + Self::Filter(iter) => { + // Filter is a passthru + iter.next() + } Self::CrossJoin(iter) => { iter.next().map(|t| { match t { @@ -116,7 +118,10 @@ impl<'a> Iterator for Iter<'a> { // x // / \ // a b - (Tuple::Row(u), Tuple::Row(v)) => Tuple::Join(vec![u, v]), + (Tuple::Row(u), Tuple::Row(v)) => { + // Returns a 2-tuple + Tuple::Join(vec![u, v]) + } // A right deep join // x // / \ @@ -124,6 +129,8 @@ impl<'a> Iterator for Iter<'a> { // / \ // b c (Tuple::Row(r), Tuple::Join(mut rows)) => { + // Returns (n+1)-tuples, + // if the rhs returns n-tuples. let mut pointers = vec![r]; pointers.append(&mut rows); Tuple::Join(pointers) @@ -135,6 +142,8 @@ impl<'a> Iterator for Iter<'a> { // / \ // a b (Tuple::Join(mut rows), Tuple::Row(r)) => { + // Returns (n+1)-tuples, + // if the lhs returns n-tuples. rows.push(r); Tuple::Join(rows) } @@ -145,74 +154,15 @@ impl<'a> Iterator for Iter<'a> { // / \ / \ // a b c d (Tuple::Join(mut lhs), Tuple::Join(mut rhs)) => { + // Returns (n+m)-tuples, + // if the lhs returns n-tuples, + // if the rhs returns m-tuples. lhs.append(&mut rhs); Tuple::Join(lhs) } } }) } - Self::UniqueIxJoin(iter) => { - iter.next().map(|t| { - match t { - // A leaf join - // x - // / \ - // a b - (Tuple::Row(u), ptr) => Tuple::Join(vec![u, Row::Ptr(ptr)]), - // A left deep join - // x - // / \ - // x c - // / \ - // a b - (Tuple::Join(mut rows), ptr) => { - rows.push(Row::Ptr(ptr)); - Tuple::Join(rows) - } - } - }) - } - Self::UniqueMultiColIxJoin(iter) => { - iter.next().map(|t| { - match t { - // A leaf join - // x - // / \ - // a b - (Tuple::Row(u), ptr) => Tuple::Join(vec![u, Row::Ptr(ptr)]), - // A left deep join - // x - // / \ - // x c - // / \ - // a b - (Tuple::Join(mut rows), ptr) => { - rows.push(Row::Ptr(ptr)); - Tuple::Join(rows) - } - } - }) - } - Self::UniqueIxSemiLhs(iter) => { - // Left index semijoins return tuples from the lhs - iter.next() - } - Self::UniqueIxSemiRhs(iter) => { - // Right index semijions return row ids from the index - iter.next().map(|ptr| Tuple::Row(Row::Ptr(ptr))) - } - Self::UniqueMultiColIxSemiLhs(iter) => { - // Left index semijoins return tuples from the lhs - iter.next() - } - Self::UniqueMultiColIxSemiRhs(iter) => { - // Right index semijions return row ids from the index - iter.next().map(|ptr| Tuple::Row(Row::Ptr(ptr))) - } - Self::Filter(iter) => { - // Filter is a passthru - iter.next() - } } } } @@ -254,120 +204,10 @@ impl<'a> Iterator for CrossJoinIter<'a> { } } -pub trait TupleProjector { +pub trait FieldProject { fn eval<'a>(&self, tuple: &'a Tuple) -> Cow<'a, AlgebraicValue>; } -/// A unique index join has the same signature as that of a cross join. -/// It is an index join where the index is a unique index. -/// A primary key index is one such example. -pub struct UniqueIxJoin<'a, P> { - /// The lhs of the join - input: Box>, - /// The rhs index - index: &'a BTreeIndex, - /// A handle to the datastore - table: &'a Table, - /// A handle to the blobstore - blob_store: &'a dyn BlobStore, - /// The lhs column projector - projection: P, -} - -impl<'a, P> Iterator for UniqueIxJoin<'a, P> -where - P: TupleProjector, -{ - type Item = (Tuple<'a>, RowRef<'a>); - - fn next(&mut self) -> Option { - self.input.find_map(|tuple| { - self.index - .seek(self.projection.eval(&tuple).as_ref()) - .next() - .and_then(|ptr| self.table.get_row_ref(self.blob_store, ptr)) - .map(|ptr| (tuple, ptr)) - }) - } -} - -/// This iterator implements a unique index join, -/// followed by a projection of the lhs. -pub struct UniqueIxSemiLhs<'a, P> { - /// The lhs of the join - input: Box>, - /// The rhs index - index: &'a BTreeIndex, - /// The lhs column projector - projection: P, -} - -impl<'a, P> Iterator for UniqueIxSemiLhs<'a, P> -where - P: TupleProjector, -{ - type Item = Tuple<'a>; - - fn next(&mut self) -> Option { - self.input - .find(|tuple| self.index.contains_any(self.projection.eval(tuple).as_ref())) - } -} - -/// This iterator implements a unique index join, -/// followed by a projection of the rhs. -pub struct UniqueIxSemiRhs<'a, P> { - /// The lhs of the join - input: Box>, - /// The rhs index - index: &'a BTreeIndex, - /// A handle to the datastore - table: &'a Table, - /// A handle to the blobstore - blob_store: &'a dyn BlobStore, - /// The lhs column projector - projection: P, -} - -impl<'a, P> Iterator for UniqueIxSemiRhs<'a, P> -where - P: TupleProjector, -{ - type Item = RowRef<'a>; - - fn next(&mut self) -> Option { - self.input.find_map(|tuple| { - self.index - .seek(self.projection.eval(&tuple).as_ref()) - .next() - .and_then(|ptr| self.table.get_row_ref(self.blob_store, ptr)) - }) - } -} - -/// A tuple at a time filter iterator -pub struct Filter<'a> { - input: Box>, - predicate: ExprProgram<'a>, -} - -impl<'a> Iterator for Filter<'a> { - type Item = Tuple<'a>; - - fn next(&mut self) -> Option { - self.input.find(|tuple| { - ExprEvaluator { - val_stack: vec![], - row_stack: vec![], - } - .eval(&self.predicate, tuple) - .as_bool() - .copied() - .unwrap_or(false) - }) - } -} - /// An opcode for a tuple projection operation #[derive(Clone, Copy)] pub enum ProjOpCode { @@ -383,12 +223,12 @@ pub enum ProjOpCode { static_assert_size!(ProjOpCode, 4); -/// A single column projection evaluator -pub struct ProjEvaluator { +/// A single field/column projection evaluator +pub struct Project { op: ProjOpCode, } -impl TupleProjector for ProjEvaluator { +impl FieldProject for Project { fn eval<'a>(&self, tuple: &'a Tuple) -> Cow<'a, AlgebraicValue> { match self.op { ProjOpCode::Ptr(i) => tuple @@ -425,18 +265,19 @@ impl TupleProjector for ProjEvaluator { } } -/// A multi-column projection evaluator -pub struct MultiColProjEvaluator<'a> { +/// A multi-column projection evaluator. +/// It concatenates a sequence of field projections. +pub struct Concat<'a> { ops: &'a [ProjOpCode], } -impl TupleProjector for MultiColProjEvaluator<'_> { +impl FieldProject for Concat<'_> { fn eval<'a>(&self, tuple: &'a Tuple) -> Cow<'a, AlgebraicValue> { Cow::Owned(AlgebraicValue::Product(ProductValue::from_iter( self.ops .iter() .copied() - .map(|op| ProjEvaluator { op }) + .map(|op| Project { op }) .map(|evaluator| evaluator.eval(tuple)) .map(|v| v.into_owned()), ))) @@ -567,3 +408,193 @@ impl<'a> ExprEvaluator<'a> { self.val_stack.pop().unwrap() } } + +/// A tuple at a time filter iterator +pub struct Filter<'a> { + input: Box>, + predicate: ExprProgram<'a>, +} + +impl<'a> Iterator for Filter<'a> { + type Item = Tuple<'a>; + + fn next(&mut self) -> Option { + self.input.find(|tuple| { + ExprEvaluator { + val_stack: vec![], + row_stack: vec![], + } + .eval(&self.predicate, tuple) + .as_bool() + .copied() + .unwrap_or(false) + }) + } +} + +/// An iterator for a unique (constraint) index join +pub enum IxJoin { + /// A left semijoin with single column index. + /// Returns tuples from the streaming side (lhs). + SemiLhs(SingleCol), + /// A right semijoin with single column index. + /// Returns rows from the index side (rhs). + SemiRhs(SingleCol), + /// A left semijoin with multi-column index. + /// Returns tuples from the streaming side (rhs). + MultiColSemiLhs(MultiCol), + /// A right semijoin with multi-column index. + /// Returns rows from the index side (rhs). + MultiColSemiRhs(MultiCol), + /// A multi-column index. + /// If the lhs iterator returns n-tuples, + /// this iterator returns (n+1)-tuples. + MultiCol(MultiCol), + /// A single column index. + /// If the lhs iterator returns n-tuples, + /// this iterator returns (n+1)-tuples. + Eq(SingleCol), +} + +impl<'a, P, Q> Iterator for IxJoin +where + P: Iterator, RowRef<'a>)>, + Q: Iterator, RowRef<'a>)>, +{ + type Item = Tuple<'a>; + + fn next(&mut self) -> Option { + let proj_left_deep_join = |(tuple, ptr)| { + match (tuple, ptr) { + // A leaf join + // x + // / \ + // a b + (Tuple::Row(u), ptr) => { + // Returns a 2-tuple + Tuple::Join(vec![u, Row::Ptr(ptr)]) + } + // A left deep join + // x + // / \ + // x c + // / \ + // a b + (Tuple::Join(mut rows), ptr) => { + // Returns an n+1 tuple + rows.push(Row::Ptr(ptr)); + Tuple::Join(rows) + } + } + }; + match self { + Self::SemiLhs(iter) => { + // A left semijoin + iter.next().map(|(t, _)| t) + } + Self::SemiRhs(iter) => { + // A right semijoin + iter.next().map(|(_, ptr)| ptr).map(Row::Ptr).map(Tuple::Row) + } + Self::MultiColSemiLhs(iter) => { + // A left semijoin + iter.next().map(|(t, _)| t) + } + Self::MultiColSemiRhs(iter) => { + // A right semijoin + iter.next().map(|(_, ptr)| ptr).map(Row::Ptr).map(Tuple::Row) + } + Self::MultiCol(iter) => { + // Appends the rhs to the lhs + iter.next().map(proj_left_deep_join) + } + Self::Eq(iter) => { + // Appends the rhs to the lhs + iter.next().map(proj_left_deep_join) + } + } + } +} + +/// A unique (constraint) index join iterator +pub struct UniqueIndexJoin<'a, FieldProject> { + /// The lhs of the join + input: Box>, + /// The rhs index + index: &'a BTreeIndex, + /// A handle to the datastore + table: &'a Table, + /// A handle to the blobstore + blob_store: &'a dyn BlobStore, + /// The lhs index key projection + projection: FieldProject, +} + +impl<'a, P> Iterator for UniqueIndexJoin<'a, P> +where + P: FieldProject, +{ + type Item = (Tuple<'a>, RowRef<'a>); + + fn next(&mut self) -> Option { + self.input.find_map(|tuple| { + self.index + .seek(self.projection.eval(&tuple).as_ref()) + .next() + .and_then(|ptr| self.table.get_row_ref(self.blob_store, ptr)) + .map(|ptr| (tuple, ptr)) + }) + } +} + +/// A non-unique (constraint) index join iterator +pub struct IndexJoin<'a, FieldProject> { + /// The lhs of the join + input: Box>, + /// The current tuple from the lhs + tuple: Option>, + /// The rhs index + index: &'a BTreeIndex, + /// The current cursor for the rhs index + index_cursor: Option>, + /// A handle to the datastore + table: &'a Table, + /// A handle to the blobstore + blob_store: &'a dyn BlobStore, + /// The lhs index key projection + projection: FieldProject, +} + +impl<'a, P> Iterator for IndexJoin<'a, P> +where + P: FieldProject, +{ + type Item = (Tuple<'a>, RowRef<'a>); + + fn next(&mut self) -> Option { + self.tuple + .as_ref() + .and_then(|tuple| { + self.index_cursor.as_mut().and_then(|cursor| { + cursor.next().and_then(|ptr| { + self.table + .get_row_ref(self.blob_store, ptr) + .map(|ptr| (tuple.clone(), ptr)) + }) + }) + }) + .or_else(|| { + self.input.find_map(|tuple| { + Some(self.index.seek(self.projection.eval(&tuple).as_ref())).and_then(|mut cursor| { + cursor.next().and_then(|ptr| { + self.table.get_row_ref(self.blob_store, ptr).map(|ptr| { + self.tuple = Some(tuple.clone()); + self.index_cursor = Some(cursor); + (tuple, ptr) + }) + }) + }) + }) + }) + } +}