Skip to content

Commit

Permalink
chore: new version patch
Browse files Browse the repository at this point in the history
  • Loading branch information
chenjunxu committed Sep 28, 2024
1 parent 4f716ef commit 5f5c642
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 84 deletions.
22 changes: 17 additions & 5 deletions apisix/core/response.lua
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ function resp_exit(code, ...)
error("failed to encode data: " .. err, -2)
else
idx = idx + 1
t[idx] = body .. "\n"
t[idx] = body
idx = idx + 1
t[idx] = "\n"
end

elseif v ~= nil then
Expand All @@ -80,7 +82,7 @@ function resp_exit(code, ...)
end

if idx > 0 then
ngx_print(concat_tab(t, "", 1, idx))
ngx_print(t)
end

if code then
Expand Down Expand Up @@ -174,7 +176,7 @@ end
-- final_body = transform(final_body)
-- ngx.arg[1] = final_body
-- ...
function _M.hold_body_chunk(ctx, hold_the_copy)
function _M.hold_body_chunk(ctx, hold_the_copy, max_resp_body_bytes)
local body_buffer
local chunk, eof = arg[1], arg[2]

Expand All @@ -190,22 +192,32 @@ function _M.hold_body_chunk(ctx, hold_the_copy)
n = 1
}
ctx._body_buffer[ctx._plugin_name] = body_buffer
ctx._resp_body_bytes = #chunk
else
local n = body_buffer.n + 1
body_buffer.n = n
body_buffer[n] = chunk
ctx._resp_body_bytes = ctx._resp_body_bytes + #chunk
end
if max_resp_body_bytes and ctx._resp_body_bytes >= max_resp_body_bytes then
local body_data = concat_tab(body_buffer, "", 1, body_buffer.n)
body_data = str_sub(body_data, 1, max_resp_body_bytes)
return body_data
end
end

if eof then
body_buffer = ctx._body_buffer[ctx._plugin_name]
if not body_buffer then
if max_resp_body_bytes and #chunk >= max_resp_body_bytes then
chunk = str_sub(chunk, 1, max_resp_body_bytes)
end
return chunk
end

body_buffer = concat_tab(body_buffer, "", 1, body_buffer.n)
local body_data = concat_tab(body_buffer, "", 1, body_buffer.n)
ctx._body_buffer[ctx._plugin_name] = nil
return body_buffer
return body_data
end

if not hold_the_copy then
Expand Down
36 changes: 34 additions & 2 deletions apisix/plugins/kafka-logger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local expr = require("resty.expr.v1")
local core = require("apisix.core")
local log_util = require("apisix.utils.log-util")
local producer = require ("resty.kafka.producer")
Expand All @@ -22,6 +23,7 @@ local bp_manager_mod = require("apisix.utils.batch-processor-manager")
local math = math
local pairs = pairs
local type = type
local req_read_body = ngx.req.read_body
local plugin_name = "kafka-logger"
local batch_processor_manager = bp_manager_mod.new("kafka logger")

Expand Down Expand Up @@ -95,7 +97,7 @@ local schema = {
required_acks = {
type = "integer",
default = 1,
enum = { 0, 1, -1 },
enum = { 1, -1 },
},
key = {type = "string"},
timeout = {type = "integer", minimum = 1, default = 3},
Expand All @@ -115,6 +117,8 @@ local schema = {
type = "array"
}
},
max_req_body_bytes = {type = "integer", minimum = 1, default = 524288},
max_resp_body_bytes = {type = "integer", minimum = 1, default = 524288},
-- in lua-resty-kafka, cluster_name is defined as number
-- see https://github.com/doujiang24/lua-resty-kafka#new-1
cluster_name = {type = "integer", minimum = 1, default = 1},
Expand All @@ -134,7 +138,9 @@ local schema = {
local metadata_schema = {
type = "object",
properties = {
log_format = log_util.metadata_schema_log_format,
log_format = {
type = "object"
}
},
}

Expand Down Expand Up @@ -208,6 +214,32 @@ local function send_kafka_data(conf, log_message, prod)
end


function _M.access(conf, ctx)
if conf.include_req_body then
local should_read_body = true
if conf.include_req_body_expr then
if not conf.request_expr then
local request_expr, err = expr.new(conf.include_req_body_expr)
if not request_expr then
core.log.error('generate request expr err ', err)
return
end
conf.request_expr = request_expr
end

local result = conf.request_expr:eval(ctx.var)

if not result then
should_read_body = false
end
end
if should_read_body then
req_read_body()
end
end
end


function _M.body_filter(conf, ctx)
log_util.collect_body(conf, ctx)
end
Expand Down
Loading

0 comments on commit 5f5c642

Please sign in to comment.