First pass at optional .md5 metadata file (Solr8) #17

Open · wants to merge 2 commits into base: 14_update-solr8
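
A minimal usage sketch of the new isIncludeMetadata flag, using the local-mode SolrScheme constructors added in this diff. The conf directory path is a placeholder and the field names are borrowed from the test code; the constructor still validates the conf directory against the sink fields, so the path must point at a real Solr config.

import cascading.tuple.Fields;

import com.scaleunlimited.cascading.scheme.local.SolrScheme;

public class Md5MetadataExample {

    public static void main(String[] args) throws Exception {
        Fields fields = new Fields("id", "name", "price", "inStock");

        // Pass true for the new isIncludeMetadata flag; during sink cleanup the scheme
        // then writes a ".md5" file (one "<index file name>\t<MD5 hex>" line per index
        // file) into each part directory alongside its "index" subdirectory.
        SolrScheme scheme = new SolrScheme(fields, "/path/to/solr/conf", true);

        // The four-argument form combines the existing maxSegments setting with the flag.
        SolrScheme schemeAndSegments = new SolrScheme(fields, "/path/to/solr/conf", 1, true);
    }
}
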
15 changes: 15 additions & 0 deletions pom.xml
@@ -39,6 +39,16 @@
<solr.version>8.2.0</solr.version>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.9</version>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>

<dependency>
@@ -135,6 +145,11 @@
<version>1.8.3</version>
</dependency>

<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>

<!-- Test -->

<dependency>
40 changes: 40 additions & 0 deletions Metadata.java
@@ -0,0 +1,40 @@
package com.scaleunlimited.cascading.scheme.core;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;

import org.apache.commons.codec.digest.DigestUtils;

public class Metadata {

public static final String MD5_FILE_NAME = ".md5";

public static File writeMetadata(File partDir) throws IOException {
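// Writes one "<index file name>\t<MD5 hex>" line per file in partDir's "index"
// subdirectory into a ".md5" file directly in partDir, and returns that file.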
File md5File = new File(partDir, Metadata.MD5_FILE_NAME);
OutputStream fos = new FileOutputStream(md5File);
OutputStreamWriter osw = new OutputStreamWriter(fos, "UTF-8");
try {
File indexDir = new File(partDir, "index");
File[] indexFiles = indexDir.listFiles();
for (File indexFile : indexFiles) {
InputStream is = new FileInputStream(indexFile);
String md5 = null;
try {
md5 = DigestUtils.md5Hex(is);
} finally {
is.close();
}
osw.write(indexFile.getName() + "\t" + md5 + "\n");
}
} finally {
osw.close();
}
return md5File;
}

}
SolrOutputFormat.java
@@ -16,27 +16,30 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cascading.flow.hadoop.util.HadoopUtil;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;

import com.scaleunlimited.cascading.scheme.core.KeepAliveHook;
import com.scaleunlimited.cascading.scheme.core.Metadata;
import com.scaleunlimited.cascading.scheme.core.SolrSchemeUtil;
import com.scaleunlimited.cascading.scheme.core.SolrWriter;

import cascading.flow.hadoop.util.HadoopUtil;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;

public class SolrOutputFormat extends FileOutputFormat<Tuple, Tuple> {
private static final Logger LOGGER = LoggerFactory.getLogger(SolrOutputFormat.class);

public static final String SOLR_CONF_PATH_KEY = "com.scaleunlimited.cascading.solr.confPath";
public static final String SINK_FIELDS_KEY = "com.scaleunlimited.cascading.solr.sinkFields";
public static final String MAX_SEGMENTS_KEY = "com.scaleunlimited.cascading.solr.maxSegments";

public static final String INCLUDE_METADATA_KEY = "com.scaleunlimited.cascading.solr.includeMetadata";

public static final int DEFAULT_MAX_SEGMENTS = 10;

private static class SolrRecordWriter implements RecordWriter<Tuple, Tuple> {

private Path _outputPath;
private FileSystem _outputFS;
private boolean _isIncludeMetadata;

private transient KeepAliveHook _keepAliveHook;
private transient File _localIndexDir;
@@ -59,7 +62,9 @@ public SolrRecordWriter(JobConf conf, String name, Progressable progress) throws
// Get the set of fields we're indexing.
Fields sinkFields = HadoopUtil.deserializeBase64(conf.get(SINK_FIELDS_KEY), conf, Fields.class);

// Load optional configuration parameters.
int maxSegments = conf.getInt(MAX_SEGMENTS_KEY, DEFAULT_MAX_SEGMENTS);
_isIncludeMetadata = conf.getBoolean(INCLUDE_METADATA_KEY, false);

// Set up local Solr home.
File localSolrHome = SolrSchemeUtil.makeTempSolrHome(localSolrConf, null);
@@ -102,6 +107,13 @@ private void copyToHDFS() throws IOException {
Thread reporterThread = startProgressThread();

try {
if (_isIncludeMetadata) {
File localMetadataFile = Metadata.writeMetadata(_localIndexDir);
Path metadataPath = new Path(_outputPath.getParent(), Metadata.MD5_FILE_NAME);
LOGGER.info(String.format("Copying index metadata from %s to %s", _localIndexDir, metadataPath));
_outputFS.copyFromLocalFile(true, new Path(localMetadataFile.getAbsolutePath()), metadataPath);
}

long indexSize = FileUtils.sizeOfDirectory(indexDir);
LOGGER.info(String.format("Copying %d bytes of index from %s to %s", indexSize, _localIndexDir, _outputPath));
_outputFS.copyFromLocalFile(true, new Path(indexDir.getAbsolutePath()), _outputPath);
SolrScheme.java (Hadoop mode)
@@ -12,6 +12,8 @@
import org.apache.hadoop.mapred.RecordReader;
import org.xml.sax.SAXException;

import com.scaleunlimited.cascading.scheme.core.SolrSchemeUtil;

import cascading.flow.FlowProcess;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.scheme.Scheme;
@@ -24,23 +26,31 @@
import cascading.tuple.Tuple;
import cascading.util.Util;

import com.scaleunlimited.cascading.scheme.core.SolrSchemeUtil;

@SuppressWarnings("serial")
public class SolrScheme extends Scheme<JobConf, RecordReader<Tuple, Tuple>, OutputCollector<Tuple, Tuple>, Object[], Void> {

private File _solrConfDir;
private int _maxSegments;

private boolean _isIncludeMetadata;

public SolrScheme(Fields schemeFields, String solrConfDir) throws IOException, ParserConfigurationException, SAXException {
this(schemeFields, solrConfDir, SolrOutputFormat.DEFAULT_MAX_SEGMENTS);
}

public SolrScheme(Fields schemeFields, String solrConfDir, int maxSegments) throws IOException, ParserConfigurationException, SAXException {
this(schemeFields, solrConfDir, maxSegments, false);
}

public SolrScheme(Fields schemeFields, String solrConfDir, boolean isIncludeMetadata) throws IOException, ParserConfigurationException, SAXException {
this(schemeFields, solrConfDir, SolrOutputFormat.DEFAULT_MAX_SEGMENTS, isIncludeMetadata);
}

public SolrScheme(Fields schemeFields, String solrConfDir, int maxSegments, boolean isIncludeMetadata) throws IOException, ParserConfigurationException, SAXException {
super(schemeFields, schemeFields);

_solrConfDir = new File(solrConfDir);
_maxSegments = maxSegments;
_isIncludeMetadata = isIncludeMetadata;

SolrSchemeUtil.validate(_solrConfDir, schemeFields);
}
@@ -87,6 +97,7 @@ public void sinkConfInit(FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordRe

conf.set(SolrOutputFormat.SOLR_CONF_PATH_KEY, hdfsSolrConfDir.toString());
conf.setInt(SolrOutputFormat.MAX_SEGMENTS_KEY, _maxSegments);
conf.setBoolean(SolrOutputFormat.INCLUDE_METADATA_KEY, _isIncludeMetadata);
}

@Override
@@ -98,4 +109,5 @@ public boolean source(FlowProcess<JobConf> conf, SourceCall<Object[], RecordRead
public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Void, OutputCollector<Tuple, Tuple>> sinkCall) throws IOException {
sinkCall.getOutput().collect(Tuple.NULL, sinkCall.getOutgoingEntry().getTuple());
}
}

}
SolrScheme.java (local mode)
@@ -10,6 +10,10 @@

import org.xml.sax.SAXException;

import com.scaleunlimited.cascading.local.DirectoryFileOutputStream;
import com.scaleunlimited.cascading.scheme.core.Metadata;
import com.scaleunlimited.cascading.scheme.core.SolrSchemeUtil;

import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
@@ -18,26 +22,34 @@
import cascading.tap.TapException;
import cascading.tuple.Fields;

import com.scaleunlimited.cascading.local.DirectoryFileOutputStream;
import com.scaleunlimited.cascading.scheme.core.SolrSchemeUtil;

@SuppressWarnings("serial")
public class SolrScheme extends Scheme<Properties, InputStream, OutputStream, Void, SolrCollector> {

public static final int DEFAULT_DEFAULT_MAX_SEGMENTS = 1;

private File _solrConfDir;
private File _partDir;
private int _maxSegments;
private boolean _isIncludeMetadata;

public SolrScheme(Fields schemeFields, String solrConfDir) throws IOException, ParserConfigurationException, SAXException {
this(schemeFields, solrConfDir, DEFAULT_DEFAULT_MAX_SEGMENTS);
}

public SolrScheme(Fields schemeFields, String solrConfDir, int maxSegments) throws IOException, ParserConfigurationException, SAXException {
this(schemeFields, solrConfDir, maxSegments, false);
}

public SolrScheme(Fields schemeFields, String solrConfDir, boolean isIncludeMetadata) throws IOException, ParserConfigurationException, SAXException {
this(schemeFields, solrConfDir, DEFAULT_DEFAULT_MAX_SEGMENTS, isIncludeMetadata);
}

public SolrScheme(Fields schemeFields, String solrConfDir, int maxSegments, boolean isIncludeMetadata) throws IOException, ParserConfigurationException, SAXException {
super(schemeFields, schemeFields);

_solrConfDir = new File(solrConfDir);
_maxSegments = maxSegments;
_isIncludeMetadata = isIncludeMetadata;

SolrSchemeUtil.validate(_solrConfDir, schemeFields);
}
@@ -68,8 +80,11 @@ public void sinkPrepare(FlowProcess<Properties> flowProcess, SinkCall<SolrCollec
throw new TapException("SolrScheme can only be used with a DirectoryTap in local mode");
}

// Find the part-00000 directory to use as the Solr data directory,
// and save it in case we need to write metadata there.
DirectoryFileOutputStream os = (DirectoryFileOutputStream)sinkCall.getOutput();
String path = os.asDirectory();
_partDir = new File(path);

// Set context to be the embedded solr server (or rather a wrapper for it, that handles caching)
// TODO this call gets made BEFORE sinkConfInit, so I don't have the _dataDir set up at this point, which seems wrong.
@@ -91,6 +106,9 @@ public void sink(FlowProcess<Properties> flowProcess, SinkCall<SolrCollector, Ou
public void sinkCleanup(FlowProcess<Properties> flowProcess, SinkCall<SolrCollector, OutputStream> sinkCall) throws IOException {
SolrCollector collector = sinkCall.getContext();
collector.cleanup();
if (_isIncludeMetadata) {
Metadata.writeMetadata(_partDir);
}
}

}
AbstractSolrSchemeTest.java
@@ -1,9 +1,17 @@
package com.scaleunlimited.cascading.scheme.core;

import java.io.File;
import java.io.FileInputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.BytesWritable;
import org.apache.solr.client.solrj.embedded.EmbeddedSolrServer;
import org.apache.solr.client.solrj.response.QueryResponse;
@@ -12,6 +20,7 @@
import org.apache.solr.core.CoreContainer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import com.scaleunlimited.cascading.local.DirectoryTap;
import com.scaleunlimited.cascading.scheme.local.SolrScheme;
@@ -39,13 +48,16 @@ public abstract class AbstractSolrSchemeTest extends Assert {

protected abstract Tap<?, ?, ?> makeSourceTap(Fields fields, String path);
protected abstract FlowProcess<?> makeFlowProcess();
protected abstract Tap<?, ?, ?> makeSolrSink(Scheme<?, ?, ?, ?, ?> scheme, String path) throws Exception;
protected abstract Tap<?, ?, ?> makeSolrSink(Fields fields, String path) throws Exception;
protected abstract FlowConnector makeFlowConnector();

protected abstract Scheme<?, ?, ?, ?, ?> makeScheme(Fields schemeFields, String solrConfDir) throws Exception;

protected abstract Scheme<?, ?, ?, ?, ?> makeScheme(Fields schemeFields, String solrConfDir, int maxSegments) throws Exception;

protected abstract Scheme<?, ?, ?, ?, ?> makeScheme(Fields schemeFields, String solrConfDir, boolean isIncludeMetadata) throws Exception;

@Before
public void setUp() throws IOException {
File outputDir = new File(getTestDir());
@@ -175,6 +187,85 @@ protected void testSimpleIndexing() throws Exception {

solrServer.close();
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testMd5() throws Exception {

// Write input data
final Fields testFields = new Fields("id", "name", "price", "inStock");
final File inDir = new File(getTestDir() + "testMd5/in");
Tap source = makeSourceTap(testFields, inDir.getAbsolutePath());
TupleEntryCollector writer = source.openForWrite(makeFlowProcess());
for (int i = 0; i < 100; i++) {
writer.add(new Tuple(i, "product #" + i, i * 1.0f, true));
}
writer.close();

// Read input data and then write it to Solr index
final File outDir = new File(getTestDir() + "testMd5/out");
Scheme scheme = makeScheme( testFields, SOLR_CONF_DIR, true);
Tap<?, ?, ?> solrSink = makeSolrSink(scheme, outDir.getPath());
Pipe writePipe = new Pipe("tuples to Solr");
Flow flow = makeFlowConnector().connect(source, solrSink, writePipe);
flow.complete();

// Check MD5s saved within each part directory
File[] partDirs = outDir.listFiles(new FilenameFilter() {
public boolean accept(File file, String string) {
return string.startsWith("part-");
}
});
for (File partDir : partDirs) {

// Read MD5 metadata into a map
File md5File = new File(partDir, Metadata.MD5_FILE_NAME);
FileInputStream fis = new FileInputStream(md5File);
List<String> lines = IOUtils.readLines(fis);
Map<String, String> indexFileNameToMD5Map =
new HashMap<String, String>();
for (String rawLine : lines) {
String line = rawLine.replaceFirst("#.*$", "").trim();
if (!line.isEmpty()) {
String fields[] = line.split("\t", 3);
if (fields.length < 2) {
throw new RuntimeException( "Invalid MD5 metadata (expected <file path>\t<MD5>):\n"
+ line);
}
String indexFileName = fields[0].trim();
String md5 = fields[1].trim();
assertNull(indexFileNameToMD5Map.put(indexFileName, md5));
}
}

// Compare map to MD5 of index files in part directory
File indexDir = new File(partDir, "index");
File[] indexFiles = indexDir.listFiles(new FilenameFilter() {

@Override
public boolean accept(File dir, String name) {
return !(name.endsWith(".crc"));
}
});
for (File indexFile : indexFiles) {
String expectedMD5 = getMD5(indexFile);
assertEquals( "wrong MD5 for " + indexFile,
expectedMD5,
indexFileNameToMD5Map.get(indexFile.getName()));
}
}
}

private static String getMD5(File indexFile) throws IOException {
InputStream is = new FileInputStream(indexFile);
String result = null;
try {
result = DigestUtils.md5Hex(is);
} finally {
is.close();
}
return result;
}


private static void assertEquals(byte[] expected, byte[] actual) {