Skip to content
This repository has been archived by the owner on Dec 14, 2021. It is now read-only.

Commit

Permalink
Merge pull request #393 from TheThingsNetwork/develop
Browse files Browse the repository at this point in the history
v2.0.3
  • Loading branch information
htdvisser authored Dec 22, 2016
2 parents bb72d66 + 5799f0c commit e17566e
Show file tree
Hide file tree
Showing 17 changed files with 576 additions and 172 deletions.
18 changes: 18 additions & 0 deletions api/metadata.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package api

import (
"strconv"

"github.com/TheThingsNetwork/ttn/utils/errors"
"golang.org/x/net/context" // See https://github.com/grpc/grpc-go/issues/711"
"google.golang.org/grpc/metadata"
Expand Down Expand Up @@ -45,3 +47,19 @@ func KeyFromMetadata(md metadata.MD) (string, error) {
}
return key[0], nil
}

func OffsetFromMetadata(md metadata.MD) (int, error) {
offset, ok := md["offset"]
if !ok || len(offset) == 0 {
return 0, nil
}
return strconv.Atoi(offset[0])
}

func LimitFromMetadata(md metadata.MD) (int, error) {
limit, ok := md["limit"]
if !ok || len(limit) == 0 {
return 0, nil
}
return strconv.Atoi(limit[0])
}
97 changes: 97 additions & 0 deletions api/monitor/broker_downlink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright © 2016 The Things Network
// Use of this source code is governed by the MIT license that can be found in the LICENSE file.

package monitor

import (
"context"
"time"

"github.com/TheThingsNetwork/ttn/api/broker"
"github.com/TheThingsNetwork/ttn/utils/backoff"
"github.com/TheThingsNetwork/ttn/utils/errors"
"github.com/golang/protobuf/ptypes/empty"
)

func (cl *brokerClient) initDownlink() {
cl.downlink.ch = make(chan *broker.DownlinkMessage, BufferSize)
go cl.monitorDownlink()
}

func (cl *brokerClient) monitorDownlink() {
var retries int
newStream:
for {
ctx, cancel := context.WithCancel(cl.Context())
cl.downlink.Lock()
cl.downlink.cancel = cancel
cl.downlink.Unlock()

stream, err := cl.client.client.BrokerDownlink(ctx)
if err != nil {
cl.Ctx.WithError(errors.FromGRPCError(err)).Warn("Failed to open new monitor downlink stream")

retries++
time.Sleep(backoff.Backoff(retries))

continue
}
retries = 0
cl.Ctx.Debug("Opened new monitor downlink stream")

// The actual stream
go func() {
for {
select {
case <-ctx.Done():
return
case downlink, ok := <-cl.downlink.ch:
if ok {
stream.Send(downlink)
cl.Ctx.Debug("Sent downlink to monitor")
}
}
}
}()

msg := new(empty.Empty)
for {
if err := stream.RecvMsg(&msg); err != nil {
cl.Ctx.WithError(errors.FromGRPCError(err)).Warn("Received error on monitor downlink stream, closing...")
stream.CloseSend()
cl.Ctx.Debug("Closed monitor downlink stream")

cl.downlink.Lock()
cl.downlink.cancel()
cl.downlink.cancel = nil
cl.downlink.Unlock()

retries++
time.Sleep(backoff.Backoff(retries))

continue newStream
}
}
}
}

func (cl *brokerClient) closeDownlink() {
cl.downlink.Lock()
defer cl.downlink.Unlock()
if cl.downlink.cancel != nil {
cl.downlink.cancel()
}
}

// SendDownlink sends downlink to the monitor
func (cl *brokerClient) SendDownlink(downlink *broker.DownlinkMessage) (err error) {
cl.downlink.init.Do(cl.initDownlink)

select {
case cl.downlink.ch <- downlink:
default:
cl.Ctx.Warn("Not sending downlink to monitor, buffer full")
return errors.New("Not sending downlink to monitor, buffer full")
}
return
}
97 changes: 97 additions & 0 deletions api/monitor/broker_uplink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright © 2016 The Things Network
// Use of this source code is governed by the MIT license that can be found in the LICENSE file.

package monitor

import (
"context"
"time"

"github.com/TheThingsNetwork/ttn/api/broker"
"github.com/TheThingsNetwork/ttn/utils/backoff"
"github.com/TheThingsNetwork/ttn/utils/errors"
"github.com/golang/protobuf/ptypes/empty"
)

func (cl *brokerClient) initUplink() {
cl.uplink.ch = make(chan *broker.DeduplicatedUplinkMessage, BufferSize)
go cl.monitorUplink()
}

func (cl *brokerClient) monitorUplink() {
var retries int
newStream:
for {
ctx, cancel := context.WithCancel(cl.Context())
cl.uplink.Lock()
cl.uplink.cancel = cancel
cl.uplink.Unlock()

stream, err := cl.client.client.BrokerUplink(ctx)
if err != nil {
cl.Ctx.WithError(errors.FromGRPCError(err)).Warn("Failed to open new monitor uplink stream")

retries++
time.Sleep(backoff.Backoff(retries))

continue
}
retries = 0
cl.Ctx.Debug("Opened new monitor uplink stream")

// The actual stream
go func() {
for {
select {
case <-ctx.Done():
return
case uplink, ok := <-cl.uplink.ch:
if ok {
stream.Send(uplink)
cl.Ctx.Debug("Sent uplink to monitor")
}
}
}
}()

msg := new(empty.Empty)
for {
if err := stream.RecvMsg(&msg); err != nil {
cl.Ctx.WithError(errors.FromGRPCError(err)).Warn("Received error on monitor uplink stream, closing...")
stream.CloseSend()
cl.Ctx.Debug("Closed monitor uplink stream")

cl.uplink.Lock()
cl.uplink.cancel()
cl.uplink.cancel = nil
cl.uplink.Unlock()

retries++
time.Sleep(backoff.Backoff(retries))

continue newStream
}
}
}
}

func (cl *brokerClient) closeUplink() {
cl.uplink.Lock()
defer cl.uplink.Unlock()
if cl.uplink.cancel != nil {
cl.uplink.cancel()
}
}

// SendUplink sends uplink to the monitor
func (cl *brokerClient) SendUplink(uplink *broker.DeduplicatedUplinkMessage) (err error) {
cl.uplink.init.Do(cl.initUplink)

select {
case cl.uplink.ch <- uplink:
default:
cl.Ctx.Warn("Not sending uplink to monitor, buffer full")
return errors.New("Not sending uplink to monitor, buffer full")
}
return
}
56 changes: 53 additions & 3 deletions api/monitor/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"

"github.com/TheThingsNetwork/ttn/api"
"github.com/TheThingsNetwork/ttn/api/broker"
"github.com/TheThingsNetwork/ttn/api/gateway"
"github.com/TheThingsNetwork/ttn/api/router"
"github.com/TheThingsNetwork/ttn/utils/errors"
Expand All @@ -27,8 +28,10 @@ type Client struct {
conn *grpc.ClientConn
addr string

gateways map[string]GatewayClient
mutex sync.RWMutex
gateways map[string]GatewayClient
BrokerClient BrokerClient

mutex sync.RWMutex
}

// NewClient is a wrapper for NewMonitorClient, initializes
Expand Down Expand Up @@ -70,6 +73,11 @@ func (cl *Client) open() (err error) {
}

cl.client = NewMonitorClient(cl.conn)

cl.BrokerClient = &brokerClient{
Ctx: cl.Ctx.WithField("component", "broker"),
client: cl,
}
return nil
}

Expand Down Expand Up @@ -208,7 +216,7 @@ func (cl *gatewayClient) SetToken(token string) {
func (cl *gatewayClient) IsConfigured() bool {
cl.RLock()
defer cl.RUnlock()
return cl.token != "" && cl.token != "token"
return cl.token != ""
}

// Close closes all opened monitor streams for the gateway
Expand All @@ -228,3 +236,45 @@ func (cl *gatewayClient) Context() (monitorContext context.Context) {
"token", cl.token,
))
}

type brokerClient struct {
sync.RWMutex

client *Client

Ctx log.Interface

uplink struct {
init sync.Once
ch chan *broker.DeduplicatedUplinkMessage
cancel func()
sync.Mutex
}

downlink struct {
init sync.Once
ch chan *broker.DownlinkMessage
cancel func()
sync.RWMutex
}
}

// BrokerClient is used as the main client for Brokers to communicate with the monitor
type BrokerClient interface {
SendUplink(msg *broker.DeduplicatedUplinkMessage) (err error)
SendDownlink(msg *broker.DownlinkMessage) (err error)
Close() (err error)
}

// Close closes all opened monitor streams for the broker
func (cl *brokerClient) Close() (err error) {
cl.closeUplink()
cl.closeDownlink()
return err
}

// Context returns monitor connection context for broker
func (cl *brokerClient) Context() (monitorContext context.Context) {
//TODO add auth
return context.Background()
}
60 changes: 60 additions & 0 deletions api/monitor/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/TheThingsNetwork/ttn/api"
"github.com/TheThingsNetwork/ttn/api/broker"
"github.com/TheThingsNetwork/ttn/api/gateway"
"github.com/TheThingsNetwork/ttn/api/router"
. "github.com/TheThingsNetwork/ttn/utils/testing"
Expand Down Expand Up @@ -148,4 +149,63 @@ func TestClient(t *testing.T) {
time.Sleep(100 * time.Millisecond)
}

{
client, _ := NewClient(ctx, fmt.Sprintf("localhost:%d", port))
defer client.Close()

err := client.BrokerClient.SendUplink(&broker.DeduplicatedUplinkMessage{})
a.So(err, ShouldBeNil)

// The first two messages are OK
for i := 0; i < 2; i++ {
err = client.BrokerClient.SendUplink(&broker.DeduplicatedUplinkMessage{})
a.So(err, ShouldBeNil)
}

// The next one will cause an error on the test server
err = client.BrokerClient.SendUplink(&broker.DeduplicatedUplinkMessage{})
time.Sleep(10 * time.Millisecond)

// Then, we are going to buffer 10 messages locally
for i := 0; i < 10; i++ {
err = client.BrokerClient.SendUplink(&broker.DeduplicatedUplinkMessage{})
a.So(err, ShouldBeNil)
}

// After which messages will get dropped
err = client.BrokerClient.SendUplink(&broker.DeduplicatedUplinkMessage{})
a.So(err, ShouldNotBeNil)

time.Sleep(100 * time.Millisecond)
}

{
client, _ := NewClient(ctx, fmt.Sprintf("localhost:%d", port))
defer client.Close()

err := client.BrokerClient.SendDownlink(&broker.DownlinkMessage{})
a.So(err, ShouldBeNil)

// The first two messages are OK
for i := 0; i < 2; i++ {
err = client.BrokerClient.SendDownlink(&broker.DownlinkMessage{})
a.So(err, ShouldBeNil)
}

// The next one will cause an error on the test server
err = client.BrokerClient.SendDownlink(&broker.DownlinkMessage{})
time.Sleep(10 * time.Millisecond)

// Then, we are going to buffer 10 messages locally
for i := 0; i < 10; i++ {
err = client.BrokerClient.SendDownlink(&broker.DownlinkMessage{})
a.So(err, ShouldBeNil)
}

// After which messages will get dropped
err = client.BrokerClient.SendDownlink(&broker.DownlinkMessage{})
a.So(err, ShouldNotBeNil)

time.Sleep(100 * time.Millisecond)
}
}
Loading

0 comments on commit e17566e

Please sign in to comment.