Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: support etcd v3 #1943

Closed
wants to merge 11 commits into from
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 47 additions & 31 deletions apisix/core/config_etcd.lua
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ local mt = {
end
}

local function readdir(etcd_cli, key)
local function readdir(etcd_cli, key, opts)
if not etcd_cli then
return nil, nil, "not inited"
end

local res, err = etcd_cli:readdir(key, true)
local res, err = etcd_cli:readdir(key, opts)
if not res then
-- log.error("failed to get key from etcd: ", err)
return nil, nil, err
Expand All @@ -64,21 +64,24 @@ local function readdir(etcd_cli, key)
return nil, "failed to read etcd dir"
end

return res
return res, err
end

local function waitdir(etcd_cli, key, modified_index)
local function watchdir(etcd_cli, key, opts)
if not etcd_cli then
return nil, nil, "not inited"
end

local res, err = etcd_cli:waitdir(key, modified_index)
local res_fun, err = etcd_cli:watchdir(key, opts)
res_fun() -- skip create info
local res, err = res_fun()

if not res then
-- log.error("failed to get key from etcd: ", err)
return nil, err
end

if type(res.body) ~= "table" then
if type(res.result) ~= "table" then
return nil, "failed to read etcd dir"
end

Expand Down Expand Up @@ -122,22 +125,27 @@ local function sync_data(self)
if not res then
return false, err
end
if res.body.error then
return res, err
end

local dir_res, headers = res.body.node, res.headers
local dir_res, header = res.body, res.body.header
log.debug("readdir key: ", self.key, " res: ",
json.delay_encode(dir_res))
if not dir_res then
return false, err
end

if not dir_res.dir then
return false, self.key .. " is not a dir"
end
-- if not dir_res.dir then
-- return false, self.key .. " is not a dir"
-- end

if not dir_res.nodes then
dir_res.nodes = {}
if not dir_res.kvs then
dir_res.kvs = {}
-- not return k-v initially, not like v2
dir_res.kvs.key = self.key
end

if self.values then
for i, val in ipairs(self.values) do
if val and val.clean_handlers then
Expand All @@ -152,11 +160,12 @@ local function sync_data(self)
self.values_hash = nil
end

self.values = new_tab(#dir_res.nodes, 0)
self.values_hash = new_tab(0, #dir_res.nodes)
self.values = new_tab(#dir_res.kvs, 0)
self.values_hash = new_tab(0, #dir_res.kvs)

local changed = false
for _, item in ipairs(dir_res.nodes) do

for _, item in ipairs(dir_res.kvs) do
local key = short_key(self, item.key)
local data_valid = true
if type(item.value) ~= "table" then
Expand Down Expand Up @@ -186,11 +195,10 @@ local function sync_data(self)
end
end

self:upgrade_version(item.modifiedIndex)
self:upgrade_version(item.mod_revision)
end

if headers then
self:upgrade_version(headers["X-Etcd-Index"])
if header then
self:upgrade_version(header.revision)
end

if changed then
Expand All @@ -200,31 +208,34 @@ local function sync_data(self)
self.need_reload = false
return true
end

local dir_res, err = waitdir(self.etcd_cli, self.key, self.prev_index + 1)
log.info("waitdir key: ", self.key, " prev_index: ", self.prev_index + 1)
-- get the first change
local dir_res, err = watchdir(self.etcd_cli, self.key, {start_revision = self.prev_index + 1, progress_notify = true})
log.info("watchdir key: ", self.key, " prev_index: ", self.prev_index + 1)
log.info("res: ", json.delay_encode(dir_res, true))
if not dir_res then
return false, err
end

local res = dir_res.body.node
local res = dir_res.result.events[1].kv
-- TODO: would be effected when compact in v3
--[[
local err_msg = dir_res.body.message
if err_msg then
if err_msg == "The event in requested index is outdated and cleared"
and dir_res.body.errorCode == 401 then
self.need_reload = true
log.warn("waitdir [", self.key, "] err: ", err_msg,
log.warn("watchdir [", self.key, "] err: ", err_msg,
", need to fully reload")
return false
end
return false, err
end
]]--

if not res then
if err == "The event in requested index is outdated and cleared" then
self.need_reload = true
log.warn("waitdir [", self.key, "] err: ", err,
log.warn("watchdir [", self.key, "] err: ", err,
", need to fully reload")
return false
end
Expand All @@ -234,7 +245,7 @@ local function sync_data(self)

local key = short_key(self, res.key)
if res.value and type(res.value) ~= "table" then
self:upgrade_version(res.modifiedIndex)
self:upgrade_version(res.mod_revision)
return false, "invalid item data of [" .. self.key .. "/" .. key
.. "], val: " .. tostring(res.value)
.. ", it shoud be a object"
Expand All @@ -243,27 +254,29 @@ local function sync_data(self)
if res.value and self.item_schema then
local ok, err = check_schema(self.item_schema, res.value)
if not ok then
self:upgrade_version(res.modifiedIndex)
self:upgrade_version(res.mod_revision)

return false, "failed to check item data of ["
.. self.key .. "] err:" .. err
end
end

self:upgrade_version(res.modifiedIndex)

self:upgrade_version(res.mod_revision)
-- no dir
--[[
if res.dir then
if res.value then
return false, "todo: support for parsing `dir` response "
.. "structures. " .. json.encode(res)
end
return false
end
]]--

if self.filter then
self.filter(res)
end

-- ?clean_handlers
local pre_index = self.values_hash[key]
if pre_index then
local pre_val = self.values[pre_index]
Expand Down Expand Up @@ -385,6 +398,9 @@ function _M.new(key, opts)
etcd_conf.http_host = etcd_conf.host
etcd_conf.host = nil
etcd_conf.prefix = nil
etcd_conf.protocol = etcd_conf.version
etcd_conf.version = nil
etcd_conf.api_prefix = "/v3"

local etcd_cli
etcd_cli, err = etcd.new(etcd_conf)
Expand Down
Loading