Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: Use Program instead of (Hash, Box<[u8]>) #1558

Merged
merged 4 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 4 additions & 10 deletions crates/core/src/db/datastore/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use spacetimedb_lib::db::{
};
use spacetimedb_lib::{Address, Identity};
use spacetimedb_primitives::{ColList, ConstraintId, IndexId, SequenceId, TableId};
use spacetimedb_sats::{bsatn, buffer::BufReader, hash::Hash, AlgebraicValue, ProductValue};
use spacetimedb_sats::{bsatn, buffer::BufReader, AlgebraicValue, ProductValue};
use spacetimedb_schema::schema::TableSchema;
use spacetimedb_snapshot::ReconstructedSnapshot;
use spacetimedb_table::{
Expand Down Expand Up @@ -527,13 +527,7 @@ impl MutTxDatastore for Locking {
tx.iter(&ctx, ST_MODULE_ID)?.next().map(metadata_from_row).transpose()
}

fn update_program(
&self,
tx: &mut Self::MutTx,
program_kind: ModuleKind,
program_hash: Hash,
program_bytes: Box<[u8]>,
) -> Result<()> {
fn update_program(&self, tx: &mut Self::MutTx, program_kind: ModuleKind, program: Program) -> Result<()> {
let ctx = ExecutionContext::internal(self.database_address);
let old = tx
.iter(&ctx, ST_MODULE_ID)?
Expand All @@ -547,8 +541,8 @@ impl MutTxDatastore for Locking {
match old {
Some((ptr, mut row)) => {
row.program_kind = program_kind;
row.program_hash = program_hash;
row.program_bytes = program_bytes;
row.program_hash = program.hash;
row.program_bytes = program.bytes;

tx.delete(ST_MODULE_ID, ptr)?;
tx.insert(ST_MODULE_ID, &mut row.into(), self.database_address)
Expand Down
33 changes: 23 additions & 10 deletions crates/core/src/db/datastore/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::db::datastore::system_tables::ST_TABLE_ID;
use crate::execution_context::ExecutionContext;
use spacetimedb_data_structures::map::IntMap;
use spacetimedb_lib::db::raw_def::*;
use spacetimedb_lib::{Address, Identity};
use spacetimedb_lib::{hash_bytes, Address, Identity};
use spacetimedb_primitives::*;
use spacetimedb_sats::hash::Hash;
use spacetimedb_sats::{AlgebraicValue, ProductType, ProductValue};
Expand Down Expand Up @@ -315,13 +315,34 @@ pub struct Metadata {
}

/// Program associated with a database.
#[derive(Clone)]
pub struct Program {
/// Hash over the program's bytes.
pub hash: Hash,
/// The raw bytes of the program.
pub bytes: Box<[u8]>,
}

impl Program {
/// Create a [`Program`] from its raw bytes.
///
/// This computes the hash over `bytes`, so prefer constructing [`Program`]
/// directly if the hash is already known.
pub fn from_bytes(bytes: impl Into<Box<[u8]>>) -> Self {
let bytes = bytes.into();
let hash = hash_bytes(&bytes);
Self { hash, bytes }
}

/// Create a [`Program`] with no bytes.
pub fn empty() -> Self {
Self {
hash: Hash::ZERO,
bytes: [].into(),
}
}
}

pub trait TxDatastore: DataRow + Tx {
type Iter<'a>: Iterator<Item = Self::RowRef<'a>>
where
Expand Down Expand Up @@ -479,15 +500,7 @@ pub trait MutTxDatastore: TxDatastore + MutTx {
fn metadata_mut_tx(&self, tx: &Self::MutTx) -> Result<Option<Metadata>>;

/// Update the datastore with the supplied binary program.
///
/// The `program_hash` is the precomputed hash over `program_bytes`.
fn update_program(
&self,
tx: &mut Self::MutTx,
program_kind: ModuleKind,
program_hash: Hash,
program_bytes: Box<[u8]>,
) -> Result<()>;
fn update_program(&self, tx: &mut Self::MutTx, program_kind: ModuleKind, program: Program) -> Result<()>;
}

#[cfg(test)]
Expand Down
33 changes: 10 additions & 23 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use spacetimedb_lib::db::auth::{StAccess, StTableType};
use spacetimedb_lib::db::raw_def::{RawColumnDefV8, RawIndexDefV8, RawSequenceDefV8, RawTableDefV8};
use spacetimedb_lib::Identity;
use spacetimedb_primitives::*;
use spacetimedb_sats::hash::Hash;
use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductType, ProductValue};
use spacetimedb_schema::schema::TableSchema;
use spacetimedb_snapshot::{SnapshotError, SnapshotRepository};
Expand Down Expand Up @@ -330,24 +329,18 @@ impl RelationalDB {
/// It is an error to call this method on an alread-initialized database.
///
/// See [`Self::open`] for further information.
pub fn set_initialized(
&self,
tx: &mut MutTx,
host_type: HostType,
program_hash: Hash,
program_bytes: Box<[u8]>,
) -> Result<(), DBError> {
pub fn set_initialized(&self, tx: &mut MutTx, host_type: HostType, program: Program) -> Result<(), DBError> {
log::trace!(
"[{}] DATABASE: set initialized owner={} program_hash={}",
self.address,
self.owner_identity,
program_hash
program.hash
);

// Probably a bug: the database is already initialized.
// Ignore if it would be a no-op.
if let Some(meta) = self.inner.metadata_mut_tx(tx)? {
if program_hash == meta.program_hash
if program.hash == meta.program_hash
&& self.address == meta.database_address
&& self.owner_identity == meta.owner_identity
{
Expand All @@ -362,8 +355,8 @@ impl RelationalDB {
program_kind: match host_type {
HostType::Wasm => WASM_MODULE,
},
program_hash,
program_bytes,
program_hash: program.hash,
program_bytes: program.bytes,
};
self.insert(tx, ST_MODULE_ID, row.into()).map(drop)
}
Expand Down Expand Up @@ -395,23 +388,17 @@ impl RelationalDB {
///
/// The caller must ensure that:
///
/// - `program_hash` is the [`Hash`] over `program_bytes`.
/// - `program_bytes` is a valid module acc. to `host_type`.
/// - `program.hash` is the [`Hash`] over `program.bytes`.
/// - `program.bytes` is a valid module acc. to `host_type`.
/// - the schema updates contained in the module have been applied within
/// the transactional context `tx`.
/// - the `__init__` reducer contained in the module has been executed
/// within the transactional context `tx`.
pub fn update_program(
&self,
tx: &mut MutTx,
host_type: HostType,
program_hash: Hash,
program_bytes: Box<[u8]>,
) -> Result<(), DBError> {
pub fn update_program(&self, tx: &mut MutTx, host_type: HostType, program: Program) -> Result<(), DBError> {
let program_kind = match host_type {
HostType::Wasm => WASM_MODULE,
};
self.inner.update_program(tx, program_kind, program_hash, program_bytes)
self.inner.update_program(tx, program_kind, program)
}

fn restore_from_snapshot_or_bootstrap(
Expand Down Expand Up @@ -1427,7 +1414,7 @@ pub mod tests_utils {
debug_assert!(connected_clients.is_empty());
let db = db.with_row_count(Self::row_count_fn());
db.with_auto_commit(&ExecutionContext::internal(db.address()), |tx| {
db.set_initialized(tx, HostType::Wasm, Hash::ZERO, [].into())
db.set_initialized(tx, HostType::Wasm, Program::empty())
})?;
Ok(db)
}
Expand Down
107 changes: 60 additions & 47 deletions crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,16 +390,9 @@ impl HostController {
"[{}] updating database from `{}` to `{}`",
db_addr, stored_hash, program_hash
);
let program_bytes = load_program(&self.program_storage, program_hash).await?;
let program = load_program(&self.program_storage, program_hash).await?;
let update_result = host
.update_module(
host_type,
Program {
hash: program_hash,
bytes: program_bytes,
},
self.unregister_fn(instance_id),
)
.update_module(host_type, program, self.unregister_fn(instance_id))
.await?;
if update_result.is_ok() {
*guard = Some(host);
Expand Down Expand Up @@ -523,28 +516,45 @@ async fn make_dbic(

async fn make_module_host(
host_type: HostType,
mcc: ModuleCreationContext,
dbic: Arc<DatabaseInstanceContext>,
scheduler: Scheduler,
program: Program,
energy_monitor: Arc<dyn EnergyMonitor>,
unregister: impl Fn() + Send + Sync + 'static,
) -> anyhow::Result<ModuleHost> {
) -> anyhow::Result<(Program, ModuleHost)> {
spawn_rayon(move || {
let module_host = match host_type {
HostType::Wasm => {
let mcc = ModuleCreationContext {
dbic,
scheduler,
program: &program,
energy_monitor,
};
let start = Instant::now();
let actor = host::wasmtime::make_actor(mcc)?;
trace!("wasmtime::make_actor blocked for {:?}", start.elapsed());
ModuleHost::new(actor, unregister)
}
};
Ok(module_host)
Ok((program, module_host))
})
.await
}

async fn load_program(storage: &ProgramStorage, hash: Hash) -> anyhow::Result<Box<[u8]>> {
storage
async fn load_program(storage: &ProgramStorage, hash: Hash) -> anyhow::Result<Program> {
let bytes = storage
.lookup(hash)
.await?
.with_context(|| format!("program {} not found", hash))
.with_context(|| format!("program {} not found", hash))?;
Ok(Program { hash, bytes })
}

struct LaunchedModule {
dbic: Arc<DatabaseInstanceContext>,
module_host: ModuleHost,
scheduler: Scheduler,
scheduler_starter: SchedulerStarter,
}

async fn launch_module(
Expand All @@ -554,28 +564,33 @@ async fn launch_module(
on_panic: impl Fn() + Send + Sync + 'static,
relational_db: Arc<RelationalDB>,
energy_monitor: Arc<dyn EnergyMonitor>,
) -> anyhow::Result<(Arc<DatabaseInstanceContext>, ModuleHost, Scheduler, SchedulerStarter)> {
) -> anyhow::Result<(Program, LaunchedModule)> {
let address = database.address;
let host_type = database.host_type;

let dbic = make_dbic(database, instance_id, relational_db).await.map(Arc::new)?;
let (scheduler, scheduler_starter) = Scheduler::open(dbic.relational_db.clone());
let module_host = make_module_host(
let (program, module_host) = make_module_host(
host_type,
ModuleCreationContext {
dbic: dbic.clone(),
scheduler: scheduler.clone(),
program_hash: program.hash,
program_bytes: program.bytes.into(),
energy_monitor: energy_monitor.clone(),
},
dbic.clone(),
scheduler.clone(),
program,
energy_monitor.clone(),
on_panic,
)
.await?;

trace!("launched database {} with program {}", address, program.hash);

Ok((dbic, module_host, scheduler, scheduler_starter))
Ok((
program,
LaunchedModule {
dbic,
module_host,
scheduler,
scheduler_starter,
},
))
}

/// Update a module.
Expand All @@ -601,7 +616,7 @@ async fn update_module(
Ok(())
} else {
info!("updating `{}` from {} to {}", addr, stored, program.hash);
module.update_database(program.hash, program.bytes).await?
module.update_database(program).await?
};

Ok(res)
Expand Down Expand Up @@ -677,45 +692,46 @@ impl Host {
)?
}
};
let (dbic, module_host, scheduler, scheduler_starter) = match db.program()? {
let LaunchedModule {
dbic,
module_host,
scheduler,
scheduler_starter,
} = match db.program()? {
// Launch module with program from existing database.
Some(program) => {
launch_module(
let (_, launched) = launch_module(
database,
instance_id,
program,
on_panic,
Arc::new(db),
energy_monitor.clone(),
)
.await?
.await?;
launched
}

// Database is empty, load program from external storage and run
// initialization.
None => {
let program_hash = database.initial_program;
let program_bytes = load_program(&program_storage, program_hash).await?;
let res = launch_module(
let program = load_program(&program_storage, database.initial_program).await?;
let (program, launched) = launch_module(
database,
instance_id,
Program {
hash: program_hash,
bytes: program_bytes.clone(),
},
program,
on_panic,
Arc::new(db),
energy_monitor.clone(),
)
.await?;

let module_host = &res.1;
let call_result = module_host.init_database(program_hash, program_bytes).await?;
let call_result = launched.module_host.init_database(program).await?;
if let Some(call_result) = call_result {
Result::from(call_result)?;
}

res
launched
}
};

Expand Down Expand Up @@ -765,15 +781,12 @@ impl Host {
) -> anyhow::Result<UpdateDatabaseResult> {
let dbic = &self.dbic;
let (scheduler, scheduler_starter) = self.scheduler.new_with_same_db();
let module = make_module_host(
let (program, module) = make_module_host(
host_type,
ModuleCreationContext {
dbic: dbic.clone(),
scheduler: scheduler.clone(),
program_hash: program.hash,
program_bytes: program.bytes.clone().into(),
energy_monitor: self.energy_monitor.clone(),
},
dbic.clone(),
scheduler.clone(),
program,
self.energy_monitor.clone(),
on_panic,
)
.await?;
Expand Down
Loading
Loading