Skip to content

Commit

Permalink
TCP-Transport for milestone-1
Browse files Browse the repository at this point in the history
Excluding the last task defined  in skycoin/skywire#501 - everything is completed.

It was already done by most part a long time before.

The most hard part is in the last task.
  • Loading branch information
ayuryshev committed Aug 27, 2019
1 parent 6375fe2 commit 76bc8e8
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 19 deletions.
41 changes: 40 additions & 1 deletion pkg/snet/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const (
// Network types.
const (
DmsgType = "dmsg"
TCPType = "tcptr"
)

var (
Expand All @@ -39,16 +40,22 @@ type Config struct {

DmsgDiscAddr string
DmsgMinSrvs int

LocalTCPAddress string
PubKeyFile string
}

// Network represents
type Network struct {
conf Config
dmsgC *dmsg.Client
tcpF *TCPFactory
}

func New(conf Config) *Network {
dmsgC := dmsg.NewClient(conf.PubKey, conf.SecKey, disc.NewHTTP(conf.DmsgDiscAddr), dmsg.SetLogger(logging.MustGetLogger("snet.dmsgC")))
dmsgC := dmsg.NewClient(conf.PubKey,
conf.SecKey, disc.NewHTTP(conf.DmsgDiscAddr),
dmsg.SetLogger(logging.MustGetLogger("snet.dmsgC")))
return &Network{
conf: conf,
dmsgC: dmsgC,
Expand All @@ -67,6 +74,7 @@ func (n *Network) Init(ctx context.Context) error {
if err := n.dmsgC.InitiateServerConnections(ctx, n.conf.DmsgMinSrvs); err != nil {
return fmt.Errorf("failed to initiate 'dmsg': %v", err)
}

return nil
}

Expand Down Expand Up @@ -105,9 +113,16 @@ func (n *Network) Dial(network string, pk cipher.PubKey, port uint16) (*Conn, er
return nil, err
}
return makeConn(conn, network), nil
case TCPType:
conn, err := n.tcpF.Dial(ctx, pk)
if err != nil {
return nil, err
}
return makeConn(conn, network), nil
default:
return nil, ErrUnknownNetwork
}
return nil, nil
}

func (n *Network) Listen(network string, port uint16) (*Listener, error) {
Expand All @@ -118,9 +133,33 @@ func (n *Network) Listen(network string, port uint16) (*Listener, error) {
return nil, err
}
return makeListener(lis, network), nil
case TCPType:
if n.conf.PubKeyFile != "" {
pkt, err := FilePubKeyTable(n.conf.PubKeyFile)
if err != nil {
return nil, fmt.Errorf("failed to inititiate tcp-transport: %v", err)
}
locAddr, err := net.ResolveTCPAddr("tcp", n.conf.LocalTCPAddress)
lsn, err := net.ListenTCP("tcp", locAddr)
if err != nil {
return nil, fmt.Errorf("failed to inititiate tcp-transport: %v", err)
}
n.tcpF = NewTCPFactory(n.conf.PubKey, pkt, lsn)
return &Listener{
Listener: lsn,
lPK: n.conf.PubKey,
lPort: 666,
network: TCPType,
}, nil
}
default:
return nil, ErrUnknownNetwork
}
return nil, nil
}

func (n *Network) TCP() *TCPFactory {
return n.tcpF
}

type Listener struct {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// +build !no_ci

package transport
package snet

import (
"bufio"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// +build !no_ci

package transport_test
package snet_test

import (
"context"
Expand All @@ -16,7 +16,7 @@ import (
"github.com/stretchr/testify/require"

th "github.com/skycoin/skywire/internal/testhelpers"
"github.com/skycoin/skywire/pkg/transport"
"github.com/skycoin/skywire/pkg/snet"
)

/*
Expand Down Expand Up @@ -57,7 +57,7 @@ func Example_transport_TCPFactory() {
fmt.Println(err)
}

pkt := transport.MemoryPubKeyTable(
pkt := snet.MemoryPubKeyTable(
map[cipher.PubKey]string{
pkA: addrA.String(),
pkB: addrB.String(),
Expand All @@ -69,7 +69,7 @@ func Example_transport_TCPFactory() {
go func() {
defer wg.Done()

fA := transport.NewTCPFactory(pkA, pkt, lsnA)
fA := snet.NewTCPFactory(pkA, pkt, lsnA)
tr, err := fA.Accept(context.TODO())
if err != nil {
fmt.Printf("Accept err: %v\n", err)
Expand All @@ -87,7 +87,7 @@ func Example_transport_TCPFactory() {

go func() {
defer wg.Done()
fB := transport.NewTCPFactory(pkB, pkt, lsnB)
fB := snet.NewTCPFactory(pkB, pkt, lsnB)
tr, err := fB.Dial(context.TODO(), pkA)
if err != nil {
fmt.Printf("Dial err: %v\n", err)
Expand Down Expand Up @@ -127,11 +127,11 @@ func TestTCPFactory(t *testing.T) {
l2, err := net.ListenTCP("tcp", addr2)
require.NoError(t, err)

pkt1 := transport.MemoryPubKeyTable(map[cipher.PubKey]string{pk2: addr2.String()})
pkt2 := transport.MemoryPubKeyTable(map[cipher.PubKey]string{pk1: addr1.String()})
pkt1 := snet.MemoryPubKeyTable(map[cipher.PubKey]string{pk2: addr2.String()})
pkt2 := snet.MemoryPubKeyTable(map[cipher.PubKey]string{pk1: addr1.String()})

f1 := transport.NewTCPFactory(pk1, pkt1, l1)
f2 := transport.NewTCPFactory(pk2, pkt2, l2)
f1 := snet.NewTCPFactory(pk1, pkt1, l1)
f2 := snet.NewTCPFactory(pk2, pkt2, l2)
require.Equal(t, "tcp-transport", f1.Type())
require.Equal(t, pk1, f1.Local())
require.Equal(t, "tcp-transport", f2.Type())
Expand Down Expand Up @@ -200,7 +200,7 @@ func TestFilePubKeyTable(t *testing.T) {
_, err = tmpfile.Write([]byte(fmt.Sprintf("%s\t%s\n", pk, addr)))
require.NoError(t, err)

pkt, err := transport.FilePubKeyTable(tmpfile.Name())
pkt, err := snet.FilePubKeyTable(tmpfile.Name())
require.NoError(t, err)
require.Equal(t, pkt.Count(), 1)

Expand All @@ -219,7 +219,7 @@ func Example_transport_MemoryPubKeyTable() {
pkA: ipA,
pkB: ipB,
}
pkt := transport.MemoryPubKeyTable(entries)
pkt := snet.MemoryPubKeyTable(entries)

fmt.Printf("ipA: %v\n", pkt.RemoteAddr(pkA))
fmt.Printf("pkB in: %v\n", pkt.RemotePK(ipA))
Expand All @@ -246,7 +246,7 @@ func Example_transport_FilePubKeyTable() {
_, err := tmpfile.Write([]byte(pkFileContent))
fmt.Printf("Write file success: %v\n", err == nil)

pkt, err := transport.FilePubKeyTable(tmpfile.Name())
pkt, err := snet.FilePubKeyTable(tmpfile.Name())

fmt.Printf("Opening FilePubKeyTable success: %v\n", err == nil)
fmt.Printf("ipA: %v\n", pkt.RemoteAddr(pkA))
Expand Down
5 changes: 5 additions & 0 deletions pkg/visor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ type Config struct {
StaticSecKey cipher.SecKey `json:"static_secret_key"`
} `json:"node"`

TCPTransport struct {
PubKeyFile string `json:"pubkey_file"`
LocalTCPAddress string `json:"local_tcp_address"`
} `json:"tcp_transport"`

Messaging struct {
Discovery string `json:"discovery"`
ServerCount int `json:"server_count"`
Expand Down
12 changes: 7 additions & 5 deletions pkg/visor/visor.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,13 @@ func NewNode(config *Config, masterLogger *logging.MasterLogger) (*Node, error)

fmt.Println("min servers:", config.Messaging.ServerCount)
node.n = snet.New(snet.Config{
PubKey: pk,
SecKey: sk,
TpNetworks: []string{dmsg.Type}, // TODO: Have some way to configure this.
DmsgDiscAddr: config.Messaging.Discovery,
DmsgMinSrvs: config.Messaging.ServerCount,
PubKey: pk,
SecKey: sk,
TpNetworks: []string{dmsg.Type, snet.TCPType}, // TODO: Have some way to configure this.
DmsgDiscAddr: config.Messaging.Discovery,
DmsgMinSrvs: config.Messaging.ServerCount,
LocalTCPAddress: config.TCPTransport.LocalTCPAddress,
PubKeyFile: config.TCPTransport.PubKeyFile,
})
if err := node.n.Init(ctx); err != nil {
return nil, fmt.Errorf("failed to init network: %v", err)
Expand Down
4 changes: 4 additions & 0 deletions pkg/visor/visor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,3 +291,7 @@ func (r *mockRouter) Close() error {
func (r *mockRouter) IsSetupTransport(tr *transport.ManagedTransport) bool {
return false
}

func (r *mockRouter) SetupIsTrusted(_ cipher.PubKey) bool {
return false
}

0 comments on commit 76bc8e8

Please sign in to comment.