initial commit with high-level design for refactoring DagManager #3756

Open
wants to merge 1 commit into
base: master
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.service.modules.orchestration;

/**
* Responsible for defining the behavior of {@link DagTask} handling scenarios for launch, resume, kill, job start
* and flow completion deadlines
*
*/
public interface DagManagement {

void launchFlow();
void resumeFlow();
void killFlow();
void enforceFlowCompletionDeadline();
void enforceJobStartDeadline();
}
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.service.modules.orchestration;

import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;


/**
* Responsible for performing the actual work for a given {@link DagTask}.
Contributor comment:
"performing work and notifying other components/modules of its imminent or completed actions"

* It processes the {@link DagTask} by first initializing its state, performing actions
* like updating {@link DagStateStore} and finally submitting an event to the executor.
Contributor comment:
Let's differentiate between contacting the executor to carry out an action and submitting status events

* @param <S> current state of the dag node
* @param <R> result after processing the dag node
*/
public abstract class DagProc<S, R> {
protected abstract S initialize() throws MaybeRetryableException;
protected abstract R act(S state) throws MaybeRetryableException;
protected abstract void sendNotification(R result) throws MaybeRetryableException;

final void process() {
throw new UnsupportedOperationException("Process unsupported");
Comment on lines +31 to +36
Contributor comment:
can you elaborate a little on how S, R, and these functions are used with pseudocode in process? Who is the user of DagProc? Is it sufficient just to call process with the DagTask as input or do they need to initialize, then act, etc...? Does process do all of the above instead? Does it handle retries? Can you add some java docs to these methods.

}
}
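One possible answer to the review question above, offered as a minimal sketch and not as the PR's actual design: `process()` could be a template method that calls `initialize()`, `act()` and `sendNotification()` in order, so that a caller only ever invokes `process()`. Everything in the block is hypothetical. The names `SketchDagProc` and `FlakyProc`, the `maxAttempts` parameter, the `retryable` flag on the stand-in exception, and the decision to retry the whole sequence are assumptions for illustration, since the PR leaves `process()` unimplemented.

```java
public final class DagProcSketch {

  /** Stand-in for the PR's MaybeRetryableException; the retryable flag is an assumption. */
  static class MaybeRetryableException extends Exception {
    final boolean retryable;

    MaybeRetryableException(String message, boolean retryable) {
      super(message);
      this.retryable = retryable;
    }
  }

  /** Same three-step shape as DagProc, with process() filled in as a template method. */
  abstract static class SketchDagProc<S, R> {
    protected abstract S initialize() throws MaybeRetryableException;
    protected abstract R act(S state) throws MaybeRetryableException;
    protected abstract void sendNotification(R result) throws MaybeRetryableException;

    /** Runs initialize -> act -> sendNotification, retrying the whole sequence up to maxAttempts times. */
    final void process(int maxAttempts) throws MaybeRetryableException {
      for (int attempt = 1; ; attempt++) {
        try {
          S state = initialize();
          R result = act(state);
          sendNotification(result);
          return;
        } catch (MaybeRetryableException e) {
          // Give up on non-retryable failures or once the attempt budget is spent.
          if (!e.retryable || attempt >= maxAttempts) {
            throw e;
          }
        }
      }
    }
  }

  /** Toy subclass: act() fails once with a retryable error, then succeeds. */
  static final class FlakyProc extends SketchDagProc<String, Integer> {
    int actCalls = 0;
    Integer notified = null;

    @Override
    protected String initialize() {
      return "dag-state";
    }

    @Override
    protected Integer act(String state) throws MaybeRetryableException {
      actCalls++;
      if (actCalls == 1) {
        throw new MaybeRetryableException("transient failure", true);
      }
      return state.length();
    }

    @Override
    protected void sendNotification(Integer result) {
      notified = result;
    }
  }

  public static void main(String[] args) throws Exception {
    FlakyProc proc = new FlakyProc();
    proc.process(3);
    System.out.println(proc.actCalls + " act calls, notified with " + proc.notified);
  }
}
```

Retrying the full sequence re-runs `initialize()`, which is only safe if that step is idempotent. Whether retries belong inside `process()` or in its caller is a question the PR leaves open.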
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.service.modules.orchestration;

/**
* Factory for creating the {@link DagProc} that matches a given {@link DagTask}, acting as a {@link DagTaskVisitor}.
*/
public interface DagProcFactory extends DagTaskVisitor<DagProc> {
DagProc meet(LaunchDagTask ldt);
DagProc meet(KillDagTask kdt);
DagProc meet(ResumeDagTask rdt);
Contributor comment:
Let's add the SLA type actions as well from above

  • enforceFlowCompletionDeadline
  • enforceJobStartDeadline

DagProc createFor(DagTask t);
}
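The `meet` / `host` / `createFor` trio reads like a visitor with double dispatch. A minimal sketch of how the pieces might fit together, assuming `host` simply calls back into the visitor (the PR's stubs currently return null, so this is a guess at the intent). All names here (`Visitor`, `Task`, `LaunchTask`, `KillTask`, `ProcFactory`) are hypothetical, the factory returns a string where the real one would return a `DagProc`, and `host` is a generic method instead of relying on a class-level type parameter as `DagTask<T>` does.

```java
public final class DagTaskVisitorSketch {

  /** Hypothetical visitor: one meet() overload per concrete task type. */
  interface Visitor<T> {
    T meet(LaunchTask task);
    T meet(KillTask task);
  }

  /** Hypothetical task base type; host() hands the task to the visitor. */
  abstract static class Task {
    abstract <T> T host(Visitor<T> visitor);
  }

  static final class LaunchTask extends Task {
    @Override
    <T> T host(Visitor<T> visitor) {
      // The static type of 'this' selects the meet(LaunchTask) overload.
      return visitor.meet(this);
    }
  }

  static final class KillTask extends Task {
    @Override
    <T> T host(Visitor<T> visitor) {
      // The static type of 'this' selects the meet(KillTask) overload.
      return visitor.meet(this);
    }
  }

  /** Factory acting as a visitor; a String stands in for the DagProc it would build. */
  static final class ProcFactory implements Visitor<String> {
    @Override
    public String meet(LaunchTask task) {
      return "LaunchDagProc";
    }

    @Override
    public String meet(KillTask task) {
      return "KillDagProc";
    }

    /** Single entry point: the task decides which meet() overload runs. */
    String createFor(Task task) {
      return task.host(this);
    }
  }

  public static void main(String[] args) {
    ProcFactory factory = new ProcFactory();
    System.out.println(factory.createFor(new LaunchTask()));
    System.out.println(factory.createFor(new KillTask()));
  }
}
```

With this shape, supporting the deadline-enforcement tasks the reviewer mentions would mean adding one task class and one `meet` overload per task type, and the compiler would flag any visitor that misses one.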

@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.service.modules.orchestration;

/**
* Defines an individual task or job in a Dag.
* It carries the state information required by {@link DagProc} to for its processing.
Contributor comment:
extra "to"

* Upon completion of the {@link DagProc#process()} it will mark the lease
* acquired by {@link org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter} as complete
* @param <T>
*/
abstract class DagTask<T> {

abstract void initialize();
abstract void conclude();
abstract T host(DagTaskVisitor<T> visitor);
Comment on lines +29 to +31
Contributor comment:
add short one line java docs here

}
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.service.modules.orchestration;

import java.util.Iterator;


/**
* Holds a stream of {@link DagTask}s that need to be processed by the {@link DagManager}.
* It provides an implementation for {@link DagManagement} defines the rules for a flow and job.
Contributor comment:
"(and) defines the rules ...." missing the word and. What are said rules btw? Can you elaborate?

Can you also explain if each host's DagTaskStream will be unique or all tasks will be common between hosts? That will imply whether or not we handle lease arbitration under the hood in this class.

* Implements {@link Iterator} to provide the next {@link DagTask} if available to {@link DagManager}
*/
public class DagTaskStream implements Iterator<DagTask>, DagManagement {
@Override
public boolean hasNext() {
return false;
}

@Override
public DagTask next() {
return null;
}

@Override
public void launchFlow() {

}

@Override
public void resumeFlow() {

}

@Override
public void killFlow() {

}

@Override
public void enforceFlowCompletionDeadline() {

}

@Override
public void enforceJobStartDeadline() {
Comment on lines +40 to +60
Contributor comment:
Who calls these functions? Why are there no parameters? I initially thought DagManagement from above will be the one to obtain these objects one by one from DagTaskStream but perhaps DagManager is the user of DagTaskStream and utilizes the DagManagement interface? It would be good to include in these classes how they fit into the other classes (who is a caller of what)? A diagram might actually be super helpful at clarifying for any reader who interacts with whom in what capacity.


}
}
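One way to picture the caller relationship the reviewer asks about, as a sketch under stated assumptions and not the PR's design: the `DagManagement`-style methods act as producers that enqueue tasks, and a consumer such as `DagManager` drains them through the `Iterator` methods. The names `TaskStream`, `Task` and `TaskType`, the `flowId` parameter, and the in-memory single-threaded queue are all hypothetical. Lease arbitration and sharing across hosts are deliberately left out.

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;

public final class DagTaskStreamSketch {

  enum TaskType { LAUNCH, KILL }

  /** Hypothetical task record: what to do and for which flow. */
  static final class Task {
    final TaskType type;
    final String flowId;

    Task(TaskType type, String flowId) {
      this.type = type;
      this.flowId = flowId;
    }
  }

  /** Producer methods enqueue tasks; the Iterator methods let a consumer drain them in order. */
  static final class TaskStream implements Iterator<Task> {
    // Not thread-safe: a real stream shared between threads would need a concurrent queue.
    private final Queue<Task> pending = new ArrayDeque<>();

    void launchFlow(String flowId) {
      pending.add(new Task(TaskType.LAUNCH, flowId));
    }

    void killFlow(String flowId) {
      pending.add(new Task(TaskType.KILL, flowId));
    }

    @Override
    public boolean hasNext() {
      return !pending.isEmpty();
    }

    @Override
    public Task next() {
      Task task = pending.poll();
      if (task == null) {
        throw new NoSuchElementException("no pending DagTask");
      }
      return task;
    }
  }

  public static void main(String[] args) {
    TaskStream stream = new TaskStream();
    stream.launchFlow("flowA"); // producer side, e.g. an API request
    stream.killFlow("flowB");
    while (stream.hasNext()) {  // consumer side, e.g. the manager loop
      Task task = stream.next();
      System.out.println(task.type + " " + task.flowId);
    }
  }
}
```

Passing a flow identifier into each producer method is one possible answer to the "why are there no parameters?" question. The PR itself does not say what the methods will eventually accept.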
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.service.modules.orchestration;

/**
* Visitor over the concrete {@link DagTask} types, with one {@code meet} overload per task type.
* @param <T> type of the result returned by each {@code meet} call
*/
public interface DagTaskVisitor<T> {
T meet(LaunchDagTask launchDagTask);
T meet(KillDagTask killDagTask);
T meet(ResumeDagTask resumeDagTask);
Contributor comment:
missing remaining SLA tasks

}
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.service.modules.orchestration;

import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;


/**
* An implementation of {@link DagProc} for killing {@link DagTask}.
Contributor comment:
Is this only for the API call to kill, or for kills from any other point? Good to clarify in the Javadoc.

*/
public final class KillDagProc extends DagProc<Object, Object> {
@Override
protected Object initialize() throws MaybeRetryableException {
return null;
}

@Override
protected Object act(Object state) throws MaybeRetryableException {
return null;
}

@Override
protected void sendNotification(Object result) throws MaybeRetryableException {

}
}

@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.service.modules.orchestration;
/**
* A {@link DagTask} responsible for handling kill tasks.
*/
public class KillDagTask extends DagTask {
@Override
void initialize() {

}

@Override
void conclude() {

}

@Override
Object host(DagTaskVisitor visitor) {
return null;
}
}
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.service.modules.orchestration;

import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;


/**
* An implementation of {@link DagProc} for launching {@link DagTask}.
Contributor comment:
Is this for newly added launches only, or for every time a task needs to be launched on the executor?

*/
public final class LaunchDagProc extends DagProc<Object, Object> {
@Override
protected Object initialize() throws MaybeRetryableException {
return null;
}

@Override
protected Object act(Object state) throws MaybeRetryableException {
return null;
}

@Override
protected void sendNotification(Object result) throws MaybeRetryableException {

}
}
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.service.modules.orchestration;
/**
* A {@link DagTask} responsible for handling launch tasks.
*/
public class LaunchDagTask extends DagTask {
@Override
void initialize() {

}

@Override
void conclude() {

}

@Override
Object host(DagTaskVisitor visitor) {
return null;
}
}