From e0e7dfa34ea5f4e4dd808c0171e38e9fc6558809 Mon Sep 17 00:00:00 2001 From: denkhaus Date: Wed, 2 Mar 2016 18:11:01 +0100 Subject: [PATCH] autopush@1456938661 --- config.go | 59 ++++++++++++++++++++++ engine.go | 145 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ helper.go | 15 ++++++ main.go | 96 ++++++++++++++++++++++++++++++++++++ 4 files changed, 315 insertions(+) create mode 100644 config.go create mode 100644 engine.go create mode 100644 helper.go create mode 100644 main.go diff --git a/config.go b/config.go new file mode 100644 index 0000000..5009e20 --- /dev/null +++ b/config.go @@ -0,0 +1,59 @@ +package main + +import ( + "errors" + + "github.com/codegangsta/cli" +) + +type Config struct { + ctx *cli.Context + neoHost string + twitterConsumerKey string + twitterConsumerSecret string + twitterUserKey string + twitterUserSecret string + screenName string +} + +func NewConfig(ctx *cli.Context) (*Config, error) { + cnf := &Config{ + ctx: ctx, + } + + cnf.neoHost = ctx.GlobalString("host") + if cnf.neoHost == "" { + return nil, errors.New("neo host is not defined") + } + + cnf.twitterConsumerKey = ctx.GlobalString("consumer-key") + if cnf.twitterConsumerKey == "" { + return nil, errors.New("twitter consumer key is not defined") + } + + cnf.twitterConsumerSecret = ctx.GlobalString("consumer-secret") + if cnf.twitterConsumerSecret == "" { + return nil, errors.New("twitter consumer secret is not defined") + } + + cnf.twitterUserKey = ctx.GlobalString("user-key") + if cnf.twitterUserKey == "" { + return nil, errors.New("twitter user key is not defined") + } + + cnf.twitterUserSecret = ctx.GlobalString("user-secret") + if cnf.twitterUserSecret == "" { + return nil, errors.New("twitter user secret is not defined") + } + + return cnf, nil +} + +func (p *Config) ScreenName() (string, error) { + p.screenName = p.ctx.GlobalString("screenname") + if p.screenName == "" { + return "", errors.New("twitter user secret is not defined") + } + + return p.screenName, nil +} diff --git a/engine.go b/engine.go new file mode 100644 index 0000000..b99958b --- /dev/null +++ b/engine.go @@ -0,0 +1,145 @@ +package main + +import ( + "database/sql" + "fmt" + "net/http" + "net/url" + "time" + + "github.com/juju/errors" + "github.com/kurrik/oauth1a" + "github.com/kurrik/twittergo" + _ "gopkg.in/cq.v1" +) + +const ( + TW_PATH_VERIFY_CREDENTIALS = "/1.1/account/verify_credentials.json" + TW_PATH_USER_TIMELINE = "/1.1/statuses/user_timeline.json" +) + +const ( + max_id_query = `match (u:User {screen_name:{0})-[:POSTS]->(t:Tweet) return max(t.id) AS max_id` +) + +type Engine struct { + cnf *Config + client *twittergo.Client +} + +func NewEngine(cnf *Config) *Engine { + eng := &Engine{ + cnf: cnf, + } + + return eng +} + +func (p *Engine) createClient() { + config := &oauth1a.ClientConfig{ + ConsumerKey: p.cnf.twitterConsumerKey, + ConsumerSecret: p.cnf.twitterConsumerSecret, + } + user := oauth1a.NewAuthorizedConfig( + p.cnf.twitterUserKey, + p.cnf.twitterUserSecret, + ) + + p.client = twittergo.NewClient(config, user) +} + +func (p *Engine) AddUser() error { + p.createClient() + + return nil +} + +func (p *Engine) getTweets(screenName string, maxID uint64) (twittergo.Timeline, error) { + + query := url.Values{} + query.Set("count", "100") + query.Set("screen_name", screenName) + + var results = twittergo.Timeline{} + minwait := time.Duration(10) * time.Second + + for { + if maxID != 0 { + query.Set("max_id", fmt.Sprintf("%v", maxID)) + } + + endpoint := fmt.Sprintf("/1.1/statuses/user_timeline.json?%v", query.Encode()) + req, err := http.NewRequest("GET", endpoint, nil) + if err != nil { + return nil, errors.Annotate(err, "create request") + } + + resp, err := p.client.SendRequest(req) + if err != nil { + return nil, errors.Annotate(err, "send request") + } + + if err = resp.Parse(&results); err != nil { + if rle, ok := err.(twittergo.RateLimitError); ok { + dur := rle.Reset.Sub(time.Now()) + time.Second + if dur < minwait { + dur = minwait + } + + logger.Infof("Rate limited. Reset at %v. Waiting for %v\n", rle.Reset, dur) + time.Sleep(dur) + continue + } else { + logger.Errorf("Problem parsing response: %v\n", err) + } + } + } + + return results, nil +} + +func (p *Engine) tweetsGetMaxID(screenName string) (uint64, error) { + db, err := sql.Open("neo4j-cypher", p.cnf.neoHost) + if err != nil { + return 0, errors.Annotate(err, "open database") + } + defer db.Close() + + stmt, err := db.Prepare(max_id_query) + if err != nil { + return 0, errors.Annotate(err, "db prepare") + } + defer stmt.Close() + + rows, err := stmt.Query(screenName) + if err != nil { + return 0, errors.Annotate(err, "db query") + } + defer rows.Close() + + var maxID uint64 + err = rows.Scan(&maxID) + if err != nil { + return 0, errors.Annotate(err, "scan query") + } + + return maxID, nil +} + +func (p *Engine) AddTweets() error { + p.createClient() + + screenName, err := p.cnf.ScreenName() + if err != nil { + return err + } + + maxID, err := p.tweetsGetMaxID(screenName) + if err != nil { + return errors.Annotate(err, "tweets get max id") + } + + tweets, err := p.getTweets(screenName, maxID) + + return nil +} diff --git a/helper.go b/helper.go new file mode 100644 index 0000000..d8d17f5 --- /dev/null +++ b/helper.go @@ -0,0 +1,15 @@ +package main + +import "github.com/codegangsta/cli" + +func exec(ctx *cli.Context, fn func(eng *Engine) error) { + cnf, err := NewConfig(ctx) + if err != nil { + logger.Fatalf("config error:%s", err) + } + + eng := NewEngine(cnf) + if err := fn(eng); err != nil { + logger.Fatalf("exec error:%s", err) + } +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..62e7977 --- /dev/null +++ b/main.go @@ -0,0 +1,96 @@ +package main + +import ( + "os" + + "github.com/codegangsta/cli" + "github.com/sirupsen/logrus" +) + +var ( + logger *logrus.Logger +) + +func init() { + logger = logrus.New() + logger.Level = logrus.DebugLevel + logger.Out = os.Stdout +} + +func main() { + + app := cli.NewApp() + app.Name = "twitter-graph" + app.Usage = "A docker log pump to splunk" + app.Flags = []cli.Flag{ + cli.StringFlag{ + Name: "host, d", + Usage: "Neo4j host", + EnvVar: "NEO4_HOST", + Value: "http://localhost:7474", + }, + + cli.StringFlag{ + Name: "consumer-key", + Usage: "Twitter Consumer Key", + EnvVar: "TWITTER_CONSUMER_KEY", + }, + + cli.StringFlag{ + Name: "consumer-secret", + Usage: "Twitter Consumer Secret", + EnvVar: "TWITTER_CONSUMER_SECRET", + }, + + cli.StringFlag{ + Name: "user-key", + Usage: "Twitter User Key", + EnvVar: "TWITTER_USER_KEY", + }, + + cli.StringFlag{ + Name: "user-secret", + Usage: "Twitter User Secret", + EnvVar: "TWITTER_USER_SECRET", + }, + } + + app.Commands = []cli.Command{ + cli.Command{ + Name: "add", + Subcommands: []cli.Command{ + cli.Command{ + Name: "user", + Action: func(ctx *cli.Context) { + exec(ctx, func(eng *Engine) error { + return eng.AddUser() + }) + }, + }, + cli.Command{ + Name: "tweets", + Action: func(ctx *cli.Context) { + exec(ctx, func(eng *Engine) error { + return eng.AddTweets() + }) + }, + }, + }, + }, + } + + app.Run(os.Args) +} + +//logsPump := NewLogsPump(storagePath) + +// closer.Bind(func() { +// logsPump.Shutdown() +// logger.Info("terminated") +// }) + +// closer.Checked(func() error { +// logger.Info("startup ---------------------------------------------") +// logsPump.RegisterAdapter(NewSplunkAdapter, host) +// return logsPump.Run() +// }, true)