Skip to content

Commit

Permalink
Reputation tracking on redis db and transaction in mysql
Browse files Browse the repository at this point in the history
  • Loading branch information
Simone Lazzaris committed Jan 24, 2020
1 parent 2b6a4be commit 42ecbe3
Showing 1 changed file with 28 additions and 10 deletions.
38 changes: 28 additions & 10 deletions polka.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func init() {

func main() {
InitCfg()
InitReputation()
flag.Parse()
if (!*xdebug) { daemon(0,0) } else {fmt.Println("Starting in debug mode")}
// Listen for incoming connections.
Expand All @@ -47,7 +48,7 @@ func main() {
defer l.Close()

// open connection to the database
db,err:=sql.Open("mysql",fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?autocommit=false",cfg["dbuser"],cfg["dbpass"],cfg["dbhost"],cfg["dbport"],cfg["dbname"]))
db,err:=sql.Open("mysql",fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?autocommit=true",cfg["dbuser"],cfg["dbpass"],cfg["dbhost"],cfg["dbport"],cfg["dbname"]))
if (err!=nil) {
fmt.Println("ERROR CONNECTING MYSQL")
os.Exit(1)
Expand Down Expand Up @@ -127,11 +128,23 @@ func policy_verify(xdata connData, db *sql.DB) string {
default:
return "REJECT no credentials"
}
var repu_logmsg string
var repu_ok bool
if repu_ok, repu_logmsg=verify_reputation(xdata.sender, xdata.ip_address); !repu_ok {
return "REJECT bad reputation"
}
xmutex.Lock()
defer xmutex.Unlock()
defer db.Exec("COMMIT") //defer db.Exec("COMMIT; UNLOCK TABLES;")
db.Exec("START TRANSACTION")
err:=db.QueryRow("SELECT max, quota, unix_timestamp(ts), unix_timestamp(now()) FROM "+cfg["policy_table"]+" where type=? and item=? FOR UPDATE",xtype, xitem).Scan(&mx, &quota, &ts, &s_now)
tx,err:=db.Begin()
if err!=nil {
xlog.Err("TRANSACTION ERROR:"+err.Error())
if (*xdebug) { fmt.Println("ERROR STARTING TRANSACTION:",err.Error())}
return "DUNNO"
}
defer tx.Commit()
//defer db.Exec("COMMIT") //defer db.Exec("COMMIT; UNLOCK TABLES;")
//db.Exec("START TRANSACTION")
err=tx.QueryRow("SELECT max, quota, unix_timestamp(ts), unix_timestamp(now()) FROM "+cfg["policy_table"]+" where type=? and item=? FOR UPDATE",xtype, xitem).Scan(&mx, &quota, &ts, &s_now)
switch {
case err==sql.ErrNoRows:
if (*xdebug) { fmt.Println("NOT FOUND") }
Expand All @@ -140,10 +153,11 @@ func policy_verify(xdata connData, db *sql.DB) string {
fquota=0
last_ts=time.Now().Unix()
i_now=time.Now().Unix()
_,err=db.Exec("INSERT INTO "+cfg["policy_table"]+" set type=?, item=?, max=?, quota=0, ts=now()",xtype, xitem, cfg["defaultquota"])
_,err=tx.Exec("INSERT INTO "+cfg["policy_table"]+" set type=?, item=?, max=?, quota=0, ts=now()",xtype, xitem, cfg["defaultquota"])
if (err!=nil) {
xlog.Err(err.Error())
xlog.Err("INSERT ERROR:"+err.Error())
if (*xdebug) { fmt.Println("ERROR INSERTING:",err.Error())}
return "DUNNO"
}
case err!=nil:
xlog.Err("ERROR: "+ err.Error())
Expand All @@ -164,12 +178,15 @@ func policy_verify(xdata connData, db *sql.DB) string {
xlog.Info(fmt.Sprintf("DEFERRING overquota for item %s:%s [%.2f/%.2f]",xtype,xitem,fquota,fmax))
return "DEFER quota exceeded"
}
_,err=db.Exec("UPDATE "+cfg["policy_table"]+" set quota=?, ts=now() where type=? and item=?",fquota, xtype, xitem)
_,err=tx.Exec("UPDATE "+cfg["policy_table"]+" set quota=?, ts=now() where type=? and item=?",fquota, xtype, xitem)
if (err!=nil) {
xlog.Err(err.Error())
xlog.Err("UPDATE ERROR"+err.Error())
if (*xdebug) { fmt.Println("ERROR UPDATING:",err.Error())}
return "DUNNO"
}
xlog.Info(fmt.Sprintf("Updating quota for item %s:%s [%.2f/%.2f]. Sender: <%s>; Client IP: <%s>; SASL_username: <%s>", xtype, xitem, fquota, fmax, xdata.sender, xdata.ip_address, xdata.sasl_user ))
logmsg:=fmt.Sprintf("Updating quota for item %s:%s [%.2f/%.2f]. Sender: <%s>; Client IP: <%s>; SASL_username: <%s>", xtype, xitem, fquota, fmax, xdata.sender, xdata.ip_address, xdata.sasl_user )
if repu_logmsg!="" { logmsg+=repu_logmsg }
xlog.Info(logmsg)
return "DUNNO" // not OK so we can pipe more checks in postfix

}
Expand Down Expand Up @@ -204,7 +221,8 @@ func handleRequest(conn net.Conn, db *sql.DB) {
}
db.Ping()
db.Exec("set time_zone='+00:00'") // timezone UTC
db.Exec("set session TRANSACTION ISOLATION LEVEL REPEATABLE READ")
//db.Exec("set session TRANSACTION ISOLATION LEVEL REPEATABLE READ")
db.Exec("set session TRANSACTION ISOLATION LEVEL READ COMMITTED")
resp:=policy_verify(xdata, db)
conn.Write([]byte(fmt.Sprintf("action=%s\n\n",resp)))
conn.Close()
Expand Down

0 comments on commit 42ecbe3

Please sign in to comment.