use crate::{
address::segment_iter::SnapshotSegmentIter,
error::{GenericError, IndexError, IndexOpsError, SegmentError, PE},
id::{IndexId, SegmentId, ToIndexId, ToSegmentId},
index::{config::IndexType, iter::IndexIter, value_iter::ValueIter},
persy::{IndexInfo, PersyImpl},
snapshots::SnapshotRef,
PersyId, ReadError,
};
use std::{ops::RangeBounds, sync::Arc};
pub(crate) mod data;
#[cfg(test)]
mod tests;
/// Read-only snapshot of the storage at a specific point in time.
///
/// All the changes from transactions committed up to the point in time when the snapshot was
/// created are readable from this snapshot; all subsequent transactions are ignored.
///
/// A copy of the old data is kept on disk, with indexed access through in-memory structures.
/// On drop of the snapshot, if there are no older snapshots, all the data held by this snapshot
/// that no longer exists in the final state will be cleaned up.
#[derive(Clone)]
pub struct Snapshot {
/// Shared inner state; cloning a `Snapshot` only clones this `Arc`.
snap: Arc<SnapshotInt>,
}
/// Inner state shared by every clone of a [`Snapshot`].
struct SnapshotInt {
/// Handle to the `PersyImpl` that every snapshot operation is delegated to.
persy_impl: Arc<PersyImpl>,
/// Reference identifying this snapshot; passed to each `*_snapshot` call on `persy_impl`.
snapshot_ref: SnapshotRef,
}
impl Snapshot {
/// Build a [`Snapshot`] that shares the given `PersyImpl` handle and snapshot reference
/// behind a single `Arc`.
pub(crate) fn new(persy_impl: Arc<PersyImpl>, snapshot_ref: SnapshotRef) -> Snapshot {
    let inner = SnapshotInt {
        persy_impl,
        snapshot_ref,
    };
    Snapshot { snap: Arc::new(inner) }
}
/// Resolve `segment` into a [`SegmentId`] through this snapshot's reference.
fn solve_segment_id(&self, segment: impl ToSegmentId) -> Result<SegmentId, PE<SegmentError>> {
    let inner = &self.snap;
    let segment_id = inner
        .persy_impl
        .solve_segment_id_snapshot(&inner.snapshot_ref, segment)?;
    Ok(segment_id)
}
/// Resolve `index` into an [`IndexId`] through this snapshot's reference.
fn solve_index_id(&self, index: impl ToIndexId) -> Result<IndexId, PE<IndexError>> {
    let inner = &self.snap;
    let index_id = inner
        .persy_impl
        .solve_index_id_snapshot(&inner.snapshot_ref, index)?;
    Ok(index_id)
}
/// Read the content of a record as it was when the snapshot was taken, ignoring every
/// transaction committed afterwards.
///
/// # Errors
///
/// Fails when `segment` cannot be resolved in this snapshot or when the underlying
/// snapshot read fails.
///
/// # Example
///
/// ```rust
/// # use persy::{OpenOptions};
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # let persy = OpenOptions::new().memory()?;
/// let mut tx = persy.begin()?;
/// # tx.create_segment("seg")?;
/// let data = vec![1;20];
/// let id = tx.insert("seg", &data)?;
/// tx.prepare()?.commit()?;
/// let snapshot = persy.snapshot()?;
/// let read = snapshot.read("seg", &id)?.expect("record exists");
/// assert_eq!(data,read);
/// # Ok(())
/// # }
/// ```
pub fn read(&self, segment: impl ToSegmentId, id: &PersyId) -> Result<Option<Vec<u8>>, PE<ReadError>> {
    // A segment resolution failure is re-expressed as a `ReadError` before propagating.
    let resolved = self.solve_segment_id(segment);
    let segment_id = resolved.map_err(|PE::PE(cause)| ReadError::from(cause))?;
    let inner = &self.snap;
    let content = inner
        .persy_impl
        .read_snap(segment_id, &id.0, &inner.snapshot_ref)?;
    Ok(content)
}
/// Iterate over the records of a segment that existed when the snapshot was created,
/// ignoring every transaction committed afterwards.
///
/// # Errors
///
/// Fails when `segment` cannot be resolved in this snapshot or when the underlying
/// snapshot scan cannot be started.
///
/// # Example
///
/// ```rust
/// # use persy::{OpenOptions};
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # let persy = OpenOptions::new().memory()?;
/// let mut tx = persy.begin()?;
/// # tx.create_segment("seg")?;
/// let data = vec![1;20];
/// let id = tx.insert("seg", &data)?;
/// tx.prepare()?.commit()?;
/// let snapshot = persy.snapshot()?;
/// let mut count = 0;
/// for (id,content) in snapshot.scan("seg")? {
///     println!("record size:{}",content.len());
///     count+=1;
/// }
/// assert_eq!(count,1);
/// # Ok(())
/// # }
/// ```
pub fn scan(&self, segment: impl ToSegmentId) -> Result<SnapshotSegmentIter, PE<SegmentError>> {
    let segment_id = self.solve_segment_id(segment)?;
    let inner = &self.snap;
    let raw = inner
        .persy_impl
        .scan_snapshot(segment_id, &inner.snapshot_ref)?;
    // The iterator keeps its own handle to the `PersyImpl`.
    Ok(SnapshotSegmentIter::new(raw, inner.persy_impl.clone()))
}
/// Get a value or a group of values from a key at the point the snapshot was taken ignoring
/// all following committed transactions.
///
/// # Example
///
/// ```rust
/// # use persy::{OpenOptions, ValueMode};
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # let persy = OpenOptions::new().memory()?;
/// # let mut tx = persy.begin()?;
/// # tx.create_index::<u8,u8>("my_new_index", ValueMode::Cluster)?;
/// tx.put::<u8,u8>("my_new_index",10,10)?;
/// tx.prepare()?.commit()?;
/// let snapshot = persy.snapshot()?;
/// let values = snapshot.get::<u8,u8>("my_new_index",&10)?;
/// for value in values {
/// //...
/// }
/// # Ok(())
/// # }
/// ```
pub fn get<K, V>(&self, index_name: &str, k: &K) -> Result<ValueIter<V>, PE<IndexOpsError>>
where
K: IndexType,
V: IndexType,
{
let index_id = self
.solve_index_id(index_name)
.map_err(|e| PE::PE(IndexOpsError::from(e.error())))?;
let value = self.snap.persy_impl.get_snapshot::<K::Wrapper, V::Wrapper>(
index_id,
&self.snap.snapshot_ref,
&k.clone().wrap(),
)?;
Ok(ValueIter::from(value))
}
/// Look up a single value for a key as of the moment the snapshot was taken, ignoring
/// every transaction committed afterwards.
///
/// Returns the first value yielded by [`Snapshot::get`] for the key, or `None` when it
/// yields nothing.
///
/// # Example
///
/// ```rust
/// # use persy::{OpenOptions, ValueMode};
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # let persy = OpenOptions::new().memory()?;
/// let mut tx = persy.begin()?;
/// # tx.create_index::<u8,u8>("my_new_index", ValueMode::Cluster)?;
/// tx.put::<u8,u8>("my_new_index",10,10)?;
/// tx.prepare()?.commit()?;
/// let snapshot = persy.snapshot()?;
/// if let Some(value) = snapshot.one::<u8,u8>("my_new_index",&10)?{
///  //...
/// }
/// # Ok(())
/// # }
/// ```
pub fn one<K, V>(&self, index_name: &str, k: &K) -> Result<Option<V>, PE<IndexOpsError>>
where
    K: IndexType,
    V: IndexType,
{
    let mut values = self.get(index_name, k)?;
    Ok(values.next())
}
///
/// Browse a range of keys and values from an index at the pointing that the snapshot was created ignoring all
/// the following committed transactions.
///
/// # Example
///
/// ```rust
/// # use persy::{OpenOptions, ValueMode, IndexIter};
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # let persy = OpenOptions::new().memory()?;
/// let mut tx = persy.begin()?;
/// # tx.create_index::<u8,u8>("my_new_index", ValueMode::Cluster)?;
/// tx.put::<u8,u8>("my_new_index",10,10)?;
/// tx.prepare()?.commit()?;
/// let snapshot = persy.snapshot()?;
/// let iter:IndexIter<u8,u8> = snapshot.range("my_new_index",10..12)?;
/// for (k,values) in iter {
/// for value in values {
/// //...
/// }
/// }
/// # Ok(())
/// # }
/// ```
pub fn range<K, V, R>(&self, index_name: &str, range: R) -> Result<IndexIter<K, V>, PE<IndexOpsError>>
where
K: IndexType,
V: IndexType,
R: RangeBounds<K>,
{
let index_id = self
.solve_index_id(index_name)
.map_err(|e| PE::PE(IndexOpsError::from(e.error())))?;
let rr = PersyImpl::map_index_range_bounds(range);
let (_, raw) = self
.snap
.persy_impl
.range_snapshot(index_id, &self.snap.snapshot_ref, rr)?;
Ok(IndexIter::new(raw, self.snap.persy_impl.clone()))
}
/// List all the segments that existed at the point when the snapshot was created,
/// ignoring every transaction committed afterwards.
///
/// # Example
///
/// ```rust
/// # use persy::{OpenOptions};
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # let persy = OpenOptions::new().memory()?;
/// let mut tx = persy.begin()?;
/// tx.create_segment("seg")?;
/// tx.prepare()?.commit()?;
/// let snapshot = persy.snapshot()?;
/// let segments = snapshot.list_segments()?;
/// let names = segments.into_iter().map(|(name,_id)|name).collect::<Vec<String>>();
/// assert!(names.contains(&"seg".to_string()));
/// # Ok(())
/// # }
/// ```
pub fn list_segments(&self) -> Result<Vec<(String, SegmentId)>, PE<GenericError>> {
    let inner = &self.snap;
    // The listing call's result is wrapped as-is; no error is propagated from it here.
    let segments = inner.persy_impl.list_segments_snapshot(&inner.snapshot_ref);
    Ok(segments)
}
/// List all the indexes that existed at the point when the snapshot was created,
/// ignoring every transaction committed afterwards.
///
/// # Errors
///
/// Fails when the underlying snapshot index listing fails.
///
/// # Example
///
/// ```rust
/// # use persy::{OpenOptions, ValueMode};
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # let persy = OpenOptions::new().memory()?;
/// let mut tx = persy.begin()?;
/// tx.create_index::<u8, u8>("idx", ValueMode::Replace)?;
/// tx.prepare()?.commit()?;
/// let snapshot = persy.snapshot()?;
/// let indexes = snapshot.list_indexes()?;
/// let names = indexes.into_iter().map(|(name,_id)|name).collect::<Vec<String>>();
/// assert!(names.contains(&"idx".to_string()));
/// # Ok(())
/// # }
/// ```
pub fn list_indexes(&self) -> Result<Vec<(String, IndexInfo)>, PE<GenericError>> {
    let inner = &self.snap;
    let indexes = inner.persy_impl.list_indexes_snapshot(&inner.snapshot_ref)?;
    Ok(indexes)
}
}