-
Notifications
You must be signed in to change notification settings - Fork 0
/
worker.erl
executable file
·162 lines (148 loc) · 5.76 KB
/
worker.erl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
-module(worker).
-include("logging.hrl").
-include("workerstate.hrl").
-export([prepare_socket/1, start/1, send_to_ring/2]).
-define(SETUP_TIME, 1000).
-define(PACKET_TYPE, {packet, 4}).
prepare_socket(MyPort) ->
{ok, LS} = gen_tcp:listen(MyPort, [binary, ?PACKET_TYPE]),
{ok, Port} = inet:port(LS),
spawn_link(fun() -> acceptAll(LS) end),
Port.
start(St) -> Pid = spawn(fun() -> restart(St) end), register(worker, Pid), Pid.
%prepara/reinicia el worker
restart(St) ->
clear_msgs(),
[Client!reset || Client <- St#wstate.clients],%avisa a los clientes que se reinicio el anillo
cache!cleanup,
{ok, Prv, Nxt}=ring:create(St#wstate.port),
megaphone!{enable, St#wstate.id},
?INFO("Ring created"),
filesystem!dosanitycheck,
handler(workerstate:reset_start_time(
workerstate:set_prv(Prv,
workerstate:set_nxt(Nxt,
workerstate:set_leader(St#wstate.id, St))))).
clear_msgs() ->
receive {tcp, _, _} -> clear_msgs()
after 0 -> ok
end.
%recibe todos los mensajes del worker
handler(St) ->
{Prv, Nxt} = {St#wstate.prv, St#wstate.nxt},
receive
{udp,_,_,_,B} ->
%~ ?DF("UDP~p", [B]),
handle_announcements(cmd:parse(B), St);
{tcp, Prv, B} ->
%~ ?DF("TCP~p", [cmd:parse(B)]),
handlePrv(cmd:parse(B), St);
{tcp, Nxt, _} -> exit(?ERROR("Received msg from Nxt"));
{tcp, S, B} -> % sender desconocido
newconnection_handler(S, cmd:parse(B), St);
{send, Data} -> send_data(Data, St), handler(St);
{make_ring_packet, Client, Cmd, Args} ->
Client!cmd:make(["RING", St#wstate.id, St#wstate.numpaq, Cmd, {Client, Args}]),
handler(workerstate:increment_numpaq(St));
{client_closed, Pid} -> ?INFO("Client disconnected"), handler(workerstate:remove_client(Pid, St));
{tcp_closed, Nxt} -> gen_tcp:close(Prv), restart(St);
{tcp_closed, Prv} -> gen_tcp:close(Nxt), restart(St);
{tcp_closed, _} -> handler(St);
Msg -> exit(?ERROR("Unhandled MSG: ~p", [Msg]))
end.
handle_announcements(["SERVER", Id, Ip, Port], St) when Id>St#wstate.leader_id ->
case ring:join(Ip, Port, St#wstate.prv, St#wstate.nxt, St#wstate.port) of
continue -> handler(St);
{ok, PrvN, NxtN} ->
?INFO("Joined to server ~p at port ~p", [Id, Port]),
handler(workerstate:reset_start_time(
workerstate:set_prv(PrvN,
workerstate:set_nxt(NxtN,
workerstate:set_leader(Id, St)))))
end;
handle_announcements(_, St) -> handler(St).
pass_megaphone(St) ->
megaphone!{is_enabled, self()},
receive
{is_enabled, true} ->
megaphone!disable,
send_data(cmd:make(["ANNOUNCE"]), St);
{is_enabled, false} -> ok
end.
newconnection_handler(S, ["WORK", Port], St) ->%es un worker que quiere unirse
case ring:addWorker(S, Port, St#wstate.prv, St#wstate.nxt) of
continue -> handler(St);
reset -> restart(St);
{ok, PrvN, NxtN} ->
?INFO("Worker connected"),
handler(workerstate:reset_start_time(workerstate:set_prv(PrvN, workerstate:set_nxt(NxtN, St))))
end;
newconnection_handler(S, ["CON"], St) ->
Pid=client:start(clientstate:create(S)),
gen_tcp:controlling_process(S, Pid),
pass_megaphone(St),
?INFO("Client connected"),
handler(workerstate:add_client(Pid, St));
%aqui llegan los mensajes de anillos ya destruidos(descartados):
%o de clientes que ponen mal el primer comando
newconnection_handler(S, _, St) ->
gen_tcp:close(S), handler(St).
handlePrv(["SETPRV", Ip, Port], St) ->
case ring:setPrv(Ip, Port, St#wstate.prv) of
continue -> handler(St);
{ok,PrvN} -> handler(workerstate:reset_start_time(workerstate:set_prv(PrvN, St)))
end;
handlePrv(["RING", Id, N, Cmd, {Client, Args}], St) -> %se recibio un mensaje del fs
spawn(fun() -> %procesarlo y volverlo a mandar
if
Id==St#wstate.id ->
filesystem!{ring, self(), Cmd, Args},
receive NArgs -> Client!{ans, NArgs} end;
true ->
cache!{get, self(), Id, N},
receive
{ok, Ans} -> worker!{send, Ans};
nocached ->
filesystem!{ring, self(), Cmd, Args},
receive NArgs ->
NAns=cmd:make(["RING", Id, N, Cmd, {Client, NArgs}]),
cache!{put, Id, N, NAns},
worker!{send, NAns}
end
end
end
end), handler(St);
handlePrv(["ANNOUNCE"], St) ->
megaphone!{enable, St#wstate.leader_id}, handler(St);
handlePrv({error, B}, _) -> exit(?ERROR("Badformed TCP-MSG: ~p", [B])).
%Solicita al anillo que cree un paquete y lo envie
send_to_ring(Cmd, Args) ->
worker!{make_ring_packet, self(), Cmd, Args},
receive Paq -> Paq end, send(Paq).
%Envia un paquete y espera su respuesta (reenvia si se reinicio)
send(Paq) ->
worker!{send, Paq},
receive
{ans, Ans} -> Ans;
reset -> timer:apply_after(?SETUP_TIME, ?MODULE, send, [Paq])
end.
send_data(Data, St) ->
ElapsedTime=elapsed_since_ms(St#wstate.start_time),
case ElapsedTime>=?SETUP_TIME of%chequea si el anillo esta esable
true ->
case gen_tcp:send(St#wstate.nxt, Data) of
ok -> ok;
_ -> timer:send_after(?SETUP_TIME, {send, Data}) %worker may have been disconected
end;
false -> %espera un rato antes de mandar
timer:send_after(max(?SETUP_TIME-ElapsedTime+10, 0), {send, Data})
end.
elapsed_since_ms(TimeStamp) ->
erlang:convert_time_unit(erlang:monotonic_time()-TimeStamp, native, milli_seconds).
%acepta todas las conexiones y las manda al worker
acceptAll(LS) ->
case gen_tcp:accept(LS) of
{ok, NS} -> gen_udp:controlling_process(NS, whereis(worker));
{error, Reason} -> ?INFO("Can't accept client: ~p", [Reason])
end,
acceptAll(LS).