Skip to content

Commit

Permalink
Merge Fuse with Learn (#120)
Browse files Browse the repository at this point in the history
* record numSamples

* removed fuse

* removed fuse

* rm fuse in envelop

* fix e2e test

* revert e2e crd

* better log for loadConfig

* fixed Mutex issues

* fixed Mutex issues
  • Loading branch information
davidhadas authored Jan 6, 2023
1 parent 5993e05 commit 98a0609
Show file tree
Hide file tree
Showing 26 changed files with 97 additions and 386 deletions.
59 changes: 34 additions & 25 deletions cmd/guard-service/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ import (
pi "knative.dev/security-guard/pkg/pluginterfaces"
)

var maxPileCount = uint32(1000)
const (
pileMergeLimit = uint32(1000)
numSamplesLimit = uint32(1000000)
)

// A cached record kept by guard-service for each deployed service
type serviceRecord struct {
Expand Down Expand Up @@ -72,7 +75,6 @@ func (s *services) tick() {
// Tick should not include any asynchronous work
// Move all asynchronous work (e.g. KubeApi work) to go routines
s.mutex.Lock()
defer s.mutex.Unlock()

if len(s.tickerKeys) == 0 {
// Assign more work to be done now and in future ticks
Expand All @@ -90,18 +92,28 @@ func (s *services) tick() {
maxIterations = 100
}

// try to learn one record
i := 0
// find a record to learn
i := 0 // i is the index of the record to learn
var record *serviceRecord
for ; i < maxIterations; i++ {
if record, exists := s.cache[s.tickerKeys[i]]; exists {
// we still have this record, lets learn it
if s.learnPile(record) {
// we learned one record
r, exists := s.cache[s.tickerKeys[i]]
if exists {
if r.pile.Count != 0 {
// we will learn this record!
record = r
// (during the next tick we should try the next one)
i++
break
}
}
}
s.mutex.Unlock()
// Must unlock s.mutex before s.learnPile

if record != nil {
// lets learn it
s.learnPile(record)
}

// remove the keys we processed from the key slice
s.tickerKeys = s.tickerKeys[i:]
Expand Down Expand Up @@ -132,6 +144,7 @@ func (s *services) get(ns string, sid string, cmFlag bool) *serviceRecord {
// try to get from cache
record := s.cache[service]
s.mutex.Unlock()
// Must unlock s.mutex before s.kmgr.Watch, s.kmgr.GetGuardian, s.set

// watch any unknown namespace
if !knownNamespace {
Expand All @@ -153,6 +166,7 @@ func (s *services) set(ns string, sid string, cmFlag bool, guardianSpec *spec.Gu
service := serviceKey(ns, sid, cmFlag)

s.mutex.Lock()
defer s.mutex.Unlock()
record, exists := s.cache[service]
if !exists {
record = new(serviceRecord)
Expand All @@ -162,7 +176,6 @@ func (s *services) set(ns string, sid string, cmFlag bool, guardianSpec *spec.Gu
record.cmFlag = cmFlag
s.cache[service] = record
}
s.mutex.Unlock()

record.guardianSpec = guardianSpec
pi.Log.Debugf("cache record for %s.%s", ns, sid)
Expand All @@ -184,37 +197,33 @@ func (s *services) merge(record *serviceRecord, pile *spec.SessionDataPile) {
record.pileMutex.Lock()
record.pile.Merge(pile)
record.pileMutex.Unlock()
if record.pile.Count > maxPileCount {
// Must unlock pileMutex before s.learnPile

if record.pile.Count > pileMergeLimit {
s.learnPile(record)
}
}

// update the record guardianSpec by learning a new config and fusing with the record existing config
// update KubeAPI as well.
// return true if we try to learn and access kubeApi, false if count is zero and we have nothing to do
func (s *services) learnPile(record *serviceRecord) bool {
if record.pile.Count == 0 {
return false
func (s *services) learnPile(record *serviceRecord) {
if record.guardianSpec.Learned == nil {
record.guardianSpec.Learned = new(spec.SessionDataConfig)
}
config := new(spec.SessionDataConfig)

record.pileMutex.Lock()
config.Learn(&record.pile)
record.guardianSpec.Learned.Learn(&record.pile)
record.guardianSpec.NumSamples += record.pile.Count
if record.guardianSpec.NumSamples > numSamplesLimit {
record.guardianSpec.NumSamples = numSamplesLimit
}
record.pile.Clear()
record.pileMutex.Unlock()

if record.guardianSpec.Learned != nil {
config.Fuse(record.guardianSpec.Learned)
}

// update the cached record
record.guardianSpec.Learned = config
record.guardianSpec.Learned.Active = true
// Must unlock record.pileMutex before s.persist

// update the kubeApi record
go s.persist(record)

return true
}

func (s *services) persist(record *serviceRecord) {
Expand Down
11 changes: 1 addition & 10 deletions pkg/apis/guard/v1alpha1/asciiFlags.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,16 +134,7 @@ func (config *AsciiFlagsConfig) learnI(valPile ValuePile) {

// pile is RO and unchanged - never uses pile internal objects
func (config *AsciiFlagsConfig) Learn(pile AsciiFlagsPile) {
*config = AsciiFlagsConfig(pile)
}

func (config *AsciiFlagsConfig) fuseI(otherValConfig ValueConfig) {
config.Fuse(*otherValConfig.(*AsciiFlagsConfig))
}

// otherConfig is RO and unchanged - never uses otherConfig internal objects
func (config *AsciiFlagsConfig) Fuse(otherConfig AsciiFlagsConfig) {
*config |= otherConfig
*config |= AsciiFlagsConfig(pile)
}

func (config *AsciiFlagsConfig) Prepare() {
Expand Down
20 changes: 0 additions & 20 deletions pkg/apis/guard/v1alpha1/body.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,26 +150,6 @@ func (config *BodyConfig) Learn(pile *BodyPile) {
}
}

func (config *BodyConfig) fuseI(otherValConfig ValueConfig) {
config.Fuse(otherValConfig.(*BodyConfig))
}

// otherConfig is RO and unchanged - never uses otherConfig internal objects
func (config *BodyConfig) Fuse(otherConfig *BodyConfig) {
if otherConfig.Structured != nil {
if config.Structured == nil {
config.Structured = new(StructuredConfig)
}
config.Structured.Fuse(otherConfig.Structured)
}
if otherConfig.Unstructured != nil {
if config.Unstructured == nil {
config.Unstructured = new(SimpleValConfig)
}
config.Unstructured.Fuse(otherConfig.Unstructured)
}
}

func (config *BodyConfig) Prepare() {
if config.Structured != nil {
config.Structured.Prepare()
Expand Down
62 changes: 15 additions & 47 deletions pkg/apis/guard/v1alpha1/count.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,22 +62,6 @@ type countRange struct {
Max uint8 `json:"max"`
}

func (cRange *countRange) fuseTwoRanges(otherRange *countRange) bool {
if cRange.Max < otherRange.Min || cRange.Min > otherRange.Max {
// no overlap - nothing to do!
return false
}

// There is overlap of some sort
if cRange.Min > otherRange.Min {
cRange.Min = otherRange.Min
}
if cRange.Max < otherRange.Max {
cRange.Max = otherRange.Max
}
return true
}

// Exposes ValueConfig interface
type CountConfig []countRange

Expand Down Expand Up @@ -116,14 +100,14 @@ func (config *CountConfig) learnI(valPile ValuePile) {

// Learn now offers the simplest single rule support
// pile is RO and unchanged - never uses pile internal objects
// Future: Improve Learn
// Future: Improve Learn - e.g. by supporting more then one range
func (config *CountConfig) Learn(pile CountPile) {
min := uint8(0)
max := uint8(0)
if len(pile) > 0 {
min = pile[0]
max = pile[0]
if len(pile) == 0 {
return
}

min := pile[0]
max := pile[0]
for _, v := range pile {
if min > v {
min = v
Expand All @@ -132,33 +116,17 @@ func (config *CountConfig) Learn(pile CountPile) {
max = v
}
}
*config = append(*config, countRange{min, max})
}

func (config *CountConfig) fuseI(otherValConfig ValueConfig) {
config.Fuse(*otherValConfig.(*CountConfig))
}
if *config == nil {
*config = append(*config, countRange{min, max})
return
}

// Fuse CountConfig in-place
// otherValConfig is RO and unchanged - never uses otherValConfig internal objects
// The implementation look to opportunistically merge new entries to existing ones
// The implementation does now squash entries even if after the Fuse they may be squashed
// This is done to achieve Fuse in-place
// Future: Improve Fuse - e.g. by keeping extra entries in Range [0,0] and reusing them instead of adding new entries
func (config *CountConfig) Fuse(otherConfig CountConfig) {
var fused bool
for _, other := range otherConfig {
fused = false
for idx, this := range *config {
if fused = this.fuseTwoRanges(&other); fused {
(*config)[idx] = this
break
}
}
if !fused {
// Creating new countRange avoids both objects pointing to the same object
*config = append(*config, countRange{other.Min, other.Max})
}
if min < (*config)[0].Min {
(*config)[0].Min = min
}
if max > (*config)[0].Max {
(*config)[0].Max = max
}
}

Expand Down
9 changes: 0 additions & 9 deletions pkg/apis/guard/v1alpha1/envelop.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,6 @@ func (config *EnvelopConfig) Learn(pile *EnvelopPile) {
config.ResponseTime.Learn(pile.ResponseTime)
}

func (config *EnvelopConfig) fuseI(otherValConfig ValueConfig) {
config.Fuse(otherValConfig.(*EnvelopConfig))
}

func (config *EnvelopConfig) Fuse(otherConfig *EnvelopConfig) {
config.CompletionTime.Fuse(&otherConfig.CompletionTime)
config.ResponseTime.Fuse(&otherConfig.ResponseTime)
}

func (config *EnvelopConfig) Prepare() {
config.CompletionTime.Prepare()
config.ResponseTime.Prepare()
Expand Down
12 changes: 1 addition & 11 deletions pkg/apis/guard/v1alpha1/flagSlice.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,17 +101,7 @@ func (config *FlagSliceConfig) learnI(valPile ValuePile) {

// otherPile is RO and unchanged - never uses otherPile internal objects
func (config *FlagSliceConfig) Learn(pile FlagSlicePile) {
*config = make(FlagSliceConfig, len(pile))
copy(*config, pile)
}

func (config *FlagSliceConfig) fuseI(otherValConfig ValueConfig) {
config.Fuse(*otherValConfig.(*FlagSliceConfig))
}

// otherConfig is RO and unchanged - never uses otherConfig internal objects
func (config *FlagSliceConfig) Fuse(otherConfig FlagSliceConfig) {
*config = mergeFlagSlices(*config, otherConfig)
*config = mergeFlagSlices(*config, pile)
}

func (config *FlagSliceConfig) Prepare() {
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/guard/v1alpha1/guardianSpec.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Ctrl struct {
type GuardianSpec struct {
Configured *SessionDataConfig `json:"configured"` // configrued criteria
Learned *SessionDataConfig `json:"learned,omitempty"` // Learned citeria
NumSamples uint32 `json:"samples"` // Number of Samples Learned
Control *Ctrl `json:"control"` // Control
}

Expand Down
8 changes: 0 additions & 8 deletions pkg/apis/guard/v1alpha1/httpHeaders.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,6 @@ func (config *HeadersConfig) Learn(pile *HeadersPile) {
config.Kv.Learn(pile.Kv)
}

func (config *HeadersConfig) fuseI(otherValConfig ValueConfig) {
config.Fuse(otherValConfig.(*HeadersConfig))
}

func (config *HeadersConfig) Fuse(otherConfig *HeadersConfig) {
config.Kv.Fuse(&otherConfig.Kv)
}

func (config *HeadersConfig) Prepare() {
config.Kv.Prepare()
}
9 changes: 0 additions & 9 deletions pkg/apis/guard/v1alpha1/httpMediaType.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,6 @@ func (config *MediaTypeConfig) Learn(pile *MediaTypePile) {
config.Params.Learn(&pile.Params)
}

func (config *MediaTypeConfig) fuseI(otherValConfig ValueConfig) {
config.Fuse(otherValConfig.(*MediaTypeConfig))
}

func (config *MediaTypeConfig) Fuse(otherConfig *MediaTypeConfig) {
config.TypeTokens.Fuse(&otherConfig.TypeTokens)
config.Params.Fuse(&otherConfig.Params)
}

func (config *MediaTypeConfig) Prepare() {
config.TypeTokens.Prepare()
config.Params.Prepare()
Expand Down
16 changes: 0 additions & 16 deletions pkg/apis/guard/v1alpha1/httpReq.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,22 +180,6 @@ func (config *ReqConfig) Learn(pile *ReqPile) {
config.Url.Learn(&pile.Url)
}

func (config *ReqConfig) fuseI(otherValConfig ValueConfig) {
config.Fuse(otherValConfig.(*ReqConfig))
}

func (config *ReqConfig) Fuse(otherConfig *ReqConfig) {
config.ClientIp.Fuse(&otherConfig.ClientIp)
config.HopIp.Fuse(&otherConfig.HopIp)
config.Method.Fuse(&otherConfig.Method)
config.Proto.Fuse(&otherConfig.Proto)
config.MediaType.Fuse(&otherConfig.MediaType)
config.ContentLength.Fuse(otherConfig.ContentLength)
config.Headers.Fuse(&otherConfig.Headers)
config.Qs.Fuse(&otherConfig.Qs)
config.Url.Fuse(&otherConfig.Url)
}

func (config *ReqConfig) Prepare() {
config.ClientIp.Prepare()
config.HopIp.Prepare()
Expand Down
10 changes: 0 additions & 10 deletions pkg/apis/guard/v1alpha1/httpResp.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,16 +117,6 @@ func (config *RespConfig) Learn(pile *RespPile) {
config.ContentLength.Learn(pile.ContentLength)
}

func (config *RespConfig) fuseI(otherValConfig ValueConfig) {
config.Fuse(otherValConfig.(*RespConfig))
}

func (config *RespConfig) Fuse(otherConfig *RespConfig) {
config.Headers.Fuse(&otherConfig.Headers)
config.MediaType.Fuse(&otherConfig.MediaType)
config.ContentLength.Fuse(otherConfig.ContentLength)
}

func (config *RespConfig) Prepare() {
config.Headers.Prepare()
config.MediaType.Prepare()
Expand Down
Loading

0 comments on commit 98a0609

Please sign in to comment.