// persy 1.5.0
//
// Transactional Persistence Engine
// Documentation
pub use std::fs::OpenOptions;
use std::{
    fs::File,
    mem::replace,
    ops::{Bound, RangeBounds},
    path::Path,
    str,
    sync::Arc,
};

use crate::{
    address::record_scanner::{SegmentRawIter, SegmentSnapshotRawIter, TxSegmentRawIter},
    address::Address,
    allocator::Allocator,
    config::{Config, TransactionConfig},
    device::{Device, FileDevice, MemoryDevice, PageOps},
    error::{
        BeginTransactionError, CreateError, CreateIndexError, CreateSegmentError, DeleteError, DropIndexError,
        DropSegmentError, IndexChangeError, IndexError, IndexOpsError, IndexPutError, InsertError, OpenError, PERes,
        SegmentError, UpdateError,
    },
    id::{IndexId, PersyId, RecRef, SegmentId, ToIndexId, ToSegmentId},
    index::{
        config::{
            change_segment_meta_name_to_index_name, format_segment_name_meta, is_index_name_data, is_index_name_meta,
            IndexType, IndexTypeId, IndexTypeInternal, Indexes, ValueMode,
        },
        keeper::IndexKeeper,
        raw_iter::IndexRawIter,
        raw_iter_tx::TxIndexRawIter,
        tree::{
            nodes::{PageIter, PageIterBack, Value},
            Index,
        },
    },
    io::{
        ArcSliceRead, InfallibleRead, InfallibleReadFormat, InfallibleReadVarInt, InfallibleWrite,
        InfallibleWriteFormat, InfallibleWriteVarInt,
    },
    journal::{recover_impl::RecoverImpl, Journal},
    snapshot::data::{EntryCase, SegmentSnapshot},
    snapshots::{SnapshotRef, Snapshots},
    transaction::tx_impl::{PreparedState, SyncMode, TransactionImpl, TxRead, TxSegCheck},
    PrepareError, ReadError,
};

#[cfg(feature = "background_ops")]
use crate::background::BackgroundOps;

/// Size exponent of the root page: the page holds `2^DEFAULT_PAGE_EXP` (= 1024) bytes.
pub(crate) const DEFAULT_PAGE_EXP: u8 = 10; // 2^10
/// Magic tag written right after the page pointers in the root page, used to recognise a Persy file.
const PERSY_TAG_BYTES: &[u8; 4] = b"prsy";

/// Core engine state shared by all the public Persy handles.
///
/// Holds the storage subsystems; most of them are behind `Arc` so they can be handed to
/// helpers (e.g. snapshots and the background operations) that need their own reference.
pub struct PersyImpl {
    /// Engine configuration supplied at open time.
    config: Arc<Config>,
    /// Transaction journal, used to log every transactional operation.
    journal: Arc<Journal>,
    /// Segment and record address structure (segment lookup, record allocation, scans).
    address: Arc<Address>,
    /// Index management state.
    indexes: Indexes,
    /// Page allocator on top of the storage device.
    allocator: Arc<Allocator>,
    /// Snapshot tracking used for consistent reads.
    snapshots: Arc<Snapshots>,
    /// Background sync worker, only present with the `background_ops` feature.
    #[cfg(feature = "background_ops")]
    background_ops: BackgroundOps<SnapshotRef>,
}

/// A transaction that went through `PersyImpl::prepare` and now waits to be finalized
/// with `PersyImpl::commit` or `PersyImpl::rollback_prepared`.
pub struct TxFinalize {
    /// The transaction returned by the prepare step.
    transaction: TransactionImpl,
    /// State produced by the prepare step; cloned and passed to commit/rollback.
    prepared: PreparedState,
}
impl TxFinalize {
    /// Test-only hook forwarding to `PreparedState::leak`.
    // NOTE(review): presumably used to simulate a crash while prepared — confirm in `PreparedState`.
    #[cfg(test)]
    pub(crate) fn leak(&mut self) {
        self.prepared.leak();
    }
}

/// Possible outcomes when recovering a transaction found in the prepared-commit state.
#[derive(PartialEq, Eq, Debug)]
pub enum CommitStatus {
    /// Discard the prepared transaction.
    Rollback,
    /// Complete the prepared transaction.
    Commit,
}

#[derive(Clone)]
/// Index definition details
///
/// Built from the index configuration by `PersyImpl::index_info` / `index_info_tx`.
pub struct IndexInfo {
    /// Identifier of the index.
    pub id: IndexId,
    /// How values associated with a key are handled.
    pub value_mode: ValueMode,
    /// Type of the index keys.
    pub key_type: IndexTypeId,
    /// Type of the index values.
    pub value_type: IndexTypeId,
}

/// Builds the background worker that syncs the disc and then releases journal entries.
///
/// The first closure performs the disc sync; the second runs afterwards with the snapshot
/// references collected since the last run.
#[cfg(feature = "background_ops")]
fn create_background_ops(
    journal: Arc<Journal>,
    allocator: Arc<Allocator>,
    snapshots: Arc<Snapshots>,
) -> PERes<BackgroundOps<SnapshotRef>> {
    // Each closure needs its own handle on the allocator.
    let sync_allocator = Arc::clone(&allocator);
    let post_sync_allocator = allocator;
    BackgroundOps::new(
        // Reports the negation of `disc_sync`'s result.
        // NOTE(review): presumably `true` means "run again" — confirm against `BackgroundOps`.
        move || Ok(!sync_allocator.disc_sync()?),
        move |pending: Vec<SnapshotRef>| {
            // Release every pending snapshot reference before clearing the journal queue.
            drop(pending);
            journal.clear_in_queue(&snapshots)?;
            Ok(post_sync_allocator.need_sync())
        },
    )
}

/// Converts the raw content of a record into a value of type `T`.
///
/// Every `FnOnce(ArcSliceRead) -> T` is a `RecordReader<T>`, so a closure can be passed
/// anywhere a reader is expected.
pub trait RecordReader<T> {
    /// Consumes the reader and builds a `T` from the record content `rec`.
    fn read(self, rec: ArcSliceRead) -> T;
}

impl<T, F> RecordReader<T> for F
where
    F: FnOnce(ArcSliceRead) -> T,
{
    fn read(self, rec: ArcSliceRead) -> T {
        self(rec)
    }
}

impl PersyImpl {
    /// Creates a new storage file at `path` and writes the initial layout into it.
    ///
    /// # Errors
    /// Fails if `path` already exists (the file is opened with `create_new`) or on I/O errors.
    pub fn create(path: &Path) -> Result<(), CreateError> {
        let f = OpenOptions::new().write(true).read(true).create_new(true).open(path)?;
        PersyImpl::create_from_file(f)
    }

    /// Writes the initial storage layout into the already opened file `f`.
    pub fn create_from_file(f: File) -> Result<(), CreateError> {
        PersyImpl::init(Box::new(FileDevice::new(f)?))?;
        Ok(())
    }

    /// Writes the initial storage layout on `device` and hands the device back.
    ///
    /// The root page (read back from page 0 by `new`) is laid out as: `u16` disc format
    /// version (0), `u64` address page, `u64` journal page, `u64` allocator page and then the
    /// 4 bytes of `PERSY_TAG_BYTES`.
    fn init(device: Box<dyn Device>) -> PERes<Box<dyn Device>> {
        // root_page is every time 0: it is the first page created on the device.
        let root_page = device.create_page_raw(DEFAULT_PAGE_EXP)?;
        let (allocator_page, allocator) = Allocator::init(device, &Config::new())?;
        let address_page = Address::init(&allocator)?;
        let journal_page = Journal::init(&allocator)?;
        {
            let mut root = allocator.disc().load_page_raw(root_page, DEFAULT_PAGE_EXP)?;
            // Version of the disc format
            root.write_u16(0);
            // Position of the start of address structure
            root.write_u64(address_page);
            // Start of the Log data, if shutdown well this will be every time 0
            root.write_u64(journal_page);
            root.write_u64(allocator_page);
            root.write_all(PERSY_TAG_BYTES);
            allocator.flush_page(root)?;
        }
        let res = allocator.disc_sync()?;
        debug_assert!(res);
        Ok(allocator.release())
    }

    /// Opens the storage contained in an already initialized `disc`.
    ///
    /// Reads the root page (page 0) written by `init` and loads the allocator, address and
    /// journal structures from the page numbers stored there.
    ///
    /// # Errors
    /// `OpenError::NotPersyFile` when the tag bytes are neither `PERSY_TAG_BYTES` nor all zeros.
    fn new(disc: Box<dyn Device>, config: Config) -> Result<PersyImpl, OpenError> {
        let address_page;
        let journal_page;
        let allocator_page;
        {
            let mut pg = disc.load_page_raw(0, DEFAULT_PAGE_EXP)?;
            pg.read_u16(); //THIS NOW is 0 all the times
            address_page = pg.read_u64();
            journal_page = pg.read_u64();
            allocator_page = pg.read_u64();
            let mut buff = [0u8; 4];
            pg.read_exact(&mut buff);
            // Also accept four zero bytes for backward compatibility
            // (presumably files written before the tag existed).
            if buff != *PERSY_TAG_BYTES && buff != [0u8; 4] {
                return Err(OpenError::NotPersyFile);
            }
        }
        let config = Arc::new(config);
        let allocator = Arc::new(Allocator::new(disc, &config, allocator_page)?);
        let address = Arc::new(Address::new(&allocator, &config, address_page)?);
        let journal = Arc::new(Journal::new(&allocator, journal_page)?);
        let indexes = Indexes::new(&config);
        let snapshots = Arc::new(Snapshots::new(&allocator, &journal, &address));
        #[cfg(feature = "background_ops")]
        let background_ops = create_background_ops(journal.clone(), allocator.clone(), snapshots.clone())?;
        Ok(PersyImpl {
            config,
            journal,
            address,
            indexes,
            allocator,
            snapshots,
            #[cfg(feature = "background_ops")]
            background_ops,
        })
    }

    /// Reads the journal into a fresh `RecoverImpl`; complete it with `final_recover`.
    fn recover(&self) -> PERes<RecoverImpl> {
        let mut recover = RecoverImpl::default();
        self.journal.recover(&mut recover)?;
        recover.finish_journal_read();
        Ok(recover)
    }

    /// Completes a recovery started by one of the `*_open*` constructors.
    pub fn final_recover(&self, recover: RecoverImpl) -> PERes<()> {
        recover.final_recover(self)
    }

    /// Writes a fresh layout on `device`, then opens it and reads its journal.
    fn init_device_and_recover(
        device: Box<dyn Device>,
        config: Config,
    ) -> Result<(PersyImpl, RecoverImpl), OpenError> {
        let device = PersyImpl::init(device)?;
        PersyImpl::open_device_and_recover(device, config)
    }

    /// Opens an already initialized `device` and reads its journal for recovery.
    fn open_device_and_recover(
        device: Box<dyn Device>,
        config: Config,
    ) -> Result<(PersyImpl, RecoverImpl), OpenError> {
        let persy = PersyImpl::new(device, config)?;
        let rec = persy.recover()?;
        Ok((persy, rec))
    }

    /// Truncates `f`, writes a fresh layout into it and opens it.
    pub fn truncate_and_open(f: File, config: Config) -> Result<(PersyImpl, RecoverImpl), OpenError> {
        PersyImpl::init_device_and_recover(Box::new(FileDevice::new_truncate(f)?), config)
    }

    /// Writes a fresh layout into `f` (without truncating it first) and opens it.
    pub fn create_and_open(f: File, config: Config) -> Result<(PersyImpl, RecoverImpl), OpenError> {
        PersyImpl::init_device_and_recover(Box::new(FileDevice::new(f)?), config)
    }

    /// Opens an existing storage file and reads its journal for recovery.
    pub fn open_recover(f: File, config: Config) -> Result<(PersyImpl, RecoverImpl), OpenError> {
        PersyImpl::open_device_and_recover(Box::new(FileDevice::new(f)?), config)
    }

    /// Creates a new storage backed by a `MemoryDevice`.
    pub fn memory(config: Config) -> Result<PersyImpl, OpenError> {
        let device = PersyImpl::init(Box::new(MemoryDevice::new(None)?))?;
        PersyImpl::new(device, config)
    }

    /// Starts a new transaction.
    ///
    /// Settings in `config` take precedence over the global configuration: the transaction
    /// strategy falls back to the global one; `background_sync == Some(true)` selects
    /// `SyncMode::BackgroundSync`, anything else `SyncMode::Sync`.
    ///
    /// # Errors
    /// `BeginTransactionError::InvalidTransactionId` if the transaction id is longer than 512 bytes.
    pub fn begin_with(&self, mut config: TransactionConfig) -> Result<TransactionImpl, BeginTransactionError> {
        let strategy = config
            .tx_strategy
            .unwrap_or_else(|| self.config.tx_strategy().clone());
        let meta_id = replace(&mut config.transaction_id, None).unwrap_or_default();
        if meta_id.len() > 512 {
            return Err(BeginTransactionError::InvalidTransactionId);
        }
        let sync_mode = if config.background_sync == Some(true) {
            SyncMode::BackgroundSync
        } else {
            SyncMode::Sync
        };

        let timeout = self.config.transaction_lock_timeout();

        Ok(TransactionImpl::new(&self.journal, &strategy, sync_mode, meta_id, *timeout)?)
    }

    /// Creates the segment `segment` inside `tx`.
    ///
    /// # Errors
    /// `CreateSegmentError::SegmentAlreadyExists` when the segment is visible in `tx`
    /// (created in it, or persistent and not dropped in it).
    pub fn create_segment(&self, tx: &mut TransactionImpl, segment: &str) -> Result<SegmentId, CreateSegmentError> {
        if self.exists_segment_tx(tx, segment) {
            return Err(CreateSegmentError::SegmentAlreadyExists);
        }
        let seg = self.address.create_temp_segment(segment)?;
        let id = seg.get_segment_id();
        tx.add_create_segment(&self.journal, seg)?;
        Ok(id)
    }

    /// Drops the segment `segment` inside `tx`; fails if it is not visible in `tx`.
    pub fn drop_segment(&self, tx: &mut TransactionImpl, segment: &str) -> Result<(), DropSegmentError> {
        let (_, segment_id) = self.check_segment_tx(tx, segment)?;
        tx.add_drop_segment(&self.journal, segment, segment_id)?;
        Ok(())
    }

    /// Returns `true` if a persistent segment named `segment` exists.
    pub fn exists_segment(&self, segment: &str) -> bool {
        self.address.exists_segment(segment)
    }

    /// Returns `true` if a persistent segment with id `segment` exists.
    pub fn exists_segment_by_id(&self, segment: &SegmentId) -> bool {
        self.address.exists_segment_by_id(segment)
    }

    /// Returns `true` if `segment` is visible in `tx`: created in it, or persistent and not dropped in it.
    pub fn exists_segment_tx(&self, tx: &TransactionImpl, segment: &str) -> bool {
        match tx.exists_segment(segment) {
            TxSegCheck::Dropped => false,
            TxSegCheck::Created(_) => true,
            TxSegCheck::None => self.address.exists_segment(segment),
        }
    }

    /// Returns `true` if the persistent index `index` exists (checked through its meta segment).
    pub fn exists_index(&self, index: &str) -> bool {
        self.exists_segment(&format_segment_name_meta(index))
    }

    /// Returns `true` if the index `index` is visible in `tx` (checked through its meta segment).
    pub fn exists_index_tx(&self, tx: &TransactionImpl, index: &str) -> bool {
        self.exists_segment_tx(tx, &format_segment_name_meta(index))
    }

    /// Resolves `segment` to its id using the persistent segments only.
    pub fn solve_segment_id(&self, segment: impl ToSegmentId) -> Result<SegmentId, SegmentError> {
        segment.to_segment_id(&self.address)
    }

    /// Resolves `segment` to its id as seen by `tx`.
    pub fn solve_segment_id_tx(
        &self,
        tx: &TransactionImpl,
        segment: impl ToSegmentId,
    ) -> Result<SegmentId, SegmentError> {
        let (sid, _) = segment.to_segment_id_tx(self, tx)?;
        Ok(sid)
    }

    /// Resolves `segment` to its id as seen by `snapshot`.
    pub fn solve_segment_id_snapshot(
        &self,
        snapshot: &SnapshotRef,
        segment: impl ToSegmentId,
    ) -> Result<SegmentId, SegmentError> {
        segment.to_segment_id_snapshot(&self.snapshots, snapshot)
    }

    /// Resolves `index` to its id using the persistent state only.
    pub fn solve_index_id(&self, index: impl ToIndexId) -> Result<IndexId, IndexError> {
        index.to_index_id(&self.address)
    }

    /// Resolves `index` to its id as seen by `tx`; the flag comes from `ToIndexId::to_index_id_tx`.
    pub fn solve_index_id_tx(
        &self,
        tx: &TransactionImpl,
        index: impl ToIndexId,
    ) -> Result<(IndexId, bool), IndexError> {
        index.to_index_id_tx(self, tx)
    }

    /// Resolves `index` to its id as seen by `snapshot`.
    pub fn solve_index_id_snapshot(
        &self,
        snapshot: &SnapshotRef,
        index: impl ToIndexId,
    ) -> Result<IndexId, IndexError> {
        index.to_index_id_snapshot(&self.snapshots, snapshot)
    }

    /// check if a segment exist persistent or in tx.
    ///
    /// @return true if the segment was created in tx, together with the segment id.
    ///
    /// # Errors
    /// `SegmentError::SegmentNotFound` if the segment was dropped in `tx` or does not exist.
    pub fn check_segment_tx(&self, tx: &TransactionImpl, segment: &str) -> Result<(bool, SegmentId), SegmentError> {
        match tx.exists_segment(segment) {
            TxSegCheck::Dropped => Err(SegmentError::SegmentNotFound),
            TxSegCheck::Created(segment_id) => Ok((true, segment_id)),
            TxSegCheck::None => self
                .address
                .segment_id(segment)
                .map_or(Err(SegmentError::SegmentNotFound), |id| Ok((false, id))),
        }
    }

    /// Serializes the header stored in front of every record page:
    /// `u8` metadata version (0), varint content length, record id.
    pub fn write_record_metadata(len: u64, id: &RecRef) -> Vec<u8> {
        let mut val = Vec::new();
        val.write_u8(0);
        val.write_varint_u64(len);
        id.write(&mut val);
        val
    }

    /// Reads back a header written by `write_record_metadata`, returning `(length, record id)`.
    pub fn read_record_metadata(meta: &mut dyn InfallibleRead) -> (u64, RecRef) {
        let _metadata_version = meta.read_u8();
        let len = meta.read_varint_u64();
        let id = RecRef::read(meta);
        (len, id)
    }

    /// Inserts `rec` in `segment` inside `tx`, returning the id of the new record.
    ///
    /// A new record id is allocated (from the in-tx segment if it was created in `tx`), then the
    /// metadata header plus the content are written to a freshly allocated page.
    ///
    /// # Errors
    /// `InsertError::RecordToBig` when the record exceeds the size limit.
    pub fn insert_record(
        &self,
        tx: &mut TransactionImpl,
        segment: impl ToSegmentId,
        rec: &[u8],
    ) -> Result<RecRef, InsertError> {
        if self.record_over_size_limit(rec) {
            return Err(InsertError::RecordToBig);
        }
        let (segment_id, in_tx) = segment.to_segment_id_tx(self, tx)?;
        let len = rec.len() as u64;
        let allocator = &self.allocator;
        let address = &self.address;
        let (rec_ref, maybe_new_page) = if in_tx {
            address.allocate_temp_seg(tx.get_segment_mut(segment_id))?
        } else {
            address.allocate(segment_id)?
        };
        let metadata = PersyImpl::write_record_metadata(len, &rec_ref);
        let allocation_exp = allocator.exp_from_content_size(len + metadata.len() as u64);
        let mut pg = allocator.allocate(allocation_exp)?;
        let page = pg.get_index();
        tx.add_insert(&self.journal, segment_id, &rec_ref, page)?;
        if let Some(new_page) = maybe_new_page {
            tx.add_new_segment_page(&self.journal, segment_id, new_page.new_page, new_page.previous_page)?;
        }
        pg.write_all(&metadata);
        pg.write_all(rec);
        allocator.flush_page(pg)?;
        Ok(rec_ref)
    }

    /// Returns `Snapshots::current_snapshot()`.
    fn read_snapshot(&self) -> SnapshotRef {
        self.snapshots.current_snapshot()
    }

    /// Takes a read snapshot and registers in it the current list of persistent segments.
    pub fn snapshot(&self) -> SnapshotRef {
        let snapshot_ref = self.snapshots.read_snapshot();
        let segs = self
            .address
            .snapshot_list()
            .into_iter()
            .map(|(name, id, first_page)| SegmentSnapshot::new(&name, id, first_page))
            .collect::<Vec<_>>();
        self.snapshots.fill_segments(&snapshot_ref, &segs);
        snapshot_ref
    }

    /// Locates `rec_ref` as seen by `tx`, returning `(page, version, segment_id)`,
    /// or `None` if the record was deleted in `tx` or does not exist.
    fn read_ref_segment(
        &self,
        tx: &TransactionImpl,
        segment_id: SegmentId,
        rec_ref: &RecRef,
    ) -> Result<Option<(u64, u16, SegmentId)>, ReadError> {
        Ok(match tx.read(rec_ref) {
            TxRead::Record(rec) => Some((rec.0, rec.1, segment_id)),
            TxRead::Deleted => None,
            TxRead::None => self
                .address
                .read(rec_ref, segment_id)?
                .map(|(pos, version)| (pos, version, segment_id)),
        })
    }

    /// Reads the content stored in `page` as an owned buffer (see `read_page_fn`).
    fn read_page(&self, match_id: &RecRef, page: u64) -> PERes<Option<Vec<u8>>> {
        self.read_page_fn(match_id, page, |x: ArcSliceRead| x.to_vec())
    }

    /// Loads `page` and passes its record content to `f`.
    ///
    /// Returns `None` when the page is free or its metadata belongs to a record other than `match_id`.
    fn read_page_fn<T, F: RecordReader<T>>(&self, match_id: &RecRef, page: u64, f: F) -> PERes<Option<T>> {
        if let Some(mut pg) = self.allocator.load_page_not_free(page)? {
            let (len, id) = PersyImpl::read_record_metadata(&mut pg);
            if id == *match_id {
                Ok(Some(f.read(pg.arc_slice(len as usize))))
            } else {
                Ok(None)
            }
        } else {
            Ok(None)
        }
    }

    /// Reads record `id` as seen by `tx`, mapping its content with `f`, and returns it with its version.
    ///
    /// If the page found for the record does not hold it, the lookup is retried.
    /// NOTE(review): presumably a concurrent commit moved the record; in release builds there is
    /// no retry bound (only a `debug_assert!` after 100 retries).
    pub fn read_tx_internal_fn<T>(
        &self,
        tx: &TransactionImpl,
        segment_id: SegmentId,
        id: &RecRef,
        f: fn(ArcSliceRead) -> T,
    ) -> Result<Option<(T, u16)>, ReadError> {
        let mut retry_count = 0;
        loop {
            if let Some((page, version, _)) = self.read_ref_segment(tx, segment_id, id)? {
                if let Some(record) = self.read_page_fn(id, page, f)? {
                    break Ok(Some((record, version)));
                }
                debug_assert!(
                    retry_count < 100,
                    "more than 100 retry probably means an issue id:{} page:{}",
                    id,
                    page
                );
                retry_count += 1;
            } else {
                break Ok(None);
            }
        }
    }

    /// Reads record `id` as seen by `tx` as an owned buffer, together with its version.
    pub fn read_tx_internal(
        &self,
        tx: &TransactionImpl,
        segment_id: SegmentId,
        id: &RecRef,
    ) -> Result<Option<(Vec<u8>, u16)>, ReadError> {
        self.read_tx_internal_fn(tx, segment_id, id, |x| x.to_vec())
    }

    /// Reads record `id` inside `tx`, recording the read version in the transaction.
    pub fn read_tx(
        &self,
        tx: &mut TransactionImpl,
        segment: SegmentId,
        id: &RecRef,
    ) -> Result<Option<Vec<u8>>, ReadError> {
        if let Some((rec, version)) = self.read_tx_internal(tx, segment, id)? {
            tx.add_read(&self.journal, segment, id, version)?;
            Ok(Some(rec))
        } else {
            Ok(None)
        }
    }

    /// Reads the persistent content of `rec_ref`, outside any transaction.
    ///
    /// Retries like `read_tx_internal_fn` when the located page does not hold the record.
    pub fn read(&self, segment: SegmentId, rec_ref: &RecRef) -> Result<Option<Vec<u8>>, ReadError> {
        let mut retry_count = 0;
        loop {
            if let Some((page, _)) = self.address.read(rec_ref, segment)? {
                if let Some(record) = self.read_page(rec_ref, page)? {
                    break Ok(Some(record));
                }
                debug_assert!(
                    retry_count < 100,
                    "more than 100 retry probably means an issue id:{} page:{}",
                    rec_ref,
                    page
                );
                retry_count += 1;
            } else {
                break Ok(None);
            }
        }
    }

    /// Reads `rec_ref` as seen by `snapshot` as an owned buffer.
    pub fn read_snap(
        &self,
        segment: SegmentId,
        rec_ref: &RecRef,
        snapshot: &SnapshotRef,
    ) -> Result<Option<Vec<u8>>, ReadError> {
        self.read_snap_fn(segment, rec_ref, snapshot, |x: ArcSliceRead| x.to_vec())
    }

    /// Reads `rec_ref` as seen by `snapshot`, mapping its content with `f`.
    ///
    /// Snapshot-recorded versions take precedence over the persistent address.
    ///
    /// # Panics
    /// If the located page does not contain the record (broken snapshot).
    pub fn read_snap_fn<T, F: RecordReader<T>>(
        &self,
        segment: SegmentId,
        rec_ref: &RecRef,
        snapshot: &SnapshotRef,
        f: F,
    ) -> Result<Option<T>, ReadError> {
        if let Some(rec_vers) = self.snapshots.read(snapshot, rec_ref) {
            match rec_vers.case() {
                EntryCase::Change(change) => {
                    if let Some(record) = self.read_page_fn(rec_ref, change.pos, f)? {
                        Ok(Some(record))
                    } else {
                        panic!("if page do not match the content the snapshot is broken");
                    }
                }
                EntryCase::Insert => Ok(None),
            }
        } else if let Some((page, _)) = self.address.read(rec_ref, segment)? {
            if let Some(record) = self.read_page_fn(rec_ref, page, f)? {
                Ok(Some(record))
            } else {
                panic!("if page do not match the content the snapshot is broken");
            }
        } else {
            Ok(None)
        }
    }

    /// Scans the persistent records of `segment`.
    pub fn scan(&self, segment: SegmentId) -> Result<SegmentRawIter, SegmentError> {
        let iter = self.address.scan(segment)?;
        Ok(SegmentRawIter::new(segment, iter))
    }

    /// Scans `segment_id` through `snapshot`, falling back to the persistent address when the
    /// snapshot holds no scan data for the segment.
    pub fn scan_snapshot_index(
        &self,
        segment_id: SegmentId,
        snapshot: &SnapshotRef,
    ) -> Result<SegmentSnapshotRawIter, SegmentError> {
        let res = if let Some(r) = self.snapshots.scan(snapshot, segment_id) {
            r
        } else {
            self.address.scan(segment_id)?
        };
        Ok(SegmentSnapshotRawIter::new(segment_id, res, snapshot))
    }

    /// Scans `segment_id` through `snapshot`.
    ///
    /// # Panics
    /// If `snapshot` has no scan data for `segment_id`; callers are expected to pass a segment
    /// resolved through the same snapshot.
    pub fn scan_snapshot(&self, segment_id: SegmentId, snapshot: &SnapshotRef) -> PERes<SegmentSnapshotRawIter> {
        let res = self.snapshots.scan(snapshot, segment_id).unwrap();
        Ok(SegmentSnapshotRawIter::new(segment_id, res, snapshot))
    }

    /// Scans `segment_id` as seen by `tx`, including segments created in `tx`.
    pub fn scan_tx<'a>(
        &'a self,
        tx: &'a TransactionImpl,
        segment_id: SegmentId,
    ) -> Result<TxSegmentRawIter, SegmentError> {
        let read_snapshot = self.read_snapshot();
        let iter = if tx.segment_created_in_tx(segment_id) {
            self.address.scan_segment(tx.get_segment(segment_id))?
        } else {
            self.address.scan(segment_id)?
        };
        Ok(TxSegmentRawIter::new(tx, segment_id, iter, read_snapshot))
    }

    /// Replaces the content of `rec_ref` with `rec` inside `tx`, writing it to a new page.
    ///
    /// # Errors
    /// `UpdateError::RecordToBig` over the size limit, `UpdateError::RecordNotFound` if the
    /// record is not visible in `tx`.
    pub fn update(
        &self,
        tx: &mut TransactionImpl,
        segment: SegmentId,
        rec_ref: &RecRef,
        rec: &[u8],
    ) -> Result<(), UpdateError> {
        if self.record_over_size_limit(rec) {
            return Err(UpdateError::RecordToBig);
        }
        if let Some((_, version, segment)) = self.read_ref_segment(tx, segment, rec_ref)? {
            let allocator = &self.allocator;
            let journal = &self.journal;
            let len = rec.len();
            let metadata = PersyImpl::write_record_metadata(len as u64, rec_ref);
            let allocation_exp = allocator.exp_from_content_size((len + metadata.len()) as u64);
            let mut pg = allocator.allocate(allocation_exp)?;
            let page = pg.get_index();
            tx.add_update(journal, segment, rec_ref, page, version)?;
            pg.write_all(&metadata);
            pg.write_all(rec);
            Ok(allocator.flush_page(pg)?)
        } else {
            Err(UpdateError::RecordNotFound(PersyId(*rec_ref)))
        }
    }

    /// Deletes `rec_ref` inside `tx`; `DeleteError::RecordNotFound` if it is not visible in `tx`.
    pub fn delete(&self, tx: &mut TransactionImpl, segment: SegmentId, rec_ref: &RecRef) -> Result<(), DeleteError> {
        if let Some((_, version, seg)) = self.read_ref_segment(tx, segment, rec_ref)? {
            Ok(tx.add_delete(&self.journal, seg, rec_ref, version)?)
        } else {
            Err(DeleteError::RecordNotFound(PersyId(*rec_ref)))
        }
    }

    /// Rolls back a transaction that was not prepared.
    pub fn rollback(&self, mut tx: TransactionImpl) -> PERes<()> {
        tx.rollback(self)
    }

    /// Prepares `tx` for commit, returning the handle used to commit or roll it back.
    pub fn prepare(&self, tx: TransactionImpl) -> Result<TxFinalize, PrepareError> {
        let (tx, prepared) = tx.prepare(self)?;

        Ok(TxFinalize {
            transaction: tx,
            prepared,
        })
    }

    /// Rolls back a prepared transaction.
    pub fn rollback_prepared(&self, finalizer: &mut TxFinalize) -> PERes<()> {
        let prepared = finalizer.prepared.clone();
        let tx = &mut finalizer.transaction;
        tx.rollback_prepared(self, prepared)
    }

    /// Commits a prepared transaction.
    pub fn commit(&self, finalizer: &mut TxFinalize) -> PERes<()> {
        let prepared = finalizer.prepared.clone();
        let tx = &mut finalizer.transaction;
        tx.commit(self, prepared)
    }

    /// Creates the index `index_name` with key type `K` and value type `V` inside `tx`.
    pub fn create_index<K, V>(
        &self,
        tx: &mut TransactionImpl,
        index_name: &str,
        value_mode: ValueMode,
    ) -> Result<(), CreateIndexError>
    where
        K: IndexType,
        V: IndexType,
    {
        // NOTE(review): 32 and 128 are fixed tree parameters (presumably min/max node size) — confirm.
        Indexes::create_index::<K::Wrapper, V::Wrapper>(self, tx, index_name, 32, 128, value_mode)
    }

    /// Drops the index `index_name` inside `tx`.
    pub fn drop_index(&self, tx: &mut TransactionImpl, index_name: &str) -> Result<(), DropIndexError> {
        Indexes::drop_index(self, tx, index_name)
    }

    /// Records a put of `(k, v)` in `index_id` inside `tx`.
    ///
    /// # Errors
    /// `IndexPutError::KeyOrValueTooBig` if the key or value exceeds the size limit.
    pub fn put<K, V>(&self, tx: &mut TransactionImpl, index_id: IndexId, k: K, v: V) -> Result<(), IndexPutError>
    where
        K: IndexTypeInternal,
        V: IndexTypeInternal,
    {
        if k.over_size_limit() || v.over_size_limit() {
            return Err(IndexPutError::KeyOrValueTooBig);
        }
        Indexes::check_index::<K, V>(self, tx, &index_id)?;
        tx.add_put(index_id, k, v);
        Ok(())
    }

    /// Records the removal of `k` (or only of the pair `(k, v)` when `v` is given) inside `tx`.
    pub fn remove<K, V>(
        &self,
        tx: &mut TransactionImpl,
        index_id: IndexId,
        k: K,
        v: Option<V>,
    ) -> Result<(), IndexOpsError>
    where
        K: IndexTypeInternal,
        V: IndexTypeInternal,
    {
        Indexes::check_index::<K, V>(self, tx, &index_id)?;
        tx.add_remove(index_id, k, v);
        Ok(())
    }

    /// Looks up `k` in `index_id` as seen by `tx`: the stored value is read from a snapshot and
    /// the pending changes of `tx` are applied on top of it.
    pub fn get_tx<K, V>(
        &self,
        tx: &mut TransactionImpl,
        index_id: IndexId,
        k: &K,
    ) -> Result<Option<Value<V>>, IndexChangeError>
    where
        K: IndexTypeInternal,
        V: IndexTypeInternal,
    {
        let (result, vm, name) = {
            let snapshot = self.snapshots.read_snapshot();
            // The low level index do not need to be aware of the transaction when reading records
            // because index changes are stored all at high level of the transaction
            let mut ik = Indexes::get_index_keeper_tx_read::<K, V>(self, &snapshot, tx, &index_id)?;
            (
                ik.get(k)?,
                IndexKeeper::<K, V>::value_mode(&ik),
                IndexKeeper::<K, V>::index_name(&ik).clone(),
            )
        };
        tx.apply_changes::<K, V>(vm, index_id, &name, k, result)
    }

    /// Looks up `k` in `index_id` using a fresh read snapshot.
    pub fn get<K, V>(&self, index_id: IndexId, k: &K) -> Result<Option<Value<V>>, IndexOpsError>
    where
        K: IndexTypeInternal,
        V: IndexTypeInternal,
    {
        let read_snapshot = self.snapshots.read_snapshot();
        self.get_snapshot(index_id, &read_snapshot, k)
    }

    /// Walks the tree of `index_id` with `inspector` (experimental).
    #[cfg(feature = "experimental_inspect")]
    pub fn inspect_tree<K, V, I>(&self, index_id: IndexId, inspector: &mut I) -> Result<(), IndexOpsError>
    where
        K: IndexTypeInternal,
        V: IndexTypeInternal,
        I: crate::inspect::TreeInspector<K::Wrapped, V::Wrapped>,
    {
        let read_snapshot = self.snapshots.read_snapshot();
        let mut index = Indexes::get_index_keeper::<K, V>(self, &read_snapshot, &index_id)?;
        crate::inspect::inspect_tree::<K, V, I>(&mut index, inspector)?;
        Ok(())
    }

    /// Looks up `k` in `index_id` as seen by `snapshot`.
    pub fn get_snapshot<K, V>(
        &self,
        index_id: IndexId,
        snapshot: &SnapshotRef,
        k: &K,
    ) -> Result<Option<Value<V>>, IndexOpsError>
    where
        K: IndexTypeInternal,
        V: IndexTypeInternal,
    {
        Ok(Indexes::get_index_keeper::<K, V>(self, snapshot, &index_id)?.get(k)?)
    }

    /// Returns a forward page iterator on `index_id` starting from `next`.
    pub fn index_next<K, V>(
        &self,
        index_id: &IndexId,
        read_snapshot: &SnapshotRef,
        next: Bound<&K>,
    ) -> Result<PageIter<K, V>, IndexOpsError>
    where
        K: IndexTypeInternal,
        V: IndexTypeInternal,
    {
        Ok(Indexes::get_index_keeper::<K, V>(self, read_snapshot, index_id)?.iter_from(next)?)
    }

    /// Returns a backward page iterator on `index_id` starting from `next`.
    pub fn index_back<K, V>(
        &self,
        index_id: &IndexId,
        read_snapshot: &SnapshotRef,
        next: Bound<&K>,
    ) -> Result<PageIterBack<K, V>, IndexOpsError>
    where
        K: IndexTypeInternal,
        V: IndexTypeInternal,
    {
        Ok(Indexes::get_index_keeper::<K, V>(self, read_snapshot, index_id)?.back_iter_from(next)?)
    }

    /// Converts a user range over `X` into owned bounds over its internal wrapper type.
    pub(crate) fn map_index_range_bounds<R: RangeBounds<X>, X: IndexType>(
        r: R,
    ) -> (Bound<X::Wrapper>, Bound<X::Wrapper>) {
        (
            match r.start_bound() {
                Bound::Included(x) => Bound::Included(x.clone().wrap()),
                Bound::Excluded(e) => Bound::Excluded(e.clone().wrap()),
                Bound::Unbounded => Bound::Unbounded,
            },
            match r.end_bound() {
                Bound::Included(x) => Bound::Included(x.clone().wrap()),
                Bound::Excluded(e) => Bound::Excluded(e.clone().wrap()),
                Bound::Unbounded => Bound::Unbounded,
            },
        )
    }

    /// Iterates `range` of `index_id` using a fresh read snapshot.
    pub fn range<K, V, R>(&self, index_id: IndexId, range: R) -> Result<(ValueMode, IndexRawIter<K, V>), IndexOpsError>
    where
        K: IndexTypeInternal,
        V: IndexTypeInternal,
        R: RangeBounds<K>,
    {
        let read_snapshot = self.snapshots.read_snapshot();
        self.range_snapshot(index_id, &read_snapshot, range)
    }

    /// Iterates `range` of `index_id` as seen by `snapshot`, also returning the index value mode.
    pub fn range_snapshot<K, V, R>(
        &self,
        index_id: IndexId,
        snapshot: &SnapshotRef,
        range: R,
    ) -> Result<(ValueMode, IndexRawIter<K, V>), IndexOpsError>
    where
        K: IndexTypeInternal,
        V: IndexTypeInternal,
        R: RangeBounds<K>,
    {
        let mut ik = Indexes::get_index_keeper::<K, V>(self, snapshot, &index_id)?;
        let after = ik.iter_from(range.start_bound())?;
        let before = ik.back_iter_from(range.end_bound())?;
        Ok((
            IndexKeeper::<K, V>::value_mode(&ik),
            IndexRawIter::new(index_id, snapshot, after, before, clone_range(&range)),
        ))
    }

    /// Iterates `range` of `index_id` as seen by `tx`, merging persistent entries (absent when
    /// the index was created in `tx`) with the pending changes of `tx`.
    pub fn range_tx<K, V, R>(
        &self,
        tx: &mut TransactionImpl,
        index_id: IndexId,
        range: R,
    ) -> Result<TxIndexRawIter<K, V>, IndexOpsError>
    where
        K: IndexTypeInternal,
        V: IndexTypeInternal,
        R: RangeBounds<K>,
    {
        let (base, value_mode, name) = {
            if tx.index_created_in_tx(&index_id) {
                let (config, _) = Indexes::get_index_tx(self, tx, &index_id)?;
                (None, config.value_mode, config.name)
            } else {
                let snapshot = self.snapshots.read_snapshot();
                // The low level index do not need to be aware of the transaction when reading records
                // because index changes are stored all at high level of the transaction
                let mut ik = Indexes::get_index_keeper_tx_read::<K, V>(self, &snapshot, tx, &index_id)?;
                let after = ik.iter_from(range.start_bound())?;
                let before = ik.back_iter_from(range.end_bound())?;
                let value_mode = IndexKeeper::<K, V>::value_mode(&ik);
                let name = IndexKeeper::<K, V>::index_name(&ik).to_owned();
                let iter = IndexRawIter::new(index_id.clone(), &snapshot, after, before, clone_range(&range));
                (Some(iter), value_mode, name)
            }
        };
        let tx_iter = tx.index_range::<K, V, _>(index_id.clone(), range);
        Ok(TxIndexRawIter::new(index_id, name, tx_iter, base, value_mode))
    }

    /// Returns the name of segment `id` as seen by `tx`, with `true` if it was created in `tx`.
    pub fn segment_name_tx(&self, tx: &TransactionImpl, id: SegmentId) -> Option<(String, bool)> {
        if tx.segment_created_in_tx(id) {
            tx.segment_name_by_id(id).map(|x| (x, true))
        } else {
            self.address.segment_name_by_id(id).map(|x| (x, false))
        }
    }

    /// Lists the persistent user segments, hiding index meta/data segments.
    pub fn list_segments(&self) -> Vec<(String, SegmentId)> {
        self.address
            .list()
            .into_iter()
            .filter(|(name, _)| !is_index_name_meta(name) && !is_index_name_data(name))
            .collect()
    }

    /// Lists the user segments visible in `snapshot_ref`, hiding index meta/data segments.
    pub fn list_segments_snapshot(&self, snapshot_ref: &SnapshotRef) -> Vec<(String, SegmentId)> {
        let list = self.snapshots.list(snapshot_ref);
        list.into_iter()
            .filter(|(name, _)| !is_index_name_meta(name) && !is_index_name_data(name))
            .collect()
    }

    /// Lists the indexes using a freshly taken snapshot, released before returning.
    pub fn list_indexes(&self) -> PERes<Vec<(String, IndexInfo)>> {
        let snapshot = self.snapshot();
        let res = self.list_indexes_snapshot(&snapshot);
        drop(snapshot);
        res
    }

    /// Lists the indexes visible in `snapshot`; indexes whose info cannot be read are skipped.
    pub fn list_indexes_snapshot(&self, snapshot: &SnapshotRef) -> PERes<Vec<(String, IndexInfo)>> {
        let list = self.snapshots.list(snapshot);
        Ok(list
            .into_iter()
            .filter(|(name, _)| is_index_name_meta(name))
            .filter_map(|(mut name, _id)| -> Option<(String, IndexInfo)> {
                change_segment_meta_name_to_index_name(&mut name);
                self.index_info(snapshot, &name).ok().map(|info| (name, info))
            })
            .collect())
    }

    /// Lists the user segments visible in `tx`, hiding index meta/data segments.
    pub fn list_segments_tx(&self, tx: &TransactionImpl) -> Vec<(String, SegmentId)> {
        tx.filter_list(&self.address.list())
            .filter(|(name, _)| !is_index_name_meta(name) && !is_index_name_data(name))
            .map(|(name, id)| (name.to_string(), id))
            .collect()
    }

    /// Lists the indexes visible in `tx`; indexes whose info cannot be read are skipped.
    pub fn list_indexes_tx(&self, tx: &TransactionImpl) -> Vec<(String, IndexInfo)> {
        tx.filter_list(&self.address.list())
            .filter(|(name, _)| is_index_name_meta(name))
            .map(|(name, id)| (name.to_string(), id))
            .filter_map(|(mut name, _id)| -> Option<(String, IndexInfo)> {
                change_segment_meta_name_to_index_name(&mut name);
                self.index_info_tx(tx, &name).ok().map(|info| (name, info))
            })
            .collect()
    }

    /// Builds the `IndexInfo` of index `name` as seen by `snapshot`.
    fn index_info(&self, snapshot: &SnapshotRef, name: &str) -> Result<IndexInfo, IndexError> {
        let id = self.solve_index_id_snapshot(snapshot, name)?;
        let index = Indexes::get_index(self, snapshot, &id)?;
        Ok(IndexInfo {
            id,
            value_mode: index.value_mode,
            key_type: IndexTypeId::from(index.key_type),
            value_type: IndexTypeId::from(index.value_type),
        })
    }

    /// Builds the `IndexInfo` of index `name` as seen by `tx`.
    fn index_info_tx(&self, tx: &TransactionImpl, name: &str) -> Result<IndexInfo, IndexError> {
        let id = self.solve_index_id_tx(tx, name)?.0;
        let (index, _version) = Indexes::get_index_tx(self, tx, &id)?;
        Ok(IndexInfo {
            id,
            value_mode: index.value_mode,
            key_type: IndexTypeId::from(index.key_type),
            value_type: IndexTypeId::from(index.value_type),
        })
    }

    /// Transaction journal.
    pub(crate) fn journal(&self) -> &Journal {
        &self.journal
    }

    /// Segment/record address structure.
    pub(crate) fn address(&self) -> &Address {
        &self.address
    }

    /// Page allocator.
    pub(crate) fn allocator(&self) -> &Allocator {
        &self.allocator
    }

    /// Index management state.
    pub(crate) fn indexes(&self) -> &Indexes {
        &self.indexes
    }

    /// Snapshot tracking.
    pub(crate) fn snapshots(&self) -> &Arc<Snapshots> {
        &self.snapshots
    }

    /// Makes a committed transaction durable.
    ///
    /// `SyncMode::Sync` syncs the disc and clears the journal queue immediately;
    /// `SyncMode::BackgroundSync` hands `snapshot_ref` to the background worker instead.
    #[cfg(feature = "background_ops")]
    pub(crate) fn transaction_sync(&self, sync_mode: &SyncMode, snapshot_ref: &SnapshotRef) -> PERes<()> {
        match sync_mode {
            SyncMode::Sync => {
                let allocator = self.allocator();
                allocator.disc_sync()?;
                self.journal().clear_in_queue(self.snapshots())?;
            }
            SyncMode::BackgroundSync => {
                self.background_ops.add_pending(snapshot_ref.clone())?;
            }
        }
        Ok(())
    }

    /// Makes a committed transaction durable: without background operations every mode syncs
    /// the disc and clears the journal queue immediately.
    #[cfg(not(feature = "background_ops"))]
    pub(crate) fn transaction_sync(&self, _sync_mode: &SyncMode, _snapshot_ref: &SnapshotRef) -> PERes<()> {
        let allocator = self.allocator();
        allocator.disc_sync()?;
        self.journal().clear_in_queue(self.snapshots())?;
        Ok(())
    }

    /// Returns `true` when `record` is larger than the maximum storable record size.
    fn record_over_size_limit(&self, record: &[u8]) -> bool {
        // Leave a kilobyte short of 512MB for metadata, to avoid allocating a 1GB page for just
        // a few bytes
        record.len() > 1024 * 1024 * 512 - 1024
    }

    /// Releases the file lock held by the allocator (tests only).
    #[cfg(test)]
    pub fn free_file_lock(&self) -> PERes<()> {
        self.allocator.free_file_lock()
    }

    /// Returns the allocator state of `page` (experimental).
    #[cfg(feature = "experimental_inspect")]
    pub fn page_state(&self, page: u64) -> Option<crate::inspect::PageState> {
        self.allocator.page_state(page)
    }
}
impl Drop for PersyImpl {
    /// Stops the background operations (only with the `background_ops` feature) before the
    /// fields of `PersyImpl` are dropped.
    fn drop(&mut self) {
        #[cfg(feature = "background_ops")]
        {
            self.background_ops.finish();
        }
    }
}

/// Copies the start and end bounds of `r` into an owned `(start, end)` pair.
fn clone_range<R: RangeBounds<X>, X: Clone>(r: &R) -> (Bound<X>, Bound<X>) {
    /// Turns a borrowed bound into an owned one by cloning its endpoint.
    fn to_owned_bound<X: Clone>(bound: Bound<&X>) -> Bound<X> {
        match bound {
            Bound::Unbounded => Bound::Unbounded,
            Bound::Excluded(value) => Bound::Excluded(value.clone()),
            Bound::Included(value) => Bound::Included(value.clone()),
        }
    }
    let start = to_owned_bound(r.start_bound());
    let end = to_owned_bound(r.end_bound());
    (start, end)
}