feat(index) Automatically create indexes if they do not exist (#28805)
This commit checks whether live/working indexes are available at
startup and, if they are not, automatically tries to create them and
reindex into them. This allows a dotCMS instance to be restored using
the db + assets; when started, dotCMS will then automatically start
the reindex process.


ref: #28802
wezell authored Jun 11, 2024
1 parent 48d37a9 commit fff363b
Showing 3 changed files with 89 additions and 166 deletions.
@@ -23,7 +23,6 @@
import com.dotcms.variant.model.Variant;
import com.dotmarketing.business.APILocator;
import com.dotmarketing.business.CacheLocator;
import com.dotmarketing.business.cache.provider.CacheProvider;
import com.dotmarketing.common.db.DotConnect;
import com.dotmarketing.common.reindex.BulkProcessorListener;
import com.dotmarketing.common.reindex.ReindexEntry;
@@ -55,9 +54,7 @@
import com.liferay.util.StringPool;
import com.rainerhahnekamp.sneakythrow.Sneaky;
import io.vavr.control.Try;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.sql.Connection;
import java.sql.SQLException;
import java.time.Duration;
@@ -132,27 +129,66 @@ public synchronized void getRidOfOldIndex() throws DotDataException {
}

/**
* Tells if at least we have a "working_XXXXXX" index
* This checks to make sure that we have good live and working indexes set in the db and that
* are available in the ES cluster
*
* @return
* @throws DotDataException
*/
private synchronized boolean indexReady() throws DotDataException {
IndiciesInfo info = APILocator.getIndiciesAPI().loadIndicies();
return info.getWorking() != null && info.getLive() != null;
@VisibleForTesting
@CloseDBIfOpened
public synchronized boolean indexReady() throws DotDataException {
final IndiciesInfo info = APILocator.getIndiciesAPI().loadIndicies();


final boolean hasWorking = Try.of(()->APILocator.getESIndexAPI().indexExists(info.getWorking()))
.getOrElse(false);
final boolean hasLive = Try.of(()->APILocator.getESIndexAPI().indexExists(info.getLive()))
.getOrElse(false);

if(!hasWorking){
Logger.debug(this.getClass(), "-- WORKING INDEX DOES NOT EXIST");
}
if(!hasLive){
Logger.debug(this.getClass(), "-- LIVE INDEX DOES NOT EXIST");
}
return hasWorking && hasLive;
}
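The `Try.of(...).getOrElse(false)` calls above are the vavr idiom for collapsing any exception from the cluster lookup into `false`, so an unreachable cluster reads as "index missing" rather than failing startup. A minimal standalone illustration of that idiom (the `lookup` method is a hypothetical stand-in, not dotCMS code):

```java
import io.vavr.control.Try;

public class TryDemo {

    // Stand-in for ESIndexAPI.indexExists(...) failing against a down cluster.
    static boolean lookup() {
        throw new IllegalStateException("cluster unreachable");
    }

    public static void main(String[] args) {
        // The thrown exception is swallowed and the fallback value is returned.
        boolean exists = Try.of(TryDemo::lookup).getOrElse(false);
        System.out.println(exists); // prints: false
    }
}
```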

/**
* Inits the indexs
* Inits the indexes and starts the reindex process if no indexes are found
*/
@CloseDBIfOpened
public synchronized void checkAndInitialiazeIndex() {
try {
// if we don't have a working index, create it
if (!indexReady()) {
Logger.info(this.getClass(), "No indexes found, creating live and working indexes");
initIndex();
}


// if there are indexes but they are empty, start reindex process
if(Config.getBooleanProperty("REINDEX_IF_NO_INDEXES_FOUND", true)
&& getIndexDocumentCount(APILocator.getIndiciesAPI().loadIndicies().getWorking())==0
){
DotConcurrentFactory.getInstance().getSubmitter().submit(()->{
try {
Logger.info(this.getClass(), "No content found in index, starting reindex process in background thread.");
APILocator.getReindexQueueAPI().deleteFailedRecords();
APILocator.getReindexQueueAPI().addAllToReindexQueue();

} catch (Throwable e) { // nosonar

Logger.error(this.getClass(), "Error starting reindex process", e);
}
});

}


} catch (Exception e) {
Logger.fatal("ESUil.checkAndInitializeIndex", e.getMessage());
Logger.fatal(this.getClass(), "Failed to create new indexes:" + e.getMessage(),e);

}
}
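`checkAndInitialiazeIndex` is the heart of the change: create missing indexes, then, if the working index is empty and `REINDEX_IF_NO_INDEXES_FOUND` is enabled (the default), queue a full reindex on a background thread so a db + assets restore heals itself. A simplified, self-contained sketch of that flow (the interface methods below are stand-ins for the dotCMS APIs in the diff, not real signatures):

```java
import java.util.concurrent.CompletableFuture;

public class StartupIndexCheck {

    // Stand-ins for the ESIndexAPI / ReindexQueueAPI calls shown in the diff.
    interface IndexService {
        boolean indexExists(String name);
        long documentCount(String name);
        void createIndexes();      // mirrors initIndex()
        void queueFullReindex();   // mirrors addAllToReindexQueue()
    }

    static void checkAndInitialize(IndexService es, String working, String live,
                                   boolean reindexIfEmpty) {
        // 1) Live/working index missing from the cluster: (re)create them.
        if (!es.indexExists(working) || !es.indexExists(live)) {
            es.createIndexes();
        }
        // 2) Indexes exist but hold no documents (e.g. a fresh db + assets
        //    restore): kick off a full reindex without blocking startup.
        if (reindexIfEmpty && es.documentCount(working) == 0) {
            CompletableFuture.runAsync(es::queueFullReindex);
        }
    }
}
```

Setting `REINDEX_IF_NO_INDEXES_FOUND=false` in the dotCMS configuration suppresses the automatic background reindex while keeping the index-creation step.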
@@ -355,7 +391,7 @@ public boolean fullReindexSwitchover(Connection conn, final boolean forceSwitch)
< Config.getLongProperty("REINDEX_THREAD_MINIMUM_RUNTIME_IN_SEC", 30) * 1000) {
if (reindexTimeElapsed().isPresent()) {
Logger.info(this.getClass(),
"Reindex has been running only " + reindexTimeElapsed().get()
"Reindex has been running only " + (reindexTimeElapsed().isPresent() ? reindexTimeElapsed().get() : "n/a")
+ ". Letting the reindex settle.");
} else {
Logger.info(this.getClass(), "Reindex Time Elapsed not set.");
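The change above guards the second `reindexTimeElapsed()` call with its own `isPresent()` check: each call builds a fresh `Optional`, so the value observed inside the outer `if` can differ from the one that was tested, and a bare `.get()` could throw. A minimal illustration of the hazard (the supplier is hypothetical, not dotCMS code):

```java
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

public class FreshOptionalDemo {

    static final AtomicBoolean first = new AtomicBoolean(true);

    // Present on the first call, empty afterwards; stands in for
    // state that can change between successive calls.
    static Optional<Duration> elapsed() {
        return first.getAndSet(false)
                ? Optional.of(Duration.ofSeconds(12))
                : Optional.empty();
    }

    public static void main(String[] args) {
        if (elapsed().isPresent()) {
            // A bare elapsed().get() here would throw NoSuchElementException;
            // the guarded form degrades to a placeholder instead.
            Object shown = elapsed().isPresent() ? elapsed().get() : "n/a";
            System.out.println("Reindex has been running only " + shown);
        }
    }
}
```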
@@ -393,7 +429,7 @@ public boolean fullReindexSwitchover(Connection conn, final boolean forceSwitch)
DotConcurrentFactory.getInstance().getSubmitter().submit(() -> {
try {
Logger.info(this.getClass(), "Updating and optimizing ElasticSearch Indexes");
optimize(ImmutableList.of(newInfo.getWorking(), newInfo.getLive()));
optimize(List.of(newInfo.getWorking(), newInfo.getLive()));
} catch (Exception e) {
Logger.warnAndDebug(this.getClass(),
"unable to expand ES replicas:" + e.getMessage(), e);
@@ -515,7 +551,7 @@ public void addContentToIndex(final Contentlet parentContenlet,
parentContenlet.getIndexPolicyDependencies()))
.collect(Collectors.toList()))
.build()
: ImmutableList.of(parentContenlet);
: List.of(parentContenlet);

if (ElasticReadOnlyCommand.getInstance().isIndexOrClusterReadOnly()) {
ElasticReadOnlyCommand.getInstance().sendReadOnlyMessage();
@@ -743,7 +779,7 @@ public void appendBulkRequest(final BulkIndexWrapper bulk, final ReindexEntry id
idx
.getPriority()));
contentlet.setIndexPolicy(IndexPolicy.DEFER);
addBulkRequest(bulk, ImmutableList.of(contentlet), idx.isReindex());
addBulkRequest(bulk, List.of(contentlet), idx.isReindex());
}
} catch (final Exception e) {
// An error occurred when trying to reindex the Contentlet. Flag it as "failed"
@@ -908,7 +944,7 @@ private List<Contentlet> loadDeps(final Contentlet parentContentlet) {
return Arrays.asList(workingInode);
})
.flatMap(Collection::stream)
.filter(inode -> UtilMethods.isSet(inode))
.filter(UtilMethods::isSet)
.distinct()
.collect(Collectors.toList());

@@ -1131,7 +1167,7 @@ public void removeContentFromIndex(final Contentlet content, final boolean onlyL
}

List<Relationship> relationships = APILocator.getRelationshipAPI()
.byContentType(content.getStructure());
.byContentType(content.getContentType());

// add a commit listener to index the contentlet if the entire
// transaction finish clean
@@ -1219,8 +1255,8 @@ public void fullReindexAbort() {

IndiciesInfo newinfo = builder.build();

final String rew = info.getReindexWorking();
final String rel = info.getReindexLive();
info.getReindexWorking();
info.getReindexLive();

APILocator.getIndiciesAPI().point(newinfo);
} catch (Exception e) {
@@ -17,6 +17,7 @@
import com.dotmarketing.util.ConfigUtils;
import com.dotmarketing.util.DateUtil;
import com.dotmarketing.util.Logger;
import com.dotmarketing.util.SecurityLogger;
import com.dotmarketing.util.UtilMethods;
import com.dotmarketing.util.ZipUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -54,6 +55,7 @@
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.util.Strings;
import org.apache.tools.zip.ZipEntry;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
@@ -386,6 +388,8 @@ public boolean deleteMultiple(String...indexNames) {
return true;
}

SecurityLogger.logInfo(this.getClass(), "Deleting Indexes : " + String.join(",", indexNames));

List<String> indicesWithClusterPrefix = Arrays.stream(indexNames)
.map(this::getNameWithClusterIDPrefix).collect(Collectors.toList());
try {
@@ -467,114 +471,7 @@ private String getIndexTimestamp(final String indexName) {
return Try.of(()->indexName.substring(indexName.lastIndexOf("_") + 1)).getOrNull();
}

/**
* Restores an index from a backup file
* @param backupFile
* @param index
* @throws IOException
*/
public void restoreIndex(File backupFile, String index) throws IOException {

AdminLogger.log(this.getClass(), "restoreIndex", "Trying to restore index: " + index);

BufferedReader br = null;

boolean indexExists = indexExists(index);

try {
if (!indexExists) {

createIndex(index);
}

final ZipInputStream zipIn = new ZipInputStream(
Files.newInputStream(backupFile.toPath()));
zipIn.getNextEntry();
br = new BufferedReader(new InputStreamReader(zipIn));

// wait a bit for the changes be made
Thread.sleep(1000L);

// setting up mapping
String mapping = br.readLine();
boolean mappingExists = mapping.startsWith(MAPPING_MARKER);
String type;
ArrayList<String> jsons = new ArrayList<>();
if (mappingExists) {

String patternStr = "^" + MAPPING_MARKER + "\\s*\\{\\s*\"(\\w+)\"";
Pattern pattern = Pattern.compile(patternStr);
Matcher matcher = pattern.matcher(mapping);
boolean matchFound = matcher.find();
if (matchFound) {
type = matcher.group(1);

// we recover the line that wasn't a mapping so it should be content

ObjectMapper mapper = new ObjectMapper();
while (br.ready()) {
//read in 100 lines
for (int i = 0; i < 100; i++) {
if (!br.ready()) {
break;
}
jsons.add(br.readLine());
}

if (jsons.size() > 0) {
try {
BulkRequest request = new BulkRequest();
request.timeout(TimeValue.
timeValueMillis(INDEX_OPERATIONS_TIMEOUT_IN_MS));
for (String raw : jsons) {
int delimidx = raw.indexOf(JSON_RECORD_DELIMITER);
if (delimidx > 0) {
String id = raw.substring(0, delimidx);
String json = raw.substring(
delimidx + JSON_RECORD_DELIMITER.length());
if (id != null) {
@SuppressWarnings("unchecked")
Map<String, Object> oldMap = mapper
.readValue(json, HashMap.class);
Map<String, Object> newMap = new HashMap<>();

for (String key : oldMap.keySet()) {
Object val = oldMap.get(key);
if (val != null && UtilMethods
.isSet(val.toString())) {
newMap.put(key, oldMap.get(key));
}
}
request.add(new IndexRequest(getNameWithClusterIDPrefix(index), type, id)
.source(mapper.writeValueAsString(newMap)));
}
}
}
if (request.numberOfActions() > 0) {
RestHighLevelClientProvider.getInstance().getClient()
.bulk(request, RequestOptions.DEFAULT);
}
} finally {
jsons = new ArrayList<>();
}
}
}
}
}
} catch (Exception e) {
throw new IOException(e.getMessage(), e);
} finally {
if (br != null) {
br.close();
}

final List<String> list = new ArrayList<>();
list.add(index);
optimize(list);

AdminLogger.log(this.getClass(), "restoreIndex", "Index restored: " + index);
}
}

/**
* List of all indicies
@@ -605,7 +502,9 @@ public boolean isIndexClosed(String index) {
* @return
*/
public boolean indexExists(String indexName) {
return listIndices().contains(indexName.toLowerCase());

return listIndices().contains(indexName.toLowerCase())
|| listIndices().contains(removeClusterIdFromName(indexName.toLowerCase()));
}
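The widened `indexExists` now accepts an index name either exactly as listed or with the cluster-ID prefix stripped, since `listIndices()` returns de-prefixed names. A rough sketch of the idea (the prefix format below is an assumption, standing in for `getNameWithClusterIDPrefix`/`removeClusterIdFromName`):

```java
import java.util.Set;

public class PrefixTolerantLookup {

    // Assumed prefix shape, for illustration only.
    static final String CLUSTER_PREFIX = "cluster_abc123.";

    static String removeClusterId(String name) {
        return name.startsWith(CLUSTER_PREFIX)
                ? name.substring(CLUSTER_PREFIX.length())
                : name;
    }

    static boolean indexExists(Set<String> listed, String indexName) {
        final String lower = indexName.toLowerCase();
        // Match either the raw lowercase name or the de-prefixed form.
        return listed.contains(lower) || listed.contains(removeClusterId(lower));
    }

    public static void main(String[] args) {
        Set<String> listed = Set.of("working_20240611");
        System.out.println(indexExists(listed, "cluster_abc123.working_20240611")); // true
    }
}
```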

/**
@@ -690,30 +589,17 @@ public synchronized CreateIndexResponse createIndex(final String indexName, Stri
AdminLogger.log(this.getClass(), "createIndex",
"Trying to create index: " + indexName + " with shards: " + shards);

shards = getShardsFromConfigIfNeeded(shards);
shards = shards > 0 ? shards : Config.getIntProperty("es.index.number_of_shards", 1);

Map map;
//default settings, if null
if(settings ==null){
map = new HashMap();
} else{
map = new ObjectMapper().readValue(settings, LinkedHashMap.class);
}
Map<String,Object> map = (settings ==null) ? new HashMap<>() : new ObjectMapper().readValue(settings, LinkedHashMap.class);

map.put("number_of_shards", shards);
map.put("index.auto_expand_replicas", "0-all");
if (!map.containsKey("index.mapping.total_fields.limit")) {
map.put("index.mapping.total_fields.limit", 10000);
}
if (!map.containsKey("index.mapping.nested_fields.limit")) {
map.put("index.mapping.nested_fields.limit", 10000);
}

map.put("index.query.default_field",
Config.getStringProperty("ES_INDEX_QUERY_DEFAULT_FIELD", "catchall"));
map.putIfAbsent("index.mapping.total_fields.limit", 10000);
map.putIfAbsent("index.mapping.nested_fields.limit", 10000);
map.putIfAbsent("index.query.default_field", Config.getStringProperty("ES_INDEX_QUERY_DEFAULT_FIELD", "catchall"));

final CreateIndexRequest request = new CreateIndexRequest(getNameWithClusterIDPrefix(indexName));

request.settings(map);
request.setTimeout(TimeValue.timeValueMillis(INDEX_OPERATIONS_TIMEOUT_IN_MS));
final CreateIndexResponse createIndexResponse =
@@ -726,28 +612,6 @@ public synchronized CreateIndexResponse createIndex(final String indexName, Stri
return createIndexResponse;
}

private int getShardsFromConfigIfNeeded(int shards) {
if(shards <1){
try{
shards = Integer.parseInt(System.getProperty("es.index.number_of_shards"));
}catch(Exception e){
Logger.warnAndDebug(ESIndexAPI.class, "Unable to parse shards from config", e);
}
}
if(shards <1){
try{
shards = Config.getIntProperty("es.index.number_of_shards", 2);
}catch(Exception e){
Logger.warnAndDebug(ESIndexAPI.class, "Unable to parse shards from config", e);
}
}

if(shards <0){
shards=1;
}
return shards;
}
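The `createIndex` cleanup above inlines the shard-count lookup (dropping the `getShardsFromConfigIfNeeded` helper) and replaces the explicit `containsKey` checks with `Map.putIfAbsent`, which only installs a default when the caller's settings JSON did not already supply the key:

```java
import java.util.HashMap;
import java.util.Map;

public class PutIfAbsentDemo {
    public static void main(String[] args) {
        Map<String, Object> settings = new HashMap<>();
        settings.put("index.mapping.total_fields.limit", 5000); // caller-supplied override

        settings.putIfAbsent("index.mapping.total_fields.limit", 10000);  // kept at 5000
        settings.putIfAbsent("index.mapping.nested_fields.limit", 10000); // default installed

        // total_fields.limit stays 5000, nested_fields.limit becomes 10000
        System.out.println(settings);
    }
}
```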


public String getDefaultIndexSettings() {
String settings = null;
@@ -988,10 +852,11 @@ private Collection<String> getIndices(
.get(request, RequestOptions.DEFAULT).getIndices()));

return indexes.stream()
.filter(indexName -> hasClusterPrefix(indexName))
.filter(this::hasClusterPrefix)
.map(this::removeClusterIdFromName)
.sorted(new IndexSortByDate())
.collect(Collectors.toList());

} catch (ElasticsearchStatusException | IOException e) {
Logger.warnAndDebug(ContentletIndexAPIImpl.class, "The list of indexes cannot be returned. Reason: " + e.getMessage(), e);
}