
Persistence Component: Bulk Load Support for Big Query #2294

Merged: 69 commits, Oct 19, 2023

Commits
130c0c2
Bug Fix: Bitemporal milestoning Derive Main schema removes the VALID_…
prasar-ashutosh Aug 14, 2023
b25832b
Bug Fix: Bitemporal milestoning Schema Evolution must ignore user pro…
prasar-ashutosh Aug 14, 2023
f4d6186
Adding code for concurrent safety feature
prasar-ashutosh Aug 16, 2023
f43bd95
Adding test for Multi Ingest Mode with concurrent Safety
prasar-ashutosh Aug 16, 2023
f712980
Adding tests for concurrent safety
prasar-ashutosh Aug 17, 2023
8973f91
Code Clean up
prasar-ashutosh Aug 17, 2023
97d93c9
Bug Fix: Bitemporal temp tables must be deleted after usage
prasar-ashutosh Aug 18, 2023
5b6e979
Update readme and code review comments
prasar-ashutosh Aug 18, 2023
2d45a3a
Fix typo
prasar-ashutosh Aug 18, 2023
30c64f4
Fix typos in readme
prasar-ashutosh Aug 18, 2023
3263040
Bug Fix: Empty Batch Handling in Unitemp Snapshot
prasar-ashutosh Aug 21, 2023
50faeae
Bug Fix: Code review comments
prasar-ashutosh Aug 21, 2023
7d29ffd
Implement StagedFilesDatasetReference
kumuwu Aug 23, 2023
136793f
Implement StagedFilesSelection
kumuwu Aug 23, 2023
bb9b3ad
Support for Empty Batch Handling in Unitemporal Snapshot
prasar-ashutosh Aug 23, 2023
871461b
Support for FailEmptyBatch strategy in Unitemporal Snapshot
prasar-ashutosh Aug 24, 2023
ddc2bdd
Merge remote-tracking branch 'Ashutosh/concurrent_safety_support' int…
kumuwu Aug 28, 2023
fd35802
Enrich datasets to add additionalDatasetproperties every where
prasar-ashutosh Aug 28, 2023
6bd3eef
Add tests for Empty Data handling
prasar-ashutosh Aug 28, 2023
a0bccd4
Support ICEBERG_TABLE_2022 for Iceberg tables
prasar-ashutosh Aug 28, 2023
16aece8
Implement bulk load for h2
kumuwu Aug 29, 2023
96fa3b9
Merge branch 'concurrent_safety_support' of https://github.com/prasar…
kumuwu Aug 29, 2023
e2efc2a
Merge branch 'master' of https://github.com/kumuwu/legend-engine into…
kumuwu Aug 29, 2023
738f9c8
Add missing datasetAdditionalProperties in BigQueryHelper
kumuwu Aug 29, 2023
e82af33
Implement executor flow and add end-to-end test
kumuwu Aug 29, 2023
9753521
Add test
kumuwu Aug 30, 2023
7cc25fd
Add support for digest udf in h2 and add tests
kumuwu Aug 31, 2023
a3c7f74
Clean up
kumuwu Aug 31, 2023
a1bb51a
Add file format and validation for file format and add tests
kumuwu Aug 31, 2023
edeef49
Add missing statement
kumuwu Aug 31, 2023
b067e6e
Fix typo in year
kumuwu Sep 4, 2023
528405b
Fix comments
kumuwu Sep 4, 2023
db7a7dd
Add H2 MD5
kumuwu Sep 4, 2023
248e8f4
Merge branch 'master' of https://github.com/kumuwu/legend-engine into…
kumuwu Sep 5, 2023
42ff733
Change file format interface
kumuwu Sep 5, 2023
9197ba8
Merge branch 'master' of https://github.com/kumuwu/legend-engine into…
kumuwu Sep 6, 2023
a8726a9
Change stats
kumuwu Sep 7, 2023
b4b5e23
Merge branch 'zhlizh-bulk-load' of https://github.com/kumuwu/legend-e…
kumuwu Sep 11, 2023
f36e6bf
Change stats - make snowflake always return stats no matter success o…
kumuwu Sep 11, 2023
733271c
Implement bulk load in using copy and insert + modify interface for c…
kumuwu Sep 12, 2023
c82272b
Add Support for metadata for BulkLoad Task
prasar-ashutosh Sep 12, 2023
c6a2c59
Refactor Digest Generation Strategy
prasar-ashutosh Sep 13, 2023
402f683
Implement bulk load for big query
kumuwu Sep 13, 2023
68af4a8
Addressed Code Review Comments
prasar-ashutosh Sep 14, 2023
0a5b14a
Clean up
kumuwu Sep 14, 2023
998b78f
Add basic tests for bulk load for big query
kumuwu Sep 14, 2023
c1a1e54
Refactor Code to rename AppendLog to BulkLoad
prasar-ashutosh Sep 15, 2023
68f54d6
Add default bulkLoad Batchid
prasar-ashutosh Sep 15, 2023
cf6b4d0
Refactor Append Log table name
prasar-ashutosh Sep 15, 2023
9295a2a
Merge branch 'master' of https://github.com/kumuwu/legend-engine into…
kumuwu Sep 19, 2023
ed912c3
Merge remote-tracking branch 'origin/master' into append_metadata
prasar-ashutosh Sep 19, 2023
f03b337
Merge branch 'append_metadata' of https://github.com/prasar-ashutosh/…
kumuwu Sep 19, 2023
3e69548
Resolve conflicts
kumuwu Sep 19, 2023
97a7eb0
Add digest udf and more tests
kumuwu Sep 20, 2023
4e28794
Fix digest problem
kumuwu Sep 20, 2023
d804bd2
Change H2 digest algo
kumuwu Sep 21, 2023
106de03
Fix tests
kumuwu Sep 21, 2023
2e630df
Fix typo
kumuwu Sep 21, 2023
00f3f0b
Merge branch 'master' of https://github.com/kumuwu/legend-engine into…
kumuwu Sep 22, 2023
31b2199
Refactor file format and load options
kumuwu Sep 26, 2023
b948e02
Refactor dataset, selection, reference logic
kumuwu Sep 26, 2023
6611f90
Fix other comments
kumuwu Sep 26, 2023
33f9e88
Fix big query bulk load ingestor flow and add end-to-end tests
kumuwu Sep 28, 2023
17e9349
Merge branch 'master' of https://github.com/kumuwu/legend-engine into…
kumuwu Oct 2, 2023
e1a5f5d
Add rows with error handling and test
kumuwu Oct 3, 2023
577082c
Address comments
kumuwu Oct 3, 2023
8d7316a
Bulk Load Batch ID and Task ID & PK Validation (#11)
kumuwu Oct 4, 2023
9ecee88
Merge branch 'master' of https://github.com/kumuwu/legend-engine into…
kumuwu Oct 12, 2023
f611912
Address Code Review Comments
prasar-ashutosh Oct 19, 2023
@@ -19,5 +19,5 @@ public enum FileFormat
     CSV,
     JSON,
     AVRO,
-    PARQUET
+    PARQUET;
 }
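As a rough illustration of how a `FileFormat` enum like the one above can be consumed when targeting BigQuery, here is a hypothetical mapping to the source-format identifiers BigQuery load jobs accept. The `BigQueryFormat` class and the mapping itself are assumptions for illustration, not code from this PR:

```java
// Hypothetical helper: translate the persistence component's FileFormat
// into the sourceFormat strings understood by BigQuery load jobs.
enum FileFormat { CSV, JSON, AVRO, PARQUET }

final class BigQueryFormat
{
    static String sourceFormat(FileFormat f)
    {
        switch (f)
        {
            case CSV:
                return "CSV";
            case JSON:
                // BigQuery's identifier for line-delimited JSON input
                return "NEWLINE_DELIMITED_JSON";
            case AVRO:
                return "AVRO";
            case PARQUET:
                return "PARQUET";
            default:
                throw new IllegalArgumentException("Unsupported format: " + f);
        }
    }
}
```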
@@ -12,20 +12,33 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-package org.finos.legend.engine.persistence.components.logicalplan.values;
+package org.finos.legend.engine.persistence.components.common;

-import static org.immutables.value.Value.Immutable;
-import static org.immutables.value.Value.Style;
+import org.immutables.value.Value;

-@Immutable
-@Style(
+import java.util.Optional;
+
+@Value.Immutable
+@Value.Style(
     typeAbstract = "*Abstract",
     typeImmutable = "*",
     jdkOnly = true,
     optionalAcceptNullable = true,
     strictBuilder = true
 )
-public interface BulkLoadBatchIdValueAbstract extends Value
+public interface LoadOptionsAbstract
 {
-    BulkLoadBatchIdValue INSTANCE = BulkLoadBatchIdValue.builder().build();
+    Optional<String> fieldDelimiter();
+
+    Optional<String> encoding();
+
+    Optional<String> nullMarker();
+
+    Optional<String> quote();
+
+    Optional<Long> skipLeadingRows();
+
+    Optional<Long> maxBadRecords();
+
+    Optional<String> compression();
 }
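`LoadOptionsAbstract` relies on the Immutables annotation processor to generate a concrete `LoadOptions` class with a builder, where every unset option stays `Optional.empty()` so the engine can fall back to defaults. A hand-rolled sketch of the generated shape, covering two of the options only (the structure mirrors what Immutables emits, but is an assumption, not the generated code):

```java
import java.util.Optional;

// Hand-rolled stand-in for the immutables-generated LoadOptions:
// options left unset remain Optional.empty().
final class LoadOptions
{
    private final Optional<String> fieldDelimiter;
    private final Optional<Long> skipLeadingRows;

    private LoadOptions(Optional<String> fieldDelimiter, Optional<Long> skipLeadingRows)
    {
        this.fieldDelimiter = fieldDelimiter;
        this.skipLeadingRows = skipLeadingRows;
    }

    static Builder builder()
    {
        return new Builder();
    }

    Optional<String> fieldDelimiter()
    {
        return fieldDelimiter;
    }

    Optional<Long> skipLeadingRows()
    {
        return skipLeadingRows;
    }

    static final class Builder
    {
        private Optional<String> fieldDelimiter = Optional.empty();
        private Optional<Long> skipLeadingRows = Optional.empty();

        Builder fieldDelimiter(String delimiter)
        {
            this.fieldDelimiter = Optional.of(delimiter);
            return this;
        }

        Builder skipLeadingRows(long rows)
        {
            this.skipLeadingRows = Optional.of(rows);
            return this;
        }

        LoadOptions build()
        {
            return new LoadOptions(fieldDelimiter, skipLeadingRows);
        }
    }
}
```

With `optionalAcceptNullable = true`, the real generated builder additionally accepts nulls to mean "unset"; this sketch omits that detail.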
@@ -149,7 +149,7 @@ public Dataset visitBulkLoad(BulkLoadAbstract bulkLoad)
     }
     Field batchIdField = Field.builder()
         .name(bulkLoad.batchIdField())
-        .type(FieldType.of(DataType.VARCHAR, Optional.empty(), Optional.empty()))
+        .type(FieldType.of(DataType.INT, Optional.empty(), Optional.empty()))
         .primaryKey(false)
         .build();
     mainSchemaFields.add(batchIdField);
@@ -15,7 +15,8 @@
 package org.finos.legend.engine.persistence.components.logicalplan;

 import org.finos.legend.engine.persistence.components.common.Datasets;
-import org.finos.legend.engine.persistence.components.logicalplan.conditions.Condition;
+import org.finos.legend.engine.persistence.components.ingestmode.BulkLoad;
+import org.finos.legend.engine.persistence.components.ingestmode.IngestMode;
 import org.finos.legend.engine.persistence.components.logicalplan.datasets.CsvExternalDatasetReference;
 import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
 import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection;
@@ -32,12 +33,13 @@
 import org.finos.legend.engine.persistence.components.logicalplan.values.StringValue;
 import org.finos.legend.engine.persistence.components.logicalplan.values.TabularValues;
 import org.finos.legend.engine.persistence.components.logicalplan.values.Value;
+import org.finos.legend.engine.persistence.components.util.BulkLoadMetadataDataset;
+import org.finos.legend.engine.persistence.components.util.BulkLoadMetadataUtils;
 import org.finos.legend.engine.persistence.components.util.LogicalPlanUtils;
 import org.finos.legend.engine.persistence.components.util.MetadataDataset;
 import org.finos.legend.engine.persistence.components.util.MetadataUtils;

 import java.util.List;
-import java.util.Optional;

 public class LogicalPlanFactory
 {
@@ -91,14 +93,23 @@ public static LogicalPlan getLogicalPlanForConstantStats(String stats, Long valu
         .build();
 }

-public static LogicalPlan getLogicalPlanForNextBatchId(Datasets datasets)
+public static LogicalPlan getLogicalPlanForNextBatchId(Datasets datasets, IngestMode ingestMode)
 {
     StringValue mainTable = StringValue.of(datasets.mainDataset().datasetReference().name().orElseThrow(IllegalStateException::new));
-    MetadataDataset metadataDataset = datasets.metadataDataset().isPresent()
-        ? datasets.metadataDataset().get()
-        : MetadataDataset.builder().build();
-    MetadataUtils metadataUtils = new MetadataUtils(metadataDataset);
-    Selection selection = metadataUtils.getBatchId(mainTable).selection();
+    Selection selection;
+    if (ingestMode instanceof BulkLoad)
+    {
+        BulkLoadMetadataDataset bulkLoadMetadataDataset = datasets.bulkLoadMetadataDataset().orElse(BulkLoadMetadataDataset.builder().build());
+        BulkLoadMetadataUtils bulkLoadMetadataUtils = new BulkLoadMetadataUtils(bulkLoadMetadataDataset);
+        selection = bulkLoadMetadataUtils.getBatchId(mainTable).selection();
+    }
+    else
+    {
+        MetadataDataset metadataDataset = datasets.metadataDataset().orElse(MetadataDataset.builder().build());
+        MetadataUtils metadataUtils = new MetadataUtils(metadataDataset);
+        selection = metadataUtils.getBatchId(mainTable).selection();
+    }

     return LogicalPlan.builder().addOps(selection).build();
 }
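The core of the `LogicalPlanFactory` change is a dispatch on ingest mode: a bulk load reads its next batch id from a dedicated bulk-load metadata table, while every other ingest mode keeps using the regular metadata table. A self-contained sketch of that dispatch, using hypothetical stand-in types and assumed default table names (the real code builds a `Selection` logical plan rather than returning a table name):

```java
// Stand-in ingest mode hierarchy for illustration only.
interface IngestMode {}
final class BulkLoad implements IngestMode {}
final class AppendOnly implements IngestMode {}

final class BatchIdSourceResolver
{
    // Mirrors the `ingestMode instanceof BulkLoad` branch in the PR:
    // bulk load resolves batch ids against its own metadata dataset.
    // Table names here are assumed defaults, not taken from the PR.
    static String metadataTableFor(IngestMode mode)
    {
        return mode instanceof BulkLoad
            ? "bulk_load_batch_metadata"
            : "batch_metadata";
    }
}
```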
@@ -14,7 +14,10 @@

 package org.finos.legend.engine.persistence.components.logicalplan.values;

+import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
+
 import java.util.List;
+import java.util.Optional;

 @org.immutables.value.Value.Immutable
 @org.immutables.value.Value.Style(
@@ -32,4 +35,6 @@ public interface DigestUdfAbstract extends Value
     List<String> fieldNames();

     List<Value> values();
+
+    Optional<Dataset> dataset();
 }
@@ -37,5 +37,6 @@ public enum FunctionName
     GENERATE_ARRAY,
     PARSE_DATETIME,
     OBJECT_CONSTRUCT,
-    TO_VARIANT;
+    TO_VARIANT,
+    TO_JSON;
 }
@@ -60,9 +60,9 @@ class AppendOnlyPlanner extends Planner
 {
     private final Optional<Condition> dataSplitInRangeCondition;

-    AppendOnlyPlanner(Datasets datasets, AppendOnly ingestMode, PlannerOptions plannerOptions)
+    AppendOnlyPlanner(Datasets datasets, AppendOnly ingestMode, PlannerOptions plannerOptions, Set<Capability> capabilities)
     {
-        super(datasets, ingestMode, plannerOptions);
+        super(datasets, ingestMode, plannerOptions, capabilities);

         // validate
         ingestMode.deduplicationStrategy().accept(new ValidatePrimaryKeys(primaryKeys, this::validatePrimaryKeysIsEmpty,
@@ -83,7 +83,7 @@ protected AppendOnly ingestMode()
 }

 @Override
-public LogicalPlan buildLogicalPlanForIngest(Resources resources, Set<Capability> capabilities)
+public LogicalPlan buildLogicalPlanForIngest(Resources resources)
 {
     List<Value> fieldsToSelect = new ArrayList<>(stagingDataset().schemaReference().fieldValues());
     List<Value> fieldsToInsert = new ArrayList<>(stagingDataset().schemaReference().fieldValues());
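This hunk (and the matching changes in the bitemporal planners below) moves the capability set from each `buildLogicalPlanForIngest(...)` call into the planner constructor, so capabilities are available once, at construction and validation time, instead of being re-passed on every build call. A minimal sketch of the pattern with hypothetical stand-in types and capability names:

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical capability set; the real enum lives in
// org.finos.legend.engine.persistence.components.util.Capability.
enum Capability { MERGE, ADD_COLUMN }

abstract class Planner
{
    // Capabilities are injected once, at construction time.
    protected final Set<Capability> capabilities;

    Planner(Set<Capability> capabilities)
    {
        this.capabilities = capabilities;
    }

    abstract String buildLogicalPlanForIngest();
}

final class AppendOnlyPlanner extends Planner
{
    AppendOnlyPlanner(Set<Capability> capabilities)
    {
        super(capabilities);
    }

    @Override
    String buildLogicalPlanForIngest()
    {
        // The plan shape can depend on capabilities without them
        // being threaded through every build call.
        return capabilities.contains(Capability.MERGE) ? "MERGE" : "INSERT";
    }
}
```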
@@ -105,9 +105,9 @@ class BitemporalDeltaPlanner extends BitemporalPlanner
     private List<FieldValue> primaryKeyFieldsAndFromFieldForSelection;
     private List<FieldValue> dataFields;

-    BitemporalDeltaPlanner(Datasets datasets, BitemporalDelta ingestMode, PlannerOptions plannerOptions)
+    BitemporalDeltaPlanner(Datasets datasets, BitemporalDelta ingestMode, PlannerOptions plannerOptions, Set<Capability> capabilities)
     {
-        super(datasets, ingestMode, plannerOptions);
+        super(datasets, ingestMode, plannerOptions, capabilities);

         if (ingestMode().validityMilestoning().validityDerivation() instanceof SourceSpecifiesFromDateTime && ingestMode().deduplicationStrategy() instanceof FilterDuplicates)
         {
@@ -201,7 +201,7 @@ protected BitemporalDelta ingestMode()
 }

 @Override
-public LogicalPlan buildLogicalPlanForIngest(Resources resources, Set<Capability> capabilities)
+public LogicalPlan buildLogicalPlanForIngest(Resources resources)
 {
     List<Operation> operations = new ArrayList<>();
@@ -25,17 +25,19 @@
 import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
 import org.finos.legend.engine.persistence.components.logicalplan.values.FieldValue;
 import org.finos.legend.engine.persistence.components.logicalplan.values.Value;
+import org.finos.legend.engine.persistence.components.util.Capability;
 import org.finos.legend.engine.persistence.components.util.LogicalPlanUtils;

 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;

 abstract class BitemporalPlanner extends UnitemporalPlanner
 {
-    BitemporalPlanner(Datasets datasets, BitemporalMilestoned bitemporalMilestoned, PlannerOptions plannerOptions)
+    BitemporalPlanner(Datasets datasets, BitemporalMilestoned bitemporalMilestoned, PlannerOptions plannerOptions, Set<Capability> capabilities)
     {
-        super(datasets, bitemporalMilestoned, plannerOptions);
+        super(datasets, bitemporalMilestoned, plannerOptions, capabilities);

         // validate
         String targetValidDateTimeFrom = bitemporalMilestoned.validityMilestoning().accept(EXTRACT_TARGET_VALID_DATE_TIME_FROM);
@@ -45,9 +45,9 @@

 class BitemporalSnapshotPlanner extends BitemporalPlanner
 {
-    BitemporalSnapshotPlanner(Datasets datasets, BitemporalSnapshot ingestMode, PlannerOptions plannerOptions)
+    BitemporalSnapshotPlanner(Datasets datasets, BitemporalSnapshot ingestMode, PlannerOptions plannerOptions, Set<Capability> capabilities)
     {
-        super(datasets, ingestMode, plannerOptions);
+        super(datasets, ingestMode, plannerOptions, capabilities);

         // validate

@@ -67,7 +67,7 @@ protected BitemporalSnapshot ingestMode()
 }

 @Override
-public LogicalPlan buildLogicalPlanForIngest(Resources resources, Set<Capability> capabilities)
+public LogicalPlan buildLogicalPlanForIngest(Resources resources)
 {
     List<Pair<FieldValue, Value>> keyValuePairs = keyValuesForMilestoningUpdate();