//! # Persy - Transactional Persistence Engine
//!
//! Simple single-file, durable, isolated, consistent, and atomic persistence engine based on copy-on-write.
//! It guarantees the persistence of data even in case of a crash through an in-file write-ahead log,
//! and provides simple Rust APIs for storing simple bytes or value-associated (index) data;
//! it also provides memory management logic to keep the file size in check.
//!
//! # Example
//!
//! ```rust
//! use persy::{Persy,Config};
//! //...
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
//! Persy::create("./target/data.persy")?;
//! let persy = Persy::open("./target/data.persy",Config::new())?;
//! let mut tx = persy.begin()?;
//! tx.create_segment("seg")?;
//! let data = vec![1;20];
//! tx.insert("seg", &data)?;
//! let prepared = tx.prepare()?;
//! prepared.commit()?;
//! for (_id,content) in persy.scan("seg")? {
//! assert_eq!(content[0], 1);
//! //....
//! }
//! # std::fs::remove_file("./target/data.persy")?;
//! # Ok(())
//! # }
//! ```
//!
//! # Example Index
//!
//! ```rust
//! use persy::{Persy,Config, ValueMode};
//! //...
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
//! Persy::create("./target/index_data.persy")?;
//! let persy = Persy::open("./target/index_data.persy",Config::new())?;
//! let mut tx = persy.begin()?;
//! tx.create_index::<i32,i32>("index", ValueMode::Cluster)?;
//! tx.put("index", 10, 20)?;
//! let prepared = tx.prepare()?;
//! prepared.commit()?;
//! for (key,values) in persy.range::<i32,i32,_>("index", ..)? {
//! assert_eq!(key, 10);
//! assert_eq!(values.into_iter().collect::<Vec<_>>(), vec![20]);
//! //....
//! }
//! # std::fs::remove_file("./target/index_data.persy")?;
//! # Ok(())
//! # }
//! ```
//!
mod address;
mod allocator;
mod config;
mod device;
mod error;
mod flush_checksum;
mod id;
mod index;
mod io;
mod journal;
mod locks;
mod open_options;
mod persy;
mod recover;
mod snapshot;
mod snapshots;
mod transaction;
#[cfg(feature = "background_ops")]
mod background;
#[cfg(feature = "experimental_inspect")]
pub mod inspect;
use crate::persy::PersyImpl;
pub use crate::{
address::segment_iter::{SegmentIter, SnapshotSegmentIter, TxSegmentIter},
config::{Config, TransactionConfig, TxStrategy},
error::{
BeginTransactionError, CreateError, CreateIndexError, CreateSegmentError, DeleteError, DropIndexError,
DropSegmentError, GenericError, IndexChangeError, IndexError, IndexOpsError, IndexPutError, InsertError,
OpenError, OpenMemoryError, PersyError, PrepareError, ReadError, SegmentError, UpdateError, PE,
},
id::{IndexId, PersyId, SegmentId, ToIndexId, ToSegmentId},
index::{
bytevec::ByteVec,
config::{IndexType, IndexTypeId, ValueMode},
iter::{IndexIter, TxIndexIter},
value_iter::ValueIter,
},
open_options::OpenOptions,
persy::IndexInfo,
recover::{Recover, RecoverStatus},
snapshot::Snapshot,
transaction::{Transaction, TransactionFinalize},
};
use std::{
fs::{self, File},
ops::RangeBounds,
path::Path,
sync::Arc,
};
/// Custom identifier to track the transaction in the recover phase.
///
/// It is a plain byte vector. The recover functions accepted by
/// `Persy::open_with_recover` and `Persy::open_from_file_with_recover`
/// receive a reference to it and answer with a `bool`.
pub type TransactionId = Vec<u8>;
/// Main structure to operate persy storage files.
///
/// The handle only holds an `Arc` to the internal `PersyImpl`, so cloning it
/// produces another handle that refers to the same `PersyImpl` instance.
#[derive(Clone)]
pub struct Persy {
// Shared reference to the implementation the instance methods forward to.
persy_impl: Arc<PersyImpl>,
}
impl Persy {
/// Create a new database file at the given path.
///
/// The path is forwarded to `PersyImpl::create`; on success nothing is returned,
/// use [`Persy::open`] afterwards to get a handle.
///
/// # Errors
///
/// Fails if the file already exists.
///
/// # Example
///
/// ```rust
/// use std::path::Path;
/// use persy::{Persy, Config};
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let path = Path::new("target/created.db");
/// Persy::create(path)?;
/// let persy = Persy::open(path, Config::new())?;
/// # std::fs::remove_file("target/created.db")?;
/// # Ok(())
/// # }
/// ```
///
/// [`Persy::open`]: struct.Persy.html#method.open
pub fn create<P: AsRef<Path>>(path: P) -> Result<(), PE<CreateError>> {
    let target = path.as_ref();
    PersyImpl::create(target)?;
    Ok(())
}
/// Create a new database inside the provided file handle.
///
/// The handle is consumed and forwarded to `PersyImpl::create_from_file`.
///
/// # Errors
///
/// Any error raised by the underlying creation is returned as a `CreateError`.
///
/// NOTE(review): this text used to say "Fails if the file already exists", which reads
/// oddly for an already opened handle; confirm which conditions the implementation rejects.
pub fn create_from_file(file: File) -> Result<(), PE<CreateError>> {
    PersyImpl::create_from_file(file)?;
    Ok(())
}
/// Open a database file.
///
/// The file should have been created with [`Persy::create`]
///
/// # Errors
///
/// Fails if the file does not exist.
///
/// [`Persy::create`]: struct.Persy.html#method.create
pub fn open<P: AsRef<Path>>(path: P, config: Config) -> Result<Persy, PE<OpenError>> {
Persy::open_with_recover(path, config, |_| true)
}
/// Open a database file from a path with a recover function.
///
/// The file should have been created with [`Persy::create`]
///
/// # Errors
///
/// Fails if the file does not exist.
///
/// [`Persy::create`]: struct.Persy.html#method.create
pub fn open_with_recover<P: AsRef<Path>, C>(path: P, config: Config, recover: C) -> Result<Persy, PE<OpenError>>
where
C: Fn(&TransactionId) -> bool,
{
let f = fs::OpenOptions::new().write(true).read(true).open(path)?;
Persy::open_from_file_with_recover(f, config, recover)
}
/// Open a database file from a path and return a [`Recover`] structure that allows
/// selecting the transactions to commit and recovering them.
///
/// The file is opened for both reading and writing; the recovery is not finalized here,
/// the returned structure is built from the pending recovery state and the opened engine.
///
/// The file should have been created with [`Persy::create`]
///
/// # Errors
///
/// Fails if the file does not exist.
///
/// [`Persy::create`]: struct.Persy.html#method.create
/// [`Recover`]: struct.Recover.html
pub fn recover<P: AsRef<Path>>(path: P, config: Config) -> Result<Recover, PE<OpenError>> {
    let mut options = fs::OpenOptions::new();
    options.read(true).write(true);
    let file = options.open(path)?;
    let (engine, pending) = PersyImpl::open_recover(file, config)?;
    let shared_engine = Arc::new(engine);
    Ok(Recover::new(pending, shared_engine))
}
/// Open a database from a direct file handle.
///
/// Despite its name, the `path` argument is the already opened [`File`].
/// No recover function is applied: the recovery state produced while opening is
/// passed straight to the final recover step.
///
/// The file should have been created with [`Persy::create`]
///
/// # Errors
///
/// Returns an `OpenError` when opening the storage or the final recover step fails.
///
/// [`Persy::create`]: struct.Persy.html#method.create
/// [`File`]: https://doc.rust-lang.org/std/fs/struct.File.html
pub fn open_from_file(path: File, config: Config) -> Result<Persy, PE<OpenError>> {
    let (engine, pending) = PersyImpl::open_recover(path, config)?;
    engine.final_recover(pending)?;
    let persy_impl = Arc::new(engine);
    Ok(Persy { persy_impl })
}
/// Open a database file, from a direct file handle and a transaction recover function.
///
/// The file should have been created with [`Persy::create`]
///
/// # Errors
///
/// Fails if the file does not exist.
///
/// [`Persy::create`]: struct.Persy.html#method.create
pub fn open_from_file_with_recover<C>(file: File, config: Config, recover: C) -> Result<Persy, PE<OpenError>>
where
C: Fn(&TransactionId) -> bool,
{
let (persy_impl, mut recov) = PersyImpl::open_recover(file, config)?;
recov.apply(recover)?;
persy_impl.final_recover(recov)?;
Ok(Persy {
persy_impl: Arc::new(persy_impl),
})
}
/// Open an existing database or create it if it does not exist yet,
/// calling the `prepare` function just after the creation.
///
/// # Example
///
/// ```rust
/// use std::path::Path;
/// use persy::{Persy, Config, PersyId, ValueMode};
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let path = Path::new("target/open_or_create.db");
/// let config = Config::new();
///
/// let persy = Persy::open_or_create_with(path, config, |persy| {
/// // this closure is only called on database creation
/// let mut tx = persy.begin()?;
/// tx.create_segment("data")?;
/// tx.create_index::<u64, PersyId>("index", ValueMode::Replace)?;
/// let prepared = tx.prepare()?;
/// prepared.commit()?;
/// println!("Segment and Index successfully created");
/// Ok(())
/// })?;
/// # std::fs::remove_file("target/open_or_create.db")?;
/// # Ok(())
/// # }
/// ```
pub fn open_or_create_with<P, F>(path: P, config: Config, prepare: F) -> Result<Persy, PE<OpenError>>
where
P: AsRef<Path>,
F: FnOnce(&Persy) -> Result<(), Box<dyn std::error::Error>> + 'static,
{
let path = path.as_ref();
let persy;
if !path.exists() {
Persy::create(path).map_err(|e| PE::PE(OpenError::from(e.error())))?;
persy = Persy::open(path, config)?;
prepare(&persy).map_err(|e| OpenError::InitError(format!("{}", e)))?;
} else {
persy = Persy::open(path, config)?;
}
Ok(persy)
}
/// Begin a new transaction with the default [`TransactionConfig`].
///
/// The transaction isolation level is 'read_committed'.
/// To commit, call [`prepare`] and then [`commit`].
///
/// [`prepare`]:struct.Transaction.html#method.prepare
/// [`commit`]:struct.TransactionFinalize.html#method.commit
/// [`TransactionConfig`]:struct.TransactionConfig.html
///
/// # Example
///
/// ```rust
/// # use persy::{OpenOptions};
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # let persy = OpenOptions::new().memory()?;
/// let mut tx = persy.begin()?;
/// // ...
/// tx.prepare()?.commit()?;
/// # Ok(())
/// # }
/// ```
pub fn begin(&self) -> Result<Transaction, PE<BeginTransactionError>> {
    let default_config = TransactionConfig::new();
    self.begin_with(default_config)
}
/// Begin a new transaction specifying parameters for the transaction.
///
/// The transaction isolation level is 'read_committed'.
/// for commit call [`prepare`] and [`commit`]
///
/// [`prepare`]:struct.Transaction.html#method.prepare
/// [`commit`]:struct.TransactionFinalize.html#method.commit
///
/// # Example
///
/// ```rust
/// # use persy::{OpenOptions, TransactionConfig};
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # let persy = OpenOptions::new().memory()?;
/// let tx_id = vec![2;2];
/// let mut tx = persy.begin_with(TransactionConfig::new().set_transaction_id(tx_id))?;
/// // ...
/// tx.prepare()?.commit()?;
/// # Ok(())
/// # }
/// ```
pub fn begin_with(&self, config: TransactionConfig) -> Result<Transaction, PE<BeginTransactionError>> {
Ok(Transaction {
tx: Some(self.persy_impl.begin_with(config)?),
persy_impl: self.persy_impl.clone(),
})
}
/// Check whether a segment with the given name already exists in the storage.
///
/// This wrapper always returns `Ok`; the `Result` is part of the signature only.
///
/// # Example
///
/// ```rust
/// # use persy::{OpenOptions};
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # let persy = OpenOptions::new().memory()?;
/// let mut tx = persy.begin()?;
/// tx.create_segment("my_new_segment")?;
/// let prepared = tx.prepare()?;
/// prepared.commit()?;
/// assert!(persy.exists_segment("my_new_segment")?);
/// # Ok(())
/// # }
/// ```
pub fn exists_segment(&self, segment: &str) -> Result<bool, PE<GenericError>> {
    let found = self.persy_impl.exists_segment(segment);
    Ok(found)
}
/// Resolve a segment reference to its [`SegmentId`].
///
/// Accepts anything implementing `ToSegmentId`; the example below passes the segment name.
///
/// [`SegmentId`]: struct.SegmentId.html
///
/// # Example
///
/// ```rust
/// # use persy::{OpenOptions};
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # let persy = OpenOptions::new().memory()?;
/// let mut tx = persy.begin()?;
/// tx.create_segment("my_new_segment")?;
/// let prepared = tx.prepare()?;
/// prepared.commit()?;
/// let segment_id = persy.solve_segment_id("my_new_segment")?;
/// # Ok(())
/// # }
/// ```
pub fn solve_segment_id(&self, segment: impl ToSegmentId) -> Result<SegmentId, PE<SegmentError>> {
    let segment_id = self.persy_impl.solve_segment_id(segment)?;
    Ok(segment_id)
}
/// Resolve an index reference to its [`IndexId`].
///
/// This has no public use as of today; it may be used in the future.
/// Accepts anything implementing `ToIndexId`; the example below passes the index name.
///
/// [`IndexId`]: struct.IndexId.html
///
/// # Example
///
/// ```rust
/// # use persy::{OpenOptions, ValueMode};
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # let persy = OpenOptions::new().memory()?;
/// let mut tx = persy.begin()?;
/// tx.create_index::<u8,u8>("my_new_index", ValueMode::Cluster)?;
/// let prepared = tx.prepare()?;
/// prepared.commit()?;
/// let index_id = persy.solve_index_id("my_new_index")?;
/// # Ok(())
/// # }
/// ```
pub fn solve_index_id(&self, index: impl ToIndexId) -> Result<IndexId, PE<IndexError>> {
    // TODO: find a better name for this method
    let index_id = self.persy_impl.solve_index_id(index)?;
    Ok(index_id)
}
/// Read the content of a record from the persistent data.
///
/// The segment is resolved first, then the record identified by `id` is read from it.
/// The test module of this file shows `None` being returned for an id that points
/// outside the file.
///
/// # Errors
///
/// A failure to resolve the segment is converted into a `ReadError`; read failures are
/// returned as `ReadError` as well.
///
/// # Example
///
/// ```rust
/// # use persy::{OpenOptions};
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # let persy = OpenOptions::new().memory()?;
/// let mut tx = persy.begin()?;
/// # tx.create_segment("seg")?;
/// let data = vec![1;20];
/// let id = tx.insert("seg", &data)?;
/// let prepared = tx.prepare()?;
/// prepared.commit()?;
/// let read = persy.read("seg", &id)?.expect("record exists");
/// assert_eq!(data,read);
/// # Ok(())
/// # }
/// ```
pub fn read(&self, segment: impl ToSegmentId, id: &PersyId) -> Result<Option<Vec<u8>>, PE<ReadError>> {
    let segment_id = match self.solve_segment_id(segment) {
        Ok(resolved) => resolved,
        Err(PE::PE(cause)) => return Err(PE::PE(ReadError::from(cause))),
    };
    let content = self.persy_impl.read(segment_id, &id.0)?;
    Ok(content)
}
/// Scan a segment for persistent records
///
/// # Example
///
/// ```rust
/// # use persy::{Persy,Config};
/// # use persy::{OpenOptions};
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # let persy = OpenOptions::new().memory()?;
/// let mut tx = persy.begin()?;
/// # tx.create_segment("seg")?;
/// let data = vec![1;20];
/// let id = tx.insert("seg", &data)?;
/// let prepared = tx.prepare()?;
/// prepared.commit()?;
/// let mut count = 0;
/// for (id,content) in persy.scan("seg")? {
/// println!("record size:{}",content.len());
/// count+=1;
/// }
/// assert_eq!(count,1);
/// # Ok(())
/// # }
/// ```
pub fn scan(&self, segment: impl ToSegmentId) -> Result<SegmentIter, PE<SegmentError>> {
let segment_id = self.solve_segment_id(segment)?;
Ok(SegmentIter::new(
self.persy_impl.scan(segment_id)?,
self.persy_impl.clone(),
))
}
/// Check whether an index with the given name already exists in the storage.
///
/// This wrapper always returns `Ok`; the `Result` is part of the signature only.
///
/// # Example
///
/// ```rust
/// # use persy::{OpenOptions, ValueMode};
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # let persy = OpenOptions::new().memory()?;
/// let mut tx = persy.begin()?;
/// tx.create_index::<u8,u8>("my_new_index", ValueMode::Replace)?;
/// let prepared = tx.prepare()?;
/// prepared.commit()?;
/// assert!(persy.exists_index("my_new_index")?);
/// # Ok(())
/// # }
/// ```
pub fn exists_index(&self, index_name: &str) -> Result<bool, PE<GenericError>> {
    let found = self.persy_impl.exists_index(index_name);
    Ok(found)
}
/// Get a value or a group of values from a key.
///
/// # Example
///
/// ```rust
/// # use persy::{ValueMode, OpenOptions};
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # let persy = OpenOptions::new().memory()?;
/// # let mut tx = persy.begin()?;
/// # tx.create_index::<u8,u8>("my_new_index", ValueMode::Cluster)?;
/// # tx.put::<u8,u8>("my_new_index",10,10)?;
/// # let prepared = tx.prepare()?;
/// # prepared.commit()?;
/// let values = persy.get::<u8,u8>("my_new_index",&10)?;
/// for value in values {
/// //...
/// }
/// # Ok(())
/// # }
/// ```
pub fn get<K, V>(&self, index_name: &str, k: &K) -> Result<ValueIter<V>, PE<IndexOpsError>>
where
K: IndexType,
V: IndexType,
{
let index_id = self
.solve_index_id(index_name)
.map_err(|e| PE::PE(IndexOpsError::from(e.error())))?;
Ok(ValueIter::from(
self.persy_impl
.get::<K::Wrapper, V::Wrapper>(index_id, &k.clone().wrap())?,
))
}
/// Get one value, or none, for a key.
///
/// This runs [`Persy::get`] and returns the first item of the resulting iterator, if any.
///
/// [`Persy::get`]: struct.Persy.html#method.get
///
/// # Example
///
/// ```rust
/// # use persy::{ValueMode, OpenOptions};
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # let persy = OpenOptions::new().memory()?;
/// # let mut tx = persy.begin()?;
/// # tx.create_index::<u8,u8>("my_new_index", ValueMode::Cluster)?;
/// # tx.put::<u8,u8>("my_new_index",10,10)?;
/// # let prepared = tx.prepare()?;
/// # prepared.commit()?;
/// if let Some(value) = persy.one::<u8,u8>("my_new_index",&10)? {
///     //...
/// }
/// # Ok(())
/// # }
/// ```
pub fn one<K, V>(&self, index_name: &str, k: &K) -> Result<Option<V>, PE<IndexOpsError>>
where
    K: IndexType,
    V: IndexType,
{
    let mut values = self.get::<K, V>(index_name, k)?;
    Ok(values.next())
}
/// Browse a range of keys and values from and index.
///
/// # Example
///
/// ```rust
/// # use persy::{OpenOptions,ValueMode, IndexIter};
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # let persy = OpenOptions::new().memory()?;
/// # let mut tx = persy.begin()?;
/// # tx.create_index::<u8,u8>("my_new_index", ValueMode::Cluster)?;
/// # tx.put::<u8,u8>("my_new_index",10,10)?;
/// # let prepared = tx.prepare()?;
/// # prepared.commit()?;
/// let iter:IndexIter<u8,u8> = persy.range("my_new_index",10..12)?;
/// for (k,values) in iter {
/// for value in values {
/// //...
/// }
/// }
/// # Ok(())
/// # }
/// ```
pub fn range<K, V, R>(&self, index_name: &str, range: R) -> Result<IndexIter<K, V>, PE<IndexOpsError>>
where
K: IndexType,
V: IndexType,
R: RangeBounds<K>,
{
let index_id = self
.solve_index_id(index_name)
.map_err(|e| PE::PE(IndexOpsError::from(e.error())))?;
let rr = PersyImpl::map_index_range_bounds(range);
let (_, raw) = self.persy_impl.range(index_id, rr)?;
Ok(IndexIter::new(raw, self.persy_impl.clone()))
}
/// List all the existing segments as `(name, id)` pairs.
///
/// This wrapper always returns `Ok`; the `Result` is part of the signature only.
///
/// # Example
///
/// ```rust
/// # use persy::{OpenOptions};
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # let persy = OpenOptions::new().memory()?;
/// let mut tx = persy.begin()?;
/// tx.create_segment("seg")?;
/// let prepared = tx.prepare()?;
/// prepared.commit()?;
/// let segments = persy.list_segments()?;
/// let names = segments.into_iter().map(|(name,_id)|name).collect::<Vec<String>>();
/// assert!(names.contains(&"seg".to_string()));
/// # Ok(())
/// # }
/// ```
pub fn list_segments(&self) -> Result<Vec<(String, SegmentId)>, PE<GenericError>> {
    let segments = self.persy_impl.list_segments();
    Ok(segments)
}
/// List all the existing indexes as `(name, info)` pairs.
///
/// # Errors
///
/// Errors raised while collecting the index information are returned as `GenericError`.
///
/// # Example
///
/// ```rust
/// # use persy::{OpenOptions, ValueMode};
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # let persy = OpenOptions::new().memory()?;
/// let mut tx = persy.begin()?;
/// tx.create_index::<u8,u8>("index", ValueMode::Cluster)?;
/// let prepared = tx.prepare()?;
/// prepared.commit()?;
/// let indexes = persy.list_indexes()?;
/// let names = indexes.into_iter().map(|(name,_info)|name).collect::<Vec<String>>();
/// assert!(names.contains(&"index".to_string()));
/// # Ok(())
/// # }
/// ```
pub fn list_indexes(&self) -> Result<Vec<(String, IndexInfo)>, PE<GenericError>> {
    let indexes = self.persy_impl.list_indexes()?;
    Ok(indexes)
}
/// Create a read snapshot of the current data status.
///
/// This wrapper always returns `Ok`; the `Result` is part of the signature only.
///
/// # Example
///
/// ```rust
/// # use persy::{OpenOptions};
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # let persy = OpenOptions::new().memory()?;
/// // ... More logic
/// let snapshot = persy.snapshot()?;
/// // .. Access data from the snapshot
/// # Ok(())
/// # }
/// ```
pub fn snapshot(&self) -> Result<Snapshot, PE<GenericError>> {
    let engine = self.persy_impl.clone();
    let snapshot_ref = self.persy_impl.snapshot();
    Ok(Snapshot::new(engine, snapshot_ref))
}
/// Test-only helper that forwards to `PersyImpl::free_file_lock`.
///
/// NOTE(review): presumably releases the lock held on the storage file so that a test
/// can open it again — confirm against the implementation.
#[cfg(test)]
pub fn free_file_lock(&self) -> crate::error::PERes<()> {
self.persy_impl.free_file_lock()
}
}
#[cfg(test)]
mod tests {
    use crate::{OpenOptions, ReadError, PE};

    /// Reading an id whose page is far beyond what the storage holds must not fail:
    /// the read is expected to report that no record exists.
    #[test]
    pub fn test_read_id_out_of_file() {
        let persy = OpenOptions::new().memory().unwrap();
        let mut tx = persy.begin().unwrap();
        tx.create_segment("test").unwrap();
        let finalizer = tx.prepare().unwrap();
        finalizer.commit().unwrap();
        let id = crate::PersyId(crate::id::RecRef { page: 10000, pos: 10 });
        let read_after = persy.read("test", &id).unwrap();
        assert!(read_after.is_none());
    }

    /// Reading an id that points to a valid page but to a position that is out of it
    /// must be rejected with `ReadError::InvalidPersyId`.
    #[test]
    pub fn test_read_id_out_of_page() {
        let persy = OpenOptions::new().memory().unwrap();
        let mut tx = persy.begin().unwrap();
        tx.create_segment("test").unwrap();
        let id = tx.insert("test", &[0; 10]).unwrap();
        let finalizer = tx.prepare().unwrap();
        finalizer.commit().unwrap();
        // Same page as the real record, but with a forged position.
        let id = crate::PersyId(crate::id::RecRef {
            page: id.0.page,
            pos: 2000,
        });
        let read_after = persy.read("test", &id);
        assert!(read_after.is_err());
        match read_after.err() {
            Some(PE::PE(ReadError::InvalidPersyId(_))) => {}
            // `panic!` states the intent directly instead of asserting a constant `false`.
            _ => panic!("wrong error type"),
        }
    }
}