Skip to content

Commit

Permalink
Persistence Component: Bulk Load Support for Big Query (#2294)
Browse files Browse the repository at this point in the history
* Bug Fix: Bitemporal milestoning Derive Main schema removes the VALID_FROM/VALID_THROUGH field if the name matches with source specified VALID_FROM/VALID_THROUGH fields

* Bug Fix: Bitemporal milestoning Schema Evolution must ignore user provided validity fields

* Adding code for concurrent safety feature

* Adding test for Multi Ingest Mode with concurrent Safety

* Adding tests for concurrent safety

* Code Clean up

* Bug Fix: Bitemporal temp tables must be deleted after usage

* Update readme and code review comments

* Fix typo

* Fix typos in readme

* Bug Fix: Empty Batch Handling in Unitemporal Snapshot

* Bug Fix: Code review comments

* Implement StagedFilesDatasetReference

* Implement StagedFilesSelection

* Support for Empty Batch Handling in Unitemporal Snapshot

* Support for FailEmptyBatch strategy in Unitemporal Snapshot

* Enrich datasets to add additionalDatasetProperties everywhere

* Add tests for Empty Data handling

* Support ICEBERG_TABLE_2022 for Iceberg tables

* Implement bulk load for h2

* Add missing datasetAdditionalProperties in BigQueryHelper

* Implement executor flow and add end-to-end test

* Add test

* Add support for digest udf in h2 and add tests

* Clean up

* Add file format and validation for file format and add tests

* Add missing statement

* Fix typo in year

* Fix comments

* Add H2 MD5

* Change file format interface

* Change stats

* Change stats - make snowflake always return stats no matter success or failure

* Implement bulk load using copy and insert + modify interface for capabilities

* Add Support for metadata for BulkLoad Task

* Refactor Digest Generation Strategy

* Implement bulk load for big query

* Addressed Code Review Comments

* Clean up

* Add basic tests for bulk load for big query

* Refactor Code to rename AppendLog to BulkLoad

* Add default bulkLoad Batchid

* Refactor Append Log table name

* Resolve conflicts

* Add digest udf and more tests

* Fix digest problem

* Change H2 digest algo

* Fix tests

* Fix typo

* Refactor file format and load options

* Refactor dataset, selection, reference logic

* Fix other comments

* Fix big query bulk load ingestor flow and add end-to-end tests

* Add rows with error handling and test

* Address comments

* Bulk Load Batch ID and Task ID & PK Validation (#11)

* Add PK validation in bulk load

* Resolve conflict

* Remove unnecessary delete

* Introduce bulk load batch id and bulk load task id

* Rename variable

* Address Code Review Comments

---------

Co-authored-by: Ashutosh <[email protected]>
Co-authored-by: prasar-ashutosh <[email protected]>
  • Loading branch information
3 people authored Oct 19, 2023
1 parent fb7e2bb commit bc6cc1a
Show file tree
Hide file tree
Showing 70 changed files with 2,218 additions and 273 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ public enum FileFormat
CSV,
JSON,
AVRO,
PARQUET
PARQUET;
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,33 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.logicalplan.values;
package org.finos.legend.engine.persistence.components.common;

import static org.immutables.value.Value.Immutable;
import static org.immutables.value.Value.Style;
import org.immutables.value.Value;

@Immutable
@Style(
import java.util.Optional;

@Value.Immutable
@Value.Style(
typeAbstract = "*Abstract",
typeImmutable = "*",
jdkOnly = true,
optionalAcceptNullable = true,
strictBuilder = true
)
public interface BulkLoadBatchIdValueAbstract extends Value
/**
 * Options controlling how staged files are parsed and loaded during a bulk load.
 * All options are optional; when absent, the target platform's defaults apply.
 * Materialized by the Immutables processor as {@code LoadOptions} (see the
 * {@code @Value.Style} annotation on this type).
 */
public interface LoadOptionsAbstract
{
    // Character separating fields within a record (e.g. "," for CSV).
    Optional<String> fieldDelimiter();

    // Character encoding of the staged files (e.g. "UTF-8").
    Optional<String> encoding();

    // Token that represents a null value in the source data.
    Optional<String> nullMarker();

    // Character used to quote string values in the source data.
    Optional<String> quote();

    // Number of leading rows to skip (e.g. 1 to skip a header row).
    Optional<Long> skipLeadingRows();

    // Maximum number of bad records tolerated before the load fails.
    Optional<Long> maxBadRecords();

    // Compression codec of the staged files (e.g. "GZIP"); assumes
    // platform-specific codec names — TODO confirm per target platform.
    Optional<String> compression();
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public Dataset visitBulkLoad(BulkLoadAbstract bulkLoad)
}
Field batchIdField = Field.builder()
.name(bulkLoad.batchIdField())
.type(FieldType.of(DataType.VARCHAR, Optional.empty(), Optional.empty()))
.type(FieldType.of(DataType.INT, Optional.empty(), Optional.empty()))
.primaryKey(false)
.build();
mainSchemaFields.add(batchIdField);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
package org.finos.legend.engine.persistence.components.logicalplan;

import org.finos.legend.engine.persistence.components.common.Datasets;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.Condition;
import org.finos.legend.engine.persistence.components.ingestmode.BulkLoad;
import org.finos.legend.engine.persistence.components.ingestmode.IngestMode;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.CsvExternalDatasetReference;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection;
Expand All @@ -32,12 +33,13 @@
import org.finos.legend.engine.persistence.components.logicalplan.values.StringValue;
import org.finos.legend.engine.persistence.components.logicalplan.values.TabularValues;
import org.finos.legend.engine.persistence.components.logicalplan.values.Value;
import org.finos.legend.engine.persistence.components.util.BulkLoadMetadataDataset;
import org.finos.legend.engine.persistence.components.util.BulkLoadMetadataUtils;
import org.finos.legend.engine.persistence.components.util.LogicalPlanUtils;
import org.finos.legend.engine.persistence.components.util.MetadataDataset;
import org.finos.legend.engine.persistence.components.util.MetadataUtils;

import java.util.List;
import java.util.Optional;

public class LogicalPlanFactory
{
Expand Down Expand Up @@ -91,14 +93,23 @@ public static LogicalPlan getLogicalPlanForConstantStats(String stats, Long valu
.build();
}

public static LogicalPlan getLogicalPlanForNextBatchId(Datasets datasets)
public static LogicalPlan getLogicalPlanForNextBatchId(Datasets datasets, IngestMode ingestMode)
{
StringValue mainTable = StringValue.of(datasets.mainDataset().datasetReference().name().orElseThrow(IllegalStateException::new));
MetadataDataset metadataDataset = datasets.metadataDataset().isPresent()
? datasets.metadataDataset().get()
: MetadataDataset.builder().build();
MetadataUtils metadataUtils = new MetadataUtils(metadataDataset);
Selection selection = metadataUtils.getBatchId(mainTable).selection();
Selection selection;
if (ingestMode instanceof BulkLoad)
{
BulkLoadMetadataDataset bulkLoadMetadataDataset = datasets.bulkLoadMetadataDataset().orElse(BulkLoadMetadataDataset.builder().build());
BulkLoadMetadataUtils bulkLoadMetadataUtils = new BulkLoadMetadataUtils(bulkLoadMetadataDataset);
selection = bulkLoadMetadataUtils.getBatchId(mainTable).selection();
}
else
{
MetadataDataset metadataDataset = datasets.metadataDataset().orElse(MetadataDataset.builder().build());
MetadataUtils metadataUtils = new MetadataUtils(metadataDataset);
selection = metadataUtils.getBatchId(mainTable).selection();
}

return LogicalPlan.builder().addOps(selection).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@

package org.finos.legend.engine.persistence.components.logicalplan.values;

import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;

import java.util.List;
import java.util.Optional;

@org.immutables.value.Value.Immutable
@org.immutables.value.Value.Style(
Expand All @@ -32,4 +35,6 @@ public interface DigestUdfAbstract extends Value
List<String> fieldNames();

List<Value> values();

Optional<Dataset> dataset();
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,6 @@ public enum FunctionName
GENERATE_ARRAY,
PARSE_DATETIME,
OBJECT_CONSTRUCT,
TO_VARIANT;
TO_VARIANT,
TO_JSON;
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ class AppendOnlyPlanner extends Planner
{
private final Optional<Condition> dataSplitInRangeCondition;

AppendOnlyPlanner(Datasets datasets, AppendOnly ingestMode, PlannerOptions plannerOptions)
AppendOnlyPlanner(Datasets datasets, AppendOnly ingestMode, PlannerOptions plannerOptions, Set<Capability> capabilities)
{
super(datasets, ingestMode, plannerOptions);
super(datasets, ingestMode, plannerOptions, capabilities);

// validate
ingestMode.deduplicationStrategy().accept(new ValidatePrimaryKeys(primaryKeys, this::validatePrimaryKeysIsEmpty,
Expand All @@ -83,7 +83,7 @@ protected AppendOnly ingestMode()
}

@Override
public LogicalPlan buildLogicalPlanForIngest(Resources resources, Set<Capability> capabilities)
public LogicalPlan buildLogicalPlanForIngest(Resources resources)
{
List<Value> fieldsToSelect = new ArrayList<>(stagingDataset().schemaReference().fieldValues());
List<Value> fieldsToInsert = new ArrayList<>(stagingDataset().schemaReference().fieldValues());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ class BitemporalDeltaPlanner extends BitemporalPlanner
private List<FieldValue> primaryKeyFieldsAndFromFieldForSelection;
private List<FieldValue> dataFields;

BitemporalDeltaPlanner(Datasets datasets, BitemporalDelta ingestMode, PlannerOptions plannerOptions)
BitemporalDeltaPlanner(Datasets datasets, BitemporalDelta ingestMode, PlannerOptions plannerOptions, Set<Capability> capabilities)
{
super(datasets, ingestMode, plannerOptions);
super(datasets, ingestMode, plannerOptions, capabilities);

if (ingestMode().validityMilestoning().validityDerivation() instanceof SourceSpecifiesFromDateTime && ingestMode().deduplicationStrategy() instanceof FilterDuplicates)
{
Expand Down Expand Up @@ -201,7 +201,7 @@ protected BitemporalDelta ingestMode()
}

@Override
public LogicalPlan buildLogicalPlanForIngest(Resources resources, Set<Capability> capabilities)
public LogicalPlan buildLogicalPlanForIngest(Resources resources)
{
List<Operation> operations = new ArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,19 @@
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.logicalplan.values.FieldValue;
import org.finos.legend.engine.persistence.components.logicalplan.values.Value;
import org.finos.legend.engine.persistence.components.util.Capability;
import org.finos.legend.engine.persistence.components.util.LogicalPlanUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;

abstract class BitemporalPlanner extends UnitemporalPlanner
{
BitemporalPlanner(Datasets datasets, BitemporalMilestoned bitemporalMilestoned, PlannerOptions plannerOptions)
BitemporalPlanner(Datasets datasets, BitemporalMilestoned bitemporalMilestoned, PlannerOptions plannerOptions, Set<Capability> capabilities)
{
super(datasets, bitemporalMilestoned, plannerOptions);
super(datasets, bitemporalMilestoned, plannerOptions, capabilities);

// validate
String targetValidDateTimeFrom = bitemporalMilestoned.validityMilestoning().accept(EXTRACT_TARGET_VALID_DATE_TIME_FROM);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@

class BitemporalSnapshotPlanner extends BitemporalPlanner
{
BitemporalSnapshotPlanner(Datasets datasets, BitemporalSnapshot ingestMode, PlannerOptions plannerOptions)
BitemporalSnapshotPlanner(Datasets datasets, BitemporalSnapshot ingestMode, PlannerOptions plannerOptions, Set<Capability> capabilities)
{
super(datasets, ingestMode, plannerOptions);
super(datasets, ingestMode, plannerOptions, capabilities);

// validate

Expand All @@ -67,7 +67,7 @@ protected BitemporalSnapshot ingestMode()
}

@Override
public LogicalPlan buildLogicalPlanForIngest(Resources resources, Set<Capability> capabilities)
public LogicalPlan buildLogicalPlanForIngest(Resources resources)
{
List<Pair<FieldValue, Value>> keyValuePairs = keyValuesForMilestoningUpdate();

Expand Down
Loading

0 comments on commit bc6cc1a

Please sign in to comment.