Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Add feature to control systemic property of executeCode #6519

Merged
merged 15 commits into from
Jan 8, 2025
2,510 changes: 1,293 additions & 1,217 deletions go/internal/proto/console/console.pb.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,23 @@ public interface ConsoleSession extends Closeable {
* @throws ExecutionException if the request has an exception
* @throws TimeoutException if the request times out
*/
Changes executeCode(String code) throws InterruptedException, ExecutionException, TimeoutException;
default Changes executeCode(String code) throws InterruptedException, ExecutionException, TimeoutException {
return executeCode(code, ExecuteCodeOptions.DEFAULT);
}

/**
* Execute the given {@code code} against the script session. The code may be executed systemically, meaning that
* failures of the executed script constitute failures of the application and cause shut down.
*
* @param code the code
* @param systemic if the code should be executed systemically.
* @return the changes
* @throws InterruptedException if the current thread is interrupted
* @throws ExecutionException if the request has an exception
* @throws TimeoutException if the request times out
*/
Changes executeCode(String code, ExecuteCodeOptions options)
throws InterruptedException, ExecutionException, TimeoutException;

/**
* Execute the given {@code path path's} code against the script session.
Expand All @@ -51,23 +67,64 @@ public interface ConsoleSession extends Closeable {
* @throws ExecutionException if the request has an exception
* @throws TimeoutException if the request times out
*/
Changes executeScript(Path path) throws IOException, InterruptedException, ExecutionException, TimeoutException;
default Changes executeScript(Path path)
throws IOException, InterruptedException, ExecutionException, TimeoutException {
return executeScript(path, ExecuteCodeOptions.DEFAULT);
}

/**
* Execute the given {@code path path's} code against the script session. The code may be executed systemically,
* meaning that failures of the executed script constitute failures of the application and cause shut down.
*
* @param path the path to the code
* @param systemic if the code should be executed systemically.
* @return the changes
* @throws InterruptedException if the current thread is interrupted
* @throws ExecutionException if the request has an exception
* @throws TimeoutException if the request times out
*/
Changes executeScript(Path path, ExecuteCodeOptions options)
throws IOException, InterruptedException, ExecutionException, TimeoutException;

/**
* Execute the given {@code code} against the script session.
*
* @param code the code
* @return the changes future
*/
CompletableFuture<Changes> executeCodeFuture(String code);
default CompletableFuture<Changes> executeCodeFuture(String code) {
return executeCodeFuture(code, ExecuteCodeOptions.DEFAULT);
}

/**
* Execute the given {@code code} against the script session. The code may be executed systemically, meaning that
* failures of the executed script constitute failures of the application and cause shut down.
*
* @param code the code
* @param systemic if the code should be executed systemically.
* @return the changes future
*/
CompletableFuture<Changes> executeCodeFuture(String code, ExecuteCodeOptions options);

/**
* Execute the given {@code path path's} code against the script session.
*
* @param path the path to the code
* @return the changes future
*/
CompletableFuture<Changes> executeScriptFuture(Path path) throws IOException;
default CompletableFuture<Changes> executeScriptFuture(Path path) throws IOException {
return executeScriptFuture(path, ExecuteCodeOptions.DEFAULT);
}

/**
* Execute the given {@code path path's} code against the script session. The code may be executed systemically,
* meaning that failures of the executed script constitute failures of the application and cause shut down.
*
* @param path the path to the code
* @param systemic if the code should be executed systemically.
* @return the changes future
*/
CompletableFuture<Changes> executeScriptFuture(Path path, ExecuteCodeOptions options) throws IOException;

/**
* Closes {@code this} console session.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.client.impl;

import io.deephaven.annotations.BuildableStyle;
import org.immutables.value.Value;

import javax.annotation.Nullable;

@Value.Immutable
@BuildableStyle
public interface ExecuteCodeOptions {
ExecuteCodeOptions DEFAULT = ExecuteCodeOptions.builder().build();

@Value.Default
@Nullable
default Boolean executeSystemic() {
return null;
}

static Builder builder() {
return ImmutableExecuteCodeOptions.builder();
}

interface Builder {
ExecuteCodeOptions.Builder executeSystemic(Boolean executeSystemic);

ExecuteCodeOptions build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -418,21 +418,30 @@ public Ticket ticket() {
}

@Override
public Changes executeCode(String code) throws InterruptedException, ExecutionException, TimeoutException {
return executeCodeFuture(code).get(config.executeTimeout().toNanos(), TimeUnit.NANOSECONDS);
public Changes executeCode(String code, ExecuteCodeOptions options)
throws InterruptedException, ExecutionException, TimeoutException {
return executeCodeFuture(code, options).get(config.executeTimeout().toNanos(), TimeUnit.NANOSECONDS);
}

@Override
public Changes executeScript(Path path)
public Changes executeScript(Path path, ExecuteCodeOptions options)
throws IOException, InterruptedException, ExecutionException, TimeoutException {
return executeScriptFuture(path).get(config.executeTimeout().toNanos(), TimeUnit.NANOSECONDS);
return executeScriptFuture(path, options).get(config.executeTimeout().toNanos(), TimeUnit.NANOSECONDS);
}

@Override
public CompletableFuture<Changes> executeCodeFuture(String code) {
final ExecuteCommandRequest request =
ExecuteCommandRequest.newBuilder().setConsoleId(ticket()).setCode(code).build();
return UnaryGrpcFuture.of(request, channel().console()::executeCommand,
public CompletableFuture<Changes> executeCodeFuture(String code, ExecuteCodeOptions options) {
final ExecuteCommandRequest.Builder requestBuilder =
ExecuteCommandRequest.newBuilder().setConsoleId(ticket()).setCode(code);

final Boolean systemicOption = options.executeSystemic();
if (systemicOption != null) {
requestBuilder.setSystemic(systemicOption
? ExecuteCommandRequest.SystemicType.EXECUTE_SYSTEMIC
: ExecuteCommandRequest.SystemicType.EXECUTE_NOT_SYSTEMIC);
}

return UnaryGrpcFuture.of(requestBuilder.build(), channel().console()::executeCommand,
response -> {
Changes.Builder builder = Changes.builder().changes(new FieldChanges(response.getChanges()));
if (!response.getErrorMessage().isEmpty()) {
Expand All @@ -443,9 +452,10 @@ public CompletableFuture<Changes> executeCodeFuture(String code) {
}

@Override
public CompletableFuture<Changes> executeScriptFuture(Path path) throws IOException {
public CompletableFuture<Changes> executeScriptFuture(Path path, ExecuteCodeOptions options)
throws IOException {
final String code = String.join(System.lineSeparator(), Files.readAllLines(path, StandardCharsets.UTF_8));
return executeCodeFuture(code);
return executeCodeFuture(code, options);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,24 @@ message LogSubscriptionData {
reserved 4;//if we can scope logs to a script session
// Ticket console_id = 4;
}

message ExecuteCommandRequest {
enum SystemicType {
NOT_SET_SYSTEMIC = 0;
EXECUTE_NOT_SYSTEMIC = 1;
EXECUTE_SYSTEMIC = 2;
}

io.deephaven.proto.backplane.grpc.Ticket console_id = 1;
reserved 2;//if script sessions get a ticket, we will use this reserved tag
string code = 3;

// If set to `true` the command will be executed systemically. Failures in systemic code
// are treated as important failures and cause the server to shut down.
abaranec marked this conversation as resolved.
Show resolved Hide resolved
// If this is unset it is treated as `false`
optional SystemicType systemic = 4;
}

message ExecuteCommandResponse {
string error_message = 1;
io.deephaven.proto.backplane.grpc.FieldsChangeUpdate changes = 2;
Expand Down
252 changes: 127 additions & 125 deletions py/client/deephaven_core/proto/console_pb2.py

Large diffs are not rendered by default.

15 changes: 12 additions & 3 deletions py/client/pydeephaven/_console_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,25 @@ def start_console(self):
except Exception as e:
raise DHError("failed to start a console.") from e

def run_script(self, server_script: str) -> Any:
"""Runs a Python script in the console."""
def run_script(self, server_script: str, systemic: bool = None) -> Any:
abaranec marked this conversation as resolved.
Show resolved Hide resolved
"""Runs a Python script in the console.
Args:
server_script (str): The script code to run
systemic (bool): Set to True to treat the code as systemically important
abaranec marked this conversation as resolved.
Show resolved Hide resolved
"""
self.start_console()

try:
systemic_opt = console_pb2.ExecuteCommandRequest.SystemicType.NOT_SET_SYSTEMIC if systemic is None else \
console_pb2.ExecuteCommandRequest.SystemicType.EXECUTE_SYSTEMIC if systemic else \
console_pb2.ExecuteCommandRequest.SystemicType.EXECUTE_NOT_SYSTEMIC

response = self.session.wrap_rpc(
self._grpc_console_stub.ExecuteCommand,
console_pb2.ExecuteCommandRequest(
console_id=self.console_id,
code=server_script))
code=server_script,
systemic=systemic_opt))
return response
except Exception as e:
raise DHError("failed to execute a command in the console.") from e
Expand Down
5 changes: 3 additions & 2 deletions py/client/pydeephaven/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,16 +494,17 @@ def release(self, ticket: ExportTicket) -> None:
self.session_service.release(ticket)

# convenience/factory methods
def run_script(self, script: str) -> None:
def run_script(self, script: str, systemic: bool = False) -> None:
"""Runs the supplied Python script on the server.
abaranec marked this conversation as resolved.
Show resolved Hide resolved

Args:
script (str): the Python script code
systemic (bool): Set to True to treat the code as systemically important

Raises:
DHError
"""
response = self.console_service.run_script(script)
response = self.console_service.run_script(script, systemic)
if response.error_message != '':
raise DHError("could not run script: " + response.error_message)

Expand Down
30 changes: 29 additions & 1 deletion py/client/tests/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from pydeephaven import DHError
from pydeephaven import Session
from pydeephaven.ticket import SharedTicket
from pydeephaven.ticket import SharedTicket, ScopeTicket
from tests.testbase import BaseTestCase


Expand Down Expand Up @@ -409,6 +409,34 @@ def _interact_with_server(ti):
for t in threads:
t.join()

def test_systemic_scripts(self):
with Session() as session:
console_script = ("""
from deephaven import time_table
import jpy

j_sot = jpy.get_type("io.deephaven.engine.util.systemicmarking.SystemicObjectTracker")

print("SYSTEMIC: " + str(j_sot.isSystemicThread()))
print("SYSTEMIC_ENABLED: " + str(j_sot.isSystemicObjectMarkingEnabled()))

t1 = time_table("PT1S").update("A=ii")
t2 = empty_table(1).update("S = (boolean)j_sot.isSystemic(t1.j_table)")
""")
session.run_script(console_script, False)
t = session.fetch_table(ticket=ScopeTicket.scope_ticket("t2"))
pa_data = [
pa.array([True])
]
fields = [pa.field(f"S", pa.bool_())]
schema = pa.schema(fields)

pa_table = pa.table(pa_data, schema=schema)

print(pa_table.to_string(preview_cols=1))
print(t.to_arrow().to_string(preview_cols=1))
self.assertTrue(pa_table.equals(t.to_arrow()))
self.assertFalse(True)
abaranec marked this conversation as resolved.
Show resolved Hide resolved

if __name__ == '__main__':
unittest.main()
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.deephaven.engine.table.impl.util.RuntimeMemory.Sample;
import io.deephaven.engine.util.DelegatingScriptSession;
import io.deephaven.engine.util.ScriptSession;
import io.deephaven.engine.util.systemicmarking.SystemicObjectTracker;
import io.deephaven.extensions.barrage.util.GrpcUtil;
import io.deephaven.integrations.python.PythonDeephavenSession;
import io.deephaven.internal.log.LoggerFactory;
Expand Down Expand Up @@ -189,7 +190,30 @@ public void executeCommand(
response))
.submit(() -> {
final ScriptSession scriptSession = exportedConsole.get();
final ScriptSession.Changes changes = scriptSession.evaluateScript(request.getCode());
final ExecuteCommandRequest.SystemicType systemicOption =
request.hasSystemic()
? request.getSystemic()
: ExecuteCommandRequest.SystemicType.NOT_SET_SYSTEMIC;

// If not set, we'll use defaults, otherwise we will explicitly set the systemicness.
final ScriptSession.Changes changes;
switch (systemicOption) {
case NOT_SET_SYSTEMIC:
changes = scriptSession.evaluateScript(request.getCode());
break;
case EXECUTE_NOT_SYSTEMIC:
changes = SystemicObjectTracker.executeSystemically(false,
() -> scriptSession.evaluateScript(request.getCode()));
break;
case EXECUTE_SYSTEMIC:
changes = SystemicObjectTracker.executeSystemically(true,
() -> scriptSession.evaluateScript(request.getCode()));
break;
default:
throw new UnsupportedOperationException(
"Unrecognized systemic option: " + systemicOption);
}

final ExecuteCommandResponse.Builder diff = ExecuteCommandResponse.newBuilder();
final FieldsChangeUpdate.Builder fieldChanges = FieldsChangeUpdate.newBuilder();
changes.created.entrySet()
Expand Down
Loading