From 1f11e09affb2ab7014191cbccf50df892e671e45 Mon Sep 17 00:00:00 2001 From: Alex Osborne Date: Mon, 17 Feb 2020 22:07:17 +0900 Subject: [PATCH 1/2] Add index version 4 for distinct records with identical url & date Gated behind a feature flag as the upgrade process has not been implemented yet. --- src/outbackcdx/Capture.java | 167 ++++++++++++++++++++++++++++--- src/outbackcdx/FeatureFlags.java | 17 +++- src/outbackcdx/Index.java | 14 +-- src/outbackcdx/Main.java | 14 ++- test/outbackcdx/CaptureTest.java | 9 ++ test/outbackcdx/IndexTest.java | 22 +++- 6 files changed, 216 insertions(+), 27 deletions(-) diff --git a/src/outbackcdx/Capture.java b/src/outbackcdx/Capture.java index f7dea19..dcbe235 100644 --- a/src/outbackcdx/Capture.java +++ b/src/outbackcdx/Capture.java @@ -11,6 +11,9 @@ import java.util.Date; import java.util.Map; +import static java.nio.charset.StandardCharsets.US_ASCII; +import static java.nio.charset.StandardCharsets.UTF_8; + /** * A CDX record which can be encoded to a reasonable space-efficient packed representation. *

@@ -23,12 +26,19 @@ * | ASCII urlkey | 64-bit big-endian timestamp | * +--------------+-----------------------------+ * + *

Version 4 keys are extended to include the WARC filename and record offset and two NUL bytes. The first NUL is used + * to determine the length of the filename (searching backwards from the end). The second is a flag indicating this is the + * new key version. + *

+ *     +---------+------------------+-----+----------+-----+---------------+
+ *     | urlkey  | 64-bit timestamp | NUL | filename | NUL | 64-bit offset |
+ *     +---------+------------------+-----+----------+-----+---------------+
+ * 
*

- * The record's consists of a static list fields packed using {@link outbackcdx.VarInt}. The first field in the + * The record's value consists of a static list fields packed using {@link outbackcdx.VarInt}. The first field in the * value is a schema version number to allow fields to be added or removed in later versions. */ public class Capture { - private static int CURRENT_VERSION = 3; static final DateTimeFormatter arcTimeFormat = DateTimeFormatter.ofPattern("yyyyMMddHHmmss"); static final Base32 base32 = new Base32(); @@ -62,14 +72,33 @@ public Capture() { } public void decodeKey(byte[] key) { - urlkey = new String(key, 0, key.length - 8, StandardCharsets.US_ASCII); + if (key.length > 8 && key[key.length - 9] == 0) { + decodeKeyV4(key); + } else { + decodeKeyV0(key); + } + } + + private void decodeKeyV0(byte[] key) { + urlkey = new String(key, 0, key.length - 8, US_ASCII); ByteBuffer keyBuf = ByteBuffer.wrap(key); - keyBuf.order(ByteOrder.BIG_ENDIAN); timestamp = keyBuf.getLong(key.length - 8); } - public static byte[] encodeKey(String keyurl, long timestamp) { - byte[] urlBytes = keyurl.getBytes(StandardCharsets.US_ASCII); + @SuppressWarnings("StatementWithEmptyBody") + private void decodeKeyV4(byte[] key) { + int i; + for (i = key.length - 10; i >= 0 && key[i] != 0; i--); + if (i <= 8) throw new IllegalArgumentException("bad key"); + ByteBuffer keyBuf = ByteBuffer.wrap(key); + urlkey = new String(key, 0, i - 8, US_ASCII); + timestamp = keyBuf.getLong(i - 8); + file = new String(key, i + 1, key.length - i - 10); + compressedoffset = keyBuf.getLong(key.length - 8); + } + + public static byte[] encodeKeyV0(String keyurl, long timestamp) { + byte[] urlBytes = keyurl.getBytes(US_ASCII); ByteBuffer bb = ByteBuffer.allocate(urlBytes.length + 8); bb.order(ByteOrder.BIG_ENDIAN); bb.put(urlBytes); @@ -77,8 +106,36 @@ public static byte[] encodeKey(String keyurl, long timestamp) { return bb.array(); } + public static byte[] encodeKeyV4(String keyurl, long timestamp, 
String file, long offset) { + byte[] urlBytes = keyurl.getBytes(US_ASCII); + byte[] fileBytes = file.getBytes(UTF_8); + ByteBuffer bb = ByteBuffer.allocate(urlBytes.length + 8 + 1 + fileBytes.length + 1 + 8); + bb.order(ByteOrder.BIG_ENDIAN); + bb.put(urlBytes); + bb.putLong(timestamp); + bb.put((byte)0); + bb.put(fileBytes); + bb.put((byte)0); + bb.putLong(offset); + return bb.array(); + } + public byte[] encodeKey() { - return encodeKey(urlkey, timestamp); + return encodeKey(FeatureFlags.indexVersion()); + } + + public byte[] encodeKey(int version) { + switch (version) { + case 0: + case 1: + case 2: + case 3: + return encodeKeyV0(urlkey, timestamp); + case 4: + return encodeKeyV4(urlkey, timestamp, file, compressedoffset); + default: + throw new IllegalArgumentException("unsupported version: " + 4); + } } public void decodeValue(ByteBuffer bb) { @@ -96,9 +153,11 @@ public void decodeValue(ByteBuffer bb) { case 3: decodeValueV3(bb); break; + case 4: + decodeValueV4(bb); + break; default: - throw new IllegalArgumentException("CDX encoding is too new (v" + version + ") only versions up to v" - + CURRENT_VERSION + " are supported"); + throw new IllegalArgumentException("CDX encoding is too new (v" + version + ") only versions up to v4 are supported"); } } @@ -138,8 +197,36 @@ private void decodeValueV3(ByteBuffer bb) { originalCompressedoffset = VarInt.decode(bb); } + private void decodeValueV4(ByteBuffer bb) { + original = VarInt.decodeAscii(bb); + status = (int) VarInt.decode(bb); + mimetype = VarInt.decodeAscii(bb); + length = VarInt.decode(bb); + digest = base32.encodeAsString(VarInt.decodeBytes(bb)); + redirecturl = VarInt.decodeAscii(bb); + robotflags = VarInt.decodeAscii(bb); + originalLength = VarInt.decode(bb); + originalFile = VarInt.decodeAscii(bb); + originalCompressedoffset = VarInt.decode(bb); + } + public int sizeValue() { - return VarInt.size(CURRENT_VERSION) + + return sizeValue(FeatureFlags.indexVersion()); + } + + public int sizeValue(int version) 
{ + switch (version) { + case 3: + return sizeValueV3(); + case 4: + return sizeValueV4(); + default: + throw new IllegalArgumentException("Unsupported version " + version); + } + } + + private int sizeValueV3() { + return VarInt.size(3) + VarInt.sizeAscii(original) + VarInt.size(status) + VarInt.sizeAscii(mimetype) + @@ -154,8 +241,40 @@ public int sizeValue() { VarInt.size(originalCompressedoffset); } + private int sizeValueV4() { + return VarInt.size(4) + + VarInt.sizeAscii(original) + + VarInt.size(status) + + VarInt.sizeAscii(mimetype) + + VarInt.size(length) + + VarInt.sizeBytes(base32.decode(digest)) + + VarInt.sizeAscii(redirecturl) + + VarInt.sizeAscii(robotflags) + + VarInt.size(originalLength) + + VarInt.sizeAscii(originalFile) + + VarInt.size(originalCompressedoffset); + } + + public void encodeValue(ByteBuffer bb) { - VarInt.encode(bb, CURRENT_VERSION); + encodeValue(bb, FeatureFlags.indexVersion()); + } + + private void encodeValue(ByteBuffer bb, int version) { + switch (version) { + case 3: + encodeValueV3(bb); + break; + case 4: + encodeValueV4(bb); + break; + default: + throw new IllegalArgumentException("Unsupported version " + version); + } + } + + private void encodeValueV3(ByteBuffer bb) { + VarInt.encode(bb, 3); VarInt.encodeAscii(bb, original); VarInt.encode(bb, status); VarInt.encodeAscii(bb, mimetype); @@ -170,12 +289,30 @@ public void encodeValue(ByteBuffer bb) { VarInt.encode(bb, originalCompressedoffset); } - public byte[] encodeValue() { - ByteBuffer bb = ByteBuffer.allocate(sizeValue()); - encodeValue(bb); + private void encodeValueV4(ByteBuffer bb) { + VarInt.encode(bb, 4); + VarInt.encodeAscii(bb, original); + VarInt.encode(bb, status); + VarInt.encodeAscii(bb, mimetype); + VarInt.encode(bb, length); + VarInt.encodeBytes(bb, base32.decode(digest)); + VarInt.encodeAscii(bb, redirecturl); + VarInt.encodeAscii(bb, robotflags); + VarInt.encode(bb, originalLength); + VarInt.encodeAscii(bb, originalFile); + VarInt.encode(bb, 
originalCompressedoffset); + } + + public byte[] encodeValue(int version) { + ByteBuffer bb = ByteBuffer.allocate(sizeValue(version)); + encodeValue(bb, version); return bb.array(); } + public byte[] encodeValue() { + return encodeValue(FeatureFlags.indexVersion()); + } + /** * Format as a CDX11 line, or CDX14, depending on what fields are present. */ @@ -193,7 +330,7 @@ public String toString() { out.append(compressedoffset).append(" "); out.append(file); - if (CURRENT_VERSION == 3) { + if (FeatureFlags.indexVersion() >= 3) { out.append(" "); if (originalLength > 0) { diff --git a/src/outbackcdx/FeatureFlags.java b/src/outbackcdx/FeatureFlags.java index 0513533..701f7d0 100644 --- a/src/outbackcdx/FeatureFlags.java +++ b/src/outbackcdx/FeatureFlags.java @@ -15,6 +15,7 @@ public class FeatureFlags { private static boolean filterPlugins; private static boolean acceptWrites; private static boolean cdx14; + private static int indexVersion = 3; static { experimentalAccessControl = "1".equals(System.getenv("EXPERIMENTAL_ACCESS_CONTROL")); @@ -63,14 +64,26 @@ public static void setCdx14(boolean enabled) { cdx14 = enabled; } - public static Map asMap() { - Map map = new HashMap<>(); + public static Map asMap() { + Map map = new HashMap<>(); map.put("experimentalAccessControl", experimentalAccessControl()); map.put("pandoraHacks", pandoraHacks()); map.put("secondaryMode", isSecondary()); map.put("filterPlugins", filterPlugins()); map.put("acceptsWrites", acceptsWrites()); map.put("cdx14", cdx14); + map.put("indexVersion", indexVersion()); return map; } + + public static int indexVersion() { + return indexVersion; + } + + public static void setIndexVersion(int indexVersion) { + if (indexVersion != 3 && indexVersion != 4) { + throw new IllegalArgumentException("Only index versions 3 and 4 are supported, not " + indexVersion); + } + FeatureFlags.indexVersion = indexVersion; + } } diff --git a/src/outbackcdx/Index.java b/src/outbackcdx/Index.java index bb27325..e42d960 100644 
--- a/src/outbackcdx/Index.java +++ b/src/outbackcdx/Index.java @@ -62,7 +62,7 @@ public long getLatestSequenceNumber() { * Returns all captures that match the given prefix. */ public Iterable prefixQuery(String surtPrefix, Predicate filter) { - return () -> filteredCaptures(Capture.encodeKey(surtPrefix, 0), record -> record.urlkey.startsWith(surtPrefix), filter, false); + return () -> filteredCaptures(Capture.encodeKeyV0(surtPrefix, 0), record -> record.urlkey.startsWith(surtPrefix), filter, false); } public Iterable prefixQueryAP(String surtPrefix, String accessPoint) { @@ -77,7 +77,7 @@ public Iterable prefixQueryAP(String surtPrefix, String accessPoint) { * Returns all captures with keys in the given range. */ public Iterable rangeQuery(String startSurt, String endSurt, Predicate filter) { - return () -> filteredCaptures(Capture.encodeKey(startSurt, 0), record -> record.urlkey.compareTo(endSurt) < 0, filter, false); + return () -> filteredCaptures(Capture.encodeKeyV0(startSurt, 0), record -> record.urlkey.compareTo(endSurt) < 0, filter, false); } /** @@ -89,7 +89,7 @@ public Iterable query(String surt, Predicate filter) { public Iterable query(String surt, long from, long to, Predicate filter) { String urlkey = resolveAlias(surt); - byte[] key = Capture.encodeKey(urlkey, from); + byte[] key = Capture.encodeKeyV0(urlkey, from); return () -> filteredCaptures(key, record -> record.urlkey.equals(urlkey) && record.timestamp <= to, filter, false); } @@ -113,7 +113,7 @@ public Iterable reverseQuery(String surt, Predicate filter) { public Iterable reverseQuery(String surt, long from, long to, Predicate filter) { String urlkey = resolveAlias(surt); - byte[] key = Capture.encodeKey(urlkey, to); + byte[] key = Capture.encodeKeyV0(urlkey, to); return () -> filteredCaptures(key, record -> record.urlkey.equals(urlkey) && record.timestamp >= from, filter, true); } @@ -122,7 +122,7 @@ public Iterable reverseQuery(String surt, long from, long to, Predicate */ public Iterable 
closestQuery(String surt, long targetTimestamp, Predicate filter) { String urlkey = resolveAlias(surt); - byte[] key = Capture.encodeKey(urlkey, targetTimestamp); + byte[] key = Capture.encodeKeyV0(urlkey, targetTimestamp); Predicate scope = record -> record.urlkey.equals(urlkey); return () -> new ClosestTimestampIterator(targetTimestamp, filteredCaptures(key, scope, filter, false), @@ -239,14 +239,14 @@ private Capture pickForward() { * Perform a query without first resolving aliases. */ private Iterable rawQuery(String key, Predicate filter, boolean reverse) { - return () -> filteredCaptures(Capture.encodeKey(key, 0), record -> record.urlkey.equals(key), filter, reverse); + return () -> filteredCaptures(Capture.encodeKeyV0(key, 0), record -> record.urlkey.equals(key), filter, reverse); } /** * Returns all captures starting from the given key. */ Iterable capturesAfter(String start) { - return () -> filteredCaptures(Capture.encodeKey(start, 0), record -> true, null, false); + return () -> filteredCaptures(Capture.encodeKeyV0(start, 0), record -> true, null, false); } public String resolveAlias(String surt) { diff --git a/src/outbackcdx/Main.java b/src/outbackcdx/Main.java index 5387741..3658a4e 100644 --- a/src/outbackcdx/Main.java +++ b/src/outbackcdx/Main.java @@ -28,7 +28,7 @@ public class Main { public static void usage() { System.err.println("Usage: java " + Main.class.getName() + " [options...]"); - System.err.println(""); + System.err.println(); System.err.println(" -b bindaddr Bind to a particular IP address"); System.err.println(" -d datadir Directory to store index data under"); System.err.println(" -i Inherit the server socket via STDIN (for use with systemd, inetd etc)"); @@ -53,10 +53,15 @@ public static void usage() { System.err.println(" --update-interval poll-interval Polling frequency for upstream changes, in seconds. 
Default: 10"); System.err.println(" --accept-writes Allow writes to this node, even though running as a secondary"); System.err.println(" --batch-size Approximate max size (in bytes) per replication batch"); + System.err.println(); + System.err.println("Enable experimental index versions. DANGER: Upgrading a version 3 index to version 4 is not yet supported and " + + "updating or deleting existing version 3 records will silently fail."); + System.err.println(" --index-version 4 Treats records as distinct if they have a different filename or offset" + + " even if they have identical url and date"); System.exit(1); } - public static void main(String args[]) { + public static void main(String[] args) { boolean undertow = false; String host = null; int port = 8080; @@ -93,6 +98,11 @@ public static void main(String args[]) { case "-i": inheritSocket = true; break; + case "--index-version": + System.err.println("WARNING: Experimental index version 4 enabled. Do not use this option (yet) on an " + + "pre-existing version 3 index. 
Updating or deleting older records will silently fail."); + FeatureFlags.setIndexVersion(Integer.parseInt(args[++i])); + break; case "-j": try { authorizer = new JwtAuthorizer(new URL(args[++i]), args[++i]); diff --git a/test/outbackcdx/CaptureTest.java b/test/outbackcdx/CaptureTest.java index a6498a0..f5ff67b 100644 --- a/test/outbackcdx/CaptureTest.java +++ b/test/outbackcdx/CaptureTest.java @@ -24,6 +24,15 @@ public void testRecordsCanBeEncodedAndDecoded() { assertEquals(src.date().getTime(), 1388579640000L); } + @Test + public void testV4Encoding() { + Capture src = dummyRecord(); + byte[] key = src.encodeKey(4); + byte[] value = src.encodeValue(4); + Capture dst = new Capture(key, value); + assertFieldsEqual(src, dst); + } + static void assertFieldsEqual(Capture src, Capture dst) { assertEquals(src.compressedoffset, dst.compressedoffset); assertEquals(src.digest, dst.digest); diff --git a/test/outbackcdx/IndexTest.java b/test/outbackcdx/IndexTest.java index 487e877..b46903f 100644 --- a/test/outbackcdx/IndexTest.java +++ b/test/outbackcdx/IndexTest.java @@ -155,4 +155,24 @@ public void testFromAndTo() throws IOException { } } -} + @Test + public void indexV4ShouldPreserveDistinctRecordsWithTheSameUrlAndDate() throws IOException { + int oldVersion = FeatureFlags.indexVersion(); + FeatureFlags.setIndexVersion(4); + try { + try (Index.Batch batch = index.beginUpdate()) { + batch.putCapture(Capture.fromCdxLine("- 20050101000000 http://v4.org/ text/html 200 - - 0 w1", index.canonicalizer)); + batch.putCapture(Capture.fromCdxLine("- 20050101000000 http://v4.org/ text/html 200 - - 10 w1", index.canonicalizer)); + batch.putCapture(Capture.fromCdxLine("- 20050101000000 http://v4.org/ text/html 200 - - 0 w2", index.canonicalizer)); + batch.putCapture(Capture.fromCdxLine("- 20050101000000 http://v4.org/ text/html 200 - - 0 w2", index.canonicalizer)); + batch.commit(); + } + + List results = new ArrayList<>(); + index.query("org,v4)/", null).forEach(results::add); + 
assertEquals(3, results.size()); + } finally { + FeatureFlags.setIndexVersion(oldVersion); + } + } +} \ No newline at end of file From 48b452464805b597333f8e7ecc561603a1771141 Mon Sep 17 00:00:00 2001 From: Alex Osborne Date: Mon, 17 Feb 2020 22:12:39 +0900 Subject: [PATCH 2/2] Fix integration tests: central.maven.org no longer works --- test-integration/test-openwayback.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test-integration/test-openwayback.sh b/test-integration/test-openwayback.sh index b88a72b..e44b7e4 100755 --- a/test-integration/test-openwayback.sh +++ b/test-integration/test-openwayback.sh @@ -13,8 +13,8 @@ function fetch { # mkdir -p deps -fetch wayback.war http://central.maven.org/maven2/org/netpreserve/openwayback/openwayback-webapp/2.3.1/openwayback-webapp-2.3.1.war -fetch jetty-runner.jar http://central.maven.org/maven2/org/eclipse/jetty/jetty-runner/9.4.0.M1/jetty-runner-9.4.0.M1.jar +fetch wayback.war https://repo1.maven.org/maven2/org/netpreserve/openwayback/openwayback-webapp/2.3.1/openwayback-webapp-2.3.1.war +fetch jetty-runner.jar https://repo1.maven.org/maven2/org/eclipse/jetty/jetty-runner/9.4.0.M1/jetty-runner-9.4.0.M1.jar # # Prepare wayback