Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Upgrade Elasticsearch to 5.6.3
Browse files Browse the repository at this point in the history
  • Loading branch information
peterj99a committed Oct 25, 2017
1 parent b93f8d4 commit aa38b0a
Show file tree
Hide file tree
Showing 12 changed files with 174 additions and 145 deletions.
2 changes: 1 addition & 1 deletion stack/corepersistence/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ limitations under the License.
<commons.io.version>2.4</commons.io.version>
<commons.lang.version>3.1</commons.lang.version>
<datastax.version>2.1.10.3</datastax.version>
<elasticsearch.version>1.7.5</elasticsearch.version>
<elasticsearch.version>5.6.3</elasticsearch.version>
<fasterxml-uuid.version>3.1.3</fasterxml-uuid.version>
<guava.version>18.0</guava.version>
<guice.version>4.0-beta5</guice.version>
Expand Down
6 changes: 6 additions & 0 deletions stack/corepersistence/queryindex/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@
<type>jar</type>
</dependency>

<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>${elasticsearch.version}</version>
</dependency>


<dependency>
<groupId>org.elasticsearch</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,8 @@
import org.apache.usergrid.persistence.index.utils.IndexValidationUtils;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.*;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
Expand All @@ -59,22 +57,21 @@
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.*;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.elasticsearch.search.aggregations.metrics.sum.SumBuilder;
import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -206,13 +203,13 @@ public void addIndex(final String indexName,
//Create index
try {
final AdminClient admin = esProvider.getClient().admin();
Settings settings = ImmutableSettings.settingsBuilder()
Settings settings = Settings.builder()
.put("index.number_of_shards", numberOfShards)
.put("index.number_of_replicas", numberOfReplicas)
//dont' allow unmapped queries, and don't allow dynamic mapping
.put("index.query.parse.allow_unmapped_fields", false)
.put("index.mapper.dynamic", false)
.put("action.write_consistency", writeConsistency)
// .put("action.write_consistency", writeConsistency)
.build();

//Added For Graphite Metrics
Expand All @@ -230,7 +227,7 @@ public void addIndex(final String indexName,


logger.info("Created new Index Name [{}] ACK=[{}]", indexName, cir.isAcknowledged());
} catch (IndexAlreadyExistsException e) {
} catch (ResourceAlreadyExistsException e) {
logger.info("Index Name [{}] already exists", indexName);
}
/**
Expand All @@ -244,7 +241,7 @@ public void addIndex(final String indexName,

testNewIndex();

} catch (IndexAlreadyExistsException expected) {
} catch (ResourceAlreadyExistsException expected) {
// this is expected to happen if index already exists, it's a no-op and swallow
} catch (IOException e) {
throw new RuntimeException("Unable to initialize index", e);
Expand Down Expand Up @@ -452,8 +449,7 @@ public CandidateResults search( final SearchEdge searchEdge, final SearchTypes s
for (SortPredicate sortPredicate : parsedQuery.getSortPredicates() ){
hasGeoSortPredicates = visitor.getGeoSorts().contains(sortPredicate.getPropertyName());
}



final String cacheKey = applicationScope.getApplication().getUuid().toString()+"_"+searchEdge.getEdgeName();
final Object totalEdgeSizeFromCache = sizeCache.getIfPresent(cacheKey);
long totalEdgeSizeInBytes;
Expand Down Expand Up @@ -498,6 +494,9 @@ public CandidateResults search( final SearchEdge searchEdge, final SearchTypes s
final Timer.Context timerContext = searchTimer.time();

try {
if (logger.isDebugEnabled()) {
logger.debug("Query to execute = {}", srb.toString());
}

searchResponse = srb.execute().actionGet();
}
Expand Down Expand Up @@ -594,21 +593,25 @@ public Observable deleteApplication() {
//Added For Graphite Metrics
return Observable.from( indexes ).flatMap( index -> {

final ListenableActionFuture<DeleteByQueryResponse> response =
esProvider.getClient().prepareDeleteByQuery( alias.getWriteAlias() ).setQuery( tqb ).execute();
ListenableActionFuture<BulkByScrollResponse> response =
DeleteByQueryAction.INSTANCE.newRequestBuilder( esProvider.getClient())
.filter(tqb)
.source(indexes)
.execute();


response.addListener( new ActionListener<DeleteByQueryResponse>() {
response.addListener( new ActionListener<BulkByScrollResponse>() {

@Override
public void onResponse( DeleteByQueryResponse response ) {
public void onResponse( BulkByScrollResponse response ) {
checkDeleteByQueryResponse( tqb, response );
}


@Override
public void onFailure( Throwable e ) {
public void onFailure(Exception e) {
logger.error( "Failed on delete index", e.getMessage() );
}

} );
return Observable.from( response );
} ).doOnError( t -> logger.error( "Failed on delete application", t.getMessage() ) );
Expand All @@ -618,17 +621,14 @@ public void onFailure( Throwable e ) {
/**
* Validate the response doesn't contain errors, if it does, fail fast at the first error we encounter
*/
private void checkDeleteByQueryResponse( final QueryBuilder query, final DeleteByQueryResponse response ) {
private void checkDeleteByQueryResponse( final QueryBuilder query, final BulkByScrollResponse response ) {

for ( IndexDeleteByQueryResponse indexDeleteByQueryResponse : response ) {
final ShardOperationFailedException[] failures = indexDeleteByQueryResponse.getFailures();
List<BulkItemResponse.Failure> failures = response.getBulkFailures();

for ( ShardOperationFailedException failedException : failures ) {
logger.error("Unable to delete by query {}. Failed with code {} and reason {} on shard {} in index {}",
query.toString(),
failedException.status().getStatus(), failedException.reason(),
failedException.shardId(), failedException.index() );
}

for ( BulkItemResponse.Failure failure : failures ) {
logger.error("Unable to delete by query {}. Failed with code {} and reason {} in index {}",
query.toString(), failure.getStatus() , failure.getMessage(), failure.getIndex());
}
}

Expand Down Expand Up @@ -817,7 +817,7 @@ private long getIndexSize(){
.actionGet();
final CommonStats indexStats = statsResponse.getIndex(indexName).getTotal();
indexSize = indexStats.getStore().getSizeInBytes();
} catch (IndexMissingException e) {
} catch (IndexNotFoundException e) {
// if for some reason the index size does not exist,
// log an error and we can assume size is 0 as it doesn't exist
logger.error("Unable to get size for index {} due to IndexMissingException for app {}",
Expand All @@ -836,7 +836,7 @@ public long getTotalEntitySizeInBytes(final SearchEdge edge){

private long getEntitySizeAggregation( final SearchRequestBuilder builder ) {
final String key = "entitySize";
SumBuilder sumBuilder = new SumBuilder(key);
SumAggregationBuilder sumBuilder = new SumAggregationBuilder(key);
sumBuilder.field("entitySize");
builder.addAggregation(sumBuilder);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@


import org.apache.usergrid.persistence.index.EntityIndexBatch;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.Client;
import org.elasticsearch.rest.RestStatus;
import org.slf4j.Logger;
Expand Down Expand Up @@ -169,9 +169,26 @@ private Observable<IndexOperationMessage> processBatch( final IndexOperationMess
* initialize request
*/
/**
 * Initialize a bulk request configured with the refresh and write-consistency
 * behavior requested by {@code config}.
 *
 * @return a new {@link BulkRequestBuilder} ready to have index/delete operations added
 */
private BulkRequestBuilder initRequest() {

    BulkRequestBuilder bulkRequest = client.prepareBulk();

    // ES 5.x replaced setRefresh(boolean) with a refresh policy string:
    // "true" -> IMMEDIATE, "false" -> NONE, preserving pre-upgrade semantics.
    String refreshPolicyConfig = String.valueOf(config.isForcedRefresh());
    bulkRequest.setRefreshPolicy(refreshPolicyConfig);

    // ES 5.x removed WriteConsistencyLevel; map the configured level onto the
    // nearest wait-for-active-shards equivalent. The branches are mutually
    // exclusive, so chain them; unknown values fall through to the ES default.
    String consistencyLevel = config.getWriteConsistencyLevel();
    if ("one".equals(consistencyLevel)) {
        bulkRequest.setWaitForActiveShards(1);
    } else if ("all".equals(consistencyLevel)) {
        bulkRequest.setWaitForActiveShards(ActiveShardCount.ALL);
    } else if ("none".equals(consistencyLevel)) {
        bulkRequest.setWaitForActiveShards(ActiveShardCount.NONE);
    }

    return bulkRequest;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@

import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -123,7 +122,8 @@ private Client createTransportClient() {
final String clusterName = indexFig.getClusterName();
final int port = indexFig.getPort();

ImmutableSettings.Builder settings = ImmutableSettings.settingsBuilder().put( "cluster.name", clusterName )
Settings.Builder settings = Settings.builder()
.put( "cluster.name", clusterName )
.put( "client.transport.sniff", true );

String nodeName = indexFig.getNodeName();
Expand All @@ -142,11 +142,15 @@ private Client createTransportClient() {
settings.put( "node.name", nodeName );


TransportClient transportClient = new TransportClient( settings.build() );
TransportClient transportClient = new PreBuiltTransportClient(settings.build());

// we will connect to ES on all configured hosts
for ( String host : indexFig.getHosts().split( "," ) ) {
transportClient.addTransportAddress( new InetSocketTransportAddress( host, port ) );
try {
transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port));
} catch (UnknownHostException uhe) {
logger.error( "Couldn't resolve hostname {} to use as ES node ", host );
}
}

return transportClient;
Expand Down Expand Up @@ -181,7 +185,7 @@ public Client createNodeClient() {
final String hostString = hosts.toString();


Settings settings = ImmutableSettings.settingsBuilder()
Settings settings = Settings.builder()

.put( "cluster.name", clusterName )

Expand All @@ -200,7 +204,7 @@ public Client createNodeClient() {
logger.trace("Creating ElasticSearch client with settings: {}", settings.getAsMap());
}

Node node = NodeBuilder.nodeBuilder().settings( settings ).client( true ).data( false ).node();
Node node = new Node( settings );

return node.client();
}
Expand Down
Loading

0 comments on commit aa38b0a

Please sign in to comment.