Skip to content

Commit

Permalink
Update Query (#170)
Browse files Browse the repository at this point in the history
  • Loading branch information
louiseschmidtgen authored Sep 13, 2024
1 parent 5b20dbb commit 77f11ae
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 93 deletions.
41 changes: 41 additions & 0 deletions pkg/kine/drivers/generic/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ type Generic struct {
FillSQL string
InsertLastInsertIDSQL string
CreateSQL string
UpdateSQL string
GetSizeSQL string
Retry ErrRetry
TranslateErr TranslateErr
Expand Down Expand Up @@ -312,6 +313,21 @@ func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig
) maxkv
WHERE maxkv.deleted = 1 OR id IS NULL`, paramCharacter, numbered),

UpdateSQL: q(`
INSERT INTO kine(name, created, deleted, create_revision, prev_revision, lease, value, old_value)
SELECT
? AS name,
0 AS created,
0 AS deleted,
create_revision,
id AS prev_revision,
? AS lease,
? AS value,
value AS old_value
FROM kine WHERE id = (SELECT MAX(id) FROM kine WHERE name = ?)
AND deleted = 0
AND id = ?`, paramCharacter, numbered),

FillSQL: q(`INSERT INTO kine(id, name, created, deleted, create_revision, prev_revision, lease, value, old_value)
VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)`, paramCharacter, numbered),
AdmissionControlPolicy: &allowAllPolicy{},
Expand Down Expand Up @@ -515,6 +531,31 @@ func (d *Generic) Create(ctx context.Context, key string, value []byte, ttl int6
}
return result.LastInsertId()
}
func (d *Generic) Update(ctx context.Context, key string, value []byte, preRev, ttl int64) (rev int64, updated bool, err error) {
ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.Update", otelName))
defer func() {
if err != nil {
if d.TranslateErr != nil {
err = d.TranslateErr(err)
}
span.RecordError(err)
}
span.End()
}()

result, err := d.execute(ctx, "update_sql", d.UpdateSQL, key, ttl, value, key, preRev)
if err != nil {
logrus.WithError(err).Error("failed to update key")
return 0, false, err
}
if insertCount, err := result.RowsAffected(); err != nil {
return 0, false, err
} else if insertCount == 0 {
return 0, false, nil
}
rev, err = result.LastInsertId()
return rev, true, err
}

// Compact compacts the database up to the revision provided in the method's call.
// After the call, any request for a version older than the given revision will return
Expand Down
47 changes: 4 additions & 43 deletions pkg/kine/logstructured/logstructured.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Log interface {
CurrentRevision(ctx context.Context) (int64, error)
List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeletes bool) (int64, []*server.Event, error)
Create(ctx context.Context, key string, value []byte, lease int64) (int64, error)
Update(ctx context.Context, key string, value []byte, revision, lease int64) (revRet int64, updateRet bool, errRet error)
After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error)
Watch(ctx context.Context, prefix string) <-chan []*server.Event
Count(ctx context.Context, prefix, startKey string, revision int64) (int64, int64, error)
Expand Down Expand Up @@ -275,61 +276,21 @@ func (l *LogStructured) Count(ctx context.Context, prefix, startKey string, revi
return rev, count, nil
}

func (l *LogStructured) Update(ctx context.Context, key string, value []byte, revision, lease int64) (revRet int64, kvRet *server.KeyValue, updateRet bool, errRet error) {
func (l *LogStructured) Update(ctx context.Context, key string, value []byte, revision, lease int64) (revRet int64, updateRet bool, errRet error) {
ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.Update", otelName))
defer func() {
l.adjustRevision(ctx, &revRet)
kvRev := int64(0)
if kvRet != nil {
kvRev = kvRet.ModRevision
}
logrus.Debugf("UPDATE %s, value=%d, rev=%d, lease=%v => rev=%d, kvrev=%d, updated=%v, err=%v", key, len(value), revision, lease, revRet, kvRev, updateRet, errRet)
logrus.Debugf("UPDATE %s, value=%d, rev=%d, lease=%v => rev=%d, updated=%v, err=%v", key, len(value), revision, lease, revRet, updateRet, errRet)
span.SetAttributes(
attribute.String("key", key),
attribute.Int64("revision", revision),
attribute.Int64("lease", lease),
attribute.Int64("value-size", int64(len(value))),
attribute.Int64("adjusted-revision", revRet),
attribute.Int64("kv-mod-revision", kvRev),
attribute.Bool("updated", updateRet),
)
span.End()
}()

rev, event, err := l.get(ctx, key, "", 1, 0, false)
if err != nil {
return 0, nil, false, err
}

if event == nil {
return 0, nil, false, nil
}

if event.KV.ModRevision != revision {
return rev, event.KV, false, nil
}

updateEvent := &server.Event{
KV: &server.KeyValue{
Key: key,
CreateRevision: event.KV.CreateRevision,
Value: value,
Lease: lease,
},
PrevKV: event.KV,
}

rev, err = l.log.Append(ctx, updateEvent)
if err != nil {
rev, event, err := l.get(ctx, key, "", 1, 0, false)
if event == nil {
return rev, nil, false, err
}
return rev, event.KV, false, err
}

updateEvent.KV.ModRevision = rev
return rev, updateEvent.KV, true, err
return l.log.Update(ctx, key, value, revision, lease)
}

func (l *LogStructured) ttlEvents(ctx context.Context) chan *server.Event {
Expand Down
39 changes: 21 additions & 18 deletions pkg/kine/logstructured/sqllog/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type Dialect interface {
After(ctx context.Context, rev, limit int64) (*sql.Rows, error)
Insert(ctx context.Context, key string, create, delete bool, createRevision, previousRevision int64, ttl int64, value, prevValue []byte) (int64, error)
Create(ctx context.Context, key string, value []byte, lease int64) (int64, error)
Update(ctx context.Context, key string, value []byte, prevRev, lease int64) (int64, bool, error)
DeleteRevision(ctx context.Context, revision int64) error
GetCompactRevision(ctx context.Context) (int64, int64, error)
Compact(ctx context.Context, revision int64) error
Expand Down Expand Up @@ -272,10 +273,7 @@ func (s *SQLLog) List(ctx context.Context, prefix, startKey string, limit, revis
return rev, result, server.ErrCompacted
}

select {
case s.notify <- rev:
default:
}
s.notifyWatcherPoll(rev)

return rev, result, err
}
Expand Down Expand Up @@ -441,18 +439,12 @@ func (s *SQLLog) poll(result chan interface{}, pollStart int64) {
// and trigger a quick retry for simple out of order events
skip = next
skipTime = time.Now()
select {
case s.notify <- next:
default:
}
s.notifyWatcherPoll(next)
break
} else {
if err := s.d.Fill(s.ctx, next); err == nil {
logrus.Debugf("FILL, revision=%d, err=%v", next, err)
select {
case s.notify <- next:
default:
}
s.notifyWatcherPoll(next)
} else {
logrus.Debugf("FILL FAILED, revision=%d, err=%v", next, err)
}
Expand Down Expand Up @@ -537,10 +529,7 @@ func (s *SQLLog) Append(ctx context.Context, event *server.Event) (int64, error)
if err != nil {
return 0, err
}
select {
case s.notify <- rev:
default:
}
s.notifyWatcherPoll(rev)
return rev, nil
}

Expand All @@ -558,11 +547,25 @@ func (s *SQLLog) Create(ctx context.Context, key string, value []byte, lease int
return 0, err
}

s.notifyWatcherPoll(rev)
return rev, nil
}

func (s *SQLLog) Update(ctx context.Context, key string, value []byte, prevRev, lease int64) (rev int64, updated bool, err error) {
rev, updated, err = s.d.Update(ctx, key, value, prevRev, lease)
if err != nil {
return 0, false, err
}

s.notifyWatcherPoll(rev)
return rev, updated, nil
}

func (s *SQLLog) notifyWatcherPoll(revision int64) {
select {
case s.notify <- rev:
case s.notify <- revision:
default:
}
return rev, nil
}

func scan(rows *sql.Rows, event *server.Event) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kine/server/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type Backend interface {
Delete(ctx context.Context, key string, revision int64) (int64, *KeyValue, bool, error)
List(ctx context.Context, prefix, startKey string, limit, revision int64) (int64, []*KeyValue, error)
Count(ctx context.Context, prefix, startKey string, revision int64) (int64, int64, error)
Update(ctx context.Context, key string, value []byte, revision, lease int64) (int64, *KeyValue, bool, error)
Update(ctx context.Context, key string, value []byte, revision, lease int64) (int64, bool, error)
Watch(ctx context.Context, key string, revision int64) <-chan []*Event
DbSize(ctx context.Context) (int64, error)
DoCompact(ctx context.Context) error
Expand Down
32 changes: 19 additions & 13 deletions pkg/kine/server/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ func isUpdate(txn *etcdserverpb.TxnRequest) (int64, string, []byte, int64, bool)

func (l *LimitedServer) update(ctx context.Context, rev int64, key string, value []byte, lease int64) (*etcdserverpb.TxnResponse, error) {
var (
kv *KeyValue
ok bool
err error
kv *KeyValue
updated bool
err error
)
updateCnt.Add(ctx, 1)

Expand All @@ -46,26 +46,28 @@ func (l *LimitedServer) update(ctx context.Context, rev int64, key string, value

if rev == 0 {
rev, err = l.backend.Create(ctx, key, value, lease)
ok = true

span.SetAttributes(
attribute.Int64("revision", rev),
attribute.Bool("ok", ok),
)
if err == ErrKeyExists {
return &etcdserverpb.TxnResponse{
Header: txnHeader(rev),
Succeeded: false,
}, nil
} else {
updated = true
}
} else {
rev, kv, ok, err = l.backend.Update(ctx, key, value, rev, lease)
span.SetAttributes(attribute.Bool("ok", ok))
rev, updated, err = l.backend.Update(ctx, key, value, rev, lease)
}
if err != nil {
return nil, err
}
span.SetAttributes(attribute.Bool("updated", updated), attribute.Int64("revision", rev))

resp := &etcdserverpb.TxnResponse{
Header: txnHeader(rev),
Succeeded: ok,
Succeeded: updated,
}

if ok {
if updated {
resp.Responses = []*etcdserverpb.ResponseOp{
{
Response: &etcdserverpb.ResponseOp_ResponsePut{
Expand All @@ -76,6 +78,10 @@ func (l *LimitedServer) update(ctx context.Context, rev int64, key string, value
},
}
} else {
rev, kv, err = l.backend.Get(ctx, key, "", 1, rev)
if err != nil {
return nil, err
}
resp.Responses = []*etcdserverpb.ResponseOp{
{
Response: &etcdserverpb.ResponseOp_ResponseRange{
Expand Down
3 changes: 2 additions & 1 deletion test/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,15 @@ func assertMissingKey(ctx context.Context, g Gomega, client *clientv3.Client, ke
g.Expect(resp.Kvs).To(HaveLen(0))
}

func deleteKey(ctx context.Context, g Gomega, client *clientv3.Client, key string) {
func deleteKey(ctx context.Context, g Gomega, client *clientv3.Client, key string) int64 {
// The Get before the Delete is to trick kine to accept the transaction
resp, err := client.Txn(ctx).
Then(clientv3.OpGet(key), clientv3.OpDelete(key)).
Commit()

g.Expect(err).To(BeNil())
g.Expect(resp.Succeeded).To(BeTrue())
return resp.Header.Revision
}

func assertKey(ctx context.Context, g Gomega, client *clientv3.Client, key string, value string) {
Expand Down
Loading

0 comments on commit 77f11ae

Please sign in to comment.