# .iex.exs
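#
# IEx helpers for exercising KinesisClient.Stream against localstack.
# One possible session (the shard count below is illustrative):
#
#     ReplTest.use_localstack_services()
#     CBDevTesting.create_stream("kcl-ex-test-stream", 2)
#     ReplTest.start()
#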
defmodule ReplTest do
  def start do
    opts = [
      stream_name: "kcl-ex-test-stream",
      app_name: "cbrodt-test-app",
      shard_consumer: ReplTestShardConsumer,
      processors: [
        default: [
          concurrency: 1,
          min_demand: 10,
          max_demand: 20
        ]
      ],
      batchers: [
        default: [
          concurrency: 1,
          batch_size: 40
        ]
      ]
    ]

    KinesisClient.Stream.start_link(opts)
  end
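
  # Scan the DynamoDB table (defaults to the app_name used in start/0) to
  # inspect the shard lease/checkpoint records the library keeps there.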
  def scan_table(table_name \\ "cbrodt-test-app") do
    ExAws.Dynamo.scan(table_name, limit: 20) |> ExAws.request()
  end
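
  # Point ExAws at localstack's legacy per-service ports (4569 for DynamoDB,
  # 4568 for Kinesis). Run this before making any AWS calls in the session.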
  def use_localstack_services do
    Application.put_env(:ex_aws, :dynamodb,
      scheme: "http://",
      host: "localhost",
      port: "4569",
      region: "us-east-1"
    )

    Application.put_env(:ex_aws, :kinesis,
      scheme: "http://",
      host: "localhost",
      port: "4568",
      region: "us-east-1"
    )
  end
end
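
# A minimal Broadway consumer for the stream above: it expects each Kinesis
# record to be a base64-encoded JSON object with an "id" field, and dumps every
# record to repl_output/<id>.json. The repl_output directory must already exist,
# since File.write/2 will not create it.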
defmodule ReplTestShardConsumer do
  @behaviour Broadway

  require Logger

  @impl Broadway
  def handle_message(_processor, %{data: data, metadata: metadata} = msg, _context) do
    %{id: id} = data |> Base.decode64!() |> Jason.decode!(keys: :atoms)
    Logger.info("Got message #{id}")

    path = Path.join("repl_output", "#{id}.json")
    :ok = File.write(path, Jason.encode!(%{data: data, metadata: metadata}))

    Logger.info("processing message with id: #{id}")
    Broadway.Message.put_batcher(msg, :default)
  end

  @impl Broadway
  def handle_batch(_batcher, messages, _batch_info, _context) do
    Logger.info("Got batch of size: #{length(messages)}")
    messages
  end

  @impl Broadway
  def handle_failed(messages, _context) do
    messages
  end
end
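
# Ad-hoc helpers for managing the dev stream and for poking at a running
# KinesisClient.Stream.Coordinator from IEx.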
defmodule CBDevTesting do
  alias ExAws.Kinesis

  @coordinator_name MyTestCoordinator
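
  # Kinesis UpdateShardCount, issued as a hand-built ExAws.Operation.JSON
  # request rather than through an ExAws.Kinesis helper.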
  def update_shard_count(stream_name, shard_count) do
    operation = %ExAws.Operation.JSON{
      http_method: :post,
      headers: [
        {"x-amz-target", "Kinesis_20131202.UpdateShardCount"},
        {"content-type", "application/x-amz-json-1.1"}
      ],
      path: "/",
      data: %{
        ScalingType: "UNIFORM_SCALING",
        StreamName: stream_name,
        TargetShardCount: shard_count
      },
      service: :kinesis
    }

    ExAws.request(operation)
  end
  def describe_stream(stream_name) do
    Kinesis.describe_stream(stream_name) |> ExAws.request()
  end

  def describe_table(table_name) do
    ExAws.Dynamo.describe_table(table_name) |> ExAws.request()
  end

  def list_streams do
    Kinesis.list_streams() |> ExAws.request()
  end

  def create_stream(stream_name, shard_count) do
    stream_name |> Kinesis.create_stream(shard_count) |> ExAws.request()
  end

  def stream_name do
    "decline-roman-empire-dev"
  end
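
  # Start the shard Coordinator directly (outside of KinesisClient.Stream),
  # with notify_pid: self() so its notifications land in the IEx shell mailbox.
  # The _overrides argument is currently ignored.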
  def start_coordinator(_overrides) do
    opts = [
      name: @coordinator_name,
      app_name: "uberbrodt-kinesis-client-dev-app",
      stream_name: stream_name(),
      shard_supervisor_name: MyShardSupervisor,
      notify_pid: self()
    ]

    KinesisClient.Stream.Coordinator.start_link(opts)
  end
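
  # The coordinator exposes its shard graph via a :get_graph call; these helpers
  # list each vertex with its in-neighbours (parents) or out-neighbours
  # (children) and sort by the number of neighbours.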
  def list_child_to_parent() do
    g = get_coordinator_graph()
    n = g |> :digraph.vertices() |> Enum.map(fn v -> {v, :digraph.in_neighbours(g, v)} end)
    Enum.sort(n, fn {_, n1}, {_, n2} -> length(n1) <= length(n2) end)
  end

  def list_parent_to_child() do
    g = get_coordinator_graph()
    n = g |> :digraph.vertices() |> Enum.map(fn v -> {v, :digraph.out_neighbours(g, v)} end)
    Enum.sort(n, fn {_, n1}, {_, n2} -> length(n1) >= length(n2) end)
  end

  def get_coordinator_graph() do
    GenServer.call(@coordinator_name, :get_graph)
  end
end