From c01a9bc638a50ba775dcbe0c240e1c026523b82a Mon Sep 17 00:00:00 2001 From: fmendezh Date: Fri, 16 Aug 2024 17:38:39 +0200 Subject: [PATCH] https://github.com/gbif/pipelines/issues/1078 --- .../table/backfill/TableBackfill.java | 232 ++++++++---------- .../backfill/TableBackfillConfiguration.java | 10 +- 2 files changed, 111 insertions(+), 131 deletions(-) diff --git a/occurrence-table-build-spark/src/main/java/org/gbif/occurrence/table/backfill/TableBackfill.java b/occurrence-table-build-spark/src/main/java/org/gbif/occurrence/table/backfill/TableBackfill.java index 61c5f4b30..dfc8fdfcb 100644 --- a/occurrence-table-build-spark/src/main/java/org/gbif/occurrence/table/backfill/TableBackfill.java +++ b/occurrence-table-build-spark/src/main/java/org/gbif/occurrence/table/backfill/TableBackfill.java @@ -13,7 +13,9 @@ */ package org.gbif.occurrence.table.backfill; +import org.apache.avro.Schema; import org.gbif.occurrence.download.hive.ExtensionTable; +import org.gbif.occurrence.download.hive.OccurrenceAvroHdfsTableDefinition; import org.gbif.occurrence.download.hive.OccurrenceHDFSTableDefinition; import org.gbif.occurrence.spark.udf.UDFS; @@ -106,17 +108,14 @@ private SparkSession createSparkSession() { .enableHiveSupport() .config("spark.sql.catalog.iceberg.type", "hive") .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") - .config("spark.sql.defaultCatalog", "iceberg"); + .config("spark.sql.defaultCatalog", "iceberg") + .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"); if (configuration.getHiveThriftAddress() != null) { sparkBuilder .config("hive.metastore.uris", configuration.getHiveThriftAddress()) .config("spark.sql.warehouse.dir", configuration.getWarehouseLocation()); } - - if (configuration.isUsePartitionedTable()) { - sparkBuilder.config("spark.sql.sources.partitionOverwriteMode", "dynamic"); - } return sparkBuilder.getOrCreate(); } @@ -126,8 +125,10 @@ private void createTableUsingSpark(SparkSession spark) { fromAvroToTable( spark, getSnapshotPath(configuration.getCoreName()), // FROM - selectFromAvro(), // SELECT - configuration.getTableNameWithPrefix() // INSERT OVERWRITE INTO + configuration.getAvroTableName(), + occurrenceTableFields(), // SELECT + configuration.getTableNameWithPrefix(), // INSERT OVERWRITE INTO + OccurrenceAvroHdfsTableDefinition.avroDefinition() ); } @@ -235,37 +236,44 @@ private static boolean isDirectoryEmpty(String fromSourceDir, SparkSession spark } private void fromAvroToTable( - SparkSession spark, String fromSourceDir, Column[] select, String saveToTable) { + SparkSession spark, String fromSourceDir, String avroTableName, String selectFields, String saveToTable, Schema schema) { if (!isDirectoryEmpty(fromSourceDir, spark)) { if (configuration.isUsePartitionedTable()) { spark.sql(" set hive.exec.dynamic.partition.mode=nonstrict"); } - Dataset input = - spark.read().format("avro").load(fromSourceDir + "/*.avro").select(select); - - if (configuration.getTablePartitions() != null - && input.rdd().getNumPartitions() > configuration.getTablePartitions()) { - log.info("Setting partitions options {}", configuration.getTablePartitions()); - input = - input - .withColumn( - "_salted_key", - col("gbifid").cast(DataTypes.LongType).mod(configuration.getTablePartitions())) - .repartition(configuration.getTablePartitions()) - .drop("_salted_key"); - } - - input.writeTo(saveToTable).createOrReplace(); + createSourceAvroTable(spark, avroTableName, schema, fromSourceDir); + spark.sql(insertOverWrite(saveToTable, selectFields, avroTableName)); } } + private String occurrenceTableFields() { + return OccurrenceHDFSTableDefinition.definition().stream() + // Excluding partitioned columns + .filter(field -> configuration.isUsePartitionedTable() && !field.getHiveField().equalsIgnoreCase("datasetkey")) + .map(field -> field.getHiveField() + " " + field.getHiveDataType()) + .collect(Collectors.joining(", ")); + } + private void createSourceAvroTable(SparkSession spark, String tableName, Schema schema, String location) { + // Create Hive Table if it doesn't exist + spark.sql("DROP TABLE IF EXISTS " + tableName); + spark.sql("CREATE EXTERNAL TABLE " + tableName + " USING avro " + + "OPTIONS( 'format' = 'avro', 'schema' = '" + schema.toString(true) + "') " + + "LOCATION '" + location + "' TBLPROPERTIES('iceberg.catalog'='location_based_table')"); + } + + private String insertOverWrite(String targetTableName, String selectFields, String sourceTable) { + return "INSERT OVERWRITE TABLE " + targetTableName + + (Strings.isNullOrEmpty(configuration.getDatasetKey())? " PARTITION (datasetkey = '" + configuration.getDatasetKey() + "') " : " ") + + "SELECT " + selectFields + " FROM " + sourceTable; + } + private void createExtensionTable(SparkSession spark, ExtensionTable extensionTable) { spark.sql( configuration.isUsePartitionedTable() ? createExtensionExternalTable(extensionTable) : createExtensionTable(extensionTable)); - List columns = + String select = extensionTable.getFields().stream() .filter( field -> @@ -279,24 +287,25 @@ private void createExtensionTable(SparkSession spark, ExtensionTable extensionTa col( field.substring( field.indexOf('(') + 1, field.lastIndexOf(')')))) - .alias(field.substring(field.indexOf('(') + 1, field.lastIndexOf(')'))) - : col(field)) - .collect(Collectors.toList()); + .alias(field.substring(field.indexOf('(') + 1, field.lastIndexOf(')'))).toString() + : col(field).toString()) + .collect(Collectors.joining(",")); - // Partitioned columns must be at the end - if (configuration.isUsePartitionedTable()) { - columns.add(col("datasetkey")); - } fromAvroToTable( spark, getSnapshotPath(extensionTable.getDirectoryTableName()), // FROM sourceDir - columns.toArray(new Column[] {}), // SELECT - extensionTableName(extensionTable)); // INSERT OVERWRITE INTO + extensionAvroTableName(extensionTable), + select, // SELECT + extensionTableName(extensionTable), + extensionTable.getSchema()); // INSERT OVERWRITE INTO } private String extensionTableName(ExtensionTable extensionTable) { - return String.format( - "%s_ext_%s", configuration.getTableName(), extensionTable.getHiveTableName()); + return configuration.getTableName() + "_ext_" + extensionTable.getHiveTableName(); + } + + private String extensionAvroTableName(ExtensionTable extensionTable) { + return configuration.getTableName() + "_ext" + extensionTable.getHiveTableName() + "_avro"; } private String getPrefix() { @@ -313,7 +322,7 @@ private String createExtensionTable(ExtensionTable extensionTable) { .map(f -> f.name() + " STRING") .collect(Collectors.joining(",\n")) + ')' - + "STORED AS PARQUET TBLPROPERTIES (\"parquet.compression\"=\"GZIP\")\n", + + "STORED AS PARQUET TBLPROPERTIES ('parquet.compression'='GZIP')\n", getPrefix() + extensionTableName(extensionTable)); } @@ -327,8 +336,7 @@ private String createExtensionExternalTable(ExtensionTable extensionTable) { .collect(Collectors.joining(",\n")) + ')' + "PARTITIONED BY(datasetkey STRING) " - + "LOCATION '%s'" - + "STORED AS PARQUET TBLPROPERTIES (\"parquet.compression\"=\"GZIP\")\n", + + "TBLPROPERTIES ('iceberg.catalog'='location_based_table')\n", getPrefix() + extensionTableName(extensionTable), Paths.get(configuration.getTargetDirectory(), extensionTable.getHiveTableName())); } @@ -356,7 +364,7 @@ public String createIfNotExistsGbifMultimedia() { + "(gbifid STRING, type STRING, format STRING, identifier STRING, references STRING, title STRING, description STRING,\n" + "source STRING, audience STRING, created STRING, creator STRING, contributor STRING,\n" + "publisher STRING, license STRING, rightsHolder STRING) \n" - + "STORED AS PARQUET TBLPROPERTIES (\"parquet.compression\"=\"GZIP\")", + + "STORED AS PARQUET TBLPROPERTIES ('parquet.compression'='GZIP')", getPrefix() + multimediaTableName()); } @@ -365,54 +373,55 @@ private String multimediaTableName() { } public void insertOverwriteMultimediaTable(SparkSession spark) { - spark - .table("occurrence") - .select( - col("gbifid"), - from_json( - col("ext_multimedia"), - new ArrayType( - new StructType() - .add("type", "string", false) - .add("format", "string", false) - .add("identifier", "string", false) - .add("references", "string", false) - .add("title", "string", false) - .add("description", "string", false) - .add("source", "string", false) - .add("audience", "string", false) - .add("created", "string", false) - .add("creator", "string", false) - .add("contributor", "string", false) - .add("publisher", "string", false) - .add("license", "string", false) - .add("rightsHolder", "string", false), - true)) - .alias("mm_record")) - .select(col("gbifid"), explode(col("mm_record")).alias("mm_record")) - .select( - col("gbifid"), - callUDF("cleanDelimiters", col("mm_record.type")).alias("type"), - callUDF("cleanDelimiters", col("mm_record.format")).alias("format"), - callUDF("cleanDelimiters", col("mm_record.identifier")).alias("identifier"), - callUDF("cleanDelimiters", col("mm_record.references")).alias("references"), - callUDF("cleanDelimiters", col("mm_record.title")).alias("title"), - callUDF("cleanDelimiters", col("mm_record.description")).alias("description"), - callUDF("cleanDelimiters", col("mm_record.source")).alias("source"), - callUDF("cleanDelimiters", col("mm_record.audience")).alias("audience"), - col("mm_record.created").alias("created"), - callUDF("cleanDelimiters", col("mm_record.creator")).alias("creator"), - callUDF("cleanDelimiters", col("mm_record.contributor")).alias("contributor"), - callUDF("cleanDelimiters", col("mm_record.publisher")).alias("publisher"), - callUDF("cleanDelimiters", col("mm_record.license")).alias("license"), - callUDF("cleanDelimiters", col("mm_record.rightsHolder")).alias("rightsHolder")) - .registerTempTable("mm_records"); - - spark.sql( - String.format( - "INSERT OVERWRITE TABLE %1$s_multimedia \n" - + "SELECT gbifid, type, format, identifier, references, title, description, source, audience, created, creator, contributor, publisher, license, rightsHolder FROM mm_records", - configuration.getTableName())); + Dataset mmRecords = spark + .table(configuration.getTableName()) + .select( + col("gbifid"), + from_json( + col("ext_multimedia"), + new ArrayType( + new StructType() + .add("type", "string", false) + .add("format", "string", false) + .add("identifier", "string", false) + .add("references", "string", false) + .add("title", "string", false) + .add("description", "string", false) + .add("source", "string", false) + .add("audience", "string", false) + .add("created", "string", false) + .add("creator", "string", false) + .add("contributor", "string", false) + .add("publisher", "string", false) + .add("license", "string", false) + .add("rightsHolder", "string", false), + true)) + .alias("mm_record")) + .select(col("gbifid"), explode(col("mm_record")).alias("mm_record")) + .select( + col("gbifid"), + callUDF("cleanDelimiters", col("mm_record.type")).alias("type"), + callUDF("cleanDelimiters", col("mm_record.format")).alias("format"), + callUDF("cleanDelimiters", col("mm_record.identifier")).alias("identifier"), + callUDF("cleanDelimiters", col("mm_record.references")).alias("references"), + callUDF("cleanDelimiters", col("mm_record.title")).alias("title"), + callUDF("cleanDelimiters", col("mm_record.description")).alias("description"), + callUDF("cleanDelimiters", col("mm_record.source")).alias("source"), + callUDF("cleanDelimiters", col("mm_record.audience")).alias("audience"), + col("mm_record.created").alias("created"), + callUDF("cleanDelimiters", col("mm_record.creator")).alias("creator"), + callUDF("cleanDelimiters", col("mm_record.contributor")).alias("contributor"), + callUDF("cleanDelimiters", col("mm_record.publisher")).alias("publisher"), + callUDF("cleanDelimiters", col("mm_record.license")).alias("license"), + callUDF("cleanDelimiters", col("mm_record.rightsHolder")).alias("rightsHolder")); + if (configuration.getDatasetKey() != null) { + mmRecords = mmRecords.where("datasetkey = " + configuration.getDatasetKey()); + } + mmRecords.createOrReplaceTempView("mm_records"); + + spark.sql("INSERT OVERWRITE TABLE " + configuration.getTableName() + "_multimedia \n" + + (Strings.isNullOrEmpty(configuration.getDatasetKey())? " PARTITION (datasetkey = '" + configuration.getDatasetKey() + "') " : " ") + + "SELECT gbifid, type, format, identifier, references, title, description, source, audience, created, creator, contributor, publisher, license, rightsHolder FROM mm_records"); } private String createTableIfNotExists() { @@ -427,13 +436,13 @@ private String createParquetTableIfNotExists() { + OccurrenceHDFSTableDefinition.definition().stream() .map(field -> field.getHiveField() + " " + field.getHiveDataType()) .collect(Collectors.joining(", \n")) - + ") STORED AS PARQUET TBLPROPERTIES (\"parquet.compression\"=\"SNAPPY\")", + + ") STORED AS PARQUET TBLPROPERTIES ('parquet.compression'='SNAPPY')", configuration.getTableNameWithPrefix()); } private String createPartitionedTableIfNotExists() { return String.format( - "CREATE EXTERNAL TABLE IF NOT EXISTS %s (" + "CREATE TABLE IF NOT EXISTS %s (" + OccurrenceHDFSTableDefinition.definition().stream() .filter( field -> @@ -443,46 +452,9 @@ private String createPartitionedTableIfNotExists() { .equalsIgnoreCase("datasetkey")) // Excluding partitioned columns .map(field -> field.getHiveField() + " " + field.getHiveDataType()) .collect(Collectors.joining(", ")) - + ") " - + "PARTITIONED BY(datasetkey STRING) " - + "STORED AS PARQUET " - + "LOCATION '%s'" - + "TBLPROPERTIES (\"parquet.compression\"=\"GZIP\", \"auto.purge\"=\"true\")", - configuration.getTableNameWithPrefix(), - Paths.get(configuration.getTargetDirectory(), configuration.getCoreName().toLowerCase())); - } - - private Column[] selectFromAvro() { - List columns = - OccurrenceHDFSTableDefinition.definition().stream() - .filter( - field -> - !configuration.isUsePartitionedTable() - || !field - .getHiveField() - .equalsIgnoreCase( - "datasetkey")) // Partitioned columns must be at the end - .map( - field -> - field.getInitializer().equals(field.getHiveField()) - ? col(field.getHiveField()) - : callUDF( - field - .getInitializer() - .substring(0, field.getInitializer().indexOf("(")), - col(field.getHiveField())) - .alias(field.getHiveField())) - .collect(Collectors.toList()); - - // Partitioned columns must be at the end - if (configuration.isUsePartitionedTable()) { - columns.add(col("datasetkey")); - } - Column[] selectColumns = columns.toArray(new Column[] {}); - log.info( - "Selecting columns from Avro {}", - columns.stream().map(Column::toString).collect(Collectors.joining(", "))); - return selectColumns; + + ") PARTITIONED BY(datasetkey STRING) USING iceberg " + + "TBLPROPERTIES ('parquet.compression'='GZIP', 'auto.purge'='true')", + configuration.getTableNameWithPrefix()); } private void swapTables(Command command, SparkSession spark) { diff --git a/occurrence-table-build-spark/src/main/java/org/gbif/occurrence/table/backfill/TableBackfillConfiguration.java b/occurrence-table-build-spark/src/main/java/org/gbif/occurrence/table/backfill/TableBackfillConfiguration.java index 3430a2d68..ac39d3af1 100644 --- a/occurrence-table-build-spark/src/main/java/org/gbif/occurrence/table/backfill/TableBackfillConfiguration.java +++ b/occurrence-table-build-spark/src/main/java/org/gbif/occurrence/table/backfill/TableBackfillConfiguration.java @@ -72,8 +72,16 @@ public JsonPOJOBuilder.Value findPOJOBuilderConfig(AnnotatedClass ac) { private String prefixTable; + private String datasetKey; + public String getTableNameWithPrefix() { - return Strings.isNullOrEmpty(prefixTable) ? tableName : prefixTable + "_" + tableName; + String datasetKeyPostFix = Strings.isNullOrEmpty(datasetKey)? "" : '_' + datasetKey; + String tableNameWithPrefix = Strings.isNullOrEmpty(prefixTable) ? tableName : prefixTable + "_" + tableName; + return tableNameWithPrefix + datasetKeyPostFix; + } + + public String getAvroTableName() { + return getTableNameWithPrefix() + "_avro"; } @Nullable private final String warehouseLocation;