forked from miciek/grokkingfp-examples
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathch10_CheckInsImperative.java
184 lines (161 loc) · 6.87 KB
/
ch10_CheckInsImperative.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import akka.actor.*;
import static akka.pattern.Patterns.ask;
/**
* These are not full-blown solutions to the check-ins example.
* We use a small part of the example that exposes the concurrent state problem and shows potential solutions.
*/
public class ch10_CheckInsImperative {
    /** Number of check-ins each simulated producer thread stores. */
    private static final int CHECK_INS_PER_THREAD = 1000;

    /**
     * Runs every variant of the check-ins simulation, one after another.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while waiting for the producers
     */
    public static void main(String[] args) throws InterruptedException {
        noSynchronization();
        monitors();
        actors();
        threadSafeDataStructures();
        atomicReferencesImperative();
        atomicReferencesFunctional();
    }

    /** Returns the city for the i-th check-in: even indices go to Cairo, odd ones to Auckland. */
    private static String cityNameFor(int i) {
        return i % 2 == 0 ? "Cairo" : "Auckland";
    }

    /** Starts two threads that both run the given task. */
    private static void startTwoThreads(Runnable task) {
        new Thread(task).start();
        new Thread(task).start();
    }

    /**
     * Two threads update a plain {@link HashMap} with no coordination at all.
     * This variant is intentionally left unsynchronized: it exists to show the problem.
     *
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    static void noSynchronization() throws InterruptedException {
        var cityCheckIns = new HashMap<String, Integer>();
        Runnable task = () -> {
            for (int i = 0; i < CHECK_INS_PER_THREAD; i++) {
                // deliberately racy read-modify-write on a non-thread-safe map
                cityCheckIns.merge(cityNameFor(i), 1, Integer::sum);
            }
        };
        startTwoThreads(task);
        // main thread is the ranking computation thread (a simulation that shows the problem)
        Thread.sleep(300);
        System.out.println("[no synchronization] Computing ranking based on: " + cityCheckIns);
    }

    /**
     * Two threads update a plain {@link HashMap}, guarding every access with the map's monitor.
     *
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    static void monitors() throws InterruptedException {
        var cityCheckIns = new HashMap<String, Integer>();
        Runnable task = () -> {
            for (int i = 0; i < CHECK_INS_PER_THREAD; i++) {
                var cityName = cityNameFor(i);
                synchronized (cityCheckIns) {
                    cityCheckIns.merge(cityName, 1, Integer::sum);
                }
            }
        };
        startTwoThreads(task);
        // main thread is the ranking computation thread
        Thread.sleep(300);
        // The reader must take the same monitor as the writers; otherwise the read is
        // unsynchronized and may observe a map that is being modified.
        String snapshot;
        synchronized (cityCheckIns) {
            snapshot = cityCheckIns.toString();
        }
        System.out.println("[monitors] Computing ranking based on: " + snapshot);
    }

    /**
     * Two threads send {@code StoreCheckIn} messages to a {@code CheckInsActor}; a
     * {@code RankingActor} is then asked to compute the ranking from that actor's state.
     * The actor system is terminated even when the waiting main thread is interrupted.
     *
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    static void actors() throws InterruptedException {
        ActorSystem system = ActorSystem.create("test-system");
        try {
            ActorRef checkInsActor = system.actorOf(Props.create(CheckInsActor.class), "check-ins-actor");
            ActorRef rankingActor = system.actorOf(Props.create(RankingActor.class), "ranking-actor");
            Runnable task = () -> {
                for (int i = 0; i < CHECK_INS_PER_THREAD; i++) {
                    checkInsActor.tell(new StoreCheckIn(cityNameFor(i)), ActorRef.noSender());
                }
            };
            startTwoThreads(task);
            Thread.sleep(300);
            rankingActor.tell(new ComputeRanking(checkInsActor), ActorRef.noSender());
            // give the ranking actor a moment to print before shutting down
            Thread.sleep(100);
        } finally {
            system.terminate();
        }
    }

    /**
     * Two threads update a {@link ConcurrentHashMap}, relying on its atomic per-key update.
     *
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    static void threadSafeDataStructures() throws InterruptedException {
        var cityCheckIns = new ConcurrentHashMap<String, Integer>();
        Runnable task = () -> {
            for (int i = 0; i < CHECK_INS_PER_THREAD; i++) {
                cityCheckIns.merge(cityNameFor(i), 1, Integer::sum);
            }
        };
        startTwoThreads(task);
        // main thread is the ranking computation thread
        Thread.sleep(300);
        System.out.println("[thread-safe data structures] Computing ranking based on: " + cityCheckIns);
    }

    /**
     * Two threads update a map held in an {@link AtomicReference} using an explicit
     * compare-and-set retry loop.
     *
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    static void atomicReferencesImperative() throws InterruptedException {
        var cityCheckIns = new AtomicReference<>(new HashMap<String, Integer>());
        Runnable task = () -> {
            for (int i = 0; i < CHECK_INS_PER_THREAD; i++) {
                var cityName = cityNameFor(i);
                var updated = false;
                while (!updated) {
                    var currentCheckIns = cityCheckIns.get();
                    // Copy instead of mutating in place: compareAndSet needs the expected and
                    // the new value to be two different objects.
                    var newCheckIns = new HashMap<>(currentCheckIns);
                    newCheckIns.merge(cityName, 1, Integer::sum);
                    // fails (and we retry) when another thread swapped the map in the meantime
                    updated = cityCheckIns.compareAndSet(currentCheckIns, newCheckIns);
                }
            }
        };
        startTwoThreads(task);
        Thread.sleep(300);
        System.out.println("[atomic reference imperative] Computing ranking based on: " + cityCheckIns.get());
    }

    /**
     * Two threads update a map held in an {@link AtomicReference} by passing an update
     * function to {@code updateAndGet}. The function builds a new map and never mutates the old one.
     *
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    static void atomicReferencesFunctional() throws InterruptedException {
        var cityCheckIns = new AtomicReference<>(new HashMap<String, Integer>());
        Runnable task = () -> {
            for (int i = 0; i < CHECK_INS_PER_THREAD; i++) {
                var cityName = cityNameFor(i);
                cityCheckIns.updateAndGet(oldCheckIns -> {
                    var newCheckIns = new HashMap<>(oldCheckIns);
                    newCheckIns.merge(cityName, 1, Integer::sum);
                    return newCheckIns;
                });
            }
        };
        startTwoThreads(task);
        Thread.sleep(300);
        System.out.println("[atomic reference functional] Computing ranking based on: " + cityCheckIns.get());
    }
}
/**
 * Actor that owns the per-city check-in counters.
 * The map is only read and written inside this actor's message handlers; no explicit
 * synchronization is used here (NOTE(review): this relies on the actor runtime processing
 * one message at a time - confirm against the Akka documentation).
 */
class CheckInsActor extends AbstractActor {
    private Map<String, Integer> cityCheckIns = new HashMap<>();

    /**
     * Builds the message handlers: {@code StoreCheckIn} increments the counter of the given
     * city, {@code GetCurrentCheckIns} replies to the sender with a copy of the counters.
     */
    public Receive createReceive() {
        return receiveBuilder()
            .match(StoreCheckIn.class, checkIn ->
                // first check-in for a city starts at 1, later ones add 1
                cityCheckIns.merge(checkIn.cityName, 1, Integer::sum))
            .match(GetCurrentCheckIns.class, request ->
                // reply with a copy so the internal map never leaves the actor
                getSender().tell(new HashMap<>(cityCheckIns), null))
            .build();
    }
}
/**
 * Actor that, on {@code ComputeRanking}, asks the referenced check-ins actor for its
 * current state and prints it once the reply arrives.
 */
class RankingActor extends AbstractActor {
    /** Builds the single handler for {@code ComputeRanking} messages. */
    public Receive createReceive() {
        return receiveBuilder()
            .match(ComputeRanking.class, request -> {
                // 1000 is the ask timeout (NOTE(review): milliseconds per Patterns.ask - confirm)
                var reply = ask(request.checkInsActor, new GetCurrentCheckIns(), 1000);
                reply.foreach(
                    checkIns -> {
                        System.out.println("[actors] Computing ranking based on: " + checkIns);
                        // the callback has to return something; the value itself is not used here
                        return this;
                    },
                    getContext().dispatcher());
            })
            .build();
    }
}
/** Immutable message carrying the name of the city for which one check-in should be stored. */
class StoreCheckIn {
    /** Name of the city that received the check-in. */
    public final String cityName;

    /**
     * @param cityName name of the city that received the check-in; stored as given
     */
    public StoreCheckIn(String cityName) {
        this.cityName = cityName;
    }
}
/** Marker message with no payload; the check-ins actor answers it with a copy of its counters. */
class GetCurrentCheckIns {}
/** Immutable message that tells the ranking actor which actor to query for the current check-ins. */
class ComputeRanking {
    /** Reference to the actor that is asked for the current check-ins. */
    public final ActorRef checkInsActor;

    /**
     * @param checkInsActor reference to the actor holding the check-ins; stored as given
     */
    public ComputeRanking(ActorRef checkInsActor) {
        this.checkInsActor = checkInsActor;
    }
}