-
Notifications
You must be signed in to change notification settings - Fork 51
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ability to disable RxJavaAssemblyTracking in specific observable #104
Comments
I can't think of any good way of doing this. If you can make sure you create chains on a single thread, you can turn the tracking on and off. |
Correct me if I am wrong. Some rough approach. RxJavaAssemblyTracking.enable() works via setOnObservableAssembly. We can create custom rx operator that adds current Observable's hashCode to designated list. |
How would you remove from that list? You could experiment with it by taking the hooks from RxJavaAssemblyTracking.enable();
var set = new ConcurrentHashMap<Integer, Object>();
var observableHook = RxJavaPlugins.getOnObservableAssembly();
RxJavaPlugins.setOnObservableAssembly(o -> {
if (set.containsKey(o.hashCode()) {
return observableHook.apply(o);
}
return o;
}); |
Thanks fot your snippet! I will depart from it.
When onSubscribe called, one can remove hashCode from list. |
This is what I came up with. I rely on observable's source. Current blockers in RxJava3 ver 3.0.10:
I am glad for suggestions on where it may fail as well as general comments. Working snippet: Observable
.just(1)
.compose(AssemblyStop.stopAssemblyTrackingDownstream())
// .flatMap (__ -> Observable.just(2) ) //<--- Breaks chain
// .compose (__ -> Observable.just(1) ) //<--- Breaks chain
.subscribeOn(Schedulers.io())
.map (it -> it + 1)
.observeOn(Schedulers.computation())
.subscribe(); Implementation: import java.util.HashSet;
import java.util.Set;
import hu.akarnokd.rxjava3.debug.RxJavaAssemblyTracking;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.ObservableTransformer;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.internal.fuseable.HasUpstreamObservableSource;
import io.reactivex.rxjava3.internal.observers.LambdaObserver;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
// Java 8
// Uses two Rx hooks: setOnObservableAssembly and setOnObservableSubscribe
// Note: switching to different observable (compose, flatMap) will yield IllegalStateException in subscribe
public class AssemblyStop {
private static final Set<String> registry = new HashSet<>();
static <T> ObservableTransformer<T, T> stopAssemblyTrackingDownstream() {
return observable -> {
registry.add(observable.toString());
return observable;
};
}
@SuppressWarnings("rawtypes")
static void init() {
RxJavaAssemblyTracking.enable();
Function observableHook = RxJavaPlugins.getOnObservableAssembly();
RxJavaPlugins.setOnObservableAssembly(o -> {
if(registry.contains(o.toString())) return o; // This is result of our compose operator
if(!(o instanceof HasUpstreamObservableSource)) {
// Operator w/o source like Just or Zip with multiple sources
return (Observable) observableHook.apply(o);
}
Object source = ((HasUpstreamObservableSource) o).source();
String sourceHash = source.toString();
if(registry.contains(sourceHash)) {
// Request to stop assembly tracking
// We continue to track this object
registry.remove(sourceHash);
registry.add(o.toString());
return o;
} else return (Observable) observableHook.apply(o);
});
RxJavaPlugins.setOnObservableSubscribe( (observable, observer) -> {
if(observer instanceof LambdaObserver) {
// Check if our chain was broken by some compose
String hash = observable.toString();
if(!registry.contains(hash)) throw new IllegalStateException("AssemblyStop chain was broken. Did you switch observable (compose, flatMap) ?");
registry.remove(observable.toString());
}
return observer;
});
}
} |
You have disconnected output, there is no way to link it to the original inside the operator.
You may have to prevent optimizations by using In general, what you try to achieve is highly contextual and as such would need a different architecture for RxJava. Alternative would be an RxJava-backed custom factory for operators so you control the application of operators and thus can attach extra context to flow. Basically as if you'd write a |
Thanks for thorough comment!
Yeah, switching observables! What do you think about adding extra hook inside
Thanks, I will research on if (this instanceof ScalarSupplier) {
@SuppressWarnings("unchecked")
T v = ((ScalarSupplier<T>)this).get();
if (v == null) {
return empty();
}
return ObservableScalarXMap.scalarXMap(v, mapper);
}
return RxJavaPlugins.onAssembly(new ObservableFlatMap<>(this, mapper, delayErrors, maxConcurrency, bufferSize));
I was inspired by following comment in RxJava issues. Quote: "RxJavaAssemblyTracking ... comes with performance drawbacks. It's a global on/off switch. ..." Could you give concrete examples when different architecture will be needed (i.e. relying on My goal is to create approach that fits into current RxJava architecture or requires very minor additions (or achievable with simple bytecode manipulation on complile time via Gradle plugin).
I hope I will not need this :) Ok, it will be last resort. Gradle plugin that will replace rxjava imports with mine. |
I'm not fond of keeping a reference to something otherwise not used.
Anything that would otherwise cause multi-wrapping. Also anything relying on internal components. |
I decided to go with this approach:
Basically my operator will wrap into my proxy observable, that wraps all observable methods. |
To make RxJavaAssemblyTracking work everywhere except high-load places.
Smth like
The text was updated successfully, but these errors were encountered: