persy 1.5.0

Transactional Persistence Engine
Documentation
use crate::{
    address::{segment::segment_page_iterator::SegmentPageIterator, Address},
    allocator::Allocator,
    error::PERes,
    id::{RecRef, SegmentId},
    journal::{Journal, JournalId},
    snapshot::data::{CleanInfo, PendingClean, RecordVersion, SegmentSnapshot, SnapshotData, SnapshotEntry},
    transaction::tx_impl::TransactionImpl,
};
use std::{
    cmp::Ordering,
    collections::{hash_map::Entry, HashMap},
    sync::{Arc, Mutex, Weak},
};

pub type SnapshotId = u64;

#[derive(Default)]
pub struct InternalSnapshots {
    mapping: HashMap<RecRef, Vec<RecordVersion>>,
    active_snapshots: Vec<SnapshotData>,
    snapshot_sequence: u64,
}

impl InternalSnapshots {
    fn next_snapshot_id(&mut self) -> SnapshotId {
        let snapshot_id = self.snapshot_sequence;
        // This should be the default behavior of the u64 anyway, let's hope the compiler trows
        // this away
        if self.snapshot_sequence == u64::MAX {
            self.snapshot_sequence = 0;
        } else {
            self.snapshot_sequence += 1;
        }
        snapshot_id
    }
    fn search(&self, snapshot_id: SnapshotId) -> Result<usize, usize> {
        let snapshot_sequence = self.snapshot_sequence;
        self.active_snapshots
            .binary_search_by(|n| search(n.id(), snapshot_id, snapshot_sequence))
    }

    fn acquire_last_snapshot(&mut self) -> SnapshotId {
        let last = self.snapshot_sequence - 1;
        self.acquire_snapshot(last);
        last
    }

    fn acquire_snapshot(&mut self, snapshot_id: SnapshotId) {
        if let Ok(pos) = self.search(snapshot_id) {
            if let Some(p) = self.active_snapshots.get_mut(pos) {
                p.acquire();
            } else {
                unreachable!()
            }
        } else {
            panic!("try to acquire a not existing snapshot")
        }
    }

    pub(crate) fn pending_clean(
        &mut self,
        snapshot_id: SnapshotId,
        snapshots: &Arc<Snapshots>,
    ) -> Option<Arc<PendingClean>> {
        if let Ok(pos) = self.search(snapshot_id) {
            self.active_snapshots.get_mut(pos).map(|p| p.pending_clean(snapshots))
        } else {
            None
        }
    }

    pub(crate) fn clear_from(&mut self, snapshot_id: SnapshotId) -> Option<Vec<SnapshotData>> {
        let snapshot_sequence = self.snapshot_sequence;
        if let Ok(index) = self.search(snapshot_id) {
            let size = index + 1;
            let left_off = self.active_snapshots.split_off(size);
            let mut old = std::mem::replace(&mut self.active_snapshots, left_off);
            for tx in &mut old {
                if let Some(entries) = tx.take_entries() {
                    for record in entries {
                        if let Entry::Occupied(mut v) = self.mapping.entry(*record.rec_id()) {
                            match v
                                .get()
                                .binary_search_by(|n| search(n.id(), snapshot_id, snapshot_sequence))
                            {
                                Ok(index) => {
                                    v.get_mut().drain(..=index);
                                    if v.get().is_empty() {
                                        v.remove();
                                    }
                                }
                                Err(_index) => {
                                    v.remove();
                                }
                            }
                        }
                    }
                }
            }
            Some(old)
        } else {
            None
        }
    }

    pub(crate) fn read_snapshot(&mut self) -> SnapshotId {
        let snapshot_id = self.next_snapshot_id();
        let reference_count = if self.active_snapshots.is_empty() { 1 } else { 2 };
        let snapshot = SnapshotData::new(snapshot_id, reference_count);
        if let Err(index) = self.search(snapshot_id) {
            self.active_snapshots.insert(index, snapshot);
        }
        snapshot_id
    }

    pub(crate) fn fill_records(&mut self, journal_id: JournalId, snapshot_id: SnapshotId, entries: Vec<SnapshotEntry>) {
        for entry in &entries {
            let to_add = RecordVersion::new(snapshot_id, entry.case().clone());

            let snapshot_sequence = self.snapshot_sequence;
            match self.mapping.entry(*entry.rec_id()) {
                Entry::Occupied(mut v) => {
                    let vec = v.get_mut();
                    if let Err(index) = vec.binary_search_by(|n| search(n.id(), snapshot_id, snapshot_sequence)) {
                        vec.insert(index, to_add);
                    }
                }
                Entry::Vacant(e) => {
                    e.insert(vec![to_add]);
                }
            }
        }

        if let Ok(index) = self.search(snapshot_id) {
            if let Some(snap) = self.active_snapshots.get_mut(index) {
                snap.fill_records(journal_id, entries);
            }
        }
    }

    pub(crate) fn fill_clean_info(&mut self, snapshot_id: SnapshotId, freed_pages: CleanInfo) {
        if let Ok(index) = self.search(snapshot_id) {
            if let Some(snap) = self.active_snapshots.get_mut(index) {
                snap.fill_clean_info(freed_pages);
            }
        }
    }

    pub(crate) fn snapshot(
        &mut self,
        entries: Vec<SnapshotEntry>,
        freed_pages: CleanInfo,
        journal_id: JournalId,
    ) -> SnapshotId {
        let snapshot_id = self.next_snapshot_id();

        let reference_count = if self.active_snapshots.is_empty() { 1 } else { 2 };
        let snapshot = SnapshotData::new(snapshot_id, reference_count);
        if let Err(index) = self.search(snapshot_id) {
            self.active_snapshots.insert(index, snapshot);
        }

        self.fill_records(journal_id, snapshot_id, entries);
        self.fill_clean_info(snapshot_id, freed_pages);
        snapshot_id
    }

    pub(crate) fn new_snapshot(&mut self) -> SnapshotId {
        let snapshot_id = self.next_snapshot_id();

        let reference_count = if self.active_snapshots.is_empty() { 1 } else { 2 };
        let snapshot = SnapshotData::new(snapshot_id, reference_count);
        if let Err(index) = self.search(snapshot_id) {
            self.active_snapshots.insert(index, snapshot);
        }
        snapshot_id
    }

    pub(crate) fn fill_snapshot_address(
        &mut self,
        snapshot_id: SnapshotId,
        entries: Vec<SnapshotEntry>,
        journal_id: JournalId,
    ) {
        self.fill_records(journal_id, snapshot_id, entries);
    }

    pub(crate) fn fill_snapshot_clean_info(&mut self, snapshot_id: SnapshotId, freed_pages: CleanInfo) {
        self.fill_clean_info(snapshot_id, freed_pages);
    }

    pub(crate) fn read(&self, snapshot_id: SnapshotId, id: &RecRef) -> Option<RecordVersion> {
        let snapshot_sequence = self.snapshot_sequence;
        if let Some(v) = self.mapping.get(id) {
            let index = match v.binary_search_by(|n| search(n.id(), snapshot_id, snapshot_sequence)) {
                Ok(index) => index,
                Err(index) => index,
            };
            v.get(index).cloned()
        } else {
            None
        }
    }

    fn current_snapshot(&mut self) -> SnapshotId {
        if self.active_snapshots.is_empty() {
            let snapshot_id = self.next_snapshot_id();

            let snapshot = SnapshotData::new(snapshot_id, 1);
            if let Err(index) = self.search(snapshot_id) {
                self.active_snapshots.insert(index, snapshot);
            }
            snapshot_id
        } else {
            self.acquire_last_snapshot()
        }
    }

    pub(crate) fn solve_segment_id(&self, snapshot_id: SnapshotId, segment: &str) -> Option<SegmentId> {
        self.find_snapshot(snapshot_id)
            .and_then(|snap| snap.find_segment(segment))
    }

    fn list(&self, snapshot_id: SnapshotId) -> Vec<(String, SegmentId)> {
        self.find_snapshot(snapshot_id)
            .and_then(|snap| snap.segments_list())
            .unwrap_or_default()
    }

    fn find_snapshot(&self, snapshot_id: SnapshotId) -> Option<&SnapshotData> {
        if let Ok(index) = self.search(snapshot_id) {
            if let Some(snap) = self.active_snapshots.get(index) {
                Some(snap)
            } else {
                None
            }
        } else {
            None
        }
    }

    fn find_segment(&self, snapshot_id: SnapshotId, segment_id: SegmentId) -> Option<&SegmentSnapshot> {
        self.find_snapshot(snapshot_id)
            .and_then(|snap| snap.find_segment_snapsthot(segment_id))
    }

    pub(crate) fn fill_segments(
        &mut self,
        snapshot_id: SnapshotId,
        segs_id: HashMap<SegmentId, SegmentSnapshot>,
        segs_name: HashMap<String, SegmentSnapshot>,
    ) {
        if let Ok(index) = self.search(snapshot_id) {
            if let Some(snap) = self.active_snapshots.get_mut(index) {
                snap.fill(segs_id, segs_name);
            }
        }
    }

    pub(crate) fn release(&mut self, snapshot_id: SnapshotId) -> Option<Vec<SnapshotData>> {
        //TODO: This work fine but can cause problems if double release is called for the same id,
        //to refactor to something a bit more safe
        let mut clear_id = None;
        if let Ok(index) = self.search(snapshot_id) {
            let mut loop_index = index;
            while let Some(snap) = self.active_snapshots.get_mut(loop_index) {
                if !snap.release() {
                    break;
                }
                clear_id = Some(snap.id());
                loop_index += 1;
            }
        }
        if let Some(c_id) = clear_id {
            self.clear_from(c_id)
        } else {
            None
        }
    }

    pub fn scan(&self, snapshot_id: SnapshotId, segment_id: SegmentId) -> Option<SegmentPageIterator> {
        self.find_segment(snapshot_id, segment_id)
            .map(|sd| SegmentPageIterator::snapshot(sd.first_page()))
    }

    pub fn solve_segment_name(&self, snapshot_id: SnapshotId, segment_id: SegmentId) -> Option<String> {
        self.find_segment(snapshot_id, segment_id)
            .map(|sd| sd.name().to_owned())
    }

    #[cfg(test)]
    pub(crate) fn active_snapshots(&self) -> usize {
        self.active_snapshots.len()
    }
    #[cfg(test)]
    pub(crate) fn mapping_count(&self) -> usize {
        self.mapping.len()
    }
}

pub struct Snapshots {
    lock: Mutex<InternalSnapshots>,
    allocator: Arc<Allocator>,
    journal: Arc<Journal>,
    address: Arc<Address>,
}

pub fn search(value: u64, value1: u64, top: u64) -> Ordering {
    if value > top {
        if value1 > top {
            value.cmp(&value1)
        } else {
            Ordering::Less
        }
    } else if value1 > top {
        Ordering::Greater
    } else {
        value.cmp(&value1)
    }
}
pub(crate) fn to_mapping(
    segments: &[SegmentSnapshot],
) -> (HashMap<SegmentId, SegmentSnapshot>, HashMap<String, SegmentSnapshot>) {
    let mut segments_id = HashMap::new();
    let mut segments_name = HashMap::new();
    for segment in segments {
        segments_id.insert(segment.segment_id(), segment.clone());
        segments_name.insert(segment.name().to_owned(), segment.clone());
    }
    (segments_id, segments_name)
}

pub struct SnapshotRef {
    ops: Weak<Snapshots>,
    id: Option<SnapshotId>,
}

impl SnapshotRef {
    fn new(ops: &Arc<Snapshots>, id: SnapshotId) -> Self {
        Self {
            ops: Arc::downgrade(ops),
            id: Some(id),
        }
    }

    #[cfg(test)]
    pub(crate) fn leak(&mut self) {
        let _ = self.id.take();
    }

    pub fn id(&self) -> SnapshotId {
        self.id.unwrap()
    }
}
impl Clone for SnapshotRef {
    fn clone(&self) -> Self {
        if let Some(ops) = self.ops.upgrade() {
            ops.acquire(self.id.unwrap());
            Self {
                ops: Arc::downgrade(&ops),
                id: self.id,
            }
        } else {
            panic!("ref outlived Persy instance");
        }
    }
}
impl Drop for SnapshotRef {
    fn drop(&mut self) {
        if let Some(ops) = self.ops.upgrade() {
            if let Some(id) = self.id.take() {
                ops.release(id).expect("snapshot release do not fail");
            }
        }
    }
}

impl Snapshots {
    pub fn new(allocator: &Arc<Allocator>, journal: &Arc<Journal>, address: &Arc<Address>) -> Self {
        Self {
            lock: Default::default(),
            allocator: allocator.clone(),
            journal: journal.clone(),
            address: address.clone(),
        }
    }
    pub fn acquire(&self, snapshot_id: SnapshotId) {
        let mut lock = self.lock.lock().expect("lock not poisoned");
        lock.acquire_snapshot(snapshot_id);
    }

    pub fn current_snapshot(self: &Arc<Self>) -> SnapshotRef {
        let mut lock = self.lock.lock().expect("lock not poisoned");
        SnapshotRef::new(self, lock.current_snapshot())
    }

    pub fn read_snapshot(self: &Arc<Self>) -> SnapshotRef {
        let mut lock = self.lock.lock().expect("lock not poisoned");
        SnapshotRef::new(self, lock.read_snapshot())
    }

    pub fn snapshot(
        self: &Arc<Self>,
        entries: Vec<SnapshotEntry>,
        freed_pages: CleanInfo,
        journal_id: JournalId,
    ) -> SnapshotRef {
        let mut lock = self.lock.lock().expect("lock not poisoned");
        SnapshotRef::new(self, lock.snapshot(entries, freed_pages, journal_id))
    }

    pub fn fill_snapshot_address(
        self: &Arc<Self>,
        snapshot_ref: &SnapshotRef,
        entries: Vec<SnapshotEntry>,
        journal_id: JournalId,
    ) {
        let mut lock = self.lock.lock().expect("lock not poisoned");
        lock.fill_snapshot_address(snapshot_ref.id(), entries, journal_id)
    }

    pub fn fill_snapshot_clean_info(self: &Arc<Self>, snapshot_ref: &SnapshotRef, freed_pages: CleanInfo) {
        let mut lock = self.lock.lock().expect("lock not poisoned");
        lock.fill_snapshot_clean_info(snapshot_ref.id(), freed_pages)
    }

    pub fn new_snapshot(self: &Arc<Self>) -> SnapshotRef {
        let mut lock = self.lock.lock().expect("lock not poisoned");
        SnapshotRef::new(self, lock.new_snapshot())
    }

    pub fn fill_segments(&self, snapshot_id: &SnapshotRef, segments: &[SegmentSnapshot]) {
        let (segments_id, segments_name) = to_mapping(segments);
        let mut lock = self.lock.lock().expect("lock not poisoned");
        lock.fill_segments(snapshot_id.id(), segments_id, segments_name);
    }

    pub fn solve_segment_id(&self, snapshot_id: &SnapshotRef, segment: &str) -> Option<SegmentId> {
        let lock = self.lock.lock().expect("lock not poisoned");
        lock.solve_segment_id(snapshot_id.id(), segment)
    }

    pub fn solve_segment_name(&self, snapshot_id: &SnapshotRef, segment_id: SegmentId) -> Option<String> {
        self.lock
            .lock()
            .expect("lock not poisoned")
            .solve_segment_name(snapshot_id.id(), segment_id)
    }

    pub fn scan(&self, snapshot_id: &SnapshotRef, segment_id: SegmentId) -> Option<SegmentPageIterator> {
        self.lock
            .lock()
            .expect("lock not poisoned")
            .scan(snapshot_id.id(), segment_id)
    }

    pub fn list(&self, snapshot_id: &SnapshotRef) -> Vec<(String, SegmentId)> {
        let lock = self.lock.lock().expect("lock not poisoned");
        lock.list(snapshot_id.id())
    }

    pub fn read(&self, snapshot_id: &SnapshotRef, id: &RecRef) -> Option<RecordVersion> {
        let lock = self.lock.lock().expect("lock not poisoned");
        lock.read(snapshot_id.id(), id)
    }

    pub fn free_resources(
        self: &Arc<Self>,
        journal_id: &Option<JournalId>,
        freed_pages: &Option<CleanInfo>,
    ) -> PERes<()> {
        if let Some(clean_info) = freed_pages {
            self.allocator.free_pages(clean_info.freed_pages())?;
            TransactionImpl::free_address_structures_impl(
                &self.journal,
                self,
                &self.address,
                &self.allocator,
                clean_info.segment_pages(),
            )?;
        }
        if let Some(ji) = journal_id {
            self.journal.finished_to_clean(&[ji.clone()])?;
        }
        Ok(())
    }

    pub(crate) fn pending_clean(self: &Arc<Self>, snapshot_id: SnapshotId) -> Option<Arc<PendingClean>> {
        let mut lock = self.lock.lock().expect("lock not poisoned");
        lock.pending_clean(snapshot_id, self)
    }

    pub(crate) fn release(self: &Arc<Self>, snapshot_id: SnapshotId) -> PERes<()> {
        let to_release = { self.lock.lock().expect("lock not poisoned").release(snapshot_id) };
        if let Some(to_free_vec) = to_release {
            for mut to_free_item in to_free_vec {
                let _ = to_free_item.pending_clean(self);
            }
        }
        Ok(())
    }
}