From 9e62cf0c5f2771b4e1944dee8e327d47bced1d19 Mon Sep 17 00:00:00 2001 From: Raymond Zhang Date: Wed, 4 Dec 2024 12:19:19 -0800 Subject: [PATCH] Enhance logging --- .../org/apache/iceberg/ClientPoolImpl.java | 8 ++++ .../iceberg/hive/HiveTableOperations.java | 23 ++++++---- ...HiveMetadataPreservingTableOperations.java | 42 ++++++++++++++----- 3 files changed, 56 insertions(+), 17 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/ClientPoolImpl.java b/core/src/main/java/org/apache/iceberg/ClientPoolImpl.java index f63e0e0f3..8530b17e7 100644 --- a/core/src/main/java/org/apache/iceberg/ClientPoolImpl.java +++ b/core/src/main/java/org/apache/iceberg/ClientPoolImpl.java @@ -53,7 +53,15 @@ public R run(Action action) throws E, InterruptedException { @Override public R run(Action action, boolean retry) throws E, InterruptedException { + LOG.warn( + "In thread {} with threadId={}, trying to get a client from the pool", + Thread.currentThread(), + Thread.currentThread().getId()); C client = get(); + LOG.warn( + "In thread {} with threadId={}, got a client from the pool", + Thread.currentThread(), + Thread.currentThread().getId()); try { return action.run(client); diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java index a73cf8d2d..cd95ad2fe 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java @@ -569,19 +569,25 @@ protected long acquireLock() throws UnknownHostException, TException, Interrupte System.getProperty("user.name"), InetAddress.getLocalHost().getHostName()); LOG.warn( - "In thread {}, trying to call hmsclient.lock() on table {}", + "In thread {} with threadId={}, trying to call hmsclient.lock() on table {}", Thread.currentThread(), + Thread.currentThread().getId(), fullName); LockResponse lockResponse = metaClients.run(client -> client.lock(lockRequest)); LOG.warn( - "In thread {}, hmsclient.lock() finished on table {}", Thread.currentThread(), fullName); + "In thread {} with threadId={}, hmsclient.lock() finished on table {}", + Thread.currentThread(), + Thread.currentThread().getId(), + fullName); AtomicReference state = new AtomicReference<>(lockResponse.getState()); long lockId = lockResponse.getLockid(); LOG.warn( - "In thread {}, lockId returned from hmsclient.lock() on table {} is {}", + "In thread {} with threadId={}, lockId returned from hmsclient.lock() on table {} is {} with state {}", Thread.currentThread(), + Thread.currentThread().getId(), fullName, - lockId); + lockId, + state.get()); final long start = System.currentTimeMillis(); long duration = 0; @@ -608,18 +614,21 @@ protected long acquireLock() throws UnknownHostException, TException, Interrupte id -> { try { LOG.warn( - "In thread {}, trying to call hmsclient.checkLock() on table {}", + "In thread {} with threadId={}, trying to call hmsclient.checkLock() on table {}", Thread.currentThread(), + Thread.currentThread().getId(), fullName); LockResponse response = metaClients.run(client -> client.checkLock(id)); LOG.warn( - "In thread {}, hmsclient.checkLock() finished on table {}", + "In thread {} with threadId={}, hmsclient.checkLock() finished on table {}", Thread.currentThread(), + Thread.currentThread().getId(), fullName); LockState newState = response.getState(); LOG.warn( - "In thread {}, lock state returned from hmsclient.checkLock() on table {} is {}", + "In thread {} with threadId={}, lock state returned from hmsclient.checkLock() on table {} is {}", Thread.currentThread(), + Thread.currentThread().getId(), fullName, newState); state.set(newState); diff --git a/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/HiveMetadataPreservingTableOperations.java b/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/HiveMetadataPreservingTableOperations.java index a96fb674b..a98541c29 100644 --- a/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/HiveMetadataPreservingTableOperations.java +++ b/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/HiveMetadataPreservingTableOperations.java @@ -196,13 +196,15 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { tableLevelMutex.lock(); try { LOG.warn( - "In thread {}, starting to acquire a lock for table {}", + "In thread {} with threadId={}, starting to acquire a lock for table {}", Thread.currentThread(), + Thread.currentThread().getId(), fullName); lockId = Optional.of(acquireLock()); LOG.warn( - "In thread {}, acquired lock id: {} for table {}", + "In thread {} with threadId={}, acquired lock id: {} for table {}", Thread.currentThread(), + Thread.currentThread().getId(), lockId.get(), fullName); // TODO add lock heart beating for cases where default lock timeout is too low. @@ -212,19 +214,29 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { // base can be null if not Iceberg metadata exists, but Hive table exists, so we want to get // the current table // definition and not create a new definition - LOG.warn("In thread {}, checking if table exists {}", Thread.currentThread(), fullName); + LOG.warn( + "In thread {} with threadId={}, checking if table exists {}", + Thread.currentThread(), + Thread.currentThread().getId(), + fullName); boolean tableExists = metaClients.run(client -> client.tableExists(database, tableName)); LOG.warn( - "In thread {}, checking table exists finishes with result: {}", + "In thread {} with threadId={}, checking table exists finishes with result: {}", Thread.currentThread(), + Thread.currentThread().getId(), tableExists); if (tableExists) { LOG.warn( - "In thread {}, starting to call getTable: {} from HMS", + "In thread {} with threadId={}, starting to call getTable: {} from HMS", Thread.currentThread(), + Thread.currentThread().getId(), fullName); tbl = metaClients.run(client -> client.getTable(database, tableName)); - LOG.warn("In thread {}, getTable: {} from HMS finished", Thread.currentThread(), fullName); + LOG.warn( + "In thread {} with threadId={}, getTable: {} from HMS finished", + Thread.currentThread(), + Thread.currentThread().getId(), + fullName); fixMismatchedSchema(tbl); } else { final long currentTimeMillis = System.currentTimeMillis(); @@ -262,9 +274,17 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { updateMetadataLocationInHms(newMetadataLocation, tbl); try { - LOG.warn("In thread {}, starting to persist table {}", Thread.currentThread(), fullName); + LOG.warn( + "In thread {} with threadId={}, starting to persist table {}", + Thread.currentThread(), + Thread.currentThread().getId(), + fullName); persistTable(tbl, tableExists); - LOG.warn("In thread {}, persisted table {}", Thread.currentThread(), fullName); + LOG.warn( + "In thread {} with threadId={}, persisted table {}", + Thread.currentThread(), + Thread.currentThread().getId(), + fullName); commitStatus = CommitStatus.SUCCESS; } catch (Throwable persistFailure) { LOG.error( @@ -302,14 +322,16 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { } finally { LOG.warn( - "In thread {}, trying to cleanupMetadataAndUnlock of lock id: {} for table: {}", + "In thread {} with threadId={}, trying to cleanupMetadataAndUnlock of lock id: {} for table: {}", Thread.currentThread(), + Thread.currentThread().getId(), lockId, fullName); cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lockId, tableLevelMutex); LOG.warn( - "In thread {}, cleanupMetadataAndUnlock finishes for table: {}", + "In thread {} with threadId={}, cleanupMetadataAndUnlock finishes for table: {}", Thread.currentThread(), + Thread.currentThread().getId(), fullName); } }