1
0
mirror of https://github.com/Refactorio/RedMew.git synced 2024-12-12 10:04:40 +02:00

Merge pull request #962 from grilledham/task+priority_queue_refactor

Task+ PriorityQueue clean up + micro optimisations
This commit is contained in:
grilledham 2019-06-23 10:15:25 +01:00 committed by GitHub
commit da4827ea98
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 134 additions and 62 deletions

View File

@ -1,20 +1,46 @@
local Debug = require 'utils.debug'
local is_closure = Debug.is_closure
local floor = math.floor
local PriorityQueue = {} local PriorityQueue = {}
function PriorityQueue.new() local function default_comparator(a, b)
return {}
end
local function default_comp(a, b)
return a < b return a < b
end end
local function HeapifyFromEndToStart(queue, comp) --- Min heap implementation of a priority queue. Smaller elements, as determined by the comparator,
comp = comp or default_comp -- have a higher priority.
local pos = #queue -- @param comparator <function|nil> the comparator function used to compare elements, if nil the
-- deafult comparator is used.
-- @usage
-- local PriorityQueue = require 'utils.priority_queue'
--
-- local queue = PriorityQueue.new()
-- PriorityQueue.push(queue, 4)
-- PriorityQueue.push(queue, 7)
-- PriorityQueue.push(queue, 2)
--
-- game.print(PriorityQueue.pop(queue)) -- 2
-- game.print(PriorityQueue.pop(queue)) -- 4
-- game.print(PriorityQueue.pop(queue)) -- 7
function PriorityQueue.new(comparator)
if comparator == nil then
comparator = default_comparator
elseif is_closure(comparator) then
error('comparator cannot be a closure.', 2)
end
return {_comparator = comparator}
end
local function heapify_from_end_to_start(self)
local comparator = self._comparator
local pos = #self
while pos > 1 do while pos > 1 do
local parent = bit32.rshift(pos, 1) -- integer division by 2 local parent = floor(pos * 0.5)
if comp(queue[pos], queue[parent]) then local a, b = self[pos], self[parent]
queue[pos], queue[parent] = queue[parent], queue[pos] if comparator(a, b) then
self[pos], self[parent] = b, a
pos = parent pos = parent
else else
break break
@ -22,25 +48,26 @@ local function HeapifyFromEndToStart(queue, comp)
end end
end end
local function HeapifyFromStartToEnd(queue, comp) local function heapify_from_start_to_end(self)
comp = comp or default_comp local comparator = self._comparator
local parent = 1 local parent = 1
local smallest = 1 local smallest = 1
local count = #self
while true do while true do
local child = parent * 2 local child = parent * 2
if child > #queue then if child > count then
break break
end end
if comp(queue[child], queue[parent]) then if comparator(self[child], self[parent]) then
smallest = child smallest = child
end end
child = child + 1 child = child + 1
if child <= #queue and comp(queue[child], queue[smallest]) then if child <= count and comparator(self[child], self[smallest]) then
smallest = child smallest = child
end end
if parent ~= smallest then if parent ~= smallest then
queue[parent], queue[smallest] = queue[smallest], queue[parent] self[parent], self[smallest] = self[smallest], self[parent]
parent = smallest parent = smallest
else else
break break
@ -48,27 +75,33 @@ local function HeapifyFromStartToEnd(queue, comp)
end end
end end
function PriorityQueue.size(queue) --- Returns the number of the number of elements in the priority queue.
return #queue function PriorityQueue.size(self)
return #self
end end
function PriorityQueue.push(queue, element, comp) -- Inserts an element into the priority queue.
table.insert(queue, element) function PriorityQueue.push(self, element)
HeapifyFromEndToStart(queue, comp) self[#self + 1] = element
heapify_from_end_to_start(self)
end end
function PriorityQueue.pop(queue, comp) -- Removes and returns the highest priority element from the priority queue.
local element = queue[1] -- If the priority queue is empty returns nil.
function PriorityQueue.pop(self)
local element = self[1]
queue[1] = queue[#queue] self[1] = self[#self]
queue[#queue] = nil self[#self] = nil
HeapifyFromStartToEnd(queue, comp) heapify_from_start_to_end(self)
return element return element
end end
function PriorityQueue.peek(queue) -- Returns, without removing, the highest priority element from the priority queue.
return queue[1] -- If the priority queue is empty returns nil.
function PriorityQueue.peek(self)
return self[1]
end end
return PriorityQueue return PriorityQueue

View File

@ -9,38 +9,65 @@ local PriorityQueue = require 'utils.priority_queue'
local Event = require 'utils.event' local Event = require 'utils.event'
local Token = require 'utils.token' local Token = require 'utils.token'
local ErrorLogging = require 'utils.error_logging' local ErrorLogging = require 'utils.error_logging'
local Global = require 'utils.global'
local floor = math.floor
local log10 = math.log10
local Token_get = Token.get
local pcall = pcall
local Queue_peek = Queue.peek
local Queue_pop = Queue.pop
local Queue_push = Queue.push
local PriorityQueue_peek = PriorityQueue.peek
local PriorityQueue_pop = PriorityQueue.pop
local PriorityQueue_push = PriorityQueue.push
local Task = {} local Task = {}
global.callbacks = global.callbacks or PriorityQueue.new() local function comparator(a, b)
global.next_async_callback_time = -1
global.task_queue = global.task_queue or Queue.new()
global.total_task_weight = 0
global.task_queue_speed = 1
local function comp(a, b)
return a.time < b.time return a.time < b.time
end end
global.tpt = global.task_queue_speed local callbacks = PriorityQueue.new(comparator)
local function get_task_per_tick() local task_queue = Queue.new()
if game.tick % 300 == 0 then local primitives = {
local size = global.total_task_weight next_async_callback_time = -1,
global.tpt = math.floor(math.log10(size + 1)) * global.task_queue_speed total_task_weight = 0,
if global.tpt < 1 then task_queue_speed = 1,
global.tpt = 1 task_per_tick = 1
}
Global.register(
{callbacks = callbacks, task_queue = task_queue, primitives = primitives},
function(tbl)
callbacks = tbl.callbacks
task_queue = tbl.task_queue
primitives = tbl.primitives
end end
)
local function get_task_per_tick(tick)
if tick % 300 == 0 then
local size = primitives.total_task_weight
local task_per_tick = floor(log10(size + 1)) * primitives.task_queue_speed
if task_per_tick < 1 then
task_per_tick = 1
end end
return global.tpt
primitives.task_per_tick = task_per_tick
return task_per_tick
end
return primitives.task_per_tick
end end
local function on_tick() local function on_tick()
local queue = global.task_queue local tick = game.tick
for i = 1, get_task_per_tick() do
local task = Queue.peek(queue) for i = 1, get_task_per_tick(tick) do
local task = Queue_peek(task_queue)
if task ~= nil then if task ~= nil then
-- result is error if not success else result is a boolean for if the task should stay in the queue. -- result is error if not success else result is a boolean for if the task should stay in the queue.
local success, result = pcall(Token.get(task.func_token), task.params) local success, result = pcall(Token_get(task.func_token), task.params)
if not success then if not success then
if _DEBUG then if _DEBUG then
error(result) error(result)
@ -48,19 +75,18 @@ local function on_tick()
log(result) log(result)
ErrorLogging.generate_error_report(result) ErrorLogging.generate_error_report(result)
end end
Queue.pop(queue) Queue_pop(task_queue)
global.total_task_weight = global.total_task_weight - task.weight primitives.total_task_weight = primitives.total_task_weight - task.weight
elseif not result then elseif not result then
Queue.pop(queue) Queue_pop(task_queue)
global.total_task_weight = global.total_task_weight - task.weight primitives.total_task_weight = primitives.total_task_weight - task.weight
end end
end end
end end
local callbacks = global.callbacks local callback = PriorityQueue_peek(callbacks)
local callback = PriorityQueue.peek(callbacks) while callback ~= nil and tick >= callback.time do
while callback ~= nil and game.tick >= callback.time do local success, result = pcall(Token_get(callback.func_token), callback.params)
local success, result = pcall(Token.get(callback.func_token), callback.params)
if not success then if not success then
if _DEBUG then if _DEBUG then
error(result) error(result)
@ -69,8 +95,8 @@ local function on_tick()
ErrorLogging.generate_error_report(result) ErrorLogging.generate_error_report(result)
end end
end end
PriorityQueue.pop(callbacks, comp) PriorityQueue_pop(callbacks)
callback = PriorityQueue.peek(callbacks) callback = PriorityQueue_peek(callbacks)
end end
end end
@ -85,7 +111,7 @@ function Task.set_timeout_in_ticks(ticks, func_token, params)
end end
local time = game.tick + ticks local time = game.tick + ticks
local callback = {time = time, func_token = func_token, params = params} local callback = {time = time, func_token = func_token, params = params}
PriorityQueue.push(global.callbacks, callback, comp) PriorityQueue_push(callbacks, callback)
end end
--- Allows you to set a timer (in seconds) after which the tokened function will be run with params given as an argument --- Allows you to set a timer (in seconds) after which the tokened function will be run with params given as an argument
@ -109,8 +135,21 @@ end
-- Ex. if the task is expected to repeat multiple times (ie. the function returns true and loops several ticks) -- Ex. if the task is expected to repeat multiple times (ie. the function returns true and loops several ticks)
function Task.queue_task(func_token, params, weight) function Task.queue_task(func_token, params, weight)
weight = weight or 1 weight = weight or 1
global.total_task_weight = global.total_task_weight + weight primitives.total_task_weight = primitives.total_task_weight + weight
Queue.push(global.task_queue, {func_token = func_token, params = params, weight = weight}) Queue_push(task_queue, {func_token = func_token, params = params, weight = weight})
end
function Task.get_queue_speed()
return primitives.task_queue_speed
end
function Task.set_queue_speed(value)
value = value or 1
if value < 0 then
value = 0
end
primitives.task_queue_speed = value
end end
Event.add(defines.events.on_tick, on_tick) Event.add(defines.events.on_tick, on_tick)