persy 1.5.0

Transactional Persistence Engine
Documentation
use crate::{
    allocator::Allocator,
    device::{Page, PageOps, ReadPage},
    error::PERes,
    io::{
        read_u64, write_u64, InfallibleReadFormat, InfallibleWrite, InfallibleWriteFormat, InfallibleWriteVarInt,
        ReadVarInt,
    },
    journal::{
        records::{
            Cleanup, Commit, CreateSegment, DeleteRecord, DropSegment, FreedPage, InsertRecord, Metadata,
            NewSegmentPage, PrepareCommit, ReadRecord, Rollback, RollbackPage, Start, UpdateRecord,
        },
        recover_impl::{RecoverImpl, RecoverRefs},
    },
    snapshot::data::CleanInfo,
    snapshots::Snapshots,
    RecoverStatus,
};
use std::{
    collections::{hash_map::HashMap, hash_set::HashSet},
    io::Read,
    sync::{Arc, Mutex},
};

pub(crate) mod records;
pub(crate) mod recover_impl;

#[cfg(test)]
mod tests;

/// Size exponent passed to the allocator when a journal page is allocated.
pub const JOURNAL_PAGE_EXP: u8 = 10; // 2^10
/// Offset inside a journal page where the index of the next journal page is stored.
const JOURNAL_PAGE_NEXT_OFFSET: u32 = 0;
/// Offset inside a journal page where the index of the previous journal page is stored.
const JOURNAL_PAGE_PREV_OFFSET: u32 = 8;
/// Offset inside a journal page where the journal records begin.
const JOURNAL_PAGE_CONTENT_OFFSET: u32 = 16;
/// Version byte of the only journal root layout handled by this module.
const JOURNAL_ROOT_VERSION_0: u8 = 0;
/// Journal root version used when this module writes the journal root.
const JOURNAL_ROOT_VERSION: u8 = JOURNAL_ROOT_VERSION_0;

/// Links of a single started transaction inside the `StartList`.
struct StartListEntry {
    /// Id of the transaction pushed right after this one, if any.
    next: Option<JournalId>,
    /// Id of the transaction pushed right before this one, if any.
    prev: Option<JournalId>,
}

impl StartListEntry {
    pub fn new(prev: Option<JournalId>) -> StartListEntry {
        StartListEntry { next: None, prev }
    }
}

/// Doubly linked list of started transactions, kept in push order and indexed by journal id.
struct StartList {
    /// Link entry of every transaction currently in the list.
    transactions: HashMap<JournalId, StartListEntry>,
    /// Id of the most recently pushed transaction that is still in the list.
    last: Option<JournalId>,
}

impl StartList {
    fn new() -> StartList {
        StartList {
            transactions: HashMap::new(),
            last: None,
        }
    }

    fn push(&mut self, id: &JournalId) {
        self.transactions
            .insert(id.clone(), StartListEntry::new(self.last.clone()));
        if let Some(ref lst) = self.last {
            self.transactions.get_mut(lst).unwrap().next = Some(id.clone());
        }
        self.last = Some(id.clone());
    }

    fn remove(&mut self, id: &JournalId) -> bool {
        if let Some(entry) = self.transactions.remove(id) {
            if let Some(ref next) = entry.next {
                self.transactions.get_mut(next).unwrap().prev = entry.prev.clone();
            }
            if let Some(ref prev) = entry.prev {
                self.transactions.get_mut(prev).unwrap().next = entry.next.clone();
            }
            if let Some(ref l) = self.last {
                if l == id {
                    self.last = entry.prev.clone();
                }
            }
            entry.prev.is_none()
        } else {
            false
        }
    }

    fn is_page_in_start_list(&self, page: u64) -> bool {
        for k in self.transactions.keys() {
            if k.page == page {
                return true;
            }
        }
        false
    }
}

/// Mutable journal state, kept behind the mutex of `Journal`.
struct JournalShared {
    /// Index of the journal root page.
    root: u64,
    /// Index of the first journal page, as stored in the journal root.
    first_page: u64,
    /// Index of the journal page records are currently appended to.
    last_page: u64,
    /// Offset inside `current` where the next record will be written.
    last_pos: u32,
    /// Transactions started and not yet removed by `clear_in_queue`.
    starts: StartList,
    /// Writable copy of the page at `last_page`; an empty zero-sized page when `first_page` is 0.
    current: Page,
    /// Ids queued by `cleaned_to_trim`, consumed by the next `clear_in_queue`.
    to_clear: Vec<JournalId>,
}

impl JournalShared {
    /// Builds the in-memory journal state from a version 0 journal root page.
    ///
    /// The first 8 bytes of the root journal content hold the index of the first journal
    /// page. When that index is 0 an empty, zero-sized page is used as current page;
    /// otherwise the first page is opened for writing.
    ///
    /// # Errors
    /// Propagates the allocator error raised while opening the first journal page.
    fn new_version_0(mut page: ReadPage, all: &Allocator) -> PERes<JournalShared> {
        let first_page = {
            let root_content = all.read_root_journal(&mut page, 11);
            read_u64(&root_content[0..8])
        };
        let current = match first_page {
            // Empty 0 sized page
            0 => Page::new(Vec::new(), 0, 0, 0),
            index => all.write_page(index)?,
        };
        let shared = JournalShared {
            root: page.get_index(),
            first_page,
            last_page: first_page,
            last_pos: JOURNAL_PAGE_CONTENT_OFFSET,
            starts: StartList::new(),
            current,
            to_clear: Vec::new(),
        };
        Ok(shared)
    }

    /// Makes sure the current journal page can take `space` more bytes plus the end marker.
    ///
    /// When it cannot (which is also the case when there is no page yet) a new journal page
    /// is allocated, linked back to the previous one, flushed and made current. The previous
    /// page, if any, receives the link to the new page and a 0 end marker at the old write
    /// position before being flushed.
    ///
    /// # Errors
    /// Propagates allocation and flush errors from the allocator.
    fn required_space(&mut self, space: u32, allocator: &Allocator) -> PERes<()> {
        // 'current content' + 'space required' + 'end marker' still fits: nothing to do.
        let needed = self.last_pos + space + 1;
        if needed < self.current.get_content_size() {
            return Ok(());
        }

        let previous_index = self.last_page;
        let previous_end = self.last_pos;
        let fresh = allocator.allocate(JOURNAL_PAGE_EXP)?;
        let fresh_index = fresh.get_index();
        let mut previous = std::mem::replace(&mut self.current, fresh);
        self.last_page = fresh_index;

        // Back link of the new page, persisted before the old page points to it.
        self.current.seek(JOURNAL_PAGE_PREV_OFFSET);
        self.current.write_u64(previous_index);
        allocator.flush_journal(&self.current)?;

        if previous_index != 0 {
            // Forward link and end marker on the page that has just been filled.
            previous.seek(JOURNAL_PAGE_NEXT_OFFSET);
            previous.write_u64(fresh_index);
            previous.seek(previous_end);
            previous.write_u8(0);
            allocator.flush_page(previous)?;
        }
        self.last_pos = JOURNAL_PAGE_CONTENT_OFFSET;
        Ok(())
    }

    /// Writes `buffer` into the current page at the current write position.
    ///
    /// Returns the `(page index, offset)` where the buffer was written and advances the
    /// write position by the buffer length. No space check is done here.
    fn append_buffer(&mut self, buffer: &[u8]) -> PERes<(u64, u32)> {
        let written_at = (self.last_page, self.last_pos);
        self.current.seek(written_at.1);
        self.current.write_all(buffer);
        self.last_pos = written_at.1 + buffer.len() as u32;
        Ok(written_at)
    }

    /// Appends an already serialized record to the journal.
    ///
    /// Space is reserved first (possibly switching to a new page), then the bytes are
    /// written; when `flush` is true the current journal page is flushed afterwards.
    ///
    /// # Errors
    /// Propagates allocator errors raised while reserving space or flushing.
    fn log_bytes(&mut self, buffer: &[u8], allocator: &Allocator, flush: bool) -> PERes<()> {
        let size = buffer.len() as u32;
        self.required_space(size, allocator)?;
        self.append_buffer(buffer)?;
        if !flush {
            return Ok(());
        }
        allocator.flush_journal(&self.current)?;
        Ok(())
    }

    /// Queues `ids` so that the next `clear_in_queue` call processes them.
    pub fn cleaned_to_trim(&mut self, ids: &[JournalId]) {
        self.to_clear.extend(ids.iter().cloned());
    }

    /// Logs a start record and registers the new transaction in the start list.
    ///
    /// The id of the transaction is the journal position (page, offset) where its start
    /// record was written; the record itself is serialized with a placeholder `(0, 0)` id.
    /// The record is appended without flushing the journal page.
    ///
    /// # Errors
    /// Propagates serialization and allocator errors.
    pub fn start(&mut self, allocator: &Allocator) -> PERes<JournalId> {
        let placeholder = JournalId::new(0, 0);
        let record = Journal::prepare_buffer(&Start::default(), &placeholder)?;
        self.required_space(record.len() as u32, allocator)?;
        let (page, pos) = self.append_buffer(&record)?;
        let id = JournalId::new(page, pos);
        self.starts.push(&id);
        Ok(id)
    }

    /// Processes the ids queued by `cleaned_to_trim` and trims the head of the journal.
    ///
    /// Every queued id is removed from the start list. When the removed id was the oldest
    /// one, the journal pages from the page of that id back to the current first page are
    /// walked through their "previous" links: all of them except the page of the id itself
    /// are collected to be freed, and the page of the id becomes the new first page.
    /// If the first page moved, the journal root is rewritten; the collected pages are then
    /// handed to `free_pages_tx`.
    ///
    /// # Errors
    /// Propagates allocator errors and errors from `free_pages_tx`.
    pub fn clear_in_queue(&mut self, allocator: &Allocator, snapshots: &Arc<Snapshots>) -> PERes<()> {
        let queued = std::mem::take(&mut self.to_clear);
        let mut reclaimable = Vec::new();
        let mut moved_first = None;
        for id in queued {
            if !self.starts.remove(&id) {
                continue;
            }
            let stop_at = self.first_page;
            let mut cursor = id.page;
            loop {
                let previous = if cursor == self.last_page {
                    // The page being written is read in place, restoring its cursor afterwards.
                    let saved = self.current.cursor_pos();
                    self.current.seek(JOURNAL_PAGE_PREV_OFFSET);
                    let previous = self.current.read_u64();
                    self.current.seek(saved as u32);
                    previous
                } else {
                    match allocator.load_page_not_free(cursor)? {
                        Some(mut loaded) => {
                            loaded.seek(JOURNAL_PAGE_PREV_OFFSET);
                            loaded.read_u64()
                        }
                        None => break,
                    }
                };
                // The page holding the id stays: it becomes the new first page.
                if cursor != id.page {
                    reclaimable.push(cursor);
                }
                if cursor == stop_at {
                    break;
                }
                cursor = previous;
            }
            moved_first = Some(id.page);
            self.first_page = id.page;
        }
        if let Some(first) = moved_first {
            let mut root_content = [0; 11];
            write_u64(&mut root_content[0..8], first);
            let root_page = allocator.write_page(self.root)?;
            allocator.write_journal_root(root_page, &mut root_content, JOURNAL_ROOT_VERSION)?;
        }
        self.free_pages_tx(reclaimable, allocator, snapshots)
    }

    /// Records in the journal, as a dedicated transaction, the pages removed from the journal.
    ///
    /// Does nothing when `pages_to_free` is empty. Otherwise a transaction is started, one
    /// `FreedPage` record is logged per page, followed by `PrepareCommit` and `Commit`
    /// (only the commit is logged with flush). The freed pages are then registered as a
    /// snapshot, and any pending clean reported for that snapshot is passed to the allocator
    /// through `to_release_next_sync`.
    ///
    /// # Errors
    /// Propagates serialization and allocator errors.
    pub fn free_pages_tx(
        &mut self,
        pages_to_free: Vec<u64>,
        allocator: &Allocator,
        snapshots: &Arc<Snapshots>,
    ) -> PERes<()> {
        if pages_to_free.is_empty() {
            return Ok(());
        }
        // Log the pages removed from the journal for free
        let id = self.start(allocator)?;
        let mut freed = Vec::new();
        for page in pages_to_free {
            let record = FreedPage::new(page);
            let bytes = Journal::prepare_buffer(&record, &id)?;
            self.log_bytes(&bytes, allocator, false)?;
            freed.push(record);
        }
        let prepare = Journal::prepare_buffer(&PrepareCommit::new(), &id)?;
        self.log_bytes(&prepare, allocator, false)?;
        let commit = Journal::prepare_buffer(&Commit::new(), &id)?;
        self.log_bytes(&commit, allocator, true)?;

        let snapshot_ref = snapshots.snapshot(Vec::new(), CleanInfo::new(freed, Vec::new()), id);
        if let Some(pc) = snapshots.pending_clean(snapshot_ref.id()) {
            allocator.to_release_next_sync(pc);
        }
        Ok(())
    }

    /// Logs a `Cleanup` record for every id in `ids` and queues the ids for trimming.
    ///
    /// Only the record of the last id is logged with flush.
    ///
    /// # Errors
    /// Propagates serialization and allocator errors; on error the ids are not queued.
    pub fn finished_to_clean(&mut self, ids: &[JournalId], allocator: &Allocator) -> PERes<()> {
        let last_index = ids.len().saturating_sub(1);
        for (index, id) in ids.iter().enumerate() {
            let record = Journal::prepare_buffer(&Cleanup::new(), id)?;
            self.log_bytes(&record, allocator, index == last_index)?;
        }
        self.cleaned_to_trim(ids);
        Ok(())
    }

    /// Replays the journal starting at `first_page`, handing every readable record to `recover`.
    ///
    /// Pages are followed through their "next" link. The scan stops when an end marker is
    /// found on a page without a next page, at the first record that cannot be read (or has
    /// an unknown type), or when a page is reached a second time. Afterwards the page at
    /// `last_page` is opened for writing as `current`; if the scan stopped on an unreadable
    /// record the next link of that page is reset to 0 so the journal is truncated there.
    ///
    /// # Errors
    /// Propagates allocator errors raised while loading or opening pages.
    // NOTE(review): `first_page` is never inserted in `journal_pages`, so a loop that points
    // back to the first page is only detected after that page has been scanned again — confirm
    // whether this is intended.
    // NOTE(review): on the loop-detected exit `last_pos` is not updated here — confirm.
    fn recover(&mut self, recover: &mut RecoverImpl, allocator: &Allocator) -> PERes<()> {
        let mut cur_page = self.first_page;
        // Pages reached through a "next" link, used to detect a looping journal.
        let mut journal_pages = HashSet::new();
        self.last_page = self.first_page;
        recover.journal_page(cur_page);
        let mut page = allocator.load_page(cur_page)?;
        page.seek(JOURNAL_PAGE_CONTENT_OFFSET);
        // Set when the scan stops on a record that cannot be read.
        let mut reset_next = false;
        loop {
            // Position of the record type byte, i.e. where the record starts.
            let cursor_pos = page.cursor_pos();
            let tp = page.read_u8();
            // Position right after the type byte.
            let last_pos = page.cursor_pos() as u32;
            if tp == 0 {
                // A 0 type is the end marker of the page: move to the next page, if any.
                page.seek(JOURNAL_PAGE_NEXT_OFFSET);
                cur_page = page.read_u64();
                if cur_page == 0 {
                    // No next page: new records will be written over the end marker.
                    self.last_pos = last_pos - 1;
                    break;
                }
                recover.journal_page(cur_page);
                page = allocator.load_page(cur_page)?;
                page.seek(JOURNAL_PAGE_CONTENT_OFFSET);
                self.last_page = cur_page;
                if journal_pages.contains(&cur_page) {
                    // This is in case the log looped on itself.
                    // It should never happen, but there are reports of infinite recovers
                    break;
                }
                journal_pages.insert(cur_page);
            } else {
                let ref_page = &mut page;

                // Dispatch on the record type byte; `false` means the record could not be read.
                let success = match tp {
                    //The Start entry has no valid id, should not be recovered
                    1 => {
                        //Ignore the id info for start record
                        let is_ok = ref_page.read_varint_u64().is_ok();
                        let is_ok = ref_page.read_varint_u32().is_ok() && is_ok;
                        match Start::default().read(ref_page) {
                            Ok(()) => {
                                // The transaction id is the position of the start record itself.
                                let tx_id = JournalId::new(self.last_page, cursor_pos as u32);
                                self.starts.push(&tx_id);
                                is_ok
                            }
                            Err(_) => false,
                        }
                    }
                    2 => recover_entry(&mut InsertRecord::default(), ref_page, recover),
                    3 => recover_entry(&mut PrepareCommit::default(), ref_page, recover),
                    4 => recover_entry(&mut Commit::default(), ref_page, recover),
                    5 => recover_entry(&mut UpdateRecord::default(), ref_page, recover),
                    6 => recover_entry(&mut DeleteRecord::default(), ref_page, recover),
                    7 => recover_entry(&mut Rollback::default(), ref_page, recover),
                    8 => recover_entry(&mut CreateSegment::default(), ref_page, recover),
                    9 => recover_entry(&mut DropSegment::default(), ref_page, recover),
                    10 => recover_entry(&mut ReadRecord::default(), ref_page, recover),
                    11 => recover_entry(&mut Metadata::default(), ref_page, recover),
                    12 => recover_entry(&mut FreedPage::default(), ref_page, recover),
                    13 => recover_entry(&mut NewSegmentPage::default(), ref_page, recover),
                    14 => recover_entry(&mut Cleanup::default(), ref_page, recover),
                    15 => recover_entry(&mut RollbackPage::default(), ref_page, recover),
                    _ => false,
                };
                if !success {
                    // Stop at the start of the unreadable record; new records go there.
                    self.last_pos = last_pos - 1;
                    reset_next = true;
                    break;
                }
            }
        }
        self.current = allocator.write_page(self.last_page)?;
        if reset_next {
            // Truncate the journal if failed
            self.current.seek(JOURNAL_PAGE_NEXT_OFFSET);
            self.current.write_u64(0);
            self.current.seek(self.last_pos);
        }
        Ok(())
    }
}

/// Journal segment is the area where the transactional log is kept.
///
/// All the mutable state lives in a `JournalShared` guarded by a mutex, so the methods
/// of this type take `&self`.
pub struct Journal {
    /// Allocator used to allocate, load and flush the journal pages.
    allocator: Arc<Allocator>,
    /// Journal state, locked for the duration of every operation.
    journal: Mutex<JournalShared>,
}

/// A record that can be written to and read back from the journal.
pub(crate) trait JournalEntry {
    /// Type tag of the record; it is written as the first byte of the serialized record
    /// and used during recovery to pick the record type to read.
    fn get_type(&self) -> u8;
    /// Serializes the record content into `buffer` (type tag and transaction id excluded,
    /// they are written by the journal).
    fn write(&self, buffer: &mut dyn InfallibleWrite) -> PERes<()>;
    /// Fills the record with the content read from `buffer`.
    fn read(&mut self, buffer: &mut dyn Read) -> PERes<()>;
    /// Applies the record to the recovery state.
    // NOTE(review): the meaning of the returned `RecoverStatus` is defined by the
    // implementors and is not visible in this module.
    fn recover(&self, recover: &mut RecoverRefs) -> PERes<RecoverStatus>;
}

/// Identifier of a transaction in the journal.
///
/// For ids produced by the journal it is the position where the start record of the
/// transaction was written.
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
pub struct JournalId {
    /// Index of the journal page.
    page: u64,
    /// Offset inside the journal page.
    pos: u32,
}

/// Reads one record (transaction id followed by the record content) from `page` and hands
/// it to `recover`.
///
/// Both parts of the id are always read before being checked. Returns `false` when the id
/// or the record content cannot be read, `true` once the entry has been passed to the
/// recover logic.
fn recover_entry(entry: &mut dyn JournalEntry, page: &mut ReadPage, recover: &mut RecoverImpl) -> bool {
    let id_page = page.read_varint_u64();
    let id_pos = page.read_varint_u32();
    let id = match (id_page, id_pos) {
        (Ok(page_id), Ok(pos)) => JournalId::new(page_id, pos),
        _ => return false,
    };
    if entry.read(page).is_err() {
        return false;
    }
    recover.recover_journal_entry(entry, &id);
    true
}

impl Journal {
    pub fn new(all: &Arc<Allocator>, page: u64) -> PERes<Journal> {
        let mut page = all.load_page(page)?;
        let journal = match page.read_u8() {
            JOURNAL_ROOT_VERSION_0 => JournalShared::new_version_0(page, all)?,
            _ => panic!("version not supported"),
        };
        Ok(Journal {
            allocator: all.clone(),
            journal: Mutex::new(journal),
        })
    }

    pub fn init(allocator: &Allocator) -> PERes<u64> {
        let root_page = allocator.allocate(5)?;
        let root_page_index = root_page.get_index();
        let first_page = allocator.allocate(JOURNAL_PAGE_EXP)?;
        let mut buffer = [0; 11];
        write_u64(&mut buffer[0..8], first_page.get_index());
        allocator.write_journal_root(root_page, &mut buffer, JOURNAL_ROOT_VERSION)?;
        Ok(root_page_index)
    }

    pub fn start(&self) -> PERes<JournalId> {
        self.journal
            .lock()
            .expect("journal lock not poisoned")
            .start(&self.allocator)
    }

    pub(crate) fn prepare(&self, entry: &dyn JournalEntry, id: &JournalId) -> PERes<()> {
        self.internal_log(entry, id, true)?;
        Ok(())
    }

    pub(crate) fn end(&self, entry: &dyn JournalEntry, id: &JournalId) -> PERes<()> {
        self.internal_log(entry, id, true)?;
        Ok(())
    }

    pub fn clear_in_queue(&self, snapshots: &Arc<Snapshots>) -> PERes<()> {
        self.journal
            .lock()
            .expect("journal lock not poisoned")
            .clear_in_queue(&self.allocator, snapshots)
    }

    pub fn finished_to_clean(&self, ids: &[JournalId]) -> PERes<()> {
        let mut lock = self.journal.lock().expect("journal lock not poisoned");
        lock.finished_to_clean(ids, &self.allocator)
    }

    pub fn cleaned_to_trim(&self, ids: &[JournalId]) {
        let mut lock = self.journal.lock().expect("journal lock not poisoned");
        lock.cleaned_to_trim(ids)
    }

    pub(crate) fn log(&self, entry: &dyn JournalEntry, id: &JournalId) -> PERes<()> {
        self.internal_log(entry, id, false)?;
        Ok(())
    }

    fn prepare_buffer(entry: &dyn JournalEntry, id: &JournalId) -> PERes<Vec<u8>> {
        let mut buffer = Vec::<u8>::new();
        buffer.write_u8(entry.get_type());
        buffer.write_varint_u64(id.page);
        buffer.write_varint_u32(id.pos);
        entry.write(&mut buffer)?;
        Ok(buffer)
    }

    fn internal_log(&self, entry: &dyn JournalEntry, id: &JournalId, flush: bool) -> PERes<()> {
        let buffer = Self::prepare_buffer(entry, id)?;
        let mut jr = self.journal.lock().expect("journal lock not poisoned");
        jr.log_bytes(&buffer, &self.allocator, flush)
    }

    pub fn is_page_in_start_list(&self, page: u64) -> bool {
        self.journal
            .lock()
            .expect("journal lock not poisoned")
            .starts
            .is_page_in_start_list(page)
    }

    pub(crate) fn recover(&self, recover: &mut RecoverImpl) -> PERes<()> {
        let mut jr = self.journal.lock().expect("journal lock not poisoned");
        jr.recover(recover, &self.allocator)
    }
}

impl JournalId {
    pub fn new(page: u64, pos: u32) -> JournalId {
        JournalId { page, pos }
    }
}