diff --git a/e2e/tshark/tshark_test.go b/e2e/tshark/tshark_test.go new file mode 100644 index 00000000..8b0709a9 --- /dev/null +++ b/e2e/tshark/tshark_test.go @@ -0,0 +1,113 @@ +package basic + +import ( + "context" + "io" + "net/http" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/celestiaorg/knuu/pkg/instance" + "github.com/celestiaorg/knuu/pkg/knuu" +) + +const ( + s3BucketName = "tshark-test-bucket" + s3Location = "eu-east-1" +) + +func TestTshark(t *testing.T) { + t.Parallel() + // Setup + + ctx := context.Background() + + kn, err := knuu.New(ctx, knuu.Options{}) + require.NoError(t, err, "error creating knuu") + + defer func() { + if err := kn.CleanUp(ctx); err != nil { + t.Logf("error cleaning up knuu: %v", err) + } + }() + + scope := kn.Scope() + t.Logf("Test scope: %s", scope) + + target, err := kn.NewInstance("busybox") + require.NoError(t, err, "error creating instance") + + err = target.SetImage(ctx, "busybox") + require.NoError(t, err, "error setting image") + + err = target.SetCommand("sleep", "infinity") + require.NoError(t, err, "error setting command") + + t.Log("deploying minio as s3 backend") + err = kn.MinioClient.DeployMinio(ctx) + require.NoError(t, err, "error deploying minio") + + t.Log("getting minio configs") + minioConf, err := kn.MinioClient.GetConfigs(ctx) + require.NoError(t, err, "error getting S3 (minio) configs") + + var ( + filename = target.K8sName() + instance.TsharkCaptureFileExtension + keyPrefix = "tshark/" + scope + fileKey = filepath.Join(keyPrefix, filename) + ) + + err = target.EnableTsharkCollector( + instance.TsharkCollectorConfig{ + VolumeSize: "10Gi", + S3AccessKey: minioConf.AccessKeyID, + S3SecretKey: minioConf.SecretAccessKey, + S3Region: s3Location, + S3Bucket: s3BucketName, + CreateBucket: true, // Since we fire up a fresh minio server, we need to create the bucket + S3KeyPrefix: keyPrefix, + S3Endpoint: minioConf.Endpoint, + UploadInterval: 1 * time.Second, // for sake of the test we keep this short + }, + ) + require.NoError(t, err, "error enabling tshark collector") + + err = target.Commit() + require.NoError(t, err, "error committing instance") + + t.Cleanup(func() { + if err := kn.CleanUp(ctx); err != nil { + t.Logf("error cleaning up knuu: %v", err) + } + }) + + // Test logic + + t.Log("starting target instance") + err = target.Start(ctx) + require.NoError(t, err, "error starting instance") + + err = target.WaitInstanceIsRunning(ctx) + require.NoError(t, err, "error waiting for instance to be running") + + // Perform a ping to do generate network traffic to allow tshark to capture it + _, err = target.ExecuteCommand(ctx, "ping", "-c", "4", "google.com") + require.NoError(t, err, "error executing command") + + url, err := kn.MinioClient.GetMinioURL(ctx, fileKey, s3BucketName) + require.NoError(t, err, "error getting minio url") + + resp, err := http.Get(url) + require.NoError(t, err, "error downloading from minio URL") + defer resp.Body.Close() + + assert.Equal(t, http.StatusOK, resp.StatusCode, "URL does not exist or is not accessible") + + bodyBytes, err := io.ReadAll(resp.Body) + require.NoError(t, err, "error reading response body") + assert.NotEmpty(t, bodyBytes, "downloaded log file is empty") +} diff --git a/pkg/instance/errors.go b/pkg/instance/errors.go index d7fc8a7b..09c1ece8 100644 --- a/pkg/instance/errors.go +++ b/pkg/instance/errors.go @@ -198,4 +198,13 @@ var ( ErrAddingToProxy = errors.New("AddingToProxy", "error adding '%s' to traefik proxy for service '%s'") ErrGettingProxyURL = errors.New("GettingProxyURL", "error getting proxy URL for service '%s'") ErrProxyNotInitialized = errors.New("ProxyNotInitialized", "proxy not initialized") + ErrTsharkCollectorAlreadyEnabled = errors.New("TsharkCollectorAlreadyEnabled", "tshark collector already enabled for instance '%s'") + ErrCreatingTsharkCollectorInstance = errors.New("CreatingTsharkCollectorInstance", "error creating tshark collector instance") + ErrAddingTsharkCollectorSidecar = errors.New("AddingTsharkCollectorSidecar", "error adding tshark collector sidecar for instance '%s'") + ErrTsharkCollectorConfigNotSet = errors.New("TsharkCollectorConfigNotSet", "tshark collector config not set for instance '%s'. volumeSize: %s, s3AccessKey: %s, s3SecretKey: %s, s3Region: %s, s3BucketName: %s, s3KeyPrefix: %s") + ErrTsharkCollectorInvalidVolumeSize = errors.New("TsharkCollectorInvalidVolumeSize", "invalid volume size format for tshark collector: %s") + ErrTsharkCollectorInvalidS3AccessKey = errors.New("TsharkCollectorInvalidS3AccessKey", "invalid S3 access key format for tshark collector: %s") + ErrTsharkCollectorInvalidS3SecretKey = errors.New("TsharkCollectorInvalidS3SecretKey", "invalid S3 secret key format for tshark collector: %s") + ErrTsharkCollectorS3RegionOrBucketEmpty = errors.New("TsharkCollectorS3RegionOrBucketEmpty", "S3 region or bucket cannot be empty for tshark collector: %s, %s") + ErrRegexpCompile = errors.New("RegexpCompile", "error compiling regexp for %s") ) diff --git a/pkg/instance/helper.go b/pkg/instance/helper.go index 317c4791..7f587a00 100644 --- a/pkg/instance/helper.go +++ b/pkg/instance/helper.go @@ -361,34 +361,35 @@ func (i *Instance) cloneWithSuffix(suffix string) *Instance { clonedBitTwister.SetClient(nil) // reset client to avoid reusing the same client return &Instance{ - name: i.name + suffix, - k8sName: i.k8sName + suffix, - imageName: i.imageName, - state: i.state, - instanceType: i.instanceType, - kubernetesService: i.kubernetesService, - builderFactory: i.builderFactory, - kubernetesReplicaSet: i.kubernetesReplicaSet, - portsTCP: i.portsTCP, - portsUDP: i.portsUDP, - command: i.command, - args: i.args, - env: i.env, - volumes: i.volumes, - memoryRequest: i.memoryRequest, - memoryLimit: i.memoryLimit, - cpuRequest: i.cpuRequest, - policyRules: i.policyRules, - livenessProbe: i.livenessProbe, - readinessProbe: i.readinessProbe, - startupProbe: i.startupProbe, - isSidecar: false, - parentInstance: nil, - sidecars: clonedSidecars, - obsyConfig: i.obsyConfig, - securityContext: &clonedSecurityContext, - BitTwister: &clonedBitTwister, - SystemDependencies: i.SystemDependencies, + name: i.name + suffix, + k8sName: i.k8sName + suffix, + imageName: i.imageName, + state: i.state, + instanceType: i.instanceType, + kubernetesService: i.kubernetesService, + builderFactory: i.builderFactory, + kubernetesReplicaSet: i.kubernetesReplicaSet, + portsTCP: i.portsTCP, + portsUDP: i.portsUDP, + command: i.command, + args: i.args, + env: i.env, + volumes: i.volumes, + memoryRequest: i.memoryRequest, + memoryLimit: i.memoryLimit, + cpuRequest: i.cpuRequest, + policyRules: i.policyRules, + livenessProbe: i.livenessProbe, + readinessProbe: i.readinessProbe, + startupProbe: i.startupProbe, + isSidecar: false, + parentInstance: nil, + sidecars: clonedSidecars, + obsyConfig: i.obsyConfig, + tsharkCollectorConfig: i.tsharkCollectorConfig, + securityContext: &clonedSecurityContext, + BitTwister: &clonedBitTwister, + SystemDependencies: i.SystemDependencies, } } @@ -594,6 +595,17 @@ func (i *Instance) addOtelCollectorSidecar(ctx context.Context) error { return nil } +func (i *Instance) addTsharkCollectorSidecar(ctx context.Context) error { + tsharkSidecar, err := i.createTsharkCollectorInstance(ctx) + if err != nil { + return ErrCreatingTsharkCollectorInstance.WithParams(i.k8sName).Wrap(err) + } + if err := i.AddSidecar(tsharkSidecar); err != nil { + return ErrAddingTsharkCollectorSidecar.WithParams(i.k8sName).Wrap(err) + } + return nil +} + func (i *Instance) createBitTwisterInstance(ctx context.Context) (*Instance, error) { bt, err := New("bit-twister", i.SystemDependencies) if err != nil { diff --git a/pkg/instance/instance.go b/pkg/instance/instance.go index e056c6e7..8a1ed70b 100644 --- a/pkg/instance/instance.go +++ b/pkg/instance/instance.go @@ -6,6 +6,7 @@ import ( "io" "os" "path/filepath" + "regexp" "strconv" "strings" "time" @@ -69,6 +70,29 @@ type ObsyConfig struct { prometheusRemoteWriteExporterEndpoint string } +// TsharkCollectorConfig represents the configuration for the tshark collector +type TsharkCollectorConfig struct { + // VolumeSize is the size of the volume to use for the tshark collector + VolumeSize string + // S3AccessKey is the access key to use for the s3 server + S3AccessKey string + // S3SecretKey is the secret key to use for the s3 server + S3SecretKey string + // S3Region is the region of the s3 server + S3Region string + // S3Bucket is the bucket to use for the s3 server + S3Bucket string + // CreateBucket is the flag to create the bucket if it does not exist + CreateBucket bool + // S3KeyPrefix is the key prefix to use for the s3 server + S3KeyPrefix string + // S3Endpoint is the endpoint of the s3 server + S3Endpoint string + + // UploadInterval is the interval at which the tshark collector will upload the pcap file to the s3 server + UploadInterval time.Duration +} + // SecurityContext represents the security settings for a container type SecurityContext struct { // Privileged indicates whether the container should be run in privileged mode @@ -81,35 +105,36 @@ type SecurityContext struct { // Instance represents a instance type Instance struct { system.SystemDependencies - name string - imageName string - k8sName string - state InstanceState - instanceType InstanceType - kubernetesService *v1.Service - builderFactory *container.BuilderFactory - kubernetesReplicaSet *appv1.ReplicaSet - portsTCP []int - portsUDP []int - command []string - args []string - env map[string]string - volumes []*k8s.Volume - memoryRequest string - memoryLimit string - cpuRequest string - policyRules []rbacv1.PolicyRule - livenessProbe *v1.Probe - readinessProbe *v1.Probe - startupProbe *v1.Probe - files []*k8s.File - isSidecar bool - parentInstance *Instance - sidecars []*Instance - fsGroup int64 - obsyConfig *ObsyConfig - securityContext *SecurityContext - BitTwister *btConfig + name string + imageName string + k8sName string + state InstanceState + instanceType InstanceType + kubernetesService *v1.Service + builderFactory *container.BuilderFactory + kubernetesReplicaSet *appv1.ReplicaSet + portsTCP []int + portsUDP []int + command []string + args []string + env map[string]string + volumes []*k8s.Volume + memoryRequest string + memoryLimit string + cpuRequest string + policyRules []rbacv1.PolicyRule + livenessProbe *v1.Probe + readinessProbe *v1.Probe + startupProbe *v1.Probe + files []*k8s.File + isSidecar bool + parentInstance *Instance + sidecars []*Instance + fsGroup int64 + obsyConfig *ObsyConfig + tsharkCollectorConfig *TsharkCollectorConfig + securityContext *SecurityContext + BitTwister *btConfig } func New(name string, sysDeps system.SystemDependencies) (*Instance, error) { @@ -141,32 +166,33 @@ func New(name string, sysDeps system.SystemDependencies) (*Instance, error) { // Create the instance return &Instance{ - name: name, - k8sName: k8sName, - imageName: "", - state: None, - instanceType: BasicInstance, - portsTCP: make([]int, 0), - portsUDP: make([]int, 0), - command: make([]string, 0), - args: make([]string, 0), - env: make(map[string]string), - volumes: make([]*k8s.Volume, 0), - memoryRequest: "", - memoryLimit: "", - cpuRequest: "", - policyRules: make([]rbacv1.PolicyRule, 0), - livenessProbe: nil, - readinessProbe: nil, - startupProbe: nil, - files: make([]*k8s.File, 0), - isSidecar: false, - parentInstance: nil, - sidecars: make([]*Instance, 0), - obsyConfig: obsyConfig, - securityContext: securityContext, - BitTwister: getBitTwisterDefaultConfig(), - SystemDependencies: sysDeps, + name: name, + k8sName: k8sName, + imageName: "", + state: None, + instanceType: BasicInstance, + portsTCP: make([]int, 0), + portsUDP: make([]int, 0), + command: make([]string, 0), + args: make([]string, 0), + env: make(map[string]string), + volumes: make([]*k8s.Volume, 0), + memoryRequest: "", + memoryLimit: "", + cpuRequest: "", + policyRules: make([]rbacv1.PolicyRule, 0), + livenessProbe: nil, + readinessProbe: nil, + startupProbe: nil, + files: make([]*k8s.File, 0), + isSidecar: false, + parentInstance: nil, + sidecars: make([]*Instance, 0), + obsyConfig: obsyConfig, + tsharkCollectorConfig: nil, + securityContext: securityContext, + BitTwister: getBitTwisterDefaultConfig(), + SystemDependencies: sysDeps, }, nil } @@ -188,6 +214,10 @@ func (i *Instance) Name() string { return i.name } +func (i *Instance) K8sName() string { + return i.k8sName +} + func (i *Instance) SetInstanceType(instanceType InstanceType) { i.instanceType = instanceType } @@ -704,7 +734,7 @@ func (i *Instance) SetEnvironmentVariable(key, value string) error { } else if i.state == Committed { i.env[key] = value } - logrus.Debugf("Set environment variable '%s' to '%s' in instance '%s'", key, value, i.name) + logrus.Debugf("Set environment variable '%s' in instance '%s'", key, i.name) return nil } @@ -959,6 +989,62 @@ func (i *Instance) SetPrometheusRemoteWriteExporter(endpoint string) error { return nil } +// TsharkCollectorEnabled returns true if the tshark collector is enabled +func (i *Instance) TsharkCollectorEnabled() bool { + return i.tsharkCollectorConfig != nil +} + +// EnableTsharkCollector enables the tshark collector for the instance +// This function can only be called in the state 'Preparing' or 'Committed' +func (i *Instance) EnableTsharkCollector(conf TsharkCollectorConfig) error { + if err := i.validateStateForObsy(tsharkCollectorName); err != nil { + return err + } + if i.TsharkCollectorEnabled() { + return ErrTsharkCollectorAlreadyEnabled + } + + if err := validateTsharkCollectorConfig(conf); err != nil { + return err + } + + i.tsharkCollectorConfig = &conf + logrus.Debugf("Enabled Tshark collector for instance '%s'", i.name) + return nil +} + +// validateTsharkCollectorConfig checks the configuration fields for proper formatting +func validateTsharkCollectorConfig(conf TsharkCollectorConfig) error { + // Regex patterns for validation + volumeSizePattern, err := regexp.Compile(`^\d+[KMGT]?i$`) // Example: "10Gi", "500Mi" + if err != nil { + return ErrRegexpCompile.WithParams("volumeSizePattern") + } + awsKeyPattern, err := regexp.Compile(`^[A-Za-z0-9]{1,20}$`) + if err != nil { + return ErrRegexpCompile.WithParams("awsKeyPattern") + } + awsSecretPattern, err := regexp.Compile(`^[A-Za-z0-9/+=]{1,40}$`) + if err != nil { + return ErrRegexpCompile.WithParams("awsSecretPattern") + } + + if !volumeSizePattern.MatchString(conf.VolumeSize) { + return ErrTsharkCollectorInvalidVolumeSize.WithParams(conf.VolumeSize) + } + if !awsKeyPattern.MatchString(conf.S3AccessKey) { + return ErrTsharkCollectorInvalidS3AccessKey.WithParams(conf.S3AccessKey) + } + if !awsSecretPattern.MatchString(conf.S3SecretKey) { + return ErrTsharkCollectorInvalidS3SecretKey.WithParams(conf.S3SecretKey) + } + if conf.S3Region == "" || conf.S3Bucket == "" { + return ErrTsharkCollectorS3RegionOrBucketEmpty.WithParams(conf.S3Region, conf.S3Bucket) + } + + return nil +} + // SetPrivileged sets the privileged status for the instance // This function can only be called in the state 'Preparing' or 'Committed' func (i *Instance) SetPrivileged(privileged bool) error { @@ -1030,6 +1116,13 @@ func (i *Instance) StartWithoutWait(ctx context.Context) error { } } + // deploy tshark collector if enabled + if i.TsharkCollectorEnabled() { + if err := i.addTsharkCollectorSidecar(ctx); err != nil { + return ErrAddingTsharkCollectorSidecar.WithParams(i.k8sName).Wrap(err) + } + } + if i.BitTwister.Enabled() { if err := i.addBitTwisterSidecar(ctx); err != nil { return ErrAddingNetworkSidecar.WithParams(i.k8sName).Wrap(err) diff --git a/pkg/instance/tshark.go b/pkg/instance/tshark.go new file mode 100644 index 00000000..df07f5bd --- /dev/null +++ b/pkg/instance/tshark.go @@ -0,0 +1,74 @@ +package instance + +import ( + "context" + "fmt" +) + +const ( + tsharkCollectorName = "tshark-collector" + tsharkCollectorImage = "ghcr.io/celestiaorg/tshark-s3:pr-11" + tsharkCollectorCPU = "100m" + tsharkCollectorMemory = "250Mi" + tsharkCollectorVolumePath = "/tshark" + netAdminCapability = "NET_ADMIN" + TsharkCaptureFileExtension = ".pcapng" + + envStorageAccessKeyID = "STORAGE_ACCESS_KEY_ID" + envStorageSecretAccessKey = "STORAGE_SECRET_ACCESS_KEY" + envStorageRegion = "STORAGE_REGION" + envStorageBucketName = "STORAGE_BUCKET_NAME" + envCreateBucket = "STORAGE_CREATE_BUCKET" + envStorageKeyPrefix = "STORAGE_KEY_PREFIX" + envStorageEndpoint = "STORAGE_ENDPOINT" + envCaptureFileName = "CAPTURE_FILE_NAME" + envUploadInterval = "UPLOAD_INTERVAL" +) + +func (i *Instance) createTsharkCollectorInstance(ctx context.Context) (*Instance, error) { + if i.tsharkCollectorConfig == nil { + return nil, ErrTsharkCollectorConfigNotSet + } + + tsc, err := New(tsharkCollectorName, i.SystemDependencies) + if err != nil { + return nil, err + } + if err := tsc.SetImage(ctx, tsharkCollectorImage); err != nil { + return nil, err + } + if err := tsc.Commit(); err != nil { + return nil, err + } + if err := tsc.SetCPU(tsharkCollectorCPU); err != nil { + return nil, err + } + if err := tsc.SetMemory(tsharkCollectorMemory, tsharkCollectorMemory); err != nil { + return nil, err + } + if err := tsc.AddVolume(tsharkCollectorVolumePath, i.tsharkCollectorConfig.VolumeSize); err != nil { + return nil, err + } + + envVars := map[string]string{ + envStorageAccessKeyID: i.tsharkCollectorConfig.S3AccessKey, + envStorageSecretAccessKey: i.tsharkCollectorConfig.S3SecretKey, + envStorageRegion: i.tsharkCollectorConfig.S3Region, + envStorageBucketName: i.tsharkCollectorConfig.S3Bucket, + envStorageKeyPrefix: i.tsharkCollectorConfig.S3KeyPrefix, + envCaptureFileName: i.k8sName + TsharkCaptureFileExtension, + envStorageEndpoint: i.tsharkCollectorConfig.S3Endpoint, + envUploadInterval: fmt.Sprintf("%d", int64(i.tsharkCollectorConfig.UploadInterval.Seconds())), + envCreateBucket: fmt.Sprintf("%t", i.tsharkCollectorConfig.CreateBucket), + } + + for key, value := range envVars { + if err := tsc.SetEnvironmentVariable(key, value); err != nil { + return nil, err + } + } + if err := tsc.AddCapability(netAdminCapability); err != nil { + return nil, err + } + return tsc, nil +} diff --git a/pkg/minio/minio.go b/pkg/minio/minio.go index bf518b42..9fb1774e 100644 --- a/pkg/minio/minio.go +++ b/pkg/minio/minio.go @@ -45,6 +45,12 @@ type Minio struct { K8s k8s.KubeManager } +type Config struct { + Endpoint string + AccessKeyID string + SecretAccessKey string +} + func (m *Minio) DeployMinio(ctx context.Context) error { if err := m.createOrUpdateDeployment(ctx); err != nil { return ErrMinioFailedToStart.Wrap(err) @@ -244,6 +250,19 @@ func (m *Minio) GetMinioURL(ctx context.Context, minioFilePath, bucketName strin return presignedURL.String(), nil } +func (m *Minio) GetConfigs(ctx context.Context) (*Config, error) { + endpoint, err := m.getEndpoint(ctx) + if err != nil { + return nil, ErrMinioFailedToGetEndpoint.Wrap(err) + } + + return &Config{ + Endpoint: endpoint, + AccessKeyID: rootUser, + SecretAccessKey: rootPassword, + }, nil +} + func (m *Minio) createOrUpdateService(ctx context.Context) error { serviceClient := m.K8s.Clientset().CoreV1().Services(m.K8s.Namespace())