Skip to content

Commit

Permalink
update to elasticsearch 1.2.1, requires java 7, fixes issues #5, #6, #7
Browse files Browse the repository at this point in the history
  • Loading branch information
salyh committed Jun 20, 2014
1 parent e53379d commit 19ba058
Show file tree
Hide file tree
Showing 11 changed files with 124 additions and 60 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ The river acts currently as a disconnected client. This means that the river is
At a later time is planned to use additionally the IMAP IDLE feature (if server supports it).

Branches:
* master for Elasticsearch 1.1.0 - 1.1.x
* master for Elasticsearch 1.2.1

<h3>Installation</h3>
Prerequisites:
* Open JDK 6/7 or Oracle 7 JRE
* Elasticsearch 1.1.0 or higher
* Open JDK 7 or Oracle 7 JRE (Java 6 is no longer supported and Java 8 is untested)
* Elasticsearch 1.2.1 or higher
* At least one IMAP4 or POP3 server to connect to

``plugin.sh|.bat -i river-imap -u http://dl.bintray.com/salyh/maven/de/saly/elasticsearch/plugin/elasticsearch-river-imap/0.0.7-b12/elasticsearch-river-imap-0.0.7-b12-plugin.zip``
``plugin.sh|.bat -i river-imap -u http://dl.bintray.com/salyh/maven/de/saly/elasticsearch/plugin/elasticsearch-river-imap/0.0.7-b20/elasticsearch-river-imap-0.0.7-b20-plugin.zip``

<h3>Configuration</h3>
<pre>curl -XPUT 'http://localhost:9200/_river/nameofyourriver/_meta' -d '{
Expand Down
54 changes: 44 additions & 10 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

<groupId>de.saly.elasticsearch.plugin</groupId>
<artifactId>elasticsearch-river-imap</artifactId>
<version>0.0.7-b12</version>
<version>0.0.7-b20</version>
<packaging>jar</packaging>
<description>IMAP river for Elasticsearch</description>
<inceptionYear>2014</inceptionYear>
Expand All @@ -25,7 +25,7 @@
</licenses>

<properties>
<elasticsearch.version>1.1.1</elasticsearch.version>
<elasticsearch.version>1.2.1</elasticsearch.version>
<github.global.server>github</github.global.server>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
Expand Down Expand Up @@ -73,24 +73,35 @@
<dependency>
<groupId>com.github.tlrx</groupId>
<artifactId>elasticsearch-test</artifactId>
<version>1.1.0</version>
<version>1.2.1</version>
<scope>test</scope>
</dependency>


<dependency>
<groupId>de.saly</groupId>
<artifactId>javamail-mock2-halfmock</artifactId>
<version>0.5-beta3</version>
<version>0.5-beta4</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>test</scope>

<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
Expand Down Expand Up @@ -139,11 +150,11 @@
<version>1.0.0</version>
</dependency>

<dependency>
<!--<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.2</version>
</dependency>
</dependency>-->

<dependency>
<groupId>org.quartz-scheduler</groupId>
Expand All @@ -170,13 +181,36 @@
</resource>
</resources>
<plugins>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-enforcer-plugin</artifactId>
<version>1.3.1</version>
<executions>
<execution>
<id>enforce-versions</id>
<goals>
<goal>enforce</goal>
</goals>
<configuration>
<rules>
<requireJavaVersion>
<version>[1.7,)</version>
</requireJavaVersion>
</rules>
</configuration>
</execution>
</executions>
</plugin>


<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
<source>1.7</source>
<target>1.7</target>
<encoding>UTF-8</encoding>
<optimize>true</optimize>
<showDeprecation>true</showDeprecation>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import javax.mail.Message;
import javax.mail.MessagingException;

import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
Expand Down Expand Up @@ -96,11 +97,11 @@ public ElasticsearchMailDestination client(final Client client) {
}

@Override
public void close() {
public synchronized void close() {

super.close();

while (!isError() && queue.get() > 0) {
/*while (!isError() && queue.get() > 0) {
logger.info("There are {} outstanding bulk messages, will wait until flushed", queue.get());
try {
Expand All @@ -109,10 +110,10 @@ public void close() {
Thread.currentThread().interrupt();
throw new RuntimeException("interrupted", e);
}
}
}*/

if (bulk != null) {
logger.debug("Shutdown (flush) bulk processor");
logger.debug("Shutdown (flush) bulk processor, is super closed " + isClosed());
bulk.close();
}

Expand Down Expand Up @@ -160,8 +161,20 @@ public void onMessage(final Message msg) throws IOException, MessagingException
}

// following block not needs to be synchronized
bulk.add(createIndexRequest(imsg));
queue.incrementAndGet();
try {

if (!isClosed()) {
bulk.add(createIndexRequest(imsg));
queue.incrementAndGet();
}
} catch (final ElasticsearchIllegalStateException e) {

if (isClosed()) {
logger.debug("Bulkprocessing error due to {}", e.toString());
} else {
logger.error("Bulkprocessing error due to {}", e, e.toString());
}
}

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,15 @@ public ElasticsearchMailDestination client(final Client client) {
}

@Override
public void close() {
public synchronized void close() {

if (closed) {
return;
}

closed = true;

logger.info("Closed");
}

@SuppressWarnings({ "rawtypes", "unchecked" })
Expand Down Expand Up @@ -328,6 +330,11 @@ private synchronized void createIndexIfNotExists() throws IOException {
return;
}

// see if index already exists
if (client.admin().indices().prepareExists(index).execute().actionGet().isExists()) {
return;
}

final CreateIndexRequestBuilder createIndexRequestBuilder = client.admin().indices().prepareCreate(index);
if (settings != null) {
createIndexRequestBuilder.setSettings(settings);
Expand Down Expand Up @@ -392,11 +399,11 @@ protected Client getClient() {
return client;
}

protected boolean isClosed() {
protected synchronized boolean isClosed() {
return closed;
}

protected boolean isError() {
protected synchronized boolean isError() {
return error;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

import javax.mail.Folder;
Expand All @@ -47,7 +46,6 @@
import javax.mail.UIDFolder;
import javax.mail.internet.MimeMessage;

import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
Expand Down Expand Up @@ -84,17 +82,17 @@ public ParallelPollingIMAPMailSource(final Properties props, final int threadCou
public void close() {
if (es != null) {

logger.debug("Initiate shutdown");
logger.info("Initiate shutdown");
es.shutdown();
try {
if (es.awaitTermination(10, TimeUnit.SECONDS)) {
logger.debug("Shutdown completed gracefully");
/*try {
if (es.awaitTermination(2, TimeUnit.SECONDS)) {
logger.info("Shutdown completed gracefully");
} else {
logger.warn("Shutdown completed not gracefully, timeout elapsed");
}
} catch (final InterruptedException e) {
logger.warn("Shutdown completed not gracefully, thread interrupted");
}
}*/
}
}

Expand Down Expand Up @@ -201,7 +199,7 @@ public ProcessResult call() throws Exception {
processedCount += fu.get().processedCount;
logger.debug("Finished with " + fu.get());
} catch (final Exception e) {
logger.error("Unable to process some mails due to {}, will retry ...", e, e.toString());
logger.error("Unable to process some mails due to {}", e, e.toString());
}
}

Expand All @@ -227,6 +225,8 @@ private ProcessResult processMessageSlice(final int start, final int end, final
final Message[] msgs = folder.getMessages(start, end);
folder.fetch(msgs, IMAPUtils.FETCH_PROFILE_HEAD);

logger.debug("folder fetch done");

long highestUid = 0;
int processedCount = 0;

Expand All @@ -240,6 +240,11 @@ private ProcessResult processMessageSlice(final int start, final int end, final

highestUid = Math.max(highestUid, uid);
processedCount++;

if (Thread.currentThread().isInterrupted()) {
break;
}

} catch (final Exception e) {
stateManager.onError("Unable to make indexable message", m, e);
logger.error("Unable to make indexable message due to {}", e, e.toString());
Expand Down Expand Up @@ -358,19 +363,14 @@ protected void fetch(final Folder folder) throws MessagingException, IOException
final long highestUID = riverState.getLastUid(); // this uid is
// already
// processed
// (except
// highestUID
// == 1, then
// we cannot be
// sure)

logger.debug("highestUID: {}", highestUID);

Message[] msgsnew = uidfolder.getMessagesByUID(highestUID, UIDFolder.LASTUID);
final Message[] msgsnew = uidfolder.getMessagesByUID(highestUID, UIDFolder.LASTUID);

// msgnew.size is always >= 1
if (highestUID > 1 && uidfolder.getUID(msgsnew[0]) <= highestUID) {
msgsnew = (Message[]) ArrayUtils.remove(msgsnew, 0);
if (highestUID > 0 && uidfolder.getUID(msgsnew[0]) <= highestUID) {
// msgsnew = (Message[]) ArrayUtils.remove(msgsnew, 0);
}

if (msgsnew.length > 0) {
Expand Down Expand Up @@ -529,7 +529,7 @@ protected void recurseFolders(final Folder folder, final Pattern pattern) throws

if (folder != null) {

if (es == null || es.isShutdown() || es.isTerminated()) {
if (es == null || es.isShutdown() || es.isTerminated() || Thread.currentThread().isInterrupted()) {

logger.warn("Stop processing of mails due to mail source is closed");
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

import javax.mail.Folder;
Expand Down Expand Up @@ -82,17 +81,17 @@ public ParallelPollingPOPMailSource(final Properties props, final int threadCoun
public void close() {
if (es != null) {

logger.debug("Initiate shutdown");
logger.info("Initiate shutdown");
es.shutdown();
try {
if (es.awaitTermination(10, TimeUnit.SECONDS)) {
logger.debug("Shutdown completed gracefully");
/*try {
if (es.awaitTermination(2, TimeUnit.SECONDS)) {
logger.info("Shutdown completed gracefully");
} else {
logger.warn("Shutdown completed not gracefully, timeout elapsed");
}
} catch (final InterruptedException e) {
logger.warn("Shutdown completed not gracefully, thread interrupted");
}
}*/
}
}

Expand Down Expand Up @@ -192,7 +191,7 @@ public ProcessResult call() throws Exception {

processedCount += fu.get().processedCount;
} catch (final Exception e) {
logger.error("Unable to process some mails due to {}, will retry ...", e, e.toString());
logger.error("Unable to process some mails due to {}", e, e.toString());
}
}

Expand Down Expand Up @@ -222,6 +221,11 @@ private ProcessResult processMessageSlice(final int start, final int end, final
try {
mailDestination.onMessage(m);
processedCount++;

if (Thread.currentThread().isInterrupted()) {
break;
}

} catch (final Exception e) {
stateManager.onError("Unable to make indexable message", m, e);
logger.error("Unable to make indexable message due to {}", e, e.toString());
Expand Down Expand Up @@ -376,7 +380,7 @@ protected void recurseFolders(final Folder folder, final Pattern pattern) throws

if (folder != null) {

if (es == null || es.isShutdown() || es.isTerminated()) {
if (es == null || es.isShutdown() || es.isTerminated() || Thread.currentThread().isInterrupted()) {

logger.warn("Stop processing of mails due to mail source is closed");
return;
Expand Down
Loading

0 comments on commit 19ba058

Please sign in to comment.