Skip to content

Commit

Permalink
Add logging for HiveMetadataPreservingTableOperations (#156)
Browse files Browse the repository at this point in the history
  • Loading branch information
rzhang10 authored Dec 3, 2024
1 parent dc5f7e8 commit 928ed99
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ public boolean release(String entityId, String ownerId) {
e);
} catch (DynamoDbException e) {
LOG.error(
"Failed to release lock {} by for entity: {}, owner: {}, encountered unexpected DynamoDB exception",
"Failed to release lock for entity: {}, owner: {}, encountered unexpected DynamoDB exception",
entityId,
ownerId,
e);
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ buildscript {
}
dependencies {
classpath 'gradle.plugin.com.github.jengelman.gradle.plugins:shadow:7.0.0'
classpath 'com.palantir.baseline:gradle-baseline-java:4.0.0'
classpath 'com.palantir.baseline:gradle-baseline-java:4.40.0'
classpath 'com.palantir.gradle.gitversion:gradle-git-version:0.12.3'
classpath 'com.diffplug.spotless:spotless-plugin-gradle:6.8.0'
classpath 'gradle.plugin.org.inferred:gradle-processors:3.3.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@

// inspired in part by
// https://github.com/apache/avro/blob/release-1.8.2/lang/java/guava/src/main/java/org/apache/avro/GuavaClasses.java
@SuppressWarnings("ReturnValueIgnored")
public class GuavaClasses {

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ private boolean isTableDir(Path path) {
return false;
} catch (IOException e) {
if (shouldSuppressPermissionError(e)) {
LOG.warn("Unable to list metadata directory {}: {}", metadataPath, e);
LOG.warn("Unable to list metadata directory {}", metadataPath, e);
return false;
} else {
throw new UncheckedIOException(e);
Expand All @@ -177,7 +177,7 @@ private boolean isDirectory(Path path) {
return false;
} catch (IOException e) {
if (shouldSuppressPermissionError(e)) {
LOG.warn("Unable to list directory {}: {}", path, e);
LOG.warn("Unable to list directory {}", path, e);
return false;
} else {
throw new UncheckedIOException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -568,9 +568,20 @@ protected long acquireLock() throws UnknownHostException, TException, Interrupte
Lists.newArrayList(lockComponent),
System.getProperty("user.name"),
InetAddress.getLocalHost().getHostName());
LOG.warn(
"In thread {}, trying to call hmsclient.lock() on table {}",
Thread.currentThread(),
fullName);
LockResponse lockResponse = metaClients.run(client -> client.lock(lockRequest));
LOG.warn(
"In thread {}, hmsclient.lock() finished on table {}", Thread.currentThread(), fullName);
AtomicReference<LockState> state = new AtomicReference<>(lockResponse.getState());
long lockId = lockResponse.getLockid();
LOG.warn(
"In thread {}, lockId returned from hmsclient.lock() on table {} is {}",
Thread.currentThread(),
fullName,
lockId);

final long start = System.currentTimeMillis();
long duration = 0;
Expand All @@ -596,8 +607,21 @@ protected long acquireLock() throws UnknownHostException, TException, Interrupte
.run(
id -> {
try {
LOG.warn(
"In thread {}, trying to call hmsclient.checkLock() on table {}",
Thread.currentThread(),
fullName);
LockResponse response = metaClients.run(client -> client.checkLock(id));
LOG.warn(
"In thread {}, hmsclient.checkLock() finished on table {}",
Thread.currentThread(),
fullName);
LockState newState = response.getState();
LOG.warn(
"In thread {}, lock state returned from hmsclient.checkLock() on table {} is {}",
Thread.currentThread(),
fullName,
newState);
state.set(newState);
if (newState.equals(LockState.WAITING)) {
throw new WaitingForLockException(
Expand Down Expand Up @@ -662,6 +686,8 @@ private void unlock(Optional<Long> lockId) {
} catch (Exception e) {
LOG.warn("Failed to unlock {}.{}", database, tableName, e);
}
} else {
LOG.warn("No lockId to unlock for {}.{}", database, tableName);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
*/
public class HiveMetadataPreservingCatalog extends HiveCatalog {

private static final String DEFAULT_NAME = "hive_meta_preserving";
private static final String DEFAULT_NAME = "hive_meta_preserving_catalog";

public HiveMetadataPreservingCatalog() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,17 +195,36 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
ReentrantLock tableLevelMutex = commitLockCache.get(fullName, t -> new ReentrantLock(true));
tableLevelMutex.lock();
try {
LOG.warn(
"In thread {}, starting to acquire a lock for table {}",
Thread.currentThread(),
fullName);
lockId = Optional.of(acquireLock());
LOG.warn(
"In thread {}, acquired lock id: {} for table {}",
Thread.currentThread(),
lockId.get(),
fullName);
// TODO add lock heart beating for cases where default lock timeout is too low.
Table tbl;
// [LINKEDIN] Instead of checking if base != null to check for table existence, we query
// metastore for existence
// base can be null if not Iceberg metadata exists, but Hive table exists, so we want to get
// the current table
// definition and not create a new definition
LOG.warn("In thread {}, checking if table exists {}", Thread.currentThread(), fullName);
boolean tableExists = metaClients.run(client -> client.tableExists(database, tableName));
LOG.warn(
"In thread {}, checking table exists finishes with result: {}",
Thread.currentThread(),
tableExists);
if (tableExists) {
LOG.warn(
"In thread {}, starting to call getTable: {} from HMS",
Thread.currentThread(),
fullName);
tbl = metaClients.run(client -> client.getTable(database, tableName));
LOG.warn("In thread {}, getTable: {} from HMS finished", Thread.currentThread(), fullName);
fixMismatchedSchema(tbl);
} else {
final long currentTimeMillis = System.currentTimeMillis();
Expand Down Expand Up @@ -243,7 +262,9 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
updateMetadataLocationInHms(newMetadataLocation, tbl);

try {
LOG.warn("In thread {}, starting to persist table {}", Thread.currentThread(), fullName);
persistTable(tbl, tableExists);
LOG.warn("In thread {}, persisted table {}", Thread.currentThread(), fullName);
commitStatus = CommitStatus.SUCCESS;
} catch (Throwable persistFailure) {
LOG.error(
Expand Down Expand Up @@ -280,7 +301,16 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
throw new RuntimeException("Interrupted during commit", e);

} finally {
LOG.warn(
"In thread {}, trying to cleanupMetadataAndUnlock of lock id: {} for table: {}",
Thread.currentThread(),
lockId,
fullName);
cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lockId, tableLevelMutex);
LOG.warn(
"In thread {}, cleanupMetadataAndUnlock finishes for table: {}",
Thread.currentThread(),
fullName);
}
}

Expand Down

0 comments on commit 928ed99

Please sign in to comment.