Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vm-runner: iptables-based traffic metering #1153

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion autoscaler-agent/config_map.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ data:
"activeTimeMetricName": "active_time_seconds",
"collectEverySeconds": 4,
"accumulateEverySeconds": 24,
"clients": {}
"clients": {},
"ingressBytesMetricName": "ingress_bytes",
"egressBytesMetricName": "egress_bytes"
},
"monitor": {
"serverPort": 10301,
Expand Down
106 changes: 83 additions & 23 deletions neonvm-runner/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/containerd/cgroups/v3"
"github.com/containerd/cgroups/v3/cgroup1"
"github.com/containerd/cgroups/v3/cgroup2"
"github.com/coreos/go-iptables/iptables"
"github.com/digitalocean/go-qemu/qmp"
"github.com/docker/libnetwork/types"
"github.com/jpillora/backoff"
Expand Down Expand Up @@ -607,28 +608,30 @@ func runInitScript(logger *zap.Logger, script string) error {
}

type Config struct {
vmSpecDump string
vmStatusDump string
kernelPath string
appendKernelCmdline string
skipCgroupManagement bool
enableDummyCPUServer bool
diskCacheSettings string
memoryProvider vmv1.MemoryProvider
autoMovableRatio string
vmSpecDump string
vmStatusDump string
kernelPath string
appendKernelCmdline string
skipCgroupManagement bool
enableDummyCPUServer bool
enableNetworkMonitoring bool // TODO(myrrc): pass in vmspec?
diskCacheSettings string
memoryProvider vmv1.MemoryProvider
autoMovableRatio string
}

func newConfig(logger *zap.Logger) *Config {
cfg := &Config{
vmSpecDump: "",
vmStatusDump: "",
kernelPath: defaultKernelPath,
appendKernelCmdline: "",
skipCgroupManagement: false,
enableDummyCPUServer: false,
diskCacheSettings: "cache=none",
memoryProvider: "", // Require that this is explicitly set. We'll check later.
autoMovableRatio: "", // Require that this is explicitly set IFF memoryProvider is VirtioMem. We'll check later.
vmSpecDump: "",
vmStatusDump: "",
kernelPath: defaultKernelPath,
appendKernelCmdline: "",
skipCgroupManagement: false,
enableDummyCPUServer: false,
enableNetworkMonitoring: false,
diskCacheSettings: "cache=none",
memoryProvider: "", // Require that this is explicitly set. We'll check later.
autoMovableRatio: "", // Require that this is explicitly set IFF memoryProvider is VirtioMem. We'll check later.
}
flag.StringVar(&cfg.vmSpecDump, "vmspec", cfg.vmSpecDump,
"Base64 encoded VirtualMachine json specification")
Expand All @@ -644,6 +647,9 @@ func newConfig(logger *zap.Logger) *Config {
flag.BoolVar(&cfg.enableDummyCPUServer, "enable-dummy-cpu-server",
cfg.skipCgroupManagement,
"Use with -skip-cgroup-management. Provide a CPU server but don't actually do anything with it")
flag.BoolVar(&cfg.enableNetworkMonitoring, "enable-network-monitoring",
cfg.skipCgroupManagement,
"Enable in/out traffic measuring with iptables")
flag.StringVar(&cfg.diskCacheSettings, "qemu-disk-cache-settings",
cfg.diskCacheSettings, "Cache settings to add to -drive args for VM disks")
flag.Func("memory-provider", "Set provider for memory hotplug", cfg.memoryProvider.FlagFunc)
Expand Down Expand Up @@ -1059,7 +1065,7 @@ func runQEMU(
}

wg.Add(1)
go listenForCPUChanges(ctx, logger, vmSpec.RunnerPort, callbacks, &wg)
go listenForHTTPRequests(ctx, logger, vmSpec.RunnerPort, callbacks, &wg, cfg.enableNetworkMonitoring)
}
wg.Add(1)
go forwardLogs(ctx, logger, &wg)
Expand Down Expand Up @@ -1162,12 +1168,13 @@ type cpuServerCallbacks struct {
set func(*zap.Logger, vmv1.MilliCPU) error
}

func listenForCPUChanges(
func listenForHTTPRequests(
ctx context.Context,
logger *zap.Logger,
port int32,
callbacks cpuServerCallbacks,
wg *sync.WaitGroup,
networkUsageRoute bool,
) {
defer wg.Done()
mux := http.NewServeMux()
Expand All @@ -1180,6 +1187,13 @@ func listenForCPUChanges(
mux.HandleFunc("/cpu_current", func(w http.ResponseWriter, r *http.Request) {
handleCPUCurrent(cpuCurrentLogger, w, r, callbacks.get)
})
if networkUsageRoute {
networkUsageLogger := loggerHandlers.Named("network_usage")
logger.Info("Listening for network_usage")
mux.HandleFunc("/network_usage", func(w http.ResponseWriter, r *http.Request) {
handleGetNetworkUsage(networkUsageLogger, w, r)
})
}
server := http.Server{
Addr: fmt.Sprintf("0.0.0.0:%d", port),
Handler: mux,
Expand All @@ -1194,14 +1208,60 @@ func listenForCPUChanges(
select {
case err := <-errChan:
if errors.Is(err, http.ErrServerClosed) {
logger.Info("cpu_change server closed")
logger.Info("http server closed")
} else if err != nil {
logger.Fatal("cpu_change exited with error", zap.Error(err))
logger.Fatal("http server exited with error", zap.Error(err))
}
case <-ctx.Done():
err := server.Shutdown(context.Background())
logger.Info("shut down cpu_change server", zap.Error(err))
logger.Info("shut down http server", zap.Error(err))
}
}

func incNetBytesCounter(iptables iptables.IPTables, chain string, cnt *vmv1.NetworkBytes) error {
// TODO(myrrc) neon_private_cidr?
rules, err := iptables.StructuredStats("filter", chain)
if err != nil {
return err
}
for _, stat := range rules {
if stat.Protocol == "tcp" {
*cnt += vmv1.NetworkBytes(stat.Bytes)
}
}
return nil
}

func handleGetNetworkUsage(logger *zap.Logger, w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
logger.Error("unexpected method", zap.String("method", r.Method))
w.WriteHeader(400)
return
}

counts := vmv1.VirtualMachineNetworkUsage{IngressBytes: 0, EgressBytes: 0}
// Rules configured at github.com/neondatabase/cloud/blob/main/compute-init/compute-init.sh#L98
iptables := iptables.IPTables{}
if err := incNetBytesCounter(iptables, "INPUT", &counts.IngressBytes); err != nil {
logger.Error("error reading ingress byte counts", zap.Error(err))
w.WriteHeader(500)
return
}
if err := incNetBytesCounter(iptables, "OUTPUT", &counts.EgressBytes); err != nil {
logger.Error("error reading egress byte counts", zap.Error(err))
w.WriteHeader(500)
return
}
body, err := json.Marshal(counts)
if err != nil {
logger.Error("could not marshal byte counts", zap.Error(err))
w.WriteHeader(500)
return
}

w.Header().Add("Content-Type", "application/json")
w.Write(body) //nolint:errcheck // Not much to do with the error here. TODO: log it?
logger.Info("Responded with byte counts", zap.String("byte_counts", string(body)))
}

func printWithNewline(slice []byte) error {
Expand Down
82 changes: 82 additions & 0 deletions neonvm/apis/neonvm/v1/virtualmachine_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ limitations under the License.
package v1

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"slices"
"time"

Expand Down Expand Up @@ -68,6 +71,14 @@ type VirtualMachineResources struct {
MemorySlotSize resource.Quantity `json:"memorySlotSize"`
}

type (
NetworkBytes uint64
VirtualMachineNetworkUsage struct {
IngressBytes NetworkBytes `json:"ingress_bytes"`
EgressBytes NetworkBytes `json:"egress_bytes"`
}
)

// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.

// VirtualMachineSpec defines the desired state of VirtualMachine
Expand Down Expand Up @@ -150,6 +161,11 @@ type VirtualMachineSpec struct {
// propagated to the VM.
// +optional
TargetRevision *RevisionWithTime `json:"targetRevision,omitempty"`

// Enable network monitoring on the VM
// +kubebuilder:default:=false
// +optional
EnableNetworkMonitoring *bool `json:"enableNetworkMonitoring,omitempty"`
}

func (spec *VirtualMachineSpec) Resources() VirtualMachineResources {
Expand Down Expand Up @@ -590,6 +606,41 @@ func (p VmPhase) IsAlive() bool {
}
}

// +kubebuilder:object:generate=false
type NetworkUsageJSONError struct {
Err error
}

func (e NetworkUsageJSONError) Error() string {
return fmt.Sprintf("Error unmarshaling network usage: %s", e.Err.Error())
}

func (e NetworkUsageJSONError) Unwrap() error {
return e.Err
}

// +kubebuilder:object:generate=false
type NetworkUsageRequestError struct {
Err error
}

func (e NetworkUsageRequestError) Error() string {
return fmt.Sprintf("Error fetching network usage: %s", e.Err.Error())
}

func (e NetworkUsageRequestError) Unwrap() error {
return e.Err
}

// +kubebuilder:object:generate=false
type NetworkUsageStatusCodeError struct {
StatusCode int
}

func (e NetworkUsageStatusCodeError) Error() string {
return fmt.Sprintf("Unexpected HTTP status code when fetching network usage: %d", e.StatusCode)
}

//+genclient
//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
Expand All @@ -613,6 +664,37 @@ type VirtualMachine struct {
Status VirtualMachineStatus `json:"status,omitempty"`
}

func (vm *VirtualMachine) GetNetworkUsage(ctx context.Context, timeout time.Duration) (*VirtualMachineNetworkUsage, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
req, err := http.NewRequestWithContext(
ctx,
http.MethodGet,
fmt.Sprintf("http://%s:%d/network_usage", vm.Status.PodIP, vm.Spec.RunnerPort),
nil,
)
if err != nil {
return nil, fmt.Errorf("error initializing http request to fetch network usage: %w", err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, NetworkUsageRequestError{Err: err}
}
if resp.StatusCode != 200 {
return nil, NetworkUsageStatusCodeError{StatusCode: resp.StatusCode}
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("error reading http response body: %w", err)
}
var result VirtualMachineNetworkUsage
if err := json.Unmarshal(body, &result); err != nil {
return nil, NetworkUsageJSONError{Err: err}
}
return &result, nil
}

func (vm *VirtualMachine) Cleanup() {
vm.Status.PodName = ""
vm.Status.PodIP = ""
Expand Down
1 change: 1 addition & 0 deletions neonvm/apis/neonvm/v1/virtualmachine_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func (r *VirtualMachine) ValidateUpdate(old runtime.Object) (admission.Warnings,
{".spec.enableAcceleration", func(v *VirtualMachine) any { return v.Spec.EnableAcceleration }},
{".spec.enableSSH", func(v *VirtualMachine) any { return v.Spec.EnableSSH }},
{".spec.initScript", func(v *VirtualMachine) any { return v.Spec.InitScript }},
{".spec.enableNetworkMonitoring", func(v *VirtualMachine) any { return v.Spec.EnableNetworkMonitoring }},
}

for _, info := range immutableFields {
Expand Down
20 changes: 20 additions & 0 deletions neonvm/apis/neonvm/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions neonvm/config/crd/bases/vm.neon.tech_virtualmachines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1021,6 +1021,10 @@ spec:
default: true
description: Use KVM acceleation
type: boolean
enableNetworkMonitoring:
default: false
description: Enable network monitoring on the VM
type: boolean
enableSSH:
default: true
description: |-
Expand Down
Loading
Loading