From ffb1165f59defa66b31b4fd9cb6367b71050071b Mon Sep 17 00:00:00 2001 From: dengzh Date: Tue, 12 Mar 2024 08:51:01 +0800 Subject: [PATCH] HIVE-27746: Hive Metastore should send single AlterPartitionEvent with list of partitions (#4820) (Zhihua Deng, reviewed by Sai Hemanth Gantasala, John Sherman, Henri Biestro) --- .../listener/DbNotificationListener.java | 22 ++- .../listener/TestDbNotificationListener.java | 86 ++++++++-- .../TestCachedStoreUpdateUsingEvents.java | 2 +- .../ql/cache/results/QueryResultsCache.java | 1 + .../dump/events/AlterPartitionsHandler.java | 93 +++++++++++ .../repl/dump/events/EventHandlerFactory.java | 1 + .../parse/repl/load/message/TableHandler.java | 14 +- .../hive/metastore/conf/MetastoreConf.java | 6 + .../hadoop/hive/metastore/HMSHandler.java | 31 ++-- .../hive/metastore/HiveAlterHandler.java | 28 ++-- .../metastore/MetaStoreEventListener.java | 8 + .../metastore/MetaStoreListenerNotifier.java | 10 ++ .../hive/metastore/cache/CachedStore.java | 6 + .../events/AlterPartitionsEvent.java | 124 ++++++++++++++ .../messaging/AlterPartitionsMessage.java | 69 ++++++++ .../metastore/messaging/EventMessage.java | 4 +- .../metastore/messaging/MessageBuilder.java | 9 + .../messaging/MessageDeserializer.java | 2 + .../json/JSONAlterPartitionsMessage.java | 157 ++++++++++++++++++ .../json/JSONMessageDeserializer.java | 10 ++ .../messaging/json/gzip/DeSerializer.java | 6 + .../events/TestAlterPartitionsEvent.java | 113 +++++++++++++ 22 files changed, 751 insertions(+), 51 deletions(-) create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionsHandler.java create mode 100644 standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterPartitionsEvent.java create mode 100644 standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionsMessage.java create mode 100644 
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionsMessage.java create mode 100644 standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/events/TestAlterPartitionsEvent.java diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index 1f9ed1052a65..7262e7f873e2 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -23,7 +23,6 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; -import java.util.Arrays; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -73,6 +72,7 @@ import org.apache.hadoop.hive.metastore.events.AddUniqueConstraintEvent; import org.apache.hadoop.hive.metastore.events.AlterDatabaseEvent; import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; +import org.apache.hadoop.hive.metastore.events.AlterPartitionsEvent; import org.apache.hadoop.hive.metastore.events.AlterTableEvent; import org.apache.hadoop.hive.metastore.events.BatchAcidWriteEvent; import org.apache.hadoop.hive.metastore.events.CommitCompactionEvent; @@ -110,6 +110,7 @@ import org.apache.hadoop.hive.metastore.messaging.AllocWriteIdMessage; import org.apache.hadoop.hive.metastore.messaging.AlterDatabaseMessage; import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.AlterPartitionsMessage; import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage; import org.apache.hadoop.hive.metastore.messaging.CommitCompactionMessage; import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage; @@ -481,6 
+482,25 @@ public void onAlterPartition(AlterPartitionEvent partitionEvent) throws MetaExce process(event, partitionEvent); } + @Override + public void onAlterPartitions(AlterPartitionsEvent event) throws MetaException { + Table table = event.getTable(); + Iterator<List<Partition>> iterator = event.getNewPartsIterator(maxBatchSize); + while (iterator.hasNext()) { + List<Partition> partitions = iterator.next(); + AlterPartitionsMessage msg = MessageBuilder.getInstance() + .buildAlterPartitionsMessage(event.getTable(), partitions, + event.getIsTruncateOp(), partitions.get(0).getWriteId()); + NotificationEvent notification = + new NotificationEvent(0, now(), EventType.ALTER_PARTITIONS.toString(), + msgEncoder.getSerializer().serialize(msg)); + notification.setCatName(table.isSetCatName() ? table.getCatName() : getDefaultCatalog(conf)); + notification.setDbName(table.getDbName()); + notification.setTableName(table.getTableName()); + process(notification, event); + } + } + /** * @param dbEvent database event * @throws MetaException diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java index 5be32602d24e..74b90c60a6f6 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hive.metastore.api.FireEventResponse; import org.apache.hadoop.hive.metastore.api.Function; import org.apache.hadoop.hive.metastore.api.FunctionType; +import org.apache.hadoop.hive.metastore.api.GetTableRequest; import org.apache.hadoop.hive.metastore.api.InsertEventRequestData; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NotificationEvent; @@ -64,6 +65,7 @@ import
org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.client.builder.TableBuilder; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; @@ -83,7 +85,7 @@ import org.apache.hadoop.hive.metastore.events.AcidWriteEvent; import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage; import org.apache.hadoop.hive.metastore.messaging.AlterDatabaseMessage; -import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.AlterPartitionsMessage; import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage; import org.apache.hadoop.hive.metastore.messaging.CreateDatabaseMessage; import org.apache.hadoop.hive.metastore.messaging.CreateFunctionMessage; @@ -758,16 +760,17 @@ public void alterPartition() throws Exception { NotificationEvent event = rsp.getEvents().get(2); assertEquals(firstEventId + 3, event.getEventId()); assertTrue(event.getEventTime() >= startTime); - assertEquals(EventType.ALTER_PARTITION.toString(), event.getEventType()); + assertEquals(EventType.ALTER_PARTITIONS.toString(), event.getEventType()); assertEquals(defaultDbName, event.getDbName()); assertEquals(tblName, event.getTableName()); // Parse the message field - AlterPartitionMessage alterPtnMsg = md.getAlterPartitionMessage(event.getMessage()); - assertEquals(defaultDbName, alterPtnMsg.getDB()); - assertEquals(tblName, alterPtnMsg.getTable()); - assertEquals(newPart, alterPtnMsg.getPtnObjAfter()); - assertEquals(TableType.MANAGED_TABLE.toString(), alterPtnMsg.getTableType()); + AlterPartitionsMessage alterPtnsMsg = md.getAlterPartitionsMessage(event.getMessage()); + assertEquals(defaultDbName, alterPtnsMsg.getDB()); + assertEquals(tblName, 
alterPtnsMsg.getTable()); + assertEquals(newPart, alterPtnsMsg.getPartitionObjs().iterator().next()); + assertEquals(1, alterPtnsMsg.getPartitions().size()); + assertEquals(TableType.MANAGED_TABLE.toString(), alterPtnsMsg.getTableType()); // Verify the eventID was passed to the non-transactional listener MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ADD_PARTITION, firstEventId + 2); @@ -787,6 +790,69 @@ public void alterPartition() throws Exception { testEventCounts(defaultDbName, firstEventId, null, null, 3); } + @Test + public void alterPartitions() throws Exception { + String defaultDbName = "default"; + String tblName = "alterptns"; + new TableBuilder() + .setDbName(defaultDbName).setTableName(tblName).setOwner("me") + .addCol("col1", "int") + .addPartCol("col2", "int") + .addPartCol("col3", "string") + .setLocation(testTempDir) + .create(msClient, new HiveConf()); + + Table table = msClient.getTable(new GetTableRequest(defaultDbName, tblName)); + List partitions = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + List values = Arrays.asList(i + "", "part" + i); + Partition part = new Partition(values, defaultDbName, tblName, + 0, 0, table.getSd(), emptyParameters); + partitions.add(part); + } + msClient.add_partitions(partitions); + partitions.forEach(partition -> partition.setCreateTime(startTime)); + msClient.alter_partitions(defaultDbName, tblName, partitions, null, null, -1); + + // Get notifications from metastore + NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null); + assertEquals(3, rsp.getEventsSize()); + NotificationEvent event = rsp.getEvents().get(2); + assertEquals(firstEventId + 3, event.getEventId()); + assertTrue(event.getEventTime() >= startTime); + assertEquals(EventType.ALTER_PARTITIONS.toString(), event.getEventType()); + assertEquals(defaultDbName, event.getDbName()); + assertEquals(tblName, event.getTableName()); + // Parse the message field + AlterPartitionsMessage alterPtnsMsg = 
md.getAlterPartitionsMessage(event.getMessage()); + assertEquals(defaultDbName, alterPtnsMsg.getDB()); + assertEquals(tblName, alterPtnsMsg.getTable()); + Iterator expectedIterator = partitions.iterator(), + actualIterator = alterPtnsMsg.getPartitionObjs().iterator(); + while (expectedIterator.hasNext() && actualIterator.hasNext()) { + assertEquals(expectedIterator.next(), actualIterator.next()); + } + assertFalse(expectedIterator.hasNext() || actualIterator.hasNext()); + assertEquals(table.getTableType(), alterPtnsMsg.getTableType()); + + // Verify the eventID was passed to the non-transactional listener + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ADD_PARTITION, firstEventId + 2); + MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1); + + // When hive.metastore.transactional.event.listeners is set, + // a failed event should not create a new notification + DummyRawStoreFailEvent.setEventSucceed(false); + try { + msClient.alter_partitions(defaultDbName, tblName, partitions, null, null, -1); + fail("Error: alter partition should've failed"); + } catch (Exception ex) { + // expected + } + rsp = msClient.getNextNotification(firstEventId, 0, null); + assertEquals(3, rsp.getEventsSize()); + testEventCounts(defaultDbName, firstEventId, null, null, 3); + } + @Test public void dropPartition() throws Exception { String defaultDbName = "default"; @@ -1530,7 +1596,7 @@ public void sqlInsertPartition() throws Exception { event = rsp.getEvents().get(26); assertEquals(firstEventId + 27, event.getEventId()); - assertEquals(EventType.ALTER_PARTITION.toString(), event.getEventType()); + assertEquals(EventType.ALTER_PARTITIONS.toString(), event.getEventType()); assertTrue(event.getMessage().matches(".*\"ds\":\"todaytwo\".*")); // Test fromEventId different from the very first @@ -1548,12 +1614,12 @@ public void sqlInsertPartition() throws Exception { event = rsp.getEvents().get(29); assertEquals(firstEventId + 30, 
event.getEventId()); - assertEquals(EventType.ALTER_PARTITION.toString(), event.getEventType()); + assertEquals(EventType.ALTER_PARTITIONS.toString(), event.getEventType()); assertTrue(event.getMessage().matches(".*\"ds\":\"todaytwo\".*")); event = rsp.getEvents().get(30); assertEquals(firstEventId + 31, event.getEventId()); - assertEquals(EventType.ALTER_PARTITION.toString(), event.getEventType()); + assertEquals(EventType.ALTER_PARTITIONS.toString(), event.getEventType()); assertTrue(event.getMessage().matches(".*\"ds\":\"todaytwo\".*")); testEventCounts(defaultDbName, firstEventId, null, null, 31); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java index 1ad1349b10be..2db61aa6c06a 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java @@ -544,7 +544,7 @@ public void testPartitionOpsForUpdateUsingEvents() throws Exception { // Read the altered partition via CachedStore ptnRead = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal1Alt)); - Assert.assertEquals(ptn1Atl.getParameters(), ptnRead.getParameters()); + Assert.assertEquals(null, ptnRead); ptnRead = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal2)); Assert.assertEquals(null, ptnRead); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java index 4b833b730cf9..b613df1df440 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java @@ -968,6 +968,7 
@@ public void accept(NotificationEvent event) { switch (event.getEventType()) { case MessageBuilder.ADD_PARTITION_EVENT: case MessageBuilder.ALTER_PARTITION_EVENT: + case MessageBuilder.ALTER_PARTITIONS_EVENT: case MessageBuilder.DROP_PARTITION_EVENT: case MessageBuilder.ALTER_TABLE_EVENT: case MessageBuilder.DROP_TABLE_EVENT: diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionsHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionsHandler.java new file mode 100644 index 000000000000..5bf9fb188e5e --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionsHandler.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.parse.repl.dump.events; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.AlterPartitionsMessage; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.repl.DumpType; +import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; +import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; + +public class AlterPartitionsHandler extends AbstractEventHandler { + private final boolean isTruncateOp; + private final org.apache.hadoop.hive.metastore.api.Table tableObject; + private final Iterable partitions; + + AlterPartitionsHandler(NotificationEvent event) throws Exception { + super(event); + AlterPartitionsMessage apm = eventMessage; + isTruncateOp = apm.getIsTruncateOp(); + tableObject = apm.getTableObj(); + partitions = apm.getPartitionObjs(); + } + + @Override + AlterPartitionsMessage eventMessage(String stringRepresentation) { + return deserializer.getAlterPartitionsMessage(stringRepresentation); + } + + @Override + public void handle(Context withinContext) throws Exception { + LOG.info("Processing#{} ALTER_PARTITIONS message : {}", fromEventId(), eventMessageAsJSON); + + // We do not dump partitions during metadata only bootstrap dump (See TableExport + // .getPartitions(), for bootstrap dump we pass tableSpec with TABLE_ONLY set.). So don't + // dump partition related events for metadata-only dump. 
+ if (withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)) { + return; + } + + Table qlMdTable = new Table(tableObject); + if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, true, + withinContext.getTablesForBootstrap(), withinContext.oldReplScope, withinContext.hiveConf)) { + return; + } + + if (!isTruncateOp) { + withinContext.replicationSpec.setIsMetadataOnly(true); + List partitionObjs = new ArrayList<>(); + for (org.apache.hadoop.hive.metastore.api.Partition part : partitions) { + partitionObjs.add(new Partition(qlMdTable, part)); + } + Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME); + EximUtil.createExportDump( + metaDataPath.getFileSystem(withinContext.hiveConf), + metaDataPath, + qlMdTable, + partitionObjs, + withinContext.replicationSpec, + withinContext.hiveConf); + } + DumpMetaData dmd = withinContext.createDmd(this); + dmd.setPayload(eventMessageAsJSON); + dmd.write(); + } + + @Override + public DumpType dumpType() { + return isTruncateOp ? 
DumpType.EVENT_TRUNCATE_PARTITION : DumpType.EVENT_ALTER_PARTITION; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java index 15428101c1ee..8598fb866c08 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java @@ -36,6 +36,7 @@ private EventHandlerFactory() { register(MessageBuilder.ADD_PARTITION_EVENT, AddPartitionHandler.class); register(MessageBuilder.ALTER_DATABASE_EVENT, AlterDatabaseHandler.class); register(MessageBuilder.ALTER_PARTITION_EVENT, AlterPartitionHandler.class); + register(MessageBuilder.ALTER_PARTITIONS_EVENT, AlterPartitionsHandler.class); register(MessageBuilder.ALTER_TABLE_EVENT, AlterTableHandler.class); register(MessageBuilder.CREATE_FUNCTION_EVENT, CreateFunctionHandler.class); register(MessageBuilder.CREATE_TABLE_EVENT, CreateTableHandler.class); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java index a8d2bffa83cd..59188cd71477 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java @@ -20,6 +20,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.AlterPartitionsMessage; import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables; @@ -113,9 +114,16 @@ private Tuple extract(Context context) throws SemanticException { writeId = 
alterTableMessage.getWriteId(); break; case EVENT_ALTER_PARTITION: - AlterPartitionMessage msg = deserializer.getAlterPartitionMessage(context.dmd.getPayload()); - tableType = msg.getTableObj().getTableType(); - writeId = msg.getWriteId(); + String eventMessage = deserializer.deSerializeGenericString(context.dmd.getPayload()); + if (eventMessage.contains("\"keyValues\":")) { + AlterPartitionMessage msg = deserializer.getAlterPartitionMessage(context.dmd.getPayload()); + tableType = msg.getTableObj().getTableType(); + writeId = msg.getWriteId(); + } else { + AlterPartitionsMessage msg = deserializer.getAlterPartitionsMessage(context.dmd.getPayload()); + tableType = msg.getTableType(); + writeId = msg.getWriteId(); + } break; default: break; diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index 585b55c39f1d..5a0c8a4ba0f5 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -1286,6 +1286,12 @@ public enum ConfVars { "hive.notification.sequence.lock.retry.sleep.interval", 10, TimeUnit.SECONDS, "Sleep interval between retries to acquire a notification lock as described part of property " + NOTIFICATION_SEQUENCE_LOCK_MAX_RETRIES.name()), + NOTIFICATION_ALTER_PARTITIONS_V2_ENABLED("metastore.alterPartitions.notification.v2.enabled", + "hive.metastore.alterPartitions.notification.v2.enabled", true, + "Should send a single notification event on alter partitions. 
" + + "This property is for ensuring backward compatibility when it sets to false, " + + "HMS will send an old ALTER_PARTITION event per partition, so downstream consumers can " + + "still process the ALTER_PARTITION event without making changes."), ORM_RETRIEVE_MAPNULLS_AS_EMPTY_STRINGS("metastore.orm.retrieveMapNullsAsEmptyStrings", "hive.metastore.orm.retrieveMapNullsAsEmptyStrings",false, "Thrift does not support nulls in maps, so any nulls present in maps retrieved from ORM must " + diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java index 22eefbc1ad59..cd50a3480ee1 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java @@ -212,24 +212,6 @@ public static String getIPAddress() { return null; } - /** - * Internal function to notify listeners for meta config change events - */ - private void notifyMetaListeners(String key, String oldValue, String newValue) throws MetaException { - for (MetaStoreEventListener listener : listeners) { - listener.onConfigChange(new ConfigChangeEvent(this, key, oldValue, newValue)); - } - - if (transactionalListeners.size() > 0) { - // All the fields of this event are final, so no reason to create a new one for each - // listener - ConfigChangeEvent cce = new ConfigChangeEvent(this, key, oldValue, newValue); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onConfigChange(cce); - } - } - } - /** * Internal function to notify listeners to revert back to old values of keys * that were modified during setMetaConf. 
This would get called from HiveMetaStore#cleanupHandlerContext @@ -246,7 +228,11 @@ private void notifyMetaListenersOnShutDown() { String currVal = entry.getValue(); String oldVal = conf.get(key); if (!Objects.equals(oldVal, currVal)) { - notifyMetaListeners(key, oldVal, currVal); + for (List eventListeners : + new List[] { listeners, transactionalListeners }) { + MetaStoreListenerNotifier.notifyEvent(eventListeners, EventType.CONFIG_CHANGE, + new ConfigChangeEvent(this, key, oldVal, currVal)); + } } } logAndAudit("Meta listeners shutdown notification completed."); @@ -518,8 +504,11 @@ public void setMetaConf(String key, String value) throws MetaException { HMSHandlerContext.setHMSHandler(this); } configuration.set(key, value); - notifyMetaListeners(key, oldValue, value); - + for (List eventListeners : + new List[] { listeners, transactionalListeners }) { + MetaStoreListenerNotifier.notifyEvent(eventListeners, EventType.CONFIG_CHANGE, + new ConfigChangeEvent(this, key, oldValue, value)); + } if (ConfVars.TRY_DIRECT_SQL == confVar) { HMSHandler.LOG.info("Direct SQL optimization = {}", value); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java index a2807961b757..68b522ff7471 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.common.TableName; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; +import org.apache.hadoop.hive.metastore.events.AlterPartitionsEvent; import org.apache.hadoop.hive.metastore.events.AlterTableEvent; import org.apache.hadoop.hive.metastore.messaging.EventMessage; 
import org.apache.hadoop.hive.metastore.model.MTable; @@ -920,23 +921,22 @@ public List alterPartitions(final RawStore msdb, Warehouse wh, final } msdb.alterPartitions(catName, dbname, name, partValsList, new_parts, writeId, writeIdList); - Iterator oldPartsIt = oldParts.iterator(); - for (Partition newPart : new_parts) { - Partition oldPart; - if (oldPartsIt.hasNext()) { - oldPart = oldPartsIt.next(); - } else { - throw new InvalidOperationException("Missing old partition corresponding to new partition " + - "when invoking MetaStoreEventListener for alterPartitions event."); - } - if (transactionalListeners != null && !transactionalListeners.isEmpty()) { - MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventMessage.EventType.ALTER_PARTITION, - new AlterPartitionEvent(oldPart, newPart, tbl, false, true, newPart.getWriteId(), handler), - environmentContext); + if (transactionalListeners != null && !transactionalListeners.isEmpty()) { + boolean shouldSendSingleEvent = MetastoreConf.getBoolVar(handler.getConf(), + MetastoreConf.ConfVars.NOTIFICATION_ALTER_PARTITIONS_V2_ENABLED); + if (shouldSendSingleEvent) { + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventMessage.EventType.ALTER_PARTITIONS, + new AlterPartitionsEvent(oldParts, new_parts, tbl, false, true, handler), environmentContext); + } else { + for (Partition newPart : new_parts) { + Partition oldPart = oldPartMap.get(newPart.getValues()); + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventMessage.EventType.ALTER_PARTITION, + new AlterPartitionEvent(oldPart, newPart, tbl, false, true, newPart.getWriteId(), handler), + environmentContext); + } } } - success = msdb.commitTransaction(); } catch (InvalidObjectException | NoSuchObjectException e) { throw new InvalidOperationException("Alter partition operation failed: " + e); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java index b028f41d7517..70df6c8f7518 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hive.metastore.events.AlterISchemaEvent; import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; +import org.apache.hadoop.hive.metastore.events.AlterPartitionsEvent; import org.apache.hadoop.hive.metastore.events.AlterSchemaVersionEvent; import org.apache.hadoop.hive.metastore.events.AlterTableEvent; import org.apache.hadoop.hive.metastore.events.BatchAcidWriteEvent; @@ -137,6 +138,13 @@ public void onDropPartition (DropPartitionEvent partitionEvent) throws MetaExce public void onAlterPartition (AlterPartitionEvent partitionEvent) throws MetaException { } + /** + * @param event alter partitions event + * @throws MetaException + */ + public void onAlterPartitions (AlterPartitionsEvent event) throws MetaException { + } + /** * @param dbEvent database event * @throws MetaException diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java index d35cd2353868..822d8769c484 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java @@ -37,10 +37,12 @@ import org.apache.hadoop.hive.metastore.events.AlterDataConnectorEvent; import org.apache.hadoop.hive.metastore.events.AlterISchemaEvent; import 
org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; +import org.apache.hadoop.hive.metastore.events.AlterPartitionsEvent; import org.apache.hadoop.hive.metastore.events.AlterSchemaVersionEvent; import org.apache.hadoop.hive.metastore.events.AlterTableEvent; import org.apache.hadoop.hive.metastore.events.BatchAcidWriteEvent; import org.apache.hadoop.hive.metastore.events.CommitCompactionEvent; +import org.apache.hadoop.hive.metastore.events.ConfigChangeEvent; import org.apache.hadoop.hive.metastore.events.CreateCatalogEvent; import org.apache.hadoop.hive.metastore.events.CreateDataConnectorEvent; import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent; @@ -163,6 +165,12 @@ public void notify(MetaStoreEventListener listener, ListenerEvent event) throws listener.onAlterPartition((AlterPartitionEvent)event); } }) + .put(EventType.ALTER_PARTITIONS, new EventNotifier() { + @Override + public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException { + listener.onAlterPartitions((AlterPartitionsEvent)event); + } + }) .put(EventType.INSERT, new EventNotifier() { @Override public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException { @@ -283,6 +291,8 @@ public void notify(MetaStoreEventListener listener, ListenerEvent event) throws ((listener, event) -> listener.onCommitCompaction((CommitCompactionEvent) event, null, null))) .put(EventType.RELOAD, ((listener, event) -> listener.onReload((ReloadEvent) event))) + .put(EventType.CONFIG_CHANGE, + ((listener, event) -> listener.onConfigChange((ConfigChangeEvent) event))) .build() ); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java index 9a2408dd2234..f7c630c5c285 100644 --- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java @@ -323,6 +323,12 @@ private static void updateStatsForAlterTable(RawStore rawStore, Table tblBefore, updateStatsForAlterPart(rawStore, alterPartitionMessage.getTableObj(), catalogName, dbName, tableName, alterPartitionMessage.getPtnObjAfter()); break; + case MessageBuilder.ALTER_PARTITIONS_EVENT: + AlterPartitionsMessage alterPtnsMessage = deserializer.getAlterPartitionsMessage(message); + List> part_vals = new ArrayList<>(); + alterPtnsMessage.getPartitionObjs().forEach(part -> part_vals.add(part.getValues())); + sharedCache.removePartitionsFromCache(catalogName, dbName, tableName, part_vals); + break; case MessageBuilder.DROP_PARTITION_EVENT: DropPartitionMessage dropPartitionMessage = deserializer.getDropPartitionMessage(message); for (Map partMap : dropPartitionMessage.getPartitions()) { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterPartitionsEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterPartitionsEvent.java new file mode 100644 index 000000000000..f090b8eea585 --- /dev/null +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterPartitionsEvent.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.events; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hive.metastore.IHMSHandler; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; + +import static java.util.Objects.requireNonNull; + +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class AlterPartitionsEvent extends ListenerEvent { + + private final List old_parts; + private final List new_parts; + private final Table table; + private final boolean isTruncateOp; + + public AlterPartitionsEvent(List old_parts, List new_parts, Table table, + boolean isTruncateOp, boolean status, IHMSHandler handler) { + super(status, handler); + this.old_parts = requireNonNull(old_parts, "old_parts is null"); + this.new_parts = requireNonNull(new_parts, "new_parts is null"); + this.table = requireNonNull(table, "table is null"); + this.isTruncateOp = isTruncateOp; + } + + /** + * @return The table. + */ + public Table getTable() { + return table; + } + + /** + * @return Iterator for new partitions. 
+ */ + public Iterator> getNewPartsIterator(int batchSize) { + return iteratePartitions(new_parts, batchSize); + } + + /** + * The partitions might have different write ids, groups the partitions by write id first, + * then iterate each group, for each iteration, there is a maximum number(e.g, the batchSize) + * of the partitions returned. + * @param partitions partitions in this alter event + * @param batchSize the maximum number of partitions returned in each iteration + * @return iterator of partitions in bulk + */ + private Iterator> iteratePartitions(List partitions, + final int batchSize) { + Map> writeIdToParts = new HashMap<>(); + partitions.forEach(part -> + writeIdToParts.computeIfAbsent(part.getWriteId(), k -> new ArrayList<>()).add(part)); + Iterator>> iterator = writeIdToParts.entrySet().iterator(); + return new Iterator>() { + Map.Entry> mapEntry; + Iterator current; + @Override + public boolean hasNext() { + return iterator.hasNext() || (current != null && current.hasNext()); + } + @Override + public List next() { + List result = new ArrayList<>(); + if (mapEntry == null && iterator.hasNext()) { + mapEntry = iterator.next(); + } + if (current == null) { + current = mapEntry.getValue().iterator(); + } else if (!current.hasNext() && iterator.hasNext()) { + mapEntry = iterator.next(); + current = mapEntry.getValue().iterator(); + } + + int i = 0; + while (current != null && current.hasNext() && + (batchSize <= 0 || i++ < batchSize)) { + result.add(current.next()); + } + return result; + } + }; + } + + /** + * @return Iterator for old partitions. 
+ */ + public Iterator> getOldPartsIterator(int batchSize) { + return iteratePartitions(old_parts, batchSize); + } + + /** + * Get the truncate table flag + */ + public boolean getIsTruncateOp() { + return isTruncateOp; + } + +} diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionsMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionsMessage.java new file mode 100644 index 000000000000..4fb7f0261611 --- /dev/null +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionsMessage.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.metastore.messaging; + +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; + +public abstract class AlterPartitionsMessage extends EventMessage { + + protected AlterPartitionsMessage() { + super(EventType.ALTER_PARTITIONS); + } + + public abstract String getTable(); + + public abstract String getTableType(); + + public abstract Table getTableObj() throws Exception; + + public abstract boolean getIsTruncateOp(); + + public abstract Long getWriteId(); + + /** + * Getter for list of partitions. + * @return List of maps, where each map identifies values for each partition-key, for every altered partition. + */ + public abstract List> getPartitions(); + + public abstract Iterable getPartitionObjs() throws Exception; + + @Override + public EventMessage checkValid() { + if (getTable() == null) + throw new IllegalStateException("Table name unset."); + if (getPartitions() == null) + throw new IllegalStateException("Partition-list unset."); + + try { + getPartitionObjs().forEach(partition -> { + if (getWriteId() != partition.getWriteId()) { + throw new IllegalStateException("Different write id in the same event"); + } + }); + } catch (Exception e) { + throw new IllegalStateException("Unable to get the partition objects"); + } + + return super.checkValid(); + } +} diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java index 06da78e1011b..c7ddd58127b9 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java @@ -39,6 +39,7 @@ public enum EventType { 
ALTER_DATABASE(MessageBuilder.ALTER_DATABASE_EVENT), ALTER_TABLE(MessageBuilder.ALTER_TABLE_EVENT), ALTER_PARTITION(MessageBuilder.ALTER_PARTITION_EVENT), + ALTER_PARTITIONS(MessageBuilder.ALTER_PARTITIONS_EVENT), INSERT(MessageBuilder.INSERT_EVENT), CREATE_FUNCTION(MessageBuilder.CREATE_FUNCTION_EVENT), DROP_FUNCTION(MessageBuilder.DROP_FUNCTION_EVENT), @@ -74,7 +75,8 @@ public enum EventType { COMMIT_COMPACTION(MessageBuilder.COMMIT_COMPACTION_EVENT), CREATE_DATACONNECTOR(MessageBuilder.CREATE_DATACONNECTOR_EVENT), DROP_DATACONNECTOR(MessageBuilder.DROP_DATACONNECTOR_EVENT), - ALTER_DATACONNECTOR(MessageBuilder.ALTER_DATACONNECTOR_EVENT); + ALTER_DATACONNECTOR(MessageBuilder.ALTER_DATACONNECTOR_EVENT), + CONFIG_CHANGE(MessageBuilder.CONFIG_CHANGE_EVENT); private String typeString; diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java index 21accd3b31dc..5cc570256c5e 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java @@ -66,6 +66,7 @@ import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterCatalogMessage; import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterDatabaseMessage; import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterPartitionsMessage; import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterTableMessage; import org.apache.hadoop.hive.metastore.messaging.json.JSONCommitCompactionMessage; import org.apache.hadoop.hive.metastore.messaging.json.JSONCommitTxnMessage; @@ -102,6 +103,7 @@ public class MessageBuilder { public static final String 
ADD_PARTITION_EVENT = "ADD_PARTITION"; public static final String ALTER_PARTITION_EVENT = "ALTER_PARTITION"; + public static final String ALTER_PARTITIONS_EVENT = "ALTER_PARTITIONS"; public static final String DROP_PARTITION_EVENT = "DROP_PARTITION"; public static final String CREATE_TABLE_EVENT = "CREATE_TABLE"; public static final String ALTER_TABLE_EVENT = "ALTER_TABLE"; @@ -144,6 +146,7 @@ public class MessageBuilder { public static final String ALTER_DATACONNECTOR_EVENT = "ALTER_DATACONNECTOR"; public static final String DROP_DATACONNECTOR_EVENT = "DROP_DATACONNECTOR"; public static final String RELOAD_EVENT = "RELOAD"; + public static final String CONFIG_CHANGE_EVENT = "CONFIG_CHANGE_EVENT"; protected static final Configuration conf = MetastoreConf.newMetastoreConf(); @@ -222,6 +225,12 @@ public AlterPartitionMessage buildAlterPartitionMessage(Table table, Partition b table, before, after, isTruncateOp, writeId, now()); } + public AlterPartitionsMessage buildAlterPartitionsMessage(Table table, List after, + boolean isTruncateOp, Long writeId) { + return new JSONAlterPartitionsMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, + table, after, isTruncateOp, writeId, now()); + } + public DropPartitionMessage buildDropPartitionMessage(Table table, Iterator partitionsIterator) { return new JSONDropPartitionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table, diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java index 237bfbf8b355..49acfd2d6f71 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java @@ -137,6 +137,8 @@ public EventMessage getEventMessage(String 
eventTypeString, String messageBody) */ public abstract AlterPartitionMessage getAlterPartitionMessage(String messageBody); + public abstract AlterPartitionsMessage getAlterPartitionsMessage(String messageBody); + /** * Method to de-serialize DropPartitionMessage instance. */ diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionsMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionsMessage.java new file mode 100644 index 000000000000..41622c8c90bb --- /dev/null +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionsMessage.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.metastore.messaging.json; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Function; +import com.google.common.collect.Iterables; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.messaging.AlterPartitionsMessage; +import org.apache.hadoop.hive.metastore.messaging.MessageBuilder; +import org.apache.thrift.TException; + +public class JSONAlterPartitionsMessage extends AlterPartitionsMessage { + + @JsonProperty + String server, servicePrincipal, db, table, tableType, tableObjJson; + + @JsonProperty + String isTruncateOp; + + @JsonProperty + Long timestamp, writeId; + + @JsonProperty + List> partitions; + + @JsonProperty + List partitionListJson; + + /** + * Default constructor, needed for Jackson. 
+ */ + public JSONAlterPartitionsMessage() { + } + + public JSONAlterPartitionsMessage(String server, String servicePrincipal, Table tableObj, + List partitionsAfter, boolean isTruncateOp, Long writeId, Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.db = tableObj.getDbName(); + this.table = tableObj.getTableName(); + this.tableType = tableObj.getTableType(); + this.isTruncateOp = Boolean.toString(isTruncateOp); + this.timestamp = timestamp; + this.writeId = writeId; + this.partitions = new ArrayList<>(); + this.partitionListJson = new ArrayList<>(); + try { + this.tableObjJson = MessageBuilder.createTableObjJson(tableObj); + Iterator iterator = partitionsAfter.iterator(); + while (iterator.hasNext()) { + Partition partitionObj = iterator.next(); + partitions.add(MessageBuilder.getPartitionKeyValues(tableObj, partitionObj)); + partitionListJson.add(MessageBuilder.createPartitionObjJson(partitionObj)); + } + } catch (TException e) { + throw new IllegalArgumentException("Could not serialize: ", e); + } + checkValid(); + } + + @Override + public String getServer() { + return server; + } + + @Override + public String getServicePrincipal() { + return servicePrincipal; + } + + @Override + public String getDB() { + return db; + } + + @Override + public Long getTimestamp() { + return timestamp; + } + + @Override + public String getTable() { + return table; + } + + @Override + public String getTableType() { + return tableType != null ? tableType : ""; + } + + @Override + public Table getTableObj() throws Exception { + return (Table) MessageBuilder.getTObj(tableObjJson,Table.class); + } + + @Override + public boolean getIsTruncateOp() { + return Boolean.parseBoolean(isTruncateOp); + } + + @Override + public Long getWriteId() { + return writeId == null ? 
0 : writeId; + } + + @Override + public List> getPartitions() { + return partitions; + } + + @Override + public Iterable getPartitionObjs() throws Exception { + // glorified cast from Iterable to Iterable + return Iterables.transform( + MessageBuilder.getTObjs(partitionListJson, Partition.class), + new Function() { + @Nullable + @Override + public Partition apply(@Nullable Object input) { + return (Partition) input; + } + }); + } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } catch (Exception e) { + throw new IllegalArgumentException("Could not serialize: ", e); + } + } +} diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java index 0761ff74db5b..903d49bd4e24 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.metastore.messaging.AllocWriteIdMessage; import org.apache.hadoop.hive.metastore.messaging.AlterDatabaseMessage; import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.AlterPartitionsMessage; import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage; import org.apache.hadoop.hive.metastore.messaging.CommitCompactionMessage; import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage; @@ -151,6 +152,15 @@ public AlterPartitionMessage getAlterPartitionMessage(String messageBody) { } } + @Override + public AlterPartitionsMessage getAlterPartitionsMessage(String messageBody) { + try { + return mapper.readValue(messageBody, 
JSONAlterPartitionsMessage.class); + } catch (Exception e) { + throw new IllegalArgumentException("Could not construct AlterPartitionsMessage.", e); + } + } + @Override public DropPartitionMessage getDropPartitionMessage(String messageBody) { try { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/DeSerializer.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/DeSerializer.java index a25f5f14b43b..ad7e1a85fab7 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/DeSerializer.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/DeSerializer.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.metastore.messaging.AllocWriteIdMessage; import org.apache.hadoop.hive.metastore.messaging.AlterDatabaseMessage; import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.AlterPartitionsMessage; import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage; import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage; import org.apache.hadoop.hive.metastore.messaging.CommitCompactionMessage; @@ -134,6 +135,11 @@ public AlterPartitionMessage getAlterPartitionMessage(String messageBody) { return super.getAlterPartitionMessage(deCompress(messageBody)); } + @Override + public AlterPartitionsMessage getAlterPartitionsMessage(String messageBody) { + return super.getAlterPartitionsMessage(deCompress(messageBody)); + } + @Override public DropPartitionMessage getDropPartitionMessage(String messageBody) { return super.getDropPartitionMessage(deCompress(messageBody)); diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/events/TestAlterPartitionsEvent.java 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/events/TestAlterPartitionsEvent.java new file mode 100644 index 000000000000..c6ed8a9c80f4 --- /dev/null +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/events/TestAlterPartitionsEvent.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.metastore.events; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.IntStream; + +import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +@Category(MetastoreUnitTest.class) +public class TestAlterPartitionsEvent { + + private AlterPartitionsEvent event; + private List expectedParts; + private Map> writeIdToParts; + private int batchSize; + private int expectedBatch; + + @Parameterized.Parameters + public static Collection getIteratorToTest() { + List parts = new ArrayList<>(); + IntStream.range(0, 10).forEach(i -> { + Partition partition = new Partition(); + partition.setValues(Arrays.asList(i + "", "part" + i)); + partition.setWriteId(i < 5 ? 
1 : 2); + parts.add(partition); + }); + Partition partition = new Partition(); + partition.setValues(Arrays.asList(10 + "", "part" + 10)); + partition.setWriteId(3); + parts.add(partition); + + AlterPartitionsEvent event = new AlterPartitionsEvent(parts, parts, new Table(), false, true, null); + Collection params = new ArrayList<>(); + params.add(new Object[]{event, 1000, parts, 3}); + params.add(new Object[]{event, 5, parts, 3}); + params.add(new Object[]{event, 3, parts, 5}); + params.add(new Object[]{event, 2, parts, 7}); + params.add(new Object[]{event, 1, parts, 11}); + params.add(new Object[]{event, -1, parts, 3}); + return params; + } + + public TestAlterPartitionsEvent(AlterPartitionsEvent event, int batchSize, + List expectedParts, int expectedBatch) { + this.event = event; + this.batchSize = batchSize; + this.expectedParts = expectedParts; + this.expectedBatch = expectedBatch; + this.writeIdToParts = new HashMap<>(); + expectedParts.stream().forEach(partition -> + writeIdToParts.computeIfAbsent(partition.getWriteId(), k -> new ArrayList<>()).add(partition)); + } + + @Test + public void testGetNewPartitionsIterator() { + int batch = 0; + List actual = new ArrayList<>(); + Map> idToParts = new HashMap<>(); + Iterator> iterator = event.getNewPartsIterator(batchSize); + while (iterator.hasNext()) { + List partitions = iterator.next(); + Assert.assertTrue(batchSize <=0 || partitions.size() <= batchSize); + Long writeId = null; + for (Partition part : partitions) { + if (writeId == null) { + writeId = part.getWriteId(); + } else { + Assert.assertEquals(writeId.longValue(), part.getWriteId()); + } + } + idToParts.putIfAbsent(writeId, new ArrayList<>()); + idToParts.get(writeId).addAll(partitions); + batch++; + actual.addAll(partitions); + } + Assert.assertEquals(5, idToParts.get(1L).size()); + Assert.assertEquals(5, idToParts.get(2L).size()); + Assert.assertEquals(writeIdToParts, idToParts); + Assert.assertEquals(expectedBatch, batch); + 
Assert.assertEquals(expectedParts, actual); + } + +}