Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New Storage Driver: Cassandra #856

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ updates:
- "🤖 Dependencies"
schedule:
interval: "daily"
- package-ecosystem: "gomod"
directory: "/cassandra/" # Location of package manifests
labels:
- "🤖 Dependencies"
schedule:
interval: "weekly"
- package-ecosystem: "gomod"
directory: "/dynamodb/" # Location of package manifests
labels:
Expand Down
43 changes: 43 additions & 0 deletions .github/release-drafter-cassandra.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
name-template: 'Cassandra - v$RESOLVED_VERSION'
tag-template: 'cassandra/v$RESOLVED_VERSION'
tag-prefix: cassandra/v
include-paths:
- cassandra
categories:
- title: '🚀 New'
labels:
- '✏️ Feature'
- title: '🧹 Updates'
labels:
- '🧹 Updates'
- '🤖 Dependencies'
- title: '🐛 Fixes'
labels:
- '☢️ Bug'
- title: '📚 Documentation'
labels:
- '📒 Documentation'
change-template: '- $TITLE (#$NUMBER)'
change-title-escapes: '\<*_&' # You can add # and @ to disable mentions, and add ` to disable code blocks.
version-resolver:
major:
labels:
- 'major'
minor:
labels:
- 'minor'
- '✏️ Feature'
patch:
labels:
- 'patch'
- '📒 Documentation'
- '☢️ Bug'
- '🤖 Dependencies'
- '🧹 Updates'
default: patch
template: |
$CHANGES

**Full Changelog**: https://github.com/$OWNER/$REPOSITORY/compare/$PREVIOUS_TAG...cassandra/v$RESOLVED_VERSION

Thank you $CONTRIBUTORS for making this update possible.
6 changes: 5 additions & 1 deletion .github/workflows/gosec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
- name: Run Gosec (root)
working-directory: .
run: |
gosec -exclude-dir=arangodb -exclude-dir=badger -exclude-dir=dynamodb -exclude-dir=memcache -exclude-dir=memory -exclude-dir=mongodb -exclude-dir=mysql -exclude-dir=postgres -exclude-dir=redis -exclude-dir=ristretto -exclude-dir=sqlite3 -exclude-dir=s3 -exclude-dir=bbolt -exclude-dir=azureblob -exclude-dir=mssql -exclude-dir=pebble ./....
gosec -exclude-dir=arangodb -exclude-dir=badger -exclude-dir=dynamodb -exclude-dir=memcache -exclude-dir=memory -exclude-dir=mongodb -exclude-dir=mysql -exclude-dir=postgres -exclude-dir=redis -exclude-dir=ristretto -exclude-dir=sqlite3 -exclude-dir=s3 -exclude-dir=bbolt -exclude-dir=azureblob -exclude-dir=mssql -exclude-dir=pebble -exclude-dir=cassandra ./....
# -----
- name: Run Gosec (arangodb)
working-directory: ./arangodb
Expand Down Expand Up @@ -104,3 +104,7 @@ jobs:
working-directory: ./pebble
run: gosec ./...
# -----
- name: Run Gosec (cassandra)
working-directory: ./cassandra
run: gosec ./...
# -----
19 changes: 19 additions & 0 deletions .github/workflows/release-drafter-cassandra.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
name: Release Drafter Cassandra
on:
push:
# branches to consider in the event; optional, defaults to all
branches:
- master
- main
paths:
- 'cassandra/**'
jobs:
draft_release_cassandra:
runs-on: ubuntu-latest
timeout-minutes: 30
steps:
- uses: release-drafter/release-drafter@v5
with:
config-name: release-drafter-cassandra.yml
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
41 changes: 41 additions & 0 deletions .github/workflows/test-cassandra.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
on:
push:
branches:
- master
- main
paths:
- 'cassandra/**'
pull_request:
paths:
- 'cassandra/**'

name: "Tests Cassandra"

jobs:
Tests:
runs-on: ubuntu-latest

strategy:
matrix:
go-version:
- 1.19.x
- 1.20.x

steps:
- name: Fetch Repository
uses: actions/checkout@v3

- name: Run Cassandra
run: |
docker run --name cassandra -e CASSANDRA_BROADCAST_ADDRESS=0.0.0.0 -d -p 9042:9042 cassandra:latest
sleep 30 # Wait for Cassandra to initialize

- name: Install Go
uses: actions/setup-go@v4
with:
go-version: '${{ matrix.go-version }}'

- name: Run Test
run: cd ./cassandra && go test ./... -v -race


112 changes: 112 additions & 0 deletions cassandra/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# Cassandra

A Cassandra storage driver using [gocql/gocql]("https://github.com/gocql/gocql").

### Table of Contents
- [Signatures](#signatures)
- [Installation](#installation)
- [Examples](#examples)
- [Config](#config)
- [Default Config](#default-config)

### Signatures
```go
func New(config ...Config) Storage
func (s *Storage) Get(key string) ([]byte, error)
func (s *Storage) Set(key string, val []byte, exp time.Duration) error
func (s *Storage) Delete(key string) error
func (s *Storage) Reset() error
func (s *Storage) Close() error
func (s *Storage) Conn() *gocql.Session
```
### Installation
Cassandra is tested on the 2 last [Go versions](https://golang.org/dl/) with support for modules. So make sure to initialize one first if you didn't do that yet:
```bash
go mod init github.com/<user>/<repo>
```
And then install the cassandra implementation:
```bash
go get github.com/gofiber/storage/cassandra
```

### Examples
Import the storage package.
```go
import "github.com/gofiber/storage/cassandra"
```

You can use the following possibilities to create a storage:
```go
// Initialize default config
store := cassandra.New()

// Initialize custom config
store := cassandra.New(cassandra.Config{
Host: "127.0.0.1",
Port: 9042,
Database: "fiber",
Collection: "fiber_storage",
Reset: false,
})


```

### Config
```go
type Config struct {
// Host name where the DB is hosted
//
// Optional. Default is "127.0.0.1"
Host string

// Port where the DB is listening on
//
// Optional. Default is 9042
Port int

// Server username
//
// Optional. Default is ""
Username string

// Server password
//
// Optional. Default is ""
Password string

// Keyspace name
//
// Optional. Default is "cassandra_db"
Keyspace string

// Number of replication
//
// Optional. Default 1
ReplicationFactor int

// Database to be operated on in the cluster.
//
// Optional. Default is "".
Table string

// Reset clears any existing keys in existing Table
//
// Optional. Default is false
Reset bool
}
```

### Default Config
```go
var ConfigDefault = Config{
Host: "127.0.0.1",
Port: 9042,
Username: "",
Password: "",
Table: "cassandra_table",
Keyspace: "cassandra_db",
ReplicationFactor: 1,
}
```

144 changes: 144 additions & 0 deletions cassandra/cassandra.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package cassandra

import (
"context"
"fmt"
"github.com/gocql/gocql"
"time"
)

type Storage struct {
Cluster *gocql.ClusterConfig
Session *gocql.Session

cqlSelect string
cqlInsert string
cqlDelete string
cqlReset string
cqlGC string
}

var (
checkSchemaMsg = "The `v` column has an incorrect data type. " +
"It should be BLOB but is instead %s. This will cause encoding-related panics if the DB is not migrated (see https://github.com/gofiber/storage/blob/main/MIGRATE.md)."

dropQuery = "DROP TABLE IF EXISTS %s;"

initQuery = []string{
`CREATE TABLE IF NOT EXISTS %s (
k text PRIMARY KEY,
v blob,
e bigint
);`,
}

checkSchemaQuery = `SELECT validator FROM system.schema_columns
WHERE keyspace_name = '%s' AND columnfamily_name = '%s' AND column_name = 'v';`
)

func New(config ...Config) *Storage {
cfg := configDefault(config...)

url := fmt.Sprintf("%s:%d", cfg.Host, cfg.Port)

cluster := gocql.NewCluster(url)
cluster.Keyspace = cfg.Keyspace
cluster.ProtoVersion = 4

session, err := cluster.CreateSession()
if err != nil {
panic(err)
}

// Primitive ping method
if err := session.Query("SELECT release_version FROM system.local").Exec(); err != nil {
session.Close()
panic(err)
}
if err := session.Query(fmt.Sprintf(
"CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};",
cfg.Keyspace,
)).Exec(); err != nil {
session.Close()
panic(err)
}
// Drop table if reset set
ctx := context.Background()
if cfg.Reset {
if err := session.Query(dropQuery, cfg.Table).WithContext(ctx).Exec(); err != nil {
session.Close()
panic(err)
}
}

// Init database queries
ctx = context.Background()
for _, query := range initQuery {

if err := session.Query(fmt.Sprintf(query, cfg.Table)).WithContext(ctx).Exec(); err != nil {
session.Close()
panic(err)
}
}

storage := &Storage{
Cluster: cluster,
Session: session,

cqlSelect: fmt.Sprintf(`SELECT v, e FROM %s WHERE k=?`, cfg.Table),
cqlInsert: fmt.Sprintf(`INSERT INTO %s (k, v, e) VALUES (?, ?, ?)`, cfg.Table),
cqlDelete: fmt.Sprintf(`DELETE FROM %s WHERE k=?`, cfg.Table),
cqlReset: fmt.Sprintf(`TRUNCATE %s`, cfg.Table),
cqlGC: fmt.Sprintf(`DELETE FROM %s WHERE e <= ? AND e != 0`, cfg.Table),
}

return storage
}

// Get value by key
func (s *Storage) Get(key string) ([]byte, error) {
ctx := context.Background()
var (
data []byte
exp int64 = 0
)
if err := s.Session.Query(s.cqlSelect, key).WithContext(ctx).Consistency(gocql.One).Scan(&data, &exp); err != nil {
if err == gocql.ErrNotFound {
return nil, nil
}
return nil, err
}

return data, nil
}

// Set sets a value in the storage for the provided key
func (s *Storage) Set(key string, val []byte, exp time.Duration) error {
ctx := context.Background()

return s.Session.Query(s.cqlInsert, key, val, int64(exp.Seconds())).WithContext(ctx).Exec()
}

// Delete deletes a value from the storage based on the provided key
func (s *Storage) Delete(key string) error {
ctx := context.Background()

return s.Session.Query(s.cqlDelete, key).WithContext(ctx).Exec()
}

// Reset resets the storage
func (s *Storage) Reset() error {
ctx := context.Background()

return s.Session.Query(s.cqlReset).WithContext(ctx).Exec()
}

// Close closes the connection to the storage
func (s *Storage) Close() error {
s.Session.Close()
return nil
}

func (s *Storage) Conn() *gocql.Session {
return s.Session
}
Loading