From 42ecbe394d6106dd3113112ce5f5d13eb23b3ef4 Mon Sep 17 00:00:00 2001 From: Simone Lazzaris Date: Fri, 24 Jan 2020 15:02:43 +0100 Subject: [PATCH] Reputation tracking on redis db and transaction in mysql --- polka.go | 38 ++++++++++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/polka.go b/polka.go index 3f986f8..49fe449 100644 --- a/polka.go +++ b/polka.go @@ -36,6 +36,7 @@ func init() { func main() { InitCfg() + InitReputation() flag.Parse() if (!*xdebug) { daemon(0,0) } else {fmt.Println("Starting in debug mode")} // Listen for incoming connections. @@ -47,7 +48,7 @@ func main() { defer l.Close() // open connection to the database - db,err:=sql.Open("mysql",fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?autocommit=false",cfg["dbuser"],cfg["dbpass"],cfg["dbhost"],cfg["dbport"],cfg["dbname"])) + db,err:=sql.Open("mysql",fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?autocommit=true",cfg["dbuser"],cfg["dbpass"],cfg["dbhost"],cfg["dbport"],cfg["dbname"])) if (err!=nil) { fmt.Println("ERROR CONNECTING MYSQL") os.Exit(1) @@ -127,11 +128,23 @@ func policy_verify(xdata connData, db *sql.DB) string { default: return "REJECT no credentials" } + var repu_logmsg string + var repu_ok bool + if repu_ok, repu_logmsg=verify_reputation(xdata.sender, xdata.ip_address); !repu_ok { + return "REJECT bad reputation" + } xmutex.Lock() defer xmutex.Unlock() - defer db.Exec("COMMIT") //defer db.Exec("COMMIT; UNLOCK TABLES;") - db.Exec("START TRANSACTION") - err:=db.QueryRow("SELECT max, quota, unix_timestamp(ts), unix_timestamp(now()) FROM "+cfg["policy_table"]+" where type=? and item=? FOR UPDATE",xtype, xitem).Scan(&mx, "a, &ts, &s_now) + tx,err:=db.Begin() + if err!=nil { + xlog.Err("TRANSACTION ERROR:"+err.Error()) + if (*xdebug) { fmt.Println("ERROR STARTING TRANSACTION:",err.Error())} + return "DUNNO" + } + defer tx.Commit() + //defer db.Exec("COMMIT") //defer db.Exec("COMMIT; UNLOCK TABLES;") + //db.Exec("START TRANSACTION") + err=tx.QueryRow("SELECT max, quota, unix_timestamp(ts), unix_timestamp(now()) FROM "+cfg["policy_table"]+" where type=? and item=? FOR UPDATE",xtype, xitem).Scan(&mx, "a, &ts, &s_now) switch { case err==sql.ErrNoRows: if (*xdebug) { fmt.Println("NOT FOUND") } @@ -140,10 +153,11 @@ func policy_verify(xdata connData, db *sql.DB) string { fquota=0 last_ts=time.Now().Unix() i_now=time.Now().Unix() - _,err=db.Exec("INSERT INTO "+cfg["policy_table"]+" set type=?, item=?, max=?, quota=0, ts=now()",xtype, xitem, cfg["defaultquota"]) + _,err=tx.Exec("INSERT INTO "+cfg["policy_table"]+" set type=?, item=?, max=?, quota=0, ts=now()",xtype, xitem, cfg["defaultquota"]) if (err!=nil) { - xlog.Err(err.Error()) + xlog.Err("INSERT ERROR:"+err.Error()) if (*xdebug) { fmt.Println("ERROR INSERTING:",err.Error())} + return "DUNNO" } case err!=nil: xlog.Err("ERROR: "+ err.Error()) @@ -164,12 +178,15 @@ func policy_verify(xdata connData, db *sql.DB) string { xlog.Info(fmt.Sprintf("DEFERRING overquota for item %s:%s [%.2f/%.2f]",xtype,xitem,fquota,fmax)) return "DEFER quota exceeded" } - _,err=db.Exec("UPDATE "+cfg["policy_table"]+" set quota=?, ts=now() where type=? and item=?",fquota, xtype, xitem) + _,err=tx.Exec("UPDATE "+cfg["policy_table"]+" set quota=?, ts=now() where type=? and item=?",fquota, xtype, xitem) if (err!=nil) { - xlog.Err(err.Error()) + xlog.Err("UPDATE ERROR"+err.Error()) if (*xdebug) { fmt.Println("ERROR UPDATING:",err.Error())} + return "DUNNO" } - xlog.Info(fmt.Sprintf("Updating quota for item %s:%s [%.2f/%.2f]. Sender: <%s>; Client IP: <%s>; SASL_username: <%s>", xtype, xitem, fquota, fmax, xdata.sender, xdata.ip_address, xdata.sasl_user )) + logmsg:=fmt.Sprintf("Updating quota for item %s:%s [%.2f/%.2f]. Sender: <%s>; Client IP: <%s>; SASL_username: <%s>", xtype, xitem, fquota, fmax, xdata.sender, xdata.ip_address, xdata.sasl_user ) + if repu_logmsg!="" { logmsg+=repu_logmsg } + xlog.Info(logmsg) return "DUNNO" // not OK so we can pipe more checks in postfix } @@ -204,7 +221,8 @@ func handleRequest(conn net.Conn, db *sql.DB) { } db.Ping() db.Exec("set time_zone='+00:00'") // timezone UTC - db.Exec("set session TRANSACTION ISOLATION LEVEL REPEATABLE READ") + //db.Exec("set session TRANSACTION ISOLATION LEVEL REPEATABLE READ") + db.Exec("set session TRANSACTION ISOLATION LEVEL READ COMMITTED") resp:=policy_verify(xdata, db) conn.Write([]byte(fmt.Sprintf("action=%s\n\n",resp))) conn.Close()