Skip to content

Commit

Permalink
autopush@1457112954
Browse files Browse the repository at this point in the history
  • Loading branch information
denkhaus committed Mar 4, 2016
1 parent b40cf01 commit 30e548d
Show file tree
Hide file tree
Showing 8 changed files with 365 additions and 121 deletions.
19 changes: 0 additions & 19 deletions Release.key

This file was deleted.

27 changes: 26 additions & 1 deletion cypher.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,30 @@ const (
max(t.id) AS max_id
`

CYPHER_MENTIONS_MAX_ID = `
MATCH
(u:User {screen_name:{screen_name}})<-[m:MENTIONS]-(t:Tweet)
WHERE
m.method="mention_search"
RETURN
max(t.id) AS max_id
`

CYPHER_FOLLOWERS_IMPORT = `
UNWIND {users} AS u
WITH u
MERGE (user:User {screen_name:u.screen_name})
SET user.name = u.name,
user.location = u.location,
user.followers = u.followers_count,
user.following = u.friends_count,
user.statuses = u.statusus_count,
user.url = u.url,
user.profile_image_url = u.profile_image_url
MERGE (mainUser:User {screen_name:{screen_name}})
MERGE (user)-[:FOLLOWS]->(mainUser)
`

CYPHER_TWEETS_IMPORT = `
UNWIND {tweets} AS t
WITH t
Expand Down Expand Up @@ -51,7 +75,8 @@ const (
FOREACH (m IN e.user_mentions |
MERGE (mentioned:User {screen_name:m.screen_name})
ON CREATE SET mentioned.name = m.name
MERGE (tweet)-[:MENTIONS]->(mentioned)
MERGE (tweet)-[mts:MENTIONS]->(mentioned)
SET mts.method = {mention_type}
)
FOREACH (r IN [r IN [t.in_reply_to_status_id] WHERE r IS NOT NULL] |
MERGE (reply_tweet:Tweet {id:r})
Expand Down
138 changes: 39 additions & 99 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package main

import (
"fmt"
"net/http"
"net/url"
"time"

"github.com/denkhaus/neoism"
Expand All @@ -13,8 +11,10 @@ import (
)

const (
TW_PATH_VERIFY_CREDENTIALS = "/1.1/account/verify_credentials.json"
TW_PATH_USER_TIMELINE = "/1.1/statuses/user_timeline.json"
TW_PATH_VERIFY_CREDENTIALS = "/1.1/account/verify_credentials.json?%v"
TW_PATH_USER_TIMELINE = "/1.1/statuses/user_timeline.json?%v"
TW_PATH_MENTIONS_TIMELINE = "/1.1/statuses/mentions_timeline.json?%v"
TW_PATH_FOLLOWERS_TIMELINE = "/1.1/followers/list.json?%v"
)

type Engine struct {
Expand Down Expand Up @@ -61,83 +61,62 @@ func (p *Engine) openNeoDB() (*neoism.Database, error) {
return db, nil
}

func (p *Engine) AddUser() error {
func (p *Engine) ensureClients() (*neoism.Database, error) {
p.ensureTwitterClient()

return nil
db, err := p.openNeoDB()
if err != nil {
return nil, errors.Annotate(err, "open database")
}
err = p.initDatabase(db)
return db, err
}

func (p *Engine) tweetsGet(screenName string, maxID uint64) (twittergo.Timeline, error) {
query := url.Values{}
query.Set("count", "200")
query.Set("screen_name", screenName)

var results = twittergo.Timeline{}
minwait := time.Duration(10) * time.Second

for {
if maxID != 0 {
query.Set("max_id", fmt.Sprintf("%v", maxID))
}
func (p *Engine) initDatabase(db *neoism.Database) error {
logger.Infof("init database")

endpoint := fmt.Sprintf("/1.1/statuses/user_timeline.json?%v", query.Encode())
req, err := http.NewRequest("GET", endpoint, nil)
if err != nil {
return nil, errors.Annotate(err, "create request")
}

resp, err := p.client.SendRequest(req)
if err != nil {
return nil, errors.Annotate(err, "send request")
}
cyphers := []string{
CYPHER_CONSTRAINT_TWEET,
CYPHER_CONSTRAINT_USER,
CYPHER_CONSTRAINT_HASHTAG,
CYPHER_CONSTRAINT_LINK,
CYPHER_CONSTRAINT_SOURCE,
}

if err = resp.Parse(&results); err != nil {
if rle, ok := err.(twittergo.RateLimitError); ok {
dur := rle.Reset.Sub(time.Now()) + time.Second
if dur < minwait {
dur = minwait
}

logger.Infof("Rate limited. Reset at %v. Waiting for %v\n", rle.Reset, dur)
time.Sleep(dur)
continue
} else {
logger.Errorf("Problem parsing response: %v\n", err)
}
for _, cy := range cyphers {
cq := neoism.CypherQuery{
Statement: cy,
}

if resp.HasRateLimit() {
logger.Infof("ratelimit: %v calls available", resp.RateLimitRemaining())
if err := db.Cypher(&cq); err != nil {
return errors.Annotate(err, "db query")
}
break
}

return results, nil
return nil
}

func (p *Engine) tweetsImport(db *neoism.Database, tweets twittergo.Timeline) error {
logger.Infof("import %d new tweets", len(tweets))
cq := neoism.CypherQuery{
Statement: CYPHER_TWEETS_IMPORT,
Parameters: neoism.Props{"tweets": tweets},
}
func (p *Engine) handleRateLimitError(err error) bool {
minwait := time.Duration(10) * time.Second
if rle, ok := err.(twittergo.RateLimitError); ok {
dur := rle.Reset.Sub(time.Now()) + time.Second
if dur < minwait {
dur = minwait
}

if err := db.Cypher(&cq); err != nil {
return errors.Annotate(err, "db query")
logger.Infof("Rate limited. Reset at %v. Waiting for %v\n", rle.Reset, dur)
time.Sleep(dur)
return true
}

return nil
return false
}

func (p *Engine) tweetsGetMaxID(screenName string) (uint64, error) {
db, err := p.openNeoDB()
if err != nil {
return 0, errors.Annotate(err, "open database")
}
func (p *Engine) getMaxID(db *neoism.Database, cypher, screenName string) (uint64, error) {

var maxIDData []interface{}
cq := neoism.CypherQuery{
Statement: CYPHER_TWEETS_MAX_ID,
Statement: cypher,
Parameters: neoism.Props{"screen_name": screenName},
Result: &maxIDData,
}
Expand All @@ -153,42 +132,3 @@ func (p *Engine) tweetsGetMaxID(screenName string) (uint64, error) {

return 0, nil
}

func (p *Engine) AddTweets() error {
logger.Info("add tweets")
p.ensureTwitterClient()

db, err := p.openNeoDB()
if err != nil {
return errors.Annotate(err, "open database")
}

screenName, err := p.cnf.ScreenName()
if err != nil {
return err
}

maxID, err := p.tweetsGetMaxID(screenName)
if err != nil {
return errors.Annotate(err, "tweets get max id")
}

tweets, err := p.tweetsGet(screenName, maxID)
if err != nil {
return errors.Annotate(err, "get tweets")
}

for len(tweets) > 0 {
maxID = tweets[len(tweets)-1].Id() - 1
if err = p.tweetsImport(db, tweets); err != nil {
return errors.Annotate(err, "import tweets")
}

tweets, err = p.tweetsGet(screenName, maxID)
if err != nil {
return errors.Annotate(err, "get tweets")
}
}

return nil
}
106 changes: 106 additions & 0 deletions followers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package main

import (
"fmt"
"net/http"
"net/url"

"github.com/denkhaus/neoism"
"github.com/juju/errors"
)

func (p *Engine) usersGet(apiURL string, cursor int64, params map[string]string) ([]interface{}, int64, error) {
query := url.Values{}
for k, v := range params {
query.Set(k, v)
}

query.Set("cursor", fmt.Sprintf("%v", cursor))
results := make(map[string]interface{})

for {
endpoint := fmt.Sprintf(apiURL, query.Encode())
req, err := http.NewRequest("GET", endpoint, nil)
if err != nil {
return nil, 0, errors.Annotate(err, "create request")
}

resp, err := p.client.SendRequest(req)
if err != nil {
return nil, 0, errors.Annotate(err, "send request")
}

if err = resp.Parse(&results); err != nil {
if p.handleRateLimitError(err) {
continue
} else {
logger.Errorf("Problem parsing response: %v\n", err)
}
}

if resp.HasRateLimit() {
logger.Infof("ratelimit: %v calls available", resp.RateLimitRemaining())
}
break
}

users := results["users"].([]interface{})
cursor = int64(results["next_cursor"].(float64))
return users, cursor, nil
}

func (p *Engine) usersImport(db *neoism.Database, users []interface{}, props neoism.Props) error {
logger.Infof("import %d users", len(users))

props["users"] = users
cq := neoism.CypherQuery{
Statement: CYPHER_FOLLOWERS_IMPORT,
Parameters: props,
}

if err := db.Cypher(&cq); err != nil {
return errors.Annotate(err, "db query")
}

return nil
}

func (p *Engine) AddFollowers() error {
logger.Info("add followers")
db, err := p.ensureClients()
if err != nil {
return errors.Annotate(err, "ensure clients")
}

screenName, err := p.cnf.ScreenName()
if err != nil {
return err
}

params := map[string]string{
"count": "200",
"screen_name": screenName,
}

props := neoism.Props{
"screen_name": screenName,
}

users, cursor, err := p.usersGet(TW_PATH_FOLLOWERS_TIMELINE, -1, params)
if err != nil {
return errors.Annotate(err, "get followers")
}

for len(users) > 0 {
if err = p.usersImport(db, users, props); err != nil {
return errors.Annotate(err, "import followers")
}

users, cursor, err = p.usersGet(TW_PATH_FOLLOWERS_TIMELINE, cursor, params)
if err != nil {
return errors.Annotate(err, "get followers")
}
}

return nil
}
7 changes: 7 additions & 0 deletions friends.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package main

func (p *Engine) AddFriends() error {
p.ensureTwitterClient()

return nil
}
20 changes: 18 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,26 @@ func main() {
Name: "add",
Subcommands: []cli.Command{
cli.Command{
Name: "user",
Name: "friends",
Action: func(ctx *cli.Context) {
exec(ctx, func(eng *Engine) error {
return eng.AddUser()
return eng.AddFriends()
})
},
},
cli.Command{
Name: "followers",
Action: func(ctx *cli.Context) {
exec(ctx, func(eng *Engine) error {
return eng.AddFollowers()
})
},
},
cli.Command{
Name: "mentions",
Action: func(ctx *cli.Context) {
exec(ctx, func(eng *Engine) error {
return eng.AddMentions()
})
},
},
Expand Down
Loading

0 comments on commit 30e548d

Please sign in to comment.