persy 1.5.0

Transactional Persistence Engine
Documentation
//! # Persy - Transactional Persistence Engine
//!
//! Simple single-file, durable, isolated, consistent, and atomic persistence engine based on copy-on-write.
//! It guarantees the persistence of data even in case of a crash through an in-file write-ahead log,
//! and provides simple Rust APIs to store simple bytes or value-associated (index) data;
//! it also provides memory management logic to keep the file size in check.
//!
//! # Example
//!
//! ```rust
//! use persy::{Persy,Config};
//! //...
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
//! Persy::create("./target/data.persy")?;
//! let persy = Persy::open("./target/data.persy",Config::new())?;
//! let mut tx = persy.begin()?;
//! tx.create_segment("seg")?;
//! let data = vec![1;20];
//! tx.insert("seg", &data)?;
//! let prepared = tx.prepare()?;
//! prepared.commit()?;
//! for (_id,content) in persy.scan("seg")? {
//!     assert_eq!(content[0], 1);
//!     //....
//! }
//! # std::fs::remove_file("./target/data.persy")?;
//! # Ok(())
//! # }
//! ```
//!
//! # Example Index
//!
//! ```rust
//! use persy::{Persy,Config, ValueMode};
//! //...
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
//! Persy::create("./target/index_data.persy")?;
//! let persy = Persy::open("./target/index_data.persy",Config::new())?;
//! let mut tx = persy.begin()?;
//! tx.create_index::<i32,i32>("index", ValueMode::Cluster)?;
//! tx.put("index", 10, 20)?;
//! let prepared = tx.prepare()?;
//! prepared.commit()?;
//! for (key,values) in persy.range::<i32,i32,_>("index", ..)? {
//!     assert_eq!(key, 10);
//!     assert_eq!(values.into_iter().collect::<Vec<_>>(), vec![20]);
//!     //....
//! }
//! # std::fs::remove_file("./target/index_data.persy")?;
//! # Ok(())
//! # }
//! ```
//!

mod address;
mod allocator;
mod config;
mod device;
mod error;
mod flush_checksum;
mod id;
mod index;
mod io;
mod journal;
mod locks;
mod open_options;
mod persy;
mod recover;
mod snapshot;
mod snapshots;
mod transaction;

#[cfg(feature = "background_ops")]
mod background;

#[cfg(feature = "experimental_inspect")]
pub mod inspect;

use crate::persy::PersyImpl;
pub use crate::{
    address::segment_iter::{SegmentIter, SnapshotSegmentIter, TxSegmentIter},
    config::{Config, TransactionConfig, TxStrategy},
    error::{
        BeginTransactionError, CreateError, CreateIndexError, CreateSegmentError, DeleteError, DropIndexError,
        DropSegmentError, GenericError, IndexChangeError, IndexError, IndexOpsError, IndexPutError, InsertError,
        OpenError, OpenMemoryError, PersyError, PrepareError, ReadError, SegmentError, UpdateError, PE,
    },
    id::{IndexId, PersyId, SegmentId, ToIndexId, ToSegmentId},
    index::{
        bytevec::ByteVec,
        config::{IndexType, IndexTypeId, ValueMode},
        iter::{IndexIter, TxIndexIter},
        value_iter::ValueIter,
    },
    open_options::OpenOptions,
    persy::IndexInfo,
    recover::{Recover, RecoverStatus},
    snapshot::Snapshot,
    transaction::{Transaction, TransactionFinalize},
};

use std::{
    fs::{self, File},
    ops::RangeBounds,
    path::Path,
    sync::Arc,
};

/// Custom identifier to track the transaction in the recover phase.
///
/// It is a plain byte vector. The recover callbacks accepted by
/// [`Persy::open_with_recover`] and [`Persy::open_from_file_with_recover`]
/// receive a reference to a value of this type.
pub type TransactionId = Vec<u8>;

/// Main structure to operate persy storage files.
///
/// The handle only holds a reference-counted pointer to the internal implementation,
/// so cloning it yields another handle to the same underlying instance.
#[derive(Clone)]
pub struct Persy {
    // Shared implementation that the operations of this handle delegate to.
    persy_impl: Arc<PersyImpl>,
}

impl Persy {
    /// Create a new database file at the given path.
    ///
    /// # Errors
    ///
    /// Fails if the file already exists.
    ///
    /// # Example
    ///
    /// ```rust
    /// use std::path::Path;
    /// use persy::{Persy, Config};
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let path = Path::new("target/created.db");
    /// Persy::create(path)?;
    /// let persy = Persy::open(path, Config::new())?;
    /// # std::fs::remove_file("target/created.db")?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn create<P: AsRef<Path>>(path: P) -> Result<(), PE<CreateError>> {
        let target = path.as_ref();
        PersyImpl::create(target)?;
        Ok(())
    }

    /// Create a new database using an already opened file handle.
    ///
    /// # Errors
    ///
    /// Returns a [`CreateError`] when the creation performed by the
    /// implementation on the provided file fails.
    ///
    pub fn create_from_file(file: File) -> Result<(), PE<CreateError>> {
        PersyImpl::create_from_file(file)?;
        Ok(())
    }

    /// Open a database file.
    ///
    /// The file should have been created with [`Persy::create`]
    ///
    /// # Errors
    ///
    /// Fails if the file does not exist.
    ///
    /// [`Persy::create`]: struct.Persy.html#method.create
    pub fn open<P: AsRef<Path>>(path: P, config: Config) -> Result<Persy, PE<OpenError>> {
        Persy::open_with_recover(path, config, |_| true)
    }

    /// Open a database file from a path with a recover function.
    ///
    /// The file should have been created with [`Persy::create`]
    ///
    /// # Errors
    ///
    /// Fails if the file does not exist.
    ///
    /// [`Persy::create`]: struct.Persy.html#method.create
    pub fn open_with_recover<P: AsRef<Path>, C>(path: P, config: Config, recover: C) -> Result<Persy, PE<OpenError>>
    where
        C: Fn(&TransactionId) -> bool,
    {
        let f = fs::OpenOptions::new().write(true).read(true).open(path)?;
        Persy::open_from_file_with_recover(f, config, recover)
    }

    /// Open the database file located at `path` and return a [`Recover`] structure
    /// that allows selecting the transactions to commit and recovering them.
    ///
    /// The file should have been created with [`Persy::create`].
    ///
    /// # Errors
    ///
    /// Fails if the file does not exist.
    pub fn recover<P: AsRef<Path>>(path: P, config: Config) -> Result<Recover, PE<OpenError>> {
        let mut options = fs::OpenOptions::new();
        options.read(true).write(true);
        let file = options.open(path)?;
        let (persy_impl, recover_state) = PersyImpl::open_recover(file, config)?;
        let shared = Arc::new(persy_impl);
        Ok(Recover::new(recover_state, shared))
    }

    /// Open a database file from a direct file handle.
    ///
    /// The file should have been created with [`Persy::create`]
    ///
    /// # Errors
    ///
    /// Fails if the file does not exist.
    ///
    /// [`Persy::create`]: struct.Persy.html#method.create
    pub fn open_from_file(path: File, config: Config) -> Result<Persy, PE<OpenError>> {
        let (persy_impl, recover) = PersyImpl::open_recover(path, config)?;
        persy_impl.final_recover(recover)?;
        Ok(Persy {
            persy_impl: Arc::new(persy_impl),
        })
    }

    /// Open a database file, from a direct file handle and a transaction recover function.
    ///
    /// The file should have been created with [`Persy::create`]
    ///
    /// # Errors
    ///
    /// Fails if the file does not exist.
    ///
    /// [`Persy::create`]: struct.Persy.html#method.create
    pub fn open_from_file_with_recover<C>(file: File, config: Config, recover: C) -> Result<Persy, PE<OpenError>>
    where
        C: Fn(&TransactionId) -> bool,
    {
        let (persy_impl, mut recov) = PersyImpl::open_recover(file, config)?;
        recov.apply(recover)?;
        persy_impl.final_recover(recov)?;
        Ok(Persy {
            persy_impl: Arc::new(persy_impl),
        })
    }

    /// Open an existing database or create it if it does not exist yet,
    /// calling the `prepare` function just after the creation.
    ///
    /// # Example
    ///
    /// ```rust
    /// use std::path::Path;
    /// use persy::{Persy, Config, PersyId, ValueMode};
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let path = Path::new("target/open_or_create.db");
    /// let config = Config::new();
    ///
    /// let persy = Persy::open_or_create_with(path, config, |persy| {
    ///     // this closure is only called on database creation
    ///     let mut tx = persy.begin()?;
    ///     tx.create_segment("data")?;
    ///     tx.create_index::<u64, PersyId>("index", ValueMode::Replace)?;
    ///     let prepared = tx.prepare()?;
    ///     prepared.commit()?;
    ///     println!("Segment and Index successfully created");
    ///     Ok(())
    /// })?;
    /// # std::fs::remove_file("target/open_or_create.db")?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn open_or_create_with<P, F>(path: P, config: Config, prepare: F) -> Result<Persy, PE<OpenError>>
    where
        P: AsRef<Path>,
        F: FnOnce(&Persy) -> Result<(), Box<dyn std::error::Error>> + 'static,
    {
        let path = path.as_ref();
        let persy;

        if !path.exists() {
            Persy::create(path).map_err(|e| PE::PE(OpenError::from(e.error())))?;
            persy = Persy::open(path, config)?;
            prepare(&persy).map_err(|e| OpenError::InitError(format!("{}", e)))?;
        } else {
            persy = Persy::open(path, config)?;
        }

        Ok(persy)
    }

    /// Begin a new transaction with the default [`TransactionConfig`].
    ///
    /// The transaction isolation level is 'read_committed'.
    /// To commit, call [`prepare`] and then [`commit`].
    ///
    /// [`prepare`]: Transaction::prepare
    /// [`commit`]: TransactionFinalize::commit
    ///
    /// # Example
    ///
    /// ```rust
    /// # use persy::{OpenOptions};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let persy = OpenOptions::new().memory()?;
    /// let mut tx = persy.begin()?;
    /// // ...
    /// tx.prepare()?.commit()?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn begin(&self) -> Result<Transaction, PE<BeginTransactionError>> {
        let default_config = TransactionConfig::new();
        self.begin_with(default_config)
    }

    /// Begin a new transaction specifying parameters for the transaction.
    ///
    /// The transaction isolation level is 'read_committed'.
    /// for commit call [`prepare`] and [`commit`]
    ///
    /// [`prepare`]:struct.Transaction.html#method.prepare
    /// [`commit`]:struct.TransactionFinalize.html#method.commit
    ///
    /// # Example
    ///
    /// ```rust
    /// # use persy::{OpenOptions, TransactionConfig};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let persy = OpenOptions::new().memory()?;
    /// let tx_id = vec![2;2];
    /// let mut tx = persy.begin_with(TransactionConfig::new().set_transaction_id(tx_id))?;
    /// // ...
    /// tx.prepare()?.commit()?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn begin_with(&self, config: TransactionConfig) -> Result<Transaction, PE<BeginTransactionError>> {
        Ok(Transaction {
            tx: Some(self.persy_impl.begin_with(config)?),
            persy_impl: self.persy_impl.clone(),
        })
    }

    /// Check whether a segment with the given name already exists in the storage.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use persy::{OpenOptions};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let persy = OpenOptions::new().memory()?;
    /// let mut tx = persy.begin()?;
    /// tx.create_segment("my_new_segment")?;
    /// let prepared = tx.prepare()?;
    /// prepared.commit()?;
    /// assert!(persy.exists_segment("my_new_segment")?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn exists_segment(&self, segment: &str) -> Result<bool, PE<GenericError>> {
        let found = self.persy_impl.exists_segment(segment);
        Ok(found)
    }

    /// Resolve a segment reference to its [`SegmentId`].
    ///
    /// # Example
    ///
    /// ```rust
    /// # use persy::{OpenOptions};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let persy = OpenOptions::new().memory()?;
    /// let mut tx = persy.begin()?;
    /// tx.create_segment("my_new_segment")?;
    /// let prepared = tx.prepare()?;
    /// prepared.commit()?;
    /// let segment_id = persy.solve_segment_id("my_new_segment")?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn solve_segment_id(&self, segment: impl ToSegmentId) -> Result<SegmentId, PE<SegmentError>> {
        let segment_id = self.persy_impl.solve_segment_id(segment)?;
        Ok(segment_id)
    }

    /// Resolve an index reference to its [`IndexId`].
    ///
    /// The returned id currently has no use in the public API; it may be used in the future.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use persy::{OpenOptions, ValueMode};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let persy = OpenOptions::new().memory()?;
    /// let mut tx = persy.begin()?;
    /// tx.create_index::<u8,u8>("my_new_index", ValueMode::Cluster)?;
    /// let prepared = tx.prepare()?;
    /// prepared.commit()?;
    /// let index_id = persy.solve_index_id("my_new_index")?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn solve_index_id(&self, index: impl ToIndexId) -> Result<IndexId, PE<IndexError>> {
        // TODO: find a better name for this method (it is already public).
        let index_id = self.persy_impl.solve_index_id(index)?;
        Ok(index_id)
    }

    /// Read the content of a record from the persistent data.
    ///
    /// The segment reference is resolved first; a failure of that resolution is
    /// converted into the matching [`ReadError`].
    ///
    /// # Example
    ///
    /// ```rust
    /// # use persy::{OpenOptions};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let persy = OpenOptions::new().memory()?;
    /// let mut tx = persy.begin()?;
    /// # tx.create_segment("seg")?;
    /// let data = vec![1;20];
    /// let id = tx.insert("seg", &data)?;
    /// let prepared = tx.prepare()?;
    /// prepared.commit()?;
    /// let read = persy.read("seg", &id)?.expect("record exits");
    /// assert_eq!(data,read);
    /// # Ok(())
    /// # }
    /// ```
    pub fn read(&self, segment: impl ToSegmentId, id: &PersyId) -> Result<Option<Vec<u8>>, PE<ReadError>> {
        let segment_id = match self.solve_segment_id(segment) {
            Ok(resolved) => resolved,
            Err(PE::PE(cause)) => return Err(PE::PE(ReadError::from(cause))),
        };
        let content = self.persy_impl.read(segment_id, &id.0)?;
        Ok(content)
    }

    /// Scan a segment, iterating over its persistent records.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use persy::{Persy,Config};
    /// # use persy::{OpenOptions};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let persy = OpenOptions::new().memory()?;
    /// let mut tx = persy.begin()?;
    /// # tx.create_segment("seg")?;
    /// let data = vec![1;20];
    /// let id = tx.insert("seg", &data)?;
    /// let prepared = tx.prepare()?;
    /// prepared.commit()?;
    /// let mut count = 0;
    /// for (id,content) in persy.scan("seg")? {
    ///     println!("record size:{}",content.len());
    ///     count+=1;
    /// }
    /// assert_eq!(count,1);
    /// # Ok(())
    /// # }
    /// ```
    pub fn scan(&self, segment: impl ToSegmentId) -> Result<SegmentIter, PE<SegmentError>> {
        let segment_id = self.solve_segment_id(segment)?;
        // Build the raw scan before cloning the shared handle for the iterator.
        let raw_scan = self.persy_impl.scan(segment_id)?;
        let shared = self.persy_impl.clone();
        Ok(SegmentIter::new(raw_scan, shared))
    }

    /// Check whether an index with the given name already exists in the storage.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use persy::{OpenOptions, ValueMode};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let persy = OpenOptions::new().memory()?;
    /// let mut tx = persy.begin()?;
    /// tx.create_index::<u8,u8>("my_new_index", ValueMode::Replace)?;
    /// let prepared = tx.prepare()?;
    /// prepared.commit()?;
    /// assert!(persy.exists_index("my_new_index")?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn exists_index(&self, index_name: &str) -> Result<bool, PE<GenericError>> {
        let found = self.persy_impl.exists_index(index_name);
        Ok(found)
    }

    /// Get a value or a group of values from a key.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use persy::{ValueMode, OpenOptions};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let persy = OpenOptions::new().memory()?;
    /// # let mut tx = persy.begin()?;
    /// # tx.create_index::<u8,u8>("my_new_index", ValueMode::Cluster)?;
    /// # tx.put::<u8,u8>("my_new_index",10,10)?;
    /// # let prepared = tx.prepare()?;
    /// # prepared.commit()?;
    /// let values = persy.get::<u8,u8>("my_new_index",&10)?;
    /// for value in values {
    ///     //...
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn get<K, V>(&self, index_name: &str, k: &K) -> Result<ValueIter<V>, PE<IndexOpsError>>
    where
        K: IndexType,
        V: IndexType,
    {
        let index_id = self
            .solve_index_id(index_name)
            .map_err(|e| PE::PE(IndexOpsError::from(e.error())))?;
        Ok(ValueIter::from(
            self.persy_impl
                .get::<K::Wrapper, V::Wrapper>(index_id, &k.clone().wrap())?,
        ))
    }

    /// Get a single value associated with a key, or `None` when there is no value.
    ///
    /// This takes the first element yielded by [`Persy::get`] for the same key.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use persy::{ValueMode, OpenOptions};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let persy = OpenOptions::new().memory()?;
    /// # let mut tx = persy.begin()?;
    /// # tx.create_index::<u8,u8>("my_new_index", ValueMode::Cluster)?;
    /// # tx.put::<u8,u8>("my_new_index",10,10)?;
    /// # let prepared = tx.prepare()?;
    /// # prepared.commit()?;
    /// if let Some(value) = persy.one::<u8,u8>("my_new_index",&10)? {
    ///     //...
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn one<K, V>(&self, index_name: &str, k: &K) -> Result<Option<V>, PE<IndexOpsError>>
    where
        K: IndexType,
        V: IndexType,
    {
        let mut values = self.get(index_name, k)?;
        Ok(values.next())
    }

    /// Browse a range of keys and values from and index.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use persy::{OpenOptions,ValueMode, IndexIter};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let persy = OpenOptions::new().memory()?;
    /// # let mut tx = persy.begin()?;
    /// # tx.create_index::<u8,u8>("my_new_index", ValueMode::Cluster)?;
    /// # tx.put::<u8,u8>("my_new_index",10,10)?;
    /// # let prepared = tx.prepare()?;
    /// # prepared.commit()?;
    /// let iter:IndexIter<u8,u8> = persy.range("my_new_index",10..12)?;
    /// for (k,values) in iter  {
    ///     for value in values {
    ///         //...
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn range<K, V, R>(&self, index_name: &str, range: R) -> Result<IndexIter<K, V>, PE<IndexOpsError>>
    where
        K: IndexType,
        V: IndexType,
        R: RangeBounds<K>,
    {
        let index_id = self
            .solve_index_id(index_name)
            .map_err(|e| PE::PE(IndexOpsError::from(e.error())))?;
        let rr = PersyImpl::map_index_range_bounds(range);
        let (_, raw) = self.persy_impl.range(index_id, rr)?;
        Ok(IndexIter::new(raw, self.persy_impl.clone()))
    }

    /// List all the existing segments as `(name, id)` pairs.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use persy::{OpenOptions};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let persy = OpenOptions::new().memory()?;
    /// let mut tx = persy.begin()?;
    /// tx.create_segment("seg")?;
    /// let prepared = tx.prepare()?;
    /// prepared.commit()?;
    /// let segments = persy.list_segments()?;
    /// let names = segments.into_iter().map(|(name,_id)|name).collect::<Vec<String>>();
    /// assert!(names.contains(&"seg".to_string()));
    /// # Ok(())
    /// # }
    /// ```
    pub fn list_segments(&self) -> Result<Vec<(String, SegmentId)>, PE<GenericError>> {
        let segments = self.persy_impl.list_segments();
        Ok(segments)
    }

    /// List all the existing indexes as `(name, info)` pairs.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use persy::{OpenOptions, ValueMode};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let persy = OpenOptions::new().memory()?;
    /// let mut tx = persy.begin()?;
    /// tx.create_index::<u8,u8>("index", ValueMode::Cluster)?;
    /// let prepared = tx.prepare()?;
    /// prepared.commit()?;
    /// let indexes = persy.list_indexes()?;
    /// let names = indexes.into_iter().map(|(name,_info)|name).collect::<Vec<String>>();
    /// assert!(names.contains(&"index".to_string()));
    /// # Ok(())
    /// # }
    /// ```
    pub fn list_indexes(&self) -> Result<Vec<(String, IndexInfo)>, PE<GenericError>> {
        let indexes = self.persy_impl.list_indexes()?;
        Ok(indexes)
    }

    /// Create a read snapshot of the data at its current status.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use persy::{OpenOptions};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let persy = OpenOptions::new().memory()?;
    /// // ... More logic
    /// let snapshot = persy.snapshot()?;
    /// // .. Access data from the snapshot
    /// # Ok(())
    /// # }
    /// ```
    pub fn snapshot(&self) -> Result<Snapshot, PE<GenericError>> {
        // Clone the shared handle before taking the snapshot, as the arguments were evaluated originally.
        let shared = self.persy_impl.clone();
        let taken = self.persy_impl.snapshot();
        Ok(Snapshot::new(shared, taken))
    }

    /// Test-only helper that forwards to the `free_file_lock` of the
    /// underlying implementation and returns its result unchanged.
    #[cfg(test)]
    pub fn free_file_lock(&self) -> crate::error::PERes<()> {
        self.persy_impl.free_file_lock()
    }
}

#[cfg(test)]
mod tests {
    use crate::{OpenOptions, ReadError, PE};

    /// Reading with an id that refers to page 10000 of a nearly empty in-memory
    /// storage must succeed and yield no record.
    #[test]
    pub fn test_read_id_out_of_file() {
        let persy = OpenOptions::new().memory().unwrap();
        let mut tx = persy.begin().unwrap();
        tx.create_segment("test").unwrap();
        let finalizer = tx.prepare().unwrap();
        finalizer.commit().unwrap();
        let id = crate::PersyId(crate::id::RecRef { page: 10000, pos: 10 });
        let read_after = persy.read("test", &id).unwrap();
        assert!(read_after.is_none());
    }

    /// Reading with an id that reuses the page of a real record but an altered
    /// position (2000) must fail with `ReadError::InvalidPersyId`.
    #[test]
    pub fn test_read_id_out_of_page() {
        let persy = OpenOptions::new().memory().unwrap();
        let mut tx = persy.begin().unwrap();
        tx.create_segment("test").unwrap();
        let inserted = tx.insert("test", &[0; 10]).unwrap();
        let finalizer = tx.prepare().unwrap();
        finalizer.commit().unwrap();
        let out_of_page = crate::PersyId(crate::id::RecRef {
            page: inserted.0.page,
            pos: 2000,
        });
        let read_after = persy.read("test", &out_of_page);
        // A single assertion covers both "is an error" and "is the expected variant".
        assert!(
            matches!(read_after, Err(PE::PE(ReadError::InvalidPersyId(_)))),
            "wrong error type"
        );
    }
}