diff --git a/examples/clickhouse-exporter/main.go b/examples/clickhouse-exporter/main.go index c9e2e3a..a055ee5 100644 --- a/examples/clickhouse-exporter/main.go +++ b/examples/clickhouse-exporter/main.go @@ -9,6 +9,7 @@ import ( "github.com/kmlebedev/txmlconnector/client/commands" log "github.com/sirupsen/logrus" "os" + "slices" "strconv" "strings" "sync" @@ -31,8 +32,8 @@ const ( high Float32, low Float32, volume UInt64 - ) ENGINE = ReplacingMergeTree() - ORDER BY (date, sec_code, period)` + ) ENGINE = ReplacingMergeTree() + ORDER BY (date, sec_code, period)` securitiesDDL = `CREATE TABLE IF NOT EXISTS transaq_securities ( secid UInt16, @@ -48,7 +49,7 @@ const ( sectype String, quotestype UInt8 ) ENGINE = ReplacingMergeTree() - ORDER BY (secid, seccode, board)` + ORDER BY (seccode, board, market, sectype, quotestype)` tradesDDL = `CREATE TABLE IF NOT EXISTS transaq_trades ( time DateTime('Europe/Moscow'), @@ -60,7 +61,7 @@ const ( quantity UInt32, buy_sell LowCardinality(FixedString(1)), open_interest Int32, - period LowCardinality(FixedString(1)), + period LowCardinality(FixedString(1)) ) ENGINE = ReplacingMergeTree() ORDER BY (secid, sec_code, trade_no, time, buy_sell)` ) @@ -75,6 +76,7 @@ var ( dataCandleCount = ExportCandleCount dataCandleCountLock = sync.RWMutex{} isAllTradesPositions = false + quotations = []commands.SubSecurity{} allTrades = commands.SubAllTrades{} ) @@ -120,7 +122,7 @@ func init() { func processTransaq() { var status commands.ServerStatus - ticker := time.NewTicker(60 * time.Second) + ticker := time.NewTicker(5 * time.Minute) defer ticker.Stop() for { select { @@ -130,17 +132,20 @@ func processTransaq() { switch status.Connected { case "true": log.Infof("server status is true") + if err := doSubscribe(); err != nil { + log.Error(err) + } case "error": log.Warnf("txmlconnector not connected %+v\n", status) default: log.Infof("Status %+v", status) } case <-ticker.C: - if status.Connected != "error" { + if status.Connected == "true" { continue } if err := tc.Connect(); err != nil { - log.Fatal(err) + log.Error("reconnect ", err) } case trades := <-tc.AllTradesChan: //go func(t *commands.AllTrades) { @@ -270,6 +275,17 @@ func processTransaq() { } } +func doSubscribe() error { + if err := tc.SendCommand(commands.Command{ + Id: "subscribe", + Quotations: quotations, + AllTrades: allTrades, + }); err != nil { + return fmt.Errorf("SendCommand subscribe: %+v", err) + } + return nil +} + func main() { defer func() { tc.Disconnect() @@ -301,12 +317,11 @@ func main() { } // Get History data for all sec - quotations := []commands.SubSecurity{} exportCandleCount := ExportCandleCount if eCandleCount, err := strconv.Atoi(os.Getenv("EXPORT_CANDLE_COUNT")); err == nil && eCandleCount > -2 { exportCandleCount = eCandleCount } - exportSecBoards := []string{"TQBR"} + exportSecBoards := []string{"TQBR", "TQTF", "FUT"} if eSecBoards := os.Getenv("EXPORT_SEC_BOARDS"); eSecBoards != "" { exportSecBoards = strings.Split(eSecBoards, ",") } @@ -324,21 +339,16 @@ func main() { } for _, sec := range tc.Data.Securities.Items { - exportSecBoardFound := false - for _, exportSecBoard := range exportSecBoards { - if exportSecBoard == sec.Board || exportSecBoard == "ALL" { - exportSecBoardFound = true - break - } - } - for _, exportSecCode := range exportAllTradesSec { - if exportSecCode == sec.SecCode { - allTrades.Items = append(allTrades.Items, sec.SecId) - } - } if sec.SecId == 0 || sec.Active != "true" || len(sec.SecCode) > 16 { continue } + exportSecBoardFound := false + if slices.Contains(exportSecBoards, sec.Board) { + exportSecBoardFound = true + } + if exportSecBoardFound && slices.Contains(exportAllTradesSec, sec.SecCode) { + allTrades.Items = append(allTrades.Items, sec.SecId) + } log.Debugf("%+v", sec) if err := batchSec.Append(uint16(sec.SecId), @@ -426,16 +436,8 @@ func main() { } // receive TQBRGMKN249544220432249503516188156324962512522211545477262731364.723 // Get subscribe on all sec - if err = tc.SendCommand(commands.Command{Id: "get_mc_portfolio", Union: "377620R2555"}); err != nil { - log.Error("SendCommand get_mc_portfolio: ", err) - } - time.Sleep(10 * time.Second) - if err = tc.SendCommand(commands.Command{ - Id: "subscribe", - Quotations: quotations, - AllTrades: allTrades, - }); err != nil { - log.Error("SendCommand subscribe: ", err) - } + // if err = tc.SendCommand(commands.Command{Id: "get_mc_portfolio", Union: "377620R2555"}); err != nil { + // log.Error("SendCommand get_mc_portfolio: ", err) + // } <-tc.ShutdownChannel }