Skip to content

Commit

Permalink
Listener and stream now take a channel
Browse files Browse the repository at this point in the history
Snap function added
close executed in Historical
initial work for #2
  • Loading branch information
liampauling committed Oct 17, 2017
1 parent 78f3839 commit 5e46b86
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 27 deletions.
5 changes: 5 additions & 0 deletions historical.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,10 @@ func (h *Historical) ParseHistoricalData(directory string, listener streaming.Li
if err := scanner.Err(); err != nil {
log.Fatal(err)
}
log.Println("Reading complete")

// close channel
close(listener.OutputChannel)

return nil
}
29 changes: 4 additions & 25 deletions streaming/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@ package streaming

import (
"log"
"strconv"
"time"
"os"
"fmt"
)

func CreateMarketCache(changeMessage MarketChangeMessage, marketChange MarketChange) *MarketCache {
Expand Down Expand Up @@ -335,28 +331,11 @@ func (cache *MarketCache) UpdateCache(changeMessage MarketChangeMessage, marketC
}
}
}

f, err := os.OpenFile("tempertrap.txt", os.O_APPEND|os.O_WRONLY, 0600)
if err != nil {
panic(err)
}
defer f.Close()
for _, tem := range cache.Runners {
s := strconv.Itoa(*cache.PublishTime)
s_t := fmt.Sprintf("%v,%v,%v,%v,%v,%v,%v,%v,%v,%v\n",
MsToTime(s).Format("2006-01-02 15:04:05.999999"), cache.MarketId, tem.SelectionId,
*tem.LastTradedPrice, *tem.TradedVolume, cache.MarketDefinition.InPlay, *cache.TradedVolume)
if _, err = f.WriteString(s_t); err != nil {
panic(err)
}
}
}

func MsToTime(ms string) (time.Time) {
msInt, err := strconv.ParseInt(ms, 10, 64)
if err != nil {
return time.Time{}
func (cache *MarketCache) Snap() MarketBook {
return MarketBook{
MarketId: cache.MarketId,
TradedVolume: *cache.TradedVolume,
}

return time.Unix(0, msInt*int64(time.Millisecond))
}
2 changes: 2 additions & 0 deletions streaming/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ type Listener struct {
MarketStream *MarketStream
OrderStream Stream
UniqueId int64
OutputChannel chan MarketBook //todo change to interface so that OrderBook can be accepted
}

func (l *Listener) AddMarketStream() {
l.MarketStream = new(MarketStream)
l.MarketStream.OutputChannel = l.OutputChannel
l.MarketStream.Cache = make(map[string]MarketCache)
}

Expand Down
5 changes: 5 additions & 0 deletions streaming/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,8 @@ type MarketChangeMessage struct {
SegmentType string `json:"segmentType"`
Status int32 `json:"status"`
}

type MarketBook struct {
MarketId string
TradedVolume float64
}
6 changes: 4 additions & 2 deletions streaming/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ type Stream interface {
}

type MarketStream struct {
Cache map[string]MarketCache
OutputChannel chan MarketBook
Cache map[string]MarketCache
}

func (ms *MarketStream) OnSubscribe(changeMessage MarketChangeMessage) {
Expand All @@ -32,10 +33,11 @@ func (ms *MarketStream) OnUpdate(changeMessage MarketChangeMessage) {

if marketCache, ok := ms.Cache[marketChange.MarketId]; ok {
marketCache.UpdateCache(changeMessage, marketChange)
ms.OutputChannel<-marketCache.Snap()
} else {
ms.Cache[marketChange.MarketId] = *CreateMarketCache(changeMessage, marketChange)
log.Println("Created new market cache", marketChange.MarketId)
//todo snap here
}
}
// todo output snap
}

0 comments on commit 5e46b86

Please sign in to comment.