diff --git a/error-prone-contrib/src/main/java/tech/picnic/errorprone/bugpatterns/FluxImplicitBlock.java b/error-prone-contrib/src/main/java/tech/picnic/errorprone/bugpatterns/FluxImplicitBlock.java
new file mode 100644
index 0000000000..17c8098c09
--- /dev/null
+++ b/error-prone-contrib/src/main/java/tech/picnic/errorprone/bugpatterns/FluxImplicitBlock.java
@@ -0,0 +1,100 @@
+package tech.picnic.errorprone.bugpatterns;
+
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.errorprone.BugPattern.LinkType.CUSTOM;
+import static com.google.errorprone.BugPattern.SeverityLevel.WARNING;
+import static com.google.errorprone.BugPattern.StandardTags.CONCURRENCY;
+import static com.google.errorprone.BugPattern.StandardTags.PERFORMANCE;
+import static com.google.errorprone.matchers.method.MethodMatchers.instanceMethod;
+import static tech.picnic.errorprone.bugpatterns.util.Documentation.BUG_PATTERNS_BASE_URL;
+
+import com.google.auto.service.AutoService;
+import com.google.errorprone.BugPattern;
+import com.google.errorprone.VisitorState;
+import com.google.errorprone.bugpatterns.BugChecker;
+import com.google.errorprone.bugpatterns.BugChecker.MethodInvocationTreeMatcher;
+import com.google.errorprone.fixes.SuggestedFix;
+import com.google.errorprone.fixes.SuggestedFixes;
+import com.google.errorprone.matchers.Description;
+import com.google.errorprone.matchers.Matcher;
+import com.google.errorprone.suppliers.Supplier;
+import com.google.errorprone.suppliers.Suppliers;
+import com.google.errorprone.util.ASTHelpers;
+import com.sun.source.tree.ExpressionTree;
+import com.sun.source.tree.MethodInvocationTree;
+import com.sun.tools.javac.code.Type;
+import com.sun.tools.javac.util.Position;
+import java.util.stream.Stream;
+import tech.picnic.errorprone.bugpatterns.util.ThirdPartyLibrary;
+
+/**
+ * A {@link BugChecker} that flags {@link reactor.core.publisher.Flux} operator usages that may
+ * implicitly cause the calling thread to be blocked.
+ *
+ *
Note that the methods flagged here are not themselves blocking, but iterating over the
+ * resulting {@link Iterable} or {@link Stream} may be.
+ */
+@AutoService(BugChecker.class)
+@BugPattern(
+ summary = "Avoid iterating over `Flux`es in an implicitly blocking manner",
+ link = BUG_PATTERNS_BASE_URL + "FluxImplicitBlock",
+ linkType = CUSTOM,
+ severity = WARNING,
+ tags = {CONCURRENCY, PERFORMANCE})
+public final class FluxImplicitBlock extends BugChecker implements MethodInvocationTreeMatcher {
+ private static final long serialVersionUID = 1L;
+ private static final Matcher FLUX_WITH_IMPLICIT_BLOCK =
+ instanceMethod()
+ .onDescendantOf("reactor.core.publisher.Flux")
+ .namedAnyOf("toIterable", "toStream")
+ .withNoParameters();
+ private static final Supplier STREAM = Suppliers.typeFromString(Stream.class.getName());
+
+ /** Instantiates a new {@link FluxImplicitBlock} instance. */
+ public FluxImplicitBlock() {}
+
+ @Override
+ public Description matchMethodInvocation(MethodInvocationTree tree, VisitorState state) {
+ if (!FLUX_WITH_IMPLICIT_BLOCK.matches(tree, state)) {
+ return Description.NO_MATCH;
+ }
+
+ Description.Builder description =
+ buildDescription(tree).addFix(SuggestedFixes.addSuppressWarnings(state, canonicalName()));
+ if (ThirdPartyLibrary.GUAVA.isIntroductionAllowed(state)) {
+ description.addFix(
+ suggestBlockingElementCollection(
+ tree, "com.google.common.collect.ImmutableList.toImmutableList", state));
+ }
+ description.addFix(
+ suggestBlockingElementCollection(tree, "java.util.stream.Collectors.toList", state));
+
+ return description.build();
+ }
+
+ private static SuggestedFix suggestBlockingElementCollection(
+ MethodInvocationTree tree, String fullyQualifiedCollectorMethod, VisitorState state) {
+ SuggestedFix.Builder importSuggestion = SuggestedFix.builder();
+ String replacementMethodInvocation =
+ SuggestedFixes.qualifyStaticImport(fullyQualifiedCollectorMethod, importSuggestion, state);
+
+ boolean isStream =
+ ASTHelpers.isSubtype(ASTHelpers.getResultType(tree), STREAM.get(state), state);
+ String replacement =
+ String.format(
+ ".collect(%s()).block()%s", replacementMethodInvocation, isStream ? ".stream()" : "");
+ return importSuggestion.merge(replaceMethodInvocation(tree, replacement, state)).build();
+ }
+
+ private static SuggestedFix.Builder replaceMethodInvocation(
+ MethodInvocationTree tree, String replacement, VisitorState state) {
+ int startPosition = state.getEndPosition(ASTHelpers.getReceiver(tree));
+ int endPosition = state.getEndPosition(tree);
+
+ checkState(
+ startPosition != Position.NOPOS && endPosition != Position.NOPOS,
+ "Cannot locate method to be replaced in source code");
+
+ return SuggestedFix.builder().replace(startPosition, endPosition, replacement);
+ }
+}
diff --git a/error-prone-contrib/src/test/java/tech/picnic/errorprone/bugpatterns/FluxImplicitBlockTest.java b/error-prone-contrib/src/test/java/tech/picnic/errorprone/bugpatterns/FluxImplicitBlockTest.java
new file mode 100644
index 0000000000..6390b2d041
--- /dev/null
+++ b/error-prone-contrib/src/test/java/tech/picnic/errorprone/bugpatterns/FluxImplicitBlockTest.java
@@ -0,0 +1,182 @@
+package tech.picnic.errorprone.bugpatterns;
+
+import static com.google.common.base.Predicates.and;
+import static com.google.common.base.Predicates.containsPattern;
+import static com.google.common.base.Predicates.not;
+import static com.google.errorprone.BugCheckerRefactoringTestHelper.FixChoosers.SECOND;
+import static com.google.errorprone.BugCheckerRefactoringTestHelper.FixChoosers.THIRD;
+
+import com.google.errorprone.BugCheckerRefactoringTestHelper;
+import com.google.errorprone.BugCheckerRefactoringTestHelper.TestMode;
+import com.google.errorprone.CompilationTestHelper;
+import org.junit.jupiter.api.Test;
+import org.reactivestreams.Publisher;
+import reactor.core.CorePublisher;
+import reactor.core.publisher.Flux;
+
+final class FluxImplicitBlockTest {
+ @Test
+ void identification() {
+ CompilationTestHelper.newInstance(FluxImplicitBlock.class, getClass())
+ .expectErrorMessage(
+ "X",
+ and(
+ containsPattern("SuppressWarnings"),
+ containsPattern("toImmutableList"),
+ containsPattern("toList")))
+ .addSourceLines(
+ "A.java",
+ "import com.google.common.collect.ImmutableList;",
+ "import java.util.stream.Stream;",
+ "import reactor.core.publisher.Flux;",
+ "",
+ "class A {",
+ " void m() {",
+ " // BUG: Diagnostic matches: X",
+ " Flux.just(1).toIterable();",
+ " // BUG: Diagnostic matches: X",
+ " Flux.just(2).toStream();",
+ " // BUG: Diagnostic matches: X",
+ " long count = Flux.just(3).toStream().count();",
+ "",
+ " Flux.just(4).toIterable(1);",
+ " Flux.just(5).toIterable(2, null);",
+ " Flux.just(6).toStream(3);",
+ " new Foo().toIterable();",
+ " new Foo().toStream();",
+ " }",
+ "",
+ " class Foo {",
+ " Iterable toIterable() {",
+ " return ImmutableList.of();",
+ " }",
+ "",
+ " Stream toStream() {",
+ " return Stream.empty();",
+ " }",
+ " }",
+ "}")
+ .doTest();
+ }
+
+ @Test
+ void identificationWithoutGuavaOnClasspath() {
+ CompilationTestHelper.newInstance(FluxImplicitBlock.class, getClass())
+ .withClasspath(CorePublisher.class, Flux.class, Publisher.class)
+ .expectErrorMessage("X", not(containsPattern("toImmutableList")))
+ .addSourceLines(
+ "A.java",
+ "import reactor.core.publisher.Flux;",
+ "",
+ "class A {",
+ " void m() {",
+ " // BUG: Diagnostic matches: X",
+ " Flux.just(1).toIterable();",
+ " // BUG: Diagnostic matches: X",
+ " Flux.just(2).toStream();",
+ " }",
+ "}")
+ .doTest();
+ }
+
+ @Test
+ void replacementFirstSuggestedFix() {
+ BugCheckerRefactoringTestHelper.newInstance(FluxImplicitBlock.class, getClass())
+ .addInputLines(
+ "A.java",
+ "import reactor.core.publisher.Flux;",
+ "",
+ "class A {",
+ " void m() {",
+ " Flux.just(1).toIterable();",
+ " Flux.just(2).toStream();",
+ " }",
+ "}")
+ .addOutputLines(
+ "A.java",
+ "import reactor.core.publisher.Flux;",
+ "",
+ "class A {",
+ " @SuppressWarnings(\"FluxImplicitBlock\")",
+ " void m() {",
+ " Flux.just(1).toIterable();",
+ " Flux.just(2).toStream();",
+ " }",
+ "}")
+ .doTest(TestMode.TEXT_MATCH);
+ }
+
+ @Test
+ void replacementSecondSuggestedFix() {
+ BugCheckerRefactoringTestHelper.newInstance(FluxImplicitBlock.class, getClass())
+ .setFixChooser(SECOND)
+ .addInputLines(
+ "A.java",
+ "import reactor.core.publisher.Flux;",
+ "",
+ "class A {",
+ " void m() {",
+ " Flux.just(1).toIterable();",
+ " Flux.just(2).toStream();",
+ " Flux.just(3).toIterable().iterator();",
+ " Flux.just(4).toStream().count();",
+ " Flux.just(5) /* a */./* b */ toIterable /* c */(/* d */ ) /* e */;",
+ " Flux.just(6) /* a */./* b */ toStream /* c */(/* d */ ) /* e */;",
+ " }",
+ "}")
+ .addOutputLines(
+ "A.java",
+ "import static com.google.common.collect.ImmutableList.toImmutableList;",
+ "",
+ "import reactor.core.publisher.Flux;",
+ "",
+ "class A {",
+ " void m() {",
+ " Flux.just(1).collect(toImmutableList()).block();",
+ " Flux.just(2).collect(toImmutableList()).block().stream();",
+ " Flux.just(3).collect(toImmutableList()).block().iterator();",
+ " Flux.just(4).collect(toImmutableList()).block().stream().count();",
+ " Flux.just(5).collect(toImmutableList()).block() /* e */;",
+ " Flux.just(6).collect(toImmutableList()).block().stream() /* e */;",
+ " }",
+ "}")
+ .doTest(TestMode.TEXT_MATCH);
+ }
+
+ @Test
+ void replacementThirdSuggestedFix() {
+ BugCheckerRefactoringTestHelper.newInstance(FluxImplicitBlock.class, getClass())
+ .setFixChooser(THIRD)
+ .addInputLines(
+ "A.java",
+ "import reactor.core.publisher.Flux;",
+ "",
+ "class A {",
+ " void m() {",
+ " Flux.just(1).toIterable();",
+ " Flux.just(2).toStream();",
+ " Flux.just(3).toIterable().iterator();",
+ " Flux.just(4).toStream().count();",
+ " Flux.just(5) /* a */./* b */ toIterable /* c */(/* d */ ) /* e */;",
+ " Flux.just(6) /* a */./* b */ toStream /* c */(/* d */ ) /* e */;",
+ " }",
+ "}")
+ .addOutputLines(
+ "A.java",
+ "import static java.util.stream.Collectors.toList;",
+ "",
+ "import reactor.core.publisher.Flux;",
+ "",
+ "class A {",
+ " void m() {",
+ " Flux.just(1).collect(toList()).block();",
+ " Flux.just(2).collect(toList()).block().stream();",
+ " Flux.just(3).collect(toList()).block().iterator();",
+ " Flux.just(4).collect(toList()).block().stream().count();",
+ " Flux.just(5).collect(toList()).block() /* e */;",
+ " Flux.just(6).collect(toList()).block().stream() /* e */;",
+ " }",
+ "}")
+ .doTest(TestMode.TEXT_MATCH);
+ }
+}