Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Feature: TCPFactory/TCPTransport instead of dmsg #503

Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
1857842
Compiling. Passes tests
ayuryshev Jul 25, 2019
b81e03f
configurable pubkeys file, TCPFactory initialized with Listener
ayuryshev Jul 25, 2019
0a2297d
Deduplicated code
ayuryshev Jul 25, 2019
6d96017
Commented out tcp-transport for setup-node
ayuryshev Jul 29, 2019
56c74c5
Changes: TCPFactory working correctly with predefined in/out ports
ayuryshev Jul 31, 2019
15226f0
Now works with dynamic ports from diallers
ayuryshev Jul 31, 2019
afbdd85
Integration environment ready.
ayuryshev Jul 31, 2019
cb5bf31
Proceeded to thorn-letter problem
ayuryshev Aug 1, 2019
4f56b61
Transport is accepted by nodeA. Still not working
ayuryshev Aug 2, 2019
45f805c
Remerged with mainnet. Setup node changes rolled back
ayuryshev Aug 8, 2019
fca2d56
Start of multihead test
ayuryshev Aug 12, 2019
4d4b980
Managed to run 128 nodes in Example_runMultihead
ayuryshev Aug 13, 2019
8dec53d
Logrus output accumulated in memory. Routing Tables are in memory too
ayuryshev Aug 13, 2019
5ae74d5
multihead environment enveloped in type MultiHead. Send message as Mu…
ayuryshev Aug 14, 2019
43d2611
Fixed logging everywhere. Multhead simplified
ayuryshev Aug 15, 2019
a8142bc
Restructured tests for router
ayuryshev Aug 16, 2019
70b1779
Changes in PacketRouter implementations: callbacks streamlined into r…
ayuryshev Aug 17, 2019
4a951c6
a lot of debugging logs
ayuryshev Aug 17, 2019
3da5dbe
still working
ayuryshev Aug 19, 2019
1d31171
yet another step forward
ayuryshev Aug 19, 2019
598e17a
Yet another step
ayuryshev Aug 21, 2019
7572b29
tcp-transport finally working
ayuryshev Aug 22, 2019
7635a5a
some cleanups
ayuryshev Aug 23, 2019
d9e6356
encore un efforti
ayuryshev Aug 26, 2019
fef6c40
restored logging
ayuryshev Aug 26, 2019
3ac70d0
routing.Loop renamed to routing.AddressPair
ayuryshev Aug 26, 2019
1e53cfe
LoopDescriptor, LoopData -> AddressPairDescriptor, AddressPairData
ayuryshev Aug 26, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmd/skywire-cli/commands/node/gen-config.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,12 @@ func defaultConfig() *visor.Config {
conf.Routing.RouteFinder = "https://routefinder.skywire.skycoin.net/"

const defaultSetupNodePK = "0324579f003e6b4048bae2def4365e634d8e0e3054a20fc7af49daf2a179658557"

sPK := cipher.PubKey{}
if err := sPK.UnmarshalText([]byte(defaultSetupNodePK)); err != nil {
log.WithError(err).Warnf("Failed to unmarshal default setup node public key %s", defaultSetupNodePK)
}

conf.Routing.SetupNodes = []cipher.PubKey{sPK}
conf.Routing.Table.Type = "boltdb"
conf.Routing.Table.Location = "./skywire/routing.db"
Expand All @@ -120,5 +122,8 @@ func defaultConfig() *visor.Config {

conf.Interfaces.RPCAddress = "localhost:3435"

conf.TransportType = "dmsg"
conf.PubKeysFile = "./local/pubkeys"

return conf
}
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ require (
github.com/spf13/cobra v0.0.5
github.com/stretchr/testify v1.3.0
go.etcd.io/bbolt v1.3.3
golang.org/x/crypto v0.0.0-20190621222207-cc06ce4a13d4
golang.org/x/net v0.0.0-20190620200207-3b0461eec859
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80
golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7 // indirect
golang.org/x/tools v0.0.0-20190725161231-2e34cfcb95cb // indirect
)

// Uncomment for tests with alternate branches of 'dmsg'
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,15 @@ golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnf
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190621222207-cc06ce4a13d4 h1:ydJNl0ENAG67pFbB+9tfhiL2pYqLhfoaZFw/cjLhY4A=
golang.org/x/crypto v0.0.0-20190621222207-cc06ce4a13d4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 h1:Ao/3l156eZf2AW5wK8a7/smtodRU+gha3+BeqJ69lRk=
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand All @@ -136,12 +140,16 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb h1:fgwFCsaw9buMuxNd6+DQfAuSFqbNiQZpcgJQAgJsK6k=
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7 h1:LepdCS8Gf/MVejFIt8lsiexZATdoGVyp5bcyS+rYoUI=
golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190627182818-9947fec5c3ab/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
golang.org/x/tools v0.0.0-20190725161231-2e34cfcb95cb h1:Zi4or4sGkVpI7V5TX4JDMHJAthfKDKg8WotOBKXqmTs=
golang.org/x/tools v0.0.0-20190725161231-2e34cfcb95cb/go.mod h1:jcCCGcm9btYwXyDqrUWc6MKQKKGJCWEQ3AfLSRIbEuI=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
Expand Down
4 changes: 4 additions & 0 deletions pkg/setup/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,9 @@ type Config struct {

TransportDiscovery string `json:"transport_discovery"`

TransportType string `json:"transport_type"`
PubKeysFile string `json:"pubkeys_file"`
TCPTransportAddr string `json:"tcptransport_addr"`

LogLevel string `json:"log_level"`
}
34 changes: 26 additions & 8 deletions pkg/setup/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type Node struct {
Logger *logging.Logger

tm *transport.Manager
messenger *dmsg.Client
messenger transport.Factory
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The setup node is supposed to use dmsg exclusively and directly. The communication between the visor and the setup node should not use TCP transport.


srvCount int
metrics metrics.Recorder
Expand All @@ -45,7 +45,21 @@ func NewNode(conf *Config, metrics metrics.Recorder) (*Node, error) {
if lvl, err := logging.LevelFromString(conf.LogLevel); err == nil {
logger.SetLevel(lvl)
}
messenger := dmsg.NewClient(pk, sk, disc.NewHTTP(conf.Messaging.Discovery), dmsg.SetLogger(logger.PackageLogger(dmsg.Type)))

var (
factory transport.Factory
err error
)
switch conf.TransportType {
case "dmsg":
factory = dmsg.NewClient(pk, sk, disc.NewHTTP(conf.Messaging.Discovery),
dmsg.SetLogger(logger.PackageLogger(dmsg.Type)))
case "tcp-transport":
factory, err = transport.NewTCPFactory(pk, conf.PubKeysFile, conf.TCPTransportAddr)
if err != nil {
return nil, err
}
}

trDiscovery, err := trClient.NewHTTP(conf.TransportDiscovery, pk, sk)
if err != nil {
Expand All @@ -59,7 +73,7 @@ func NewNode(conf *Config, metrics metrics.Recorder) (*Node, error) {
LogStore: transport.InMemoryTransportLogStore(),
}

tm, err := transport.NewManager(tmConf, messenger)
tm, err := transport.NewManager(tmConf, factory)
if err != nil {
log.Fatal("Failed to setup Transport Manager: ", err)
}
Expand All @@ -69,18 +83,22 @@ func NewNode(conf *Config, metrics metrics.Recorder) (*Node, error) {
Logger: logger.PackageLogger("routesetup"),
metrics: metrics,
tm: tm,
messenger: messenger,
messenger: factory,
srvCount: conf.Messaging.ServerCount,
}, nil
}

// Serve starts transport listening loop.
func (sn *Node) Serve(ctx context.Context) error {
if sn.srvCount > 0 {
if err := sn.messenger.InitiateServerConnections(ctx, sn.srvCount); err != nil {
return fmt.Errorf("messaging: %s", err)

switch factory := sn.messenger.(type) {
case *dmsg.Client:
if sn.srvCount > 0 {
if err := factory.InitiateServerConnections(ctx, sn.srvCount); err != nil {
return fmt.Errorf("messaging: %s", err)
}
sn.Logger.Info("Connected to messaging servers")
}
sn.Logger.Info("Connected to messaging servers")
}

go func() {
Expand Down
43 changes: 30 additions & 13 deletions pkg/transport/tcp_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"context"
"errors"
"fmt"
"net"
"os"
"path/filepath"
Expand All @@ -18,35 +19,51 @@ var ErrUnknownRemote = errors.New("unknown remote")

// TCPFactory implements Factory over TCP connection.
type TCPFactory struct {
l *net.TCPListener
lpk cipher.PubKey
pkt PubKeyTable
Pk cipher.PubKey
PkTable PubKeyTable
Lsr *net.TCPListener
}

// NewTCPFactory constructs a new TCP Factory.
func NewTCPFactory(lpk cipher.PubKey, pkt PubKeyTable, l *net.TCPListener) Factory {
return &TCPFactory{l, lpk, pkt}
// NewTCPFactory constructs a new TCP Factory
func NewTCPFactory(pk cipher.PubKey, pubkeysFile string, tcpAddr string) (Factory, error) {

pkTbl, err := FilePubKeyTable(pubkeysFile)
if err != nil {
return nil, fmt.Errorf("error %v reading %v", err, pubkeysFile)
}

addr, err := net.ResolveTCPAddr("tcp", tcpAddr)
if err != nil {
return nil, fmt.Errorf("error %v resolving %v", err, tcpAddr)
}

tcpListener, err := net.ListenTCP("tcp", addr)
if err != nil {
return nil, fmt.Errorf("error %v listening %v", err, tcpAddr)
}

return &TCPFactory{pk, pkTbl, tcpListener}, nil
}

// Accept accepts a remotely-initiated Transport.
func (f *TCPFactory) Accept(ctx context.Context) (Transport, error) {
conn, err := f.l.AcceptTCP()
conn, err := f.Lsr.AcceptTCP()
if err != nil {
return nil, err
}

raddr := conn.RemoteAddr().(*net.TCPAddr)
rpk := f.pkt.RemotePK(raddr.IP)
rpk := f.PkTable.RemotePK(raddr.IP)
if rpk.Null() {
return nil, ErrUnknownRemote
}

return &TCPTransport{conn, [2]cipher.PubKey{f.lpk, rpk}}, nil
return &TCPTransport{conn, [2]cipher.PubKey{f.Pk, rpk}}, nil
}

// Dial initiates a Transport with a remote node.
func (f *TCPFactory) Dial(ctx context.Context, remote cipher.PubKey) (Transport, error) {
raddr := f.pkt.RemoteAddr(remote)
raddr := f.PkTable.RemoteAddr(remote)
if raddr == nil {
return nil, ErrUnknownRemote
}
Expand All @@ -56,20 +73,20 @@ func (f *TCPFactory) Dial(ctx context.Context, remote cipher.PubKey) (Transport,
return nil, err
}

return &TCPTransport{conn, [2]cipher.PubKey{f.lpk, remote}}, nil
return &TCPTransport{conn, [2]cipher.PubKey{f.Pk, remote}}, nil
}

// Close implements io.Closer
func (f *TCPFactory) Close() error {
if f == nil {
return nil
}
return f.l.Close()
return f.Lsr.Close()
}

// Local returns the local public key.
func (f *TCPFactory) Local() cipher.PubKey {
return f.lpk
return f.Pk
}

// Type returns the Transport type.
Expand Down
4 changes: 2 additions & 2 deletions pkg/transport/tcp_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestTCPFactory(t *testing.T) {
pkt1 := transport.InMemoryPubKeyTable(map[cipher.PubKey]*net.TCPAddr{pk2: addr2})
pkt2 := transport.InMemoryPubKeyTable(map[cipher.PubKey]*net.TCPAddr{pk1: addr1})

f1 := transport.NewTCPFactory(pk1, pkt1, l1)
f1 := &transport.TCPFactory{pk1, pkt1, l1}
errCh := make(chan error)
go func() {
tr, err := f1.Accept(context.TODO())
Expand All @@ -49,7 +49,7 @@ func TestTCPFactory(t *testing.T) {
errCh <- nil
}()

f2 := transport.NewTCPFactory(pk2, pkt2, l2)
f2 := &transport.TCPFactory{pk2, pkt2, l2}
assert.Equal(t, "tcp", f2.Type())
assert.Equal(t, pk2, f2.Local())

Expand Down
9 changes: 6 additions & 3 deletions pkg/visor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,13 @@ type Config struct {
AppsPath string `json:"apps_path"`
LocalPath string `json:"local_path"`

LogLevel string `json:"log_level"`
ShutdownTimeout Duration `json:"shutdown_timeout"` // time value, examples: 10s, 1m, etc
TransportType string `json:"transport_type"`
PubKeysFile string `json:"pubkeys_file"`
TCPTransportAddr string `json:"tcptransport_addr"`

Interfaces InterfaceConfig `json:"interfaces"`
ShutdownTimeout Duration `json:"shutdown_timeout"` // time value, examples: 10s, 1m, etc
Interfaces InterfaceConfig `json:"interfaces"`
LogLevel string `json:"log_level"`
}

// MessagingConfig returns config for dmsg client.
Expand Down
31 changes: 24 additions & 7 deletions pkg/visor/visor.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ type PacketRouter interface {
type Node struct {
config *Config
router PacketRouter
messenger *dmsg.Client
messenger transport.Factory
tm *transport.Manager
rt routing.Table
executer appExecuter
Expand Down Expand Up @@ -122,16 +122,27 @@ func NewNode(config *Config, masterLogger *logging.MasterLogger) (*Node, error)
sk := config.Node.StaticSecKey
mConfig, err := config.MessagingConfig()
if err != nil {
return nil, fmt.Errorf("invalid Messaging config: %s", err)
return nil, fmt.Errorf("invalid messaging config: %s", err)
}

node.messenger = dmsg.NewClient(mConfig.PubKey, mConfig.SecKey, mConfig.Discovery, dmsg.SetLogger(node.Logger.PackageLogger(dmsg.Type)))
switch config.TransportType {
case "dmsg":
node.messenger = dmsg.NewClient(mConfig.PubKey, mConfig.SecKey,
mConfig.Discovery, dmsg.SetLogger(node.Logger.PackageLogger(dmsg.Type)))
case "tcp-transport":
var err error
node.messenger, err = transport.NewTCPFactory(config.Node.StaticPubKey, config.PubKeysFile, config.TCPTransportAddr)
if err != nil {
return nil, err
}
}

trDiscovery, err := config.TransportDiscovery()
if err != nil {
return nil, fmt.Errorf("invalid MessagingConfig: %s", err)
}
logStore, err := config.TransportLogStore()

if err != nil {
return nil, fmt.Errorf("invalid TransportLogStore: %s", err)
}
Expand Down Expand Up @@ -205,11 +216,17 @@ func NewNode(config *Config, masterLogger *logging.MasterLogger) (*Node, error)
// Start spawns auto-started Apps, starts router and RPC interfaces .
func (node *Node) Start() error {
ctx := context.Background()
err := node.messenger.InitiateServerConnections(ctx, node.config.Messaging.ServerCount)
if err != nil {
return fmt.Errorf("%s: %s", dmsg.Type, err)

switch factory := node.messenger.(type) {
case *dmsg.Client:
err := factory.InitiateServerConnections(ctx, node.config.Messaging.ServerCount)
if err != nil {
return fmt.Errorf("%s: %s", dmsg.Type, err)
}
node.logger.Info("Connected to messaging servers")
case *transport.TCPFactory:
node.logger.Info("TCPFactory: ignoring configured messaging servers")
}
node.logger.Info("Connected to messaging servers")

pathutil.EnsureDir(node.dir())
node.closePreviousApps()
Expand Down
Loading