Skip to content

Commit

Permalink
Merge pull request #169 from Shopify/v15.0.3-shopify-12-candidate
Browse files Browse the repository at this point in the history
Backport: VStreamer: improve representation of integers in json data types (2b43fd7)
  • Loading branch information
brendar authored May 22, 2024
2 parents 84ea974 + d0b5367 commit e45c636
Show file tree
Hide file tree
Showing 7 changed files with 249 additions and 79 deletions.
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ require (
github.com/spf13/cobra v1.4.0
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.8.1
github.com/spyzhov/ajson v0.4.2
github.com/stretchr/testify v1.7.1
github.com/tchap/go-patricia v2.2.6+incompatible
github.com/tebeka/selenium v0.9.9
Expand Down Expand Up @@ -118,6 +117,7 @@ require (
require (
github.com/bndr/gotabulate v1.1.2
github.com/hashicorp/go-version v1.6.0
github.com/spyzhov/ajson v0.8.0
golang.org/x/exp v0.0.0-20230811145659-89c5cff77bcb
modernc.org/sqlite v1.20.3
)
Expand Down Expand Up @@ -202,3 +202,5 @@ require (
modernc.org/token v1.0.1 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.0.3 // indirect
)

replace github.com/spyzhov/ajson v0.8.0 => github.com/rohit-nayak-ps/ajson v0.7.2-0.20230316112806-97deb03d883c
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,8 @@ github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6L
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rohit-nayak-ps/ajson v0.7.2-0.20230316112806-97deb03d883c h1:Y/4qcogoZA2WUtLWMk/yXfJSpaIG3mK3r9Lw4kaARL4=
github.com/rohit-nayak-ps/ajson v0.7.2-0.20230316112806-97deb03d883c/go.mod h1:63V+CGM6f1Bu/p4nLIN8885ojBdt88TbLoSFzyqMuVA=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
Expand Down Expand Up @@ -695,8 +697,6 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/spf13/viper v1.8.1 h1:Kq1fyeebqsBfbjZj4EL7gj2IO0mMaiyjYUWcUsl2O44=
github.com/spf13/viper v1.8.1/go.mod h1:o0Pch8wJ9BVSWGQMbra6iw0oQ5oktSIBaujf1rJH9Ns=
github.com/spyzhov/ajson v0.4.2 h1:JMByd/jZApPKDvNsmO90X2WWGbmT2ahDFp73QhZbg3s=
github.com/spyzhov/ajson v0.4.2/go.mod h1:63V+CGM6f1Bu/p4nLIN8885ojBdt88TbLoSFzyqMuVA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48=
Expand Down
196 changes: 147 additions & 49 deletions go/mysql/binlog_event_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (jh *BinlogJSON) register(typ jsonDataType, Plugin jsonPlugin) {
func (jh *BinlogJSON) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
Plugin := jh.plugins[typ]
if Plugin == nil {
return nil, fmt.Errorf("Plugin not found for type %d", typ)
return nil, fmt.Errorf("plugin not found for type %d", typ)
}
return Plugin.getNode(typ, data, pos)
}
Expand Down Expand Up @@ -316,59 +316,157 @@ type intPlugin struct {

var _ jsonPlugin = (*intPlugin)(nil)

func (ih intPlugin) getVal(typ jsonDataType, data []byte, pos int) (value float64) {
func (ipl intPlugin) getVal(typ jsonDataType, data []byte, pos int) (value int64) {
var val uint64
var val2 float64
size := ih.sizes[typ]
var val2 int64
size := ipl.sizes[typ]
for i := 0; i < size; i++ {
val = val + uint64(data[pos+i])<<(8*i)
}
switch typ {
case jsonInt16:
val2 = float64(int16(val))
case jsonUint16:
val2 = float64(uint16(val))
val2 = int64(int16(val))
case jsonInt32:
val2 = float64(int32(val))
case jsonUint32:
val2 = float64(uint32(val))
val2 = int64(int32(val))
case jsonInt64:
val2 = float64(int64(val))
case jsonUint64:
val2 = float64(val)
case jsonDouble:
val2 = math.Float64frombits(val)
val2 = int64(val)
}
return val2
}

func (ih intPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
val := ih.getVal(typ, data, pos)
node = ajson.NumericNode("", val)
func (ipl intPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
val := ipl.getVal(typ, data, pos)
node = ajson.IntegerNode("", val)
return node, nil
}

func newIntPlugin() *intPlugin {
ih := &intPlugin{
ipl := &intPlugin{
info: &jsonPluginInfo{
name: "Int",
types: []jsonDataType{jsonInt64, jsonInt32, jsonInt16, jsonUint16, jsonUint32, jsonUint64, jsonDouble},
types: []jsonDataType{jsonInt64, jsonInt32, jsonInt16},
},
sizes: make(map[jsonDataType]int),
}
ipl.sizes = map[jsonDataType]int{
jsonInt64: 8,
jsonInt32: 4,
jsonInt16: 2,
}
for _, typ := range ipl.info.types {
binlogJSON.register(typ, ipl)
}
return ipl
}

//endregion

//region uint plugin

func init() {
newUintPlugin()
}

type uintPlugin struct {
info *jsonPluginInfo
sizes map[jsonDataType]int
}

var _ jsonPlugin = (*uintPlugin)(nil)

func (upl uintPlugin) getVal(typ jsonDataType, data []byte, pos int) (value uint64) {
var val uint64
var val2 uint64
size := upl.sizes[typ]
for i := 0; i < size; i++ {
val = val + uint64(data[pos+i])<<(8*i)
}
switch typ {
case jsonUint16:
val2 = uint64(uint16(val))
case jsonUint32:
val2 = uint64(uint32(val))
case jsonUint64:
val2 = val
}
return val2
}

func (upl uintPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
val := upl.getVal(typ, data, pos)
node = ajson.UnsignedIntegerNode("", val)
return node, nil
}

func newUintPlugin() *uintPlugin {
upl := &uintPlugin{
info: &jsonPluginInfo{
name: "Uint",
types: []jsonDataType{jsonUint16, jsonUint32, jsonUint64},
},
sizes: make(map[jsonDataType]int),
}
ih.sizes = map[jsonDataType]int{
upl.sizes = map[jsonDataType]int{
jsonUint64: 8,
jsonInt64: 8,
jsonUint32: 4,
jsonInt32: 4,
jsonUint16: 2,
jsonInt16: 2,
}
for _, typ := range upl.info.types {
binlogJSON.register(typ, upl)
}
return upl
}

//endregion

//region float plugin

func init() {
newFloatPlugin()
}

type floatPlugin struct {
info *jsonPluginInfo
sizes map[jsonDataType]int
}

var _ jsonPlugin = (*floatPlugin)(nil)

func (flp floatPlugin) getVal(typ jsonDataType, data []byte, pos int) (value float64) {
var val uint64
var val2 float64
size := flp.sizes[typ]
for i := 0; i < size; i++ {
val = val + uint64(data[pos+i])<<(8*i)
}
switch typ {
case jsonDouble:
val2 = math.Float64frombits(val)
}
return val2
}

func (flp floatPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
val := flp.getVal(typ, data, pos)
node = ajson.NumericNode("", val)
return node, nil
}

func newFloatPlugin() *floatPlugin {
fp := &floatPlugin{
info: &jsonPluginInfo{
name: "Float",
types: []jsonDataType{jsonDouble},
},
sizes: make(map[jsonDataType]int),
}
fp.sizes = map[jsonDataType]int{
jsonDouble: 8,
}
for _, typ := range ih.info.types {
binlogJSON.register(typ, ih)
for _, typ := range fp.info.types {
binlogJSON.register(typ, fp)
}
return ih
return fp
}

//endregion
Expand All @@ -385,7 +483,7 @@ type literalPlugin struct {

var _ jsonPlugin = (*literalPlugin)(nil)

func (lh literalPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
func (lpl literalPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
val := jsonDataLiteral(data[pos])
switch val {
case jsonNullLiteral:
Expand All @@ -401,14 +499,14 @@ func (lh literalPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *a
}

func newLiteralPlugin() *literalPlugin {
lh := &literalPlugin{
lpl := &literalPlugin{
info: &jsonPluginInfo{
name: "Literal",
types: []jsonDataType{jsonLiteral},
},
}
binlogJSON.register(jsonLiteral, lh)
return lh
binlogJSON.register(jsonLiteral, lpl)
return lpl
}

//endregion
Expand All @@ -427,7 +525,7 @@ var _ jsonPlugin = (*opaquePlugin)(nil)

// other types are stored as catch-all opaque types: documentation on these is scarce.
// we currently know about (and support) date/time/datetime/decimal.
func (oh opaquePlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
func (opl opaquePlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
dataType := data[pos]
start := 3 // account for length of stored value
end := start + 8 // all currently supported opaque data types are 8 bytes in size
Expand Down Expand Up @@ -484,14 +582,14 @@ func (oh opaquePlugin) getNode(typ jsonDataType, data []byte, pos int) (node *aj
}

func newOpaquePlugin() *opaquePlugin {
oh := &opaquePlugin{
opl := &opaquePlugin{
info: &jsonPluginInfo{
name: "Opaque",
types: []jsonDataType{jsonOpaque},
},
}
binlogJSON.register(jsonOpaque, oh)
return oh
binlogJSON.register(jsonOpaque, opl)
return opl
}

//endregion
Expand All @@ -508,22 +606,22 @@ type stringPlugin struct {

var _ jsonPlugin = (*stringPlugin)(nil)

func (sh stringPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
func (spl stringPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
size, pos := readVariableLength(data, pos)
node = ajson.StringNode("", string(data[pos:pos+size]))

return node, nil
}

func newStringPlugin() *stringPlugin {
sh := &stringPlugin{
spl := &stringPlugin{
info: &jsonPluginInfo{
name: "String",
types: []jsonDataType{jsonString},
},
}
binlogJSON.register(jsonString, sh)
return sh
binlogJSON.register(jsonString, spl)
return spl
}

//endregion
Expand All @@ -542,7 +640,7 @@ var _ jsonPlugin = (*arrayPlugin)(nil)

// arrays are stored thus:
// | type_identifier(one of [2,3]) | elem count | obj size | list of offsets+lengths of values | actual values |
func (ah arrayPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
func (apl arrayPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
jlog("JSON Array %s, len %d", jsonDataTypeToString(uint(typ)), len(data))
var nodes []*ajson.Node
var elem *ajson.Node
Expand All @@ -565,15 +663,15 @@ func (ah arrayPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajs
}

func newArrayPlugin() *arrayPlugin {
ah := &arrayPlugin{
apl := &arrayPlugin{
info: &jsonPluginInfo{
name: "Array",
types: []jsonDataType{jsonSmallArray, jsonLargeArray},
},
}
binlogJSON.register(jsonSmallArray, ah)
binlogJSON.register(jsonLargeArray, ah)
return ah
binlogJSON.register(jsonSmallArray, apl)
binlogJSON.register(jsonLargeArray, apl)
return apl
}

//endregion
Expand All @@ -592,7 +690,7 @@ var _ jsonPlugin = (*objectPlugin)(nil)

// objects are stored thus:
// | type_identifier(0/1) | elem count | obj size | list of offsets+lengths of keys | list of offsets+lengths of values | actual keys | actual values |
func (oh objectPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
func (opl objectPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
jlog("JSON Type is %s, len %d", jsonDataTypeToString(uint(typ)), len(data))

// "large" decides number of bytes used to specify element count and total object size: 4 bytes for large, 2 for small
Expand Down Expand Up @@ -640,15 +738,15 @@ func (oh objectPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *aj
}

func newObjectPlugin() *objectPlugin {
oh := &objectPlugin{
opl := &objectPlugin{
info: &jsonPluginInfo{
name: "Object",
types: []jsonDataType{jsonSmallObject, jsonLargeObject},
},
}
binlogJSON.register(jsonSmallObject, oh)
binlogJSON.register(jsonLargeObject, oh)
return oh
binlogJSON.register(jsonSmallObject, opl)
binlogJSON.register(jsonLargeObject, opl)
return opl
}

//endregion
Loading

0 comments on commit e45c636

Please sign in to comment.