change: patch log functions #16

Closed · wants to merge 1 commit
22 changes: 17 additions & 5 deletions apisix/core/response.lua
@@ -70,7 +70,9 @@ function resp_exit(code, ...)
                error("failed to encode data: " .. err, -2)
            else
                idx = idx + 1
-               t[idx] = body .. "\n"
+               t[idx] = body
+               idx = idx + 1
+               t[idx] = "\n"
            end

        elseif v ~= nil then
@@ -80,7 +82,7 @@ function resp_exit(code, ...)
    end

    if idx > 0 then
-       ngx_print(concat_tab(t, "", 1, idx))
+       ngx_print(t)
    end

    if code then
@@ -174,7 +176,7 @@ end
 --     final_body = transform(final_body)
 --     ngx.arg[1] = final_body
 --     ...
-function _M.hold_body_chunk(ctx, hold_the_copy)
+function _M.hold_body_chunk(ctx, hold_the_copy, max_resp_body_bytes)
    local body_buffer
    local chunk, eof = arg[1], arg[2]

@@ -190,22 +192,32 @@ function _M.hold_body_chunk(ctx, hold_the_copy)
                n = 1
            }
            ctx._body_buffer[ctx._plugin_name] = body_buffer
+           ctx._resp_body_bytes = #chunk
        else
            local n = body_buffer.n + 1
            body_buffer.n = n
            body_buffer[n] = chunk
+           ctx._resp_body_bytes = ctx._resp_body_bytes + #chunk
        end
+       if max_resp_body_bytes and ctx._resp_body_bytes >= max_resp_body_bytes then
+           local body_data = concat_tab(body_buffer, "", 1, body_buffer.n)
+           body_data = str_sub(body_data, 1, max_resp_body_bytes)
+           return body_data
+       end
    end

    if eof then
        body_buffer = ctx._body_buffer[ctx._plugin_name]
        if not body_buffer then
+           if max_resp_body_bytes and #chunk >= max_resp_body_bytes then
+               chunk = str_sub(chunk, 1, max_resp_body_bytes)
+           end
            return chunk
        end

-       body_buffer = concat_tab(body_buffer, "", 1, body_buffer.n)
+       local body_data = concat_tab(body_buffer, "", 1, body_buffer.n)
        ctx._body_buffer[ctx._plugin_name] = nil
-       return body_buffer
+       return body_data
    end

    if not hold_the_copy then
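
Note (illustrative, not part of the patch): a minimal sketch of how a logger plugin's body_filter phase could pass the new third argument of hold_body_chunk. The 4096-byte limit and the plugin skeleton are assumptions made for the example only.

local core = require("apisix.core")

local _M = {}

-- Buffer response chunks and stop once 4096 bytes have been collected;
-- hold_body_chunk() returns nil until either the limit or eof is reached.
function _M.body_filter(conf, ctx)
    local final_body = core.response.hold_body_chunk(ctx, true, 4096)
    if not final_body then
        return
    end
    ctx.resp_body = final_body    -- truncated to at most 4096 bytes
end

return _M
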
36 changes: 34 additions & 2 deletions apisix/plugins/kafka-logger.lua
@@ -14,6 +14,7 @@
 -- See the License for the specific language governing permissions and
 -- limitations under the License.
 --
+local expr = require("resty.expr.v1")
 local core = require("apisix.core")
 local log_util = require("apisix.utils.log-util")
 local producer = require ("resty.kafka.producer")
@@ -22,6 +23,7 @@ local bp_manager_mod = require("apisix.utils.batch-processor-manager")
 local math = math
 local pairs = pairs
 local type = type
+local req_read_body = ngx.req.read_body
 local plugin_name = "kafka-logger"
 local batch_processor_manager = bp_manager_mod.new("kafka logger")

@@ -95,7 +97,7 @@ local schema = {
        required_acks = {
            type = "integer",
            default = 1,
-           enum = { 0, 1, -1 },
+           enum = { 1, -1 },
        },
        key = {type = "string"},
        timeout = {type = "integer", minimum = 1, default = 3},
@@ -115,6 +117,8 @@ local schema = {
                type = "array"
            }
        },
+       max_req_body_bytes = {type = "integer", minimum = 1, default = 524288},
+       max_resp_body_bytes = {type = "integer", minimum = 1, default = 524288},
        -- in lua-resty-kafka, cluster_name is defined as number
        -- see https://github.com/doujiang24/lua-resty-kafka#new-1
        cluster_name = {type = "integer", minimum = 1, default = 1},
@@ -134,7 +138,9 @@ local schema = {
 local metadata_schema = {
    type = "object",
    properties = {
-       log_format = log_util.metadata_schema_log_format,
+       log_format = {
+           type = "object"
+       }
    },
 }

@@ -208,6 +214,32 @@ local function send_kafka_data(conf, log_message, prod)
 end


+function _M.access(conf, ctx)
+    if conf.include_req_body then
+        local should_read_body = true
+        if conf.include_req_body_expr then
+            if not conf.request_expr then
+                local request_expr, err = expr.new(conf.include_req_body_expr)
+                if not request_expr then
+                    core.log.error('generate request expr err ', err)
+                    return
+                end
+                conf.request_expr = request_expr
+            end
+
+            local result = conf.request_expr:eval(ctx.var)
+
+            if not result then
+                should_read_body = false
+            end
+        end
+        if should_read_body then
+            req_read_body()
+        end
+    end
+end
+
+
 function _M.body_filter(conf, ctx)
    log_util.collect_body(conf, ctx)
 end
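
Note (illustrative, not part of the patch): a standalone sketch of how an include_req_body_expr rule gates body reading with lua-resty-expr, the same pattern the new access() phase above follows. The POST-only rule is an assumed example, not something this patch configures.

local expr = require("resty.expr.v1")

-- Assumed example rule: only read the request body for POST requests.
local include_req_body_expr = {
    {"request_method", "==", "POST"},
}

local ex, err = expr.new(include_req_body_expr)
if not ex then
    error("generate request expr err: " .. err)
end

-- Inside the plugin this would be ctx.var; ngx.var behaves the same here.
if ex:eval(ngx.var) then
    ngx.req.read_body()    -- make the body available to the log phase
end
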
124 changes: 99 additions & 25 deletions apisix/utils/log-util.lua
@@ -17,29 +17,56 @@
 local core = require("apisix.core")
 local plugin = require("apisix.plugin")
 local expr = require("resty.expr.v1")
-local ngx = ngx
+local content_decode = require("apisix.utils.content-decode")
+local ngx = ngx
 local pairs = pairs
 local ngx_now = ngx.now
 local ngx_header = ngx.header
 local os_date = os.date
 local str_byte = string.byte
+local str_sub = string.sub
 local math_floor = math.floor
 local ngx_update_time = ngx.update_time
 local req_get_body_data = ngx.req.get_body_data
 local is_http = ngx.config.subsystem == "http"
+local req_get_body_file = ngx.req.get_body_file
+local MAX_REQ_BODY = 524288 -- 512 KiB
+local MAX_RESP_BODY = 524288 -- 512 KiB
+local io = io

 local lru_log_format = core.lrucache.new({
    ttl = 300, count = 512
 })

 local _M = {}
-_M.metadata_schema_log_format = {
-    type = "object",
-    default = {
-        ["host"] = "$host",
-        ["@timestamp"] = "$time_iso8601",
-        ["client_ip"] = "$remote_addr",
-    },
-}
+
+
+local function get_request_body(max_bytes)
+    local req_body = req_get_body_data()
+    if req_body then
+        if max_bytes and #req_body >= max_bytes then
+            req_body = str_sub(req_body, 1, max_bytes)
+        end
+        return req_body
+    end
+
+    local file_name = req_get_body_file()
+    if not file_name then
+        return nil
+    end
+
+    core.log.info("attempt to read body from file: ", file_name)
+
+    local f, err = io.open(file_name, 'r')
+    if not f then
+        return nil, "fail to open file " .. err
+    end
+
+    req_body = f:read(max_bytes)
+    f:close()
+
+    return req_body
+end
+

 local function gen_log_format(format)
@@ -55,6 +82,7 @@
    return log_format
 end

+
 local function get_custom_format_log(ctx, format)
    local log_format = lru_log_format(format or "", nil, gen_log_format, format)
    local entry = core.table.new(0, core.table.nkeys(log_format))
@@ -186,15 +214,13 @@ local function get_full_log(ngx, conf)
        end

        if log_request_body then
-           local body = req_get_body_data()
-           if body then
-               log.request.body = body
-           else
-               local body_file = ngx.req.get_body_file()
-               if body_file then
-                   log.request.body_file = body_file
-               end
+           local max_req_body_bytes = conf.max_req_body_bytes or MAX_REQ_BODY
+           local body, err = get_request_body(max_req_body_bytes)
+           if err then
+               core.log.error("fail to get request body: ", err)
+               return
            end
+           log.request.body = body
        end
    end

@@ -210,7 +236,27 @@
 end


+local function is_match(match, ctx)
+    local match_result
+    for _, m in pairs(match) do
+        local expr, _ = expr.new(m)
+        match_result = expr:eval(ctx.var)
+        if match_result then
+            break
+        end
+    end
+
+    return match_result
+end
+
+
 function _M.get_log_entry(plugin_name, conf, ctx)
+    -- If the "match" configuration is set and the matching conditions are not met,
+    -- then do not log the message.
+    if conf.match and not is_match(conf.match, ctx) then
+        return
+    end
+
    local metadata = plugin.plugin_metadata(plugin_name)
    core.log.info("metadata: ", core.json.delay_encode(metadata))

@@ -237,20 +283,21 @@


 function _M.get_req_original(ctx, conf)
-    local headers = {
+    local data = {
        ctx.var.request, "\r\n"
    }
    for k, v in pairs(ngx.req.get_headers()) do
-       core.table.insert_tail(headers, k, ": ", v, "\r\n")
+       core.table.insert_tail(data, k, ": ", v, "\r\n")
    end
-   -- core.log.error("headers: ", core.table.concat(headers, ""))
-   core.table.insert(headers, "\r\n")
+   core.table.insert(data, "\r\n")

    if conf.include_req_body then
-       core.table.insert(headers, ctx.var.request_body)
+       local max_req_body_bytes = conf.max_req_body_bytes or MAX_REQ_BODY
+       local req_body = get_request_body(max_req_body_bytes)
+       core.table.insert(data, req_body)
    end

-   return core.table.concat(headers, "")
+   return core.table.concat(data, "")
 end

@@ -295,11 +342,38 @@ function _M.collect_body(conf, ctx)
        end

        if log_response_body then
-           local final_body = core.response.hold_body_chunk(ctx, true)
+           local max_resp_body_bytes = conf.max_resp_body_bytes or MAX_RESP_BODY
+
+           if ctx._resp_body_bytes and ctx._resp_body_bytes >= max_resp_body_bytes then
+               return
+           end
+           local final_body = core.response.hold_body_chunk(ctx, true, max_resp_body_bytes)
            if not final_body then
                return
            end
-           ctx.resp_body = final_body
+
+           local response_encoding = ngx_header["Content-Encoding"]
+           if not response_encoding then
+               ctx.resp_body = final_body
+               return
+           end
+
+           local decoder = content_decode.dispatch_decoder(response_encoding)
+           if not decoder then
+               core.log.warn("unsupported compression encoding type: ",
+                             response_encoding)
+               ctx.resp_body = final_body
+               return
+           end
+
+           local decoded_body, err = decoder(final_body)
+           if err ~= nil then
+               core.log.warn("try decode compressed data err: ", err)
+               ctx.resp_body = final_body
+               return
+           end
+
+           ctx.resp_body = decoded_body
        end
    end
 end
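
Note (illustrative, not part of the patch): a hedged sketch of how a logger plugin would wire the patched helpers together, using only signatures shown in this diff. The plugin name, conf fields, and the log-phase print are assumptions; a real plugin such as kafka-logger pushes the entry to a batch processor instead.

local core = require("apisix.core")
local log_util = require("apisix.utils.log-util")

local _M = {}

-- body_filter: buffer (and, if compressed, decode) at most
-- conf.max_resp_body_bytes of the response when include_resp_body is set.
function _M.body_filter(conf, ctx)
    log_util.collect_body(conf, ctx)
end

-- log: build the entry; get_log_entry() skips logging entirely when a
-- configured "match" rule does not match the request.
function _M.log(conf, ctx)
    local entry = log_util.get_log_entry("example-logger", conf, ctx)
    if entry then
        core.log.info("would ship entry: ", core.json.delay_encode(entry))
    end
end

return _M
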