Skip to content

Commit

Permalink
Merge pull request #12 from S-S-X/issue-11
Browse files Browse the repository at this point in the history
Fix fetch_async
  • Loading branch information
S-S-X authored Mar 26, 2021
2 parents ec9fb9e + 5ba7e4c commit 7806eee
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 19 deletions.
26 changes: 12 additions & 14 deletions main.lua
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ local function QoS_wrapper(_, http_api, default_priority)
rc = rc + 1
req.timeout = get_timeout(req.timeout, priority_override)
handles[index] = api.fetch_async(req)
return index
end
else
-- TODO: Optimize to handle full queue at once if more than single response is available
Expand All @@ -136,21 +137,24 @@ local function QoS_wrapper(_, http_api, default_priority)
function fetch_async(req, index)
rc = rc + 1
handles[index] = api.fetch_async(req)
return index
end
end

function obj.fetch_async(req, priority_override)
-- Reserve future handle
handle_index = handle_index + 1
local p = priority_override or priority
-- Check queue limits for selected priority
if rc < limits[p] then
-- Execute request directly when below limits
return fetch_async(req, p)
end
-- Reserve future handle and queue request when above limits, if queues are full return nothing
handle_index = handle_index + 1
if queues:push(p, function() local index = handle_index; fetch_async(req, index, p) end) then
-- Execute request directly
return fetch_async(req, handle_index, p)
elseif queues:push(p, function() local index = handle_index; fetch_async(req, index, p) end) then
-- Queue request
handles[handle_index] = true
return handle_index
end
-- Queues are full, return nothing. Request failed and will not be executed ever.
end

function obj.fetch_async_get(handle)
Expand All @@ -159,21 +163,15 @@ local function QoS_wrapper(_, http_api, default_priority)
-- This request is queued and not yet executed
return {}
elseif real_handle then
-- This request was queued and handle should
-- This request was queued
local res = api.fetch_async_get(real_handle)
if res.completed then
rc = rc - 1
handles[handle] = nil
end
return res
else
-- This request was never queued and uses handle provided by engine
local res = api.fetch_async_get(handle)
if res.completed then
rc = rc - 1
end
return res
end
error("QoS fetch_async_get invalid handle")
end

function obj.fetch(req, callback, priority_override)
Expand Down
45 changes: 40 additions & 5 deletions spec/main_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,19 @@ sourcefile("init")

describe("QoS wrapped HTTP API", function()

setup(function()
before_each(function()
-- Set fake current running mod name, mytestmod is added to secure.http_mods
mineunit:set_current_modname("mytestmod")
end)

teardown(function()
-- Restore current running mod name
after_each(function()
-- Restore current running mod name and clear queues
mineunit:restore_current_modname()
for _,queue in ipairs(QoS.data.queues) do
queue:clear()
end
-- Execute internal HTTP request queues to clean up
for _=1,30 do mineunit:execute_globalstep() end
end)

it("wraps fetch for supplied object", function()
Expand All @@ -35,7 +40,7 @@ describe("QoS wrapped HTTP API", function()

-- checkpoint was called once, minetest default handler was not called
assert.spy(checkpoint).was.called(1)
assert.spy(http.fetch).was.called(0)
assert.spy(http.fetch).was.not_called()
end)

it("pushes requests to QoS queue", function()
Expand All @@ -59,7 +64,37 @@ describe("QoS wrapped HTTP API", function()
-- priority 2 queue push was called 21 times: total 30 - executed 9 = queued 21
assert.spy(QoS.data.queues[2].push).was.called(21)
-- minetest default handler was not called
assert.spy(http.fetch).was.called(0)
assert.spy(http.fetch).was.not_called()
end)

it("pushes http.fetch_async requests to QoS queue", function()
local http = minetest.request_http_api()
local qos_http = QoS(http, 2)

local checkpoint = spy.new(function(data)
-- HTTP response table gets through QoS
assert.is_hashed(data)
end)
spy.on(http, "fetch_async")
spy.on(QoS.data.queues[2], "push")

-- Create 30 fetch requests, curl_parallel_limit = 12 (fixtures/minetest.cfg)
local handles = {}
for i=1,30 do table.insert(handles, qos_http.fetch_async({ url = "http://127.0.0.1/" })) end

for _,handle in ipairs(handles) do
local result = qos_http.fetch_async_get(handle)
if result.completed then
checkpoint(result)
end
end

-- checkpoint was called 9 times (at most 80% of curl_parallel_limit)
assert.spy(checkpoint).was.called(9)
-- priority 2 queue push was called 21 times: total 30 - executed 9 = queued 21
assert.spy(QoS.data.queues[2].push).was.called(21)
-- minetest default handler was called 9 times
assert.spy(http.fetch_async).was.called(9)
end)

end)

0 comments on commit 7806eee

Please sign in to comment.