Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add index version 4 for distinct records with identical url & date #78

Merged
merged 2 commits into from
Mar 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 152 additions & 15 deletions src/outbackcdx/Capture.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
import java.util.Date;
import java.util.Map;

import static java.nio.charset.StandardCharsets.US_ASCII;
import static java.nio.charset.StandardCharsets.UTF_8;

/**
* A CDX record which can be encoded to a reasonable space-efficient packed representation.
* <p>
Expand All @@ -23,12 +26,19 @@
* | ASCII urlkey | 64-bit big-endian timestamp |
* +--------------+-----------------------------+
* </pre>
* <p>Version 4 keys are extended to include the WARC filename and record offset and two NUL bytes. The first NUL used
* to determine the length of filename (searching backwards from the end). The second is a flag indicating this is the
* new key version.
* <pre>
* +---------+------------------+-----|----------+-----+---------------+
* | urlkey | 64-bit timestamp | NUL | filename | NUL | 64-bit offset |
* +---------+------------------+-----|----------+-----+---------------+
* </pre>
* <p>
* The record's consists of a static list fields packed using {@link outbackcdx.VarInt}. The first field in the
* The record's value consists of a static list fields packed using {@link outbackcdx.VarInt}. The first field in the
* value is a schema version number to allow fields to be added or removed in later versions.
*/
public class Capture {
private static int CURRENT_VERSION = 3;
static final DateTimeFormatter arcTimeFormat = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
static final Base32 base32 = new Base32();

Expand Down Expand Up @@ -62,23 +72,70 @@ public Capture() {
}

public void decodeKey(byte[] key) {
urlkey = new String(key, 0, key.length - 8, StandardCharsets.US_ASCII);
if (key.length > 8 && key[key.length - 9] == 0) {
decodeKeyV4(key);
} else {
decodeKeyV0(key);
}
}

private void decodeKeyV0(byte[] key) {
urlkey = new String(key, 0, key.length - 8, US_ASCII);
ByteBuffer keyBuf = ByteBuffer.wrap(key);
keyBuf.order(ByteOrder.BIG_ENDIAN);
timestamp = keyBuf.getLong(key.length - 8);
}

public static byte[] encodeKey(String keyurl, long timestamp) {
byte[] urlBytes = keyurl.getBytes(StandardCharsets.US_ASCII);
@SuppressWarnings("StatementWithEmptyBody")
private void decodeKeyV4(byte[] key) {
int i;
for (i = key.length - 10; i >= 0 && key[i] != 0; i--);
if (i <= 8) throw new IllegalArgumentException("bad key");
ByteBuffer keyBuf = ByteBuffer.wrap(key);
urlkey = new String(key, 0, i - 8, US_ASCII);
timestamp = keyBuf.getLong(i - 8);
file = new String(key, i + 1, key.length - i - 10);
compressedoffset = keyBuf.getLong(key.length - 8);
}

public static byte[] encodeKeyV0(String keyurl, long timestamp) {
byte[] urlBytes = keyurl.getBytes(US_ASCII);
ByteBuffer bb = ByteBuffer.allocate(urlBytes.length + 8);
bb.order(ByteOrder.BIG_ENDIAN);
bb.put(urlBytes);
bb.putLong(timestamp);
return bb.array();
}

public static byte[] encodeKeyV4(String keyurl, long timestamp, String file, long offset) {
byte[] urlBytes = keyurl.getBytes(US_ASCII);
byte[] fileBytes = file.getBytes(UTF_8);
ByteBuffer bb = ByteBuffer.allocate(urlBytes.length + 8 + 1 + fileBytes.length + 1 + 8);
bb.order(ByteOrder.BIG_ENDIAN);
bb.put(urlBytes);
bb.putLong(timestamp);
bb.put((byte)0);
bb.put(fileBytes);
bb.put((byte)0);
bb.putLong(offset);
return bb.array();
}

public byte[] encodeKey() {
return encodeKey(urlkey, timestamp);
return encodeKey(FeatureFlags.indexVersion());
}

public byte[] encodeKey(int version) {
switch (version) {
case 0:
case 1:
case 2:
case 3:
return encodeKeyV0(urlkey, timestamp);
case 4:
return encodeKeyV4(urlkey, timestamp, file, compressedoffset);
default:
throw new IllegalArgumentException("unsupported version: " + 4);
}
}

public void decodeValue(ByteBuffer bb) {
Expand All @@ -96,9 +153,11 @@ public void decodeValue(ByteBuffer bb) {
case 3:
decodeValueV3(bb);
break;
case 4:
decodeValueV4(bb);
break;
default:
throw new IllegalArgumentException("CDX encoding is too new (v" + version + ") only versions up to v"
+ CURRENT_VERSION + " are supported");
throw new IllegalArgumentException("CDX encoding is too new (v" + version + ") only versions up to v4 are supported");
}
}

Expand Down Expand Up @@ -138,8 +197,36 @@ private void decodeValueV3(ByteBuffer bb) {
originalCompressedoffset = VarInt.decode(bb);
}

private void decodeValueV4(ByteBuffer bb) {
original = VarInt.decodeAscii(bb);
status = (int) VarInt.decode(bb);
mimetype = VarInt.decodeAscii(bb);
length = VarInt.decode(bb);
digest = base32.encodeAsString(VarInt.decodeBytes(bb));
redirecturl = VarInt.decodeAscii(bb);
robotflags = VarInt.decodeAscii(bb);
originalLength = VarInt.decode(bb);
originalFile = VarInt.decodeAscii(bb);
originalCompressedoffset = VarInt.decode(bb);
}

public int sizeValue() {
return VarInt.size(CURRENT_VERSION) +
return sizeValue(FeatureFlags.indexVersion());
}

public int sizeValue(int version) {
switch (version) {
case 3:
return sizeValueV3();
case 4:
return sizeValueV4();
default:
throw new IllegalArgumentException("Unsupported version " + version);
}
}

private int sizeValueV3() {
return VarInt.size(3) +
VarInt.sizeAscii(original) +
VarInt.size(status) +
VarInt.sizeAscii(mimetype) +
Expand All @@ -154,8 +241,40 @@ public int sizeValue() {
VarInt.size(originalCompressedoffset);
}

private int sizeValueV4() {
return VarInt.size(4) +
VarInt.sizeAscii(original) +
VarInt.size(status) +
VarInt.sizeAscii(mimetype) +
VarInt.size(length) +
VarInt.sizeBytes(base32.decode(digest)) +
VarInt.sizeAscii(redirecturl) +
VarInt.sizeAscii(robotflags) +
VarInt.size(originalLength) +
VarInt.sizeAscii(originalFile) +
VarInt.size(originalCompressedoffset);
}


public void encodeValue(ByteBuffer bb) {
VarInt.encode(bb, CURRENT_VERSION);
encodeValue(bb, FeatureFlags.indexVersion());
}

private void encodeValue(ByteBuffer bb, int version) {
switch (version) {
case 3:
encodeValueV3(bb);
break;
case 4:
encodeValueV4(bb);
break;
default:
throw new IllegalArgumentException("Unsupported version " + version);
}
}

private void encodeValueV3(ByteBuffer bb) {
VarInt.encode(bb, 3);
VarInt.encodeAscii(bb, original);
VarInt.encode(bb, status);
VarInt.encodeAscii(bb, mimetype);
Expand All @@ -170,12 +289,30 @@ public void encodeValue(ByteBuffer bb) {
VarInt.encode(bb, originalCompressedoffset);
}

public byte[] encodeValue() {
ByteBuffer bb = ByteBuffer.allocate(sizeValue());
encodeValue(bb);
private void encodeValueV4(ByteBuffer bb) {
VarInt.encode(bb, 4);
VarInt.encodeAscii(bb, original);
VarInt.encode(bb, status);
VarInt.encodeAscii(bb, mimetype);
VarInt.encode(bb, length);
VarInt.encodeBytes(bb, base32.decode(digest));
VarInt.encodeAscii(bb, redirecturl);
VarInt.encodeAscii(bb, robotflags);
VarInt.encode(bb, originalLength);
VarInt.encodeAscii(bb, originalFile);
VarInt.encode(bb, originalCompressedoffset);
}

public byte[] encodeValue(int version) {
ByteBuffer bb = ByteBuffer.allocate(sizeValue(version));
encodeValue(bb, version);
return bb.array();
}

public byte[] encodeValue() {
return encodeValue(FeatureFlags.indexVersion());
}

/**
* Format as a CDX11 line, or CDX14, depending on what fields are present.
*/
Expand All @@ -193,7 +330,7 @@ public String toString() {
out.append(compressedoffset).append(" ");
out.append(file);

if (CURRENT_VERSION == 3) {
if (FeatureFlags.indexVersion() >= 3) {
out.append(" ");

if (originalLength > 0) {
Expand Down
17 changes: 15 additions & 2 deletions src/outbackcdx/FeatureFlags.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public class FeatureFlags {
private static boolean filterPlugins;
private static boolean acceptWrites;
private static boolean cdx14;
private static int indexVersion = 3;

static {
experimentalAccessControl = "1".equals(System.getenv("EXPERIMENTAL_ACCESS_CONTROL"));
Expand Down Expand Up @@ -63,14 +64,26 @@ public static void setCdx14(boolean enabled) {
cdx14 = enabled;
}

public static Map<String, Boolean> asMap() {
Map<String,Boolean> map = new HashMap<>();
public static Map<String, Object> asMap() {
Map<String,Object> map = new HashMap<>();
map.put("experimentalAccessControl", experimentalAccessControl());
map.put("pandoraHacks", pandoraHacks());
map.put("secondaryMode", isSecondary());
map.put("filterPlugins", filterPlugins());
map.put("acceptsWrites", acceptsWrites());
map.put("cdx14", cdx14);
map.put("indexVersion", indexVersion());
return map;
}

public static int indexVersion() {
return indexVersion;
}

public static void setIndexVersion(int indexVersion) {
if (indexVersion != 3 && indexVersion != 4) {
throw new IllegalArgumentException("Only index versions 3 and 4 are supported, not " + indexVersion);
}
FeatureFlags.indexVersion = indexVersion;
}
}
14 changes: 7 additions & 7 deletions src/outbackcdx/Index.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public long getLatestSequenceNumber() {
* Returns all captures that match the given prefix.
*/
public Iterable<Capture> prefixQuery(String surtPrefix, Predicate<Capture> filter) {
return () -> filteredCaptures(Capture.encodeKey(surtPrefix, 0), record -> record.urlkey.startsWith(surtPrefix), filter, false);
return () -> filteredCaptures(Capture.encodeKeyV0(surtPrefix, 0), record -> record.urlkey.startsWith(surtPrefix), filter, false);
}

public Iterable<Capture> prefixQueryAP(String surtPrefix, String accessPoint) {
Expand All @@ -77,7 +77,7 @@ public Iterable<Capture> prefixQueryAP(String surtPrefix, String accessPoint) {
* Returns all captures with keys in the given range.
*/
public Iterable<Capture> rangeQuery(String startSurt, String endSurt, Predicate<Capture> filter) {
return () -> filteredCaptures(Capture.encodeKey(startSurt, 0), record -> record.urlkey.compareTo(endSurt) < 0, filter, false);
return () -> filteredCaptures(Capture.encodeKeyV0(startSurt, 0), record -> record.urlkey.compareTo(endSurt) < 0, filter, false);
}

/**
Expand All @@ -89,7 +89,7 @@ public Iterable<Capture> query(String surt, Predicate<Capture> filter) {

public Iterable<Capture> query(String surt, long from, long to, Predicate<Capture> filter) {
String urlkey = resolveAlias(surt);
byte[] key = Capture.encodeKey(urlkey, from);
byte[] key = Capture.encodeKeyV0(urlkey, from);
return () -> filteredCaptures(key, record -> record.urlkey.equals(urlkey) && record.timestamp <= to, filter, false);
}

Expand All @@ -113,7 +113,7 @@ public Iterable<Capture> reverseQuery(String surt, Predicate<Capture> filter) {

public Iterable<Capture> reverseQuery(String surt, long from, long to, Predicate<Capture> filter) {
String urlkey = resolveAlias(surt);
byte[] key = Capture.encodeKey(urlkey, to);
byte[] key = Capture.encodeKeyV0(urlkey, to);
return () -> filteredCaptures(key, record -> record.urlkey.equals(urlkey) && record.timestamp >= from, filter, true);
}

Expand All @@ -122,7 +122,7 @@ public Iterable<Capture> reverseQuery(String surt, long from, long to, Predicate
*/
public Iterable<Capture> closestQuery(String surt, long targetTimestamp, Predicate<Capture> filter) {
String urlkey = resolveAlias(surt);
byte[] key = Capture.encodeKey(urlkey, targetTimestamp);
byte[] key = Capture.encodeKeyV0(urlkey, targetTimestamp);
Predicate<Capture> scope = record -> record.urlkey.equals(urlkey);
return () -> new ClosestTimestampIterator(targetTimestamp,
filteredCaptures(key, scope, filter, false),
Expand Down Expand Up @@ -239,14 +239,14 @@ private Capture pickForward() {
* Perform a query without first resolving aliases.
*/
private Iterable<Capture> rawQuery(String key, Predicate<Capture> filter, boolean reverse) {
return () -> filteredCaptures(Capture.encodeKey(key, 0), record -> record.urlkey.equals(key), filter, reverse);
return () -> filteredCaptures(Capture.encodeKeyV0(key, 0), record -> record.urlkey.equals(key), filter, reverse);
}

/**
* Returns all captures starting from the given key.
*/
Iterable<Capture> capturesAfter(String start) {
return () -> filteredCaptures(Capture.encodeKey(start, 0), record -> true, null, false);
return () -> filteredCaptures(Capture.encodeKeyV0(start, 0), record -> true, null, false);
}

public String resolveAlias(String surt) {
Expand Down
14 changes: 12 additions & 2 deletions src/outbackcdx/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
public class Main {
public static void usage() {
System.err.println("Usage: java " + Main.class.getName() + " [options...]");
System.err.println("");
System.err.println();
System.err.println(" -b bindaddr Bind to a particular IP address");
System.err.println(" -d datadir Directory to store index data under");
System.err.println(" -i Inherit the server socket via STDIN (for use with systemd, inetd etc)");
Expand All @@ -53,10 +53,15 @@ public static void usage() {
System.err.println(" --update-interval poll-interval Polling frequency for upstream changes, in seconds. Default: 10");
System.err.println(" --accept-writes Allow writes to this node, even though running as a secondary");
System.err.println(" --batch-size Approximate max size (in bytes) per replication batch");
System.err.println();
System.err.println("Enable experimental index versions. DANGER: Upgrading a version 3 index to version 4 is not yet supported and " +
"updating or deleting existing version 3 records will silently fail.");
System.err.println(" --index-version 4 Treats records as distinct if they have a different filename or offset" +
" even if they have identical url and date");
System.exit(1);
}

public static void main(String args[]) {
public static void main(String[] args) {
boolean undertow = false;
String host = null;
int port = 8080;
Expand Down Expand Up @@ -93,6 +98,11 @@ public static void main(String args[]) {
case "-i":
inheritSocket = true;
break;
case "--index-version":
System.err.println("WARNING: Experimental index version 4 enabled. Do not use this option (yet) on an " +
"pre-existing version 3 index. Updating or deleting older records will silently fail.");
FeatureFlags.setIndexVersion(Integer.parseInt(args[++i]));
break;
case "-j":
try {
authorizer = new JwtAuthorizer(new URL(args[++i]), args[++i]);
Expand Down
4 changes: 2 additions & 2 deletions test-integration/test-openwayback.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ function fetch {
#

mkdir -p deps
fetch wayback.war http://central.maven.org/maven2/org/netpreserve/openwayback/openwayback-webapp/2.3.1/openwayback-webapp-2.3.1.war
fetch jetty-runner.jar http://central.maven.org/maven2/org/eclipse/jetty/jetty-runner/9.4.0.M1/jetty-runner-9.4.0.M1.jar
fetch wayback.war https://repo1.maven.org/maven2/org/netpreserve/openwayback/openwayback-webapp/2.3.1/openwayback-webapp-2.3.1.war
fetch jetty-runner.jar https://repo1.maven.org/maven2/org/eclipse/jetty/jetty-runner/9.4.0.M1/jetty-runner-9.4.0.M1.jar

#
# Prepare wayback
Expand Down
Loading