Skip to content

Commit

Permalink
autopush@1483311470
Browse files Browse the repository at this point in the history
  • Loading branch information
denkhaus committed Jan 1, 2017
1 parent d126946 commit c8623d6
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 55 deletions.
65 changes: 54 additions & 11 deletions cypher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,49 @@ const (
RETURN
max(t.id) AS max_id
`
CYPHER_NEED_GRAPH_UPDATE = `

CYPHER_NEED_GRAPH_UPDATE_FOLLOWING = `
MATCH
(a:TwitterUser)
WHERE
EXISTS(a.graph_updated) AND a.following <> 0
EXISTS(a.following_upd) AND a.following <> 0
WITH
a, size((a)-[:FOLLOWS]->()) as following_count
WHERE
a.following <> following_count
RETURN
a.id as id
ORDER BY
a.graph_updated
a.following_upd
LIMIT 100
`

CYPHER_REMOVE_FOLLOWS_REL = `
MATCH (mainUser:TwitterUser {id:{id}})-[rel:FOLLOWS]->()
CYPHER_NEED_GRAPH_UPDATE_FOLLOWERS = `
MATCH
(a:TwitterUser)
WHERE
EXISTS(a.followers_upd) AND a.followers <> 0
WITH
a, size(()-[:FOLLOWS]->(a)) as followers_count
WHERE
a.followers <> followers_count
RETURN
a.id as id
ORDER BY
a.followers_upd
LIMIT 100
`

CYPHER_REMOVE_FOLLOWING_REL = `
MATCH (TwitterUser {id:{id}})-[rel:FOLLOWS]->()
DELETE rel
`

CYPHER_REMOVE_FOLLOWERS_REL = `
MATCH (TwitterUser {id:{id}})<-[rel:FOLLOWS]-()
DELETE rel
`

CYPHER_MERGE_FOLLOWING_IDS = `
UNWIND {ids} AS id
WITH id
Expand All @@ -51,9 +74,24 @@ const (
ON CREATE SET
followedUser.followers = 1,
followedUser.following = -1,
followedUser.graph_updated = 0
followedUser.following_upd = 0,
followedUser.followers_upd = 0
MERGE (mainUser)-[:FOLLOWS]->(followedUser)
SET mainUser.graph_updated = timestamp()
SET mainUser.following_upd = timestamp()
`

CYPHER_MERGE_FOLLOWERS_IDS = `
UNWIND {ids} AS id
WITH id
MATCH (mainUser:TwitterUser {id:{user_id}})
MERGE (follower:TwitterUser {id:id})
ON CREATE SET
follower.followers = -1,
follower.following = 1,
follower.following_upd = 0,
follower.followers_upd = 0
MERGE (follower)-[:FOLLOWS]->(mainUser)
SET mainUser.followers_upd = timestamp()
`

CYPHER_USERS_NEED_COMPLETION = `
Expand All @@ -67,8 +105,11 @@ const (
UNWIND {users} AS u
WITH u
MERGE (user:TwitterUser {screen_name:u.screen_name})
ON CREATE SET user.graph_updated = 0
SET user.id = u.id_str,
ON CREATE SET
user.following_upd = 0,
user.followers_upd = 0
SET
user.id = u.id_str,
user.name = u.name,
user.created_at = u.created_at,
user.location = u.location,
Expand All @@ -87,7 +128,8 @@ const (
MERGE
(user:TwitterUser {id:u.id_str})
ON CREATE SET
user.graph_updated = 0
user.following_upd = 0,
user.followers_upd = 0
SET
user.screen_name = u.screen_name,
user.name = u.name,
Expand All @@ -106,7 +148,8 @@ const (
WITH u
MERGE (user:TwitterUser {screen_name:u.screen_name})
ON CREATE SET
user.graph_updated = 0
user.following_upd = 0,
user.followers_upd = 0
SET
user.name = u.name,
user.id = u.id_str,
Expand Down
15 changes: 15 additions & 0 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,21 @@ func (p *Engine) initDatabase(db *neoism.Database) error {
return nil
}

func (p *Engine) execQuery(db *neoism.Database, statmnt string, props neoism.Props) (*CypherResult, error) {

res := NewCypherResult()
cq := neoism.CypherQuery{
Statement: statmnt,
Result: &res.Raw,
Parameters: props,
}

if err := db.Cypher(&cq); err != nil {
return nil, errors.Annotate(err, "db query")
}

return res, nil
}
func (p *Engine) handleRateLimitError(err error) bool {
minwait := time.Duration(10) * time.Second
if rle, ok := err.(twittergo.RateLimitError); ok {
Expand Down
7 changes: 0 additions & 7 deletions friends.go

This file was deleted.

156 changes: 129 additions & 27 deletions graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,116 @@ package main

import (
"net/url"
// "github.com/davecgh/go-spew/spew"
"strconv"
"time"

"github.com/ChimeraCoder/anaconda"
// "github.com/davecgh/go-spew/spew"
"github.com/jmcvetta/neoism"
"github.com/juju/errors"
)

func (p *Engine) execQuery(db *neoism.Database, statmnt string, props neoism.Props) (*CypherResult, error) {

res := NewCypherResult()
cq := neoism.CypherQuery{
Statement: statmnt,
Result: &res.Raw,
Parameters: props,
func (p *Engine) maintainFollowing(db *neoism.Database) error {
res, err := p.execQuery(db, CYPHER_NEED_GRAPH_UPDATE_FOLLOWING, nil)
if err != nil {
return errors.Annotate(err, "exec following mismatch")
}

if err := db.Cypher(&cq); err != nil {
return nil, errors.Annotate(err, "db query")
ids := res.FilterResultsBy("id").ToStringSlice()

if len(ids) > 0 {

commonIdCount := 0
params := url.Values{}

for _, idStr := range ids {

if commonIdCount != 0 {
logger.Infof("%d relations updated", commonIdCount)
}

props := neoism.Props{
"id": idStr,
}

logger.Infof("remove following relations for user #%s", idStr)
_, err = p.execQuery(db, CYPHER_REMOVE_FOLLOWING_REL, props)
if err != nil {
return errors.Annotate(err, "exec remove follows relationship")
}

commonIdCount = 0
params.Set("user_id", idStr)
cursor := "-1"

for {

logger.Infof("retrive users user #%s is following", idStr)

params.Set("cursor", cursor)
cur, err := p.api.GetFriendsIds(params)
if err != nil {
if apiErr, ok := err.(*anaconda.ApiError); ok {

if ok, tm := apiErr.RateLimitCheck(); ok {
dur := tm.Sub(time.Now())
logger.Warnf("rate limit error: wait for %s", dur)
time.Sleep(dur)
break
} else if apiErr.StatusCode > 400 {
logger.Warnf("received error code %d", apiErr.StatusCode)
break
}
}

return errors.Annotate(err, "get friends ids")
}

idCount := len(cur.Ids)
if idCount == 0 {
break
}

commonIdCount += idCount
props = neoism.Props{
"user_id": idStr,
"ids": toString(cur.Ids),
}

logger.Infof("merge %d users user #%s is following", idCount, idStr)
_, err = p.execQuery(db, CYPHER_MERGE_FOLLOWING_IDS, props)
if err != nil {
return errors.Annotate(err, "insert following ids")
}

cursor = cur.Next_cursor_str
}
}
}

return res, nil
return nil

}

func (p *Engine) MaintainGraph() error {
logger.Info("maintain graph")
func (p *Engine) MaintainFollowing() error {
logger.Info("maintain following")

db, err := p.ensureClients()
if err != nil {
return errors.Annotate(err, "ensure clients")
}

res, err := p.execQuery(db, CYPHER_NEED_GRAPH_UPDATE, nil)
if err := p.maintainFollowing(db); err != nil {
return errors.Annotate(err, "maintain following")
}

return nil
}

func (p *Engine) maintainFollowers(db *neoism.Database) error {
res, err := p.execQuery(db, CYPHER_NEED_GRAPH_UPDATE_FOLLOWERS, nil)
if err != nil {
return errors.Annotate(err, "exec following mismatch")
return errors.Annotate(err, "exec followers mismatch")
}

ids := res.FilterResultsBy("id").ToStringSlice()
Expand All @@ -46,21 +122,18 @@ func (p *Engine) MaintainGraph() error {
params := url.Values{}

for _, idStr := range ids {

if commonIdCount != 0 {
logger.Infof("%d relations updated", commonIdCount)
}

logger.Infof("update user #%s following relations", idStr)

props := neoism.Props{
"id": idStr,
}

logger.Infof("remove following relations for user #%s", idStr)
_, err = p.execQuery(db, CYPHER_REMOVE_FOLLOWS_REL, props)
logger.Infof("remove followers relations for user #%s", idStr)
_, err = p.execQuery(db, CYPHER_REMOVE_FOLLOWERS_REL, props)
if err != nil {
return errors.Annotate(err, "exec remove follows relationship")
return errors.Annotate(err, "remove followers relation")
}

commonIdCount = 0
Expand All @@ -69,12 +142,25 @@ func (p *Engine) MaintainGraph() error {

for {

logger.Infof("retrive users user #%s is following", idStr)
logger.Infof("retrive followers of user #%s", idStr)

params.Set("cursor", cursor)
cur, err := p.api.GetFriendsIds(params)
cur, err := p.api.GetFollowersIds(params)
if err != nil {
return errors.Annotate(err, "get friends ids")
if apiErr, ok := err.(*anaconda.ApiError); ok {

if ok, tm := apiErr.RateLimitCheck(); ok {
dur := tm.Sub(time.Now())
logger.Warnf("rate limit error: wait for %s", dur)
time.Sleep(dur)
break
} else if apiErr.StatusCode > 400 {
logger.Warnf("received error code %d", apiErr.StatusCode)
break
}
}

return errors.Annotate(err, "get followers ids")
}

idCount := len(cur.Ids)
Expand All @@ -88,17 +174,33 @@ func (p *Engine) MaintainGraph() error {
"ids": toString(cur.Ids),
}

logger.Infof("merge %d users user #%s is following", idCount, idStr)
_, err = p.execQuery(db, CYPHER_MERGE_FOLLOWING_IDS, props)
logger.Infof("merge %d followers of user #%s", idCount, idStr)
_, err = p.execQuery(db, CYPHER_MERGE_FOLLOWERS_IDS, props)
if err != nil {
return errors.Annotate(err, "insert following ids")
return errors.Annotate(err, "insert followers ids")
}

cursor = cur.Next_cursor_str
}
}
}

return nil

}

func (p *Engine) MaintainFollowers() error {
logger.Info("maintain followers")

db, err := p.ensureClients()
if err != nil {
return errors.Annotate(err, "ensure clients")
}

if err := p.maintainFollowers(db); err != nil {
return errors.Annotate(err, "maintain followers")
}

return nil
}

Expand Down
Loading

0 comments on commit c8623d6

Please sign in to comment.