First cut at upgrading to Solr 6.6.2, Hadoop 2.6.0 and Cascading 2.7.1. #13

Open
wants to merge 5 commits into master
4 changes: 2 additions & 2 deletions build.properties
@@ -13,7 +13,7 @@
# limitations under the License.

name=cascading.solr
version=2.6-SNAPSHOT
version=2.6-SOLR6

jar.name=${ant.project.name}-${version}.jar

@@ -36,7 +36,7 @@ build.dir.test-reports=${build.dir}/test
javac.debug=on
javac.optimize=on
javac.deprecation=off
javac.version=1.7
javac.version=1.8
javac.args=
javac.args.warnings=-Xlint:none
build.encoding=UTF-8
14 changes: 10 additions & 4 deletions pom.xml
@@ -7,7 +7,7 @@
<artifactId>cascading.solr</artifactId>
<name>Cascading Scheme for Apache Solr</name>
<packaging>jar</packaging>
<version>2.6-SNAPSHOT</version>
<version>2.6-SOLR6</version>
<description>Cascading Scheme for creating Lucene indexes using Solr</description>
<url>http://github.com/ScaleUnlimited/cascading.solr</url>
<issueManagement>
@@ -34,9 +34,9 @@
</repositories>

<properties>
<cascading.version>2.5.6</cascading.version>
<hadoop.version>2.2.0</hadoop.version>
<solr.version>4.10.1</solr.version>
<cascading.version>2.7.1</cascading.version>
<hadoop.version>2.6.0</hadoop.version>
<solr.version>6.6.2</solr.version>
</properties>

<dependencies>
@@ -84,6 +84,12 @@
</exclusions>
</dependency>

<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.5</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
BinaryUpdateRequest.java
@@ -47,9 +47,10 @@ public String getXML() throws IOException {
}

/**
* @return
* @since solr 1.4
*/
public void writeXML( Writer writer ) throws IOException {
public UpdateRequest writeXML( Writer writer ) throws IOException {
throw new IllegalStateException("Can't write XML when using binary protocol");
}

SolrSchemeUtil.java
@@ -2,6 +2,7 @@

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
@@ -19,52 +20,56 @@

public class SolrSchemeUtil {

public static final String DEFAULT_DATA_DIR_PROPERTY_NAME = "solr.data.dir";
public static final String CORE_DIR_NAME = "core";

public static File makeTempSolrHome(File solrCoreDir) throws IOException {
public static File makeTempSolrHome(File solrConfDir, File dataDir) throws IOException {
String tmpFolder = System.getProperty("java.io.tmpdir");
File tmpSolrHome = new File(tmpFolder, UUID.randomUUID().toString());

// Set up a temp location for Solr home, where we're write out a synthetic solr.xml
// that references the core directory.
String coreName = solrCoreDir.getName();
String corePath = solrCoreDir.getAbsolutePath();
String solrXmlContent = String.format("<solr><cores><core name=\"%s\" instanceDir=\"%s\"></core></cores></solr>",
coreName, corePath);
File solrXmlFile = new File(tmpSolrHome, "solr.xml");
FileUtils.write(solrXmlFile, solrXmlContent);
FileUtils.write(solrXmlFile, "<solr></solr>", StandardCharsets.UTF_8);
File coreDir = new File(tmpSolrHome, CORE_DIR_NAME);
coreDir.mkdirs();

// Create the core.properties file with appropriate entries.
File coreProps = new File(coreDir, "core.properties");

StringBuilder props = new StringBuilder();
props.append("enable.special-handlers=false\n"); // All we need is the update request handler
props.append("enable.cache-warming=false\n"); // We certainly don't need to warm the cache

if (dataDir != null) {
props.append("dataDir=");
props.append(dataDir.getAbsolutePath());
props.append('\n');
}

FileUtils.write(coreProps, props.toString(), StandardCharsets.UTF_8);

// Copy over all of the conf/ dir files.
File destDir = new File(coreDir, "conf");
FileUtils.copyDirectory(solrConfDir, destDir);

return tmpSolrHome;
}

public static void validate(File solrCoreDir, String dataDirPropertyName, Fields schemeFields) throws IOException {
public static void validate(File solrConfDir, Fields schemeFields) throws IOException {

// Verify solrHomeDir exists
if (!solrCoreDir.exists() || !solrCoreDir.isDirectory()) {
throw new TapException("Solr core directory doesn't exist: " + solrCoreDir);
// Verify solrConfDir exists
if (!solrConfDir.exists() || !solrConfDir.isDirectory()) {
throw new TapException("Solr conf directory doesn't exist: " + solrConfDir);
}

File tmpSolrHome = makeTempSolrHome(solrCoreDir);

// Set up a temp location for Solr home, where we're write out a synthetic solr.xml
// that references the core directory.
String coreName = solrCoreDir.getName();
String corePath = solrCoreDir.getAbsolutePath();
String solrXmlContent = String.format("<solr><cores><core name=\"%s\" instanceDir=\"%s\"></core></cores></solr>",
coreName, corePath);
File solrXmlFile = new File(tmpSolrHome, "solr.xml");
FileUtils.write(solrXmlFile, solrXmlContent);

// Set up a temp location for data, so when we instantiate the coreContainer,
// we don't pollute the solr home with a /data sub-dir.
String tmpFolder = System.getProperty("java.io.tmpdir");
File tmpDataDir = new File(tmpFolder, UUID.randomUUID().toString());
tmpDataDir.mkdir();

System.setProperty(dataDirPropertyName, tmpDataDir.getAbsolutePath());
System.setProperty("enable.special-handlers", "false"); // All we need is the update request handler
System.setProperty("enable.cache-warming", "false"); // We certainly don't need to warm the cache


// Create a temp solr home dir with a solr.xml and core.properties file to work off.
File tmpSolrHome = makeTempSolrHome(solrConfDir, tmpDataDir);
CoreContainer coreContainer = new CoreContainer(tmpSolrHome.getAbsolutePath());

try {
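A minimal sketch, not part of this PR's diff, of how the reworked helper could be exercised. The conf and data paths below are placeholder assumptions, and the class is assumed to sit in the same package as SolrSchemeUtil; only the makeTempSolrHome(confDir, dataDir) signature and the resulting layout (a bare solr.xml, a core/core.properties, and a copied conf/ directory) come from the code above.

// Sketch only: placeholder paths, assumes same package as SolrSchemeUtil.
// Expected layout, per makeTempSolrHome above:
//   <tmp>/<uuid>/solr.xml              - bare "<solr></solr>" for core discovery
//   <tmp>/<uuid>/core/core.properties  - enable.special-handlers, enable.cache-warming, dataDir
//   <tmp>/<uuid>/core/conf/            - copy of the supplied conf/ directory
import java.io.File;

public class TempSolrHomeSketch {
    public static void main(String[] args) throws Exception {
        File confDir = new File("src/test/resources/solr/conf"); // placeholder conf dir
        File dataDir = new File("/tmp/solr-data");               // placeholder data dir
        File solrHome = SolrSchemeUtil.makeTempSolrHome(confDir, dataDir);
        System.out.println("Synthetic Solr home: " + solrHome.getAbsolutePath());
    }
}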
SolrWriter.java
@@ -4,18 +4,21 @@
import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.EmbeddedSolrServer;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.core.CoreContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cascading.tuple.Fields;
import cascading.tuple.Tuple;

public abstract class SolrWriter {

private static final Logger LOGGER = LoggerFactory.getLogger(SolrWriter.class);

// TODO KKr - make this configurable.
private static final int MAX_DOCS_PER_ADD = 500;

@@ -25,10 +28,10 @@ public abstract class SolrWriter {
private int _maxSegments;

private transient CoreContainer _coreContainer;
private transient SolrServer _solrServer;
private transient EmbeddedSolrServer _solrServer;
private transient BinaryUpdateRequest _updateRequest;

public SolrWriter(KeepAliveHook keepAlive, Fields sinkFields, String dataDirPropertyName, String dataDir, File solrCoreDir, int maxSegments) throws IOException {
public SolrWriter(KeepAliveHook keepAlive, Fields sinkFields, String dataDir, File solrConfDir, int maxSegments) throws IOException {
_keepAlive = keepAlive;
_sinkFields = sinkFields;
_maxSegments = maxSegments;
@@ -40,13 +43,10 @@ public SolrWriter(KeepAliveHook keepAlive, Fields sinkFields, String dataDirProp

// Fire up an embedded Solr server
try {
System.setProperty(dataDirPropertyName, dataDir);
System.setProperty("enable.special-handlers", "false"); // All we need is the update request handler
System.setProperty("enable.cache-warming", "false"); // We certainly don't need to warm the cache
File solrHome = SolrSchemeUtil.makeTempSolrHome(solrCoreDir);
File solrHome = SolrSchemeUtil.makeTempSolrHome(solrConfDir, new File(dataDir));
_coreContainer = new CoreContainer(solrHome.getAbsolutePath());
_coreContainer.load();
_solrServer = new EmbeddedSolrServer(_coreContainer, solrCoreDir.getName());
_solrServer = new EmbeddedSolrServer(_coreContainer, SolrSchemeUtil.CORE_DIR_NAME);
} catch (Exception e) {
if (_coreContainer != null) {
_coreContainer.shutdown();
@@ -108,8 +108,8 @@ private void flushInputDocuments(boolean force) throws IOException {
_updateRequest.process(_solrServer);

if (force) {
_solrServer.commit(true, true);
_solrServer.optimize(true, true, _maxSegments);
_solrServer.commit(SolrSchemeUtil.CORE_DIR_NAME, true, true);
_solrServer.optimize(SolrSchemeUtil.CORE_DIR_NAME, true, true, _maxSegments);
}
} catch (SolrServerException e) {
throw new IOException(e);
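For reference, the SolrJ 6 embedded-server sequence that SolrWriter now follows can be reduced to a small standalone sketch. The solr home path, core name, and document field are placeholder assumptions; only the API shapes (CoreContainer(String), load(), EmbeddedSolrServer(container, coreName), and the per-core commit/optimize overloads) mirror the changed code.

// Standalone sketch with placeholder values; assumes a valid Solr 6.x home at /tmp/solr-home
// containing a core named "core" whose schema defines an "id" field.
import org.apache.solr.client.solrj.embedded.EmbeddedSolrServer;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.core.CoreContainer;

public class EmbeddedSolrSketch {
    public static void main(String[] args) throws Exception {
        CoreContainer container = new CoreContainer("/tmp/solr-home"); // placeholder solr home
        container.load();
        EmbeddedSolrServer server = new EmbeddedSolrServer(container, "core");
        try {
            SolrInputDocument doc = new SolrInputDocument();
            doc.addField("id", "1");
            server.add(doc);
            // Per-core commit/optimize overloads, as SolrWriter now calls them.
            server.commit("core", true, true);
            server.optimize("core", true, true, 1);
        } finally {
            server.close(); // shuts down the underlying CoreContainer
        }
    }
}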
SolrOutputFormat.java
@@ -27,10 +27,9 @@
public class SolrOutputFormat extends FileOutputFormat<Tuple, Tuple> {
private static final Logger LOGGER = LoggerFactory.getLogger(SolrOutputFormat.class);

public static final String SOLR_CORE_PATH_KEY = "com.scaleunlimited.cascading.solr.corePath";
public static final String SOLR_CONF_PATH_KEY = "com.scaleunlimited.cascading.solr.confPath";
public static final String SINK_FIELDS_KEY = "com.scaleunlimited.cascading.solr.sinkFields";
public static final String MAX_SEGMENTS_KEY = "com.scaleunlimited.cascading.solr.maxSegments";
public static final String DATA_DIR_PROPERTY_NAME_KEY = "com.scaleunlimited.cascading.solr.dataDirPropertyName";

public static final int DEFAULT_MAX_SEGMENTS = 10;

@@ -45,13 +44,13 @@ private static class SolrRecordWriter implements RecordWriter<Tuple, Tuple> {

public SolrRecordWriter(JobConf conf, String name, Progressable progress) throws IOException {

// Copy Solr core directory from HDFS to temp local location.
Path sourcePath = new Path(conf.get(SOLR_CORE_PATH_KEY));
String coreName = sourcePath.getName();
// Copy Solr conf directory from HDFS to temp local location.
Path sourcePath = new Path(conf.get(SOLR_CONF_PATH_KEY));
String confName = sourcePath.getName();
String tmpDir = System.getProperty("java.io.tmpdir");
File localSolrCore = new File(tmpDir, "cascading.solr-" + UUID.randomUUID() + "/" + coreName);
File localSolrConf = new File(tmpDir, "cascading.solr-" + UUID.randomUUID() + "/" + confName);
FileSystem sourceFS = sourcePath.getFileSystem(conf);
sourceFS.copyToLocalFile(sourcePath, new Path(localSolrCore.getAbsolutePath()));
sourceFS.copyToLocalFile(sourcePath, new Path(localSolrConf.getAbsolutePath()));

// Figure out where ultimately the results need to wind up.
_outputPath = new Path(FileOutputFormat.getTaskOutputPath(conf, name), "index");
@@ -62,17 +61,15 @@ public SolrRecordWriter(JobConf conf, String name, Progressable progress) throws

int maxSegments = conf.getInt(MAX_SEGMENTS_KEY, DEFAULT_MAX_SEGMENTS);

String dataDirPropertyName = conf.get(DATA_DIR_PROPERTY_NAME_KEY);

// Set up local Solr home.
File localSolrHome = SolrSchemeUtil.makeTempSolrHome(localSolrCore);
File localSolrHome = SolrSchemeUtil.makeTempSolrHome(localSolrConf, null);

// This is where data will wind up, inside of an index subdir.
_localIndexDir = new File(localSolrHome, "data");

_keepAliveHook = new HadoopKeepAliveHook(progress);

_solrWriter = new SolrWriter(_keepAliveHook, sinkFields, dataDirPropertyName, _localIndexDir.getAbsolutePath(), localSolrCore, maxSegments) { };
_solrWriter = new SolrWriter(_keepAliveHook, sinkFields, _localIndexDir.getAbsolutePath(), localSolrConf, maxSegments) { };
}

@Override
SolrScheme.java (Hadoop platform)
@@ -29,26 +29,20 @@
@SuppressWarnings("serial")
public class SolrScheme extends Scheme<JobConf, RecordReader<Tuple, Tuple>, OutputCollector<Tuple, Tuple>, Object[], Void> {

private File _solrCoreDir;
private File _solrConfDir;
private int _maxSegments;
private String _dataDirPropertyName;

public SolrScheme(Fields schemeFields, String solrCoreDir) throws IOException, ParserConfigurationException, SAXException {
this(schemeFields, solrCoreDir, SolrOutputFormat.DEFAULT_MAX_SEGMENTS);
public SolrScheme(Fields schemeFields, String solrConfDir) throws IOException, ParserConfigurationException, SAXException {
this(schemeFields, solrConfDir, SolrOutputFormat.DEFAULT_MAX_SEGMENTS);
}

public SolrScheme(Fields schemeFields, String solrCoreDir, int maxSegments) throws IOException, ParserConfigurationException, SAXException {
this(schemeFields, solrCoreDir, SolrOutputFormat.DEFAULT_MAX_SEGMENTS, SolrSchemeUtil.DEFAULT_DATA_DIR_PROPERTY_NAME);
}

public SolrScheme(Fields schemeFields, String solrCoreDir, int maxSegments, String dataDirPropertyName) throws IOException, ParserConfigurationException, SAXException {
public SolrScheme(Fields schemeFields, String solrConfDir, int maxSegments) throws IOException, ParserConfigurationException, SAXException {
super(schemeFields, schemeFields);

_solrCoreDir = new File(solrCoreDir);
_solrConfDir = new File(solrConfDir);
_maxSegments = maxSegments;
_dataDirPropertyName = dataDirPropertyName;

SolrSchemeUtil.validate(_solrCoreDir, _dataDirPropertyName, schemeFields);
SolrSchemeUtil.validate(_solrConfDir, schemeFields);
}

@Override
@@ -70,15 +64,15 @@ public void sourceConfInit(FlowProcess<JobConf> flowProcess, Tap<JobConf, Record
public void sinkConfInit(FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader<Tuple, Tuple>, OutputCollector<Tuple, Tuple>> tap, JobConf conf) {
// Pick temp location in HDFS for conf files.
// TODO KKr - should I get rid of this temp directory when we're done?
String coreDirname = _solrCoreDir.getName();
Path hdfsSolrCoreDir = new Path(Hfs.getTempPath(conf), "solr-core-" + Util.createUniqueID() + "/" + coreDirname);
String confDirname = _solrConfDir.getName();
Path hdfsSolrConfDir = new Path(Hfs.getTempPath(conf), "solr-conf-" + Util.createUniqueID() + "/" + confDirname);

// Copy Solr core directory into HDFS.
// Copy Solr conf directory into HDFS.
try {
FileSystem fs = hdfsSolrCoreDir.getFileSystem(conf);
fs.copyFromLocalFile(new Path(_solrCoreDir.getAbsolutePath()), hdfsSolrCoreDir);
FileSystem fs = hdfsSolrConfDir.getFileSystem(conf);
fs.copyFromLocalFile(new Path(_solrConfDir.getAbsolutePath()), hdfsSolrConfDir);
} catch (IOException e) {
throw new TapException("Can't copy Solr core directory into HDFS", e);
throw new TapException("Can't copy Solr conf directory into HDFS", e);
}

conf.setOutputKeyClass(Tuple.class);
@@ -91,9 +85,8 @@ public void sinkConfInit(FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordRe
throw new TapException("Can't serialize sink fields", e);
}

conf.set(SolrOutputFormat.SOLR_CORE_PATH_KEY, hdfsSolrCoreDir.toString());
conf.set(SolrOutputFormat.SOLR_CONF_PATH_KEY, hdfsSolrConfDir.toString());
conf.setInt(SolrOutputFormat.MAX_SEGMENTS_KEY, _maxSegments);
conf.set(SolrOutputFormat.DATA_DIR_PROPERTY_NAME_KEY, _dataDirPropertyName);
}

@Override
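With the constructor now taking a Solr conf directory rather than a core directory, wiring the scheme into a Hadoop-mode flow would look roughly like the sketch below. The field names, paths, and the Hfs wrapper are placeholder assumptions for illustration; only the SolrScheme(Fields, String) constructor shape comes from this diff.

// Usage sketch with placeholder fields and paths; assumes the Hadoop-mode SolrScheme above is in scope.
import cascading.scheme.Scheme;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

public class SolrSinkWiringSketch {
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public static Tap makeSolrSink() throws Exception {
        Fields fields = new Fields("id", "title", "content");               // placeholder fields
        Scheme scheme = new SolrScheme(fields, "/local/path/to/solr/conf"); // conf dir, not a core dir
        return new Hfs(scheme, "/output/solr-index");                       // placeholder output path
    }
}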
SolrCollector.java (local platform)
@@ -12,8 +12,8 @@

public class SolrCollector extends SolrWriter {

public SolrCollector(FlowProcess<Properties> flowProcess, Fields sinkFields, File solrCoreDir, int maxSegments, String dataDirPropertyName, String dataDir) throws IOException {
super(new LocalKeepAliveHook(flowProcess), sinkFields, dataDirPropertyName, dataDir, solrCoreDir, maxSegments);
public SolrCollector(FlowProcess<Properties> flowProcess, Fields sinkFields, File solrConfDir, int maxSegments, String dataDir) throws IOException {
super(new LocalKeepAliveHook(flowProcess), sinkFields, dataDir, solrConfDir, maxSegments);
}

public void collect(Tuple value) throws IOException {
SolrScheme.java (local platform)
@@ -26,26 +26,20 @@ public class SolrScheme extends Scheme<Properties, InputStream, OutputStream, Vo

public static final int DEFAULT_DEFAULT_MAX_SEGMENTS = 1;

private File _solrCoreDir;
private File _solrConfDir;
private int _maxSegments;
private String _dataDirPropertyName;

public SolrScheme(Fields schemeFields, String solrCoreDir) throws IOException, ParserConfigurationException, SAXException {
this(schemeFields, solrCoreDir, DEFAULT_DEFAULT_MAX_SEGMENTS);
public SolrScheme(Fields schemeFields, String solrConfDir) throws IOException, ParserConfigurationException, SAXException {
this(schemeFields, solrConfDir, DEFAULT_DEFAULT_MAX_SEGMENTS);
}

public SolrScheme(Fields schemeFields, String solrCoreDir, int maxSegments) throws IOException, ParserConfigurationException, SAXException {
this(schemeFields, solrCoreDir, DEFAULT_DEFAULT_MAX_SEGMENTS, SolrSchemeUtil.DEFAULT_DATA_DIR_PROPERTY_NAME);
}

public SolrScheme(Fields schemeFields, String solrCoreDir, int maxSegments, String dataDirPropertyName) throws IOException, ParserConfigurationException, SAXException {
public SolrScheme(Fields schemeFields, String solrConfDir, int maxSegments) throws IOException, ParserConfigurationException, SAXException {
super(schemeFields, schemeFields);

_solrCoreDir = new File(solrCoreDir);
_solrConfDir = new File(solrConfDir);
_maxSegments = maxSegments;
_dataDirPropertyName = dataDirPropertyName;

SolrSchemeUtil.validate(_solrCoreDir, _dataDirPropertyName, schemeFields);
SolrSchemeUtil.validate(_solrConfDir, schemeFields);
}

@Override
@@ -79,7 +73,7 @@ public void sinkPrepare(FlowProcess<Properties> flowProcess, SinkCall<SolrCollec

// Set context to be the embedded solr server (or rather a wrapper for it, that handles caching)
// TODO this call gets made BEFORE sinkConfInit, so I don't have the _dataDir set up at this point, which seems wrong.
SolrCollector collector = new SolrCollector(flowProcess, getSinkFields(), _solrCoreDir, _maxSegments, _dataDirPropertyName, path);
SolrCollector collector = new SolrCollector(flowProcess, getSinkFields(), _solrConfDir, _maxSegments, path);
sinkCall.setContext(collector);
}
