Skip to content

Commit

Permalink
fix: stream sync for ingestors (#826)
Browse files Browse the repository at this point in the history
Fixes stream sync for ingestors: on stream create and on stream
update, the querier now syncs the stream to all live ingestors.
The stream and its schema are also synced at server start.
  • Loading branch information
nikhilsinhaparseable authored Jun 21, 2024
1 parent 26eafa7 commit f8935f3
Show file tree
Hide file tree
Showing 6 changed files with 278 additions and 258 deletions.
103 changes: 22 additions & 81 deletions server/src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use crate::handlers::http::cluster::utils::{
};
use crate::handlers::http::ingest::{ingest_internal_stream, PostError};
use crate::handlers::http::logstream::error::StreamError;
use crate::handlers::{STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY};
use crate::option::CONFIG;

use crate::metrics::prom_utils::Metrics;
Expand Down Expand Up @@ -96,62 +95,47 @@ pub async fn sync_cache_with_ingestors(
}

// forward the request to all ingestors to keep them in sync
#[allow(dead_code)]
pub async fn sync_streams_with_ingestors(
req: HttpRequest,
body: Bytes,
stream_name: &str,
time_partition: &str,
static_schema: &str,
schema: Bytes,
) -> Result<(), StreamError> {
let ingestor_infos = get_ingestor_info().await.map_err(|err| {
log::error!("Fatal: failed to get ingestor info: {:?}", err);
StreamError::Anyhow(err)
})?;

let mut errored = false;
let client = reqwest::Client::new();
for ingestor in ingestor_infos.iter() {
let url = format!(
"{}{}/logstream/{}",
ingestor.domain_name,
base_path_without_preceding_slash(),
stream_name
);
let res = client
.put(url)
.headers(req.headers().into())
.header(header::AUTHORIZATION, &ingestor.token)
.body(body.clone())
.send()
.await
.map_err(|err| {
log::error!(
"Fatal: failed to forward upsert stream request to ingestor: {}\n Error: {:?}",
ingestor.domain_name,
err
);
StreamError::Network(err)
})?;

match send_stream_sync_request(
&url,
ingestor.clone(),
time_partition,
static_schema,
schema.clone(),
)
.await
{
Ok(_) => continue,
Err(_) => {
errored = true;
break;
}
}
}

if errored {
for ingestor in ingestor_infos {
let url = format!(
"{}{}/logstream/{}",
if !res.status().is_success() {
log::error!(
"failed to forward upsert stream request to ingestor: {}\nResponse Returned: {:?}",
ingestor.domain_name,
base_path_without_preceding_slash(),
stream_name
res
);

// delete the stream
send_stream_delete_request(&url, ingestor.clone()).await?;
}

// this might be a bit too much
return Err(StreamError::Custom {
msg: "Failed to sync stream with ingestors".to_string(),
status: StatusCode::INTERNAL_SERVER_ERROR,
});
}

Ok(())
Expand Down Expand Up @@ -301,49 +285,6 @@ pub async fn fetch_stats_from_ingestors(
Ok(vec![qs])
}

#[allow(dead_code)]
async fn send_stream_sync_request(
url: &str,
ingestor: IngestorMetadata,
time_partition: &str,
static_schema: &str,
schema: Bytes,
) -> Result<(), StreamError> {
if !utils::check_liveness(&ingestor.domain_name).await {
return Ok(());
}

let client = reqwest::Client::new();
let res = client
.put(url)
.header(header::CONTENT_TYPE, "application/json")
.header(TIME_PARTITION_KEY, time_partition)
.header(STATIC_SCHEMA_FLAG, static_schema)
.header(header::AUTHORIZATION, ingestor.token)
.body(schema)
.send()
.await
.map_err(|err| {
log::error!(
"Fatal: failed to forward create stream request to ingestor: {}\n Error: {:?}",
ingestor.domain_name,
err
);
StreamError::Network(err)
})?;

if !res.status().is_success() {
log::error!(
"failed to forward create stream request to ingestor: {}\nResponse Returned: {:?}",
ingestor.domain_name,
res
);
return Err(StreamError::Network(res.error_for_status().unwrap_err()));
}

Ok(())
}

/// send a delete stream request to all ingestors
pub async fn send_stream_delete_request(
url: &str,
Expand Down
183 changes: 100 additions & 83 deletions server/src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
use self::error::{CreateStreamError, StreamError};
use super::base_path_without_preceding_slash;
use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats};
use super::cluster::{fetch_daily_stats_from_ingestors, fetch_stats_from_ingestors};
use super::cluster::{
fetch_daily_stats_from_ingestors, fetch_stats_from_ingestors, sync_streams_with_ingestors,
};
use crate::alerts::Alerts;
use crate::handlers::{
CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY,
Expand Down Expand Up @@ -169,89 +171,14 @@ pub async fn get_alert(req: HttpRequest) -> Result<impl Responder, StreamError>

pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder, StreamError> {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
let (time_partition, time_partition_limit, custom_partition, static_schema_flag, update_stream) =
fetch_headers_from_put_stream_request(&req);

if metadata::STREAM_INFO.stream_exists(&stream_name) && update_stream != "true" {
// Error if the log stream already exists
return Err(StreamError::Custom {
msg: format!(
"Logstream {stream_name} already exists, please create a new log stream with unique name"
),
status: StatusCode::BAD_REQUEST,
});
}

if !time_partition.is_empty() && update_stream == "true" {
return Err(StreamError::Custom {
msg: "Altering the time partition of an existing stream is restricted.".to_string(),
status: StatusCode::BAD_REQUEST,
});
}
let mut time_partition_in_days: &str = "";
if !time_partition_limit.is_empty() {
let time_partition_days = validate_time_partition_limit(&time_partition_limit);
if let Err(err) = time_partition_days {
return Err(StreamError::CreateStream(err));
} else {
time_partition_in_days = time_partition_days.unwrap();
if update_stream == "true" {
if let Err(err) = update_time_partition_limit_in_stream(
stream_name.clone(),
time_partition_in_days,
)
.await
{
return Err(StreamError::CreateStream(err));
}
return Ok(("Log stream updated", StatusCode::OK));
}
}
}

if !static_schema_flag.is_empty() && update_stream == "true" {
return Err(StreamError::Custom {
msg: "Altering the schema of an existing stream is restricted.".to_string(),
status: StatusCode::BAD_REQUEST,
});
}

if !custom_partition.is_empty() {
if let Err(err) = validate_custom_partition(&custom_partition) {
return Err(StreamError::CreateStream(err));
}
if update_stream == "true" {
if let Err(err) =
update_custom_partition_in_stream(stream_name.clone(), &custom_partition).await
{
return Err(StreamError::CreateStream(err));
}
return Ok(("Log stream updated", StatusCode::OK));
}
}

let schema = validate_static_schema(
&body,
&stream_name,
&time_partition,
&custom_partition,
&static_schema_flag,
);
if let Err(err) = schema {
return Err(StreamError::CreateStream(err));
if CONFIG.parseable.mode == Mode::Query {
create_update_stream(&req, &body, &stream_name).await?;
sync_streams_with_ingestors(req, body, &stream_name).await?;
} else {
create_update_stream(&req, &body, &stream_name).await?;
}

create_stream(
stream_name,
&time_partition,
time_partition_in_days,
&custom_partition,
&static_schema_flag,
schema.unwrap(),
false,
)
.await?;

Ok(("Log stream created", StatusCode::OK))
}

Expand Down Expand Up @@ -355,6 +282,96 @@ fn validate_static_schema(
Ok(schema)
}

/// Validate the PUT /logstream request headers and either create a new
/// stream or apply the requested updates to an existing one.
///
/// Behaviour is driven by the `update_stream` header:
/// - create: the stream must not already exist; a static schema (when
///   flagged) is validated before the stream is created.
/// - update (`"true"`): only the time-partition *limit* and the custom
///   partition may be altered; changing the time partition itself or the
///   schema of an existing stream is rejected.
///
/// # Errors
/// - `StreamError::Custom` with `BAD_REQUEST` for duplicate creation or a
///   restricted alteration.
/// - `StreamError::CreateStream` when header validation or the underlying
///   stream operation fails.
async fn create_update_stream(
    req: &HttpRequest,
    body: &Bytes,
    stream_name: &str,
) -> Result<(), StreamError> {
    let (time_partition, time_partition_limit, custom_partition, static_schema_flag, update_stream) =
        fetch_headers_from_put_stream_request(req);

    // Creating a stream that already exists is an error; updating one is fine.
    if metadata::STREAM_INFO.stream_exists(stream_name) && update_stream != "true" {
        return Err(StreamError::Custom {
            msg: format!(
                "Logstream {stream_name} already exists, please create a new log stream with unique name"
            ),
            status: StatusCode::BAD_REQUEST,
        });
    }

    if !time_partition.is_empty() && update_stream == "true" {
        return Err(StreamError::Custom {
            msg: "Altering the time partition of an existing stream is restricted.".to_string(),
            status: StatusCode::BAD_REQUEST,
        });
    }

    let mut time_partition_in_days: &str = "";
    if !time_partition_limit.is_empty() {
        time_partition_in_days = validate_time_partition_limit(&time_partition_limit)
            .map_err(StreamError::CreateStream)?;
        if update_stream == "true" {
            // Updating the partition limit is the whole request — stop here.
            update_time_partition_limit_in_stream(stream_name.to_string(), time_partition_in_days)
                .await
                .map_err(StreamError::CreateStream)?;
            return Ok(());
        }
    }

    if !static_schema_flag.is_empty() && update_stream == "true" {
        return Err(StreamError::Custom {
            msg: "Altering the schema of an existing stream is restricted.".to_string(),
            status: StatusCode::BAD_REQUEST,
        });
    }

    if !custom_partition.is_empty() {
        validate_custom_partition(&custom_partition).map_err(StreamError::CreateStream)?;
        if update_stream == "true" {
            // Updating the custom partition is the whole request — stop here.
            update_custom_partition_in_stream(stream_name.to_string(), &custom_partition)
                .await
                .map_err(StreamError::CreateStream)?;
            return Ok(());
        }
    }

    // For a create, the static schema (when flagged) must parse before we
    // touch storage.
    let schema = validate_static_schema(
        body,
        stream_name,
        &time_partition,
        &custom_partition,
        &static_schema_flag,
    )
    .map_err(StreamError::CreateStream)?;

    create_stream(
        stream_name.to_string(),
        &time_partition,
        time_partition_in_days,
        &custom_partition,
        &static_schema_flag,
        schema,
        false,
    )
    .await?;

    Ok(())
}
pub async fn put_alert(
req: HttpRequest,
body: web::Json<serde_json::Value>,
Expand Down Expand Up @@ -471,7 +488,7 @@ pub async fn get_cache_enabled(req: HttpRequest) -> Result<impl Responder, Strea
_ => {}
}

let cache_enabled = STREAM_INFO.cache_enabled(&stream_name)?;
let cache_enabled = STREAM_INFO.get_cache_enabled(&stream_name)?;
Ok((web::Json(cache_enabled), StatusCode::OK))
}

Expand Down Expand Up @@ -545,7 +562,7 @@ pub async fn put_enable_cache(
.put_stream_manifest(&stream_name, &stream_metadata)
.await?;

STREAM_INFO.set_stream_cache(&stream_name, enable_cache)?;
STREAM_INFO.set_cache_enabled(&stream_name, enable_cache)?;
Ok((
format!("Cache set to {enable_cache} for log stream {stream_name}"),
StatusCode::OK,
Expand Down
Loading

0 comments on commit f8935f3

Please sign in to comment.