Skip to content
Jon Meredith edited this page Sep 3, 2014 · 3 revisions

WORK IN PROGRESS

RiakNet

Goals:

  • Common layer for establishing streaming connections.

  • Protocol, version and capability negotiation.

  • Reuse performance pattern for realtime code (block/non-block coprocesses).

  • Fault detection - heartbeat for 'dead' remote processes

  • Connection to logical resources - other nodes in the same cluster, nodes in remote clusters.

  • Connection retry/backoff for unreachable remotes.

  • NAT traversal.

  • Encryption/protoocol independence.

  • Per-connection statistics (perhaps integrated with heartbeat)

  • Aggregated statistics - how much realtime, fullsync, handoff etc

  • No erlang terms used until after negotiation/establishment so that non-Erlang libraries could be easily written.

  • Realtime, Fullsync and Proxyget should be rewritten on top of it, as well as possibly handoff/repair.

Future:

  • Message passing layer to build global strong consistency on

Post-refactor State

  • riak_net component usable by all of core.

  • Realtime replication simplified

    Plumbtree overlay for all clusters in the domain. Realtime puts will need to be able to answer whether messages have been seen before from other sources, should be able to compare based on vclock. Perhaps modify the put FSM to return extra status on whether a put descends.

    Queue simplified to just track objects that need sending. Can we detect drops with at-least-once delivery?

    Would you still have a consumer for each remote, and skip delivery if plumbtree said not to send? Would you have a single consumer? Who worries about that?

    multiple vnodes -> multi-consumer queues -> realtime connections

    postcommit/vnode hook -> add to realtime queue realtime source becomes framing+pull+ack

  • Proxy-get simplified

    Use riak_net to simplify connection logic. Callback module for requesting/responding. What about cascading - how do you request which cluster it comes from, what if you don't have a direct connection to it? Can plumbtree handle that? Does it need the same service as global SC?

  • Fullsync

    Make a peer-to-peer difference exchanging connection Make a diff-sending connection Make a diff-requesting connection ... or should you mux?

Outbound connections

Currently Connection manager

Inbound connections

Currently Service manager

Locators

Locators provide a service for looking up remote services and converting them to a {Transport, Address} pair. They may take time to execute and can either reply immediately or provide a unique reference they will answer on in the future.

% Register locator function - if Pid provided, monitor and
% remove locator on death, otherwise live forever.
% Last registration wins, previous are discarded
riak_net:register_locator(Location, LocatorFun)
riak_net:register_locator(Location, LocatorFun, Pid)

% Remove locator
riak_net:unregister_locator(Location)

% Locator function called to find connection information.  May
% reply immediately, or return a reference it will call
% riak_net:location with later
LocatorFun(Location, Name, Opts) -> {reply, Transport, Address} | {noreply, Ref} | {error, Reason}

riak_net:location(Ref, Transport, Address) -> ok | {error, Reason}

For local applications like handoff, expect to register something as simple as {localnode, Node} and have the locator return the best Transport/Address. That could be made aware of which network interface to pick in the configuration.

Future usase could include finding nodes responsible for a portion of the keyspace {keyspace, RingSize, Index, Nval} or responsible for a vnode {vnode, RingSize, Index}

Locators need a robust way to re-register on crashes. It was a major source of problems in 1.3/1.4.

Cluster manager

Each local cluster needs enough information to connect to the remote cluster. Currently stored in the ring, should be stored in cluster or global metadata.

For simple MDC connections where both clusters can see one another, a direct IP/port can be determined the same way as a local node.

The cluster manager is responsible for

  • Updating the local bootstrap IP address for establishing connections.
  • Providing Transport/Address pairs for each MDC connection as well as any other needed information (like certificates)
  • Balance the MDC workload across the cluster with the background manager.

Perhaps

  • Track when fullsync connection is active between both sides if bidir, for status reporting/preventing duplicate work.

SSL

SSL is currently enabled/disabled for all connections. It should be more granular, by remote cluster.

NAT connections

Connections may have to traverse firewalls with NAT enabled. Repl2/3 stored a NAT map for converting local IP addresses to remote IP addresses.

Not sure if that makes more sense, or specifying inbound transport/address by node name, and applying that for all addresses.

NAT is currently applied based on the IP address that is expecting to make the connection. That functionality is duplicated in a few places.

Outbound-only clusters

Some clusters can only make outbound connections. This is why proxy get has such perverse setup.

For clusters that can only make outbound connections, the cluster manager would need to establish a connection to the desired endpoint and reverse the normal source/sink as done previously.

Some kind of protocol saying that the connection is for a particular connection request and the ability to change what protocol is being run.

Connection negotiation

Statistics and reporting

riak_core_tcp_mon

riak_net_stream

Brainstorming on how a 'generic' version of the realtime networking code could be created.

Current realtime modules

riak_repl2_rtsource_conn - controlling process source side
riak_repl2_rtsource_helper - sending process
riak_repl2_rtsink_conn - controlling process sink side
riak_repl2_rtsink_helper - sending process

Looking at the state record on the source/source helper side, annotating each entry whether it is to do with reporting about the status/health of connections or the actual realtime concerns, a large portion of state can be abstracted away.

-record(state, {remote,    % remote name    : CONNECTION
                address,   % {IP, Port}     : CONNECTION
                connection_ref, % reference handed out by connection manager
                transport, % transport module : CONNECTION
                socket,    % socket to use with transport : CONNECTION
                peername,  % cached when socket becomes active : CONNECTION
                proto,     % protocol version negotiated : CONNECTION
                ver,       % wire format negotiated : REALTIME?
                helper_pid,% riak_repl2_rtsource_helper pid : CONNECTIOn
                hb_interval,% seconds to send new heartbeat after last : CONNECTIOn
                hb_interval_tref, : CONNECTION
                hb_timeout,% seconds to wait for heartbeat after send CONNECTION
                hb_timeout_tref,% heartbeat timeout timer reference CONNECTIOn
                hb_sent_q,   % queue of heartbeats now() that were sent CONNECTION
                hb_rtt,    % RTT in milliseconds for last completed heartbeat CONNECTIOn
                cont = <<>>}). % continuation from previous TCP buffer CONNECTION

-record(state, {remote,     % remote site name   : CONNECTION
                transport,  % erlang module to use for transport : CONNECTION
                socket,     % socket to pass to transport : CONNECTION
                proto,      % protocol version negotiated : CONNECTION
                deliver_fun,% Deliver function : REALTIME
                sent_seq,   % last sequence sent : REALTIME
                v1_offset = 0, : REALTIME
                v1_seq_map = [], : REALTIME
                objects = 0}).   % number of objects sent - really number of pulls as could be multiobj : REALTIME

Similarly on the rtsink side

-record(state, {remote,           %% Remote site name : CONNECTION
                transport,        %% Module for sending : CONNECTION
                socket,           %% Socket : CONNECTION
                proto,            %% Protocol version negotiated : CONNECTION
                peername,         %% peername of socket : CONNECTION
                ver,              %% wire format agreed with rt source : REALTIME
                max_pending,      %% Maximum number of operations : REALTIME
                active = true,    %% If socket is set active : CONNECTION
                deactivated = 0,  %% Count of times deactivated : CONNECTION
                source_drops = 0, %% Count of upstream drops : REALTIME
                helper,           %% Helper PID : CONNECTION
                hb_last,          %% os:timestamp last heartbeat message received : CONNECTION
                seq_ref,          %% Sequence reference for completed/acked : REALTIME
                expect_seq = undefined,%% Next expected sequence number : REALTIME
                acked_seq = undefined, %% Last sequence number acknowledged : REALTIME
                completed = [],   %% Completed sequence numbers that need to be sent : REALTIME
                cont = <<>>,      %% Continuation from previous TCP buffer : CONNECTION
                bt_drops,         %% drops due to bucket type mis-matches : REALTIME
                bt_interval,      %% how often (in ms) to report bt_drops : REALTIME
                bt_timer          %% timer reference for interval   :REALTIME
               }).
-record(state, {parent           %% Parent process : CONNECTION
               }).

This breaks down into

  • msg encoding/decoding (testable for round trips) {Proto, Msg} -> Bin {Proto, Bin} -> Msg
  • source next request (blocking pull)
  • source send request
  • sink handle request
  • source handle response

Both sides need

  • protocol negotiation
  • heartbeat/rtt mechanism
    • sink is responsible for transmitting data within agreed time (maybe a message)
    • source sends local timestamp to remote, remote adds timestamp and sends back. Calculate RTT and drift.

Is there really any distinction between source/sink connections at this layer. Should you still just break into controlling/sender process, then specialize into source/sink.

Realtime knows it needs to establish an outbound connection to a particular cluster. {cluster, <>, [realtime]} and what versions we can support

Needs to register with RTQ once connection is established

Possible interface

riak_repl2_rtsource:start_link(Remote) ->
   riak_net_stream:start_link(?MODULE,
       [{location, {cluster, Remote}},
        #proto{name=realtime,
               version=[{3,0},{2,0},{1,0}],
               framing=riak_repl4_rtframing},
               capability=[lz4]
        ]

% riak_net_stream callbacks 

init(Args) ->
  {ok, #state{}}.

established(ConnInfo, State) ->
  % re-register with RTQ

handle_msg({ack, Seq}, State) ->
  % ack RTQ

helper_init() ->
  % rtq:pull_req()
  {send, Msg}

helper_info()

% riak_net_stream functions

start_link(Module, Location, Proto, Args, Opts)
rtt(Pid) -> {ok, RTT} - Get last measurement of app-level RTT
hb(Pid) -> % needed?
established(Pid) -> true | {false, Reasons}
status(Pid)
stream_status(Pid)
socket_status(Pid)
cast(Pid, Msg)
call(Pid, Msg)


% Possible riak_net callbacks
init(Args) - {ok, State}
init_helper(Args) -> {ok, HelperState} / loop() ?
established(ConnInfo, State)
retry(Failure, State)
status(State) -> {ok, List}
handle_cast(Msg, State)
handle_call(Msg, From, State) %% Maybe send something, maybe return something
  -> {ok, State} | {send, Msg, State}
  -> list of {send, Msg}, {reply, Answer}, {state, Blah},
               {helper, Msg}
 handle_msg(Msg, State) -> {ok, State} | {send, Msg, State} | {send_encoded
 helper_msg(Msg, HelperState) -> {ok, HelperState}