// persy 1.5.0
//
// Transactional Persistence Engine
// Documentation
use crate::{
    address::segment_iter::SnapshotSegmentIter,
    error::{GenericError, IndexError, IndexOpsError, SegmentError, PE},
    id::{IndexId, SegmentId, ToIndexId, ToSegmentId},
    index::{config::IndexType, iter::IndexIter, value_iter::ValueIter},
    persy::{IndexInfo, PersyImpl},
    snapshots::SnapshotRef,
    PersyId, ReadError,
};
use std::{ops::RangeBounds, sync::Arc};

pub(crate) mod data;

#[cfg(test)]
mod tests;

/// Read-only snapshot of the storage at a specific point in time.
///
/// All the changes from transactions committed up to the point in time when the snapshot was
/// created are readable from this snapshot; all subsequent transactions are ignored.
///
/// A copy of the old data is kept on disk, indexed by in-memory structures. When the
/// snapshot is dropped, if there are no older snapshots, all the data held by this snapshot
/// that no longer exists in the final state is cleaned up.
///
/// Cloning a `Snapshot` only clones the inner [`Arc`], so every clone refers to the same
/// underlying snapshot state.
#[derive(Clone)]
pub struct Snapshot {
    // Shared state: the engine handle together with the snapshot reference.
    snap: Arc<SnapshotInt>,
}
/// Shared state behind a `Snapshot`: the engine handle plus the reference that identifies
/// the snapshot when calling into the engine.
struct SnapshotInt {
    persy_impl: Arc<PersyImpl>,
    snapshot_ref: SnapshotRef,
}

impl Snapshot {
    /// Build a `Snapshot` from the engine handle and the reference of an already taken snapshot.
    pub(crate) fn new(persy_impl: Arc<PersyImpl>, snapshot_ref: SnapshotRef) -> Snapshot {
        let snap = Arc::new(SnapshotInt {
            persy_impl,
            snapshot_ref,
        });
        Snapshot { snap }
    }

    /// Resolve the given segment reference to a `SegmentId` in the context of this snapshot.
    fn solve_segment_id(&self, segment: impl ToSegmentId) -> Result<SegmentId, PE<SegmentError>> {
        let state = &self.snap;
        let segment_id = state
            .persy_impl
            .solve_segment_id_snapshot(&state.snapshot_ref, segment)?;
        Ok(segment_id)
    }

    /// Resolve the given index reference to an `IndexId` in the context of this snapshot.
    fn solve_index_id(&self, index: impl ToIndexId) -> Result<IndexId, PE<IndexError>> {
        let state = &self.snap;
        let index_id = state
            .persy_impl
            .solve_index_id_snapshot(&state.snapshot_ref, index)?;
        Ok(index_id)
    }

    /// Read the content of a record as it was when the snapshot was taken, ignoring all
    /// the transactions committed afterwards.
    ///
    /// A failure to resolve `segment` is reported as a `ReadError`.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use persy::{OpenOptions};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let persy = OpenOptions::new().memory()?;
    /// let mut tx = persy.begin()?;
    /// # tx.create_segment("seg")?;
    /// let data = vec![1;20];
    /// let id = tx.insert("seg", &data)?;
    /// tx.prepare()?.commit()?;
    /// let snapshot = persy.snapshot()?;
    /// let read = snapshot.read("seg", &id)?.expect("record exists");
    /// assert_eq!(data,read);
    /// # Ok(())
    /// # }
    /// ```
    pub fn read(&self, segment: impl ToSegmentId, id: &PersyId) -> Result<Option<Vec<u8>>, PE<ReadError>> {
        // Segment resolution fails with a `SegmentError`; convert it to the read error type.
        let segment_id = match self.solve_segment_id(segment) {
            Ok(resolved) => resolved,
            Err(PE::PE(cause)) => return Err(ReadError::from(cause).into()),
        };
        let state = &self.snap;
        let content = state
            .persy_impl
            .read_snap(segment_id, &id.0, &state.snapshot_ref)?;
        Ok(content)
    }

    /// Scan for records existing at the moment of snapshot creation, ignoring all
    /// the following committed transactions.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use persy::{OpenOptions};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let persy = OpenOptions::new().memory()?;
    /// let mut tx = persy.begin()?;
    /// # tx.create_segment("seg")?;
    /// let data = vec![1;20];
    /// let id = tx.insert("seg", &data)?;
    /// tx.prepare()?.commit()?;
    /// let snapshot = persy.snapshot()?;
    /// let mut count = 0;
    /// for (id,content) in snapshot.scan("seg")? {
    ///     println!("record size:{}",content.len());
    ///     count+=1;
    /// }
    /// assert_eq!(count,1);
    /// # Ok(())
    /// # }
    /// ```
    pub fn scan(&self, segment: impl ToSegmentId) -> Result<SnapshotSegmentIter, PE<SegmentError>> {
        let segment_id = self.solve_segment_id(segment)?;
        Ok(SnapshotSegmentIter::new(
            self.snap
                .persy_impl
                .scan_snapshot(segment_id, &self.snap.snapshot_ref)?,
            self.snap.persy_impl.clone(),
        ))
    }

    /// Get a value or a group of values from a key at the point the snapshot was taken ignoring
    /// all following committed transactions.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use persy::{OpenOptions, ValueMode};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let persy = OpenOptions::new().memory()?;
    /// # let mut tx = persy.begin()?;
    /// # tx.create_index::<u8,u8>("my_new_index", ValueMode::Cluster)?;
    /// tx.put::<u8,u8>("my_new_index",10,10)?;
    /// tx.prepare()?.commit()?;
    /// let snapshot = persy.snapshot()?;
    /// let values = snapshot.get::<u8,u8>("my_new_index",&10)?;
    /// for value in values {
    ///  //...
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn get<K, V>(&self, index_name: &str, k: &K) -> Result<ValueIter<V>, PE<IndexOpsError>>
    where
        K: IndexType,
        V: IndexType,
    {
        let index_id = self
            .solve_index_id(index_name)
            .map_err(|e| PE::PE(IndexOpsError::from(e.error())))?;
        let value = self.snap.persy_impl.get_snapshot::<K::Wrapper, V::Wrapper>(
            index_id,
            &self.snap.snapshot_ref,
            &k.clone().wrap(),
        )?;
        Ok(ValueIter::from(value))
    }

    /// Get a single value, or `None`, for a key at the point the snapshot was taken, ignoring
    /// all the transactions committed afterwards.
    ///
    /// This is the first item yielded by [`Snapshot::get`] for the same index and key.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use persy::{OpenOptions, ValueMode};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let persy = OpenOptions::new().memory()?;
    /// let mut tx = persy.begin()?;
    /// # tx.create_index::<u8,u8>("my_new_index", ValueMode::Cluster)?;
    /// tx.put::<u8,u8>("my_new_index",10,10)?;
    /// tx.prepare()?.commit()?;
    /// let snapshot = persy.snapshot()?;
    /// if let Some(value) = snapshot.one::<u8,u8>("my_new_index",&10)?{
    ///  //...
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn one<K, V>(&self, index_name: &str, k: &K) -> Result<Option<V>, PE<IndexOpsError>>
    where
        K: IndexType,
        V: IndexType,
    {
        let mut values = self.get::<K, V>(index_name, k)?;
        Ok(values.next())
    }

    ///
    /// Browse a range of keys and values from an index at the pointing that the snapshot was created ignoring all
    /// the following committed transactions.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use persy::{OpenOptions, ValueMode, IndexIter};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let persy = OpenOptions::new().memory()?;
    /// let mut tx = persy.begin()?;
    /// # tx.create_index::<u8,u8>("my_new_index", ValueMode::Cluster)?;
    /// tx.put::<u8,u8>("my_new_index",10,10)?;
    /// tx.prepare()?.commit()?;
    /// let snapshot = persy.snapshot()?;
    /// let iter:IndexIter<u8,u8> = snapshot.range("my_new_index",10..12)?;
    /// for (k,values) in iter  {
    ///     for value in values {
    ///         //...
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn range<K, V, R>(&self, index_name: &str, range: R) -> Result<IndexIter<K, V>, PE<IndexOpsError>>
    where
        K: IndexType,
        V: IndexType,
        R: RangeBounds<K>,
    {
        let index_id = self
            .solve_index_id(index_name)
            .map_err(|e| PE::PE(IndexOpsError::from(e.error())))?;
        let rr = PersyImpl::map_index_range_bounds(range);
        let (_, raw) = self
            .snap
            .persy_impl
            .range_snapshot(index_id, &self.snap.snapshot_ref, rr)?;
        Ok(IndexIter::new(raw, self.snap.persy_impl.clone()))
    }

    /// List all the segments existing at the point the snapshot was created, ignoring all
    /// the transactions committed afterwards.
    ///
    /// Each entry pairs the segment name with its `SegmentId`.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use persy::{OpenOptions};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let persy = OpenOptions::new().memory()?;
    /// let mut tx = persy.begin()?;
    /// tx.create_segment("seg")?;
    /// tx.prepare()?.commit()?;
    /// let snapshot = persy.snapshot()?;
    /// let segments = snapshot.list_segments()?;
    /// let names = segments.into_iter().map(|(name,_id)|name).collect::<Vec<String>>();
    /// assert!(names.contains(&"seg".to_string()));
    /// # Ok(())
    /// # }
    /// ```
    pub fn list_segments(&self) -> Result<Vec<(String, SegmentId)>, PE<GenericError>> {
        let state = &self.snap;
        let segments = state.persy_impl.list_segments_snapshot(&state.snapshot_ref);
        Ok(segments)
    }

    /// List all the indexes existing at the point the snapshot was created, ignoring all
    /// the transactions committed afterwards.
    ///
    /// Each entry pairs the index name with its `IndexInfo`.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use persy::{OpenOptions, ValueMode};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let persy = OpenOptions::new().memory()?;
    /// let mut tx = persy.begin()?;
    /// tx.create_index::<u8, u8>("idx", ValueMode::Replace)?;
    /// tx.prepare()?.commit()?;
    /// let snapshot = persy.snapshot()?;
    /// let indexes = snapshot.list_indexes()?;
    /// let names = indexes.into_iter().map(|(name,_id)|name).collect::<Vec<String>>();
    /// assert!(names.contains(&"idx".to_string()));
    /// # Ok(())
    /// # }
    /// ```
    pub fn list_indexes(&self) -> Result<Vec<(String, IndexInfo)>, PE<GenericError>> {
        let state = &self.snap;
        let indexes = state.persy_impl.list_indexes_snapshot(&state.snapshot_ref)?;
        Ok(indexes)
    }
}