Skip to content

Commit

Permalink
Merge pull request #20 from kmlebedev/fix_contains
Browse files Browse the repository at this point in the history
Fix contains
  • Loading branch information
kmlebedev authored Dec 7, 2024
2 parents 0e8594c + 8f940d7 commit 45f9fc3
Showing 1 changed file with 34 additions and 32 deletions.
66 changes: 34 additions & 32 deletions examples/clickhouse-exporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/kmlebedev/txmlconnector/client/commands"
log "github.com/sirupsen/logrus"
"os"
"slices"
"strconv"
"strings"
"sync"
Expand All @@ -31,8 +32,8 @@ const (
high Float32,
low Float32,
volume UInt64
) ENGINE = ReplacingMergeTree()
ORDER BY (date, sec_code, period)`
) ENGINE = ReplacingMergeTree()
ORDER BY (date, sec_code, period)`

securitiesDDL = `CREATE TABLE IF NOT EXISTS transaq_securities (
secid UInt16,
Expand All @@ -48,7 +49,7 @@ const (
sectype String,
quotestype UInt8
) ENGINE = ReplacingMergeTree()
ORDER BY (secid, seccode, board)`
ORDER BY (seccode, board, market, sectype, quotestype)`

tradesDDL = `CREATE TABLE IF NOT EXISTS transaq_trades (
time DateTime('Europe/Moscow'),
Expand All @@ -60,7 +61,7 @@ const (
quantity UInt32,
buy_sell LowCardinality(FixedString(1)),
open_interest Int32,
period LowCardinality(FixedString(1)),
period LowCardinality(FixedString(1))
) ENGINE = ReplacingMergeTree()
ORDER BY (secid, sec_code, trade_no, time, buy_sell)`
)
Expand All @@ -75,6 +76,7 @@ var (
dataCandleCount = ExportCandleCount
dataCandleCountLock = sync.RWMutex{}
isAllTradesPositions = false
quotations = []commands.SubSecurity{}
allTrades = commands.SubAllTrades{}
)

Expand Down Expand Up @@ -120,7 +122,7 @@ func init() {

func processTransaq() {
var status commands.ServerStatus
ticker := time.NewTicker(60 * time.Second)
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for {
select {
Expand All @@ -130,17 +132,20 @@ func processTransaq() {
switch status.Connected {
case "true":
log.Infof("server status is true")
if err := doSubscribe(); err != nil {
log.Error(err)
}
case "error":
log.Warnf("txmlconnector not connected %+v\n", status)
default:
log.Infof("Status %+v", status)
}
case <-ticker.C:
if status.Connected != "error" {
if status.Connected == "true" {
continue
}
if err := tc.Connect(); err != nil {
log.Fatal(err)
log.Error("reconnect ", err)
}
case trades := <-tc.AllTradesChan:
//go func(t *commands.AllTrades) {
Expand Down Expand Up @@ -270,6 +275,17 @@ func processTransaq() {
}
}

func doSubscribe() error {
if err := tc.SendCommand(commands.Command{
Id: "subscribe",
Quotations: quotations,
AllTrades: allTrades,
}); err != nil {
return fmt.Errorf("SendCommand subscribe: %+v", err)
}
return nil
}

func main() {
defer func() {
tc.Disconnect()
Expand Down Expand Up @@ -301,12 +317,11 @@ func main() {
}

// Get History data for all sec
quotations := []commands.SubSecurity{}
exportCandleCount := ExportCandleCount
if eCandleCount, err := strconv.Atoi(os.Getenv("EXPORT_CANDLE_COUNT")); err == nil && eCandleCount > -2 {
exportCandleCount = eCandleCount
}
exportSecBoards := []string{"TQBR"}
exportSecBoards := []string{"TQBR", "TQTF", "FUT"}
if eSecBoards := os.Getenv("EXPORT_SEC_BOARDS"); eSecBoards != "" {
exportSecBoards = strings.Split(eSecBoards, ",")
}
Expand All @@ -324,21 +339,16 @@ func main() {
}

for _, sec := range tc.Data.Securities.Items {
exportSecBoardFound := false
for _, exportSecBoard := range exportSecBoards {
if exportSecBoard == sec.Board || exportSecBoard == "ALL" {
exportSecBoardFound = true
break
}
}
for _, exportSecCode := range exportAllTradesSec {
if exportSecCode == sec.SecCode {
allTrades.Items = append(allTrades.Items, sec.SecId)
}
}
if sec.SecId == 0 || sec.Active != "true" || len(sec.SecCode) > 16 {
continue
}
exportSecBoardFound := false
if slices.Contains(exportSecBoards, sec.Board) {
exportSecBoardFound = true
}
if exportSecBoardFound && slices.Contains(exportAllTradesSec, sec.SecCode) {
allTrades.Items = append(allTrades.Items, sec.SecId)
}
log.Debugf("%+v", sec)

if err := batchSec.Append(uint16(sec.SecId),
Expand Down Expand Up @@ -426,16 +436,8 @@ func main() {
}
// receive <quotations><quotation secid="21"><board>TQBR</board><seccode>GMKN</seccode><last>24954</last><quantity>4</quantity><time>11:24:00</time><change>220</change><priceminusprevwaprice>432</priceminusprevwaprice><bid>24950</bid><biddepth>35</biddepth><biddeptht>16188</biddeptht><numbids>1563</numbids><offer>24962</offer><offerdepth>51</offerdepth><offerdeptht>25222</offerdeptht><numoffers>1154</numoffers><voltoday>54772</voltoday><numtrades>6273</numtrades><valtoday>1364.723</valtoday></quotation></quotations>
// Get subscribe on all sec
if err = tc.SendCommand(commands.Command{Id: "get_mc_portfolio", Union: "377620R2555"}); err != nil {
log.Error("SendCommand get_mc_portfolio: ", err)
}
time.Sleep(10 * time.Second)
if err = tc.SendCommand(commands.Command{
Id: "subscribe",
Quotations: quotations,
AllTrades: allTrades,
}); err != nil {
log.Error("SendCommand subscribe: ", err)
}
// if err = tc.SendCommand(commands.Command{Id: "get_mc_portfolio", Union: "377620R2555"}); err != nil {
// log.Error("SendCommand get_mc_portfolio: ", err)
// }
<-tc.ShutdownChannel
}

0 comments on commit 45f9fc3

Please sign in to comment.