persy 1.5.0

Transactional Persistence Engine
Documentation
use crate::{
    address::Address,
    error::{IndexError, InvalidPersyId, PERes, SegmentError, PE},
    index::config::{format_segment_name_data, format_segment_name_meta, index_name_from_meta_segment},
    io::{
        InfallibleRead, InfallibleReadFormat, InfallibleReadVarInt, InfallibleWrite, InfallibleWriteFormat,
        InfallibleWriteVarInt, ReadVarInt,
    },
    persy::PersyImpl,
    snapshots::{SnapshotRef, Snapshots},
    transaction::tx_impl::{TransactionImpl, TxSegCheck},
};

use data_encoding::BASE32_DNSSEC;
use std::{fmt, io::Read, str};
use unsigned_varint::{
    decode::{u32 as u32_vdec, u64 as u64_vdec},
    encode::{u32 as u32_venc, u32_buffer, u64 as u64_venc, u64_buffer},
};

fn write_id(f: &mut fmt::Formatter, id: u64) -> fmt::Result {
    let mut buffer = Vec::new();
    buffer.write_all(u64_venc(id, &mut u64_buffer()));
    buffer.push(0b0101_0101);
    write!(f, "{}", BASE32_DNSSEC.encode(&buffer))
}

fn read_id(s: &str) -> Result<u64, InvalidPersyId> {
    let bytes = BASE32_DNSSEC.decode(s.as_bytes())?;
    Ok(u64_vdec(&bytes)
        .map_err(|_| InvalidPersyId::InvalidPersyId(String::from(s)))?
        .0)
}

#[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Debug, Default, Copy)]
pub struct RecRef {
    pub page: u64,
    pub pos: u32,
}

impl RecRef {
    #[inline]
    pub(crate) fn new(page: u64, pos: u32) -> RecRef {
        RecRef { page, pos }
    }
    pub(crate) fn write(&self, write: &mut dyn InfallibleWrite) {
        write.write_varint_u64(self.page);
        write.write_varint_u32(self.pos);
    }
    pub(crate) fn read(read: &mut dyn InfallibleRead) -> RecRef {
        let page = read.read_varint_u64();
        let pos = read.read_varint_u32();
        Self::new(page, pos)
    }
}

impl fmt::Display for RecRef {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut buffer = Vec::new();
        buffer.write_all(u64_venc(self.page, &mut u64_buffer()));
        buffer.push(0b0101_0101);
        buffer.write_all(u32_venc(self.pos, &mut u32_buffer()));
        write!(f, "{}", BASE32_DNSSEC.encode(&buffer))
    }
}

impl std::str::FromStr for RecRef {
    type Err = PE<InvalidPersyId>;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let bytes = BASE32_DNSSEC
            .decode(s.as_bytes())
            .map_err(|e| PE::PE(InvalidPersyId::from(e)))?;
        let (page, rest) = u64_vdec(&bytes).map_err(|_| PE::PE(InvalidPersyId::InvalidPersyId(String::from(s))))?;
        let (pos, _) = u32_vdec(&rest[1..]).map_err(|_| PE::PE(InvalidPersyId::InvalidPersyId(String::from(s))))?;
        Ok(RecRef::new(page, pos))
    }
}

/// Identifier of a persistent record, can be used for read, update or delete the record
#[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Debug, Copy)]
pub struct PersyId(pub(crate) RecRef);

impl fmt::Display for PersyId {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl std::str::FromStr for PersyId {
    type Err = PE<InvalidPersyId>;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(PersyId(s.parse()?))
    }
}

/// Represents a looked-up segment
///
/// # Invalidation
/// When a segment is deleted, the corresponding SegmentId becomes invalid.
///
/// # Serialization
/// This type supports serialization,
/// It does NOT serialize to the segment name.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Copy, Default)]
pub struct SegmentId {
    pub(crate) id: u64,
}

impl SegmentId {
    #[inline]
    pub(crate) fn new(id: u64) -> Self {
        SegmentId { id }
    }

    pub(crate) fn write_varint(&self, write: &mut dyn InfallibleWrite) {
        write.write_varint_u64(self.id);
    }

    pub(crate) fn read_varint(read: &mut dyn Read) -> PERes<SegmentId> {
        let id = read.read_varint_u64()?;
        Ok(Self::new(id))
    }

    pub(crate) fn write(&self, write: &mut dyn InfallibleWrite) {
        write.write_u64(self.id);
    }

    pub(crate) fn read(read: &mut dyn InfallibleRead) -> SegmentId {
        let id = read.read_u64();
        Self::new(id)
    }
}

pub trait ToSegmentId {
    fn to_segment_id(self, address: &Address) -> Result<SegmentId, SegmentError>;
    fn to_segment_id_tx(self, persy: &PersyImpl, tx: &TransactionImpl) -> Result<(SegmentId, bool), SegmentError>;
    fn to_segment_id_snapshot(self, snapshots: &Snapshots, snapshot: &SnapshotRef) -> Result<SegmentId, SegmentError>;
}

impl ToSegmentId for &SegmentId {
    #[inline]
    fn to_segment_id(self, address: &Address) -> Result<SegmentId, SegmentError> {
        (*self).to_segment_id(address)
    }
    #[inline]
    fn to_segment_id_tx(self, persy: &PersyImpl, tx: &TransactionImpl) -> Result<(SegmentId, bool), SegmentError> {
        (*self).to_segment_id_tx(persy, tx)
    }
    #[inline]
    fn to_segment_id_snapshot(self, snapshots: &Snapshots, snapshot: &SnapshotRef) -> Result<SegmentId, SegmentError> {
        (*self).to_segment_id_snapshot(snapshots, snapshot)
    }
}

impl ToSegmentId for SegmentId {
    #[inline]
    fn to_segment_id(self, _address: &Address) -> Result<SegmentId, SegmentError> {
        Ok(self)
    }
    #[inline]
    fn to_segment_id_tx(self, _persy: &PersyImpl, tx: &TransactionImpl) -> Result<(SegmentId, bool), SegmentError> {
        // Is safe to check only if is created in tx, because a segment cannot be created and
        // dropped in the same tx
        Ok((self, tx.segment_created_in_tx(self)))
    }
    #[inline]
    fn to_segment_id_snapshot(
        self,
        _snapshots: &Snapshots,
        _snapshot: &SnapshotRef,
    ) -> Result<SegmentId, SegmentError> {
        Ok(self)
    }
}

impl<T: AsRef<str>> ToSegmentId for T {
    #[inline]
    fn to_segment_id(self, address: &Address) -> Result<SegmentId, SegmentError> {
        address
            .segment_id(self.as_ref())
            .map_or(Err(SegmentError::SegmentNotFound), Ok)
    }
    #[inline]
    fn to_segment_id_tx(self, persy: &PersyImpl, tx: &TransactionImpl) -> Result<(SegmentId, bool), SegmentError> {
        persy
            .check_segment_tx(tx, self.as_ref())
            .map(|(cr_in_tx, id)| (id, cr_in_tx))
    }
    fn to_segment_id_snapshot(self, snapshots: &Snapshots, snapshot: &SnapshotRef) -> Result<SegmentId, SegmentError> {
        snapshots
            .solve_segment_id(snapshot, self.as_ref())
            .ok_or(SegmentError::SegmentNotFound)
    }
}

impl fmt::Display for SegmentId {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write_id(f, self.id)
    }
}

impl std::str::FromStr for SegmentId {
    type Err = PE<InvalidPersyId>;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(SegmentId::new(read_id(s)?))
    }
}

/// Unique identifier of an index
#[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Debug)]
pub struct IndexId {
    meta: SegmentId,
    data: SegmentId,
}
impl IndexId {
    pub(crate) fn get_meta_id(&self) -> SegmentId {
        self.meta
    }

    pub(crate) fn get_data_id(&self) -> SegmentId {
        self.data
    }
}

pub trait ToIndexId {
    fn to_index_id(self, address: &Address) -> Result<IndexId, IndexError>;
    fn to_index_id_tx(self, persy: &PersyImpl, tx: &TransactionImpl) -> Result<(IndexId, bool), IndexError>;
    fn to_index_id_snapshot(self, snapshots: &Snapshots, snapshot: &SnapshotRef) -> Result<IndexId, IndexError>;
}

impl ToIndexId for &IndexId {
    #[inline]
    fn to_index_id(self, address: &Address) -> Result<IndexId, IndexError> {
        self.clone().to_index_id(address)
    }
    #[inline]
    fn to_index_id_tx(self, persy: &PersyImpl, tx: &TransactionImpl) -> Result<(IndexId, bool), IndexError> {
        self.clone().to_index_id_tx(persy, tx)
    }
    fn to_index_id_snapshot(self, snapshots: &Snapshots, snapshot: &SnapshotRef) -> Result<IndexId, IndexError> {
        self.clone().to_index_id_snapshot(snapshots, snapshot)
    }
}

impl ToIndexId for IndexId {
    #[inline]
    fn to_index_id(self, address: &Address) -> Result<IndexId, IndexError> {
        if self.data.id == 0 {
            // backward compatible serialization does not have data, find it by name convention
            let meta_name = address.segment_name_by_id(self.meta).ok_or(IndexError::IndexNotFound)?;
            let data_name = format_segment_name_data(&index_name_from_meta_segment(&meta_name));
            let data = address.segment_id(&data_name).ok_or(IndexError::IndexNotFound)?;
            Ok(IndexId::new(self.meta, data))
        } else {
            Ok(self)
        }
    }

    fn to_index_id_snapshot(self, snapshots: &Snapshots, snapshot: &SnapshotRef) -> Result<IndexId, IndexError> {
        if self.data.id == 0 {
            // backward compatible serialization does not have data, find it by name convention
            let meta_name = snapshots
                .solve_segment_name(snapshot, self.meta)
                .ok_or(IndexError::IndexNotFound)?;
            let data_name = format_segment_name_data(&index_name_from_meta_segment(&meta_name));
            let data = snapshots
                .solve_segment_id(snapshot, &data_name)
                .ok_or(IndexError::IndexNotFound)?;
            Ok(IndexId::new(self.meta, data))
        } else {
            Ok(self)
        }
    }

    #[inline]
    fn to_index_id_tx(self, persy: &PersyImpl, tx: &TransactionImpl) -> Result<(IndexId, bool), IndexError> {
        if self.data.id == 0 {
            // backward compatible serialization does not have data, find it by name convention
            let (meta_name, in_tx) = persy.segment_name_tx(tx, self.meta).ok_or(IndexError::IndexNotFound)?;
            let data_name = format_segment_name_data(&index_name_from_meta_segment(&meta_name));
            let data = if in_tx {
                if let TxSegCheck::Created(id) = tx.exists_segment(&data_name) {
                    id
                } else {
                    return Err(IndexError::IndexNotFound);
                }
            } else {
                persy
                    .address()
                    .segment_id(&data_name)
                    .ok_or(IndexError::IndexNotFound)?
            };
            Ok((IndexId::new(self.meta, data), in_tx))
        } else {
            // Is safe to check only if is created in tx, because a segment cannot be created and
            // dropped in the same tx

            let id = self.meta;
            Ok((self, tx.segment_created_in_tx(id)))
        }
    }
}

impl<T: AsRef<str>> ToIndexId for T {
    #[inline]
    fn to_index_id(self, address: &Address) -> Result<IndexId, IndexError> {
        let meta_name = format_segment_name_meta(self.as_ref());
        let data_name = format_segment_name_data(self.as_ref());
        let meta = address.segment_id(&meta_name).ok_or(IndexError::IndexNotFound)?;
        let data = address.segment_id(&data_name).ok_or(IndexError::IndexNotFound)?;
        Ok(IndexId::new(meta, data))
    }
    fn to_index_id_snapshot(self, snapshots: &Snapshots, snapshot: &SnapshotRef) -> Result<IndexId, IndexError> {
        let meta_name = format_segment_name_meta(self.as_ref());
        let data_name = format_segment_name_data(self.as_ref());
        let meta = snapshots
            .solve_segment_id(snapshot, &meta_name)
            .ok_or(IndexError::IndexNotFound)?;
        let data = snapshots
            .solve_segment_id(snapshot, &data_name)
            .ok_or(IndexError::IndexNotFound)?;
        Ok(IndexId::new(meta, data))
    }
    #[inline]
    fn to_index_id_tx(self, persy: &PersyImpl, tx: &TransactionImpl) -> Result<(IndexId, bool), IndexError> {
        let meta_name = format_segment_name_meta(self.as_ref());
        let data_name = format_segment_name_data(self.as_ref());
        let (_, meta) = persy.check_segment_tx(tx, &meta_name)?;
        let (in_tx, data) = persy.check_segment_tx(tx, &data_name)?;
        Ok((IndexId::new(meta, data), in_tx))
    }
}
impl IndexId {
    pub(crate) fn new(meta: SegmentId, data: SegmentId) -> IndexId {
        IndexId { meta, data }
    }
}

impl fmt::Display for IndexId {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut buffer = Vec::new();
        buffer.write_all(u64_venc(self.meta.id, &mut u64_buffer()));
        buffer.write_all(u64_venc(self.data.id, &mut u64_buffer()));
        buffer.push(0b0101_0101);
        write!(f, "{}", BASE32_DNSSEC.encode(&buffer))
    }
}

impl std::str::FromStr for IndexId {
    type Err = InvalidPersyId;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let bytes = BASE32_DNSSEC.decode(s.as_bytes())?;
        let (meta, bytes) = u64_vdec(&bytes).map_err(|_| InvalidPersyId::InvalidPersyId(String::from(s)))?;
        let data = if bytes.len() > 1 {
            let (d, _) = u64_vdec(bytes).map_err(|_| InvalidPersyId::InvalidPersyId(String::from(s)))?;
            d
        } else {
            0
        };
        Ok(IndexId::new(SegmentId::new(meta), SegmentId::new(data)))
    }
}

#[cfg(test)]
mod tests {
    use super::{IndexId, PersyId, RecRef, SegmentId};

    #[test]
    fn test_persy_id_string() {
        let id = PersyId(RecRef::new(20, 30));
        let s = format!("{}", id);
        assert_eq!(s.parse::<PersyId>().ok(), Some(id));
    }

    #[test]
    fn test_persy_id_fixed_parse() {
        let id = RecRef::new(20, 30);
        assert_eq!("2hahs".parse::<RecRef>().ok(), Some(id));
    }

    #[test]
    fn test_persy_id_parse_failure() {
        let s = "ACCC";
        assert!(s.parse::<RecRef>().is_err());
    }

    #[test]
    fn test_segment_id_string() {
        let id = SegmentId::new(20);
        let s = format!("{}", id);
        assert_eq!(s.parse::<SegmentId>().ok(), Some(id));
    }

    #[test]
    fn test_segment_id_parse_failure() {
        let s = "ACCC";
        assert!(s.parse::<SegmentId>().is_err());
    }

    #[test]
    fn test_index_id_string() {
        let id = IndexId::new(SegmentId::new(20), SegmentId::new(30));
        let s = format!("{}", id);
        assert_eq!(s.parse::<IndexId>().ok(), Some(id));
    }

    #[test]
    fn test_index_id_parse_failure() {
        let s = "ACCC";
        assert!(s.parse::<IndexId>().is_err());
    }
}