Skip to content

Commit

Permalink
autopush@1482965390
Browse files Browse the repository at this point in the history
  • Loading branch information
denkhaus committed Dec 28, 2016
1 parent 30e548d commit 07b95b8
Show file tree
Hide file tree
Showing 7 changed files with 429 additions and 29 deletions.
108 changes: 96 additions & 12 deletions cypher.go
Original file line number Diff line number Diff line change
@@ -1,40 +1,124 @@
package main

const (
CYPHER_CONSTRAINT_TWEET = `CREATE CONSTRAINT ON (t:Tweet) ASSERT t.id IS UNIQUE;`
CYPHER_CONSTRAINT_USER = `CREATE CONSTRAINT ON (u:User) ASSERT u.screen_name IS UNIQUE;`
CYPHER_CONSTRAINT_HASHTAG = `CREATE CONSTRAINT ON (h:Hashtag) ASSERT h.name IS UNIQUE;`
CYPHER_CONSTRAINT_LINK = `CREATE CONSTRAINT ON (l:Link) ASSERT l.url IS UNIQUE;`
CYPHER_CONSTRAINT_SOURCE = `CREATE CONSTRAINT ON (s:Source) ASSERT s.name IS UNIQUE;`
CYPHER_CONSTRAINT_TWEET = `CREATE CONSTRAINT ON (t:Tweet) ASSERT t.id IS UNIQUE;`
CYPHER_CONSTRAINT_USERNAME = `CREATE CONSTRAINT ON (u:TwitterUser) ASSERT u.screen_name IS UNIQUE;`
CYPHER_CONSTRAINT_USERID = `CREATE CONSTRAINT ON (u:TwitterUser) ASSERT u.id IS UNIQUE;`
CYPHER_CONSTRAINT_HASHTAG = `CREATE CONSTRAINT ON (h:Hashtag) ASSERT h.name IS UNIQUE;`
CYPHER_CONSTRAINT_LINK = `CREATE CONSTRAINT ON (l:Link) ASSERT l.url IS UNIQUE;`
CYPHER_CONSTRAINT_SOURCE = `CREATE CONSTRAINT ON (s:Source) ASSERT s.name IS UNIQUE;`

CYPHER_TWEETS_MAX_ID = `
MATCH
(u:User {screen_name:{screen_name}})-[:POSTS]->(t:Tweet)
(u:TwitterUser {screen_name:{screen_name}})-[:POSTS]->(t:Tweet)
RETURN
max(t.id) AS max_id
`

CYPHER_MENTIONS_MAX_ID = `
MATCH
(u:User {screen_name:{screen_name}})<-[m:MENTIONS]-(t:Tweet)
(u:TwitterUser {screen_name:{screen_name}})<-[m:MENTIONS]-(t:Tweet)
WHERE
m.method="mention_search"
RETURN
max(t.id) AS max_id
`
CYPHER_NEED_GRAPH_UPDATE = `
MATCH
(a:TwitterUser)
WHERE
EXISTS(a.graph_updated) AND a.following <> 0
WITH
a, size((a)-[:FOLLOWS]->()) as following_count
WHERE
a.following <> following_count
RETURN
a.id as id
ORDER BY
a.graph_updated
LIMIT 100
`

CYPHER_REMOVE_FOLLOWS_REL = `
MATCH (mainUser:TwitterUser {id:{id}})-[rel:FOLLOWS]->()
DELETE rel
`
CYPHER_MERGE_FOLLOWING_IDS = `
UNWIND {ids} AS id
WITH id
MATCH (mainUser:TwitterUser {id:{user_id}})
MERGE (followedUser:TwitterUser {id:id})
ON CREATE SET
followedUser.followers = 1,
followedUser.following = -1,
followedUser.graph_updated = 0
MERGE (mainUser)-[:FOLLOWS]->(followedUser)
SET mainUser.graph_updated = timestamp()
`

CYPHER_USERS_NEED_COMPLETION = `
MATCH (n1:TwitterUser)
WHERE NOT EXISTS(n1.created_at) AND EXISTS(n1.id)
RETURN n1.id as id
LIMIT 100
`

CYPHER_USERS_UPDATE_BY_NAME = `
UNWIND {users} AS u
WITH u
MERGE (user:TwitterUser {screen_name:u.screen_name})
ON CREATE SET user.graph_updated = 0
SET user.id = u.id_str,
user.name = u.name,
user.created_at = u.created_at,
user.location = u.location,
user.followers = u.followers_count,
user.following = u.friends_count,
user.statuses = u.statuses_count,
user.url = u.url,
user.profile_image_url = u.profile_image_url,
user.last_updated = timestamp()
`
CYPHER_USERS_UPDATE_BY_ID = `
UNWIND
{users} AS u
WITH
u
MERGE
(user:TwitterUser {id:u.id_str})
ON CREATE SET
user.graph_updated = 0
SET
user.screen_name = u.screen_name,
user.name = u.name,
user.created_at = u.created_at,
user.location = u.location,
user.followers = u.followers_count,
user.following = u.friends_count,
user.statuses = u.statuses_count,
user.url = u.url,
user.profile_image_url = u.profile_image_url,
user.last_updated = timestamp()
`

CYPHER_FOLLOWERS_IMPORT = `
UNWIND {users} AS u
WITH u
MERGE (user:User {screen_name:u.screen_name})
SET user.name = u.name,
MERGE (user:TwitterUser {screen_name:u.screen_name})
ON CREATE SET
user.graph_updated = 0
SET
user.name = u.name,
user.id = u.id_str,
user.created_at = u.created_at,
user.location = u.location,
user.followers = u.followers_count,
user.following = u.friends_count,
user.statuses = u.statusus_count,
user.statuses = u.statuses_count,
user.url = u.url,
user.profile_image_url = u.profile_image_url
MERGE (mainUser:User {screen_name:{screen_name}})
user.profile_image_url = u.profile_image_url,
user.last_updated = timestamp()
MERGE (mainUser:TwitterUser {screen_name:{screen_name}})
MERGE (user)-[:FOLLOWS]->(mainUser)
`

Expand Down
38 changes: 35 additions & 3 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"fmt"
"time"

"net/url"

"github.com/ChimeraCoder/anaconda"
"github.com/denkhaus/neoism"
"github.com/juju/errors"
"github.com/kurrik/oauth1a"
Expand All @@ -15,11 +18,14 @@ const (
TW_PATH_USER_TIMELINE = "/1.1/statuses/user_timeline.json?%v"
TW_PATH_MENTIONS_TIMELINE = "/1.1/statuses/mentions_timeline.json?%v"
TW_PATH_FOLLOWERS_TIMELINE = "/1.1/followers/list.json?%v"
TW_PATH_FOLLOWERS_IDS = "/1.1/followers/ids.json?%v"
TW_PATH_USER_LOOKUP = "/1.1/users/lookup.json?"
)

// Engine bundles the configuration with the Twitter client handles that
// the Engine methods operate on.
type Engine struct {
// cnf supplies the Twitter consumer/user credentials and the neo4j
// host, port, username and password read by the Engine methods.
cnf *Config
// client is the twittergo client; ensureTwitterClient returns early when
// it is already set (presumably it is created lazily there — confirm).
client *twittergo.Client
// api is the anaconda API handle built in ensureTwitterClient and
// released by Close.
api *anaconda.TwitterApi
}

func NewEngine(cnf *Config) *Engine {
Expand All @@ -30,11 +36,26 @@ func NewEngine(cnf *Config) *Engine {
return eng
}

// Close shuts down the anaconda API handle if one has been created and
// clears the field, so calling Close again does nothing.
func (p *Engine) Close() {
	if p.api == nil {
		return
	}

	p.api.Close()
	p.api = nil
}

func (p *Engine) ensureTwitterClient() {
if p.client != nil {
return
}

anaconda.SetConsumerKey(p.cnf.twitterConsumerKey)
anaconda.SetConsumerSecret(p.cnf.twitterConsumerSecret)

p.api = anaconda.NewTwitterApi(
p.cnf.twitterUserKey,
p.cnf.twitterUserSecret,
)

config := &oauth1a.ClientConfig{
ConsumerKey: p.cnf.twitterConsumerKey,
ConsumerSecret: p.cnf.twitterConsumerSecret,
Expand All @@ -52,8 +73,18 @@ func (p *Engine) openNeoDB() (*neoism.Database, error) {
return nil, errors.New("invalid neo4j credentials")
}

hostPort := fmt.Sprintf("%s:%d", p.cnf.neoHost, p.cnf.neoPort)
db, err := neoism.Connect(hostPort, p.cnf.neoUsername, p.cnf.neoPassword)
u := url.URL{
Scheme: "http",
Host: fmt.Sprintf("%s:%d", p.cnf.neoHost, p.cnf.neoPort),
}

if p.cnf.neoUsername != "" {
u.User = url.UserPassword(p.cnf.neoUsername, p.cnf.neoPassword)
}

logger.Debug("neo4j connection string:", u.String())

db, err := neoism.Connect(u.String())
if err != nil {
return nil, errors.Annotate(err, "open database")
}
Expand All @@ -77,7 +108,8 @@ func (p *Engine) initDatabase(db *neoism.Database) error {

cyphers := []string{
CYPHER_CONSTRAINT_TWEET,
CYPHER_CONSTRAINT_USER,
CYPHER_CONSTRAINT_USERNAME,
CYPHER_CONSTRAINT_USERID,
CYPHER_CONSTRAINT_HASHTAG,
CYPHER_CONSTRAINT_LINK,
CYPHER_CONSTRAINT_SOURCE,
Expand Down
113 changes: 113 additions & 0 deletions graph.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package main

import (
"net/url"
// "github.com/davecgh/go-spew/spew"
"strconv"

"github.com/denkhaus/neoism"
"github.com/juju/errors"
)

// execQuery runs the Cypher statement statmnt with the parameters props
// against db. The query's Result points at the Raw field of a freshly
// created CypherResult, which is returned on success. A failing query
// yields a nil result and the error annotated with "db query".
func (p *Engine) execQuery(db *neoism.Database, statmnt string, props neoism.Props) (*CypherResult, error) {
	result := NewCypherResult()

	query := &neoism.CypherQuery{
		Statement:  statmnt,
		Parameters: props,
		Result:     &result.Raw,
	}

	err := db.Cypher(query)
	if err != nil {
		return nil, errors.Annotate(err, "db query")
	}

	return result, nil
}

// MaintainGraph refreshes the FOLLOWS relationships of the users selected
// by CYPHER_NEED_GRAPH_UPDATE.
//
// For every returned user id it first deletes the user's outgoing FOLLOWS
// relationships (CYPHER_REMOVE_FOLLOWS_REL), then pages through
// GetFriendsIds starting at cursor "-1" and merges each page of ids with
// CYPHER_MERGE_FOLLOWING_IDS. Once a user is finished, the number of merged
// ids is logged.
//
// The first failing client setup, database query or API request aborts the
// run; the error is returned annotated with the step that failed.
func (p *Engine) MaintainGraph() error {
	logger.Info("maintain graph")

	db, err := p.ensureClients()
	if err != nil {
		return errors.Annotate(err, "ensure clients")
	}

	res, err := p.execQuery(db, CYPHER_NEED_GRAPH_UPDATE, nil)
	if err != nil {
		return errors.Annotate(err, "exec following mismatch")
	}

	// Ranging over an empty slice is a no-op, so no explicit length check
	// is needed.
	ids := res.FilterResultsBy("id").ToStringSlice()
	params := url.Values{}

	for _, idStr := range ids {
		logger.Infof("update user #%s following relations", idStr)

		logger.Infof("remove following relations for user #%s", idStr)
		_, err = p.execQuery(db, CYPHER_REMOVE_FOLLOWS_REL, neoism.Props{"id": idStr})
		if err != nil {
			return errors.Annotate(err, "exec remove follows relationship")
		}

		relCount := 0
		params.Set("user_id", idStr)
		cursor := "-1"

		for {
			logger.Infof("retrive users user #%s is following", idStr)

			params.Set("cursor", cursor)
			cur, err := p.api.GetFriendsIds(params)
			if err != nil {
				return errors.Annotate(err, "get friends ids")
			}

			idCount := len(cur.Ids)
			if idCount == 0 {
				break
			}

			relCount += idCount
			props := neoism.Props{
				"user_id": idStr,
				"ids":     toString(cur.Ids),
			}

			logger.Infof("merge %d users user #%s is following", idCount, idStr)
			_, err = p.execQuery(db, CYPHER_MERGE_FOLLOWING_IDS, props)
			if err != nil {
				return errors.Annotate(err, "insert following ids")
			}

			// A next cursor of "0" marks the last page in Twitter's
			// cursoring scheme; stopping here saves one rate-limited
			// request per user. An empty cursor gives nothing to
			// continue with either.
			cursor = cur.Next_cursor_str
			if cursor == "0" || cursor == "" {
				break
			}
		}

		// Report per user right after processing, so the last user's
		// count is logged as well.
		if relCount != 0 {
			logger.Infof("%d relations updated", relCount)
		}
	}

	return nil
}

// toString converts every id in arr to its base-10 string form, keeping
// the order. The result is always a non-nil slice with len(arr) elements.
func toString(arr []int64) []string {
	out := make([]string, 0, len(arr))
	for i := 0; i < len(arr); i++ {
		out = append(out, strconv.FormatInt(arr[i], 10))
	}

	return out
}
2 changes: 2 additions & 0 deletions helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ func exec(ctx *cli.Context, fn func(eng *Engine) error) {
}

eng := NewEngine(cnf)
defer eng.Close()

if err := fn(eng); err != nil {
logger.Fatalf("exec error:%s", err)
}
Expand Down
36 changes: 22 additions & 14 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func main() {
Name: "host, d",
Usage: "Neo4j host",
EnvVar: "NEO4_HOST",
Value: "http://localhost",
Value: "localhost",
},
cli.IntFlag{
Name: "port, P",
Expand Down Expand Up @@ -108,20 +108,28 @@ func main() {
},
},
},
cli.Command{
Name: "maintain",
Subcommands: []cli.Command{
cli.Command{
Name: "users",
Action: func(ctx *cli.Context) {
exec(ctx, func(eng *Engine) error {
return eng.CompleteUsers()
})
},
},
cli.Command{
Name: "graph",
Action: func(ctx *cli.Context) {
exec(ctx, func(eng *Engine) error {
return eng.MaintainGraph()
})
},
},
},
},
}

app.Run(os.Args)
}

//logsPump := NewLogsPump(storagePath)

// closer.Bind(func() {
// logsPump.Shutdown()
// logger.Info("terminated")
// })

// closer.Checked(func() error {
// logger.Info("startup ---------------------------------------------")
// logsPump.RegisterAdapter(NewSplunkAdapter, host)
// return logsPump.Run()
// }, true)
Loading

0 comments on commit 07b95b8

Please sign in to comment.