Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store Events in separate database #247

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions cmd/metal-api/internal/datastore/event.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package datastore

import (
"fmt"
"time"

"github.com/metal-stack/metal-api/cmd/metal-api/internal/metal"
"github.com/metal-stack/metal-lib/zapup"
)

// ListProvisioningEventContainers returns all machine provisioning event containers.
Expand Down Expand Up @@ -35,3 +39,67 @@ func (rs *RethinkStore) CreateProvisioningEventContainer(ec *metal.ProvisioningE
func (rs *RethinkStore) UpsertProvisioningEventContainer(ec *metal.ProvisioningEventContainer) error {
return rs.upsertEntity(rs.eventTable(), ec)
}

func (rs *RethinkStore) EvaluateMachineLiveliness(m metal.Machine) (metal.MachineLiveliness, error) {
provisioningEvents, err := rs.FindProvisioningEventContainer(m.ID)
if err != nil {
// we have no provisioning events... we cannot tell
return metal.MachineLivelinessUnknown, fmt.Errorf("no provisioningEvents found for ID: %s", m.ID)
}

old := *provisioningEvents

if provisioningEvents.LastEventTime != nil {
if time.Since(*provisioningEvents.LastEventTime) > metal.MachineDeadAfter {
if m.Allocation != nil {
// the machine is either dead or the customer did turn off the phone home service
provisioningEvents.Liveliness = metal.MachineLivelinessUnknown
} else {
// the machine is just dead
provisioningEvents.Liveliness = metal.MachineLivelinessDead
}
} else {
provisioningEvents.Liveliness = metal.MachineLivelinessAlive
}
err = rs.UpdateProvisioningEventContainer(&old, provisioningEvents)
if err != nil {
return provisioningEvents.Liveliness, err
}
}

return provisioningEvents.Liveliness, nil
}

func (rs *RethinkStore) ProvisioningEventForMachine(machineID string, event metal.ProvisioningEvent) (*metal.ProvisioningEventContainer, error) {
ec, err := rs.FindProvisioningEventContainer(machineID)
if err != nil && !metal.IsNotFound(err) {
return nil, err
}

if ec == nil {
ec = &metal.ProvisioningEventContainer{
Base: metal.Base{
ID: machineID,
},
Liveliness: metal.MachineLivelinessAlive,
}
}
now := time.Now()
ec.LastEventTime = &now

if event.Event == metal.ProvisioningEventAlive {
zapup.MustRootLogger().Sugar().Debugw("received provisioning alive event", "id", ec.ID)
ec.Liveliness = metal.MachineLivelinessAlive
} else if event.Event == metal.ProvisioningEventPhonedHome && len(ec.Events) > 0 && ec.Events[0].Event == metal.ProvisioningEventPhonedHome {
zapup.MustRootLogger().Sugar().Debugw("swallowing repeated phone home event", "id", ec.ID)
ec.Liveliness = metal.MachineLivelinessAlive
} else {
ec.Events = append([]metal.ProvisioningEvent{event}, ec.Events...)
ec.IncompleteProvisioningCycles = ec.CalculateIncompleteCycles(zapup.MustRootLogger().Sugar())
ec.Liveliness = metal.MachineLivelinessAlive
}
ec.TrimEvents(metal.ProvisioningEventsInspectionLimit)

err = rs.UpsertProvisioningEventContainer(ec)
return ec, err
}
152 changes: 152 additions & 0 deletions cmd/metal-api/internal/service/event-service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package service

import (
"errors"
"net/http"
"time"

"github.com/metal-stack/security"

"github.com/metal-stack/metal-lib/httperrors"
"github.com/metal-stack/metal-lib/zapup"
"go.uber.org/zap"

mdm "github.com/metal-stack/masterdata-api/pkg/client"

"github.com/metal-stack/metal-api/cmd/metal-api/internal/datastore"
"github.com/metal-stack/metal-api/cmd/metal-api/internal/metal"
v1 "github.com/metal-stack/metal-api/cmd/metal-api/internal/service/v1"
"github.com/metal-stack/metal-api/cmd/metal-api/internal/utils"

restfulspec "github.com/emicklei/go-restful-openapi/v2"
"github.com/emicklei/go-restful/v3"
)

type eventResource struct {
webResource
mdc mdm.Client
userGetter security.UserGetter
}

// NewMachine returns a webservice for machine specific endpoints.
func NewEvent(
ds *datastore.RethinkStore,
mdc mdm.Client,
userGetter security.UserGetter,
) (*restful.WebService, error) {
r := eventResource{
webResource: webResource{
ds: ds,
},
mdc: mdc,
userGetter: userGetter,
}

return r.webService(), nil
}

// webService creates the webservice endpoint
func (r eventResource) webService() *restful.WebService {
ws := new(restful.WebService)
ws.
Path(BasePath + "v1/event").
Consumes(restful.MIME_JSON).
Produces(restful.MIME_JSON)

tags := []string{"event"}

ws.Route(ws.GET("/machine/{id}").
To(viewer(r.getProvisioningEventContainer)).
Operation("getProvisioningEventContainer").
Doc("get the current machine provisioning event container").
Param(ws.PathParameter("id", "identifier of the machine").DataType("string")).
Metadata(restfulspec.KeyOpenAPITags, tags).
Writes(v1.MachineRecentProvisioningEvents{}).
Returns(http.StatusOK, "OK", v1.MachineRecentProvisioningEvents{}).
DefaultReturns("Error", httperrors.HTTPErrorResponse{}))

ws.Route(ws.POST("/machine/{id}").
To(editor(r.addProvisioningEvent)).
Operation("addProvisioningEvent").
Doc("adds a machine provisioning event").
Param(ws.PathParameter("id", "identifier of the machine").DataType("string")).
Metadata(restfulspec.KeyOpenAPITags, tags).
Reads(v1.MachineProvisioningEvent{}).
Writes(v1.MachineRecentProvisioningEvents{}).
Returns(http.StatusOK, "OK", v1.MachineRecentProvisioningEvents{}).
DefaultReturns("Error", httperrors.HTTPErrorResponse{}))

return ws
}

func (r eventResource) getProvisioningEventContainer(request *restful.Request, response *restful.Response) {
id := request.PathParameter("id")

// check for existence of the machine
_, err := r.ds.FindMachineByID(id)
if checkError(request, response, utils.CurrentFuncName(), err) {
return
}

ec, err := r.ds.FindProvisioningEventContainer(id)
if checkError(request, response, utils.CurrentFuncName(), err) {
return
}
err = response.WriteHeaderAndEntity(http.StatusOK, v1.NewMachineRecentProvisioningEvents(ec))
if err != nil {
zapup.MustRootLogger().Error("Failed to send response", zap.Error(err))
return
}
}

func (r eventResource) addProvisioningEvent(request *restful.Request, response *restful.Response) {
id := request.PathParameter("id")
m, err := r.ds.FindMachineByID(id)
if err != nil && !metal.IsNotFound(err) {
if checkError(request, response, utils.CurrentFuncName(), err) {
return
}
}

// an event can actually create an empty machine. This enables us to also catch the very first PXE Booting event
// in a machine lifecycle
if m == nil {
m = &metal.Machine{
Base: metal.Base{
ID: id,
},
}
err = r.ds.CreateMachine(m)
if checkError(request, response, utils.CurrentFuncName(), err) {
return
}
}

var requestPayload v1.MachineProvisioningEvent
err = request.ReadEntity(&requestPayload)
if checkError(request, response, utils.CurrentFuncName(), err) {
return
}
ok := metal.AllProvisioningEventTypes[metal.ProvisioningEventType(requestPayload.Event)]
if !ok {
if checkError(request, response, utils.CurrentFuncName(), errors.New("unknown provisioning event")) {
return
}
}

event := metal.ProvisioningEvent{
Time: time.Now(),
Event: metal.ProvisioningEventType(requestPayload.Event),
Message: requestPayload.Message,
}
ec, err := r.ds.ProvisioningEventForMachine(id, event)
if checkError(request, response, utils.CurrentFuncName(), err) {
return
}

err = response.WriteHeaderAndEntity(http.StatusOK, v1.NewMachineRecentProvisioningEvents(ec))
if err != nil {
zapup.MustRootLogger().Error("Failed to send response", zap.Error(err))
return
}
}
51 changes: 51 additions & 0 deletions cmd/metal-api/internal/service/event-service_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package service

import (
"bytes"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"

restful "github.com/emicklei/go-restful/v3"
"github.com/metal-stack/metal-api/cmd/metal-api/internal/datastore"
"github.com/metal-stack/metal-api/cmd/metal-api/internal/metal"
v1 "github.com/metal-stack/metal-api/cmd/metal-api/internal/service/v1"
"github.com/metal-stack/metal-api/cmd/metal-api/internal/testdata"
"github.com/stretchr/testify/require"
)

func TestAddProvisioningEvent(t *testing.T) {
ds, mock := datastore.InitMockDB()
testdata.InitMockDBData(mock)

eventservice, err := NewEvent(ds, nil, nil)
require.NoError(t, err)

container := restful.NewContainer().Add(eventservice)
event := &metal.ProvisioningEvent{
Event: metal.ProvisioningEventPreparing,
Message: "starting metal-hammer",
}
js, _ := json.Marshal(event)
body := bytes.NewBuffer(js)
req := httptest.NewRequest("POST", "/v1/event/machine/1", body)
container = injectEditor(container, req)
req.Header.Add("Content-Type", "application/json")
w := httptest.NewRecorder()
container.ServeHTTP(w, req)

resp := w.Result()
defer resp.Body.Close()
require.Equal(t, http.StatusOK, resp.StatusCode, w.Body.String())
var result v1.MachineRecentProvisioningEvents
err = json.NewDecoder(resp.Body).Decode(&result)

require.Nil(t, err)
require.Equal(t, "0", result.IncompleteProvisioningCycles)
require.Len(t, result.Events, 1)
if len(result.Events) > 0 {
require.Equal(t, "starting metal-hammer", result.Events[0].Message)
require.Equal(t, string(metal.ProvisioningEventPreparing), result.Events[0].Event)
}
}
Loading