use crate::{
address::{segment::segment_page_iterator::SegmentPageIterator, Address},
allocator::Allocator,
error::PERes,
id::{RecRef, SegmentId},
journal::{Journal, JournalId},
snapshot::data::{CleanInfo, PendingClean, RecordVersion, SegmentSnapshot, SnapshotData, SnapshotEntry},
transaction::tx_impl::TransactionImpl,
};
use std::{
cmp::Ordering,
collections::{hash_map::Entry, HashMap},
sync::{Arc, Mutex, Weak},
};
/// Identifier of a snapshot, handed out from the wrapping `u64` counter in `InternalSnapshots`.
pub type SnapshotId = u64;
/// Mutable snapshot bookkeeping; `Snapshots` keeps one instance behind its mutex.
#[derive(Default)]
pub struct InternalSnapshots {
/// Versions recorded for each record, inserted at the position found with the free `search` comparator.
mapping: HashMap<RecRef, Vec<RecordVersion>>,
/// Snapshots currently tracked, inserted at the position found with the free `search` comparator.
active_snapshots: Vec<SnapshotData>,
/// Next snapshot id to hand out; goes back to 0 after `u64::MAX`.
snapshot_sequence: u64,
}
impl InternalSnapshots {
/// Hands out the current value of the sequence counter and advances the
/// counter by one, wrapping back to 0 after `u64::MAX`.
fn next_snapshot_id(&mut self) -> SnapshotId {
    let assigned = self.snapshot_sequence;
    self.snapshot_sequence = assigned.wrapping_add(1);
    assigned
}
/// Binary-searches `active_snapshots` for `snapshot_id`.
///
/// Returns `Ok(index)` when a snapshot with that id is present, otherwise
/// `Err(index)` with the position where it would have to be inserted.
/// Ordering is delegated to the free function `search`, using the current
/// sequence counter as the `top` pivot.
fn search(&self, snapshot_id: SnapshotId) -> Result<usize, usize> {
    let top = self.snapshot_sequence;
    let snapshots = &self.active_snapshots;
    snapshots.binary_search_by(|candidate| search(candidate.id(), snapshot_id, top))
}
/// Acquires the most recently created snapshot and returns its id.
///
/// The most recent id is the one just before the sequence counter.
///
/// # Panics
///
/// Panics (through `acquire_snapshot`) if that snapshot is not currently
/// active.
fn acquire_last_snapshot(&mut self) -> SnapshotId {
    // `next_snapshot_id` wraps from `u64::MAX` back to 0, so the previous id
    // has to be computed with wrapping arithmetic as well; a plain `- 1`
    // would overflow when the counter sits at 0 right after a wrap.
    let last = self.snapshot_sequence.wrapping_sub(1);
    self.acquire_snapshot(last);
    last
}
/// Calls `acquire` on the active snapshot identified by `snapshot_id`.
///
/// # Panics
///
/// Panics if no active snapshot has that id.
fn acquire_snapshot(&mut self, snapshot_id: SnapshotId) {
    match self.search(snapshot_id) {
        Ok(pos) => match self.active_snapshots.get_mut(pos) {
            Some(snapshot) => {
                snapshot.acquire();
            }
            // `search` only returns `Ok` for positions inside the vector.
            None => unreachable!(),
        },
        Err(_) => panic!("try to acquire a not existing snapshot"),
    }
}
/// Returns the `PendingClean` handle of the active snapshot `snapshot_id`,
/// obtained from `SnapshotData::pending_clean`, or `None` when the snapshot
/// is not active.
pub(crate) fn pending_clean(
    &mut self,
    snapshot_id: SnapshotId,
    snapshots: &Arc<Snapshots>,
) -> Option<Arc<PendingClean>> {
    let pos = self.search(snapshot_id).ok()?;
    let snapshot = self.active_snapshots.get_mut(pos)?;
    Some(snapshot.pending_clean(snapshots))
}
/// Removes every active snapshot up to and including `snapshot_id` and
/// drops the record versions that only those snapshots needed.
///
/// Returns the removed snapshots (their entries already taken), or `None`
/// when `snapshot_id` is not an active snapshot.
pub(crate) fn clear_from(&mut self, snapshot_id: SnapshotId) -> Option<Vec<SnapshotData>> {
    let snapshot_sequence = self.snapshot_sequence;
    let last = self.search(snapshot_id).ok()?;
    // Keep the tail as the new active list and take the cleared prefix out.
    let remaining = self.active_snapshots.split_off(last + 1);
    let mut old = std::mem::replace(&mut self.active_snapshots, remaining);
    for tx in &mut old {
        if let Some(entries) = tx.take_entries() {
            for record in entries {
                if let Entry::Occupied(mut versions) = self.mapping.entry(*record.rec_id()) {
                    // Number of leading versions that belong to cleared snapshots.
                    // When `snapshot_id` itself has no version for this record the
                    // search yields the insertion point: everything before it is
                    // obsolete, everything from it onwards belongs to newer
                    // snapshots that are still active and must be kept.
                    let obsolete = match versions
                        .get()
                        .binary_search_by(|n| search(n.id(), snapshot_id, snapshot_sequence))
                    {
                        Ok(index) => index + 1,
                        Err(index) => index,
                    };
                    versions.get_mut().drain(..obsolete);
                    if versions.get().is_empty() {
                        versions.remove();
                    }
                }
            }
        }
    }
    Some(old)
}
pub(crate) fn read_snapshot(&mut self) -> SnapshotId {
let snapshot_id = self.next_snapshot_id();
let reference_count = if self.active_snapshots.is_empty() { 1 } else { 2 };
let snapshot = SnapshotData::new(snapshot_id, reference_count);
if let Err(index) = self.search(snapshot_id) {
self.active_snapshots.insert(index, snapshot);
}
snapshot_id
}
pub(crate) fn fill_records(&mut self, journal_id: JournalId, snapshot_id: SnapshotId, entries: Vec<SnapshotEntry>) {
for entry in &entries {
let to_add = RecordVersion::new(snapshot_id, entry.case().clone());
let snapshot_sequence = self.snapshot_sequence;
match self.mapping.entry(*entry.rec_id()) {
Entry::Occupied(mut v) => {
let vec = v.get_mut();
if let Err(index) = vec.binary_search_by(|n| search(n.id(), snapshot_id, snapshot_sequence)) {
vec.insert(index, to_add);
}
}
Entry::Vacant(e) => {
e.insert(vec![to_add]);
}
}
}
if let Ok(index) = self.search(snapshot_id) {
if let Some(snap) = self.active_snapshots.get_mut(index) {
snap.fill_records(journal_id, entries);
}
}
}
/// Hands `freed_pages` to the active snapshot `snapshot_id`; does nothing
/// when that snapshot is not active.
pub(crate) fn fill_clean_info(&mut self, snapshot_id: SnapshotId, freed_pages: CleanInfo) {
    let index = match self.search(snapshot_id) {
        Ok(index) => index,
        Err(_) => return,
    };
    if let Some(snapshot) = self.active_snapshots.get_mut(index) {
        snapshot.fill_clean_info(freed_pages);
    }
}
pub(crate) fn snapshot(
&mut self,
entries: Vec<SnapshotEntry>,
freed_pages: CleanInfo,
journal_id: JournalId,
) -> SnapshotId {
let snapshot_id = self.next_snapshot_id();
let reference_count = if self.active_snapshots.is_empty() { 1 } else { 2 };
let snapshot = SnapshotData::new(snapshot_id, reference_count);
if let Err(index) = self.search(snapshot_id) {
self.active_snapshots.insert(index, snapshot);
}
self.fill_records(journal_id, snapshot_id, entries);
self.fill_clean_info(snapshot_id, freed_pages);
snapshot_id
}
/// Allocates the next snapshot id and registers an empty snapshot for it.
///
/// The snapshot starts with a reference count of 1 when no other snapshot
/// is active, and 2 otherwise.
// NOTE(review): the extra reference presumably ties into the chained
// release loop in `release` — confirm against `SnapshotData`.
pub(crate) fn new_snapshot(&mut self) -> SnapshotId {
    let snapshot_id = self.next_snapshot_id();
    let has_others = !self.active_snapshots.is_empty();
    let snapshot = SnapshotData::new(snapshot_id, if has_others { 2 } else { 1 });
    if let Err(slot) = self.search(snapshot_id) {
        self.active_snapshots.insert(slot, snapshot);
    }
    snapshot_id
}
/// Registers `entries` for the snapshot `snapshot_id`; forwards to `fill_records`
/// (note the different argument order).
pub(crate) fn fill_snapshot_address(
&mut self,
snapshot_id: SnapshotId,
entries: Vec<SnapshotEntry>,
journal_id: JournalId,
) {
self.fill_records(journal_id, snapshot_id, entries);
}
/// Stores `freed_pages` on the snapshot `snapshot_id`; forwards to `fill_clean_info`.
pub(crate) fn fill_snapshot_clean_info(&mut self, snapshot_id: SnapshotId, freed_pages: CleanInfo) {
self.fill_clean_info(snapshot_id, freed_pages);
}
/// Looks up the version of record `id` visible from `snapshot_id`.
///
/// Returns a clone of the version stored for `snapshot_id` itself or,
/// failing that, of the first version that sorts after it. Returns `None`
/// when the record has no versions or none at or after `snapshot_id`.
pub(crate) fn read(&self, snapshot_id: SnapshotId, id: &RecRef) -> Option<RecordVersion> {
    let top = self.snapshot_sequence;
    let versions = self.mapping.get(id)?;
    // Exact hit and insertion point are treated the same way.
    let pos = versions
        .binary_search_by(|n| search(n.id(), snapshot_id, top))
        .unwrap_or_else(|insert_at| insert_at);
    versions.get(pos).cloned()
}
/// Returns the id of the snapshot to use "now".
///
/// When snapshots are active, the most recent one is acquired and
/// returned; otherwise a fresh snapshot with reference count 1 is created.
fn current_snapshot(&mut self) -> SnapshotId {
    if !self.active_snapshots.is_empty() {
        return self.acquire_last_snapshot();
    }
    let snapshot_id = self.next_snapshot_id();
    let snapshot = SnapshotData::new(snapshot_id, 1);
    if let Err(slot) = self.search(snapshot_id) {
        self.active_snapshots.insert(slot, snapshot);
    }
    snapshot_id
}
/// Resolves the segment name `segment` to its id as seen by snapshot
/// `snapshot_id`; `None` if the snapshot or the segment is unknown.
pub(crate) fn solve_segment_id(&self, snapshot_id: SnapshotId, segment: &str) -> Option<SegmentId> {
    let snapshot = self.find_snapshot(snapshot_id)?;
    snapshot.find_segment(segment)
}
/// Lists the `(name, id)` pairs of the segments known to snapshot
/// `snapshot_id`; empty when the snapshot is unknown or has no list.
fn list(&self, snapshot_id: SnapshotId) -> Vec<(String, SegmentId)> {
    match self.find_snapshot(snapshot_id) {
        Some(snapshot) => snapshot.segments_list().unwrap_or_default(),
        None => Vec::new(),
    }
}
/// Returns the active snapshot with id `snapshot_id`, if any.
fn find_snapshot(&self, snapshot_id: SnapshotId) -> Option<&SnapshotData> {
    let index = self.search(snapshot_id).ok()?;
    self.active_snapshots.get(index)
}
/// Returns the segment data for `segment_id` as recorded in snapshot
/// `snapshot_id`; `None` if either is unknown.
fn find_segment(&self, snapshot_id: SnapshotId, segment_id: SegmentId) -> Option<&SegmentSnapshot> {
    let snapshot = self.find_snapshot(snapshot_id)?;
    snapshot.find_segment_snapsthot(segment_id)
}
/// Stores the by-id and by-name segment maps on the active snapshot
/// `snapshot_id`; does nothing when that snapshot is not active.
pub(crate) fn fill_segments(
    &mut self,
    snapshot_id: SnapshotId,
    segs_id: HashMap<SegmentId, SegmentSnapshot>,
    segs_name: HashMap<String, SegmentSnapshot>,
) {
    let index = match self.search(snapshot_id) {
        Ok(index) => index,
        Err(_) => return,
    };
    if let Some(snapshot) = self.active_snapshots.get_mut(index) {
        snapshot.fill(segs_id, segs_name);
    }
}
/// Releases one reference of snapshot `snapshot_id`.
///
/// Starting at that snapshot, `SnapshotData::release` is called on each
/// following snapshot for as long as it returns `true`. If at least one
/// call returned `true`, everything up to the last such snapshot is removed
/// through `clear_from` and returned; otherwise the result is `None`.
pub(crate) fn release(&mut self, snapshot_id: SnapshotId) -> Option<Vec<SnapshotData>> {
    let start = self.search(snapshot_id).ok()?;
    let mut last_released = None;
    for snapshot in self.active_snapshots[start..].iter_mut() {
        if !snapshot.release() {
            break;
        }
        last_released = Some(snapshot.id());
    }
    self.clear_from(last_released?)
}
/// Builds a page iterator over segment `segment_id` as seen by snapshot
/// `snapshot_id`, starting at the segment's recorded first page.
/// Returns `None` if the snapshot or the segment is unknown.
pub fn scan(&self, snapshot_id: SnapshotId, segment_id: SegmentId) -> Option<SegmentPageIterator> {
    let segment = self.find_segment(snapshot_id, segment_id)?;
    Some(SegmentPageIterator::snapshot(segment.first_page()))
}
/// Resolves `segment_id` to an owned copy of its name as seen by snapshot
/// `snapshot_id`; `None` if the snapshot or the segment is unknown.
pub fn solve_segment_name(&self, snapshot_id: SnapshotId, segment_id: SegmentId) -> Option<String> {
    let segment = self.find_segment(snapshot_id, segment_id)?;
    Some(segment.name().to_owned())
}
/// Test-only: number of snapshots currently tracked.
#[cfg(test)]
pub(crate) fn active_snapshots(&self) -> usize {
self.active_snapshots.len()
}
/// Test-only: number of records that currently have at least one stored version list.
#[cfg(test)]
pub(crate) fn mapping_count(&self) -> usize {
self.mapping.len()
}
}
/// Shared front end over `InternalSnapshots`, plus the handles `free_resources` needs.
pub struct Snapshots {
/// Snapshot state; locked by the methods that read or change it.
lock: Mutex<InternalSnapshots>,
/// Used by `free_resources` to free pages.
allocator: Arc<Allocator>,
/// Used by `free_resources` to report a journal id as finished.
journal: Arc<Journal>,
/// Passed to `TransactionImpl::free_address_structures_impl` by `free_resources`.
address: Arc<Address>,
}
/// Compares two snapshot ids relative to the pivot `top`.
///
/// Ids strictly greater than `top` sort before ids less than or equal to
/// `top`; within the same side of the pivot the ids compare numerically.
// NOTE(review): callers pass the sequence counter as `top`, which appears
// intended to keep ordering stable after the counter wraps — confirm.
pub fn search(value: u64, value1: u64, top: u64) -> Ordering {
    match (value > top, value1 > top) {
        (true, false) => Ordering::Less,
        (false, true) => Ordering::Greater,
        // Both on the same side of the pivot: plain numeric order.
        _ => value.cmp(&value1),
    }
}
/// Indexes `segments` twice: once by segment id and once by segment name.
/// Each map holds its own clone of every segment; for duplicate keys the
/// later segment in the slice wins.
pub(crate) fn to_mapping(
    segments: &[SegmentSnapshot],
) -> (HashMap<SegmentId, SegmentSnapshot>, HashMap<String, SegmentSnapshot>) {
    let by_id: HashMap<SegmentId, SegmentSnapshot> = segments
        .iter()
        .map(|segment| (segment.segment_id(), segment.clone()))
        .collect();
    let by_name: HashMap<String, SegmentSnapshot> = segments
        .iter()
        .map(|segment| (segment.name().to_owned(), segment.clone()))
        .collect();
    (by_id, by_name)
}
/// Handle to one acquired snapshot; cloning acquires it again and dropping releases it.
pub struct SnapshotRef {
/// Weak link back to the owning `Snapshots`, upgraded on clone and drop.
ops: Weak<Snapshots>,
/// Id of the referenced snapshot; `None` once it has been taken (on drop or by the test-only `leak`).
id: Option<SnapshotId>,
}
impl SnapshotRef {
    /// Wraps the already-acquired snapshot `id`, keeping only a weak link to `ops`.
    fn new(ops: &Arc<Snapshots>, id: SnapshotId) -> Self {
        let ops = Arc::downgrade(ops);
        Self { ops, id: Some(id) }
    }

    /// Test-only: forgets the id so that dropping this reference releases nothing.
    #[cfg(test)]
    pub(crate) fn leak(&mut self) {
        self.id = None;
    }

    /// Returns the id of the referenced snapshot.
    ///
    /// # Panics
    ///
    /// Panics if the id has already been taken out of this reference.
    pub fn id(&self) -> SnapshotId {
        self.id.unwrap()
    }
}
impl Clone for SnapshotRef {
    /// Acquires the snapshot once more and returns a second handle to it.
    ///
    /// # Panics
    ///
    /// Panics if the owning `Snapshots` has been dropped, or if the id has
    /// already been taken out of this reference.
    fn clone(&self) -> Self {
        match self.ops.upgrade() {
            Some(ops) => {
                ops.acquire(self.id.unwrap());
                Self {
                    ops: Arc::downgrade(&ops),
                    id: self.id,
                }
            }
            None => panic!("ref outlived Persy instance"),
        }
    }
}
impl Drop for SnapshotRef {
    /// Releases the referenced snapshot, if the owning `Snapshots` is still
    /// alive and the id has not been taken already.
    fn drop(&mut self) {
        let ops = match self.ops.upgrade() {
            Some(ops) => ops,
            // Owner already gone: nothing left to release against.
            None => return,
        };
        if let Some(id) = self.id.take() {
            ops.release(id).expect("snapshot release do not fail");
        }
    }
}
impl Snapshots {
pub fn new(allocator: &Arc<Allocator>, journal: &Arc<Journal>, address: &Arc<Address>) -> Self {
Self {
lock: Default::default(),
allocator: allocator.clone(),
journal: journal.clone(),
address: address.clone(),
}
}
/// Acquires snapshot `snapshot_id` under the lock.
///
/// # Panics
///
/// Panics if the mutex is poisoned or the snapshot is not active.
pub fn acquire(&self, snapshot_id: SnapshotId) {
    self.lock
        .lock()
        .expect("lock not poisoned")
        .acquire_snapshot(snapshot_id);
}
/// Returns a reference to the current snapshot: the most recent active
/// one, or a newly created one when none is active.
pub fn current_snapshot(self: &Arc<Self>) -> SnapshotRef {
    let mut guard = self.lock.lock().expect("lock not poisoned");
    let id = guard.current_snapshot();
    SnapshotRef::new(self, id)
}
/// Creates a new snapshot under the lock and returns a reference to it.
pub fn read_snapshot(self: &Arc<Self>) -> SnapshotRef {
    let mut guard = self.lock.lock().expect("lock not poisoned");
    let id = guard.read_snapshot();
    SnapshotRef::new(self, id)
}
/// Creates a new snapshot filled with `entries` (tagged with `journal_id`)
/// and `freed_pages`, and returns a reference to it.
pub fn snapshot(
    self: &Arc<Self>,
    entries: Vec<SnapshotEntry>,
    freed_pages: CleanInfo,
    journal_id: JournalId,
) -> SnapshotRef {
    let mut guard = self.lock.lock().expect("lock not poisoned");
    let id = guard.snapshot(entries, freed_pages, journal_id);
    SnapshotRef::new(self, id)
}
/// Registers `entries` (tagged with `journal_id`) on the snapshot behind
/// `snapshot_ref`.
pub fn fill_snapshot_address(
    self: &Arc<Self>,
    snapshot_ref: &SnapshotRef,
    entries: Vec<SnapshotEntry>,
    journal_id: JournalId,
) {
    let mut guard = self.lock.lock().expect("lock not poisoned");
    // The id is read after the lock is taken, as before.
    let snapshot_id = snapshot_ref.id();
    guard.fill_snapshot_address(snapshot_id, entries, journal_id);
}
/// Stores `freed_pages` on the snapshot behind `snapshot_ref`.
pub fn fill_snapshot_clean_info(self: &Arc<Self>, snapshot_ref: &SnapshotRef, freed_pages: CleanInfo) {
    let mut guard = self.lock.lock().expect("lock not poisoned");
    let snapshot_id = snapshot_ref.id();
    guard.fill_snapshot_clean_info(snapshot_id, freed_pages);
}
/// Creates a new, empty snapshot under the lock and returns a reference to it.
pub fn new_snapshot(self: &Arc<Self>) -> SnapshotRef {
    let mut guard = self.lock.lock().expect("lock not poisoned");
    let id = guard.new_snapshot();
    SnapshotRef::new(self, id)
}
/// Indexes `segments` by id and by name and stores both maps on the
/// snapshot behind `snapshot_id`. The maps are built before the lock is taken.
pub fn fill_segments(&self, snapshot_id: &SnapshotRef, segments: &[SegmentSnapshot]) {
    let (by_id, by_name) = to_mapping(segments);
    let mut guard = self.lock.lock().expect("lock not poisoned");
    let id = snapshot_id.id();
    guard.fill_segments(id, by_id, by_name);
}
/// Resolves the segment name `segment` to its id as seen by the given snapshot.
pub fn solve_segment_id(&self, snapshot_id: &SnapshotRef, segment: &str) -> Option<SegmentId> {
    self.lock
        .lock()
        .expect("lock not poisoned")
        .solve_segment_id(snapshot_id.id(), segment)
}
/// Resolves `segment_id` to its name as seen by the given snapshot.
pub fn solve_segment_name(&self, snapshot_id: &SnapshotRef, segment_id: SegmentId) -> Option<String> {
    let guard = self.lock.lock().expect("lock not poisoned");
    guard.solve_segment_name(snapshot_id.id(), segment_id)
}
/// Builds a page iterator over `segment_id` as seen by the given snapshot.
pub fn scan(&self, snapshot_id: &SnapshotRef, segment_id: SegmentId) -> Option<SegmentPageIterator> {
    let guard = self.lock.lock().expect("lock not poisoned");
    guard.scan(snapshot_id.id(), segment_id)
}
/// Lists the `(name, id)` pairs of the segments known to the given snapshot.
pub fn list(&self, snapshot_id: &SnapshotRef) -> Vec<(String, SegmentId)> {
    self.lock
        .lock()
        .expect("lock not poisoned")
        .list(snapshot_id.id())
}
/// Looks up the version of record `id` visible from the given snapshot.
pub fn read(&self, snapshot_id: &SnapshotRef, id: &RecRef) -> Option<RecordVersion> {
    self.lock
        .lock()
        .expect("lock not poisoned")
        .read(snapshot_id.id(), id)
}
/// Frees the resources described by a finished snapshot.
///
/// When `freed_pages` is present, its freed pages are returned to the
/// allocator and its segment pages are passed to
/// `TransactionImpl::free_address_structures_impl`. When `journal_id` is
/// present, it is then reported to the journal through `finished_to_clean`.
///
/// # Errors
///
/// Returns the first error raised by any of these calls; later steps are
/// skipped in that case.
pub fn free_resources(
self: &Arc<Self>,
journal_id: &Option<JournalId>,
freed_pages: &Option<CleanInfo>,
) -> PERes<()> {
if let Some(clean_info) = freed_pages {
// Pages first, then the address structures that referenced them.
self.allocator.free_pages(clean_info.freed_pages())?;
TransactionImpl::free_address_structures_impl(
&self.journal,
self,
&self.address,
&self.allocator,
clean_info.segment_pages(),
)?;
}
if let Some(ji) = journal_id {
// Runs only after the page cleanup above succeeded (or was not needed).
self.journal.finished_to_clean(&[ji.clone()])?;
}
Ok(())
}
/// Returns the `PendingClean` handle of the active snapshot `snapshot_id`,
/// or `None` when that snapshot is not active.
pub(crate) fn pending_clean(self: &Arc<Self>, snapshot_id: SnapshotId) -> Option<Arc<PendingClean>> {
    self.lock
        .lock()
        .expect("lock not poisoned")
        .pending_clean(snapshot_id, self)
}
/// Releases one reference of snapshot `snapshot_id` and processes every
/// snapshot that was removed as a result.
///
/// The lock is held only while the internal state is updated. For each
/// removed snapshot a `PendingClean` handle is then requested and dropped
/// right away, outside the lock.
// NOTE(review): dropping the handle presumably triggers the actual
// cleanup — confirm against `PendingClean`.
pub(crate) fn release(self: &Arc<Self>, snapshot_id: SnapshotId) -> PERes<()> {
    // The guard is a temporary of this statement, so the lock is gone
    // before the loop below starts.
    let released = self.lock.lock().expect("lock not poisoned").release(snapshot_id);
    for mut snapshot in released.into_iter().flatten() {
        let _ = snapshot.pending_clean(self);
    }
    Ok(())
}
}