From 3fd08ee9bf8b4be1d5c4bcf09638ff1fbf6e6bca Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Thu, 24 Oct 2024 17:35:35 -0700 Subject: [PATCH] feat: Add non-unique index join iterator --- crates/execution/Cargo.toml | 2 +- crates/execution/src/iter.rs | 363 +++++++++++++++++++---------------- 2 files changed, 197 insertions(+), 168 deletions(-) diff --git a/crates/execution/Cargo.toml b/crates/execution/Cargo.toml index abc34e24fab..d6329a0b3dd 100644 --- a/crates/execution/Cargo.toml +++ b/crates/execution/Cargo.toml @@ -7,6 +7,6 @@ license-file = "LICENSE" description = "The SpacetimeDB query engine" [dependencies] -spacetimedb-lib.workspace = true spacetimedb-expr.workspace = true +spacetimedb-lib.workspace = true spacetimedb-table.workspace = true diff --git a/crates/execution/src/iter.rs b/crates/execution/src/iter.rs index cd196f2c21b..f252027a458 100644 --- a/crates/execution/src/iter.rs +++ b/crates/execution/src/iter.rs @@ -3,7 +3,7 @@ use std::borrow::Cow; use spacetimedb_lib::{AlgebraicValue, ProductValue}; use spacetimedb_table::{ blob_store::BlobStore, - btree_index::BTreeIndex, + btree_index::{BTreeIndex, BTreeIndexRangeIter}, static_assert_size, table::{IndexScanIter, RowRef, Table, TableScanIter}, }; @@ -15,9 +15,9 @@ pub enum Row<'a> { Ref(&'a ProductValue), } -impl<'a> Row<'a> { +impl Row<'_> { /// Expect a pointer value, panic otherwise - pub fn expect_ptr(&self) -> &'a RowRef { + pub fn expect_ptr(&self) -> &RowRef { match self { Self::Ptr(ptr) => ptr, _ => unreachable!(), @@ -25,7 +25,7 @@ impl<'a> Row<'a> { } /// Expect a product value, panic otherwise - pub fn expect_ref(&self) -> &'a ProductValue { + pub fn expect_ref(&self) -> &ProductValue { match self { Self::Ref(r) => r, _ => unreachable!(), @@ -46,9 +46,9 @@ pub enum Tuple<'a> { static_assert_size!(Tuple, 40); -impl<'a> Tuple<'a> { +impl Tuple<'_> { /// Expect a row from a base table, panic otherwise - pub fn expect_row(&self) -> &'a Row { + pub fn expect_row(&self) -> &Row { 
match self { Self::Row(row) => row, _ => unreachable!(), @@ -56,7 +56,7 @@ impl<'a> Tuple<'a> { } /// Expect a temporary tuple, panic otherwise - pub fn expect_join(&'a self) -> &'a [Row<'a>] { + pub fn expect_join(&self) -> &[Row] { match self { Self::Join(elems) => elems.as_slice(), _ => unreachable!(), @@ -76,23 +76,11 @@ pub enum Iter<'a> { IndexScan(IndexScanIter<'a>), /// A cross product iterator CrossJoin(CrossJoinIter<'a>), - /// A unique single column index join iterator - UniqueIxJoin(UniqueIxJoin<'a, ProjEvaluator>), - /// A unique multi-column index join iterator - UniqueMultiColIxJoin(UniqueIxJoin<'a, MultiColProjEvaluator<'a>>), - /// A unique single column index semijoin iterator. - /// Returns tuples from the streaming side (lhs). - UniqueIxSemiLhs(UniqueIxSemiLhs<'a, ProjEvaluator>), - /// A unique multi-column index semijoin iterator. - /// Returns tuples from the streaming side (lhs). - UniqueMultiColIxSemiLhs(UniqueIxSemiLhs<'a, MultiColProjEvaluator<'a>>), - /// A unique single column index semijoin iterator. - /// Returns [RowRef]s from the index (rhs). - UniqueIxSemiRhs(UniqueIxSemiRhs<'a, ProjEvaluator>), - /// A unique multi-column index semijoin iterator. - /// Returns [RowRef]s from the index (rhs). 
- UniqueMultiColIxSemiRhs(UniqueIxSemiRhs<'a, MultiColProjEvaluator<'a>>), - /// A tuple at a time filter + /// A non-unique (constraint) index join iterator + IxJoin(IxJoin<IndexJoin<'a, Project>, IndexJoin<'a, Concat<'a>>>), + /// A unique (constraint) index join iterator + UniqueIxJoin(IxJoin<UniqueIndexJoin<'a, Project>, UniqueIndexJoin<'a, Concat<'a>>>), + /// A tuple at a time filter iterator Filter(Filter<'a>), } @@ -102,13 +90,25 @@ impl<'a> Iterator for Iter<'a> { fn next(&mut self) -> Option<Self::Item> { match self { Self::TableScan(iter) => { - // Table scans return row ids + // Returns row ids iter.next().map(Row::Ptr).map(Tuple::Row) } Self::IndexScan(iter) => { - // Index scans return row ids + // Returns row ids iter.next().map(Row::Ptr).map(Tuple::Row) } + Self::IxJoin(iter) => { + // Returns row ids for semijoins, (n+1)-tuples otherwise + iter.next() + } + Self::UniqueIxJoin(iter) => { + // Returns row ids for semijoins, (n+1)-tuples otherwise + iter.next() + } + Self::Filter(iter) => { + // Filter is a passthru + iter.next() + } Self::CrossJoin(iter) => { iter.next().map(|t| { match t { @@ -116,7 +116,10 @@ impl<'a> Iterator for Iter<'a> { // x // / \ // a b - (Tuple::Row(u), Tuple::Row(v)) => Tuple::Join(vec![u, v]), + (Tuple::Row(u), Tuple::Row(v)) => { + // Returns a 2-tuple + Tuple::Join(vec![u, v]) + } // A right deep join // x // / \ @@ -124,6 +127,8 @@ impl<'a> Iterator for Iter<'a> { // / \ // b c (Tuple::Row(r), Tuple::Join(mut rows)) => { + // Returns (n+1)-tuples, + // if the rhs returns n-tuples. let mut pointers = vec![r]; pointers.append(&mut rows); Tuple::Join(pointers) @@ -135,6 +140,8 @@ impl<'a> Iterator for Iter<'a> { // / \ // a b (Tuple::Join(mut rows), Tuple::Row(r)) => { + // Returns (n+1)-tuples, + // if the lhs returns n-tuples. rows.push(r); Tuple::Join(rows) } @@ -145,123 +152,109 @@ impl<'a> Iterator for Iter<'a> { // / \ / \ // a b c d (Tuple::Join(mut lhs), Tuple::Join(mut rhs)) => { + // Returns (n+m)-tuples, + // if the lhs returns n-tuples, + // if the rhs returns m-tuples. 
lhs.append(&mut rhs); Tuple::Join(lhs) } } }) } - Self::UniqueIxJoin(iter) => { - iter.next().map(|t| { - match t { - // A leaf join - // x - // / \ - // a b - (Tuple::Row(u), ptr) => Tuple::Join(vec![u, Row::Ptr(ptr)]), - // A left deep join - // x - // / \ - // x c - // / \ - // a b - (Tuple::Join(mut rows), ptr) => { - rows.push(Row::Ptr(ptr)); - Tuple::Join(rows) - } - } - }) - } - Self::UniqueMultiColIxJoin(iter) => { - iter.next().map(|t| { - match t { - // A leaf join - // x - // / \ - // a b - (Tuple::Row(u), ptr) => Tuple::Join(vec![u, Row::Ptr(ptr)]), - // A left deep join - // x - // / \ - // x c - // / \ - // a b - (Tuple::Join(mut rows), ptr) => { - rows.push(Row::Ptr(ptr)); - Tuple::Join(rows) - } - } - }) - } - Self::UniqueIxSemiLhs(iter) => { - // Left index semijoins return tuples from the lhs - iter.next() - } - Self::UniqueIxSemiRhs(iter) => { - // Right index semijions return row ids from the index - iter.next().map(|ptr| Tuple::Row(Row::Ptr(ptr))) - } - Self::UniqueMultiColIxSemiLhs(iter) => { - // Left index semijoins return tuples from the lhs - iter.next() - } - Self::UniqueMultiColIxSemiRhs(iter) => { - // Right index semijions return row ids from the index - iter.next().map(|ptr| Tuple::Row(Row::Ptr(ptr))) - } - Self::Filter(iter) => { - // Filter is a passthru - iter.next() - } } } } -/// A cross join returns the cross product of its two inputs. -/// It materializes the rhs and streams the lhs. -pub struct CrossJoinIter<'a> { - /// The lhs input - lhs: Box<Iter<'a>>, - /// The rhs input - rhs: Box<Iter<'a>>, - /// The materialized rhs - build: Vec<Tuple<'a>>, - /// The current lhs tuple - lhs_row: Option<Tuple<'a>>, - /// The current rhs tuple - rhs_ptr: usize, +/// An iterator for a unique (constraint) index join +pub enum IxJoin<SingleCol, MultiCol> { + /// A single column left semijoin. + /// Returns tuples from the lhs. + SemiLhs(SingleCol), + /// A single column right semijoin. + /// Returns rows from the index side. + SemiRhs(SingleCol), + /// A multi-column left semijoin. 
+ /// Returns tuples from the lhs. + MultiColSemiLhs(MultiCol), + /// A multi-column right semijoin. + /// Returns rows from the index side. + MultiColSemiRhs(MultiCol), + /// A multi-column index join. + /// If the lhs returns n-tuples, + /// this returns (n+1)-tuples. + MultiCol(MultiCol), + /// A single column index join. + /// If the lhs returns n-tuples, + /// this returns (n+1)-tuples. + Eq(SingleCol), } -impl<'a> Iterator for CrossJoinIter<'a> { - type Item = (Tuple<'a>, Tuple<'a>); +impl<'a, P, Q> Iterator for IxJoin<P, Q> +where + P: Iterator<Item = (Tuple<'a>, RowRef<'a>)>, + Q: Iterator<Item = (Tuple<'a>, RowRef<'a>)>, +{ + type Item = Tuple<'a>; fn next(&mut self) -> Option<Self::Item> { - // Materialize the rhs on the first call - if self.build.is_empty() { - self.build = self.rhs.as_mut().collect(); - self.lhs_row = self.lhs.next(); - self.rhs_ptr = 0; - } - // Reset the rhs pointer - if self.rhs_ptr == self.build.len() { - self.lhs_row = self.lhs.next(); - self.rhs_ptr = 0; + let proj_left_deep_join = |(tuple, ptr)| { + match (tuple, ptr) { + // A leaf join + // x + // / \ + // a b + (Tuple::Row(u), ptr) => { + // Returns a 2-tuple + Tuple::Join(vec![u, Row::Ptr(ptr)]) + } + // A left deep join + // x + // / \ + // x c + // / \ + // a b + (Tuple::Join(mut rows), ptr) => { + // Returns an n+1 tuple + rows.push(Row::Ptr(ptr)); + Tuple::Join(rows) + } + } + }; + match self { + Self::SemiLhs(iter) => { + // A left semijoin + iter.next().map(|(t, _)| t) + } + Self::SemiRhs(iter) => { + // A right semijoin + iter.next().map(|(_, ptr)| ptr).map(Row::Ptr).map(Tuple::Row) + } + Self::MultiColSemiLhs(iter) => { + // A left semijoin + iter.next().map(|(t, _)| t) + } + Self::MultiColSemiRhs(iter) => { + // A right semijoin + iter.next().map(|(_, ptr)| ptr).map(Row::Ptr).map(Tuple::Row) + } + Self::MultiCol(iter) => { + // Appends the rhs to the lhs + iter.next().map(proj_left_deep_join) + } + Self::Eq(iter) => { + // Appends the rhs to the lhs + iter.next().map(proj_left_deep_join) + } } - 
self.lhs_row.as_ref().map(|lhs_tuple| { - self.rhs_ptr += 1; - (lhs_tuple.clone(), self.build[self.rhs_ptr - 1].clone()) - }) } } -pub trait TupleProjector { +pub trait FieldProject { fn eval<'a>(&self, tuple: &'a Tuple) -> Cow<'a, AlgebraicValue>; } -/// A unique index join has the same signature as that of a cross join. -/// It is an index join where the index is a unique index. -/// A primary key index is one such example. -pub struct UniqueIxJoin<'a, P> { +/// A unique (constraint) index join iterator +pub struct UniqueIndexJoin<'a, FieldProject> { /// The lhs of the join input: Box<Iter<'a>>, /// The rhs index @@ -270,13 +263,13 @@ pub struct UniqueIxJoin<'a, P> { table: &'a Table, /// A handle to the blobstore blob_store: &'a dyn BlobStore, - /// The lhs column projector - projection: P, + /// The lhs index key projection + projection: FieldProject, } -impl<'a, P> Iterator for UniqueIxJoin<'a, P> +impl<'a, P> Iterator for UniqueIndexJoin<'a, P> where - P: TupleProjector, + P: FieldProject, { type Item = (Tuple<'a>, RowRef<'a>); @@ -291,56 +284,91 @@ where } } -/// This iterator implements a unique index join, -/// followed by a projection of the lhs. 
-pub struct UniqueIxSemiLhs<'a, P> { +/// A non-unique (constraint) index join iterator +pub struct IndexJoin<'a, FieldProject> { /// The lhs of the join input: Box<Iter<'a>>, + /// The current tuple from the lhs + tuple: Option<Tuple<'a>>, /// The rhs index index: &'a BTreeIndex, - /// The lhs column projector - projection: P, + /// The current cursor for the rhs index + index_cursor: Option<BTreeIndexRangeIter<'a>>, + /// A handle to the datastore + table: &'a Table, + /// A handle to the blobstore + blob_store: &'a dyn BlobStore, + /// The lhs index key projection + projection: FieldProject, } -impl<'a, P> Iterator for UniqueIxSemiLhs<'a, P> +impl<'a, P> Iterator for IndexJoin<'a, P> where - P: TupleProjector, + P: FieldProject, { - type Item = Tuple<'a>; + type Item = (Tuple<'a>, RowRef<'a>); fn next(&mut self) -> Option<Self::Item> { - self.input - .find(|tuple| self.index.contains_any(self.projection.eval(tuple).as_ref())) + self.tuple + .as_ref() + .and_then(|tuple| { + self.index_cursor.as_mut().and_then(|cursor| { + cursor.next().and_then(|ptr| { + self.table + .get_row_ref(self.blob_store, ptr) + .map(|ptr| (tuple.clone(), ptr)) + }) + }) + }) + .or_else(|| { + self.input.find_map(|tuple| { + Some(self.index.seek(self.projection.eval(&tuple).as_ref())).and_then(|mut cursor| { + cursor.next().and_then(|ptr| { + self.table.get_row_ref(self.blob_store, ptr).map(|ptr| { + self.tuple = Some(tuple.clone()); + self.index_cursor = Some(cursor); + (tuple, ptr) + }) + }) + }) + }) + }) } } -/// This iterator implements a unique index join, -/// followed by a projection of the rhs. -pub struct UniqueIxSemiRhs<'a, P> { - /// The lhs of the join - input: Box<Iter<'a>>, - /// The rhs index - index: &'a BTreeIndex, - /// A handle to the datastore - table: &'a Table, - /// A handle to the blobstore - blob_store: &'a dyn BlobStore, - /// The lhs column projector - projection: P, +/// A cross join returns the cross product of its two inputs. +/// It materializes the rhs and streams the lhs. 
+pub struct CrossJoinIter<'a> { + /// The lhs input + lhs: Box<Iter<'a>>, + /// The rhs input + rhs: Box<Iter<'a>>, + /// The materialized rhs + build: Vec<Tuple<'a>>, + /// The current lhs tuple + lhs_row: Option<Tuple<'a>>, + /// The current rhs tuple + rhs_ptr: usize, } -impl<'a, P> Iterator for UniqueIxSemiRhs<'a, P> -where - P: TupleProjector, -{ - type Item = RowRef<'a>; +impl<'a> Iterator for CrossJoinIter<'a> { + type Item = (Tuple<'a>, Tuple<'a>); fn next(&mut self) -> Option<Self::Item> { - self.input.find_map(|tuple| { - self.index - .seek(self.projection.eval(&tuple).as_ref()) - .next() - .and_then(|ptr| self.table.get_row_ref(self.blob_store, ptr)) + // Materialize the rhs on the first call + if self.build.is_empty() { + self.build = self.rhs.as_mut().collect(); + self.lhs_row = self.lhs.next(); + self.rhs_ptr = 0; + } + // Reset the rhs pointer + if self.rhs_ptr == self.build.len() { + self.lhs_row = self.lhs.next(); + self.rhs_ptr = 0; + } + self.lhs_row.as_ref().map(|lhs_tuple| { + self.rhs_ptr += 1; + (lhs_tuple.clone(), self.build[self.rhs_ptr - 1].clone()) }) } } @@ -383,12 +411,12 @@ pub enum ProjOpCode { static_assert_size!(ProjOpCode, 4); -/// A single column projection evaluator -pub struct ProjEvaluator { +/// A single field/column projection evaluator +pub struct Project { op: ProjOpCode, } -impl TupleProjector for ProjEvaluator { +impl FieldProject for Project { fn eval<'a>(&self, tuple: &'a Tuple) -> Cow<'a, AlgebraicValue> { match self.op { ProjOpCode::Ptr(i) => tuple @@ -425,18 +453,19 @@ impl TupleProjector for ProjEvaluator { } } -/// A multi-column projection evaluator -pub struct MultiColProjEvaluator<'a> { +/// A multi-column projection evaluator. +/// It concatenates a sequence of field projections. 
+pub struct Concat<'a> { ops: &'a [ProjOpCode], } -impl TupleProjector for MultiColProjEvaluator<'_> { +impl FieldProject for Concat<'_> { fn eval<'a>(&self, tuple: &'a Tuple) -> Cow<'a, AlgebraicValue> { Cow::Owned(AlgebraicValue::Product(ProductValue::from_iter( self.ops .iter() .copied() - .map(|op| ProjEvaluator { op }) + .map(|op| Project { op }) .map(|evaluator| evaluator.eval(tuple)) .map(|v| v.into_owned()), )))