Skip to content

Commit

Permalink
HIVE-27746: Hive Metastore should send single AlterPartitionEvent wit…
Browse files Browse the repository at this point in the history
…h list of partitions (apache#4820) (Zhihua Deng, reviewed by Sai Hemanth Gantasala, John Sherman, Henri Biestro)
  • Loading branch information
dengzhhu653 authored Mar 12, 2024
1 parent 764c1a2 commit ffb1165
Show file tree
Hide file tree
Showing 22 changed files with 751 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -73,6 +72,7 @@
import org.apache.hadoop.hive.metastore.events.AddUniqueConstraintEvent;
import org.apache.hadoop.hive.metastore.events.AlterDatabaseEvent;
import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
import org.apache.hadoop.hive.metastore.events.AlterPartitionsEvent;
import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
import org.apache.hadoop.hive.metastore.events.BatchAcidWriteEvent;
import org.apache.hadoop.hive.metastore.events.CommitCompactionEvent;
Expand Down Expand Up @@ -110,6 +110,7 @@
import org.apache.hadoop.hive.metastore.messaging.AllocWriteIdMessage;
import org.apache.hadoop.hive.metastore.messaging.AlterDatabaseMessage;
import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
import org.apache.hadoop.hive.metastore.messaging.AlterPartitionsMessage;
import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
import org.apache.hadoop.hive.metastore.messaging.CommitCompactionMessage;
import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
Expand Down Expand Up @@ -481,6 +482,25 @@ public void onAlterPartition(AlterPartitionEvent partitionEvent) throws MetaExce
process(event, partitionEvent);
}

@Override
public void onAlterPartitions(AlterPartitionsEvent event) throws MetaException {
Table table = event.getTable();
Iterator<List<Partition>> iterator = event.getNewPartsIterator(maxBatchSize);
while (iterator.hasNext()) {
List<Partition> partitions = iterator.next();
AlterPartitionsMessage msg = MessageBuilder.getInstance()
.buildAlterPartitionsMessage(event.getTable(), partitions,
event.getIsTruncateOp(), partitions.get(0).getWriteId());
NotificationEvent notification =
new NotificationEvent(0, now(), EventType.ALTER_PARTITIONS.toString(),
msgEncoder.getSerializer().serialize(msg));
notification.setCatName(table.isSetCatName() ? table.getCatName() : getDefaultCatalog(conf));
notification.setDbName(table.getDbName());
notification.setTableName(table.getTableName());
process(notification, event);
}
}

/**
* @param dbEvent database event
* @throws MetaException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.hadoop.hive.metastore.api.FireEventResponse;
import org.apache.hadoop.hive.metastore.api.Function;
import org.apache.hadoop.hive.metastore.api.FunctionType;
import org.apache.hadoop.hive.metastore.api.GetTableRequest;
import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
Expand All @@ -64,6 +65,7 @@
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.client.builder.TableBuilder;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
Expand All @@ -83,7 +85,7 @@
import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
import org.apache.hadoop.hive.metastore.messaging.AlterDatabaseMessage;
import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
import org.apache.hadoop.hive.metastore.messaging.AlterPartitionsMessage;
import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
import org.apache.hadoop.hive.metastore.messaging.CreateDatabaseMessage;
import org.apache.hadoop.hive.metastore.messaging.CreateFunctionMessage;
Expand Down Expand Up @@ -758,16 +760,17 @@ public void alterPartition() throws Exception {
NotificationEvent event = rsp.getEvents().get(2);
assertEquals(firstEventId + 3, event.getEventId());
assertTrue(event.getEventTime() >= startTime);
assertEquals(EventType.ALTER_PARTITION.toString(), event.getEventType());
assertEquals(EventType.ALTER_PARTITIONS.toString(), event.getEventType());
assertEquals(defaultDbName, event.getDbName());
assertEquals(tblName, event.getTableName());

// Parse the message field
AlterPartitionMessage alterPtnMsg = md.getAlterPartitionMessage(event.getMessage());
assertEquals(defaultDbName, alterPtnMsg.getDB());
assertEquals(tblName, alterPtnMsg.getTable());
assertEquals(newPart, alterPtnMsg.getPtnObjAfter());
assertEquals(TableType.MANAGED_TABLE.toString(), alterPtnMsg.getTableType());
AlterPartitionsMessage alterPtnsMsg = md.getAlterPartitionsMessage(event.getMessage());
assertEquals(defaultDbName, alterPtnsMsg.getDB());
assertEquals(tblName, alterPtnsMsg.getTable());
assertEquals(newPart, alterPtnsMsg.getPartitionObjs().iterator().next());
assertEquals(1, alterPtnsMsg.getPartitions().size());
assertEquals(TableType.MANAGED_TABLE.toString(), alterPtnsMsg.getTableType());

// Verify the eventID was passed to the non-transactional listener
MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ADD_PARTITION, firstEventId + 2);
Expand All @@ -787,6 +790,69 @@ public void alterPartition() throws Exception {
testEventCounts(defaultDbName, firstEventId, null, null, 3);
}

@Test
public void alterPartitions() throws Exception {
String defaultDbName = "default";
String tblName = "alterptns";
new TableBuilder()
.setDbName(defaultDbName).setTableName(tblName).setOwner("me")
.addCol("col1", "int")
.addPartCol("col2", "int")
.addPartCol("col3", "string")
.setLocation(testTempDir)
.create(msClient, new HiveConf());

Table table = msClient.getTable(new GetTableRequest(defaultDbName, tblName));
List<Partition> partitions = new ArrayList<>();
for (int i = 0; i < 5; i++) {
List<String> values = Arrays.asList(i + "", "part" + i);
Partition part = new Partition(values, defaultDbName, tblName,
0, 0, table.getSd(), emptyParameters);
partitions.add(part);
}
msClient.add_partitions(partitions);
partitions.forEach(partition -> partition.setCreateTime(startTime));
msClient.alter_partitions(defaultDbName, tblName, partitions, null, null, -1);

// Get notifications from metastore
NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(3, rsp.getEventsSize());
NotificationEvent event = rsp.getEvents().get(2);
assertEquals(firstEventId + 3, event.getEventId());
assertTrue(event.getEventTime() >= startTime);
assertEquals(EventType.ALTER_PARTITIONS.toString(), event.getEventType());
assertEquals(defaultDbName, event.getDbName());
assertEquals(tblName, event.getTableName());
// Parse the message field
AlterPartitionsMessage alterPtnsMsg = md.getAlterPartitionsMessage(event.getMessage());
assertEquals(defaultDbName, alterPtnsMsg.getDB());
assertEquals(tblName, alterPtnsMsg.getTable());
Iterator<Partition> expectedIterator = partitions.iterator(),
actualIterator = alterPtnsMsg.getPartitionObjs().iterator();
while (expectedIterator.hasNext() && actualIterator.hasNext()) {
assertEquals(expectedIterator.next(), actualIterator.next());
}
assertFalse(expectedIterator.hasNext() || actualIterator.hasNext());
assertEquals(table.getTableType(), alterPtnsMsg.getTableType());

// Verify the eventID was passed to the non-transactional listener
MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ADD_PARTITION, firstEventId + 2);
MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1);

// When hive.metastore.transactional.event.listeners is set,
// a failed event should not create a new notification
DummyRawStoreFailEvent.setEventSucceed(false);
try {
msClient.alter_partitions(defaultDbName, tblName, partitions, null, null, -1);
fail("Error: alter partition should've failed");
} catch (Exception ex) {
// expected
}
rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(3, rsp.getEventsSize());
testEventCounts(defaultDbName, firstEventId, null, null, 3);
}

@Test
public void dropPartition() throws Exception {
String defaultDbName = "default";
Expand Down Expand Up @@ -1530,7 +1596,7 @@ public void sqlInsertPartition() throws Exception {

event = rsp.getEvents().get(26);
assertEquals(firstEventId + 27, event.getEventId());
assertEquals(EventType.ALTER_PARTITION.toString(), event.getEventType());
assertEquals(EventType.ALTER_PARTITIONS.toString(), event.getEventType());
assertTrue(event.getMessage().matches(".*\"ds\":\"todaytwo\".*"));

// Test fromEventId different from the very first
Expand All @@ -1548,12 +1614,12 @@ public void sqlInsertPartition() throws Exception {

event = rsp.getEvents().get(29);
assertEquals(firstEventId + 30, event.getEventId());
assertEquals(EventType.ALTER_PARTITION.toString(), event.getEventType());
assertEquals(EventType.ALTER_PARTITIONS.toString(), event.getEventType());
assertTrue(event.getMessage().matches(".*\"ds\":\"todaytwo\".*"));

event = rsp.getEvents().get(30);
assertEquals(firstEventId + 31, event.getEventId());
assertEquals(EventType.ALTER_PARTITION.toString(), event.getEventType());
assertEquals(EventType.ALTER_PARTITIONS.toString(), event.getEventType());
assertTrue(event.getMessage().matches(".*\"ds\":\"todaytwo\".*"));
testEventCounts(defaultDbName, firstEventId, null, null, 31);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ public void testPartitionOpsForUpdateUsingEvents() throws Exception {

// Read the altered partition via CachedStore
ptnRead = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal1Alt));
Assert.assertEquals(ptn1Atl.getParameters(), ptnRead.getParameters());
Assert.assertEquals(null, ptnRead);

ptnRead = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal2));
Assert.assertEquals(null, ptnRead);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -968,6 +968,7 @@ public void accept(NotificationEvent event) {
switch (event.getEventType()) {
case MessageBuilder.ADD_PARTITION_EVENT:
case MessageBuilder.ALTER_PARTITION_EVENT:
case MessageBuilder.ALTER_PARTITIONS_EVENT:
case MessageBuilder.DROP_PARTITION_EVENT:
case MessageBuilder.ALTER_TABLE_EVENT:
case MessageBuilder.DROP_TABLE_EVENT:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql.parse.repl.dump.events;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.messaging.AlterPartitionsMessage;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;

public class AlterPartitionsHandler extends AbstractEventHandler<AlterPartitionsMessage> {
private final boolean isTruncateOp;
private final org.apache.hadoop.hive.metastore.api.Table tableObject;
private final Iterable<org.apache.hadoop.hive.metastore.api.Partition> partitions;

AlterPartitionsHandler(NotificationEvent event) throws Exception {
super(event);
AlterPartitionsMessage apm = eventMessage;
isTruncateOp = apm.getIsTruncateOp();
tableObject = apm.getTableObj();
partitions = apm.getPartitionObjs();
}

@Override
AlterPartitionsMessage eventMessage(String stringRepresentation) {
return deserializer.getAlterPartitionsMessage(stringRepresentation);
}

@Override
public void handle(Context withinContext) throws Exception {
LOG.info("Processing#{} ALTER_PARTITIONS message : {}", fromEventId(), eventMessageAsJSON);

// We do not dump partitions during metadata only bootstrap dump (See TableExport
// .getPartitions(), for bootstrap dump we pass tableSpec with TABLE_ONLY set.). So don't
// dump partition related events for metadata-only dump.
if (withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)) {
return;
}

Table qlMdTable = new Table(tableObject);
if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, true,
withinContext.getTablesForBootstrap(), withinContext.oldReplScope, withinContext.hiveConf)) {
return;
}

if (!isTruncateOp) {
withinContext.replicationSpec.setIsMetadataOnly(true);
List<Partition> partitionObjs = new ArrayList<>();
for (org.apache.hadoop.hive.metastore.api.Partition part : partitions) {
partitionObjs.add(new Partition(qlMdTable, part));
}
Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
EximUtil.createExportDump(
metaDataPath.getFileSystem(withinContext.hiveConf),
metaDataPath,
qlMdTable,
partitionObjs,
withinContext.replicationSpec,
withinContext.hiveConf);
}
DumpMetaData dmd = withinContext.createDmd(this);
dmd.setPayload(eventMessageAsJSON);
dmd.write();
}

@Override
public DumpType dumpType() {
return isTruncateOp ? DumpType.EVENT_TRUNCATE_PARTITION : DumpType.EVENT_ALTER_PARTITION;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ private EventHandlerFactory() {
register(MessageBuilder.ADD_PARTITION_EVENT, AddPartitionHandler.class);
register(MessageBuilder.ALTER_DATABASE_EVENT, AlterDatabaseHandler.class);
register(MessageBuilder.ALTER_PARTITION_EVENT, AlterPartitionHandler.class);
register(MessageBuilder.ALTER_PARTITIONS_EVENT, AlterPartitionsHandler.class);
register(MessageBuilder.ALTER_TABLE_EVENT, AlterTableHandler.class);
register(MessageBuilder.CREATE_FUNCTION_EVENT, CreateFunctionHandler.class);
register(MessageBuilder.CREATE_TABLE_EVENT, CreateTableHandler.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
import org.apache.hadoop.hive.metastore.messaging.AlterPartitionsMessage;
import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables;
Expand Down Expand Up @@ -113,9 +114,16 @@ private Tuple extract(Context context) throws SemanticException {
writeId = alterTableMessage.getWriteId();
break;
case EVENT_ALTER_PARTITION:
AlterPartitionMessage msg = deserializer.getAlterPartitionMessage(context.dmd.getPayload());
tableType = msg.getTableObj().getTableType();
writeId = msg.getWriteId();
String eventMessage = deserializer.deSerializeGenericString(context.dmd.getPayload());
if (eventMessage.contains("\"keyValues\":")) {
AlterPartitionMessage msg = deserializer.getAlterPartitionMessage(context.dmd.getPayload());
tableType = msg.getTableObj().getTableType();
writeId = msg.getWriteId();
} else {
AlterPartitionsMessage msg = deserializer.getAlterPartitionsMessage(context.dmd.getPayload());
tableType = msg.getTableType();
writeId = msg.getWriteId();
}
break;
default:
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1286,6 +1286,12 @@ public enum ConfVars {
"hive.notification.sequence.lock.retry.sleep.interval", 10, TimeUnit.SECONDS,
"Sleep interval between retries to acquire a notification lock as described part of property "
+ NOTIFICATION_SEQUENCE_LOCK_MAX_RETRIES.name()),
NOTIFICATION_ALTER_PARTITIONS_V2_ENABLED("metastore.alterPartitions.notification.v2.enabled",
"hive.metastore.alterPartitions.notification.v2.enabled", true,
"Should send a single notification event on alter partitions. " +
"This property is for ensuring backward compatibility when it sets to false, " +
"HMS will send an old ALTER_PARTITION event per partition, so downstream consumers can " +
"still process the ALTER_PARTITION event without making changes."),
ORM_RETRIEVE_MAPNULLS_AS_EMPTY_STRINGS("metastore.orm.retrieveMapNullsAsEmptyStrings",
"hive.metastore.orm.retrieveMapNullsAsEmptyStrings",false,
"Thrift does not support nulls in maps, so any nulls present in maps retrieved from ORM must " +
Expand Down
Loading

0 comments on commit ffb1165

Please sign in to comment.