Skip to content

Commit

Permalink
Enhance logging
Browse files Browse the repository at this point in the history
  • Loading branch information
rzhang10 committed Dec 4, 2024
1 parent c6c7fc9 commit 9e62cf0
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 17 deletions.
8 changes: 8 additions & 0 deletions core/src/main/java/org/apache/iceberg/ClientPoolImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,15 @@ public <R> R run(Action<R, C, E> action) throws E, InterruptedException {

@Override
public <R> R run(Action<R, C, E> action, boolean retry) throws E, InterruptedException {
LOG.warn(
"In thread {} with threadId={}, trying to get a client from the pool",
Thread.currentThread(),
Thread.currentThread().getId());
C client = get();
LOG.warn(
"In thread {} with threadId={}, got a client from the pool",
Thread.currentThread(),
Thread.currentThread().getId());
try {
return action.run(client);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -569,19 +569,25 @@ protected long acquireLock() throws UnknownHostException, TException, Interrupte
System.getProperty("user.name"),
InetAddress.getLocalHost().getHostName());
LOG.warn(
"In thread {}, trying to call hmsclient.lock() on table {}",
"In thread {} with threadId={}, trying to call hmsclient.lock() on table {}",
Thread.currentThread(),
Thread.currentThread().getId(),
fullName);
LockResponse lockResponse = metaClients.run(client -> client.lock(lockRequest));
LOG.warn(
"In thread {}, hmsclient.lock() finished on table {}", Thread.currentThread(), fullName);
"In thread {} with threadId={}, hmsclient.lock() finished on table {}",
Thread.currentThread(),
Thread.currentThread().getId(),
fullName);
AtomicReference<LockState> state = new AtomicReference<>(lockResponse.getState());
long lockId = lockResponse.getLockid();
LOG.warn(
"In thread {}, lockId returned from hmsclient.lock() on table {} is {}",
"In thread {} with threadId={}, lockId returned from hmsclient.lock() on table {} is {} with state {}",
Thread.currentThread(),
Thread.currentThread().getId(),
fullName,
lockId);
lockId,
state.get());

final long start = System.currentTimeMillis();
long duration = 0;
Expand All @@ -608,18 +614,21 @@ protected long acquireLock() throws UnknownHostException, TException, Interrupte
id -> {
try {
LOG.warn(
"In thread {}, trying to call hmsclient.checkLock() on table {}",
"In thread {} with threadId={}, trying to call hmsclient.checkLock() on table {}",
Thread.currentThread(),
Thread.currentThread().getId(),
fullName);
LockResponse response = metaClients.run(client -> client.checkLock(id));
LOG.warn(
"In thread {}, hmsclient.checkLock() finished on table {}",
"In thread {} with threadId={}, hmsclient.checkLock() finished on table {}",
Thread.currentThread(),
Thread.currentThread().getId(),
fullName);
LockState newState = response.getState();
LOG.warn(
"In thread {}, lock state returned from hmsclient.checkLock() on table {} is {}",
"In thread {} with threadId={}, lock state returned from hmsclient.checkLock() on table {} is {}",
Thread.currentThread(),
Thread.currentThread().getId(),
fullName,
newState);
state.set(newState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,15 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
tableLevelMutex.lock();
try {
LOG.warn(
"In thread {}, starting to acquire a lock for table {}",
"In thread {} with threadId={}, starting to acquire a lock for table {}",
Thread.currentThread(),
Thread.currentThread().getId(),
fullName);
lockId = Optional.of(acquireLock());
LOG.warn(
"In thread {}, acquired lock id: {} for table {}",
"In thread {} with threadId={}, acquired lock id: {} for table {}",
Thread.currentThread(),
Thread.currentThread().getId(),
lockId.get(),
fullName);
// TODO add lock heart beating for cases where default lock timeout is too low.
Expand All @@ -212,19 +214,29 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
// base can be null if not Iceberg metadata exists, but Hive table exists, so we want to get
// the current table
// definition and not create a new definition
LOG.warn("In thread {}, checking if table exists {}", Thread.currentThread(), fullName);
LOG.warn(
"In thread {} with threadId={}, checking if table exists {}",
Thread.currentThread(),
Thread.currentThread().getId(),
fullName);
boolean tableExists = metaClients.run(client -> client.tableExists(database, tableName));
LOG.warn(
"In thread {}, checking table exists finishes with result: {}",
"In thread {} with threadId={}, checking table exists finishes with result: {}",
Thread.currentThread(),
Thread.currentThread().getId(),
tableExists);
if (tableExists) {
LOG.warn(
"In thread {}, starting to call getTable: {} from HMS",
"In thread {} with threadId={}, starting to call getTable: {} from HMS",
Thread.currentThread(),
Thread.currentThread().getId(),
fullName);
tbl = metaClients.run(client -> client.getTable(database, tableName));
LOG.warn("In thread {}, getTable: {} from HMS finished", Thread.currentThread(), fullName);
LOG.warn(
"In thread {} with threadId={}, getTable: {} from HMS finished",
Thread.currentThread(),
Thread.currentThread().getId(),
fullName);
fixMismatchedSchema(tbl);
} else {
final long currentTimeMillis = System.currentTimeMillis();
Expand Down Expand Up @@ -262,9 +274,17 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
updateMetadataLocationInHms(newMetadataLocation, tbl);

try {
LOG.warn("In thread {}, starting to persist table {}", Thread.currentThread(), fullName);
LOG.warn(
"In thread {} with threadId={}, starting to persist table {}",
Thread.currentThread(),
Thread.currentThread().getId(),
fullName);
persistTable(tbl, tableExists);
LOG.warn("In thread {}, persisted table {}", Thread.currentThread(), fullName);
LOG.warn(
"In thread {} with threadId={}, persisted table {}",
Thread.currentThread(),
Thread.currentThread().getId(),
fullName);
commitStatus = CommitStatus.SUCCESS;
} catch (Throwable persistFailure) {
LOG.error(
Expand Down Expand Up @@ -302,14 +322,16 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {

} finally {
LOG.warn(
"In thread {}, trying to cleanupMetadataAndUnlock of lock id: {} for table: {}",
"In thread {} with threadId={}, trying to cleanupMetadataAndUnlock of lock id: {} for table: {}",
Thread.currentThread(),
Thread.currentThread().getId(),
lockId,
fullName);
cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lockId, tableLevelMutex);
LOG.warn(
"In thread {}, cleanupMetadataAndUnlock finishes for table: {}",
"In thread {} with threadId={}, cleanupMetadataAndUnlock finishes for table: {}",
Thread.currentThread(),
Thread.currentThread().getId(),
fullName);
}
}
Expand Down

0 comments on commit 9e62cf0

Please sign in to comment.