Skip to content

Commit

Permalink
Protect db threads (#404)
Browse files Browse the repository at this point in the history
* db: skip failing dbs, protect app-backed threads

Signed-off-by: Sander Pick <[email protected]>

* ci: remove android tests

Signed-off-by: Sander Pick <[email protected]>

* manager: delete orphaned dbs

Signed-off-by: Sander Pick <[email protected]>

* net: adds net api token for apps

Signed-off-by: Sander Pick <[email protected]>
  • Loading branch information
sanderpick authored Jul 16, 2020
1 parent 43b77ee commit 00faa63
Show file tree
Hide file tree
Showing 10 changed files with 151 additions and 52 deletions.
30 changes: 0 additions & 30 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,33 +57,3 @@ jobs:
go get -v -t -d ./...
- name: Test
run: go test ./integrationtests/foldersync/

android-client:
name: Android Client Test
needs: [test]
runs-on: ubuntu-latest
steps:
- name: Set up Go
uses: actions/setup-go@v1
with:
go-version: 1.14
- name: set up JDK 1.8
uses: actions/setup-java@v1
with:
java-version: 1.8
- name: Install protoc
uses: arduino/setup-protoc@master
with:
version: '3.11.2'
- name: Install Android SDK
uses: malinskiy/action-android/install-sdk@release/0.0.5
- name: Check out code
uses: actions/checkout@v1
- name: Run Daemon
run: |
(rm -rf .threads && go run threadsd/main.go) &
- name: Test
run: |
cd api/pb/android
./gradlew testReleaseUnitTest
19 changes: 17 additions & 2 deletions core/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package app

import (
"context"
"errors"
"fmt"
"sync"
"time"
Expand All @@ -12,9 +13,16 @@ import (
"github.com/textileio/go-threads/broadcast"
"github.com/textileio/go-threads/core/net"
"github.com/textileio/go-threads/core/thread"
"github.com/textileio/go-threads/util"
)

var log = logging.Logger("app")
var (
log = logging.Logger("app")

// ErrThreadInUse indicates an operation could not be completed because the
// thread is bound to an app.
ErrThreadInUse = errors.New("thread is in use")
)

const (
busTimeout = time.Second * 10
Expand Down Expand Up @@ -105,6 +113,7 @@ type Connector struct {
Net Net

app App
token net.Token
threadID thread.ID
threadKey thread.Key
logID peer.ID
Expand All @@ -130,6 +139,7 @@ func NewConnector(app App, net Net, tinfo thread.Info, conn Connection) (*Connec
a := &Connector{
Net: net,
app: app,
token: util.GenerateRandomBytes(32),
threadID: tinfo.ID,
threadKey: tinfo.Key,
logID: lg.ID,
Expand Down Expand Up @@ -162,6 +172,11 @@ func (c *Connector) ThreadID() thread.ID {
return c.threadID
}

// Token returns the net token.
func (c *Connector) Token() net.Token {
return c.token
}

func (c *Connector) threadToApp(con Connection, wg *sync.WaitGroup) {
defer c.goRoutines.Done()
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -205,7 +220,7 @@ func (c *Connector) appToThread(wg *sync.WaitGroup) {
return
}
ctx, cancel := context.WithTimeout(context.Background(), addRecordTimeout)
if _, err := c.Net.CreateRecord(ctx, c.threadID, event.Node, net.WithThreadToken(event.Token)); err != nil {
if _, err := c.Net.CreateRecord(ctx, c.threadID, event.Node, net.WithThreadToken(event.Token), net.WithAPIToken(c.token)); err != nil {
log.Fatalf("error writing record: %v", err)
}
cancel()
Expand Down
10 changes: 10 additions & 0 deletions core/net/net.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package net

import (
"bytes"
"context"
"io"

Expand Down Expand Up @@ -69,3 +70,12 @@ type API interface {
// Subscribe returns a read-only channel of records.
Subscribe(ctx context.Context, opts ...SubOption) (<-chan ThreadRecord, error)
}

// Token is used to restrict network APIs to a single app.App.
// In other words, a net token protects against writes and deletes
// external to an app.
type Token []byte

func (t Token) Equal(b Token) bool {
return bytes.Equal(t, b)
}
13 changes: 12 additions & 1 deletion core/net/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ func WithNewThreadToken(t thread.Token) NewThreadOption {

// ThreadOptions defines options for interacting with a thread.
type ThreadOptions struct {
Token thread.Token
Token thread.Token
APIToken Token
}

// ThreadOption specifies thread options.
Expand All @@ -54,6 +55,16 @@ func WithThreadToken(t thread.Token) ThreadOption {
}
}

// WithAPIToken provides additional authorization for interacting
// with a thread as an application.
// For example, this is used by a db.DB to ensure that only it can
// create/add records or delete the underlying thread.
func WithAPIToken(t Token) ThreadOption {
return func(args *ThreadOptions) {
args.APIToken = t
}
}

// SubOptions defines options for a thread subscription.
type SubOptions struct {
ThreadIDs thread.IDSlice
Expand Down
23 changes: 15 additions & 8 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,16 @@ func newDB(n app.Net, id thread.ID, opts *NewOptions) (*DB, error) {
localEventsBus: app.NewLocalEventsBus(),
stateChangedNotifee: &stateChangedNotifee{},
}
if err := d.putName(opts.Name); err != nil {
if err := d.loadName(); err != nil {
return nil, err
}
if err := d.loadName(); err != nil {
prevName := d.name
if opts.Name != "" {
d.name = opts.Name
} else if prevName == "" {
d.name = "unnamed"
}
if err := d.saveName(prevName); err != nil {
return nil, err
}
if err := d.reCreateCollections(); err != nil {
Expand All @@ -179,7 +185,8 @@ func newDB(n app.Net, id thread.ID, opts *NewOptions) (*DB, error) {

connector, err := n.ConnectApp(d, id)
if err != nil {
log.Fatalf("unable to connect app: %s", err)
// @todo: Consider making this fatal again after fixing #400
return nil, err
}
d.connector = connector
return d, nil
Expand All @@ -192,15 +199,15 @@ func managedDatastore(ds ds.Datastore) bool {
return ok
}

// putName saves a name for db.
func (d *DB) putName(name string) error {
if name == "" {
// saveName saves the db name.
func (d *DB) saveName(prevName string) error {
if d.name == prevName {
return nil
}
if !nameRx.MatchString(name) {
if !nameRx.MatchString(d.name) {
return ErrInvalidName
}
return d.datastore.Put(dsName, []byte(name))
return d.datastore.Put(dsName, []byte(d.name))
}

// loadName loads db name if present.
Expand Down
8 changes: 8 additions & 0 deletions db/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package db

import (
"context"
"errors"
"io/ioutil"
"os"
"reflect"
Expand All @@ -14,6 +15,7 @@ import (
format "github.com/ipfs/go-ipld-format"
"github.com/multiformats/go-multiaddr"
"github.com/textileio/go-threads/common"
"github.com/textileio/go-threads/core/app"
core "github.com/textileio/go-threads/core/db"
"github.com/textileio/go-threads/core/thread"
"github.com/textileio/go-threads/util"
Expand Down Expand Up @@ -47,6 +49,12 @@ func TestE2EWithThreads(t *testing.T) {
dummyJSON = util.SetJSONProperty("Counter", 42, dummyJSON)
checkErr(t, c1.Save(dummyJSON))

// Make sure the thread can't be deleted directly
err = n1.DeleteThread(context.Background(), id1)
if !errors.Is(err, app.ErrThreadInUse) {
t.Fatal("thread should have been protected from deletion")
}

// Boilerplate to generate peer1 thread-addr
// @todo: This should be a network method
peer1Addr := n1.Host().Addrs()[0]
Expand Down
28 changes: 24 additions & 4 deletions db/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func NewManager(network app.Net, opts ...NewOption) (*Manager, error) {
return nil, err
}
defer results.Close()
invalids := make(map[thread.ID]struct{})
for res := range results.Next() {
parts := strings.Split(ds.RawKey(res.Key).String(), "/")
if len(parts) < 3 {
Expand All @@ -83,16 +84,28 @@ func NewManager(network app.Net, opts ...NewOption) (*Manager, error) {
if _, ok := m.dbs[id]; ok {
continue
}
if _, ok := invalids[id]; ok {
continue
}
opts, err := getDBOptions(id, m.opts, "")
if err != nil {
return nil, err
}
s, err := newDB(m.network, id, opts)
if err != nil {
return nil, err
log.Errorf("unable to reload db %s: %s (marked for deletion)", id, err)
invalids[id] = struct{}{}
continue
}
m.dbs[id] = s
}

// Cleanup invalids
for id := range invalids {
if err := m.deleteThreadNamespace(id); err != nil {
return nil, err
}
}
return m, nil
}

Expand Down Expand Up @@ -229,14 +242,23 @@ func (m *Manager) DeleteDB(ctx context.Context, id thread.ID, opts ...ManagedOpt
if err := db.Close(); err != nil {
return err
}
if err := m.network.DeleteThread(ctx, id, net.WithThreadToken(args.Token)); err != nil {
if err := m.network.DeleteThread(ctx, id, net.WithThreadToken(args.Token), net.WithAPIToken(db.connector.Token())); err != nil {
return err
}

// Cleanup keys used by the db
if err := id.Validate(); err != nil {
return err
}
if err := m.deleteThreadNamespace(id); err != nil {
return err
}

delete(m.dbs, id)
return nil
}

func (m *Manager) deleteThreadNamespace(id thread.ID) error {
pre := dsManagerBaseKey.ChildString(id.String())
q := query.Query{Prefix: pre.String(), KeysOnly: true}
results, err := m.opts.Datastore.Query(q)
Expand All @@ -249,8 +271,6 @@ func (m *Manager) DeleteDB(ctx context.Context, id thread.ID, opts ...ManagedOpt
return err
}
}

delete(m.dbs, id)
return nil
}

Expand Down
23 changes: 23 additions & 0 deletions db/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,35 @@ func TestManager_GetDB(t *testing.T) {
_, err = collection.CreateMany([][]byte{person2, person3})
checkErr(t, err)

// Delete the db, we'll try to restart again
err = man.DeleteDB(ctx, id)
checkErr(t, err)

time.Sleep(time.Second)

err = man.Close()
checkErr(t, err)
err = n.Close()
checkErr(t, err)

t.Run("test get deleted db after restart", func(t *testing.T) {
n, err := common.DefaultNetwork(dir, common.WithNetDebug(true), common.WithNetHostAddr(util.FreeLocalAddr()))
checkErr(t, err)
man, err := NewManager(n, WithNewRepoPath(dir), WithNewDebug(true))
checkErr(t, err)

_, err = man.GetDB(ctx, id)
if !errors.Is(err, lstore.ErrThreadNotFound) {
t.Fatal("db was not deleted")
}

time.Sleep(time.Second)

err = man.Close()
checkErr(t, err)
err = n.Close()
checkErr(t, err)
})
})
}

Expand Down
Loading

0 comments on commit 00faa63

Please sign in to comment.