[FEATURE] Support COPY operation #129

Open
penghuo opened this issue Dec 9, 2022 · 5 comments
Labels
feature New feature

Comments

@penghuo
Collaborator

penghuo commented Dec 9, 2022

Feature - COPY

Overview

OpenSearch is the search and analytics suite powering popular use cases such as application search and log analytics. (1) Users rely on the _bulk indexing API to ingest and index data. Today, the _bulk API places a high configuration burden on users to avoid RejectedExecutionException failures caused by TOO_MANY_REQUESTS. (2) While OpenSearch is part of critical business and application workflows, it is seldom used as a primary data store because it offers no strong guarantees on data durability; the cluster is susceptible to data loss in case of hardware failure. In this document, we propose letting customers manage raw data in highly reliable object storage (e.g., Amazon S3) and then use the COPY command to transfer data into OpenSearch at any time.

COPY

SYNTAX

LOAD DATA index-name
FROM data_source LOCATION location
[ COMPRESSION file-compression ]
[ DATA FORMAT data-format ] [ parameter [ argument ] [, ... ] ]
[ AUTO ON | OFF [ notification ] ]

Overview

You can perform a COPY operation with as few as three parameters: an index name, a data source, and a location.
The OpenSearch COPY command enables you to load data in several data formats from multiple data sources, control access to load data, manage data transformations, and manage the load operation.

Index name

The name of the target index for the COPY command. The index must already exist in OpenSearch. The COPY command appends the new input data to any existing documents in the index.

FROM data_source LOCATION location

data source

The data source must already exist in OpenSearch. For more information, see Datasource Metadata Management.

location - Amazon S3

For example, an object path to load data from Amazon S3:

["s3://objectpath"]

File compression

File compression parameters

  • BZIP2: specifies that the input file or files are in compressed bzip2 format (.bz2 files). The COPY operation reads each compressed file and decompresses the data as it loads.
  • GZIP: specifies that the input file or files are in compressed gzip format (.gz files). The COPY operation reads each compressed file and decompresses the data as it loads.
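
As a rough illustration of this behavior (not part of the proposal), a Spark-based loader gets .gz and .bz2 decompression for free from the Hadoop compression codecs, which are selected by file extension. The bucket path and session setup below are hypothetical.

// Sketch only: reading compressed JSON lines with Spark, relying on Hadoop codecs
// to decompress .gz/.bz2 input transparently while loading.
import org.apache.spark.sql.SparkSession

object CompressedLoadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("copy-compression-sketch").getOrCreate()

    // .json() on a .gz or .bz2 path is decompressed as it is read,
    // mirroring how the COPY operation would uncompress input files during the load.
    val df = spark.read.json("s3://my_http_logs/2022/12/09/*.json.gz")
    df.show(false)

    spark.stop()
  }
}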

Data format

You can load data from text files in fixed-width, character-delimited, comma-separated values (CSV), or JSON format, or from Avro files.

JSON

The source data is in JSON format. The JSON data file contains a set of objects. The COPY operation loads each JSON object into the index as a document; field order within a JSON object does not matter. Internally, the engine uses the _bulk API to index the JSON objects.
For each error, OpenSearch records a row in the STL_LOAD_ERRORS system table. The LINE_NUMBER column records the last line of the JSON object that caused the error.
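
For illustration, the sketch below shows how JSON lines map onto the _bulk protocol mentioned above: each document is preceded by an action line and the body is newline-delimited. The index name and documents are made up, and the HTTP call itself is omitted.

// Minimal sketch of building an OpenSearch _bulk request body from JSON lines.
object BulkBodySketch {
  def main(args: Array[String]): Unit = {
    val indexName = "logs"
    val jsonDocs = Seq(
      """{"accountId": "1", "eventName": "login"}""",
      """{"accountId": "2", "eventName": "logout"}""")

    // Action line + document line per doc, newline-delimited, with a trailing newline.
    val bulkBody = jsonDocs
      .map(doc => s"""{"index":{"_index":"$indexName"}}""" + "\n" + doc)
      .mkString("", "\n", "\n")

    // POST bulkBody to <cluster>/_bulk with Content-Type: application/x-ndjson
    println(bulkBody)
  }
}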

AUTO

If AUTO is set to ON, the COPY operation automatically detects newly added objects and indexes them.
Users can also enable Amazon S3 event notifications; then, instead of polling for new data regularly, the COPY operation pulls objects after receiving a notification.
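
As a rough illustration (not the proposed design), AUTO ON could be approximated with a Spark Structured Streaming file source that discovers newly added objects on each trigger; an S3-notification-driven source would replace the polling. The "flint" sink, schema, paths, and options below are assumptions borrowed from the integration test later in this thread.

// Sketch only: AUTO ON approximated by a streaming file source over the S3 location.
// Assumes the Flint data source supports streaming writes.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object AutoCopySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("auto-copy-sketch").getOrCreate()

    val stream = spark.readStream
      .schema("accountId STRING, eventName STRING, eventSource STRING")
      .json("s3://my_http_logs/")

    val query = stream.writeStream
      .format("flint")                              // hypothetical sink name, see test below
      .option("checkpointLocation", "s3://my_http_logs_checkpoint/")
      .trigger(Trigger.ProcessingTime("5 minutes")) // polling interval in lieu of S3 notifications
      .start("logs")                                // assumes path maps to the target index

    query.awaitTermination()
  }
}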

Usage

Load data from Amazon S3 into the logs index:

LOAD DATA logs
FROM myS3 LOCATION "s3://my_http_logs"
COMPRESSION gzip
DATA FORMAT json

Solution

Leverage opensearch-project/sql#948.

@penghuo penghuo added enhancement New feature or request untriaged and removed untriaged labels Dec 9, 2022
@penghuo penghuo changed the title [FEATURE] - Support COPY operation. [FEATURE] - Support COPY operation Dec 9, 2022
@dai-chen dai-chen transferred this issue from opensearch-project/sql Nov 4, 2023
@dai-chen dai-chen added feature New feature and removed untriaged enhancement New feature or request labels Nov 13, 2023
@YANG-DB
Member

YANG-DB commented Dec 18, 2023

@penghuo I like the idea !!
can we also add a filter clause to narrow the load ?

@dai-chen
Collaborator

@penghuo I like the idea !! can we also add a filter clause to narrow the load ?

I think it's doable. Currently the challenge is in the Flint data source. If there is a simple way to pass the entire JSON doc to the data source to generate the bulk request, the rest is just translating the COPY SQL statement to DataFrame code, similar to covering index/MV with a WHERE clause.
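
For illustration, the earlier LOAD DATA example might translate to DataFrame code roughly like the sketch below. The "flint" format and OpenSearch options mirror the integration test later in this thread; the filter line stands in for the suggested filter clause, and passing the raw JSON through without parsing remains the open question above.

// Rough sketch of translating the LOAD DATA example into DataFrame code.
import org.apache.spark.sql.SparkSession

object CopyTranslationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("copy-translation-sketch").getOrCreate()
    val openSearchOptions = Map("host" -> "localhost", "port" -> "9200") // illustrative

    spark.read
      .json("s3://my_http_logs")      // FROM myS3 LOCATION "s3://my_http_logs" (gzip handled by codec)
      .filter("eventName = 'login'")  // optional filter clause suggested above
      .write
      .format("flint")
      .options(openSearchOptions)
      .mode("append")                 // COPY appends to the existing index
      .save("logs")                   // LOAD DATA logs

    spark.stop()
  }
}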

@dai-chen dai-chen changed the title [FEATURE] - Support COPY operation [FEATURE] Support COPY operation Mar 14, 2024
@penghuo
Collaborator Author

penghuo commented Jun 4, 2024

Storing Semi-structured Data in a VARIANT Column vs. Flattening the Nested Structure? https://docs.snowflake.com/en/user-guide/semistructured-considerations

Spark 4.0 includes the Variant data type. https://issues.apache.org/jira/browse/SPARK-45891
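
A hedged sketch of what this could look like here, keeping the raw JSON semi-structured instead of flattening it into a fixed schema; parse_json and the Variant type are taken from the linked Spark 4.0 work and should be verified against the actual release.

// Sketch only: parsing a raw JSON document into a Variant column in Spark SQL.
import org.apache.spark.sql.SparkSession

object VariantSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("variant-sketch").getOrCreate()

    // Assumes the parse_json SQL function from the Spark 4.0 Variant work.
    spark.sql("""SELECT parse_json('{"accountId": "1", "eventName": "login"}') AS doc""")
      .printSchema() // expected: doc of type variant

    spark.stop()
  }
}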

@dai-chen
Collaborator

dai-chen commented Aug 8, 2024

It seems this can be quickly tested and benchmarked/micro-benchmarked as follows:

  • Load the entire row data in TEXT format to avoid JSON parsing.

Note that there is an option wholeText to load the entire file data as a single Row object. This approach can be more efficient, especially for small files, if we set aside the rate limit (spark.datasource.flint.write.batch_bytes). However, OpenSearch requires each JSON line to be preceded by an action line. We might want to consider whether a more efficient protocol than bulk is useful in our case.

@@ -31,6 +32,62 @@ class FlintDataSourceV2ITSuite

   import testImplicits._

+  test("copy from location to Flint data source") {
+    // Create a temporary JSON file with 5 lines
+    val jsonLines = Seq(
+      """{"accountId": "1", "eventName": "login", "eventSource": "source1"}""",
+      """{"accountId": "2", "eventName": "logout", "eventSource": "source2"}""",
+      """{"accountId": "3", "eventName": "login", "eventSource": "source3"}""",
+      """{"accountId": "4", "eventName": "logout", "eventSource": "source4"}""",
+      """{"accountId": "5", "eventName": "login", "eventSource": "source5"}"""
+    )
+    val tempFilePath = Files.createTempFile("tempJson", ".json")
+    Files.write(tempFilePath, jsonLines.mkString("\n").getBytes)
+
+    val tempFile = tempFilePath.toFile
+    try {
+      // Read JSON file as whole text
+      val df = spark.read
+        // .option("wholetext", "true")
+        .text(tempFile.getAbsolutePath)
+
+      df.show(false)
+
+      val indexName = "flint_test_index"
+
+      // Write to Flint data source
+      df.write
+        .format("flint")
+        .options(openSearchOptions)
+        .mode("overwrite")
+        .save(indexName)
+
+      // Read from Flint data source
+      val resultDf = spark.sqlContext.read
+        .format("flint")
+        .options(openSearchOptions)
+        .schema("accountId STRING, eventName STRING, eventSource STRING")
+        .load(indexName)
+
+      resultDf.show(false)
+
+      assert(resultDf.count() == 5)
+      val expectedRows = Seq(
+        Row("1", "login", "source1"),
+        Row("2", "logout", "source2"),
+        Row("3", "login", "source3"),
+        Row("4", "logout", "source4"),
+        Row("5", "login", "source5")
+      )
+
+      expectedRows.foreach { row =>
+        assert(resultDf.collect().contains(row))
+      }
+    } finally {
+      tempFile.delete()
+    }
+  }
  • Change FlintJacksonGenerator to write the entire row data to the bulk request buffer. (This could be gated by a Flint data source option.)
--- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/json/FlintJacksonGenerator.scala
+++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/json/FlintJacksonGenerator.scala
@@ -264,11 +264,14 @@ case class FlintJacksonGenerator(
    *   The row to convert
    */
   def write(row: InternalRow): Unit = {
+    gen.writeRaw(row.getString(0))
+    /*
     writeObject(
       writeFields(
         fieldWriters = rootFieldWriters,
         row = row,
         schema = dataType.asInstanceOf[StructType]))
+     */
   }

@dai-chen
Collaborator

dai-chen commented Oct 1, 2024

Design [WIP]

Problem Statement

This feature aims to address several key pain points that users currently face:

  1. SQL Schema Complexity

    • The current Flint covering index (CI) and materialized view (MV) only work with SparkSQL tables. Users often struggle with how to create these Spark tables, especially when trying to load data from JSON files or application logs in custom formats. Manually defining schemas in Spark can be tedious and error-prone, particularly when the schema evolves over time or is unknown upfront.
    • Related: [FEATURE] Enhance Spark JSON table for semi-structured log analytics #535
  2. Performance Overhead

    • JSON files are parsed and loaded into Spark rows before being converted into OpenSearch bulk requests. This parsing and transformation process introduces unnecessary performance overhead, especially in cases where Flint CI loads all columns from large datasets.
    • Related: [FEATURE] Support COPY operation #129 (comment)
  3. Restrictions on OpenSearch Index Reuse

    • The Flint CI and MV solution does not allow users to leverage existing OpenSearch indexes. This limitation reduces flexibility for users who have already invested in building and populating OpenSearch indexes.
    • Related: [FEATURE]Support existing Index usage in Flint #72
  4. Limited Support for Complex Data Types

  5. Limited Control Over Data Ingestion

    • Currently, users can only control the number of files read in each microbatch during Flint CI and MV refreshes. This lack of flexible control introduces significant limitations, particularly when users require more granular control over how much data is ingested and which specific rows are processed.
    • Related: [FEATURE] Support loading source data to OpenSearch index in batches #699

Use Cases

The following use cases highlight the various scenarios where this feature will provide significant value for users:

  1. Data Loading (Object Store -> OpenSearch): Users want to efficiently load entire datasets from S3 into OpenSearch without creating intermediary tables or schemas in Spark. This streamlines the process for indexing raw data in OpenSearch for search and analysis purposes.

  2. Data Offloading (OpenSearch -> Object Store): Users need to export data from OpenSearch back to S3 for backup, archival, or integration with other systems. This provides users with an alternative to OpenSearch snapshots, enabling them to unload hot data into other formats, such as Iceberg or CSV, for better integration with other analytics or business intelligence (BI) tools.

High Level Design

Proposed Syntax [TBD]

COPY [INTO] <dst_data_source>
FROM <src_data_source>
FORMAT <file_format>
WITH ( <options> )

Parameters:

  • dst_data_source: data location (S3/OS index), table name
  • src_data_source: data location (S3/OS index), table name, subquery
  • file_format: source file format name
  • options: copy command options

Examples

Load into an OpenSearch index without creating an OpenSearch table:

COPY INTO "opensearch://cluster/index"
FROM (
  SELECT ...
  FROM glue.default.http_logs
)
FORMAT json;

Unload OpenSearch index into an existing Iceberg table:

COPY INTO glue.default.http_logs_iceberg
FROM "opensearch://cluster/index";

Implementation Approach

TODO
