diff --git a/Release.key b/Release.key deleted file mode 100644 index 549e441..0000000 --- a/Release.key +++ /dev/null @@ -1,19 +0,0 @@ ------BEGIN PGP PUBLIC KEY BLOCK----- -Version: GnuPG v1.4.5 (GNU/Linux) - -mQGiBE8ySNwRBACKcsg4AceGW7vuOSywcGiiNZg4I8vIzn7zphgj+s6/i2/hs8TQ -TOHy1s3b0f32ani3sBRmTWtgNv8+HSaKSVUcNqqNL+Jy6Pulyao7amGTtvqVKmuz -6ADrbVz0SxWmzTlSctdxK3g3yRSb+vfYrBPrbXNCNB5g9C/uK1uh0fpj1wCgn3MX -GFlZsO3bF7SNqqYlzpV6DM0D/AjnBlxTwRJOA1sLAVSXipLOdkjo00OQ8jgOFDio -oJsqLtsscNGocPCqNRo6QSVQKNs5sfYP2GXWTnPVM5oJI1p6bjuADRE8mneJrJvF -Uc+Mt1MdS6ttF4tVqd5Ncc8LKYQJRn+AtiuxAAayAA35JOslnMM0bqSYMlw4BJTP -8TlPA/4r+k+ucz3cJtM2aaL/ngUdrhmGKfGK7npXA3Ic6FzQo2NlJyKfWjSsR8x+ -oN7tKEpuYxE5fcdbWd3v+4Ypx0LYMgBT1pUgI09AXxrx1cLRZI4yovrvQJgiA4Mh -CZ0eFkJJ1nwQR3NPMj5soar+7d3Ubc2EqkESGm0CgGtjgg+7HrQ6aXN2Om93bkNs -b3VkIE9CUyBQcm9qZWN0IDxpc3Y6b3duQ2xvdWRAYnVpbGQub3BlbnN1c2Uub3Jn -PohmBBMRAgAmBQJU2MJUAhsDBQkJxSl4BgsJCAcDAgQVAggDBBYCAwECHgECF4AA -CgkQl3xDqLpoQiMpOQCggCpaq5yz8C3ckqEr0RkTjICcQYMAnRvjarxoY6iSThDN -7vgxE1Fe8X5EiEYEExECAAYFAk8ySNwACgkQOzARt2udZSPlpwCfQZKNN8Rxx0LE -BF32EYXBdEvkMYYAoIt3lfXL8uwmjvyljzb0JgBQN1cF -=FJ6Q ------END PGP PUBLIC KEY BLOCK----- diff --git a/cypher.go b/cypher.go index 74d4669..16bd467 100644 --- a/cypher.go +++ b/cypher.go @@ -14,6 +14,30 @@ const ( max(t.id) AS max_id ` + CYPHER_MENTIONS_MAX_ID = ` + MATCH + (u:User {screen_name:{screen_name}})<-[m:MENTIONS]-(t:Tweet) + WHERE + m.method="mention_search" + RETURN + max(t.id) AS max_id + ` + + CYPHER_FOLLOWERS_IMPORT = ` + UNWIND {users} AS u + WITH u + MERGE (user:User {screen_name:u.screen_name}) + SET user.name = u.name, + user.location = u.location, + user.followers = u.followers_count, + user.following = u.friends_count, + user.statuses = u.statusus_count, + user.url = u.url, + user.profile_image_url = u.profile_image_url + MERGE (mainUser:User {screen_name:{screen_name}}) + MERGE (user)-[:FOLLOWS]->(mainUser) + ` + CYPHER_TWEETS_IMPORT = ` UNWIND {tweets} AS t WITH t @@ -51,7 +75,8 @@ const ( FOREACH (m IN e.user_mentions | MERGE (mentioned:User {screen_name:m.screen_name}) ON CREATE SET mentioned.name = m.name - MERGE (tweet)-[:MENTIONS]->(mentioned) + MERGE (tweet)-[mts:MENTIONS]->(mentioned) + SET mts.method = {mention_type} ) FOREACH (r IN [r IN [t.in_reply_to_status_id] WHERE r IS NOT NULL] | MERGE (reply_tweet:Tweet {id:r}) diff --git a/engine.go b/engine.go index b8fbb3d..5f470e7 100644 --- a/engine.go +++ b/engine.go @@ -2,8 +2,6 @@ package main import ( "fmt" - "net/http" - "net/url" "time" "github.com/denkhaus/neoism" @@ -13,8 +11,10 @@ import ( ) const ( - TW_PATH_VERIFY_CREDENTIALS = "/1.1/account/verify_credentials.json" - TW_PATH_USER_TIMELINE = "/1.1/statuses/user_timeline.json" + TW_PATH_VERIFY_CREDENTIALS = "/1.1/account/verify_credentials.json?%v" + TW_PATH_USER_TIMELINE = "/1.1/statuses/user_timeline.json?%v" + TW_PATH_MENTIONS_TIMELINE = "/1.1/statuses/mentions_timeline.json?%v" + TW_PATH_FOLLOWERS_TIMELINE = "/1.1/followers/list.json?%v" ) type Engine struct { @@ -61,83 +61,62 @@ func (p *Engine) openNeoDB() (*neoism.Database, error) { return db, nil } -func (p *Engine) AddUser() error { +func (p *Engine) ensureClients() (*neoism.Database, error) { p.ensureTwitterClient() - return nil + db, err := p.openNeoDB() + if err != nil { + return nil, errors.Annotate(err, "open database") + } + err = p.initDatabase(db) + return db, err } -func (p *Engine) tweetsGet(screenName string, maxID uint64) (twittergo.Timeline, error) { - query := url.Values{} - query.Set("count", "200") - query.Set("screen_name", screenName) - - var results = twittergo.Timeline{} - minwait := time.Duration(10) * time.Second - - for { - if maxID != 0 { - query.Set("max_id", fmt.Sprintf("%v", maxID)) - } +func (p *Engine) initDatabase(db *neoism.Database) error { + logger.Infof("init database") - endpoint := fmt.Sprintf("/1.1/statuses/user_timeline.json?%v", query.Encode()) - req, err := http.NewRequest("GET", endpoint, nil) - if err != nil { - return nil, errors.Annotate(err, "create request") - } - - resp, err := p.client.SendRequest(req) - if err != nil { - return nil, errors.Annotate(err, "send request") - } + cyphers := []string{ + CYPHER_CONSTRAINT_TWEET, + CYPHER_CONSTRAINT_USER, + CYPHER_CONSTRAINT_HASHTAG, + CYPHER_CONSTRAINT_LINK, + CYPHER_CONSTRAINT_SOURCE, + } - if err = resp.Parse(&results); err != nil { - if rle, ok := err.(twittergo.RateLimitError); ok { - dur := rle.Reset.Sub(time.Now()) + time.Second - if dur < minwait { - dur = minwait - } - - logger.Infof("Rate limited. Reset at %v. Waiting for %v\n", rle.Reset, dur) - time.Sleep(dur) - continue - } else { - logger.Errorf("Problem parsing response: %v\n", err) - } + for _, cy := range cyphers { + cq := neoism.CypherQuery{ + Statement: cy, } - if resp.HasRateLimit() { - logger.Infof("ratelimit: %v calls available", resp.RateLimitRemaining()) + if err := db.Cypher(&cq); err != nil { + return errors.Annotate(err, "db query") } - break } - return results, nil + return nil } -func (p *Engine) tweetsImport(db *neoism.Database, tweets twittergo.Timeline) error { - logger.Infof("import %d new tweets", len(tweets)) - cq := neoism.CypherQuery{ - Statement: CYPHER_TWEETS_IMPORT, - Parameters: neoism.Props{"tweets": tweets}, - } +func (p *Engine) handleRateLimitError(err error) bool { + minwait := time.Duration(10) * time.Second + if rle, ok := err.(twittergo.RateLimitError); ok { + dur := rle.Reset.Sub(time.Now()) + time.Second + if dur < minwait { + dur = minwait + } - if err := db.Cypher(&cq); err != nil { - return errors.Annotate(err, "db query") + logger.Infof("Rate limited. Reset at %v. Waiting for %v\n", rle.Reset, dur) + time.Sleep(dur) + return true } - return nil + return false } -func (p *Engine) tweetsGetMaxID(screenName string) (uint64, error) { - db, err := p.openNeoDB() - if err != nil { - return 0, errors.Annotate(err, "open database") - } +func (p *Engine) getMaxID(db *neoism.Database, cypher, screenName string) (uint64, error) { var maxIDData []interface{} cq := neoism.CypherQuery{ - Statement: CYPHER_TWEETS_MAX_ID, + Statement: cypher, Parameters: neoism.Props{"screen_name": screenName}, Result: &maxIDData, } @@ -153,42 +132,3 @@ func (p *Engine) tweetsGetMaxID(screenName string) (uint64, error) { return 0, nil } - -func (p *Engine) AddTweets() error { - logger.Info("add tweets") - p.ensureTwitterClient() - - db, err := p.openNeoDB() - if err != nil { - return errors.Annotate(err, "open database") - } - - screenName, err := p.cnf.ScreenName() - if err != nil { - return err - } - - maxID, err := p.tweetsGetMaxID(screenName) - if err != nil { - return errors.Annotate(err, "tweets get max id") - } - - tweets, err := p.tweetsGet(screenName, maxID) - if err != nil { - return errors.Annotate(err, "get tweets") - } - - for len(tweets) > 0 { - maxID = tweets[len(tweets)-1].Id() - 1 - if err = p.tweetsImport(db, tweets); err != nil { - return errors.Annotate(err, "import tweets") - } - - tweets, err = p.tweetsGet(screenName, maxID) - if err != nil { - return errors.Annotate(err, "get tweets") - } - } - - return nil -} diff --git a/followers.go b/followers.go new file mode 100644 index 0000000..4cd103f --- /dev/null +++ b/followers.go @@ -0,0 +1,106 @@ +package main + +import ( + "fmt" + "net/http" + "net/url" + + "github.com/denkhaus/neoism" + "github.com/juju/errors" +) + +func (p *Engine) usersGet(apiURL string, cursor int64, params map[string]string) ([]interface{}, int64, error) { + query := url.Values{} + for k, v := range params { + query.Set(k, v) + } + + query.Set("cursor", fmt.Sprintf("%v", cursor)) + results := make(map[string]interface{}) + + for { + endpoint := fmt.Sprintf(apiURL, query.Encode()) + req, err := http.NewRequest("GET", endpoint, nil) + if err != nil { + return nil, 0, errors.Annotate(err, "create request") + } + + resp, err := p.client.SendRequest(req) + if err != nil { + return nil, 0, errors.Annotate(err, "send request") + } + + if err = resp.Parse(&results); err != nil { + if p.handleRateLimitError(err) { + continue + } else { + logger.Errorf("Problem parsing response: %v\n", err) + } + } + + if resp.HasRateLimit() { + logger.Infof("ratelimit: %v calls available", resp.RateLimitRemaining()) + } + break + } + + users := results["users"].([]interface{}) + cursor = int64(results["next_cursor"].(float64)) + return users, cursor, nil +} + +func (p *Engine) usersImport(db *neoism.Database, users []interface{}, props neoism.Props) error { + logger.Infof("import %d users", len(users)) + + props["users"] = users + cq := neoism.CypherQuery{ + Statement: CYPHER_FOLLOWERS_IMPORT, + Parameters: props, + } + + if err := db.Cypher(&cq); err != nil { + return errors.Annotate(err, "db query") + } + + return nil +} + +func (p *Engine) AddFollowers() error { + logger.Info("add followers") + db, err := p.ensureClients() + if err != nil { + return errors.Annotate(err, "ensure clients") + } + + screenName, err := p.cnf.ScreenName() + if err != nil { + return err + } + + params := map[string]string{ + "count": "200", + "screen_name": screenName, + } + + props := neoism.Props{ + "screen_name": screenName, + } + + users, cursor, err := p.usersGet(TW_PATH_FOLLOWERS_TIMELINE, -1, params) + if err != nil { + return errors.Annotate(err, "get followers") + } + + for len(users) > 0 { + if err = p.usersImport(db, users, props); err != nil { + return errors.Annotate(err, "import followers") + } + + users, cursor, err = p.usersGet(TW_PATH_FOLLOWERS_TIMELINE, cursor, params) + if err != nil { + return errors.Annotate(err, "get followers") + } + } + + return nil +} diff --git a/friends.go b/friends.go new file mode 100644 index 0000000..3875d0c --- /dev/null +++ b/friends.go @@ -0,0 +1,7 @@ +package main + +func (p *Engine) AddFriends() error { + p.ensureTwitterClient() + + return nil +} diff --git a/main.go b/main.go index ac03b83..542ed12 100644 --- a/main.go +++ b/main.go @@ -75,10 +75,26 @@ func main() { Name: "add", Subcommands: []cli.Command{ cli.Command{ - Name: "user", + Name: "friends", Action: func(ctx *cli.Context) { exec(ctx, func(eng *Engine) error { - return eng.AddUser() + return eng.AddFriends() + }) + }, + }, + cli.Command{ + Name: "followers", + Action: func(ctx *cli.Context) { + exec(ctx, func(eng *Engine) error { + return eng.AddFollowers() + }) + }, + }, + cli.Command{ + Name: "mentions", + Action: func(ctx *cli.Context) { + exec(ctx, func(eng *Engine) error { + return eng.AddMentions() }) }, }, diff --git a/mentions.go b/mentions.go new file mode 100644 index 0000000..4989e70 --- /dev/null +++ b/mentions.go @@ -0,0 +1,55 @@ +package main + +import ( + "github.com/denkhaus/neoism" + "github.com/juju/errors" +) + +func (p *Engine) AddMentions() error { + logger.Info("add mention tweets") + db, err := p.ensureClients() + if err != nil { + return errors.Annotate(err, "ensure clients") + } + + screenName, err := p.cnf.ScreenName() + if err != nil { + return err + } + + params := map[string]string{ + "count": "200", + "screen_name": screenName, + "exclude_replies": "false", + "contributor_details": "true", + } + + props := neoism.Props{ + "mention_type": "mention_search", + } + + maxID, err := p.getMaxID(db, CYPHER_MENTIONS_MAX_ID, screenName) + if err != nil { + return errors.Annotate(err, "mentions get max id") + } + + mentions, err := p.tweetsGet(TW_PATH_MENTIONS_TIMELINE, maxID, params) + if err != nil { + return errors.Annotate(err, "get tweets") + } + + for len(mentions) > 0 { + maxID = mentions[len(mentions)-1].Id() - 1 + if err = p.tweetsImport(db, mentions, props); err != nil { + return errors.Annotate(err, "import mention tweets") + } + + mentions, err = p.tweetsGet(TW_PATH_MENTIONS_TIMELINE, maxID, params) + if err != nil { + return errors.Annotate(err, "get tweets") + } + + } + + return nil +} diff --git a/tweets.go b/tweets.go new file mode 100644 index 0000000..4c13726 --- /dev/null +++ b/tweets.go @@ -0,0 +1,114 @@ +package main + +import ( + "fmt" + "net/http" + "net/url" + + "github.com/denkhaus/neoism" + "github.com/juju/errors" + "github.com/kurrik/twittergo" +) + +func (p *Engine) tweetsGet(apiURL string, maxID uint64, params map[string]string) (twittergo.Timeline, error) { + query := url.Values{} + for k, v := range params { + query.Set(k, v) + } + + var results = twittergo.Timeline{} + + for { + if maxID != 0 { + query.Set("max_id", fmt.Sprintf("%v", maxID)) + } + + endpoint := fmt.Sprintf(apiURL, query.Encode()) + req, err := http.NewRequest("GET", endpoint, nil) + if err != nil { + return nil, errors.Annotate(err, "create request") + } + + resp, err := p.client.SendRequest(req) + if err != nil { + return nil, errors.Annotate(err, "send request") + } + + if err = resp.Parse(&results); err != nil { + if p.handleRateLimitError(err) { + continue + } else { + logger.Errorf("Problem parsing response: %v\n", err) + } + } + + if resp.HasRateLimit() { + logger.Infof("ratelimit: %v calls available", resp.RateLimitRemaining()) + } + break + } + + return results, nil +} + +func (p *Engine) tweetsImport(db *neoism.Database, tweets twittergo.Timeline, props neoism.Props) error { + logger.Infof("import %d new tweets", len(tweets)) + + props["tweets"] = tweets + cq := neoism.CypherQuery{ + Statement: CYPHER_TWEETS_IMPORT, + Parameters: props, + } + + if err := db.Cypher(&cq); err != nil { + return errors.Annotate(err, "db query") + } + + return nil +} + +func (p *Engine) AddTweets() error { + logger.Info("add tweets") + db, err := p.ensureClients() + if err != nil { + return errors.Annotate(err, "ensure clients") + } + + screenName, err := p.cnf.ScreenName() + if err != nil { + return err + } + + params := map[string]string{ + "count": "200", + "screen_name": screenName, + } + + props := neoism.Props{ + "mention_type": "normal", + } + + maxID, err := p.getMaxID(db, CYPHER_TWEETS_MAX_ID, screenName) + if err != nil { + return errors.Annotate(err, "tweets get max id") + } + + tweets, err := p.tweetsGet(TW_PATH_USER_TIMELINE, maxID, params) + if err != nil { + return errors.Annotate(err, "get tweets") + } + + for len(tweets) > 0 { + maxID = tweets[len(tweets)-1].Id() - 1 + if err = p.tweetsImport(db, tweets, props); err != nil { + return errors.Annotate(err, "import tweets") + } + + tweets, err = p.tweetsGet(TW_PATH_USER_TIMELINE, maxID, params) + if err != nil { + return errors.Annotate(err, "get tweets") + } + } + + return nil +}