Skip to content

Commit

Permalink
autopush@1456938661
Browse files Browse the repository at this point in the history
  • Loading branch information
denkhaus committed Mar 2, 2016
1 parent 40e363e commit e0e7dfa
Show file tree
Hide file tree
Showing 4 changed files with 315 additions and 0 deletions.
59 changes: 59 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package main

import (
"errors"

"github.com/codegangsta/cli"
)

type Config struct {
ctx *cli.Context
neoHost string
twitterConsumerKey string
twitterConsumerSecret string
twitterUserKey string
twitterUserSecret string
screenName string
}

func NewConfig(ctx *cli.Context) (*Config, error) {
cnf := &Config{
ctx: ctx,
}

cnf.neoHost = ctx.GlobalString("host")
if cnf.neoHost == "" {
return nil, errors.New("neo host is not defined")
}

cnf.twitterConsumerKey = ctx.GlobalString("consumer-key")
if cnf.twitterConsumerKey == "" {
return nil, errors.New("twitter consumer key is not defined")
}

cnf.twitterConsumerSecret = ctx.GlobalString("consumer-secret")
if cnf.twitterConsumerSecret == "" {
return nil, errors.New("twitter consumer secret is not defined")
}

cnf.twitterUserKey = ctx.GlobalString("user-key")
if cnf.twitterUserKey == "" {
return nil, errors.New("twitter user key is not defined")
}

cnf.twitterUserSecret = ctx.GlobalString("user-secret")
if cnf.twitterUserSecret == "" {
return nil, errors.New("twitter user secret is not defined")
}

return cnf, nil
}

func (p *Config) ScreenName() (string, error) {
p.screenName = p.ctx.GlobalString("screenname")
if p.screenName == "" {
return "", errors.New("twitter user secret is not defined")
}

return p.screenName, nil
}
145 changes: 145 additions & 0 deletions engine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package main

import (
"database/sql"
"fmt"
"net/http"
"net/url"
"time"

"github.com/juju/errors"
"github.com/kurrik/oauth1a"
"github.com/kurrik/twittergo"
_ "gopkg.in/cq.v1"
)

const (
TW_PATH_VERIFY_CREDENTIALS = "/1.1/account/verify_credentials.json"
TW_PATH_USER_TIMELINE = "/1.1/statuses/user_timeline.json"
)

const (
max_id_query = `match (u:User {screen_name:{0})-[:POSTS]->(t:Tweet) return max(t.id) AS max_id`
)

type Engine struct {
cnf *Config
client *twittergo.Client
}

func NewEngine(cnf *Config) *Engine {
eng := &Engine{
cnf: cnf,
}

return eng
}

func (p *Engine) createClient() {
config := &oauth1a.ClientConfig{
ConsumerKey: p.cnf.twitterConsumerKey,
ConsumerSecret: p.cnf.twitterConsumerSecret,
}
user := oauth1a.NewAuthorizedConfig(
p.cnf.twitterUserKey,
p.cnf.twitterUserSecret,
)

p.client = twittergo.NewClient(config, user)
}

func (p *Engine) AddUser() error {
p.createClient()

return nil
}

func (p *Engine) getTweets(screenName string, maxID uint64) (twittergo.Timeline, error) {

query := url.Values{}
query.Set("count", "100")
query.Set("screen_name", screenName)

var results = twittergo.Timeline{}
minwait := time.Duration(10) * time.Second

for {
if maxID != 0 {
query.Set("max_id", fmt.Sprintf("%v", maxID))
}

endpoint := fmt.Sprintf("/1.1/statuses/user_timeline.json?%v", query.Encode())
req, err := http.NewRequest("GET", endpoint, nil)
if err != nil {
return nil, errors.Annotate(err, "create request")
}

resp, err := p.client.SendRequest(req)
if err != nil {
return nil, errors.Annotate(err, "send request")
}

if err = resp.Parse(&results); err != nil {
if rle, ok := err.(twittergo.RateLimitError); ok {
dur := rle.Reset.Sub(time.Now()) + time.Second
if dur < minwait {
dur = minwait
}

logger.Infof("Rate limited. Reset at %v. Waiting for %v\n", rle.Reset, dur)
time.Sleep(dur)
continue
} else {
logger.Errorf("Problem parsing response: %v\n", err)
}
}
}

return results, nil
}

func (p *Engine) tweetsGetMaxID(screenName string) (uint64, error) {
db, err := sql.Open("neo4j-cypher", p.cnf.neoHost)
if err != nil {
return 0, errors.Annotate(err, "open database")
}
defer db.Close()

stmt, err := db.Prepare(max_id_query)
if err != nil {
return 0, errors.Annotate(err, "db prepare")
}
defer stmt.Close()

rows, err := stmt.Query(screenName)
if err != nil {
return 0, errors.Annotate(err, "db query")
}
defer rows.Close()

var maxID uint64
err = rows.Scan(&maxID)
if err != nil {
return 0, errors.Annotate(err, "scan query")
}

return maxID, nil
}

func (p *Engine) AddTweets() error {
p.createClient()

screenName, err := p.cnf.ScreenName()
if err != nil {
return err
}

maxID, err := p.tweetsGetMaxID(screenName)
if err != nil {
return errors.Annotate(err, "tweets get max id")
}

tweets, err := p.getTweets(screenName, maxID)

return nil
}
15 changes: 15 additions & 0 deletions helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package main

import "github.com/codegangsta/cli"

func exec(ctx *cli.Context, fn func(eng *Engine) error) {
cnf, err := NewConfig(ctx)
if err != nil {
logger.Fatalf("config error:%s", err)
}

eng := NewEngine(cnf)
if err := fn(eng); err != nil {
logger.Fatalf("exec error:%s", err)
}
}
96 changes: 96 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package main

import (
"os"

"github.com/codegangsta/cli"
"github.com/sirupsen/logrus"
)

var (
logger *logrus.Logger
)

func init() {
logger = logrus.New()
logger.Level = logrus.DebugLevel
logger.Out = os.Stdout
}

func main() {

app := cli.NewApp()
app.Name = "twitter-graph"
app.Usage = "A docker log pump to splunk"
app.Flags = []cli.Flag{
cli.StringFlag{
Name: "host, d",
Usage: "Neo4j host",
EnvVar: "NEO4_HOST",
Value: "http://localhost:7474",
},

cli.StringFlag{
Name: "consumer-key",
Usage: "Twitter Consumer Key",
EnvVar: "TWITTER_CONSUMER_KEY",
},

cli.StringFlag{
Name: "consumer-secret",
Usage: "Twitter Consumer Secret",
EnvVar: "TWITTER_CONSUMER_SECRET",
},

cli.StringFlag{
Name: "user-key",
Usage: "Twitter User Key",
EnvVar: "TWITTER_USER_KEY",
},

cli.StringFlag{
Name: "user-secret",
Usage: "Twitter User Secret",
EnvVar: "TWITTER_USER_SECRET",
},
}

app.Commands = []cli.Command{
cli.Command{
Name: "add",
Subcommands: []cli.Command{
cli.Command{
Name: "user",
Action: func(ctx *cli.Context) {
exec(ctx, func(eng *Engine) error {
return eng.AddUser()
})
},
},
cli.Command{
Name: "tweets",
Action: func(ctx *cli.Context) {
exec(ctx, func(eng *Engine) error {
return eng.AddTweets()
})
},
},
},
},
}

app.Run(os.Args)
}

//logsPump := NewLogsPump(storagePath)

// closer.Bind(func() {
// logsPump.Shutdown()
// logger.Info("terminated")
// })

// closer.Checked(func() error {
// logger.Info("startup ---------------------------------------------")
// logsPump.RegisterAdapter(NewSplunkAdapter, host)
// return logsPump.Run()
// }, true)

0 comments on commit e0e7dfa

Please sign in to comment.