diff --git a/api/broker/client_streams.go b/api/broker/client_streams.go index fba01e29f..dc9272924 100644 --- a/api/broker/client_streams.go +++ b/api/broker/client_streams.go @@ -84,6 +84,13 @@ func NewMonitoredRouterStream(client BrokerClient, getContextFunc func() context message, err := client.Recv() if message != nil { s.ctx.Debug("Receiving Downlink message") + if err := message.Validate(); err != nil { + s.ctx.WithError(err).Warn("Invalid Downlink") + continue + } + if err := message.UnmarshalPayload(); err != nil { + s.ctx.Warn("Could not unmarshal Downlink payload") + } select { case s.down <- message: default: @@ -345,6 +352,13 @@ func NewMonitoredHandlerSubscribeStream(client BrokerClient, getContextFunc func message, err = client.Recv() if message != nil { s.ctx.Debug("Receiving Uplink message") + if err := message.Validate(); err != nil { + s.ctx.WithError(err).Warn("Invalid Uplink") + continue + } + if err := message.UnmarshalPayload(); err != nil { + s.ctx.Warn("Could not unmarshal Uplink payload") + } select { case s.ch <- message: default: diff --git a/api/broker/server_streams.go b/api/broker/server_streams.go index da81b4b8c..55ece9fe1 100644 --- a/api/broker/server_streams.go +++ b/api/broker/server_streams.go @@ -8,7 +8,6 @@ import ( "github.com/TheThingsNetwork/go-utils/log" "github.com/TheThingsNetwork/ttn/api" - "github.com/TheThingsNetwork/ttn/utils/errors" "github.com/golang/protobuf/ptypes/empty" "google.golang.org/grpc/metadata" ) @@ -70,7 +69,11 @@ func (s *BrokerStreamServer) Associate(stream Broker_AssociateServer) (err error return err } if err := uplink.Validate(); err != nil { - return errors.Wrap(err, "Invalid Uplink") + s.ctx.WithError(err).Warn("Invalid Uplink") + continue + } + if err := uplink.UnmarshalPayload(); err != nil { + s.ctx.Warn("Could not unmarshal uplink payload") } upChan <- uplink } @@ -139,7 +142,11 @@ func (s *BrokerStreamServer) Publish(stream Broker_PublishServer) error { return err } if err := downlink.Validate(); err != nil { - return errors.Wrap(err, "Invalid Downlink") + s.ctx.WithError(err).Warn("Invalid Downlink") + continue + } + if err := downlink.UnmarshalPayload(); err != nil { + s.ctx.Warn("Could not unmarshal downlink payload") } ch <- downlink } diff --git a/api/message_marshaling.go b/api/message_marshaling.go new file mode 100644 index 000000000..95e651872 --- /dev/null +++ b/api/message_marshaling.go @@ -0,0 +1,9 @@ +// Copyright © 2017 The Things Network +// Use of this source code is governed by the MIT license that can be found in the LICENSE file. + +package api + +// PayloadUnmarshaler unmarshals the Payload to a Message +type PayloadUnmarshaler interface { + UnmarshalPayload() error +} diff --git a/api/router/client_streams.go b/api/router/client_streams.go index 1b8b1b620..e417ccc7b 100644 --- a/api/router/client_streams.go +++ b/api/router/client_streams.go @@ -333,6 +333,13 @@ func NewMonitoredDownlinkStream(client RouterClientForGateway) DownlinkStream { message, err = client.Recv() if message != nil { s.ctx.Debug("Receiving Downlink message") + if err := message.Validate(); err != nil { + s.ctx.WithError(err).Warn("Invalid Downlink") + continue + } + if err := message.UnmarshalPayload(); err != nil { + s.ctx.Warn("Could not unmarshal Downlink payload") + } select { case s.ch <- message: default: diff --git a/api/router/server_streams.go b/api/router/server_streams.go index 67794ee27..2f6b8ec2c 100644 --- a/api/router/server_streams.go +++ b/api/router/server_streams.go @@ -61,7 +61,11 @@ func (s *RouterStreamServer) Uplink(stream Router_UplinkServer) (err error) { return err } if err := uplink.Validate(); err != nil { - return errors.Wrap(err, "Invalid Uplink") + s.ctx.WithError(err).Warn("Invalid Uplink") + continue + } + if err := uplink.UnmarshalPayload(); err != nil { + s.ctx.Warn("Could not unmarshal Uplink payload") } ch <- uplink }