Skip to content

Commit

Permalink
refactoring changes
Browse files Browse the repository at this point in the history
  • Loading branch information
maksenius committed Feb 7, 2023
1 parent 5b8e563 commit 1462c07
Show file tree
Hide file tree
Showing 13 changed files with 57 additions and 66 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
## General

The [SAP HANA](https://www.sap.com/products/technology-platform/hana/what-is-sap-hana.html) connector is one of Conduit plugins.
It provides the source Sap Hana connector.
It provides the source SAP HANA connector.

### Prerequisites

Expand Down
13 changes: 7 additions & 6 deletions columntypes/columntypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ type TableInfo struct {
}

// GetColumnQueryPart prepare query part about creation column for tracking table.
// For example: NAME VARCHAR(40), AGE INT, ADDRESS VARCHAR(120).
func (t TableInfo) GetColumnQueryPart() string {
var columns []string
for key, val := range t.ColumnTypes {
Expand Down Expand Up @@ -148,9 +149,9 @@ func GetTableInfo(ctx context.Context, querier Querier, tableName string) (Table

for rows.Next() {
var count int
er := rows.Scan(&count)
if er != nil {
return TableInfo{}, fmt.Errorf("scan: %w", err)
scanErr := rows.Scan(&count)
if scanErr != nil {
return TableInfo{}, fmt.Errorf("scan: %w", scanErr)
}

if count == 0 {
Expand Down Expand Up @@ -206,8 +207,8 @@ func GetTableInfo(ctx context.Context, querier Querier, tableName string) (Table
}, nil
}

// ConvertStructureData converts a sdk.StructureData values to a proper database types.
func ConvertStructureData(
// ConvertStructuredData converts a sdk.StructureData values to a proper database types.
func ConvertStructuredData(
ctx context.Context,
columnTypes map[string]string,
data sdk.StructuredData,
Expand Down Expand Up @@ -236,7 +237,7 @@ func ConvertStructureData(
}

// Converting value to time if it is string.
switch columnTypes[strings.ToLower(key)] {
switch columnTypes[strings.ToUpper(key)] {
case dateType, timeType, secondDateType, timestampType:
_, ok := value.(time.Time)
if ok {
Expand Down
26 changes: 17 additions & 9 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@

package config

import (
"fmt"
)

const (
// DSNAuthType name of DSN auth.
DSNAuthType string = "DSN"
Expand Down Expand Up @@ -59,45 +63,49 @@ func (a *AuthConfig) Validate() error {
switch a.Mechanism {
case DSNAuthType:
if a.DSN == "" {
return errRequiredDSNParameter
return requiredAuthParam(DSNAuthType, "DSN")
}

return nil
case BasicAuthType:
if a.Host == "" {
return errRequiredHostParameter
return requiredAuthParam(BasicAuthType, "host")
}
if a.Username == "" {
return errRequiredUsernameParameter
return requiredAuthParam(BasicAuthType, "username")
}
if a.Password == "" {
return errRequiredPasswordParameter
return requiredAuthParam(BasicAuthType, "password")
}

return nil
case JWTAuthType:
if a.Host == "" {
return errRequiredHostParameter
return requiredAuthParam(JWTAuthType, "host")
}
if a.Token == "" {
return errRequiredTokenParameter
return requiredAuthParam(JWTAuthType, "token")
}

return nil

case X509AuthType:
if a.Host == "" {
return errRequiredHostParameter
return requiredAuthParam(X509AuthType, "host")
}
if a.ClientKeyFilePath == "" {
return errRequiredClientKeyFileParameter
return requiredAuthParam(X509AuthType, "client key file path")
}
if a.ClientCertFilePath == "" {
return errRequiredClientCertFileParameter
return requiredAuthParam(X509AuthType, "client cert file path")
}

return nil
default:
return ErrInvalidAuthMechanism
}
}

func requiredAuthParam(auth, param string) error {
return fmt.Errorf("%s is required for %s auth", param, auth)
}
20 changes: 2 additions & 18 deletions config/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,5 @@ import (
"errors"
)

var (
// ErrInvalidAuthMechanism occurs when there's invalid mechanism config value.
ErrInvalidAuthMechanism = errors.New("invalid auth mechanism")
// errRequiredDSNParameter occurs when there's empty dsn parameter for dsn auth.
errRequiredDSNParameter = errors.New("dsn is required parameter for dsn auth")
// errRequiredHostParameter occurs when there's empty host parameter for basic or jwt or x509 auth.
errRequiredHostParameter = errors.New("host is required parameter for basic, jwt, x509 auth")
// errRequiredUsernameParameter occurs when there's empty username parameter for basic auth.
errRequiredUsernameParameter = errors.New("username is required parameter for basic auth")
// errRequiredPasswordParameter occurs when there's empty password parameter for basic auth.
errRequiredPasswordParameter = errors.New("password is required parameter for basic auth")
// errRequiredTokenParameter occurs when there's empty token parameter for jwt auth.
errRequiredTokenParameter = errors.New("token is required for jwt auth")
// errRequiredClientCertFileParameter occurs when there's empty client cert file parameter for x509 auth.
errRequiredClientCertFileParameter = errors.New("client cert file path is required for x509 auth")
// errRequiredClientCertFileParameter occurs when there's empty client key file path parameter for x509 auth.
errRequiredClientKeyFileParameter = errors.New("client key file path is required for x509 auth")
)
// ErrInvalidAuthMechanism occurs when there's invalid mechanism config value.
var ErrInvalidAuthMechanism = errors.New("invalid auth mechanism")
2 changes: 1 addition & 1 deletion helper/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,6 @@ func ConnectToDB(c config.AuthConfig) (*sqlx.DB, error) {

return sqlx.NewDb(sql.OpenDB(con), driverName), nil
default:
return nil, config.ErrInvalidAuthMechanism
return nil, fmt.Errorf("invalid auth mechanism :%s", c.Mechanism)
}
}
11 changes: 6 additions & 5 deletions source/iterator/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,9 @@ func (i *cdcIterator) Next(ctx context.Context) (sdk.Record, error) {
}

pos := position.Position{
IteratorType: position.TypeCDC,
CDCLastID: int(id),
SuffixName: i.trackingTable[len(i.trackingTable)-6:],
IteratorType: position.TypeCDC,
CDCLastID: int(id),
TrackingTableName: i.trackingTable,
}

convertedPosition, err := pos.ConvertToSDKPosition()
Expand Down Expand Up @@ -382,7 +382,7 @@ func (i *cdcIterator) clearTrackingTable(ctx context.Context) {
func setupCDC(
ctx context.Context,
db *sqlx.DB,
tableName, trackingTableName, suffixName string,
tableName, trackingTableName string,
tableInfo columntypes.TableInfo,
) error {
var trackingTableExist bool
Expand Down Expand Up @@ -424,7 +424,8 @@ func setupCDC(
}

// setup triggers for catch insert, delete, update operations.
err = setTriggers(ctx, tx, tableInfo.ColumnTypes, tableName, trackingTableName, suffixName)
err = setTriggers(ctx, tx, tableInfo.ColumnTypes, tableName,
trackingTableName, trackingTableName[len(trackingTableName)-6:])
if err != nil {
return fmt.Errorf("setup triggers: %w", err)
}
Expand Down
23 changes: 12 additions & 11 deletions source/iterator/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,24 +70,24 @@ func NewCombinedIterator(ctx context.Context, params CombinedParams) (*CombinedI
return nil, fmt.Errorf("parse position: %w", err)
}

suffixName := getSuffixName(pos)
trakingTableName := getTrackingTableName(pos, params.Table)

it := &CombinedIterator{
db: params.DB,
table: params.Table,
orderingColumn: params.OrderingColumn,
batchSize: params.BatchSize,
trackingTable: fmt.Sprintf(trackingTablePattern, params.Table, suffixName),
trackingTable: trakingTableName,
}

it.tableInfo, err = columntypes.GetTableInfo(ctx, params.DB, params.Table)
if err != nil {
return nil, fmt.Errorf("get table info: %w", err)
}

it.setKeys(params.CfgKeys)
it.setKeys(params.CfgKeys, it.tableInfo.PrimaryKeys)

err = setupCDC(ctx, it.db, it.table, it.trackingTable, suffixName, it.tableInfo)
err = setupCDC(ctx, it.db, it.table, it.trackingTable, it.tableInfo)
if err != nil {
return nil, fmt.Errorf("setup cdc: %w", err)
}
Expand All @@ -101,7 +101,7 @@ func NewCombinedIterator(ctx context.Context, params CombinedParams) (*CombinedI
batchSize: it.batchSize,
position: pos,
columnTypes: it.tableInfo.ColumnTypes,
suffixName: suffixName,
trackingTable: it.trackingTable,
})
if err != nil {
return nil, fmt.Errorf("new shapshot iterator: %w", err)
Expand Down Expand Up @@ -223,7 +223,7 @@ func (c *CombinedIterator) switchToCDCIterator(ctx context.Context) error {
return nil
}

func (c *CombinedIterator) setKeys(cfgKeys []string) {
func (c *CombinedIterator) setKeys(cfgKeys, tableKeys []string) {
// first priority keys from config.
if len(cfgKeys) > 0 {
for i := range cfgKeys {
Expand All @@ -236,20 +236,21 @@ func (c *CombinedIterator) setKeys(cfgKeys []string) {
}

// second priority primary keys from table.
if len(c.keys) > 0 {
if len(tableKeys) > 0 {
c.keys = tableKeys

return
}

// last priority ordering column.
c.keys = []string{c.orderingColumn}
}

func getSuffixName(pos *position.Position) string {
// get suffix from position
func getTrackingTableName(pos *position.Position, table string) string {
if pos != nil {
return pos.SuffixName
return pos.TrackingTableName
}

// create new suffix
return time.Now().Format("150405")
return fmt.Sprintf(trackingTablePattern, table, time.Now().Format("150405"))
}
10 changes: 5 additions & 5 deletions source/iterator/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ type snapshotIterator struct {
position *position.Position
// columnTypes column types from table.
columnTypes map[string]string
// suffixName special suffix that connector uses for identify tracking table and triggers.
suffixName string
// trackingTable name.
trackingTable string
}

type snapshotParams struct {
Expand All @@ -68,7 +68,7 @@ type snapshotParams struct {
batchSize int
position *position.Position
columnTypes map[string]string
suffixName string
trackingTable string
}

func newSnapshotIterator(
Expand All @@ -85,7 +85,7 @@ func newSnapshotIterator(
batchSize: snapshotParams.batchSize,
position: snapshotParams.position,
columnTypes: snapshotParams.columnTypes,
suffixName: snapshotParams.suffixName,
trackingTable: snapshotParams.trackingTable,
}

err = it.loadRows(ctx)
Expand Down Expand Up @@ -143,7 +143,7 @@ func (i *snapshotIterator) Next(ctx context.Context) (sdk.Record, error) {
IteratorType: position.TypeSnapshot,
SnapshotLastProcessedVal: transformedRow[i.orderingColumn],
SnapshotMaxValue: i.maxValue,
SuffixName: i.suffixName,
TrackingTableName: i.trackingTable,
}

sdkPos, err := pos.ConvertToSDKPosition()
Expand Down
4 changes: 2 additions & 2 deletions source/position/position.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ type Position struct {
// CDC information.
// CDCLastID - last processed id from tracking table.
CDCLastID int
// SuffixName special suffix that connector uses for identify tracking table and triggers.
SuffixName string
// TrackingTableName tracking table name.
TrackingTableName string
}

// ParseSDKPosition parses SDK position and returns Position.
Expand Down
4 changes: 2 additions & 2 deletions source/position/position_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ func TestParseSDKPosition(t *testing.T) {
SnapshotLastProcessedVal: 1,
SnapshotMaxValue: 4,
CDCLastID: 0,
SuffixName: "test",
TrackingTableName: "test",
}

wrongPosType := Position{
IteratorType: "i",
SnapshotLastProcessedVal: 1,
SnapshotMaxValue: 4,
CDCLastID: 0,
SuffixName: "test",
TrackingTableName: "test",
}

snapshotPosBytes, _ := json.Marshal(snapshotPos)
Expand Down
4 changes: 0 additions & 4 deletions source/source_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,10 +505,6 @@ func TestSource_CDC_Success(t *testing.T) {
t.Fatal(err)
}

// Check read from empty table.
_, err = s.Read(ctx)
is.Equal(sdk.ErrBackoffRetry, err)

// check inserted data.
r, err := s.Read(ctx)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion source/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func TestSource_Read(t *testing.T) {

it := mock.NewMockIterator(ctrl)
it.EXPECT().HasNext(ctx).Return(true, nil)
it.EXPECT().Next(ctx).Return(sdk.Record{}, errors.New("key is not exist"))
it.EXPECT().Next(ctx).Return(sdk.Record{}, errors.New("key does not exist"))

s := Source{
iterator: it,
Expand Down
2 changes: 1 addition & 1 deletion spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func Specification() sdk.Specification {
Name: "sap-hana",
Summary: "The SAP HANA database source and destination plugin for Conduit, written in Go.",
Description: "The SAP HANA database connector is one of Conduit plugins. " +
"It provides both, a source and a destination Sap Hana connector.",
"It provides both, a source and a destination SAP HANA connector.",
Version: version,
Author: "Meroxa, Inc. & Yalantis",
}
Expand Down

0 comments on commit 1462c07

Please sign in to comment.