diff --git a/crates/execution/src/iter.rs b/crates/execution/src/iter.rs index d084f4ff132..c52bb47b50f 100644 --- a/crates/execution/src/iter.rs +++ b/crates/execution/src/iter.rs @@ -167,193 +167,6 @@ impl<'a> Iterator for Iter<'a> { } } -/// An iterator for a unique (constraint) index join -pub enum IxJoin { - /// A left semijoin with single column index. - /// Returns tuples from the streaming side (lhs). - SemiLhs(SingleCol), - /// A right semijoin with single column index. - /// Returns rows from the index side (rhs). - SemiRhs(SingleCol), - /// A left semijoin with multi-column index. - /// Returns tuples from the streaming side (rhs). - MultiColSemiLhs(MultiCol), - /// A right semijoin with multi-column index. - /// Returns rows from the index side (rhs). - MultiColSemiRhs(MultiCol), - /// A multi-column index. - /// If the lhs iterator returns n-tuples, - /// this iterator returns (n+1)-tuples. - MultiCol(MultiCol), - /// A single column index. - /// If the lhs iterator returns n-tuples, - /// this iterator returns (n+1)-tuples. - Eq(SingleCol), -} - -impl<'a, P, Q> Iterator for IxJoin -where - P: Iterator, RowRef<'a>)>, - Q: Iterator, RowRef<'a>)>, -{ - type Item = Tuple<'a>; - - fn next(&mut self) -> Option { - let proj_left_deep_join = |(tuple, ptr)| { - match (tuple, ptr) { - // A leaf join - // x - // / \ - // a b - (Tuple::Row(u), ptr) => { - // Returns a 2-tuple - Tuple::Join(vec![u, Row::Ptr(ptr)]) - } - // A left deep join - // x - // / \ - // x c - // / \ - // a b - (Tuple::Join(mut rows), ptr) => { - // Returns an n+1 tuple - rows.push(Row::Ptr(ptr)); - Tuple::Join(rows) - } - } - }; - match self { - Self::SemiLhs(iter) => { - iter - // A left semijoin - .next() - .map(|(t, _)| t) - } - Self::SemiRhs(iter) => { - iter - // A right semijoin - .next() - .map(|(_, ptr)| ptr) - .map(Row::Ptr) - .map(Tuple::Row) - } - Self::MultiColSemiLhs(iter) => { - iter - // A left semijoin - .next() - .map(|(t, _)| t) - } - Self::MultiColSemiRhs(iter) => { - iter - // A right semijoin - .next() - .map(|(_, ptr)| ptr) - .map(Row::Ptr) - .map(Tuple::Row) - } - Self::MultiCol(iter) => { - iter - // Appends the rhs to the lhs - .next() - .map(proj_left_deep_join) - } - Self::Eq(iter) => { - iter - // Appends the rhs to the lhs - .next() - .map(proj_left_deep_join) - } - } - } -} - -pub trait FieldProject { - fn eval<'a>(&self, tuple: &'a Tuple) -> Cow<'a, AlgebraicValue>; -} - -/// A unique (constraint) index join iterator -pub struct UniqueIndexJoin<'a, FieldProject> { - /// The lhs of the join - input: Box>, - /// The rhs index - index: &'a BTreeIndex, - /// A handle to the datastore - table: &'a Table, - /// A handle to the blobstore - blob_store: &'a dyn BlobStore, - /// The lhs index key projection - projection: FieldProject, -} - -impl<'a, P> Iterator for UniqueIndexJoin<'a, P> -where - P: FieldProject, -{ - type Item = (Tuple<'a>, RowRef<'a>); - - fn next(&mut self) -> Option { - self.input.find_map(|tuple| { - self.index - .seek(self.projection.eval(&tuple).as_ref()) - .next() - .and_then(|ptr| self.table.get_row_ref(self.blob_store, ptr)) - .map(|ptr| (tuple, ptr)) - }) - } -} - -/// A non-unique (constraint) index join iterator -pub struct IndexJoin<'a, FieldProject> { - /// The lhs of the join - input: Box>, - /// The current tuple from the lhs - tuple: Option>, - /// The rhs index - index: &'a BTreeIndex, - /// The current cursor for the rhs index - index_cursor: Option>, - /// A handle to the datastore - table: &'a Table, - /// A handle to the blobstore - blob_store: &'a dyn BlobStore, - /// The lhs index key projection - projection: FieldProject, -} - -impl<'a, P> Iterator for IndexJoin<'a, P> -where - P: FieldProject, -{ - type Item = (Tuple<'a>, RowRef<'a>); - - fn next(&mut self) -> Option { - self.tuple - .as_ref() - .and_then(|tuple| { - self.index_cursor.as_mut().and_then(|cursor| { - cursor.next().and_then(|ptr| { - self.table - .get_row_ref(self.blob_store, ptr) - .map(|ptr| (tuple.clone(), ptr)) - }) - }) - }) - .or_else(|| { - self.input.find_map(|tuple| { - Some(self.index.seek(self.projection.eval(&tuple).as_ref())).and_then(|mut cursor| { - cursor.next().and_then(|ptr| { - self.table.get_row_ref(self.blob_store, ptr).map(|ptr| { - self.tuple = Some(tuple.clone()); - self.index_cursor = Some(cursor); - (tuple, ptr) - }) - }) - }) - }) - }) - } -} - /// A cross join returns the cross product of its two inputs. /// It materializes the rhs and streams the lhs. pub struct CrossJoinIter<'a> { @@ -391,27 +204,8 @@ impl<'a> Iterator for CrossJoinIter<'a> { } } -/// A tuple at a time filter iterator -pub struct Filter<'a> { - input: Box>, - predicate: ExprProgram<'a>, -} - -impl<'a> Iterator for Filter<'a> { - type Item = Tuple<'a>; - - fn next(&mut self) -> Option { - self.input.find(|tuple| { - ExprEvaluator { - val_stack: vec![], - row_stack: vec![], - } - .eval(&self.predicate, tuple) - .as_bool() - .copied() - .unwrap_or(false) - }) - } +pub trait FieldProject { + fn eval<'a>(&self, tuple: &'a Tuple) -> Cow<'a, AlgebraicValue>; } /// An opcode for a tuple projection operation @@ -614,3 +408,209 @@ impl<'a> ExprEvaluator<'a> { self.val_stack.pop().unwrap() } } + +/// A tuple at a time filter iterator +pub struct Filter<'a> { + input: Box>, + predicate: ExprProgram<'a>, +} + +impl<'a> Iterator for Filter<'a> { + type Item = Tuple<'a>; + + fn next(&mut self) -> Option { + self.input.find(|tuple| { + ExprEvaluator { + val_stack: vec![], + row_stack: vec![], + } + .eval(&self.predicate, tuple) + .as_bool() + .copied() + .unwrap_or(false) + }) + } +} + +/// An iterator for a unique (constraint) index join +pub enum IxJoin { + /// A left semijoin with single column index. + /// Returns tuples from the streaming side (lhs). + SemiLhs(SingleCol), + /// A right semijoin with single column index. + /// Returns rows from the index side (rhs). + SemiRhs(SingleCol), + /// A left semijoin with multi-column index. + /// Returns tuples from the streaming side (rhs). + MultiColSemiLhs(MultiCol), + /// A right semijoin with multi-column index. + /// Returns rows from the index side (rhs). + MultiColSemiRhs(MultiCol), + /// A multi-column index. + /// If the lhs iterator returns n-tuples, + /// this iterator returns (n+1)-tuples. + MultiCol(MultiCol), + /// A single column index. + /// If the lhs iterator returns n-tuples, + /// this iterator returns (n+1)-tuples. + Eq(SingleCol), +} + +impl<'a, P, Q> Iterator for IxJoin +where + P: Iterator, RowRef<'a>)>, + Q: Iterator, RowRef<'a>)>, +{ + type Item = Tuple<'a>; + + fn next(&mut self) -> Option { + let proj_left_deep_join = |(tuple, ptr)| { + match (tuple, ptr) { + // A leaf join + // x + // / \ + // a b + (Tuple::Row(u), ptr) => { + // Returns a 2-tuple + Tuple::Join(vec![u, Row::Ptr(ptr)]) + } + // A left deep join + // x + // / \ + // x c + // / \ + // a b + (Tuple::Join(mut rows), ptr) => { + // Returns an n+1 tuple + rows.push(Row::Ptr(ptr)); + Tuple::Join(rows) + } + } + }; + match self { + Self::SemiLhs(iter) => { + iter + // A left semijoin + .next() + .map(|(t, _)| t) + } + Self::SemiRhs(iter) => { + iter + // A right semijoin + .next() + .map(|(_, ptr)| ptr) + .map(Row::Ptr) + .map(Tuple::Row) + } + Self::MultiColSemiLhs(iter) => { + iter + // A left semijoin + .next() + .map(|(t, _)| t) + } + Self::MultiColSemiRhs(iter) => { + iter + // A right semijoin + .next() + .map(|(_, ptr)| ptr) + .map(Row::Ptr) + .map(Tuple::Row) + } + Self::MultiCol(iter) => { + iter + // Appends the rhs to the lhs + .next() + .map(proj_left_deep_join) + } + Self::Eq(iter) => { + iter + // Appends the rhs to the lhs + .next() + .map(proj_left_deep_join) + } + } + } +} + +/// A unique (constraint) index join iterator +pub struct UniqueIndexJoin<'a, FieldProject> { + /// The lhs of the join + input: Box>, + /// The rhs index + index: &'a BTreeIndex, + /// A handle to the datastore + table: &'a Table, + /// A handle to the blobstore + blob_store: &'a dyn BlobStore, + /// The lhs index key projection + projection: FieldProject, +} + +impl<'a, P> Iterator for UniqueIndexJoin<'a, P> +where + P: FieldProject, +{ + type Item = (Tuple<'a>, RowRef<'a>); + + fn next(&mut self) -> Option { + self.input.find_map(|tuple| { + self.index + .seek(self.projection.eval(&tuple).as_ref()) + .next() + .and_then(|ptr| self.table.get_row_ref(self.blob_store, ptr)) + .map(|ptr| (tuple, ptr)) + }) + } +} + +/// A non-unique (constraint) index join iterator +pub struct IndexJoin<'a, FieldProject> { + /// The lhs of the join + input: Box>, + /// The current tuple from the lhs + tuple: Option>, + /// The rhs index + index: &'a BTreeIndex, + /// The current cursor for the rhs index + index_cursor: Option>, + /// A handle to the datastore + table: &'a Table, + /// A handle to the blobstore + blob_store: &'a dyn BlobStore, + /// The lhs index key projection + projection: FieldProject, +} + +impl<'a, P> Iterator for IndexJoin<'a, P> +where + P: FieldProject, +{ + type Item = (Tuple<'a>, RowRef<'a>); + + fn next(&mut self) -> Option { + self.tuple + .as_ref() + .and_then(|tuple| { + self.index_cursor.as_mut().and_then(|cursor| { + cursor.next().and_then(|ptr| { + self.table + .get_row_ref(self.blob_store, ptr) + .map(|ptr| (tuple.clone(), ptr)) + }) + }) + }) + .or_else(|| { + self.input.find_map(|tuple| { + Some(self.index.seek(self.projection.eval(&tuple).as_ref())).and_then(|mut cursor| { + cursor.next().and_then(|ptr| { + self.table.get_row_ref(self.blob_store, ptr).map(|ptr| { + self.tuple = Some(tuple.clone()); + self.index_cursor = Some(cursor); + (tuple, ptr) + }) + }) + }) + }) + }) + } +}