diff --git a/src/lime/app/Future.hx b/src/lime/app/Future.hx index 64f36c5200..e724a42863 100644 --- a/src/lime/app/Future.hx +++ b/src/lime/app/Future.hx @@ -81,7 +81,7 @@ import lime.utils.Log; var promise = new Promise(); promise.future = this; - FutureWork.run(work, promise); + FutureWork.runSimpleJob(work, promise); } else #end @@ -308,7 +308,6 @@ import lime.utils.Log; } } -#if (lime_threads && !html5) /** The class that handles asynchronous `work` functions passed to `new Future()`. **/ @@ -319,26 +318,42 @@ import lime.utils.Log; @:dox(hide) class FutureWork { private static var threadPool:ThreadPool; - private static var promises:MapDynamic, error:Dynamic->Dynamic}>; + private static var promises:MapDynamic, error:Dynamic->Dynamic, progress:Int->Int->Dynamic}>; public static var minThreads(default, set):Int = 0; public static var maxThreads(default, set):Int = 1; public static var activeJobs(get, never):Int; + @:allow(lime.app.Promise) + private static inline function cancelJob(id:Int):Void + { + threadPool.cancelJob(id); + } + + #if (lime_threads && !html5) @:allow(lime.app.Future) - private static function run(work:Void->T, promise:Promise):Void + private static function runSimpleJob(work:Void->T, promise:Promise):Void + { + run(threadPool_doWork, promise, work, MULTI_THREADED); + } + #end + + @:allow(lime.app.Promise) + private static function run(work:WorkFunctionWorkOutput->Void>, promise:Promise, state:State, mode:ThreadMode):Int { if (threadPool == null) { threadPool = new ThreadPool(minThreads, maxThreads, MULTI_THREADED); threadPool.onComplete.add(threadPool_onComplete); threadPool.onError.add(threadPool_onError); + threadPool.onProgress.add(threadPool_onProgress); promises = new Map(); } - var jobID:Int = threadPool.run(threadPool_doWork, work); - promises[jobID] = {complete: promise.complete, error: promise.error}; + var jobID:Int = threadPool.run(work, state, mode); + promises[jobID] = {complete: promise.complete, error: promise.error, progress: promise.progress}; + return jobID; } // Event Handlers @@ -368,6 +383,15 @@ import lime.utils.Log; promise.error(error); } + private static function threadPool_onProgress(progress:{progress:Int, total:Int}):Void + { + // ThreadPool doesn't enforce types, so check manually + if (Type.typeof(progress) == TObject && Type.typeof(progress.progress) == TInt && Type.typeof(progress.total) == TInt) + { + promises[threadPool.activeJob.id].progress(progress.progress, progress.total); + } + } + // Getters & Setters @:noCompletion private static inline function set_minThreads(value:Int):Int { @@ -392,4 +416,3 @@ import lime.utils.Log; return threadPool != null ? threadPool.activeJobs : 0; } } -#end diff --git a/src/lime/app/Promise.hx b/src/lime/app/Promise.hx index 627f32063d..606b2230d3 100644 --- a/src/lime/app/Promise.hx +++ b/src/lime/app/Promise.hx @@ -1,5 +1,9 @@ package lime.app; +import lime.app.Future; +import lime.system.ThreadPool; +import lime.system.WorkOutput; + /** `Promise` is an implementation of Futures and Promises, with the exception that in addition to "success" and "failure" states (represented as "complete" and "error"), @@ -10,18 +14,20 @@ package lime.app; for recipients of it's `Future` object. For example: ```haxe - function examplePromise ():Future { - - var promise = new Promise (); + function examplePromise():Future + { + var promise = new Promise(); var progress = 0, total = 10; - var timer = new Timer (100); - timer.run = function () { + var timer = new Timer(100); + timer.run = function() + { promise.progress (progress, total); progress++; - if (progress == total) { + if (progress == total) + { promise.complete ("Done!"); timer.stop (); @@ -31,12 +37,11 @@ package lime.app; }; return promise.future; - } - var future = examplePromise (); - future.onComplete (function (message) { trace (message); }); - future.onProgress (function (loaded, total) { trace ("Progress: " + loaded + ", " + total); }); + var future = examplePromise(); + future.onComplete(function(message) { trace(message); }); + future.onProgress(function(loaded, total) { trace("Progress: " + loaded + ", " + total); }); ``` **/ #if !lime_debug @@ -69,6 +74,8 @@ class Promise **/ public var isError(get, null):Bool; + private var jobID:Int = -1; + #if commonjs private static function __init__() { @@ -96,11 +103,23 @@ class Promise **/ public function complete(data:T):Promise { + if (!ThreadPool.isMainThread()) + { + haxe.MainLoop.runInMainThread(complete.bind(data)); + return this; + } + if (!future.isError) { future.isComplete = true; future.value = data; + if (jobID != -1) + { + FutureWork.cancelJob(jobID); + jobID = -1; + } + if (future.__completeListeners != null) { for (listener in future.__completeListeners) @@ -115,6 +134,45 @@ class Promise return this; } + /** + Runs the given function asynchronously, and resolves this `Promise` with + the complete, error, and/or progress events sent by that function. + Sample usage: + + ```haxe + function examplePromise():Future + { + var promise = new Promise(); + promise.completeAsync(function(state:State, output:WorkOutput):Void + { + output.sendProgress({progress:state.progress, total:10}); + state.progress++; + + if (state.progress == 10) + { + output.sendComplete("Done!"); + } + }, + {progress: 0}, MULTI_THREADED); + + return promise.future; + } + + var future = examplePromise(); + future.onComplete(function(message) { trace(message); }); + future.onProgress(function(loaded, total) { trace("Progress: " + loaded + ", " + total); }); + ``` + + @param doWork A function to perform work asynchronously. For best results, + see the guidelines in the `ThreadPool` class overview. + @param state The value to pass to `doWork`. + @param mode Which mode to run the job in: `SINGLE_THREADED` or `MULTI_THREADED`. + **/ + public function completeAsync(doWork:WorkFunctionWorkOutput->Void>, ?state:State, ?mode:ThreadMode = MULTI_THREADED):Void + { + jobID = FutureWork.run(doWork, this, state, mode); + } + /** Resolves this `Promise` with the complete, error and/or progress state of another `Future` @@ -137,11 +195,23 @@ class Promise **/ public function error(msg:Dynamic):Promise { + if (!ThreadPool.isMainThread()) + { + haxe.MainLoop.runInMainThread(error.bind(msg)); + return this; + } + if (!future.isComplete) { future.isError = true; future.error = msg; + if (jobID != -1) + { + FutureWork.cancelJob(jobID); + jobID = -1; + } + if (future.__errorListeners != null) { for (listener in future.__errorListeners) @@ -164,6 +234,12 @@ class Promise **/ public function progress(progress:Int, total:Int):Promise { + if (!ThreadPool.isMainThread()) + { + haxe.MainLoop.runInMainThread(this.progress.bind(progress, total)); + return this; + } + if (!future.isError && !future.isComplete) { if (future.__progressListeners != null) @@ -179,12 +255,12 @@ class Promise } // Get & Set Methods - @:noCompletion private function get_isComplete():Bool + @:noCompletion private inline function get_isComplete():Bool { return future.isComplete; } - @:noCompletion private function get_isError():Bool + @:noCompletion private inline function get_isError():Bool { return future.isError; } diff --git a/src/lime/system/ThreadPool.hx b/src/lime/system/ThreadPool.hx index ec8669b295..cc83ba8b5e 100644 --- a/src/lime/system/ThreadPool.hx +++ b/src/lime/system/ThreadPool.hx @@ -89,9 +89,9 @@ class ThreadPool extends WorkOutput /** __Access this only from the main thread.__ - The sum of all active single-threaded pools' `workPriority` values. + The sum of `workPriority` values from all pools with an ongoing + single-threaded job. **/ - @:allow(lime.system.JobList) private static var __totalWorkPriority:Float = 0; /** @@ -181,22 +181,19 @@ class ThreadPool extends WorkOutput private var __doWork:WorkFunctionWorkOutput->Void>; - private var __activeJobs:JobList; - #if lime_threads - /** - The set of threads actively running a job. - **/ - private var __activeThreads:Map; - /** A list of idle threads. Not to be confused with `idleThreads`, a public variable equal to `__idleThreads.length`. **/ - private var __idleThreads:Array; + private var __idleThreads:Array = []; + + private var __multiThreadedJobs:JobArray = []; + private var __multiThreadedQueue:JobArray = []; #end - private var __jobQueue:JobList = new JobList(); + private var __singleThreadedJob(default, set):JobData; + private var __singleThreadedQueue:JobArray = []; /** __Call this only from the main thread.__ @@ -206,26 +203,20 @@ class ThreadPool extends WorkOutput immediately; only after enough calls to `run()`. Only applies in multi-threaded mode. @param maxThreads The maximum number of threads that will run at once. - @param mode Defaults to `MULTI_THREADED` on most targets, but - `SINGLE_THREADED` in HTML5. In HTML5, `MULTI_THREADED` mode uses web - workers, which impose additional restrictions. + @param mode The mode jobs will run in by default. Defaults to + `SINGLE_THREADED` in HTML5 for backwards compatibility. **/ public function new(minThreads:Int = 0, maxThreads:Int = 1, mode:ThreadMode = null) { - super(mode); + if (!isMainThread()) + { + throw "Call new ThreadPool() only from the main thread."; + } - __activeJobs = new JobList(this); + super(mode); this.minThreads = minThreads; this.maxThreads = maxThreads; - - #if lime_threads - if (this.mode == MULTI_THREADED) - { - __activeThreads = new Map(); - __idleThreads = []; - } - #end } /** @@ -243,13 +234,13 @@ class ThreadPool extends WorkOutput Application.current.onUpdate.remove(__update); + #if lime_threads // Cancel active jobs, leaving `minThreads` idle threads. - for (job in __activeJobs) + for (job in __multiThreadedJobs) { - #if lime_threads if (mode == MULTI_THREADED) { - var thread:Thread = __activeThreads[job.id]; + var thread:Thread = job.thread; if (idleThreads < minThreads) { thread.sendMessage({event: CANCEL}); @@ -260,7 +251,6 @@ class ThreadPool extends WorkOutput thread.sendMessage({event: EXIT}); } } - #end if (error != null) { @@ -274,9 +264,8 @@ class ThreadPool extends WorkOutput activeJob = null; } } - __activeJobs.clear(); + __multiThreadedJobs.clear(); - #if lime_threads // Exit idle threads if there are more than the minimum. while (idleThreads > minThreads) { @@ -284,16 +273,34 @@ class ThreadPool extends WorkOutput } #end - // Clear the job queue. + if (__singleThreadedJob != null && error != null) + { + activeJob = __singleThreadedJob; + onError.dispatch(error); + activeJob = null; + } + __singleThreadedJob = null; + + // Clear the job queues. if (error != null) { - for (job in __jobQueue) + for (job in __singleThreadedQueue) + { + activeJob = job; + onError.dispatch(error); + } + #if lime_threads + for (job in __multiThreadedQueue) { activeJob = job; onError.dispatch(error); } + #end } - __jobQueue.clear(); + __singleThreadedQueue.clear(); + #if lime_threads + __multiThreadedQueue.clear(); + #end __jobComplete.value = false; activeJob = null; @@ -305,17 +312,32 @@ class ThreadPool extends WorkOutput **/ public function cancelJob(jobID:Int):Bool { + if (__singleThreadedJob != null && __singleThreadedJob.id == jobID) + { + __singleThreadedJob = null; + return true; + } + else if (__singleThreadedQueue.removeJob(jobID) != null) + { + return true; + } + #if lime_threads - var thread:Thread = __activeThreads[jobID]; - if (thread != null) + var job:JobData = __multiThreadedJobs.removeJob(jobID); + if (job != null) { - thread.sendMessage({event: CANCEL}); - __activeThreads.remove(jobID); - __idleThreads.push(thread); + if (job.thread != null) + { + job.thread.sendMessage({event: CANCEL}); + __idleThreads.push(job.thread); + } + return true; } - #end - return __activeJobs.remove(__activeJobs.get(jobID)) || __jobQueue.remove(__jobQueue.get(jobID)); + return __multiThreadedQueue.removeJob(jobID) != null; + #else + return false; + #end } /** @@ -334,9 +356,11 @@ class ThreadPool extends WorkOutput only access its arguments, and return often. @param state An object to pass to `doWork`, ideally a mutable object so that `doWork` can save its progress. + @param mode Which mode to run the job in. If omitted, the pool's default + mode will be used. @return The job's unique ID. **/ - public function run(doWork:WorkFunctionWorkOutput->Void> = null, state:State = null):Int + public function run(doWork:WorkFunctionWorkOutput->Void> = null, state:State = null, ?mode:ThreadMode = null):Int { if (!isMainThread()) { @@ -361,13 +385,24 @@ class ThreadPool extends WorkOutput } var job:JobData = new JobData(doWork, state); - __jobQueue.push(job); + #if lime_threads + if (mode == MULTI_THREADED || mode == null && this.mode == MULTI_THREADED) + { + __multiThreadedQueue.push(job); + } + else + #end + { + __singleThreadedQueue.push(job); + } if (!Application.current.onUpdate.has(__update)) { Application.current.onUpdate.add(__update); } + __startJobs(); + return job.id; } @@ -410,7 +445,7 @@ class ThreadPool extends WorkOutput return; } - if (event.event != WORK || event.job == null) + if (event.event != WORK || event.doWork == null || event.jobID == null) { // Go idle. event = null; @@ -418,7 +453,7 @@ class ThreadPool extends WorkOutput } // Get to work. - output.activeJob = event.job; + output.activeJob = new JobData(event.doWork, event.state, event.jobID); var interruption:Dynamic = null; try @@ -426,7 +461,7 @@ class ThreadPool extends WorkOutput while (!output.__jobComplete.value && (interruption = Thread.readMessage(false)) == null) { output.workIterations.value++; - event.job.doWork.dispatch(event.job.state, output); + event.doWork.dispatch(event.state, output); } } catch (e:#if (haxe_ver >= 4.1) haxe.Exception #else Dynamic #end) @@ -469,50 +504,66 @@ class ThreadPool extends WorkOutput } /** - Schedules (in multi-threaded mode) or runs (in single-threaded mode) the - job queue, then processes incoming events. + Processes the job queues, starting any jobs that can be started. **/ - private function __update(deltaTime:Int):Void + private function __startJobs():Void { if (!isMainThread()) { return; } - // Process the queue. - while (__jobQueue.length > 0 && activeJobs < maxThreads) + if (__singleThreadedJob == null && __singleThreadedQueue.length > 0) { - var job:JobData = __jobQueue.pop(); - - job.startTime = timestamp(); - __activeJobs.push(job); + __singleThreadedJob = __singleThreadedQueue.shift(); + __singleThreadedJob.startTime = timestamp(); + } - #if lime_threads - if (mode == MULTI_THREADED) + #if lime_threads + for (job in __multiThreadedQueue) + { + if (__multiThreadedJobs.length >= maxThreads) { - #if html5 - job.doWork.makePortable(); - #end - - var thread:Thread = __idleThreads.length == 0 ? createThread(__executeThread) : __idleThreads.pop(); - __activeThreads[job.id] = thread; - thread.sendMessage({event: WORK, job: job}); + break; } + + #if html5 + job.doWork.makePortable(); #end + + job.thread = __idleThreads.length == 0 ? createThread(__executeThread) : __idleThreads.pop(); + job.thread.sendMessage({event: WORK, jobID: job.id, doWork: job.doWork, state: job.state}); + job.startTime = timestamp(); + + __multiThreadedJobs.push(job); + __multiThreadedQueue.remove(job); } + #end + } - // Run the next single-threaded job, if any. - if (mode == SINGLE_THREADED && __activeJobs.hasNext()) + /** + Processes the job queues, then processes incoming events. + **/ + private function __update(deltaTime:Int):Void + { + if (!isMainThread()) { - activeJob = __activeJobs.next(); + return; + } + + __startJobs(); + + // Run the single-threaded job. + if (__singleThreadedJob != null) + { + activeJob = __singleThreadedJob; var state:State = activeJob.state; __jobComplete.value = false; workIterations.value = 0; - // `workLoad / frameRate` is the total time that pools may use per - // frame. `workPriority / __totalWorkPriority` is this pool's - // fraction of that total. + // `workLoad / frameRate` is the total time that pools may use per frame. + // `workPriority / __totalWorkPriority` is this pool's fraction of that total. var maxTimeElapsed:Float = workPriority * workLoad / (__totalWorkPriority * Application.current.window.frameRate); var startTime:Float = timestamp(); @@ -540,24 +591,32 @@ class ThreadPool extends WorkOutput var threadEvent:ThreadEvent; while ((threadEvent = __jobOutput.pop(false)) != null) { - if (threadEvent.jobID != null) + var activeJobMode:ThreadMode = SINGLE_THREADED; + if (__singleThreadedJob != null && threadEvent.jobID == __singleThreadedJob.id) { - activeJob = __activeJobs.get(threadEvent.jobID); + activeJob = __singleThreadedJob; } else { - activeJob = threadEvent.job; + #if lime_threads + activeJob = __multiThreadedJobs.getJob(threadEvent.jobID); + activeJobMode = MULTI_THREADED; + #else + continue; + #end } - if (activeJob == null || !__activeJobs.exists(activeJob)) + if (activeJob == null) { continue; } - if (mode == MULTI_THREADED) + #if lime_threads + if (activeJobMode == MULTI_THREADED) { activeJob.duration = timestamp() - activeJob.startTime; } + #end switch (threadEvent.event) { @@ -577,24 +636,25 @@ class ThreadPool extends WorkOutput onError.dispatch(threadEvent.message); } - __activeJobs.remove(activeJob); - #if lime_threads - if (mode == MULTI_THREADED) + if (activeJobMode == MULTI_THREADED) { - var thread:Thread = __activeThreads[activeJob.id]; - __activeThreads.remove(activeJob.id); + __multiThreadedJobs.remove(activeJob); - if (currentThreads > maxThreads || __jobQueue.length == 0 && currentThreads > minThreads) + if (currentThreads > maxThreads || currentThreads - __multiThreadedQueue.length > minThreads) { - thread.sendMessage({event: EXIT}); + activeJob.thread.sendMessage({event: EXIT}); } else { - __idleThreads.push(thread); + __idleThreads.push(activeJob.thread); } } + else #end + { + __singleThreadedJob = null; + } default: } @@ -602,7 +662,7 @@ class ThreadPool extends WorkOutput activeJob = null; } - if (activeJobs == 0 && __jobQueue.length == 0) + if (0 == activeJobs + __singleThreadedQueue.length #if lime_threads + __multiThreadedQueue.length #end) { Application.current.onUpdate.remove(__update); } @@ -624,7 +684,8 @@ class ThreadPool extends WorkOutput private inline function get_activeJobs():Int { - return __activeJobs.length; + return #if lime_threads __multiThreadedJobs.length + #end + (__singleThreadedJob != null ? 1 : 0); } private inline function get_idleThreads():Int @@ -642,9 +703,22 @@ class ThreadPool extends WorkOutput return this; } + private inline function set___singleThreadedJob(value:JobData):JobData + { + if (value != null && __singleThreadedJob == null) + { + __totalWorkPriority += workPriority; + } + else if (value == null && __singleThreadedJob != null) + { + __totalWorkPriority -= workPriority; + } + return __singleThreadedJob = value; + } + private function set_workPriority(value:Float):Float { - if (mode == SINGLE_THREADED && activeJobs > 0) + if (__singleThreadedJob != null) { __totalWorkPriority += value - workPriority; } @@ -705,104 +779,21 @@ private abstract PseudoEvent(ThreadPool) from ThreadPool } } -class JobList +@:forward +private abstract JobArray(Array) from Array { - /** - * Whether `pool.workPriority` is being added to - * `ThreadPool.__totalWorkPriority`. Set this to true when `length > 0` and - * false when `length == 0`. The setter will ensure it is only added once. - */ - @:allow(lime.system.ThreadPool) - private var __addingWorkPriority(default, set):Bool; - - private var __index:Int = 0; - - private var __jobs:Array = []; - - public var length(get, never):Int; - - public var pool(default, null):ThreadPool; - - public inline function new(?pool:ThreadPool) - { - this.pool = pool; - @:bypassAccessor __addingWorkPriority = false; - } - public inline function clear():Void { #if haxe4 - __jobs.resize(0); + this.resize(0); #else - __jobs = []; + this.splice(0, this.length); #end - __addingWorkPriority = false; - } - - public inline function exists(job:JobData):Bool - { - return get(job.id) != null; - } - - public inline function hasNext():Bool - { - return __jobs.length > 0; - } - - /** - Iterates in an endless loop, starting over upon reaching the end. - **/ - public inline function next():JobData - { - __index++; - if (__index >= length) - { - __index = 0; - } - - return __jobs[__index]; - } - - public inline function pop():JobData - { - var job:JobData = __jobs.pop(); - __addingWorkPriority = length > 0; - return job; - } - - public function remove(job:JobData):Bool - { - if (__jobs.remove(job)) - { - __addingWorkPriority = length > 0; - return true; - } - else if (removeByID(job.id)) - { - return true; - } - else - { - return false; - } } - public inline function removeByID(id:Int):Bool + public function getJob(id:Int):JobData { - if (__jobs.remove(get(id))) - { - __addingWorkPriority = length > 0; - return true; - } - else - { - return false; - } - } - - public function get(id:Int):JobData - { - for (job in __jobs) + for (job in this) { if (job.id == id) { @@ -811,36 +802,18 @@ class JobList } return null; } - public inline function push(job:JobData):Void - { - __jobs.push(job); - __addingWorkPriority = true; - } - // Getters & Setters - - private inline function set___addingWorkPriority(value:Bool):Bool + public function removeJob(id:Int):JobData { - if (pool != null && __addingWorkPriority != value && ThreadPool.isMainThread()) + for (i in 0...this.length) { - if (value) - { - ThreadPool.__totalWorkPriority += pool.workPriority; - } - else + var job:JobData = this[i]; + if (job.id == id) { - ThreadPool.__totalWorkPriority -= pool.workPriority; + this.splice(i, 1); + return job; } - return __addingWorkPriority = value; - } - else - { - return __addingWorkPriority; } - } - - private inline function get_length():Int - { - return __jobs.length; + return null; } } diff --git a/src/lime/system/WorkOutput.hx b/src/lime/system/WorkOutput.hx index c2673703bd..d32b7469a7 100644 --- a/src/lime/system/WorkOutput.hx +++ b/src/lime/system/WorkOutput.hx @@ -49,17 +49,10 @@ class WorkOutput public var workIterations(default, null):Tls = new Tls(); /** - Whether background threads are being/will be used. If threads aren't - available on this target, `mode` will always be `SINGLE_THREADED`. + The mode jobs will run in by default. If threads aren't available, jobs + will always run in `SINGLE_THREADED` mode. **/ - public var mode(get, never):ThreadMode; - - #if lime_threads - /** - __Set this only via the constructor.__ - **/ - private var __mode:ThreadMode; - #end + public var mode:ThreadMode; /** Messages sent by active jobs, received by the main thread. @@ -87,7 +80,7 @@ class WorkOutput __jobComplete.value = false; #if lime_threads - __mode = mode != null ? mode : #if html5 SINGLE_THREADED #else MULTI_THREADED #end; + this.mode = mode != null ? mode : #if html5 SINGLE_THREADED #else MULTI_THREADED #end; #end } @@ -185,15 +178,6 @@ class WorkOutput // Getters & Setters - private inline function get_mode():ThreadMode - { - #if lime_threads - return __mode; - #else - return SINGLE_THREADED; - #end - } - private inline function get_activeJob():JobData { return __activeJob.value; @@ -331,10 +315,19 @@ class JobData @:allow(lime.system.WorkOutput) private var startTime:Float = 0; + #if lime_threads + @:allow(lime.system.WorkOutput) + private var thread:Thread; + #end + @:allow(lime.system.WorkOutput) - private inline function new(doWork:WorkFunctionWorkOutput->Void>, state:State) + private inline function new(doWork:WorkFunctionWorkOutput->Void>, state:State, ?id:Int) { - id = nextID++; + this.id = id != null ? id : nextID++; + if (this.id == -1) + { + throw "All job IDs have been used!"; + } this.doWork = doWork; this.state = state; } @@ -358,8 +351,11 @@ typedef ThreadEvent = { var event:ThreadEventType; @:optional var message:Dynamic; - @:optional var job:JobData; @:optional var jobID:Int; + + // Only for "WORK" events + @:optional var doWork:WorkFunctionWorkOutput->Void>; + @:optional var state:State; } class JSAsync