pub use std::fs::OpenOptions;
use std::{
fs::File,
mem::replace,
ops::{Bound, RangeBounds},
path::Path,
str,
sync::Arc,
};
use crate::{
address::record_scanner::{SegmentRawIter, SegmentSnapshotRawIter, TxSegmentRawIter},
address::Address,
allocator::Allocator,
config::{Config, TransactionConfig},
device::{Device, FileDevice, MemoryDevice, PageOps},
error::{
BeginTransactionError, CreateError, CreateIndexError, CreateSegmentError, DeleteError, DropIndexError,
DropSegmentError, IndexChangeError, IndexError, IndexOpsError, IndexPutError, InsertError, OpenError, PERes,
SegmentError, UpdateError,
},
id::{IndexId, PersyId, RecRef, SegmentId, ToIndexId, ToSegmentId},
index::{
config::{
change_segment_meta_name_to_index_name, format_segment_name_meta, is_index_name_data, is_index_name_meta,
IndexType, IndexTypeId, IndexTypeInternal, Indexes, ValueMode,
},
keeper::IndexKeeper,
raw_iter::IndexRawIter,
raw_iter_tx::TxIndexRawIter,
tree::{
nodes::{PageIter, PageIterBack, Value},
Index,
},
},
io::{
ArcSliceRead, InfallibleRead, InfallibleReadFormat, InfallibleReadVarInt, InfallibleWrite,
InfallibleWriteFormat, InfallibleWriteVarInt,
},
journal::{recover_impl::RecoverImpl, Journal},
snapshot::data::{EntryCase, SegmentSnapshot},
snapshots::{SnapshotRef, Snapshots},
transaction::tx_impl::{PreparedState, SyncMode, TransactionImpl, TxRead, TxSegCheck},
PrepareError, ReadError,
};
#[cfg(feature = "background_ops")]
use crate::background::BackgroundOps;
/// Size exponent of the root page; used both when the root page is created in `init` and when
/// it is loaded back in `init` and `new`.
pub(crate) const DEFAULT_PAGE_EXP: u8 = 10;

/// Four-byte tag that `init` writes after the root page pointers and that `new` checks on open.
const PERSY_TAG_BYTES: &[u8; 4] = b"prsy";

/// Engine object holding the shared handles to configuration, journal, address, allocator and
/// snapshots, plus the index state; every storage operation in this file goes through it.
pub struct PersyImpl {
    config: Arc<Config>,
    journal: Arc<Journal>,
    address: Arc<Address>,
    indexes: Indexes,
    allocator: Arc<Allocator>,
    snapshots: Arc<Snapshots>,
    /// Handle for the background sync machinery, present only with the `background_ops` feature.
    #[cfg(feature = "background_ops")]
    background_ops: BackgroundOps<SnapshotRef>,
}
/// A prepared transaction waiting for its final outcome.
///
/// Built by `PersyImpl::prepare` and consumed by `PersyImpl::commit` or
/// `PersyImpl::rollback_prepared`.
pub struct TxFinalize {
/// The transaction returned by `TransactionImpl::prepare`.
transaction: TransactionImpl,
/// The prepared state returned alongside the transaction by `TransactionImpl::prepare`.
prepared: PreparedState,
}
impl TxFinalize {
    /// Test-only hook: forwards to `leak` on the stored prepared state.
    #[cfg(test)]
    pub(crate) fn leak(&mut self) {
        self.prepared.leak();
    }
}
/// Outcome marker of a transaction: either rolled back or committed.
// NOTE(review): no use of this enum is visible in this part of the file; confirm the exact
// semantics against its callers.
#[derive(PartialEq, Eq, Debug)]
pub enum CommitStatus {
/// The transaction was rolled back.
Rollback,
/// The transaction was committed.
Commit,
}
/// Description of an index as reported by the `list_indexes*` methods.
#[derive(Clone)]
pub struct IndexInfo {
/// Identifier of the index.
pub id: IndexId,
/// Value mode configured for the index.
pub value_mode: ValueMode,
/// Type id of the index keys, converted from the stored index configuration.
pub key_type: IndexTypeId,
/// Type id of the index values, converted from the stored index configuration.
pub value_type: IndexTypeId,
}
/// Builds the background operations handle from two callbacks.
///
/// * The first callback runs `Allocator::disc_sync` and returns the negation of its result.
/// * The second callback receives the pending snapshot references, releases them, clears the
///   journal queue against `snapshots`, and returns `Allocator::need_sync`.
///
/// NOTE(review): how `BackgroundOps` schedules the callbacks and interprets their boolean
/// results is defined outside this file — confirm there.
#[cfg(feature = "background_ops")]
fn create_background_ops(
    journal: Arc<Journal>,
    allocator: Arc<Allocator>,
    snapshots: Arc<Snapshots>,
) -> PERes<BackgroundOps<SnapshotRef>> {
    let all_sync = allocator.clone();
    // The second closure takes over the original handle, so no extra clone is needed.
    let after_sync = allocator;
    BackgroundOps::new(
        move || {
            let empty = all_sync.disc_sync()?;
            Ok(!empty)
        },
        move |all: Vec<SnapshotRef>| {
            // Dropping the vector releases every snapshot reference in order before the
            // journal queue is cleared.
            drop(all);
            journal.clear_in_queue(&snapshots)?;
            Ok(after_sync.need_sync())
        },
    )
}
/// One-shot consumer of a record's content that produces a value of type `T`.
pub trait RecordReader<T> {
/// Consumes the reader and converts the record content `rec` into `T`.
fn read(self, rec: ArcSliceRead) -> T;
}
/// Any `FnOnce(ArcSliceRead) -> T` can be used directly as a `RecordReader<T>`.
impl<T, F: FnOnce(ArcSliceRead) -> T> RecordReader<T> for F {
    /// Invokes the closure on the record content.
    fn read(self, rec: ArcSliceRead) -> T {
        self(rec)
    }
}
impl PersyImpl {
/// Creates a new storage file at `path` and initializes it.
///
/// The file is opened read/write with `create_new`, so the call fails if the path already
/// exists.
pub fn create(path: &Path) -> Result<(), CreateError> {
    let mut options = OpenOptions::new();
    options.write(true).read(true).create_new(true);
    let file = options.open(path)?;
    PersyImpl::create_from_file(file)
}
/// Initializes the storage structures on an already opened file.
pub fn create_from_file(f: File) -> Result<(), CreateError> {
    let device = FileDevice::new(f)?;
    PersyImpl::init(Box::new(device))?;
    Ok(())
}
/// Writes the initial on-disk structures to `device` and hands the device back.
///
/// Creates the root page, initializes allocator, address and journal, then fills the root
/// page and syncs.
fn init(device: Box<dyn Device>) -> PERes<Box<dyn Device>> {
    let root_page = device.create_page_raw(DEFAULT_PAGE_EXP)?;
    let (allocator_page, allocator) = Allocator::init(device, &Config::new())?;
    let address_page = Address::init(&allocator)?;
    let journal_page = Journal::init(&allocator)?;
    {
        // Root page layout: a u16 set to 0, the address / journal / allocator page pointers
        // (u64 each), then the 4-byte file tag.
        let mut root = allocator.disc().load_page_raw(root_page, DEFAULT_PAGE_EXP)?;
        root.write_u16(0);
        root.write_u64(address_page);
        root.write_u64(journal_page);
        root.write_u64(allocator_page);
        root.write_all(PERSY_TAG_BYTES);
        allocator.flush_page(root)?;
    }
    let synced = allocator.disc_sync()?;
    debug_assert!(synced);
    Ok(allocator.release())
}
/// Opens an initialized device and wires up all the engine components.
///
/// Reads the root page (page 0) to find the address, journal and allocator pages and checks
/// the file tag; returns `OpenError::NotPersyFile` when the tag is neither the expected tag
/// nor four zero bytes.
fn new(disc: Box<dyn Device>, config: Config) -> Result<PersyImpl, OpenError> {
    let (address_page, journal_page, allocator_page) = {
        let mut root = disc.load_page_raw(0, DEFAULT_PAGE_EXP)?;
        // Skip the leading u16 that `init` writes as 0.
        root.read_u16();
        let address_page = root.read_u64();
        let journal_page = root.read_u64();
        let allocator_page = root.read_u64();
        let mut tag = [0u8; 4];
        root.read_exact(&mut tag);
        // NOTE(review): an all-zero tag is accepted as well — presumably for files written
        // before the tag existed; confirm.
        let known_tag = tag == *PERSY_TAG_BYTES || tag == [0u8; 4];
        if !known_tag {
            return Err(OpenError::NotPersyFile);
        }
        (address_page, journal_page, allocator_page)
    };
    let config = Arc::new(config);
    let allocator = Arc::new(Allocator::new(disc, &config, allocator_page)?);
    let address = Arc::new(Address::new(&allocator, &config, address_page)?);
    let journal = Arc::new(Journal::new(&allocator, journal_page)?);
    let indexes = Indexes::new(&config);
    let snapshots = Arc::new(Snapshots::new(&allocator, &journal, &address));
    #[cfg(feature = "background_ops")]
    let background_ops = create_background_ops(journal.clone(), allocator.clone(), snapshots.clone())?;
    let persy = PersyImpl {
        config,
        journal,
        address,
        indexes,
        allocator,
        snapshots,
        #[cfg(feature = "background_ops")]
        background_ops,
    };
    Ok(persy)
}
/// Reads the journal into a fresh `RecoverImpl` and marks the journal read as finished.
fn recover(&self) -> PERes<RecoverImpl> {
    let mut recovered = RecoverImpl::default();
    self.journal.recover(&mut recovered)?;
    recovered.finish_journal_read();
    Ok(recovered)
}
/// Completes a recovery by delegating to `RecoverImpl::final_recover` with this instance.
pub fn final_recover(&self, recover: RecoverImpl) -> PERes<()> {
recover.final_recover(self)
}
/// Wraps `f` in a truncating file device, initializes it, opens it and reads the journal.
///
/// Returns the opened instance together with the recovery state read from the journal.
pub fn truncate_and_open(f: File, config: Config) -> Result<(PersyImpl, RecoverImpl), OpenError> {
    let truncated = FileDevice::new_truncate(f)?;
    let device = PersyImpl::init(Box::new(truncated))?;
    let persy = PersyImpl::new(device, config)?;
    let recover = persy.recover()?;
    Ok((persy, recover))
}
/// Initializes the storage structures on `f`, then opens it and reads the journal.
///
/// Returns the opened instance together with the recovery state read from the journal.
pub fn create_and_open(f: File, config: Config) -> Result<(PersyImpl, RecoverImpl), OpenError> {
    let fresh = FileDevice::new(f)?;
    let device = PersyImpl::init(Box::new(fresh))?;
    let persy = PersyImpl::new(device, config)?;
    let recover = persy.recover()?;
    Ok((persy, recover))
}
/// Opens an already initialized file and reads the journal.
///
/// Returns the opened instance together with the recovery state read from the journal.
pub fn open_recover(f: File, config: Config) -> Result<(PersyImpl, RecoverImpl), OpenError> {
    let device = FileDevice::new(f)?;
    let persy = PersyImpl::new(Box::new(device), config)?;
    let recover = persy.recover()?;
    Ok((persy, recover))
}
/// Creates an instance backed by a freshly initialized in-memory device.
pub fn memory(config: Config) -> Result<PersyImpl, OpenError> {
    let in_memory = MemoryDevice::new(None)?;
    let device = PersyImpl::init(Box::new(in_memory))?;
    PersyImpl::new(device, config)
}
/// Starts a transaction using the per-transaction `config`.
///
/// * Strategy: the one in `config` if set, otherwise a clone of the instance default.
/// * Transaction id: taken out of `config`; ids longer than 512 bytes are rejected with
///   `BeginTransactionError::InvalidTransactionId`; a missing id becomes an empty vector.
/// * Sync mode: background sync only when `config.background_sync` is `Some(true)`.
pub fn begin_with(&self, mut config: TransactionConfig) -> Result<TransactionImpl, BeginTransactionError> {
    let strategy = match config.tx_strategy {
        Some(requested) => requested,
        None => self.config.tx_strategy().clone(),
    };
    let meta_id = match replace(&mut config.transaction_id, None) {
        Some(id) if id.len() > 512 => return Err(BeginTransactionError::InvalidTransactionId),
        Some(id) => id,
        None => Vec::new(),
    };
    let sync_mode = match config.background_sync {
        Some(true) => SyncMode::BackgroundSync,
        _ => SyncMode::Sync,
    };
    let timeout = self.config.transaction_lock_timeout();
    let tx = TransactionImpl::new(&self.journal, &strategy, sync_mode, meta_id, *timeout)?;
    Ok(tx)
}
/// Creates a segment named `segment` inside the transaction and returns its id.
///
/// Fails with `CreateSegmentError::SegmentAlreadyExists` when the segment was created in this
/// transaction, or exists persistently and was not dropped in this transaction.
pub fn create_segment(&self, tx: &mut TransactionImpl, segment: &str) -> Result<SegmentId, CreateSegmentError> {
    let already_exists = match tx.exists_segment(segment) {
        TxSegCheck::Dropped => false,
        TxSegCheck::Created(_) => true,
        TxSegCheck::None => self.address.exists_segment(segment),
    };
    if already_exists {
        return Err(CreateSegmentError::SegmentAlreadyExists);
    }
    let temp_segment = self.address.create_temp_segment(segment)?;
    let id = temp_segment.get_segment_id();
    tx.add_create_segment(&self.journal, temp_segment)?;
    Ok(id)
}
/// Registers the drop of `segment` in the transaction.
///
/// The segment must be visible from the transaction, see `check_segment_tx`.
pub fn drop_segment(&self, tx: &mut TransactionImpl, segment: &str) -> Result<(), DropSegmentError> {
    let (_created_in_tx, segment_id) = self.check_segment_tx(tx, segment)?;
    tx.add_drop_segment(&self.journal, segment, segment_id)?;
    Ok(())
}
/// Returns whether the address component knows a segment with this name.
pub fn exists_segment(&self, segment: &str) -> bool {
self.address.exists_segment(segment)
}
/// Returns whether the address component knows a segment with this id.
pub fn exists_segment_by_id(&self, segment: &SegmentId) -> bool {
self.address.exists_segment_by_id(segment)
}
/// Returns whether `segment` exists as seen from the transaction.
///
/// Changes recorded in the transaction (create / drop) take precedence over the persistent
/// state held by the address component.
pub fn exists_segment_tx(&self, tx: &TransactionImpl, segment: &str) -> bool {
    match tx.exists_segment(segment) {
        TxSegCheck::None => self.address.exists_segment(segment),
        TxSegCheck::Created(_) => true,
        TxSegCheck::Dropped => false,
    }
}
/// Returns whether an index exists, by checking for its metadata segment.
pub fn exists_index(&self, index: &str) -> bool {
    let meta_segment = format_segment_name_meta(index);
    self.exists_segment(&meta_segment)
}
/// Returns whether an index exists as seen from the transaction, by checking for its
/// metadata segment.
pub fn exists_index_tx(&self, tx: &TransactionImpl, index: &str) -> bool {
    let meta_segment = format_segment_name_meta(index);
    self.exists_segment_tx(tx, &meta_segment)
}
/// Resolves `segment` to a `SegmentId` against the address component.
pub fn solve_segment_id(&self, segment: impl ToSegmentId) -> Result<SegmentId, SegmentError> {
segment.to_segment_id(&self.address)
}
/// Resolves `segment` to a `SegmentId` as seen from the transaction.
///
/// The boolean returned by `to_segment_id_tx` is discarded here.
pub fn solve_segment_id_tx(
    &self,
    tx: &TransactionImpl,
    segment: impl ToSegmentId,
) -> Result<SegmentId, SegmentError> {
    let (segment_id, _in_tx) = segment.to_segment_id_tx(self, tx)?;
    Ok(segment_id)
}
/// Resolves `segment` to a `SegmentId` as seen from the given snapshot.
pub fn solve_segment_id_snapshot(
&self,
snapshot: &SnapshotRef,
segment: impl ToSegmentId,
) -> Result<SegmentId, SegmentError> {
segment.to_segment_id_snapshot(&self.snapshots, snapshot)
}
/// Resolves `index` to an `IndexId` against the address component.
pub fn solve_index_id(&self, index: impl ToIndexId) -> Result<IndexId, IndexError> {
index.to_index_id(&self.address)
}
/// Resolves `index` to an `IndexId` as seen from the transaction.
///
/// Returns the id together with the boolean produced by `to_index_id_tx`.
// NOTE(review): the boolean presumably tells whether the index was created in this
// transaction — confirm against `ToIndexId::to_index_id_tx`.
pub fn solve_index_id_tx(
&self,
tx: &TransactionImpl,
index: impl ToIndexId,
) -> Result<(IndexId, bool), IndexError> {
index.to_index_id_tx(self, tx)
}
/// Resolves `index` to an `IndexId` as seen from the given snapshot.
pub fn solve_index_id_snapshot(
&self,
snapshot: &SnapshotRef,
index: impl ToIndexId,
) -> Result<IndexId, IndexError> {
index.to_index_id_snapshot(&self.snapshots, snapshot)
}
/// Looks up `segment` as seen from the transaction.
///
/// Returns `(true, id)` when the segment was created in this transaction and `(false, id)`
/// when it comes from the address component. A segment dropped in the transaction, or not
/// found at all, yields `SegmentError::SegmentNotFound`.
pub fn check_segment_tx(&self, tx: &TransactionImpl, segment: &str) -> Result<(bool, SegmentId), SegmentError> {
    match tx.exists_segment(segment) {
        TxSegCheck::Created(segment_id) => Ok((true, segment_id)),
        TxSegCheck::Dropped => Err(SegmentError::SegmentNotFound),
        TxSegCheck::None => {
            let persistent = self.address.segment_id(segment);
            persistent.map_or(Err(SegmentError::SegmentNotFound), |id| Ok((false, id)))
        }
    }
}
/// Serializes the metadata stored in front of a record's content.
///
/// Layout: one byte set to 0 (read back as the metadata version), the content length as a
/// varint, then the record reference.
pub fn write_record_metadata(len: u64, id: &RecRef) -> Vec<u8> {
    let mut buffer = Vec::new();
    buffer.write_u8(0);
    buffer.write_varint_u64(len);
    id.write(&mut buffer);
    buffer
}
/// Parses the metadata written by `write_record_metadata`.
///
/// Returns the content length and the record reference; the leading version byte is read
/// and ignored.
pub fn read_record_metadata(meta: &mut dyn InfallibleRead) -> (u64, RecRef) {
    let _metadata_version = meta.read_u8();
    let content_len = meta.read_varint_u64();
    let rec_ref = RecRef::read(meta);
    (content_len, rec_ref)
}
/// Inserts `rec` into `segment` within the transaction and returns the new record reference.
///
/// Fails with `InsertError::RecordToBig` when the record exceeds the size limit. The record
/// id is allocated from the transaction's temporary segment when the segment was created in
/// this transaction, otherwise from the address component. The insert is journaled before
/// the page content (metadata followed by the record bytes) is written and flushed.
pub fn insert_record(
    &self,
    tx: &mut TransactionImpl,
    segment: impl ToSegmentId,
    rec: &[u8],
) -> Result<RecRef, InsertError> {
    if self.record_over_size_limit(rec) {
        return Err(InsertError::RecordToBig);
    }
    let (segment_id, in_tx) = segment.to_segment_id_tx(self, tx)?;
    let content_len = rec.len() as u64;
    let (rec_ref, maybe_new_page) = if in_tx {
        self.address.allocate_temp_seg(tx.get_segment_mut(segment_id))?
    } else {
        self.address.allocate(segment_id)?
    };
    let metadata = PersyImpl::write_record_metadata(content_len, &rec_ref);
    let total_len = content_len + metadata.len() as u64;
    let allocation_exp = self.allocator.exp_from_content_size(total_len);
    let mut content_page = self.allocator.allocate(allocation_exp)?;
    let page_index = content_page.get_index();
    tx.add_insert(&self.journal, segment_id, &rec_ref, page_index)?;
    // Allocating the id may have added a page to the segment; journal that as well.
    if let Some(new_page) = maybe_new_page {
        tx.add_new_segment_page(&self.journal, segment_id, new_page.new_page, new_page.previous_page)?;
    }
    content_page.write_all(&metadata);
    content_page.write_all(rec);
    self.allocator.flush_page(content_page)?;
    Ok(rec_ref)
}
/// Returns the snapshot reference given by `Snapshots::current_snapshot`.
fn read_snapshot(&self) -> SnapshotRef {
self.snapshots.current_snapshot()
}
/// Takes a snapshot and fills it with the current list of segments.
///
/// Each entry of `Address::snapshot_list` (name, id, first page) is turned into a
/// `SegmentSnapshot` and registered on the new snapshot reference.
pub fn snapshot(&self) -> SnapshotRef {
    let snapshot_ref = self.snapshots.read_snapshot();
    let mut segments = Vec::new();
    for (name, id, first_page) in self.address.snapshot_list() {
        segments.push(SegmentSnapshot::new(&name, id, first_page));
    }
    self.snapshots.fill_segments(&snapshot_ref, &segments);
    snapshot_ref
}
/// Finds the current page and version of a record as seen from the transaction.
///
/// The transaction's own changes win: a record changed in the transaction returns its
/// transaction-local location, a record deleted in the transaction returns `None`, anything
/// else falls back to the address component. The returned tuple is
/// `(page, version, segment_id)`.
fn read_ref_segment(
    &self,
    tx: &TransactionImpl,
    segment_id: SegmentId,
    rec_ref: &RecRef,
) -> Result<Option<(u64, u16, SegmentId)>, ReadError> {
    let located = match tx.read(rec_ref) {
        TxRead::Record(rec) => Some((rec.0, rec.1, segment_id)),
        TxRead::Deleted => None,
        TxRead::None => {
            let stored = self.address.read(rec_ref, segment_id)?;
            stored.map(|(pos, version)| (pos, version, segment_id))
        }
    };
    Ok(located)
}
/// Reads the record stored in `page` as an owned byte vector, see `read_page_fn`.
fn read_page(&self, match_id: &RecRef, page: u64) -> PERes<Option<Vec<u8>>> {
self.read_page_fn(match_id, page, |x: ArcSliceRead| x.to_vec())
}
/// Loads `page` and feeds the record content to `f`.
///
/// Returns `None` when `load_page_not_free` yields no page or when the record reference
/// stored in the page metadata differs from `match_id`.
fn read_page_fn<T, F: RecordReader<T>>(&self, match_id: &RecRef, page: u64, f: F) -> PERes<Option<T>> {
    let mut pg = match self.allocator.load_page_not_free(page)? {
        Some(loaded) => loaded,
        None => return Ok(None),
    };
    let (len, id) = PersyImpl::read_record_metadata(&mut pg);
    if id != *match_id {
        return Ok(None);
    }
    Ok(Some(f.read(pg.arc_slice(len as usize))))
}
/// Reads a record as seen from the transaction, converting the content with `f`.
///
/// Returns the converted record together with its version, or `None` when the record is not
/// visible. When the located page no longer yields this record the lookup is repeated; in
/// debug builds more than 100 repetitions trip an assertion.
pub fn read_tx_internal_fn<T>(
    &self,
    tx: &TransactionImpl,
    segment_id: SegmentId,
    id: &RecRef,
    f: fn(ArcSliceRead) -> T,
) -> Result<Option<(T, u16)>, ReadError> {
    let mut retry_count = 0;
    while let Some((page, version, _)) = self.read_ref_segment(tx, segment_id, id)? {
        if let Some(record) = self.read_page_fn(id, page, f)? {
            return Ok(Some((record, version)));
        }
        debug_assert!(
            retry_count < 100,
            "more than 100 retry probably means an issue id:{} page:{}",
            id,
            page
        );
        retry_count += 1;
    }
    Ok(None)
}
/// Reads a record as seen from the transaction and returns its bytes with its version.
///
/// Unlike `read_tx`, this does not register the read on the transaction.
pub fn read_tx_internal(
&self,
tx: &TransactionImpl,
segment_id: SegmentId,
id: &RecRef,
) -> Result<Option<(Vec<u8>, u16)>, ReadError> {
self.read_tx_internal_fn(tx, segment_id, id, |x| x.to_vec())
}
/// Reads a record as seen from the transaction and registers the read (with the version
/// that was read) on the transaction.
pub fn read_tx(
    &self,
    tx: &mut TransactionImpl,
    segment: SegmentId,
    id: &RecRef,
) -> Result<Option<Vec<u8>>, ReadError> {
    match self.read_tx_internal(tx, segment, id)? {
        Some((rec, version)) => {
            tx.add_read(&self.journal, segment, id, version)?;
            Ok(Some(rec))
        }
        None => Ok(None),
    }
}
/// Reads the persistent content of a record, outside of any transaction.
///
/// Returns `None` when the address component does not know the record. When the located
/// page no longer yields this record the lookup is repeated; in debug builds more than 100
/// repetitions trip an assertion.
pub fn read(&self, segment: SegmentId, rec_ref: &RecRef) -> Result<Option<Vec<u8>>, ReadError> {
    let mut retry_count = 0;
    while let Some((page, _)) = self.address.read(rec_ref, segment)? {
        if let Some(record) = self.read_page(rec_ref, page)? {
            return Ok(Some(record));
        }
        debug_assert!(
            retry_count < 100,
            "more than 100 retry probably means an issue id:{} page:{}",
            rec_ref,
            page
        );
        retry_count += 1;
    }
    Ok(None)
}
/// Reads a record as seen from `snapshot` and returns its bytes, see `read_snap_fn`.
pub fn read_snap(
&self,
segment: SegmentId,
rec_ref: &RecRef,
snapshot: &SnapshotRef,
) -> Result<Option<Vec<u8>>, ReadError> {
self.read_snap_fn(segment, rec_ref, snapshot, |x: ArcSliceRead| x.to_vec())
}
/// Reads a record as seen from `snapshot`, converting the content with `f`.
///
/// If the snapshot holds an entry for the record, a `Change` entry is read from the position
/// it stores and an `Insert` entry yields `None`. Without a snapshot entry the record is
/// located through the address component.
///
/// # Panics
///
/// Panics when a located page does not contain the expected record.
pub fn read_snap_fn<T, F: RecordReader<T>>(
    &self,
    segment: SegmentId,
    rec_ref: &RecRef,
    snapshot: &SnapshotRef,
    f: F,
) -> Result<Option<T>, ReadError> {
    match self.snapshots.read(snapshot, rec_ref) {
        Some(rec_vers) => match rec_vers.case() {
            EntryCase::Change(change) => match self.read_page_fn(rec_ref, change.pos, f)? {
                Some(record) => Ok(Some(record)),
                None => panic!("if page do not match the content the snapshot is broken"),
            },
            EntryCase::Insert => Ok(None),
        },
        None => match self.address.read(rec_ref, segment)? {
            Some((page, _)) => match self.read_page_fn(rec_ref, page, f)? {
                Some(record) => Ok(Some(record)),
                None => panic!("if page do not match the content the snapshot is broken"),
            },
            None => Ok(None),
        },
    }
}
/// Creates a raw iterator over the persistent records of `segment`.
pub fn scan(&self, segment: SegmentId) -> Result<SegmentRawIter, SegmentError> {
    let segment_pages = self.address.scan(segment)?;
    Ok(SegmentRawIter::new(segment, segment_pages))
}
/// Creates a raw snapshot iterator over `segment_id`.
///
/// Uses the scan provided by the snapshot when there is one and falls back to the address
/// component otherwise.
pub fn scan_snapshot_index(
    &self,
    segment_id: SegmentId,
    snapshot: &SnapshotRef,
) -> Result<SegmentSnapshotRawIter, SegmentError> {
    let segment_pages = match self.snapshots.scan(snapshot, segment_id) {
        Some(from_snapshot) => from_snapshot,
        None => self.address.scan(segment_id)?,
    };
    Ok(SegmentSnapshotRawIter::new(segment_id, segment_pages, snapshot))
}
/// Creates a raw snapshot iterator over `segment_id` using only the snapshot's own scan.
///
/// # Panics
///
/// Panics when the snapshot provides no scan for `segment_id`.
pub fn scan_snapshot(&self, segment_id: SegmentId, snapshot: &SnapshotRef) -> PERes<SegmentSnapshotRawIter> {
    let segment_pages = self.snapshots.scan(snapshot, segment_id).unwrap();
    Ok(SegmentSnapshotRawIter::new(segment_id, segment_pages, snapshot))
}
/// Creates a raw iterator over `segment_id` as seen from the transaction.
///
/// The read snapshot is taken first; the scan then comes from the transaction's own segment
/// when the segment was created in this transaction, otherwise from the address component.
pub fn scan_tx<'a>(
    &'a self,
    tx: &'a TransactionImpl,
    segment_id: SegmentId,
) -> Result<TxSegmentRawIter, SegmentError> {
    let read_snapshot = self.read_snapshot();
    let segment_pages = if tx.segment_created_in_tx(segment_id) {
        self.address.scan_segment(tx.get_segment(segment_id))?
    } else {
        self.address.scan(segment_id)?
    };
    Ok(TxSegmentRawIter::new(tx, segment_id, segment_pages, read_snapshot))
}
/// Replaces the content of an existing record within the transaction.
///
/// Fails with `UpdateError::RecordToBig` when `rec` exceeds the size limit and with
/// `UpdateError::RecordNotFound` when the record is not visible from the transaction. The
/// new content goes to a freshly allocated page; the update is journaled with the version
/// that was found before the page content is written and flushed.
pub fn update(
    &self,
    tx: &mut TransactionImpl,
    segment: SegmentId,
    rec_ref: &RecRef,
    rec: &[u8],
) -> Result<(), UpdateError> {
    if self.record_over_size_limit(rec) {
        return Err(UpdateError::RecordToBig);
    }
    let (_, version, segment) = match self.read_ref_segment(tx, segment, rec_ref)? {
        Some(current) => current,
        None => return Err(UpdateError::RecordNotFound(PersyId(*rec_ref))),
    };
    let content_len = rec.len();
    let metadata = PersyImpl::write_record_metadata(content_len as u64, rec_ref);
    let allocation_exp = self
        .allocator
        .exp_from_content_size((content_len + metadata.len()) as u64);
    let mut content_page = self.allocator.allocate(allocation_exp)?;
    let page_index = content_page.get_index();
    tx.add_update(&self.journal, segment, rec_ref, page_index, version)?;
    content_page.write_all(&metadata);
    content_page.write_all(rec);
    self.allocator.flush_page(content_page)?;
    Ok(())
}
/// Registers the deletion of a record in the transaction.
///
/// Fails with `DeleteError::RecordNotFound` when the record is not visible from the
/// transaction.
pub fn delete(&self, tx: &mut TransactionImpl, segment: SegmentId, rec_ref: &RecRef) -> Result<(), DeleteError> {
    match self.read_ref_segment(tx, segment, rec_ref)? {
        Some((_, version, seg)) => {
            tx.add_delete(&self.journal, seg, rec_ref, version)?;
            Ok(())
        }
        None => Err(DeleteError::RecordNotFound(PersyId(*rec_ref))),
    }
}
/// Rolls back a transaction that has not been prepared, consuming it.
pub fn rollback(&self, mut tx: TransactionImpl) -> PERes<()> {
tx.rollback(self)
}
/// Prepares the transaction and wraps the result in a `TxFinalize`, ready for `commit` or
/// `rollback_prepared`.
pub fn prepare(&self, tx: TransactionImpl) -> Result<TxFinalize, PrepareError> {
    let (transaction, prepared) = tx.prepare(self)?;
    Ok(TxFinalize { transaction, prepared })
}
/// Rolls back a prepared transaction, passing along a clone of its prepared state.
pub fn rollback_prepared(&self, finalizer: &mut TxFinalize) -> PERes<()> {
    let prepared = finalizer.prepared.clone();
    finalizer.transaction.rollback_prepared(self, prepared)
}
/// Commits a prepared transaction, passing along a clone of its prepared state.
pub fn commit(&self, finalizer: &mut TxFinalize) -> PERes<()> {
    let prepared = finalizer.prepared.clone();
    finalizer.transaction.commit(self, prepared)
}
/// Creates an index named `index_name` inside the transaction.
///
/// The key and value types are mapped to their `Wrapper` types before delegating to
/// `Indexes::create_index`.
// NOTE(review): the literals 32 and 128 are passed straight to `Indexes::create_index`;
// presumably they bound the size of the tree nodes — confirm against that function.
pub fn create_index<K, V>(
&self,
tx: &mut TransactionImpl,
index_name: &str,
value_mode: ValueMode,
) -> Result<(), CreateIndexError>
where
K: IndexType,
V: IndexType,
{
Indexes::create_index::<K::Wrapper, V::Wrapper>(self, tx, index_name, 32, 128, value_mode)
}
/// Drops the index named `index_name` inside the transaction via `Indexes::drop_index`.
pub fn drop_index(&self, tx: &mut TransactionImpl, index_name: &str) -> Result<(), DropIndexError> {
Indexes::drop_index(self, tx, index_name)
}
/// Registers a key/value insertion for the index in the transaction.
///
/// Fails with `IndexPutError::KeyOrValueTooBig` when either the key or the value is over
/// its size limit; the index types are checked before the change is recorded.
pub fn put<K, V>(&self, tx: &mut TransactionImpl, index_id: IndexId, k: K, v: V) -> Result<(), IndexPutError>
where
    K: IndexTypeInternal,
    V: IndexTypeInternal,
{
    let too_big = k.over_size_limit() || v.over_size_limit();
    if too_big {
        return Err(IndexPutError::KeyOrValueTooBig);
    }
    Indexes::check_index::<K, V>(self, tx, &index_id)?;
    tx.add_put(index_id, k, v);
    Ok(())
}
/// Registers a removal for the index in the transaction.
///
/// The index types are checked first; `k` and the optional `v` are then handed to
/// `TransactionImpl::add_remove`.
// NOTE(review): presumably `None` removes every value under the key while `Some(v)` removes
// only that value — confirm against `TransactionImpl::add_remove`.
pub fn remove<K, V>(
&self,
tx: &mut TransactionImpl,
index_id: IndexId,
k: K,
v: Option<V>,
) -> Result<(), IndexOpsError>
where
K: IndexTypeInternal,
V: IndexTypeInternal,
{
Indexes::check_index::<K, V>(self, tx, &index_id)?;
tx.add_remove(index_id, k, v);
Ok(())
}
/// Looks up `k` in the index as seen from the transaction.
///
/// The stored value is read through an index keeper bound to a fresh read snapshot; the
/// transaction's pending changes for the key are then applied on top of it.
pub fn get_tx<K, V>(
    &self,
    tx: &mut TransactionImpl,
    index_id: IndexId,
    k: &K,
) -> Result<Option<Value<V>>, IndexChangeError>
where
    K: IndexTypeInternal,
    V: IndexTypeInternal,
{
    // The keeper and the snapshot live only inside this block, so both are released before
    // the transaction is touched again.
    let (stored, value_mode, index_name) = {
        let snapshot = self.snapshots.read_snapshot();
        let mut keeper = Indexes::get_index_keeper_tx_read::<K, V>(self, &snapshot, tx, &index_id)?;
        let stored = keeper.get(k)?;
        let value_mode = IndexKeeper::<K, V>::value_mode(&keeper);
        let index_name = IndexKeeper::<K, V>::index_name(&keeper).clone();
        (stored, value_mode, index_name)
    };
    tx.apply_changes::<K, V>(value_mode, index_id, &index_name, k, stored)
}
/// Looks up `k` in the index using a fresh read snapshot, see `get_snapshot`.
pub fn get<K, V>(&self, index_id: IndexId, k: &K) -> Result<Option<Value<V>>, IndexOpsError>
where
K: IndexTypeInternal,
V: IndexTypeInternal,
{
let read_snapshot = self.snapshots.read_snapshot();
self.get_snapshot(index_id, &read_snapshot, k)
}
/// Runs `inspector` over the tree of the index, using a fresh read snapshot.
///
/// Only available with the `experimental_inspect` feature.
#[cfg(feature = "experimental_inspect")]
pub fn inspect_tree<K, V, I>(&self, index_id: IndexId, inspector: &mut I) -> Result<(), IndexOpsError>
where
K: IndexTypeInternal,
V: IndexTypeInternal,
I: crate::inspect::TreeInspector<K::Wrapped, V::Wrapped>,
{
let read_snapshot = self.snapshots.read_snapshot();
let mut index = Indexes::get_index_keeper::<K, V>(self, &read_snapshot, &index_id)?;
crate::inspect::inspect_tree::<K, V, I>(&mut index, inspector)?;
Ok(())
}
/// Looks up `k` in the index as seen from `snapshot`.
pub fn get_snapshot<K, V>(
&self,
index_id: IndexId,
snapshot: &SnapshotRef,
k: &K,
) -> Result<Option<Value<V>>, IndexOpsError>
where
K: IndexTypeInternal,
V: IndexTypeInternal,
{
Ok(Indexes::get_index_keeper::<K, V>(self, snapshot, &index_id)?.get(k)?)
}
/// Returns a forward page iterator over the index starting from the bound `next`, as seen
/// from `read_snapshot`.
pub fn index_next<K, V>(
&self,
index_id: &IndexId,
read_snapshot: &SnapshotRef,
next: Bound<&K>,
) -> Result<PageIter<K, V>, IndexOpsError>
where
K: IndexTypeInternal,
V: IndexTypeInternal,
{
Ok(Indexes::get_index_keeper::<K, V>(self, read_snapshot, index_id)?.iter_from(next)?)
}
/// Returns a backward page iterator over the index starting from the bound `next`, as seen
/// from `read_snapshot`.
pub fn index_back<K, V>(
&self,
index_id: &IndexId,
read_snapshot: &SnapshotRef,
next: Bound<&K>,
) -> Result<PageIterBack<K, V>, IndexOpsError>
where
K: IndexTypeInternal,
V: IndexTypeInternal,
{
Ok(Indexes::get_index_keeper::<K, V>(self, read_snapshot, index_id)?.back_iter_from(next)?)
}
/// Converts the bounds of a range over `X` into owned bounds over `X::Wrapper`.
///
/// Each bounded endpoint is cloned and wrapped; unbounded endpoints stay unbounded.
pub(crate) fn map_index_range_bounds<R: RangeBounds<X>, X: IndexType>(
    r: R,
) -> (Bound<X::Wrapper>, Bound<X::Wrapper>) {
    let wrap_bound = |bound: Bound<&X>| match bound {
        Bound::Included(x) => Bound::Included(x.clone().wrap()),
        Bound::Excluded(e) => Bound::Excluded(e.clone().wrap()),
        Bound::Unbounded => Bound::Unbounded,
    };
    let start = wrap_bound(r.start_bound());
    let end = wrap_bound(r.end_bound());
    (start, end)
}
/// Iterates the index over `range` using a fresh read snapshot, see `range_snapshot`.
pub fn range<K, V, R>(&self, index_id: IndexId, range: R) -> Result<(ValueMode, IndexRawIter<K, V>), IndexOpsError>
where
K: IndexTypeInternal,
V: IndexTypeInternal,
R: RangeBounds<K>,
{
let read_snapshot = self.snapshots.read_snapshot();
self.range_snapshot(index_id, &read_snapshot, range)
}
/// Iterates the index over `range` as seen from `snapshot`.
///
/// Returns the value mode of the index together with a raw iterator built from a forward
/// iterator at the range start and a backward iterator at the range end.
pub fn range_snapshot<K, V, R>(
    &self,
    index_id: IndexId,
    snapshot: &SnapshotRef,
    range: R,
) -> Result<(ValueMode, IndexRawIter<K, V>), IndexOpsError>
where
    K: IndexTypeInternal,
    V: IndexTypeInternal,
    R: RangeBounds<K>,
{
    let mut keeper = Indexes::get_index_keeper::<K, V>(self, snapshot, &index_id)?;
    let forward = keeper.iter_from(range.start_bound())?;
    let backward = keeper.back_iter_from(range.end_bound())?;
    let value_mode = IndexKeeper::<K, V>::value_mode(&keeper);
    let raw_iter = IndexRawIter::new(index_id, snapshot, forward, backward, clone_range(&range));
    Ok((value_mode, raw_iter))
}
/// Iterates the index over `range` as seen from the transaction.
///
/// For an index created in this transaction there is no persistent base iterator and the
/// value mode and name come from the transaction's index configuration. Otherwise a base
/// iterator is built over a fresh read snapshot. In both cases the transaction's own
/// changes in the range are combined with the base in the returned iterator.
pub fn range_tx<K, V, R>(
    &self,
    tx: &mut TransactionImpl,
    index_id: IndexId,
    range: R,
) -> Result<TxIndexRawIter<K, V>, IndexOpsError>
where
    K: IndexTypeInternal,
    V: IndexTypeInternal,
    R: RangeBounds<K>,
{
    let (base, value_mode, name) = if tx.index_created_in_tx(&index_id) {
        let (config, _) = Indexes::get_index_tx(self, tx, &index_id)?;
        (None, config.value_mode, config.name)
    } else {
        let snapshot = self.snapshots.read_snapshot();
        let mut keeper = Indexes::get_index_keeper_tx_read::<K, V>(self, &snapshot, tx, &index_id)?;
        let forward = keeper.iter_from(range.start_bound())?;
        let backward = keeper.back_iter_from(range.end_bound())?;
        let value_mode = IndexKeeper::<K, V>::value_mode(&keeper);
        let name = IndexKeeper::<K, V>::index_name(&keeper).to_owned();
        let persistent = IndexRawIter::new(index_id.clone(), &snapshot, forward, backward, clone_range(&range));
        (Some(persistent), value_mode, name)
    };
    let tx_iter = tx.index_range::<K, V, _>(index_id.clone(), range);
    Ok(TxIndexRawIter::new(index_id, name, tx_iter, base, value_mode))
}
/// Returns the name of the segment with the given id as seen from the transaction.
///
/// The boolean is `true` when the segment was created in this transaction (name taken from
/// the transaction) and `false` when the name comes from the address component.
pub fn segment_name_tx(&self, tx: &TransactionImpl, id: SegmentId) -> Option<(String, bool)> {
    let created_in_tx = tx.segment_created_in_tx(id);
    let name = if created_in_tx {
        tx.segment_name_by_id(id)
    } else {
        self.address.segment_name_by_id(id)
    };
    name.map(|name| (name, created_in_tx))
}
/// Lists the persistent segments, leaving out the segments that back indexes (index
/// metadata and index data segments).
pub fn list_segments(&self) -> Vec<(String, SegmentId)> {
    let all_segments = self.address.list();
    all_segments
        .into_iter()
        .filter(|(name, _)| !(is_index_name_meta(name) || is_index_name_data(name)))
        .collect()
}
/// Lists the segments visible in the snapshot, leaving out the segments that back indexes
/// (index metadata and index data segments).
pub fn list_segments_snapshot(&self, snapshot_ref: &SnapshotRef) -> Vec<(String, SegmentId)> {
    self.snapshots
        .list(snapshot_ref)
        .into_iter()
        .filter(|(name, _)| !(is_index_name_meta(name) || is_index_name_data(name)))
        .collect()
}
/// Lists the indexes with their info, using a snapshot taken just for this call.
pub fn list_indexes(&self) -> PERes<Vec<(String, IndexInfo)>> {
let snapshot = self.snapshot();
let res = self.list_indexes_snapshot(&snapshot);
// Release the snapshot explicitly before handing back the result.
drop(snapshot);
res
}
/// Lists the indexes visible in the snapshot together with their info.
///
/// Index names are derived from the names of the index metadata segments. Indexes whose
/// info cannot be loaded are silently left out of the result.
pub fn list_indexes_snapshot(&self, snapshot: &SnapshotRef) -> PERes<Vec<(String, IndexInfo)>> {
    let mut indexes = Vec::new();
    for (mut name, _id) in self.snapshots.list(snapshot) {
        if !is_index_name_meta(&name) {
            continue;
        }
        change_segment_meta_name_to_index_name(&mut name);
        if let Ok(info) = self.index_info(snapshot, &name) {
            indexes.push((name, info));
        }
    }
    Ok(indexes)
}
/// Lists the segments as seen from the transaction, leaving out the segments that back
/// indexes (index metadata and index data segments).
///
/// The persistent list is passed through the transaction's `filter_list` first.
pub fn list_segments_tx(&self, tx: &TransactionImpl) -> Vec<(String, SegmentId)> {
    tx.filter_list(&self.address.list())
        .filter(|(name, _)| !(is_index_name_meta(name) || is_index_name_data(name)))
        .map(|(name, id)| (name.to_string(), id))
        .collect()
}
/// Lists the indexes as seen from the transaction together with their info.
///
/// Index names are derived from the names of the index metadata segments. Indexes whose
/// info cannot be loaded are silently left out of the result.
pub fn list_indexes_tx(&self, tx: &TransactionImpl) -> Vec<(String, IndexInfo)> {
    tx.filter_list(&self.address.list())
        .filter(|(name, _)| is_index_name_meta(name))
        .filter_map(|(name, _id)| {
            let mut name = name.to_string();
            change_segment_meta_name_to_index_name(&mut name);
            self.index_info_tx(tx, &name).ok().map(|info| (name, info))
        })
        .collect()
}
/// Loads the configuration of the index called `name` from the snapshot and converts it
/// into an `IndexInfo`.
fn index_info(&self, snapshot: &SnapshotRef, name: &str) -> Result<IndexInfo, IndexError> {
    let id = self.solve_index_id_snapshot(snapshot, name)?;
    let index = Indexes::get_index(self, snapshot, &id)?;
    let info = IndexInfo {
        id,
        value_mode: index.value_mode,
        key_type: IndexTypeId::from(index.key_type),
        value_type: IndexTypeId::from(index.value_type),
    };
    Ok(info)
}
/// Loads the configuration of the index called `name` as seen from the transaction and
/// converts it into an `IndexInfo`.
fn index_info_tx(&self, tx: &TransactionImpl, name: &str) -> Result<IndexInfo, IndexError> {
    let (id, _) = self.solve_index_id_tx(tx, name)?;
    let (index, _version) = Indexes::get_index_tx(self, tx, &id)?;
    let info = IndexInfo {
        id,
        value_mode: index.value_mode,
        key_type: IndexTypeId::from(index.key_type),
        value_type: IndexTypeId::from(index.value_type),
    };
    Ok(info)
}
/// Borrows the journal.
pub(crate) fn journal(&self) -> &Journal {
&self.journal
}
/// Borrows the address component.
pub(crate) fn address(&self) -> &Address {
&self.address
}
/// Borrows the allocator.
pub(crate) fn allocator(&self) -> &Allocator {
&self.allocator
}
/// Borrows the index state.
pub(crate) fn indexes(&self) -> &Indexes {
&self.indexes
}
/// Borrows the shared snapshots handle (the `Arc` itself, so callers can clone it).
pub(crate) fn snapshots(&self) -> &Arc<Snapshots> {
&self.snapshots
}
/// Makes the effects of a transaction durable according to `sync_mode`
/// (`background_ops` build).
///
/// * `SyncMode::Sync`: syncs the device and clears the journal queue right away.
/// * `SyncMode::BackgroundSync`: hands a clone of `snapshot_ref` to the background
///   operations, which do the sync later.
#[cfg(feature = "background_ops")]
pub(crate) fn transaction_sync(&self, sync_mode: &SyncMode, snapshot_ref: &SnapshotRef) -> PERes<()> {
    match sync_mode {
        SyncMode::Sync => {
            self.allocator().disc_sync()?;
            // `snapshots()` already returns a reference; pass it as-is, like the variant
            // compiled without `background_ops` does.
            self.journal().clear_in_queue(self.snapshots())?;
        }
        SyncMode::BackgroundSync => {
            self.background_ops.add_pending(snapshot_ref.clone())?;
        }
    }
    Ok(())
}
/// Makes the effects of a transaction durable (build without `background_ops`).
///
/// Always syncs the device and clears the journal queue immediately; both parameters are
/// ignored in this configuration.
#[cfg(not(feature = "background_ops"))]
pub(crate) fn transaction_sync(&self, _sync_mode: &SyncMode, _snapshot_ref: &SnapshotRef) -> PERes<()> {
    self.allocator().disc_sync()?;
    self.journal().clear_in_queue(self.snapshots())?;
    Ok(())
}
/// Returns whether `record` is larger than the maximum accepted record size
/// (512 MiB minus 1 KiB).
fn record_over_size_limit(&self, record: &[u8]) -> bool {
    const MAX_RECORD_LEN: usize = 1024 * 1024 * 512 - 1024;
    record.len() > MAX_RECORD_LEN
}
/// Test-only helper that forwards to `Allocator::free_file_lock`.
#[cfg(test)]
pub fn free_file_lock(&self) -> PERes<()> {
self.allocator.free_file_lock()
}
/// Returns the allocator's state for `page`, if any.
///
/// Only available with the `experimental_inspect` feature.
#[cfg(feature = "experimental_inspect")]
pub fn page_state(&self, page: u64) -> Option<crate::inspect::PageState> {
self.allocator.page_state(page)
}
}
/// On drop, tells the background operations to finish when the `background_ops` feature is
/// enabled; without the feature the body is empty.
impl Drop for PersyImpl {
fn drop(&mut self) {
// NOTE(review): presumably `finish` stops or joins the background worker — confirm in
// `BackgroundOps`.
#[cfg(feature = "background_ops")]
self.background_ops.finish();
}
}
/// Copies the bounds of a borrowed range into an owned `(start, end)` pair by cloning each
/// bounded endpoint.
fn clone_range<R: RangeBounds<X>, X: Clone>(r: &R) -> (Bound<X>, Bound<X>) {
    /// Turns a borrowed bound into an owned one.
    fn owned<T: Clone>(bound: Bound<&T>) -> Bound<T> {
        match bound {
            Bound::Unbounded => Bound::Unbounded,
            Bound::Included(value) => Bound::Included(value.clone()),
            Bound::Excluded(value) => Bound::Excluded(value.clone()),
        }
    }
    (owned(r.start_bound()), owned(r.end_bound()))
}