From 0c73153fb1342ab2e0b32d49a04bdf44daf54970 Mon Sep 17 00:00:00 2001 From: Jesse Glick Date: Fri, 3 Jan 2025 16:40:57 -0500 Subject: [PATCH 1/7] Resume Pipeline builds asynchronously --- .../org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java b/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java index a3a448cf..8ecf798e 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java @@ -214,7 +214,7 @@ public boolean isResumptionComplete() { public static class ItemListenerImpl extends ItemListener { @Override public void onLoaded() { - FlowExecutionList.get().resume(); + Timer.get().submit(FlowExecutionList.get()::resume); } } From a692eff9ed017c71f55fa38d82720368400c291c Mon Sep 17 00:00:00 2001 From: Jesse Glick Date: Mon, 6 Jan 2025 18:59:37 -0500 Subject: [PATCH 2/7] `FlowExecutionList.Storage` --- .../workflow/flow/FlowExecutionList.java | 277 +++++++++++------- 1 file changed, 170 insertions(+), 107 deletions(-) diff --git a/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java b/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java index eace4b4e..55268302 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java @@ -9,12 +9,12 @@ import edu.umd.cs.findbugs.annotations.NonNull; import hudson.Extension; import hudson.ExtensionList; +import hudson.ExtensionPoint; import hudson.XmlFile; import hudson.init.InitMilestone; import hudson.init.TermMilestone; import hudson.init.Terminator; import hudson.model.Computer; -import hudson.model.Queue; import hudson.model.listeners.ItemListener; import hudson.remoting.SingleLaneExecutorService; import hudson.util.CopyOnWriteList; @@ -38,18 +38,16 @@ import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; -import jenkins.util.SystemProperties; import org.jenkinsci.plugins.workflow.graph.FlowNode; import org.jenkinsci.plugins.workflow.graphanalysis.LinearBlockHoppingScanner; import org.kohsuke.accmod.Restricted; import org.kohsuke.accmod.restrictions.Beta; import org.kohsuke.accmod.restrictions.DoNotUse; +import org.kohsuke.accmod.restrictions.NoExternalUse; /** - * Tracks the running {@link FlowExecution}s so that it can be enumerated. - * - * @author Kohsuke Kawaguchi + * Enumerates running builds and ensures they resume after Jenkins is restarted. */ @Extension public class FlowExecutionList implements Iterable { @@ -66,23 +64,19 @@ public class FlowExecutionList implements Iterable { */ public static final String LIST_SAVED = "FlowExecutionList.LIST_SAVED"; - private final CopyOnWriteList runningTasks = new CopyOnWriteList<>(); - private final SingleLaneExecutorService executor = new SingleLaneExecutorService(Timer.get()); - private XmlFile configFile; - private transient volatile boolean resumptionComplete; public FlowExecutionList() { - load(); + ExtensionList.lookupFirst(Storage.class).load(); } /** - * Lists all the current {@link FlowExecutionOwner}s. + * Lists all the current {@link FlowExecution}s. */ @Override public Iterator iterator() { return new AbstractIterator<>() { - final Iterator base = runningTasks.iterator(); + final Iterator base = ExtensionList.lookupFirst(Storage.class).owners(); @Override protected FlowExecution computeNext() { @@ -103,93 +97,23 @@ protected FlowExecution computeNext() { }; } - private synchronized @CheckForNull XmlFile configFile() { - if (configFile == null) { - Jenkins j = Jenkins.getInstanceOrNull(); - if (j != null) { - String id = SystemProperties.getString(Queue.class.getName() + ".id"); - File f; - if (id != null) { - f = new File(Jenkins.get().getRootDir(), FlowExecutionList.class.getName() + "/" + id + ".xml"); - } else { - f = new File(j.getRootDir(), FlowExecutionList.class.getName() + ".xml"); - } - configFile = new XmlFile(f); - } - } - return configFile; - } - - @SuppressWarnings("unchecked") - private synchronized void load() { - XmlFile cf = configFile(); - if (cf == null) { - return; // oh well - } - if (cf.exists()) { - try { - runningTasks.replaceBy((List) cf.read()); - LOGGER.log(Level.FINE, "loaded: {0}", runningTasks); - } catch (Exception x) { - LOGGER.log(Level.WARNING, "ignoring broken " + cf, x); - } - } - } - /** * It is the responsibility of the {@link FlowExecutionOwner} to register itself before it starts executing. * And likewise, unregister itself after it is completed, even though this class does clean up entries that * are no longer running. */ public synchronized void register(final FlowExecutionOwner self) { - if (runningTasks.contains(self)) { - LOGGER.log(Level.WARNING, "{0} was already in the list: {1}", new Object[] {self, runningTasks.getView()}); - } else { - runningTasks.add(self); - saveLater(); - } + ExtensionList.lookupFirst(Storage.class).register(self); } public synchronized void unregister(final FlowExecutionOwner self) { - if (runningTasks.remove(self)) { - LOGGER.log(Level.FINE, "unregistered {0}", new Object[] {self}); - saveLater(); - } else { - LOGGER.log(Level.WARNING, "{0} was not in the list to begin with: {1}", new Object[] {self, runningTasks.getView()}); - } - } - - private synchronized void saveLater() { - final List copy = new ArrayList<>(runningTasks.getView()); - LOGGER.log(Level.FINE, "scheduling save of {0}", copy); - try { - executor.submit(() -> save(copy)); - } catch (RejectedExecutionException x) { - LOGGER.log(Level.FINE, "could not schedule save, perhaps because Jenkins is shutting down; saving immediately", x); - save(copy); - } - } - private void save(List copy) { - XmlFile cf = configFile(); - LOGGER.log(Level.FINE, "saving {0} to {1}", new Object[] {copy, cf}); - if (cf == null) { - return; // oh well - } - try { - cf.write(copy); - } catch (IOException x) { - LOGGER.log(Level.WARNING, null, x); - } + ExtensionList.lookupFirst(Storage.class).unregister(self); } private static final Logger LOGGER = Logger.getLogger(FlowExecutionList.class.getName()); public static FlowExecutionList get() { - FlowExecutionList l = ExtensionList.lookup(FlowExecutionList.class).get(FlowExecutionList.class); - if (l == null) { // might be called during shutdown - l = new FlowExecutionList(); - } - return l; + return ExtensionList.lookupSingleton(FlowExecutionList.class); } /** @@ -219,27 +143,168 @@ public void onLoaded() { } private void resume() { - boolean needSave = false; - for (var it = runningTasks.iterator(); it.hasNext(); ) { - var o = it.next(); - try { - FlowExecution e = o.get(); - LOGGER.log(Level.FINE, "Eagerly loaded {0}", e); - if (e.isComplete()) { - LOGGER.log(Level.FINE, "Unregistering completed " + o, e); + ExtensionList.lookupFirst(Storage.class).resume(); + resumptionComplete = true; + } + + /** + * Alternate mechanism for implementing the storage of the set of builds. + */ + @Restricted(Beta.class) + public interface Storage extends ExtensionPoint { + + /** + * Enumerate the build handles. + * Order is unspecified. + * The set may be mutated while the iterator is active. + */ + Iterator owners(); + + /** + * Add an entry, if not already present. + */ + void register(FlowExecutionOwner owner); + + /** + * Remove an entry, if present. + */ + void unregister(FlowExecutionOwner owner); + + /** + * Check if an entry is present. + */ + boolean contains(FlowExecutionOwner o); + + /** + * Load data during startup. + */ + void load(); + + /** + * Resume builds. + * {@link FlowExecutionOwner#get} should be called on each entry. + * If {@link FlowExecution#isComplete} already, or an exception is thrown, + * the entry should be removed as if {@link #unregister} had been called. + */ + void resume(); + + /** + * Flush any unsaved data before Jenkins exits. + */ + void shutDown() throws InterruptedException; + } + + @Restricted(NoExternalUse.class) + @Extension(ordinal = -1000) + public static final class DefaultStorage implements Storage { + + private final CopyOnWriteList runningTasks = new CopyOnWriteList<>(); + private final SingleLaneExecutorService executor = new SingleLaneExecutorService(Timer.get()); + private XmlFile configFile; + + @Override public Iterator owners() { + return runningTasks.iterator(); + } + + @Override public void register(FlowExecutionOwner o) { + if (runningTasks.contains(o)) { + LOGGER.log(Level.WARNING, "{0} was already in the list: {1}", new Object[] {o, runningTasks.getView()}); + } else { + runningTasks.add(o); + saveLater(); + } + } + + @Override public void unregister(FlowExecutionOwner o) { + if (runningTasks.remove(o)) { + LOGGER.log(Level.FINE, "unregistered {0}", new Object[] {o}); + saveLater(); + } else { + LOGGER.log(Level.WARNING, "{0} was not in the list to begin with: {1}", new Object[] {o, runningTasks.getView()}); + } + } + + @Override public boolean contains(FlowExecutionOwner o) { + return runningTasks.contains(o); + } + + @SuppressWarnings("unchecked") + @Override public void load() { + XmlFile cf = configFile(); + if (cf == null) { + return; // oh well + } + if (cf.exists()) { + try { + runningTasks.replaceBy((List) cf.read()); + LOGGER.log(Level.FINE, "loaded: {0}", runningTasks); + } catch (Exception x) { + LOGGER.log(Level.WARNING, "ignoring broken " + cf, x); + } + } + } + + @Override public void resume() { + boolean needSave = false; + for (var it = runningTasks.iterator(); it.hasNext();) { + var o = it.next(); + try { + FlowExecution e = o.get(); + LOGGER.log(Level.FINE, "Eagerly loaded {0}", e); + if (e.isComplete()) { + LOGGER.log(Level.FINE, "Unregistering completed " + o, e); + it.remove(); + needSave = true; + } + } catch (IOException ex) { + LOGGER.log(Level.FINE, "Failed to load " + o + ". Unregistering", ex); it.remove(); needSave = true; } - } catch (IOException ex) { - LOGGER.log(Level.FINE, "Failed to load " + o + ". Unregistering", ex); - it.remove(); - needSave = true; + } + if (needSave) { + saveLater(); } } - if (needSave) { - saveLater(); + + @Override public void shutDown() throws InterruptedException { + executor.shutdown(); + executor.awaitTermination(1, TimeUnit.MINUTES); + } + + private synchronized void saveLater() { + final List copy = new ArrayList<>(runningTasks.getView()); + LOGGER.log(Level.FINE, "scheduling save of {0}", copy); + try { + executor.submit(() -> save(copy)); + } catch (RejectedExecutionException x) { + LOGGER.log(Level.FINE, "could not schedule save, perhaps because Jenkins is shutting down; saving immediately", x); + save(copy); + } + } + + private void save(List copy) { + XmlFile cf = configFile(); + LOGGER.log(Level.FINE, "saving {0} to {1}", new Object[] {copy, cf}); + if (cf == null) { + return; // oh well + } + try { + cf.write(copy); + } catch (IOException x) { + LOGGER.log(Level.WARNING, null, x); + } + } + + private synchronized @CheckForNull XmlFile configFile() { + if (configFile == null) { + Jenkins j = Jenkins.getInstanceOrNull(); + if (j != null) { + configFile = new XmlFile(new File(j.getRootDir(), FlowExecutionList.class.getName() + ".xml")); + } + } + return configFile; } - resumptionComplete = true; } /** @@ -282,9 +347,7 @@ public ListenableFuture apply(final Function f) { @Terminator(requires = EXECUTIONS_SUSPENDED, attains = LIST_SAVED) public static void saveAll() throws InterruptedException { LOGGER.fine("ensuring all executions are saved"); - SingleLaneExecutorService executor = get().executor; - executor.shutdown(); - executor.awaitTermination(1, TimeUnit.MINUTES); + ExtensionList.lookupFirst(Storage.class).shutDown(); } /** @@ -305,11 +368,11 @@ public void onSuccess(@NonNull List result) { // and CpsFlowExecution should not then complete until afterStepExecutionsResumed, so this is defensive. return; } - FlowExecutionList list = FlowExecutionList.get(); + var storage = ExtensionList.lookupFirst(Storage.class); FlowExecutionOwner owner = e.getOwner(); - if (!list.runningTasks.contains(owner)) { - LOGGER.log(Level.WARNING, "Resuming {0}, which is missing from FlowExecutionList ({1}), so registering it now.", new Object[] {owner, list.runningTasks.getView()}); - list.register(owner); + if (!storage.contains(owner)) { + LOGGER.warning(() -> "Resuming " + owner + ", which is missing from FlowExecutionList, so registering it now"); + storage.register(owner); } LOGGER.log(Level.FINE, "Will resume {0}", result); new ParallelResumer(result, e::afterStepExecutionsResumed).run(); From 18fd4d29e64b31ed661bc6adafa43e437da1ac50 Mon Sep 17 00:00:00 2001 From: Jesse Glick Date: Wed, 8 Jan 2025 13:03:58 -0500 Subject: [PATCH 3/7] Renamed parameter for consistency --- .../org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java b/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java index 55268302..a6929964 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java @@ -173,7 +173,7 @@ public interface Storage extends ExtensionPoint { /** * Check if an entry is present. */ - boolean contains(FlowExecutionOwner o); + boolean contains(FlowExecutionOwner owner); /** * Load data during startup. From ba2c2a42fd3c28a1096b55c226e583c97b0571ae Mon Sep 17 00:00:00 2001 From: Jesse Glick Date: Wed, 8 Jan 2025 13:12:14 -0500 Subject: [PATCH 4/7] Defining `FlowExecutionOwner.getExternalizableId` --- .../workflow/flow/FlowExecutionOwner.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionOwner.java b/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionOwner.java index e4a846ef..c378429c 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionOwner.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionOwner.java @@ -99,6 +99,25 @@ public String getUrlOfExecution() throws IOException { return getUrl()+"execution/"; } + /** + * The {@link Run#getExternalizableId}, if this owner is indeed a {@link Run}. + * The default implementation uses {@link #getExecutable} + * but an implementation may override this to avoid loading the actual {@link Run}. + * @return an id, or null if unknown, unloadable, or unapplicable + */ + @CheckForNull + public String getExternalizableId() { + try { + var exec = getExecutable(); + if (exec instanceof Run) { + return ((Run) exec).getExternalizableId(); + } + } catch (IOException x) { + LOGGER.log(Level.WARNING, "cannot look up externalizableId of " + this, x); + } + return null; + } + /** * {@link FlowExecutionOwner}s are equal to one another if and only if * they point to the same {@link FlowExecution} object. From e965a39fb4c3ad99a4dbff1ea3d80e2051437dbb Mon Sep 17 00:00:00 2001 From: Jesse Glick Date: Wed, 8 Jan 2025 13:29:50 -0500 Subject: [PATCH 5/7] Logger format fix --- .../plugins/workflow/flow/FlowExecutionList.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java b/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java index a6929964..aa5078a2 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java @@ -247,17 +247,17 @@ public static final class DefaultStorage implements Storage { @Override public void resume() { boolean needSave = false; for (var it = runningTasks.iterator(); it.hasNext();) { - var o = it.next(); + var owner = it.next(); try { - FlowExecution e = o.get(); - LOGGER.log(Level.FINE, "Eagerly loaded {0}", e); - if (e.isComplete()) { - LOGGER.log(Level.FINE, "Unregistering completed " + o, e); + var exec = owner.get(); + LOGGER.fine(() -> "eagerly loaded " + exec); + if (exec.isComplete()) { + LOGGER.fine(() -> "unregistering completed " + exec); it.remove(); needSave = true; } - } catch (IOException ex) { - LOGGER.log(Level.FINE, "Failed to load " + o + ". Unregistering", ex); + } catch (IOException x) { + LOGGER.log(Level.FINE, x, () -> "failed to load " + owner + "; unregistering"); it.remove(); needSave = true; } From 1eaa30aa28a3158258ba523d0604795901180458 Mon Sep 17 00:00:00 2001 From: Jesse Glick Date: Wed, 8 Jan 2025 13:49:56 -0500 Subject: [PATCH 6/7] Fixing `serialVersionUID` --- .../org/jenkinsci/plugins/workflow/flow/FlowExecutionOwner.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionOwner.java b/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionOwner.java index c378429c..43f94545 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionOwner.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionOwner.java @@ -44,6 +44,8 @@ */ public abstract class FlowExecutionOwner implements Serializable { + private static final long serialVersionUID = 1796027762257567194L; + private static final Logger LOGGER = Logger.getLogger(FlowExecutionOwner.class.getName()); /** From 0a18e1bef04319cd40a94a305879d5fb22fa31fb Mon Sep 17 00:00:00 2001 From: Jesse Glick Date: Wed, 8 Jan 2025 19:26:08 -0500 Subject: [PATCH 7/7] Allow `Storage` to completely override `iterator` --- .../workflow/flow/FlowExecutionList.java | 48 +++++++++---------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java b/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java index aa5078a2..9074db3d 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java @@ -75,26 +75,7 @@ public FlowExecutionList() { */ @Override public Iterator iterator() { - return new AbstractIterator<>() { - final Iterator base = ExtensionList.lookupFirst(Storage.class).owners(); - - @Override - protected FlowExecution computeNext() { - while (base.hasNext()) { - FlowExecutionOwner o = base.next(); - try { - FlowExecution e = o.get(); - if (!e.isComplete()) { - return e; - } - } catch (Throwable e) { - LOGGER.log(Level.FINE, "Failed to load " + o + ". Unregistering", e); - unregister(o); - } - } - return endOfData(); - } - }; + return ExtensionList.lookupFirst(Storage.class).iterator(); } /** @@ -154,11 +135,11 @@ private void resume() { public interface Storage extends ExtensionPoint { /** - * Enumerate the build handles. + * Enumerate running builds. * Order is unspecified. * The set may be mutated while the iterator is active. */ - Iterator owners(); + Iterator iterator(); /** * Add an entry, if not already present. @@ -202,8 +183,27 @@ public static final class DefaultStorage implements Storage { private final SingleLaneExecutorService executor = new SingleLaneExecutorService(Timer.get()); private XmlFile configFile; - @Override public Iterator owners() { - return runningTasks.iterator(); + @Override public Iterator iterator() { + return new AbstractIterator<>() { + final Iterator base = runningTasks.iterator(); + + @Override + protected FlowExecution computeNext() { + while (base.hasNext()) { + FlowExecutionOwner o = base.next(); + try { + FlowExecution e = o.get(); + if (!e.isComplete()) { + return e; + } + } catch (Throwable e) { + LOGGER.log(Level.FINE, "Failed to load " + o + ". Unregistering", e); + unregister(o); + } + } + return endOfData(); + } + }; } @Override public void register(FlowExecutionOwner o) {