Skip to content

Commit

Permalink
change: patch log functions
Browse files Browse the repository at this point in the history
  • Loading branch information
nic-chen committed Sep 30, 2024
1 parent 7770483 commit 4800f61
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 32 deletions.
22 changes: 17 additions & 5 deletions apisix/core/response.lua
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ function resp_exit(code, ...)
error("failed to encode data: " .. err, -2)
else
idx = idx + 1
t[idx] = body .. "\n"
t[idx] = body
idx = idx + 1
t[idx] = "\n"
end

elseif v ~= nil then
Expand All @@ -80,7 +82,7 @@ function resp_exit(code, ...)
end

if idx > 0 then
ngx_print(concat_tab(t, "", 1, idx))
ngx_print(t)
end

if code then
Expand Down Expand Up @@ -174,7 +176,7 @@ end
-- final_body = transform(final_body)
-- ngx.arg[1] = final_body
-- ...
function _M.hold_body_chunk(ctx, hold_the_copy)
function _M.hold_body_chunk(ctx, hold_the_copy, max_resp_body_bytes)
local body_buffer
local chunk, eof = arg[1], arg[2]

Expand All @@ -190,22 +192,32 @@ function _M.hold_body_chunk(ctx, hold_the_copy)
n = 1
}
ctx._body_buffer[ctx._plugin_name] = body_buffer
ctx._resp_body_bytes = #chunk
else
local n = body_buffer.n + 1
body_buffer.n = n
body_buffer[n] = chunk
ctx._resp_body_bytes = ctx._resp_body_bytes + #chunk
end
if max_resp_body_bytes and ctx._resp_body_bytes >= max_resp_body_bytes then
local body_data = concat_tab(body_buffer, "", 1, body_buffer.n)
body_data = str_sub(body_data, 1, max_resp_body_bytes)
return body_data
end
end

if eof then
body_buffer = ctx._body_buffer[ctx._plugin_name]
if not body_buffer then
if max_resp_body_bytes and #chunk >= max_resp_body_bytes then
chunk = str_sub(chunk, 1, max_resp_body_bytes)
end
return chunk
end

body_buffer = concat_tab(body_buffer, "", 1, body_buffer.n)
local body_data = concat_tab(body_buffer, "", 1, body_buffer.n)
ctx._body_buffer[ctx._plugin_name] = nil
return body_buffer
return body_data
end

if not hold_the_copy then
Expand Down
36 changes: 34 additions & 2 deletions apisix/plugins/kafka-logger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local expr = require("resty.expr.v1")
local core = require("apisix.core")
local log_util = require("apisix.utils.log-util")
local producer = require ("resty.kafka.producer")
Expand All @@ -22,6 +23,7 @@ local bp_manager_mod = require("apisix.utils.batch-processor-manager")
local math = math
local pairs = pairs
local type = type
local req_read_body = ngx.req.read_body
local plugin_name = "kafka-logger"
local batch_processor_manager = bp_manager_mod.new("kafka logger")

Expand Down Expand Up @@ -95,7 +97,7 @@ local schema = {
required_acks = {
type = "integer",
default = 1,
enum = { 0, 1, -1 },
enum = { 1, -1 },
},
key = {type = "string"},
timeout = {type = "integer", minimum = 1, default = 3},
Expand All @@ -115,6 +117,8 @@ local schema = {
type = "array"
}
},
max_req_body_bytes = {type = "integer", minimum = 1, default = 524288},
max_resp_body_bytes = {type = "integer", minimum = 1, default = 524288},
-- in lua-resty-kafka, cluster_name is defined as number
-- see https://github.com/doujiang24/lua-resty-kafka#new-1
cluster_name = {type = "integer", minimum = 1, default = 1},
Expand All @@ -134,7 +138,9 @@ local schema = {
local metadata_schema = {
type = "object",
properties = {
log_format = log_util.metadata_schema_log_format,
log_format = {
type = "object"
}
},
}

Expand Down Expand Up @@ -208,6 +214,32 @@ local function send_kafka_data(conf, log_message, prod)
end


function _M.access(conf, ctx)
if conf.include_req_body then
local should_read_body = true
if conf.include_req_body_expr then
if not conf.request_expr then
local request_expr, err = expr.new(conf.include_req_body_expr)
if not request_expr then
core.log.error('generate request expr err ', err)
return
end
conf.request_expr = request_expr
end

local result = conf.request_expr:eval(ctx.var)

if not result then
should_read_body = false
end
end
if should_read_body then
req_read_body()
end
end
end


function _M.body_filter(conf, ctx)
log_util.collect_body(conf, ctx)
end
Expand Down
124 changes: 99 additions & 25 deletions apisix/utils/log-util.lua
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,56 @@
local core = require("apisix.core")
local plugin = require("apisix.plugin")
local expr = require("resty.expr.v1")
local ngx = ngx
local content_decode = require("apisix.utils.content-decode")
local ngx = ngx
local pairs = pairs
local ngx_now = ngx.now
local ngx_header = ngx.header
local os_date = os.date
local str_byte = string.byte
local str_sub = string.sub
local math_floor = math.floor
local ngx_update_time = ngx.update_time
local req_get_body_data = ngx.req.get_body_data
local is_http = ngx.config.subsystem == "http"
local req_get_body_file = ngx.req.get_body_file
local MAX_REQ_BODY = 524288 -- 512 KiB
local MAX_RESP_BODY = 524288 -- 512 KiB
local io = io

local lru_log_format = core.lrucache.new({
ttl = 300, count = 512
})

local _M = {}
_M.metadata_schema_log_format = {
type = "object",
default = {
["host"] = "$host",
["@timestamp"] = "$time_iso8601",
["client_ip"] = "$remote_addr",
},
}


local function get_request_body(max_bytes)
local req_body = req_get_body_data()
if req_body then
if max_bytes and #req_body >= max_bytes then
req_body = str_sub(req_body, 1, max_bytes)
end
return req_body
end

local file_name = req_get_body_file()
if not file_name then
return nil
end

core.log.info("attempt to read body from file: ", file_name)

local f, err = io.open(file_name, 'r')
if not f then
return nil, "fail to open file " .. err
end

req_body = f:read(max_bytes)
f:close()

return req_body
end


local function gen_log_format(format)
Expand All @@ -55,6 +82,7 @@ local function gen_log_format(format)
return log_format
end


local function get_custom_format_log(ctx, format)
local log_format = lru_log_format(format or "", nil, gen_log_format, format)
local entry = core.table.new(0, core.table.nkeys(log_format))
Expand Down Expand Up @@ -186,15 +214,13 @@ local function get_full_log(ngx, conf)
end

if log_request_body then
local body = req_get_body_data()
if body then
log.request.body = body
else
local body_file = ngx.req.get_body_file()
if body_file then
log.request.body_file = body_file
end
local max_req_body_bytes = conf.max_req_body_bytes or MAX_REQ_BODY
local body, err = get_request_body(max_req_body_bytes)
if err then
core.log.error("fail to get request body: ", err)
return
end
log.request.body = body
end
end

Expand All @@ -210,7 +236,27 @@ function _M.inject_get_full_log(f)
end


local function is_match(match, ctx)
local match_result
for _, m in pairs(match) do
local expr, _ = expr.new(m)
match_result = expr:eval(ctx.var)
if match_result then
break
end
end

return match_result
end


function _M.get_log_entry(plugin_name, conf, ctx)
-- If the "match" configuration is set and the matching conditions are not met,
-- then do not log the message.
if conf.match and not is_match(conf.match, ctx) then
return
end

local metadata = plugin.plugin_metadata(plugin_name)
core.log.info("metadata: ", core.json.delay_encode(metadata))

Expand All @@ -237,20 +283,21 @@ end


function _M.get_req_original(ctx, conf)
local headers = {
local data = {
ctx.var.request, "\r\n"
}
for k, v in pairs(ngx.req.get_headers()) do
core.table.insert_tail(headers, k, ": ", v, "\r\n")
core.table.insert_tail(data, k, ": ", v, "\r\n")
end
-- core.log.error("headers: ", core.table.concat(headers, ""))
core.table.insert(headers, "\r\n")
core.table.insert(data, "\r\n")

if conf.include_req_body then
core.table.insert(headers, ctx.var.request_body)
local max_req_body_bytes = conf.max_req_body_bytes or MAX_REQ_BODY
local req_body = get_request_body(max_req_body_bytes)
core.table.insert(data, req_body)
end

return core.table.concat(headers, "")
return core.table.concat(data, "")
end


Expand Down Expand Up @@ -295,11 +342,38 @@ function _M.collect_body(conf, ctx)
end

if log_response_body then
local final_body = core.response.hold_body_chunk(ctx, true)
local max_resp_body_bytes = conf.max_resp_body_bytes or MAX_RESP_BODY

if ctx._resp_body_bytes and ctx._resp_body_bytes >= max_resp_body_bytes then
return
end
local final_body = core.response.hold_body_chunk(ctx, true, max_resp_body_bytes)
if not final_body then
return
end
ctx.resp_body = final_body

local response_encoding = ngx_header["Content-Encoding"]
if not response_encoding then
ctx.resp_body = final_body
return
end

local decoder = content_decode.dispatch_decoder(response_encoding)
if not decoder then
core.log.warn("unsupported compression encoding type: ",
response_encoding)
ctx.resp_body = final_body
return
end

local decoded_body, err = decoder(final_body)
if err ~= nil then
core.log.warn("try decode compressed data err: ", err)
ctx.resp_body = final_body
return
end

ctx.resp_body = decoded_body
end
end
end
Expand Down

0 comments on commit 4800f61

Please sign in to comment.