use crate::{
address::segment::{
segment_page_iterator::SegmentPageIterator, AllocatedSegmentPage, Segment, SegmentPage, SegmentPageRead,
Segments,
},
allocator::Allocator,
config::Config,
device::{Page, PageOps},
error::{PERes, ReadError, SegmentError, TimeoutError},
id::{PersyId, RecRef, SegmentId},
index::config::{is_index_name_data, is_index_name_meta},
journal::records::{DeleteRecord, InsertRecord, NewSegmentPage, RollbackPage, UpdateRecord},
locks::{LockManager, RwLockManager},
transaction::tx_impl::{CheckRecord, Locks, SegmentOperation},
PrepareError,
};
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
sync::{Arc, RwLock},
time::Duration,
};
pub mod record_scanner;
pub mod segment;
pub mod segment_iter;
#[cfg(test)]
mod tests;
/// Size exponent handed to the allocator when `Address::init` allocates the address root page.
pub const ADDRESS_ROOT_PAGE_EXP: u8 = 6;
/// NOTE(review): presumably the size exponent of the non-root address pages; it is not
/// referenced in this part of the file — confirm against the `segment` module.
pub const ADDRESS_PAGE_EXP: u8 = 10;
/// Bit 0 of an entry flag byte.
/// NOTE(review): presumably marks an address entry as in use — confirm against the page code.
pub const FLAG_EXISTS: u8 = 0b0000_0001;
/// Bit 1 of an entry flag byte.
/// NOTE(review): presumably marks an address entry as deleted — confirm against the page code.
pub const FLAG_DELETED: u8 = 0b0000_0010;
/// NOTE(review): presumably the byte offset of the segment hash/id inside an address page — confirm.
pub const SEGMENT_HASH_OFFSET: u32 = 16;
/// NOTE(review): presumably the byte offset of the per-page delete counter — confirm.
pub const SEGMENT_PAGE_DELETE_COUNT_OFFSET: u32 = 24;
/// NOTE(review): presumably the byte offset at which the address entries start — confirm.
pub const SEGMENT_DATA_OFFSET: u32 = 26;
/// Size in bytes of one address entry; `Address::read` uses it to bounds-check an entry position.
/// NOTE(review): the `8 + 1 + 2` split looks like a u64 page, a u8 flag and a u16 version — confirm.
pub const ADDRESS_ENTRY_SIZE: u32 = 8 + 1 + 2;
/// Address information of a record as it was found on its address page.
///
/// Instances are produced by `Address::check_persistent_records` from the values
/// returned by `Address::read`.
pub struct OldRecordInfo {
/// Reference of the record that was looked up.
pub recref: RecRef,
/// Segment the record was looked up in.
pub segment: SegmentId,
/// First value of the pair returned by `Address::read` for the record.
pub record_page: u64,
/// Second value of the pair returned by `Address::read`; compared with the
/// expected version when version checking is enabled.
pub version: u16,
}
impl OldRecordInfo {
fn new(recref: &RecRef, segment: SegmentId, record_page: u64, version: u16) -> OldRecordInfo {
OldRecordInfo {
recref: *recref,
segment,
record_page,
version,
}
}
}
/// Entry point for address pages: owns the segment registry and the lock
/// managers used to coordinate access to segments and records.
pub struct Address {
// Shared allocator used to load, write and flush pages.
allocator: Arc<Allocator>,
// Locks keyed by record reference (see `acquire_record_lock`).
record_locks: LockManager<RecRef>,
// Locks keyed by the name of a segment being created (see `acquire_locks`).
create_segment_locks: LockManager<String>,
// Read/write locks keyed by segment id: write for dropped segments, read for
// created/updated ones (see `acquire_locks`).
segment_locks: RwLockManager<SegmentId>,
// Segment registry, guarded by a standard read/write lock.
segments: RwLock<Segments>,
}
impl Address {
pub fn new(all: &Arc<Allocator>, _config: &Arc<Config>, page: u64) -> PERes<Address> {
let segments = Segments::new(page, all)?;
Ok(Address {
allocator: all.clone(),
record_locks: Default::default(),
create_segment_locks: Default::default(),
segment_locks: Default::default(),
segments: RwLock::new(segments),
})
}
/// Allocates the address root page (size exponent `ADDRESS_ROOT_PAGE_EXP`),
/// lets `Segments::init` initialise it and returns the index of that page.
///
/// # Errors
/// Propagates allocation and initialisation failures.
pub fn init(all: &Allocator) -> PERes<u64> {
    let root = all.allocate(ADDRESS_ROOT_PAGE_EXP)?;
    // The index must be read before the page is moved into `Segments::init`.
    let root_index = root.get_index();
    Segments::init(root, all)?;
    Ok(root_index)
}
/// Returns a page iterator starting at the first page of `segment`.
///
/// # Errors
/// `SegmentError::SegmentNotFound` when `segment` is `None`.
pub fn scan_segment(&self, segment: Option<&Segment>) -> Result<SegmentPageIterator, SegmentError> {
    segment
        .map(|found| SegmentPageIterator::new(found.first_page))
        .ok_or(SegmentError::SegmentNotFound)
}
/// Looks the segment up by id in the registry (under its read lock) and returns
/// a page iterator starting at the segment's first page.
///
/// # Errors
/// `SegmentError::SegmentNotFound` when no segment with that id is registered.
pub fn scan(&self, segment: SegmentId) -> Result<SegmentPageIterator, SegmentError> {
    let registry = self.segments.read().expect("lock not poisoned");
    match registry.segment_by_id(segment) {
        Some(found) => Ok(SegmentPageIterator::new(found.first_page)),
        None => Err(SegmentError::SegmentNotFound),
    }
}
/// Loads the page `cur_page` and returns the result of `segment_scan_all_entries` on it,
/// while holding the registry read lock.
///
/// NOTE(review): the returned pair is presumably the next page index and a list of
/// (entry position, exists flag) — confirm against `segment_scan_all_entries`.
///
/// # Errors
/// Propagates page loading failures.
pub fn scan_page_all(&self, cur_page: u64) -> PERes<(u64, Vec<(u32, bool)>)> {
    // Held for the whole scan; released when the function returns.
    let _registry_guard = self.segments.read().expect("lock not poisoned");
    let mut loaded = self.allocator.load_page(cur_page)?;
    let scanned = loaded.segment_scan_all_entries();
    Ok(scanned)
}
/// Allocates a new record reference in the given (not yet registered) segment by
/// calling `allocate_internal` on it.
///
/// # Errors
/// `SegmentError::SegmentNotFound` when `segment` is `None`; otherwise whatever
/// `allocate_internal` reports, converted to `SegmentError`.
pub fn allocate_temp_seg(
    &self,
    segment: Option<&mut Segment>,
) -> Result<(RecRef, Option<AllocatedSegmentPage>), SegmentError> {
    let target = segment.ok_or(SegmentError::SegmentNotFound)?;
    let allocated = target.allocate_internal(&self.allocator)?;
    Ok(allocated)
}
/// Creates a temporary segment named `segment` by delegating to
/// `Segments::create_temp_segment` while holding the registry write lock.
///
/// # Errors
/// Propagates the error of the delegated call.
pub fn create_temp_segment(&self, segment: &str) -> PERes<Segment> {
    let mut registry = self.segments.write().expect("lock not poisoned");
    registry.create_temp_segment(&self.allocator, segment)
}
/// Allocates a new record reference in the registered segment `segment`.
///
/// The registry write lock is held while `allocate_internal` runs.
///
/// # Errors
/// `SegmentError::SegmentNotFound` when the id is not registered; otherwise whatever
/// `allocate_internal` reports, converted to `SegmentError`.
pub fn allocate(&self, segment: SegmentId) -> Result<(RecRef, Option<AllocatedSegmentPage>), SegmentError> {
    let mut registry = self.segments.write().expect("lock not poisoned");
    match registry.segments.get_mut(&segment) {
        Some(found) => Ok(found.allocate_internal(&self.allocator)?),
        None => Err(SegmentError::SegmentNotFound),
    }
}
/// Takes every lock described by `locks`, in this order: created-segment name locks,
/// write locks on dropped segments, read locks on created/updated segments, record locks.
///
/// When one step fails, the locks taken by the earlier steps of this call are
/// released before the error is returned.
///
/// # Errors
/// The failure of the step that could not lock, converted to `PrepareError`.
pub(crate) fn acquire_locks(&self, locks: &Locks, timeout: Duration) -> Result<(), PrepareError> {
    self.create_segment_locks.lock_all(locks.created_segments(), timeout)?;

    self.segment_locks
        .lock_all_write(locks.dropped_segments(), timeout)
        .map_err(|failure| {
            self.create_segment_locks.unlock_all(locks.created_segments());
            PrepareError::from(failure)
        })?;

    self.segment_locks
        .lock_all_read(locks.created_updated_segments(), timeout)
        .map_err(|failure| {
            self.create_segment_locks.unlock_all(locks.created_segments());
            self.segment_locks.unlock_all_write(locks.dropped_segments());
            PrepareError::from(failure)
        })?;

    self.record_locks
        .lock_all(locks.records(), timeout)
        .map_err(|failure| {
            self.create_segment_locks.unlock_all(locks.created_segments());
            self.segment_locks.unlock_all_write(locks.dropped_segments());
            self.segment_locks.unlock_all_read(locks.created_updated_segments());
            PrepareError::from(failure)
        })?;

    Ok(())
}
/// Verifies that none of the `created` segment names is already registered and that
/// every id in `updated` is registered.
///
/// # Errors
/// * `PrepareError::IndexAlreadyExists` — a created name already exists and is an
///   index meta/data name.
/// * `PrepareError::SegmentAlreadyExists` — a created name already exists otherwise.
/// * `PrepareError::SegmentNotFound` — an updated id is not registered.
pub fn check_segments(
    &self,
    created: &[String],
    updated: impl Iterator<Item = SegmentId>,
) -> Result<(), PrepareError> {
    let registry = self.segments.read().expect("lock not poisoned");

    for name in created {
        if registry.has_segment(name) {
            let clash = if is_index_name_meta(name) || is_index_name_data(name) {
                PrepareError::IndexAlreadyExists
            } else {
                PrepareError::SegmentAlreadyExists
            };
            return Err(clash);
        }
    }

    // `all` stops at the first id that is not registered.
    let mut pending = updated;
    if !pending.all(|id| registry.has_segment_by_id(&id)) {
        return Err(PrepareError::SegmentNotFound);
    }
    Ok(())
}
/// Takes a read lock on a single segment.
///
/// # Errors
/// The lock manager's failure, converted to `TimeoutError`.
pub fn acquire_segment_read_lock(&self, segment: SegmentId, timeout: Duration) -> Result<(), TimeoutError> {
    let wanted = [segment];
    self.segment_locks.lock_all_read(&wanted, timeout)?;
    Ok(())
}
pub fn acquire_segments_read_lock(&self, segments: &[SegmentId], timeout: Duration) -> Result<(), TimeoutError> {
self.segment_locks.lock_all_read(segments, timeout)?;
Ok(())
}
/// Takes the lock of a single record.
///
/// # Errors
/// The lock manager's failure, converted to `TimeoutError`.
pub fn acquire_record_lock(&self, id: &RecRef, timeout: Duration) -> Result<(), TimeoutError> {
    let wanted = [*id];
    self.record_locks.lock_all(&wanted, timeout)?;
    Ok(())
}
/// Releases the read lock of a single segment.
pub fn release_segment_read_lock(&self, segment: SegmentId) {
    let held = [segment];
    self.segment_locks.unlock_all_read(&held);
}
/// Releases the lock of a single record.
pub fn release_record_lock(&self, id: &RecRef) {
    let held = [*id];
    self.record_locks.unlock_all(&held);
}
/// Delegates to `Segments::recover_allocations` for `segs` and `created` while holding
/// the registry write lock.
///
/// # Errors
/// Propagates the error of the delegated call.
pub fn recover_allocations(&self, segs: &[SegmentId], created: &mut HashMap<SegmentId, Segment>) -> PERes<()> {
    self.segments
        .write()
        .expect("lock not poisoned")
        .recover_allocations(segs, created, &self.allocator)?;
    Ok(())
}
/// Delegates to `Segments::recompute_last_pages` while holding the registry write lock.
///
/// # Errors
/// Propagates the error of the delegated call.
pub fn recompute_last_pages(&self) -> PERes<()> {
    self.segments
        .write()
        .expect("lock not poisoned")
        .recompute_last_pages(&self.allocator)?;
    Ok(())
}
/// Reads the current address entry of every record in `records` and returns one
/// `OldRecordInfo` per record, in input order.
///
/// # Errors
/// * `PrepareError::RecordNotFound` — `read` found no entry for a record.
/// * `PrepareError::VersionNotLatest` — `check_version` is set and the stored version
///   differs from the one carried by the `CheckRecord`.
/// * Any `read` failure, converted to `PrepareError`.
pub(crate) fn check_persistent_records(
    &self,
    records: &[CheckRecord],
    check_version: bool,
) -> Result<Vec<OldRecordInfo>, PrepareError> {
    let mut found = Vec::with_capacity(records.len());
    for check in records {
        let (record_page, stored_version) = match self.read(&check.record_id, check.segment_id)? {
            Some(entry) => entry,
            None => return Err(PrepareError::RecordNotFound(PersyId(check.record_id))),
        };
        found.push(OldRecordInfo::new(
            &check.record_id,
            check.segment_id,
            record_page,
            stored_version,
        ));
        if check_version && stored_version != check.version {
            return Err(PrepareError::VersionNotLatest);
        }
    }
    Ok(found)
}
pub(crate) fn release_locks(&self, locks: &Locks) {
self.record_locks.unlock_all(locks.records());
self.segment_locks.unlock_all_read(locks.created_updated_segments());
self.segment_locks.unlock_all_write(locks.dropped_segments());
self.create_segment_locks.unlock_all(locks.created_segments());
}
pub fn rollback(&self, inserts: &[InsertRecord]) -> PERes<Vec<(SegmentId, u64)>> {
let segments = self.segments.write().expect("lock not poisoned");
let mut pages_to_remove = Vec::new();
let mut pages = HashMap::new();
for insert in inserts {
if segments.segments.contains_key(&insert.segment) {
let page = insert.recref.page;
let seg_page = self.get_or_insert_mut(&mut pages, page)?;
if seg_page.segment_delete_entry(insert.segment, insert.recref.pos) && seg_page.get_next()? != 0 {
pages_to_remove.push((insert.segment, page));
}
}
}
for (_, to_flush) in pages.into_iter() {
self.allocator.flush_page(to_flush)?;
}
Ok(pages_to_remove)
}
pub fn recover_rollback(&self, rollbacks: &[RollbackPage]) -> PERes<()> {
let mut pages = HashMap::new();
for rollback in rollbacks {
let page = rollback.recref.page;
let seg_page = self.get_or_insert_mut(&mut pages, page)?;
seg_page.segment_update_entry(rollback.segment, rollback.recref.pos, rollback.record_page);
}
for (_, to_flush) in pages.into_iter() {
self.allocator.flush_page(to_flush)?;
}
Ok(())
}
/// Applies the address changes of a transaction while holding the registry write lock.
///
/// Steps, in order:
/// 1. collect the ids of segments dropped by `seg_ops`; record changes for those
///    segments are skipped;
/// 2. when `recover` is set, re-link the pages listed in `segs_new_pages`;
/// 3. apply inserts, updates and deletes to their address pages;
/// 4. flush the touched pages (with extra recovery work when `recover` is set);
/// 5. drop the dropped segments, finalize the created ones and flush the registry.
///
/// Returns the `(segment, page)` pairs for which a delete made `segment_delete_entry`
/// return `true` on a page with a non-zero next page.
/// NOTE(review): `true` presumably means the page no longer holds entries — confirm
/// against `segment_delete_entry`.
///
/// # Errors
/// Propagates page, allocator and registry failures.
pub fn apply(
&self,
segs_new_pages: &[NewSegmentPage],
inserts: &[InsertRecord],
updates: &[UpdateRecord],
deletes: &[DeleteRecord],
seg_ops: &[SegmentOperation],
created: &mut HashMap<SegmentId, Segment>,
recover: bool,
) -> PERes<Vec<(SegmentId, u64)>> {
// Held until the function returns.
let mut segments = self.segments.write().expect("lock not poisoned");
// Ids of the segments dropped by this set of operations.
let mut dropped = HashSet::new();
for seg_op in seg_ops {
if let SegmentOperation::Drop(ref op) = *seg_op {
dropped.insert(op.segment_id);
}
}
// Pages opened for writing, keyed by page index, so each page is loaded once.
let mut pages = HashMap::new();
if recover {
for new_page in segs_new_pages {
if new_page.previous == 0 {
// No previous page: the new page becomes the segment's first page.
segments.set_first_page(new_page.segment, new_page.page);
} else {
// Link the previous page to the new one and the new one back to it.
let p_page = self.get_or_insert_mut(&mut pages, new_page.previous)?;
p_page.set_next(new_page.page)?;
let n_page = self.get_or_insert_mut(&mut pages, new_page.page)?;
n_page.set_prev(new_page.previous)?;
n_page.set_segment_id(new_page.segment)?;
}
}
}
for insert in inserts {
if !dropped.contains(&insert.segment) {
let page = insert.recref.page;
let seg_page = self.get_or_insert_mut(&mut pages, page)?;
seg_page.segment_insert_entry(insert.segment, insert.recref.pos, insert.record_page);
}
}
for update in updates {
if !dropped.contains(&update.segment) {
let page = update.recref.page;
let seg_page = self.get_or_insert_mut(&mut pages, page)?;
seg_page.segment_update_entry(update.segment, update.recref.pos, update.record_page);
}
}
let mut pages_to_remove = Vec::new();
for delete in deletes {
if !dropped.contains(&delete.segment) {
let page = delete.recref.page;
let seg_page = self.get_or_insert_mut(&mut pages, page)?;
if seg_page.segment_delete_entry(delete.segment, delete.recref.pos) {
// Only pages with a non-zero next page are reported for removal.
if seg_page.get_next()? != 0 {
pages_to_remove.push((delete.segment, page));
}
}
}
}
if recover {
// During recovery the page counters are recalculated before flushing.
for (_, mut to_flush) in pages.into_iter() {
to_flush.recalc_count()?;
self.allocator.flush_page(to_flush)?;
}
// Takes a record page out of the allocator's free set, using the page's own size exponent.
let recover_page = |record_page: u64| {
let page = self.allocator.load_page(record_page)?;
self.allocator.remove_from_free(record_page, page.get_size_exp())
};
// Every segment touched by an insert, update or delete (dropped ones included).
let mut segs = HashSet::new();
for insert in inserts {
recover_page(insert.record_page)?;
segs.insert(insert.segment);
}
for update in updates {
recover_page(update.record_page)?;
segs.insert(update.segment);
}
for delete in deletes {
segs.insert(delete.segment);
}
segments.recover_allocations(&segs.into_iter().collect::<Vec<_>>(), created, &self.allocator)?;
} else {
for (_, to_flush) in pages.into_iter() {
self.allocator.flush_page(to_flush)?;
}
}
// Drops are processed before creates.
for seg_op in seg_ops {
if let SegmentOperation::Drop(ref op) = *seg_op {
segments.drop_segment(&op.name);
}
}
for seg_op in seg_ops {
if let SegmentOperation::Create(ref op) = *seg_op {
// Created segments are moved out of `created` into the registry.
if let Some(s) = created.remove(&op.segment_id) {
segments.finalize_create_segment(s);
}
}
}
segments.flush_segments(&self.allocator)?;
Ok(pages_to_remove)
}
/// Replays segment operations during recovery while holding the registry write lock:
/// first all drops, then all creates (taking the segments out of `created`), then the
/// first-page assignment for every new page without a previous page, and finally a
/// flush of the registry.
///
/// # Errors
/// Propagates the failure of `flush_segments`.
pub fn recover_segment_operations(
    &self,
    seg_ops: &[SegmentOperation],
    created: &mut HashMap<SegmentId, Segment>,
    segs_new_pages: &[NewSegmentPage],
) -> PERes<()> {
    let mut registry = self.segments.write().expect("lock not poisoned");

    let drops = seg_ops.iter().filter_map(|seg_op| match seg_op {
        SegmentOperation::Drop(op) => Some(op),
        _ => None,
    });
    for op in drops {
        registry.drop_segment(&op.name);
    }

    let creates = seg_ops.iter().filter_map(|seg_op| match seg_op {
        SegmentOperation::Create(op) => Some(op),
        _ => None,
    });
    for op in creates {
        if let Some(segment) = created.remove(&op.segment_id) {
            registry.recover_finalize_create_segment(segment);
        }
    }

    // A new page with previous == 0 is recorded as its segment's first page.
    for first in segs_new_pages.iter().filter(|new_page| new_page.previous == 0) {
        registry.set_first_page(first.segment, first.page);
    }

    registry.flush_segments(&self.allocator)?;
    Ok(())
}
/// Delegates to `Segments::recover_remove_pages` for `delete_pages` while holding the
/// registry write lock and returns its result.
///
/// # Errors
/// Propagates the error of the delegated call.
pub fn recover_remove_pages(&self, delete_pages: &[(SegmentId, u64)]) -> PERes<Vec<u64>> {
    self.segments
        .write()
        .expect("lock not poisoned")
        .recover_remove_pages(&self.allocator, delete_pages)
}
/// Delegates to `Segments::collect_segment_pages` for `segment` while holding the
/// registry read lock and returns its result.
///
/// # Errors
/// Propagates the error of the delegated call.
pub fn collect_segment_pages(&self, segment: SegmentId) -> PERes<Vec<u64>> {
    self.segments
        .read()
        .expect("lock not poisoned")
        .collect_segment_pages(&self.allocator, segment)
}
/// Delegates to `Segments::clear_empty` for the given `(segment, page)` pairs while
/// holding the registry write lock and returns its result.
///
/// # Errors
/// Propagates the error of the delegated call.
pub fn clear_empty(&self, empty: &[(SegmentId, u64)]) -> PERes<Vec<(SegmentId, u64)>> {
    self.segments
        .write()
        .expect("lock not poisoned")
        .clear_empty(&self.allocator, empty)
}
/// Flushes the segment registry through `Segments::flush_segments` while holding the
/// registry write lock.
///
/// # Errors
/// Propagates the error of the delegated call.
pub fn flush_segments(&self) -> PERes<()> {
    self.segments
        .write()
        .expect("lock not poisoned")
        .flush_segments(&self.allocator)
}
/// Tells whether a segment named `segment` is registered.
pub fn exists_segment(&self, segment: &str) -> bool {
    let registry = self.segments.read().expect("lock not poisoned");
    registry.has_segment(segment)
}
/// Tells whether a segment with the given id is registered.
pub fn exists_segment_by_id(&self, segment: &SegmentId) -> bool {
    let registry = self.segments.read().expect("lock not poisoned");
    registry.has_segment_by_id(segment)
}
/// Returns the id registered for the segment named `segment`, if any.
pub fn segment_id(&self, segment: &str) -> Option<SegmentId> {
    let registry = self.segments.read().expect("lock not poisoned");
    registry.segment_id(segment)
}
/// Returns the name registered for the segment with the given id, if any.
pub fn segment_name_by_id(&self, segment: SegmentId) -> Option<String> {
    let registry = self.segments.read().expect("lock not poisoned");
    registry.segment_name_by_id(segment)
}
/// Test-only helper: writes an address entry for `recref` pointing at `record_page`
/// directly into its address page and flushes that page.
///
/// # Errors
/// Propagates page opening and flushing failures.
#[cfg(test)]
pub fn insert(&self, segment_id: SegmentId, recref: &RecRef, record_page: u64) -> PERes<()> {
    let mut address_page = self.allocator.write_page(recref.page)?;
    address_page.segment_insert_entry(segment_id, recref.pos, record_page);
    self.allocator.flush_page(address_page)?;
    Ok(())
}
/// Reads the address entry of `recref` inside `segment`.
///
/// Returns `Ok(None)` when `load_page_not_free` yields no page for `recref.page` or when
/// `segment_read_entry` finds nothing; otherwise the pair returned by
/// `segment_read_entry` (stored by callers as record page and version).
///
/// # Errors
/// * `ReadError::InvalidPersyId` — an entry of `ADDRESS_ENTRY_SIZE` bytes starting at
///   `recref.pos` does not fit in the page content. This includes pages whose content
///   is smaller than a single entry.
/// * Any page loading failure, converted to `ReadError`.
pub fn read(&self, recref: &RecRef, segment: SegmentId) -> Result<Option<(u64, u16)>, ReadError> {
    let mut page = match self.allocator.load_page_not_free(recref.page)? {
        Some(loaded) => loaded,
        None => return Ok(None),
    };
    // `checked_sub` guards against underflow: the page index comes from a caller-supplied
    // id, so the page is not guaranteed to be large enough to hold even one entry.
    let last_valid_pos = page.get_content_size().checked_sub(ADDRESS_ENTRY_SIZE);
    match last_valid_pos {
        Some(limit) if recref.pos <= limit => Ok(page.segment_read_entry(segment, recref.pos)),
        _ => Err(ReadError::InvalidPersyId(*recref)),
    }
}
/// Returns the cached writable page for index `k`, opening it through
/// `Allocator::write_page` and caching it in `map` on first use.
///
/// # Errors
/// Propagates the failure of `write_page`; nothing is inserted in that case.
fn get_or_insert_mut<'a>(&self, map: &'a mut HashMap<u64, Page>, k: u64) -> PERes<&'a mut Page> {
    let cached = match map.entry(k) {
        Entry::Occupied(existing) => existing.into_mut(),
        Entry::Vacant(slot) => {
            let opened = self.allocator.write_page(k)?;
            slot.insert(opened)
        }
    };
    Ok(cached)
}
/// Returns the `(name, id)` pairs produced by `Segments::list`, read under the
/// registry read lock.
pub fn list(&self) -> Vec<(String, SegmentId)> {
    let registry = self.segments.read().expect("lock not poisoned");
    registry.list()
}
/// Returns the `(name, id, u64)` triples produced by `Segments::snapshot_list`, read
/// under the registry read lock.
/// NOTE(review): the `u64` is presumably the segment's first page — confirm against
/// `Segments::snapshot_list`.
pub fn snapshot_list(&self) -> Vec<(String, SegmentId, u64)> {
    let registry = self.segments.read().expect("lock not poisoned");
    registry.snapshot_list()
}
}