Skip to content

Commit

Permalink
Merge pull request #4 from opensourceways/hrz-930
Browse files Browse the repository at this point in the history
消息中心清洗服务930
  • Loading branch information
shishupei authored Oct 10, 2024
2 parents 85d2bd9 + 3b186a4 commit 4eaf9a4
Show file tree
Hide file tree
Showing 37 changed files with 3,600 additions and 48 deletions.
22 changes: 22 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
FROM golang:latest as BUILDER
LABEL maintainer="shishupei"

# build binary
ARG USER
ARG PASS
RUN echo "machine github.com login $USER password $PASS" >/root/.netrc
RUN mkdir -p /go/src/github.com/opensourceways/message-transfer
COPY . /go/src/github.com/opensourceways/message-transfer
RUN cd /go/src/github.com/opensourceways/message-transfer && CGO_ENABLED=1 go build -v -o ./message-transfer main.go

# copy binary config and utils
FROM openeuler/openeuler:22.03
RUN dnf -y update && \
dnf in -y shadow && \
groupadd -g 1000 message-center && \
useradd -u 1000 -g message-center -s /bin/bash -m message-center

USER message-center
COPY --chown=message-center --from=BUILDER /go/src/github.com/opensourceways/message-transfer /opt/app
WORKDIR /opt/app/
ENTRYPOINT ["/opt/app/message-transfer"]
57 changes: 57 additions & 0 deletions common/domain/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
Copyright (c) Huawei Technologies Co., Ltd. 2024. All rights reserved
*/

// Package domain repository provides custom error types and utility functions for error handling.
package domain

// ErrorDuplicateCreating represents an error indicating a duplicate creation attempt.
type ErrorDuplicateCreating struct {
error
}

// NewErrorDuplicateCreating creates a new ErrorDuplicateCreating error with the given underlying error.
func NewErrorDuplicateCreating(err error) ErrorDuplicateCreating {
return ErrorDuplicateCreating{error: err}
}

// ErrorResourceNotExists represents an error indicating a non-existent resource.
type ErrorResourceNotExists struct {
error
}

// NewErrorResourceNotExists creates a new ErrorResourceNotExists error with the given underlying error.
func NewErrorResourceNotExists(err error) ErrorResourceNotExists {
return ErrorResourceNotExists{error: err}
}

// ErrorConcurrentUpdating represents an error indicating a concurrent update conflict.
type ErrorConcurrentUpdating struct {
error
}

// NewErrorConcurrentUpdating creates a new ErrorConcurrentUpdating error with the given underlying error.
func NewErrorConcurrentUpdating(err error) ErrorConcurrentUpdating {
return ErrorConcurrentUpdating{error: err}
}

// IsErrorResourceNotExists checks if the given error is of type ErrorResourceNotExists.
func IsErrorResourceNotExists(err error) bool {
_, ok := err.(ErrorResourceNotExists)

return ok
}

// IsErrorDuplicateCreating checks if the given error is of type ErrorDuplicateCreating.
func IsErrorDuplicateCreating(err error) bool {
_, ok := err.(ErrorDuplicateCreating)

return ok
}

// IsErrorConcurrentUpdating checks if the given error is of type ErrorConcurrentUpdating.
func IsErrorConcurrentUpdating(err error) bool {
_, ok := err.(ErrorConcurrentUpdating)

return ok
}
12 changes: 12 additions & 0 deletions common/kafka/consume.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
Copyright (c) Huawei Technologies Co., Ltd. 2024. All rights reserved
*/

package kafka

// ConsumeConfig kafka consume config.
type ConsumeConfig struct {
Topic string `json:"topic" required:"true"`
Publish string `json:"publish" required:"true"`
Group string `json:"group" required:"true"`
}
32 changes: 32 additions & 0 deletions common/kafka/kafka.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
Copyright (c) Huawei Technologies Co., Ltd. 2024. All rights reserved
*/

// Package kafka provides functionality for interacting with Issue.
package kafka

import (
kfklib "github.com/opensourceways/kafka-lib/agent"
"github.com/opensourceways/kafka-lib/mq"
)

const (
deaultVersion = "2.1.0"
)

// Config represents the configuration for Issue.
type Config struct {
kfklib.Config
}

// SetDefault sets the default values for the Config.
func (cfg *Config) SetDefault() {
if cfg.Version == "" {
cfg.Version = deaultVersion
}
}

// Init initializes the Issue agent with the specified configuration, logger, and removeCfg flag.
func Init(cfg *Config, log mq.Logger, removeCfg bool) error {
return kfklib.Init(&cfg.Config, log, nil, "", removeCfg)
}
39 changes: 39 additions & 0 deletions common/kafka/publish.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
Copyright (c) Huawei Technologies Co., Ltd. 2024. All rights reserved
*/

// Package kafka send msg to kafka.
package kafka

import (
"fmt"

kfklib "github.com/opensourceways/kafka-lib/agent"
"github.com/sirupsen/logrus"

"github.com/opensourceways/message-transfer/models/message"
)

// SendMsg is a method on the messageAdapter struct that takes an EventMessage
// and sends it to the ModelCreate topic.
func SendMsg(topic string, e message.EventMessage) error {
res := send(topic, e)
logrus.Info("send to kafka success topic = " + topic)
return res
}

func send(topic string, v message.EventMessage) error {
body, err := v.Message()
if err != nil {
return err
}

err = kfklib.Publish(topic, nil, body)
if err != nil {
fmt.Println("出错啦")
return err
} else {
fmt.Println("成功啦")
return nil
}
}
76 changes: 76 additions & 0 deletions common/postgresql/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
Copyright (c) Huawei Technologies Co., Ltd. 2024. All rights reserved
*/

// Package postgresql provides functionality for interacting with PostgreSQL databases.
package postgresql

import (
"fmt"
"time"
)

// Config represents the configuration for PostgreSQL.
type Config struct {
Host string `json:"host" required:"true"`
User string `json:"user" required:"true"`
Pwd string `json:"pwd" required:"true"`
Database string `json:"database" required:"true"`
Port int `json:"port" required:"true"`
Life int `json:"life" required:"true"` // the unit is minute
MaxConn int `json:"max_conn" required:"true"`
MaxIdle int `json:"max_idle" required:"true"`
Dbcert string `json:"cert"`
Code errorCode `json:"error_code"`
}

// SetDefault sets the default values for the Config.
func (p *Config) SetDefault() {
if p.MaxConn <= 0 {
p.MaxConn = 500
}

if p.MaxIdle <= 0 {
p.MaxIdle = 250
}

if p.Life <= 0 {
p.Life = 2
}
}

// ConfigItems returns the configuration items for the Config.
func (cfg *Config) ConfigItems() []interface{} {
return []interface{}{
&cfg.Code,
}
}

func (cfg *Config) getLifeDuration() time.Duration {
return time.Minute * time.Duration(cfg.Life)
}

func (p *Config) dsn() string {
if p.Dbcert != "" {
return fmt.Sprintf(
"host=%v user=%v password=%v dbname=%v port=%v sslmode=verify-ca TimeZone=Asia/Shanghai sslrootcert=%v",
p.Host, p.User, p.Pwd, p.Database, p.Port, p.Dbcert,
)
} else {
return fmt.Sprintf(
"host=%v user=%v password=%v dbname=%v port=%v sslmode=disable TimeZone=Asia/Shanghai",
p.Host, p.User, p.Pwd, p.Database, p.Port,
)
}
}

type errorCode struct {
UniqueConstraint string `json:"unique_constraint"`
}

// SetDefault sets the default values for the errorCode.
func (cfg *errorCode) SetDefault() {
if cfg.UniqueConstraint == "" {
cfg.UniqueConstraint = "23505"
}
}
Loading

0 comments on commit 4eaf9a4

Please sign in to comment.