diff --git a/src/lime/_internal/backend/html5/HTML5Thread.hx b/src/lime/_internal/backend/html5/HTML5Thread.hx index 845a2924d7..022a085da1 100644 --- a/src/lime/_internal/backend/html5/HTML5Thread.hx +++ b/src/lime/_internal/backend/html5/HTML5Thread.hx @@ -477,7 +477,7 @@ abstract Message(Dynamic) from Dynamic to Dynamic // Skip `null` for obvious reasons. return object == null // No need to preserve a primitive type. - || !#if (haxe_ver >= 4.2) Std.isOfType #else untyped __js__ #end (object, Object) + || !#if (haxe_ver >= 4.2) Std.isOfType #else Std.is #end (object, Object) // Objects with this field have been deliberately excluded. || Reflect.field(object, SKIP_FIELD) == true // A `Uint8Array` (the type used by `haxe.io.Bytes`) can have diff --git a/src/lime/app/Future.hx b/src/lime/app/Future.hx index 2af97182d4..64f36c5200 100644 --- a/src/lime/app/Future.hx +++ b/src/lime/app/Future.hx @@ -67,24 +67,36 @@ import lime.utils.Log; @:noCompletion private var __progressListeners:ArrayInt->Void>; /** - @param work Deprecated; use `Future.withEventualValue()` instead. - @param useThreads Deprecated; use `Future.withEventualValue()` instead. + @param work Optional: a function to compute this future's value. + @param useThreads Whether to run `work` on a background thread, where supported. + If false or if this isn't a system target, it will run immediately on the main thread. **/ - public function new(work:WorkFunctionT> = null, useThreads:Bool = false) + public function new(work:Void->T = null, useThreads:Bool = false) { if (work != null) { - var promise = new Promise(); - promise.future = this; - - #if (lime_threads && html5) + #if (lime_threads && !html5) if (useThreads) { - work.makePortable(); + var promise = new Promise(); + promise.future = this; + + FutureWork.run(work, promise); } + else #end - - FutureWork.run(dispatchWorkFunction, work, promise, useThreads ? MULTI_THREADED : SINGLE_THREADED, true); + { + try + { + value = work(); + isComplete = true; + } + catch (e:Dynamic) + { + error = e; + isError = true; + } + } } } @@ -189,6 +201,7 @@ import lime.utils.Log; **/ public function ready(waitTime:Int = -1):Future { + #if (lime_threads && !html5) if (isComplete || isError) { return this; @@ -196,34 +209,22 @@ import lime.utils.Log; else { var time = System.getTimer(); - var prevTime = time; var end = time + waitTime; - while (!isComplete && !isError && time <= end) + while (!isComplete && !isError && time <= end && FutureWork.activeJobs > 0) { - if (FutureWork.activeJobs < 1) - { - Log.error('Cannot block for a Future without a "work" function.'); - return this; - } - - if (FutureWork.singleThreadPool != null && FutureWork.singleThreadPool.activeJobs > 0) - { - @:privateAccess FutureWork.singleThreadPool.__update(time - prevTime); - } - else - { - #if sys - Sys.sleep(0.01); - #end - } + #if sys + Sys.sleep(0.01); + #end - prevTime = time; time = System.getTimer(); } return this; } + #else + return this; + #end } /** @@ -305,41 +306,9 @@ import lime.utils.Log; future.value = value; return future; } - - /** - Creates a `Future` instance which will asynchronously compute a value. - - Once `work()` returns a non-null value, the `Future` will finish with that value. - If `work()` throws an error, the `Future` will finish with that error instead. - @param work A function that computes a value of type `T`. - @param state An argument to pass to `work()`. As this may be used on another thread, the - main thread must not access or modify `state` until the `Future` finishes. - @param mode Whether to use real threads as opposed to green threads. Green threads rely - on cooperative multitasking, meaning `work()` must return periodically to allow other code - enough time to run. In these cases, `work()` should return null to signal that it isn't finished. - @return A new `Future` instance. - @see https://en.wikipedia.org/wiki/Cooperative_multitasking - **/ - public static function withEventualValue(work:WorkFunction Null>, state:State, mode:ThreadMode = #if html5 SINGLE_THREADED #else MULTI_THREADED #end):Future - { - var future = new Future(); - var promise = new Promise(); - promise.future = future; - - FutureWork.run(work, state, promise, mode); - - return future; - } - - /** - (For backwards compatibility.) Dispatches the given zero-argument function. - **/ - @:noCompletion private static function dispatchWorkFunction(work:WorkFunction T>):Null - { - return work.dispatch(); - } } +#if (lime_threads && !html5) /** The class that handles asynchronous `work` functions passed to `new Future()`. **/ @@ -349,158 +318,78 @@ import lime.utils.Log; #end @:dox(hide) class FutureWork { - @:allow(lime.app.Future) - private static var singleThreadPool:ThreadPool; - #if lime_threads - private static var multiThreadPool:ThreadPool; - // It isn't safe to pass a promise object to a web worker, but since it's - // `@:generic` we can't store it as `Promise`. Instead, we'll store - // the two methods we need. - private static var promises:Map Dynamic, error:Dynamic -> Dynamic}> = new Map(); - #end + private static var threadPool:ThreadPool; + private static var promises:MapDynamic, error:Dynamic->Dynamic}>; + public static var minThreads(default, set):Int = 0; public static var maxThreads(default, set):Int = 1; public static var activeJobs(get, never):Int; - private static function getPool(mode:ThreadMode):ThreadPool - { - #if lime_threads - if (mode == MULTI_THREADED) { - if(multiThreadPool == null) { - multiThreadPool = new ThreadPool(minThreads, maxThreads, MULTI_THREADED); - multiThreadPool.onComplete.add(multiThreadPool_onComplete); - multiThreadPool.onError.add(multiThreadPool_onError); - } - return multiThreadPool; - } - #end - if(singleThreadPool == null) { - singleThreadPool = new ThreadPool(minThreads, maxThreads, SINGLE_THREADED); - singleThreadPool.onComplete.add(singleThreadPool_onComplete); - singleThreadPool.onError.add(singleThreadPool_onError); - } - return singleThreadPool; - } - @:allow(lime.app.Future) - private static function run(work:WorkFunctionNull>, state:State, promise:Promise, mode:ThreadMode = MULTI_THREADED, legacyCode:Bool = false):Void + private static function run(work:Void->T, promise:Promise):Void { - var bundle = {work: work, state: state, promise: promise, legacyCode: legacyCode}; - - #if lime_threads - if (mode == MULTI_THREADED) + if (threadPool == null) { - #if html5 - work.makePortable(); - #end + threadPool = new ThreadPool(minThreads, maxThreads, MULTI_THREADED); + threadPool.onComplete.add(threadPool_onComplete); + threadPool.onError.add(threadPool_onError); - bundle.promise = null; + promises = new Map(); } - #end - - var jobID:Int = getPool(mode).run(threadPool_doWork, bundle); - #if lime_threads - if (mode == MULTI_THREADED) - { - promises[jobID] = {complete: promise.complete, error: promise.error}; - } - #end + var jobID:Int = threadPool.run(threadPool_doWork, work); + promises[jobID] = {complete: promise.complete, error: promise.error}; } // Event Handlers - private static function threadPool_doWork(bundle:{work:WorkFunctionDynamic>, state:State, legacyCode:Bool}, output:WorkOutput):Void + private static function threadPool_doWork(work:Void->Dynamic, output:WorkOutput):Void { try { - var result = bundle.work.dispatch(bundle.state); - if (result != null || bundle.legacyCode) - { - #if (lime_threads && html5) - bundle.work.makePortable(); - #end - output.sendComplete(result); - } + output.sendComplete(work()); } catch (e:Dynamic) { - #if (lime_threads && html5) - bundle.work.makePortable(); - #end output.sendError(e); } } - private static function singleThreadPool_onComplete(result:Dynamic):Void + private static function threadPool_onComplete(result:Dynamic):Void { - singleThreadPool.activeJob.state.promise.complete(result); - } - - private static function singleThreadPool_onError(error:Dynamic):Void - { - singleThreadPool.activeJob.state.promise.error(error); - } - - #if lime_threads - private static function multiThreadPool_onComplete(result:Dynamic):Void - { - var promise = promises[multiThreadPool.activeJob.id]; - promises.remove(multiThreadPool.activeJob.id); + var promise = promises[threadPool.activeJob.id]; + promises.remove(threadPool.activeJob.id); promise.complete(result); } - private static function multiThreadPool_onError(error:Dynamic):Void + private static function threadPool_onError(error:Dynamic):Void { - var promise = promises[multiThreadPool.activeJob.id]; - promises.remove(multiThreadPool.activeJob.id); + var promise = promises[threadPool.activeJob.id]; + promises.remove(threadPool.activeJob.id); promise.error(error); } - #end // Getters & Setters @:noCompletion private static inline function set_minThreads(value:Int):Int { - if (singleThreadPool != null) - { - singleThreadPool.minThreads = value; - } - #if lime_threads - if (multiThreadPool != null) + if (threadPool != null) { - multiThreadPool.minThreads = value; + threadPool.minThreads = value; } - #end return minThreads = value; } @:noCompletion private static inline function set_maxThreads(value:Int):Int { - if (singleThreadPool != null) + if (threadPool != null) { - singleThreadPool.maxThreads = value; + threadPool.maxThreads = value; } - #if lime_threads - if (multiThreadPool != null) - { - multiThreadPool.maxThreads = value; - } - #end return maxThreads = value; } - @:noCompletion private static function get_activeJobs():Int + @:noCompletion private static inline function get_activeJobs():Int { - var sum:Int = 0; - if (singleThreadPool != null) - { - sum += singleThreadPool.activeJobs; - } - #if lime_threads - if (multiThreadPool != null) - { - sum += multiThreadPool.activeJobs; - } - #end - return sum; + return threadPool != null ? threadPool.activeJobs : 0; } } +#end diff --git a/src/lime/graphics/Image.hx b/src/lime/graphics/Image.hx index c825d9d84d..97385cdc3a 100644 --- a/src/lime/graphics/Image.hx +++ b/src/lime/graphics/Image.hx @@ -1002,7 +1002,7 @@ class Image return promise.future; #else - return Future.withEventualValue(fromBytes, bytes, MULTI_THREADED); + return new Future(fromBytes.bind(bytes), true); #end } diff --git a/src/lime/media/AudioBuffer.hx b/src/lime/media/AudioBuffer.hx index ff067229ab..e74b531ec3 100644 --- a/src/lime/media/AudioBuffer.hx +++ b/src/lime/media/AudioBuffer.hx @@ -31,12 +31,41 @@ import flash.net.URLRequest; @:fileXml('tags="haxe,release"') @:noDebug #end + +/** + The `AudioBuffer` class represents a buffer of audio data that can be played back using an `AudioSource`. + It supports a variety of audio formats and platforms, providing a consistent API for loading and managing audio data. + + Depending on the platform, the audio backend may differ, but the class provides a unified interface for accessing + audio data, whether it's stored in memory, loaded from a file, or streamed. + + @see lime.media.AudioSource +**/ class AudioBuffer { + /** + The number of bits per sample in the audio data. + **/ public var bitsPerSample:Int; + + /** + The number of audio channels (e.g., 1 for mono, 2 for stereo). + **/ public var channels:Int; + + /** + The raw audio data stored as a `UInt8Array`. + **/ public var data:UInt8Array; + + /** + The sample rate of the audio data, in Hz. + **/ public var sampleRate:Int; + + /** + The source of the audio data. This can be an `Audio`, `Sound`, `Howl`, or other platform-specific object. + **/ public var src(get, set):Dynamic; @:noCompletion private var __srcAudio:#if (js && html5) Audio #else Dynamic #end; @@ -57,8 +86,14 @@ class AudioBuffer } #end + /** + Creates a new, empty `AudioBuffer` instance. + **/ public function new() {} + /** + Disposes of the resources used by this `AudioBuffer`, such as unloading any associated audio data. + **/ public function dispose():Void { #if (js && html5 && lime_howlerjs) @@ -66,6 +101,12 @@ class AudioBuffer #end } + /** + Creates an `AudioBuffer` from a Base64-encoded string. + + @param base64String The Base64-encoded audio data. + @return An `AudioBuffer` instance with the decoded audio data. + **/ public static function fromBase64(base64String:String):AudioBuffer { if (base64String == null) return null; @@ -112,6 +153,12 @@ class AudioBuffer return null; } + /** + Creates an `AudioBuffer` from a `Bytes` object. + + @param bytes The `Bytes` object containing the audio data. + @return An `AudioBuffer` instance with the decoded audio data. + **/ public static function fromBytes(bytes:Bytes):AudioBuffer { if (bytes == null) return null; @@ -145,6 +192,12 @@ class AudioBuffer return null; } + /** + Creates an `AudioBuffer` from a file. + + @param path The file path to the audio data. + @return An `AudioBuffer` instance with the audio data loaded from the file. + **/ public static function fromFile(path:String):AudioBuffer { if (path == null) return null; @@ -196,6 +249,12 @@ class AudioBuffer #end } + /** + Creates an `AudioBuffer` from an array of file paths. + + @param paths An array of file paths to search for audio data. + @return An `AudioBuffer` instance with the audio data loaded from the first valid file found. + **/ public static function fromFiles(paths:Array):AudioBuffer { #if (js && html5 && lime_howlerjs) @@ -221,7 +280,14 @@ class AudioBuffer #end } + /** + Creates an `AudioBuffer` from a `VorbisFile`. + + @param vorbisFile The `VorbisFile` object containing the audio data. + @return An `AudioBuffer` instance with the decoded audio data. + **/ #if lime_vorbis + public static function fromVorbisFile(vorbisFile:VorbisFile):AudioBuffer { if (vorbisFile == null) return null; @@ -243,6 +309,12 @@ class AudioBuffer } #end + /** + Asynchronously loads an `AudioBuffer` from a file. + + @param path The file path to the audio data. + @return A `Future` that resolves to the loaded `AudioBuffer`. + **/ public static function loadFromFile(path:String):Future { #if (flash || (js && html5)) @@ -307,6 +379,12 @@ class AudioBuffer #end } + /** + Asynchronously loads an `AudioBuffer` from multiple files. + + @param paths An array of file paths to search for audio data. + @return A `Future` that resolves to the loaded `AudioBuffer`. + **/ public static function loadFromFiles(paths:Array):Future { #if (js && html5 && lime_howlerjs) @@ -335,7 +413,7 @@ class AudioBuffer return promise.future; #else - return Future.withEventualValue(fromFiles, paths, MULTI_THREADED); + return new Future(fromFiles.bind(paths), true); #end } diff --git a/src/lime/media/AudioSource.hx b/src/lime/media/AudioSource.hx index ab0992e4fe..2ccc9b1465 100644 --- a/src/lime/media/AudioSource.hx +++ b/src/lime/media/AudioSource.hx @@ -9,20 +9,71 @@ import lime.math.Vector4; @:fileXml('tags="haxe,release"') @:noDebug #end +/** + The `AudioSource` class provides a way to control audio playback in a Lime application. + It allows for playing, pausing, and stopping audio, as well as controlling various + audio properties such as gain, pitch, and looping. + + Depending on the platform, the audio backend may vary, but the API remains consistent. + + @see lime.media.AudioBuffer +**/ class AudioSource { + /** + An event that is dispatched when the audio playback is complete. + **/ public var onComplete = new EventVoid>(); + + /** + The `AudioBuffer` associated with this `AudioSource`. + **/ public var buffer:AudioBuffer; + + /** + The current playback position of the audio, in milliseconds. + **/ public var currentTime(get, set):Int; + + /** + The gain (volume) of the audio. A value of `1.0` represents the default volume. + **/ public var gain(get, set):Float; + + /** + The length of the audio, in milliseconds. + **/ public var length(get, set):Int; + + /** + The number of times the audio will loop. A value of `0` means the audio will not loop. + **/ public var loops(get, set):Int; + + /** + The pitch of the audio. A value of `1.0` represents the default pitch. + **/ public var pitch(get, set):Float; + + /** + The offset within the audio buffer to start playback, in samples. + **/ public var offset:Int; + + /** + The 3D position of the audio source, represented as a `Vector4`. + **/ public var position(get, set):Vector4; @:noCompletion private var __backend:AudioSourceBackend; + /** + Creates a new `AudioSource` instance. + @param buffer The `AudioBuffer` to associate with this `AudioSource`. + @param offset The starting offset within the audio buffer, in samples. + @param length The length of the audio to play, in milliseconds. If `null`, the full buffer is used. + @param loops The number of times to loop the audio. `0` means no looping. + **/ public function new(buffer:AudioBuffer = null, offset:Int = 0, length:Null = null, loops:Int = 0) { this.buffer = buffer; @@ -43,6 +94,9 @@ class AudioSource } } + /** + Releases any resources used by this `AudioSource`. + **/ public function dispose():Void { __backend.dispose(); @@ -53,16 +107,25 @@ class AudioSource __backend.init(); } + /** + Starts or resumes audio playback. + **/ public function play():Void { __backend.play(); } + /** + Pauses audio playback. + **/ public function pause():Void { __backend.pause(); } + /** + Stops audio playback and resets the playback position to the beginning. + **/ public function stop():Void { __backend.stop(); diff --git a/src/lime/media/openal/ALC.hx b/src/lime/media/openal/ALC.hx index 62a3b6e264..4cd4379d21 100644 --- a/src/lime/media/openal/ALC.hx +++ b/src/lime/media/openal/ALC.hx @@ -76,12 +76,13 @@ class ALC public static function getContextsDevice(context:ALContext):ALDevice { - #if (lime_cffi && lime_openal && !macro) #if !hl var handle:Dynamic = NativeCFFI.lime_alc_get_contexts_device(context); + #if (lime_cffi && lime_openal && !macro) + var handle:Dynamic = NativeCFFI.lime_alc_get_contexts_device(context); if (handle != null) { return new ALDevice(handle); - } #else #end + } #end return null; diff --git a/src/lime/system/BackgroundWorker.hx b/src/lime/system/BackgroundWorker.hx index 5be17e9eae..f92ca0ca17 100644 --- a/src/lime/system/BackgroundWorker.hx +++ b/src/lime/system/BackgroundWorker.hx @@ -1,4 +1,231 @@ package lime.system; -@:deprecated("Replace references to lime.system.BackgroundWorker with lime.system.ThreadPool. As the API is identical, no other changes are necessary.") -typedef BackgroundWorker = ThreadPool; +import lime.app.Application; +import lime.app.Event; +#if sys +#if haxe4 +import sys.thread.Deque; +import sys.thread.Thread; +#elseif cpp +import cpp.vm.Deque; +import cpp.vm.Thread; +#elseif neko +import neko.vm.Deque; +import neko.vm.Thread; +#end +#end + +/** + A `BackgroundWorker` allows the execution of a function on a background thread, + avoiding the blocking of the main thread. This is particularly useful for long-running + operations like file I/O, network requests, or computationally intensive tasks. + + ### Notes: + - **Thread Support:** Only system targets (such as C++, Neko) support threading. + - **Events:** The class uses the `Event` class to dispatch completion, error, + and progress notifications. + + @see `ThreadPool` for more advanced threading capabilities, including thread + safety, HTML5 threads, and more robust handling of tasks. +**/ +#if !lime_debug +@:fileXml('tags="haxe,release"') +@:noDebug +#end +class BackgroundWorker +{ + private static var MESSAGE_COMPLETE = "__COMPLETE__"; + private static var MESSAGE_ERROR = "__ERROR__"; + + /** + Indicates whether the worker has been canceled. + **/ + public var canceled(default, null):Bool; + + /** + Indicates whether the worker has completed its task. + **/ + public var completed(default, null):Bool; + + /** + Dispatched when the worker is about to perform its task. + The function to execute should be added as a listener to this event. + **/ + public var doWork = new EventVoid>(); + + /** + Dispatched when the worker has successfully completed its task. + **/ + public var onComplete = new EventVoid>(); + + /** + Dispatched if an error occurs during the execution of the worker's task. + **/ + public var onError = new EventVoid>(); + + /** + Dispatched periodically during the worker's task to provide progress updates. + **/ + public var onProgress = new EventVoid>(); + + @:noCompletion private var __runMessage:Dynamic; + #if (cpp || neko) + @:noCompletion private var __messageQueue:Deque; + @:noCompletion private var __workerThread:Thread; + #end + + /** + Creates a new `BackgroundWorker` instance. + **/ + public function new() {} + + /** + Cancels the worker's task if it is still running. This won't stop the thread + immediately. + **/ + public function cancel():Void + { + canceled = true; + + #if (cpp || neko) + __workerThread = null; + #end + } + + /** + Starts the worker's task, optionally passing a message to the task. + @param message An optional message to pass to the worker's task. + **/ + public function run(message:Dynamic = null):Void + { + canceled = false; + completed = false; + __runMessage = message; + + #if (cpp || neko) + __messageQueue = new Deque(); + __workerThread = Thread.create(__doWork); + + // TODO: Better way to do this + + if (Application.current != null) + { + Application.current.onUpdate.add(__update); + } + #else + __doWork(); + #end + } + + /** + Sends a completion message, indicating that the worker has finished its task. + @param message An optional message to pass to the `onComplete` event. + **/ + public function sendComplete(message:Dynamic = null):Void + { + completed = true; + + #if (cpp || neko) + __messageQueue.add(MESSAGE_COMPLETE); + __messageQueue.add(message); + #else + if (!canceled) + { + canceled = true; + onComplete.dispatch(message); + } + #end + } + + /** + Sends an error message, indicating that an error occurred during the worker's task. + @param message An optional message to pass to the `onError` event. + **/ + public function sendError(message:Dynamic = null):Void + { + #if (cpp || neko) + __messageQueue.add(MESSAGE_ERROR); + __messageQueue.add(message); + #else + if (!canceled) + { + canceled = true; + onError.dispatch(message); + } + #end + } + + /** + Sends a progress update message. + @param message An optional message to pass to the `onProgress` event. + **/ + public function sendProgress(message:Dynamic = null):Void + { + #if (cpp || neko) + __messageQueue.add(message); + #else + if (!canceled) + { + onProgress.dispatch(message); + } + #end + } + + @:noCompletion private function __doWork():Void + { + doWork.dispatch(__runMessage); + + // #if (cpp || neko) + // + // __messageQueue.add (MESSAGE_COMPLETE); + // + // #else + // + // if (!canceled) { + // + // canceled = true; + // onComplete.dispatch (null); + // + // } + // + // #end + } + + @:noCompletion private function __update(deltaTime:Int):Void + { + #if (cpp || neko) + var message = __messageQueue.pop(false); + + if (message != null) + { + if (message == MESSAGE_ERROR) + { + Application.current.onUpdate.remove(__update); + + if (!canceled) + { + canceled = true; + onError.dispatch(__messageQueue.pop(false)); + } + } + else if (message == MESSAGE_COMPLETE) + { + Application.current.onUpdate.remove(__update); + + if (!canceled) + { + canceled = true; + onComplete.dispatch(__messageQueue.pop(false)); + } + } + else + { + if (!canceled) + { + onProgress.dispatch(message); + } + } + } + #end + } +} diff --git a/src/lime/system/ThreadPool.hx b/src/lime/system/ThreadPool.hx index 7fd6561b95..ec8669b295 100644 --- a/src/lime/system/ThreadPool.hx +++ b/src/lime/system/ThreadPool.hx @@ -15,12 +15,18 @@ import lime._internal.backend.html5.HTML5Thread as Thread; #end /** - A simple and thread-safe way to run a one or more asynchronous jobs. It - manages a queue of jobs, starting new ones once the old ones are done. + A thread pool executes one or more functions asynchronously. - It can also keep a certain number of threads (configurable via `minThreads`) - running in the background even when no jobs are available. This avoids the - not-insignificant overhead of stopping and restarting threads. + In multi-threaded mode, jobs run on background threads. In HTML5, this means + using web workers, which impose additional restrictions (see below). In + single-threaded mode, jobs run between frames on the main thread. To avoid + blocking, these jobs should only do a small amount of work at a time. + + In multi-threaded mode, the pool spins up new threads as jobs arrive (up to + `maxThreads`). If too many jobs arrive at once, it places them in a queue to + run when threads open up. If you run jobs frequently but not constantly, you + can also set `minThreads` to keep a certain number of threads alive, + avoiding the overhead of repeatedly spinning them up. Sample usage: @@ -33,13 +39,20 @@ import lime._internal.backend.html5.HTML5Thread as Thread; threadPool.run(processFile, url); } - For thread safety, the worker function should only give output through the - `WorkOutput` object it receives. Calling `output.sendComplete()` will - trigger an `onComplete` event on the main thread. - - @see `lime.system.WorkOutput.WorkFunction` for important information about - `doWork`. - @see https://player03.com/openfl/threads-guide/ for a tutorial. + Guidelines to make your code work on all targets and configurations: + + - For thread safety and web worker compatibility, your work function should + only return data through the `WorkOutput` object it receives. + - For web worker compatibility, you should only send data to your work + function via the `State` object. But since this can be any object, you can + put an arbitrary amount of data there. + - For web worker compatibility, your work function must be static, and you + can't `bind()` any extra arguments. + - For single-threaded performance, your function should only do a small + amount of work at a time. Store progress in the `State` object so you can + pick up where you left off. You don't have to worry about timing: just aim + to take a small fraction of the frame's time, and `ThreadPool` will keep + running the function until enough time passes. **/ #if !lime_debug @:fileXml('tags="haxe,release"') @@ -65,11 +78,13 @@ class ThreadPool extends WorkOutput /** A rough estimate of how much of the app's time should be spent on single-threaded `ThreadPool`s. For instance, the default value of 1/2 - means they will aim to take up about half the app's available time every - frame. See `workIterations` for instructions to improve the accuracy of - this estimate. + means they'll use about half the app's available time every frame. + + The accuracy of this estimate depends on how often your work functions + return. If you find that a `ThreadPool` is taking longer than scheduled, + try making the work function return more often. **/ - public static var workLoad:Float = 1/2; + public static var workLoad:Float = 1 / 2; /** __Access this only from the main thread.__ @@ -91,17 +106,6 @@ class ThreadPool extends WorkOutput #end } - /** - Indicates that no further events will be dispatched. - **/ - public var canceled(default, null):Bool = false; - - /** - Indicates that the latest job finished successfully, and no other job - has been started/is ongoing. - **/ - public var completed(default, null):Bool = false; - /** The number of live threads in this pool, including both active and idle threads. Does not count threads that have been instructed to shut down. @@ -126,13 +130,6 @@ class ThreadPool extends WorkOutput The maximum number of live threads this pool can have at once. If this value decreases, active jobs will still be allowed to finish. - - You can set this in single-threaded mode, but it's rarely useful. For - instance, suppose you have six jobs, each of which takes about a second. - If you leave `maxThreads` at 1, then one will finish every second for - six seconds. If you set `maxThreads = 6`, then none will finish for five - seconds, and then they'll all finish at once. The total duration is - unchanged, but none of them finish early. **/ public var maxThreads:Int; @@ -140,10 +137,8 @@ class ThreadPool extends WorkOutput __Set this only from the main thread.__ The number of threads that will be kept alive at all times, even if - there's no work to do. Setting this won't add new threads, it'll just - keep existing ones running. - - Has no effect in single-threaded mode. + there's no work to do. Setting this won't immediately spin up new + threads; you must still call `run()` to get them started. **/ public var minThreads:Int; @@ -152,16 +147,19 @@ class ThreadPool extends WorkOutput Dispatched at most once per job. **/ public var onComplete(default, null) = new EventVoid>(); + /** Dispatched on the main thread when `doWork` calls `sendError()`. Dispatched at most once per job. **/ public var onError(default, null) = new EventVoid>(); + /** Dispatched on the main thread when `doWork` calls `sendProgress()`. May be dispatched any number of times per job. **/ public var onProgress(default, null) = new EventVoid>(); + /** Dispatched on the main thread when a new job begins. Dispatched exactly once per job. @@ -180,6 +178,7 @@ class ThreadPool extends WorkOutput @:deprecated("Instead pass the callback to ThreadPool.run().") @:noCompletion @:dox(hide) public var doWork(get, never):PseudoEvent; + private var __doWork:WorkFunctionWorkOutput->Void>; private var __activeJobs:JobList; @@ -188,13 +187,13 @@ class ThreadPool extends WorkOutput /** The set of threads actively running a job. **/ - private var __activeThreads:Map = new Map(); + private var __activeThreads:Map; /** A list of idle threads. Not to be confused with `idleThreads`, a public variable equal to `__idleThreads.length`. **/ - private var __idleThreads:List = new List(); + private var __idleThreads:Array; #end private var __jobQueue:JobList = new JobList(); @@ -219,6 +218,14 @@ class ThreadPool extends WorkOutput this.minThreads = minThreads; this.maxThreads = maxThreads; + + #if lime_threads + if (this.mode == MULTI_THREADED) + { + __activeThreads = new Map(); + __idleThreads = []; + } + #end } /** @@ -245,12 +252,12 @@ class ThreadPool extends WorkOutput var thread:Thread = __activeThreads[job.id]; if (idleThreads < minThreads) { - thread.sendMessage(new ThreadEvent(WORK, null, null)); + thread.sendMessage({event: CANCEL}); __idleThreads.push(thread); } else { - thread.sendMessage(new ThreadEvent(EXIT, null, null)); + thread.sendMessage({event: EXIT}); } } #end @@ -270,10 +277,10 @@ class ThreadPool extends WorkOutput __activeJobs.clear(); #if lime_threads - // Cancel idle threads if there are more than the minimum. + // Exit idle threads if there are more than the minimum. while (idleThreads > minThreads) { - __idleThreads.pop().sendMessage(new ThreadEvent(EXIT, null, null)); + __idleThreads.pop().sendMessage({event: EXIT}); } #end @@ -290,36 +297,25 @@ class ThreadPool extends WorkOutput __jobComplete.value = false; activeJob = null; - completed = false; - canceled = true; } /** Cancels one active or queued job. Does not dispatch an error event. - @param job A `JobData` object, or a job's unique `id`, `state`, or - `doWork` function. @return Whether a job was canceled. **/ - public function cancelJob(job:JobIdentifier):Bool + public function cancelJob(jobID:Int):Bool { - var data:JobData = __activeJobs.get(job); - - if (data != null) + #if lime_threads + var thread:Thread = __activeThreads[jobID]; + if (thread != null) { - #if lime_threads - var thread:Thread = __activeThreads[data.id]; - if (thread != null) - { - thread.sendMessage(new ThreadEvent(WORK, null, null)); - __activeThreads.remove(data.id); - __idleThreads.push(thread); - } - #end - - return __activeJobs.remove(data); + thread.sendMessage({event: CANCEL}); + __activeThreads.remove(jobID); + __idleThreads.push(thread); } + #end - return __jobQueue.remove(__jobQueue.get(job)); + return __activeJobs.remove(__activeJobs.get(jobID)) || __jobQueue.remove(__jobQueue.get(jobID)); } /** @@ -331,7 +327,13 @@ class ThreadPool extends WorkOutput } /** - Queues a new job, to be run once a thread becomes available. + Runs the given function asynchronously, or queues it for later if all + threads are busy. + @param doWork The function to run. For best results, see the guidelines + in the `ThreadPool` class overview. In brief: `doWork` should be static, + only access its arguments, and return often. + @param state An object to pass to `doWork`, ideally a mutable object so + that `doWork` can save its progress. @return The job's unique ID. **/ public function run(doWork:WorkFunctionWorkOutput->Void> = null, state:State = null):Int @@ -360,8 +362,6 @@ class ThreadPool extends WorkOutput var job:JobData = new JobData(doWork, state); __jobQueue.push(job); - completed = false; - canceled = false; if (!Application.current.onUpdate.has(__update)) { @@ -382,6 +382,7 @@ class ThreadPool extends WorkOutput **/ private static function __executeThread():Void { + // @formatter:off JSAsync.async({ var output:WorkOutput = #if html5 new WorkOutput(MULTI_THREADED) #else cast(Thread.readMessage(true), WorkOutput) #end; var event:ThreadEvent = null; @@ -395,7 +396,7 @@ class ThreadPool extends WorkOutput { event = Thread.readMessage(true); } - while (!#if (haxe_ver >= 4.2) Std.isOfType #else Std.is #end (event, ThreadEvent)); + while (event == null || !Reflect.hasField(event, "event")); output.resetJobProgress(); } @@ -438,9 +439,9 @@ class ThreadPool extends WorkOutput if (interruption == null || output.__jobComplete.value) { // Work is done; wait for more. - event = null; + event = interruption; } - else if(#if (haxe_ver >= 4.2) Std.isOfType #else Std.is #end (interruption, ThreadEvent)) + else if (Reflect.hasField(interruption, "event")) { // Work on the new job. event = interruption; @@ -454,6 +455,7 @@ class ThreadPool extends WorkOutput // Do it all again. } }); + // @formatter:on } #end @@ -492,9 +494,9 @@ class ThreadPool extends WorkOutput job.doWork.makePortable(); #end - var thread:Thread = __idleThreads.isEmpty() ? createThread(__executeThread) : __idleThreads.pop(); + var thread:Thread = __idleThreads.length == 0 ? createThread(__executeThread) : __idleThreads.pop(); __activeThreads[job.id] = thread; - thread.sendMessage(new ThreadEvent(WORK, null, job)); + thread.sendMessage({event: WORK, job: job}); } #end } @@ -511,8 +513,7 @@ class ThreadPool extends WorkOutput // `workLoad / frameRate` is the total time that pools may use per // frame. `workPriority / __totalWorkPriority` is this pool's // fraction of that total. - var maxTimeElapsed:Float = workPriority * workLoad - / (__totalWorkPriority * Application.current.window.frameRate); + var maxTimeElapsed:Float = workPriority * workLoad / (__totalWorkPriority * Application.current.window.frameRate); var startTime:Float = timestamp(); var timeElapsed:Float = 0; @@ -539,15 +540,19 @@ class ThreadPool extends WorkOutput var threadEvent:ThreadEvent; while ((threadEvent = __jobOutput.pop(false)) != null) { - if (!__activeJobs.exists(threadEvent.job)) + if (threadEvent.jobID != null) { - // Ignore events from canceled jobs. - continue; + activeJob = __activeJobs.get(threadEvent.jobID); + } + else + { + activeJob = threadEvent.job; } - // Get by ID because in HTML5, the object will have been cloned, - // which will interfere with attempts to test equality. - activeJob = __activeJobs.getByID(threadEvent.job.id); + if (activeJob == null || !__activeJobs.exists(activeJob)) + { + continue; + } if (mode == MULTI_THREADED) { @@ -582,7 +587,7 @@ class ThreadPool extends WorkOutput if (currentThreads > maxThreads || __jobQueue.length == 0 && currentThreads > minThreads) { - thread.sendMessage(new ThreadEvent(EXIT, null, null)); + thread.sendMessage({event: EXIT}); } else { @@ -591,15 +596,13 @@ class ThreadPool extends WorkOutput } #end - completed = threadEvent.event == COMPLETE && activeJobs == 0 && __jobQueue.length == 0; - default: } activeJob = null; } - if (completed) + if (activeJobs == 0 && __jobQueue.length == 0) { Application.current.onUpdate.remove(__update); } @@ -634,8 +637,6 @@ class ThreadPool extends WorkOutput return activeJobs + idleThreads; } - // Note the distinction between `doWork` and `__doWork`: the former is for - // backwards compatibility, while the latter is always used. private function get_doWork():PseudoEvent { return this; @@ -651,34 +652,57 @@ class ThreadPool extends WorkOutput } } -@:access(lime.system.ThreadPool) @:forward(canceled) -private abstract PseudoEvent(ThreadPool) from ThreadPool { +@:access(lime.system.ThreadPool) +private abstract PseudoEvent(ThreadPool) from ThreadPool +{ @:noCompletion @:dox(hide) public var __listeners(get, never):Array; - private inline function get___listeners():Array { return []; }; + + private inline function get___listeners():Array + { + return []; + }; + @:noCompletion @:dox(hide) public var __repeat(get, never):Array; - private inline function get___repeat():Array { return []; }; - public function add(callback:Dynamic -> Void):Void { + private inline function get___repeat():Array + { + return []; + }; + + public function add(callback:Dynamic->Void):Void + { function callCallback(state:State, output:WorkOutput):Void { callback(state); } #if (lime_threads && html5) - if (this.mode == MULTI_THREADED) - throw "Unsupported operation; instead pass the callback to ThreadPool's constructor."; + if (this.mode == MULTI_THREADED) throw "Unsupported operation; instead pass the callback to ThreadPool's constructor."; else - this.__doWork = { func: callCallback }; + this.__doWork = {func: callCallback}; #else this.__doWork = callCallback; #end } public inline function cancel():Void {} + public inline function dispatch():Void {} - public inline function has(callback:Dynamic -> Void):Bool { return this.__doWork != null; } - public inline function remove(callback:Dynamic -> Void):Void { this.__doWork = null; } - public inline function removeAll():Void { this.__doWork = null; } + + public inline function has(callback:Dynamic->Void):Bool + { + return this.__doWork != null; + } + + public inline function remove(callback:Dynamic->Void):Void + { + this.__doWork = null; + } + + public inline function removeAll():Void + { + this.__doWork = null; + } } class JobList @@ -717,7 +741,7 @@ class JobList public inline function exists(job:JobData):Bool { - return getByID(job.id) != null; + return get(job.id) != null; } public inline function hasNext():Bool @@ -765,7 +789,7 @@ class JobList public inline function removeByID(id:Int):Bool { - if (__jobs.remove(getByID(id))) + if (__jobs.remove(get(id))) { __addingWorkPriority = length > 0; return true; @@ -776,7 +800,7 @@ class JobList } } - public function getByID(id:Int):JobData + public function get(id:Int):JobData { for (job in __jobs) { @@ -787,33 +811,6 @@ class JobList } return null; } - - public function get(jobIdentifier:JobIdentifier):JobData - { - switch (jobIdentifier) - { - case ID(id): - return getByID(id); - case FUNCTION(doWork): - for (job in __jobs) - { - if (job.doWork == doWork) - { - return job; - } - } - case STATE(state): - for (job in __jobs) - { - if (job.state == state) - { - return job; - } - } - } - return null; - } - public inline function push(job:JobData):Void { __jobs.push(job); @@ -822,7 +819,8 @@ class JobList // Getters & Setters - private inline function set___addingWorkPriority(value:Bool):Bool { + private inline function set___addingWorkPriority(value:Bool):Bool + { if (pool != null && __addingWorkPriority != value && ThreadPool.isMainThread()) { if (value) @@ -846,35 +844,3 @@ class JobList return __jobs.length; } } - -/** - A piece of data that uniquely represents a job. This can be the integer ID - (and integers will be assumed to be such), the `doWork` function, or the - `JobData` object itself. Failing any of those, a value will be assumed to be - the job's `state`. - - Caution: if the provided data isn't unique, such as a `doWork` function - that's in use by multiple jobs, the wrong job may be selected or canceled. -**/ -@:forward -abstract JobIdentifier(JobIdentifierImpl) from JobIdentifierImpl { - @:from private static inline function fromJob(job:JobData):JobIdentifier { - return ID(job.id); - } - @:from private static inline function fromID(id:Int):JobIdentifier { - return ID(id); - } - @:from private static inline function fromFunction(doWork:WorkFunctionWorkOutput->Void>):JobIdentifier { - return FUNCTION(doWork); - } - @:from private static inline function fromState(state:State):JobIdentifier { - return STATE(state); - } -} - -private enum JobIdentifierImpl -{ - ID(id:Int); - FUNCTION(doWork:WorkFunctionWorkOutput->Void>); - STATE(state:State); -} diff --git a/src/lime/system/WorkOutput.hx b/src/lime/system/WorkOutput.hx index 433fad10e3..c2673703bd 100644 --- a/src/lime/system/WorkOutput.hx +++ b/src/lime/system/WorkOutput.hx @@ -13,12 +13,10 @@ import neko.vm.Deque; import neko.vm.Thread; import neko.vm.Tls; #end - #if html5 import lime._internal.backend.html5.HTML5Thread as Thread; import lime._internal.backend.html5.HTML5Thread.Transferable; #end - #if macro import haxe.macro.Expr; @@ -44,8 +42,9 @@ class WorkOutput the current job, including (if applicable) the ongoing call. In single-threaded mode, it only counts the number of calls this frame. - This helps you adjust `doWork`'s length: too few iterations per frame - means `workLoad` may be inaccurate, while too many may add overhead. + The lower the number, the less accurate `ThreadPool.workLoad` becomes, + but the higher the number, the more overhead there is. As a ballpark + estimate, aim for 10-100 iterations. **/ public var workIterations(default, null):Tls = new Tls(); @@ -54,6 +53,7 @@ class WorkOutput available on this target, `mode` will always be `SINGLE_THREADED`. **/ public var mode(get, never):ThreadMode; + #if lime_threads /** __Set this only via the constructor.__ @@ -65,6 +65,7 @@ class WorkOutput Messages sent by active jobs, received by the main thread. **/ private var __jobOutput:Deque = new Deque(); + /** Thread-local storage. Tracks whether `sendError()` or `sendComplete()` was called by this job. @@ -77,6 +78,7 @@ class WorkOutput Will be null in all other cases. **/ public var activeJob(get, set):Null; + @:noCompletion private var __activeJob:Tls = new Tls(); private inline function new(mode:Null) @@ -105,12 +107,11 @@ class WorkOutput #if (lime_threads && html5) if (mode == MULTI_THREADED) { - activeJob.doWork.makePortable(); - Thread.returnMessage(new ThreadEvent(COMPLETE, message, activeJob), transferList); + Thread.returnMessage({event: COMPLETE, message: message, jobID: activeJob.id}, transferList); } else #end - __jobOutput.add(new ThreadEvent(COMPLETE, message, activeJob)); + __jobOutput.add({event: COMPLETE, message: message, jobID: activeJob.id}); } } @@ -130,12 +131,11 @@ class WorkOutput #if (lime_threads && html5) if (mode == MULTI_THREADED) { - activeJob.doWork.makePortable(); - Thread.returnMessage(new ThreadEvent(ERROR, message, activeJob), transferList); + Thread.returnMessage({event: ERROR, message: message, jobID: activeJob.id}, transferList); } else #end - __jobOutput.add(new ThreadEvent(ERROR, message, activeJob)); + __jobOutput.add({event: ERROR, message: message, jobID: activeJob.id}); } } @@ -153,12 +153,11 @@ class WorkOutput #if (lime_threads && html5) if (mode == MULTI_THREADED) { - activeJob.doWork.makePortable(); - Thread.returnMessage(new ThreadEvent(PROGRESS, message, activeJob), transferList); + Thread.returnMessage({event: PROGRESS, message: message, jobID: activeJob.id}, transferList); } else #end - __jobOutput.add(new ThreadEvent(PROGRESS, message, activeJob)); + __jobOutput.add({event: PROGRESS, message: message, jobID: activeJob.id}); } } @@ -174,7 +173,8 @@ class WorkOutput var thread:Thread = Thread.create(executeThread); #if html5 - thread.onMessage.add(function(event:ThreadEvent) { + thread.onMessage.add(function(event:ThreadEvent) + { __jobOutput.add(event); }); #end @@ -198,6 +198,7 @@ class WorkOutput { return __activeJob.value; } + private inline function set_activeJob(value:JobData):JobData { return __activeJob.value = value; @@ -236,21 +237,18 @@ class WorkOutput /** A function that performs asynchronous work. This can either be work on - another thread ("multi-threaded mode"), or it can represent a virtual - thread ("single-threaded mode"). + another thread ("multi-threaded mode"), or it can represent a green thread + ("single-threaded mode"). In single-threaded mode, the work function shouldn't complete the job all at once, as the main thread would lock up. Instead, it should perform a fraction of the job each time it's called. `ThreadPool` provides the - function with a persistent `State` argument that can track progress. - Alternatively, you may be able to bind your own `State` argument. + function with a persistent `State` argument for tracking progress, which can + be any object of your choice. Caution: if using multi-threaded mode in HTML5, this must be a static function and binding arguments is forbidden. Compile with `-Dlime-warn-portability` to highlight functions that won't work. - - The exact length of `doWork` can vary, but single-threaded mode will run - more smoothly if it's short enough to run several times per frame. **/ #if (lime_threads && html5) typedef WorkFunction = lime._internal.backend.html5.HTML5Thread.WorkFunction; @@ -264,8 +262,8 @@ abstract WorkFunction(T) from T to T { switch (self.typeof().follow().toComplexType()) { - case TPath({ sub: "WorkFunction", params: [TPType(t)] }): - return macro ($self:$t)($a{args}); + case TPath({sub: "WorkFunction", params: [TPType(t)]}): + return macro($self : $t)($a{args}); default: throw "Underlying function type not found."; } @@ -278,8 +276,8 @@ abstract WorkFunction(T) from T to T only accepts a single argument, you can pass multiple values as part of an anonymous structure. (Or an array, or a class.) - // Does not work: too many arguments. - // threadPool.run(doWork, argument0, argument1, argument2); + // Does not work: too many arguments. + // threadPool.run(doWork, argument0, argument1, argument2); // Works: all arguments are combined into one `State` object. threadPool.run(doWork, { arg0: argument0, arg1: argument1, arg2: argument2 }); @@ -302,6 +300,7 @@ typedef State = Dynamic; class JobData { private static var nextID:Int = 0; + /** `JobData` instances will regularly be copied in HTML5, so checking equality won't work. Instead, compare identifiers. @@ -342,44 +341,25 @@ class JobData } #if haxe4 enum #else @:enum #end abstract ThreadEventType(String) + { - /** - Sent by the background thread, indicating completion. - **/ + // Events sent from a worker thread to the main thread var COMPLETE = "COMPLETE"; - /** - Sent by the background thread, indicating failure. - **/ var ERROR = "ERROR"; - /** - Sent by the background thread. - **/ var PROGRESS = "PROGRESS"; - /** - Sent by the main thread, indicating that the provided job should begin - in place of any ongoing job. If `state == null`, the existing job will - stop and the thread will go idle. (To run a job with no argument, set - `state = {}` instead.) - **/ + + // Commands sent from the main thread to a worker thread var WORK = "WORK"; - /** - Sent by the main thread to shut down a thread. - **/ + var CANCEL = "CANCEL"; var EXIT = "EXIT"; } -class ThreadEvent +typedef ThreadEvent = { - public var event(default, null):ThreadEventType; - public var message(default, null):State; - public var job(default, null):JobData; - - public inline function new(event:ThreadEventType, message:State, job:JobData) - { - this.event = event; - this.message = message; - this.job = job; - } + var event:ThreadEventType; + @:optional var message:Dynamic; + @:optional var job:JobData; + @:optional var jobID:Int; } class JSAsync @@ -403,7 +383,6 @@ class JSAsync } // Define platform-specific types - #if target.threaded // Haxe 3 compatibility: "target.threaded" can't go in parentheses. #elseif !(cpp || neko)