
Commit 44e8e9b

Merge branch 'release-8.0.6' into 8.0-master

Shiva Verma committed Oct 14, 2016
2 parents e730eae + 3789cf5 commit 44e8e9b
Showing 35 changed files with 939 additions and 377 deletions.
15 changes: 13 additions & 2 deletions README.md
@@ -19,6 +19,17 @@ The Hadoop Connector is an extension to Hadoop’s MapReduce framework that allo
* Access MarkLogic text, geospatial, scalar, and document structure indexes to send only the most relevant data to Hadoop for processing
* Write results from MapReduce jobs to MarkLogic in parallel

+## Release Notes
+
+### What's New in mlcp and Hadoop Connector 8.0-6
+
+- mlcp distributed mode supports MapR 5.1, HDP 2.4, and CDH 5.8
+- significant performance improvement in archive export
+- mlcp honors user-specified InputFormat, OutputFormat, and Mapper classes when creating jobs for import, export, and copy
+- mlcp export now streams binary documents out of the database
+- mlcp import now streams ZIP entries when reading compressed delimited text, delimited JSON, and aggregate XML input
+- assorted bug fixes

## Getting Started

- [Getting Started with mlcp](http://docs.marklogic.com/guide/mlcp/getting-started)
@@ -55,7 +66,7 @@ The build writes to the respective **deliverable** directories under the top-lev

Alternatively, you can build mlcp and the Hadoop Connector independently from each component’s root directory (i.e. `./mlcp/` and `./mapreduce/`) with the above command. *Note that mlcp depends on the Hadoop Connector*, so a successful build of the Hadoop Connector is required to build mlcp.

-For information on contributing to this project, see [CONTRIBUTING.md](https://github.com/marklogic/marklogic-contentpump/blob/8.0-master/CONTRIBUTING.md). For information on developing this project, see the [project wiki page](https://github.com/marklogic/marklogic-contentpump/wiki).
+For information on contributing to this project, see [CONTRIBUTING.md](https://github.com/marklogic/marklogic-contentpump/blob/8.0-develop/CONTRIBUTING.md). For information on developing this project, see the [project wiki page](https://github.com/marklogic/marklogic-contentpump/wiki).

## Tests

@@ -73,4 +84,4 @@ If you have questions about mlcp or the Hadoop Connector, ask on [StackOverflow]

## Support

-mlcp and the Hadoop Connector are maintained by MarkLogic Engineering and distributed under the [Apache 2.0 license](https://github.com/marklogic/marklogic-contentpump/blob/8.0-master/LICENSE). They are designed for use in production applications with MarkLogic Server. Everyone is encouraged [to file bug reports, feature requests, and pull requests through GitHub](https://github.com/marklogic/marklogic-contentpump/issues/new). This input is critical and will be carefully considered. However, we can’t promise a specific resolution or timeframe for any request. In addition, MarkLogic provides technical support for [release tags](https://github.com/marklogic/marklogic-contentpump/releases) of mlcp and the Hadoop Connector to licensed customers under the terms outlined in the [Support Handbook](http://www.marklogic.com/files/Mark_Logic_Support_Handbook.pdf). For more information or to sign up for support, visit [help.marklogic.com](http://help.marklogic.com).
+mlcp and the Hadoop Connector are maintained by MarkLogic Engineering and distributed under the [Apache 2.0 license](https://github.com/marklogic/marklogic-contentpump/blob/8.0-develop/LICENSE). They are designed for use in production applications with MarkLogic Server. Everyone is encouraged [to file bug reports, feature requests, and pull requests through GitHub](https://github.com/marklogic/marklogic-contentpump/issues/new). This input is critical and will be carefully considered. However, we can’t promise a specific resolution or timeframe for any request. In addition, MarkLogic provides technical support for [release tags](https://github.com/marklogic/marklogic-contentpump/releases) of mlcp and the Hadoop Connector to licensed customers under the terms outlined in the [Support Handbook](http://www.marklogic.com/files/Mark_Logic_Support_Handbook.pdf). For more information or to sign up for support, visit [help.marklogic.com](http://help.marklogic.com).
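The README above centers on running MapReduce jobs against MarkLogic. Purely as a hedged illustration (not part of this commit), a minimal job driver wired up with the connector's `DocumentInputFormat` might look like the sketch below; the host, port, and credential values are placeholders, `ExampleDriver` and `ExampleMapper` are hypothetical names, and the connection property keys follow `com.marklogic.mapreduce.MarkLogicConstants`.

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import com.marklogic.mapreduce.DatabaseDocument;
import com.marklogic.mapreduce.DocumentInputFormat;
import com.marklogic.mapreduce.DocumentURI;

public class ExampleDriver {
    // Hypothetical mapper: emit each document URI with its text content.
    public static class ExampleMapper
            extends Mapper<DocumentURI, DatabaseDocument, Text, Text> {
        @Override
        protected void map(DocumentURI key, DatabaseDocument value,
                Context context) throws IOException, InterruptedException {
            context.write(new Text(key.toString()), value.getContentAsText());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Placeholder connection settings for the source MarkLogic instance.
        conf.set("mapreduce.marklogic.input.host", "localhost");
        conf.set("mapreduce.marklogic.input.port", "8000");
        conf.set("mapreduce.marklogic.input.username", "user");
        conf.set("mapreduce.marklogic.input.password", "password");

        Job job = Job.getInstance(conf, "marklogic-example");
        job.setJarByClass(ExampleDriver.class);
        job.setInputFormatClass(DocumentInputFormat.class);
        job.setMapperClass(ExampleMapper.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(0); // map-only job
        FileOutputFormat.setOutputPath(job, new Path(args[0]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```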
2 changes: 2 additions & 0 deletions mapreduce/.gitignore
@@ -1,3 +1,5 @@
.classpath
.project
+.settings
target
+deliverable
6 changes: 3 additions & 3 deletions mapreduce/pom.xml
@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.marklogic</groupId>
<artifactId>marklogic-mapreduce2</artifactId>
-  <version>2.1</version>
+  <version>2.1.6</version>
<name>${mapreduce.product.name}</name>
<description>MarkLogic Connector for Hadoop MapReduce</description>
<url>https://github.com/marklogic/marklogic-contentpump</url>
@@ -18,12 +18,12 @@
<!-- Global definitions -->
<mapreduce.product.name>MarkLogic Connector for Hadoop</mapreduce.product.name>
<mapreduce.product.name.short>MarkLogic Connector for Hadoop</mapreduce.product.name.short>
-    <version.number.string>2.1</version.number.string>
+    <version.number.string>2.1.6</version.number.string>
<jar.version.number.string>${version.number.string}</jar.version.number.string>
<date-string>${maven.build.timestamp}</date-string>
<libdir>${basedir}/src/lib</libdir>
<skipTests>false</skipTests>
-    <xccVersion>8.0</xccVersion>
+    <xccVersion>8.0.6</xccVersion>
<deliverableName>Connector-for-Hadoop2</deliverableName>
<!-- Static definitions of where things are relative to the root -->
<java.source>src/main/java</java.source>
2 changes: 1 addition & 1 deletion mapreduce/src/conf/log4j.properties
@@ -5,4 +5,4 @@ log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n

# To enable debug
-log4j.logger.com.marklogic.dom=INFO
+log4j.logger.com.marklogic.tree=TRACE
Binary file added mapreduce/src/lib/marklogic-xcc-8.0.6.jar
Binary file modified mapreduce/src/lib/marklogic-xcc-8.0.jar
12 changes: 7 additions & 5 deletions mapreduce/src/main/java/com/marklogic/io/Decoder.java
@@ -101,20 +101,22 @@ public void realign()
}
}

-    public void decode(int[] array, int i, int count) throws IOException {
+    public void decode(int[] array, int count) throws IOException {
        if (count <= 4) {
-            for (; i < count; i++) {
+            for (int i = 0; i < count; i++) {
                array[i] = decode32bits();
            }
        } else {
+            int i = 0;
            realign();
            if (numBitsInReg==32) {
-                array[i++] = (int)reg;
+                ++i;
+                array[0] = (int)reg;
                reg = 0;
                numBitsInReg = 0;
            }
-            for (; i<count; ++i) {
-                if (load32(array, i)) break;
+            for ( ; i<count; ++i) {
+                if (!load32(array, i)) break;
            }
        }
    }
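The `decode` change above is twofold: the caller-supplied start index is gone (the call site in `CompressedTreeDecoder`, at the bottom of this commit, passed 1 and so left `array[0]` unfilled), and the fast-path loop now breaks when `load32` returns false rather than on the first successful load. A minimal sketch of the corrected call pattern, assuming a `Decoder` already positioned at packed 32-bit data; `readWords` is a hypothetical helper, not part of this commit:

```java
import java.io.IOException;

import com.marklogic.io.Decoder;

final class DecoderExample {
    // Hypothetical helper: decode nwords 32-bit values; with the new
    // signature the output array is always filled starting at index 0.
    static int[] readWords(Decoder decoder, int nwords) throws IOException {
        int[] words = new int[nwords];
        decoder.decode(words, nwords);
        return words;
    }
}
```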
@@ -471,7 +471,7 @@ protected LinkedMapWritable queryForestInfo(ContentSource cs)
}
}
if (forestStatusMap.size() == 0) {
throw new IOException("Number of forests is 0: "
throw new IOException("Target database has no forests attached: "
+ "check forests in database");
}
am.initialize(policy, forestStatusMap, conf.getInt(BATCH_SIZE,10));
@@ -17,6 +17,7 @@

import java.io.ByteArrayInputStream;
import java.io.DataInput;
+import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
@@ -27,6 +28,7 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;

+import com.marklogic.io.IOHelper;
import com.marklogic.xcc.ResultItem;
import com.marklogic.xcc.types.ValueType;
import com.marklogic.xcc.types.XdmBinary;
@@ -38,10 +40,12 @@
* @author jchen
*
*/
-public class DatabaseDocument implements MarkLogicDocument {
+public class DatabaseDocument implements MarkLogicDocument,
+        InternalConstants {
public static final Log LOG = LogFactory.getLog(
DatabaseDocument.class);
protected byte[] content;
+    protected InputStream is; // streaming binary
protected ContentType contentType;

public DatabaseDocument(){}
@@ -70,11 +74,23 @@ public Text getContentAsText() {
* @see com.marklogic.mapreduce.MarkLogicDocument#getContentAsByteArray()
*/
    public byte[] getContentAsByteArray() {
+        if (content == null) {
+            try {
+                content = IOHelper.byteArrayFromStream(is);
+            } catch (IOException e) {
+                throw new RuntimeException("IOException buffering binary data",
+                    e);
+            }
+            is = null;
+        }
        return content;
    }

@Override
public InputStream getContentAsByteStream() {
+        if (is != null) {
+            return is;
+        }
return new ByteArrayInputStream(getContentAsByteArray());
}

@@ -116,7 +132,11 @@ public void set(ResultItem item){
content = item.asString().getBytes("UTF-8");
contentType = ContentType.TEXT;
        } else if (item.getValueType() == ValueType.BINARY) {
-            content = ((XdmBinary) item.getItem()).asBinaryData();
+            if (item.isCached()) {
+                content = ((XdmBinary) item.getItem()).asBinaryData();
+            } else {
+                is = item.asInputStream();
+            }
            contentType = ContentType.BINARY;
} else if (item.getValueType() == ValueType.ARRAY_NODE ||
item.getValueType() == ValueType.BOOLEAN_NODE ||
@@ -155,6 +175,10 @@ public void readFields(DataInput in) throws IOException {
int ordinal = in.readInt();
contentType = ContentType.valueOf(ordinal);
int length = WritableUtils.readVInt(in);
+        if (length > MAX_BUFFER_SIZE) {
+            is = (DataInputStream)in;
+            return;
+        }
content = new byte[length];
in.readFully(content, 0, length);
}
@@ -165,17 +189,32 @@ public void readFields(DataInput in) throws IOException {
@Override
public void write(DataOutput out) throws IOException {
        out.writeInt(contentType.ordinal());
-        WritableUtils.writeVInt(out, content.length);
-        out.write(content, 0, content.length);
+        if (content != null) {
+            WritableUtils.writeVInt(out, content.length);
+            out.write(content, 0, content.length);
+        } else if (is != null) {
+            content = new byte[MAX_BUFFER_SIZE];
+            int len = 0;
+            while ((len = is.read(content)) > 0) {
+                out.write(content, 0, len);
+            }
+        }
}

@Override
public long getContentSize() {
-        return content.length;
+        if (content != null) {
+            return content.length;
+        } else {
+            return Integer.MAX_VALUE;
+        }
}

+    @Override
+    public boolean isStreamable() {
+        if (content == null) {
+            return true;
+        }
+        return false;
+    }
}
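Taken together, these `DatabaseDocument` changes let large binaries flow through as streams rather than in-memory byte arrays: `set` keeps the `InputStream` of an uncached binary result, `readFields` leaves anything larger than `MAX_BUFFER_SIZE` on the wire, and `isStreamable` tells callers which case they hold. A hedged sketch of how a consumer might branch on the new API; `copyTo` is a hypothetical helper, not part of this commit:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import com.marklogic.mapreduce.MarkLogicDocument;

final class DocumentCopy {
    // Hypothetical helper: write document content to a sink without
    // buffering a large binary in memory when a stream is available.
    static void copyTo(MarkLogicDocument doc, OutputStream out)
            throws IOException {
        if (doc.isStreamable()) {
            InputStream in = doc.getContentAsByteStream();
            byte[] buf = new byte[8192];
            int len;
            while ((len = in.read(buf)) > 0) {
                out.write(buf, 0, len);
            }
        } else {
            out.write(doc.getContentAsByteArray());
        }
    }
}
```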
@@ -66,7 +66,6 @@ public class ForestInputFormat<VALUE>
extends FileInputFormat<DocumentURIWithSourceInfo, VALUE>
implements MarkLogicConstants {
public static final Log LOG = LogFactory.getLog(ForestInputFormat.class);
-    static final int STREAM_BUFFER_SIZE = 1 << 24;

@Override
public RecordReader<DocumentURIWithSourceInfo, VALUE> createRecordReader(
@@ -228,8 +228,8 @@ protected void setSkipKey(String sub, int line, int col, String reason) {
}
key.setSkipReason(reason);

-        if (LOG.isTraceEnabled()) {
-            LOG.trace("Set key: " + key);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Set key: " + key);
}
}

@@ -344,10 +344,16 @@ private ExpandedTree getNextTree() throws IOException {
}

            if (nascent == 0L || deleted != -1L) { // skip
-                position++;
                bytesRead += dataIs.skipBytes(j);
                if (nascent == 0L) nascentCnt++;
                if (deleted != -1L) deletedCnt++;
                ordIs.skipBytes(8);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Skipped a " +
+                        (nascent == 0L ? "nascent" : "deleted") +
+                        " document at position " + position);
+                }
+                position++;
return null;
}
} catch (EOFException e) {
@@ -0,0 +1,11 @@
+package com.marklogic.mapreduce;
+
+/**
+ * Constants shared by internal modules.
+ *
+ * @author jchen
+ *
+ */
+public interface InternalConstants {
+    static final int MAX_BUFFER_SIZE = 1<<24;
+}
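For scale, `1 << 24` is 16,777,216 bytes (16 MiB). This appears to be the former `ForestInputFormat.STREAM_BUFFER_SIZE` (deleted earlier in this commit) hoisted into a shared constant; `DatabaseDocument.readFields` above uses it as the cutoff between buffering content in memory and handing back a stream.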
@@ -173,6 +173,10 @@ protected TextArrayWritable queryHosts(ContentSource cs, String matchHost,
} else {
hosts.add(new Text(host));
}
}
+        if (hosts.isEmpty()) {
+            throw new IOException("Target database has no forests attached: "
+                + "check forests in database");
+        }
return new TextArrayWritable(hosts.toArray(new Text[hosts.size()]));
} catch (RequestException e) {
@@ -275,7 +275,7 @@ public void initialize(InputSplit split, TaskAttemptContext context)
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Input query: " + query);
LOG.debug("Input query: " + query.toString());
}
query.setOptions(options);
result = session.submitRequest(query);
15 changes: 13 additions & 2 deletions mapreduce/src/main/java/com/marklogic/mapreduce/test/FCheck.java
@@ -23,6 +23,7 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Map;

import org.apache.commons.modeler.util.DomUtil;
import org.apache.hadoop.conf.Configuration;
@@ -310,6 +311,10 @@ public void checkOrdinals(File dir) throws IOException {
try {
for (;; ++position) {
ordinal = in.readLong();
+                if (verbose) {
+                    System.out.println("position=" + position + ", ordinal="
+                        + ordinal);
+                }
}
} catch (EOFException e) {
}
@@ -561,7 +566,13 @@ public void decodeTreeData(File dir) throws IOException {
ExpandedTree tree = new CompressedTreeDecoder().decode(buf,j);
// TODO: count and verify bytes read
// int computed = computeChecksum(docid, in, datWords);
-                System.out.println(tree.getDocumentURI());
+                System.out.println("URI=" + tree.getDocumentURI());
+                if (verbose) {
+                    String[] cols = tree.getCollections();
+                    for (String col : cols) {
+                        System.out.println("collection: " + col);
+                    }
+                }

byte kind = tree.rootNodeKind();
if (kind == NodeKind.BINARY) {
@@ -577,7 +588,7 @@
Node root = tree.node(0);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DomUtil.writeXml(root, bos);
-                System.out.println(bos.toString());
+                //System.out.println(bos.toString());
}
}
catch (Exception e) {
@@ -708,7 +708,7 @@ private void decodeBinary(Decoder decoder, ExpandedTree rep, int nbytes)
LOG.error("nbytes=" + nbytes + ", nwords=" + nwords);
}
rep.binaryData = new int[nwords];
-        decoder.decode(rep.binaryData, 1, nwords);
+        decoder.decode(rep.binaryData, nwords);
}

private void assignOrdinals(ExpandedTree rep) {
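This is the call-site half of the `Decoder.decode` signature change at the top of this commit: the decoded binary words now land at `rep.binaryData[0]` onward, where the old call's start index of 1 left the first slot unwritten.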