Skip to content

Commit

Permalink
Define Gobblin-on-TemporalWorkforcePlan and dynamic `ScalingDirecti…
Browse files Browse the repository at this point in the history
…ve`s with parser
  • Loading branch information
phet committed Oct 18, 2024
1 parent 2afab69 commit dd8717f
Show file tree
Hide file tree
Showing 15 changed files with 1,478 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ public synchronized void stop() {

this.stopStatus.setStopInprogress(true);

log.info("Stopping the Gobblin Cluster Manager");
log.info("Stopping the Gobblin Temporal Cluster Manager");

stopAppLauncherAndServices();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.dynscale;

import java.util.Optional;
import java.util.function.Function;

import com.typesafe.config.Config;
import lombok.Data;
import lombok.Getter;


@Data
public class ProfileDerivation {
public static class UnknownBasisException extends Exception {
@Getter
private final String name;
public UnknownBasisException(String basisName) {
super("named '" + WorkforceProfiles.renderName(basisName) + "'");
this.name = basisName;
}
}

private final String basisProfileName;
private final ProfileOverlay overlay;

public Config formulateConfig(Function<String, Optional<WorkerProfile>> basisResolver) throws UnknownBasisException {
Optional<WorkerProfile> optProfile = basisResolver.apply(basisProfileName);
if (!optProfile.isPresent()) {
throw new UnknownBasisException(basisProfileName);
} else {
return overlay.applyOverlay(optProfile.get().getConfig());
}
}

public String renderName() {
return WorkforceProfiles.renderName(this.basisProfileName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.dynscale;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;
import lombok.Data;


public interface ProfileOverlay {

Config applyOverlay(Config config);

ProfileOverlay over(ProfileOverlay other);


@Data
class KVPair {
private final String key;
private final String value;
}


@Data
class Adding implements ProfileOverlay {
private final List<KVPair> additionPairs;

@Override
public Config applyOverlay(Config config) {
return additionPairs.stream().sequential().reduce(config,
(currConfig, additionPair) ->
currConfig.withValue(additionPair.getKey(), ConfigValueFactory.fromAnyRef(additionPair.getValue())),
(configA, configB) ->
configB.withFallback(configA)
);
}

@Override
public ProfileOverlay over(ProfileOverlay other) {
if (other instanceof Adding) {
Map<String, String> base = ((Adding) other).getAdditionPairs().stream().collect(Collectors.toMap(KVPair::getKey, KVPair::getValue));
additionPairs.stream().forEach(additionPair ->
base.put(additionPair.getKey(), additionPair.getValue()));
return new Adding(base.entrySet().stream().map(entry -> new KVPair(entry.getKey(), entry.getValue())).collect(Collectors.toList()));
} else if (other instanceof Removing) {
return Combo.normalize(this, (Removing) other);
} else if (other instanceof Combo) {
Combo otherCombo = (Combo) other;
return Combo.normalize((Adding) this.over(otherCombo.getAdding()), otherCombo.getRemoving());
} else {
throw new IllegalArgumentException("unknown derived class of type '" + other.getClass().getName() + "': " + other);
}
}
}


@Data
class Removing implements ProfileOverlay {
private final List<String> removalKeys;

@Override
public Config applyOverlay(Config config) {
return removalKeys.stream().sequential().reduce(config,
(currConfig, removalKey) ->
currConfig.withoutPath(removalKey),
(configA, configB) ->
configA.withFallback(configB)
);
}

@Override
public ProfileOverlay over(ProfileOverlay other) {
if (other instanceof Adding) {
return Combo.normalize((Adding) other, this);
} else if (other instanceof Removing) {
Set<String> otherKeys = new HashSet<String>(((Removing) other).getRemovalKeys());
otherKeys.addAll(removalKeys);
return new Removing(new ArrayList<>(otherKeys));
} else if (other instanceof Combo) {
Combo otherCombo = (Combo) other;
return Combo.normalize(otherCombo.getAdding(), (Removing) this.over(otherCombo.getRemoving()));
} else {
throw new IllegalArgumentException("unknown derived class of type '" + other.getClass().getName() + "': " + other);
}
}
}


@Data
class Combo implements ProfileOverlay {
private final Adding adding;
private final Removing removing;

// merely restrict access modifier from `public` to `protected`, as not meant to be instantiated outside this enclosing interface
private Combo(Adding adding, Removing removing) {
this.adding = adding;
this.removing = removing;
}

protected static Combo normalize(Adding toAdd, Removing toRemove) {
// pre-remove any in `toAdd` that are also in `toRemove`... yet still maintain them in `toRemove`, in case the eventual `Config` "basis" also has any
Set<String> removeKeysLookup = toRemove.getRemovalKeys().stream().collect(Collectors.toSet());
List<KVPair> unmatchedAdditionPairs = toAdd.getAdditionPairs().stream().sequential().filter(additionPair ->
!removeKeysLookup.contains(additionPair.getKey())
).collect(Collectors.toList());
return new Combo(new Adding(unmatchedAdditionPairs), new Removing(new ArrayList<>(removeKeysLookup)));
}

@Override
public Config applyOverlay(Config config) {
return adding.applyOverlay(removing.applyOverlay(config));
}

@Override
public ProfileOverlay over(ProfileOverlay other) {
if (other instanceof Adding) {
return Combo.normalize((Adding) this.adding.over((Adding) other), this.removing);
} else if (other instanceof Removing) {
return Combo.normalize(this.adding, (Removing) this.removing.over((Removing) other));
} else if (other instanceof Combo) {
Combo otherCombo = (Combo) other;
return Combo.normalize((Adding) this.adding.over(otherCombo.getAdding()), (Removing) this.removing.over(otherCombo.getRemoving()));
} else {
throw new IllegalArgumentException("unknown derived class of type '" + other.getClass().getName() + "': " + other);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.dynscale;

import java.util.Optional;
import lombok.Data;
import lombok.RequiredArgsConstructor;


@Data
@RequiredArgsConstructor
public class ScalingDirective {
private final String profileName;
private final int setPoint;
private final long timestampEpochMillis;
private final Optional<ProfileDerivation> optDerivedFrom;

public ScalingDirective(String profileName, int setPoint, long timestampEpochMillis) {
this(profileName, setPoint, timestampEpochMillis, Optional.empty());
}

public ScalingDirective(String profileName, int setPoint, long timestampEpochMillis, String basisProfileName, ProfileOverlay overlay) {
this(profileName, setPoint, timestampEpochMillis, Optional.of(new ProfileDerivation(basisProfileName, overlay)));
}

public String renderName() {
return WorkforceProfiles.renderName(this.profileName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.dynscale;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import lombok.extern.slf4j.Slf4j;


/**
* parse {@link ScalingDirective}s with syntax of the form:
* TIMESTAMP '.' WORKER_NAME '=' SETPOINT [ ( ',' | ';' ) WORKER_NAME ( '+(' KV_PAIR (*SEP* KV_PAIR)* ')' | '-( KEY (*SEP* KEY* ')' ) ]
* where *SEP* is either ',' or ';' (whichever did follow SETPOINT)
* the first form with '+' is an "adding" (upsert) overlay, the second form with '-' is a removing overlay
* allows for URL-encoded values in the KV_PAIRs and whitespace around any token
*
1728435970.my_profile=24
1728436821.=24
1728436828.baseline()=24
1728439210.new_profile=16,bar+(a.b.c=7,l.m=sixteen)
1728439223.new_profile=16;bar+(a.b.c=7;l.m=sixteen)
1728460832.new_profile=16,bar+(a.b.c=7,l.m=sixteen%2C%20again)
1728436436.other_profile=9,my_profile-(x,y.z)
1728436499.other_profile=9;my_profile-(x;y.z)
1728441200.plus_profile=16,+(a.b.c=7,l.m=sixteen)
1728443640.plus_profile=16,baseline()+(a.b.c=7,l.m=sixteen)
1728448521.extra_profile=9,-(a.b, c.d)
1728449978.extra_profile=9,baseline()-(a.b, c.d)
*/
@Slf4j
public class ScalingDirectiveParser {
public static class MalformedDirectiveException extends IllegalArgumentException {
private final String directive;
public MalformedDirectiveException(String directive, String desc) {
super("error: " + desc + ", in ==>" + directive + "<==");
this.directive = directive;
}
}

private static final String DIRECTIVE_REGEX = "(?x) \\s* (\\d+) \\s* \\. \\s* (\\w* | baseline\\(\\)) \\s* = \\s* (\\d+) "
+ "(?: \\s* ([;,]) \\s* (\\w* | baseline\\(\\)) \\s* (?: (\\+ \\s* \\( \\s* ([^)]*?) \\s* \\) ) | (- \\s* \\( \\s* ([^)]*?) \\s* \\) ) ) )? \\s*";

private static final String KEY_REGEX = "(\\w+(?:\\.\\w+)*)";
private static final String KEY_VALUE_REGEX = KEY_REGEX + "\\s*=\\s*(.*)";
private static final Pattern directivePattern = Pattern.compile(DIRECTIVE_REGEX);
private static final Pattern keyPattern = Pattern.compile(KEY_REGEX);
private static final Pattern keyValuePattern = Pattern.compile(KEY_VALUE_REGEX);

private static final String BASELINE_ID = "baseline()";

public ScalingDirective parse(String directive) {
Matcher parsed = directivePattern.matcher(directive);
if (parsed.matches()) {
long timestamp = Long.parseLong(parsed.group(1));
String profileId = parsed.group(2);
String profileName = identifyProfileName(profileId);
int setpoint = Integer.parseInt(parsed.group(3));
Optional<ProfileDerivation> optDerivedFrom = Optional.empty();
String overlayIntroSep = parsed.group(4);
if (overlayIntroSep != null) {
String basisProfileName = identifyProfileName(parsed.group(5));
if (parsed.group(6) != null) { // '+' == adding
List<ProfileOverlay.KVPair> additions = new ArrayList<>();
String additionsStr = parsed.group(7);
if (!additionsStr.equals("")) {
for (String addStr : additionsStr.split("\\s*" + overlayIntroSep + "\\s*", -1)) { // (negative limit to disallow trailing empty strings)
Matcher keyValueParsed = keyValuePattern.matcher(addStr);
if (keyValueParsed.matches()) {
additions.add(new ProfileOverlay.KVPair(keyValueParsed.group(1), urlDecode(directive, keyValueParsed.group(2))));
} else {
throw new MalformedDirectiveException(directive, "unable to parse key-value pair - {{" + addStr + "}}");
}
}
}
optDerivedFrom = Optional.of(new ProfileDerivation(basisProfileName, new ProfileOverlay.Adding(additions)));
} else { // '-' == removing
List<String> removalKeys = new ArrayList<>();
String removalsStr = parsed.group(9);
if (!removalsStr.equals("")) {
for (String removeStr : removalsStr.split("\\s*" + overlayIntroSep + "\\s*", -1)) { // (negative limit to disallow trailing empty strings)
Matcher keyParsed = keyPattern.matcher(removeStr);
if (keyParsed.matches()) {
removalKeys.add(keyParsed.group(1));
} else {
throw new MalformedDirectiveException(directive, "unable to parse key - {{" + removeStr + "}}");
}
}
}
optDerivedFrom = Optional.of(new ProfileDerivation(basisProfileName, new ProfileOverlay.Removing(removalKeys)));
}
}
return new ScalingDirective(profileName, setpoint, timestamp, optDerivedFrom);
} else {
throw new MalformedDirectiveException(directive, "invalid syntax");
}
}

private static String identifyProfileName(String profileId) {
return profileId.equals(BASELINE_ID) ? WorkforceProfiles.BASELINE_NAME : profileId;
}

private static String urlDecode(String directive, String s) {
try {
return java.net.URLDecoder.decode(s, "UTF-8");
} catch (java.io.UnsupportedEncodingException e) {
throw new MalformedDirectiveException(directive, "unable to URL-decode - {{" + s + "}}");
}
}
}
Loading

0 comments on commit dd8717f

Please sign in to comment.