Skip to content

Commit

Permalink
Move PR processing out of initial loop
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Jul 23, 2020
1 parent bcca5c8 commit 9ab4292
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 117 deletions.
137 changes: 71 additions & 66 deletions src/main/java/com/ververica/CiBot.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.ververica.github.GitHubCheckerStatus;
import com.ververica.github.GithubActionsImpl;
import com.ververica.utils.ConsumerWithException;
import com.ververica.utils.FunctionWithException;
import com.ververica.utils.RevisionInformation;
import org.apache.commons.io.FileUtils;
import org.eclipse.jgit.api.errors.TransportException;
Expand Down Expand Up @@ -175,82 +176,86 @@ public void close() {
}

private void tick(Date lastUpdateTime) throws Exception {
final ObservedState observedRepositoryState = core.fetchGithubState(lastUpdateTime);

final ConsumerWithException<CiReport, Exception> pullRequestProcessor = ciReport -> {
final int pullRequestID = ciReport.getPullRequestID();
core.getPullRequests(lastUpdateTime)
.map(FunctionWithException.wrap(
core::processPullRequest,
(r, e) -> LOG.error("Error while processing pull request {}.", formatPullRequestID(r.getID()), e)))
.filter(Optional::isPresent)
.map(Optional::get)
.forEach(ConsumerWithException.wrap(
this::processCiReport,
(r, e) -> LOG.error("Error while processing pull request {}.", formatPullRequestID(r.getPullRequestID()), e)));
}

if (core.isPullRequestClosed(pullRequestID)) {
LOG.info("PullRequest {} was closed; canceling builds and deleting branches.", formatPullRequestID(pullRequestID));
ciReport.getPendingBuilds().forEach(core::cancelBuild);
ciReport.getFinishedBuilds().forEach(core::deleteCiBranch);
return;
}
private void processCiReport(CiReport ciReport) throws Exception {
final int pullRequestID = ciReport.getPullRequestID();

// retry mirroring for builds with an unknown state, in case something went wrong during the push/CI trigger
ciReport.getUnknownBuilds().forEach(build -> core.mirrorPullRequest(build.pullRequestID));
if (core.isPullRequestClosed(pullRequestID)) {
LOG.info("PullRequest {} was closed; canceling builds and deleting branches.", formatPullRequestID(pullRequestID));
ciReport.getPendingBuilds().forEach(core::cancelBuild);
ciReport.getFinishedBuilds().forEach(core::deleteCiBranch);
return;
}

Map<Trigger.Type, List<Build>> builds = ciReport.getRequiredBuilds().collect(Collectors.groupingBy(build -> build.trigger.getType()));
List<Build> pushBuilds = builds.getOrDefault(Trigger.Type.PUSH, Collections.emptyList());
List<Build> manualBuilds = builds.getOrDefault(Trigger.Type.MANUAL, Collections.emptyList());
// retry mirroring for builds with an unknown state, in case something went wrong during the push/CI trigger
ciReport.getUnknownBuilds().forEach(build -> core.mirrorPullRequest(build.pullRequestID));

if (!pushBuilds.isEmpty() || !manualBuilds.isEmpty()) {
LOG.info("Canceling pending builds for PullRequest {} since a new build was triggered.", formatPullRequestID(pullRequestID));
// HACK: cancel all pending builds, on the assumption that they are all identical anyway
// this may not be necessarily true in the future
ciReport.getPendingBuilds().forEach(core::cancelBuild);
}
Map<Trigger.Type, List<Build>> builds = ciReport.getRequiredBuilds().collect(Collectors.groupingBy(build -> build.trigger.getType()));
List<Build> pushBuilds = builds.getOrDefault(Trigger.Type.PUSH, Collections.emptyList());
List<Build> manualBuilds = builds.getOrDefault(Trigger.Type.MANUAL, Collections.emptyList());

if (!pushBuilds.isEmpty()) {
// we've got a new commit, skip all triggered manual builds
manualBuilds.forEach(manualBuild -> ciReport.add(new Build(pullRequestID, "", Optional.of(new GitHubCheckerStatus(GitHubCheckerStatus.State.CANCELED, "TBD", CiProvider.Unknown)), manualBuild.trigger)));

// there should only ever by a single push build
Build latestPushBuild = pushBuilds.get(pushBuilds.size() - 1);
core.runBuild(ciReport, latestPushBuild)
.ifPresent(ciReport::add);
} else if (!manualBuilds.isEmpty()) {
// we are just re-running some previous build
// ideally make sure we don't trigger equivalent builds multiple times

// HACK: only process the last manual trigger, on the assumption that they are all identical anyway
// this may not be necessarily true in the future
for (int x = 0; x < manualBuilds.size() - 1; x++) {
Build manualBuild = manualBuilds.get(x);
ciReport.add(new Build(pullRequestID, "0000", Optional.of(new GitHubCheckerStatus(GitHubCheckerStatus.State.CANCELED, "TBD", CiProvider.Unknown)), manualBuild.trigger));
}
if (!pushBuilds.isEmpty() || !manualBuilds.isEmpty()) {
LOG.info("Canceling pending builds for PullRequest {} since a new build was triggered.", formatPullRequestID(pullRequestID));
// HACK: cancel all pending builds, on the assumption that they are all identical anyway
// this may not be necessarily true in the future
ciReport.getPendingBuilds().forEach(core::cancelBuild);
}

Build latestManualBuild = manualBuilds.get(manualBuilds.size() - 1);
core.runBuild(ciReport, latestManualBuild)
.ifPresent(ciReport::add);
if (!pushBuilds.isEmpty()) {
// we've got a new commit, skip all triggered manual builds
manualBuilds.forEach(manualBuild -> ciReport.add(new Build(pullRequestID, "", Optional.of(new GitHubCheckerStatus(GitHubCheckerStatus.State.CANCELED, "TBD", CiProvider.Unknown)), manualBuild.trigger)));

// there should only ever by a single push build
Build latestPushBuild = pushBuilds.get(pushBuilds.size() - 1);
core.runBuild(ciReport, latestPushBuild)
.ifPresent(ciReport::add);
} else if (!manualBuilds.isEmpty()) {
// we are just re-running some previous build
// ideally make sure we don't trigger equivalent builds multiple times

// HACK: only process the last manual trigger, on the assumption that they are all identical anyway
// this may not be necessarily true in the future
for (int x = 0; x < manualBuilds.size() - 1; x++) {
Build manualBuild = manualBuilds.get(x);
ciReport.add(new Build(pullRequestID, "0000", Optional.of(new GitHubCheckerStatus(GitHubCheckerStatus.State.CANCELED, "TBD", CiProvider.Unknown)), manualBuild.trigger));
}

final List<Build> finishedBuilds = ciReport.getFinishedBuilds().collect(Collectors.toList());
final int numFinishedBuilds = finishedBuilds.size();
if (numFinishedBuilds > 0) {
final String lastHash = finishedBuilds.get(numFinishedBuilds - 1).commitHash;

final List<Build> oldBuilds = finishedBuilds.stream().filter(build -> !build.commitHash.equals(lastHash)).collect(Collectors.toList());
if (!oldBuilds.isEmpty()) {
LOG.info("Deleting {} unnecessary branches for PullRequest {}, since newer commits are present. Retaining branches for commit {}.",
oldBuilds.size(),
formatPullRequestID(pullRequestID),
lastHash);

oldBuilds.stream()
.peek(core::deleteCiBranch)
.map(Build::asDeleted)
.forEach(ciReport::add);
}
}
Build latestManualBuild = manualBuilds.get(manualBuilds.size() - 1);
core.runBuild(ciReport, latestManualBuild)
.ifPresent(ciReport::add);
}

if (ciReport.getBuilds().count() > 0) {
core.updateCiReport(ciReport);
final List<Build> finishedBuilds = ciReport.getFinishedBuilds().collect(Collectors.toList());
final int numFinishedBuilds = finishedBuilds.size();
if (numFinishedBuilds > 0) {
final String lastHash = finishedBuilds.get(numFinishedBuilds - 1).commitHash;

final List<Build> oldBuilds = finishedBuilds.stream().filter(build -> !build.commitHash.equals(lastHash)).collect(Collectors.toList());
if (!oldBuilds.isEmpty()) {
LOG.info("Deleting {} unnecessary branches for PullRequest {}, since newer commits are present. Retaining branches for commit {}.",
oldBuilds.size(),
formatPullRequestID(pullRequestID),
lastHash);

oldBuilds.stream()
.peek(core::deleteCiBranch)
.map(Build::asDeleted)
.forEach(ciReport::add);
}
};
}

observedRepositoryState.getCiReports().forEach(ConsumerWithException.wrap(
pullRequestProcessor,
(r, e) -> LOG.error("Error while processing pull request {}.", formatPullRequestID(r.getPullRequestID()), e)));
if (ciReport.getBuilds().count() > 0) {
core.updateCiReport(ciReport);
}
}
}
101 changes: 50 additions & 51 deletions src/main/java/com/ververica/Core.java
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public void deleteCiBranch(Build finishedBuild) throws GitException {
githubToken);
}

public ObservedState fetchGithubState(Date lastUpdatedAtCutoff) throws IOException {
public Stream<GithubPullRequest> getPullRequests(Date lastUpdatedAtCutoff) throws IOException {
LOG.info("Retrieving observed repository state ({}).", observedRepository);

Iterable<GithubPullRequest> recentlyUpdatedOpenPullRequests = gitHubActions.getRecentlyUpdatedOpenPullRequests(observedRepository, lastUpdatedAtCutoff);
Expand All @@ -209,59 +209,58 @@ public ObservedState fetchGithubState(Date lastUpdatedAtCutoff) throws IOExcepti
.filter(pr -> !pullRequestsToProcessByID.containsKey(pr.getID()))
.forEach(pr -> pullRequestsToProcessByID.put(pr.getID(), pr));

final List<CiReport> ciReports = new ArrayList<>();
for (GithubPullRequest pullRequest : pullRequestsToProcessByID.values()) {
LOG.debug("Processing PR {}@{}.", formatPullRequestID(pullRequest.getID()), pullRequest.getHeadCommitHash());
final int pullRequestID = pullRequest.getID();
final String headCommitHash = pullRequest.getHeadCommitHash();
final Collection<String> reportedCommitHashes = new ArrayList<>();

Optional<GitHubComment> ciReportComment = getCiReportComment(pullRequestID);
final CiReport ciReport;
if (ciReportComment.isPresent()) {
LOG.debug("CiReport comment found.");
ciReport = CiReport.fromComment(pullRequestID, ciReportComment.get().getCommentText(), ciActions);
ciReport.getBuilds().map(build -> build.commitHash).forEach(reportedCommitHashes::add);

final Collection<Build> buildsToAdd = new ArrayList<>();
ciReport.getBuilds()
.filter(build -> build.status.isPresent())
.filter(build -> build.status.get().getState() == GitHubCheckerStatus.State.PENDING || build.status.get().getState() == GitHubCheckerStatus.State.UNKNOWN)
.forEach(build -> {
String commitHash = build.commitHash;

LOG.debug("Checking commit state for {}.", commitHash);
Iterable<GitHubCheckerStatus> commitState = gitHubActions.getCommitState(ciRepository, commitHash, githubCheckerNamePattern);
StreamSupport.stream(commitState.spliterator(), false)
.filter(status -> status.getCiProvider() != CiProvider.Unknown)
.forEach(gitHubCheckerStatus -> {
if (gitHubCheckerStatus.getState() != build.status.get().getState()) {
buildsToAdd.add(new Build(build.pullRequestID, build.commitHash, Optional.of(gitHubCheckerStatus), build.trigger));
}
});
});
buildsToAdd.forEach(ciReport::add);

processManualTriggers(ciReport, pullRequestID)
.map(triggerComment -> new Build(
pullRequestID,
headCommitHash,
Optional.empty(),
new Trigger(Trigger.Type.MANUAL, String.valueOf(triggerComment.getCommentId()), triggerComment.getCommand())
))
.forEach(ciReport::add);
} else {
LOG.debug("No CIReport comment found.");
ciReport = CiReport.empty(pullRequestID);
}
return pullRequestsToProcessByID.values().stream();
}

if (!reportedCommitHashes.contains(headCommitHash)) {
ciReport.add(new Build(pullRequestID, headCommitHash, Optional.empty(), new Trigger(Trigger.Type.PUSH, headCommitHash, null)));
}
ciReports.add(ciReport);
public CiReport processPullRequest(GithubPullRequest pullRequest) throws IOException {
LOG.debug("Processing PR {}@{}.", formatPullRequestID(pullRequest.getID()), pullRequest.getHeadCommitHash());
final int pullRequestID = pullRequest.getID();
final String headCommitHash = pullRequest.getHeadCommitHash();
final Collection<String> reportedCommitHashes = new ArrayList<>();

Optional<GitHubComment> ciReportComment = getCiReportComment(pullRequestID);
final CiReport ciReport;
if (ciReportComment.isPresent()) {
LOG.debug("CiReport comment found.");
ciReport = CiReport.fromComment(pullRequestID, ciReportComment.get().getCommentText(), ciActions);
ciReport.getBuilds().map(build -> build.commitHash).forEach(reportedCommitHashes::add);

final Collection<Build> buildsToAdd = new ArrayList<>();
ciReport.getBuilds()
.filter(build -> build.status.isPresent())
.filter(build -> build.status.get().getState() == GitHubCheckerStatus.State.PENDING || build.status.get().getState() == GitHubCheckerStatus.State.UNKNOWN)
.forEach(build -> {
String commitHash = build.commitHash;

LOG.debug("Checking commit state for {}.", commitHash);
Iterable<GitHubCheckerStatus> commitState = gitHubActions.getCommitState(ciRepository, commitHash, githubCheckerNamePattern);
StreamSupport.stream(commitState.spliterator(), false)
.filter(status -> status.getCiProvider() != CiProvider.Unknown)
.forEach(gitHubCheckerStatus -> {
if (gitHubCheckerStatus.getState() != build.status.get().getState()) {
buildsToAdd.add(new Build(build.pullRequestID, build.commitHash, Optional.of(gitHubCheckerStatus), build.trigger));
}
});
});
buildsToAdd.forEach(ciReport::add);

processManualTriggers(ciReport, pullRequestID)
.map(triggerComment -> new Build(
pullRequestID,
headCommitHash,
Optional.empty(),
new Trigger(Trigger.Type.MANUAL, String.valueOf(triggerComment.getCommentId()), triggerComment.getCommand())
))
.forEach(ciReport::add);
} else {
LOG.debug("No CIReport comment found.");
ciReport = CiReport.empty(pullRequestID);
}

return new ObservedState(ciReports);
if (!reportedCommitHashes.contains(headCommitHash)) {
ciReport.add(new Build(pullRequestID, headCommitHash, Optional.empty(), new Trigger(Trigger.Type.PUSH, headCommitHash, null)));
}
return ciReport;
}

private Stream<TriggerComment> processManualTriggers(CiReport ciReport, int pullRequestID) {
Expand Down
38 changes: 38 additions & 0 deletions src/main/java/com/ververica/utils/FunctionWithException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ververica.utils;

import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

public interface FunctionWithException<T, R, E extends Throwable> {
R apply(T t) throws E;

static <T, R, E extends Exception> Function<T, Optional<R>> wrap(FunctionWithException<T, R, E> functionWithException, BiConsumer<T, Exception> errorHandler) {
return t -> {
try {
return Optional.of(functionWithException.apply(t));
} catch (Exception e) {
errorHandler.accept(t, e);
return Optional.empty();
}
};
}
}

0 comments on commit 9ab4292

Please sign in to comment.