diff --git a/pom.xml b/pom.xml
index 95af5cd..839365b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -39,6 +39,16 @@
         8.2.0
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>commons-codec</groupId>
+                <artifactId>commons-codec</artifactId>
+                <version>1.9</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
@@ -135,6 +145,11 @@
         1.8.3
+        <dependency>
+            <groupId>commons-codec</groupId>
+            <artifactId>commons-codec</artifactId>
+        </dependency>
+
diff --git a/src/main/java/com/scaleunlimited/cascading/scheme/core/Metadata.java b/src/main/java/com/scaleunlimited/cascading/scheme/core/Metadata.java
new file mode 100644
index 0000000..c7dbf64
--- /dev/null
+++ b/src/main/java/com/scaleunlimited/cascading/scheme/core/Metadata.java
@@ -0,0 +1,40 @@
+package com.scaleunlimited.cascading.scheme.core;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+
+import org.apache.commons.codec.digest.DigestUtils;
+
+public class Metadata {
+
+    public static final String MD5_FILE_NAME = ".md5";
+
+    public static File writeMetadata(File partDir) throws IOException {
+        File md5File = new File(partDir, Metadata.MD5_FILE_NAME);
+        OutputStream fos = new FileOutputStream(md5File);
+        OutputStreamWriter osw = new OutputStreamWriter(fos, "UTF-8");
+        try {
+            File indexDir = new File(partDir, "index");
+            File[] indexFiles = indexDir.listFiles();
+            for (File indexFile : indexFiles) {
+                InputStream is = new FileInputStream(indexFile);
+                String md5 = null;
+                try {
+                    md5 = DigestUtils.md5Hex(is);
+                } finally {
+                    is.close();
+                }
+                osw.write(indexFile.getName() + "\t" + md5 + "\n");
+            }
+        } finally {
+            osw.close();
+        }
+        return md5File;
+    }
+
+}
diff --git a/src/main/java/com/scaleunlimited/cascading/scheme/hadoop/SolrOutputFormat.java b/src/main/java/com/scaleunlimited/cascading/scheme/hadoop/SolrOutputFormat.java
index 3fda3ff..5bc24c2 100644
--- a/src/main/java/com/scaleunlimited/cascading/scheme/hadoop/SolrOutputFormat.java
+++ b/src/main/java/com/scaleunlimited/cascading/scheme/hadoop/SolrOutputFormat.java
@@ -16,27 +16,30 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import cascading.flow.hadoop.util.HadoopUtil;
-import cascading.tuple.Fields;
-import cascading.tuple.Tuple;
-
 import com.scaleunlimited.cascading.scheme.core.KeepAliveHook;
+import com.scaleunlimited.cascading.scheme.core.Metadata;
 import com.scaleunlimited.cascading.scheme.core.SolrSchemeUtil;
 import com.scaleunlimited.cascading.scheme.core.SolrWriter;
 
+import cascading.flow.hadoop.util.HadoopUtil;
+import cascading.tuple.Fields;
+import cascading.tuple.Tuple;
+
 public class SolrOutputFormat extends FileOutputFormat {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(SolrOutputFormat.class);
 
     public static final String SOLR_CONF_PATH_KEY = "com.scaleunlimited.cascading.solr.confPath";
     public static final String SINK_FIELDS_KEY = "com.scaleunlimited.cascading.solr.sinkFields";
     public static final String MAX_SEGMENTS_KEY = "com.scaleunlimited.cascading.solr.maxSegments";
-
+    public static final String INCLUDE_METADATA_KEY = "com.scaleunlimited.cascading.solr.includeMetadata";
+
     public static final int DEFAULT_MAX_SEGMENTS = 10;
 
     private static class SolrRecordWriter implements RecordWriter {
 
         private Path _outputPath;
         private FileSystem _outputFS;
+        private boolean _isIncludeMetadata;
 
         private transient KeepAliveHook _keepAliveHook;
         private transient File _localIndexDir;
@@ -59,7 +62,9 @@ public SolrRecordWriter(JobConf conf, String name, Progressable progress) throws
             // Get the set of fields we're indexing.
             Fields sinkFields = HadoopUtil.deserializeBase64(conf.get(SINK_FIELDS_KEY), conf, Fields.class);
 
+            // Load optional configuration parameters.
             int maxSegments = conf.getInt(MAX_SEGMENTS_KEY, DEFAULT_MAX_SEGMENTS);
+            _isIncludeMetadata = conf.getBoolean(INCLUDE_METADATA_KEY, false);
 
             // Set up local Solr home.
             File localSolrHome = SolrSchemeUtil.makeTempSolrHome(localSolrConf, null);
@@ -102,6 +107,13 @@ private void copyToHDFS() throws IOException {
             Thread reporterThread = startProgressThread();
 
             try {
+                if (_isIncludeMetadata) {
+                    File localMetadataFile = Metadata.writeMetadata(_localIndexDir);
+                    Path metadataPath = new Path(_outputPath.getParent(), Metadata.MD5_FILE_NAME);
+                    LOGGER.info(String.format("Copying index metadata from %s to %s", _localIndexDir, metadataPath));
+                    _outputFS.copyFromLocalFile(true, new Path(localMetadataFile.getAbsolutePath()), metadataPath);
+                }
+
                 long indexSize = FileUtils.sizeOfDirectory(indexDir);
                 LOGGER.info(String.format("Copying %d bytes of index from %s to %s", indexSize, _localIndexDir, _outputPath));
                 _outputFS.copyFromLocalFile(true, new Path(indexDir.getAbsolutePath()), _outputPath);
diff --git a/src/main/java/com/scaleunlimited/cascading/scheme/hadoop/SolrScheme.java b/src/main/java/com/scaleunlimited/cascading/scheme/hadoop/SolrScheme.java
index f8758a5..bee1c2a 100644
--- a/src/main/java/com/scaleunlimited/cascading/scheme/hadoop/SolrScheme.java
+++ b/src/main/java/com/scaleunlimited/cascading/scheme/hadoop/SolrScheme.java
@@ -12,6 +12,8 @@
 import org.apache.hadoop.mapred.RecordReader;
 import org.xml.sax.SAXException;
 
+import com.scaleunlimited.cascading.scheme.core.SolrSchemeUtil;
+
 import cascading.flow.FlowProcess;
 import cascading.flow.hadoop.util.HadoopUtil;
 import cascading.scheme.Scheme;
@@ -24,23 +26,31 @@
 import cascading.tuple.Tuple;
 import cascading.util.Util;
 
-import com.scaleunlimited.cascading.scheme.core.SolrSchemeUtil;
-
 @SuppressWarnings("serial")
 public class SolrScheme extends Scheme<JobConf, RecordReader<Tuple, Tuple>, OutputCollector<Tuple, Tuple>, Object[], Void> {
-
     private File _solrConfDir;
     private int _maxSegments;
-
+    private boolean _isIncludeMetadata;
+
     public SolrScheme(Fields schemeFields, String solrConfDir) throws IOException, ParserConfigurationException, SAXException {
         this(schemeFields, solrConfDir, SolrOutputFormat.DEFAULT_MAX_SEGMENTS);
     }
 
     public SolrScheme(Fields schemeFields, String solrConfDir, int maxSegments) throws IOException, ParserConfigurationException, SAXException {
+        this(schemeFields, solrConfDir, maxSegments, false);
+    }
+
+    public SolrScheme(Fields schemeFields, String solrConfDir, boolean isIncludeMetadata) throws IOException, ParserConfigurationException, SAXException {
+        this(schemeFields, solrConfDir, SolrOutputFormat.DEFAULT_MAX_SEGMENTS, isIncludeMetadata);
+    }
+
+    public SolrScheme(Fields schemeFields, String solrConfDir, int maxSegments, boolean isIncludeMetadata) throws IOException, ParserConfigurationException, SAXException {
         super(schemeFields, schemeFields);
 
         _solrConfDir = new File(solrConfDir);
         _maxSegments = maxSegments;
+        _isIncludeMetadata = isIncludeMetadata;
         SolrSchemeUtil.validate(_solrConfDir, schemeFields);
     }
@@ -87,6 +97,7 @@ public void sinkConfInit(FlowProcess flowProcess, Tap
+        conf.setBoolean(SolrOutputFormat.INCLUDE_METADATA_KEY, _isIncludeMetadata);
     }
 
     public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Void, OutputCollector<Tuple, Tuple>> sinkCall) throws IOException {
         sinkCall.getOutput().collect(Tuple.NULL, sinkCall.getOutgoingEntry().getTuple());
     }
-}
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/scaleunlimited/cascading/scheme/local/SolrScheme.java b/src/main/java/com/scaleunlimited/cascading/scheme/local/SolrScheme.java
index 5f76cc7..b9b2290 100644
--- a/src/main/java/com/scaleunlimited/cascading/scheme/local/SolrScheme.java
+++ b/src/main/java/com/scaleunlimited/cascading/scheme/local/SolrScheme.java
@@ -10,6 +10,10 @@
 import org.xml.sax.SAXException;
 
+import com.scaleunlimited.cascading.local.DirectoryFileOutputStream;
+import com.scaleunlimited.cascading.scheme.core.Metadata;
+import com.scaleunlimited.cascading.scheme.core.SolrSchemeUtil;
+
 import cascading.flow.FlowProcess;
 import cascading.scheme.Scheme;
 import cascading.scheme.SinkCall;
@@ -18,26 +22,34 @@
 import cascading.tap.TapException;
 import cascading.tuple.Fields;
 
-import com.scaleunlimited.cascading.local.DirectoryFileOutputStream;
-import com.scaleunlimited.cascading.scheme.core.SolrSchemeUtil;
-
 @SuppressWarnings("serial")
 public class SolrScheme extends Scheme {
 
     public static final int DEFAULT_DEFAULT_MAX_SEGMENTS = 1;
 
     private File _solrConfDir;
+    private File _partDir;
     private int _maxSegments;
+    private boolean _isIncludeMetadata;
 
     public SolrScheme(Fields schemeFields, String solrConfDir) throws IOException, ParserConfigurationException, SAXException {
         this(schemeFields, solrConfDir, DEFAULT_DEFAULT_MAX_SEGMENTS);
     }
 
     public SolrScheme(Fields schemeFields, String solrConfDir, int maxSegments) throws IOException, ParserConfigurationException, SAXException {
+        this(schemeFields, solrConfDir, maxSegments, false);
+    }
+
+    public SolrScheme(Fields schemeFields, String solrConfDir, boolean isIncludeMetadata) throws IOException, ParserConfigurationException, SAXException {
+        this(schemeFields, solrConfDir, DEFAULT_DEFAULT_MAX_SEGMENTS, isIncludeMetadata);
+    }
+
+    public SolrScheme(Fields schemeFields, String solrConfDir, int maxSegments, boolean isIncludeMetadata) throws IOException, ParserConfigurationException, SAXException {
         super(schemeFields, schemeFields);
 
         _solrConfDir = new File(solrConfDir);
         _maxSegments = maxSegments;
+        _isIncludeMetadata = isIncludeMetadata;
         SolrSchemeUtil.validate(_solrConfDir, schemeFields);
     }
@@ -68,8 +80,11 @@ public void sinkPrepare(FlowProcess flowProcess, SinkCall
     public void sinkCleanup(FlowProcess<Properties> flowProcess, SinkCall<SolrCollector, OutputStream> sinkCall) throws IOException {
         SolrCollector collector = sinkCall.getContext();
         collector.cleanup();
+        if (_isIncludeMetadata) {
+            Metadata.writeMetadata(_partDir);
+        }
     }
 }
diff --git a/src/test/java/com/scaleunlimited/cascading/scheme/core/AbstractSolrSchemeTest.java b/src/test/java/com/scaleunlimited/cascading/scheme/core/AbstractSolrSchemeTest.java
index 88faec2..e2e45f2 100644
--- a/src/test/java/com/scaleunlimited/cascading/scheme/core/AbstractSolrSchemeTest.java
+++ b/src/test/java/com/scaleunlimited/cascading/scheme/core/AbstractSolrSchemeTest.java
@@ -1,9 +1,17 @@
 package com.scaleunlimited.cascading.scheme.core;
 
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FilenameFilter;
 import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.solr.client.solrj.embedded.EmbeddedSolrServer;
 import org.apache.solr.client.solrj.response.QueryResponse;
@@ -12,6 +20,7 @@
 import org.apache.solr.core.CoreContainer;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Test;
 
 import com.scaleunlimited.cascading.local.DirectoryTap;
 import com.scaleunlimited.cascading.scheme.local.SolrScheme;
@@ -39,6 +48,7 @@ public abstract class AbstractSolrSchemeTest extends Assert {
     protected abstract Tap makeSourceTap(Fields fields, String path);
 
     protected abstract FlowProcess makeFlowProcess();
+    protected abstract Tap makeSolrSink(Scheme scheme, String path) throws Exception;
     protected abstract Tap makeSolrSink(Fields fields, String path) throws Exception;
 
     protected abstract FlowConnector makeFlowConnector();
@@ -46,6 +56,8 @@ public abstract class AbstractSolrSchemeTest extends Assert {
     protected abstract Scheme makeScheme(Fields schemeFields, String solrConfDir, int maxSegments) throws Exception;
 
+    protected abstract Scheme makeScheme(Fields schemeFields, String solrConfDir, boolean isIncludeMetadata) throws Exception;
+
     @Before
     public void setUp() throws IOException {
         File outputDir = new File(getTestDir());
@@ -175,6 +187,85 @@ protected void testSimpleIndexing() throws Exception {
         solrServer.close();
     }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Test
+    protected void testMd5() throws Exception {
+
+        // Write input data
+        final Fields testFields = new Fields("id", "name", "price", "inStock");
+        final File inDir = new File(getTestDir() + "testMd5/in");
+        Tap source = makeSourceTap(testFields, inDir.getAbsolutePath());
+        TupleEntryCollector writer = source.openForWrite(makeFlowProcess());
+        for (int i = 0; i < 100; i++) {
+            writer.add(new Tuple(i, "product #" + i, i * 1.0f, true));
+        }
+        writer.close();
+
+        // Read input data and then write it to Solr index
+        final File outDir = new File(getTestDir() + "testMd5/out");
+        Scheme scheme = makeScheme(testFields, SOLR_CONF_DIR, true);
+        Tap solrSink = makeSolrSink(scheme, outDir.getPath());
+        Pipe writePipe = new Pipe("tuples to Solr");
+        Flow flow = makeFlowConnector().connect(source, solrSink, writePipe);
+        flow.complete();
+
+        // Check MD5s saved within each part directory
+        File[] partDirs = outDir.listFiles(new FilenameFilter() {
+            public boolean accept(File file, String string) {
+                return string.startsWith("part-");
+            }
+        });
+        for (File partDir : partDirs) {
+
+            // Read MD5 metadata into a map
+            File md5File = new File(partDir, Metadata.MD5_FILE_NAME);
+            FileInputStream fis = new FileInputStream(md5File);
+            List<String> lines = IOUtils.readLines(fis);
+            Map<String, String> indexFileNameToMD5Map = new HashMap<String, String>();
+            for (String rawLine : lines) {
+                String line = rawLine.replaceFirst("#.*$", "").trim();
+                if (!line.isEmpty()) {
+                    String fields[] = line.split("\t", 3);
+                    if (fields.length < 2) {
+                        throw new RuntimeException("Invalid MD5 metadata (expected <file name>\t<MD5>):\n" + line);
+                    }
+                    String indexFileName = fields[0].trim();
+                    String md5 = fields[1].trim();
+                    assertNull(indexFileNameToMD5Map.put(indexFileName, md5));
+                }
+            }
+
+            // Compare map to MD5 of index files in part directory
+            File indexDir = new File(partDir, "index");
+            File[] indexFiles = indexDir.listFiles(new FilenameFilter() {
+
+                @Override
+                public boolean accept(File dir, String name) {
+                    return !(name.endsWith(".crc"));
+                }
+            });
+            for (File indexFile : indexFiles) {
+                String expectedMD5 = getMD5(indexFile);
+                assertEquals("wrong MD5 for " + indexFile,
+                             expectedMD5,
+                             indexFileNameToMD5Map.get(indexFile.getName()));
+            }
+        }
+    }
+
+    private static String getMD5(File indexFile) throws IOException {
+        InputStream is = new FileInputStream(indexFile);
+        String result = null;
+        try {
+            result = DigestUtils.md5Hex(is);
+        } finally {
+            is.close();
+        }
+        return result;
+    }
 
     private static void assertEquals(byte[] expected, byte[] actual) {
diff --git a/src/test/java/com/scaleunlimited/cascading/scheme/hadoop/SolrSchemeHadoopTest.java b/src/test/java/com/scaleunlimited/cascading/scheme/hadoop/SolrSchemeHadoopTest.java
index 272e9f0..fdb7e27 100644
--- a/src/test/java/com/scaleunlimited/cascading/scheme/hadoop/SolrSchemeHadoopTest.java
+++ b/src/test/java/com/scaleunlimited/cascading/scheme/hadoop/SolrSchemeHadoopTest.java
@@ -5,6 +5,8 @@
 
 import org.junit.Test;
 
+import com.scaleunlimited.cascading.scheme.core.AbstractSolrSchemeTest;
+
 import cascading.flow.FlowConnector;
 import cascading.flow.FlowProcess;
 import cascading.flow.hadoop.HadoopFlowConnector;
@@ -19,8 +21,6 @@
 import cascading.tuple.hadoop.BytesSerialization;
 import cascading.tuple.hadoop.TupleSerializationProps;
 
-import com.scaleunlimited.cascading.scheme.core.AbstractSolrSchemeTest;
-
 public class SolrSchemeHadoopTest extends AbstractSolrSchemeTest {
 
     private static final String TEST_DIR = "build/test/SolrSchemeHadoopTest/";
@@ -54,6 +54,11 @@ protected FlowProcess makeFlowProcess() {
         return new SolrScheme(schemeFields, solrConfDir, maxSegments);
     }
 
+    @Override
+    protected Scheme makeScheme(Fields schemeFields, String solrConfDir, boolean isIncludeMetadata) throws Exception {
+        return new SolrScheme(schemeFields, solrConfDir, isIncludeMetadata);
+    }
+
     @SuppressWarnings({ "rawtypes", "unchecked" })
     @Override
     protected Tap makeSolrSink(Fields fields, String path) throws Exception {
@@ -61,6 +66,11 @@ protected FlowProcess makeFlowProcess() {
         return new Hfs(scheme, path, SinkMode.REPLACE);
     }
 
+    @Override
+    protected Tap makeSolrSink(Scheme scheme, String path) throws Exception {
+        return new Hfs(scheme, path, SinkMode.REPLACE);
+    }
+
     @Override
     protected Tap makeSourceTap(Fields fields, String path) {
         return new Hfs(new SequenceFile(fields), path, SinkMode.REPLACE);
@@ -96,4 +106,9 @@ public void testSimpleIndexing() throws Exception {
         super.testSimpleIndexing();
     }
 
+    @Test
+    public void testMd5() throws Exception {
+        super.testMd5();
+    }
+
 }
diff --git a/src/test/java/com/scaleunlimited/cascading/scheme/local/SolrSchemeLocalTest.java b/src/test/java/com/scaleunlimited/cascading/scheme/local/SolrSchemeLocalTest.java
index 4baf08c..3a1e292 100644
--- a/src/test/java/com/scaleunlimited/cascading/scheme/local/SolrSchemeLocalTest.java
+++ b/src/test/java/com/scaleunlimited/cascading/scheme/local/SolrSchemeLocalTest.java
@@ -2,6 +2,10 @@
 
 import org.junit.Test;
 
+import com.scaleunlimited.cascading.local.DirectoryTap;
+import com.scaleunlimited.cascading.local.KryoScheme;
+import com.scaleunlimited.cascading.scheme.core.AbstractSolrSchemeTest;
+
 import cascading.flow.FlowConnector;
 import cascading.flow.FlowProcess;
 import cascading.flow.local.LocalFlowConnector;
@@ -12,10 +16,6 @@
 import cascading.tap.local.FileTap;
 import cascading.tuple.Fields;
 
-import com.scaleunlimited.cascading.local.DirectoryTap;
-import com.scaleunlimited.cascading.local.KryoScheme;
-import com.scaleunlimited.cascading.scheme.core.AbstractSolrSchemeTest;
-
 public class SolrSchemeLocalTest extends AbstractSolrSchemeTest {
 
     private static final String TEST_DIR = "build/test/SolrSchemeLocalTest/";
@@ -40,6 +40,11 @@ protected FlowProcess makeFlowProcess() {
         return new DirectoryTap(new SolrScheme(fields, SOLR_CONF_DIR), path);
     }
 
+    @Override
+    protected Tap makeSolrSink(Scheme scheme, String path) throws Exception {
+        return new DirectoryTap(scheme, path);
+    }
+
     @Override
     protected FlowConnector makeFlowConnector() {
         return new LocalFlowConnector();
@@ -55,6 +60,11 @@ protected cascading.scheme.Scheme makeScheme(Fields schemeFields, Str
         return new SolrScheme(schemeFields, solrConfDir, maxSegments);
     }
 
+    @Override
+    protected Scheme makeScheme(Fields schemeFields, String solrConfDir, boolean isIncludeMetadata) throws Exception {
+        return new SolrScheme(schemeFields, solrConfDir, isIncludeMetadata);
+    }
+
     @Test
     public void testSchemeChecksMissingConf() throws Exception {
         super.testSchemeChecksMissingConf();
@@ -85,4 +95,9 @@ public void testSimpleIndexing() throws Exception {
         super.testSimpleIndexing();
     }
 
+    @Test
+    public void testMd5() throws Exception {
+        super.testMd5();
+    }
+
 }
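
Below is a minimal usage sketch, not part of the patch above, showing how a flow might enable the new metadata option through the boolean constructor argument this change introduces. The helper class, method name, directories, and raw-type usage are illustrative assumptions; only the SolrScheme, DirectoryTap, and LocalFlowConnector calls come from the patch and its tests.

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.local.LocalFlowConnector;
import cascading.pipe.Pipe;
import cascading.tap.Tap;
import cascading.tuple.Fields;

import com.scaleunlimited.cascading.local.DirectoryTap;
import com.scaleunlimited.cascading.scheme.local.SolrScheme;

public class Md5IndexingExample {

    // Hypothetical helper: the caller supplies a source tap that emits the given fields,
    // plus the Solr conf directory and the output directory for the index.
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public static void indexWithMd5(Tap source, Fields fields, String solrConfDir, String outputDir) throws Exception {
        // Passing true as the extra constructor argument asks the sink to write a .md5
        // file (one "<index file name>\t<md5>" line per file under <part>/index) at cleanup.
        SolrScheme scheme = new SolrScheme(fields, solrConfDir, true);
        Tap sink = new DirectoryTap(scheme, outputDir);

        Pipe pipe = new Pipe("tuples to Solr");
        FlowConnector connector = new LocalFlowConnector();
        Flow flow = connector.connect(source, sink, pipe);
        flow.complete();
    }
}

On the Hadoop side the same boolean reaches SolrOutputFormat through the com.scaleunlimited.cascading.solr.includeMetadata job-conf key, and the generated .md5 file is copied next to the index when each part is moved to HDFS.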