forked from alibaba/transmittable-thread-local
-
Notifications
You must be signed in to change notification settings - Fork 1
/
TtlCallable.java
152 lines (137 loc) · 6.06 KB
/
TtlCallable.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package com.alibaba.ttl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;
import java.util.Collections;
/**
* {@link TtlCallable} decorate {@link Callable}, so as to get {@link TransmittableThreadLocal}
* and transmit it to the time of {@link Callable} execution, needed when use {@link Callable} to thread pool.
* <p>
* Use factory method {@link #get(Callable)} to get decorated instance.
*
* @author Jerry Lee (oldratlee at gmail dot com)
* @see java.util.concurrent.Executor
* @see java.util.concurrent.ExecutorService
* @see java.util.concurrent.ThreadPoolExecutor
* @see java.util.concurrent.ScheduledThreadPoolExecutor
* @see java.util.concurrent.Executors
* @see java.util.concurrent.CompletionService
* @see java.util.concurrent.ExecutorCompletionService
* @since 0.9.0
*/
public final class TtlCallable<V> implements Callable<V> {
private final AtomicReference<Map<TransmittableThreadLocal<?>, Object>> copiedRef;
private final Callable<V> callable;
private final boolean releaseTtlValueReferenceAfterCall;
private TtlCallable(Callable<V> callable, boolean releaseTtlValueReferenceAfterCall) {
this.copiedRef = new AtomicReference<Map<TransmittableThreadLocal<?>, Object>>(TransmittableThreadLocal.copy());
this.callable = callable;
this.releaseTtlValueReferenceAfterCall = releaseTtlValueReferenceAfterCall;
}
/**
* wrap method {@link Callable#call()}.
*/
@Override
public V call() throws Exception {
Map<TransmittableThreadLocal<?>, Object> copied = copiedRef.get();
if (copied == null || releaseTtlValueReferenceAfterCall && !copiedRef.compareAndSet(copied, null)) {
throw new IllegalStateException("TTL value reference is released after call!");
}
Map<TransmittableThreadLocal<?>, Object> backup = TransmittableThreadLocal.backupAndSet(copied);
try {
return callable.call();
} finally {
TransmittableThreadLocal.restore(backup);
}
}
public Callable<V> getCallable() {
return callable;
}
/**
* Factory method, wrapper input {@link Callable} to {@link TtlCallable}.
* <p>
* This method is idempotent.
*
* @param callable input {@link Callable}
* @return Wrapped {@link Callable}
*/
public static <T> TtlCallable<T> get(Callable<T> callable) {
return get(callable, false);
}
/**
* Factory method, wrapper input {@link Callable} to {@link TtlCallable}.
* <p>
* This method is idempotent.
*
* @param callable input {@link Callable}
* @param releaseTtlValueReferenceAfterCall release TTL value reference after run, avoid memory leak even if {@link TtlRunnable} is referred.
* @return Wrapped {@link Callable}
*/
public static <T> TtlCallable<T> get(Callable<T> callable, boolean releaseTtlValueReferenceAfterCall) {
return get(callable, releaseTtlValueReferenceAfterCall, false);
}
/**
* Factory method, wrapper input {@link Callable} to {@link TtlCallable}.
* <p>
* This method is idempotent.
*
* @param callable input {@link Callable}
* @param releaseTtlValueReferenceAfterCall release TTL value reference after run, avoid memory leak even if {@link TtlRunnable} is referred.
* @param idempotent is idempotent or not. {@code true} will cover up bugs! <b>DO NOT</b> set, only when you know why.
* @return Wrapped {@link Callable}
*/
public static <T> TtlCallable<T> get(Callable<T> callable, boolean releaseTtlValueReferenceAfterCall, boolean idempotent) {
if (null == callable) {
return null;
}
if (callable instanceof TtlCallable) {
if (idempotent) {
// avoid redundant decoration, and ensure idempotency
return (TtlCallable<T>) callable;
} else {
throw new IllegalStateException("Already TtlCallable!");
}
}
return new TtlCallable<T>(callable, releaseTtlValueReferenceAfterCall);
}
/**
* wrapper input {@link Callable} Collection to {@link TtlCallable} Collection.
*
* @param tasks task to be wrapped
* @return Wrapped {@link Callable}
*/
public static <T> List<TtlCallable<T>> gets(Collection<? extends Callable<T>> tasks) {
return gets(tasks, false, false);
}
/**
* wrapper input {@link Callable} Collection to {@link TtlCallable} Collection.
*
* @param tasks task to be wrapped
* @param releaseTtlValueReferenceAfterCall release TTL value reference after run, avoid memory leak even if {@link TtlRunnable} is referred.
* @return Wrapped {@link Callable}
*/
public static <T> List<TtlCallable<T>> gets(Collection<? extends Callable<T>> tasks, boolean releaseTtlValueReferenceAfterCall) {
return gets(tasks, releaseTtlValueReferenceAfterCall, false);
}
/**
* wrapper input {@link Callable} Collection to {@link TtlCallable} Collection.
*
* @param tasks task to be wrapped
* @param releaseTtlValueReferenceAfterCall release TTL value reference after run, avoid memory leak even if {@link TtlRunnable} is referred.
* @param idempotent is idempotent or not. {@code true} will cover up bugs! <b>DO NOT</b> set, only when you know why.
* @return Wrapped {@link Callable}
*/
public static <T> List<TtlCallable<T>> gets(Collection<? extends Callable<T>> tasks, boolean releaseTtlValueReferenceAfterCall, boolean idempotent) {
if (null == tasks) {
return Collections.emptyList();
}
List<TtlCallable<T>> copy = new ArrayList<TtlCallable<T>>();
for (Callable<T> task : tasks) {
copy.add(TtlCallable.get(task, releaseTtlValueReferenceAfterCall, idempotent));
}
return copy;
}
}