persy 1.5.0

Transactional Persistence Engine
Documentation
use crate::{
    address::segment::{
        segment_page_iterator::SegmentPageIterator, AllocatedSegmentPage, Segment, SegmentPage, SegmentPageRead,
        Segments,
    },
    allocator::Allocator,
    config::Config,
    device::{Page, PageOps},
    error::{PERes, ReadError, SegmentError, TimeoutError},
    id::{PersyId, RecRef, SegmentId},
    index::config::{is_index_name_data, is_index_name_meta},
    journal::records::{DeleteRecord, InsertRecord, NewSegmentPage, RollbackPage, UpdateRecord},
    locks::{LockManager, RwLockManager},
    transaction::tx_impl::{CheckRecord, Locks, SegmentOperation},
    PrepareError,
};
use std::{
    collections::{hash_map::Entry, HashMap, HashSet},
    sync::{Arc, RwLock},
    time::Duration,
};

pub mod record_scanner;
pub mod segment;
pub mod segment_iter;
#[cfg(test)]
mod tests;

/// Size exponent (2^6) used when allocating the address root page that holds the segment table.
pub const ADDRESS_ROOT_PAGE_EXP: u8 = 6;
/// Size exponent (2^10) for address pages.
pub const ADDRESS_PAGE_EXP: u8 = 10;
/// Flag bit 0 of an address entry.
pub const FLAG_EXISTS: u8 = 1 << 0;
/// Flag bit 1 of an address entry.
pub const FLAG_DELETED: u8 = 1 << 1;
/// Offset of the segment hash inside an address page.
pub const SEGMENT_HASH_OFFSET: u32 = 16;
/// Offset of the deleted-entries counter inside an address page.
pub const SEGMENT_PAGE_DELETE_COUNT_OFFSET: u32 = 24;
/// Offset at which address entries start inside an address page.
pub const SEGMENT_DATA_OFFSET: u32 = 26;
/// Size of one address entry: data page pointer (u64) + flags (u8) + version (u16).
pub const ADDRESS_ENTRY_SIZE: u32 =
    (std::mem::size_of::<u64>() + std::mem::size_of::<u8>() + std::mem::size_of::<u16>()) as u32;

/// Address entry of a record as read by `Address::check_persistent_records`.
pub struct OldRecordInfo {
    /// The record this entry belongs to.
    pub recref: RecRef,
    /// Segment the record was looked up in.
    pub segment: SegmentId,
    /// Data page the address entry pointed to when it was read.
    pub record_page: u64,
    /// Version stored in the address entry when it was read.
    pub version: u16,
}

impl OldRecordInfo {
    /// Bundles the given values; the `RecRef` is copied out of the reference.
    fn new(recref: &RecRef, segment: SegmentId, record_page: u64, version: u16) -> Self {
        let recref = *recref;
        Self {
            recref,
            segment,
            record_page,
            version,
        }
    }
}

/// Address layer: keeps, for each record reference, the address entry pointing at the page
/// that holds the record's data, together with the segment table and the locks that
/// transactions take on records and segments.
pub struct Address {
    // Page allocator used to load, write and flush address pages.
    allocator: Arc<Allocator>,
    // Per-record locks taken by `acquire_locks` / `acquire_record_lock`.
    record_locks: LockManager<RecRef>,
    // Locks keyed by the names of segments being created.
    create_segment_locks: LockManager<String>,
    // Per-segment read/write locks: write for dropped segments, read for created/updated ones.
    segment_locks: RwLockManager<SegmentId>,
    // Segment table; NOTE(review): its guard also appears to be used to keep page scans
    // (`scan_page_all`) apart from page updates — confirm.
    segments: RwLock<Segments>,
}

impl Address {
    /// Opens the address layer whose segment table is rooted at page `page`.
    ///
    /// `_config` is currently unused.
    ///
    /// # Errors
    /// Propagates any error returned while loading the segment table.
    pub fn new(all: &Arc<Allocator>, _config: &Arc<Config>, page: u64) -> PERes<Address> {
        let segments = Segments::new(page, all)?;
        Ok(Address {
            allocator: all.clone(),
            record_locks: Default::default(),
            create_segment_locks: Default::default(),
            segment_locks: Default::default(),
            segments: RwLock::new(segments),
        })
    }

    /// Allocates a page of size exponent [`ADDRESS_ROOT_PAGE_EXP`], initializes an empty
    /// segment table in it and returns the index of that page.
    ///
    /// # Errors
    /// Propagates allocation and initialization errors.
    pub fn init(all: &Allocator) -> PERes<u64> {
        let page = all.allocate(ADDRESS_ROOT_PAGE_EXP)?;
        let page_index = page.get_index();
        Segments::init(page, all)?;
        Ok(page_index)
    }

    /// Returns an iterator over the pages of `segment`, starting from its first page.
    ///
    /// # Errors
    /// [`SegmentError::SegmentNotFound`] when `segment` is `None`.
    pub fn scan_segment(&self, segment: Option<&Segment>) -> Result<SegmentPageIterator, SegmentError> {
        segment
            .map(|found| SegmentPageIterator::new(found.first_page))
            .ok_or(SegmentError::SegmentNotFound)
    }

    /// Returns an iterator over the pages of the registered segment `segment`, starting from
    /// its first page. The segment table is read under its read lock.
    ///
    /// # Errors
    /// [`SegmentError::SegmentNotFound`] when no segment with that id is registered.
    pub fn scan(&self, segment: SegmentId) -> Result<SegmentPageIterator, SegmentError> {
        let segments = self.segments.read().expect("lock not poisoned");
        if let Some(segment) = segments.segment_by_id(segment) {
            Ok(SegmentPageIterator::new(segment.first_page))
        } else {
            Err(SegmentError::SegmentNotFound)
        }
    }

    /// Loads address page `cur_page` and returns the result of `segment_scan_all_entries` on it.
    ///
    /// # Errors
    /// Propagates page loading errors.
    pub fn scan_page_all(&self, cur_page: u64) -> PERes<(u64, Vec<(u32, bool)>)> {
        // THIS IS ONLY FOR LOCK PROTECTION
        let _lock = self.segments.read().expect("lock not poisoned");
        let mut page = self.allocator.load_page(cur_page)?;
        Ok(page.segment_scan_all_entries())
    }

    /// Allocates a new record slot in a caller-owned `segment` (for example one returned by
    /// `create_temp_segment`), without looking it up in the segment table.
    ///
    /// # Errors
    /// [`SegmentError::SegmentNotFound`] when `segment` is `None`; allocation errors otherwise.
    pub fn allocate_temp_seg(
        &self,
        segment: Option<&mut Segment>,
    ) -> Result<(RecRef, Option<AllocatedSegmentPage>), SegmentError> {
        if let Some(found) = segment {
            Ok(found.allocate_internal(&self.allocator)?)
        } else {
            Err(SegmentError::SegmentNotFound)
        }
    }

    /// Creates a new segment named `segment` through the segment table (under its write lock)
    /// and returns it to the caller.
    ///
    /// # Errors
    /// Propagates errors from the segment table.
    pub fn create_temp_segment(&self, segment: &str) -> PERes<Segment> {
        self.segments
            .write()
            .expect("lock not poisoned")
            .create_temp_segment(&self.allocator, segment)
    }

    /// Allocates a new record slot in the registered segment `segment`, holding the segment
    /// table write lock for the duration of the allocation.
    ///
    /// Returns the new record reference and an optional `AllocatedSegmentPage`
    /// (presumably `Some` when a new page had to be added — confirm in `allocate_internal`).
    ///
    /// # Errors
    /// [`SegmentError::SegmentNotFound`] when the id is not registered; allocation errors
    /// otherwise.
    pub fn allocate(&self, segment: SegmentId) -> Result<(RecRef, Option<AllocatedSegmentPage>), SegmentError> {
        if let Some(found) = self
            .segments
            .write()
            .expect("lock not poisoned")
            .segments
            .get_mut(&segment)
        {
            Ok(found.allocate_internal(&self.allocator)?)
        } else {
            Err(SegmentError::SegmentNotFound)
        }
    }

    /// Acquires all the locks a transaction needs, in this order: creation locks on created
    /// segment names, write locks on dropped segments, read locks on created/updated segments
    /// and record locks. Each wait is bounded by `timeout`.
    ///
    /// # Errors
    /// When any step fails, the locks already acquired by the previous steps are released and
    /// the error is returned.
    pub(crate) fn acquire_locks(&self, locks: &Locks, timeout: Duration) -> Result<(), PrepareError> {
        self.create_segment_locks.lock_all(locks.created_segments(), timeout)?;
        if let Err(x) = self.segment_locks.lock_all_write(locks.dropped_segments(), timeout) {
            self.create_segment_locks.unlock_all(locks.created_segments());
            return Err(PrepareError::from(x));
        }
        if let Err(x) = self
            .segment_locks
            .lock_all_read(locks.created_updated_segments(), timeout)
        {
            self.create_segment_locks.unlock_all(locks.created_segments());
            self.segment_locks.unlock_all_write(locks.dropped_segments());
            return Err(PrepareError::from(x));
        }

        if let Err(x) = self.record_locks.lock_all(locks.records(), timeout) {
            self.create_segment_locks.unlock_all(locks.created_segments());
            self.segment_locks.unlock_all_write(locks.dropped_segments());
            self.segment_locks.unlock_all_read(locks.created_updated_segments());
            return Err(PrepareError::from(x));
        }

        Ok(())
    }

    /// Validates segment operations against the current segment table.
    ///
    /// # Errors
    /// - [`PrepareError::IndexAlreadyExists`] if a name in `created` exists and is an index
    ///   meta/data name;
    /// - [`PrepareError::SegmentAlreadyExists`] if any other name in `created` exists;
    /// - [`PrepareError::SegmentNotFound`] if an id in `updated` is not registered.
    pub fn check_segments(
        &self,
        created: &[String],
        updated: impl Iterator<Item = SegmentId>,
    ) -> Result<(), PrepareError> {
        let segs = self.segments.read().expect("lock not poisoned");
        for c in created {
            if segs.has_segment(c) {
                if is_index_name_meta(c) || is_index_name_data(c) {
                    return Err(PrepareError::IndexAlreadyExists);
                } else {
                    return Err(PrepareError::SegmentAlreadyExists);
                }
            }
        }
        for u in updated {
            if !segs.has_segment_by_id(&u) {
                return Err(PrepareError::SegmentNotFound);
            }
        }
        Ok(())
    }

    /// Takes a read lock on a single segment, waiting at most `timeout`.
    pub fn acquire_segment_read_lock(&self, segment: SegmentId, timeout: Duration) -> Result<(), TimeoutError> {
        self.segment_locks.lock_all_read(&[segment], timeout)?;
        Ok(())
    }

    /// Takes read locks on all of `segments`, waiting at most `timeout`.
    pub fn acquire_segments_read_lock(&self, segments: &[SegmentId], timeout: Duration) -> Result<(), TimeoutError> {
        self.segment_locks.lock_all_read(segments, timeout)?;
        Ok(())
    }

    /// Takes the lock on record `id`, waiting at most `timeout`.
    pub fn acquire_record_lock(&self, id: &RecRef, timeout: Duration) -> Result<(), TimeoutError> {
        self.record_locks.lock_all(&[*id], timeout)?;
        Ok(())
    }

    /// Releases a read lock taken with [`Address::acquire_segment_read_lock`].
    pub fn release_segment_read_lock(&self, segment: SegmentId) {
        self.segment_locks.unlock_all_read(&[segment]);
    }

    /// Releases a lock taken with [`Address::acquire_record_lock`].
    pub fn release_record_lock(&self, id: &RecRef) {
        self.record_locks.unlock_all(&[*id]);
    }

    /// Delegates allocation recovery for `segs` (and the pending `created` segments) to the
    /// segment table, under its write lock.
    pub fn recover_allocations(&self, segs: &[SegmentId], created: &mut HashMap<SegmentId, Segment>) -> PERes<()> {
        let mut segments = self.segments.write().expect("lock not poisoned");
        segments.recover_allocations(segs, created, &self.allocator)?;
        Ok(())
    }

    /// Asks the segment table to recompute the last page of its segments, under its write lock.
    pub fn recompute_last_pages(&self) -> PERes<()> {
        let mut segments = self.segments.write().expect("lock not poisoned");
        segments.recompute_last_pages(&self.allocator)?;
        Ok(())
    }

    /// Reads the current address entry of every record in `records` and returns them.
    ///
    /// # Errors
    /// - read errors from [`Address::read`], converted into `PrepareError`;
    /// - [`PrepareError::RecordNotFound`] if a record has no entry;
    /// - [`PrepareError::VersionNotLatest`] if `check_version` is set and the stored version
    ///   differs from the expected one.
    pub(crate) fn check_persistent_records(
        &self,
        records: &[CheckRecord],
        check_version: bool,
    ) -> Result<Vec<OldRecordInfo>, PrepareError> {
        let mut current_record_pages = Vec::with_capacity(records.len());
        for &CheckRecord {
            segment_id,
            ref record_id,
            version,
        } in records
        {
            let val = self.read(record_id, segment_id)?;
            if let Some((record, pers_version)) = val {
                current_record_pages.push(OldRecordInfo::new(record_id, segment_id, record, pers_version));
                if check_version && pers_version != version {
                    return Err(PrepareError::VersionNotLatest);
                }
            } else {
                return Err(PrepareError::RecordNotFound(PersyId(*record_id)));
            }
        }
        Ok(current_record_pages)
    }

    /// Releases every lock listed in `locks`, in the reverse order used by
    /// [`Address::acquire_locks`].
    pub(crate) fn release_locks(&self, locks: &Locks) {
        self.record_locks.unlock_all(locks.records());
        self.segment_locks.unlock_all_read(locks.created_updated_segments());
        self.segment_locks.unlock_all_write(locks.dropped_segments());
        self.create_segment_locks.unlock_all(locks.created_segments());
    }

    /// Removes the address entries written by `inserts` whose segment is still registered, and
    /// flushes every touched page.
    ///
    /// Returns the `(segment, page)` pairs for which `segment_delete_entry` returned `true`
    /// (presumably: the page became empty) and the page has a next page.
    pub fn rollback(&self, inserts: &[InsertRecord]) -> PERes<Vec<(SegmentId, u64)>> {
        // Write guard although the table is only read here; NOTE(review): presumably to keep
        // `scan_page_all` readers out while address pages are modified.
        let segments = self.segments.write().expect("lock not poisoned");
        let mut pages_to_remove = Vec::new();
        let mut pages = HashMap::new();
        for insert in inserts {
            if segments.segments.contains_key(&insert.segment) {
                let page = insert.recref.page;
                let seg_page = self.get_or_insert_mut(&mut pages, page)?;
                if seg_page.segment_delete_entry(insert.segment, insert.recref.pos) && seg_page.get_next()? != 0 {
                    pages_to_remove.push((insert.segment, page));
                }
            }
        }
        for (_, to_flush) in pages.into_iter() {
            self.allocator.flush_page(to_flush)?;
        }
        Ok(pages_to_remove)
    }

    /// Points each entry listed in `rollbacks` back to its `record_page` and flushes the touched
    /// pages. No segment lock is taken.
    pub fn recover_rollback(&self, rollbacks: &[RollbackPage]) -> PERes<()> {
        let mut pages = HashMap::new();
        for rollback in rollbacks {
            let page = rollback.recref.page;
            let seg_page = self.get_or_insert_mut(&mut pages, page)?;
            seg_page.segment_update_entry(rollback.segment, rollback.recref.pos, rollback.record_page);
        }
        for (_, to_flush) in pages.into_iter() {
            self.allocator.flush_page(to_flush)?;
        }
        Ok(())
    }

    /// Applies a transaction's address changes while holding the segment table write lock.
    ///
    /// Steps, in order:
    /// 1. if `recover`, relinks the pages in `segs_new_pages` (or sets the segment's first page
    ///    when a new page has no predecessor);
    /// 2. writes the `inserts`, `updates` and `deletes` entries, skipping segments dropped by
    ///    `seg_ops`;
    /// 3. flushes every touched page; if `recover`, it first calls `recalc_count` on each page,
    ///    then removes the record pages of inserts/updates from the allocator free list and
    ///    recovers the allocations of the involved segments;
    /// 4. drops the segments dropped by `seg_ops`, finalizes the created ones (taken out of
    ///    `created`) and flushes the segment table.
    ///
    /// Returns the `(segment, page)` pairs for which `segment_delete_entry` returned `true`
    /// (presumably: the page became empty), excluding pages that have no next page.
    pub fn apply(
        &self,
        segs_new_pages: &[NewSegmentPage],
        inserts: &[InsertRecord],
        updates: &[UpdateRecord],
        deletes: &[DeleteRecord],
        seg_ops: &[SegmentOperation],
        created: &mut HashMap<SegmentId, Segment>,
        recover: bool,
    ) -> PERes<Vec<(SegmentId, u64)>> {
        let mut segments = self.segments.write().expect("lock not poisoned");
        let mut dropped = HashSet::new();
        for seg_op in seg_ops {
            if let SegmentOperation::Drop(ref op) = *seg_op {
                dropped.insert(op.segment_id);
            }
        }
        let mut pages = HashMap::new();

        if recover {
            for new_page in segs_new_pages {
                if new_page.previous == 0 {
                    segments.set_first_page(new_page.segment, new_page.page);
                } else {
                    let p_page = self.get_or_insert_mut(&mut pages, new_page.previous)?;
                    p_page.set_next(new_page.page)?;
                    let n_page = self.get_or_insert_mut(&mut pages, new_page.page)?;
                    n_page.set_prev(new_page.previous)?;
                    n_page.set_segment_id(new_page.segment)?;
                }
            }
        }
        for insert in inserts {
            if !dropped.contains(&insert.segment) {
                let page = insert.recref.page;
                let seg_page = self.get_or_insert_mut(&mut pages, page)?;
                seg_page.segment_insert_entry(insert.segment, insert.recref.pos, insert.record_page);
            }
        }

        for update in updates {
            if !dropped.contains(&update.segment) {
                let page = update.recref.page;
                let seg_page = self.get_or_insert_mut(&mut pages, page)?;
                seg_page.segment_update_entry(update.segment, update.recref.pos, update.record_page);
            }
        }
        let mut pages_to_remove = Vec::new();

        for delete in deletes {
            if !dropped.contains(&delete.segment) {
                let page = delete.recref.page;
                let seg_page = self.get_or_insert_mut(&mut pages, page)?;
                if seg_page.segment_delete_entry(delete.segment, delete.recref.pos) {
                    // Avoid to remove last pages, for avoid concurrent operations with page
                    // creation
                    if seg_page.get_next()? != 0 {
                        pages_to_remove.push((delete.segment, page));
                    }
                }
            }
        }

        if recover {
            for (_, mut to_flush) in pages.into_iter() {
                to_flush.recalc_count()?;
                self.allocator.flush_page(to_flush)?;
            }

            // Record pages referenced by replayed entries are in use: take them off the
            // allocator free list.
            let recover_page = |record_page: u64| {
                let page = self.allocator.load_page(record_page)?;
                self.allocator.remove_from_free(record_page, page.get_size_exp())
            };
            let mut segs = HashSet::new();
            for insert in inserts {
                recover_page(insert.record_page)?;
                segs.insert(insert.segment);
            }
            for update in updates {
                recover_page(update.record_page)?;
                segs.insert(update.segment);
            }
            for delete in deletes {
                segs.insert(delete.segment);
            }

            segments.recover_allocations(&segs.into_iter().collect::<Vec<_>>(), created, &self.allocator)?;
        } else {
            for (_, to_flush) in pages.into_iter() {
                self.allocator.flush_page(to_flush)?;
            }
        }

        for seg_op in seg_ops {
            if let SegmentOperation::Drop(ref op) = *seg_op {
                segments.drop_segment(&op.name);
            }
        }

        for seg_op in seg_ops {
            if let SegmentOperation::Create(ref op) = *seg_op {
                if let Some(s) = created.remove(&op.segment_id) {
                    segments.finalize_create_segment(s);
                }
            }
        }
        segments.flush_segments(&self.allocator)?;

        Ok(pages_to_remove)
    }

    /// Replays segment drops and creations from `seg_ops` during recovery (creations are taken
    /// out of `created`), sets the first page of segments whose new page has no predecessor,
    /// then flushes the segment table.
    pub fn recover_segment_operations(
        &self,
        seg_ops: &[SegmentOperation],
        created: &mut HashMap<SegmentId, Segment>,
        segs_new_pages: &[NewSegmentPage],
    ) -> PERes<()> {
        let mut segments = self.segments.write().expect("lock not poisoned");
        for seg_op in seg_ops {
            if let SegmentOperation::Drop(ref op) = *seg_op {
                segments.drop_segment(&op.name);
            }
        }

        for seg_op in seg_ops {
            if let SegmentOperation::Create(ref op) = *seg_op {
                if let Some(s) = created.remove(&op.segment_id) {
                    segments.recover_finalize_create_segment(s);
                }
            }
        }
        for new_page in segs_new_pages {
            if new_page.previous == 0 {
                segments.set_first_page(new_page.segment, new_page.page);
            }
        }
        segments.flush_segments(&self.allocator)?;
        Ok(())
    }

    /// Delegates removal of `delete_pages` during recovery to the segment table (write lock).
    pub fn recover_remove_pages(&self, delete_pages: &[(SegmentId, u64)]) -> PERes<Vec<u64>> {
        let mut segments = self.segments.write().expect("lock not poisoned");
        segments.recover_remove_pages(&self.allocator, delete_pages)
    }

    /// Collects the page indexes of `segment` through the segment table (read lock).
    pub fn collect_segment_pages(&self, segment: SegmentId) -> PERes<Vec<u64>> {
        let segments = self.segments.read().expect("lock not poisoned");
        segments.collect_segment_pages(&self.allocator, segment)
    }

    /// Delegates cleanup of the `empty` pages to the segment table (write lock).
    pub fn clear_empty(&self, empty: &[(SegmentId, u64)]) -> PERes<Vec<(SegmentId, u64)>> {
        let mut segments = self.segments.write().expect("lock not poisoned");
        segments.clear_empty(&self.allocator, empty)
    }

    /// Flushes the segment table (write lock).
    pub fn flush_segments(&self) -> PERes<()> {
        let mut segments = self.segments.write().expect("lock not poisoned");
        segments.flush_segments(&self.allocator)
    }

    /// Returns `true` if a segment named `segment` is registered.
    pub fn exists_segment(&self, segment: &str) -> bool {
        self.segments.read().expect("lock not poisoned").has_segment(segment)
    }

    /// Returns `true` if a segment with id `segment` is registered.
    pub fn exists_segment_by_id(&self, segment: &SegmentId) -> bool {
        self.segments
            .read()
            .expect("lock not poisoned")
            .has_segment_by_id(segment)
    }

    /// Returns the id of the segment named `segment`, if registered.
    pub fn segment_id(&self, segment: &str) -> Option<SegmentId> {
        self.segments.read().expect("lock not poisoned").segment_id(segment)
    }

    /// Returns the name of the segment with id `segment`, if registered.
    pub fn segment_name_by_id(&self, segment: SegmentId) -> Option<String> {
        self.segments
            .read()
            .expect("lock not poisoned")
            .segment_name_by_id(segment)
    }

    /// Test helper: writes an address entry for `recref` pointing to `record_page` and flushes
    /// the page.
    #[cfg(test)]
    pub fn insert(&self, segment_id: SegmentId, recref: &RecRef, record_page: u64) -> PERes<()> {
        let mut page = self.allocator.write_page(recref.page)?;
        page.segment_insert_entry(segment_id, recref.pos, record_page);
        self.allocator.flush_page(page)?;
        Ok(())
    }

    /// Reads the address entry of `recref` in `segment`.
    ///
    /// Returns `Ok(None)` when the page of `recref` is not loaded by `load_page_not_free`,
    /// otherwise whatever `segment_read_entry` returns (record page and version).
    ///
    /// # Errors
    /// [`ReadError::InvalidPersyId`] when `recref.pos` leaves no room for a whole entry in the
    /// page content; page loading errors are propagated.
    pub fn read(&self, recref: &RecRef, segment: SegmentId) -> Result<Option<(u64, u16)>, ReadError> {
        if let Some(mut page) = self.allocator.load_page_not_free(recref.page)? {
            // `recref` may come from a caller-supplied id, so the page can be of any size:
            // `checked_sub` prevents a u32 underflow (a debug-build panic, or a wrap-around
            // that would let the bounds check pass in release builds) when the content is
            // smaller than a single entry.
            let entry_fits = page
                .get_content_size()
                .checked_sub(ADDRESS_ENTRY_SIZE)
                .map_or(false, |last_pos| recref.pos <= last_pos);
            if !entry_fits {
                return Err(ReadError::InvalidPersyId(*recref));
            }
            Ok(page.segment_read_entry(segment, recref.pos))
        } else {
            Ok(None)
        }
    }

    /// Returns page `k` from the `map` cache, loading it for writing on first access.
    fn get_or_insert_mut<'a>(&self, map: &'a mut HashMap<u64, Page>, k: u64) -> PERes<&'a mut Page> {
        Ok(match map.entry(k) {
            Entry::Occupied(entry) => entry.into_mut(),
            Entry::Vacant(entry) => entry.insert(self.allocator.write_page(k)?),
        })
    }

    /// Lists the registered segments as `(name, id)` pairs.
    pub fn list(&self) -> Vec<(String, SegmentId)> {
        self.segments.read().expect("lock not poisoned").list()
    }

    /// Returns the segment table's `snapshot_list` as `(name, id, u64)` triples.
    pub fn snapshot_list(&self) -> Vec<(String, SegmentId, u64)> {
        self.segments.read().expect("lock not poisoned").snapshot_list()
    }
}