diff --git a/README.md b/README.md index 4e0bedc..78df0dc 100644 --- a/README.md +++ b/README.md @@ -25,15 +25,15 @@ The river acts currently as a disconnected client. This means that the river is At a later time is planned to use additionally the IMAP IDLE feature (if server supports it). Branches: -* master for Elasticsearch 1.1.0 - 1.1.x +* master for Elasticsearch 1.2.1
curl -XPUT 'http://localhost:9200/_river/nameofyourriver/_meta' -d '{ diff --git a/pom.xml b/pom.xml index 3c4ed64..0dec44d 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@de.saly.elasticsearch.plugin elasticsearch-river-imap -0.0.7-b12 +0.0.7-b20 jar IMAP river for Elasticsearch 2014 @@ -25,7 +25,7 @@- @@ -73,7 +73,7 @@1.1.1 +1.2.1 github UTF-8 @@ -81,16 +81,27 @@ com.github.tlrx elasticsearch-test -1.1.0 +1.2.1 test + de.saly javamail-mock2-halfmock -0.5-beta3 +0.5-beta4 test + +log4j +log4j +1.2.17 +test +org.slf4j slf4j-log4j12 1.7.7 -test - ++ + +log4j +log4j +@@ -139,11 +150,11 @@ -1.0.0 + org.quartz-scheduler @@ -170,13 +181,36 @@+ + + + +org.apache.maven.plugins +maven-enforcer-plugin +1.3.1 ++ ++ +enforce-versions ++ +enforce ++ ++ ++ +[1.7,) +org.apache.maven.plugins maven-compiler-plugin 3.1 - - 1.6 + +1.7 UTF-8 true true diff --git a/src/main/java/de/saly/elasticsearch/maildestination/ElasticsearchBulkMailDestination.java b/src/main/java/de/saly/elasticsearch/maildestination/ElasticsearchBulkMailDestination.java index 1be6e17..53c0fcd 100644 --- a/src/main/java/de/saly/elasticsearch/maildestination/ElasticsearchBulkMailDestination.java +++ b/src/main/java/de/saly/elasticsearch/maildestination/ElasticsearchBulkMailDestination.java @@ -31,6 +31,7 @@ import javax.mail.Message; import javax.mail.MessagingException; +import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; @@ -96,11 +97,11 @@ public ElasticsearchMailDestination client(final Client client) { } @Override - public void close() { + public synchronized void close() { super.close(); - while (!isError() && queue.get() > 0) { + /*while (!isError() && queue.get() > 0) { logger.info("There are {} outstanding bulk messages, will wait until flushed", queue.get()); try { @@ -109,10 +110,10 @@ public void close() { Thread.currentThread().interrupt(); throw new RuntimeException("interrupted", e); } - } + }*/ if (bulk != null) { - logger.debug("Shutdown (flush) bulk processor"); + logger.debug("Shutdown (flush) bulk processor, is super closed " + isClosed()); bulk.close(); } @@ -160,8 +161,20 @@ public void onMessage(final Message msg) throws IOException, MessagingException } // following block not needs to be synchronized - bulk.add(createIndexRequest(imsg)); - queue.incrementAndGet(); + try { + + if (!isClosed()) { + bulk.add(createIndexRequest(imsg)); + queue.incrementAndGet(); + } + } catch (final ElasticsearchIllegalStateException e) { + + if (isClosed()) { + logger.debug("Bulkprocessing error due to {}", e.toString()); + } else { + logger.error("Bulkprocessing error due to {}", e, e.toString()); + } + } } diff --git a/src/main/java/de/saly/elasticsearch/maildestination/ElasticsearchMailDestination.java b/src/main/java/de/saly/elasticsearch/maildestination/ElasticsearchMailDestination.java index f9ae78a..ba522eb 100644 --- a/src/main/java/de/saly/elasticsearch/maildestination/ElasticsearchMailDestination.java +++ b/src/main/java/de/saly/elasticsearch/maildestination/ElasticsearchMailDestination.java @@ -105,13 +105,15 @@ public ElasticsearchMailDestination client(final Client client) { } @Override - public void close() { + public synchronized void close() { if (closed) { return; } closed = true; + + logger.info("Closed"); } @SuppressWarnings({ "rawtypes", "unchecked" }) @@ -328,6 +330,11 @@ private synchronized void createIndexIfNotExists() throws IOException { return; } + // see if index already exists + if (client.admin().indices().prepareExists(index).execute().actionGet().isExists()) { + return; + } + final CreateIndexRequestBuilder createIndexRequestBuilder = client.admin().indices().prepareCreate(index); if (settings != null) { createIndexRequestBuilder.setSettings(settings); @@ -392,11 +399,11 @@ protected Client getClient() { return client; } - protected boolean isClosed() { + protected synchronized boolean isClosed() { return closed; } - protected boolean isError() { + protected synchronized boolean isError() { return error; } diff --git a/src/main/java/de/saly/elasticsearch/mailsource/ParallelPollingIMAPMailSource.java b/src/main/java/de/saly/elasticsearch/mailsource/ParallelPollingIMAPMailSource.java index 71a82cd..9ccab0d 100644 --- a/src/main/java/de/saly/elasticsearch/mailsource/ParallelPollingIMAPMailSource.java +++ b/src/main/java/de/saly/elasticsearch/mailsource/ParallelPollingIMAPMailSource.java @@ -36,7 +36,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import javax.mail.Folder; @@ -47,7 +46,6 @@ import javax.mail.UIDFolder; import javax.mail.internet.MimeMessage; -import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.StringUtils; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLoggerFactory; @@ -84,17 +82,17 @@ public ParallelPollingIMAPMailSource(final Properties props, final int threadCou public void close() { if (es != null) { - logger.debug("Initiate shutdown"); + logger.info("Initiate shutdown"); es.shutdown(); - try { - if (es.awaitTermination(10, TimeUnit.SECONDS)) { - logger.debug("Shutdown completed gracefully"); + /*try { + if (es.awaitTermination(2, TimeUnit.SECONDS)) { + logger.info("Shutdown completed gracefully"); } else { logger.warn("Shutdown completed not gracefully, timeout elapsed"); } } catch (final InterruptedException e) { logger.warn("Shutdown completed not gracefully, thread interrupted"); - } + }*/ } } @@ -201,7 +199,7 @@ public ProcessResult call() throws Exception { processedCount += fu.get().processedCount; logger.debug("Finished with " + fu.get()); } catch (final Exception e) { - logger.error("Unable to process some mails due to {}, will retry ...", e, e.toString()); + logger.error("Unable to process some mails due to {}", e, e.toString()); } } @@ -227,6 +225,8 @@ private ProcessResult processMessageSlice(final int start, final int end, final final Message[] msgs = folder.getMessages(start, end); folder.fetch(msgs, IMAPUtils.FETCH_PROFILE_HEAD); + logger.debug("folder fetch done"); + long highestUid = 0; int processedCount = 0; @@ -240,6 +240,11 @@ private ProcessResult processMessageSlice(final int start, final int end, final highestUid = Math.max(highestUid, uid); processedCount++; + + if (Thread.currentThread().isInterrupted()) { + break; + } + } catch (final Exception e) { stateManager.onError("Unable to make indexable message", m, e); logger.error("Unable to make indexable message due to {}", e, e.toString()); @@ -358,19 +363,14 @@ protected void fetch(final Folder folder) throws MessagingException, IOException final long highestUID = riverState.getLastUid(); // this uid is // already // processed - // (except - // highestUID - // == 1, then - // we cannot be - // sure) logger.debug("highestUID: {}", highestUID); - Message[] msgsnew = uidfolder.getMessagesByUID(highestUID, UIDFolder.LASTUID); + final Message[] msgsnew = uidfolder.getMessagesByUID(highestUID, UIDFolder.LASTUID); // msgnew.size is always >= 1 - if (highestUID > 1 && uidfolder.getUID(msgsnew[0]) <= highestUID) { - msgsnew = (Message[]) ArrayUtils.remove(msgsnew, 0); + if (highestUID > 0 && uidfolder.getUID(msgsnew[0]) <= highestUID) { + // msgsnew = (Message[]) ArrayUtils.remove(msgsnew, 0); } if (msgsnew.length > 0) { @@ -529,7 +529,7 @@ protected void recurseFolders(final Folder folder, final Pattern pattern) throws if (folder != null) { - if (es == null || es.isShutdown() || es.isTerminated()) { + if (es == null || es.isShutdown() || es.isTerminated() || Thread.currentThread().isInterrupted()) { logger.warn("Stop processing of mails due to mail source is closed"); return; diff --git a/src/main/java/de/saly/elasticsearch/mailsource/ParallelPollingPOPMailSource.java b/src/main/java/de/saly/elasticsearch/mailsource/ParallelPollingPOPMailSource.java index 149cfb8..1dd0d6c 100644 --- a/src/main/java/de/saly/elasticsearch/mailsource/ParallelPollingPOPMailSource.java +++ b/src/main/java/de/saly/elasticsearch/mailsource/ParallelPollingPOPMailSource.java @@ -38,7 +38,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import javax.mail.Folder; @@ -82,17 +81,17 @@ public ParallelPollingPOPMailSource(final Properties props, final int threadCoun public void close() { if (es != null) { - logger.debug("Initiate shutdown"); + logger.info("Initiate shutdown"); es.shutdown(); - try { - if (es.awaitTermination(10, TimeUnit.SECONDS)) { - logger.debug("Shutdown completed gracefully"); + /*try { + if (es.awaitTermination(2, TimeUnit.SECONDS)) { + logger.info("Shutdown completed gracefully"); } else { logger.warn("Shutdown completed not gracefully, timeout elapsed"); } } catch (final InterruptedException e) { logger.warn("Shutdown completed not gracefully, thread interrupted"); - } + }*/ } } @@ -192,7 +191,7 @@ public ProcessResult call() throws Exception { processedCount += fu.get().processedCount; } catch (final Exception e) { - logger.error("Unable to process some mails due to {}, will retry ...", e, e.toString()); + logger.error("Unable to process some mails due to {}", e, e.toString()); } } @@ -222,6 +221,11 @@ private ProcessResult processMessageSlice(final int start, final int end, final try { mailDestination.onMessage(m); processedCount++; + + if (Thread.currentThread().isInterrupted()) { + break; + } + } catch (final Exception e) { stateManager.onError("Unable to make indexable message", m, e); logger.error("Unable to make indexable message due to {}", e, e.toString()); @@ -376,7 +380,7 @@ protected void recurseFolders(final Folder folder, final Pattern pattern) throws if (folder != null) { - if (es == null || es.isShutdown() || es.isTerminated()) { + if (es == null || es.isShutdown() || es.isTerminated() || Thread.currentThread().isInterrupted()) { logger.warn("Stop processing of mails due to mail source is closed"); return; diff --git a/src/main/java/de/saly/elasticsearch/river/imap/IMAPRiver.java b/src/main/java/de/saly/elasticsearch/river/imap/IMAPRiver.java index 26a1458..4766459 100644 --- a/src/main/java/de/saly/elasticsearch/river/imap/IMAPRiver.java +++ b/src/main/java/de/saly/elasticsearch/river/imap/IMAPRiver.java @@ -186,23 +186,22 @@ public void close() { try { if (sched != null && sched.isStarted()) { - - sched.shutdown(true); - + sched.shutdown(); + logger.info("Scheduler shutted down"); } } catch (final SchedulerException e) { logger.warn("Unable to shutdown scheduler due to " + e, e); } - if (mailSource != null) { - mailSource.close(); - } - if (mailDestination != null) { mailDestination.close(); } + if (mailSource != null) { + mailSource.close(); + } + logger.info("IMAPRiver closed"); } diff --git a/src/main/java/de/saly/elasticsearch/riverstate/ElasticsearchRiverStateManager.java b/src/main/java/de/saly/elasticsearch/riverstate/ElasticsearchRiverStateManager.java index b200897..95a99f2 100644 --- a/src/main/java/de/saly/elasticsearch/riverstate/ElasticsearchRiverStateManager.java +++ b/src/main/java/de/saly/elasticsearch/riverstate/ElasticsearchRiverStateManager.java @@ -83,7 +83,7 @@ public synchronized RiverState getRiverState(final Folder folder) throws Messagi final RiverState rs = new RiverState(); rs.setFolderUrl(folder.getURLName().toString()); - rs.setLastUid(1L); + // rs.setLastUid(1L); rs.setExists(true); return rs; diff --git a/src/main/java/de/saly/elasticsearch/riverstate/RiverState.java b/src/main/java/de/saly/elasticsearch/riverstate/RiverState.java index 2767a6d..e452f69 100644 --- a/src/main/java/de/saly/elasticsearch/riverstate/RiverState.java +++ b/src/main/java/de/saly/elasticsearch/riverstate/RiverState.java @@ -41,7 +41,7 @@ public class RiverState { private long lastTook = -1; - private long lastUid = 1; + private long lastUid = -1; private Long uidValidity; @@ -132,7 +132,7 @@ public void setLastTook(final long lastTook) { } public void setLastUid(final long lastUid) { - this.lastUid = lastUid < 1 ? 1L : lastUid; + this.lastUid = lastUid; } public void setUidValidity(final Long uidValidity) { diff --git a/src/main/java/de/saly/elasticsearch/support/IndexableMailMessage.java b/src/main/java/de/saly/elasticsearch/support/IndexableMailMessage.java index b6a6273..c5f14a3 100644 --- a/src/main/java/de/saly/elasticsearch/support/IndexableMailMessage.java +++ b/src/main/java/de/saly/elasticsearch/support/IndexableMailMessage.java @@ -184,7 +184,13 @@ private static String getText(final Part p, int depth) throws MessagingException if (p.isMimeType("text/*")) { - final Object content = p.getContent(); + Object content = null; + try { + content = p.getContent(); + } catch (final Exception e) { + logger.error("Unable to index the content of a message due to {}", e.toString()); + return null; + } if (content instanceof String) { final String s = (String) p.getContent(); diff --git a/src/main/java/de/saly/elasticsearch/support/PooledStoreFactory.java b/src/main/java/de/saly/elasticsearch/support/PooledStoreFactory.java index 5b4222e..aa8396e 100644 --- a/src/main/java/de/saly/elasticsearch/support/PooledStoreFactory.java +++ b/src/main/java/de/saly/elasticsearch/support/PooledStoreFactory.java @@ -25,6 +25,7 @@ **********************************************************************************************************************/ package de.saly.elasticsearch.support; +/* import java.util.Properties; import javax.mail.Session; @@ -72,4 +73,4 @@ public PooledObjectwrap(final Store obj) { return new DefaultPooledObject (obj); } -} +}*/