Skip to content

Commit

Permalink
Merge branch 'feature/cli' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
jubeless committed Mar 15, 2023
2 parents 335aada + f4eabf8 commit 211948a
Show file tree
Hide file tree
Showing 14 changed files with 658 additions and 20 deletions.
22 changes: 22 additions & 0 deletions cmd/kvdb/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package main

import (
"fmt"

"github.com/spf13/viper"
"github.com/streamingfast/kvdb/store"
"go.uber.org/zap"
)

func getKV() (store.KVStore, error) {
dsn := viper.GetString("global-dsn")
if dsn == "" {
return nil, fmt.Errorf("dsn is required")
}
zlog.Info("setting up store", zap.String("dsn", dsn))
s, err := store.New(dsn)
if err != nil {
return nil, fmt.Errorf("create store: %w", err)
}
return s, nil
}
8 changes: 8 additions & 0 deletions cmd/kvdb/decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package main

type Decoder struct {
}

func NewDecoder() {

}
10 changes: 10 additions & 0 deletions cmd/kvdb/decoder/ascii.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package decoder

var _ Decode = (*AsciiDecoder)(nil)

type AsciiDecoder struct {
}

func (a *AsciiDecoder) Decode(data []byte) string {
return string(data)
}
12 changes: 12 additions & 0 deletions cmd/kvdb/decoder/hex.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package decoder

import "encoding/hex"

var _ Decode = (*HexDecoder)(nil)

type HexDecoder struct {
}

func (h *HexDecoder) Decode(data []byte) string {
return hex.EncodeToString(data)
}
30 changes: 30 additions & 0 deletions cmd/kvdb/decoder/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package decoder

import (
"fmt"
"strings"
)

type Decode interface {
Decode([]byte) string
}

func NewDecoder(scheme string) (Decode, error) {
if scheme == "ascii" {
return &AsciiDecoder{}, nil
}

if scheme == "hex" {
return &HexDecoder{}, nil
}

if strings.HasPrefix(scheme, "proto") {
decoder, err := newProtoDecoder(scheme)
if err != nil {
return nil, fmt.Errorf("proto decoder: %w", err)
}
return decoder, nil
}

return nil, fmt.Errorf("unknown decoding scheme %q", scheme)
}
8 changes: 8 additions & 0 deletions cmd/kvdb/decoder/pb/system/system.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package system

import (
_ "embed"
)

//go:embed system.pb
var ProtobufDescriptors []byte
Binary file added cmd/kvdb/decoder/pb/system/system.pb
Binary file not shown.
119 changes: 119 additions & 0 deletions cmd/kvdb/decoder/proto.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package decoder

import (
"fmt"
"strings"

"github.com/golang/protobuf/proto"
"github.com/streamingfast/kvdb/cmd/kvdb/decoder/pb/system"
"google.golang.org/protobuf/types/descriptorpb"

"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/desc/protoparse"
"github.com/jhump/protoreflect/dynamic"
)

var _ Decode = (*ProtoDecoder)(nil)

// proto:///path/to/file.proto@<full_qualified_message_type>
type ProtoDecoder struct {
messageDescriptor *desc.MessageDescriptor
messageType string
}

func (p *ProtoDecoder) Decode(bytes []byte) string {
dynMsg := dynamic.NewMessageFactoryWithDefaults().NewDynamicMessage(p.messageDescriptor)
if err := dynMsg.Unmarshal(bytes); err != nil {
return fmt.Sprintf("Error unmarshalling message into %s: %s\n", p.messageType, err.Error())

}

cnt, err := dynMsg.MarshalJSON()
if err != nil {
return fmt.Sprintf("Error marhsalling proto to json %s: %s\n", p.messageType, err.Error())
}
return string(cnt)
}

func newProtoDecoder(scheme string) (*ProtoDecoder, error) {
chunks := strings.Split(scheme, "://")
if len(chunks) != 2 {
return nil, fmt.Errorf("invalid proto decoder scheme %q, expect proto:///path/to/file.proto@<full_qualified_message_type>", scheme)
}

protoChunks := strings.Split(chunks[1], "@")
if len(chunks) != 2 {
return nil, fmt.Errorf("invalid proto decoder scheme %q, expect proto:///path/to/file.proto@<full_qualified_message_type>", scheme)
}

protoPath := protoChunks[0]
messageType := protoChunks[1]

protoFiles, err := loadProtobufs(protoPath)
if err != nil {
return nil, fmt.Errorf("load protos: %w", err)

}

fileDescs, err := desc.CreateFileDescriptors(protoFiles)
if err != nil {
return nil, fmt.Errorf("couldn't convert, should do this check much earlier: %w", err)
}

var msgDesc *desc.MessageDescriptor
for _, file := range fileDescs {
msgDesc = file.FindMessage(messageType)
if msgDesc != nil {
break
}
}

if msgDesc == nil {
return nil, fmt.Errorf("failed to find message descriptor %q", messageType)
}

return &ProtoDecoder{
messageType: messageType,
messageDescriptor: msgDesc,
}, nil

}

func loadProtobufs(protoPath string) (out []*descriptorpb.FileDescriptorProto, err error) {
// System protos
systemFiles, err := readSystemProtobufs()
if err != nil {
return nil, err
}

for _, file := range systemFiles.File {
out = append(out, file)
}

// User-specified protos
parser := &protoparse.Parser{
ImportPaths: []string{},
IncludeSourceCodeInfo: true,
}

customFiles, err := parser.ParseFiles(protoPath)
if err != nil {
return nil, fmt.Errorf("parse proto file %q: %w", protoPath, err)
}

for _, fd := range customFiles {
out = append(out, fd.AsFileDescriptorProto())
}

return out, nil
}

func readSystemProtobufs() (*descriptorpb.FileDescriptorSet, error) {
fds := &descriptorpb.FileDescriptorSet{}
err := proto.Unmarshal(system.ProtobufDescriptors, fds)
if err != nil {
return nil, err
}

return fds, nil
}
89 changes: 89 additions & 0 deletions cmd/kvdb/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package main

import (
"fmt"
"strings"

"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/streamingfast/logging"

. "github.com/streamingfast/cli"
_ "github.com/streamingfast/kvdb/store/badger"
_ "github.com/streamingfast/kvdb/store/badger3"
_ "github.com/streamingfast/kvdb/store/bigkv"
_ "github.com/streamingfast/kvdb/store/netkv"
_ "github.com/streamingfast/kvdb/store/tikv"
)

// Commit sha1 value, injected via go build `ldflags` at build time
var commit = ""

// Version value, injected via go build `ldflags` at build time
var version = "dev"

// Date value, injected via go build `ldflags` at build time
var date = ""

var zlog, tracer = logging.RootLogger("kvdb", "github.com/streamingfast/kvdb/cmd/kvdb")

var RootCmd = &cobra.Command{
Use: "kvdb", Short: "",
}

func init() {
logging.InstantiateLoggers()
}

func main() {
Run("substreams-sink-kv", "KVDB Client",
ConfigureViper("KVDB"),
ConfigureVersion(),

Group("read", "KVDB read commands",
ReadGetCmd,
ReadScanCmd,
ReadPrefixCmd,

PersistentFlags(
func(flags *pflag.FlagSet) {
flags.String("decoder", "hex", "output decoding. Supported schemes: 'hex', 'ascii', 'proto:<path_to_proto>'")
},
),
),

PersistentFlags(
func(flags *pflag.FlagSet) {
flags.String("dsn", "", "URL to connect to the KV store. Supported schemes: 'badger3', 'badger', 'bigkv', 'tikv', 'netkv'. See https://github.com/streamingfast/kvdb for more details. (ex: 'badger3:///tmp/substreams-sink-kv-db')")
},
),
AfterAllHook(func(cmd *cobra.Command) {
cmd.PersistentPreRunE = func(_ *cobra.Command, _ []string) error {
return nil
}
}),
)
}

func ConfigureVersion() CommandOption {
return CommandOptionFunc(func(cmd *cobra.Command) {
cmd.Version = versionString(version)
})
}

func versionString(version string) string {
var labels []string
if len(commit) >= 7 {
labels = append(labels, fmt.Sprintf("Commit %s", commit[0:7]))
}

if date != "" {
labels = append(labels, fmt.Sprintf("Built %s", date))
}

if len(labels) == 0 {
return version
}

return fmt.Sprintf("%s (%s)", version, strings.Join(labels, ", "))
}
55 changes: 55 additions & 0 deletions cmd/kvdb/read_get.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package main

import (
"errors"
"fmt"

"github.com/spf13/viper"
"github.com/streamingfast/kvdb/cmd/kvdb/decoder"

"github.com/streamingfast/kvdb/store"

"github.com/spf13/cobra"
. "github.com/streamingfast/cli"
"go.uber.org/zap"
)

var ReadGetCmd = Command(readGetRunE,
"get <key>",
"Retrieve a key",
ExactArgs(1),
)

func readGetRunE(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()

kvdb, err := getKV()
if err != nil {
return err
}

outputDecoder, err := decoder.NewDecoder(viper.GetString("read-global-decoder"))
if err != nil {
return fmt.Errorf("decoder: %w", err)
}

key := args[0]
zlog.Info("store get key",
zap.String("key", key),
)

value, err := kvdb.Get(ctx, []byte(key))
if err != nil {
if errors.Is(err, store.ErrNotFound) {
fmt.Println("")
fmt.Printf("Key ->\t%s\tNOT FOUND\n", key)
return nil
}
return fmt.Errorf("failed to get key: %w", err)
}

fmt.Println("")
fmt.Printf("Key\t->\t%s\n", key)
fmt.Printf("Value\t->\t%s\n", outputDecoder.Decode(value))
return nil
}
Loading

0 comments on commit 211948a

Please sign in to comment.