Skip to content

Commit

Permalink
remove kafka-ish topic-key-msg producer form; link flow with ULID
Browse files Browse the repository at this point in the history
Signed-off-by: AbhishekKr <[email protected]>
  • Loading branch information
abhishekkr committed Aug 12, 2024
1 parent c53cdaa commit c120a3c
Show file tree
Hide file tree
Showing 9 changed files with 32 additions and 36 deletions.
16 changes: 9 additions & 7 deletions consumer/tcp_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/gol-gol/golenv"
ulid "github.com/oklog/ulid/v2"

ogitransformer "github.com/OpenChaos/ogi/transformer"
)
Expand All @@ -21,8 +22,8 @@ var (
tcpConsumerPort = golenv.OverrideIfEnv("OGI_TCP_CONSUMER_PORT", "8080")
)

func (t *TCPServer) Transform(lyne string) {
ogitransformer.Transform([]byte(lyne))
func (t *TCPServer) Transform(msgid, lyne string) {
ogitransformer.Transform(msgid, []byte(lyne))
}

func (t *TCPServer) Start() {
Expand All @@ -45,15 +46,16 @@ func (t *TCPServer) Start() {
fmt.Println(err)
return
}
if strings.TrimSpace(string(netData)) == "exit" {
if strings.TrimSpace(netData) == "exit" {
fmt.Println("bye!")
return
}

t.Transform(string(netData))
t := time.Now()
myTime := t.Format(time.RFC3339) + "\n"
c.Write([]byte(myTime))
msgid := ulid.Make().String()
t.Transform(msgid, netData)
tym := time.Now()
response := msgid + ": " + tym.Format(time.RFC3339) + "\n"
c.Write([]byte(response))
}
}

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
bou.ke/monkey v1.0.2
github.com/gol-gol/golenv v0.0.0-20230302172901-210791b57f21
github.com/newrelic/go-agent v3.8.1+incompatible
github.com/oklog/ulid/v2 v2.1.0
github.com/sirupsen/logrus v1.6.0
github.com/stretchr/testify v1.6.1
golang.org/x/sys v0.0.0-20200826173525-f9321e4c35a6 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJ
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/newrelic/go-agent v3.8.1+incompatible h1:8TAEekJseggmwfn79CjoV308PyNlzDVExkUwFeDBUxk=
github.com/newrelic/go-agent v3.8.1+incompatible/go.mod h1:a8Fv1b/fYhFSReoTU6HDkTYIMZeSVNffmoS726Y0LzQ=
github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU=
github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I=
Expand Down
17 changes: 4 additions & 13 deletions producer/echo_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ogiproducer
import (
"fmt"
"log"
"time"
)

type Echo struct {
Expand All @@ -13,19 +12,11 @@ func (e *Echo) Close() {
fmt.Println("")
}

func (e *Echo) Produce(topic string, message []byte, messageKey string) {
if topic != "" {
fmt.Println("topic:", topic)
}
if messageKey != "" {
fmt.Println("key:", messageKey)
} else {
fmt.Println("key:", time.Now())
}
if len(message) != 0 {
fmt.Println(string(message))
func (e *Echo) Produce(msgid string, msg []byte) {
if len(msg) != 0 {
fmt.Println(string(msg))
} else {
log.Println("# received blank message")
log.Println("# received blank message @", msgid)
}
}

Expand Down
4 changes: 2 additions & 2 deletions producer/plugin_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,6 @@ func (plugin *ProducerPlugin) Close() {
plugin.CloseFunc.(func())()
}

func (plugin *ProducerPlugin) Produce(topic string, message []byte, messageKey string) {
plugin.ProduceFunc.(func(string, []byte, string))(topic, message, messageKey)
func (plugin *ProducerPlugin) Produce(msgid string, msg []byte) {
plugin.ProduceFunc.(func(string, []byte))(msgid, msg)
}
8 changes: 4 additions & 4 deletions producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

type Producer interface {
Produce(string, []byte, string)
Produce(string, []byte)
Close()
}

Expand All @@ -26,13 +26,13 @@ func NewProducer() Producer {
return producerMap[ProducerType]()
}

func Produce(topic string, message []byte, messageKey string) {
func Produce(msgid string, msg []byte) {
txn := instrumentation.StartTransaction("produce_transaction", nil, nil)
defer instrumentation.EndTransaction(&txn)

producer := NewProducer()
defer producer.Close()

producer.Produce(topic, message, messageKey)
logger.Infof("topic '%s' message-key '%s'", topic, messageKey)
producer.Produce(msgid, msg)
logger.Infof("msg#[%s]", msgid)
}
4 changes: 2 additions & 2 deletions transformer/plugin_transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,6 @@ func NewTransformerPlugin() Transformer {
}
}

func (plugin *TransformerPlugin) Transform(msg []byte) error {
return plugin.TransformFunc.(func([]byte) error)(msg)
func (plugin *TransformerPlugin) Transform(msgid string, msg []byte) error {
return plugin.TransformFunc.(func(string, []byte) error)(msgid, msg)
}
8 changes: 4 additions & 4 deletions transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

type Transformer interface {
Transform([]byte) error
Transform(string, []byte) error
}

type NewTransformer func() Transformer
Expand All @@ -23,13 +23,13 @@ var (
}
)

func Transform(msg []byte) {
func Transform(msgid string, msg []byte) {
txn := instrumentation.StartTransaction("transform_transaction", nil, nil)
defer instrumentation.EndTransaction(&txn)

transformer := transformerMap[TransformerType]()
if err := transformer.Transform(msg); err != nil {
// produce to dead-man-talking topic
if err := transformer.Transform(msgid, msg); err != nil {
// produce to dead-man-talking
logger.Warn(err)
}
}
7 changes: 3 additions & 4 deletions transformer/transparent_transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@ package ogitransformer
import ogiproducer "github.com/OpenChaos/ogi/producer"

type TransparentTransformer struct {
Topic string
}

func (t *TransparentTransformer) Transform(msg []byte) (err error) {
ogiproducer.Produce(t.Topic, msg, "")
func (t *TransparentTransformer) Transform(msgid string, msg []byte) (err error) {
ogiproducer.Produce(msgid, msg)
return nil
}

func NewTransparentTransformer() Transformer {
return &TransparentTransformer{Topic: ""}
return &TransparentTransformer{}
}

0 comments on commit c120a3c

Please sign in to comment.