Skip to content

Commit

Permalink
feat(objectnode/blobstore): supports auditlog and logging features
Browse files Browse the repository at this point in the history
Signed-off-by: yhjiango <[email protected]>
  • Loading branch information
yhjiango committed Aug 3, 2023
1 parent 1ee5e30 commit 31f9858
Show file tree
Hide file tree
Showing 21 changed files with 1,531 additions and 31 deletions.
45 changes: 43 additions & 2 deletions blobstore/common/rpc/auditlog/auditlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type jsonAuditlog struct {
module string
decoder Decoder
metricSender MetricSender
mqSender MQSender
logFile LogCloser

logPool sync.Pool
Expand Down Expand Up @@ -158,6 +159,11 @@ func Open(module string, cfg *Config) (ph rpc.ProgressHandler, logFile LogCloser
}, logFile, nil
}

// SetMQSender will send audit data to the message queue of your sender implementation.
func (j *jsonAuditlog) SetMQSender(sender MQSender) {
j.mqSender = sender
}

func (j *jsonAuditlog) Handler(w http.ResponseWriter, req *http.Request, f func(http.ResponseWriter, *http.Request)) {
var (
logBytes []byte
Expand All @@ -172,6 +178,7 @@ func (j *jsonAuditlog) Handler(w http.ResponseWriter, req *http.Request, f func(
body: j.bodyPool.Get().([]byte),
bodyLimit: j.cfg.BodyLimit,
span: span,
statusCode: 200,
startTime: time.Now(),
ResponseWriter: w,
}
Expand Down Expand Up @@ -243,7 +250,22 @@ func (j *jsonAuditlog) Handler(w http.ResponseWriter, req *http.Request, f func(
auditLog.RespLength = _w.getBodyWritten()
auditLog.Duration = endTime - startTime/1000

j.metricSender.Send(auditLog.ToBytesWithTab(b))
data := string(auditLog.ToBytesWithTab(b))
if j.cfg.Async {
if j.mqSender != nil {
go func() {
j.mqSender.Send(ctx, data)
}()
}
go func() {
j.metricSender.Send([]byte(data))
}()
} else {
if j.mqSender != nil {
j.mqSender.Send(ctx, data)
}
j.metricSender.Send(b.Bytes())
}

if j.logFile == nil || (len(j.cfg.KeywordsFilter) > 0 && defaultLogFilter(req, j.cfg.KeywordsFilter)) {
return
Expand All @@ -253,7 +275,7 @@ func (j *jsonAuditlog) Handler(w http.ResponseWriter, req *http.Request, f func(
case LogFormatJSON:
logBytes = auditLog.ToJson()
default:
logBytes = b.Bytes() // *bytes.Buffer was filled with metricSender.Send
logBytes = b.Bytes()
}
err = j.logFile.Log(logBytes)
if err != nil {
Expand All @@ -273,3 +295,22 @@ func defaultLogFilter(r *http.Request, words []string) bool {
}
return false
}

// ExtraWrite provides extra response header writes to the ResponseWriter.
func ExtraWrite(w http.ResponseWriter, key string, val interface{}) {
ew, ok := w.(ExtraWriter)
if !ok {
return
}
ew.ExtraWrite(key, val)
}

// SetHandlerMQSender sets the sender to the ProgressHandler instance, which
// indicates that audit data needs to be sent to the sender.
func SetHandlerMQSender(ph rpc.ProgressHandler, sender MQSender) {
eos, ok := ph.(ExtraOptionsSetter)
if !ok {
return
}
eos.SetMQSender(sender)
}
21 changes: 19 additions & 2 deletions blobstore/common/rpc/auditlog/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@

package auditlog

import "net/http"
import (
"context"
"net/http"
)

const (
LogFormatText = "text"
Expand All @@ -33,7 +36,9 @@ type Config struct {
RotateNew bool `json:"rotate_new"`
LogFileSuffix string `json:"log_file_suffix"`
// 0 means no backup limit
Backup int `json:"backup"`
Backup int `json:"backup"`
// Async whether to send audit data asynchronously
Async bool `json:"async"`
MetricConfig PrometheusConfig `json:"metric_config"`

// KeywordsFilter log filter based on uri and request method
Expand Down Expand Up @@ -61,6 +66,18 @@ type MetricSender interface {
Send(raw []byte) error
}

type MQSender interface {
Send(ctx context.Context, data string) error
}

type Decoder interface {
DecodeReq(req *http.Request) *DecodedReq
}

type ExtraWriter interface {
ExtraWrite(key string, val interface{})
}

type ExtraOptionsSetter interface {
SetMQSender(sender MQSender)
}
47 changes: 46 additions & 1 deletion blobstore/common/rpc/auditlog/request_row.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type LogEntry interface {
Code() string
Method() string
Path() string
ReqTime() int64
RespTime() int64 // 100ns
RespLength() int64
ReqLength() int64
Expand All @@ -60,8 +61,14 @@ type LogEntry interface {
XRespCode() string
// ReqParams returns params of request, including params in raw query and body
ReqParams() string
RawQuery() string
Uid() uint32
ApiName() string
Referer() string
Bucket() string
ReqUid() string
OwnerUid() string
XRemoteIP() string
}

func ErrInValidFieldCnt(msg string) error {
Expand All @@ -81,6 +88,8 @@ type ReqHeader struct {
XSrc string `json:"X-Src"`
IP string `json:"IP"`
UA string `json:"User-Agent"`
Referer string `json:"Referer"`
XForwardedFor string `json:"X-Forwarded-For"`
}

type Token struct {
Expand Down Expand Up @@ -110,7 +119,8 @@ type RespHeader struct {
BatchOps map[string]int64 `json:"batchOps"`
PreDelSize map[string]int64 `json:"preDelSize"`
PreDelArchiveSize map[string]int64 `json:"preDelArchiveSize"`
OUid uint32 `json:"ouid"` // owner uid
Uid string `json:"uid"` // request uid
OUid string `json:"ouid"` // owner uid
RsInfo *RsInfo `json:"rs-info"`
XRespCode string `json:"X-Resp-Code"` // return from dora
BillTag string `json:"billtag"` // must be same with definition in billtag.go
Expand Down Expand Up @@ -182,6 +192,22 @@ func (a *RequestRow) Uid() uint32 {
return 0
}

func (a *RequestRow) ReqUid() string {
respHeader := a.getRespHeader()
if respHeader == nil {
return ""
}
return respHeader.Uid
}

func (a *RequestRow) OwnerUid() string {
respHeader := a.getRespHeader()
if respHeader == nil {
return ""
}
return respHeader.OUid
}

func (a *RequestRow) RespToken() *Token {
respHeader := a.getRespHeader()
if respHeader == nil {
Expand Down Expand Up @@ -291,6 +317,25 @@ func (a *RequestRow) RemoteIp() string {
return ""
}

func (a *RequestRow) XRemoteIP() string {
reqHeader := a.getReqHeader()
if reqHeader != nil {
if reqHeader.XForwardedFor != "" {
return strings.TrimSpace(reqHeader.XForwardedFor)
}
return a.RemoteIp()
}
return ""
}

func (a *RequestRow) Referer() string {
reqHeader := a.getReqHeader()
if reqHeader == nil {
return ""
}
return reqHeader.Referer
}

func (a *RequestRow) ReqCdn() string {
reqHeader := a.getReqHeader()
if reqHeader == nil {
Expand Down
16 changes: 14 additions & 2 deletions blobstore/common/rpc/auditlog/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type responseWriter struct {
// body hold some data buffer of response body, like json or form
// audit log will record body buffer into log file
body []byte
extra M // extra header
span trace.Span
startTime time.Time
hasRecordCost bool
Expand All @@ -48,7 +49,7 @@ func (w *responseWriter) Write(b []byte) (int, error) {
w.WriteHeader(http.StatusOK)
w.hasWroteHeader = true
}
if w.n < w.bodyLimit {
if w.statusCode/100 != 2 && w.n < w.bodyLimit {
n := copy(w.body[w.n:], b)
w.n += n
}
Expand Down Expand Up @@ -88,10 +89,17 @@ func (w *responseWriter) Flush() {
w.ResponseWriter.(http.Flusher).Flush()
}

func (w *responseWriter) ExtraWrite(key string, val interface{}) {
if w.extra == nil {
w.extra = make(M)
}
w.extra[key] = val
}

func (w *responseWriter) getBody() []byte {
header := w.ResponseWriter.Header()
length, _ := strconv.ParseInt(header.Get(rpc.HeaderContentLength), 10, 64)
if length > int64(w.n) {
if w.statusCode/100 != 2 || length > int64(w.n) {
return nil
}
return w.body[:w.n]
Expand All @@ -111,6 +119,10 @@ func (w *responseWriter) getHeader() M {
headerM[k] = header.Get(k)
}
}
for h, v := range w.extra {
headerM[h] = v
}

return headerM
}

Expand Down
4 changes: 2 additions & 2 deletions blobstore/common/trace/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,9 @@ func StartSpanFromHTTPHeaderSafe(r *http.Request, operationName string) (Span, c
spanCtx, _ := Extract(HTTPHeaders, HTTPHeadersCarrier(r.Header))
traceID := r.Header.Get(RequestIDKey)
if traceID == "" {
return StartSpanFromContext(context.Background(), operationName, ext.RPCServerOption(spanCtx))
return StartSpanFromContext(r.Context(), operationName, ext.RPCServerOption(spanCtx))
}
return StartSpanFromContextWithTraceID(context.Background(), operationName, traceID, ext.RPCServerOption(spanCtx))
return StartSpanFromContextWithTraceID(r.Context(), operationName, traceID, ext.RPCServerOption(spanCtx))
}

// ContextWithSpan returns a new `context.Context` that holds a reference to
Expand Down
4 changes: 3 additions & 1 deletion objectnode/acl_enum.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ const (
)

const (
XMLNS = "http://www.w3.org/2001/XMLSchema-instance"
XMLNS = "http://s3.amazonaws.com/doc/2006-03-01/"
XMLSI = "http://www.w3.org/2001/XMLSchema-instance"

GroupAllUser = "http://acs.amazonaws.com/groups/global/AllUsers"
GroupAuthenticated = "http://acs.amazonaws.com/groups/global/AuthenticatedUsers"
)
Expand Down
2 changes: 2 additions & 0 deletions objectnode/api_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const (
ContextKeyRequestAction = "ctx_request_action"
ContextKeyStatusCode = "status_code"
ContextKeyErrorMessage = "error_message"
ContextKeyUid = "uid"
ContextKeyOUid = "ouid"
)

func SetRequestID(r *http.Request, requestID string) {
Expand Down
26 changes: 23 additions & 3 deletions objectnode/api_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strings"
"time"

"github.com/cubefs/cubefs/blobstore/common/rpc/auditlog"
"github.com/cubefs/cubefs/proto"
"github.com/cubefs/cubefs/util/exporter"
"github.com/cubefs/cubefs/util/log"
Expand Down Expand Up @@ -78,11 +79,17 @@ func (o *ObjectNode) traceMiddleware(next http.Handler) http.Handler {
return strings.ReplaceAll(uUID.String(), "-", ""), nil
}
var handlerFunc http.HandlerFunc = func(w http.ResponseWriter, r *http.Request) {
var err error
defer func() {
param := ParseRequestParam(r)
auditlog.ExtraWrite(w, "Tbl", param.bucket)
auditlog.ExtraWrite(w, "Api", param.apiName)
auditlog.ExtraWrite(w, "Uid", param.vars[ContextKeyUid])
auditlog.ExtraWrite(w, "OUid", param.vars[ContextKeyOUid])
}()

// ===== pre-handle start =====
var requestID string
if requestID, err = generateRequestID(); err != nil {
requestID, err := generateRequestID()
if err != nil {
log.LogErrorf("traceMiddleware: generate request ID fail, remote(%v) url(%v) err(%v)",
r.RemoteAddr, r.URL.String(), err)
_ = InternalErrorCode(err).ServeResponse(w, r)
Expand Down Expand Up @@ -291,6 +298,8 @@ func (o *ObjectNode) corsMiddleware(next http.Handler) http.Handler {
}
}

mux.Vars(r)[ContextKeyOUid] = vol.GetOwner()

if IsAccountLevelApi(param.apiName) {
next.ServeHTTP(w, r)
return
Expand Down Expand Up @@ -325,6 +334,17 @@ func (o *ObjectNode) corsMiddleware(next http.Handler) http.Handler {
})
}

func (o *ObjectNode) auditLogMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
if o.handler != nil {
o.handler.Handler(w, r, next.ServeHTTP)
} else {
next.ServeHTTP(w, r)
}
})
}

func isMatchAndSetupCORSHeader(cors *CORSConfiguration, writer http.ResponseWriter, request *http.Request, isPreflight bool) (match bool) {
origin := request.Header.Get(Origin)
reqHeaders := request.Header.Get(HeaderNameAccessControlRequestHeaders)
Expand Down
1 change: 1 addition & 0 deletions objectnode/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ const (
XAttrKeyOSSCORS = "oss:cors"
XAttrKeyOSSCacheControl = "oss:cache"
XAttrKeyOSSExpires = "oss:expires"
XAttrKeyOSSLogging = "oss:logging"

// Deprecated
XAttrKeyOSSETagDeprecated = "oss:tag"
Expand Down
20 changes: 20 additions & 0 deletions objectnode/fs_volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,13 @@ func (v *Volume) loadOSSMeta() {
return
}
v.metaLoader.storeCORS(cors)

var logging *S3Logging
if logging, err = v.loadBucketLogging(); err != nil {
return
}
v.metaLoader.storeLogging(logging)

v.metaLoader.setSynced()
}

Expand Down Expand Up @@ -251,6 +258,19 @@ func (v *Volume) loadBucketCors() (configuration *CORSConfiguration, err error)
return configuration, nil
}

func (v *Volume) loadBucketLogging() (logging *S3Logging, err error) {
raw, err := v.store.Get(v.name, bucketRootPath, XAttrKeyOSSLogging)
if err != nil {
return
}
logging = new(S3Logging)
if len(raw) == 0 {
return
}
err = json.Unmarshal(raw, logging)
return
}

func (v *Volume) getInodeFromPath(path string) (inode uint64, err error) {
if path == "/" {
return volumeRootInode, nil
Expand Down
Loading

0 comments on commit 31f9858

Please sign in to comment.