Skip to content

Commit

Permalink
autopush@1457026367
Browse files Browse the repository at this point in the history
  • Loading branch information
denkhaus committed Mar 3, 2016
1 parent 87b3fd3 commit 92d345e
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 96 deletions.
14 changes: 3 additions & 11 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,16 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
return nil, errors.New("twitter user secret is not defined")
}

cnf.neoUsername = ctx.GlobalString("user")
cnf.neoPassword = ctx.GlobalString("password")
return cnf, nil
}

func (p *Config) ScreenName() (string, error) {
p.screenName = p.ctx.GlobalString("screenname")
if p.screenName == "" {
return "", errors.New("twitter user secret is not defined")
return "", errors.New("twitter screen name is not defined")
}

return p.screenName, nil
}

func (p *Config) NeoUsername() string {
p.neoUsername = p.ctx.GlobalString("user")
return p.neoUsername
}

func (p *Config) NeoPassword() string {
p.neoPassword = p.ctx.GlobalString("password")
return p.neoPassword
}
27 changes: 18 additions & 9 deletions cypher.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,36 @@
package main

const (
CYPHER_TWEETS_MAX_ID = `match (u:User {screen_name:{0})-[:POSTS]->(t:Tweet) return max(t.id) AS max_id`
CYPHER_TWEETS_MAX_ID = `
MATCH
(u:User {screen_name:{screen_name}})-[:POSTS]->(t:Tweet)
RETURN
max(t.id) AS max_id
`

CYPHER_TWEETS_IMPORT = `
UNWIND {0} AS t
UNWIND {tweets} AS t
WITH t
ORDER BY t.id
WITH t,
t.entities AS e,
t.user AS u,
t.retweeted_status AS retweet
MERGE (tweet:Tweet {id:t.id})
SET tweet.id_str = t.id_str,
tweet.text = t.text,
SET tweet.id_str = t.id_str,
tweet.created_at = t.created_at,
tweet.favorites = t.favorite_count
tweet.favorites = t.favorite_count,
tweet.retweets = t.retweet_count
MERGE (user:User {screen_name:u.screen_name})
SET user.name = u.name,
user.location = u.location,
user.followers = u.followers_count,
user.following = u.friends_count,
user.statuses = u.statusus_count,
user.profile_image_url = u.profile_image_url
user.statuses = u.statuses_count,
user.created_at = u.created_at,
user.favourites = u.favourites_count,
user.listed = u.listed_count,
user.profile_image_url = u.profile_image_url
MERGE (user)-[:POSTS]->(tweet)
MERGE (source:Source {name:REPLACE(SPLIT(t.source, ">")[1], "</a", "")})
MERGE (tweet)-[:USING]->(source)
Expand All @@ -45,6 +54,6 @@ const (
FOREACH (retweet_id IN [x IN [retweet.id] WHERE x IS NOT NULL] |
MERGE (retweet_tweet:Tweet {id:retweet_id})
MERGE (tweet)-[:RETWEETS]->(retweet_tweet)
)
`
)
`
)
118 changes: 55 additions & 63 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,18 @@ import (
"net/http"
"net/url"
"time"
"strings"

"github.com/denkhaus/neoism"
"github.com/juju/errors"
"github.com/kurrik/oauth1a"
"github.com/kurrik/twittergo"
"github.com/jmcvetta/neoism"
)

const (
TW_PATH_VERIFY_CREDENTIALS = "/1.1/account/verify_credentials.json"
TW_PATH_USER_TIMELINE = "/1.1/statuses/user_timeline.json"
)


type Engine struct {
cnf *Config
client *twittergo.Client
Expand All @@ -31,7 +30,11 @@ func NewEngine(cnf *Config) *Engine {
return eng
}

func (p *Engine) createClient() {
func (p *Engine) ensureTwitterClient() {
if p.client != nil {
return
}

config := &oauth1a.ClientConfig{
ConsumerKey: p.cnf.twitterConsumerKey,
ConsumerSecret: p.cnf.twitterConsumerSecret,
Expand All @@ -40,40 +43,31 @@ func (p *Engine) createClient() {
p.cnf.twitterUserKey,
p.cnf.twitterUserSecret,
)

p.client = twittergo.NewClient(config, user)
}

func (p *Engine) openDatabase() (*neoism.Database, error){

var userInfo string
if p.cnf.neoUsername != ""{
userInfo = fmt.Sprintf("%s:%s", p.cnf.neoUsername, p.cnf.neoPassword)

if len(strings.Split(userInfo, ":")) != 2{
func (p *Engine) openNeoDB() (*neoism.Database, error) {
logger.Info("open db connection")
if p.cnf.neoUsername != "" && p.cnf.neoPassword == "" {
return nil, errors.New("invalid neo4j credentials")
}

}

db, err := neoism.Connect()

hostPort := fmt.Sprintf("%s:%d", p.cnf.neoHost, p.cnf.neoPort)
db, err := neoism.Connect(hostPort, p.cnf.neoUsername, p.cnf.neoPassword)
if err != nil {
return errors.Annotate(err, "open database")
return nil, errors.Annotate(err, "open database")
}
defer db.Close()


return db, nil
}

func (p *Engine) AddUser() error {
p.createClient()
p.ensureTwitterClient()

return nil
}

func (p *Engine) getTweets(screenName string, maxID uint64) (twittergo.Timeline, error) {

func (p *Engine) tweetsGet(screenName string, maxID uint64) (twittergo.Timeline, error) {
query := url.Values{}
query.Set("count", "100")
query.Set("screen_name", screenName)
Expand Down Expand Up @@ -111,60 +105,63 @@ func (p *Engine) getTweets(screenName string, maxID uint64) (twittergo.Timeline,
logger.Errorf("Problem parsing response: %v\n", err)
}
}

if resp.HasRateLimit() {
logger.Infof("ratelimit: %v calls available", resp.RateLimitRemaining())
}
break
}

return results, nil
}

func (p *Engine) tweetsImport(db *neoism.Database, tweets twittergo.Timeline) error {
stmt, err := db.Prepare(CYPHER_TWEETS_IMPORT)
if err != nil {
return errors.Annotate(err, "db prepare")
func (p *Engine) tweetsImport(db *neoism.Database, tweets twittergo.Timeline) error {
logger.Infof("import %d new tweets", len(tweets))
cq := neoism.CypherQuery{
Statement: CYPHER_TWEETS_IMPORT,
Parameters: neoism.Props{"tweets": tweets},
}
defer stmt.Close()

_, err := stmt.Exec(tweets)
if err != nil {
return errors.Annotate(err, "db exec")
if err := db.Cypher(&cq); err != nil {
return errors.Annotate(err, "db query")
}

return nil
}

func (p *Engine) tweetsGetMaxID(screenName string) (uint64, error) {

db, err := p.openDatabase()
func (p *Engine) tweetsGetMaxID(screenName string) (uint64, error) {
db, err := p.openNeoDB()
if err != nil {
return 0, errors.Annotate(err, "open database")
}




var maxID uint64
cq := neoism.CypherQuery{
Statement: CYPHER_TWEETS_MAX_ID,
Parameters: neoism.Props{"screen_name": screenName},
Result: &maxID,
}

var maxIDData []interface{}
cq := neoism.CypherQuery{
Statement: CYPHER_TWEETS_MAX_ID,
Parameters: neoism.Props{"screen_name": screenName},
Result: &maxIDData,
}

if err := db.Cypher(&cq); err != nil {
return 0, errors.Annotate(err, "db query")
}

return maxID, nil

mp := maxIDData[0].(map[string]interface{})
if mp["max_id"] != nil {
return uint64(mp["max_id"].(float64)), nil
}

return 0, nil
}

func (p *Engine) AddTweets() error {
p.createClient()

db, err := neoism.Connect(p.cnf.neoHost)
logger.Info("add tweets")
p.ensureTwitterClient()

db, err := p.openNeoDB()
if err != nil {
return 0, errors.Annotate(err, "open database")
return errors.Annotate(err, "open database")
}


defer db.Close()

screenName, err := p.cnf.ScreenName()
if err != nil {
Expand All @@ -176,23 +173,18 @@ func (p *Engine) AddTweets() error {
return errors.Annotate(err, "tweets get max id")
}

tweets, err := p.getTweets(screenName, maxID)
tweets, err := p.tweetsGet(screenName, maxID)
if err != nil {
return errors.Annotate(err, "get tweets")
}

for len(tweets) >0{
for len(tweets) > 0 {
maxID = tweets[len(tweets)-1].Id() - 1
if err = p.tweetsImport(db, tweets); err != nil {
return errors.Annotate(err, "import tweets")
}

for _, tw := range tweets {
if tw.Id() > maxID {
maxID = tw.Id()
}
}

tweets, err = p.getTweets(screenName, maxID)
tweets, err = p.tweetsGet(screenName, maxID)
if err != nil {
return errors.Annotate(err, "get tweets")
}
Expand Down
26 changes: 13 additions & 13 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ func init() {
}

func main() {

app := cli.NewApp()
app.Name = "twitter-graph"
app.Usage = "A docker log pump to splunk"
Expand All @@ -27,7 +26,7 @@ func main() {
Name: "host, d",
Usage: "Neo4j host",
EnvVar: "NEO4_HOST",
Value: "localhost",
Value: "http://localhost",
},
cli.IntFlag{
Name: "port, P",
Expand All @@ -46,28 +45,29 @@ func main() {
EnvVar: "NEO4_PASSWORD",
},
cli.StringFlag{
Name: "consumer-key",
Usage: "Twitter Consumer Key",
EnvVar: "TWITTER_CONSUMER_KEY",
Name: "screenname, s",
Usage: "Twitter screen name",
},

cli.StringFlag{
Name: "consumer-secret",
Usage: "Twitter Consumer Secret",
EnvVar: "TWITTER_CONSUMER_SECRET",
},

cli.StringFlag{
Name: "user-key",
Usage: "Twitter User Key",
EnvVar: "TWITTER_USER_KEY",
},

cli.StringFlag{
Name: "user-secret",
Usage: "Twitter User Secret",
EnvVar: "TWITTER_USER_SECRET",
},
cli.StringFlag{
Name: "consumer-key",
Usage: "Twitter Consumer Key",
EnvVar: "TWITTER_CONSUMER_KEY",
},
cli.StringFlag{
Name: "consumer-secret",
Usage: "Twitter Consumer Secret",
EnvVar: "TWITTER_CONSUMER_SECRET",
},
}

app.Commands = []cli.Command{
Expand Down

0 comments on commit 92d345e

Please sign in to comment.