From 24dc8d5eab5b700a362e5e31f3fcd58e8c395ee0 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Wed, 3 Apr 2024 11:01:05 +0200 Subject: [PATCH] Lambda search endpoints using a warp adapter (#4805) * Lambda search endpoints using a warp adapter * Fix request id span * Make logging 30% more compact * Minor improvements * Propagete poll_ready to warp svc * Upgrade lambda_runtime --- distribution/lambda/Makefile | 12 +- distribution/lambda/cdk/cli.py | 107 +++++++++-- .../cdk/stacks/examples/mock_data_stack.py | 15 +- distribution/lambda/resources/hdfs-logs.yaml | 2 +- distribution/lambda/resources/mock-sales.yaml | 2 +- quickwit/Cargo.lock | 113 +++++++++-- quickwit/quickwit-lambda/Cargo.toml | 9 +- quickwit/quickwit-lambda/src/bin/searcher.rs | 10 +- quickwit/quickwit-lambda/src/environment.rs | 9 +- .../src/indexer/environment.rs | 2 +- quickwit/quickwit-lambda/src/logger.rs | 80 +++++--- quickwit/quickwit-lambda/src/searcher/api.rs | 144 ++++++++++++++ .../src/searcher/environment.rs | 9 +- .../quickwit-lambda/src/searcher/handler.rs | 85 --------- quickwit/quickwit-lambda/src/searcher/mod.rs | 6 +- .../quickwit-lambda/src/searcher/search.rs | 128 ------------- .../src/searcher/warp_lambda.rs | 175 ++++++++++++++++++ .../src/elasticsearch_api/mod.rs | 15 +- quickwit/quickwit-serve/src/lib.rs | 12 ++ 19 files changed, 613 insertions(+), 322 deletions(-) create mode 100644 quickwit/quickwit-lambda/src/searcher/api.rs delete mode 100644 quickwit/quickwit-lambda/src/searcher/handler.rs delete mode 100644 quickwit/quickwit-lambda/src/searcher/search.rs create mode 100644 quickwit/quickwit-lambda/src/searcher/warp_lambda.rs diff --git a/distribution/lambda/Makefile b/distribution/lambda/Makefile index 3ccd3633ea9..5b121197dd9 100644 --- a/distribution/lambda/Makefile +++ b/distribution/lambda/Makefile @@ -111,7 +111,7 @@ bench-index: done bench-search-term: - export QW_LAMBDA_LOG_SPAN_BOUNDARIES=true + export QW_LAMBDA_ENABLE_VERBOSE_JSON_LOGS=true mem_sizes=( 1024 2048 4096 8192 ) for mem_size in "$${mem_sizes[@]}" do @@ -121,7 +121,7 @@ bench-search-term: done bench-search-histogram: - export QW_LAMBDA_LOG_SPAN_BOUNDARIES=true + export QW_LAMBDA_ENABLE_VERBOSE_JSON_LOGS=true mem_sizes=( 1024 2048 4096 8192 ) for mem_size in "$${mem_sizes[@]}" do @@ -133,15 +133,13 @@ bench-search-histogram: bench-search: for run in {1..30} do - export QW_LAMBDA_DISABLE_SEARCH_CACHE=true - $(MAKE) bench-search-term - $(MAKE) bench-search-histogram - export QW_LAMBDA_DISABLE_SEARCH_CACHE=false export QW_LAMBDA_PARTIAL_REQUEST_CACHE_CAPACITY=0 $(MAKE) bench-search-term $(MAKE) bench-search-histogram - export QW_LAMBDA_DISABLE_SEARCH_CACHE=false export QW_LAMBDA_PARTIAL_REQUEST_CACHE_CAPACITY=64MB $(MAKE) bench-search-term $(MAKE) bench-search-histogram done + +test-mock-data-endpoints: + python -c 'from cdk import cli; cli.test_mock_data_endpoints()' diff --git a/distribution/lambda/cdk/cli.py b/distribution/lambda/cdk/cli.py index 0cb9d522770..ab9460c91ba 100644 --- a/distribution/lambda/cdk/cli.py +++ b/distribution/lambda/cdk/cli.py @@ -14,6 +14,7 @@ from dataclasses import dataclass from functools import cache from io import BytesIO +from urllib.parse import urlparse import boto3 import botocore.config @@ -29,6 +30,8 @@ retries={"max_attempts": 0}, read_timeout=60 * 15 ) session = boto3.Session(region_name=region) +mock_sales_index_id = "mock-sales" +hdfs_logs_index_id = "hdfs-logs" @cache @@ -39,19 +42,27 @@ def _get_cloudformation_output_value(stack_name: str, export_name: str) -> str: print(f"Stack {stack_name} not identified uniquely, found {stacks}") outputs = stacks[0]["Outputs"] for output in outputs: - if output["ExportName"] == export_name: + if "ExportName" in output and output["ExportName"] == export_name: return output["OutputValue"] else: print(f"Export name {export_name} not found in stack {stack_name}") exit(1) +def _decompress_if_gzip(payload: bytes, headers: dict) -> str: + if headers.get("content-encoding", "") == "gzip": + return gzip.GzipFile(mode="rb", fileobj=BytesIO(payload)).read().decode() + else: + return payload.decode() + + @dataclass class LambdaResult: function_error: str log_tail: str payload: str raw_size_bytes: int + status_code: int @staticmethod def from_lambda_response(lambda_resp: dict) -> "LambdaResult": @@ -61,28 +72,28 @@ def from_lambda_response(lambda_resp: dict) -> "LambdaResult": log_tail=base64.b64decode(lambda_resp["LogResult"]).decode(), payload=payload, raw_size_bytes=len(payload), + status_code=0, ) @staticmethod def from_lambda_gateway_response(lambda_resp: dict) -> "LambdaResult": gw_str = lambda_resp["Payload"].read().decode() gw_obj = json.loads(gw_str) - payload = gw_obj["body"] - if gw_obj["isBase64Encoded"]: + if "body" in gw_obj: + payload = gw_obj["body"] + status_code = gw_obj["statusCode"] + else: + payload = gw_str + status_code = -1 + if gw_obj.get("isBase64Encoded", False): dec_payload = base64.b64decode(payload) - if gw_obj.get("headers", {}).get("content-encoding", "") == "gzip": - payload = ( - gzip.GzipFile(mode="rb", fileobj=BytesIO(dec_payload)) - .read() - .decode() - ) - else: - payload = dec_payload.decode() + payload = _decompress_if_gzip(dec_payload, gw_obj.get("headers", {})) return LambdaResult( function_error=lambda_resp.get("FunctionError", ""), log_tail=base64.b64decode(lambda_resp["LogResult"]).decode(), payload=payload, raw_size_bytes=len(gw_str), + status_code=status_code, ) def extract_report(self) -> str: @@ -108,12 +119,13 @@ def _format_lambda_output( if lambda_result.function_error != "": print("\n## FUNCTION ERROR:") print(lambda_result.function_error) - print("\n## LOG TAIL:") - print(lambda_result.log_tail) print("\n## RAW RESPONSE SIZE (BYTES):") - ratio = lambda_result.raw_size_bytes / len(lambda_result.payload) - print(f"{lambda_result.raw_size_bytes} ({ratio:.1f}x the final payload)") - print("\n## RESPONSE:") + if len(lambda_result.payload) == 0: + ratio = "empty payload" + else: + ratio = f"{(lambda_result.raw_size_bytes / len(lambda_result.payload)):.1f}x the final payload" + print(f"{lambda_result.raw_size_bytes} ({ratio})") + print(f"\n## RESPONSE [{lambda_result.status_code}]:") payload_size = len(lambda_result.payload) print(lambda_result.payload[:max_resp_size]) if payload_size > max_resp_size: @@ -184,6 +196,7 @@ def invoke_hdfs_indexer() -> LambdaResult: def _invoke_searcher( stack_name: str, + index_id: str, function_export_name: str, payload: str, download_logs: bool, @@ -198,9 +211,14 @@ def _invoke_searcher( LogType="Tail", Payload=json.dumps( { - "headers": {"Content-Type": "application/json"}, + "resource": f"/api/v1/{index_id}/search", + "path": f"/api/v1/{index_id}/search", + "httpMethod": "POST", + "headers": { + "Content-Type": "application/json", + }, "requestContext": { - "http": {"method": "POST"}, + "httpMethod": "POST", }, "body": payload, "isBase64Encoded": False, @@ -218,6 +236,7 @@ def _invoke_searcher( def invoke_hdfs_searcher(payload: str, download_logs: bool = True) -> LambdaResult: return _invoke_searcher( app.HDFS_STACK_NAME, + hdfs_logs_index_id, hdfs_stack.SEARCHER_FUNCTION_NAME_EXPORT_NAME, payload, download_logs, @@ -249,7 +268,6 @@ def get_logs( last_event_id = event["eventId"] yield event["message"] if event["message"].startswith("REPORT"): - print(event["message"]) lower_time_bound = int(event["timestamp"]) last_event_id = "REPORT" break @@ -277,6 +295,7 @@ def download_logs_to_file(request_id: str, function_name: str, invoke_start: flo int(invoke_start * 1000), ): f.write(log) + print(f"Logs written to lambda.{request_id}.log") except Exception as e: print(f"Failed to download logs: {e}") @@ -284,6 +303,7 @@ def download_logs_to_file(request_id: str, function_name: str, invoke_start: flo def invoke_mock_data_searcher(): _invoke_searcher( app.MOCK_DATA_STACK_NAME, + mock_sales_index_id, mock_data_stack.SEARCHER_FUNCTION_NAME_EXPORT_NAME, """{"query": "id:1", "sort_by": "ts", "max_hits": 10}""", True, @@ -321,7 +341,9 @@ def print_mock_data_metastore(): app.MOCK_DATA_STACK_NAME, mock_data_stack.INDEX_STORE_BUCKET_NAME_EXPORT_NAME ) s3 = session.client("s3") - response = s3.get_object(Bucket=bucket_name, Key="index/mock-sales/metastore.json") + response = s3.get_object( + Bucket=bucket_name, Key=f"index/{mock_sales_index_id}/metastore.json" + ) print(response["Body"].read().decode()) @@ -387,3 +409,48 @@ def benchmark_hdfs_search(payload: str): with open(f"lambda-bench.log", "a+") as f: f.write(json.dumps(bench_result)) f.write("\n") + + +def test_mock_data_endpoints(): + apigw_url = _get_cloudformation_output_value( + app.MOCK_DATA_STACK_NAME, mock_data_stack.API_GATEWAY_EXPORT_NAME + ) + + def req(method, path, body=None, expected_status=200): + conn = http.client.HTTPSConnection(urlparse(apigw_url).netloc) + conn.request( + method, + path, + body, + headers={"x-api-key": os.getenv("SEARCHER_API_KEY")}, + ) + response = conn.getresponse() + print(f"{method} {path}") + headers = {k: v for (k, v) in response.getheaders()} + body = _decompress_if_gzip(response.read(), headers) + if response.status != expected_status: + print(f"[{response.status}] => {body}") + exit(1) + else: + print(f"[{response.status}] => {json.dumps(json.loads(body))[0:100]}") + + req("GET", f"/api/v1/{mock_sales_index_id}/search?query=animal") + req( + "POST", + f"/api/v1/{mock_sales_index_id}/search", + '{"query":"quantity:>5", "max_hits": 10}', + ) + req("GET", f"/api/v1/_elastic/{mock_sales_index_id}/_search?q=animal") + req( + "POST", + f"/api/v1/_elastic/{mock_sales_index_id}/_search", + '{"query":{"bool":{"must":[{"range":{"quantity":{"gt":5}}}]}},"size":10}', + ) + req("GET", f"/api/v1/_elastic/{mock_sales_index_id}/_field_caps?fields=quantity") + # expected errors + req( + "GET", + f"/api/v1/_elastic/{mock_sales_index_id}/_search?query=animal", + expected_status=400, + ) + req("GET", f"/api/v1/_elastic/_search?q=animal", expected_status=501) diff --git a/distribution/lambda/cdk/stacks/examples/mock_data_stack.py b/distribution/lambda/cdk/stacks/examples/mock_data_stack.py index 4822e69723a..2ddfc9350cd 100644 --- a/distribution/lambda/cdk/stacks/examples/mock_data_stack.py +++ b/distribution/lambda/cdk/stacks/examples/mock_data_stack.py @@ -17,6 +17,7 @@ SEARCHER_FUNCTION_NAME_EXPORT_NAME = "mock-data-searcher-function-name" INDEX_STORE_BUCKET_NAME_EXPORT_NAME = "mock-data-index-store-bucket-name" SOURCE_BUCKET_NAME_EXPORT_NAME = "mock-data-source-bucket-name" +API_GATEWAY_EXPORT_NAME = "mock-data-api-gateway-url" class Source(Construct): @@ -98,11 +99,12 @@ def __init__( searcher_integration = aws_apigateway.LambdaIntegration( qw_svc.searcher.lambda_function ) - search_resource = ( - api.root.add_resource("v1").add_resource(index_id).add_resource("search") - ) + search_resource = api.root.add_resource("v1").add_resource("{proxy+}") search_resource.add_method("POST", searcher_integration, api_key_required=True) - api_deployment = aws_apigateway.Deployment(self, "api-deployment", api=api) + search_resource.add_method("GET", searcher_integration, api_key_required=True) + # Change the deployment id (api-deployment-x) each time the API changes, + # otherwise changes are not deployed. + api_deployment = aws_apigateway.Deployment(self, "api-deployment-1", api=api) api_stage = aws_apigateway.Stage( self, "api", deployment=api_deployment, stage_name="api" ) @@ -122,7 +124,10 @@ def __init__( api.deployment_stage = api_stage aws_cdk.CfnOutput( - self, "search-api-url", value=api.url.rstrip("/") + search_resource.path + self, + "search-api-url", + value=api.url.rstrip("/") + search_resource.path, + export_name=API_GATEWAY_EXPORT_NAME, ) diff --git a/distribution/lambda/resources/hdfs-logs.yaml b/distribution/lambda/resources/hdfs-logs.yaml index ceccba394b5..3538359a0f5 100644 --- a/distribution/lambda/resources/hdfs-logs.yaml +++ b/distribution/lambda/resources/hdfs-logs.yaml @@ -2,7 +2,7 @@ # Index config file for hdfs-logs dataset. # -version: 0.6 +version: 0.7 index_id: hdfs-logs diff --git a/distribution/lambda/resources/mock-sales.yaml b/distribution/lambda/resources/mock-sales.yaml index 46d7405c055..7831039745c 100644 --- a/distribution/lambda/resources/mock-sales.yaml +++ b/distribution/lambda/resources/mock-sales.yaml @@ -2,7 +2,7 @@ # Index config file for mock-sales data generator. # -version: 0.6 +version: 0.7 index_id: mock-sales diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 6d63cdac3ab..5458641e695 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -258,6 +258,19 @@ dependencies = [ "futures-core", ] +[[package]] +name = "async-compression" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "942c7cd7ae39e91bde4820d74132e9862e62c2f386c3aa90ccf55949f5bad63a" +dependencies = [ + "flate2", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", +] + [[package]] name = "async-compression" version = "0.4.6" @@ -735,6 +748,22 @@ dependencies = [ "tracing", ] +[[package]] +name = "aws_lambda_events" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03611508dd1e514e311caec235b581c99a4cb66fa1771bd502819eed69894f12" +dependencies = [ + "base64 0.21.7", + "bytes", + "http 0.2.12", + "http-body 0.4.6", + "http-serde 1.1.3", + "query_map", + "serde", + "serde_json", +] + [[package]] name = "aws_lambda_events" version = "0.15.0" @@ -909,6 +938,12 @@ version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" +[[package]] +name = "base64" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ea22880d78093b0cbe17c89f64a7d457941e65759157ec6cb31a31d652b05e5" + [[package]] name = "base64" version = "0.21.7" @@ -3318,24 +3353,21 @@ dependencies = [ [[package]] name = "lambda_http" -version = "0.10.0" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef8fafd7a4ce0bc6093cf1bed3dcdfc1239c27df1e79e3f2154f4d3299d4f60e" +checksum = "2505c4a24f5a8d8ac66a87691215ec1f79736c5bc6e62bb921788dca9753f650" dependencies = [ - "aws_lambda_events", + "aws_lambda_events 0.12.1", "base64 0.21.7", "bytes", "encoding_rs", "futures", - "futures-util", - "http 1.1.0", - "http-body 1.0.0", - "http-body-util", - "hyper 1.2.0", - "lambda_runtime", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.28", + "lambda_runtime 0.8.3", "mime", "percent-encoding", - "pin-project-lite", "serde", "serde_json", "serde_urlencoded", @@ -3345,9 +3377,33 @@ dependencies = [ [[package]] name = "lambda_runtime" -version = "0.10.0" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "deca8f65d7ce9a8bfddebb49d7d91b22e788a59ca0c5190f26794ab80ed7a702" +dependencies = [ + "async-stream", + "base64 0.20.0", + "bytes", + "futures", + "http 0.2.12", + "http-body 0.4.6", + "http-serde 1.1.3", + "hyper 0.14.28", + "lambda_runtime_api_client 0.8.0", + "serde", + "serde_json", + "serde_path_to_error", + "tokio", + "tokio-stream", + "tower", + "tracing", +] + +[[package]] +name = "lambda_runtime" +version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc2904c10fbeaf07aa317fc96a0e28e89c80ed12f7949ed06afd7869b21fef32" +checksum = "276c835f2217fac810a97f2ed8eabfe9be71afe4f3ffd8671b05cb528e95ff8a" dependencies = [ "async-stream", "base64 0.21.7", @@ -3359,21 +3415,35 @@ dependencies = [ "http-serde 2.0.0", "hyper 1.2.0", "hyper-util", - "lambda_runtime_api_client", + "lambda_runtime_api_client 0.11.0", + "pin-project", "serde", "serde_json", "serde_path_to_error", "tokio", "tokio-stream", "tower", + "tower-layer", "tracing", ] [[package]] name = "lambda_runtime_api_client" -version = "0.10.0" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1364cd67281721d2a9a4444ba555cf4d74a195e647061fa4ccac46e6f5c3b0ae" +checksum = "690c5ae01f3acac8c9c3348b556fc443054e9b7f1deaf53e9ebab716282bf0ed" +dependencies = [ + "http 0.2.12", + "hyper 0.14.28", + "tokio", + "tower-service", +] + +[[package]] +name = "lambda_runtime_api_client" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "722b02764422524d3f49a934b570f7c567f811eda1f9c4bdebebcfae1bad4f23" dependencies = [ "bytes", "futures-channel", @@ -5704,7 +5774,7 @@ version = "0.8.0" dependencies = [ "anyhow", "arc-swap", - "async-compression", + "async-compression 0.4.6", "async-trait", "aws-sdk-kinesis", "bytes", @@ -5887,12 +5957,14 @@ name = "quickwit-lambda" version = "0.8.0" dependencies = [ "anyhow", - "aws_lambda_events", + "aws_lambda_events 0.15.0", "chitchat", "chrono", "flate2", + "http 0.2.12", "lambda_http", - "lambda_runtime", + "lambda_runtime 0.11.1", + "mime_guess", "once_cell", "opentelemetry", "opentelemetry-otlp", @@ -5912,6 +5984,7 @@ dependencies = [ "quickwit-storage", "quickwit-telemetry", "rand 0.8.5", + "reqwest", "serde", "serde_json", "time", @@ -5919,6 +5992,7 @@ dependencies = [ "tracing", "tracing-opentelemetry", "tracing-subscriber", + "warp", ] [[package]] @@ -8360,7 +8434,7 @@ version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140" dependencies = [ - "async-compression", + "async-compression 0.4.6", "bitflags 2.5.0", "bytes", "futures-core", @@ -8928,6 +9002,7 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1e92e22e03ff1230c03a1a8ee37d2f89cd489e2e541b7550d6afad96faed169" dependencies = [ + "async-compression 0.3.15", "bytes", "futures-channel", "futures-util", diff --git a/quickwit/quickwit-lambda/Cargo.toml b/quickwit/quickwit-lambda/Cargo.toml index 68ad0de2683..abb15cbcca0 100644 --- a/quickwit/quickwit-lambda/Cargo.toml +++ b/quickwit/quickwit-lambda/Cargo.toml @@ -24,8 +24,10 @@ aws_lambda_events = "0.15.0" chitchat = { workspace = true } chrono = { workspace = true } flate2 = { workspace = true } -lambda_http = "0.10.0" -lambda_runtime = "0.10.0" +http = { workspace = true } +lambda_http = "0.8.0" +lambda_runtime = "0.11.1" +mime_guess = { workspace = true } once_cell = { workspace = true } opentelemetry = { workspace = true } opentelemetry-otlp = { workspace = true, features = [ @@ -34,6 +36,7 @@ opentelemetry-otlp = { workspace = true, features = [ "http-proto", ] } rand = { workspace = true } +reqwest = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } time = { workspace = true } @@ -41,6 +44,8 @@ tokio = { workspace = true } tracing = { workspace = true } tracing-opentelemetry = { workspace = true } tracing-subscriber = { workspace = true, features = ["json"] } +warp = { workspace = true , features = ["compression-gzip"] } + quickwit-actors = { workspace = true } quickwit-cli = { workspace = true } diff --git a/quickwit/quickwit-lambda/src/bin/searcher.rs b/quickwit/quickwit-lambda/src/bin/searcher.rs index 564ea4e6653..af06c9e0698 100644 --- a/quickwit/quickwit-lambda/src/bin/searcher.rs +++ b/quickwit/quickwit-lambda/src/bin/searcher.rs @@ -17,13 +17,15 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use lambda_http::{run, service_fn}; use quickwit_lambda::logger; -use quickwit_lambda::searcher::handler; +use quickwit_lambda::searcher::{setup_searcher_api, warp_lambda}; #[tokio::main] async fn main() -> anyhow::Result<()> { logger::setup_lambda_tracer(tracing::Level::INFO)?; - let func = service_fn(handler); - run(func).await.map_err(|e| anyhow::anyhow!(e)) + let routes = setup_searcher_api().await?; + let warp_service = warp::service(routes); + warp_lambda::run(warp_service) + .await + .map_err(|e| anyhow::anyhow!(e)) } diff --git a/quickwit/quickwit-lambda/src/environment.rs b/quickwit/quickwit-lambda/src/environment.rs index 57f339d351d..ea916eacde4 100644 --- a/quickwit/quickwit-lambda/src/environment.rs +++ b/quickwit/quickwit-lambda/src/environment.rs @@ -24,10 +24,11 @@ use once_cell::sync::Lazy; pub static INDEX_ID: Lazy = Lazy::new(|| var("QW_LAMBDA_INDEX_ID").expect("QW_LAMBDA_INDEX_ID must be set")); -/// Configure the fmt tracing subscriber to log span boundaries. This is very verbose and is -/// only used to generate advanced KPIs from Lambda runs (e.g for blogpost benchmarks) -pub static LOG_SPAN_BOUNDARIES: Lazy = - Lazy::new(|| var("QW_LAMBDA_LOG_SPAN_BOUNDARIES").is_ok_and(|v| v.as_str() == "true")); +/// Configure the fmt tracing subscriber to log as json and include span +/// boundaries. This is very verbose and is only used to generate advanced KPIs +/// from Lambda runs (e.g for blog post benchmarks) +pub static ENABLE_VERBOSE_JSON_LOGS: Lazy = + Lazy::new(|| var("QW_LAMBDA_ENABLE_VERBOSE_JSON_LOGS").is_ok_and(|v| v.as_str() == "true")); pub static OPENTELEMETRY_URL: Lazy> = Lazy::new(|| var("QW_LAMBDA_OPENTELEMETRY_URL").ok()); diff --git a/quickwit/quickwit-lambda/src/indexer/environment.rs b/quickwit/quickwit-lambda/src/indexer/environment.rs index 763ae3a299e..48a419ee72c 100644 --- a/quickwit/quickwit-lambda/src/indexer/environment.rs +++ b/quickwit/quickwit-lambda/src/indexer/environment.rs @@ -21,7 +21,7 @@ use std::env::var; use once_cell::sync::Lazy; -pub const CONFIGURATION_TEMPLATE: &str = "version: 0.6 +pub const CONFIGURATION_TEMPLATE: &str = "version: 0.7 node_id: lambda-indexer cluster_id: lambda-ephemeral metastore_uri: s3://${QW_LAMBDA_METASTORE_BUCKET}/index diff --git a/quickwit/quickwit-lambda/src/logger.rs b/quickwit/quickwit-lambda/src/logger.rs index 632c0abc817..11e4d32ffd4 100644 --- a/quickwit/quickwit-lambda/src/logger.rs +++ b/quickwit/quickwit-lambda/src/logger.rs @@ -32,46 +32,76 @@ use tracing_subscriber::prelude::*; use tracing_subscriber::registry::LookupSpan; use tracing_subscriber::{EnvFilter, Layer}; -use crate::environment::{LOG_SPAN_BOUNDARIES, OPENTELEMETRY_AUTHORIZATION, OPENTELEMETRY_URL}; +use crate::environment::{ + ENABLE_VERBOSE_JSON_LOGS, OPENTELEMETRY_AUTHORIZATION, OPENTELEMETRY_URL, +}; static TRACER_PROVIDER: OnceCell> = OnceCell::new(); pub(crate) const RUNTIME_CONTEXT_SPAN: &str = "runtime_context"; -fn fmt_layer(level: Level) -> impl Layer -where - S: for<'a> LookupSpan<'a>, - S: tracing::Subscriber, -{ +fn fmt_env_filter(level: Level) -> EnvFilter { let default_filter = format!("quickwit={level}") .parse() .expect("Invalid default filter"); - let env_filter = EnvFilter::builder() + EnvFilter::builder() .with_default_directive(default_filter) - .from_env_lossy(); + .from_env_lossy() +} + +fn fmt_time_format() -> UtcTime>> { + // We do not rely on the Rfc3339 implementation, because it has a nanosecond precision. + // See discussion here: https://github.com/time-rs/time/discussions/418 + UtcTime::new( + time::format_description::parse( + "[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z", + ) + .expect("Time format invalid."), + ) +} + +fn compact_fmt_layer(level: Level) -> impl Layer +where + S: for<'a> LookupSpan<'a>, + S: tracing::Subscriber, +{ let event_format = tracing_subscriber::fmt::format() .with_target(true) - .with_timer( - // We do not rely on the Rfc3339 implementation, because it has a nanosecond precision. - // See discussion here: https://github.com/time-rs/time/discussions/418 - UtcTime::new( - time::format_description::parse( - "[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z", - ) - .expect("Time format invalid."), - ), - ) + .with_timer(fmt_time_format()) + .compact(); + + tracing_subscriber::fmt::layer::() + .event_format(event_format) + .with_ansi(false) + .with_filter(fmt_env_filter(level)) +} + +fn json_fmt_layer(level: Level) -> impl Layer +where + S: for<'a> LookupSpan<'a>, + S: tracing::Subscriber, +{ + let event_format = tracing_subscriber::fmt::format() + .with_target(true) + .with_timer(fmt_time_format()) .json(); - let fmt_span = if *LOG_SPAN_BOUNDARIES { - FmtSpan::NEW | FmtSpan::CLOSE - } else { - FmtSpan::NONE - }; tracing_subscriber::fmt::layer::() - .with_span_events(fmt_span) + .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE) .event_format(event_format) .fmt_fields(JsonFields::default()) .with_ansi(false) - .with_filter(env_filter) + .with_filter(fmt_env_filter(level)) +} + +fn fmt_layer(level: Level) -> Box + Send + Sync + 'static> +where + S: for<'a> LookupSpan<'a>, + S: tracing::Subscriber, +{ + if *ENABLE_VERBOSE_JSON_LOGS { + json_fmt_layer(level).boxed() + } else { + compact_fmt_layer(level).boxed() + } } fn otlp_layer( diff --git a/quickwit/quickwit-lambda/src/searcher/api.rs b/quickwit/quickwit-lambda/src/searcher/api.rs new file mode 100644 index 00000000000..b7b3150e7bd --- /dev/null +++ b/quickwit/quickwit-lambda/src/searcher/api.rs @@ -0,0 +1,144 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::collections::HashSet; +use std::net::{Ipv4Addr, SocketAddr}; +use std::sync::Arc; + +use http::Method; +use quickwit_config::service::QuickwitService; +use quickwit_config::SearcherConfig; +use quickwit_proto::metastore::MetastoreServiceClient; +use quickwit_search::{ + ClusterClient, SearchJobPlacer, SearchService, SearchServiceClient, SearchServiceImpl, + SearcherContext, SearcherPool, +}; +use quickwit_serve::lambda_search_api::*; +use quickwit_storage::StorageResolver; +use quickwit_telemetry::payload::{QuickwitFeature, QuickwitTelemetryInfo, TelemetryEvent}; +use tracing::{error, info}; +use warp::filters::path::FullPath; +use warp::reject::Rejection; +use warp::Filter; + +use crate::searcher::environment::CONFIGURATION_TEMPLATE; +use crate::utils::load_node_config; + +async fn create_local_search_service( + searcher_config: SearcherConfig, + metastore: MetastoreServiceClient, + storage_resolver: StorageResolver, +) -> Arc { + let searcher_pool = SearcherPool::default(); + let search_job_placer = SearchJobPlacer::new(searcher_pool.clone()); + let cluster_client = ClusterClient::new(search_job_placer); + let searcher_context = Arc::new(SearcherContext::new(searcher_config, None)); + let search_service = Arc::new(SearchServiceImpl::new( + metastore, + storage_resolver, + cluster_client.clone(), + searcher_context.clone(), + )); + // Add search service to pool to avoid "no available searcher nodes in the pool" error + let socket_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 7280u16); + let search_service_client = + SearchServiceClient::from_service(search_service.clone(), socket_addr); + searcher_pool.insert(socket_addr, search_service_client); + search_service +} + +fn native_api( + search_service: Arc, +) -> impl Filter + Clone { + search_get_handler(search_service.clone()).or(search_post_handler(search_service)) +} + +fn es_compat_api( + search_service: Arc, + metastore: MetastoreServiceClient, +) -> impl Filter + Clone { + es_compat_search_handler(search_service.clone()) + .or(es_compat_index_search_handler(search_service.clone())) + .or(es_compat_index_count_handler(search_service.clone())) + .or(es_compat_scroll_handler(search_service.clone())) + .or(es_compat_index_multi_search_handler(search_service.clone())) + .or(es_compat_index_field_capabilities_handler( + search_service.clone(), + )) + .or(es_compat_index_stats_handler(metastore.clone())) + .or(es_compat_stats_handler(metastore.clone())) + .or(es_compat_index_cat_indices_handler(metastore.clone())) + .or(es_compat_cat_indices_handler(metastore.clone())) +} + +fn v1_searcher_api( + search_service: Arc, + metastore: MetastoreServiceClient, +) -> impl Filter + Clone { + warp::path!("api" / "v1" / ..) + .and(native_api(search_service.clone()).or(es_compat_api(search_service, metastore))) + .with(warp::filters::compression::gzip()) + .recover(|rejection| { + error!(?rejection, "request rejected"); + recover_fn(rejection) + }) +} + +pub async fn setup_searcher_api( +) -> anyhow::Result + Clone> { + let (node_config, storage_resolver, metastore) = + load_node_config(CONFIGURATION_TEMPLATE).await?; + + let telemetry_info = QuickwitTelemetryInfo::new( + HashSet::from_iter([QuickwitService::Searcher.as_str().to_string()]), + HashSet::from_iter([QuickwitFeature::AwsLambda]), + ); + let _telemetry_handle_opt = quickwit_telemetry::start_telemetry_loop(telemetry_info); + + let search_service = create_local_search_service( + node_config.searcher_config, + metastore.clone(), + storage_resolver, + ) + .await; + + let before_hook = warp::path::full() + .and(warp::method()) + .and_then(|route: FullPath, method: Method| async move { + info!( + method = method.as_str(), + route = route.as_str(), + "new request" + ); + quickwit_telemetry::send_telemetry_event(TelemetryEvent::RunCommand).await; + Ok::<_, std::convert::Infallible>(()) + }) + .untuple_one(); + + let after_hook = warp::log::custom(|info| { + info!(status = info.status().as_str(), "request completed"); + }); + + let api = warp::any() + .and(before_hook) + .and(v1_searcher_api(search_service, metastore)) + .with(after_hook); + + Ok(api) +} diff --git a/quickwit/quickwit-lambda/src/searcher/environment.rs b/quickwit/quickwit-lambda/src/searcher/environment.rs index 2aee5752463..31505d4c0a4 100644 --- a/quickwit/quickwit-lambda/src/searcher/environment.rs +++ b/quickwit/quickwit-lambda/src/searcher/environment.rs @@ -17,11 +17,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::env::var; - -use once_cell::sync::Lazy; - -pub(crate) const CONFIGURATION_TEMPLATE: &str = "version: 0.6 +pub(crate) const CONFIGURATION_TEMPLATE: &str = "version: 0.7 node_id: lambda-searcher metastore_uri: s3://${QW_LAMBDA_METASTORE_BUCKET}/index default_index_root_uri: s3://${QW_LAMBDA_INDEX_BUCKET}/index @@ -29,6 +25,3 @@ data_dir: /tmp searcher: partial_request_cache_capacity: ${QW_LAMBDA_PARTIAL_REQUEST_CACHE_CAPACITY:-64M} "; - -pub(crate) static DISABLE_SEARCH_CACHE: Lazy = - Lazy::new(|| var("QW_LAMBDA_DISABLE_SEARCH_CACHE").is_ok_and(|v| v.as_str() == "true")); diff --git a/quickwit/quickwit-lambda/src/searcher/handler.rs b/quickwit/quickwit-lambda/src/searcher/handler.rs deleted file mode 100644 index 00257e49d05..00000000000 --- a/quickwit/quickwit-lambda/src/searcher/handler.rs +++ /dev/null @@ -1,85 +0,0 @@ -// Copyright (C) 2024 Quickwit, Inc. -// -// Quickwit is offered under the AGPL v3.0 and as commercial software. -// For commercial licensing, contact us at hello@quickwit.io. -// -// AGPL: -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -use anyhow::Context; -use flate2::write::GzEncoder; -use flate2::Compression; -use lambda_http::http::header::{CONTENT_ENCODING, CONTENT_TYPE}; -use lambda_http::{Body, Error, IntoResponse, Request, RequestExt, RequestPayloadExt, Response}; -use quickwit_search::SearchResponseRest; -use quickwit_serve::SearchRequestQueryString; -use tracing::{debug_span, error, info_span, instrument, Instrument}; - -use super::search::{search, SearchArgs}; -use crate::environment::INDEX_ID; -use crate::logger; -use crate::searcher::environment::DISABLE_SEARCH_CACHE; -use crate::utils::LambdaContainerContext; - -#[instrument(skip_all)] -fn deflate_serialize(resp: SearchResponseRest) -> anyhow::Result> { - let value = serde_json::to_value(resp)?; - let mut buffer = Vec::new(); - let mut gz = GzEncoder::new(&mut buffer, Compression::default()); - serde_json::to_writer(&mut gz, &value)?; - gz.finish()?; - Ok(buffer) -} - -pub async fn searcher_handler(request: Request) -> Result { - let container_ctx = LambdaContainerContext::load(); - let memory = request.lambda_context().env_config.memory; - let payload = request - .payload::()? - .context("Empty payload")?; - - let search_res = search(SearchArgs { query: payload }) - .instrument(debug_span!( - "search", - memory, - env.INDEX_ID = *INDEX_ID, - env.DISABLE_SEARCH_CACHE = *DISABLE_SEARCH_CACHE, - cold = container_ctx.cold, - container_id = container_ctx.container_id, - )) - .await?; - - let response_body = deflate_serialize(search_res)?; - - let response = Response::builder() - .header(CONTENT_ENCODING, "gzip") - .header(CONTENT_TYPE, "application/json") - .header("x-lambda-request-id", request.lambda_context().request_id) - .body(Body::Binary(response_body)) - .context("Could not build response")?; - Ok(response) -} - -pub async fn handler(request: Request) -> Result { - let request_id = request.lambda_context().request_id.clone(); - let response = searcher_handler(request) - .instrument(info_span!("searcher_handler", request_id)) - .await; - - if let Err(e) = &response { - error!(err=?e, "Handler failed"); - } - logger::flush_tracer(); - response -} diff --git a/quickwit/quickwit-lambda/src/searcher/mod.rs b/quickwit/quickwit-lambda/src/searcher/mod.rs index ed17d810d9d..f518f212a1d 100644 --- a/quickwit/quickwit-lambda/src/searcher/mod.rs +++ b/quickwit/quickwit-lambda/src/searcher/mod.rs @@ -17,8 +17,8 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +mod api; mod environment; -mod handler; -mod search; +pub mod warp_lambda; -pub use handler::handler; +pub use api::setup_searcher_api; diff --git a/quickwit/quickwit-lambda/src/searcher/search.rs b/quickwit/quickwit-lambda/src/searcher/search.rs deleted file mode 100644 index 11ec85238ac..00000000000 --- a/quickwit/quickwit-lambda/src/searcher/search.rs +++ /dev/null @@ -1,128 +0,0 @@ -// Copyright (C) 2024 Quickwit, Inc. -// -// Quickwit is offered under the AGPL v3.0 and as commercial software. -// For commercial licensing, contact us at hello@quickwit.io. -// -// AGPL: -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -use std::collections::HashSet; -use std::net::{Ipv4Addr, SocketAddr}; -use std::sync::Arc; - -use quickwit_config::service::QuickwitService; -use quickwit_config::SearcherConfig; -use quickwit_proto::metastore::MetastoreServiceClient; -use quickwit_proto::search::{SearchRequest, SearchResponse}; -use quickwit_search::{ - root_search, ClusterClient, Result as SearchResult, SearchJobPlacer, SearchResponseRest, - SearchServiceClient, SearchServiceImpl, SearcherContext, SearcherPool, -}; -use quickwit_serve::{search_request_from_api_request, SearchRequestQueryString}; -use quickwit_storage::StorageResolver; -use quickwit_telemetry::payload::{QuickwitFeature, QuickwitTelemetryInfo, TelemetryEvent}; -use tokio::sync::OnceCell; -use tracing::debug; - -use crate::environment::INDEX_ID; -use crate::searcher::environment::{CONFIGURATION_TEMPLATE, DISABLE_SEARCH_CACHE}; -use crate::utils::load_node_config; - -static LAMBDA_SEARCH_CACHE: OnceCell = OnceCell::const_new(); - -#[derive(Clone)] -struct LambdaSearchCtx { - pub searcher_context: Arc, - pub cluster_client: ClusterClient, -} - -impl LambdaSearchCtx { - async fn instantiate( - searcher_config: SearcherConfig, - metastore: MetastoreServiceClient, - storage_resolver: StorageResolver, - ) -> Self { - let socket_addr = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 7280u16); - let searcher_pool = SearcherPool::default(); - let search_job_placer = SearchJobPlacer::new(searcher_pool.clone()); - let cluster_client = ClusterClient::new(search_job_placer); - let searcher_context = Arc::new(SearcherContext::new(searcher_config, None)); - let search_service = Arc::new(SearchServiceImpl::new( - metastore, - storage_resolver, - cluster_client.clone(), - searcher_context.clone(), - )); - let search_service_client = - SearchServiceClient::from_service(search_service.clone(), socket_addr); - searcher_pool.insert(socket_addr, search_service_client); - Self { - searcher_context, - cluster_client, - } - } -} - -async fn single_node_search( - search_config: SearcherConfig, - search_request: SearchRequest, - metastore: MetastoreServiceClient, - storage_resolver: StorageResolver, -) -> SearchResult { - let lambda_search_ctx = if *DISABLE_SEARCH_CACHE { - LambdaSearchCtx::instantiate(search_config, metastore.clone(), storage_resolver).await - } else { - let cached_ctx = LAMBDA_SEARCH_CACHE - .get_or_init(|| { - LambdaSearchCtx::instantiate(search_config, metastore.clone(), storage_resolver) - }) - .await; - LambdaSearchCtx::clone(cached_ctx) - }; - root_search( - &lambda_search_ctx.searcher_context, - search_request, - metastore, - &lambda_search_ctx.cluster_client, - ) - .await -} - -#[derive(Debug, Eq, PartialEq)] -pub struct SearchArgs { - pub query: SearchRequestQueryString, -} - -pub async fn search(args: SearchArgs) -> anyhow::Result { - debug!(args=?args, "lambda-search"); - let (node_config, storage_resolver, metastore) = - load_node_config(CONFIGURATION_TEMPLATE).await?; - let services: HashSet = - HashSet::from_iter([QuickwitService::Searcher.as_str().to_string()]); - let telemetry_info = - QuickwitTelemetryInfo::new(services, HashSet::from_iter([QuickwitFeature::AwsLambda])); - let _telemetry_handle_opt = quickwit_telemetry::start_telemetry_loop(telemetry_info); - quickwit_telemetry::send_telemetry_event(TelemetryEvent::RunCommand).await; - let search_request = search_request_from_api_request(vec![INDEX_ID.clone()], args.query)?; - debug!(search_request=?search_request, "search-request"); - let search_response: SearchResponse = single_node_search( - node_config.searcher_config, - search_request, - metastore, - storage_resolver, - ) - .await?; - let search_response_rest = SearchResponseRest::try_from(search_response)?; - Ok(search_response_rest) -} diff --git a/quickwit/quickwit-lambda/src/searcher/warp_lambda.rs b/quickwit/quickwit-lambda/src/searcher/warp_lambda.rs new file mode 100644 index 00000000000..5fec17e4de8 --- /dev/null +++ b/quickwit/quickwit-lambda/src/searcher/warp_lambda.rs @@ -0,0 +1,175 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +// Based on https://github.com/aslamplr/warp_lambda under MIT license + +use core::future::Future; +use std::collections::HashSet; +use std::convert::Infallible; +use std::marker::PhantomData; +use std::pin::Pin; +use std::str::FromStr; +use std::task::{Context, Poll}; + +use anyhow::anyhow; +use http::header::Entry; +use lambda_http::http::response::Parts; +use lambda_http::http::HeaderValue; +use lambda_http::{ + lambda_runtime, Adapter, Body as LambdaBody, Error as LambdaError, Request, RequestExt, + Response, Service, +}; +use mime_guess::{mime, Mime}; +use once_cell::sync::Lazy; +use tracing::{info_span, Instrument}; +use warp::hyper::Body as WarpBody; +pub use {lambda_http, warp}; + +pub type WarpRequest = warp::http::Request; +pub type WarpResponse = warp::http::Response; + +pub async fn run<'a, S>(service: S) -> Result<(), LambdaError> +where + S: Service + Send + 'a, + S::Future: Send + 'a, +{ + lambda_runtime::run(Adapter::from(WarpAdapter::new(service))).await +} + +#[derive(Clone)] +pub struct WarpAdapter<'a, S> +where + S: Service, + S::Future: Send + 'a, +{ + warp_service: S, + _phantom_data: PhantomData<&'a WarpResponse>, +} + +impl<'a, S> WarpAdapter<'a, S> +where + S: Service, + S::Future: Send + 'a, +{ + pub fn new(warp_service: S) -> Self { + Self { + warp_service, + _phantom_data: PhantomData, + } + } +} + +static PLAINTEXT_MIMES: Lazy> = Lazy::new(|| { + vec![ + mime::APPLICATION_JAVASCRIPT, + mime::APPLICATION_JAVASCRIPT_UTF_8, + mime::APPLICATION_JSON, + ] + .into_iter() + .collect() +}); + +async fn warp_body_as_lambda_body( + warp_body: WarpBody, + parts: &Parts, +) -> Result { + // Concatenate all bytes into a single buffer + let raw_bytes = warp::hyper::body::to_bytes(warp_body).await?; + + // Attempt to determine the Content-Type + let content_type: Option<&HeaderValue> = parts.headers.get("Content-Type"); + let content_encoding: Option<&HeaderValue> = parts.headers.get("Content-Encoding"); + + // If Content-Encoding is present, assume compression + // If Content-Type is not present, don't assume is a string + let body = if let (Some(typ), None) = (content_type, content_encoding) { + let typ = typ.to_str()?; + let m = typ.parse::()?; + if PLAINTEXT_MIMES.contains(&m) || m.type_() == mime::TEXT { + Some(String::from_utf8(raw_bytes.to_vec()).map(LambdaBody::Text)?) + } else { + None + } + } else { + None + }; + + // Not a text response, make binary + Ok(body.unwrap_or_else(|| LambdaBody::Binary(raw_bytes.to_vec()))) +} + +impl<'a, S> Service for WarpAdapter<'a, S> +where + S: Service + 'a, + S::Future: Send + 'a, +{ + type Response = Response; + type Error = LambdaError; + type Future = Pin> + Send + 'a>>; + + fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll> { + self.warp_service + .poll_ready(ctx) + .map_err(|err| match err {}) + } + + fn call(&mut self, req: Request) -> Self::Future { + let query_params = req.query_string_parameters(); + let request_id = req.lambda_context().request_id.clone(); + let (mut parts, body) = req.into_parts(); + let (content_len, body) = match body { + LambdaBody::Empty => (0, WarpBody::empty()), + LambdaBody::Text(t) => (t.len(), WarpBody::from(t.into_bytes())), + LambdaBody::Binary(b) => (b.len(), WarpBody::from(b)), + }; + + let mut uri = format!("http://{}{}", "127.0.0.1", parts.uri.path()); + if !query_params.is_empty() { + let url_res = reqwest::Url::parse_with_params(&uri, query_params.iter()); + if let Ok(url) = url_res { + uri = url.into(); + } else { + return Box::pin(async { Err(anyhow!("Invalid url").into()) }); + } + } + + // REST API Gateways swallow the content-length header which is required + // by many Quickwit routes (`warp::body::content_length_limit(xxx)`) + if let Entry::Vacant(v) = parts.headers.entry("Content-Length") { + v.insert(content_len.into()); + } + + parts.uri = warp::hyper::Uri::from_str(uri.as_str()).unwrap(); + let warp_request = WarpRequest::from_parts(parts, body); + + // Call warp service with warp request, save future + let warp_fut = self.warp_service.call(warp_request); + + // Create lambda future + let fut = async move { + let warp_response = warp_fut.await?; + let (parts, res_body): (_, _) = warp_response.into_parts(); + let body = warp_body_as_lambda_body(res_body, &parts).await?; + let lambda_response = Response::from_parts(parts, body); + Ok::(lambda_response) + } + .instrument(info_span!("searcher request", request_id)); + Box::pin(fut) + } +} diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/mod.rs b/quickwit/quickwit-serve/src/elasticsearch_api/mod.rs index 0fdbd85c540..2cf955eac50 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/mod.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/mod.rs @@ -34,19 +34,16 @@ use quickwit_ingest::IngestServiceClient; use quickwit_proto::ingest::router::IngestRouterServiceClient; use quickwit_proto::metastore::MetastoreServiceClient; use quickwit_search::SearchService; -use rest_handler::{ - es_compat_cluster_info_handler, es_compat_index_multi_search_handler, - es_compat_index_search_handler, es_compat_scroll_handler, es_compat_search_handler, +pub use rest_handler::{ + es_compat_cat_indices_handler, es_compat_cluster_info_handler, es_compat_delete_index_handler, + es_compat_index_cat_indices_handler, es_compat_index_count_handler, + es_compat_index_field_capabilities_handler, es_compat_index_multi_search_handler, + es_compat_index_search_handler, es_compat_index_stats_handler, es_compat_scroll_handler, + es_compat_search_handler, es_compat_stats_handler, }; use serde::{Deserialize, Serialize}; use warp::{Filter, Rejection}; -use self::rest_handler::{ - es_compat_cat_indices_handler, es_compat_delete_index_handler, - es_compat_index_cat_indices_handler, es_compat_index_count_handler, - es_compat_index_field_capabilities_handler, es_compat_index_stats_handler, - es_compat_stats_handler, -}; use crate::elasticsearch_api::model::ElasticsearchError; use crate::rest_api_response::RestApiResponse; use crate::{BodyFormat, BuildInfo}; diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index fdbc9092365..d1aabf09a51 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -1189,6 +1189,18 @@ async fn check_cluster_configuration( Ok(()) } +pub mod lambda_search_api { + pub use crate::elasticsearch_api::{ + es_compat_cat_indices_handler, es_compat_index_cat_indices_handler, + es_compat_index_count_handler, es_compat_index_field_capabilities_handler, + es_compat_index_multi_search_handler, es_compat_index_search_handler, + es_compat_index_stats_handler, es_compat_scroll_handler, es_compat_search_handler, + es_compat_stats_handler, + }; + pub use crate::rest::recover_fn; + pub use crate::search_api::{search_get_handler, search_post_handler}; +} + #[cfg(test)] mod tests { use quickwit_cluster::{create_cluster_for_test, ChannelTransport, ClusterNode};