fix: saved filters load issue on server restart (#957)

issue: on restart the server checks the version of all saved filters
and migrates them if the version is v1.
If the version is v2 (the current version), it still recalculates the
user_id hash from the JSON, writes the JSON back to storage, and only
then loads it into memory.

fix: if the version is v2, load the filter into memory directly.
nikhilsinhaparseable authored Oct 8, 2024
1 parent f488970 commit 2a2d64f
Showing 6 changed files with 164 additions and 126 deletions.
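
The commit message describes a simple version dispatch. As a rough sketch of that logic (hypothetical names, not the code in this commit; the real loader works on serde_json::Value and persists migrated filters back to object storage):

use serde_json::Value;

/// Sketch of the dispatch described in the commit message. `load_filter`
/// and `migrate_v1_v2` are illustrative names, not this repository's API.
fn load_filter(raw: &[u8]) -> Result<Option<Value>, serde_json::Error> {
    let filter: Value = serde_json::from_slice(raw)?;
    match filter.get("version").and_then(|v| v.as_str()) {
        // v1: migrate the schema; the caller then writes the migrated
        // JSON back to storage and keeps it in memory.
        Some("v1") => Ok(Some(migrate_v1_v2(filter))),
        // v2 (current): the fix, load into memory as-is, with no
        // redundant hash recalculation or storage rewrite.
        Some("v2") => Ok(Some(filter)),
        // unknown or missing version: skip the file.
        _ => Ok(None),
    }
}

fn migrate_v1_v2(mut filter: Value) -> Value {
    // placeholder migration: bump the version tag; the real migration
    // also rewrites the filter's fields.
    if let Some(obj) = filter.as_object_mut() {
        obj.insert("version".into(), "v2".into());
    }
    filter
}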
25 changes: 18 additions & 7 deletions server/src/storage/azure_blob.rs
@@ -41,7 +41,7 @@ use crate::metrics::storage::azureblob::REQUEST_RESPONSE_TIME;
use crate::metrics::storage::StorageMetrics;
use object_store::limit::LimitStore;
use object_store::path::Path as StorePath;
-use std::collections::BTreeMap;
+use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::time::{Duration, Instant};

@@ -650,8 +650,10 @@ impl ObjectStorage for BlobStore {
.collect::<Vec<_>>())
}

-    async fn get_all_dashboards(&self) -> Result<Vec<Bytes>, ObjectStorageError> {
-        let mut dashboards = vec![];
+    async fn get_all_dashboards(
+        &self,
+    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
+        let mut dashboards: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
let resp = self
.client
@@ -677,13 +679,19 @@ impl ObjectStorage for BlobStore {
Box::new(|file_name| file_name.ends_with(".json")),
)
.await?;
-            dashboards.extend(dashboard_bytes);
+
+            dashboards
+                .entry(dashboards_path)
+                .or_default()
+                .extend(dashboard_bytes);
}
Ok(dashboards)
}

-    async fn get_all_saved_filters(&self) -> Result<Vec<Bytes>, ObjectStorageError> {
-        let mut filters = vec![];
+    async fn get_all_saved_filters(
+        &self,
+    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
+        let mut filters: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
let resp = self
.client
@@ -720,7 +728,10 @@ impl ObjectStorage for BlobStore {
Box::new(|file_name| file_name.ends_with(".json")),
)
.await?;
-                filters.extend(filter_bytes);
+                filters
+                    .entry(filters_path)
+                    .or_default()
+                    .extend(filter_bytes);
}
}
Ok(filters)
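
The grouping above relies on the standard HashMap entry API: entry().or_default() creates the Vec the first time a path key is seen, and extend() appends to it thereafter. A self-contained illustration:

use std::collections::HashMap;

fn main() {
    let mut grouped: HashMap<String, Vec<u8>> = HashMap::new();
    // first insertion creates the Vec, later ones append to it
    grouped.entry("users/alice/filters".to_string()).or_default().extend([1, 2, 3]);
    grouped.entry("users/alice/filters".to_string()).or_default().extend([4]);
    assert_eq!(grouped["users/alice/filters"], vec![1, 2, 3, 4]);
}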
38 changes: 29 additions & 9 deletions server/src/storage/localfs.rs
@@ -17,7 +17,7 @@
*/

use std::{
-    collections::BTreeMap,
+    collections::{BTreeMap, HashMap},
path::{Path, PathBuf},
sync::Arc,
time::Instant,
@@ -351,8 +351,10 @@ impl ObjectStorage for LocalFS {
Ok(dirs)
}

-    async fn get_all_dashboards(&self) -> Result<Vec<Bytes>, ObjectStorageError> {
-        let mut dashboards = vec![];
+    async fn get_all_dashboards(
+        &self,
+    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
+        let mut dashboards: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
let users_root_path = self.root.join(USERS_ROOT_DIR);
let directories = ReadDirStream::new(fs::read_dir(&users_root_path).await?);
let users: Vec<DirEntry> = directories.try_collect().await?;
@@ -364,15 +366,25 @@ impl ObjectStorage for LocalFS {
let directories = ReadDirStream::new(fs::read_dir(&dashboards_path).await?);
let dashboards_files: Vec<DirEntry> = directories.try_collect().await?;
for dashboard in dashboards_files {
-                let file = fs::read(dashboard.path()).await?;
-                dashboards.push(file.into());
+                let dashboard_absolute_path = dashboard.path();
+                let file = fs::read(dashboard_absolute_path.clone()).await?;
+                let dashboard_relative_path = dashboard_absolute_path
+                    .strip_prefix(self.root.as_path())
+                    .unwrap();
+
+                dashboards
+                    .entry(RelativePathBuf::from_path(dashboard_relative_path).unwrap())
+                    .or_default()
+                    .push(file.into());
}
}
Ok(dashboards)
}

-    async fn get_all_saved_filters(&self) -> Result<Vec<Bytes>, ObjectStorageError> {
-        let mut filters = vec![];
+    async fn get_all_saved_filters(
+        &self,
+    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
+        let mut filters: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
let users_root_path = self.root.join(USERS_ROOT_DIR);
let directories = ReadDirStream::new(fs::read_dir(&users_root_path).await?);
let users: Vec<DirEntry> = directories.try_collect().await?;
@@ -394,8 +406,16 @@ impl ObjectStorage for LocalFS {
let directories = ReadDirStream::new(fs::read_dir(&filters_path).await?);
let filters_files: Vec<DirEntry> = directories.try_collect().await?;
for filter in filters_files {
-                    let file = fs::read(filter.path()).await?;
-                    filters.push(file.into());
+                    let filter_absolute_path = filter.path();
+                    let file = fs::read(filter_absolute_path.clone()).await?;
+                    let filter_relative_path = filter_absolute_path
+                        .strip_prefix(self.root.as_path())
+                        .unwrap();
+
+                    filters
+                        .entry(RelativePathBuf::from_path(filter_relative_path).unwrap())
+                        .or_default()
+                        .push(file.into());
}
}
}
8 changes: 6 additions & 2 deletions server/src/storage/object_storage.rs
@@ -85,8 +85,12 @@ pub trait ObjectStorage: Sync + 'static {
async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError>;
async fn list_old_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError>;
async fn list_dirs(&self) -> Result<Vec<String>, ObjectStorageError>;
-    async fn get_all_saved_filters(&self) -> Result<Vec<Bytes>, ObjectStorageError>;
-    async fn get_all_dashboards(&self) -> Result<Vec<Bytes>, ObjectStorageError>;
+    async fn get_all_saved_filters(
+        &self,
+    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError>;
+    async fn get_all_dashboards(
+        &self,
+    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError>;
async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError>;
async fn list_manifest_files(
&self,
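
The trait change means every backend now returns dashboard and filter bytes keyed by their relative storage path rather than as a flat Vec<Bytes>. A minimal sketch of a consumer (the function name is hypothetical):

use bytes::Bytes;
use relative_path::RelativePathBuf;
use std::collections::HashMap;

// Each key is the object's relative path in storage; each value holds the
// JSON blobs found under it. Having the path alongside the bytes is what
// lets the loader delete or overwrite stale objects during migration.
fn summarize(filters: &HashMap<RelativePathBuf, Vec<Bytes>>) {
    for (relative_path, blobs) in filters {
        for blob in blobs {
            println!("{relative_path}: {} bytes", blob.len());
        }
    }
}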
31 changes: 21 additions & 10 deletions server/src/storage/s3.rs
@@ -37,15 +37,15 @@ use std::path::Path as StdPath;
use std::sync::Arc;
use std::time::{Duration, Instant};

-use crate::handlers::http::users::USERS_ROOT_DIR;
-use crate::metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics};
-use crate::storage::{LogStream, ObjectStorage, ObjectStorageError, PARSEABLE_ROOT_DIRECTORY};
-
use super::metrics_layer::MetricLayer;
use super::object_storage::parseable_json_path;
use super::{
    ObjectStorageProvider, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
};
+use crate::handlers::http::users::USERS_ROOT_DIR;
+use crate::metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics};
+use crate::storage::{LogStream, ObjectStorage, ObjectStorageError, PARSEABLE_ROOT_DIRECTORY};
+use std::collections::HashMap;

#[allow(dead_code)]
// in bytes
@@ -689,8 +689,10 @@ impl ObjectStorage for S3 {
.collect::<Vec<_>>())
}

-    async fn get_all_dashboards(&self) -> Result<Vec<Bytes>, ObjectStorageError> {
-        let mut dashboards = vec![];
+    async fn get_all_dashboards(
+        &self,
+    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
+        let mut dashboards: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
let resp = self
.client
@@ -716,13 +718,19 @@ impl ObjectStorage for S3 {
Box::new(|file_name| file_name.ends_with(".json")),
)
.await?;
-            dashboards.extend(dashboard_bytes);
+
+            dashboards
+                .entry(dashboards_path)
+                .or_default()
+                .extend(dashboard_bytes);
}
Ok(dashboards)
}

-    async fn get_all_saved_filters(&self) -> Result<Vec<Bytes>, ObjectStorageError> {
-        let mut filters = vec![];
+    async fn get_all_saved_filters(
+        &self,
+    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
+        let mut filters: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
let resp = self
.client
@@ -759,7 +767,10 @@ impl ObjectStorage for S3 {
Box::new(|file_name| file_name.ends_with(".json")),
)
.await?;
-                filters.extend(filter_bytes);
+                filters
+                    .entry(filters_path)
+                    .or_default()
+                    .extend(filter_bytes);
}
}
Ok(filters)
105 changes: 51 additions & 54 deletions server/src/users/dashboards.rs
@@ -114,64 +114,61 @@ impl Dashboards {
pub async fn load(&self) -> anyhow::Result<()> {
let mut this = vec![];
let store = CONFIG.storage().get_object_store();
-        let dashboards = store.get_all_dashboards().await.unwrap_or_default();
-        for dashboard in dashboards {
-            if dashboard.is_empty() {
-                continue;
-            }
-            let mut dashboard_value = serde_json::from_slice::<serde_json::Value>(&dashboard)?;
-            if let Some(meta) = dashboard_value.clone().as_object() {
-                let version = meta.get("version").and_then(|version| version.as_str());
-                let dashboard_id = meta
-                    .get("dashboard_id")
-                    .and_then(|dashboard_id| dashboard_id.as_str());
-                match version {
-                    Some("v1") => {
-                        dashboard_value = migrate_v1_v2(dashboard_value);
-                        dashboard_value = migrate_v2_v3(dashboard_value);
-                        let user_id = dashboard_value
-                            .as_object()
-                            .unwrap()
-                            .get("user_id")
-                            .and_then(|user_id| user_id.as_str());
-                        let path = dashboard_path(
-                            user_id.unwrap(),
-                            &format!("{}.json", dashboard_id.unwrap()),
-                        );
-                        let dashboard_bytes = to_bytes(&dashboard_value);
-                        store.put_object(&path, dashboard_bytes.clone()).await?;
-                        if let Ok(dashboard) = serde_json::from_slice::<Dashboard>(&dashboard_bytes)
-                        {
-                            this.retain(|d: &Dashboard| d.dashboard_id != dashboard.dashboard_id);
-                            this.push(dashboard);
-                        }
-                    }
-                    Some("v2") => {
-                        dashboard_value = migrate_v2_v3(dashboard_value);
-                        let user_id = dashboard_value
-                            .as_object()
-                            .unwrap()
-                            .get("user_id")
-                            .and_then(|user_id| user_id.as_str());
-                        let path = dashboard_path(
-                            user_id.unwrap(),
-                            &format!("{}.json", dashboard_id.unwrap()),
-                        );
-                        let dashboard_bytes = to_bytes(&dashboard_value);
-                        store.put_object(&path, dashboard_bytes.clone()).await?;
-                        if let Ok(dashboard) = serde_json::from_slice::<Dashboard>(&dashboard_bytes)
-                        {
-                            this.retain(|d| d.dashboard_id != dashboard.dashboard_id);
-                            this.push(dashboard);
-                        }
-                    }
-                    _ => {
-                        if let Ok(dashboard) = serde_json::from_slice::<Dashboard>(&dashboard) {
-                            this.retain(|d| d.dashboard_id != dashboard.dashboard_id);
-                            this.push(dashboard);
-                        }
-                    }
-                }
-            }
-        }
+        let all_dashboards = store.get_all_dashboards().await.unwrap_or_default();
+        for (dashboard_relative_path, dashboards) in all_dashboards {
+            for dashboard in dashboards {
+                if dashboard.is_empty() {
+                    continue;
+                }
+                let mut dashboard_value = serde_json::from_slice::<serde_json::Value>(&dashboard)?;
+                if let Some(meta) = dashboard_value.clone().as_object() {
+                    let version = meta.get("version").and_then(|version| version.as_str());
+                    let dashboard_id = meta
+                        .get("dashboard_id")
+                        .and_then(|dashboard_id| dashboard_id.as_str());
+                    match version {
+                        Some("v1") => {
+                            //delete older version of the dashboard
+                            store.delete_object(&dashboard_relative_path).await?;
+
+                            dashboard_value = migrate_v1_v2(dashboard_value);
+                            dashboard_value = migrate_v2_v3(dashboard_value);
+                            let user_id = dashboard_value
+                                .as_object()
+                                .unwrap()
+                                .get("user_id")
+                                .and_then(|user_id| user_id.as_str());
+                            let path = dashboard_path(
+                                user_id.unwrap(),
+                                &format!("{}.json", dashboard_id.unwrap()),
+                            );
+                            let dashboard_bytes = to_bytes(&dashboard_value);
+                            store.put_object(&path, dashboard_bytes.clone()).await?;
+                        }
+                        Some("v2") => {
+                            //delete older version of the dashboard
+                            store.delete_object(&dashboard_relative_path).await?;
+
+                            dashboard_value = migrate_v2_v3(dashboard_value);
+                            let user_id = dashboard_value
+                                .as_object()
+                                .unwrap()
+                                .get("user_id")
+                                .and_then(|user_id| user_id.as_str());
+                            let path = dashboard_path(
+                                user_id.unwrap(),
+                                &format!("{}.json", dashboard_id.unwrap()),
+                            );
+                            let dashboard_bytes = to_bytes(&dashboard_value);
+                            store.put_object(&path, dashboard_bytes.clone()).await?;
+                        }
+                        _ => {}
+                    }
+                }
+                if let Ok(dashboard) = serde_json::from_value::<Dashboard>(dashboard_value) {
+                    this.retain(|d: &Dashboard| d.dashboard_id != dashboard.dashboard_id);
+                    this.push(dashboard);
+                }
+            }
+        }

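The behavioral core of the loader change above: a dashboard stored at an old version is now deleted at the relative path under which it was found before the migrated JSON is written to its canonical path, so stale copies are not re-read on the next restart. A simplified sketch of that pattern (signatures assumed from the calls visible in this diff, error recovery omitted):

use bytes::Bytes;
use relative_path::RelativePathBuf;

// Hypothetical helper: migrate one stored object. `store` implements the
// ObjectStorage trait from this diff; `old_path` is the key under which
// get_all_dashboards() returned the bytes.
async fn migrate_stored(
    store: &impl ObjectStorage,
    old_path: &RelativePathBuf,
    new_path: &RelativePathBuf,
    migrated_bytes: Bytes,
) -> Result<(), ObjectStorageError> {
    // delete the stale object first so a restart cannot load both copies
    store.delete_object(old_path).await?;
    // write the migrated JSON to its canonical location
    store.put_object(new_path, migrated_bytes).await?;
    Ok(())
}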