From 5f5c64274e0bb785fc1f4a096a4cee4fe27e34a2 Mon Sep 17 00:00:00 2001 From: chenjunxu Date: Sat, 28 Sep 2024 00:57:43 +0800 Subject: [PATCH] chore: new version patch --- apisix/core/response.lua | 22 +++- apisix/plugins/kafka-logger.lua | 36 +++++- apisix/utils/log-util.lua | 204 ++++++++++++++++++++------------ 3 files changed, 178 insertions(+), 84 deletions(-) diff --git a/apisix/core/response.lua b/apisix/core/response.lua index cfbac1467341..baee97749598 100644 --- a/apisix/core/response.lua +++ b/apisix/core/response.lua @@ -70,7 +70,9 @@ function resp_exit(code, ...) error("failed to encode data: " .. err, -2) else idx = idx + 1 - t[idx] = body .. "\n" + t[idx] = body + idx = idx + 1 + t[idx] = "\n" end elseif v ~= nil then @@ -80,7 +82,7 @@ function resp_exit(code, ...) end if idx > 0 then - ngx_print(concat_tab(t, "", 1, idx)) + ngx_print(t) end if code then @@ -174,7 +176,7 @@ end -- final_body = transform(final_body) -- ngx.arg[1] = final_body -- ... -function _M.hold_body_chunk(ctx, hold_the_copy) +function _M.hold_body_chunk(ctx, hold_the_copy, max_resp_body_bytes) local body_buffer local chunk, eof = arg[1], arg[2] @@ -190,22 +192,32 @@ function _M.hold_body_chunk(ctx, hold_the_copy) n = 1 } ctx._body_buffer[ctx._plugin_name] = body_buffer + ctx._resp_body_bytes = #chunk else local n = body_buffer.n + 1 body_buffer.n = n body_buffer[n] = chunk + ctx._resp_body_bytes = ctx._resp_body_bytes + #chunk + end + if max_resp_body_bytes and ctx._resp_body_bytes >= max_resp_body_bytes then + local body_data = concat_tab(body_buffer, "", 1, body_buffer.n) + body_data = str_sub(body_data, 1, max_resp_body_bytes) + return body_data end end if eof then body_buffer = ctx._body_buffer[ctx._plugin_name] if not body_buffer then + if max_resp_body_bytes and #chunk >= max_resp_body_bytes then + chunk = str_sub(chunk, 1, max_resp_body_bytes) + end return chunk end - body_buffer = concat_tab(body_buffer, "", 1, body_buffer.n) + local body_data = concat_tab(body_buffer, "", 1, body_buffer.n) ctx._body_buffer[ctx._plugin_name] = nil - return body_buffer + return body_data end if not hold_the_copy then diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua index 2abbd1fce8a8..adeec2921a35 100644 --- a/apisix/plugins/kafka-logger.lua +++ b/apisix/plugins/kafka-logger.lua @@ -14,6 +14,7 @@ -- See the License for the specific language governing permissions and -- limitations under the License. -- +local expr = require("resty.expr.v1") local core = require("apisix.core") local log_util = require("apisix.utils.log-util") local producer = require ("resty.kafka.producer") @@ -22,6 +23,7 @@ local bp_manager_mod = require("apisix.utils.batch-processor-manager") local math = math local pairs = pairs local type = type +local req_read_body = ngx.req.read_body local plugin_name = "kafka-logger" local batch_processor_manager = bp_manager_mod.new("kafka logger") @@ -95,7 +97,7 @@ local schema = { required_acks = { type = "integer", default = 1, - enum = { 0, 1, -1 }, + enum = { 1, -1 }, }, key = {type = "string"}, timeout = {type = "integer", minimum = 1, default = 3}, @@ -115,6 +117,8 @@ local schema = { type = "array" } }, + max_req_body_bytes = {type = "integer", minimum = 1, default = 524288}, + max_resp_body_bytes = {type = "integer", minimum = 1, default = 524288}, -- in lua-resty-kafka, cluster_name is defined as number -- see https://github.com/doujiang24/lua-resty-kafka#new-1 cluster_name = {type = "integer", minimum = 1, default = 1}, @@ -134,7 +138,9 @@ local schema = { local metadata_schema = { type = "object", properties = { - log_format = log_util.metadata_schema_log_format, + log_format = { + type = "object" + } }, } @@ -208,6 +214,32 @@ local function send_kafka_data(conf, log_message, prod) end +function _M.access(conf, ctx) + if conf.include_req_body then + local should_read_body = true + if conf.include_req_body_expr then + if not conf.request_expr then + local request_expr, err = expr.new(conf.include_req_body_expr) + if not request_expr then + core.log.error('generate request expr err ', err) + return + end + conf.request_expr = request_expr + end + + local result = conf.request_expr:eval(ctx.var) + + if not result then + should_read_body = false + end + end + if should_read_body then + req_read_body() + end + end +end + + function _M.body_filter(conf, ctx) log_util.collect_body(conf, ctx) end diff --git a/apisix/utils/log-util.lua b/apisix/utils/log-util.lua index 1df687c7bcfd..e53daca80be8 100644 --- a/apisix/utils/log-util.lua +++ b/apisix/utils/log-util.lua @@ -17,105 +17,78 @@ local core = require("apisix.core") local plugin = require("apisix.plugin") local expr = require("resty.expr.v1") -local ngx = ngx +local content_decode = require("apisix.utils.content-decode") +local ngx = ngx local pairs = pairs local ngx_now = ngx.now +local ngx_header = ngx.header local os_date = os.date local str_byte = string.byte +local str_sub = string.sub local math_floor = math.floor local ngx_update_time = ngx.update_time local req_get_body_data = ngx.req.get_body_data local is_http = ngx.config.subsystem == "http" +local req_get_body_file = ngx.req.get_body_file +local MAX_REQ_BODY = 524288 -- 512 KiB +local MAX_RESP_BODY = 524288 -- 512 KiB +local io = io local lru_log_format = core.lrucache.new({ ttl = 300, count = 512 }) local _M = {} -_M.metadata_schema_log_format = { - type = "object", - default = { - ["host"] = "$host", - ["@timestamp"] = "$time_iso8601", - ["client_ip"] = "$remote_addr", - }, -} -local function gen_log_format(format) - local log_format = {} - for k, var_name in pairs(format) do - if var_name:byte(1, 1) == str_byte("$") then - log_format[k] = {true, var_name:sub(2)} - else - log_format[k] = {false, var_name} +local function get_request_body(max_bytes) + local req_body = req_get_body_data() + if req_body then + if max_bytes and #req_body >= max_bytes then + req_body = str_sub(req_body, 1, max_bytes) end + return req_body end - core.log.info("log_format: ", core.json.delay_encode(log_format)) - return log_format -end - - -local function get_request_body(conf, ctx) - local res = {} - if conf.include_req_body then + local file_name = req_get_body_file() + if not file_name then + return nil + end - local log_request_body = true + core.log.info("attempt to read body from file: ", file_name) - if conf.include_req_body_expr then + local f, err = io.open(file_name, 'r') + if not f then + return nil, "fail to open file " .. err + end - if not conf.request_expr then - local request_expr, err = expr.new(conf.include_req_body_expr) - if not request_expr then - core.log.error('generate request expr err ' .. err) - return res - end - conf.request_expr = request_expr - end + req_body = f:read(max_bytes) + f:close() - local result = conf.request_expr:eval(ctx.var) + return req_body +end - if not result then - log_request_body = false - end - end - if log_request_body then - local body = req_get_body_data() - if body then - res.request_body = body - return res - else - local body_file = ngx.req.get_body_file() - if body_file then - res.request_body_file = body_file - return res - end - end +local function gen_log_format(format) + local log_format = {} + for k, var_name in pairs(format) do + if var_name:byte(1, 1) == str_byte("$") then + log_format[k] = {true, var_name:sub(2)} + else + log_format[k] = {false, var_name} end end - - return res + core.log.info("log_format: ", core.json.delay_encode(log_format)) + return log_format end -local function get_custom_format_log(ctx, format, conf) +local function get_custom_format_log(ctx, format) local log_format = lru_log_format(format or "", nil, gen_log_format, format) local entry = core.table.new(0, core.table.nkeys(log_format)) for k, var_attr in pairs(log_format) do if var_attr[1] then - if var_attr[2] == "response_body" then - entry[k] = ctx.resp_body - elseif var_attr[2] == "request_body" then - local request_data = get_request_body(conf, ctx) - entry[k] = request_data.request_body - elseif var_attr[2] == "request_body_file" then - local request_data = get_request_body(conf, ctx) - entry[k] = request_data.request_body_file - else - entry[k] = ctx.var[var_attr[2]] - end + entry[k] = ctx.var[var_attr[2]] else entry[k] = var_attr[2] end @@ -218,9 +191,38 @@ local function get_full_log(ngx, conf) log.response.body = ctx.resp_body end - local request_data = get_request_body(conf, ctx) - log.request.body = request_data.request_body - log.request.body_file = request_data.request_body_file + if conf.include_req_body then + + local log_request_body = true + + if conf.include_req_body_expr then + + if not conf.request_expr then + local request_expr, err = expr.new(conf.include_req_body_expr) + if not request_expr then + core.log.error('generate request expr err ' .. err) + return log + end + conf.request_expr = request_expr + end + + local result = conf.request_expr:eval(ctx.var) + + if not result then + log_request_body = false + end + end + + if log_request_body then + local max_req_body_bytes = conf.max_req_body_bytes or MAX_REQ_BODY + local body, err = get_request_body(max_req_body_bytes) + if err then + core.log.error("fail to get request body: ", err) + return + end + log.request.body = body + end + end return log end @@ -234,7 +236,27 @@ function _M.inject_get_full_log(f) end +local function is_match(match, ctx) + local match_result + for _, m in pairs(match) do + local expr, _ = expr.new(m) + match_result = expr:eval(ctx.var) + if match_result then + break + end + end + + return match_result +end + + function _M.get_log_entry(plugin_name, conf, ctx) + -- If the "match" configuration is set and the matching conditions are not met, + -- then do not log the message. + if conf.match and not is_match(conf.match, ctx) then + return + end + local metadata = plugin.plugin_metadata(plugin_name) core.log.info("metadata: ", core.json.delay_encode(metadata)) @@ -246,7 +268,7 @@ function _M.get_log_entry(plugin_name, conf, ctx) if conf.log_format or has_meta_log_format then customized = true - entry = get_custom_format_log(ctx, conf.log_format or metadata.value.log_format, conf) + entry = get_custom_format_log(ctx, conf.log_format or metadata.value.log_format) else if is_http then entry = get_full_log(ngx, conf) @@ -261,20 +283,21 @@ end function _M.get_req_original(ctx, conf) - local headers = { + local data = { ctx.var.request, "\r\n" } for k, v in pairs(ngx.req.get_headers()) do - core.table.insert_tail(headers, k, ": ", v, "\r\n") + core.table.insert_tail(data, k, ": ", v, "\r\n") end - -- core.log.error("headers: ", core.table.concat(headers, "")) - core.table.insert(headers, "\r\n") + core.table.insert(data, "\r\n") if conf.include_req_body then - core.table.insert(headers, ctx.var.request_body) + local max_req_body_bytes = conf.max_req_body_bytes or MAX_REQ_BODY + local req_body = get_request_body(max_req_body_bytes) + core.table.insert(data, req_body) end - return core.table.concat(headers, "") + return core.table.concat(data, "") end @@ -319,11 +342,38 @@ function _M.collect_body(conf, ctx) end if log_response_body then - local final_body = core.response.hold_body_chunk(ctx, true) + local max_resp_body_bytes = conf.max_resp_body_bytes or MAX_RESP_BODY + + if ctx._resp_body_bytes and ctx._resp_body_bytes >= max_resp_body_bytes then + return + end + local final_body = core.response.hold_body_chunk(ctx, true, max_resp_body_bytes) if not final_body then return end - ctx.resp_body = final_body + + local response_encoding = ngx_header["Content-Encoding"] + if not response_encoding then + ctx.resp_body = final_body + return + end + + local decoder = content_decode.dispatch_decoder(response_encoding) + if not decoder then + core.log.warn("unsupported compression encoding type: ", + response_encoding) + ctx.resp_body = final_body + return + end + + local decoded_body, err = decoder(final_body) + if err ~= nil then + core.log.warn("try decode compressed data err: ", err) + ctx.resp_body = final_body + return + end + + ctx.resp_body = decoded_body end end end