Skip to content

Commit

Permalink
remove refresh for searching
Browse files Browse the repository at this point in the history
Signed-off-by: Xun Zhang <[email protected]>
  • Loading branch information
Zhangxunmt committed Dec 19, 2023
1 parent 16c8c58 commit 2e98448
Show file tree
Hide file tree
Showing 6 changed files with 9 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Client;
Expand Down Expand Up @@ -44,6 +45,7 @@ protected void doExecute(Task task, ActionRequest request, ActionListener<Update

updateRequest.doc(updateContent);
updateRequest.docAsUpsert(true);
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
client.update(updateRequest, getUpdateResponseListener(conversationId, listener, context));
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,12 +366,7 @@ public void updateConversation(UpdateRequest updateRequest, ActionListener<Updat
}
try (ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext()) {
ActionListener<UpdateResponse> internalListener = ActionListener.runBefore(listener, () -> threadContext.restore());
client.admin().indices().refresh(Requests.refreshRequest(META_INDEX_NAME), ActionListener.wrap(refreshResponse -> {
client.update(updateRequest, internalListener);
}, e -> {
log.error("Failed to refresh memory-meta index during update conversation ", e);
internalListener.onFailure(e);
}));
client.update(updateRequest, internalListener);
} catch (Exception e) {
log.error("Failed to update Conversation. Details {}:", e);
listener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Client;
Expand Down Expand Up @@ -393,6 +394,7 @@ public void updateConversation(String conversationId, Map<String, Object> update

updateRequest.doc(updateContent);
updateRequest.docAsUpsert(true);
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

conversationMetaIndex.updateConversation(updateRequest, listener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,6 @@ public void testUpdateConversation_NoIndex_ThenFail() {

public void testUpdateConversation_Success() {
doReturn(true).when(metadata).hasIndex(anyString());
setupRefreshSuccess();
@SuppressWarnings("unchecked")
ActionListener<UpdateResponse> getListener = mock(ActionListener.class);

Expand All @@ -673,7 +672,6 @@ public void testUpdateConversation_Success() {

public void testUpdateConversation_ClientFails() {
doReturn(true).when(metadata).hasIndex(anyString());
setupRefreshSuccess();
@SuppressWarnings("unchecked")
ActionListener<UpdateResponse> getListener = mock(ActionListener.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void createInteraction(
*/
public void getFinalInteractions(String conversationId, int lastNInteraction, ActionListener<List<Interaction>> actionListener) {
Preconditions.checkArgument(lastNInteraction > 0, "lastN must be at least 1.");
log.info("Getting Interactions, conversationId {}, lastN {}", conversationId, lastNInteraction);
log.debug("Getting Interactions, conversationId {}, lastN {}", conversationId, lastNInteraction);

try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().newStoredContext(true)) {
if (!clusterService.state().metadata().hasIndex(INTERACTIONS_INDEX_NAME)) {
Expand All @@ -147,12 +147,7 @@ public void getFinalInteractions(String conversationId, int lastNInteraction, Ac
if (access) {
innerGetFinalInteractions(conversationId, lastNInteraction, actionListener);
} else {
String userstr = client
.threadPool()
.getThreadContext()
.getTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT);
String user = User.parse(userstr) == null ? ActionConstants.DEFAULT_USERNAME_FOR_ERRORS : User.parse(userstr).getName();
throw new OpenSearchSecurityException("User [" + user + "] does not have access to conversation " + conversationId);
throw new OpenSearchSecurityException("User does not have access to conversation " + conversationId);
}
}, e -> { actionListener.onFailure(e); });
conversationMetaIndex.checkAccess(conversationId, accessListener);
Expand Down Expand Up @@ -193,9 +188,7 @@ void innerGetFinalInteractions(String conversationId, int lastNInteraction, Acti
}
internalListener.onResponse(result);
}, e -> { internalListener.onFailure(e); });
client.admin().indices().refresh(Requests.refreshRequest(INTERACTIONS_INDEX_NAME), ActionListener.wrap(r -> {
client.search(searchRequest, al);
}, e -> { internalListener.onFailure(e); }));
client.search(searchRequest, al);
} catch (Exception e) {
listener.onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,12 +227,6 @@ public void testGetInteractions_SearchFails_ThenFail() {
return null;
}).when(conversationMetaIndex).checkAccess(anyString(), any());

doAnswer(invocation -> {
ActionListener<RefreshResponse> al = invocation.getArgument(1);
al.onResponse(mock(RefreshResponse.class));
return null;
}).when(indicesAdminClient).refresh(any(), any());

doAnswer(invocation -> {
ActionListener<SearchResponse> al = invocation.getArgument(1);
al.onFailure(new Exception("Failure in Search"));
Expand All @@ -244,41 +238,6 @@ public void testGetInteractions_SearchFails_ThenFail() {
assert (argCaptor.getValue().getMessage().equals("Failure in Search"));
}

@Test
public void testGetInteractions_RefreshFails_ThenFail() {
doReturn(true).when(metadata).hasIndex(anyString());
doAnswer(invocation -> {
ActionListener<Boolean> al = invocation.getArgument(1);
al.onResponse(true);
return null;
}).when(conversationMetaIndex).checkAccess(anyString(), any());

doAnswer(invocation -> {
ActionListener<RefreshResponse> al = invocation.getArgument(1);
al.onFailure(new Exception("Failed to Refresh"));
return null;
}).when(indicesAdminClient).refresh(any(), any());
mlMemoryManager.getFinalInteractions("cid", 10, interactionListActionListener);
ArgumentCaptor<Exception> argCaptor = ArgumentCaptor.forClass(Exception.class);
verify(interactionListActionListener, times(1)).onFailure(argCaptor.capture());
assert (argCaptor.getValue().getMessage().equals("Failed to Refresh"));
}

@Test
public void testGetInteractions_ClientFails_ThenFail() {
doReturn(true).when(metadata).hasIndex(anyString());
doAnswer(invocation -> {
ActionListener<Boolean> al = invocation.getArgument(1);
al.onResponse(true);
return null;
}).when(conversationMetaIndex).checkAccess(anyString(), any());
doThrow(new RuntimeException("Client Failure")).when(indicesAdminClient).refresh(any(), any());
mlMemoryManager.getFinalInteractions("cid", 10, interactionListActionListener);
ArgumentCaptor<Exception> argCaptor = ArgumentCaptor.forClass(Exception.class);
verify(interactionListActionListener, times(1)).onFailure(argCaptor.capture());
assert (argCaptor.getValue().getMessage().equals("Client Failure"));
}

@Test
public void testGetInteractions_NoAccessNoUser_ThenFail() {
doReturn(true).when(metadata).hasIndex(anyString());
Expand All @@ -300,7 +259,7 @@ public void testGetInteractions_NoAccessNoUser_ThenFail() {
assert (argCaptor
.getValue()
.getMessage()
.equals("User [" + ActionConstants.DEFAULT_USERNAME_FOR_ERRORS + "] does not have access to conversation cid"));
.equals("User does not have access to conversation cid"));
}

@Test
Expand All @@ -312,12 +271,6 @@ public void testGetInteractions_Success() {
return null;
}).when(conversationMetaIndex).checkAccess(anyString(), any());

doAnswer(invocation -> {
ActionListener<RefreshResponse> al = invocation.getArgument(1);
al.onResponse(mock(RefreshResponse.class));
return null;
}).when(indicesAdminClient).refresh(any(), any());

doAnswer(invocation -> {
XContentBuilder content = XContentBuilder.builder(XContentType.JSON.xContent());
content.startObject();
Expand Down

0 comments on commit 2e98448

Please sign in to comment.