Skip to content

Commit

Permalink
cache: limit size (to 64GB by default)
Browse files Browse the repository at this point in the history
  • Loading branch information
miku committed Feb 3, 2022
1 parent 2636b9c commit f6f01a8
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 6 deletions.
45 changes: 40 additions & 5 deletions go/ckit/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,40 +33,72 @@ package cache

import (
"errors"
"log"
"os"
"sync"
"time"

"github.com/jmoiron/sqlx"
"github.com/slub/labe/go/ckit/tabutils"

_ "github.com/mattn/go-sqlite3"
)

var ErrCacheMiss = errors.New("cache miss")
var (
ErrCacheMiss = errors.New("cache miss")
ErrReadOnly = errors.New("read only")
DefaultMaxFileSize int64 = 1 << 36
)

// Cache is a minimalistic cache based on sqlite. In the future, values could
// be transparently compressed as well.
//
// TODO: set a limit on filesize, e.g. 100G; run periodic checks, whether
// maximum filesize is exceeded and switch to read-only mode, if necessary
type Cache struct {
Path string

Path string
MaxFileSize int64
// Lock applies to both, db and readOnly.
sync.Mutex
db *sqlx.DB
db *sqlx.DB
readOnly bool
}

func New(path string) (*Cache, error) {
conn, err := sqlx.Open("sqlite3", path)
if err != nil {
return nil, err
}
c := &Cache{Path: path, db: conn}
c := &Cache{Path: path, db: conn, MaxFileSize: DefaultMaxFileSize}
if err := c.init(); err != nil {
return nil, err
}
c.startSizeWatcher()
return c, nil
}

// startSizeWatcher sets up a goroutine that will watch the filesize
// periodically and will switch to read-only mode, if a given size has been
// exceeded.
func (c *Cache) startSizeWatcher() {
go func() {
ticker := time.NewTicker(30 * time.Second)
for t := range ticker.C {
fi, err := os.Stat(c.Path)
if err != nil {
log.Printf("could not stat file at %s, stopping watch thread", c.Path)
break
}
if fi.Size() > c.MaxFileSize {
c.Lock()
log.Printf("switching cache at %s to read-only mode at %v", c.Path, t)
c.readOnly = true
c.Unlock()
}
}
}()
}

func (c *Cache) init() error {
s := `
PRAGMA journal_mode = OFF;
Expand Down Expand Up @@ -106,6 +138,9 @@ func (c *Cache) ItemCount() (int, error) {
func (c *Cache) Set(key string, value []byte) error {
c.Lock()
defer c.Unlock()
if c.readOnly {
return ErrReadOnly
}
s := `INSERT into map (k, v) VALUES (?, ?)`
_, err := c.db.Exec(s, key, value)
return err
Expand Down
6 changes: 5 additions & 1 deletion go/ckit/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,11 @@ func (s *Server) handleLocalIdentifier() http.HandlerFunc {
return fmt.Errorf("cache close: %w", err)
}
if err := s.Cache.Set(response.ID, buf.Bytes()); err != nil {
return fmt.Errorf("failed to cache value for %s: %v", response.ID, err)
if err == cache.ErrReadOnly {
return nil
} else {
return fmt.Errorf("failed to cache value for %s: %v", response.ID, err)
}
}
s.Stats.MeasureSinceWithLabels("cached", t, nil)
sw.Record("cached value")
Expand Down

0 comments on commit f6f01a8

Please sign in to comment.