From d0973266bddb47a7cc2c72368d31e1af0f2ec523 Mon Sep 17 00:00:00 2001
From: Remi Dettai
Date: Wed, 3 Apr 2024 10:11:49 +0200
Subject: [PATCH] Prune checkpoints in Lambda (#4777)

* Prune old file sources on Lambda
* Refactor the custom source id serde into a separate module
* Improve tests
* Fix some bugs
* Fix filter on existing sources
* Simplify source cleanup rule
* Minor adjustments
* Go back to a drastic pruning strategy
* Use reset_source_checkpoint instead of delete_source
---
 quickwit/Cargo.lock                           |   1 +
 quickwit/quickwit-lambda/Cargo.toml           |   1 +
 .../src/indexer/environment.rs                |   7 +
 .../src/indexer/ingest/helpers.rs             | 139 ++++++++++++------
 .../quickwit-lambda/src/indexer/ingest/mod.rs |  12 +-
 5 files changed, 110 insertions(+), 50 deletions(-)

diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock
index e9f85941f92..6d63cdac3ab 100644
--- a/quickwit/Cargo.lock
+++ b/quickwit/Cargo.lock
@@ -5889,6 +5889,7 @@ dependencies = [
  "anyhow",
  "aws_lambda_events",
  "chitchat",
+ "chrono",
  "flate2",
  "lambda_http",
  "lambda_runtime",
diff --git a/quickwit/quickwit-lambda/Cargo.toml b/quickwit/quickwit-lambda/Cargo.toml
index ab4edbd23f0..68ad0de2683 100644
--- a/quickwit/quickwit-lambda/Cargo.toml
+++ b/quickwit/quickwit-lambda/Cargo.toml
@@ -22,6 +22,7 @@ path = "src/bin/searcher.rs"
 anyhow = { workspace = true }
 aws_lambda_events = "0.15.0"
 chitchat = { workspace = true }
+chrono = { workspace = true }
 flate2 = { workspace = true }
 lambda_http = "0.10.0"
 lambda_runtime = "0.10.0"
diff --git a/quickwit/quickwit-lambda/src/indexer/environment.rs b/quickwit/quickwit-lambda/src/indexer/environment.rs
index 0a7e2f19acd..763ae3a299e 100644
--- a/quickwit/quickwit-lambda/src/indexer/environment.rs
+++ b/quickwit/quickwit-lambda/src/indexer/environment.rs
@@ -38,3 +38,10 @@ pub static DISABLE_MERGE: Lazy<bool> =
 
 pub static DISABLE_JANITOR: Lazy<bool> =
     Lazy::new(|| var("QW_LAMBDA_DISABLE_JANITOR").is_ok_and(|v| v.as_str() == "true"));
+
+pub static MAX_CHECKPOINTS: Lazy<usize> = Lazy::new(|| {
+    var("QW_LAMBDA_MAX_CHECKPOINTS").map_or(100, |v| {
+        v.parse()
+            .expect("QW_LAMBDA_MAX_CHECKPOINTS must be a positive integer")
+    })
+});
diff --git a/quickwit/quickwit-lambda/src/indexer/ingest/helpers.rs b/quickwit/quickwit-lambda/src/indexer/ingest/helpers.rs
index fc9accb616a..55704650a97 100644
--- a/quickwit/quickwit-lambda/src/indexer/ingest/helpers.rs
+++ b/quickwit/quickwit-lambda/src/indexer/ingest/helpers.rs
@@ -25,7 +25,6 @@ use anyhow::{bail, Context};
 use chitchat::transport::ChannelTransport;
 use chitchat::FailureDetectorConfig;
 use quickwit_actors::{ActorHandle, Mailbox, Universe};
-use quickwit_cli::run_index_checklist;
 use quickwit_cluster::{Cluster, ClusterMember};
 use quickwit_common::pubsub::EventBroker;
 use quickwit_common::runtimes::RuntimesConfig;
@@ -34,7 +33,7 @@ use quickwit_config::merge_policy_config::MergePolicyConfig;
 use quickwit_config::service::QuickwitService;
 use quickwit_config::{
     load_index_config_from_user_config, ConfigFormat, IndexConfig, NodeConfig, SourceConfig,
-    SourceInputFormat, SourceParams, TransformConfig, CLI_SOURCE_ID,
+    SourceInputFormat, SourceParams, TransformConfig,
 };
 use quickwit_index_management::IndexService;
 use quickwit_indexing::actors::{
@@ -44,10 +43,13 @@ use quickwit_indexing::models::{DetachIndexingPipeline, DetachMergePipeline, Spa
 use quickwit_indexing::IndexingPipeline;
 use quickwit_ingest::IngesterPool;
 use quickwit_janitor::{start_janitor_service, JanitorService};
-use quickwit_metastore::CreateIndexRequestExt;
+use quickwit_metastore::{
+    CreateIndexRequestExt, CreateIndexResponseExt, IndexMetadata, IndexMetadataResponseExt,
+};
 use quickwit_proto::indexing::CpuCapacity;
 use quickwit_proto::metastore::{
-    CreateIndexRequest, MetastoreError, MetastoreService, MetastoreServiceClient,
+    CreateIndexRequest, IndexMetadataRequest, MetastoreError, MetastoreService,
+    MetastoreServiceClient, ResetSourceCheckpointRequest,
 };
 use quickwit_proto::types::{NodeId, PipelineUid};
 use quickwit_search::SearchJobPlacer;
@@ -56,7 +58,11 @@ use quickwit_telemetry::payload::{QuickwitFeature, QuickwitTelemetryInfo, Teleme
 use tracing::{debug, info, instrument};
 
 use crate::environment::INDEX_ID;
-use crate::indexer::environment::{DISABLE_JANITOR, DISABLE_MERGE, INDEX_CONFIG_URI};
+use crate::indexer::environment::{
+    DISABLE_JANITOR, DISABLE_MERGE, INDEX_CONFIG_URI, MAX_CHECKPOINTS,
+};
+
+const LAMBDA_SOURCE_ID: &str = "_ingest-lambda-source";
 
 /// The indexing service needs to update its cluster chitchat state so that the control plane is
 /// aware of the running tasks. We thus create a fake cluster to instantiate the indexing service
@@ -131,66 +137,71 @@ pub(super) async fn send_telemetry() {
     quickwit_telemetry::send_telemetry_event(TelemetryEvent::RunCommand).await;
 }
 
-pub(super) fn configure_source(
+/// Convert the incoming file path to a source config
+pub(super) async fn configure_source(
     input_path: PathBuf,
     input_format: SourceInputFormat,
     vrl_script: Option<String>,
-) -> SourceConfig {
-    let source_params = SourceParams::file(input_path);
+) -> anyhow::Result<SourceConfig> {
     let transform_config = vrl_script.map(|vrl_script| TransformConfig::new(vrl_script, None));
-    SourceConfig {
-        source_id: CLI_SOURCE_ID.to_string(),
+    let source_params = SourceParams::file(input_path.clone());
+    Ok(SourceConfig {
+        source_id: LAMBDA_SOURCE_ID.to_owned(),
         num_pipelines: NonZeroUsize::new(1).expect("1 is always non-zero."),
         enabled: true,
         source_params,
         transform_config,
        input_format,
-    }
+    })
 }
 
 /// Check if the index exists, creating or overwriting it if necessary
 pub(super) async fn init_index_if_necessary(
     metastore: &mut MetastoreServiceClient,
     storage_resolver: &StorageResolver,
-    source_config: &SourceConfig,
     default_index_root_uri: &Uri,
     overwrite: bool,
-) -> anyhow::Result<()> {
-    let checklist_result =
-        run_index_checklist(metastore, storage_resolver, &INDEX_ID, Some(source_config)).await;
-    if let Err(e) = checklist_result {
-        let is_not_found = e
-            .downcast_ref()
-            .is_some_and(|meta_error| matches!(meta_error, MetastoreError::NotFound(_)));
-        if !is_not_found {
-            bail!(e);
+) -> anyhow::Result<IndexMetadata> {
+    let metadata_result = metastore
+        .index_metadata(IndexMetadataRequest::for_index_id(INDEX_ID.clone()))
+        .await;
+    let metadata = match metadata_result {
+        Ok(_) if overwrite => {
+            info!(
+                index_id = *INDEX_ID,
+                "Overwrite enabled, clearing existing index",
+            );
+            let mut index_service = IndexService::new(metastore.clone(), storage_resolver.clone());
+            index_service.clear_index(&INDEX_ID).await?;
+            metastore
+                .index_metadata(IndexMetadataRequest::for_index_id(INDEX_ID.clone()))
+                .await?
+                .deserialize_index_metadata()?
         }
-        info!(
-            index_id = *INDEX_ID,
-            index_config_uri = *INDEX_CONFIG_URI,
-            "Index not found, creating it"
-        );
-        let index_config = load_index_config(storage_resolver, default_index_root_uri).await?;
-        if index_config.index_id != *INDEX_ID {
-            bail!(
-                "Expected index ID was {} but config file had {}",
-                *INDEX_ID,
-                index_config.index_id,
+        Ok(metadata_resp) => metadata_resp.deserialize_index_metadata()?,
+        Err(MetastoreError::NotFound(_)) => {
+            info!(
+                index_id = *INDEX_ID,
+                index_config_uri = *INDEX_CONFIG_URI,
+                "Index not found, creating it"
             );
+            let index_config = load_index_config(storage_resolver, default_index_root_uri).await?;
+            if index_config.index_id != *INDEX_ID {
+                bail!(
+                    "Expected index ID was {} but config file had {}",
+                    *INDEX_ID,
+                    index_config.index_id,
+                );
+            }
+            let create_resp = metastore
+                .create_index(CreateIndexRequest::try_from_index_config(&index_config)?)
+                .await?;
+            info!("index created");
+            create_resp.deserialize_index_metadata()?
         }
-        metastore
-            .create_index(CreateIndexRequest::try_from_index_config(&index_config)?)
-            .await?;
-        info!("index created");
-    } else if overwrite {
-        info!(
-            index_id = *INDEX_ID,
-            "Overwrite enabled, clearing existing index",
-        );
-        let mut index_service = IndexService::new(metastore.clone(), storage_resolver.clone());
-        index_service.clear_index(&INDEX_ID).await?;
-    }
-    Ok(())
+        Err(e) => bail!(e),
+    };
+    Ok(metadata)
 }
 
 pub(super) async fn spawn_services(
@@ -249,6 +260,7 @@ pub(super) async fn spawn_services(
     Ok((indexing_service_handle, janitor_service_opt))
 }
 
+/// Spawn an indexing pipeline and detach handles to it and its merge pipeline
 pub(super) async fn spawn_pipelines(
     indexing_server_mailbox: &Mailbox<IndexingService>,
     source_config: SourceConfig,
@@ -271,6 +283,43 @@ pub(super) async fn spawn_pipelines(
     Ok((indexing_pipeline_handle, merge_pipeline_handle))
 }
 
+/// Prune old Lambda file checkpoints if there are too many
+///
+/// Without pruning, checkpoints accumulate indefinitely. This is particularly
+/// problematic when indexing a lot of small files, as the metastore will grow
+/// large even for a small index.
+///
+/// The current implementation just deletes all checkpoints if there are more
+/// than QW_LAMBDA_MAX_CHECKPOINTS. When this purging is performed, the Lambda
+/// indexer might ingest the same file again if it receives a duplicate
+/// notification.
+pub(super) async fn prune_lambda_source(
+    metastore: &mut MetastoreServiceClient,
+    index_metadata: IndexMetadata,
+) -> anyhow::Result<()> {
+    let lambda_checkpoint_opt = index_metadata
+        .checkpoint
+        .source_checkpoint(LAMBDA_SOURCE_ID);
+
+    if let Some(lambda_checkpoint) = lambda_checkpoint_opt {
+        if lambda_checkpoint.num_partitions() > *MAX_CHECKPOINTS {
+            info!(
+                partitions = lambda_checkpoint.num_partitions(),
+                "prune Lambda checkpoints"
+            );
+            metastore
+                .reset_source_checkpoint(ResetSourceCheckpointRequest {
+                    index_uid: Some(index_metadata.index_uid.clone()),
+                    source_id: LAMBDA_SOURCE_ID.to_owned(),
+                })
+                .await?;
+        }
+    }
+
+    Ok(())
+}
+
+/// Observe the merge pipeline until there are no more ongoing merges
 pub(super) async fn wait_for_merges(
     merge_pipeline_handle: ActorHandle<MergePipeline>,
 ) -> anyhow::Result<()> {
diff --git a/quickwit/quickwit-lambda/src/indexer/ingest/mod.rs b/quickwit/quickwit-lambda/src/indexer/ingest/mod.rs
index 80bdd9a3d5c..31c98ab73f9 100644
--- a/quickwit/quickwit-lambda/src/indexer/ingest/mod.rs
+++ b/quickwit/quickwit-lambda/src/indexer/ingest/mod.rs
@@ -38,7 +38,7 @@ use quickwit_indexing::models::IndexingStatistics;
 use tracing::{debug, info};
 
 use crate::indexer::environment::{CONFIGURATION_TEMPLATE, DISABLE_JANITOR};
-use crate::indexer::ingest::helpers::wait_for_merges;
+use crate::indexer::ingest::helpers::{prune_lambda_source, wait_for_merges};
 use crate::utils::load_node_config;
 
 #[derive(Debug, Eq, PartialEq)]
@@ -58,17 +58,17 @@ pub async fn ingest(args: IngestArgs) -> anyhow::Result<IndexingStatistics> {
     let (config, storage_resolver, mut metastore) =
         load_node_config(CONFIGURATION_TEMPLATE).await?;
 
-    let source_config = configure_source(args.input_path, args.input_format, args.vrl_script);
-
-    init_index_if_necessary(
+    let index_metadata = init_index_if_necessary(
         &mut metastore,
         &storage_resolver,
-        &source_config,
         &config.default_index_root_uri,
         args.overwrite,
     )
     .await?;
 
+    let source_config =
+        configure_source(args.input_path, args.input_format, args.vrl_script).await?;
+
     let mut services = vec![QuickwitService::Indexer];
     if !*DISABLE_JANITOR {
         services.push(QuickwitService::Janitor);
@@ -92,6 +92,8 @@ pub async fn ingest(args: IngestArgs) -> anyhow::Result<IndexingStatistics> {
     let (indexing_pipeline_handle, merge_pipeline_handle) =
         spawn_pipelines(indexing_service_handle.mailbox(), source_config).await?;
 
+    prune_lambda_source(&mut metastore, index_metadata).await?;
+
     debug!("wait for indexing to complete");
     let statistics = start_statistics_reporting_loop(indexing_pipeline_handle, false).await?;
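
Illustrative sketch (not part of the patch): the pruning rule introduced above
amounts to "reset the whole Lambda source checkpoint once more than
QW_LAMBDA_MAX_CHECKPOINTS file partitions are tracked". The standalone Rust
snippet below restates that rule in isolation; the helper names
max_checkpoints_from_env and should_reset_checkpoint are hypothetical, and only
the environment variable and its default of 100 come from the patch itself.

// Standalone sketch of the pruning rule; helper names are illustrative only.
use std::env;

// Mirrors the QW_LAMBDA_MAX_CHECKPOINTS parsing added in environment.rs:
// an unset variable falls back to 100, an unparsable value panics.
fn max_checkpoints_from_env() -> usize {
    env::var("QW_LAMBDA_MAX_CHECKPOINTS").map_or(100, |v| {
        v.parse()
            .expect("QW_LAMBDA_MAX_CHECKPOINTS must be a positive integer")
    })
}

// The "drastic" strategy described in the prune_lambda_source doc comment:
// once the number of tracked file partitions exceeds the limit, the whole
// source checkpoint is reset rather than trimmed, at the cost of possibly
// re-ingesting a file if a duplicate notification arrives afterwards.
fn should_reset_checkpoint(num_partitions: usize, max_checkpoints: usize) -> bool {
    num_partitions > max_checkpoints
}

fn main() {
    let max_checkpoints = max_checkpoints_from_env();
    // With the default limit of 100, a source tracking 150 files gets reset.
    println!(
        "limit={max_checkpoints}, 150 partitions -> reset: {}",
        should_reset_checkpoint(150, max_checkpoints)
    );
}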