diff --git a/blobstore/common/rpc/auditlog/auditlog.go b/blobstore/common/rpc/auditlog/auditlog.go index 062315ab95..1027838fe9 100644 --- a/blobstore/common/rpc/auditlog/auditlog.go +++ b/blobstore/common/rpc/auditlog/auditlog.go @@ -53,6 +53,7 @@ type jsonAuditlog struct { module string decoder Decoder metricSender MetricSender + mqSender MQSender logFile LogCloser logPool sync.Pool @@ -158,6 +159,11 @@ func Open(module string, cfg *Config) (ph rpc.ProgressHandler, logFile LogCloser }, logFile, nil } +// SetMQSender will send audit data to the message queue of your sender implementation. +func (j *jsonAuditlog) SetMQSender(sender MQSender) { + j.mqSender = sender +} + func (j *jsonAuditlog) Handler(w http.ResponseWriter, req *http.Request, f func(http.ResponseWriter, *http.Request)) { var ( logBytes []byte @@ -172,6 +178,7 @@ func (j *jsonAuditlog) Handler(w http.ResponseWriter, req *http.Request, f func( body: j.bodyPool.Get().([]byte), bodyLimit: j.cfg.BodyLimit, span: span, + statusCode: 200, startTime: time.Now(), ResponseWriter: w, } @@ -243,7 +250,22 @@ func (j *jsonAuditlog) Handler(w http.ResponseWriter, req *http.Request, f func( auditLog.RespLength = _w.getBodyWritten() auditLog.Duration = endTime - startTime/1000 - j.metricSender.Send(auditLog.ToBytesWithTab(b)) + data := string(auditLog.ToBytesWithTab(b)) + if j.cfg.Async { + if j.mqSender != nil { + go func() { + j.mqSender.Send(ctx, data) + }() + } + go func() { + j.metricSender.Send([]byte(data)) + }() + } else { + if j.mqSender != nil { + j.mqSender.Send(ctx, data) + } + j.metricSender.Send(b.Bytes()) + } if j.logFile == nil || (len(j.cfg.KeywordsFilter) > 0 && defaultLogFilter(req, j.cfg.KeywordsFilter)) { return @@ -253,7 +275,7 @@ func (j *jsonAuditlog) Handler(w http.ResponseWriter, req *http.Request, f func( case LogFormatJSON: logBytes = auditLog.ToJson() default: - logBytes = b.Bytes() // *bytes.Buffer was filled with metricSender.Send + logBytes = b.Bytes() } err = j.logFile.Log(logBytes) if err != nil { @@ -273,3 +295,22 @@ func defaultLogFilter(r *http.Request, words []string) bool { } return false } + +// ExtraWrite provides extra response header writes to the ResponseWriter. +func ExtraWrite(w http.ResponseWriter, key string, val interface{}) { + ew, ok := w.(ExtraWriter) + if !ok { + return + } + ew.ExtraWrite(key, val) +} + +// SetHandlerMQSender sets the sender to the ProgressHandler instance, which +// indicates that audit data needs to be sent to the sender. 
+func SetHandlerMQSender(ph rpc.ProgressHandler, sender MQSender) { + eos, ok := ph.(ExtraOptionsSetter) + if !ok { + return + } + eos.SetMQSender(sender) +} diff --git a/blobstore/common/rpc/auditlog/proto.go b/blobstore/common/rpc/auditlog/proto.go index 17388a8cfd..062f45f9de 100644 --- a/blobstore/common/rpc/auditlog/proto.go +++ b/blobstore/common/rpc/auditlog/proto.go @@ -14,7 +14,10 @@ package auditlog -import "net/http" +import ( + "context" + "net/http" +) const ( LogFormatText = "text" @@ -33,7 +36,9 @@ type Config struct { RotateNew bool `json:"rotate_new"` LogFileSuffix string `json:"log_file_suffix"` // 0 means no backup limit - Backup int `json:"backup"` + Backup int `json:"backup"` + // Async whether to send audit data asynchronously + Async bool `json:"async"` MetricConfig PrometheusConfig `json:"metric_config"` // KeywordsFilter log filter based on uri and request method @@ -61,6 +66,18 @@ type MetricSender interface { Send(raw []byte) error } +type MQSender interface { + Send(ctx context.Context, data string) error +} + type Decoder interface { DecodeReq(req *http.Request) *DecodedReq } + +type ExtraWriter interface { + ExtraWrite(key string, val interface{}) +} + +type ExtraOptionsSetter interface { + SetMQSender(sender MQSender) +} diff --git a/blobstore/common/rpc/auditlog/request_row.go b/blobstore/common/rpc/auditlog/request_row.go index d6bb0d3934..dca12f46e7 100644 --- a/blobstore/common/rpc/auditlog/request_row.go +++ b/blobstore/common/rpc/auditlog/request_row.go @@ -50,6 +50,7 @@ type LogEntry interface { Code() string Method() string Path() string + ReqTime() int64 RespTime() int64 // 100ns RespLength() int64 ReqLength() int64 @@ -60,8 +61,14 @@ type LogEntry interface { XRespCode() string // ReqParams returns params of request, including params in raw query and body ReqParams() string + RawQuery() string Uid() uint32 ApiName() string + Referer() string + Bucket() string + ReqUid() string + OwnerUid() string + XRemoteIP() string } func ErrInValidFieldCnt(msg string) error { @@ -81,6 +88,8 @@ type ReqHeader struct { XSrc string `json:"X-Src"` IP string `json:"IP"` UA string `json:"User-Agent"` + Referer string `json:"Referer"` + XForwardedFor string `json:"X-Forwarded-For"` } type Token struct { @@ -110,7 +119,8 @@ type RespHeader struct { BatchOps map[string]int64 `json:"batchOps"` PreDelSize map[string]int64 `json:"preDelSize"` PreDelArchiveSize map[string]int64 `json:"preDelArchiveSize"` - OUid uint32 `json:"ouid"` // owner uid + Uid string `json:"uid"` // request uid + OUid string `json:"ouid"` // owner uid RsInfo *RsInfo `json:"rs-info"` XRespCode string `json:"X-Resp-Code"` // return from dora BillTag string `json:"billtag"` // must be same with definition in billtag.go @@ -182,6 +192,22 @@ func (a *RequestRow) Uid() uint32 { return 0 } +func (a *RequestRow) ReqUid() string { + respHeader := a.getRespHeader() + if respHeader == nil { + return "" + } + return respHeader.Uid +} + +func (a *RequestRow) OwnerUid() string { + respHeader := a.getRespHeader() + if respHeader == nil { + return "" + } + return respHeader.OUid +} + func (a *RequestRow) RespToken() *Token { respHeader := a.getRespHeader() if respHeader == nil { @@ -291,6 +317,25 @@ func (a *RequestRow) RemoteIp() string { return "" } +func (a *RequestRow) XRemoteIP() string { + reqHeader := a.getReqHeader() + if reqHeader != nil { + if reqHeader.XForwardedFor != "" { + return strings.TrimSpace(reqHeader.XForwardedFor) + } + return a.RemoteIp() + } + return "" +} + +func (a *RequestRow) Referer() 
string { + reqHeader := a.getReqHeader() + if reqHeader == nil { + return "" + } + return reqHeader.Referer +} + func (a *RequestRow) ReqCdn() string { reqHeader := a.getReqHeader() if reqHeader == nil { diff --git a/blobstore/common/rpc/auditlog/response.go b/blobstore/common/rpc/auditlog/response.go index 3a89163c7d..e13b4f6731 100644 --- a/blobstore/common/rpc/auditlog/response.go +++ b/blobstore/common/rpc/auditlog/response.go @@ -35,6 +35,7 @@ type responseWriter struct { // body hold some data buffer of response body, like json or form // audit log will record body buffer into log file body []byte + extra M // extra header span trace.Span startTime time.Time hasRecordCost bool @@ -48,7 +49,7 @@ func (w *responseWriter) Write(b []byte) (int, error) { w.WriteHeader(http.StatusOK) w.hasWroteHeader = true } - if w.n < w.bodyLimit { + if w.statusCode/100 != 2 && w.n < w.bodyLimit { n := copy(w.body[w.n:], b) w.n += n } @@ -88,10 +89,17 @@ func (w *responseWriter) Flush() { w.ResponseWriter.(http.Flusher).Flush() } +func (w *responseWriter) ExtraWrite(key string, val interface{}) { + if w.extra == nil { + w.extra = make(M) + } + w.extra[key] = val +} + func (w *responseWriter) getBody() []byte { header := w.ResponseWriter.Header() length, _ := strconv.ParseInt(header.Get(rpc.HeaderContentLength), 10, 64) - if length > int64(w.n) { + if w.statusCode/100 != 2 || length > int64(w.n) { return nil } return w.body[:w.n] @@ -111,6 +119,10 @@ func (w *responseWriter) getHeader() M { headerM[k] = header.Get(k) } } + for h, v := range w.extra { + headerM[h] = v + } + return headerM } diff --git a/blobstore/common/trace/tracer.go b/blobstore/common/trace/tracer.go index f7d3fae396..b8d6f7ca0a 100644 --- a/blobstore/common/trace/tracer.go +++ b/blobstore/common/trace/tracer.go @@ -234,9 +234,9 @@ func StartSpanFromHTTPHeaderSafe(r *http.Request, operationName string) (Span, c spanCtx, _ := Extract(HTTPHeaders, HTTPHeadersCarrier(r.Header)) traceID := r.Header.Get(RequestIDKey) if traceID == "" { - return StartSpanFromContext(context.Background(), operationName, ext.RPCServerOption(spanCtx)) + return StartSpanFromContext(r.Context(), operationName, ext.RPCServerOption(spanCtx)) } - return StartSpanFromContextWithTraceID(context.Background(), operationName, traceID, ext.RPCServerOption(spanCtx)) + return StartSpanFromContextWithTraceID(r.Context(), operationName, traceID, ext.RPCServerOption(spanCtx)) } // ContextWithSpan returns a new `context.Context` that holds a reference to diff --git a/objectnode/acl_enum.go b/objectnode/acl_enum.go index f59830ff53..dcde1f107c 100644 --- a/objectnode/acl_enum.go +++ b/objectnode/acl_enum.go @@ -48,7 +48,9 @@ const ( ) const ( - XMLNS = "http://www.w3.org/2001/XMLSchema-instance" + XMLNS = "http://s3.amazonaws.com/doc/2006-03-01/" + XMLSI = "http://www.w3.org/2001/XMLSchema-instance" + GroupAllUser = "http://acs.amazonaws.com/groups/global/AllUsers" GroupAuthenticated = "http://acs.amazonaws.com/groups/global/AuthenticatedUsers" ) diff --git a/objectnode/api_context.go b/objectnode/api_context.go index da2e139689..54e091f846 100644 --- a/objectnode/api_context.go +++ b/objectnode/api_context.go @@ -28,6 +28,8 @@ const ( ContextKeyRequestAction = "ctx_request_action" ContextKeyStatusCode = "status_code" ContextKeyErrorMessage = "error_message" + ContextKeyUid = "uid" + ContextKeyOUid = "ouid" ) func SetRequestID(r *http.Request, requestID string) { diff --git a/objectnode/api_middleware.go b/objectnode/api_middleware.go index 01ab39ded2..e3927e2696 100644 --- 
a/objectnode/api_middleware.go +++ b/objectnode/api_middleware.go @@ -23,6 +23,7 @@ import ( "strings" "time" + "github.com/cubefs/cubefs/blobstore/common/rpc/auditlog" "github.com/cubefs/cubefs/proto" "github.com/cubefs/cubefs/util/exporter" "github.com/cubefs/cubefs/util/log" @@ -78,11 +79,17 @@ func (o *ObjectNode) traceMiddleware(next http.Handler) http.Handler { return strings.ReplaceAll(uUID.String(), "-", ""), nil } var handlerFunc http.HandlerFunc = func(w http.ResponseWriter, r *http.Request) { - var err error + defer func() { + param := ParseRequestParam(r) + auditlog.ExtraWrite(w, "Tbl", param.bucket) + auditlog.ExtraWrite(w, "Api", param.apiName) + auditlog.ExtraWrite(w, "Uid", param.vars[ContextKeyUid]) + auditlog.ExtraWrite(w, "OUid", param.vars[ContextKeyOUid]) + }() // ===== pre-handle start ===== - var requestID string - if requestID, err = generateRequestID(); err != nil { + requestID, err := generateRequestID() + if err != nil { log.LogErrorf("traceMiddleware: generate request ID fail, remote(%v) url(%v) err(%v)", r.RemoteAddr, r.URL.String(), err) _ = InternalErrorCode(err).ServeResponse(w, r) @@ -291,6 +298,8 @@ func (o *ObjectNode) corsMiddleware(next http.Handler) http.Handler { } } + mux.Vars(r)[ContextKeyOUid] = vol.GetOwner() + if IsAccountLevelApi(param.apiName) { next.ServeHTTP(w, r) return @@ -325,6 +334,17 @@ func (o *ObjectNode) corsMiddleware(next http.Handler) http.Handler { }) } +func (o *ObjectNode) auditLogMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc( + func(w http.ResponseWriter, r *http.Request) { + if o.handler != nil { + o.handler.Handler(w, r, next.ServeHTTP) + } else { + next.ServeHTTP(w, r) + } + }) +} + func isMatchAndSetupCORSHeader(cors *CORSConfiguration, writer http.ResponseWriter, request *http.Request, isPreflight bool) (match bool) { origin := request.Header.Get(Origin) reqHeaders := request.Header.Get(HeaderNameAccessControlRequestHeaders) diff --git a/objectnode/const.go b/objectnode/const.go index 80735cd679..d33a4dfca0 100644 --- a/objectnode/const.go +++ b/objectnode/const.go @@ -136,6 +136,7 @@ const ( XAttrKeyOSSCORS = "oss:cors" XAttrKeyOSSCacheControl = "oss:cache" XAttrKeyOSSExpires = "oss:expires" + XAttrKeyOSSLogging = "oss:logging" // Deprecated XAttrKeyOSSETagDeprecated = "oss:tag" diff --git a/objectnode/fs_volume.go b/objectnode/fs_volume.go index 7c01818e9b..050aefb8d5 100644 --- a/objectnode/fs_volume.go +++ b/objectnode/fs_volume.go @@ -188,6 +188,13 @@ func (v *Volume) loadOSSMeta() { return } v.metaLoader.storeCORS(cors) + + var logging *S3Logging + if logging, err = v.loadBucketLogging(); err != nil { + return + } + v.metaLoader.storeLogging(logging) + v.metaLoader.setSynced() } @@ -251,6 +258,19 @@ func (v *Volume) loadBucketCors() (configuration *CORSConfiguration, err error) return configuration, nil } +func (v *Volume) loadBucketLogging() (logging *S3Logging, err error) { + raw, err := v.store.Get(v.name, bucketRootPath, XAttrKeyOSSLogging) + if err != nil { + return + } + logging = new(S3Logging) + if len(raw) == 0 { + return + } + err = json.Unmarshal(raw, logging) + return +} + func (v *Volume) getInodeFromPath(path string) (inode uint64, err error) { if path == "/" { return volumeRootInode, nil diff --git a/objectnode/fs_volume_meta.go b/objectnode/fs_volume_meta.go index c74d48886e..9faea926f8 100644 --- a/objectnode/fs_volume_meta.go +++ b/objectnode/fs_volume_meta.go @@ -25,9 +25,11 @@ type ossMetaLoader interface { loadPolicy() (p *Policy, err error) loadACL() (p 
*AccessControlPolicy, err error) loadCORS() (cors *CORSConfiguration, err error) + loadLogging() (logging *S3Logging, err error) storePolicy(p *Policy) storeACL(p *AccessControlPolicy) storeCORS(cors *CORSConfiguration) + storeLogging(logging *S3Logging) setSynced() } @@ -44,12 +46,14 @@ type cacheMetaLoader struct { // OSSMeta is bucket policy and ACL metadata. type OSSMeta struct { - policy *Policy - acl *AccessControlPolicy - corsConfig *CORSConfiguration - policyLock sync.RWMutex - aclLock sync.RWMutex - corsLock sync.RWMutex + policy *Policy + acl *AccessControlPolicy + corsConfig *CORSConfiguration + logging *S3Logging + policyLock sync.RWMutex + aclLock sync.RWMutex + corsLock sync.RWMutex + loggingLock sync.RWMutex } func (c *cacheMetaLoader) loadPolicy() (p *Policy, err error) { @@ -127,6 +131,31 @@ func (c *cacheMetaLoader) storeCORS(cors *CORSConfiguration) { return } +func (c *cacheMetaLoader) loadLogging() (logging *S3Logging, err error) { + c.om.loggingLock.RLock() + logging = c.om.logging + c.om.loggingLock.RUnlock() + if logging == nil && atomic.LoadInt32(c.synced) == 0 { + ret, err, _ := c.sf.Do(XAttrKeyOSSLogging, func() (interface{}, error) { + c, err := c.sml.loadLogging() + return c, err + }) + if err != nil { + return nil, err + } + logging = ret.(*S3Logging) + c.storeLogging(logging) + } + return +} + +func (c *cacheMetaLoader) storeLogging(logging *S3Logging) { + c.om.loggingLock.Lock() + c.om.logging = logging + c.om.loggingLock.Unlock() + return +} + func (c *cacheMetaLoader) setSynced() { atomic.StoreInt32(c.synced, 1) } @@ -149,4 +178,10 @@ func (s *strictMetaLoader) loadCORS() (cors *CORSConfiguration, err error) { func (s *strictMetaLoader) storeCORS(cors *CORSConfiguration) {} +func (s *strictMetaLoader) loadLogging() (*S3Logging, error) { + return s.v.loadBucketLogging() +} + +func (s *strictMetaLoader) storeLogging(logging *S3Logging) {} + func (s *strictMetaLoader) setSynced() {} diff --git a/objectnode/logging.go b/objectnode/logging.go new file mode 100644 index 0000000000..06fc9d63d3 --- /dev/null +++ b/objectnode/logging.go @@ -0,0 +1,194 @@ +// Copyright 2023 The CubeFS Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. 
+ +package objectnode + +import ( + "encoding/json" + "encoding/xml" + "fmt" + "net/http" + "regexp" + "strconv" + "strings" + "time" + + "github.com/cubefs/cubefs/blobstore/common/rpc/auditlog" +) + +var ( + ErrInvalidBucketRegionForLogging = &ErrorCode{ + ErrorCode: "InvalidBucketRegionForLogging", + ErrorMessage: "The region of the target bucket is different from the source bucket.", + StatusCode: http.StatusBadRequest, + } + ErrInvalidTargetPrefixForLogging = &ErrorCode{ + ErrorCode: "InvalidTargetPrefixForLogging", + ErrorMessage: "The target prefix for bucket logging contains incorrectly formatted characters.", + StatusCode: http.StatusBadRequest, + } + ErrInvalidTargetBucketForLogging = &ErrorCode{ + ErrorCode: "InvalidTargetBucketForLogging", + ErrorMessage: "The target bucket for logging does not exist, is not owned by you, or does not have the appropriate grants for the log-delivery group.", + StatusCode: http.StatusBadRequest, + } + + loggingPrefixRegexp = regexp.MustCompile(`^[a-zA-Z][a-zA-Z0-9-_/]{0,31}$`) +) + +type LoggingConfig struct { + Topic string `json:"topic"` + TimeoutMs int64 `json:"timeout_ms"` + Brokers []string `json:"brokers"` + + LoggingMgrConfig +} + +type LoggingData struct { + TargetBucket string + TargetPrefix string + + API string + Bucket string + BytesSent string + Host string + Object string + Owner string + Referer string + RemoteIP string + Requester string + RequestTime string + RequestURI string + StatusCode string + TotalTime string + UserAgent string +} + +type S3Logging struct { + XMLNS string `xml:"xmlns,attr,omitempty" json:"-"` + XMLName *xml.Name `xml:"BucketLoggingStatus" json:"-"` + LoggingEnabled *LoggingEnabled `xml:"LoggingEnabled,omitempty" json:"le,omitempty"` +} + +type LoggingEnabled struct { + TargetBucket string `xml:"TargetBucket" json:"tb"` + TargetPrefix string `xml:"TargetPrefix" json:"tp"` +} + +func isLoggingPrefixValid(prefix string) bool { + if prefix == "" { + return true + } + return loggingPrefixRegexp.MatchString(prefix) +} + +func storeBucketLogging(vol *Volume, logging *S3Logging) error { + data, err := json.Marshal(logging) + if err != nil { + return err + } + return vol.store.Put(vol.name, bucketRootPath, XAttrKeyOSSLogging, data) +} + +func getBucketLogging(vol *Volume) (*S3Logging, error) { + logging, err := vol.metaLoader.loadLogging() + if err != nil { + return nil, err + } + logging.XMLNS = XMLNS + return logging, nil +} + +func deleteBucketLogging(vol *Volume) error { + return vol.store.Delete(vol.name, bucketRootPath, XAttrKeyOSSLogging) +} + +func makeLoggingData(bucket, prefix string, entry auditlog.LogEntry) *LoggingData { + data := &LoggingData{ + TargetBucket: bucket, + TargetPrefix: prefix, + } + + data.Owner = stringRawOrHyphen(entry.OwnerUid()) + data.Bucket = stringRawOrHyphen(entry.Bucket()) + data.RequestTime = stringRawOrHyphen(fmt.Sprintf("%v", entry.ReqTime())) + data.Requester = stringRawOrHyphen(entry.ReqUid()) + data.API = stringRawOrHyphen(entry.ApiName()) + uri := fmt.Sprintf("%s %s", entry.Method(), entry.Path()) + if query := entry.RawQuery(); query != "" { + uri += "?" 
+ query + } + data.RequestURI = stringWrappedOrHyphen(uri, `"`) + data.StatusCode = stringRawOrHyphen(entry.Code()) + data.BytesSent = stringRawOrHyphen(fmt.Sprintf("%v", entry.RespLength())) + data.TotalTime = fmt.Sprintf("%v", entry.RespTime()/10000) + data.UserAgent = stringWrappedOrHyphen(entry.UA(), `"`) + data.Referer = stringWrappedOrHyphen(entry.Referer(), `"`) + data.Host = stringRawOrHyphen(entry.ReqHost()) + data.RemoteIP = stringRawOrHyphen(entry.XRemoteIP()) + + return data +} + +func makeLoggingTimeName(sec int64, gar int) string { + utime := time.Unix(sec, 0).UTC() + year, mon, day := utime.Date() + hour, min, _ := utime.Clock() + if gar <= 0 { + gar = 15 + } + min = (min / gar) * gar + return fmt.Sprintf("%d-%02d-%02d-%02d-%02d-00", year, mon, day, hour, min) +} + +func makeLoggingContent(data *LoggingData, gar int) (name, content string, err error) { + reqSec, err := strconv.ParseInt(data.RequestTime, 10, 64) + if err != nil { + return + } + timeName := makeLoggingTimeName(reqSec, gar) + name = strings.Join([]string{timeName, data.Bucket, data.TargetBucket, data.TargetPrefix}, "/") + name = strings.ReplaceAll(name, "/", "%2F") + content = strings.Join([]string{ + time.Unix(reqSec, 0).UTC().Format(AMZTimeFormat), + data.API, + data.Bucket, + data.RequestURI, + data.Requester, + data.Owner, + data.StatusCode, + data.BytesSent, + data.TotalTime, + data.Host, + data.RemoteIP, + data.Referer, + data.UserAgent, + }, "\t") + "\n" + + return +} + +func stringRawOrHyphen(raw string) string { + if raw == "" { + return "-" + } + return raw +} + +func stringWrappedOrHyphen(raw, wc string) string { + if raw == "" { + return "-" + } + return fmt.Sprintf("%s%s%s", wc, raw, wc) +} diff --git a/objectnode/logging_handler.go b/objectnode/logging_handler.go new file mode 100644 index 0000000000..76750842ce --- /dev/null +++ b/objectnode/logging_handler.go @@ -0,0 +1,157 @@ +// Copyright 2023 The CubeFS Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. 
+ +package objectnode + +import ( + "io/ioutil" + "net/http" + "strconv" + + "github.com/cubefs/cubefs/util/log" +) + +// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketLogging.html +func (o *ObjectNode) getBucketLoggingHandler(w http.ResponseWriter, r *http.Request) { + var ( + err error + erc *ErrorCode + ) + defer func() { + o.errorResponse(w, r, err, erc) + }() + + param := ParseRequestParam(r) + if param.Bucket() == "" { + erc = InvalidBucketName + return + } + + vol, err := o.getVol(param.bucket) + if err != nil { + log.LogErrorf("getBucketLoggingHandler: load volume fail: requestID(%v) volume(%v) err(%v)", + GetRequestID(r), param.bucket, err) + return + } + logging, err := getBucketLogging(vol) + if err != nil { + log.LogErrorf("getBucketLoggingHandler: get bucket logging fail: requestID(%v) volume(%v) err(%v)", + GetRequestID(r), vol.Name(), err) + return + } + response, err := MarshalXMLEntity(logging) + if err != nil { + log.LogErrorf("getBucketLoggingHandler: xml marshal fail: requestID(%v) volume(%v) logging(%v) err(%v)", + GetRequestID(r), vol.Name(), logging, err) + return + } + + w.Header().Set("Content-Type", "application/xml") + w.Header().Set("Content-Length", strconv.Itoa(len(response))) + if _, err = w.Write(response); err != nil { + log.LogErrorf("getBucketLoggingHandler: write response fail: requestID(%v) volume(%v) body(%v) err(%v)", + GetRequestID(r), vol.Name(), string(response), err) + } + + return +} + +// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketLogging.html +func (o *ObjectNode) putBucketLoggingHandler(w http.ResponseWriter, r *http.Request) { + var ( + err error + erc *ErrorCode + ) + defer func() { + o.errorResponse(w, r, err, erc) + }() + + param := ParseRequestParam(r) + if param.Bucket() == "" { + erc = InvalidBucketName + return + } + + vol, err := o.getVol(param.bucket) + if err != nil { + log.LogErrorf("putBucketLoggingHandler: load volume fail: requestID(%v) volume(%v) err(%v)", + GetRequestID(r), param.bucket, err) + return + } + requestMD5 := r.Header.Get(HeaderNameContentMD5) + if requestMD5 == "" { + erc = MissingContentMD5 + return + } + body, err := ioutil.ReadAll(r.Body) + if err != nil { + log.LogErrorf("putBucketLoggingHandler: read request body fail: requestID(%v) volume(%v) err(%v)", + GetRequestID(r), vol.Name(), err) + return + } + if requestMD5 != GetMD5(body) { + erc = InvalidDigest + return + } + + var logging *S3Logging + if err = UnmarshalXMLEntity(body, &logging); err != nil { + log.LogErrorf("putBucketLoggingHandler: xml unmarshal fail: requestID(%v) volume(%v) body(%v) err(%v)", + GetRequestID(r), vol.Name(), string(body), err) + erc = MalformedXML + return + } + + if logging.LoggingEnabled != nil { + if !isLoggingPrefixValid(logging.LoggingEnabled.TargetPrefix) { + erc = ErrInvalidTargetPrefixForLogging + } + targetBucket := logging.LoggingEnabled.TargetBucket + if targetBucket == "" { + erc = ErrInvalidTargetBucketForLogging + return + } + var targetVol *Volume + if targetVol, err = o.getVol(targetBucket); err != nil { + log.LogErrorf("putBucketLoggingHandler: load volume fail: requestID(%v) volume(%v) err(%v)", + GetRequestID(r), targetBucket, err) + erc = ErrInvalidTargetBucketForLogging + return + } + if vol.GetOwner() != targetVol.GetOwner() { + erc = ErrInvalidTargetBucketForLogging + return + } + + if vol.mw.Cluster() != targetVol.mw.Cluster() { + erc = ErrInvalidBucketRegionForLogging + return + } + if err = storeBucketLogging(vol, logging); err != nil { + 
log.LogErrorf("putBucketLoggingHandler: store logging fail: requestID(%v) volume(%v) err(%v)", + GetRequestID(r), vol.Name(), err) + return + } + vol.metaLoader.storeLogging(logging) + } else { + if err = deleteBucketLogging(vol); err != nil { + log.LogErrorf("putBucketLoggingHandler: delete logging fail: requestID(%v) volume(%v) err(%v)", + GetRequestID(r), vol.Name(), err) + return + } + vol.metaLoader.storeLogging(nil) + } + + return +} diff --git a/objectnode/logging_manager.go b/objectnode/logging_manager.go new file mode 100644 index 0000000000..8652e03982 --- /dev/null +++ b/objectnode/logging_manager.go @@ -0,0 +1,650 @@ +// Copyright 2023 The CubeFS Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package objectnode + +import ( + "bufio" + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "io/ioutil" + "os" + "path" + "strings" + "sync" + "time" + + "github.com/Shopify/sarama" + "github.com/cubefs/cubefs/blobstore/util/taskpool" + "github.com/cubefs/cubefs/util/log" + "github.com/rs/xid" +) + +const ( + defaultConsumeBatchCnt = 1000 + defaultFileSizeThreshold = 64 * 1024 * 1024 + defaultMultipartSize = 8 * 1024 * 1024 + defaultMultipartConcurrency = 5 + defaultPathScanSec = 60 + defaultWriterExpiredMs = 60000 + defaultWriterScanMs = 60000 + minLogIntervalMin = 15 + maxLogIntervalMin = 60 + maxUploadParts = 10000 + + minConsumeWaitTime = 1000 * time.Millisecond +) + +type LoggingManager interface { + Start() + Close() error +} + +type LoggingMgr struct { + topic string + consumerGroup sarama.ConsumerGroup + fileDataCh chan *fileData + bufPool *sync.Pool + wg sync.WaitGroup + once sync.Once + stopCtx context.Context + stopCancel context.CancelFunc + + LoggingMgrConfig +} + +type fileData struct { + done chan struct{} + dataset map[string][]string +} + +type LoggingMgrConfig struct { + Path string `json:"path"` + ConsumeBatchCnt int `json:"consume_batch_cnt"` + GranularityMin int `json:"granularity_min"` + MultipartConcurrency int `json:"multipart_concurrency"` + PathScanSec int `json:"path_scan_sec"` + ConsumeIntervalMs int64 `json:"consume_interval_ms"` + FileSizeThreshold int64 `json:"size_threshold"` + MultipartSize int64 `json:"multipart_size"` + WriterScanMs int64 `json:"writer_scan_ms"` + WriterExpiredMs int64 `json:"writer_expired_ms"` + + GetVolume func(vol string) (*Volume, error) `json:"-"` +} + +func (cfg *LoggingMgrConfig) checkAndFix() error { + if cfg.Path == "" { + return errors.New("cfg path cannot be empty") + } + if _, err := os.Stat(cfg.Path); err != nil { + if !os.IsNotExist(err) { + return err + } + if err = os.MkdirAll(cfg.Path, 0755); err != nil { + return err + } + } + if cfg.ConsumeBatchCnt <= 0 { + cfg.ConsumeBatchCnt = defaultConsumeBatchCnt + } + if cfg.GranularityMin < minLogIntervalMin { + cfg.GranularityMin = minLogIntervalMin + } + if cfg.GranularityMin > maxLogIntervalMin { + cfg.GranularityMin = maxLogIntervalMin + } + if cfg.MultipartSize <= 0 { + cfg.MultipartSize = defaultMultipartSize + } + if 
cfg.MultipartConcurrency <= 0 { + cfg.MultipartConcurrency = defaultMultipartConcurrency + } + if cfg.FileSizeThreshold <= 0 { + cfg.FileSizeThreshold = defaultFileSizeThreshold + } + if cfg.PathScanSec <= 0 { + cfg.PathScanSec = defaultPathScanSec + } + if cfg.WriterScanMs <= 0 { + cfg.WriterScanMs = defaultWriterScanMs + } + if cfg.WriterExpiredMs <= 0 { + cfg.WriterExpiredMs = defaultWriterExpiredMs + } + + return nil +} + +func NewLoggingManager(topic string, addrs []string, cfg LoggingMgrConfig) (LoggingManager, error) { + if err := cfg.checkAndFix(); err != nil { + return nil, err + } + conf := sarama.NewConfig() + conf.Version = sarama.V2_1_0_0 + conf.Metadata.RefreshFrequency = 120 * time.Second + conf.Consumer.Offsets.Initial = sarama.OffsetOldest + conf.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange + groupID := topic + consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, conf) + if err != nil { + return nil, err + } + ctx, cancel := context.WithCancel(context.Background()) + bufPool := &sync.Pool{ + New: func() interface{} { + b := make([]byte, cfg.MultipartSize) + return b + }, + } + + return &LoggingMgr{ + topic: topic, + fileDataCh: make(chan *fileData, 100), + stopCtx: ctx, + stopCancel: cancel, + bufPool: bufPool, + consumerGroup: consumerGroup, + LoggingMgrConfig: cfg, + }, nil +} + +func (l *LoggingMgr) Start() { + l.once.Do(func() { + for _, f := range []func(){l.consume, l.write, l.upload} { + w := f + l.wg.Add(1) + go func() { + w() + l.wg.Done() + }() + } + }) +} + +func (l *LoggingMgr) Close() error { + l.stopCancel() + l.consumerGroup.Close() + l.wg.Wait() + return nil +} + +func (l *LoggingMgr) Setup(sess sarama.ConsumerGroupSession) error { + partitions, ok := sess.Claims()[l.topic] + if !ok { + return fmt.Errorf("not found topic %s", l.topic) + } + log.LogInfof("kafka consumer group setup, partitions: %v", partitions) + return nil +} + +func (l *LoggingMgr) Cleanup(sess sarama.ConsumerGroupSession) error { + return nil +} + +func (l *LoggingMgr) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + var msgs []*sarama.ConsumerMessage + + ctx := sess.Context() + consumeWaitTime := time.Millisecond / 2 * time.Duration(l.ConsumeBatchCnt) + if consumeWaitTime < minConsumeWaitTime { + consumeWaitTime = minConsumeWaitTime + } + ticker := time.NewTicker(consumeWaitTime) + defer ticker.Stop() + + for { + var msg *sarama.ConsumerMessage + select { + case msg = <-claim.Messages(): + if msg == nil { + continue + } + msgs = append(msgs, msg) + case <-ticker.C: + case <-ctx.Done(): + return nil + } + if len(msgs) <= 0 || (msg != nil && len(msgs) < l.ConsumeBatchCnt) { + continue + } + l.handleMessages(sess, msgs) + if l.ConsumeIntervalMs > 0 { + time.Sleep(time.Duration(l.ConsumeIntervalMs) * time.Millisecond) + } + msgs = msgs[:0] + ticker.Reset(consumeWaitTime) + } +} + +func (l *LoggingMgr) handleMessages(sess sarama.ConsumerGroupSession, msgs []*sarama.ConsumerMessage) { + dataset := make(map[string][]string) + ctx := sess.Context() + for _, msg := range msgs { + if msg != nil { + data := new(LoggingData) + if err := json.Unmarshal(msg.Value, data); err != nil { + log.LogErrorf("unmarshal [%v] failed: %v", string(msg.Value), err) + continue + } + name, content, err := makeLoggingContent(data, l.GranularityMin) + if err != nil { + log.LogErrorf("make [%+v] to logging content failed: %v", data, err) + continue + } + dataset[name] = append(dataset[name], content) + } + } + if len(dataset) > 0 { + data := &fileData{ 
+ dataset: dataset, + done: make(chan struct{}), + } + l.fileDataCh <- data + + select { + case <-data.done: + case <-ctx.Done(): + return + } + } + sess.MarkMessage(msgs[len(msgs)-1], "") +} + +func (l *LoggingMgr) consume() { + for { + if err := l.consumerGroup.Consume(l.stopCtx, []string{l.topic}, l); err != nil { + log.LogCriticalf("consume error: %v", err) + panic(err) + } + log.LogInfof("partition rebalanced for topic: %v", l.topic) + if l.stopCtx.Err() != nil { + log.LogInfof("service closed: %v", l.stopCtx.Err()) + return + } + } +} + +func (l *LoggingMgr) write() { + writerScanTime := time.Duration(l.WriterScanMs) * time.Millisecond + ticker := time.NewTicker(writerScanTime) + writers := make(map[string]LoggingWriter) + tp := taskpool.New(5, 1) + defer func() { + ticker.Stop() + tp.Close() + for name, writer := range writers { + if err := writer.Close(); err != nil { + log.LogErrorf("file [%s] close failed: %v", name, err) + } + } + }() + + for { + select { + case data := <-l.fileDataCh: + if data != nil { + var err error + wg := sync.WaitGroup{} + for name, contents := range data.dataset { + writer, has := writers[name] + if !has { + writer, err = newLoggingWriter(l.Path, name) + if err != nil { + log.LogErrorf("new writer [%s] failed: %v", name, err) + break + } + writers[name] = writer + } + wg.Add(1) + tp.Run(func() { + defer wg.Done() + if err = writer.Write(contents); err != nil { + log.LogErrorf("write to file [%s] failed: %v", name, err) + } + }) + } + wg.Wait() + if err != nil { + panic(err) + } + close(data.done) + } + case <-ticker.C: + var deletes []string + for name, writer := range writers { + mtime, err := writer.ModTime() + if err != nil { + log.LogErrorf("get mtime of file [%s] failed: %v", name, err) + } + if err != nil || time.Since(mtime).Milliseconds() > l.WriterExpiredMs { + log.LogInfof("writer [%s] will be closed", name) + if err = writer.Close(); err != nil { + log.LogErrorf("file [%s] close failed: %v", name, err) + } + deletes = append(deletes, name) + } + } + for _, name := range deletes { + delete(writers, name) + } + deletes = nil + ticker.Reset(writerScanTime) + case <-l.stopCtx.Done(): + return + } + } +} + +func (l *LoggingMgr) upload() { + fileNameCh := make(chan string, 100) + pathScanTime := time.Duration(l.PathScanSec) * time.Second + uploadBeforeTime := time.Duration(l.GranularityMin+1) * time.Minute + ticker := time.NewTicker(pathScanTime) + defer func() { + ticker.Stop() + close(fileNameCh) + }() + + go func() { + tp := taskpool.New(5, 5) + defer tp.Close() + for name := range fileNameCh { + // /// + infos := strings.SplitN(strings.ReplaceAll(name, "%2F", "/"), "/", 4) + if len(infos) < 3 { + log.LogWarnf("invalid filename: %s", name) + continue + } + date, source, target := infos[0], infos[1], infos[2] + var prefix string + if len(infos) == 4 { + prefix = infos[3] + } + tp.Run(func() { + // key: YYYY-mm-DD-HH-MM-SS-UniqueString + l.uploadFile(target, name, fmt.Sprintf("%s%s%s-%s", prefix, source, date, xid.New().String())) + }) + } + }() + + for { + select { + case <-ticker.C: + files, err := ioutil.ReadDir(l.Path) + if err != nil { + log.LogErrorf("read dir [%s] failed: %v", l.Path, err) + continue + } + for _, fi := range files { + if fi.IsDir() { + continue + } + mtime := fi.ModTime() + if time.Since(mtime) < uploadBeforeTime { + log.LogInfof("mod time[%v] of file [%s] is less than %v", mtime, fi.Name(), uploadBeforeTime) + continue + } + fileNameCh <- fi.Name() + } + ticker.Reset(pathScanTime) + case <-l.stopCtx.Done(): + return + } + } 
+} + +func (l *LoggingMgr) uploadFile(bucket, name, key string) error { + file, err := os.Open(path.Join(l.Path, name)) + if err != nil { + log.LogErrorf("open file [%s] failed: %v", name, err) + return err + } + defer file.Close() + + fs, err := file.Stat() + if err != nil { + log.LogErrorf("file [%s] stat failed: %v", name, err) + return err + } + fileSize := fs.Size() + if fileSize <= 0 { + return nil + } + vol, err := l.GetVolume(bucket) + if err != nil { + log.LogErrorf("volume %s get failed: %v", bucket, err) + if err == NoSuchBucket { + err = nil + os.Remove(path.Join(l.Path, name)) + } + return err + } + + bufPool := l.bufPool + number := fileSize / l.MultipartSize + if number >= maxUploadParts { + bufPool = &sync.Pool{ + New: func() interface{} { + b := make([]byte, (fileSize/maxUploadParts)+1) + return b + }, + } + } + concurrency := int(number) + 1 + if concurrency > l.MultipartConcurrency { + concurrency = l.MultipartConcurrency + } + cfg := &loggingUploadConfig{ + key: key, + vol: vol, + bufPool: bufPool, + concurrency: concurrency, + reader: bufio.NewReader(file), + } + uploader := newLoggingUploader(cfg) + + var fi *FSFileInfo + if fileSize <= l.FileSizeThreshold { + fi, err = uploader.NormalUpload() + } else { + fi, err = uploader.MultipartUpload() + } + if err != nil { + log.LogErrorf("upload file [%s] failed: %v", name, err) + } else { + os.Remove(path.Join(l.Path, name)) + log.LogInfof("upload file [%s] success: %+v", name, fi) + } + + return err +} + +type LoggingWriter interface { + Write(contents []string) error + ModTime() (time.Time, error) + Close() error +} + +type loggingWrite struct { + file *os.File + writer *bufio.Writer +} + +func newLoggingWriter(dir, name string) (LoggingWriter, error) { + file, err := os.OpenFile(path.Join(dir, name), os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) + if err != nil { + return nil, err + } + return &loggingWrite{ + file: file, + writer: bufio.NewWriter(file), + }, nil +} + +func (w *loggingWrite) Write(contents []string) error { + for i := range contents { + _, err := w.writer.WriteString(contents[i]) + if err != nil { + return err + } + } + return w.writer.Flush() +} + +func (w *loggingWrite) ModTime() (time.Time, error) { + fi, err := w.file.Stat() + if err != nil { + return time.Time{}, err + } + return fi.ModTime(), nil +} + +func (w *loggingWrite) Close() error { + return w.file.Close() +} + +type LoggingUploader interface { + NormalUpload() (*FSFileInfo, error) + MultipartUpload() (*FSFileInfo, error) +} + +type loggingUpload struct { + *loggingUploadConfig + + id string + err error + mtx sync.Mutex +} + +type loggingUploadConfig struct { + concurrency int + vol *Volume + bufPool *sync.Pool + option *PutFileOption + key string + reader io.Reader +} + +func newLoggingUploader(cfg *loggingUploadConfig) LoggingUploader { + return &loggingUpload{loggingUploadConfig: cfg} +} + +func (u *loggingUpload) NormalUpload() (*FSFileInfo, error) { + return u.vol.PutObject(u.key, u.reader, u.option) +} + +func (u *loggingUpload) MultipartUpload() (*FSFileInfo, error) { + id, err := u.vol.InitMultipart(u.key, u.option) + if err != nil { + log.LogErrorf("init multipart failed: %v", err) + return nil, err + } + u.id = id + type part struct { + buf io.Reader + num uint16 + release func() + } + + ch := make(chan *part, u.concurrency) + wg := sync.WaitGroup{} + for i := 0; i < u.concurrency; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for data := range ch { + if data != nil { + if u.getErr() == nil { + fs, err := u.vol.WritePart(u.key, 
u.id, data.num, data.buf) + if err != nil { + u.setErr(err) + log.LogErrorf("write part [%d/%s] failed: %v", data.num, u.id, err) + } else { + log.LogInfof("write part [%d/%s] success: %+v", data.num, u.id, fs) + } + } + data.release() + } + } + }() + } + + var num uint16 = 1 + for u.getErr() == nil { + buf := u.bufPool.Get().([]byte) + n, err := u.reader.Read(buf) + if err != nil { + u.bufPool.Put(buf) + if err == io.EOF { + log.LogInfo("read file over") + break + } + log.LogErrorf("read file failed: %v", err) + break + } + release := func() { + u.bufPool.Put(buf) + } + log.LogInfof("read %s part %d, read: %d, buf: %d", u.key, num, n, len(buf)) + ch <- &part{buf: bytes.NewReader(buf[:n]), num: num, release: release} + num++ + } + close(ch) + wg.Wait() + + return u.complete() +} + +func (u *loggingUpload) setErr(e error) { + u.mtx.Lock() + defer u.mtx.Unlock() + u.err = e +} + +func (u *loggingUpload) getErr() error { + u.mtx.Lock() + defer u.mtx.Unlock() + return u.err +} + +func (u *loggingUpload) abort() { + if err := u.vol.AbortMultipart(u.key, u.id); err != nil { + log.LogErrorf("abort multipart [%s] failed: %v", u.id, err) + } +} + +func (u *loggingUpload) complete() (*FSFileInfo, error) { + var err error + defer func() { + if err != nil { + u.abort() + log.LogErrorf("complete multipart [%s] failed: %v", u.id, err) + } + }() + + if err = u.getErr(); err != nil { + return nil, err + } + info, err := u.vol.mw.GetMultipart_ll(u.key, u.id) + if err != nil { + return nil, err + } + + return u.vol.CompleteMultipart(u.key, u.id, info, nil) +} diff --git a/objectnode/logging_sender.go b/objectnode/logging_sender.go new file mode 100644 index 0000000000..764cd7815c --- /dev/null +++ b/objectnode/logging_sender.go @@ -0,0 +1,108 @@ +// Copyright 2023 The CubeFS Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. 
+ +package objectnode + +import ( + "context" + "encoding/json" + "errors" + "time" + + "github.com/cubefs/cubefs/blobstore/common/kafka" + "github.com/cubefs/cubefs/blobstore/common/rpc/auditlog" + "github.com/cubefs/cubefs/blobstore/common/trace" +) + +type LoggingSender interface { + Send(ctx context.Context, data string) error + Close() error +} + +type LoggingSendConfig struct { + kafka.ProducerCfg + + GetVolume func(vol string) (*Volume, error) `json:"-"` +} + +type LoggingSendMgr struct { + *LoggingSendConfig + + producer *kafka.Producer +} + +func NewLoggingSender(cfg *LoggingSendConfig) (LoggingSender, error) { + if cfg == nil { + return nil, errors.New("cfg cannot be empty") + } + if cfg.GetVolume == nil { + return nil, errors.New("cfg GetVolume cannot be empty") + } + producer, err := kafka.NewProducer(&cfg.ProducerCfg) + if err != nil { + return nil, err + } + + return &LoggingSendMgr{ + producer: producer, + LoggingSendConfig: cfg, + }, nil +} + +func (s *LoggingSendMgr) parseLine(line string) (data []byte, err error) { + entry, err := auditlog.ParseReqlog(line) + if err != nil || entry.Bucket() == "" { + return + } + + vol, err := s.GetVolume(entry.Bucket()) + if err != nil { + return + } + logging, err := vol.metaLoader.loadLogging() + if err != nil || logging == nil || logging.LoggingEnabled == nil { + return + } + + bucket := logging.LoggingEnabled.TargetBucket + prefix := logging.LoggingEnabled.TargetPrefix + data, err = json.Marshal(makeLoggingData(bucket, prefix, entry)) + + return +} + +func (s *LoggingSendMgr) Send(ctx context.Context, data string) (err error) { + span := trace.SpanFromContext(ctx) + start := time.Now() + defer func() { + span.AppendTrackLog("lsender", start, err) + }() + + msg, err := s.parseLine(data) + if err != nil { + span.Errorf("parse line[%s] error: %v", data, err) + return + } + if len(msg) > 0 { + if err = s.producer.SendMessage(s.Topic, msg); err != nil { + span.Errorf("send topic[%s] message[%s] error: %v", s.Topic, string(msg), err) + } + } + + return +} + +func (s *LoggingSendMgr) Close() error { + return s.producer.Close() +} diff --git a/objectnode/logging_test.go b/objectnode/logging_test.go new file mode 100644 index 0000000000..e3b689538f --- /dev/null +++ b/objectnode/logging_test.go @@ -0,0 +1,51 @@ +// Copyright 2023 The CubeFS Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. 
+ +package objectnode + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func Test_IsLoggingPrefixValid(t *testing.T) { + // `^[a-zA-Z][a-zA-Z0-9-_/]{0,31}$` + testCases := []struct { + prefix string + expected bool + }{ + // must start with a letter + {"1prefix", false}, + {"-prefix", false}, + {"_prefix", false}, + {"prefix", true}, + // consist only of letters, numbers, hyphens(-), underscores(_) and slashes(/) + {"prefix-Log", true}, + {"prefix_log", true}, + {"prefix/log", true}, + {"prefix123g", true}, + {"pre-fix/123_g", true}, + {"prefix&log", false}, + {"prefix;log", false}, + // be between 0 and 32 characters long + {"", true}, + {"number-of-characters-exceeds-_32", true}, + {"number-of-characters-exceeds-_/32", false}, + } + + for _, tc := range testCases { + require.Equal(t, tc.expected, isLoggingPrefixValid(tc.prefix)) + } +} diff --git a/objectnode/result.go b/objectnode/result.go index d77a05d2f5..2119851963 100644 --- a/objectnode/result.go +++ b/objectnode/result.go @@ -15,6 +15,8 @@ package objectnode import ( + "bytes" + "encoding/json" "encoding/xml" "net/url" "regexp" @@ -41,6 +43,17 @@ func UnmarshalXMLEntity(bytes []byte, data interface{}) error { return nil } +func ParseJSONEntity(raw interface{}, entity interface{}) error { + data, err := json.Marshal(raw) + if err != nil { + return err + } + decoder := json.NewDecoder(bytes.NewReader(data)) + decoder.DisallowUnknownFields() + + return decoder.Decode(entity) +} + type LocationResponse struct { XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ LocationConstraint" json:"-"` Location string `xml:",chardata"` diff --git a/objectnode/router.go b/objectnode/router.go index bcc2b0836b..c666d23943 100644 --- a/objectnode/router.go +++ b/objectnode/router.go @@ -181,6 +181,13 @@ func (o *ObjectNode) registerApiRouters(router *mux.Router) { Queries("tagging", ""). HandlerFunc(o.getBucketTaggingHandler) + // Get bucket logging + // API reference: https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketLogging.html + r.NewRoute().Name(ActionToUniqueRouteName(proto.OSSGetBucketLoggingAction)). + Methods(http.MethodGet). + Queries("logging", ""). + HandlerFunc(o.getBucketLoggingHandler) + // Get bucket encryption // API reference: https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketEncryption.html // Notes: unsupported operation @@ -398,6 +405,13 @@ func (o *ObjectNode) registerApiRouters(router *mux.Router) { Queries("tagging", ""). HandlerFunc(o.putBucketTaggingHandler) + // Put bucket logging + // API reference: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketLogging.html + r.NewRoute().Name(ActionToUniqueRouteName(proto.OSSPutBucketLoggingAction)). + Methods(http.MethodPut). + Queries("logging", ""). 
+ HandlerFunc(o.putBucketLoggingHandler) + // Put bucket encryption // API reference: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketEncryption.html // Notes: unsupported operation diff --git a/objectnode/server.go b/objectnode/server.go index 58d2603594..43d3bda037 100644 --- a/objectnode/server.go +++ b/objectnode/server.go @@ -17,6 +17,7 @@ package objectnode import ( "context" "errors" + "fmt" "net/http" "path" "regexp" @@ -24,6 +25,9 @@ import ( "sync" "github.com/cubefs/cubefs/blobstore/api/access" + "github.com/cubefs/cubefs/blobstore/common/rpc" + "github.com/cubefs/cubefs/blobstore/common/rpc/auditlog" + "github.com/cubefs/cubefs/blobstore/common/trace" "github.com/cubefs/cubefs/blockcache/bcache" "github.com/cubefs/cubefs/cmd/common" "github.com/cubefs/cubefs/proto" @@ -36,6 +40,12 @@ import ( "github.com/gorilla/mux" ) +func init() { + trace.PrefixBaggage = "x-object-baggage-" + trace.FieldKeyTraceID = "x-object-trace-id" + trace.FieldKeySpanID = "x-object-span-id" +} + // Configuration items that act on the ObjectNode. const ( // String type configuration type, used to configure the listening port number of the service. @@ -83,8 +93,8 @@ const ( disabledActions = "disabledActions" configSignatureIgnoredActions = "signatureIgnoredActions" - //ObjMetaCache takes each path hierarchy of the path-like S3 object key as the cache key, - //and map it to the corresponding posix-compatible inode + // ObjMetaCache takes each path hierarchy of the path-like S3 object key as the cache key, + // and map it to the corresponding posix-compatible inode // when enabled, the maxDentryCacheNum must at least be the minimum of defaultMaxDentryCacheNum // Example: // { @@ -101,9 +111,33 @@ const ( configMaxDentryCacheNum = "maxDentryCacheNum" configMaxInodeAttrCacheNum = "maxInodeAttrCacheNum" - //enable block cache when reading data in cold volume + // Map type configuration item, used to configure ObjectNode to support audit log function. For detailed + // parameters, see the auditlog.Config structure. + // Example: + // { + // "auditLog": { + // "logdir": "./run/auditlog/object/", + // "chunkbits": 29 + // } + // } + configAuditLog = "auditLog" + + // Map type configuration item, used to configure ObjectNode to save detailed access records for the + // requests made to a bucket. 
+ // Example: + // { + // "logging": { + // "path": "./logging/data/", + // "topic": "bucket_logging_topic", + // "timeout_ms": 1000, + // "broker_list": ["127.0.0.1:9092"] + // } + // } + configLogging = "logging" + + // enable block cache when reading data in cold volume enableBcache = "enableBcache" - //define thread numbers for writing and reading ebs + // define thread numbers for writing and reading ebs ebsWriteThreads = "bStoreWriteThreads" ebsReadThreads = "bStoreReadThreads" ) @@ -114,7 +148,7 @@ const ( defaultCacheRefreshInterval = 10 * 60 defaultMaxDentryCacheNum = 10000000 defaultMaxInodeAttrCacheNum = 10000000 - //ebs + // ebs MaxSizePutOnce = int64(1) << 23 ) @@ -141,6 +175,8 @@ type ObjectNode struct { state uint32 wg sync.WaitGroup userStore UserInfoStore + handler rpc.ProgressHandler + defers []func() error // close other resources after http server closed signatureIgnoredActions proto.Actions // signature ignored actions disabledActions proto.Actions // disabled actions @@ -208,6 +244,24 @@ func (o *ObjectNode) loadConfig(cfg *config.Config) (err error) { } } + // parse auditLog config + if rawAuditLog := cfg.GetValue(configAuditLog); rawAuditLog != nil { + if err = o.setAuditLog(rawAuditLog); err != nil { + err = fmt.Errorf("invalid %v configuration: %v", configAuditLog, err) + return + } + log.LogInfof("loadConfig: setup config: %v(%v)", configAuditLog, rawAuditLog) + } + + // parse logging config + if rawLogging := cfg.GetValue(configLogging); rawLogging != nil { + if err = o.setLogging(rawLogging); err != nil { + err = fmt.Errorf("invalid %v configuration: %v", configLogging, err) + return + } + log.LogInfof("loadConfig: setup config: %v(%v)", configLogging, rawLogging) + } + // parse strict config strict := cfg.GetBool(configStrict) log.LogInfof("loadConfig: strict: %v", strict) @@ -251,6 +305,56 @@ func (o *ObjectNode) updateRegion(region string) { o.region = region } +func (o *ObjectNode) setAuditLog(raw interface{}) error { + cfg := new(auditlog.Config) + if err := ParseJSONEntity(raw, cfg); err != nil { + return err + } + + handler, auditLogFile, err := auditlog.Open("OBJECT", cfg) + if err != nil { + return err + } + o.handler = handler + o.defers = append(o.defers, auditLogFile.Close) + + return nil +} + +func (o *ObjectNode) setLogging(raw interface{}) error { + cfg := new(LoggingConfig) + if err := ParseJSONEntity(raw, cfg); err != nil { + return err + } + + if cfg.Topic != "" { + conf := &LoggingSendConfig{ + GetVolume: o.getVol, + } + conf.TimeoutMs = cfg.TimeoutMs + conf.Topic = cfg.Topic + conf.BrokerList = cfg.Brokers + sender, err := NewLoggingSender(conf) + if err != nil { + return err + } + o.defers = append(o.defers, sender.Close) + auditlog.SetHandlerMQSender(o.handler, sender) + + if cfg.Path != "" { + cfg.LoggingMgrConfig.GetVolume = o.getVol + manager, err := NewLoggingManager(cfg.Topic, cfg.Brokers, cfg.LoggingMgrConfig) + if err != nil { + return err + } + manager.Start() + o.defers = append(o.defers, manager.Close) + } + } + + return nil +} + func handleStart(s common.Server, cfg *config.Config) (err error) { o, ok := s.(*ObjectNode) if !ok { @@ -260,26 +364,25 @@ func handleStart(s common.Server, cfg *config.Config) (err error) { if err = o.loadConfig(cfg); err != nil { return } - // Get cluster info from master - + // get cluster info from master var ci *proto.ClusterInfo if ci, err = o.mc.AdminAPI().GetClusterInfo(); err != nil { return } o.updateRegion(ci.Cluster) log.LogInfof("handleStart: get cluster information: region(%v)", o.region) 
+ // set ebs client ebsClient, err = blobstore.NewEbsClient(access.Config{ ConnMode: access.NoLimitConnMode, Consul: access.ConsulConfig{ Address: ci.EbsAddr, }, - //ServicePath: ci.ServicePath, + // ServicePath: ci.ServicePath, MaxSizePutOnce: MaxSizePutOnce, Logger: &access.Logger{ Filename: path.Join(cfg.GetString("logDir"), "ebs.log"), }, }) - if err != nil { wt := cfg.GetInt(ebsWriteThreads) if wt != 0 { @@ -290,7 +393,6 @@ func handleStart(s common.Server, cfg *config.Config) (err error) { readThreads = rt } } - // start rest api if err = o.startMuxRestAPI(); err != nil { log.LogInfof("handleStart: start rest api fail: err(%v)", err) @@ -309,13 +411,14 @@ func handleShutdown(s common.Server) { if !ok { return } - o.shutdownRestAPI() + o.shutdown() } func (o *ObjectNode) startMuxRestAPI() (err error) { router := mux.NewRouter().SkipClean(true) o.registerApiRouters(router) router.Use( + o.auditLogMiddleware, o.expectMiddleware, o.traceMiddleware, o.corsMiddleware, @@ -339,11 +442,15 @@ func (o *ObjectNode) startMuxRestAPI() (err error) { return } -func (o *ObjectNode) shutdownRestAPI() { +func (o *ObjectNode) shutdown() { if o.httpServer != nil { _ = o.httpServer.Shutdown(context.Background()) o.httpServer = nil } + + for _, f := range o.defers { + _ = f() + } } func NewServer() *ObjectNode { diff --git a/proto/perm_action.go b/proto/perm_action.go index 65596893c7..0d00bb4dc3 100644 --- a/proto/perm_action.go +++ b/proto/perm_action.go @@ -114,6 +114,10 @@ const ( OSSPutBucketTaggingAction Action = OSSActionPrefix + "PutBucketTagging" OSSDeleteBucketTaggingAction Action = OSSActionPrefix + "DeleteBucketTagging" + // Bucket logging actions + OSSGetBucketLoggingAction Action = OSSActionPrefix + "GetBucketLogging" + OSSPutBucketLoggingAction Action = OSSActionPrefix + "PutBucketLogging" + // Bucket lifecycle actions OSSGetBucketLifecycleAction Action = OSSActionPrefix + "GetBucketLifecycle" // unsupported OSSPutBucketLifecycleAction Action = OSSActionPrefix + "PutBucketLifecycle" // unsupported @@ -239,6 +243,8 @@ var ( OSSGetBucketReplicationAction, OSSPutBucketReplicationAction, OSSDeleteBucketReplicationAction, + OSSGetBucketLoggingAction, + OSSPutBucketLoggingAction, OSSOptionsObjectAction, // POSIX file system interface actions diff --git a/util/config/config.go b/util/config/config.go index 7ca6098f1b..90ea95d776 100644 --- a/util/config/config.go +++ b/util/config/config.go @@ -75,6 +75,11 @@ func (c *Config) parse(fileName string) error { return err } +// GetValue returns the raw data for the config key. +func (c *Config) GetValue(key string) interface{} { + return c.data[key] +} + // GetString returns a string for the config key. func (c *Config) GetString(key string) string { x, present := c.data[key]
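
For reference, a sketch of an ObjectNode configuration that enables both of the pipelines this change adds (the audit log handler and the bucket-logging Kafka path). It is assembled from the configAuditLog/configLogging comment examples in server.go and the json tags on auditlog.Config, LoggingConfig and LoggingMgrConfig; all values (paths, topic, broker address) are illustrative only, and the "brokers" key follows the LoggingConfig struct tag rather than the "broker_list" shown in the comment example:

    {
        "auditLog": {
            "logdir": "./run/auditlog/object/",
            "chunkbits": 29,
            "async": true
        },
        "logging": {
            "topic": "bucket_logging_topic",
            "timeout_ms": 1000,
            "brokers": ["127.0.0.1:9092"],
            "path": "./logging/data/",
            "granularity_min": 15
        }
    }

With this in place, a PutBucketLogging request (a BucketLoggingStatus body whose LoggingEnabled element carries TargetBucket and TargetPrefix) switches on the flow introduced here: the audit handler passes each access record to LoggingSendMgr, which publishes a LoggingData message to the Kafka topic for buckets that have logging enabled; LoggingMgr consumes that topic, batches records into files under "path", and uploads files that have been idle for longer than the configured granularity into the target bucket.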