Skip to content
This repository has been archived by the owner on Dec 14, 2021. It is now read-only.

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
htdvisser committed Feb 8, 2017
2 parents 8ed4769 + 275ce71 commit 3c8b184
Show file tree
Hide file tree
Showing 12 changed files with 174 additions and 43 deletions.
29 changes: 27 additions & 2 deletions amqp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type DefaultClient struct {
url string
ctx log.Interface
conn *AMQP.Connection
mutex *sync.Mutex
mutex sync.Mutex
channels map[*DefaultChannelClient]*AMQP.Channel
}

Expand All @@ -38,11 +38,19 @@ type ChannelClient interface {
io.Closer
}

// ChannelClientUser is a user of a channel, e.g. a publisher or consumer
type channelClientUser interface {
use(*AMQP.Channel) error
close()
}

// DefaultChannelClient represents the default client of an AMQP channel
type DefaultChannelClient struct {
ctx log.Interface
client *DefaultClient
channel *AMQP.Channel
usersMutex sync.RWMutex
users []channelClientUser
name string
exchange string
exchangeType string
Expand Down Expand Up @@ -71,7 +79,6 @@ func NewClient(ctx log.Interface, username, password, host string) Client {
return &DefaultClient{
ctx: ctx,
url: fmt.Sprintf("amqp://%s@%s", credentials, host),
mutex: &sync.Mutex{},
channels: make(map[*DefaultChannelClient]*AMQP.Channel),
}
}
Expand Down Expand Up @@ -118,6 +125,13 @@ func (c *DefaultClient) connect(reconnect bool) (chan *AMQP.Error, error) {
}
c.ctx.Infof("Reopened channel %s for %s", user.name, user.exchange)
user.channel = channel
user.usersMutex.RLock()
defer user.usersMutex.RUnlock()
for _, channelUser := range user.users {
if err := channelUser.use(channel); err != nil {
c.ctx.WithError(err).Warnf("Failed to use channel (%s)", err)
}
}
c.channels[user] = channel
}
}
Expand Down Expand Up @@ -200,5 +214,16 @@ func (p *DefaultChannelClient) Open() error {

// Close closes the channel
func (p *DefaultChannelClient) Close() error {
p.usersMutex.RLock()
defer p.usersMutex.RUnlock()
for _, user := range p.users {
user.close()
}
return p.client.closeChannel(p)
}

func (p *DefaultChannelClient) addUser(u channelClientUser) {
p.usersMutex.Lock()
defer p.usersMutex.Unlock()
p.users = append(p.users, u)
}
44 changes: 31 additions & 13 deletions amqp/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

"github.com/TheThingsNetwork/ttn/core/types"
. "github.com/smartystreets/assertions"
AMQP "github.com/streadway/amqp"
)
Expand Down Expand Up @@ -85,27 +86,44 @@ func TestReopenChannelClient(t *testing.T) {
a.So(err, ShouldBeNil)
defer c.Disconnect()

p := &DefaultChannelClient{
ctx: ctx,
client: c,
}
err = p.Open()
publisher := c.NewPublisher("amq.topic")
err = publisher.Open()
a.So(err, ShouldBeNil)
defer p.Close()
defer publisher.Close()

test := func() error {
subscriber := c.NewSubscriber("amq.topic", "", false, false)
err = subscriber.Open()
a.So(err, ShouldBeNil)
defer subscriber.Close()

downs := make(chan types.DownlinkMessage, 1)
err = subscriber.SubscribeDownlink(func(_ Subscriber, appID string, _ string, msg types.DownlinkMessage) {
a.So(appID, ShouldEqual, "app")
ctx.Debugf("Got downlink message")
downs <- msg
})
a.So(err, ShouldBeNil)

test := func() {
ctx.Debug("Testing publish")
return p.channel.Publish("", "test", false, false, AMQP.Publishing{
Body: []byte("test"),
err := publisher.PublishDownlink(types.DownlinkMessage{
AppID: "app",
})
a.So(err, ShouldBeNil)
select {
case <-downs:
case <-time.After(100 * time.Millisecond):
panic("Published message didn't come in in time")
}
return
}

// First attempt should be OK
err = test()
a.So(err, ShouldBeNil)
test()

// Make sure that the old channel is closed
p.channel.Close()
publisher.(*DefaultPublisher).channel.Close()
subscriber.(*DefaultSubscriber).channel.Close()

// Simulate a connection close so a new channel should be opened
closed <- AMQP.ErrClosed
Expand All @@ -114,6 +132,6 @@ func TestReopenChannelClient(t *testing.T) {
time.Sleep(100 * time.Millisecond)

// Second attempt should be OK as well and will only work on a new channel
err = test()
test()
a.So(err, ShouldBeNil)
}
42 changes: 38 additions & 4 deletions amqp/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,46 @@ func (s *DefaultSubscriber) QueueUnbind(name, key string) error {
return nil
}

func (s *DefaultSubscriber) consume(queue string) (<-chan AMQP.Delivery, error) {
err := s.channel.Qos(PrefetchCount, PrefetchSize, false)
type consumer struct {
queue string
deliveries chan AMQP.Delivery
}

func (c *consumer) use(channel *AMQP.Channel) error {
err := channel.Qos(PrefetchCount, PrefetchSize, false)
if err != nil {
return fmt.Errorf("Failed to set channel QoS (%s)", err)
}
deliveries, err := channel.Consume(c.queue, "", false, false, false, false, nil)
if err != nil {
return nil, fmt.Errorf("Failed to set channel QoS (%s)", err)
return err
}
go func() {
for delivery := range deliveries {
c.deliveries <- delivery
}
}()
return nil
}

func (c *consumer) close() {
if c.deliveries != nil {
close(c.deliveries)
c.deliveries = nil
}
}

func (s *DefaultSubscriber) consume(queue string) (<-chan AMQP.Delivery, error) {
deliveries := make(chan AMQP.Delivery)
c := &consumer{
queue: queue,
deliveries: deliveries,
}
if err := c.use(s.channel); err != nil {
return nil, err
}
return s.channel.Consume(queue, "", false, false, false, false, nil)
s.addUser(c)
return deliveries, nil
}

func (s *DefaultSubscriber) subscribe(key string) (<-chan AMQP.Delivery, error) {
Expand Down
21 changes: 16 additions & 5 deletions core/handler/convert_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,18 @@ func (h *handler) ConvertFieldsUp(ctx ttnlog.Interface, _ *pb_broker.Deduplicate

fields, valid, err := functions.Process(appUp.PayloadRaw, appUp.FPort)
if err != nil {
return nil // Do not set fields if processing failed

// Emit the error
h.mqttEvent <- &types.DeviceEvent{
AppID: appUp.AppID,
DevID: appUp.DevID,
Event: types.UplinkErrorEvent,
Data: types.ErrorEventData{Error: err.Error()},
}

// Do not set fields if processing failed, but allow the handler to continue processing
// without payload functions
return nil
}

if !valid {
Expand Down Expand Up @@ -79,7 +90,7 @@ func (f *UplinkFunctions) Decode(payload []byte, port uint8) (map[string]interfa
Decoder(payload.slice(0), port);
`, f.Decoder)

value, err := functions.RunCode("decoder", code, env, timeOut, f.Logger)
value, err := functions.RunCode("Decoder", code, env, timeOut, f.Logger)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -114,7 +125,7 @@ func (f *UplinkFunctions) Convert(fields map[string]interface{}, port uint8) (ma
Converter(fields, port)
`, f.Converter)

value, err := functions.RunCode("converter", code, env, timeOut, f.Logger)
value, err := functions.RunCode("Converter", code, env, timeOut, f.Logger)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -148,7 +159,7 @@ func (f *UplinkFunctions) Validate(fields map[string]interface{}, port uint8) (b
Validator(fields, port)
`, f.Validator)

value, err := functions.RunCode("valdator", code, env, timeOut, f.Logger)
value, err := functions.RunCode("Validator", code, env, timeOut, f.Logger)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -202,7 +213,7 @@ func (f *DownlinkFunctions) Encode(payload map[string]interface{}, port uint8) (
Encoder(payload, port)
`, f.Encoder)

value, err := functions.RunCode("encoder", code, env, timeOut, f.Logger)
value, err := functions.RunCode("Encoder", code, env, timeOut, f.Logger)
if err != nil {
return nil, err
}
Expand Down
9 changes: 8 additions & 1 deletion core/handler/convert_fields_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func TestConvertFieldsUp(t *testing.T) {

h := &handler{
applications: application.NewRedisApplicationStore(GetRedisClient(), "handler-test-convert-fields-up"),
mqttEvent: make(chan *types.DeviceEvent, 1),
}

// No functions
Expand Down Expand Up @@ -72,12 +73,18 @@ func TestConvertFieldsUp(t *testing.T) {

// Function error
app.StartUpdate()
app.Validator = `function Validator (data) { throw "expected"; }`
app.Validator = `function Validator (data) { throw new Error("expected"); }`
h.applications.Set(app)
ttnUp, appUp = buildConversionUplink(appID)
err = h.ConvertFieldsUp(GetLogger(t, "TestConvertFieldsUp"), ttnUp, appUp, nil)
a.So(err, ShouldBeNil)
a.So(appUp.PayloadFields, ShouldBeEmpty)

a.So(len(h.mqttEvent), ShouldEqual, 1)
evt := <-h.mqttEvent
data, ok := evt.Data.(types.ErrorEventData)
a.So(ok, ShouldBeTrue)
fmt.Println(data.Error)
}

func TestDecode(t *testing.T) {
Expand Down
10 changes: 5 additions & 5 deletions core/handler/dry_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,11 @@ func TestDryUplinkFields(t *testing.T) {
a.So(res.Valid, ShouldBeTrue)
a.So(res.Logs, ShouldResemble, []*pb.LogEntry{
&pb.LogEntry{
Function: "decoder",
Function: "Decoder",
Fields: []string{`"hi"`, "11"},
},
&pb.LogEntry{
Function: "converter",
Function: "Converter",
Fields: []string{`"foo"`},
},
})
Expand Down Expand Up @@ -162,7 +162,7 @@ func TestDryDownlinkFields(t *testing.T) {
a.So(res.Payload, ShouldResemble, []byte{1, 2, 3})
a.So(res.Logs, ShouldResemble, []*pb.LogEntry{
&pb.LogEntry{
Function: "encoder",
Function: "Encoder",
Fields: []string{`"hello"`, `{"foo":33}`},
},
})
Expand Down Expand Up @@ -251,11 +251,11 @@ func TestLogs(t *testing.T) {
a.So(err, ShouldBeNil)
a.So(res.Logs, ShouldResemble, []*pb.LogEntry{
&pb.LogEntry{
Function: "encoder",
Function: "Encoder",
Fields: []string{`"foo"`, "1", `"bar"`, `"1970-01-01T00:00:00.000Z"`},
},
&pb.LogEntry{
Function: "encoder",
Function: "Encoder",
Fields: []string{"1", `{"baa":"foo","bal":{"bar":10},"baz":10}`},
},
})
Expand Down
11 changes: 8 additions & 3 deletions core/handler/functions/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ func RunCode(name, code string, env map[string]interface{}, timeout time.Duratio
if caught := recover(); caught != nil {
val = otto.Value{}
if caught == errTimeOutExceeded {
err = errors.NewErrInternal(fmt.Sprintf("Interrupted javascript execution after %v", duration))
err = errors.NewErrInternal(fmt.Sprintf("Interrupted javascript execution for %s after %v", name, duration))
return
} else {
err = errors.NewErrInternal(fmt.Sprintf("Fatal error in payload function: %s", caught))
err = errors.NewErrInternal(fmt.Sprintf("Fatal error in %s: %s", name, caught))
}
}
}()
Expand All @@ -56,5 +56,10 @@ func RunCode(name, code string, env map[string]interface{}, timeout time.Duratio
}
}()

return vm.Run(code)
val, err = vm.Run(code)
if err != nil {
return val, errors.NewErrInternal(fmt.Sprintf("%s threw error: %s", name, err))
}

return val, nil
}
17 changes: 17 additions & 0 deletions core/handler/functions/functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,23 @@ func TestRunCode(t *testing.T) {
})
}

func TestRunCodeThrow(t *testing.T) {
a := New(t)

logger := NewEntryLogger()
env := map[string]interface{}{}

code := `
(function () {
throw new Error("This is an error")
return 10
})()
`

_, err := RunCode("test", code, env, time.Second, logger)
a.So(err, ShouldNotBeNil)
}

var result string

func BenchmarkJSON(b *testing.B) {
Expand Down
2 changes: 2 additions & 0 deletions core/networkserver/activation.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
pb_broker "github.com/TheThingsNetwork/ttn/api/broker"
pb_handler "github.com/TheThingsNetwork/ttn/api/handler"
"github.com/TheThingsNetwork/ttn/api/trace"
"github.com/TheThingsNetwork/ttn/core/networkserver/device"
"github.com/TheThingsNetwork/ttn/core/types"
"github.com/TheThingsNetwork/ttn/utils/errors"
"github.com/TheThingsNetwork/ttn/utils/random"
Expand Down Expand Up @@ -132,6 +133,7 @@ func (n *networkServer) HandleActivate(activation *pb_handler.DeviceActivationRe
dev.NwkSKey = *lorawan.NwkSKey
dev.FCntUp = 0
dev.FCntDown = 0
dev.ADR = device.ADRSettings{Band: dev.ADR.Band, Margin: dev.ADR.Margin}

if band := meta.GetLorawan().GetRegion().String(); band != "" {
dev.ADR.Band = band
Expand Down
Loading

0 comments on commit 3c8b184

Please sign in to comment.