Skip to content

Commit

Permalink
Improve error reporting for subject transforms in streams (#5617)
Browse files Browse the repository at this point in the history
Improve error reporting for bad subject mapping transform sources and
destinations.

Add checking of subject transforms in stream configuration for mirroring
subject transform and stream subject transform.

Add missing JS API Errors:
JSMirrorInvalidTransformDestination
JSStreamTransformInvalidSource
JSStreamTransformInvalidDestination
  • Loading branch information
derekcollison authored Jul 22, 2024
2 parents 2303592 + 043c614 commit 5a0debd
Show file tree
Hide file tree
Showing 8 changed files with 245 additions and 53 deletions.
2 changes: 1 addition & 1 deletion server/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ func (a *Account) AddWeightedMappings(src string, dests ...*MapDest) error {
if tw[d.Cluster] > 100 {
return fmt.Errorf("total weight needs to be <= 100")
}
err := ValidateMappingDestination(d.Subject)
err := ValidateMapping(src, d.Subject)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion server/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ var (
ErrInvalidMappingDestination = errors.New("invalid mapping destination")

// ErrInvalidMappingDestinationSubject is used to error on a bad transform destination mapping
ErrInvalidMappingDestinationSubject = fmt.Errorf("%w: invalid subject", ErrInvalidMappingDestination)
ErrInvalidMappingDestinationSubject = fmt.Errorf("%w: invalid transform", ErrInvalidMappingDestination)

// ErrMappingDestinationNotUsingAllWildcards is used to error on a transform destination not using all of the token wildcards
ErrMappingDestinationNotUsingAllWildcards = fmt.Errorf("%w: not using all of the token wildcard(s)", ErrInvalidMappingDestination)
Expand Down
36 changes: 33 additions & 3 deletions server/errors.json
Original file line number Diff line number Diff line change
Expand Up @@ -1433,7 +1433,7 @@
"constant": "JSSourceInvalidSubjectFilter",
"code": 400,
"error_code": 10145,
"description": "source subject filter is invalid",
"description": "source transform source: {err}",
"comment": "",
"help": "",
"url": "",
Expand All @@ -1443,7 +1443,7 @@
"constant": "JSSourceInvalidTransformDestination",
"code": 400,
"error_code": 10146,
"description": "source transform destination is invalid",
"description": "source transform: {err}",
"comment": "",
"help": "",
"url": "",
Expand Down Expand Up @@ -1493,7 +1493,7 @@
"constant": "JSMirrorInvalidSubjectFilter",
"code": 400,
"error_code": 10151,
"description": "mirror subject filter is invalid",
"description": "mirror transform source: {err}",
"comment": "",
"help": "",
"url": "",
Expand All @@ -1518,5 +1518,35 @@
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSMirrorInvalidTransformDestination",
"code": 400,
"error_code": 10154,
"description": "mirror transform: {err}",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSStreamTransformInvalidSource",
"code": 400,
"error_code": 10155,
"description": "stream transform source: {err}",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSStreamTransformInvalidDestination",
"code": 400,
"error_code": 10156,
"description": "stream transform: {err}",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
}
]
108 changes: 93 additions & 15 deletions server/jetstream_errors_generated.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,12 @@ const (
// JSMirrorInvalidStreamName mirrored stream name is invalid
JSMirrorInvalidStreamName ErrorIdentifier = 10142

// JSMirrorInvalidSubjectFilter mirror subject filter is invalid
// JSMirrorInvalidSubjectFilter mirror transform source: {err}
JSMirrorInvalidSubjectFilter ErrorIdentifier = 10151

// JSMirrorInvalidTransformDestination mirror transform: {err}
JSMirrorInvalidTransformDestination ErrorIdentifier = 10154

// JSMirrorMaxMessageSizeTooBigErr stream mirror must have max message size >= source
JSMirrorMaxMessageSizeTooBigErr ErrorIdentifier = 10030

Expand Down Expand Up @@ -308,10 +311,10 @@ const (
// JSSourceInvalidStreamName sourced stream name is invalid
JSSourceInvalidStreamName ErrorIdentifier = 10141

// JSSourceInvalidSubjectFilter source subject filter is invalid
// JSSourceInvalidSubjectFilter source transform source: {err}
JSSourceInvalidSubjectFilter ErrorIdentifier = 10145

// JSSourceInvalidTransformDestination source transform destination is invalid
// JSSourceInvalidTransformDestination source transform: {err}
JSSourceInvalidTransformDestination ErrorIdentifier = 10146

// JSSourceMaxMessageSizeTooBigErr stream source must have max message size >= target
Expand Down Expand Up @@ -446,6 +449,12 @@ const (
// JSStreamTemplateNotFoundErr template not found
JSStreamTemplateNotFoundErr ErrorIdentifier = 10068

// JSStreamTransformInvalidDestination stream transform: {err}
JSStreamTransformInvalidDestination ErrorIdentifier = 10156

// JSStreamTransformInvalidSource stream transform source: {err}
JSStreamTransformInvalidSource ErrorIdentifier = 10155

// JSStreamUpdateErrF Generic stream update error string ({err})
JSStreamUpdateErrF ErrorIdentifier = 10069

Expand Down Expand Up @@ -541,7 +550,8 @@ var (
JSMemoryResourcesExceededErr: {Code: 500, ErrCode: 10028, Description: "insufficient memory resources available"},
JSMirrorConsumerSetupFailedErrF: {Code: 500, ErrCode: 10029, Description: "{err}"},
JSMirrorInvalidStreamName: {Code: 400, ErrCode: 10142, Description: "mirrored stream name is invalid"},
JSMirrorInvalidSubjectFilter: {Code: 400, ErrCode: 10151, Description: "mirror subject filter is invalid"},
JSMirrorInvalidSubjectFilter: {Code: 400, ErrCode: 10151, Description: "mirror transform source: {err}"},
JSMirrorInvalidTransformDestination: {Code: 400, ErrCode: 10154, Description: "mirror transform: {err}"},
JSMirrorMaxMessageSizeTooBigErr: {Code: 400, ErrCode: 10030, Description: "stream mirror must have max message size >= source"},
JSMirrorMultipleFiltersNotAllowed: {Code: 400, ErrCode: 10150, Description: "mirror with multiple subject transforms cannot also have a single subject filter"},
JSMirrorOverlappingSubjectFilters: {Code: 400, ErrCode: 10152, Description: "mirror subject filters can not overlap"},
Expand All @@ -565,8 +575,8 @@ var (
JSSourceConsumerSetupFailedErrF: {Code: 500, ErrCode: 10045, Description: "{err}"},
JSSourceDuplicateDetected: {Code: 400, ErrCode: 10140, Description: "duplicate source configuration detected"},
JSSourceInvalidStreamName: {Code: 400, ErrCode: 10141, Description: "sourced stream name is invalid"},
JSSourceInvalidSubjectFilter: {Code: 400, ErrCode: 10145, Description: "source subject filter is invalid"},
JSSourceInvalidTransformDestination: {Code: 400, ErrCode: 10146, Description: "source transform destination is invalid"},
JSSourceInvalidSubjectFilter: {Code: 400, ErrCode: 10145, Description: "source transform source: {err}"},
JSSourceInvalidTransformDestination: {Code: 400, ErrCode: 10146, Description: "source transform: {err}"},
JSSourceMaxMessageSizeTooBigErr: {Code: 400, ErrCode: 10046, Description: "stream source must have max message size >= target"},
JSSourceMultipleFiltersNotAllowed: {Code: 400, ErrCode: 10144, Description: "source with multiple subject transforms cannot also have a single subject filter"},
JSSourceOverlappingSubjectFilters: {Code: 400, ErrCode: 10147, Description: "source filters can not overlap"},
Expand Down Expand Up @@ -611,6 +621,8 @@ var (
JSStreamTemplateCreateErrF: {Code: 500, ErrCode: 10066, Description: "{err}"},
JSStreamTemplateDeleteErrF: {Code: 500, ErrCode: 10067, Description: "{err}"},
JSStreamTemplateNotFoundErr: {Code: 404, ErrCode: 10068, Description: "template not found"},
JSStreamTransformInvalidDestination: {Code: 400, ErrCode: 10156, Description: "stream transform: {err}"},
JSStreamTransformInvalidSource: {Code: 400, ErrCode: 10155, Description: "stream transform source: {err}"},
JSStreamUpdateErrF: {Code: 500, ErrCode: 10069, Description: "{err}"},
JSStreamWrongLastMsgIDErrF: {Code: 400, ErrCode: 10070, Description: "wrong last msg ID: {id}"},
JSStreamWrongLastSequenceErrF: {Code: 400, ErrCode: 10071, Description: "wrong last sequence: {seq}"},
Expand Down Expand Up @@ -1483,14 +1495,36 @@ func NewJSMirrorInvalidStreamNameError(opts ...ErrorOption) *ApiError {
return ApiErrors[JSMirrorInvalidStreamName]
}

// NewJSMirrorInvalidSubjectFilterError creates a new JSMirrorInvalidSubjectFilter error: "mirror subject filter is invalid"
func NewJSMirrorInvalidSubjectFilterError(opts ...ErrorOption) *ApiError {
// NewJSMirrorInvalidSubjectFilterError creates a new JSMirrorInvalidSubjectFilter error: "mirror transform source: {err}"
func NewJSMirrorInvalidSubjectFilterError(err error, opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

e := ApiErrors[JSMirrorInvalidSubjectFilter]
args := e.toReplacerArgs([]interface{}{"{err}", err})
return &ApiError{
Code: e.Code,
ErrCode: e.ErrCode,
Description: strings.NewReplacer(args...).Replace(e.Description),
}
}

// NewJSMirrorInvalidTransformDestinationError creates a new JSMirrorInvalidTransformDestination error: "mirror transform: {err}"
func NewJSMirrorInvalidTransformDestinationError(err error, opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

return ApiErrors[JSMirrorInvalidSubjectFilter]
e := ApiErrors[JSMirrorInvalidTransformDestination]
args := e.toReplacerArgs([]interface{}{"{err}", err})
return &ApiError{
Code: e.Code,
ErrCode: e.ErrCode,
Description: strings.NewReplacer(args...).Replace(e.Description),
}
}

// NewJSMirrorMaxMessageSizeTooBigError creates a new JSMirrorMaxMessageSizeTooBigErr error: "stream mirror must have max message size >= source"
Expand Down Expand Up @@ -1747,24 +1781,36 @@ func NewJSSourceInvalidStreamNameError(opts ...ErrorOption) *ApiError {
return ApiErrors[JSSourceInvalidStreamName]
}

// NewJSSourceInvalidSubjectFilterError creates a new JSSourceInvalidSubjectFilter error: "source subject filter is invalid"
func NewJSSourceInvalidSubjectFilterError(opts ...ErrorOption) *ApiError {
// NewJSSourceInvalidSubjectFilterError creates a new JSSourceInvalidSubjectFilter error: "source transform source: {err}"
func NewJSSourceInvalidSubjectFilterError(err error, opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

return ApiErrors[JSSourceInvalidSubjectFilter]
e := ApiErrors[JSSourceInvalidSubjectFilter]
args := e.toReplacerArgs([]interface{}{"{err}", err})
return &ApiError{
Code: e.Code,
ErrCode: e.ErrCode,
Description: strings.NewReplacer(args...).Replace(e.Description),
}
}

// NewJSSourceInvalidTransformDestinationError creates a new JSSourceInvalidTransformDestination error: "source transform destination is invalid"
func NewJSSourceInvalidTransformDestinationError(opts ...ErrorOption) *ApiError {
// NewJSSourceInvalidTransformDestinationError creates a new JSSourceInvalidTransformDestination error: "source transform: {err}"
func NewJSSourceInvalidTransformDestinationError(err error, opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

return ApiErrors[JSSourceInvalidTransformDestination]
e := ApiErrors[JSSourceInvalidTransformDestination]
args := e.toReplacerArgs([]interface{}{"{err}", err})
return &ApiError{
Code: e.Code,
ErrCode: e.ErrCode,
Description: strings.NewReplacer(args...).Replace(e.Description),
}
}

// NewJSSourceMaxMessageSizeTooBigError creates a new JSSourceMaxMessageSizeTooBigErr error: "stream source must have max message size >= target"
Expand Down Expand Up @@ -2315,6 +2361,38 @@ func NewJSStreamTemplateNotFoundError(opts ...ErrorOption) *ApiError {
return ApiErrors[JSStreamTemplateNotFoundErr]
}

// NewJSStreamTransformInvalidDestinationError creates a new JSStreamTransformInvalidDestination error: "stream transform: {err}"
func NewJSStreamTransformInvalidDestinationError(err error, opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

e := ApiErrors[JSStreamTransformInvalidDestination]
args := e.toReplacerArgs([]interface{}{"{err}", err})
return &ApiError{
Code: e.Code,
ErrCode: e.ErrCode,
Description: strings.NewReplacer(args...).Replace(e.Description),
}
}

// NewJSStreamTransformInvalidSourceError creates a new JSStreamTransformInvalidSource error: "stream transform source: {err}"
func NewJSStreamTransformInvalidSourceError(err error, opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

e := ApiErrors[JSStreamTransformInvalidSource]
args := e.toReplacerArgs([]interface{}{"{err}", err})
return &ApiError{
Code: e.Code,
ErrCode: e.ErrCode,
Description: strings.NewReplacer(args...).Replace(e.Description),
}
}

// NewJSStreamUpdateError creates a new JSStreamUpdateErrF error: "{err}"
func NewJSStreamUpdateError(err error, opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
Expand Down
65 changes: 63 additions & 2 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11577,7 +11577,7 @@ func TestJetStreamMirrorBasics(t *testing.T) {
Name: "MBAD",
Storage: FileStorage,
Mirror: &StreamSource{Name: "S1", SubjectTransforms: []SubjectTransformConfig{{Source: "*", Destination: "{{wildcard(2)}}"}}},
}, ApiErrors[JSStreamCreateErrF].ErrCode)
}, ApiErrors[JSMirrorInvalidTransformDestination].ErrCode)

createStreamServerStreamConfig(&StreamConfig{
Name: "MBAD",
Expand Down Expand Up @@ -23760,8 +23760,69 @@ func TestJetStreamBadSubjectMappingStream(t *testing.T) {
},
},
})
require_Error(t, err, NewJSStreamUpdateError(errors.New("nats: source transform: invalid mapping destination: too many arguments passed to the function in {{wildcard(1)}}{{split(3,1)}}")))

require_Error(t, err, NewJSStreamUpdateError(errors.New("unable to get subject transform for source: invalid mapping destination: too many arguments passed to the function in {{wildcard(1)}}{{split(3,1)}}")))
_, err = js.UpdateStream(&nats.StreamConfig{
Name: "test",
Sources: []*nats.StreamSource{
{
Name: "mapping",
SubjectTransforms: []nats.SubjectTransformConfig{
{
Source: "events.>.*",
Destination: "events.{{split(1,1)}}",
},
},
},
},
})
require_Error(t, err, NewJSStreamUpdateError(errors.New("nats: source transform source: invalid subject events.>.*")))

_, err = js.UpdateStream(&nats.StreamConfig{
Name: "test",
Mirror: &nats.StreamSource{
Name: "mapping",
SubjectTransforms: []nats.SubjectTransformConfig{
{
Source: "events.*",
Destination: "events.{{split(3,1)}}",
},
},
},
})
require_Error(t, err, NewJSStreamUpdateError(errors.New("nats: mirror transform: invalid mapping destination: wildcard index out of range in {{split(3,1)}}: [3]")))

_, err = js.UpdateStream(&nats.StreamConfig{
Name: "test",
Mirror: &nats.StreamSource{
Name: "mapping",
SubjectTransforms: []nats.SubjectTransformConfig{
{
Source: "events.>.*",
Destination: "events.{{split(1,1)}}",
},
},
},
})
require_Error(t, err, NewJSStreamUpdateError(errors.New("nats: mirror transform source: invalid subject events.>.*")))

_, err = js.UpdateStream(&nats.StreamConfig{
Name: "test",
SubjectTransform: &nats.SubjectTransformConfig{
Source: "events.*",
Destination: "events.{{split(3,1)}}",
},
})
require_Error(t, err, NewJSStreamUpdateError(errors.New("nats: stream transform: invalid mapping destination: wildcard index out of range in {{split(3,1)}}: [3]")))

_, err = js.UpdateStream(&nats.StreamConfig{
Name: "test",
SubjectTransform: &nats.SubjectTransformConfig{
Source: "events.>.*",
Destination: "events.{{split(1,1)}}",
},
})
require_Error(t, err, NewJSStreamUpdateError(errors.New("nats: stream transform source: invalid subject events.>.*")))
}

func TestJetStreamConsumerInfoNumPending(t *testing.T) {
Expand Down
Loading

0 comments on commit 5a0debd

Please sign in to comment.