-
Notifications
You must be signed in to change notification settings - Fork 3
/
CommunicatorInput.java
178 lines (165 loc) · 6.03 KB
/
CommunicatorInput.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
package org.ibrdtnapi.dispatcher;
import java.io.BufferedReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;
import org.ibrdtnapi.Api;
import org.ibrdtnapi.ApiException;
import org.ibrdtnapi.dispatcher.Dispatcher.State;
import org.ibrdtnapi.entities.Bundle;
import org.ibrdtnapi.entities.FifoBundleQueue;
/**
*
* The input stream from the socket with the daemon
* is processed by this class.
* It log everything, by default (hard-coded) in a file.
* If the file cannot be open, it logs in the ... log at
* (info-level).
*
* This class, among others, changes the state of the
* {@link Dispatcher}. All return-codes from the daemon
* are received by the {@link CommunicatorInput}.
*
*/
public class CommunicatorInput implements Runnable {
private static final Logger log = Logger.getLogger(CommunicatorInput.class.getName());
private BufferedReader br = null;
private FifoBundleQueue toFetchBundles = new FifoBundleQueue();
private Dispatcher dispatcher = null;
private StringBuilder buffer = null;
private List<String> neighborList = new ArrayList<String>();
public CommunicatorInput(BufferedReader br, Dispatcher dispatcher) {
this.br = br;
this.dispatcher = dispatcher;
this.dispatcher.setState(State.CONNECTED);
this.toFetchBundles.addObserver(this.dispatcher);
synchronized(Api.lockFile) {
try {
//Open and trunk the file.
Api.logFile = new FileWriter(Api.LOG_FILE_PATH);
Api.logFile.close();
} catch (IOException e) {
CommunicatorInput.log.info("Could not open the file '" + Api.LOG_FILE_PATH + "' to write it. Log will be display at finest level.");
}
}
}
public FifoBundleQueue getToFetchBundles() {
return this.toFetchBundles;
}
@Override
public void run() {
String str;
final int payloadLinesToBufferize = 2;
int emptyLinesCount = 0;
try {
while ((str = this.br.readLine()) != null) {
this.log(str);
//First thing first, we check if it's a 602 notify
if(str.startsWith("602 NOTIFY BUNDLE")) {
this.notifyBundle(str);
//Otherwise, if bundle is loaded (and info are sent), bufferize them:
} else if(this.dispatcher.getState() == State.BDL_LOADED) {
this.buffer.append(str + "\n");
if(str.startsWith("Blocks: "))
this.dispatcher.setState(State.INFO_BUFFERED);
//Otherwise, if payload is sent, bufferize it:
} else if(this.dispatcher.getState() == State.PLD_BUFFERING) {
//If the line is empty, count it
if (str.trim().isEmpty()) {
emptyLinesCount++;
if(emptyLinesCount == payloadLinesToBufferize) {
this.dispatcher.setState(State.PLD_BUFFERED);
emptyLinesCount = 0;
}
} else {
this.buffer.append(str + '\n');
}
//Otherwise, the neighbor list has been requested:
} else if(this.dispatcher.getState() == State.NEIGHBOR_LIST) {
//If the line is not empty, we add this neighbor
if(!str.trim().isEmpty()) {
this.neighborList.add(str);
//Otherwise, if the line is empty, the listing is done.
} else {
this.dispatcher.setState(State.NEIGHBOR_LISTED);
}
} else {
this.parse(str);
}
}
} catch (IOException e) {
CommunicatorInput.log.severe("CommunicatorInput Interrupted. " + e.getMessage());
}
}
private void log(String str) {
try {
synchronized(Api.lockFile) {
Api.logFile = new FileWriter(Api.LOG_FILE_PATH, Api.APPEND);
Api.logFile.append(str + "\n");
Api.logFile.flush();
Api.logFile.close();
}
} catch (IOException e) {
CommunicatorInput.log.info("Could not open the file '" + Api.LOG_FILE_PATH + "' to write it. Log will be display at finest level.");
CommunicatorInput.log.finest(str);
}
}
public String getBuffer() {
String ret = this.buffer.toString();
this.buffer = new StringBuilder();//Clear the buffer
return ret;
}
public List<String> getNeighborList() {
List<String> list = new ArrayList<String>();
list.addAll(this.neighborList);
this.neighborList.clear();
return list;
}
private void parse(String str) {
if(str == null) {
throw new ApiException("Input string null.");
} else if("200 SWITCHED TO EXTENDED".equals(str)) {
this.dispatcher.setState(Dispatcher.State.EXTENDED);
} else if("200 OK".equals(str) && this.dispatcher.getState() == State.EXTENDED) {
this.dispatcher.setState(Dispatcher.State.IDLE);
} else if(str.startsWith("200 BUNDLE LOADED")) {
this.dispatcher.setState(State.BDL_LOADED);
this.buffer = new StringBuilder();//Clear the buffer
} else if(str.startsWith("200 BUNDLE DELIVERED ACCEPTED")) {
this.dispatcher.setState(State.BDL_DELIVERED);
} else if(str.startsWith("100 PUT BUNDLE PLAIN")) {
this.dispatcher.setState(State.PUTTING);
} else if(str.startsWith("200 BUNDLE IN REGISTER")) {
this.dispatcher.setState(State.BDL_REGISTERED);
} else if(str.startsWith("200 BUNDLE SENT")) {
this.dispatcher.setState(State.BDL_SENT);
} else if(str.startsWith("200 BUNDLE INFO")) {
this.dispatcher.setState(State.BDL_INFO);
} else if(str.startsWith("200 PAYLOAD GET")) {
this.dispatcher.setState(State.PLD_BUFFERING);
} else if(str.startsWith("200 NODENAME")) {
this.dispatcher.setNodeName(str.split(" ")[2]);
} else if(str.startsWith("100 BUNDLE BLOCK ADD")) {
this.dispatcher.setState(State.BDL_BLOCK_ADDING);
} else if(str.startsWith("200 BUNDLE BLOCK ADD SUCCESSFUL")) {
this.dispatcher.setState(State.BLOCK_ADDED);
} else if(str.startsWith("200 NEIGHBOR LIST")) {
this.dispatcher.setState(State.NEIGHBOR_LIST);
}
}
private void notifyBundle(String str) {
Thread fetcherLauncher = new Thread(new FetcherLauncher(this.bundleNotified(str), this.toFetchBundles));
fetcherLauncher.setName("Fetcher launcher");
this.dispatcher.addFetcher(fetcherLauncher);
}
private Bundle bundleNotified(String str) {
String[] parsed = str.split(" ");
long timestamp = Long.parseLong(parsed[3]);
int blockNumber = Integer.parseInt(parsed[4]);
String source = parsed[5];
Bundle bundle = new Bundle(timestamp, blockNumber, source, null);
return bundle;
}
}