Skip to content

Commit

Permalink
enhance ephemeral to work with more than 2 players (#14)
Browse files Browse the repository at this point in the history
Co-authored-by: Petra Scherer <[email protected]>
Co-authored-by: Timo Klenk <[email protected]>
Co-authored-by: Lila <[email protected]>
Signed-off-by: Johannes Graf <[email protected]>
  • Loading branch information
3 people authored Apr 6, 2022
1 parent 12c134b commit 6e7113c
Show file tree
Hide file tree
Showing 28 changed files with 544 additions and 314 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,6 @@ cover.*

### Ko
/_docker_build_files/*
vendor
pkg/mod/
pkg/sumdb/
6 changes: 6 additions & 0 deletions NOTICE.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ these files remains with the original authors.

> **NOTE**: Please keep the following list of contributors sorted.
### Honda Research Institute Europe GmbH

- Graf Johannes (synyx) [[email protected]](mailto:[email protected])
- Klenk Timo (synyx) [[email protected]](mailto:[email protected])
- Scherer Petra (synyx) [[email protected]](mailto:[email protected])

### Robert Bosch GmbH

- Becker Sebastian <[email protected]>
Expand Down
7 changes: 7 additions & 0 deletions charts/ephemeral/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ provided while installing the chart. For example,
helm install --name my-release -f values.yaml ephemeral
```

### Global Parameters

| Parameter | Description | Default |
| ------------- | ----------------- | ------- |
| `playerCount` | Number of players | `2` |

### Discovery Service

| Parameter | Description | Default |
Expand Down Expand Up @@ -116,3 +122,4 @@ helm install --name my-release -f values.yaml ephemeral
| `ephemeral.amphora.path` | The path under which the Amphora serivce is available | `/` |
| `ephemeral.frontendUrl` | The external base URL of the VCP | \`\` |
| `ephemeral.spdz.prime` | The prime used by SPDZ | \`\` |
| `ephemeral.playerId` | Id of this player | \`\` |
3 changes: 2 additions & 1 deletion charts/ephemeral/templates/discovery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ data:
"frontendURL": "{{ .Values.discovery.frontendUrl }}",
"masterHost": "{{ .Values.discovery.master.host }}",
"masterPort": "{{ .Values.discovery.master.port }}",
"slave": {{ if .Values.discovery.isMaster }}false{{ else }}true{{ end }}
"slave": {{ if .Values.discovery.isMaster }}false{{ else }}true{{ end }},
"playerCount": {{ .Values.playerCount }}
}
---
apiVersion: networking.istio.io/v1alpha3
Expand Down
3 changes: 2 additions & 1 deletion charts/ephemeral/templates/ephemeral.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,5 +82,6 @@ data:
},
"frontendURL": "{{ .Values.ephemeral.frontendUrl }}",
"discoveryAddress": "{{ .Values.ephemeral.discoveryAddress }}",
"playerID": {{ .Values.ephemeral.playerId }}
"playerID": {{ .Values.ephemeral.playerId }},
"playerCount": {{ .Values.playerCount }}
}
2 changes: 2 additions & 0 deletions charts/ephemeral/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#

# This file defines the default values for all variables used in the Ephemeral Helm Chart.
playerCount: 2

discovery:
service:
annotations: []
Expand Down
10 changes: 8 additions & 2 deletions cmd/discovery/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func main() {
panic(err)
}
// TODO: extract this Istio address dynamically.
s := discovery.NewServiceNG(bus, pb, stateTimeout, tr, n, config.FrontendURL, logger, mode, client)
s := discovery.NewServiceNG(bus, pb, stateTimeout, tr, n, config.FrontendURL, logger, mode, client, config.PlayerCount)
if err != nil {
panic(err)
}
Expand All @@ -95,7 +95,7 @@ func NewClient(config *types.DiscoveryConfig, stateTimeout time.Duration, logger
mode := ModeMaster
client := &cl.Client{}
var err error
if config.Slave {
if config.Slave { // If Follower/Slave -> Open GRPc Connection to Master
inCh := make(chan *proto.Event)
outCh := make(chan *proto.Event)
grpcClientConf := &c.TransportClientConfig{
Expand Down Expand Up @@ -167,6 +167,12 @@ func ParseConfig(path string) (*DiscoveryConfig, error) {
if conf.MasterPort == "" {
return nil, errors.New("missing config error, MasterPort must be defined")
}
if conf.PlayerCount == 0 {
return nil, errors.New("missing config error, PlayerCount must be defined")
}
if conf.PlayerCount < 2 {
return nil, errors.New("invalid config error, PlayerCount must be 2 or higher")
}
return &conf, nil
}

Expand Down
102 changes: 83 additions & 19 deletions cmd/discovery/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//
// SPDX-License-Identifier: Apache-2.0
//
package main_test
package main

import (
"errors"
Expand All @@ -17,7 +17,6 @@ import (
. "github.com/onsi/gomega"
"go.uber.org/zap"

. "github.com/carbynestack/ephemeral/cmd/discovery"
"github.com/carbynestack/ephemeral/pkg/discovery"
. "github.com/carbynestack/ephemeral/pkg/types"
"github.com/carbynestack/ephemeral/pkg/utils"
Expand All @@ -30,6 +29,7 @@ var _ = Describe("Main", func() {
FrontendURL: "abc",
MasterHost: "abc",
MasterPort: "8080",
PlayerCount: 2,
}
logger := zap.NewNop().Sugar()
errCh := make(chan error)
Expand Down Expand Up @@ -57,57 +57,87 @@ var _ = Describe("Main", func() {
})
Context("all required parameters are specified", func() {
AfterEach(func() {
_, err := cmder.CallCMD([]string{fmt.Sprintf("rm %s", path)}, "./")
_, _, err := cmder.CallCMD([]string{fmt.Sprintf("rm %s", path)}, "./")
Expect(err).NotTo(HaveOccurred())
})
It("succeeds", func() {
data := []byte(`{"frontendURL": "apollo.test.specs.cloud","masterHost": "apollo.test.specs.cloud",
"masterPort": "31400","slave": false}`)
err := ioutil.WriteFile(path, data, 0644)
Expect(err).NotTo(HaveOccurred())
conf, err := ParseConfig(path)
Expect(err).NotTo(HaveOccurred())
Expect(conf.FrontendURL).To(Equal("apollo.test.specs.cloud"))
Expect(conf.MasterHost).To(Equal("apollo.test.specs.cloud"))
Expect(conf.MasterPort).To(Equal("31400"))
Expect(conf.Slave).To(BeFalse())
Context("parameters are plausible", func() {
It("succeeds", func() {
data := []byte(`{"frontendURL": "apollo.test.specs.cloud","masterHost": "apollo.test.specs.cloud",
"masterPort": "31400","slave": false, "playerCount": 2}`)
err := ioutil.WriteFile(path, data, 0644)
Expect(err).NotTo(HaveOccurred())
conf, err := ParseConfig(path)
Expect(err).NotTo(HaveOccurred())
Expect(conf.FrontendURL).To(Equal("apollo.test.specs.cloud"))
Expect(conf.MasterHost).To(Equal("apollo.test.specs.cloud"))
Expect(conf.MasterPort).To(Equal("31400"))
Expect(conf.Slave).To(BeFalse())
})
})

Context("parameters are invalid", func() {
Context("playerCount is invalid", func() {
It("returns an error on PlayerCount == 1", func() {
data := []byte(`{"frontendURL": "apollo.test.specs.cloud","masterHost": "apollo.test.specs.cloud",
"masterPort": "31400","slave": false, "playerCount": 1}`)
err := ioutil.WriteFile(path, data, 0644)
Expect(err).NotTo(HaveOccurred())
_, err = ParseConfig(path)
Expect(err).To(HaveOccurred())
})
It("returns an error on negative PlayerCount", func() {
data := []byte(`{"frontendURL": "apollo.test.specs.cloud","masterHost": "apollo.test.specs.cloud",
"masterPort": "31400","slave": false, "playerCount": -2}`)
err := ioutil.WriteFile(path, data, 0644)
Expect(err).NotTo(HaveOccurred())
_, err = ParseConfig(path)
Expect(err).To(HaveOccurred())
})
})
})

})
Context("one of the required parameters is missing", func() {
Context("when no frontendURL is defined", func() {
AfterEach(func() {
_, err := cmder.CallCMD([]string{fmt.Sprintf("rm %s", path)}, "./")
_, _, err := cmder.CallCMD([]string{fmt.Sprintf("rm %s", path)}, "./")
Expect(err).NotTo(HaveOccurred())
})
It("returns an error", func() {
path := fmt.Sprintf("/tmp/test-%d", random)
noFrontendURLConfig := []byte(`{"masterHost": "apollo.test.specs.cloud",
"masterPort": "31400","slave": false}`)
"masterPort": "31400","slave": false, "playerCount": 2}`)
err := ioutil.WriteFile(path, noFrontendURLConfig, 0644)
Expect(err).NotTo(HaveOccurred())
_, err = ParseConfig(path)
Expect(err).To(HaveOccurred())

noMasterHostConfigSlave := []byte(`{"frontendURL": "apollo.test.specs.cloud",
"masterPort": "31400","slave": true}`)
"masterPort": "31400","slave": true, "playerCount": 2}`)
err = ioutil.WriteFile(path, noMasterHostConfigSlave, 0644)
Expect(err).NotTo(HaveOccurred())
_, err = ParseConfig(path)
Expect(err).To(HaveOccurred())

noMasterHostConfigMaster := []byte(`{"frontendURL": "apollo.test.specs.cloud",
"masterPort": "31400","slave": false}`)
"masterPort": "31400","slave": false, "playerCount": 2}`)
err = ioutil.WriteFile(path, noMasterHostConfigMaster, 0644)
Expect(err).NotTo(HaveOccurred())
conf, err := ParseConfig(path)
Expect(err).NotTo(HaveOccurred())
Expect(conf).NotTo(BeNil())

noMasterPortConfigSlave := []byte(`{"frontendURL": "apollo.test.specs.cloud","masterHost": "apollo.test.specs.cloud","slave": false}`)
noMasterPortConfigSlave := []byte(`{"frontendURL": "apollo.test.specs.cloud","masterHost": "apollo.test.specs.cloud","slave": false, "playerCount": 2}`)
err = ioutil.WriteFile(path, noMasterPortConfigSlave, 0644)
Expect(err).NotTo(HaveOccurred())
_, err = ParseConfig(path)
Expect(err).To(HaveOccurred())

noPlayerCountConfig := []byte(`{"frontendURL": "apollo.test.specs.cloud","masterHost": "apollo.test.specs.cloud","slave": false, "masterPort": "31400"}`)
err = ioutil.WriteFile(path, noPlayerCountConfig, 0644)
Expect(err).NotTo(HaveOccurred())
_, err = ParseConfig(path)
Expect(err).To(HaveOccurred())
})
})
Context("when port|busSize|portRange|configLocation are not defined", func() {
Expand Down Expand Up @@ -149,4 +179,38 @@ var _ = Describe("Main", func() {
})
})
})

Context("when getting the stateTimeout", func() {
Context("no stateTimeout was provided in the config", func() {
It("will use the defaultStateTimeout", func() {
var config = &DiscoveryConfig{}
timeout, err := getStateTimeout(config)
Expect(err).NotTo(HaveOccurred())
Expect(timeout).To(Equal(defaultStateTimeout))
})
})
Context("an empty stateTimeout was provided in the config", func() {
It("will use the defaultStateTimeout", func() {
var config = &DiscoveryConfig{StateTimeout: ""}
timeout, err := getStateTimeout(config)
Expect(err).NotTo(HaveOccurred())
Expect(timeout).To(Equal(defaultStateTimeout))
})
})
Context("a valid stateTimeout was provided in the config", func() {
It("will use the provided stateTimeout", func() {
var config = &DiscoveryConfig{StateTimeout: "5m"}
timeout, err := getStateTimeout(config)
Expect(err).NotTo(HaveOccurred())
Expect(timeout).To(Equal(5 * time.Minute))
})
})
Context("an invalid stateTimeout was provided in the config", func() {
It("will return an error", func() {
var config = &DiscoveryConfig{StateTimeout: "invalid"}
_, err := getStateTimeout(config)
Expect(err).To(HaveOccurred())
})
})
})
})
6 changes: 6 additions & 0 deletions cmd/ephemeral/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ func GetHandlerChain(conf *SPDZEngineConfig, logger *zap.SugaredLogger) (http.Ha
spdzClient := NewSPDZEngine(logger, utils.NewCommander(), typedConfig)
server := NewServer(spdzClient.Compile, spdzClient.Activate, logger, typedConfig)
activationHandler := http.HandlerFunc(server.ActivationHandler)
// Apply in Order:
// 1) MethodFilter: Check that only POST Requests can go through
// 2) BodyFilter: Check that Request Body is set properly and Sets the CtxConfig to the request
// 3) CompilationHandler: Compiles the script if ?compile=true
// 4) ActivationHandler: Runs the script
filterChain := server.MethodFilter(server.BodyFilter(server.CompilationHandler(activationHandler)))
return filterChain, nil
}
Expand Down Expand Up @@ -118,6 +123,7 @@ func InitTypedConfig(conf *SPDZEngineConfig) (*SPDZEngineTypedConfig, error) {
RInv: rInv,
AmphoraClient: client,
PlayerID: conf.PlayerID,
PlayerCount: conf.PlayerCount,
FrontendURL: conf.FrontendURL,
MaxBulkSize: conf.MaxBulkSize,
DiscoveryAddress: conf.DiscoveryAddress,
Expand Down
2 changes: 1 addition & 1 deletion cmd/ephemeral/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ var _ = Describe("Main", func() {
path = fmt.Sprintf("/tmp/test-%d", random)
})
AfterEach(func() {
_, err := cmder.CallCMD([]string{fmt.Sprintf("rm %s", path)}, "./")
_, _, err := cmder.CallCMD([]string{fmt.Sprintf("rm %s", path)}, "./")
Expect(err).NotTo(HaveOccurred())
})
Context("when it succeeds", func() {
Expand Down
13 changes: 7 additions & 6 deletions pkg/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@ import (
)

const (
// ExpectedPlayers is a maximum number of computation parties currently supported.
ExpectedPlayers = 2
mpcPodNameLabel = "mpc.podName"
)

var (
basePort = int32(5000)
// BasePort is the base for the port number that is used by the proxy.
BasePort = int32(5000)
baseNetworkName = "player-network"
ctx = context.TODO()
)
Expand All @@ -42,7 +41,7 @@ type Event struct {
type PlayerID int32

// NewServiceNG returns a new instance of discovery service.
func NewServiceNG(bus mb.MessageBus, pub *Publisher, timeout time.Duration, tr t.Transport, n Networker, frontendAddress string, logger *zap.SugaredLogger, mode string, client DiscoveryClient) *ServiceNG {
func NewServiceNG(bus mb.MessageBus, pub *Publisher, timeout time.Duration, tr t.Transport, n Networker, frontendAddress string, logger *zap.SugaredLogger, mode string, client DiscoveryClient, playerCount int) *ServiceNG {
games := map[string]*Game{}
players := map[string]map[PlayerID]*pb.Player{}
pods := map[string]int32{}
Expand All @@ -56,6 +55,7 @@ func NewServiceNG(bus mb.MessageBus, pub *Publisher, timeout time.Duration, tr t
timeout: timeout,
transport: tr,
players: players,
playerCount: playerCount,
pods: pods,
networks: networks,
networker: n,
Expand All @@ -73,6 +73,7 @@ type ServiceNG struct {
pb *Publisher
games map[string]*Game
players map[string]map[PlayerID]*pb.Player
playerCount int
pods map[string]int32
networks map[string]int32
mux sync.Mutex
Expand Down Expand Up @@ -249,8 +250,8 @@ func (s *ServiceNG) processIn(e interface{}) {
name := ev.Name
s.registerPlayer(player, ev.GameID)
g, ok := s.games[ev.GameID]
if !ok {
g, err := NewGame(ctx, ev.GameID, s.bus, s.timeout, s.logger)
if !ok { // If game does not exist, create it
g, err := NewGame(ctx, ev.GameID, s.bus, s.timeout, s.logger, s.playerCount)
if err != nil {
s.errCh <- err
}
Expand Down
Loading

0 comments on commit 6e7113c

Please sign in to comment.