From dd8717fdc7acf1a20a2c383e471dbac7f4fa4b5d Mon Sep 17 00:00:00 2001 From: Kip Kohn Date: Fri, 18 Oct 2024 04:28:07 -0700 Subject: [PATCH] Define Gobblin-on-Temporal`WorkforcePlan` and dynamic `ScalingDirective`s with parser --- .../GobblinTemporalClusterManager.java | 2 +- .../temporal/dynscale/ProfileDerivation.java | 54 ++++ .../temporal/dynscale/ProfileOverlay.java | 150 +++++++++++ .../temporal/dynscale/ScalingDirective.java | 44 ++++ .../dynscale/ScalingDirectiveParser.java | 132 ++++++++++ .../temporal/dynscale/StaffingDeltas.java | 39 +++ .../temporal/dynscale/WorkerProfile.java | 28 ++ .../temporal/dynscale/WorkforcePlan.java | 148 +++++++++++ .../temporal/dynscale/WorkforceProfiles.java | 76 ++++++ .../temporal/dynscale/WorkforceStaffing.java | 99 +++++++ .../dynscale/ProfileDerivationTest.java | 78 ++++++ .../temporal/dynscale/ProfileOverlayTest.java | 100 +++++++ .../dynscale/ScalingDirectiveParserTest.java | 247 ++++++++++++++++++ .../temporal/dynscale/WorkforcePlanTest.java | 187 +++++++++++++ .../dynscale/WorkforceStaffingTest.java | 95 +++++++ 15 files changed, 1478 insertions(+), 1 deletion(-) create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynscale/ProfileDerivation.java create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynscale/ProfileOverlay.java create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynscale/ScalingDirective.java create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynscale/ScalingDirectiveParser.java create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynscale/StaffingDeltas.java create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynscale/WorkerProfile.java create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynscale/WorkforcePlan.java create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynscale/WorkforceProfiles.java create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynscale/WorkforceStaffing.java create mode 100644 gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynscale/ProfileDerivationTest.java create mode 100644 gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynscale/ProfileOverlayTest.java create mode 100644 gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynscale/ScalingDirectiveParserTest.java create mode 100644 gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynscale/WorkforcePlanTest.java create mode 100644 gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynscale/WorkforceStaffingTest.java diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalClusterManager.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalClusterManager.java index a460bb42026..19a65078909 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalClusterManager.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalClusterManager.java @@ -224,7 +224,7 @@ public synchronized void stop() { this.stopStatus.setStopInprogress(true); - log.info("Stopping the Gobblin Cluster Manager"); + log.info("Stopping the Gobblin Temporal Cluster Manager"); stopAppLauncherAndServices(); } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynscale/ProfileDerivation.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynscale/ProfileDerivation.java new file mode 100644 index 00000000000..0e192c2c784 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynscale/ProfileDerivation.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.dynscale; + +import java.util.Optional; +import java.util.function.Function; + +import com.typesafe.config.Config; +import lombok.Data; +import lombok.Getter; + + +@Data +public class ProfileDerivation { + public static class UnknownBasisException extends Exception { + @Getter + private final String name; + public UnknownBasisException(String basisName) { + super("named '" + WorkforceProfiles.renderName(basisName) + "'"); + this.name = basisName; + } + } + + private final String basisProfileName; + private final ProfileOverlay overlay; + + public Config formulateConfig(Function> basisResolver) throws UnknownBasisException { + Optional optProfile = basisResolver.apply(basisProfileName); + if (!optProfile.isPresent()) { + throw new UnknownBasisException(basisProfileName); + } else { + return overlay.applyOverlay(optProfile.get().getConfig()); + } + } + + public String renderName() { + return WorkforceProfiles.renderName(this.basisProfileName); + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynscale/ProfileOverlay.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynscale/ProfileOverlay.java new file mode 100644 index 00000000000..ed36ee9c16f --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynscale/ProfileOverlay.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.dynscale; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigValueFactory; +import lombok.Data; + + +public interface ProfileOverlay { + + Config applyOverlay(Config config); + + ProfileOverlay over(ProfileOverlay other); + + + @Data + class KVPair { + private final String key; + private final String value; + } + + + @Data + class Adding implements ProfileOverlay { + private final List additionPairs; + + @Override + public Config applyOverlay(Config config) { + return additionPairs.stream().sequential().reduce(config, + (currConfig, additionPair) -> + currConfig.withValue(additionPair.getKey(), ConfigValueFactory.fromAnyRef(additionPair.getValue())), + (configA, configB) -> + configB.withFallback(configA) + ); + } + + @Override + public ProfileOverlay over(ProfileOverlay other) { + if (other instanceof Adding) { + Map base = ((Adding) other).getAdditionPairs().stream().collect(Collectors.toMap(KVPair::getKey, KVPair::getValue)); + additionPairs.stream().forEach(additionPair -> + base.put(additionPair.getKey(), additionPair.getValue())); + return new Adding(base.entrySet().stream().map(entry -> new KVPair(entry.getKey(), entry.getValue())).collect(Collectors.toList())); + } else if (other instanceof Removing) { + return Combo.normalize(this, (Removing) other); + } else if (other instanceof Combo) { + Combo otherCombo = (Combo) other; + return Combo.normalize((Adding) this.over(otherCombo.getAdding()), otherCombo.getRemoving()); + } else { + throw new IllegalArgumentException("unknown derived class of type '" + other.getClass().getName() + "': " + other); + } + } + } + + + @Data + class Removing implements ProfileOverlay { + private final List removalKeys; + + @Override + public Config applyOverlay(Config config) { + return removalKeys.stream().sequential().reduce(config, + (currConfig, removalKey) -> + currConfig.withoutPath(removalKey), + (configA, configB) -> + configA.withFallback(configB) + ); + } + + @Override + public ProfileOverlay over(ProfileOverlay other) { + if (other instanceof Adding) { + return Combo.normalize((Adding) other, this); + } else if (other instanceof Removing) { + Set otherKeys = new HashSet(((Removing) other).getRemovalKeys()); + otherKeys.addAll(removalKeys); + return new Removing(new ArrayList<>(otherKeys)); + } else if (other instanceof Combo) { + Combo otherCombo = (Combo) other; + return Combo.normalize(otherCombo.getAdding(), (Removing) this.over(otherCombo.getRemoving())); + } else { + throw new IllegalArgumentException("unknown derived class of type '" + other.getClass().getName() + "': " + other); + } + } + } + + + @Data + class Combo implements ProfileOverlay { + private final Adding adding; + private final Removing removing; + + // merely restrict access modifier from `public` to `protected`, as not meant to be instantiated outside this enclosing interface + private Combo(Adding adding, Removing removing) { + this.adding = adding; + this.removing = removing; + } + + protected static Combo normalize(Adding toAdd, Removing toRemove) { + // pre-remove any in `toAdd` that are also in `toRemove`... yet still maintain them in `toRemove`, in case the eventual `Config` "basis" also has any + Set removeKeysLookup = toRemove.getRemovalKeys().stream().collect(Collectors.toSet()); + List unmatchedAdditionPairs = toAdd.getAdditionPairs().stream().sequential().filter(additionPair -> + !removeKeysLookup.contains(additionPair.getKey()) + ).collect(Collectors.toList()); + return new Combo(new Adding(unmatchedAdditionPairs), new Removing(new ArrayList<>(removeKeysLookup))); + } + + @Override + public Config applyOverlay(Config config) { + return adding.applyOverlay(removing.applyOverlay(config)); + } + + @Override + public ProfileOverlay over(ProfileOverlay other) { + if (other instanceof Adding) { + return Combo.normalize((Adding) this.adding.over((Adding) other), this.removing); + } else if (other instanceof Removing) { + return Combo.normalize(this.adding, (Removing) this.removing.over((Removing) other)); + } else if (other instanceof Combo) { + Combo otherCombo = (Combo) other; + return Combo.normalize((Adding) this.adding.over(otherCombo.getAdding()), (Removing) this.removing.over(otherCombo.getRemoving())); + } else { + throw new IllegalArgumentException("unknown derived class of type '" + other.getClass().getName() + "': " + other); + } + } + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynscale/ScalingDirective.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynscale/ScalingDirective.java new file mode 100644 index 00000000000..77d9b483c9a --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynscale/ScalingDirective.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.dynscale; + +import java.util.Optional; +import lombok.Data; +import lombok.RequiredArgsConstructor; + + +@Data +@RequiredArgsConstructor +public class ScalingDirective { + private final String profileName; + private final int setPoint; + private final long timestampEpochMillis; + private final Optional optDerivedFrom; + + public ScalingDirective(String profileName, int setPoint, long timestampEpochMillis) { + this(profileName, setPoint, timestampEpochMillis, Optional.empty()); + } + + public ScalingDirective(String profileName, int setPoint, long timestampEpochMillis, String basisProfileName, ProfileOverlay overlay) { + this(profileName, setPoint, timestampEpochMillis, Optional.of(new ProfileDerivation(basisProfileName, overlay))); + } + + public String renderName() { + return WorkforceProfiles.renderName(this.profileName); + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynscale/ScalingDirectiveParser.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynscale/ScalingDirectiveParser.java new file mode 100644 index 00000000000..4dbbc065b78 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynscale/ScalingDirectiveParser.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.dynscale; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import lombok.extern.slf4j.Slf4j; + + +/** + * parse {@link ScalingDirective}s with syntax of the form: + * TIMESTAMP '.' WORKER_NAME '=' SETPOINT [ ( ',' | ';' ) WORKER_NAME ( '+(' KV_PAIR (*SEP* KV_PAIR)* ')' | '-( KEY (*SEP* KEY* ')' ) ] + * where *SEP* is either ',' or ';' (whichever did follow SETPOINT) + * the first form with '+' is an "adding" (upsert) overlay, the second form with '-' is a removing overlay + * allows for URL-encoded values in the KV_PAIRs and whitespace around any token + * + 1728435970.my_profile=24 + 1728436821.=24 + 1728436828.baseline()=24 + + 1728439210.new_profile=16,bar+(a.b.c=7,l.m=sixteen) + 1728439223.new_profile=16;bar+(a.b.c=7;l.m=sixteen) + 1728460832.new_profile=16,bar+(a.b.c=7,l.m=sixteen%2C%20again) + + 1728436436.other_profile=9,my_profile-(x,y.z) + 1728436499.other_profile=9;my_profile-(x;y.z) + + 1728441200.plus_profile=16,+(a.b.c=7,l.m=sixteen) + 1728443640.plus_profile=16,baseline()+(a.b.c=7,l.m=sixteen) + + 1728448521.extra_profile=9,-(a.b, c.d) + 1728449978.extra_profile=9,baseline()-(a.b, c.d) +*/ +@Slf4j +public class ScalingDirectiveParser { + public static class MalformedDirectiveException extends IllegalArgumentException { + private final String directive; + public MalformedDirectiveException(String directive, String desc) { + super("error: " + desc + ", in ==>" + directive + "<=="); + this.directive = directive; + } + } + + private static final String DIRECTIVE_REGEX = "(?x) \\s* (\\d+) \\s* \\. \\s* (\\w* | baseline\\(\\)) \\s* = \\s* (\\d+) " + + "(?: \\s* ([;,]) \\s* (\\w* | baseline\\(\\)) \\s* (?: (\\+ \\s* \\( \\s* ([^)]*?) \\s* \\) ) | (- \\s* \\( \\s* ([^)]*?) \\s* \\) ) ) )? \\s*"; + + private static final String KEY_REGEX = "(\\w+(?:\\.\\w+)*)"; + private static final String KEY_VALUE_REGEX = KEY_REGEX + "\\s*=\\s*(.*)"; + private static final Pattern directivePattern = Pattern.compile(DIRECTIVE_REGEX); + private static final Pattern keyPattern = Pattern.compile(KEY_REGEX); + private static final Pattern keyValuePattern = Pattern.compile(KEY_VALUE_REGEX); + + private static final String BASELINE_ID = "baseline()"; + + public ScalingDirective parse(String directive) { + Matcher parsed = directivePattern.matcher(directive); + if (parsed.matches()) { + long timestamp = Long.parseLong(parsed.group(1)); + String profileId = parsed.group(2); + String profileName = identifyProfileName(profileId); + int setpoint = Integer.parseInt(parsed.group(3)); + Optional optDerivedFrom = Optional.empty(); + String overlayIntroSep = parsed.group(4); + if (overlayIntroSep != null) { + String basisProfileName = identifyProfileName(parsed.group(5)); + if (parsed.group(6) != null) { // '+' == adding + List additions = new ArrayList<>(); + String additionsStr = parsed.group(7); + if (!additionsStr.equals("")) { + for (String addStr : additionsStr.split("\\s*" + overlayIntroSep + "\\s*", -1)) { // (negative limit to disallow trailing empty strings) + Matcher keyValueParsed = keyValuePattern.matcher(addStr); + if (keyValueParsed.matches()) { + additions.add(new ProfileOverlay.KVPair(keyValueParsed.group(1), urlDecode(directive, keyValueParsed.group(2)))); + } else { + throw new MalformedDirectiveException(directive, "unable to parse key-value pair - {{" + addStr + "}}"); + } + } + } + optDerivedFrom = Optional.of(new ProfileDerivation(basisProfileName, new ProfileOverlay.Adding(additions))); + } else { // '-' == removing + List removalKeys = new ArrayList<>(); + String removalsStr = parsed.group(9); + if (!removalsStr.equals("")) { + for (String removeStr : removalsStr.split("\\s*" + overlayIntroSep + "\\s*", -1)) { // (negative limit to disallow trailing empty strings) + Matcher keyParsed = keyPattern.matcher(removeStr); + if (keyParsed.matches()) { + removalKeys.add(keyParsed.group(1)); + } else { + throw new MalformedDirectiveException(directive, "unable to parse key - {{" + removeStr + "}}"); + } + } + } + optDerivedFrom = Optional.of(new ProfileDerivation(basisProfileName, new ProfileOverlay.Removing(removalKeys))); + } + } + return new ScalingDirective(profileName, setpoint, timestamp, optDerivedFrom); + } else { + throw new MalformedDirectiveException(directive, "invalid syntax"); + } + } + + private static String identifyProfileName(String profileId) { + return profileId.equals(BASELINE_ID) ? WorkforceProfiles.BASELINE_NAME : profileId; + } + + private static String urlDecode(String directive, String s) { + try { + return java.net.URLDecoder.decode(s, "UTF-8"); + } catch (java.io.UnsupportedEncodingException e) { + throw new MalformedDirectiveException(directive, "unable to URL-decode - {{" + s + "}}"); + } + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynscale/StaffingDeltas.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynscale/StaffingDeltas.java new file mode 100644 index 00000000000..018af44ff95 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynscale/StaffingDeltas.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.dynscale; + +import java.util.List; +import lombok.Data; + + +@Data +public class StaffingDeltas { + @Data + public static class ProfileDelta { + private final WorkerProfile profile; + private final int delta; + private final long setPointProvenanceEpochMillis; + + public boolean isUnchanged() { + return delta == 0; + } + } + + + private final List perProfileDeltas; +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynscale/WorkerProfile.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynscale/WorkerProfile.java new file mode 100644 index 00000000000..df67eb47224 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynscale/WorkerProfile.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.dynscale; + +import com.typesafe.config.Config; +import lombok.Data; + + +@Data +public class WorkerProfile { + private final String name; + private final Config config; +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynscale/WorkforcePlan.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynscale/WorkforcePlan.java new file mode 100644 index 00000000000..7c61eb65c13 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynscale/WorkforcePlan.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.dynscale; + +import com.google.common.annotations.VisibleForTesting; +import java.util.List; +import java.util.Optional; +import java.util.function.Consumer; + +import com.typesafe.config.Config; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + + +@Slf4j +public class WorkforcePlan { + + public static class IllegalRevisionException extends Exception { + @Getter private final ScalingDirective directive; + private IllegalRevisionException(ScalingDirective directive, String msg) { + super(msg); + this.directive = directive; + } + + public static class OutdatedDirective extends IllegalRevisionException { + protected OutdatedDirective(ScalingDirective directive, long lastRevisionEpochMillis) { + super(directive, "directive for profile '" + directive.renderName() + "' precedes last revision at " + + lastRevisionEpochMillis + ": " + directive); + } + } + + public static class Redefinition extends IllegalRevisionException { + protected Redefinition(ScalingDirective directive, ProfileDerivation proposedDerivation) { + super(directive, "profile '" + directive.renderName() + "' already exists, so may not be redefined on the basis of '" + + proposedDerivation.renderName() + "': " + directive); + } + } + + public static class UnrecognizedProfile extends IllegalRevisionException { + protected UnrecognizedProfile(ScalingDirective directive) { + super(directive, "unrecognized profile reference '" + directive.renderName() + "': " + directive); + } + } + + public static class UnknownBasis extends IllegalRevisionException { + protected UnknownBasis(ScalingDirective directive, ProfileDerivation.UnknownBasisException ube) { + super(directive, "profile '" + directive.renderName() + "' may not be defined on the basis of an unknown profile '" + + WorkforceProfiles.renderName(ube.getName()) + "': " + directive); + } + } + } + + private final WorkforceProfiles profiles; + private final WorkforceStaffing staffing; + @Getter private volatile long lastRevisionEpochMillis; + + public WorkforcePlan(Config baselineConfig, int initialSetPoint) { + this.profiles = WorkforceProfiles.withBaseline(baselineConfig); + this.staffing = WorkforceStaffing.initialize(initialSetPoint); + this.lastRevisionEpochMillis = 0; + } + + public int getNumProfiles() { + return profiles.size(); + } + + public synchronized void revise(ScalingDirective directive) throws IllegalRevisionException { + String name = directive.getProfileName(); + if (this.lastRevisionEpochMillis >= directive.getTimestampEpochMillis()) { + throw new IllegalRevisionException.OutdatedDirective(directive, this.lastRevisionEpochMillis); + }; + Optional optExistingProfile = profiles.apply(name); + Optional optDerivation = directive.getOptDerivedFrom(); + if (optExistingProfile.isPresent() && optDerivation.isPresent()) { + throw new IllegalRevisionException.Redefinition(directive, optDerivation.get()); + } else if (!optExistingProfile.isPresent() && !optDerivation.isPresent()) { + throw new IllegalRevisionException.UnrecognizedProfile(directive); + } else { // [exclusive-or: either, but not both present] + if (optDerivation.isPresent()) { // define a new profile on the basis of another + try { + this.profiles.addProfile(new WorkerProfile(name, optDerivation.get().formulateConfig(this.profiles))); + } catch (ProfileDerivation.UnknownBasisException ube) { + throw new IllegalRevisionException.UnknownBasis(directive, ube); + } + } + // adjust the set-point now that either a new profile is defined OR the profile already existed + this.staffing.reviseStaffing(name, directive.getSetPoint(), directive.getTimestampEpochMillis()); + this.lastRevisionEpochMillis = directive.getTimestampEpochMillis(); + } + } + + /** atomic bulk revision + * + * !!!!requires sorted order of directives by timestamp!!!! + * + */ + public synchronized void reviseWhenNewer(List directives) { + reviseWhenNewer(directives, ire -> { log.warn("Failure: ", ire); }); + } + + public synchronized void reviseWhenNewer(List directives, Consumer illegalRevisionHandler) { + directives.stream().sequential() + .filter(directive -> directive.getTimestampEpochMillis() > this.lastRevisionEpochMillis) + .forEach(directive -> { + try { + revise(directive); + } catch (IllegalRevisionException ire) { + System.err.println("uh oh it's: " + ire); + illegalRevisionHandler.accept(ire); + } + }); + } + + /** @returns diff of {@link StaffingDeltas} of this, current {@link WorkforcePlan} against some `reference` {@link WorkforceStaffing} */ + public synchronized StaffingDeltas calcStaffingDeltas(WorkforceStaffing reference) { + return staffing.calcDeltas(reference, profiles); + } + + @VisibleForTesting + Optional peepStaffing(String profileName) { + return staffing.getStaffing(profileName); + } + + @VisibleForTesting + WorkerProfile peepProfile(String profileName) throws WorkforceProfiles.UnknownProfileException { + return profiles.getOrThrow(profileName); + } + + @VisibleForTesting + WorkerProfile peepBaselineProfile() throws WorkforceProfiles.UnknownProfileException { + return profiles.getOrThrow(WorkforceProfiles.BASELINE_NAME); + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynscale/WorkforceProfiles.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynscale/WorkforceProfiles.java new file mode 100644 index 00000000000..b76c33562e2 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynscale/WorkforceProfiles.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.dynscale; + +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; + +import com.typesafe.config.Config; + + +public class WorkforceProfiles implements Function> { + public static final String BASELINE_NAME = ""; + public static final String BASELINE_NAME_RENDERING = "<>"; + + public static String renderName(String name) { + return name.equals(BASELINE_NAME) ? BASELINE_NAME_RENDERING : name; + } + + + public static class UnknownProfileException extends RuntimeException { + public UnknownProfileException(String profileName) { + super("named '" + WorkforceProfiles.renderName(profileName) + "'"); + } + } + + private final ConcurrentHashMap profileByName; + + private WorkforceProfiles() { + this.profileByName = new ConcurrentHashMap<>(); + } + + public static WorkforceProfiles withBaseline(Config baselineConfig) { + WorkforceProfiles profiles = new WorkforceProfiles(); + profiles.addProfile(new WorkerProfile(BASELINE_NAME, baselineConfig)); + return profiles; + } + + @Override + public Optional apply(String profileName) { + return Optional.ofNullable(profileByName.get(profileName)); + } + + public WorkerProfile getOrThrow(String profileName) { + WorkerProfile profile = profileByName.get(profileName); + if (profile != null) { + return profile; + } + throw new UnknownProfileException(profileName); + } + + public void addProfile(WorkerProfile profile) { + if (profileByName.putIfAbsent(profile.getName(), profile) != null) { + throw new RuntimeException("profile '" + WorkforceProfiles.renderName(profile.getName()) + "' already exists!"); + } + } + + public int size() { + return profileByName.size(); + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynscale/WorkforceStaffing.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynscale/WorkforceStaffing.java new file mode 100644 index 00000000000..f54c8035170 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynscale/WorkforceStaffing.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.dynscale; + +import com.google.common.annotations.VisibleForTesting; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import com.google.common.base.Preconditions; +import lombok.Data; + + +public class WorkforceStaffing { + public static long INITIALIZATION_PROVENANCE_EPOCH_MILLIS = 0L; + // CAUTION: sentinel value only for use with `StaffingDeltas.ProfileDelta` - NOT for use with `WorkforceStaffing::reviseStaffing`! + public static long UNKNOWN_PROVENANCE_EPOCH_MILLIS = -1L; + + @Data + private static class SetPoint { + private final int point; + private final long provenanceEpochMillis; // for debuggability + } + + + private final Map setPointByName; + + private WorkforceStaffing() { + this.setPointByName = new ConcurrentHashMap<>(); + } + + public static WorkforceStaffing initialize(int initialBaselineSetPoint) { + WorkforceStaffing staffing = new WorkforceStaffing(); + staffing.reviseStaffing(WorkforceProfiles.BASELINE_NAME, initialBaselineSetPoint, INITIALIZATION_PROVENANCE_EPOCH_MILLIS); + return staffing; + } + + @VisibleForTesting + public static WorkforceStaffing initializeStaffing(int initialBaselineSetPoint, Map initialSetPointsByProfileName) { + WorkforceStaffing staffing = initialize(initialBaselineSetPoint); + initialSetPointsByProfileName.forEach((profileName, setPoint) -> + staffing.reviseStaffing(profileName, setPoint, INITIALIZATION_PROVENANCE_EPOCH_MILLIS) + ); + return staffing; + } + + public Optional getStaffing(String profileName) { + return Optional.ofNullable(setPointByName.get(profileName)).map(SetPoint::getPoint); + } + + public void reviseStaffing(String profileName, int setPoint, long provenanceEpochMillis) { + Preconditions.checkArgument(setPoint >= 0, "set points must be non-negative: '" + profileName + "' had " + setPoint); + Preconditions.checkArgument(provenanceEpochMillis >= INITIALIZATION_PROVENANCE_EPOCH_MILLIS, + "provenanceEpochMillis must be non-negative: '" + profileName + "' had " + provenanceEpochMillis); + setPointByName.put(profileName, new SetPoint(setPoint, provenanceEpochMillis)); + } + + /** + * NOTE: so long as the same {@link WorkforcePlan} managed both this {@link WorkforceStaffing} and {@link WorkforceProfiles}, + * {@link WorkforceProfiles.UnknownProfileException} should NOT be possible. + */ + public synchronized StaffingDeltas calcDeltas(WorkforceStaffing reference, WorkforceProfiles profiles) { + Map frozenReferenceSetPointsByName = new HashMap<>(); // freeze entries for consistency amidst multiple traversals + reference.setPointByName.entrySet().forEach(entry -> frozenReferenceSetPointsByName.put(entry.getKey(), entry.getValue())); + // not expecting any profile earlier in `reference` to no longer be set... (but defensive coding nonetheless) + List profileDeltas = frozenReferenceSetPointsByName.entrySet().stream() + .filter(entry -> !this.setPointByName.containsKey(entry.getKey())) + .map(entry -> new StaffingDeltas.ProfileDelta(profiles.getOrThrow(entry.getKey()), 0 - entry.getValue().getPoint(), UNKNOWN_PROVENANCE_EPOCH_MILLIS)) + .collect(Collectors.toList()); + profileDeltas.addAll(this.setPointByName.entrySet().stream().map(entry -> { + Optional optEquivReferenceSetPoint = Optional.ofNullable(frozenReferenceSetPointsByName.get(entry.getKey())).map(SetPoint::getPoint); + return new StaffingDeltas.ProfileDelta( + profiles.getOrThrow(entry.getKey()), + entry.getValue().getPoint() - optEquivReferenceSetPoint.orElse(0), + entry.getValue().getProvenanceEpochMillis()); + } + ).filter(delta -> !delta.isUnchanged()) + .collect(Collectors.toList())); + return new StaffingDeltas(profileDeltas); + } +} diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynscale/ProfileDerivationTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynscale/ProfileDerivationTest.java new file mode 100644 index 00000000000..d87df961c54 --- /dev/null +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynscale/ProfileDerivationTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.dynscale; + +import java.util.Optional; +import java.util.function.Function; + +import com.google.common.collect.Lists; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import org.testng.annotations.Test; +import org.testng.Assert; + + +public class ProfileDerivationTest { + + @Test + public void testFormulateConfigSuccess() throws ProfileDerivation.UnknownBasisException { + String basisProfileName = "testProfile"; + ProfileOverlay overlay = new ProfileOverlay.Adding(Lists.newArrayList(new ProfileOverlay.KVPair("key1", "value1B"))); + ProfileDerivation profileDerivation = new ProfileDerivation(basisProfileName, overlay); + + Function> basisResolver = name -> { + if (basisProfileName.equals(name)) { + Config config = ConfigFactory.parseString("key1=value1A, key2=value2"); + WorkerProfile profile = new WorkerProfile(basisProfileName, config); + return Optional.of(profile); + } + return Optional.empty(); + }; + + Config resultConfig = profileDerivation.formulateConfig(basisResolver); + Assert.assertEquals(resultConfig.getString("key1"), "value1B"); + Assert.assertEquals(resultConfig.getString("key2"), "value2"); + } + + public void testFormulateConfigUnknownBasis() { + String basisProfileName = "foo"; + try { + ProfileDerivation derivation = new ProfileDerivation(basisProfileName, null); + derivation.formulateConfig(ignore -> Optional.empty()); + Assert.fail("Expected UnknownBasisException"); + } catch (ProfileDerivation.UnknownBasisException ube) { + Assert.assertEquals(ube.getName(), basisProfileName); + } + } + + @Test + public void testRenderNameNonBaseline() { + String name = "testProfile"; + ProfileDerivation profileDerivation = new ProfileDerivation(name, null); + String renderedName = profileDerivation.renderName(); + Assert.assertEquals(renderedName, name); + } + + @Test + public void testRenderNameBaseline() { + ProfileDerivation profileDerivation = new ProfileDerivation(WorkforceProfiles.BASELINE_NAME, null); + String renderedName = profileDerivation.renderName(); + Assert.assertEquals(renderedName, WorkforceProfiles.BASELINE_NAME_RENDERING); + } +} \ No newline at end of file diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynscale/ProfileOverlayTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynscale/ProfileOverlayTest.java new file mode 100644 index 00000000000..125487cde03 --- /dev/null +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynscale/ProfileOverlayTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.dynscale; + +import com.google.common.collect.Lists; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import org.testng.annotations.Test; +import org.testng.Assert; + + +public class ProfileOverlayTest { + + @Test + public void testAddingApplyOverlay() { + Config config = ConfigFactory.parseString("key1=value1A, key4=value4"); + ProfileOverlay.Adding adding = new ProfileOverlay.Adding( + Lists.newArrayList(new ProfileOverlay.KVPair("key1", "value1B"), new ProfileOverlay.KVPair("key2", "value2"))); + Config updatedConfig = adding.applyOverlay(config); + Assert.assertEquals(updatedConfig.getString("key1"), "value1B"); + Assert.assertEquals(updatedConfig.getString("key2"), "value2"); + Assert.assertEquals(updatedConfig.getString("key4"), "value4"); + } + + @Test + public void testRemovingApplyOverlay() { + Config config = ConfigFactory.parseString("key1=value1, key2=value2"); + ProfileOverlay.Removing removing = new ProfileOverlay.Removing(Lists.newArrayList("key1")); + Config updatedConfig = removing.applyOverlay(config); + Assert.assertFalse(updatedConfig.hasPath("key1")); + Assert.assertEquals(updatedConfig.getString("key2"), "value2"); + } + + @Test + public void testComboApplyOverlay() { + Config config = ConfigFactory.parseString("key1=value1, key2=value2, key3=value3"); + ProfileOverlay.Adding adding = new ProfileOverlay.Adding( + Lists.newArrayList(new ProfileOverlay.KVPair("key4", "value4"), new ProfileOverlay.KVPair("key5", "value5"))); + ProfileOverlay.Removing removing = new ProfileOverlay.Removing(Lists.newArrayList("key2", "key4")); + ProfileOverlay.Combo combo = ProfileOverlay.Combo.normalize(adding, removing); + Config updatedConfig = combo.applyOverlay(config); + Assert.assertEquals(updatedConfig.getString("key1"), "value1"); + Assert.assertEquals(updatedConfig.hasPath("key2"), false); + Assert.assertEquals(updatedConfig.getString("key3"), "value3"); + Assert.assertEquals(updatedConfig.hasPath("key4"), false); + Assert.assertEquals(updatedConfig.getString("key5"), "value5"); + + // validate `Combo::normalize` works too: + Assert.assertEquals(combo.getAdding().getAdditionPairs().size(), 1); + Assert.assertEquals(combo.getAdding().getAdditionPairs().get(0), new ProfileOverlay.KVPair("key5", "value5")); + Assert.assertEquals(combo.getRemoving().getRemovalKeys().size(), 2); + Assert.assertEqualsNoOrder(combo.getRemoving().getRemovalKeys().toArray(), removing.getRemovalKeys().toArray()); + } + + @Test + public void testAddingOver() { + ProfileOverlay.Adding adding1 = new ProfileOverlay.Adding( + Lists.newArrayList(new ProfileOverlay.KVPair("key1", "value1"), new ProfileOverlay.KVPair("key2", "value2A"))); + ProfileOverlay.Adding adding2 = new ProfileOverlay.Adding( + Lists.newArrayList(new ProfileOverlay.KVPair("key2", "value2B"), new ProfileOverlay.KVPair("key3", "value3"))); + ProfileOverlay result = adding1.over(adding2); + Config config = result.applyOverlay(ConfigFactory.empty()); + Assert.assertEquals(config.getString("key1"), "value1"); + Assert.assertEquals(config.getString("key2"), "value2A"); + Assert.assertEquals(config.getString("key3"), "value3"); + } + + @Test + public void testRemovingOver() { + ProfileOverlay.Removing removing1 = new ProfileOverlay.Removing(Lists.newArrayList("key1", "key2")); + ProfileOverlay.Removing removing2 = new ProfileOverlay.Removing(Lists.newArrayList("key2", "key3")); + ProfileOverlay result = removing1.over(removing2); + Assert.assertTrue(result instanceof ProfileOverlay.Removing); + ProfileOverlay.Removing removingResult = (ProfileOverlay.Removing) result; + Assert.assertEqualsNoOrder(removingResult.getRemovalKeys().toArray(), new String[]{"key1", "key2", "key3"}); + + Config config = + result.applyOverlay(ConfigFactory.parseString("key1=value1, key2=value2, key3=value3, key4=value4")); + Assert.assertFalse(config.hasPath("key1")); + Assert.assertFalse(config.hasPath("key2")); + Assert.assertFalse(config.hasPath("key3")); + Assert.assertTrue(config.hasPath("key4")); + } +} diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynscale/ScalingDirectiveParserTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynscale/ScalingDirectiveParserTest.java new file mode 100644 index 00000000000..282d7a52987 --- /dev/null +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynscale/ScalingDirectiveParserTest.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.dynscale; + +import java.util.Optional; +import com.google.common.collect.Lists; + +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; +import org.testng.Assert; + + +public class ScalingDirectiveParserTest { + + private final ScalingDirectiveParser parser = new ScalingDirectiveParser(); + + @Test + public void parseSimpleDirective() { + ScalingDirective sd = parser.parse("1728435970.my_profile=24"); + Assert.assertEquals(sd.getTimestampEpochMillis(), 1728435970L); + Assert.assertEquals(sd.getProfileName(), "my_profile"); + Assert.assertEquals(sd.getSetPoint(), 24); + Assert.assertFalse(sd.getOptDerivedFrom().isPresent()); + } + + @Test + public void parseUnnamedBaselineProfile() { + ScalingDirective sd = parser.parse("1728436821.=12"); + Assert.assertEquals(sd.getTimestampEpochMillis(), 1728436821L); + Assert.assertEquals(sd.getProfileName(), WorkforceProfiles.BASELINE_NAME); + Assert.assertEquals(sd.getSetPoint(), 12); + Assert.assertFalse(sd.getOptDerivedFrom().isPresent()); + } + + @Test + public void parseBaselineProfile() { + ScalingDirective sd = parser.parse("1728436828.baseline()=6"); + Assert.assertEquals(sd, new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 6, 1728436828L, Optional.empty())); + } + + @Test + public void parseAddingOverlayWithCommaSep() { + ScalingDirective sd = parser.parse("1728439210.new_profile=16,bar+(a.b.c=7,l.m=sixteen)"); + Assert.assertEquals(sd.getTimestampEpochMillis(), 1728439210L); + Assert.assertEquals(sd.getProfileName(), "new_profile"); + Assert.assertEquals(sd.getSetPoint(), 16); + Assert.assertTrue(sd.getOptDerivedFrom().isPresent()); + ProfileDerivation derivation = sd.getOptDerivedFrom().get(); + Assert.assertEquals(derivation.getBasisProfileName(), "bar"); + Assert.assertEquals(derivation.getOverlay(), new ProfileOverlay.Adding( + Lists.newArrayList(new ProfileOverlay.KVPair("a.b.c", "7"), new ProfileOverlay.KVPair("l.m", "sixteen")))); + } + + @Test + public void parseAddingOverlayWithSemicolonSep() { + ScalingDirective sd = parser.parse("1728439223.new_profile=32;baz+( a.b.c=7 ; l.m.n.o=sixteen )"); + Assert.assertEquals(sd, new ScalingDirective("new_profile", 32, 1728439223L, "baz", new ProfileOverlay.Adding( + Lists.newArrayList(new ProfileOverlay.KVPair("a.b.c", "7"), new ProfileOverlay.KVPair("l.m.n.o", "sixteen"))))); + } + + @Test + public void parseAddingOverlayWithCommaSepUrlEncoded() { + ScalingDirective sd = parser.parse("1728460832.new_profile=16,baa+(a.b.c=7,l.m=sixteen%2C%20again)"); + Assert.assertEquals(sd, new ScalingDirective("new_profile", 16, 1728460832L, "baa", new ProfileOverlay.Adding( + Lists.newArrayList(new ProfileOverlay.KVPair("a.b.c", "7"), new ProfileOverlay.KVPair("l.m", "sixteen, again"))))); + } + + @Test + public void parseRemovingOverlayWithCommaSep() { + ScalingDirective sd = parser.parse("1728436436.other_profile=9,my_profile-( x , y.z )"); + Assert.assertEquals(sd.getTimestampEpochMillis(), 1728436436L); + Assert.assertEquals(sd.getProfileName(), "other_profile"); + Assert.assertEquals(sd.getSetPoint(), 9); + Assert.assertTrue(sd.getOptDerivedFrom().isPresent()); + ProfileDerivation derivation = sd.getOptDerivedFrom().get(); + Assert.assertEquals(derivation.getBasisProfileName(), "my_profile"); + Assert.assertEquals(derivation.getOverlay(), new ProfileOverlay.Removing(Lists.newArrayList("x", "y.z"))); + } + + @Test + public void parseRemovingOverlayWithSemicolonSep() { + ScalingDirective sd = parser.parse("1728436499.other_profile=9;my_profile-(x.y;z.z)"); + Assert.assertEquals(sd, new ScalingDirective("other_profile", 9, 1728436499L, "my_profile", + new ProfileOverlay.Removing(Lists.newArrayList("x.y", "z.z")))); + } + + @Test + public void parseAddingOverlayWithWhitespace() { + ScalingDirective sd = parser.parse(" 1728998877 . another = 999 ; wow + ( t.r = jump%20 ; cb.az = foo%20#%20111 ) "); + Assert.assertEquals(sd, new ScalingDirective("another", 999, 1728998877L, "wow", new ProfileOverlay.Adding( + Lists.newArrayList(new ProfileOverlay.KVPair("t.r", "jump "), + new ProfileOverlay.KVPair("cb.az", "foo # 111"))))); + } + + @Test + public void parseRemovingOverlayWithWhitespace() { + ScalingDirective sd = parser.parse(" 1728334455 . also = 77 , really - ( t.r , cb.az ) "); + Assert.assertEquals(sd, new ScalingDirective("also", 77, 1728334455L, "really", + new ProfileOverlay.Removing(Lists.newArrayList("t.r", "cb.az")))); + } + + @Test + public void parseAddingOverlayWithUnnamedBaselineProfile() { + ScalingDirective sd = parser.parse("1728441200.plus_profile=16,+(q.r.s=four,l.m=16)"); + Assert.assertEquals(sd, new ScalingDirective("plus_profile", 16, 1728441200L, WorkforceProfiles.BASELINE_NAME, + new ProfileOverlay.Adding( + Lists.newArrayList(new ProfileOverlay.KVPair("q.r.s", "four"), new ProfileOverlay.KVPair("l.m", "16"))))); + } + + @Test + public void parseAddingOverlayWithBaselineProfile() { + ScalingDirective sd = parser.parse("1728443640.plus_profile=16,baseline()+(q.r=five,l.m=12)"); + Assert.assertEquals(sd, new ScalingDirective("plus_profile", 16, 1728443640L, WorkforceProfiles.BASELINE_NAME, + new ProfileOverlay.Adding( + Lists.newArrayList(new ProfileOverlay.KVPair("q.r", "five"), new ProfileOverlay.KVPair("l.m", "12"))))); + } + + @Test + public void parseRemovingOverlayWithUnnamedBaselineProfile() { + ScalingDirective sd = parser.parse("1728448521.extra_profile=0,-(a.b, c.d)"); + Assert.assertEquals(sd, new ScalingDirective("extra_profile", 0, 1728448521L, WorkforceProfiles.BASELINE_NAME, + new ProfileOverlay.Removing(Lists.newArrayList("a.b", "c.d")))); + } + + @Test + public void parseRemovingOverlayWithBaselineProfile() { + ScalingDirective sd = parser.parse("4.extra_profile=9,baseline()-(a.b, c.d)"); + Assert.assertEquals(sd, new ScalingDirective("extra_profile", 9, 4L, WorkforceProfiles.BASELINE_NAME, + new ProfileOverlay.Removing(Lists.newArrayList("a.b", "c.d")))); + } + + + @DataProvider(name = "funkyButValidDirectives") + public String[][] validDirectives() { + return new String[][]{ + // null overlay upon unnamed baseline profile: + {"1728435970.my_profile=24,+()"}, + {"1728435970.my_profile=24,-()"}, + {"1728435970.my_profile=24;+()"}, + {"1728435970.my_profile=24;-()"}, + + // null overlay upon named profile: + {"1728435970.my_profile=24,foo+()"}, + {"1728435970.my_profile=24,foo-()"}, + {"1728435970.my_profile=24;foo+()"}, + {"1728435970.my_profile=24;foo-()"}, + + // seemingly separator mismatch, but in fact the NOT-separator is part of the value (e.g. a="7;m=sixteen"): + { "1728439210.new_profile=16,bar+(a=7;m=sixteen)" }, + { "1728439210.new_profile=16;bar+(a=7,m=sixteen)" }, + { "1728439210.new_profile=16,bar+(a=7;)" }, + { "1728439210.new_profile=16;bar+(a=7,)" } + }; + } + + @Test( + expectedExceptions = {}, + dataProvider = "funkyButValidDirectives" + ) + public void parseValidDirectives(String directive) { + Assert.assertNotNull(parser.parse(directive)); + } + + + @DataProvider(name = "invalidDirectives") + public String[][] invalidDirectives() { + return new String[][] { + // invalid values: + { "invalid_timestamp.my_profile=24" }, + { "1728435970.my_profile=invalid_setpoint" }, + { "1728435970.my_profile=-15" }, + + // incomplete/fragments: + { "1728435970.my_profile=24," }, + { "1728435970.my_profile=24;" }, + { "1728435970.my_profile=24,+" }, + { "1728435970.my_profile=24,-" }, + { "1728435970.my_profile=24,foo+" }, + { "1728435970.my_profile=24,foo-" }, + { "1728435970.my_profile=24,foo+a=7" }, + { "1728435970.my_profile=24,foo-x" }, + + // adding: invalid set-point + missing token examples: + { "1728439210.new_profile=-6,bar+(a=7,m=sixteen)" }, + { "1728439210.new_profile=16,bar+(a=7,m=sixteen" }, + { "1728439210.new_profile=16,bar+a=7,m=sixteen)" }, + + // adding: key, instead of key-value pair: + { "1728439210.new_profile=16,bar+(a=7,m)" }, + { "1728439210.new_profile=16,bar+(a,m)" }, + + // adding: superfluous separator or used instead as a terminator: + { "1728439210.new_profile=16,bar+(,)" }, + { "1728439210.new_profile=16;bar+(;)" }, + { "1728439210.new_profile=16,bar+(,,)" }, + { "1728439210.new_profile=16;bar+(;;)" }, + { "1728439210.new_profile=16,bar+(a=7,)" }, + { "1728439210.new_profile=16;bar+(a=7;)" }, + + // removing: invalid set-point + missing token examples: + { "1728436436.other_profile=-9,my_profile-(x)" }, + { "1728436436.other_profile=69,my_profile-(x" }, + { "1728436436.other_profile=69,my_profile-x)" }, + + // removing: key-value pair instead of key: + { "1728436436.other_profile=69,my_profile-(x=y,z)" }, + { "1728436436.other_profile=69,my_profile-(x=y,z=1)" }, + + // removing: superfluous separator or used instead as a terminator: + { "1728436436.other_profile=69,my_profile-(,)" }, + { "1728436436.other_profile=69;my_profile-(;)" }, + { "1728436436.other_profile=69,my_profile-(,,)" }, + { "1728436436.other_profile=69;my_profile-(;;)" }, + { "1728436436.other_profile=69,my_profile-(x,)" }, + { "1728436436.other_profile=69;my_profile-(x;)" }, + + // removing: seemingly separator mismatch, but in fact the NOT-separator is illegal in a key (e.g. "x;y"): + { "1728436436.other_profile=69,my_profile-(x;y)" }, + { "1728436436.other_profile=69;my_profile-(x,y)" }, + { "1728436436.other_profile=69,my_profile-(x;)" }, + { "1728436436.other_profile=69;my_profile-(x,)" } + }; + } + + @Test( + expectedExceptions = ScalingDirectiveParser.MalformedDirectiveException.class, + dataProvider = "invalidDirectives" + ) + public void parseInvalidDirectives(String directive) { + parser.parse(directive); + } +} diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynscale/WorkforcePlanTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynscale/WorkforcePlanTest.java new file mode 100644 index 00000000000..838964aa4fc --- /dev/null +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynscale/WorkforcePlanTest.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.dynscale; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; +import org.testng.Assert; + + +public class WorkforcePlanTest { + private Config baselineConfig = ConfigFactory.parseString("key1=value1, key2=value2"); + private final int initialBaselineSetPoint = 10; + private WorkforcePlan plan; + + @BeforeMethod + public void setUp() { + plan = new WorkforcePlan(baselineConfig, initialBaselineSetPoint); + } + + private static ScalingDirective createNewProfileDirective(String profileName, int setPoint, long epochMillis, String basisProfileName) { + return new ScalingDirective(profileName, setPoint, epochMillis, Optional.of( + new ProfileDerivation(basisProfileName, new ProfileOverlay.Adding(Lists.newArrayList( + new ProfileOverlay.KVPair("key1", "new_value"), + new ProfileOverlay.KVPair("key4", "value4")))))); + } + + @Test + public void reviseWithValidReSetPoint() throws WorkforcePlan.IllegalRevisionException { + plan.revise(new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 7,10000L)); + plan.revise(new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 1,20000L)); + Assert.assertEquals(plan.getLastRevisionEpochMillis(), 20000L); + Assert.assertEquals(plan.getNumProfiles(), 1); + } + + @Test + public void reviseWithValidDerivation() throws WorkforcePlan.IllegalRevisionException { + Assert.assertEquals(plan.getLastRevisionEpochMillis(), WorkforceStaffing.INITIALIZATION_PROVENANCE_EPOCH_MILLIS); + Assert.assertEquals(plan.getNumProfiles(), 1); + ScalingDirective directive = createNewProfileDirective("new_profile", 5,10000L, WorkforceProfiles.BASELINE_NAME); + plan.revise(directive); + + Assert.assertEquals(plan.getLastRevisionEpochMillis(), 10000L); + Assert.assertEquals(plan.getNumProfiles(), 2); + Config expectedConfig = ConfigFactory.parseString("key1=new_value, key2=value2, key4=value4"); + Assert.assertEquals(plan.peepProfile("new_profile").getConfig(), expectedConfig); + } + + @Test + public void reviseWhenNewerIgnoresOutOfOrderDirectives() throws WorkforcePlan.IllegalRevisionException { + AtomicInteger numErrors = new AtomicInteger(0); + Assert.assertEquals(plan.getLastRevisionEpochMillis(), WorkforceStaffing.INITIALIZATION_PROVENANCE_EPOCH_MILLIS); + Assert.assertEquals(plan.getNumProfiles(), 1); + plan.reviseWhenNewer(Lists.newArrayList( + new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 2,100L), + new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 3,500L), + new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 4,200L), + createNewProfileDirective("new_profile", 5,400L, WorkforceProfiles.BASELINE_NAME), + // NOTE: the second attempt at derivation is NOT judged a duplicate, as the outdated timestamp of first attempt (above) meant it was ignored! + createNewProfileDirective("new_profile", 6,600L, WorkforceProfiles.BASELINE_NAME), + new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 7,800L), + new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 8,700L) + ), failure -> numErrors.incrementAndGet()); + + Assert.assertEquals(plan.getLastRevisionEpochMillis(), 800L); + Assert.assertEquals(plan.getNumProfiles(), 2); + Assert.assertEquals(numErrors.get(), 0); + Assert.assertEquals(plan.peepStaffing(WorkforceProfiles.BASELINE_NAME), Optional.of(7), WorkforceProfiles.BASELINE_NAME_RENDERING); + Assert.assertEquals(plan.peepStaffing("new_profile"), Optional.of(6), "new_profile"); + } + + @Test + public void reviseWhenNewerSwallowsErrors() throws WorkforcePlan.IllegalRevisionException { + AtomicInteger numErrors = new AtomicInteger(0); + plan.reviseWhenNewer(Lists.newArrayList( + new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 1,100L), + // (1) error: `UnrecognizedProfile` + new ScalingDirective("unknown_profile", 2,250L), + createNewProfileDirective("new_profile", 3,200L, WorkforceProfiles.BASELINE_NAME), + // (2) error: `Redefinition` + createNewProfileDirective("new_profile", 4,450L, WorkforceProfiles.BASELINE_NAME), + new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 5,300L), + // (3) error: `UnknownBasis` + createNewProfileDirective("other_profile", 6,550L, "never_defined"), + new ScalingDirective("new_profile", 7,400L), + // ignored: out-of-order timestamp (not an error... see: `reviseWhenNewerIgnoresOutOfOrderDirectives`) + new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 8,350L), + createNewProfileDirective("another", 9,500L, "new_profile") + ), failure -> numErrors.incrementAndGet()); + + Assert.assertEquals(plan.getLastRevisionEpochMillis(), 500L); + Assert.assertEquals(plan.getNumProfiles(), 3); + Assert.assertEquals(numErrors.get(), 3); + Assert.assertEquals(plan.peepStaffing(WorkforceProfiles.BASELINE_NAME), Optional.of(5), WorkforceProfiles.BASELINE_NAME_RENDERING); + Assert.assertEquals(plan.peepStaffing("new_profile"), Optional.of(7), "new_profile"); + Assert.assertEquals(plan.peepStaffing("another"), Optional.of(9), "another"); + } + + @Test + public void calcStaffingDeltas() throws WorkforcePlan.IllegalRevisionException { + plan.revise(createNewProfileDirective("new_profile", 3,10L, WorkforceProfiles.BASELINE_NAME)); + plan.revise(createNewProfileDirective("other_profile", 8,20L, "new_profile")); + plan.revise(createNewProfileDirective("another", 7,30L, "new_profile")); + plan.revise(new ScalingDirective("new_profile", 5,40L)); + plan.revise(new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 6,50L)); + plan.revise(new ScalingDirective("another", 4,60L)); + + Assert.assertEquals(plan.getLastRevisionEpochMillis(), 60L); + Assert.assertEquals(plan.getNumProfiles(), 4); + Assert.assertEquals(plan.peepStaffing(WorkforceProfiles.BASELINE_NAME), Optional.of(6), WorkforceProfiles.BASELINE_NAME_RENDERING); + Assert.assertEquals(plan.peepStaffing("new_profile"), Optional.of(5), "new_profile"); + Assert.assertEquals(plan.peepStaffing("another"), Optional.of(4), "another"); + Assert.assertEquals(plan.peepStaffing("other_profile"), Optional.of(8), "other_profile"); + + WorkforceStaffing referenceStaffing = WorkforceStaffing.initializeStaffing(100, ImmutableMap.of( + WorkforceProfiles.BASELINE_NAME, 100, + "new_profile", 1, + // not initialized - "another" + "other_profile", 8 + )); + StaffingDeltas deltas = plan.calcStaffingDeltas(referenceStaffing); + Assert.assertEquals(deltas.getPerProfileDeltas().size(), 3); + deltas.getPerProfileDeltas().forEach(delta -> { + switch (delta.getProfile().getName()) { + case WorkforceProfiles.BASELINE_NAME: + Assert.assertEquals(delta.getDelta(), -94); + Assert.assertEquals(delta.getSetPointProvenanceEpochMillis(), 50L); + break; + case "new_profile": + Assert.assertEquals(delta.getDelta(), 4); + Assert.assertEquals(delta.getSetPointProvenanceEpochMillis(), 40L); + break; + case "another": + Assert.assertEquals(delta.getDelta(), 4); + Assert.assertEquals(delta.getSetPointProvenanceEpochMillis(), 60L); + break; + case "other_profile": // NOTE: should NOT be present (since delta == 0)! + default: + Assert.fail("Unexpected profile: " + delta.getProfile().getName()); + } + }); + } + + @Test(expectedExceptions = WorkforcePlan.IllegalRevisionException.OutdatedDirective.class) + public void reviseWithOutdatedDirective() throws WorkforcePlan.IllegalRevisionException { + plan.revise(new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 7,30000L)); + plan.revise(new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 12,8000L)); + } + + @Test(expectedExceptions = WorkforcePlan.IllegalRevisionException.UnrecognizedProfile.class) + public void reviseWithUnrecognizedProfileDirective() throws WorkforcePlan.IllegalRevisionException { + plan.revise(new ScalingDirective("unknown_profile", 7,10000L)); + } + + @Test(expectedExceptions = WorkforcePlan.IllegalRevisionException.Redefinition.class) + public void reviseWithRedefinitionDirective() throws WorkforcePlan.IllegalRevisionException { + plan.revise(createNewProfileDirective("new_profile", 5,10000L, WorkforceProfiles.BASELINE_NAME)); + plan.revise(createNewProfileDirective("new_profile", 9,20000L, WorkforceProfiles.BASELINE_NAME)); + } + + @Test(expectedExceptions = WorkforcePlan.IllegalRevisionException.UnknownBasis.class) + public void reviseWithUnknownBasisDirective() throws WorkforcePlan.IllegalRevisionException { + plan.revise(createNewProfileDirective("new_profile", 5,10000L, "never_defined")); + } +} diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynscale/WorkforceStaffingTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynscale/WorkforceStaffingTest.java new file mode 100644 index 00000000000..e34673195c3 --- /dev/null +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynscale/WorkforceStaffingTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.dynscale; + +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +import com.google.common.collect.ImmutableMap; + +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; +import org.testng.Assert; + +import static org.mockito.ArgumentMatchers.anyString; + + +public class WorkforceStaffingTest { + + @Mock private WorkforceProfiles profiles; + + @BeforeMethod + public void setUp() { + MockitoAnnotations.openMocks(this); + Mockito.when(profiles.getOrThrow(anyString())).thenAnswer(invocation -> + new WorkerProfile(invocation.getArgument(0), null)); + } + + @Test + public void initializeShouldSetInitialBaselineSetPoint() { + int initialBaselineSetPoint = 5; + WorkforceStaffing staffing = WorkforceStaffing.initialize(initialBaselineSetPoint); + Assert.assertEquals(staffing.getStaffing(WorkforceProfiles.BASELINE_NAME), Optional.of(initialBaselineSetPoint)); + } + + @Test + public void reviseStaffingShouldUpdateSetPoint() { + String profileName = "testProfile"; + WorkforceStaffing staffing = WorkforceStaffing.initialize(0); + staffing.reviseStaffing(profileName, 10, 1000L); + Assert.assertEquals(staffing.getStaffing(profileName), Optional.of(10)); + + staffing.reviseStaffing(profileName, 17, 2000L); + Assert.assertEquals(staffing.getStaffing(profileName), Optional.of(17)); + } + + @Test + public void calcDeltasShouldReturnCorrectDeltas() { + String subsequentlyUnreferencedProfileName = "unreferenced"; + String newlyAddedProfileName = "added"; + String heldSteadyProfileName = "steady"; + WorkforceStaffing currentStaffing = WorkforceStaffing.initialize(5); + currentStaffing.reviseStaffing(subsequentlyUnreferencedProfileName, 3, 1000L); + currentStaffing.reviseStaffing(heldSteadyProfileName, 9, 2000L); + + WorkforceStaffing improvedStaffing = WorkforceStaffing.initialize(7); + improvedStaffing.reviseStaffing(newlyAddedProfileName, 10, 3000L); + improvedStaffing.reviseStaffing(heldSteadyProfileName, 9, 4000L); + + StaffingDeltas deltas = improvedStaffing.calcDeltas(currentStaffing, profiles); + Assert.assertEquals(deltas.getPerProfileDeltas().size(), 3); + // validate every delta + Map deltaByProfileName = deltas.getPerProfileDeltas().stream() + .collect(Collectors.toMap(delta -> delta.getProfile().getName(), StaffingDeltas.ProfileDelta::getDelta)); + ImmutableMap expectedDeltaByProfileName = ImmutableMap.of( + WorkforceProfiles.BASELINE_NAME, 2, + subsequentlyUnreferencedProfileName, -3, + // NOTE: NOT present (when delta == 0)! + // heldSteadyProfileName, 0, + newlyAddedProfileName, 10 + ); + Assert.assertEqualsNoOrder(deltaByProfileName.keySet().toArray(), expectedDeltaByProfileName.keySet().toArray()); + Assert.assertEquals(deltaByProfileName.get(WorkforceProfiles.BASELINE_NAME), expectedDeltaByProfileName.get(WorkforceProfiles.BASELINE_NAME)); + Assert.assertEquals(deltaByProfileName.get(subsequentlyUnreferencedProfileName), expectedDeltaByProfileName.get(subsequentlyUnreferencedProfileName)); + Assert.assertEquals(deltaByProfileName.get(newlyAddedProfileName), expectedDeltaByProfileName.get(newlyAddedProfileName)); + } +}