Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ignite 20322-2 #4677

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -661,49 +661,49 @@ private void processOperation(ChannelHandlerContext ctx, ClientMessageUnpacker i
return ClientTableGetRequest.process(in, out, igniteTables);

case ClientOp.TUPLE_UPSERT:
return ClientTupleUpsertRequest.process(in, out, igniteTables, resources);
return ClientTupleUpsertRequest.process(in, out, igniteTables, resources, igniteTransactions);

case ClientOp.TUPLE_GET:
return ClientTupleGetRequest.process(in, out, igniteTables, resources);

case ClientOp.TUPLE_UPSERT_ALL:
return ClientTupleUpsertAllRequest.process(in, out, igniteTables, resources);
return ClientTupleUpsertAllRequest.process(in, out, igniteTables, resources, igniteTransactions);

case ClientOp.TUPLE_GET_ALL:
return ClientTupleGetAllRequest.process(in, out, igniteTables, resources);

case ClientOp.TUPLE_GET_AND_UPSERT:
return ClientTupleGetAndUpsertRequest.process(in, out, igniteTables, resources);
return ClientTupleGetAndUpsertRequest.process(in, out, igniteTables, resources, igniteTransactions);

case ClientOp.TUPLE_INSERT:
return ClientTupleInsertRequest.process(in, out, igniteTables, resources);
return ClientTupleInsertRequest.process(in, out, igniteTables, resources, igniteTransactions);

case ClientOp.TUPLE_INSERT_ALL:
return ClientTupleInsertAllRequest.process(in, out, igniteTables, resources);
return ClientTupleInsertAllRequest.process(in, out, igniteTables, resources, igniteTransactions);

case ClientOp.TUPLE_REPLACE:
return ClientTupleReplaceRequest.process(in, out, igniteTables, resources);
return ClientTupleReplaceRequest.process(in, out, igniteTables, resources, igniteTransactions);

case ClientOp.TUPLE_REPLACE_EXACT:
return ClientTupleReplaceExactRequest.process(in, out, igniteTables, resources);
return ClientTupleReplaceExactRequest.process(in, out, igniteTables, resources, igniteTransactions);

case ClientOp.TUPLE_GET_AND_REPLACE:
return ClientTupleGetAndReplaceRequest.process(in, out, igniteTables, resources);
return ClientTupleGetAndReplaceRequest.process(in, out, igniteTables, resources, igniteTransactions);

case ClientOp.TUPLE_DELETE:
return ClientTupleDeleteRequest.process(in, out, igniteTables, resources);
return ClientTupleDeleteRequest.process(in, out, igniteTables, resources, igniteTransactions);

case ClientOp.TUPLE_DELETE_ALL:
return ClientTupleDeleteAllRequest.process(in, out, igniteTables, resources);
return ClientTupleDeleteAllRequest.process(in, out, igniteTables, resources, igniteTransactions);

case ClientOp.TUPLE_DELETE_EXACT:
return ClientTupleDeleteExactRequest.process(in, out, igniteTables, resources);
return ClientTupleDeleteExactRequest.process(in, out, igniteTables, resources, igniteTransactions);

case ClientOp.TUPLE_DELETE_ALL_EXACT:
return ClientTupleDeleteAllExactRequest.process(in, out, igniteTables, resources);
return ClientTupleDeleteAllExactRequest.process(in, out, igniteTables, resources, igniteTransactions);

case ClientOp.TUPLE_GET_AND_DELETE:
return ClientTupleGetAndDeleteRequest.process(in, out, igniteTables, resources);
return ClientTupleGetAndDeleteRequest.process(in, out, igniteTables, resources, igniteTransactions);

case ClientOp.TUPLE_CONTAINS_KEY:
return ClientTupleContainsKeyRequest.process(in, out, igniteTables, resources);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.internal.type.DecimalNativeType;
import org.apache.ignite.internal.type.NativeType;
import org.apache.ignite.internal.type.NativeTypeSpec;
Expand Down Expand Up @@ -390,7 +391,10 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) {
* @return Transaction, if present, or null.
*/
public static @Nullable InternalTransaction readTx(
ClientMessageUnpacker in, ClientMessagePacker out, ClientResourceRegistry resources) {
ClientMessageUnpacker in,
ClientMessagePacker out,
ClientResourceRegistry resources
) {
if (in.tryUnpackNil()) {
return null;
}
Expand All @@ -410,6 +414,33 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) {
}
}

/**
* Reads transaction or start implicit one.
*
* @param in Unpacker.
* @param out Packer.
* @param resources Resource registry.
* @param igniteTransactions Ignite transactions.
* @param readOnly Read only flag.
* @return Transaction.
*/
public static @Nullable InternalTransaction readOrStartImplicitTx(
ClientMessageUnpacker in,
ClientMessagePacker out,
ClientResourceRegistry resources,
IgniteTransactionsImpl igniteTransactions,
boolean readOnly
) {

var tx = readTx(in, out, resources);

if (tx == null) {
tx = igniteTransactions.beginImplicit(readOnly);
}

return tx;
}

/**
* Gets client type by server type.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@

package org.apache.ignite.client.handler.requests.table;

import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readOrStartImplicitTx;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTableAsync;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuples;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTx;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTuples;

import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientResourceRegistry;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.table.IgniteTables;

/**
Expand All @@ -35,20 +36,22 @@ public class ClientTupleDeleteAllExactRequest {
/**
* Processes the request.
*
* @param in Unpacker.
* @param out Packer.
* @param tables Ignite tables.
* @param in Unpacker.
* @param out Packer.
* @param tables Ignite tables.
* @param resources Resource registry.
* @param igniteTransactions Ignite transactions.
* @return Future.
*/
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
ClientMessagePacker out,
IgniteTables tables,
ClientResourceRegistry resources
ClientResourceRegistry resources,
IgniteTransactionsImpl igniteTransactions
) {
return readTableAsync(in, tables).thenCompose(table -> {
var tx = readTx(in, out, resources);
var tx = readOrStartImplicitTx(in, out, resources, igniteTransactions, false);
return readTuples(in, table, false).thenCompose(tuples -> {
return table.recordView().deleteAllExactAsync(tx, tuples)
.thenAccept(skippedTuples -> writeTuples(out, skippedTuples, table.schemaView()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@

package org.apache.ignite.client.handler.requests.table;

import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readOrStartImplicitTx;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTableAsync;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuples;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTx;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTuples;

import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientResourceRegistry;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.client.proto.TuplePart;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.table.IgniteTables;

/**
Expand All @@ -40,16 +41,18 @@ public class ClientTupleDeleteAllRequest {
* @param out Packer.
* @param tables Ignite tables.
* @param resources Resource registry.
* @param igniteTransactions Ignite transactions.
* @return Future.
*/
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
ClientMessagePacker out,
IgniteTables tables,
ClientResourceRegistry resources
ClientResourceRegistry resources,
IgniteTransactionsImpl igniteTransactions
) {
return readTableAsync(in, tables).thenCompose(table -> {
var tx = readTx(in, out, resources);
var tx = readOrStartImplicitTx(in, out, resources, igniteTransactions, false);
return readTuples(in, table, true).thenCompose(tuples -> {
return table.recordView().deleteAllAsync(tx, tuples).thenAccept(skippedTuples ->
writeTuples(out, skippedTuples, TuplePart.KEY, table.schemaView()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@

package org.apache.ignite.client.handler.requests.table;

import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readOrStartImplicitTx;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTableAsync;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuple;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTx;

import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientResourceRegistry;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.table.IgniteTables;

/**
Expand All @@ -38,16 +39,18 @@ public class ClientTupleDeleteExactRequest {
* @param out Packer.
* @param tables Ignite tables.
* @param resources Resource registry.
* @param igniteTransactions Ignite transactions.
* @return Future.
*/
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
ClientMessagePacker out,
IgniteTables tables,
ClientResourceRegistry resources
ClientResourceRegistry resources,
IgniteTransactionsImpl igniteTransactions
) {
return readTableAsync(in, tables).thenCompose(table -> {
var tx = readTx(in, out, resources);
var tx = readOrStartImplicitTx(in, out, resources, igniteTransactions, false);
return readTuple(in, table, false).thenCompose(tuple -> {
return table.recordView().deleteExactAsync(tx, tuple).thenAccept(res -> {
out.packInt(table.schemaView().lastKnownSchemaVersion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@

package org.apache.ignite.client.handler.requests.table;

import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readOrStartImplicitTx;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTableAsync;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuple;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTx;

import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientResourceRegistry;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.table.IgniteTables;

/**
Expand All @@ -38,16 +39,18 @@ public class ClientTupleDeleteRequest {
* @param out Packer.
* @param tables Ignite tables.
* @param resources Resource registry.
* @param igniteTransactions Ignite transactions.
* @return Future.
*/
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
ClientMessagePacker out,
IgniteTables tables,
ClientResourceRegistry resources
ClientResourceRegistry resources,
IgniteTransactionsImpl igniteTransactions
) {
return readTableAsync(in, tables).thenCompose(table -> {
var tx = readTx(in, out, resources);
var tx = readOrStartImplicitTx(in, out, resources, igniteTransactions, false);
return readTuple(in, table, true).thenCompose(tuple -> {
return table.recordView().deleteAsync(tx, tuple).thenAccept(res -> {
out.packInt(table.schemaView().lastKnownSchemaVersion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public static CompletableFuture<Void> process(
ClientResourceRegistry resources
) {
return readTableAsync(in, tables).thenCompose(table -> {
// TODO: IGNITE-23603 We have to create an implicit transaction, but leave a possibility to start RO direct.
var tx = readTx(in, out, resources);
return readTuples(in, table, true).thenCompose(keyTuples -> {
return table.recordView().getAllAsync(tx, keyTuples).thenAccept(tuples ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@

package org.apache.ignite.client.handler.requests.table;

import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readOrStartImplicitTx;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTableAsync;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuple;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTx;

import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientResourceRegistry;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.client.proto.TuplePart;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.table.IgniteTables;

/**
Expand All @@ -39,16 +40,18 @@ public class ClientTupleGetAndDeleteRequest {
* @param out Packer.
* @param tables Ignite tables.
* @param resources Resource registry.
* @param igniteTransactions Ignite transactions.
* @return Future.
*/
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
ClientMessagePacker out,
IgniteTables tables,
ClientResourceRegistry resources
ClientResourceRegistry resources,
IgniteTransactionsImpl igniteTransactions
) {
return readTableAsync(in, tables).thenCompose(table -> {
var tx = readTx(in, out, resources);
var tx = readOrStartImplicitTx(in, out, resources, igniteTransactions, false);
return readTuple(in, table, true).thenCompose(tuple -> {
return table.recordView().getAndDeleteAsync(tx, tuple).thenAccept(
resTuple -> ClientTableCommon.writeTupleOrNil(out, resTuple, TuplePart.KEY_AND_VAL, table.schemaView()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@

package org.apache.ignite.client.handler.requests.table;

import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readOrStartImplicitTx;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTableAsync;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuple;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTx;

import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientResourceRegistry;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.client.proto.TuplePart;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.table.IgniteTables;

/**
Expand All @@ -39,16 +40,18 @@ public class ClientTupleGetAndReplaceRequest {
* @param out Packer.
* @param tables Ignite tables.
* @param resources Resource registry.
* @param igniteTransactions Ignite transactions.
* @return Future.
*/
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
ClientMessagePacker out,
IgniteTables tables,
ClientResourceRegistry resources
ClientResourceRegistry resources,
IgniteTransactionsImpl igniteTransactions
) {
return readTableAsync(in, tables).thenCompose(table -> {
var tx = readTx(in, out, resources);
var tx = readOrStartImplicitTx(in, out, resources, igniteTransactions, false);
return readTuple(in, table, false).thenCompose(tuple -> {
return table.recordView().getAndReplaceAsync(tx, tuple).thenAccept(
resTuple -> ClientTableCommon.writeTupleOrNil(out, resTuple, TuplePart.KEY_AND_VAL, table.schemaView()));
Expand Down
Loading