Skip to content

Commit

Permalink
Introducing asymmetric reader/writer
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Sep 5, 2024
1 parent d33cfa5 commit 0a5fbea
Show file tree
Hide file tree
Showing 3 changed files with 212 additions and 280 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,6 @@ pub(crate) struct FileBackedIndex {
delete_tasks: Vec<DeleteTask>,
/// Stamper.
stamper: Stamper,
/// Flag used to avoid polling the metastore if
/// the process is actually writing the metastore.
///
/// The logic is "soft". We avoid the polling step
/// if the metastore wrote some value since the last
/// polling loop.
recently_modified: bool,

}

#[cfg(any(test, feature = "testsuite"))]
Expand Down Expand Up @@ -150,7 +142,6 @@ impl From<IndexMetadata> for FileBackedIndex {
per_source_shards,
delete_tasks: Default::default(),
stamper: Default::default(),
recently_modified: false,
}
}
}
Expand Down Expand Up @@ -185,20 +176,9 @@ impl FileBackedIndex {
per_source_shards,
delete_tasks,
stamper: Stamper::new(last_opstamp),
recently_modified: false,
}
}

/// Clears the `recently_modified` flag and hands back the value it held
/// just before being cleared.
///
/// Returns `true` if the flag was set before this call, and `false` otherwise.
/// After the call the flag is always `false`.
pub fn flip_recently_modified_down(&mut self) -> bool {
    // `take` leaves `bool::default()` (i.e. `false`) behind and returns the old value.
    std::mem::take(&mut self.recently_modified)
}

/// Marks the file as `recently_modified`.
///
/// Unconditionally raises the `recently_modified` flag to `true`; the flag
/// stays raised until `flip_recently_modified_down` resets it.
pub fn set_recently_modified(&mut self) {
self.recently_modified = true;
}

/// Index ID accessor.
pub fn index_id(&self) -> &str {
self.metadata.index_id()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,19 @@ use quickwit_proto::metastore::{EntityKind, MetastoreError, MetastoreResult};
use quickwit_proto::types::IndexId;
use quickwit_storage::Storage;
use tokio::sync::{Mutex, OnceCell};
use tokio::time::Instant;
use tracing::error;

use super::file_backed_index::FileBackedIndex;
use super::store_operations::{load_index, METASTORE_FILE_NAME};
use super::FileBackedIndexCell;
use super::{FileBackedIndexCell, FileBackedIndexWriter};

/// Lazy [`FileBackedIndex`]. It loads a `FileBackedIndex` on demand. When the index is first
/// loaded, it optionally spawns a task to periodically poll the storage and update the index.
pub(crate) struct LazyFileBackedIndex {
index_id: IndexId,
storage: Arc<dyn Storage>,
polling_interval_opt: Option<Duration>,
lazy_index: OnceCell<Arc<Mutex<FileBackedIndexCell>>>,
lazy_index: OnceCell<FileBackedIndexCell>,
}

impl LazyFileBackedIndex {
Expand All @@ -48,16 +47,15 @@ impl LazyFileBackedIndex {
polling_interval_opt: Option<Duration>,
file_backed_index: Option<FileBackedIndex>,
) -> Self {
let index_mutex_opt =
file_backed_index.map(|index| Arc::new(Mutex::new(FileBackedIndexCell::new(index))));
let index_mutex_opt = file_backed_index.map(FileBackedIndexCell::new);
// If the polling interval is configured and the index is already loaded,
// immediately spawn the polling task.
if let Some(index_mutex) = &index_mutex_opt {
if let Some(polling_interval) = polling_interval_opt {
spawn_index_metadata_polling_task(
storage.clone(),
index_id.clone(),
Arc::downgrade(index_mutex),
Arc::downgrade(&index_mutex.writer),
polling_interval,
);
}
Expand All @@ -72,23 +70,23 @@ impl LazyFileBackedIndex {

/// Gets a synchronized `FileBackedIndex`. If the index wasn't provided on creation, we load it
/// lazily on the first call of this method.
pub async fn get(&self) -> MetastoreResult<Arc<Mutex<FileBackedIndexCell>>> {
pub(crate) async fn get(&self) -> MetastoreResult<FileBackedIndexCell> {
self.lazy_index
.get_or_try_init(|| async move {
let index = load_index(&*self.storage, &self.index_id).await?;
let file_backed_index_cell = FileBackedIndexCell::new(index);
let index_mutex = Arc::new(Mutex::new(file_backed_index_cell));
let file_backed_index_writer = Arc::downgrade(&file_backed_index_cell.writer);
// When the index is loaded lazily, the polling task is not started in the
// constructor so we do it here when the index is actually loaded.
if let Some(polling_interval) = self.polling_interval_opt {
spawn_index_metadata_polling_task(
self.storage.clone(),
self.index_id.clone(),
Arc::downgrade(&index_mutex),
file_backed_index_writer,
polling_interval,
);
}
Ok(index_mutex)
Ok(file_backed_index_cell)
})
.await
.cloned()
Expand All @@ -98,20 +96,22 @@ impl LazyFileBackedIndex {
async fn poll_index_metadata_once(
storage: &dyn Storage,
index_id: &str,
index_mutex: &Mutex<FileBackedIndexCell>,
index_writer: &Mutex<FileBackedIndexWriter>,
) {
todo!();
// FIXME
/*
let mut locked_index = index_mutex.lock().await;
if locked_index.flip_recently_modified_down() {
let mut locked_index = index_writer.lock().await;
if locked_index.upload_task.is_none() {
return;
}
// TODO: Cool-down period.
if locked_index.last_push.elapsed() < Duration::from_secs(30) {
return;
}
let load_index_result = load_index(storage, index_id).await;

match load_index_result {
Ok(index) => {
*locked_index = index;
locked_index.write_state = index;
locked_index.publish();
}
Err(MetastoreError::NotFound(EntityKind::Index { .. })) => {
// The index has been deleted by the file-backed metastore holding a reference to this
Expand All @@ -127,23 +127,22 @@ async fn poll_index_metadata_once(
);
}
}
*/
}

fn spawn_index_metadata_polling_task(
storage: Arc<dyn Storage>,
index_id: IndexId,
metastore_weak: Weak<Mutex<FileBackedIndexCell>>,
metastore_weak: Weak<Mutex<FileBackedIndexWriter>>,
polling_interval: Duration,
) {
tokio::task::spawn(async move {
let mut interval = tokio::time::interval(polling_interval);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
interval.tick().await; //< this is to prevent fetch right after the first population of the data.

while let Some(metadata_mutex) = metastore_weak.upgrade() {
while let Some(metadata_writer) = metastore_weak.upgrade() {
interval.tick().await;
poll_index_metadata_once(&*storage, &index_id, &metadata_mutex).await;
poll_index_metadata_once(&*storage, &index_id, &*metadata_writer).await;
}
});
}
Loading

0 comments on commit 0a5fbea

Please sign in to comment.