From 07b95b8e773c965bb89465efc2655be63acbf622 Mon Sep 17 00:00:00 2001 From: denkhaus Date: Wed, 28 Dec 2016 23:49:50 +0100 Subject: [PATCH] autopush@1482965390 --- cypher.go | 108 +++++++++++++++++++++++++++++++++++++++++++++------ engine.go | 38 ++++++++++++++++-- graph.go | 113 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ helper.go | 2 + main.go | 36 ++++++++++------- neo4j.go | 105 ++++++++++++++++++++++++++++++++++++++++++++++++++ user.go | 56 +++++++++++++++++++++++++++ 7 files changed, 429 insertions(+), 29 deletions(-) create mode 100644 graph.go create mode 100644 neo4j.go create mode 100644 user.go diff --git a/cypher.go b/cypher.go index 16bd467..88943e4 100644 --- a/cypher.go +++ b/cypher.go @@ -1,40 +1,124 @@ package main const ( - CYPHER_CONSTRAINT_TWEET = `CREATE CONSTRAINT ON (t:Tweet) ASSERT t.id IS UNIQUE;` - CYPHER_CONSTRAINT_USER = `CREATE CONSTRAINT ON (u:User) ASSERT u.screen_name IS UNIQUE;` - CYPHER_CONSTRAINT_HASHTAG = `CREATE CONSTRAINT ON (h:Hashtag) ASSERT h.name IS UNIQUE;` - CYPHER_CONSTRAINT_LINK = `CREATE CONSTRAINT ON (l:Link) ASSERT l.url IS UNIQUE;` - CYPHER_CONSTRAINT_SOURCE = `CREATE CONSTRAINT ON (s:Source) ASSERT s.name IS UNIQUE;` + CYPHER_CONSTRAINT_TWEET = `CREATE CONSTRAINT ON (t:Tweet) ASSERT t.id IS UNIQUE;` + CYPHER_CONSTRAINT_USERNAME = `CREATE CONSTRAINT ON (u:TwitterUser) ASSERT u.screen_name IS UNIQUE;` + CYPHER_CONSTRAINT_USERID = `CREATE CONSTRAINT ON (u:TwitterUser) ASSERT u.id IS UNIQUE;` + CYPHER_CONSTRAINT_HASHTAG = `CREATE CONSTRAINT ON (h:Hashtag) ASSERT h.name IS UNIQUE;` + CYPHER_CONSTRAINT_LINK = `CREATE CONSTRAINT ON (l:Link) ASSERT l.url IS UNIQUE;` + CYPHER_CONSTRAINT_SOURCE = `CREATE CONSTRAINT ON (s:Source) ASSERT s.name IS UNIQUE;` CYPHER_TWEETS_MAX_ID = ` MATCH - (u:User {screen_name:{screen_name}})-[:POSTS]->(t:Tweet) + (u:TwitterUser {screen_name:{screen_name}})-[:POSTS]->(t:Tweet) RETURN max(t.id) AS max_id ` CYPHER_MENTIONS_MAX_ID = ` MATCH - (u:User {screen_name:{screen_name}})<-[m:MENTIONS]-(t:Tweet) + (u:TwitterUser {screen_name:{screen_name}})<-[m:MENTIONS]-(t:Tweet) WHERE m.method="mention_search" RETURN max(t.id) AS max_id ` + CYPHER_NEED_GRAPH_UPDATE = ` + MATCH + (a:TwitterUser) + WHERE + EXISTS(a.graph_updated) AND a.following <> 0 + WITH + a, size((a)-[:FOLLOWS]->()) as following_count + WHERE + a.following <> following_count + RETURN + a.id as id + ORDER BY + a.graph_updated + LIMIT 100 + ` + + CYPHER_REMOVE_FOLLOWS_REL = ` + MATCH (mainUser:TwitterUser {id:{id}})-[rel:FOLLOWS]->() + DELETE rel + ` + CYPHER_MERGE_FOLLOWING_IDS = ` + UNWIND {ids} AS id + WITH id + MATCH (mainUser:TwitterUser {id:{user_id}}) + MERGE (followedUser:TwitterUser {id:id}) + ON CREATE SET + followedUser.followers = 1, + followedUser.following = -1, + followedUser.graph_updated = 0 + MERGE (mainUser)-[:FOLLOWS]->(followedUser) + SET mainUser.graph_updated = timestamp() + ` + + CYPHER_USERS_NEED_COMPLETION = ` + MATCH (n1:TwitterUser) + WHERE NOT EXISTS(n1.created_at) AND EXISTS(n1.id) + RETURN n1.id as id + LIMIT 100 + ` + + CYPHER_USERS_UPDATE_BY_NAME = ` + UNWIND {users} AS u + WITH u + MERGE (user:TwitterUser {screen_name:u.screen_name}) + ON CREATE SET user.graph_updated = 0 + SET user.id = u.id_str, + user.name = u.name, + user.created_at = u.created_at, + user.location = u.location, + user.followers = u.followers_count, + user.following = u.friends_count, + user.statuses = u.statuses_count, + user.url = u.url, + user.profile_image_url = u.profile_image_url, + user.last_updated = timestamp() + ` + CYPHER_USERS_UPDATE_BY_ID = ` + UNWIND + {users} AS u + WITH + u + MERGE + (user:TwitterUser {id:u.id_str}) + ON CREATE SET + user.graph_updated = 0 + SET + user.screen_name = u.screen_name, + user.name = u.name, + user.created_at = u.created_at, + user.location = u.location, + user.followers = u.followers_count, + user.following = u.friends_count, + user.statuses = u.statuses_count, + user.url = u.url, + user.profile_image_url = u.profile_image_url, + user.last_updated = timestamp() + ` CYPHER_FOLLOWERS_IMPORT = ` UNWIND {users} AS u WITH u - MERGE (user:User {screen_name:u.screen_name}) - SET user.name = u.name, + MERGE (user:TwitterUser {screen_name:u.screen_name}) + ON CREATE SET + user.graph_updated = 0 + SET + user.name = u.name, + user.id = u.id_str, + user.created_at = u.created_at, user.location = u.location, user.followers = u.followers_count, user.following = u.friends_count, - user.statuses = u.statusus_count, + user.statuses = u.statuses_count, user.url = u.url, - user.profile_image_url = u.profile_image_url - MERGE (mainUser:User {screen_name:{screen_name}}) + user.profile_image_url = u.profile_image_url, + user.last_updated = timestamp() + MERGE (mainUser:TwitterUser {screen_name:{screen_name}}) MERGE (user)-[:FOLLOWS]->(mainUser) ` diff --git a/engine.go b/engine.go index 5f470e7..9585b73 100644 --- a/engine.go +++ b/engine.go @@ -4,6 +4,9 @@ import ( "fmt" "time" + "net/url" + + "github.com/ChimeraCoder/anaconda" "github.com/denkhaus/neoism" "github.com/juju/errors" "github.com/kurrik/oauth1a" @@ -15,11 +18,14 @@ const ( TW_PATH_USER_TIMELINE = "/1.1/statuses/user_timeline.json?%v" TW_PATH_MENTIONS_TIMELINE = "/1.1/statuses/mentions_timeline.json?%v" TW_PATH_FOLLOWERS_TIMELINE = "/1.1/followers/list.json?%v" + TW_PATH_FOLLOWERS_IDS = "/1.1/followers/ids.json?%v" + TW_PATH_USER_LOOKUP = "/1.1/users/lookup.json?" ) type Engine struct { cnf *Config client *twittergo.Client + api *anaconda.TwitterApi } func NewEngine(cnf *Config) *Engine { @@ -30,11 +36,26 @@ func NewEngine(cnf *Config) *Engine { return eng } +func (p *Engine) Close() { + if p.api != nil { + p.api.Close() + p.api = nil + } +} + func (p *Engine) ensureTwitterClient() { if p.client != nil { return } + anaconda.SetConsumerKey(p.cnf.twitterConsumerKey) + anaconda.SetConsumerSecret(p.cnf.twitterConsumerSecret) + + p.api = anaconda.NewTwitterApi( + p.cnf.twitterUserKey, + p.cnf.twitterUserSecret, + ) + config := &oauth1a.ClientConfig{ ConsumerKey: p.cnf.twitterConsumerKey, ConsumerSecret: p.cnf.twitterConsumerSecret, @@ -52,8 +73,18 @@ func (p *Engine) openNeoDB() (*neoism.Database, error) { return nil, errors.New("invalid neo4j credentials") } - hostPort := fmt.Sprintf("%s:%d", p.cnf.neoHost, p.cnf.neoPort) - db, err := neoism.Connect(hostPort, p.cnf.neoUsername, p.cnf.neoPassword) + u := url.URL{ + Scheme: "http", + Host: fmt.Sprintf("%s:%d", p.cnf.neoHost, p.cnf.neoPort), + } + + if p.cnf.neoUsername != "" { + u.User = url.UserPassword(p.cnf.neoUsername, p.cnf.neoPassword) + } + + logger.Debug("neo4j connection string:", u.String()) + + db, err := neoism.Connect(u.String()) if err != nil { return nil, errors.Annotate(err, "open database") } @@ -77,7 +108,8 @@ func (p *Engine) initDatabase(db *neoism.Database) error { cyphers := []string{ CYPHER_CONSTRAINT_TWEET, - CYPHER_CONSTRAINT_USER, + CYPHER_CONSTRAINT_USERNAME, + CYPHER_CONSTRAINT_USERID, CYPHER_CONSTRAINT_HASHTAG, CYPHER_CONSTRAINT_LINK, CYPHER_CONSTRAINT_SOURCE, diff --git a/graph.go b/graph.go new file mode 100644 index 0000000..4c40dc9 --- /dev/null +++ b/graph.go @@ -0,0 +1,113 @@ +package main + +import ( + "net/url" + // "github.com/davecgh/go-spew/spew" + "strconv" + + "github.com/denkhaus/neoism" + "github.com/juju/errors" +) + +func (p *Engine) execQuery(db *neoism.Database, statmnt string, props neoism.Props) (*CypherResult, error) { + + res := NewCypherResult() + cq := neoism.CypherQuery{ + Statement: statmnt, + Result: &res.Raw, + Parameters: props, + } + + if err := db.Cypher(&cq); err != nil { + return nil, errors.Annotate(err, "db query") + } + + return res, nil +} + +func (p *Engine) MaintainGraph() error { + logger.Info("maintain graph") + + db, err := p.ensureClients() + if err != nil { + return errors.Annotate(err, "ensure clients") + } + + res, err := p.execQuery(db, CYPHER_NEED_GRAPH_UPDATE, nil) + if err != nil { + return errors.Annotate(err, "exec following mismatch") + } + + ids := res.FilterResultsBy("id").ToStringSlice() + + if len(ids) > 0 { + + commonIdCount := 0 + params := url.Values{} + + for _, idStr := range ids { + + if commonIdCount != 0 { + logger.Infof("%d relations updated", commonIdCount) + } + + logger.Infof("update user #%s following relations", idStr) + + props := neoism.Props{ + "id": idStr, + } + + logger.Infof("remove following relations for user #%s", idStr) + _, err = p.execQuery(db, CYPHER_REMOVE_FOLLOWS_REL, props) + if err != nil { + return errors.Annotate(err, "exec remove follows relationship") + } + + commonIdCount = 0 + params.Set("user_id", idStr) + cursor := "-1" + + for { + + logger.Infof("retrive users user #%s is following", idStr) + + params.Set("cursor", cursor) + cur, err := p.api.GetFriendsIds(params) + if err != nil { + return errors.Annotate(err, "get friends ids") + } + + idCount := len(cur.Ids) + if idCount == 0 { + break + } + + commonIdCount += idCount + props = neoism.Props{ + "user_id": idStr, + "ids": toString(cur.Ids), + } + + logger.Infof("merge %d users user #%s is following", idCount, idStr) + _, err = p.execQuery(db, CYPHER_MERGE_FOLLOWING_IDS, props) + if err != nil { + return errors.Annotate(err, "insert following ids") + } + + cursor = cur.Next_cursor_str + } + } + } + + return nil +} + +func toString(arr []int64) []string { + + ret := make([]string, len(arr)) + for idx, val := range arr { + ret[idx] = strconv.FormatInt(val, 10) + } + + return ret +} diff --git a/helper.go b/helper.go index d8d17f5..b13b0e3 100644 --- a/helper.go +++ b/helper.go @@ -9,6 +9,8 @@ func exec(ctx *cli.Context, fn func(eng *Engine) error) { } eng := NewEngine(cnf) + defer eng.Close() + if err := fn(eng); err != nil { logger.Fatalf("exec error:%s", err) } diff --git a/main.go b/main.go index 542ed12..756f534 100644 --- a/main.go +++ b/main.go @@ -26,7 +26,7 @@ func main() { Name: "host, d", Usage: "Neo4j host", EnvVar: "NEO4_HOST", - Value: "http://localhost", + Value: "localhost", }, cli.IntFlag{ Name: "port, P", @@ -108,20 +108,28 @@ func main() { }, }, }, + cli.Command{ + Name: "maintain", + Subcommands: []cli.Command{ + cli.Command{ + Name: "users", + Action: func(ctx *cli.Context) { + exec(ctx, func(eng *Engine) error { + return eng.CompleteUsers() + }) + }, + }, + cli.Command{ + Name: "graph", + Action: func(ctx *cli.Context) { + exec(ctx, func(eng *Engine) error { + return eng.MaintainGraph() + }) + }, + }, + }, + }, } app.Run(os.Args) } - -//logsPump := NewLogsPump(storagePath) - -// closer.Bind(func() { -// logsPump.Shutdown() -// logger.Info("terminated") -// }) - -// closer.Checked(func() error { -// logger.Info("startup ---------------------------------------------") -// logsPump.RegisterAdapter(NewSplunkAdapter, host) -// return logsPump.Run() -// }, true) diff --git a/neo4j.go b/neo4j.go new file mode 100644 index 0000000..4e593d2 --- /dev/null +++ b/neo4j.go @@ -0,0 +1,105 @@ +package main + +import ( + "strconv" +) + +type CypherResult struct { + Raw interface{} +} + +func NewCypherResult() *CypherResult { + return &CypherResult{} +} + +func (p *CypherResult) ToStringSlice() []string { + res, ok := p.Raw.([]interface{}) + if !ok { + panic("cypher result: wrong raw value type") + } + + ret := []string{} + for _, val := range res { + s := val.(string) + if len(s) > 0 { + ret = append(ret, s) + } + } + + return ret +} + +func (p *CypherResult) ToInt64Slice() []int64 { + res, ok := p.Raw.([]interface{}) + if !ok { + panic("cypher result: wrong raw value type") + } + + ret := []int64{} + for _, val := range res { + + if val != nil { + switch v := val.(type) { + case string: + l, err := strconv.ParseInt(v, 10, 64) + if err != nil { + panic("conversion error") + } + ret = append(ret, l) + case int64: + ret = append(ret, v) + case float64: + ret = append(ret, int64(v)) + default: + panic("unhandled type") + } + } + } + + return ret +} + +func (p *CypherResult) GetAt(idx int) *CypherResult { + res, ok := p.Raw.([]interface{}) + if !ok { + panic("cypher result: wrong raw value type") + } + + if len(res) > idx { + return &CypherResult{Raw: res[idx]} + } + + return nil +} + +func (p *CypherResult) FilterResultsBy(name string) *CypherResult { + res, ok := p.Raw.([]interface{}) + if !ok { + panic("cypher result: wrong raw value type") + } + + ret := []interface{}{} + for _, r := range res { + mp := r.(map[string]interface{}) + if val, ok := mp[name]; ok { + ret = append(ret, val) + } + } + + return &CypherResult{Raw: ret} +} + +func (p *CypherResult) GetProperty(name string) interface{} { + + res, ok := p.Raw.(map[string]interface{}) + if !ok { + panic("cypher result: wrong raw value type") + } + + val, ok := res[name] + if !ok { + return nil + } + + return val +} diff --git a/user.go b/user.go new file mode 100644 index 0000000..0a80b3e --- /dev/null +++ b/user.go @@ -0,0 +1,56 @@ +package main + +import ( + // "github.com/davecgh/go-spew/spew" + "time" + + "github.com/denkhaus/neoism" + "github.com/juju/errors" +) + +func (p *Engine) completeUsers(db *neoism.Database) error { + + res, err := p.execQuery(db, CYPHER_USERS_NEED_COMPLETION, nil) + if err != nil { + return errors.Annotate(err, "exec users need completion") + } + + ids := res.FilterResultsBy("id").ToInt64Slice() + + if len(ids) > 0 { + logger.Infof("%d user ids need completion -> fetch", len(ids)) + twUsers, err := p.api.GetUsersLookupByIds(ids, nil) + if err != nil { + return errors.Annotate(err, "lookup users by ids") + } + + props := neoism.Props{ + "users": twUsers, + } + + logger.Infof("got data for %d users -> apply", len(twUsers)) + if _, err := p.execQuery(db, CYPHER_USERS_UPDATE_BY_ID, props); err != nil { + return errors.Annotate(err, "users update by id") + } + } + + return nil +} + +func (p *Engine) CompleteUsers() error { + logger.Info("complete users") + + db, err := p.ensureClients() + if err != nil { + return errors.Annotate(err, "ensure clients") + } + + for { + if err := p.completeUsers(db); err != nil { + return errors.Annotate(err, "complete users") + } + + time.Sleep(10 * time.Second) + } + +}