Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance logging with thread id #158

Merged
merged 2 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions core/src/main/java/org/apache/iceberg/ClientPoolImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,15 @@ public <R> R run(Action<R, C, E> action) throws E, InterruptedException {

@Override
public <R> R run(Action<R, C, E> action, boolean retry) throws E, InterruptedException {
LOG.warn(
"In thread {} with threadId={}, trying to get a client from the pool",
Thread.currentThread(),
Thread.currentThread().getId());
C client = get();
LOG.warn(
"In thread {} with threadId={}, got a client from the pool",
Thread.currentThread(),
Thread.currentThread().getId());
try {
return action.run(client);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -569,19 +569,25 @@ protected long acquireLock() throws UnknownHostException, TException, Interrupte
System.getProperty("user.name"),
InetAddress.getLocalHost().getHostName());
LOG.warn(
"In thread {}, trying to call hmsclient.lock() on table {}",
"In thread {} with threadId={}, trying to call hmsclient.lock() on table {}",
Thread.currentThread(),
Thread.currentThread().getId(),
fullName);
LockResponse lockResponse = metaClients.run(client -> client.lock(lockRequest));
LOG.warn(
"In thread {}, hmsclient.lock() finished on table {}", Thread.currentThread(), fullName);
"In thread {} with threadId={}, hmsclient.lock() finished on table {}",
Thread.currentThread(),
Thread.currentThread().getId(),
fullName);
AtomicReference<LockState> state = new AtomicReference<>(lockResponse.getState());
long lockId = lockResponse.getLockid();
LOG.warn(
"In thread {}, lockId returned from hmsclient.lock() on table {} is {}",
"In thread {} with threadId={}, lockId returned from hmsclient.lock() on table {} is {} with state {}",
Thread.currentThread(),
Thread.currentThread().getId(),
fullName,
lockId);
lockId,
state.get());

final long start = System.currentTimeMillis();
long duration = 0;
Expand All @@ -608,18 +614,21 @@ protected long acquireLock() throws UnknownHostException, TException, Interrupte
id -> {
try {
LOG.warn(
"In thread {}, trying to call hmsclient.checkLock() on table {}",
"In thread {} with threadId={}, trying to call hmsclient.checkLock() on table {}",
Thread.currentThread(),
Thread.currentThread().getId(),
fullName);
LockResponse response = metaClients.run(client -> client.checkLock(id));
LOG.warn(
"In thread {}, hmsclient.checkLock() finished on table {}",
"In thread {} with threadId={}, hmsclient.checkLock() finished on table {}",
Thread.currentThread(),
Thread.currentThread().getId(),
fullName);
LockState newState = response.getState();
LOG.warn(
"In thread {}, lock state returned from hmsclient.checkLock() on table {} is {}",
"In thread {} with threadId={}, lock state returned from hmsclient.checkLock() on table {} is {}",
Thread.currentThread(),
Thread.currentThread().getId(),
fullName,
newState);
state.set(newState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ protected void doRefresh() {
refreshFromMetadataLocation(metadataLocation);
}

@SuppressWarnings("checkstyle:MethodLength")
@Override
protected void doCommit(TableMetadata base, TableMetadata metadata) {
String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1);
Expand All @@ -196,13 +197,15 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
tableLevelMutex.lock();
try {
LOG.warn(
"In thread {}, starting to acquire a lock for table {}",
"In thread {} with threadId={}, starting to acquire a lock for table {}",
Thread.currentThread(),
Thread.currentThread().getId(),
fullName);
lockId = Optional.of(acquireLock());
LOG.warn(
"In thread {}, acquired lock id: {} for table {}",
"In thread {} with threadId={}, acquired lock id: {} for table {}",
Thread.currentThread(),
Thread.currentThread().getId(),
lockId.get(),
fullName);
// TODO add lock heart beating for cases where default lock timeout is too low.
Expand All @@ -212,19 +215,29 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
// base can be null if not Iceberg metadata exists, but Hive table exists, so we want to get
// the current table
// definition and not create a new definition
LOG.warn("In thread {}, checking if table exists {}", Thread.currentThread(), fullName);
LOG.warn(
"In thread {} with threadId={}, checking if table exists {}",
Thread.currentThread(),
Thread.currentThread().getId(),
fullName);
boolean tableExists = metaClients.run(client -> client.tableExists(database, tableName));
LOG.warn(
"In thread {}, checking table exists finishes with result: {}",
"In thread {} with threadId={}, checking table exists finishes with result: {}",
Thread.currentThread(),
Thread.currentThread().getId(),
tableExists);
if (tableExists) {
LOG.warn(
"In thread {}, starting to call getTable: {} from HMS",
"In thread {} with threadId={}, starting to call getTable: {} from HMS",
Thread.currentThread(),
Thread.currentThread().getId(),
fullName);
tbl = metaClients.run(client -> client.getTable(database, tableName));
LOG.warn("In thread {}, getTable: {} from HMS finished", Thread.currentThread(), fullName);
LOG.warn(
"In thread {} with threadId={}, getTable: {} from HMS finished",
Thread.currentThread(),
Thread.currentThread().getId(),
fullName);
fixMismatchedSchema(tbl);
} else {
final long currentTimeMillis = System.currentTimeMillis();
Expand Down Expand Up @@ -262,9 +275,17 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
updateMetadataLocationInHms(newMetadataLocation, tbl);

try {
LOG.warn("In thread {}, starting to persist table {}", Thread.currentThread(), fullName);
LOG.warn(
"In thread {} with threadId={}, starting to persist table {}",
Thread.currentThread(),
Thread.currentThread().getId(),
fullName);
persistTable(tbl, tableExists);
LOG.warn("In thread {}, persisted table {}", Thread.currentThread(), fullName);
LOG.warn(
"In thread {} with threadId={}, persisted table {}",
Thread.currentThread(),
Thread.currentThread().getId(),
fullName);
commitStatus = CommitStatus.SUCCESS;
} catch (Throwable persistFailure) {
LOG.error(
Expand Down Expand Up @@ -302,14 +323,16 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {

} finally {
LOG.warn(
"In thread {}, trying to cleanupMetadataAndUnlock of lock id: {} for table: {}",
"In thread {} with threadId={}, trying to cleanupMetadataAndUnlock of lock id: {} for table: {}",
Thread.currentThread(),
Thread.currentThread().getId(),
lockId,
fullName);
cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lockId, tableLevelMutex);
LOG.warn(
"In thread {}, cleanupMetadataAndUnlock finishes for table: {}",
"In thread {} with threadId={}, cleanupMetadataAndUnlock finishes for table: {}",
Thread.currentThread(),
Thread.currentThread().getId(),
fullName);
}
}
Expand Down