Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Add feature to control systemic property of executeCode #6519

Merged
merged 15 commits into from
Jan 8, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,23 @@ public interface ConsoleSession extends Closeable {
* @throws ExecutionException if the request has an exception
* @throws TimeoutException if the request times out
*/
Changes executeCode(String code) throws InterruptedException, ExecutionException, TimeoutException;
default Changes executeCode(String code) throws InterruptedException, ExecutionException, TimeoutException {
return executeCode(code, false);
}

/**
* Execute the given {@code code} against the script session. The code may be executed systemically, meaning that
* failures of the executed script constitute failures of the application and cause shut down.
*
* @param code the code
* @param systemic if the code should be executed systemically.
* @return the changes
* @throws InterruptedException if the current thread is interrupted
* @throws ExecutionException if the request has an exception
* @throws TimeoutException if the request times out
*/
Changes executeCode(String code, boolean systemic)
throws InterruptedException, ExecutionException, TimeoutException;

/**
* Execute the given {@code path path's} code against the script session.
Expand All @@ -51,23 +67,64 @@ public interface ConsoleSession extends Closeable {
* @throws ExecutionException if the request has an exception
* @throws TimeoutException if the request times out
*/
Changes executeScript(Path path) throws IOException, InterruptedException, ExecutionException, TimeoutException;
default Changes executeScript(Path path)
throws IOException, InterruptedException, ExecutionException, TimeoutException {
return executeScript(path, false);
}

/**
* Execute the given {@code path path's} code against the script session. The code may be executed systemically, meaning that
* failures of the executed script constitute failures of the application and cause shut down.
*
* @param path the path to the code
* @param systemic if the code should be executed systemically.
* @return the changes
* @throws InterruptedException if the current thread is interrupted
* @throws ExecutionException if the request has an exception
* @throws TimeoutException if the request times out
*/
Changes executeScript(Path path, boolean systemic)
throws IOException, InterruptedException, ExecutionException, TimeoutException;

/**
* Execute the given {@code code} against the script session.
*
* @param code the code
* @return the changes future
*/
CompletableFuture<Changes> executeCodeFuture(String code);
default CompletableFuture<Changes> executeCodeFuture(String code) {
return executeCodeFuture(code, false);
}

/**
* Execute the given {@code code} against the script session. The code may be executed systemically, meaning that
* failures of the executed script constitute failures of the application and cause shut down.
*
* @param code the code
* @param systemic if the code should be executed systemically.
* @return the changes future
*/
CompletableFuture<Changes> executeCodeFuture(String code, boolean systemic);

/**
* Execute the given {@code path path's} code against the script session.
*
* @param path the path to the code
* @return the changes future
*/
CompletableFuture<Changes> executeScriptFuture(Path path) throws IOException;
default CompletableFuture<Changes> executeScriptFuture(Path path) throws IOException {
return executeScriptFuture(path, false);
}

/**
* Execute the given {@code path path's} code against the script session. The code may be executed systemically, meaning that
* failures of the executed script constitute failures of the application and cause shut down.
*
* @param path the path to the code
* @param systemic if the code should be executed systemically.
* @return the changes future
*/
CompletableFuture<Changes> executeScriptFuture(Path path, boolean systemic) throws IOException;

/**
* Closes {@code this} console session.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,20 +418,22 @@ public Ticket ticket() {
}

@Override
public Changes executeCode(String code) throws InterruptedException, ExecutionException, TimeoutException {
return executeCodeFuture(code).get(config.executeTimeout().toNanos(), TimeUnit.NANOSECONDS);
public Changes executeCode(String code, boolean systemic)
throws InterruptedException, ExecutionException, TimeoutException {
return executeCodeFuture(code, systemic).get(config.executeTimeout().toNanos(), TimeUnit.NANOSECONDS);
}

@Override
public Changes executeScript(Path path)
public Changes executeScript(Path path, boolean systemic)
throws IOException, InterruptedException, ExecutionException, TimeoutException {
return executeScriptFuture(path).get(config.executeTimeout().toNanos(), TimeUnit.NANOSECONDS);
return executeScriptFuture(path, systemic).get(config.executeTimeout().toNanos(), TimeUnit.NANOSECONDS);
}

@Override
public CompletableFuture<Changes> executeCodeFuture(String code) {
public CompletableFuture<Changes> executeCodeFuture(String code, boolean systemic) {
final ExecuteCommandRequest request =
ExecuteCommandRequest.newBuilder().setConsoleId(ticket()).setCode(code).build();
ExecuteCommandRequest.newBuilder().setConsoleId(ticket()).setCode(code).setSystemic(systemic)
.build();
return UnaryGrpcFuture.of(request, channel().console()::executeCommand,
response -> {
Changes.Builder builder = Changes.builder().changes(new FieldChanges(response.getChanges()));
Expand All @@ -443,9 +445,9 @@ public CompletableFuture<Changes> executeCodeFuture(String code) {
}

@Override
public CompletableFuture<Changes> executeScriptFuture(Path path) throws IOException {
public CompletableFuture<Changes> executeScriptFuture(Path path, boolean systemic) throws IOException {
final String code = String.join(System.lineSeparator(), Files.readAllLines(path, StandardCharsets.UTF_8));
return executeCodeFuture(code);
return executeCodeFuture(code, systemic);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,17 @@ message LogSubscriptionData {
reserved 4;//if we can scope logs to a script session
// Ticket console_id = 4;
}

message ExecuteCommandRequest {
io.deephaven.proto.backplane.grpc.Ticket console_id = 1;
reserved 2;//if script sessions get a ticket, we will use this reserved tag
string code = 3;

// If set to `true` the command will be executed systemically. Failures in systemic code
// are treated as important failures and cause the server to shut down
optional bool systemic=4;
abaranec marked this conversation as resolved.
Show resolved Hide resolved
}

message ExecuteCommandResponse {
string error_message = 1;
io.deephaven.proto.backplane.grpc.FieldsChangeUpdate changes = 2;
Expand Down
7 changes: 4 additions & 3 deletions py/client/pydeephaven/_console_service.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
#
from typing import Any
from typing import Any,
abaranec marked this conversation as resolved.
Show resolved Hide resolved

from pydeephaven.dherror import DHError
from deephaven_core.proto import console_pb2_grpc, console_pb2
Expand Down Expand Up @@ -32,7 +32,7 @@ def start_console(self):
except Exception as e:
raise DHError("failed to start a console.") from e

def run_script(self, server_script: str) -> Any:
def run_script(self, server_script: str, systemic: bool = False) -> Any:
"""Runs a Python script in the console."""
abaranec marked this conversation as resolved.
Show resolved Hide resolved
self.start_console()

Expand All @@ -41,7 +41,8 @@ def run_script(self, server_script: str) -> Any:
self._grpc_console_stub.ExecuteCommand,
console_pb2.ExecuteCommandRequest(
console_id=self.console_id,
code=server_script))
code=server_script,
systemic=systemic))
return response
except Exception as e:
raise DHError("failed to execute a command in the console.") from e
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.deephaven.engine.table.impl.util.RuntimeMemory.Sample;
import io.deephaven.engine.util.DelegatingScriptSession;
import io.deephaven.engine.util.ScriptSession;
import io.deephaven.engine.util.systemicmarking.SystemicObjectTracker;
import io.deephaven.extensions.barrage.util.GrpcUtil;
import io.deephaven.integrations.python.PythonDeephavenSession;
import io.deephaven.internal.log.LoggerFactory;
Expand Down Expand Up @@ -189,7 +190,13 @@ public void executeCommand(
response))
.submit(() -> {
final ScriptSession scriptSession = exportedConsole.get();
final ScriptSession.Changes changes = scriptSession.evaluateScript(request.getCode());

final ScriptSession.Changes changes = request.getSystemic()
abaranec marked this conversation as resolved.
Show resolved Hide resolved
? SystemicObjectTracker.executeSystemically(true,
() -> scriptSession.evaluateScript(request.getCode()))
: scriptSession.evaluateScript(request.getCode());


abaranec marked this conversation as resolved.
Show resolved Hide resolved
final ExecuteCommandResponse.Builder diff = ExecuteCommandResponse.newBuilder();
final FieldsChangeUpdate.Builder fieldChanges = FieldsChangeUpdate.newBuilder();
changes.created.entrySet()
Expand Down
Loading