Skip to content

Commit

Permalink
IGNITE-23303 Fix code style.
Browse files Browse the repository at this point in the history
  • Loading branch information
ascherbakoff committed Nov 15, 2024
1 parent 8d4fd43 commit 830f98c
Show file tree
Hide file tree
Showing 11 changed files with 107 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@
public class HybridClockImpl implements HybridClock {
private final IgniteLogger log = Loggers.forClass(HybridClockImpl.class);

/**
* Var handle for {@link #latestTime}.
*/
/**
* Var handle for {@link #latestTime}.
*/
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -1044,19 +1044,11 @@ private void sendAwaitReplicaResponse(String senderConsistentId, long correlatio
private NetworkMessage prepareReplicaResponse(boolean sendTimestamp, ReplicaResult result) {
if (sendTimestamp) {
HybridTimestamp commitTs = result.applyResult().getCommitTimestamp();
if (commitTs != null) {
return REPLICA_MESSAGES_FACTORY
.commitReplicaResponse()
.result(result.result())
.timestamp(commitTs)
.build();
} else {
return REPLICA_MESSAGES_FACTORY
.timestampAwareReplicaResponse()
.result(result.result())
.timestamp(clockService.current())
.build();
}
return REPLICA_MESSAGES_FACTORY
.timestampAwareReplicaResponse()
.result(result.result())
.timestamp(commitTs == null ? clockService.current() : commitTs)
.build();
} else {
return REPLICA_MESSAGES_FACTORY
.replicaResponse()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,20 +119,20 @@ public ReplicaService(
}

private <R> CompletableFuture<R> sendToReplica(String targetNodeConsistentId, ReplicaRequest req) {
return (CompletableFuture<R>) sendToReplica2(targetNodeConsistentId, req).thenApply(res -> res.result());
return (CompletableFuture<R>) sendToReplicaRaw(targetNodeConsistentId, req).thenApply(res -> res.result());
}

/**
* Sends request to the replica node.
* Sends request to the replica node and provides raw response.
*
* @param targetNodeConsistentId A consistent id of the replica node..
* @param targetNodeConsistentId A consistent id of the replica node.
* @param req Replica request.
* @return Response future with either evaluation result or completed exceptionally.
* @see NodeStoppingException If either supplier or demander node is stopping.
* @see ReplicaUnavailableException If replica with given replication group id doesn't exist or not started yet.
* @see ReplicationTimeoutException If the response could not be received due to a timeout.
*/
private CompletableFuture<ReplicaResponse> sendToReplica2(String targetNodeConsistentId, ReplicaRequest req) {
private CompletableFuture<ReplicaResponse> sendToReplicaRaw(String targetNodeConsistentId, ReplicaRequest req) {
CompletableFuture<ReplicaResponse> res = new CompletableFuture<>();

messagingService.invoke(
Expand Down Expand Up @@ -228,7 +228,7 @@ private CompletableFuture<ReplicaResponse> sendToReplica2(String targetNodeConsi
assert response0 instanceof AwaitReplicaResponse :
"Incorrect response type [type=" + response0.getClass().getSimpleName() + ']';

sendToReplica2(targetNodeConsistentId, req).whenComplete((r, e) -> {
sendToReplicaRaw(targetNodeConsistentId, req).whenComplete((r, e) -> {
if (e != null) {
res.completeExceptionally(e);
} else {
Expand Down Expand Up @@ -270,11 +270,7 @@ private CompletableFuture<ReplicaResponse> sendToReplica2(String targetNodeConsi
* @see ReplicationTimeoutException If the response could not be received due to a timeout.
*/
public <R> CompletableFuture<R> invoke(ClusterNode node, ReplicaRequest request) {
return invoke2(node, request).thenApply(r -> (R) r.result());
}

public CompletableFuture<ReplicaResponse> invoke2(ClusterNode node, ReplicaRequest request) {
return sendToReplica2(node.name(), request);
return invokeRaw(node, request).thenApply(r -> (R) r.result());
}

/**
Expand All @@ -291,6 +287,17 @@ public <R> CompletableFuture<R> invoke(String replicaConsistentId, ReplicaReques
return sendToReplica(replicaConsistentId, request);
}

/**
* Sends a request to the given replica {@code node} and returns a future that will be completed with a raw response.
*
* @param node Cluster node.
* @param request The request.
* @return Response future with either evaluation raw response or completed exceptionally.
*/
public CompletableFuture<ReplicaResponse> invokeRaw(ClusterNode node, ReplicaRequest request) {
return sendToReplicaRaw(node.name(), request);
}

/**
* Sends a request to the given replica {@code node} and returns a future that will be completed with a result of request processing.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.tx.TransactionException;
import org.jetbrains.annotations.Nullable;

/**
* Dummy transaction that should be used as mock transaction for execution tests.
Expand Down Expand Up @@ -157,6 +158,15 @@ public TablePartitionId commitPartition() {
return groupId;
}

@Override
public CompletableFuture<Void> finish(boolean commit, HybridTimestamp executionTimestamp, boolean full) {
CompletableFuture<Void> fut = commit ? commitFut : rollbackFut;

fut.complete(null);

return fut;
}

@Override
public IgniteBiTuple<ClusterNode, Long> enlist(TablePartitionId tablePartitionId,
IgniteBiTuple<ClusterNode, Long> nodeAndConsistencyToken) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2881,32 +2881,33 @@ private CompletableFuture<ApplyResult> applyUpdateAllCommand(
return completedFuture(new ApplyResult(null, repFut));
} else {
return applyCmdWithExceptionHandling(cmd).thenCompose(res -> {
UpdateCommandResult updateCommandResult = (UpdateCommandResult) res.getResult();
UpdateCommandResult updateCommandResult = (UpdateCommandResult) res.getResult();

if (!updateCommandResult.isPrimaryReplicaMatch()) {
throw new PrimaryReplicaMissException(
cmd.txId(),
cmd.leaseStartTime(),
updateCommandResult.currentLeaseStartTime()
);
}
if (updateCommandResult.isPrimaryInPeersAndLearners()) {
return safeTime.waitFor(((UpdateAllCommand) res.getCommand()).safeTime()).thenApply(ret -> new ApplyResult(cmd.safeTime(), null));
} else {
// We don't need to take the partition snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why.
storageUpdateHandler.handleUpdateAll(
cmd.txId(),
cmd.rowsToUpdate(),
cmd.tablePartitionId().asTablePartitionId(),
false,
null,
cmd.safeTime(),
indexIdsAtRwTxBeginTs(txId)
);
if (!updateCommandResult.isPrimaryReplicaMatch()) {
throw new PrimaryReplicaMissException(
cmd.txId(),
cmd.leaseStartTime(),
updateCommandResult.currentLeaseStartTime()
);
}
if (updateCommandResult.isPrimaryInPeersAndLearners()) {
return safeTime.waitFor(((UpdateAllCommand) res.getCommand()).safeTime())
.thenApply(ret -> new ApplyResult(cmd.safeTime(), null));
} else {
// We don't need to take the partition snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why.
storageUpdateHandler.handleUpdateAll(
cmd.txId(),
cmd.rowsToUpdate(),
cmd.tablePartitionId().asTablePartitionId(),
false,
null,
cmd.safeTime(),
indexIdsAtRwTxBeginTs(txId)
);

return completedFuture(new ApplyResult(cmd.safeTime(), null));
}
});
return completedFuture(new ApplyResult(cmd.safeTime(), null));
}
});
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@
import org.apache.ignite.internal.partition.replicator.network.replication.SwapRowReplicaRequest;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.replicator.CommitReplicaResponse;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
Expand All @@ -110,6 +109,7 @@
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
import org.apache.ignite.internal.replicator.message.TimestampAware;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.schema.BinaryTuple;
Expand Down Expand Up @@ -634,17 +634,17 @@ private <R> CompletableFuture<R> trackingInvoke(
|| request instanceof SwapRowReplicaRequest;

if (full) { // Full transaction retries are handled in postEnlist.
return replicaSvc.invoke2(primaryReplicaAndConsistencyToken.get1(), request).handle((r, e) -> {
assert r instanceof CommitReplicaResponse;
return replicaSvc.invokeRaw(primaryReplicaAndConsistencyToken.get1(), request).handle((r, e) -> {
assert r instanceof TimestampAware;

CommitReplicaResponse crr = (CommitReplicaResponse) r;
tx.finish(true, crr.timestamp(), e == null);
TimestampAware tsAware = (TimestampAware) r;
tx.finish(true, tsAware.timestamp(), e == null);

if (e != null) {
sneakyThrow(e);
}

return (R) r;
return (R) r.result();
});
} else {
if (write) { // Track only write requests from explicit transactions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,7 @@
import org.mockito.quality.Strictness;

/**
* Commit tx test scenarios set.
*
* TODO asch IGNITE-15928 validate zero locks after test commit.
* Common tx test scenarios set. TODO asch IGNITE-15928 validate zero locks after test commit.
*/
@ExtendWith({MockitoExtension.class, ConfigurationExtension.class})
@MockitoSettings(strictness = Strictness.LENIENT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.ignite.internal.table.impl;

import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.internal.hlc.HybridTimestamp.LOGICAL_TIME_BITS_SIZE;
import static org.apache.ignite.internal.replicator.ReplicatorConstants.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.deriveUuidFrom;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
Expand Down Expand Up @@ -61,6 +60,7 @@
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.SingleClusterNodeResolver;
import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.internal.network.serialization.MessageSerializer;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
Expand All @@ -78,6 +78,7 @@
import org.apache.ignite.internal.replicator.listener.ReplicaListener;
import org.apache.ignite.internal.replicator.message.PrimaryReplicaChangeCommand;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.replicator.message.TimestampAwareReplicaResponse;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowConverter;
import org.apache.ignite.internal.schema.BinaryRowEx;
Expand Down Expand Up @@ -298,6 +299,45 @@ public DummyInternalTableImpl(
return replicaListener.invoke(invocationOnMock.getArgument(1), nodeId).thenApply(ReplicaResult::result);
})
.when(replicaSvc).invoke(anyString(), any());

lenient()
.doAnswer(invocationOnMock -> {
ClusterNode node = invocationOnMock.getArgument(0);

return replicaListener.invoke(invocationOnMock.getArgument(1), node.id())
.thenApply(r -> new TimestampAwareReplicaResponse() {
@Override
public @Nullable Object result() {
return r.result();
}

@Override
public @Nullable HybridTimestamp timestamp() {
return CLOCK.now();
}

@Override
public MessageSerializer<NetworkMessage> serializer() {
return null;
}

@Override
public short messageType() {
return 0;
}

@Override
public short groupType() {
return 0;
}

@Override
public NetworkMessage clone() {
return null;
}
});
})
.when(replicaSvc).invokeRaw(any(ClusterNode.class), any());
}

AtomicLong raftIndex = new AtomicLong(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ public interface InternalTransaction extends Transaction {
* Finishes a read-only transaction with a specific execution timestamp.
*
* @param commit Commit flag. The flag is ignored for read-only transactions.
* @param executionTimestamp The timestamp to advance observable timestamp for RO txn. Can be null.
* @param executionTimestamp The timestamp is the time when a read-only transaction is applied to the remote node. The parameter
* is not used for read-write transactions.
* @param full Full state transaction marker.
* @return The future.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ROLLBACK_ERR;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.TxManager;
Expand Down

0 comments on commit 830f98c

Please sign in to comment.