From 3431b16469f621f816990b4cfc61395cc1eff3ae Mon Sep 17 00:00:00 2001 From: drewkerrigan Date: Thu, 2 May 2013 15:23:53 -0400 Subject: [PATCH 01/14] adding todo notes --- .../com/basho/proserv/datamigrator/BucketDumper.java | 12 ++++++++++++ .../java/com/basho/proserv/datamigrator/Main.java | 2 ++ .../basho/proserv/datamigrator/io/KeyJournal.java | 4 +++- 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/basho/proserv/datamigrator/BucketDumper.java b/src/main/java/com/basho/proserv/datamigrator/BucketDumper.java index cf0b8d0..15fe4f1 100644 --- a/src/main/java/com/basho/proserv/datamigrator/BucketDumper.java +++ b/src/main/java/com/basho/proserv/datamigrator/BucketDumper.java @@ -98,6 +98,14 @@ public long dumpBuckets(Set bucketNames, boolean resume, boolean keysOnl } return objectCount; } + + + //TODO: Implement + public int dumpKeysFromBucket() { + //key journal? readmode, use key journal as iterator, key event / datatype + //dumpBucket logic? except for dumping the keys, take file, give it to key journal, and pass it to the bucket process + return 1; + } // resume is unimplemented public long dumpBucket(String bucketName, boolean resume, boolean keysOnly) { @@ -118,6 +126,10 @@ public long dumpBucket(String bucketName, boolean resume, boolean keysOnly) { if (this.verboseStatusOutput) { System.out.println("\nDumping bucket " + bucketName); } + + //keyevent + //riakevent + long objectCount = 0; long valueErrorCount = 0; diff --git a/src/main/java/com/basho/proserv/datamigrator/Main.java b/src/main/java/com/basho/proserv/datamigrator/Main.java index 375c69c..e9f6df1 100644 --- a/src/main/java/com/basho/proserv/datamigrator/Main.java +++ b/src/main/java/com/basho/proserv/datamigrator/Main.java @@ -521,6 +521,8 @@ private static CommandLine parseCommandLine(Options options, String[] args) thro return cmd; } + //TODO: add option that reads a partial list of keys from file to backup (in same format as -k) + private static Options createOptions() { Options options = new Options(); diff --git a/src/main/java/com/basho/proserv/datamigrator/io/KeyJournal.java b/src/main/java/com/basho/proserv/datamigrator/io/KeyJournal.java index ea7cdfe..c18e75f 100644 --- a/src/main/java/com/basho/proserv/datamigrator/io/KeyJournal.java +++ b/src/main/java/com/basho/proserv/datamigrator/io/KeyJournal.java @@ -40,7 +40,9 @@ public KeyJournal(File path, Mode mode) { } this.mode = mode; } - + + //TODO: Modify this file if necessary + public void write(Key key) throws IOException { if (key == null) { throw new IllegalArgumentException("key must not be null"); From 10fc6bae28e3f2698b10bbc9855746b3b2e62ab8 Mon Sep 17 00:00:00 2001 From: drewkerrigan Date: Tue, 7 May 2013 23:05:34 -0400 Subject: [PATCH 02/14] first run at loading from keyfile --- README.md | 5 +- .../proserv/datamigrator/BucketDumper.java | 77 ++++++++++++++----- .../proserv/datamigrator/Configuration.java | 17 +++- .../com/basho/proserv/datamigrator/Main.java | 20 ++++- 4 files changed, 90 insertions(+), 29 deletions(-) diff --git a/README.md b/README.md index 4ad16ff..452d956 100644 --- a/README.md +++ b/README.md @@ -87,7 +87,7 @@ Settings Transfer (optional, used with to -d or -l) You must also specify -d to export or -l to import, with this option. Delete a bucket ---delete Delete bucket data. Cannot be used with -d, -l, -k, or -t. Must be used with -b or -f +--delete Delete bucket data. Cannot be used with -d, -l, -k, or -t. 
Must be used with -b or -f Path (required) -r Set the path for data to be loaded to or dumped from (path must be valid) @@ -96,6 +96,7 @@ Bucket Options (required for -d, -k or -t) -a Export all buckets. -b Export a single bucket. -f Export multiple buckets listed in a file (containing line-delimited bucket names) +-K Export multiple keys listed in a file (containing line-delimited bucket,key names) Cluster Addresses and Ports (required) -h Specify Riak hostname. Required if a cluster host name file is not specified. @@ -115,7 +116,7 @@ Concurrency and Misc Settings at most 2 queues for Load/Dump operations. Copy Settings ---copy Set to Copy buckets to one cluster to another. Cannot be used with d, k, l, k or delete. +--copy Set to Copy buckets from one cluster to another. Cannot be used with d, k, l or delete. --copyhost Specify destination Riak host for *copy* operation --copyhostsfile Specify a file containing Cluster Host Names. Req'd if a single copyhost not specified. diff --git a/src/main/java/com/basho/proserv/datamigrator/BucketDumper.java b/src/main/java/com/basho/proserv/datamigrator/BucketDumper.java index 15fe4f1..105abf7 100644 --- a/src/main/java/com/basho/proserv/datamigrator/BucketDumper.java +++ b/src/main/java/com/basho/proserv/datamigrator/BucketDumper.java @@ -2,6 +2,9 @@ import java.io.File; import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; import java.util.Set; import org.slf4j.Logger; @@ -28,6 +31,7 @@ public class BucketDumper { private final File dataRoot; private final boolean verboseStatusOutput; private int errorCount = 0; + private Map bucketMap; private long timerStart = System.currentTimeMillis(); private long previousCount = 0; @@ -42,7 +46,8 @@ public BucketDumper(Connection connection, Connection httpConnection, Configurat if (config.getFilePath() == null) { throw new IllegalArgumentException("dataRoot cannot be null"); } - + + this.bucketMap = new HashMap(); this.connection = connection; this.httpConnection = httpConnection; this.config = config; @@ -94,21 +99,44 @@ public long dumpBuckets(Set bucketNames, boolean resume, boolean keysOnl } int objectCount = 0; for (String bucketName : bucketNames) { - objectCount += dumpBucket(bucketName, resume, keysOnly); + objectCount += dumpBucket(bucketName, null, resume, keysOnly); } return objectCount; } + public long dumpKeys(Set bucketKeyNames) { + if (!this.connection.connected()) { + log.error("Not connected to Riak"); + return 0; + } + int objectCount = 0; + + Map> bucketKeysMap = new HashMap>(); + + //Scan the list and build a map + for (String bucketKeyName : bucketKeyNames) { + String[] bucketKeyNamesArr = bucketKeyName.split(","); + + if (bucketKeyNamesArr.length != 2) { + throw new IllegalArgumentException("Key file lines must have the format: bucket,key"); + } - //TODO: Implement - public int dumpKeysFromBucket() { - //key journal? readmode, use key journal as iterator, key event / datatype - //dumpBucket logic? 
except for dumping the keys, take file, give it to key journal, and pass it to the bucket process - return 1; + if(!bucketKeysMap.containsKey(bucketKeyNamesArr[0])) { + Set keySet = new HashSet(); + bucketKeysMap.put(bucketKeyNamesArr[0], keySet); + } + + bucketKeysMap.get(bucketKeyNamesArr[0]).add(bucketKeyNamesArr[1]); + } + + for (Map.Entry> aMapEntry : bucketKeysMap.entrySet()) { + objectCount += dumpBucket(aMapEntry.getKey(), aMapEntry.getValue() , false, false); + } + return objectCount; } - + // resume is unimplemented - public long dumpBucket(String bucketName, boolean resume, boolean keysOnly) { + public long dumpBucket(String bucketName, Set keyNames, boolean resume, boolean keysOnly) { if (bucketName == null || bucketName.isEmpty()) { throw new IllegalArgumentException("bucketName cannot be null or empty"); } @@ -126,10 +154,6 @@ public long dumpBucket(String bucketName, boolean resume, boolean keysOnly) { if (this.verboseStatusOutput) { System.out.println("\nDumping bucket " + bucketName); } - - //keyevent - //riakevent - long objectCount = 0; long valueErrorCount = 0; @@ -144,7 +168,7 @@ public long dumpBucket(String bucketName, boolean resume, boolean keysOnly) { long keyCount = 0; try { - keyCount = this.dumpBucketKeys(bucketName, keyPath); + keyCount = this.dumpBucketKeys(bucketName, keyNames, keyPath); } catch (IOException e){ log.error("Error listing keys for bucket " + bucketName, e); this.summary.addStatistic(bucketName, -2l, 0l, 0l, 0l); @@ -225,18 +249,22 @@ public int errorCount() { return errorCount; } - public long dumpBucketKeys(String bucketName, File filePath) throws IOException { + public long dumpBucketKeys(String bucketName, Iterable keyNames, File filePath) throws IOException { KeyJournal keyJournal = new KeyJournal(filePath, KeyJournal.Mode.WRITE); long keyCount = 0; - Iterable keys = this.connection.riakClient.listKeys(bucketName); - for (String keyString : keys) { + + if (keyNames == null) { + keyNames = this.connection.riakClient.listKeys(bucketName); + } + + for (String keyString : keyNames) { keyJournal.write(bucketName, keyString); ++keyCount; } keyJournal.close(); return keyCount; } - + private void saveBucketSettings(String bucketName, File path) { File xmlPath = RiakBucketProperties.createBucketSettingsFile(path); RiakBucketProperties riakBucketProps = new RiakBucketProperties(this.httpConnection); @@ -275,8 +303,15 @@ private String createBucketPath(String bucketName) { } private RiakObjectBucket createBucket(String bucketName) { - String bucketRootPath = this.createBucketPath(bucketName); - File bucketRoot = new File(bucketRootPath); - return new RiakObjectBucket(bucketRoot, RiakObjectBucket.BucketMode.WRITE, this.config); + RiakObjectBucket aRiakObjectBucket = this.bucketMap.get(bucketName); + + if (aRiakObjectBucket == null) { + String bucketRootPath = this.createBucketPath(bucketName); + File bucketRoot = new File(bucketRootPath); + aRiakObjectBucket = new RiakObjectBucket(bucketRoot, RiakObjectBucket.BucketMode.WRITE, this.config); + this.bucketMap.put(bucketName, aRiakObjectBucket); + } + + return aRiakObjectBucket; } } diff --git a/src/main/java/com/basho/proserv/datamigrator/Configuration.java b/src/main/java/com/basho/proserv/datamigrator/Configuration.java index 5ed3a81..1905126 100644 --- a/src/main/java/com/basho/proserv/datamigrator/Configuration.java +++ b/src/main/java/com/basho/proserv/datamigrator/Configuration.java @@ -17,7 +17,8 @@ public static enum Mode { LOAD, DUMP }; public static enum Operation { ALL_BUCKETS, BUCKETS, 
ALL_KEYS, - BUCKET_KEYS, + BUCKET_KEYS, + KEYS, BUCKET_PROPERTIES, DELETE_BUCKETS, COPY_ALL, @@ -39,6 +40,7 @@ public static enum Operation { ALL_BUCKETS, private int queueSize = DEFAULT_QUEUE_SIZE; private Set bucketNames = new HashSet(); + private Set keyNames = new HashSet(); private boolean verboseStatus = true; private boolean resetVClock = false; @@ -137,9 +139,16 @@ public void addBucketName(String bucket) { public void addBucketNames(Collection buckets) { this.bucketNames.addAll(buckets); } - public Set getBucketNames() { - return this.bucketNames; - } + public Set getBucketNames() { + return this.bucketNames; + } + + public void addKeyNames(Collection keys) { + this.keyNames.addAll(keys); + } + public Set getKeyNames() { + return this.keyNames; + } public void setVerboseStatus(boolean verboseStatus) { this.verboseStatus = verboseStatus; diff --git a/src/main/java/com/basho/proserv/datamigrator/Main.java b/src/main/java/com/basho/proserv/datamigrator/Main.java index e9f6df1..1b9299a 100644 --- a/src/main/java/com/basho/proserv/datamigrator/Main.java +++ b/src/main/java/com/basho/proserv/datamigrator/Main.java @@ -209,18 +209,31 @@ public static Configuration handleCommandLine(CommandLine cmd) { System.exit(1); } } + + // Dump from a list of buckets/keys + if (cmd.hasOption("K")) { + try { + String fileName = cmd.getOptionValue("K"); + config.addKeyNames(Utilities.readFileLines(fileName)); + config.setOperation(Configuration.Operation.KEYS); + } catch (Exception e) { + System.out.println("Could not read file containing list of bucket,keys"); + System.exit(1); + } + } + // Keys only if (cmd.hasOption("k")) { // if keys only.... config.setOperation(Configuration.Operation.BUCKET_KEYS); } // Bucket properties transfer - if (cmd.hasOption("t")) { // if transfer buckets, no compatible with k + if (cmd.hasOption("t")) { // if transfer buckets, not compatible with k config.setOperation(Configuration.Operation.BUCKET_PROPERTIES); } - if (config.getBucketNames().size() == 0 && !cmd.hasOption("a")) { + if (config.getBucketNames().size() == 0 && !cmd.hasOption("a") && !cmd.hasOption("K")) { System.out.println("No buckets specified to load"); System.exit(1); } @@ -396,6 +409,8 @@ public static void runDumper(Configuration config) { dumpCount = dumper.dumpBuckets(config.getBucketNames(), config.getResume(), keysOnly); } else if (config.getOperation() == Configuration.Operation.BUCKET_PROPERTIES) { dumpCount = dumper.dumpBucketSettings(config.getBucketNames()); + } else if (config.getOperation() == Configuration.Operation.KEYS) { + dumpCount = dumper.dumpKeys(config.getKeyNames()); } else { dumpCount = dumper.dumpAllBuckets(config.getResume(), keysOnly); } @@ -533,6 +548,7 @@ private static Options createOptions() { options.addOption("a", false, "Load or Dump all buckets"); options.addOption("b", true, "Load or Dump a single bucket"); options.addOption("f", true, "Load or Dump a file containing bucket names"); + options.addOption("K", true, "Load or Dump a file containing bucket names and keys"); options.addOption("h", true, "Specify Riak Host"); options.addOption("c", true, "Specify a file containing Riak Cluster Host Names"); options.addOption("p", true, "Specify Riak PB Port"); From dcbabafb9b36b1f9b1e19c1a59f7b128412e699a Mon Sep 17 00:00:00 2001 From: drewkerrigan Date: Thu, 9 May 2013 16:42:11 -0400 Subject: [PATCH 03/14] cleanup / refactor --- .../proserv/datamigrator/BucketDumper.java | 83 +++++++------------ .../proserv/datamigrator/io/KeyJournal.java | 16 +++- 2 files changed, 45 
insertions(+), 54 deletions(-) diff --git a/src/main/java/com/basho/proserv/datamigrator/BucketDumper.java b/src/main/java/com/basho/proserv/datamigrator/BucketDumper.java index 105abf7..101e5cc 100644 --- a/src/main/java/com/basho/proserv/datamigrator/BucketDumper.java +++ b/src/main/java/com/basho/proserv/datamigrator/BucketDumper.java @@ -31,7 +31,6 @@ public class BucketDumper { private final File dataRoot; private final boolean verboseStatusOutput; private int errorCount = 0; - private Map bucketMap; private long timerStart = System.currentTimeMillis(); private long previousCount = 0; @@ -47,7 +46,6 @@ public BucketDumper(Connection connection, Connection httpConnection, Configurat throw new IllegalArgumentException("dataRoot cannot be null"); } - this.bucketMap = new HashMap(); this.connection = connection; this.httpConnection = httpConnection; this.config = config; @@ -130,13 +128,14 @@ public long dumpKeys(Set bucketKeyNames) { } for (Map.Entry> aMapEntry : bucketKeysMap.entrySet()) { + objectCount += dumpBucket(aMapEntry.getKey(), aMapEntry.getValue() , false, false); } return objectCount; } // resume is unimplemented - public long dumpBucket(String bucketName, Set keyNames, boolean resume, boolean keysOnly) { + public long dumpBucket(String bucketName, Iterable keys, boolean resume, boolean keysOnly) { if (bucketName == null || bucketName.isEmpty()) { throw new IllegalArgumentException("bucketName cannot be null or empty"); } @@ -160,40 +159,41 @@ public long dumpBucket(String bucketName, Set keyNames, boolean resume, this.previousCount = 0; boolean error = false; - RiakObjectBucket dumpBucket = this.createBucket(bucketName); + RiakObjectBucket dumpBucket = this.createBucket(bucketName); + + File keyPath = new File(dumpBucket.getFileRoot().getAbsoluteFile() + "/bucketkeys.keys"); + File dumpedKeyPath = new File(dumpBucket.getFileRoot().getAbsoluteFile() + "/dumpedkeys.keys"); + + KeyJournal writeSourceKeyJournal = new KeyJournal(keyPath, KeyJournal.Mode.WRITE); + KeyJournal readSourceKeyJournal = new KeyJournal(keyPath, KeyJournal.Mode.READ); + KeyJournal writeDestinationKeyJournal = new KeyJournal(dumpedKeyPath, KeyJournal.Mode.WRITE); - File keyPath = new File(dumpBucket.getFileRoot().getAbsoluteFile() + "/bucketkeys.keys"); - File dumpedKeyPath = new File(dumpBucket.getFileRoot().getAbsoluteFile() + "/dumpedkeys.keys"); - - long keyCount = 0; - try { - keyCount = this.dumpBucketKeys(bucketName, keyNames, keyPath); + // If no key subset is specified, get the entire key set + if (keys == null) { + keys = this.connection.riakClient.listKeys(bucketName); + } + + writeSourceKeyJournal.populate(bucketName, keys); + writeSourceKeyJournal.close(); } catch (IOException e){ log.error("Error listing keys for bucket " + bucketName, e); this.summary.addStatistic(bucketName, -2l, 0l, 0l, 0l); return 0; } - if (keysOnly) { String bucketNameKeys= String.format("%s keys", bucketName); - this.summary.addStatistic(bucketNameKeys, keyCount, System.currentTimeMillis()-start, 0l, 0l); - return keyCount; + this.summary.addStatistic(bucketNameKeys, writeSourceKeyJournal.getKeyCount(), System.currentTimeMillis()-start, 0l, 0l); + return writeSourceKeyJournal.getKeyCount(); } - - KeyJournal bucketKeys = new KeyJournal(keyPath, KeyJournal.Mode.READ); // this.saveBucketSettings(bucketName, dumpBucket.getFileRoot()); - KeyJournal keyJournal = new KeyJournal( - dumpedKeyPath, - KeyJournal.Mode.WRITE); - try { // self closing AbstractClientDataReader reader = new ThreadedClientDataReader(connection, - new 
ClientReaderFactory(), - bucketKeys, + new ClientReaderFactory(), + readSourceKeyJournal, this.config.getRiakWorkerCount(), this.config.getQueueSize()); @@ -203,7 +203,7 @@ public long dumpBucket(String bucketName, Set keyNames, boolean resume, RiakObjectEvent riakEvent = event.asRiakObjectEvent(); dumpBucket.writeRiakObject(riakEvent); objectCount += riakEvent.count(); - keyJournal.write(riakEvent.key()); + writeDestinationKeyJournal.write(riakEvent.key()); } else if (event.isValueErrorEvent()) { // Count not-founds ++valueErrorCount; } else if (event.isIoErrorEvent()) { // Exit on IOException retry reached @@ -211,7 +211,7 @@ public long dumpBucket(String bucketName, Set keyNames, boolean resume, } if (this.verboseStatusOutput) { - this.printStatus(keyCount, objectCount, false); + this.printStatus(writeSourceKeyJournal.getKeyCount(), objectCount, false); } } } catch (IOException e) { @@ -221,7 +221,7 @@ public long dumpBucket(String bucketName, Set keyNames, boolean resume, } catch (InterruptedException e) { //no-op } finally { - keyJournal.close(); + writeDestinationKeyJournal.close(); dumpBucket.close(); } @@ -238,7 +238,7 @@ public long dumpBucket(String bucketName, Set keyNames, boolean resume, } if (this.verboseStatusOutput) { - this.printStatus(keyCount, objectCount, true); + this.printStatus(writeSourceKeyJournal.getKeyCount(), objectCount, true); } return objectCount; @@ -248,22 +248,6 @@ public long dumpBucket(String bucketName, Set keyNames, boolean resume, public int errorCount() { return errorCount; } - - public long dumpBucketKeys(String bucketName, Iterable keyNames, File filePath) throws IOException { - KeyJournal keyJournal = new KeyJournal(filePath, KeyJournal.Mode.WRITE); - long keyCount = 0; - - if (keyNames == null) { - keyNames = this.connection.riakClient.listKeys(bucketName); - } - - for (String keyString : keyNames) { - keyJournal.write(bucketName, keyString); - ++keyCount; - } - keyJournal.close(); - return keyCount; - } private void saveBucketSettings(String bucketName, File path) { File xmlPath = RiakBucketProperties.createBucketSettingsFile(path); @@ -301,17 +285,10 @@ private String createBucketPath(String bucketName) { String encodedBucketName = Utilities.urlEncode(bucketName); return this.dataRoot.getAbsolutePath() + "/" + encodedBucketName; } - - private RiakObjectBucket createBucket(String bucketName) { - RiakObjectBucket aRiakObjectBucket = this.bucketMap.get(bucketName); - - if (aRiakObjectBucket == null) { - String bucketRootPath = this.createBucketPath(bucketName); - File bucketRoot = new File(bucketRootPath); - aRiakObjectBucket = new RiakObjectBucket(bucketRoot, RiakObjectBucket.BucketMode.WRITE, this.config); - this.bucketMap.put(bucketName, aRiakObjectBucket); - } - return aRiakObjectBucket; - } + private RiakObjectBucket createBucket(String bucketName) { + String bucketRootPath = this.createBucketPath(bucketName); + File bucketRoot = new File(bucketRootPath); + return new RiakObjectBucket(bucketRoot, RiakObjectBucket.BucketMode.WRITE, this.config); + } } diff --git a/src/main/java/com/basho/proserv/datamigrator/io/KeyJournal.java b/src/main/java/com/basho/proserv/datamigrator/io/KeyJournal.java index c18e75f..230299f 100644 --- a/src/main/java/com/basho/proserv/datamigrator/io/KeyJournal.java +++ b/src/main/java/com/basho/proserv/datamigrator/io/KeyJournal.java @@ -21,6 +21,7 @@ public enum Mode { READ, WRITE } private final Mode mode; private final BufferedWriter writer; private final BufferedReader reader; + private long keyCount; private 
boolean closed = false; public KeyJournal(File path, Mode mode) { @@ -39,9 +40,21 @@ public KeyJournal(File path, Mode mode) { throw new IllegalArgumentException("Could not open " + path.getAbsolutePath()); } this.mode = mode; + this.keyCount = 0; } - //TODO: Modify this file if necessary + public void populate(String bucketName, Iterable keys) throws IOException { + for (String keyString : keys) { + this.write(bucketName, keyString); + } + } + + public long getKeyCount() { + if (mode == Mode.READ) { + throw new IllegalArgumentException ("KeyJournal is in READ mode and cannot determine keyCount"); + } + return this.keyCount; + } public void write(Key key) throws IOException { if (key == null) { @@ -58,6 +71,7 @@ public void write(String bucket, String key) throws IOException { throw new IllegalArgumentException("bucket and key must not be null"); } this.writer.write((Utilities.urlEncode(bucket) + "," + Utilities.urlEncode(key) + "\n")); + this.keyCount++; } public void write(RiakObject riakObject) throws IOException { From 601ed60186e431cae110e572612413ea4c4ece13 Mon Sep 17 00:00:00 2001 From: drewkerrigan Date: Thu, 9 May 2013 16:51:10 -0400 Subject: [PATCH 04/14] fixing tabs... --- .../proserv/datamigrator/BucketDumper.java | 86 +++++++++---------- .../proserv/datamigrator/Configuration.java | 22 ++--- .../com/basho/proserv/datamigrator/Main.java | 28 +++--- .../proserv/datamigrator/io/KeyJournal.java | 28 +++--- 4 files changed, 82 insertions(+), 82 deletions(-) diff --git a/src/main/java/com/basho/proserv/datamigrator/BucketDumper.java b/src/main/java/com/basho/proserv/datamigrator/BucketDumper.java index 101e5cc..57cff7e 100644 --- a/src/main/java/com/basho/proserv/datamigrator/BucketDumper.java +++ b/src/main/java/com/basho/proserv/datamigrator/BucketDumper.java @@ -102,37 +102,37 @@ public long dumpBuckets(Set bucketNames, boolean resume, boolean keysOnl return objectCount; } - public long dumpKeys(Set bucketKeyNames) { - if (!this.connection.connected()) { - log.error("Not connected to Riak"); - return 0; - } - int objectCount = 0; + public long dumpKeys(Set bucketKeyNames) { + if (!this.connection.connected()) { + log.error("Not connected to Riak"); + return 0; + } + int objectCount = 0; - Map> bucketKeysMap = new HashMap>(); + Map> bucketKeysMap = new HashMap>(); - //Scan the list and build a map - for (String bucketKeyName : bucketKeyNames) { - String[] bucketKeyNamesArr = bucketKeyName.split(","); + //Scan the list and build a map + for (String bucketKeyName : bucketKeyNames) { + String[] bucketKeyNamesArr = bucketKeyName.split(","); - if (bucketKeyNamesArr.length != 2) { - throw new IllegalArgumentException("Key file lines must have the format: bucket,key"); - } + if (bucketKeyNamesArr.length != 2) { + throw new IllegalArgumentException("Key file lines must have the format: bucket,key"); + } - if(!bucketKeysMap.containsKey(bucketKeyNamesArr[0])) { - Set keySet = new HashSet(); - bucketKeysMap.put(bucketKeyNamesArr[0], keySet); - } + if(!bucketKeysMap.containsKey(bucketKeyNamesArr[0])) { + Set keySet = new HashSet(); + bucketKeysMap.put(bucketKeyNamesArr[0], keySet); + } - bucketKeysMap.get(bucketKeyNamesArr[0]).add(bucketKeyNamesArr[1]); - } + bucketKeysMap.get(bucketKeyNamesArr[0]).add(bucketKeyNamesArr[1]); + } - for (Map.Entry> aMapEntry : bucketKeysMap.entrySet()) { + for (Map.Entry> aMapEntry : bucketKeysMap.entrySet()) { - objectCount += dumpBucket(aMapEntry.getKey(), aMapEntry.getValue() , false, false); - } - return objectCount; - } + objectCount += 
dumpBucket(aMapEntry.getKey(), aMapEntry.getValue() , false, false); + } + return objectCount; + } // resume is unimplemented public long dumpBucket(String bucketName, Iterable keys, boolean resume, boolean keysOnly) { @@ -159,23 +159,23 @@ public long dumpBucket(String bucketName, Iterable keys, boolean resume, this.previousCount = 0; boolean error = false; - RiakObjectBucket dumpBucket = this.createBucket(bucketName); + RiakObjectBucket dumpBucket = this.createBucket(bucketName); - File keyPath = new File(dumpBucket.getFileRoot().getAbsoluteFile() + "/bucketkeys.keys"); + File keyPath = new File(dumpBucket.getFileRoot().getAbsoluteFile() + "/bucketkeys.keys"); File dumpedKeyPath = new File(dumpBucket.getFileRoot().getAbsoluteFile() + "/dumpedkeys.keys"); - KeyJournal writeSourceKeyJournal = new KeyJournal(keyPath, KeyJournal.Mode.WRITE); - KeyJournal readSourceKeyJournal = new KeyJournal(keyPath, KeyJournal.Mode.READ); - KeyJournal writeDestinationKeyJournal = new KeyJournal(dumpedKeyPath, KeyJournal.Mode.WRITE); + KeyJournal writeSourceKeyJournal = new KeyJournal(keyPath, KeyJournal.Mode.WRITE); + KeyJournal readSourceKeyJournal = new KeyJournal(keyPath, KeyJournal.Mode.READ); + KeyJournal writeDestinationKeyJournal = new KeyJournal(dumpedKeyPath, KeyJournal.Mode.WRITE); try { - // If no key subset is specified, get the entire key set - if (keys == null) { - keys = this.connection.riakClient.listKeys(bucketName); - } + // If no key subset is specified, get the entire key set + if (keys == null) { + keys = this.connection.riakClient.listKeys(bucketName); + } - writeSourceKeyJournal.populate(bucketName, keys); - writeSourceKeyJournal.close(); + writeSourceKeyJournal.populate(bucketName, keys); + writeSourceKeyJournal.close(); } catch (IOException e){ log.error("Error listing keys for bucket " + bucketName, e); this.summary.addStatistic(bucketName, -2l, 0l, 0l, 0l); @@ -193,7 +193,7 @@ public long dumpBucket(String bucketName, Iterable keys, boolean resume, // self closing AbstractClientDataReader reader = new ThreadedClientDataReader(connection, new ClientReaderFactory(), - readSourceKeyJournal, + readSourceKeyJournal, this.config.getRiakWorkerCount(), this.config.getQueueSize()); @@ -203,7 +203,7 @@ public long dumpBucket(String bucketName, Iterable keys, boolean resume, RiakObjectEvent riakEvent = event.asRiakObjectEvent(); dumpBucket.writeRiakObject(riakEvent); objectCount += riakEvent.count(); - writeDestinationKeyJournal.write(riakEvent.key()); + writeDestinationKeyJournal.write(riakEvent.key()); } else if (event.isValueErrorEvent()) { // Count not-founds ++valueErrorCount; } else if (event.isIoErrorEvent()) { // Exit on IOException retry reached @@ -221,7 +221,7 @@ public long dumpBucket(String bucketName, Iterable keys, boolean resume, } catch (InterruptedException e) { //no-op } finally { - writeDestinationKeyJournal.close(); + writeDestinationKeyJournal.close(); dumpBucket.close(); } @@ -286,9 +286,9 @@ private String createBucketPath(String bucketName) { return this.dataRoot.getAbsolutePath() + "/" + encodedBucketName; } - private RiakObjectBucket createBucket(String bucketName) { - String bucketRootPath = this.createBucketPath(bucketName); - File bucketRoot = new File(bucketRootPath); - return new RiakObjectBucket(bucketRoot, RiakObjectBucket.BucketMode.WRITE, this.config); - } + private RiakObjectBucket createBucket(String bucketName) { + String bucketRootPath = this.createBucketPath(bucketName); + File bucketRoot = new File(bucketRootPath); + return new 
RiakObjectBucket(bucketRoot, RiakObjectBucket.BucketMode.WRITE, this.config); + } } diff --git a/src/main/java/com/basho/proserv/datamigrator/Configuration.java b/src/main/java/com/basho/proserv/datamigrator/Configuration.java index 1905126..c070f19 100644 --- a/src/main/java/com/basho/proserv/datamigrator/Configuration.java +++ b/src/main/java/com/basho/proserv/datamigrator/Configuration.java @@ -18,7 +18,7 @@ public static enum Operation { ALL_BUCKETS, BUCKETS, ALL_KEYS, BUCKET_KEYS, - KEYS, + KEYS, BUCKET_PROPERTIES, DELETE_BUCKETS, COPY_ALL, @@ -40,7 +40,7 @@ public static enum Operation { ALL_BUCKETS, private int queueSize = DEFAULT_QUEUE_SIZE; private Set bucketNames = new HashSet(); - private Set keyNames = new HashSet(); + private Set keyNames = new HashSet(); private boolean verboseStatus = true; private boolean resetVClock = false; @@ -139,16 +139,16 @@ public void addBucketName(String bucket) { public void addBucketNames(Collection buckets) { this.bucketNames.addAll(buckets); } - public Set getBucketNames() { - return this.bucketNames; - } + public Set getBucketNames() { + return this.bucketNames; + } - public void addKeyNames(Collection keys) { - this.keyNames.addAll(keys); - } - public Set getKeyNames() { - return this.keyNames; - } + public void addKeyNames(Collection keys) { + this.keyNames.addAll(keys); + } + public Set getKeyNames() { + return this.keyNames; + } public void setVerboseStatus(boolean verboseStatus) { this.verboseStatus = verboseStatus; diff --git a/src/main/java/com/basho/proserv/datamigrator/Main.java b/src/main/java/com/basho/proserv/datamigrator/Main.java index 1b9299a..66127bb 100644 --- a/src/main/java/com/basho/proserv/datamigrator/Main.java +++ b/src/main/java/com/basho/proserv/datamigrator/Main.java @@ -210,17 +210,17 @@ public static Configuration handleCommandLine(CommandLine cmd) { } } - // Dump from a list of buckets/keys - if (cmd.hasOption("K")) { - try { - String fileName = cmd.getOptionValue("K"); - config.addKeyNames(Utilities.readFileLines(fileName)); - config.setOperation(Configuration.Operation.KEYS); - } catch (Exception e) { - System.out.println("Could not read file containing list of bucket,keys"); - System.exit(1); - } - } + // Dump from a list of buckets/keys + if (cmd.hasOption("K")) { + try { + String fileName = cmd.getOptionValue("K"); + config.addKeyNames(Utilities.readFileLines(fileName)); + config.setOperation(Configuration.Operation.KEYS); + } catch (Exception e) { + System.out.println("Could not read file containing list of bucket,keys"); + System.exit(1); + } + } // Keys only if (cmd.hasOption("k")) { // if keys only.... 
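The ```bucket,key``` grouping that these patches add to ```BucketDumper.dumpKeys``` can be sketched in isolation. A minimal, self-contained version of that step (the class and method names here are illustrative, not part of the source tree), assuming the two-field line format the patch enforces:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class BucketKeyGrouper {
    // Groups line-delimited "bucket,key" entries by bucket, mirroring the
    // map-building loop dumpKeys() runs before handing each key set to
    // dumpBucket() once per bucket.
    public static Map<String, Set<String>> group(Iterable<String> bucketKeyLines) {
        Map<String, Set<String>> bucketKeysMap = new HashMap<String, Set<String>>();
        for (String line : bucketKeyLines) {
            String[] parts = line.split(",");
            if (parts.length != 2) {
                throw new IllegalArgumentException("Key file lines must have the format: bucket,key");
            }
            Set<String> keys = bucketKeysMap.get(parts[0]);
            if (keys == null) {
                keys = new HashSet<String>();
                bucketKeysMap.put(parts[0], keys);
            }
            keys.add(parts[1]);
        }
        return bucketKeysMap;
    }
}
```

Building the map first means the per-bucket dump machinery runs once per bucket rather than once per listed key, which is how ```dumpKeys``` passes each entry's key set to ```dumpBucket```.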
@@ -409,8 +409,8 @@ public static void runDumper(Configuration config) { dumpCount = dumper.dumpBuckets(config.getBucketNames(), config.getResume(), keysOnly); } else if (config.getOperation() == Configuration.Operation.BUCKET_PROPERTIES) { dumpCount = dumper.dumpBucketSettings(config.getBucketNames()); - } else if (config.getOperation() == Configuration.Operation.KEYS) { - dumpCount = dumper.dumpKeys(config.getKeyNames()); + } else if (config.getOperation() == Configuration.Operation.KEYS) { + dumpCount = dumper.dumpKeys(config.getKeyNames()); } else { dumpCount = dumper.dumpAllBuckets(config.getResume(), keysOnly); } @@ -548,7 +548,7 @@ private static Options createOptions() { options.addOption("a", false, "Load or Dump all buckets"); options.addOption("b", true, "Load or Dump a single bucket"); options.addOption("f", true, "Load or Dump a file containing bucket names"); - options.addOption("K", true, "Load or Dump a file containing bucket names and keys"); + options.addOption("K", true, "Load or Dump a file containing bucket names and keys"); options.addOption("h", true, "Specify Riak Host"); options.addOption("c", true, "Specify a file containing Riak Cluster Host Names"); options.addOption("p", true, "Specify Riak PB Port"); diff --git a/src/main/java/com/basho/proserv/datamigrator/io/KeyJournal.java b/src/main/java/com/basho/proserv/datamigrator/io/KeyJournal.java index 230299f..934f5eb 100644 --- a/src/main/java/com/basho/proserv/datamigrator/io/KeyJournal.java +++ b/src/main/java/com/basho/proserv/datamigrator/io/KeyJournal.java @@ -21,7 +21,7 @@ public enum Mode { READ, WRITE } private final Mode mode; private final BufferedWriter writer; private final BufferedReader reader; - private long keyCount; + private long keyCount; private boolean closed = false; public KeyJournal(File path, Mode mode) { @@ -40,21 +40,21 @@ public KeyJournal(File path, Mode mode) { throw new IllegalArgumentException("Could not open " + path.getAbsolutePath()); } this.mode = mode; - this.keyCount = 0; + this.keyCount = 0; } - public void populate(String bucketName, Iterable keys) throws IOException { - for (String keyString : keys) { - this.write(bucketName, keyString); - } - } + public void populate(String bucketName, Iterable keys) throws IOException { + for (String keyString : keys) { + this.write(bucketName, keyString); + } + } - public long getKeyCount() { - if (mode == Mode.READ) { - throw new IllegalArgumentException ("KeyJournal is in READ mode and cannot determine keyCount"); - } - return this.keyCount; - } + public long getKeyCount() { + if (mode == Mode.READ) { + throw new IllegalArgumentException ("KeyJournal is in READ mode and cannot determine keyCount"); + } + return this.keyCount; + } public void write(Key key) throws IOException { if (key == null) { @@ -71,7 +71,7 @@ public void write(String bucket, String key) throws IOException { throw new IllegalArgumentException("bucket and key must not be null"); } this.writer.write((Utilities.urlEncode(bucket) + "," + Utilities.urlEncode(key) + "\n")); - this.keyCount++; + this.keyCount++; } public void write(RiakObject riakObject) throws IOException { From 2553856a05c54da6adf74bfe0ba486b449acd629 Mon Sep 17 00:00:00 2001 From: drewkerrigan Date: Thu, 9 May 2013 16:55:38 -0400 Subject: [PATCH 05/14] naming --- .../com/basho/proserv/datamigrator/BucketDumper.java | 8 ++++---- src/main/java/com/basho/proserv/datamigrator/Main.java | 2 -- .../com/basho/proserv/datamigrator/io/KeyJournal.java | 10 +++++----- 3 files changed, 9 insertions(+), 11 
deletions(-) diff --git a/src/main/java/com/basho/proserv/datamigrator/BucketDumper.java b/src/main/java/com/basho/proserv/datamigrator/BucketDumper.java index 57cff7e..b27adbd 100644 --- a/src/main/java/com/basho/proserv/datamigrator/BucketDumper.java +++ b/src/main/java/com/basho/proserv/datamigrator/BucketDumper.java @@ -183,8 +183,8 @@ public long dumpBucket(String bucketName, Iterable keys, boolean resume, } if (keysOnly) { String bucketNameKeys= String.format("%s keys", bucketName); - this.summary.addStatistic(bucketNameKeys, writeSourceKeyJournal.getKeyCount(), System.currentTimeMillis()-start, 0l, 0l); - return writeSourceKeyJournal.getKeyCount(); + this.summary.addStatistic(bucketNameKeys, writeSourceKeyJournal.getWriteCount(), System.currentTimeMillis()-start, 0l, 0l); + return writeSourceKeyJournal.getWriteCount(); } // this.saveBucketSettings(bucketName, dumpBucket.getFileRoot()); @@ -211,7 +211,7 @@ public long dumpBucket(String bucketName, Iterable keys, boolean resume, } if (this.verboseStatusOutput) { - this.printStatus(writeSourceKeyJournal.getKeyCount(), objectCount, false); + this.printStatus(writeSourceKeyJournal.getWriteCount(), objectCount, false); } } } catch (IOException e) { @@ -238,7 +238,7 @@ public long dumpBucket(String bucketName, Iterable keys, boolean resume, } if (this.verboseStatusOutput) { - this.printStatus(writeSourceKeyJournal.getKeyCount(), objectCount, true); + this.printStatus(writeSourceKeyJournal.getWriteCount(), objectCount, true); } return objectCount; diff --git a/src/main/java/com/basho/proserv/datamigrator/Main.java b/src/main/java/com/basho/proserv/datamigrator/Main.java index 66127bb..9929fc6 100644 --- a/src/main/java/com/basho/proserv/datamigrator/Main.java +++ b/src/main/java/com/basho/proserv/datamigrator/Main.java @@ -536,8 +536,6 @@ private static CommandLine parseCommandLine(Options options, String[] args) thro return cmd; } - //TODO: add option that reads a partial list of keys from file to backup (in same format as -k) - private static Options createOptions() { Options options = new Options(); diff --git a/src/main/java/com/basho/proserv/datamigrator/io/KeyJournal.java b/src/main/java/com/basho/proserv/datamigrator/io/KeyJournal.java index 934f5eb..1bc2610 100644 --- a/src/main/java/com/basho/proserv/datamigrator/io/KeyJournal.java +++ b/src/main/java/com/basho/proserv/datamigrator/io/KeyJournal.java @@ -21,7 +21,7 @@ public enum Mode { READ, WRITE } private final Mode mode; private final BufferedWriter writer; private final BufferedReader reader; - private long keyCount; + private long writeCount; private boolean closed = false; public KeyJournal(File path, Mode mode) { @@ -40,7 +40,7 @@ public KeyJournal(File path, Mode mode) { throw new IllegalArgumentException("Could not open " + path.getAbsolutePath()); } this.mode = mode; - this.keyCount = 0; + this.writeCount = 0; } public void populate(String bucketName, Iterable keys) throws IOException { @@ -49,11 +49,11 @@ public void populate(String bucketName, Iterable keys) throws IOExceptio } } - public long getKeyCount() { + public long getWriteCount() { if (mode == Mode.READ) { throw new IllegalArgumentException ("KeyJournal is in READ mode and cannot determine keyCount"); } - return this.keyCount; + return this.writeCount; } public void write(Key key) throws IOException { @@ -71,7 +71,7 @@ public void write(String bucket, String key) throws IOException { throw new IllegalArgumentException("bucket and key must not be null"); } this.writer.write((Utilities.urlEncode(bucket) + "," + 
Utilities.urlEncode(key) + "\n")); - this.keyCount++; + this.writeCount++; } public void write(RiakObject riakObject) throws IOException { From 50143f349ed6854473058161ebc2a019e207e5e7 Mon Sep 17 00:00:00 2001 From: drewkerrigan Date: Fri, 10 May 2013 10:50:44 -0400 Subject: [PATCH 06/14] clarifying verbiage --- src/main/java/com/basho/proserv/datamigrator/Main.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/basho/proserv/datamigrator/Main.java b/src/main/java/com/basho/proserv/datamigrator/Main.java index 9929fc6..eb8456d 100644 --- a/src/main/java/com/basho/proserv/datamigrator/Main.java +++ b/src/main/java/com/basho/proserv/datamigrator/Main.java @@ -167,7 +167,7 @@ public static Configuration handleCommandLine(CommandLine cmd) { System.exit(1); } } else { - System.out.println("Port not specified, using the default: 8087"); + System.out.println("PB Port not specified, using the default: 8087"); } // HTTP Port From ce89ba7c08847261f5e41b5da2346c56e12118df Mon Sep 17 00:00:00 2001 From: drewkerrigan Date: Fri, 10 May 2013 14:53:17 -0400 Subject: [PATCH 07/14] changing -K to --loadkeys --- README | 2 ++ README.md | 31 ++++++++++++++----- .../com/basho/proserv/datamigrator/Main.java | 10 +++--- 3 files changed, 30 insertions(+), 13 deletions(-) diff --git a/README b/README index c80788a..d6a307b 100644 --- a/README +++ b/README @@ -21,6 +21,8 @@ Options: -b Load or Dump a single bucket. -f Load or Dump a file containing line delimited bucket names +--loadkeys Export multiple keys listed in a file + (containing line-delimited bucket,key names) -h Specify Riak host. Required if a cluster host name file is not specified. -c Specify a file containing line delimited Riak diff --git a/README.md b/README.md index 452d956..fd9bfed 100644 --- a/README.md +++ b/README.md @@ -96,7 +96,7 @@ Bucket Options (required for -d, -k or -t) -a Export all buckets. -b Export a single bucket. -f Export multiple buckets listed in a file (containing line-delimited bucket names) --K Export multiple keys listed in a file (containing line-delimited bucket,key names) +--loadkeys Export multiple keys listed in a file (containing line-delimited bucket,key names) Cluster Addresses and Ports (required) -h Specify Riak hostname. Required if a cluster host name file is not specified. @@ -129,25 +129,40 @@ Copy Settings Examples: ------------------------- Dump (the contents of) all buckets from Riak: -```java -jar riak-data-migrator-0.2.4.jar -d -r /var/riak_export -a -h 127.0.0.1 -p 8087 -H 8098``` +``` +java -jar riak-data-migrator-0.2.4.jar -d -r /var/riak_export -a -h 127.0.0.1 -p 8087 -H 8098 +``` + +Dump a subset of keys from Riak: +``` +java -jar riak-data-migrator-0.2.4.jar -d -r /var/riak_export -loadkeys bucketKeyNameFile.txt -h 127.0.0.1 -p 8087 -H 8098 +``` Load all buckets previously dumped back into Riak: -```java -jar riak-data-migrator-0.2.4.jar -l -r /var/riak-export -a -h 127.0.0.1 -p 8087 -H 8098``` +``` +java -jar riak-data-migrator-0.2.4.jar -l -r /var/riak-export -a -h 127.0.0.1 -p 8087 -H 8098 +``` Dump (the contents of) buckets listed in a line delimited file from a Riak cluster: -
+```
 java -jar riak-data-migrator-0.2.4.jar -d -f /home/riakadmin/buckets_to_export.txt -r \  
 /var/riak-export -c /home/riakadmin/riak_hosts.txt -p 8087 -H 8098
-
+``` Export only the bucket settings from a bucket named "Flights": -```java -jar riak-data-migrator-0.2.4.jar -d -t -r /var/riak-export -b Flights -h 127.0.0.1 -p 8087 -H 8098``` +``` +java -jar riak-data-migrator-0.2.4.jar -d -t -r /var/riak-export -b Flights -h 127.0.0.1 -p 8087 -H 8098 +``` Load bucket settings for a bucket named "Flights": -```java -jar riak-data-migrator-0.2.4.jar -l -t -r /var/riak-export -b Flights -h 127.0.0.1 -p 8087 -H 8098``` +``` +java -jar riak-data-migrator-0.2.4.jar -l -t -r /var/riak-export -b Flights -h 127.0.0.1 -p 8087 -H 8098 +``` Copy all buckets from one riak host to another: -```java -jar riak-data-migrator-0.2.4.jar -copy -r /var/riak_export -a -h 127.0.0.1 -p 8087 --copyhost 192.168.1.100 --copypbport 8087``` +``` +java -jar riak-data-migrator-0.2.4.jar -copy -r /var/riak_export -a -h 127.0.0.1 -p 8087 --copyhost 192.168.1.100 --copypbport 8087 +``` Caveats: ------------------------ diff --git a/src/main/java/com/basho/proserv/datamigrator/Main.java b/src/main/java/com/basho/proserv/datamigrator/Main.java index eb8456d..00c0591 100644 --- a/src/main/java/com/basho/proserv/datamigrator/Main.java +++ b/src/main/java/com/basho/proserv/datamigrator/Main.java @@ -92,7 +92,7 @@ public static void main(String[] args) { runLoader(config); } - if (cmd.hasOption("d") || cmd.hasOption("k") || (cmd.hasOption("d") && cmd.hasOption("t"))) { + if (cmd.hasOption("d") || cmd.hasOption("loadkeys") || (cmd.hasOption("d") && cmd.hasOption("t"))) { runDumper(config); } @@ -211,9 +211,9 @@ public static Configuration handleCommandLine(CommandLine cmd) { } // Dump from a list of buckets/keys - if (cmd.hasOption("K")) { + if (cmd.hasOption("loadkeys")) { try { - String fileName = cmd.getOptionValue("K"); + String fileName = cmd.getOptionValue("loadkeys"); config.addKeyNames(Utilities.readFileLines(fileName)); config.setOperation(Configuration.Operation.KEYS); } catch (Exception e) { @@ -233,7 +233,7 @@ public static Configuration handleCommandLine(CommandLine cmd) { } - if (config.getBucketNames().size() == 0 && !cmd.hasOption("a") && !cmd.hasOption("K")) { + if (config.getBucketNames().size() == 0 && !cmd.hasOption("a") && !cmd.hasOption("loadkeys")) { System.out.println("No buckets specified to load"); System.exit(1); } @@ -546,7 +546,7 @@ private static Options createOptions() { options.addOption("a", false, "Load or Dump all buckets"); options.addOption("b", true, "Load or Dump a single bucket"); options.addOption("f", true, "Load or Dump a file containing bucket names"); - options.addOption("K", true, "Load or Dump a file containing bucket names and keys"); + options.addOption("loadkeys", true, "Load or Dump a file containing bucket names and keys"); options.addOption("h", true, "Specify Riak Host"); options.addOption("c", true, "Specify a file containing Riak Cluster Host Names"); options.addOption("p", true, "Specify Riak PB Port"); From 4d720348957794a38e3e1d32300ebe73efc8022a Mon Sep 17 00:00:00 2001 From: drewkerrigan Date: Fri, 10 May 2013 14:55:15 -0400 Subject: [PATCH 08/14] changing -K to --loadkeys --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index fd9bfed..8de7715 100644 --- a/README.md +++ b/README.md @@ -135,7 +135,7 @@ java -jar riak-data-migrator-0.2.4.jar -d -r /var/riak_export -a -h 127.0.0.1 -p Dump a subset of keys from Riak: ``` -java -jar riak-data-migrator-0.2.4.jar -d -r /var/riak_export -loadkeys bucketKeyNameFile.txt -h 127.0.0.1 -p 8087 -H 8098 +java -jar riak-data-migrator-0.2.4.jar 
-d -r /var/riak_export --loadkeys bucketKeyNameFile.txt -h 127.0.0.1 -p 8087 -H 8098 ``` Load all buckets previously dumped back into Riak: From 45841a76dc5c25184bcc37b6d430f92bcfdbb1ea Mon Sep 17 00:00:00 2001 From: drewkerrigan Date: Fri, 10 May 2013 15:26:47 -0400 Subject: [PATCH 09/14] Upgrading to 0.2.5 --- README | 14 +++++++------- README.md | 29 ++++++++++++++++------------- pom.xml | 2 +- 3 files changed, 24 insertions(+), 21 deletions(-) diff --git a/README b/README index d6a307b..070985e 100644 --- a/README +++ b/README @@ -5,7 +5,7 @@ one or more buckets from Riak to a local disk and then, likewise, load one or more buckets that have been stored to disk back into Riak. Usage: -java -jar riak-data-migrator-0.2.4.jar [options] +java -jar riak-data-migrator-0.2.5.jar [options] Options: -l Set to Load buckets. Cannot be used with d or k. @@ -46,26 +46,26 @@ Options: Examples: Dump all buckets from Riak -java -jar riak-data-migrator-0.2.4.jar -d -r /var/riak_export -a -h 127.0.0.1 -p 8087 \ +java -jar riak-data-migrator-0.2.5.jar -d -r /var/riak_export -a -h 127.0.0.1 -p 8087 \ -H 8098 Load all buckets previously dumped back into Riak -java -jar riak-data-migrator-0.2.4jar -l -r /var/riak-export -a -h 127.0.0.1 -p 8087 \ +java -jar riak-data-migrator-0.2.5jar -l -r /var/riak-export -a -h 127.0.0.1 -p 8087 \ -H 8098 Dump bucket settings from a bucket named "Flights": -java -jar riak-data-migrator-0.2.4.jar -d -t -r /var/riak-export -b Flights -h 127.0.0.1 \ +java -jar riak-data-migrator-0.2.5.jar -d -t -r /var/riak-export -b Flights -h 127.0.0.1 \ -p 8087 -H 8098 Load bucket settings for a bucket named "Flights": -java -jar riak-data-migrator-0.2.4.jar -l -t -r /var/riak-export -b Flights -h 127.0.0.1 \ +java -jar riak-data-migrator-0.2.5.jar -l -t -r /var/riak-export -b Flights -h 127.0.0.1 \ -p 8087 -H 8098 Dump buckets listed in a line delimited file from a Riak cluster -java -jar riak-data-migrator-0.2.4.jar -d -f /home/riakadmin/buckets_to_export.txt -r \ +java -jar riak-data-migrator-0.2.5.jar -d -f /home/riakadmin/buckets_to_export.txt -r \ /var/riak-export -c /home/riakadmin/riak_hosts.txt -p 8087 -H 8098 Copy all buckets from one riak host to another: -java -jar riak-data-migrator-0.2.4.jar -copy -r /var/riak_export -a -h 127.0.0.1 -p 8087 \ +java -jar riak-data-migrator-0.2.5.jar -copy -r /var/riak_export -a -h 127.0.0.1 -p 8087 \ --copyhost 192.168.1.100 --copypbport 8087 \ No newline at end of file diff --git a/README.md b/README.md index 8de7715..02fe997 100644 --- a/README.md +++ b/README.md @@ -46,13 +46,13 @@ To transfer data from one Riak cluster to another: Downloading: ------------------------ You can download the ready to run jar file at: -http://ps-tools.data.riakcs.net:8080/riak-data-migrator-0.2.4-bin.tar.gz +http://ps-tools.data.riakcs.net:8080/riak-data-migrator-0.2.5-bin.tar.gz After downloading, unzip/untar it, and it's ready to run from its directory. ```bash -tar -xvzf riak-data-migrator-0.2.4-bin.tar.gz -cd riak-data-migrator-0.2.4 -java -jar riak-data-migrator-0.2.4.jar [options] +tar -xvzf riak-data-migrator-0.2.5-bin.tar.gz +cd riak-data-migrator-0.2.5 +java -jar riak-data-migrator-0.2.5.jar [options] ``` Building from source: @@ -67,12 +67,12 @@ mvn package ``` The compiled .jar file is located in the ```target/``` directory. 
- The usable binary file is ```riak-data-migrator-0.2.4-bin.tar.gz``` + The usable binary file is ```riak-data-migrator-0.2.5-bin.tar.gz``` Usage: ------------------------ Usage: -```java -jar riak-data-migrator-0.2.4.jar [options]``` +```java -jar riak-data-migrator-0.2.5.jar [options]``` Options: ``` @@ -130,38 +130,38 @@ Examples: ------------------------- Dump (the contents of) all buckets from Riak: ``` -java -jar riak-data-migrator-0.2.4.jar -d -r /var/riak_export -a -h 127.0.0.1 -p 8087 -H 8098 +java -jar riak-data-migrator-0.2.5.jar -d -r /var/riak_export -a -h 127.0.0.1 -p 8087 -H 8098 ``` Dump a subset of keys from Riak: ``` -java -jar riak-data-migrator-0.2.4.jar -d -r /var/riak_export --loadkeys bucketKeyNameFile.txt -h 127.0.0.1 -p 8087 -H 8098 +java -jar riak-data-migrator-0.2.5.jar -d -r /var/riak_export --loadkeys bucketKeyNameFile.txt -h 127.0.0.1 -p 8087 -H 8098 ``` Load all buckets previously dumped back into Riak: ``` -java -jar riak-data-migrator-0.2.4.jar -l -r /var/riak-export -a -h 127.0.0.1 -p 8087 -H 8098 +java -jar riak-data-migrator-0.2.5.jar -l -r /var/riak-export -a -h 127.0.0.1 -p 8087 -H 8098 ``` Dump (the contents of) buckets listed in a line delimited file from a Riak cluster: ``` -java -jar riak-data-migrator-0.2.4.jar -d -f /home/riakadmin/buckets_to_export.txt -r \ +java -jar riak-data-migrator-0.2.5.jar -d -f /home/riakadmin/buckets_to_export.txt -r \ /var/riak-export -c /home/riakadmin/riak_hosts.txt -p 8087 -H 8098 ``` Export only the bucket settings from a bucket named "Flights": ``` -java -jar riak-data-migrator-0.2.4.jar -d -t -r /var/riak-export -b Flights -h 127.0.0.1 -p 8087 -H 8098 +java -jar riak-data-migrator-0.2.5.jar -d -t -r /var/riak-export -b Flights -h 127.0.0.1 -p 8087 -H 8098 ``` Load bucket settings for a bucket named "Flights": ``` -java -jar riak-data-migrator-0.2.4.jar -l -t -r /var/riak-export -b Flights -h 127.0.0.1 -p 8087 -H 8098 +java -jar riak-data-migrator-0.2.5.jar -l -t -r /var/riak-export -b Flights -h 127.0.0.1 -p 8087 -H 8098 ``` Copy all buckets from one riak host to another: ``` -java -jar riak-data-migrator-0.2.4.jar -copy -r /var/riak_export -a -h 127.0.0.1 -p 8087 --copyhost 192.168.1.100 --copypbport 8087 +java -jar riak-data-migrator-0.2.5.jar -copy -r /var/riak_export -a -h 127.0.0.1 -p 8087 --copyhost 192.168.1.100 --copypbport 8087 ``` Caveats: @@ -175,6 +175,9 @@ option to specify a line-delimited list of buckets in a file. 
Version Notes: ------------------------ +0.2.5 +-Added option to dump a subset of keys + 0.2.4 -Verbose status output is now default -Added option to turn off verbose output diff --git a/pom.xml b/pom.xml index 64f7426..2610485 100644 --- a/pom.xml +++ b/pom.xml @@ -86,5 +86,5 @@ 1.0.9 - 0.2.4 + 0.2.5 \ No newline at end of file From 0747537eab5e8c008a8e847fd223fb80cc6552a4 Mon Sep 17 00:00:00 2001 From: drewkerrigan Date: Wed, 29 May 2013 15:26:55 -0400 Subject: [PATCH 10/14] removing need for output directory for deletes, and making deletes work with multiple buckets --- .../java/com/basho/proserv/datamigrator/BucketDelete.java | 4 ++-- src/main/java/com/basho/proserv/datamigrator/Main.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/basho/proserv/datamigrator/BucketDelete.java b/src/main/java/com/basho/proserv/datamigrator/BucketDelete.java index 8a39269..0cd0928 100644 --- a/src/main/java/com/basho/proserv/datamigrator/BucketDelete.java +++ b/src/main/java/com/basho/proserv/datamigrator/BucketDelete.java @@ -37,6 +37,8 @@ public long deleteBuckets(Set bucketNames) { for (String bucketName : bucketNames) { objectCount += this.deleteBucket(bucketName); } + + this.connection.close(); return objectCount; } @@ -96,8 +98,6 @@ public long deleteBucket(String bucketName) { this.printStatus(keyCount, objectCount, true); } - this.connection.close(); - return objectCount; } diff --git a/src/main/java/com/basho/proserv/datamigrator/Main.java b/src/main/java/com/basho/proserv/datamigrator/Main.java index 00c0591..8469556 100644 --- a/src/main/java/com/basho/proserv/datamigrator/Main.java +++ b/src/main/java/com/basho/proserv/datamigrator/Main.java @@ -102,7 +102,7 @@ public static Configuration handleCommandLine(CommandLine cmd) { Configuration config = new Configuration(); // Data path - if (!cmd.hasOption("copy")) { + if (!cmd.hasOption("copy") && !cmd.hasOption("delete")) { if (cmd.hasOption("r")) { File dataPath = new File(cmd.getOptionValue("r")); if (!dataPath.exists()) { From e69ddb293535cefd96ecaf8a1ade972828f1b104 Mon Sep 17 00:00:00 2001 From: Dmitri Zagidulin Date: Mon, 16 Sep 2013 11:47:50 -0400 Subject: [PATCH 11/14] Added workflow caveat on restoring Search-enabled buckets Also fixed formatting on version notes. --- README.md | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index 02fe997..d17e23b 100644 --- a/README.md +++ b/README.md @@ -41,7 +41,10 @@ To transfer data from one Riak cluster to another: on a bucket. 3. Export the contents of a bucket (Riak objects) using the ```-d``` option, to files on disk (the objects will be stored in the binary [ProtoBuf](http://docs.basho.com/riak/latest/references/apis/protocol-buffers/) format) -4. Load the Riak objects from the exported files into the target cluster using the ```-l``` option. +4. (Optional, Search-only) If backing up Search-indexed buckets using Data Migrator versions <= 0.2.5, go into the exported + data directory and delete the internal-use-only indexing buckets (```rm -rf _rsid_*```). See + https://github.com/basho/riak-data-migrator/issues/4 for explanation. +5. Load the Riak objects from the exported files into the target cluster using the ```-l``` option. 
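Concretely, steps 3 through 5 combine the options documented later in this README. A sketch of the round trip for a single bucket (the hosts, paths, and bucket name below are illustrative):

```
# Step 3: dump the contents of bucket "Flights" from the source cluster
java -jar riak-data-migrator-0.2.5.jar -d -b Flights -r /var/riak_export -h 127.0.0.1 -p 8087 -H 8098

# Step 4 (Search-only): delete the internal-use-only index buckets from the export
rm -rf /var/riak_export/_rsid_*

# Step 5: load the exported objects into the target cluster
java -jar riak-data-migrator-0.2.5.jar -l -b Flights -r /var/riak_export -h 192.168.1.100 -p 8087 -H 8098
```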
Downloading: ------------------------ @@ -166,32 +169,33 @@ java -jar riak-data-migrator-0.2.5.jar -copy -r /var/riak_export -a -h 127.0.0.1 Caveats: ------------------------ --This app depends on the key listing operation in the Riak client which + - When backing up + - This app depends on the key listing operation in the Riak client which is slow on a good day. --The Riak memory backend bucket listing operating tends to timeout if + - The Riak memory backend bucket listing operating tends to timeout if any significant amount of data exists. In this case, you have to explicitly specify the buckets you need want to dump using the ```-f``` option to specify a line-delimited list of buckets in a file. + Version Notes: ------------------------ 0.2.5 --Added option to dump a subset of keys + - Added option to dump a subset of keys 0.2.4 --Verbose status output is now default --Added option to turn off verbose output --Logging of final status + - Verbose status output is now default + - Added option to turn off verbose output + - Logging of final status 0.2.3 --Changed internal message passing between threads from Riak Objects to Events for Dump, Load and Copy operations but not Delete. --Added the capability to transfer data directly between clusters --Added the capability to copy a single bucket into a new bucket for the Load or Copy operations. --Changed log level for retry attempts (but not max retries reached) to warn vs error. + - Changed internal message passing between threads from Riak Objects to Events for Dump, Load and Copy operations but not Delete. + - Added the capability to transfer data directly between clusters + - Added the capability to copy a single bucket into a new bucket for the Load or Copy operations. + - Changed log level for retry attempts (but not max retries reached) to warn vs error. 
0.2.2 --Changed message passing for Dump partially to Events --Added logic to count the number of value not founds (ie 404s) when reading --Added summary output for value not founds + - Changed message passing for Dump partially to Events + - Added logic to count the number of value not founds (ie 404s) when reading + - Added summary output for value not founds -< 0.2.1 Ancient History From 37ff0defe307b0ca9ed4e7f131687c462c0de235 Mon Sep 17 00:00:00 2001 From: Dan Kerrigan Date: Fri, 31 Jan 2014 16:51:43 -0500 Subject: [PATCH 12/14] Modified to allow for Dumping, Copying, Deleting based on files with keys or bucket,keys --- .gitignore | 2 + README | 71 ------ README.md | 67 ++++-- pom.xml | 2 +- .../proserv/datamigrator/BucketDelete.java | 89 ++++++-- .../proserv/datamigrator/BucketDumper.java | 82 +++---- .../proserv/datamigrator/BucketLoader.java | 5 +- .../proserv/datamigrator/BucketTransfer.java | 87 ++++--- .../proserv/datamigrator/Configuration.java | 31 ++- .../com/basho/proserv/datamigrator/Main.java | 86 ++++++- .../basho/proserv/datamigrator/Utilities.java | 41 +++- .../datamigrator/io/AbstractKeyJournal.java | 214 ++++++++++++++++++ .../datamigrator/io/BucketKeyJournal.java | 53 +++++ .../proserv/datamigrator/io/IKeyJournal.java | 34 +++ .../proserv/datamigrator/io/KeyJournal.java | 168 +------------- .../datamigrator/BucketDumperLoaderTests.java | 13 +- .../riak/datamigrator/io/KeyJournalTests.java | 46 +++- .../java/com/basho/util/UtilitiesTests.java | 81 +++++++ 18 files changed, 785 insertions(+), 387 deletions(-) create mode 100644 .gitignore delete mode 100644 README create mode 100644 src/main/java/com/basho/proserv/datamigrator/io/AbstractKeyJournal.java create mode 100644 src/main/java/com/basho/proserv/datamigrator/io/BucketKeyJournal.java create mode 100644 src/main/java/com/basho/proserv/datamigrator/io/IKeyJournal.java create mode 100644 src/test/java/com/basho/util/UtilitiesTests.java diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9729d0d --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.idea +riak-data-migrator.iml diff --git a/README b/README deleted file mode 100644 index 070985e..0000000 --- a/README +++ /dev/null @@ -1,71 +0,0 @@ -riak-data-migrator - -This tool allows a user to connect to a Riak host or Riak cluster and copy -one or more buckets from Riak to a local disk and then, likewise, load one -or more buckets that have been stored to disk back into Riak. - -Usage: -java -jar riak-data-migrator-0.2.5.jar [options] - -Options: --l Set to Load buckets. Cannot be used with d or k. --d Set to Dump buckets. Cannot be used with l or k. Must specify one or more buckets ---copy Set to Copy buckets to one cluster to another. Cannot be used with d, k, l, k or delete. --k Set to Dump bucket keys. Cannot be used with d or l. Cannot be used with t. --t Transfer bucket properties. Will dump or load bucket properties instead of data. - Cannot be used with k. ---delete Delete bucket data. cannot be used with d, l, k, or t --r Set the path for data to be loaded to or dumped from. - The path must exist and is required. --a Load or Dump all buckets. Cannot be used with delete --b Load or Dump a single bucket. --f Load or Dump a file containing line delimited - bucket names ---loadkeys Export multiple keys listed in a file - (containing line-delimited bucket,key names) --h Specify Riak host. Required if a cluster host name file is - not specified. --c Specify a file containing line delimited Riak - Cluster Host Names. 
Required if a host name is not specified. - host name is not specified. --p Specify Riak Protocol Buffers Port. If not specified, defaults to 8087. --H Specify Riak HTTP Port. If not specified, defaults to 8098. ---copyhost Specify destination Riak host for *copy* operation ---copyhostsfile Specify a file containing Cluster Host Names. Req'd - if a single copyhost not specified. ---copypbport Specify destination protocol buffers port, defaults to 8087. --v Output verbose status output to the command line. Default. --s Turn off verbose status. --q Specify the queue size, especially if working with larger object sizes. There are - at most 2 queues for Load/Dump operations. - ---riakworkercount Specify the number of workers used to read from/write - to Riak. ---maxriakconnections Specify the number of connections to maintain - in the Riak connection pool. - -Examples: -Dump all buckets from Riak -java -jar riak-data-migrator-0.2.5.jar -d -r /var/riak_export -a -h 127.0.0.1 -p 8087 \ --H 8098 - -Load all buckets previously dumped back into Riak -java -jar riak-data-migrator-0.2.5jar -l -r /var/riak-export -a -h 127.0.0.1 -p 8087 \ --H 8098 - - -Dump bucket settings from a bucket named "Flights": -java -jar riak-data-migrator-0.2.5.jar -d -t -r /var/riak-export -b Flights -h 127.0.0.1 \ --p 8087 -H 8098 - -Load bucket settings for a bucket named "Flights": -java -jar riak-data-migrator-0.2.5.jar -l -t -r /var/riak-export -b Flights -h 127.0.0.1 \ --p 8087 -H 8098 - -Dump buckets listed in a line delimited file from a Riak cluster -java -jar riak-data-migrator-0.2.5.jar -d -f /home/riakadmin/buckets_to_export.txt -r \ -/var/riak-export -c /home/riakadmin/riak_hosts.txt -p 8087 -H 8098 - -Copy all buckets from one riak host to another: -java -jar riak-data-migrator-0.2.5.jar -copy -r /var/riak_export -a -h 127.0.0.1 -p 8087 \ ---copyhost 192.168.1.100 --copypbport 8087 \ No newline at end of file diff --git a/README.md b/README.md index d17e23b..9d244c9 100644 --- a/README.md +++ b/README.md @@ -41,7 +41,7 @@ To transfer data from one Riak cluster to another: on a bucket. 3. Export the contents of a bucket (Riak objects) using the ```-d``` option, to files on disk (the objects will be stored in the binary [ProtoBuf](http://docs.basho.com/riak/latest/references/apis/protocol-buffers/) format) -4. (Optional, Search-only) If backing up Search-indexed buckets using Data Migrator versions <= 0.2.5, go into the exported +4. (Optional, Search-only) If backing up Search-indexed buckets using Data Migrator versions <= 0.2.6, go into the exported data directory and delete the internal-use-only indexing buckets (```rm -rf _rsid_*```). See https://github.com/basho/riak-data-migrator/issues/4 for explanation. 5. Load the Riak objects from the exported files into the target cluster using the ```-l``` option. @@ -49,13 +49,13 @@ To transfer data from one Riak cluster to another: Downloading: ------------------------ You can download the ready to run jar file at: -http://ps-tools.data.riakcs.net:8080/riak-data-migrator-0.2.5-bin.tar.gz +http://ps-tools.data.riakcs.net:8080/riak-data-migrator-0.2.6-bin.tar.gz After downloading, unzip/untar it, and it's ready to run from its directory. 
```bash -tar -xvzf riak-data-migrator-0.2.5-bin.tar.gz -cd riak-data-migrator-0.2.5 -java -jar riak-data-migrator-0.2.5.jar [options] +tar -xvzf riak-data-migrator-0.2.6-bin.tar.gz +cd riak-data-migrator-0.2.6 +java -jar riak-data-migrator-0.2.6.jar [options] ``` Building from source: ------------------------ @@ -70,14 +70,16 @@ mvn package ``` The compiled .jar file is located in the ```target/``` directory. - The usable binary file is ```riak-data-migrator-0.2.5-bin.tar.gz``` + The usable binary file is ```riak-data-migrator-0.2.6-bin.tar.gz``` Usage: ------------------------ Usage: - -```java -jar riak-data-migrator-0.2.5.jar [options]``` + +```java -jar riak-data-migrator-0.2.6.jar [options]``` Options: + ``` Data Transfer (required, one of: d, l, k, or delete) -d Export (Dump) the contents of bucket(s) (keys and objects), in ProtoBuf format, to files @@ -99,7 +101,8 @@ Bucket Options (required for -d, -k or -t) -a Export all buckets. -b Export a single bucket. -f Export multiple buckets listed in a file (containing line-delimited bucket names) ---loadkeys Export multiple keys listed in a file (containing line-delimited bucket,key names) +--loadkeys Specify multiple keys listed in a file (containing line-delimited bucket,key names). Cannot be used with the -l option. +--bucketkeys Specify keys listed in a line-delimited file in conjunction with the -b option to specify which bucket the keys belong to. Cannot be used with the -l option. Cluster Addresses and Ports (required) -h Specify Riak hostname. Required if a cluster host name file is not specified. @@ -131,42 +134,71 @@ Copy Settings Examples: ------------------------- + Dump (the contents of) all buckets from Riak: + ``` -java -jar riak-data-migrator-0.2.5.jar -d -r /var/riak_export -a -h 127.0.0.1 -p 8087 -H 8098 +java -jar riak-data-migrator-0.2.6.jar -d -r /var/riak_export -a -h 127.0.0.1 -p 8087 -H 8098 ``` Dump a subset of keys from Riak: + ``` -java -jar riak-data-migrator-0.2.5.jar -d -r /var/riak_export --loadkeys bucketKeyNameFile.txt -h 127.0.0.1 -p 8087 -H 8098 +java -jar riak-data-migrator-0.2.6.jar -d -r /var/riak_export --loadkeys bucketKeyNameFile.txt -h 127.0.0.1 -p 8087 -H 8098 ``` Load all buckets previously dumped back into Riak: + ``` -java -jar riak-data-migrator-0.2.5.jar -l -r /var/riak-export -a -h 127.0.0.1 -p 8087 -H 8098 +java -jar riak-data-migrator-0.2.6.jar -l -r /var/riak-export -a -h 127.0.0.1 -p 8087 -H 8098 ``` Dump (the contents of) buckets listed in a line-delimited file from a Riak cluster: + +``` +java -jar riak-data-migrator-0.2.6.jar -d -f /home/riakadmin/buckets_to_export.txt -r \ +/var/riak-export -c /home/riakadmin/riak_hosts.txt -p 8087 -H 8098 +``` + +Dump (the contents of) buckets based on the bucket,keys listed in a file, bucket_keys.txt: + +``` +java -jar riak-data-migrator-0.2.6.jar -d --loadkeys bucket_keys.txt -r \ +/var/riak-export -c /home/riakadmin/riak_hosts.txt -p 8087 -H 8098 +``` + +Dump (the contents of) a single bucket, A_BUCKET, based on the keys listed in a file, keys.txt (the expected key file formats are sketched below): + ``` -java -jar riak-data-migrator-0.2.5.jar -d -f /home/riakadmin/buckets_to_export.txt -r \ +java -jar riak-data-migrator-0.2.6.jar -d -b A_BUCKET --bucketkeys keys.txt -r \ /var/riak-export -c /home/riakadmin/riak_hosts.txt -p 8087 -H 8098 ``` Export only the bucket settings from a bucket named "Flights": + ``` -java -jar riak-data-migrator-0.2.5.jar -d -t -r /var/riak-export -b Flights -h 127.0.0.1 -p 8087 -H 8098 +java -jar riak-data-migrator-0.2.6.jar -d -t -r /var/riak-export -b Flights -h 127.0.0.1 -p 8087 -H 8098 ```
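The key file layout is not spelled out in one place, so the following is a sketch reconstructed from the journal-reading code in these patches (AbstractKeyJournal and BucketKeyJournal); the bucket and key names are hypothetical. A ```--loadkeys``` file holds one ```bucket,key``` pair per line:

```
A_BUCKET,user%2F1001
A_BUCKET,order-42
B_BUCKET,order-43
```

A ```--bucketkeys``` file holds one key per line, with the bucket supplied separately via ```-b```:

```
user%2F1001
order-42
```

Entries are URL-decoded as they are read back, so values should be stored URL-encoded as above: a literal ```%``` or ```+``` in a raw key would otherwise be mangled on read, and in the ```bucket,key``` format an unencoded comma in a bucket name would be misread as the separator.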
Load bucket settings for a bucket named "Flights": + ``` -java -jar riak-data-migrator-0.2.5.jar -l -t -r /var/riak-export -b Flights -h 127.0.0.1 -p 8087 -H 8098 +java -jar riak-data-migrator-0.2.6.jar -l -t -r /var/riak-export -b Flights -h 127.0.0.1 -p 8087 -H 8098 ``` Copy all buckets from one riak host to another: + ``` -java -jar riak-data-migrator-0.2.5.jar -copy -r /var/riak_export -a -h 127.0.0.1 -p 8087 --copyhost 192.168.1.100 --copypbport 8087 +java -jar riak-data-migrator-0.2.6.jar -copy -r /var/riak_export -a -h 127.0.0.1 -p 8087 --copyhost 192.168.1.100 --copypbport 8087 ``` +Copy a single bucket (A_BUCKET) to a different bucket (B_BUCKET) on another host: + +``` +java -jar riak-data-migrator-0.2.6.jar -copy -r /var/riak_export -b A_BUCKET -h 127.0.0.1 -p 8087 --copyhost 192.168.1.100 --copypbport 8087 --destinationbucket B_BUCKET +``` + + Caveats: ------------------------ - When backing up: @@ -180,6 +212,11 @@ option to specify a line-delimited list of buckets in a file. Version Notes: ------------------------ +0.2.6 + - Subset of keys can now be dumped, copied, or deleted. + - In addition to the previous capability to specify a file in bucket,key\n format, you can now supply a file containing only keys, + as long as a single bucket is also specified + 0.2.5 - Added option to dump a subset of keys diff --git a/pom.xml b/pom.xml index 2610485..199ae41 100644 --- a/pom.xml +++ b/pom.xml @@ -86,5 +86,5 @@ 1.0.9 - 0.2.5 + 0.2.6 \ No newline at end of file diff --git a/src/main/java/com/basho/proserv/datamigrator/BucketDelete.java b/src/main/java/com/basho/proserv/datamigrator/BucketDelete.java index 0cd0928..9c52bd6 100644 --- a/src/main/java/com/basho/proserv/datamigrator/BucketDelete.java +++ b/src/main/java/com/basho/proserv/datamigrator/BucketDelete.java @@ -2,8 +2,10 @@ import java.io.File; import java.io.IOException; +import java.util.Map; import java.util.Set; +import com.basho.proserv.datamigrator.io.IKeyJournal; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,7 +28,28 @@ public BucketDelete(Connection connection, boolean verboseOutput) { this.connection = connection; this.verboseOutput = verboseOutput; } - + + public long deleteKeys(IKeyJournal keyJournal) { + if (!this.connection.connected()) { + log.error("Not connected to Riak"); + return 0; + } + long objectCount = 0; + + File dataRoot = this.getTemporaryPath(true); + if (dataRoot == null) { + return -1; + } + + Map<String, IKeyJournal> keyJournals = Utilities.splitKeys(dataRoot, keyJournal); + + for (String bucketName : keyJournals.keySet()) { + objectCount += deleteBucket(bucketName, keyJournals.get(bucketName)); + } + + return objectCount; + } + public long deleteBuckets(Set<String> bucketNames) { if (bucketNames == null || bucketNames.size() == 0) { throw new IllegalArgumentException("bucketNames must not be null and must not be sized 0"); @@ -42,8 +65,27 @@ public long deleteBuckets(Set<String> bucketNames) { return objectCount; } - - public long deleteBucket(String bucketName) { + + public long deleteBucket(String bucketName) { + File keyPath = this.getTemporaryPath(false); + if (keyPath == null) { + return -1; + } + + try { + dumpBucketKeys(bucketName, keyPath); + } catch (IOException e) { + log.error("Error listing keys", e); + this.summary.addStatistic(bucketName, -2l, 0l, 0l, 0l); + return -2; + } + + KeyJournal keys = new KeyJournal(keyPath, KeyJournal.Mode.READ); + + return deleteBucket(bucketName, keys); + } + + public long deleteBucket(String bucketName, IKeyJournal keyJournal) { if (bucketName == null || bucketName.isEmpty()) { throw new
IllegalArgumentException("bucketName must not be null or empty"); } @@ -53,27 +95,10 @@ public long deleteBucket(String bucketName) { } long objectCount = 0; - File keyPath = null; - try { - keyPath = File.createTempFile("riak-data-migrator", "bucketName"); - keyPath.deleteOnExit(); - } catch (IOException e){ - log.error("Could not create temporary key list file", e); - return -1; - } - - long start = System.currentTimeMillis(); - long keyCount = 0; - try { - keyCount = dumpBucketKeys(bucketName, keyPath); - } catch (IOException e) { - log.error("Error listing keys", e); - this.summary.addStatistic(bucketName, -2l, 0l, 0l, 0l); - return -2; - } - - KeyJournal keys = new KeyJournal(keyPath, KeyJournal.Mode.READ); - AbstractClientDataDeleter deleter = new ThreadedClientDataDeleter(connection, keys); + long start = System.currentTimeMillis(); + + long keyCount = keyJournal.getKeyCount(); + AbstractClientDataDeleter deleter = new ThreadedClientDataDeleter(connection, keyJournal); try { @SuppressWarnings("unused") @@ -112,6 +137,22 @@ public long dumpBucketKeys(String bucketName, File filePath) throws IOException keyJournal.close(); return keyCount; } + + private File getTemporaryPath(boolean directory) { + File keyPath = null; + try { + keyPath = File.createTempFile("riak-data-migrator", "bucketName"); + if (directory) { + keyPath.delete(); + keyPath.mkdir(); + } + keyPath.deleteOnExit(); + } catch (IOException e){ + log.error("Could not create temporary key list file", e); + } + + return keyPath; + } private void printStatus(long keyCount, long objectCount, boolean force) { long end = System.currentTimeMillis(); diff --git a/src/main/java/com/basho/proserv/datamigrator/BucketDumper.java b/src/main/java/com/basho/proserv/datamigrator/BucketDumper.java index b27adbd..69349a4 100644 --- a/src/main/java/com/basho/proserv/datamigrator/BucketDumper.java +++ b/src/main/java/com/basho/proserv/datamigrator/BucketDumper.java @@ -2,8 +2,6 @@ import java.io.File; import java.io.IOException; -import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -12,6 +10,7 @@ import com.basho.proserv.datamigrator.events.Event; import com.basho.proserv.datamigrator.events.RiakObjectEvent; +import com.basho.proserv.datamigrator.io.IKeyJournal; import com.basho.proserv.datamigrator.io.KeyJournal; import com.basho.proserv.datamigrator.io.RiakObjectBucket; import com.basho.proserv.datamigrator.riak.AbstractClientDataReader; @@ -102,40 +101,24 @@ public long dumpBuckets(Set bucketNames, boolean resume, boolean keysOnl return objectCount; } - public long dumpKeys(Set bucketKeyNames) { - if (!this.connection.connected()) { - log.error("Not connected to Riak"); - return 0; - } - int objectCount = 0; - - Map> bucketKeysMap = new HashMap>(); + public long dumpKeys(IKeyJournal keyJournal) { + if (!this.connection.connected()) { + log.error("Not connected to Riak"); + return 0; + } + int objectCount = 0; - //Scan the list and build a map - for (String bucketKeyName : bucketKeyNames) { - String[] bucketKeyNamesArr = bucketKeyName.split(","); + Map keyJournals = Utilities.splitKeys(this.dataRoot, keyJournal); - if (bucketKeyNamesArr.length != 2) { - throw new IllegalArgumentException("Key file lines must have the format: bucket,key"); - } - - if(!bucketKeysMap.containsKey(bucketKeyNamesArr[0])) { - Set keySet = new HashSet(); - bucketKeysMap.put(bucketKeyNamesArr[0], keySet); - } - - bucketKeysMap.get(bucketKeyNamesArr[0]).add(bucketKeyNamesArr[1]); - } + for (String bucketName : 
keyJournals.keySet()) { + objectCount += dumpBucket(bucketName, keyJournals.get(bucketName), false, false); + } - for (Map.Entry<String, Set<String>> aMapEntry : bucketKeysMap.entrySet()) { - - objectCount += dumpBucket(aMapEntry.getKey(), aMapEntry.getValue() , false, false); - } - return objectCount; - } + return objectCount; + } // resume is unimplemented - public long dumpBucket(String bucketName, Iterable<String> keys, boolean resume, boolean keysOnly) { + public long dumpBucket(String bucketName, KeyJournal keys, boolean resume, boolean keysOnly) { if (bucketName == null || bucketName.isEmpty()) { throw new IllegalArgumentException("bucketName cannot be null or empty"); } @@ -164,29 +147,32 @@ public long dumpBucket(String bucketName, Iterable<String> keys, boolean resume, File keyPath = new File(dumpBucket.getFileRoot().getAbsoluteFile() + "/bucketkeys.keys"); File dumpedKeyPath = new File(dumpBucket.getFileRoot().getAbsoluteFile() + "/dumpedkeys.keys"); - KeyJournal writeSourceKeyJournal = new KeyJournal(keyPath, KeyJournal.Mode.WRITE); - KeyJournal readSourceKeyJournal = new KeyJournal(keyPath, KeyJournal.Mode.READ); - KeyJournal writeDestinationKeyJournal = new KeyJournal(dumpedKeyPath, KeyJournal.Mode.WRITE); + IKeyJournal writeDestinationKeyJournal = new KeyJournal(dumpedKeyPath, KeyJournal.Mode.WRITE); try { // If no key subset is specified, get the entire key set if (keys == null) { - keys = this.connection.riakClient.listKeys(bucketName); - } - - writeSourceKeyJournal.populate(bucketName, keys); - writeSourceKeyJournal.close(); + IKeyJournal writeSourceKeyJournal = new KeyJournal(keyPath, KeyJournal.Mode.WRITE); + Iterable<String> listedKeys = this.connection.riakClient.listKeys(bucketName); + writeSourceKeyJournal.populate(bucketName, listedKeys); + writeSourceKeyJournal.close(); + if (this.verboseStatusOutput) { + this.printStatus(writeSourceKeyJournal.getKeyCount(), objectCount, false); + } + if (keysOnly) { + String bucketNameKeys = String.format("%s keys", bucketName); + this.summary.addStatistic(bucketNameKeys, writeSourceKeyJournal.getKeyCount(), System.currentTimeMillis()-start, 0l, 0l); + return writeSourceKeyJournal.getKeyCount(); + } + } } catch (IOException e){ log.error("Error listing keys for bucket " + bucketName, e); this.summary.addStatistic(bucketName, -2l, 0l, 0l, 0l); return 0; } - if (keysOnly) { - String bucketNameKeys= String.format("%s keys", bucketName); - this.summary.addStatistic(bucketNameKeys, writeSourceKeyJournal.getWriteCount(), System.currentTimeMillis()-start, 0l, 0l); - return writeSourceKeyJournal.getWriteCount(); - } - + + IKeyJournal readSourceKeyJournal = keys == null ?
new KeyJournal(keyPath, KeyJournal.Mode.READ) : keys; + // this.saveBucketSettings(bucketName, dumpBucket.getFileRoot()); try { @@ -210,9 +196,7 @@ public long dumpBucket(String bucketName, Iterable<String> keys, boolean resume, throw new IOException(event.asIoErrorEvent().ioException()); } - if (this.verboseStatusOutput) { - this.printStatus(writeSourceKeyJournal.getWriteCount(), objectCount, false); - } + } } catch (IOException e) { log.error("Riak error dumping objects for bucket: " + bucketName, e); @@ -238,7 +222,7 @@ public long dumpBucket(String bucketName, Iterable<String> keys, boolean resume, } if (this.verboseStatusOutput) { - this.printStatus(writeSourceKeyJournal.getWriteCount(), objectCount, true); + this.printStatus(readSourceKeyJournal.getKeyCount(), objectCount, true); } return objectCount; diff --git a/src/main/java/com/basho/proserv/datamigrator/BucketLoader.java b/src/main/java/com/basho/proserv/datamigrator/BucketLoader.java index 7b677fd..e18f0fd 100644 --- a/src/main/java/com/basho/proserv/datamigrator/BucketLoader.java +++ b/src/main/java/com/basho/proserv/datamigrator/BucketLoader.java @@ -10,6 +10,7 @@ import com.basho.proserv.datamigrator.events.Event; import com.basho.proserv.datamigrator.io.Key; +import com.basho.proserv.datamigrator.io.IKeyJournal; import com.basho.proserv.datamigrator.io.KeyJournal; import com.basho.proserv.datamigrator.io.RiakObjectBucket; import com.basho.proserv.datamigrator.riak.AbstractClientDataWriter; @@ -125,8 +126,8 @@ public long LoadBucket(String bucketName) { new ThreadedClientDataWriter(connection, clientWriterFactory, dumpBucket, this.config.getRiakWorkerCount(), this.config.getQueueSize()); - KeyJournal keyJournal = new KeyJournal( - KeyJournal.createKeyPathFromPath(new File(this.createBucketPath(bucketName, true) + "/keys" ), true), + IKeyJournal keyJournal = new KeyJournal( + KeyJournal.createKeyPathFromPath(new File(this.createBucketPath(bucketName, true) + "/keys" ), true), KeyJournal.Mode.WRITE); try { diff --git a/src/main/java/com/basho/proserv/datamigrator/BucketTransfer.java b/src/main/java/com/basho/proserv/datamigrator/BucketTransfer.java index daeabee..0bf670e 100644 --- a/src/main/java/com/basho/proserv/datamigrator/BucketTransfer.java +++ b/src/main/java/com/basho/proserv/datamigrator/BucketTransfer.java @@ -2,6 +2,7 @@ import java.io.File; import java.io.IOException; +import java.util.Map; import java.util.Set; import org.slf4j.Logger; @@ -9,6 +10,7 @@ import com.basho.proserv.datamigrator.events.Event; import com.basho.proserv.datamigrator.events.RiakObjectEvent; +import com.basho.proserv.datamigrator.io.IKeyJournal; import com.basho.proserv.datamigrator.io.KeyJournal; import com.basho.proserv.datamigrator.riak.ClientReaderFactory; import com.basho.proserv.datamigrator.riak.ClientWriterFactory; @@ -44,7 +46,24 @@ public BucketTransfer(Connection from, this.verboseStatusOutput = this.configuration.getVerboseStatus(); } - + + public long transferKeys(IKeyJournal keyJournal) { + if (!this.fromConnection.connected()) { + log.error("Not connected to Riak"); + return 0; + } + long objectCount = 0; + + File dataRoot = this.getTemporaryPath(true); + Map<String, IKeyJournal> keyJournals = Utilities.splitKeys(dataRoot, keyJournal); + + for (String bucketName : keyJournals.keySet()) { + objectCount += transferBucket(bucketName, keyJournals.get(bucketName), false); + } + + return objectCount; + } + public long transferAllBuckets(boolean resume) { Set<String> buckets = null; if (this.fromConnection.connected()) { @@ -74,8 +93,27 @@ public long transferBuckets(Set<String>
bucketNames, boolean resume) { } return objectCount; } - - public long transferBucket(String bucketName, boolean resume) { + + public long transferBucket(String bucketName, boolean resume) { + File keyPath = getTemporaryPath(false); + if (keyPath == null) { + return -1; + } + + try { + dumpBucketKeys(bucketName, keyPath); + } catch (IOException e) { + log.error("Error listing keys", e); + this.summary.addStatistic(bucketName, -2l, 0l, 0l, 0l); + return -2; + } + + KeyJournal bucketKeys = new KeyJournal(keyPath, KeyJournal.Mode.READ); + + return transferBucket(bucketName, bucketKeys, resume); + } + + public long transferBucket(String bucketName, IKeyJournal keyJournal, boolean resume) { if (bucketName == null || bucketName.isEmpty()) { throw new IllegalArgumentException("bucketName cannot be null or empty"); } @@ -88,28 +126,9 @@ public long transferBucket(String bucketName, boolean resume) { long valueErrorCount = 0; long dataSize = 0; long start = System.currentTimeMillis(); - long keyCount = 0; + long keyCount = keyJournal.getKeyCount(); boolean error = false; - - File keyPath = null; - try { - keyPath = File.createTempFile("riak-data-migrator", bucketName); - keyPath.deleteOnExit(); - } catch (IOException e){ - log.error("Could not create temporary key list file", e); - return -1; - } - - try { - keyCount = dumpBucketKeys(bucketName, keyPath); - } catch (IOException e) { - log.error("Error listing keys", e); - this.summary.addStatistic(bucketName, -2l, 0l, 0l, 0l); - return -2; - } - - KeyJournal bucketKeys = new KeyJournal(keyPath, KeyJournal.Mode.READ); - + ClientWriterFactory clientWriterFactory = new ClientWriterFactory(); clientWriterFactory.setBucketRename(this.configuration.getDestinationBucket()); @@ -117,7 +136,7 @@ // self closing ThreadedClientDataReader reader = new ThreadedClientDataReader(fromConnection, new ClientReaderFactory(), - bucketKeys, + keyJournal, this.configuration.getRiakWorkerCount(), this.configuration.getQueueSize()); @@ -170,7 +189,7 @@ } private long dumpBucketKeys(String bucketName, File filePath) throws IOException { - KeyJournal keyJournal = new KeyJournal(filePath, KeyJournal.Mode.WRITE); + IKeyJournal keyJournal = new KeyJournal(filePath, KeyJournal.Mode.WRITE); long keyCount = 0; Iterable<String> keys = this.fromConnection.riakClient.listKeys(bucketName); for (String keyString : keys) { @@ -180,6 +199,22 @@ private long dumpBucketKeys(String bucketName, File filePath) throws IOException keyJournal.close(); return keyCount; } + + private File getTemporaryPath(boolean directory) { + File keyPath = null; + try { + keyPath = File.createTempFile("riak-data-migrator", "bucketName"); + if (directory) { + keyPath.delete(); + keyPath.mkdir(); + } + keyPath.deleteOnExit(); + } catch (IOException e){ + log.error("Could not create temporary key list file", e); + } + + return keyPath; + } private void printStatus(long keyCount, long objectCount, boolean force) { long end = System.currentTimeMillis(); diff --git a/src/main/java/com/basho/proserv/datamigrator/Configuration.java b/src/main/java/com/basho/proserv/datamigrator/Configuration.java index c070f19..8d1a266 100644 --- a/src/main/java/com/basho/proserv/datamigrator/Configuration.java +++ b/src/main/java/com/basho/proserv/datamigrator/Configuration.java @@ -1,5 +1,7 @@ package com.basho.proserv.datamigrator; +import com.basho.proserv.datamigrator.io.IKeyJournal; + import java.io.File;
import java.util.Collection; import java.util.HashSet; @@ -21,8 +23,11 @@ public static enum Operation { ALL_BUCKETS, KEYS, BUCKET_PROPERTIES, DELETE_BUCKETS, + DELETE_KEYS, COPY_ALL, - COPY_BUCKETS }; + COPY_BUCKETS, + COPY_KEYS + }; @@ -40,7 +45,9 @@ public static enum Operation { ALL_BUCKETS, private int queueSize = DEFAULT_QUEUE_SIZE; private Set<String> bucketNames = new HashSet<String>(); - private Set<String> keyNames = new HashSet<String>(); +// private Set<String> keyNames = new HashSet<String>(); + + private IKeyJournal keyJournal = null; private boolean verboseStatus = true; private boolean resetVClock = false; @@ -143,12 +150,20 @@ public Set<String> getBucketNames() { return this.bucketNames; } - public void addKeyNames(Collection<String> keys) { - this.keyNames.addAll(keys); - } - public Set<String> getKeyNames() { - return this.keyNames; - } + public void setKeyJournal(IKeyJournal keyJournal) { + this.keyJournal = keyJournal; + } + + public IKeyJournal getKeyJournal() { + return this.keyJournal; + } + +// public void addKeyNames(Collection<String> keys) { +// this.keyNames.addAll(keys); +// } +// public Set<String> getKeyNames() { +// return this.keyNames; +// } public void setVerboseStatus(boolean verboseStatus) { this.verboseStatus = verboseStatus; diff --git a/src/main/java/com/basho/proserv/datamigrator/Main.java b/src/main/java/com/basho/proserv/datamigrator/Main.java index 8469556..0245925 100644 --- a/src/main/java/com/basho/proserv/datamigrator/Main.java +++ b/src/main/java/com/basho/proserv/datamigrator/Main.java @@ -3,6 +3,9 @@ import java.io.File; import java.util.Map; +import com.basho.proserv.datamigrator.io.AbstractKeyJournal; +import com.basho.proserv.datamigrator.io.BucketKeyJournal; +import com.basho.proserv.datamigrator.io.KeyJournal; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; @@ -77,6 +80,23 @@ public static void main(String[] args) { if (cmd.hasOption("copy") && (cmd.hasOption('t') || cmd.hasOption('k'))) { System.out.println("Copy not compatible with t or k options."); } + + if (cmd.hasOption("loadkeys") && cmd.hasOption("bucketkeys")) { + System.out.println("Cannot combine loadkeys and bucketkeys options."); + System.exit(1); + } + +// if ((cmd.hasOption("loadkeys") || cmd.hasOption("bucketkeys")) && !cmd.hasOption("r")) { +// System.out.println("loadkeys and bucketkeys require an output directory (r option)"); +// } + if ((cmd.hasOption("loadkeys") || cmd.hasOption("bucketkeys")) && cmd.hasOption("l")) { + System.out.println("loadkeys and bucketkeys cannot be used with the load (l) option"); + } + + if ((cmd.hasOption("loadkeys") || cmd.hasOption("bucketkeys")) && + !(cmd.hasOption("d") || cmd.hasOption("copy") || cmd.hasOption("delete"))) { + System.out.println("loadkeys and bucketkeys must be used with dump, copy, or delete options (d, copy, delete)"); + } Configuration config = handleCommandLine(cmd); @@ -92,10 +112,10 @@ public static void main(String[] args) { runLoader(config); } - if (cmd.hasOption("d") || cmd.hasOption("loadkeys") || (cmd.hasOption("d") && cmd.hasOption("t"))) { + if (cmd.hasOption("d") || (cmd.hasOption("d") && cmd.hasOption("t"))) { runDumper(config); } - + } public static Configuration handleCommandLine(CommandLine cmd) { @@ -183,7 +203,7 @@ public static Configuration handleCommandLine(CommandLine cmd) { } // Destination PB port - if (cmd.hasOption("copypbport")) { + if (cmd.hasOption("copy") && cmd.hasOption("copypbport")) { try { config.setPort(Integer.parseInt(cmd.getOptionValue("copypbport"))); } catch
(NumberFormatException e) { @@ -214,14 +234,38 @@ public static Configuration handleCommandLine(CommandLine cmd) { if (cmd.hasOption("loadkeys")) { try { String fileName = cmd.getOptionValue("loadkeys"); - config.addKeyNames(Utilities.readFileLines(fileName)); - config.setOperation(Configuration.Operation.KEYS); + File path = new File(fileName); + + KeyJournal keyJournal = new KeyJournal(path, KeyJournal.Mode.READ); + + config.setKeyJournal(keyJournal); + + config.setOperation(Configuration.Operation.KEYS); + +// config.addKeyNames(Utilities.readFileLines(fileName)); } catch (Exception e) { System.out.println("Could not read file containing list of bucket,keys"); System.exit(1); } } + if (cmd.hasOption("bucketkeys")) { + + if (config.getBucketNames().size() == 1) { + String fileName = cmd.getOptionValue("bucketkeys"); + File path = new File(fileName); + String bucketName = config.getBucketNames().iterator().next(); + BucketKeyJournal keyJournal = new BucketKeyJournal(path, KeyJournal.Mode.READ, bucketName); + + config.setKeyJournal(keyJournal); + + config.setOperation(Configuration.Operation.KEYS); + } else { + System.out.println("bucketkeys is only a valid option when specifying a single bucket"); + System.exit(1); + } + } + // Keys only if (cmd.hasOption("k")) { // if keys only.... config.setOperation(Configuration.Operation.BUCKET_KEYS); @@ -254,13 +298,28 @@ public static Configuration handleCommandLine(CommandLine cmd) { if (config.getBucketNames().size() > 0) { config.setOperation(Configuration.Operation.DELETE_BUCKETS); } + if (cmd.hasOption("bucketkeys") && config.getBucketNames().size() == 1) { config.setOperation(Configuration.Operation.DELETE_KEYS); } + if (cmd.hasOption("loadkeys")) { config.setOperation(Configuration.Operation.DELETE_KEYS); } } - + + // Subset copy + if (cmd.hasOption("copy") && cmd.hasOption("loadkeys")) { + config.setOperation(Configuration.Operation.COPY_KEYS); + } + + if (cmd.hasOption("copy") && cmd.hasOption("bucketkeys") && config.getBucketNames().size() == 1) { + config.setOperation(Configuration.Operation.COPY_KEYS); + } + // Bucket Copy - if (cmd.hasOption("copy") && config.getBucketNames().size() > 0) { + if (cmd.hasOption("copy") && config.getBucketNames().size() > 0 && !cmd.hasOption("bucketkeys") && !cmd.hasOption("loadkeys")) { config.setOperation(Configuration.Operation.COPY_BUCKETS); } - if (cmd.hasOption("copy") && cmd.hasOption("a")) { + if (cmd.hasOption("copy") && cmd.hasOption("a") && !cmd.hasOption("bucketkeys")) { config.setOperation(Configuration.Operation.COPY_ALL); } @@ -327,7 +386,9 @@ public static void runDelete(Configuration config) { long deleteCount = 0; if (config.getOperation() == Configuration.Operation.DELETE_BUCKETS) { deleteCount = deleter.deleteBuckets(config.getBucketNames()); - } + } else if (config.getOperation() == Configuration.Operation.DELETE_KEYS) { + deleteCount = deleter.deleteKeys(config.getKeyJournal()); + } connection.close(); @@ -410,7 +471,7 @@ public static void runDumper(Configuration config) { } else if (config.getOperation() == Configuration.Operation.BUCKET_PROPERTIES) { dumpCount = dumper.dumpBucketSettings(config.getBucketNames()); } else if (config.getOperation() == Configuration.Operation.KEYS) { - dumpCount = dumper.dumpKeys(config.getKeyNames()); + dumpCount = dumper.dumpKeys(config.getKeyJournal()); } else { dumpCount = dumper.dumpAllBuckets(config.getResume(), keysOnly); } @@ -454,7 +515,9 @@ private static void runCopy(Configuration config) { long transferCount = 0; if
(config.getOperation() == Configuration.Operation.COPY_BUCKETS) { transferCount = mover.transferBuckets(config.getBucketNames(), false); - } else { + } else if (config.getOperation() == Configuration.Operation.COPY_KEYS) { + transferCount = mover.transferKeys(config.getKeyJournal()); + } else { transferCount = mover.transferAllBuckets(false); } @@ -547,6 +610,7 @@ private static Options createOptions() { options.addOption("b", true, "Load or Dump a single bucket"); options.addOption("f", true, "Load or Dump a file containing bucket names"); options.addOption("loadkeys", true, "Load or Dump a file containing bucket names and keys"); + options.addOption("bucketkeys", true, "Dump, copy, or delete keys listed in a file. A single bucket must be specified with -b"); options.addOption("h", true, "Specify Riak Host"); options.addOption("c", true, "Specify a file containing Riak Cluster Host Names"); options.addOption("p", true, "Specify Riak PB Port"); diff --git a/src/main/java/com/basho/proserv/datamigrator/Utilities.java b/src/main/java/com/basho/proserv/datamigrator/Utilities.java index 3d9c875..fd193f3 100644 --- a/src/main/java/com/basho/proserv/datamigrator/Utilities.java +++ b/src/main/java/com/basho/proserv/datamigrator/Utilities.java @@ -1,17 +1,48 @@ package com.basho.proserv.datamigrator; +import com.basho.proserv.datamigrator.io.AbstractKeyJournal; +import com.basho.proserv.datamigrator.io.IKeyJournal; +import com.basho.proserv.datamigrator.io.Key; +import com.basho.proserv.datamigrator.io.KeyJournal; + import java.io.BufferedReader; import java.io.File; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; import java.io.UnsupportedEncodingException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.*; public class Utilities { + public static Map<String, IKeyJournal> splitKeys(File basePath, IKeyJournal keyJournal) { + Map<String, IKeyJournal> journals = new HashMap<String, IKeyJournal>(); + Map<String, IKeyJournal> readJournals = new HashMap<String, IKeyJournal>(); + + + try { + for (Key key : keyJournal) { + String bucketName = key.bucket(); + if (!journals.containsKey(bucketName)) { + File bucketPath = new File(basePath.getAbsolutePath() + "/" + bucketName); + bucketPath.mkdir(); + File keyFile = new File(bucketPath.getAbsolutePath() + "/bucketkeys.keys"); + journals.put(key.bucket(), new KeyJournal(keyFile, KeyJournal.Mode.WRITE)); + } + + journals.get(bucketName).write(key); + } + + for (String bucketName : journals.keySet()) { + journals.get(bucketName).close(); + File bucketKeys = new File(basePath.getAbsolutePath() + "/" + bucketName + "/" + "bucketkeys.keys"); + readJournals.put(bucketName, new KeyJournal(bucketKeys, KeyJournal.Mode.READ)); + } + } catch (IOException ex) { + return new HashMap<String, IKeyJournal>(); + } + return readJournals; + } + public static List<String> readFileLines(String filename) throws IOException, FileNotFoundException { List<String> lines = new ArrayList<String>(); File file = new File(filename); @@ -50,4 +81,6 @@ public static String urlDecode(String input) { return input; } } + + } diff --git a/src/main/java/com/basho/proserv/datamigrator/io/AbstractKeyJournal.java b/src/main/java/com/basho/proserv/datamigrator/io/AbstractKeyJournal.java new file mode 100644 index 0000000..cadb5e6 --- /dev/null +++ b/src/main/java/com/basho/proserv/datamigrator/io/AbstractKeyJournal.java @@ -0,0 +1,214 @@ +package com.basho.proserv.datamigrator.io; + +import com.basho.proserv.datamigrator.Utilities; +import com.basho.riak.client.IRiakObject; +import com.basho.riak.pbc.RiakObject; +import org.slf4j.Logger; +import
org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.util.Iterator; + + +public abstract class AbstractKeyJournal implements Iterable<Key>, IKeyJournal { + private final Logger log = LoggerFactory.getLogger(AbstractKeyJournal.class); + + public enum Mode { READ, WRITE } + + protected final File path; + protected final Mode mode; + protected final BufferedWriter writer; + protected final BufferedReader reader; + private long keyCount; + private boolean closed = false; + + public AbstractKeyJournal(File path, Mode mode) { + if (path == null) { + throw new IllegalArgumentException("path cannot be null"); + } + this.path = path; + try { + if (mode == Mode.WRITE) { + this.writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(path))); + this.keyCount = 0; + this.reader = null; + } else { + this.reader = new BufferedReader(new InputStreamReader(new FileInputStream(path))); + this.keyCount = -1; + this.writer = null; + } + } catch (FileNotFoundException e) { + throw new IllegalArgumentException("Could not open " + path.getAbsolutePath()); + } + + this.mode = mode; + } + + public void populate(String bucketName, Iterable<String> keys) throws IOException { + for (String keyString : keys) { + this.write(bucketName, keyString); + } + } + + public long getKeyCount() { + if (this.mode == Mode.READ && keyCount == -1) { + try { + this.keyCount = this.countKeys(this.path); + } catch (IOException ex) { + log.error("Could not read key file for counting"); + } + } + return this.keyCount; + } + + @Override + public void write(Key key) throws IOException { + if (key == null) { + throw new IllegalArgumentException("key must not be null"); + } + this.write(key.bucket(), key.key()); + } + + @Override + public void write(String bucket, String key) throws IOException { + if (mode == Mode.READ) { + throw new IllegalArgumentException("KeyJournal is in READ mode for write operation"); + } + if (bucket == null || key == null) { + throw new IllegalArgumentException("bucket and key must not be null"); + } + this.writer.write((Utilities.urlEncode(bucket) + "," + Utilities.urlEncode(key) + "\n")); + this.keyCount++; + } + + @Override + public void write(RiakObject riakObject) throws IOException { + this.write(riakObject.getBucket(), riakObject.getKey()); + } + + @Override + public void write(IRiakObject riakObject) throws IOException { + this.write(riakObject.getBucket(), riakObject.getKey()); + } + + @Override + public Key read() throws IOException { + if (mode == Mode.WRITE) { + throw new IllegalArgumentException("KeyJournal is in WRITE mode for read operation"); + } + String line = this.reader.readLine(); + if (line == null) { + return null; + } + String[] values = new String[2]; + int comma = line.indexOf(','); + if (comma != -1) { + values[0] = line.substring(0, comma); + values[1] = line.substring(comma + 1, line.length()); + return new Key(Utilities.urlDecode(values[0]), Utilities.urlDecode(values[1])); + } + return null; + } + + public void close() { + try { + if (this.writer != null) { + this.writer.flush(); + this.writer.close(); + } + if (this.reader != null) { + this.reader.close(); + } + } catch (IOException e) { + // no-op, swallow + } + this.closed = true; + } + + public boolean isClosed() { + return this.closed; + } + + public
static File createKeyPathFromPath(File file, boolean load) { + String path = file.getAbsolutePath(); + int ind = path.lastIndexOf('.'); + if (ind == -1) { + ind = path.length()-1; + } + path = path.substring(0, ind); + if (load) { + path = path + ".loadedkeys"; + } else { + path = path + ".keys"; + } + return new File(path); + } + + protected long countKeys(File path) throws IOException, FileNotFoundException { + BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(path))); + + long count = 0; + while (reader.readLine() != null) { + ++count; + } + + reader.close(); + + return count; + } + + @Override + public Iterator<Key> iterator() { + return new KeyIterator(this); + } + + private class KeyIterator implements Iterator<Key> { + private final IKeyJournal keyJournal; + + private Key nextKey; + + public KeyIterator(IKeyJournal keyJournal) { + this.keyJournal = keyJournal; + try { + this.nextKey = keyJournal.read(); + } catch (IOException e) { + this.nextKey = null; + } + } + + @Override + public boolean hasNext() { + return this.nextKey != null; + } + + @Override + public Key next() { + Key currentKey = this.nextKey; + try { + this.nextKey = this.keyJournal.read(); + } catch (IOException e) { + this.nextKey = null; + } + if (currentKey == null && this.nextKey == null) { + currentKey = Key.createErrorKey(); + } + return currentKey; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + } + +} + diff --git a/src/main/java/com/basho/proserv/datamigrator/io/BucketKeyJournal.java b/src/main/java/com/basho/proserv/datamigrator/io/BucketKeyJournal.java new file mode 100644 index 0000000..a4a1e6c --- /dev/null +++ b/src/main/java/com/basho/proserv/datamigrator/io/BucketKeyJournal.java @@ -0,0 +1,53 @@ +package com.basho.proserv.datamigrator.io; + +import com.basho.proserv.datamigrator.Utilities; +import com.basho.riak.client.IRiakObject; +import com.basho.riak.pbc.RiakObject; + +import java.io.File; +import java.io.IOException; + +/** + * Created with IntelliJ IDEA. + * User: dankerrigan + * Date: 1/28/14 + * Time: 11:59 AM + * To change this template use File | Settings | File Templates.
+ */ +public class BucketKeyJournal extends AbstractKeyJournal { + private final String bucketName; + + public BucketKeyJournal(File path, Mode mode, String bucketName) { + super(path, mode); + + this.bucketName = bucketName; + } + + public void write(Key key) throws IOException { + throw new UnsupportedOperationException(); + } + + public void write(String bucket, String key) throws IOException { + throw new UnsupportedOperationException(); + } + + public void write(RiakObject riakObject) throws IOException { + throw new UnsupportedOperationException(); + } + + public void write(IRiakObject riakObject) throws IOException { + throw new UnsupportedOperationException(); + } + + public Key read() throws IOException { + if (this.mode == Mode.WRITE) { + throw new IllegalArgumentException("KeyJournal is in WRITE mode for read operation"); + } + String line = this.reader.readLine(); + if (line == null) { + return null; + } + + return new Key(this.bucketName, Utilities.urlDecode(line)); + } +} diff --git a/src/main/java/com/basho/proserv/datamigrator/io/IKeyJournal.java b/src/main/java/com/basho/proserv/datamigrator/io/IKeyJournal.java new file mode 100644 index 0000000..86bc3ab --- /dev/null +++ b/src/main/java/com/basho/proserv/datamigrator/io/IKeyJournal.java @@ -0,0 +1,34 @@ +package com.basho.proserv.datamigrator.io; + +import com.basho.riak.client.IRiakObject; +import com.basho.riak.pbc.RiakObject; + +import java.io.IOException; +import java.util.Iterator; + +/** + * Created with IntelliJ IDEA. + * User: dankerrigan + * Date: 1/28/14 + * Time: 10:55 AM + * To change this template use File | Settings | File Templates. + */ +public interface IKeyJournal extends Iterable<Key> { + void populate(String bucketName, Iterable<String> keys) throws IOException; + + void write(Key key) throws IOException; + + void write(String bucket, String key) throws IOException; + + void write(RiakObject riakObject) throws IOException; + + void write(IRiakObject riakObject) throws IOException; + + Key read() throws IOException; + + void close(); + + long getKeyCount(); + + Iterator<Key> iterator(); +} diff --git a/src/main/java/com/basho/proserv/datamigrator/io/KeyJournal.java b/src/main/java/com/basho/proserv/datamigrator/io/KeyJournal.java index 1bc2610..cc1084f 100644 --- a/src/main/java/com/basho/proserv/datamigrator/io/KeyJournal.java +++ b/src/main/java/com/basho/proserv/datamigrator/io/KeyJournal.java @@ -1,182 +1,18 @@ package com.basho.proserv.datamigrator.io; -import java.io.BufferedReader; -import java.io.BufferedWriter; import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; import java.util.Iterator; -import com.basho.proserv.datamigrator.Utilities; -import com.basho.riak.client.IRiakObject; -import com.basho.riak.pbc.RiakObject; +public class KeyJournal extends AbstractKeyJournal { -public class KeyJournal implements Iterable<Key> { - public enum Mode { READ, WRITE } - - private final Mode mode; - private final BufferedWriter writer; - private final BufferedReader reader; private long writeCount; private boolean closed = false; public KeyJournal(File path, Mode mode) { - if (path == null) { - throw new IllegalArgumentException("path cannot be null"); - } - try { - if (mode == Mode.WRITE) { - this.writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(path))); - this.reader = null; - } else { - this.reader = new BufferedReader(new
InputStreamReader(new FileInputStream(path))); - this.writer = null; - } - } catch (FileNotFoundException e) { - throw new IllegalArgumentException("Could not open " + path.getAbsolutePath()); - } - this.mode = mode; - this.writeCount = 0; - } - - public void populate(String bucketName, Iterable<String> keys) throws IOException { - for (String keyString : keys) { - this.write(bucketName, keyString); - } - } - - public long getWriteCount() { - if (mode == Mode.READ) { - throw new IllegalArgumentException ("KeyJournal is in READ mode and cannot determine keyCount"); - } - return this.writeCount; - } - - public void write(Key key) throws IOException { - if (key == null) { - throw new IllegalArgumentException("key must not be null"); - } - this.write(key.bucket(), key.key()); - } - - public void write(String bucket, String key) throws IOException { - if (mode == Mode.READ) { - throw new IllegalArgumentException ("KeyJournal is in READ mode for write operation"); - } - if (bucket == null || key == null) { - throw new IllegalArgumentException("bucket and key must not be null"); - } - this.writer.write((Utilities.urlEncode(bucket) + "," + Utilities.urlEncode(key) + "\n")); - this.writeCount++; - } - - public void write(RiakObject riakObject) throws IOException { - this.write(riakObject.getBucket(), riakObject.getKey()); - } - - public void write(IRiakObject riakObject) throws IOException { - this.write(riakObject.getBucket(), riakObject.getKey()); - } - - public Key read() throws IOException { - if (mode == Mode.WRITE) { - throw new IllegalArgumentException("KeyJournal is in WRITE mode for read operation"); - } - String line = this.reader.readLine(); - if (line == null) { - return null; - } - String[] values = new String[2]; - int comma = line.indexOf(','); - if (comma != -1) { - values[0] = line.substring(0, comma); - values[1] = line.substring(comma + 1, line.length()); - return new Key(Utilities.urlDecode(values[0]), Utilities.urlDecode(values[1])); - } - return null; - } - - public void close() { - try { - if (this.writer != null) { - this.writer.flush(); - this.writer.close(); - } - if (this.reader != null) { - this.reader.close(); - } - } catch (IOException e) { - // no-op, swallow - } - this.closed = true; - } - public boolean isClosed() { - return this.closed; - } + super(path, mode); - @Override - public Iterator<Key> iterator() { - return new KeyIterator(this); } - - public static File createKeyPathFromPath(File file, boolean load) { - String path = file.getAbsolutePath(); - int ind = path.lastIndexOf('.'); - if (ind == -1) { - ind = path.length()-1; - } - path = path.substring(0, ind); - if (load) { - path = path + ".loadedkeys"; - } else { - path = path + ".keys"; - } - return new File(path); - } - - private class KeyIterator implements Iterator<Key> { - private final KeyJournal keyJournal; - - private Key nextKey; - - public KeyIterator(KeyJournal keyJournal) { - this.keyJournal = keyJournal; - try { - this.nextKey = keyJournal.read(); - } catch (IOException e) { - this.nextKey = null; - } - } - - @Override - public boolean hasNext() { - return this.nextKey != null; - } - @Override - public Key next() { - Key currentKey = this.nextKey; - try { - this.nextKey = this.keyJournal.read(); - } catch (IOException e) { - this.nextKey = null; - } - if (currentKey == null && this.nextKey == null) { - currentKey = Key.createErrorKey(); - } - return currentKey; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - } - } diff --git
a/src/test/java/com/basho/proserv/datamigrator/BucketDumperLoaderTests.java b/src/test/java/com/basho/proserv/datamigrator/BucketDumperLoaderTests.java index d8efc8b..67362ba 100644 --- a/src/test/java/com/basho/proserv/datamigrator/BucketDumperLoaderTests.java +++ b/src/test/java/com/basho/proserv/datamigrator/BucketDumperLoaderTests.java @@ -4,6 +4,7 @@ import java.io.File; +import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -12,13 +13,16 @@ import com.basho.riak.client.builders.RiakObjectBuilder; public class BucketDumperLoaderTests { + @Rule + public TemporaryFolder tempFolderMaker = new TemporaryFolder(); + private final int DEFAULT_WORKER_COUNT = Runtime.getRuntime().availableProcessors() * 2; private String host = "127.0.0.1"; - private int port = 18087; - private int httpPort = 18098; + private int port = 8087; + private int httpPort = 8098; File dumpDirectory = null; - + public static String[] bucketNames = {"A","B","C","D","E","F"}; public static String[] keys = new String[100]; @@ -60,8 +64,7 @@ public void testDump() throws Exception { httpConnection.connectHTTPClient(host, httpPort); int count = loadTestData(connection); System.out.println("Loaded " + count + " records"); - - TemporaryFolder tempFolderMaker = new TemporaryFolder(); + dumpDirectory = tempFolderMaker.newFolder(); Configuration config = new Configuration(); diff --git a/src/test/java/com/basho/proserv/riak/datamigrator/io/KeyJournalTests.java b/src/test/java/com/basho/proserv/riak/datamigrator/io/KeyJournalTests.java index 7c335f0..9d3bede 100644 --- a/src/test/java/com/basho/proserv/riak/datamigrator/io/KeyJournalTests.java +++ b/src/test/java/com/basho/proserv/riak/datamigrator/io/KeyJournalTests.java @@ -2,8 +2,12 @@ import static org.junit.Assert.*; +import java.io.BufferedWriter; import java.io.File; +import java.io.FileWriter; +import com.basho.proserv.datamigrator.io.BucketKeyJournal; +import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -12,10 +16,12 @@ public class KeyJournalTests { + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + @Test public void test() throws Exception { int KEY_COUNT = 1000; - TemporaryFolder tempFolder = new TemporaryFolder(); File keyPath = tempFolder.newFile(); @@ -44,14 +50,44 @@ public void test() throws Exception { assertTrue(readCount == buckets.length * keys.length); } + + @Test + public void testBucketKeyJournal() throws Exception { + int KEY_COUNT = 1000; + String TEST_BUCKET = "test_bucket"; + + File keyPath = tempFolder.newFile(); + BufferedWriter writer = new BufferedWriter(new FileWriter(keyPath)); + + String[] keys = new String[KEY_COUNT]; + for (Integer i = 0; i < keys.length; ++i) { + keys[i] = i.toString(); + writer.write(keys[i] + '\n'); + } + + writer.flush(); + writer.close(); + + BucketKeyJournal readJournal = new BucketKeyJournal(keyPath, KeyJournal.Mode.READ, TEST_BUCKET); + + int readCount = 0; + for (Key key : readJournal) { + assertTrue(key.bucket().compareTo(TEST_BUCKET) == 0); + assertTrue(keys[readCount].compareTo(key.key()) == 0); + if (!key.errorKey()) + ++readCount; + } + + assertTrue(readCount == keys.length); + } @Test - public void testCreateKeyPathFromPath() { - File file = new File("/Users/dankerrigan/data.data"); + public void testCreateKeyPathFromPath() throws Exception { + File file = tempFolder.newFile("data.data"); File newPath = KeyJournal.createKeyPathFromPath(file, false); - 
assertTrue(newPath.getAbsolutePath().compareTo("/Users/dankerrigan/data.keys") == 0); + assertTrue(newPath.getName().compareTo("data.keys") == 0); newPath = KeyJournal.createKeyPathFromPath(file, true); - assertTrue(newPath.getAbsolutePath().compareTo("/Users/dankerrigan/data.loadedkeys") == 0); + assertTrue(newPath.getName().compareTo("data.loadedkeys") == 0); } diff --git a/src/test/java/com/basho/util/UtilitiesTests.java b/src/test/java/com/basho/util/UtilitiesTests.java new file mode 100644 index 0000000..665e876 --- /dev/null +++ b/src/test/java/com/basho/util/UtilitiesTests.java @@ -0,0 +1,81 @@ +package com.basho.util; + +import com.basho.proserv.datamigrator.Utilities; +import com.basho.proserv.datamigrator.io.AbstractKeyJournal; +import com.basho.proserv.datamigrator.io.IKeyJournal; +import com.basho.proserv.datamigrator.io.Key; +import com.basho.proserv.datamigrator.io.KeyJournal; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertTrue; + +/** + * Created with IntelliJ IDEA. + * User: dankerrigan + * Date: 1/29/14 + * Time: 9:01 AM + * To change this template use File | Settings | File Templates. + */ +public class UtilitiesTests { + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + + @Test + public void testSplitKeys() throws Exception { + int KEY_COUNT = 10; + File keyPath = tempFolder.newFile("data.keys"); + KeyJournal writeKeyJournal = new KeyJournal(keyPath, KeyJournal.Mode.WRITE); + + String[] buckets = new String[] {"A", "B", "C", "D", "E", "F"}; + for (String bucket : buckets) { + for (Integer i = 0; i < KEY_COUNT; ++i) { + writeKeyJournal.write(new Key(bucket, i.toString())); + } + } + + writeKeyJournal.close(); + + KeyJournal readKeyJournal = new KeyJournal(keyPath, KeyJournal.Mode.READ); + + File splitFolder = tempFolder.newFolder(); + + Map<String, IKeyJournal> readJournals = Utilities.splitKeys(splitFolder, readKeyJournal); + + String[] splitDirs = splitFolder.list(); + + int actualBucketCount = splitDirs.length; + + for (String splitDir : splitDirs) { + File newBucketKeyPath = new File(splitFolder.getAbsolutePath() + "/" + splitDir + "/" + "bucketkeys.keys"); + + KeyJournal newJournal = new KeyJournal(newBucketKeyPath, KeyJournal.Mode.READ); + + Iterator<Key> iter = newJournal.iterator(); + for (Integer i = 0; i < KEY_COUNT; ++i) { + assertTrue(i.toString().compareTo(iter.next().key()) == 0); + } + } + + assertTrue(actualBucketCount * KEY_COUNT == (buckets.length * KEY_COUNT)); + + + for (String bucketName : readJournals.keySet()) { + Set<String> keys = new HashSet<String>(); + for (Key key : readJournals.get(bucketName)) { + keys.add(key.key()); + } + assertEquals(keys.size(), KEY_COUNT); + } + } +} From 5855221cf5d9cf724e69220d090ba762444eeba2 Mon Sep 17 00:00:00 2001 From: Dan Kerrigan Date: Tue, 11 Feb 2014 14:15:52 -0500 Subject: [PATCH 13/14] Bucket names specified in a file or on cmd line are now urldecoded.
Fixed the unspecified PB port message --- .gitignore | 2 ++ src/main/java/com/basho/proserv/datamigrator/Main.java | 8 ++++---- .../java/com/basho/proserv/datamigrator/Utilities.java | 10 ++++++++++ 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/.gitignore b/.gitignore index 9729d0d..284630a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ .idea riak-data-migrator.iml +.DS_Store +target diff --git a/src/main/java/com/basho/proserv/datamigrator/Main.java b/src/main/java/com/basho/proserv/datamigrator/Main.java index 0245925..64b8a57 100644 --- a/src/main/java/com/basho/proserv/datamigrator/Main.java +++ b/src/main/java/com/basho/proserv/datamigrator/Main.java @@ -210,19 +210,19 @@ public static Configuration handleCommandLine(CommandLine cmd) { System.out.println("Destination PB Port (copypbport) argument is not an integer."); System.exit(1); } - } else { + } else if (cmd.hasOption("copy") && !cmd.hasOption("copypbport")) { System.out.println("Destination PB Port not specified, using the default: 8087"); } // Single bucket specifier if (cmd.hasOption("b")) { - config.addBucketName(cmd.getOptionValue("b")); + config.addBucketName(Utilities.urlDecode(cmd.getOptionValue("b"))); config.setOperation(Configuration.Operation.BUCKETS); } // Bucket filename if (cmd.hasOption("f")) { try { - config.addBucketNames(Utilities.readFileLines(cmd.getOptionValue("f"))); + config.addBucketNames(Utilities.urlDecode(Utilities.readFileLines(cmd.getOptionValue("f")))); config.setOperation(Configuration.Operation.BUCKETS); } catch (Exception e) { System.out.println("Could not read file containing buckets"); @@ -328,7 +328,7 @@ public static Configuration handleCommandLine(CommandLine cmd) { System.out.println("Destination bucket option only valid when specifying a single bucket."); System.exit(1); } - config.setDestinationBucket(cmd.getOptionValue("destinationbucket")); + config.setDestinationBucket(Utilities.urlDecode(cmd.getOptionValue("destinationbucket"))); } if (cmd.hasOption("q")) { diff --git a/src/main/java/com/basho/proserv/datamigrator/Utilities.java b/src/main/java/com/basho/proserv/datamigrator/Utilities.java index fd193f3..a93903a 100644 --- a/src/main/java/com/basho/proserv/datamigrator/Utilities.java +++ b/src/main/java/com/basho/proserv/datamigrator/Utilities.java @@ -82,5 +82,15 @@ public static String urlDecode(String input) { } } + public static List<String> urlDecode(Iterable<String> lines) { + List<String> decoded = new ArrayList<String>(); + + for (String line : lines) { + decoded.add(urlDecode(line)); + } + + return decoded; + } + } From dd753ad63374db1dc7d2ccbd6b903593c3440bdf Mon Sep 17 00:00:00 2001 From: Dan Kerrigan Date: Wed, 12 Feb 2014 15:29:30 -0500 Subject: [PATCH 14/14] Fixed bug in bucket splitter where bucket names with odd characters weren't being urlencoded.
--- src/main/java/com/basho/proserv/datamigrator/Utilities.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/basho/proserv/datamigrator/Utilities.java b/src/main/java/com/basho/proserv/datamigrator/Utilities.java index a93903a..292506f 100644 --- a/src/main/java/com/basho/proserv/datamigrator/Utilities.java +++ b/src/main/java/com/basho/proserv/datamigrator/Utilities.java @@ -23,7 +23,7 @@ public static Map<String, IKeyJournal> splitKeys(File basePath, IKeyJournal keyJo for (Key key : keyJournal) { String bucketName = key.bucket(); if (!journals.containsKey(bucketName)) { - File bucketPath = new File(basePath.getAbsolutePath() + "/" + bucketName); + File bucketPath = new File(basePath.getAbsolutePath() + "/" + Utilities.urlEncode(bucketName)); bucketPath.mkdir(); File keyFile = new File(bucketPath.getAbsolutePath() + "/bucketkeys.keys"); journals.put(key.bucket(), new KeyJournal(keyFile, KeyJournal.Mode.WRITE)); @@ -34,7 +34,7 @@ public static Map<String, IKeyJournal> splitKeys(File basePath, IKeyJournal keyJo for (String bucketName : journals.keySet()) { journals.get(bucketName).close(); - File bucketKeys = new File(basePath.getAbsolutePath() + "/" + bucketName + "/" + "bucketkeys.keys"); + File bucketKeys = new File(basePath.getAbsolutePath() + "/" + Utilities.urlEncode(bucketName) + "/" + "bucketkeys.keys"); readJournals.put(bucketName, new KeyJournal(bucketKeys, KeyJournal.Mode.READ)); } } catch (IOException ex) {
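To illustrate why this last fix URL-encodes the bucket name before using it as a directory name, here is a minimal standalone sketch; it is not part of the patch, the bucket name is made up, and it assumes Utilities.urlEncode wraps java.net.URLEncoder with UTF-8 and falls back to the raw input, mirroring the urlDecode shown earlier:

```java
import java.io.File;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Sketch only: shows why splitKeys must encode bucket names used as
// directory names. The paths and bucket name below are hypothetical.
public class BucketDirSketch {

    // Stand-in for Utilities.urlEncode, assumed to wrap URLEncoder with
    // UTF-8 and fall back to the raw input on error.
    static String urlEncode(String input) {
        try {
            return URLEncoder.encode(input, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            return input;
        }
    }

    public static void main(String[] args) {
        File basePath = new File("/tmp/riak-export");
        String bucketName = "users/2014"; // a '/' is legal in a bucket name

        // Unencoded, the '/' turns one bucket directory into a nested path,
        // so the mkdir() in splitKeys and the later read of bucketkeys.keys
        // can disagree about where the journal lives.
        File raw = new File(basePath, bucketName);

        // Encoded, each bucket maps to a single flat, reversible directory.
        File encoded = new File(basePath, urlEncode(bucketName));

        System.out.println(raw.getPath());     // /tmp/riak-export/users/2014
        System.out.println(encoded.getPath()); // /tmp/riak-export/users%2F2014
    }
}
```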