From 922d033c8c1f4d0dabf8043fc87a64523f4be2c3 Mon Sep 17 00:00:00 2001 From: Jim Ancona Date: Mon, 21 Dec 2015 12:42:23 -0500 Subject: [PATCH] Support Cassandra versions after 2.0.x --- .../java/io/teknek/farsandra/Farsandra.java | 147 +++++++++++++----- .../teknek/farsandra/CompareAndSwapTest.java | 6 +- .../io/teknek/farsandra/TestFarsandra.java | 14 +- .../TestFarsandraWithCustomConfig.java | 6 +- .../TestFarsandraWithDefaultConfig.java | 6 +- .../teknek/farsandra/UnflushedJoinTest.java | 8 +- 6 files changed, 126 insertions(+), 61 deletions(-) diff --git a/farsandra-core/src/main/java/io/teknek/farsandra/Farsandra.java b/farsandra-core/src/main/java/io/teknek/farsandra/Farsandra.java index 9b5f87f..cda1f12 100644 --- a/farsandra-core/src/main/java/io/teknek/farsandra/Farsandra.java +++ b/farsandra-core/src/main/java/io/teknek/farsandra/Farsandra.java @@ -19,6 +19,8 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.commons.compress.archivers.tar.TarArchiveEntry; import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; @@ -318,18 +320,46 @@ public void start(){ throw new RuntimeException(e); } lines = replaceHost(lines); - lines = replaceThisWithThatExpectNMatch(lines, - " - /var/lib/cassandra/data", - " - " + this.instanceName + "/data/data" , 1); - lines = replaceThisWithThatExpectNMatch(lines, - "listen_address: localhost", - "listen_address: " +host , 1); - lines = replaceThisWithThatExpectNMatch(lines, - "commitlog_directory: /var/lib/cassandra/commitlog", - "commitlog_directory: " + this.instanceName + "/data/commitlog" , 1 ); + try { + lines = replaceThisWithThatExpectNMatch(lines, + " - /var/lib/cassandra/data", + " - " + this.instanceName + "/data/data" , 1); + } catch (RuntimeException ex) { + lines = replaceThisWithThatExpectNMatch(lines, + "# data_file_directories:", + "data_file_directories:" , 1); + lines = replaceThisWithThatExpectNMatch(lines, + "# - /var/lib/cassandra/data", + " - " + this.instanceName + "/data/data" , 1); + } lines = replaceThisWithThatExpectNMatch(lines, - "saved_caches_directory: /var/lib/cassandra/saved_caches", - "saved_caches_directory: " + this.instanceName + "/data/saved_caches", 1); + "listen_address: localhost", + "listen_address: " +host , 1); + try { + lines = replaceThisWithThatExpectNMatch(lines, + "commitlog_directory: /var/lib/cassandra/commitlog", + "commitlog_directory: " + this.instanceName + "/data/commitlog" , 1 ); + } catch (RuntimeException ex) { + lines = replaceThisWithThatExpectNMatch(lines, + "# commitlog_directory: /var/lib/cassandra/commitlog", + "commitlog_directory: " + this.instanceName + "/data/commitlog" , 1 ); + } + try { + lines = replaceThisWithThatExpectNMatch(lines, + "saved_caches_directory: /var/lib/cassandra/saved_caches", + "saved_caches_directory: " + this.instanceName + "/data/saved_caches", 1); + } catch (RuntimeException ex) { + lines = replaceThisWithThatExpectNMatch(lines, + "# saved_caches_directory: /var/lib/cassandra/saved_caches", + "saved_caches_directory: " + this.instanceName + "/data/saved_caches", 1); + } + try { + lines = replaceThisWithThatExpectNMatch(lines, + "start_rpc: false", + "start_rpc: true", 1); + } catch (RuntimeException ex) { + // Only needed for C* 2.2+ + } if (storagePort != null){ lines = replaceThisWithThatExpectNMatch(lines, "storage_port: 7000", "storage_port: "+storagePort, 1 ); } @@ -384,9 +414,8 @@ public void start(){ manager.setLaunchArray(launchArray); manager.go(); } - - private final String log4jAppenderConfLine = "log4j.appender.R.File"; - + + /** * Replaces the default file path of the system log. * Cassandra comes with a log4j configuration that assumes, by default, there's a /var/log/cassandra directory @@ -400,31 +429,67 @@ public void start(){ * @param instanceConfDirectory the instance "conf" directory. * @param instanceLogDirectory the instance "log" directory. */ - private void setUpLoggingConf(final File instanceConfDirectory, final File instanceLogDirectory) { - final File log4ServerProperties = new File(instanceConfDirectory, "log4j-server.properties"); - final File systemLog = new File(instanceLogDirectory, "system.log"); - BufferedWriter writer = null; - try { - final List lines = Files.readAllLines(log4ServerProperties.toPath(), Charset.defaultCharset()); - writer = new BufferedWriter(new FileWriter(log4ServerProperties)); - for (final String line : lines) { - writer.write( - (line.startsWith(log4jAppenderConfLine) - ? log4jAppenderConfLine.concat("=").concat(systemLog.getAbsolutePath()) - : line)); - writer.newLine(); - } - } catch (final IOException exception) { - throw new RuntimeException(exception); - } finally { - if (writer != null) { - try { - writer.close(); - } catch (final IOException ignore) { - // Nothing to be done here... - } - } - } + private void setUpLoggingConf(final File instanceConfDirectory, + final File instanceLogDirectory) { + final File log4ServerProperties = new File(instanceConfDirectory, + "log4j-server.properties"); + final File systemLog = new File(instanceLogDirectory, "system.log"); + if (log4ServerProperties.exists()) { + final String log4jAppenderConfLine = "log4j.appender.R.File"; + BufferedWriter writer = null; + try { + final List lines = Files.readAllLines( + log4ServerProperties.toPath(), Charset.defaultCharset()); + writer = new BufferedWriter(new FileWriter(log4ServerProperties)); + for (final String line : lines) { + writer.write( + (line.startsWith(log4jAppenderConfLine) ? log4jAppenderConfLine + .concat("=").concat(systemLog.getAbsolutePath()) : line)); + writer.newLine(); + } + } catch (final IOException exception) { + throw new RuntimeException(exception); + } finally { + if (writer != null) { + try { + writer.close(); + } catch (final IOException ignore) { + // Nothing to be done here... + } + } + } + } else { + final File logbackXml = new File(instanceConfDirectory, "logback.xml"); + // Setting cassandra.logdir would be the clean way to do this, but that requires modifying bin/cassandra, so... + final Pattern filePattern = Pattern.compile("\\s*\\Q${cassandra.logdir}/\\E(?[^<]*)\\Q\\E"); + BufferedWriter writer = null; + try { + final List lines = Files.readAllLines( + logbackXml.toPath(), Charset.defaultCharset()); + writer = new BufferedWriter(new FileWriter(logbackXml)); + for (final String line : lines) { + Matcher m = filePattern.matcher(line); + if (m.matches()) { + File logFile = new File(instanceLogDirectory, m.group(1)); + writer.write(" " + logFile.getAbsolutePath() + ""); + } else { + writer.write(line); + } + writer.newLine(); + } + } catch (final IOException exception) { + throw new RuntimeException(exception); + } finally { + if (writer != null) { + try { + writer.close(); + } catch (final IOException ignore) { + // Nothing to be done here... + } + } + } + + } } private List yamlLinesToAppend(List input){ @@ -495,7 +560,7 @@ public String buildJavaHome() { return ""; } } - + void delete(File f) { if (f.isDirectory()) { for (File c : f.listFiles()) @@ -517,7 +582,7 @@ public List replaceThisWithThatExpectNMatch(List lines, String m } } if (replaced != expectedMatches){ - throw new RuntimeException("looking to make " + expectedMatches +" replacement but made "+replaced + throw new RuntimeException("looking to make " + expectedMatches +" of ('" + match + "')->'(" + replace + "') but made "+replaced +" . Likely that farsandra does not understand this version of configuration file. "); } return result; diff --git a/farsandra-core/src/test/java/io/teknek/farsandra/CompareAndSwapTest.java b/farsandra-core/src/test/java/io/teknek/farsandra/CompareAndSwapTest.java index 58b3845..734aa13 100644 --- a/farsandra-core/src/test/java/io/teknek/farsandra/CompareAndSwapTest.java +++ b/farsandra-core/src/test/java/io/teknek/farsandra/CompareAndSwapTest.java @@ -24,9 +24,9 @@ public class CompareAndSwapTest { @Test public void threeNodeTest() throws Exception { - fs1 = UnflushedJoinTest.startInstance("2.0.4", "target/3_1", "127.0.0.1", "127.0.0.1", 9999); - fs2 = UnflushedJoinTest.startInstance("2.0.4", "target/3_2", "127.0.0.2", "127.0.0.1", 9998); - fs3 = UnflushedJoinTest.startInstance("2.0.4", "target/3_3", "127.0.0.3", "127.0.0.1", 9997); + fs1 = UnflushedJoinTest.startInstance("2.2.4", "target/3_1", "127.0.0.1", "127.0.0.1", 9999); + fs2 = UnflushedJoinTest.startInstance("2.2.4", "target/3_2", "127.0.0.2", "127.0.0.1", 9998); + fs3 = UnflushedJoinTest.startInstance("2.2.4", "target/3_3", "127.0.0.3", "127.0.0.1", 9997); FramedConnWrapper wrap = new FramedConnWrapper("127.0.0.1", 9160); String ks = "CREATE KEYSPACE test " diff --git a/farsandra-core/src/test/java/io/teknek/farsandra/TestFarsandra.java b/farsandra-core/src/test/java/io/teknek/farsandra/TestFarsandra.java index 2144e33..15a37b4 100644 --- a/farsandra-core/src/test/java/io/teknek/farsandra/TestFarsandra.java +++ b/farsandra-core/src/test/java/io/teknek/farsandra/TestFarsandra.java @@ -42,7 +42,7 @@ public void close(){ @Test public void testShutdownWithLatch() throws InterruptedException { - fs.withVersion("2.0.4"); + fs.withVersion("2.2.4"); fs.withCleanInstanceOnStart(true); fs.withInstanceName("target/3_1"); fs.withCreateConfigurationFiles(true); @@ -51,7 +51,7 @@ public void testShutdownWithLatch() throws InterruptedException { fs.withJmxPort(9999); fs.appendLineToYaml("#this means nothing"); fs.appendLinesToEnv("#this also does nothing"); - fs.withEnvReplacement("#MALLOC_ARENA_MAX=4", "#MALLOC_ARENA_MAX=wombat"); + fs.withEnvReplacement("# Per-thread stack size.", "# Per-thread stack size. wombat"); fs.withYamlReplacement("# NOTE:", "# deNOTE:"); final CountDownLatch started = new CountDownLatch(1); fs.getManager().addOutLineHandler( new LineHandler(){ @@ -82,7 +82,7 @@ public void handleTermination(int exitValue) { public void threeNodeTest() throws InterruptedException, InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException { Farsandra fs = new Farsandra(); { - fs.withVersion("2.0.4"); + fs.withVersion("2.2.4"); fs.withCleanInstanceOnStart(true); fs.withInstanceName("target/3_1"); fs.withCreateConfigurationFiles(true); @@ -116,7 +116,7 @@ public void handleTermination(int exitValue) { Farsandra fs2 = new Farsandra(); { - fs2.withVersion("2.0.4"); + fs2.withVersion("2.2.4"); fs2.withCleanInstanceOnStart(true); fs2.withInstanceName("target/3_2"); fs2.withCreateConfigurationFiles(true); @@ -151,7 +151,7 @@ public void handleTermination(int exitValue) { Farsandra fs3 = new Farsandra(); { - fs3.withVersion("2.0.4"); + fs3.withVersion("2.2.4"); fs3.withCleanInstanceOnStart(true); fs3.withInstanceName("target/3_3"); fs3.withCreateConfigurationFiles(true); @@ -222,7 +222,7 @@ public void handleTermination(int exitValue) { @Test public void simpleOtherTest() throws InterruptedException{ - fs.withVersion("2.0.3"); + fs.withVersion("2.2.3"); fs.withCleanInstanceOnStart(true); fs.withInstanceName("target/1"); fs.withCreateConfigurationFiles(true); @@ -266,7 +266,7 @@ public void handleTermination(int exitValue) { @Test public void simpleTest() throws InterruptedException { - fs.withVersion("2.0.4"); + fs.withVersion("2.2.4"); fs.withCleanInstanceOnStart(true); fs.withInstanceName("target/1"); fs.withCreateConfigurationFiles(true); diff --git a/farsandra-core/src/test/java/io/teknek/farsandra/TestFarsandraWithCustomConfig.java b/farsandra-core/src/test/java/io/teknek/farsandra/TestFarsandraWithCustomConfig.java index 6bf518c..d68632b 100644 --- a/farsandra-core/src/test/java/io/teknek/farsandra/TestFarsandraWithCustomConfig.java +++ b/farsandra-core/src/test/java/io/teknek/farsandra/TestFarsandraWithCustomConfig.java @@ -42,7 +42,7 @@ public void close(){ @Test public void testShutdownWithLatch() throws InterruptedException { - fs.withVersion("2.0.4"); + fs.withVersion("2.2.4"); fs.withCleanInstanceOnStart(true); fs.withInstanceName("target/3_1"); fs.withCreateConfigurationFiles(true); @@ -51,7 +51,7 @@ public void testShutdownWithLatch() throws InterruptedException { fs.withJmxPort(9999); fs.appendLineToYaml("#this means nothing"); fs.appendLinesToEnv("#this also does nothing"); - fs.withEnvReplacement("#MALLOC_ARENA_MAX=4", "#MALLOC_ARENA_MAX=wombat"); + fs.withEnvReplacement("# Per-thread stack size.", "# Per-thread stack size. wombat"); fs.withYamlReplacement("# NOTE:", "# deNOTE:"); final CountDownLatch started = new CountDownLatch(1); fs.getManager().addOutLineHandler( new LineHandler(){ @@ -79,7 +79,7 @@ public void handleTermination(int exitValue) { @Test public void simpleTest() throws InterruptedException { - fs.withVersion("2.0.4"); + fs.withVersion("2.2.4"); fs.withCleanInstanceOnStart(true); fs.withInstanceName("target/1"); fs.withCreateConfigurationFiles(true); diff --git a/farsandra-core/src/test/java/io/teknek/farsandra/TestFarsandraWithDefaultConfig.java b/farsandra-core/src/test/java/io/teknek/farsandra/TestFarsandraWithDefaultConfig.java index 7fffe29..46acc89 100644 --- a/farsandra-core/src/test/java/io/teknek/farsandra/TestFarsandraWithDefaultConfig.java +++ b/farsandra-core/src/test/java/io/teknek/farsandra/TestFarsandraWithDefaultConfig.java @@ -42,7 +42,7 @@ public void close(){ @Test public void testShutdownWithLatch() throws InterruptedException { - fs.withVersion("2.0.4"); + fs.withVersion("2.2.4"); fs.withCleanInstanceOnStart(true); fs.withInstanceName("target/3_1"); fs.withCreateConfigurationFiles(true); @@ -51,7 +51,7 @@ public void testShutdownWithLatch() throws InterruptedException { fs.withJmxPort(9999); fs.appendLineToYaml("#this means nothing"); fs.appendLinesToEnv("#this also does nothing"); - fs.withEnvReplacement("#MALLOC_ARENA_MAX=4", "#MALLOC_ARENA_MAX=wombat"); + fs.withEnvReplacement("# Per-thread stack size.", "# Per-thread stack size. wombat"); fs.withYamlReplacement("# NOTE:", "# deNOTE:"); final CountDownLatch started = new CountDownLatch(1); fs.getManager().addOutLineHandler( new LineHandler(){ @@ -79,7 +79,7 @@ public void handleTermination(int exitValue) { @Test public void simpleTest() throws InterruptedException { - fs.withVersion("2.0.4"); + fs.withVersion("2.2.4"); fs.withCleanInstanceOnStart(true); fs.withInstanceName("target/1"); fs.withCreateConfigurationFiles(true); diff --git a/farsandra-core/src/test/java/io/teknek/farsandra/UnflushedJoinTest.java b/farsandra-core/src/test/java/io/teknek/farsandra/UnflushedJoinTest.java index b8d34aa..228d079 100644 --- a/farsandra-core/src/test/java/io/teknek/farsandra/UnflushedJoinTest.java +++ b/farsandra-core/src/test/java/io/teknek/farsandra/UnflushedJoinTest.java @@ -74,9 +74,9 @@ public void assertAllThere(String host) throws Exception { @Test public void threeNodeTest() throws Exception { - fs = startInstance("2.0.4", "target/3_1", "127.0.0.1", "127.0.0.1", 9999); - fs2 = startInstance("2.0.4", "target/3_2", "127.0.0.2", "127.0.0.1", 9998); - fs3 = startInstance("2.0.4", "target/3_3", "127.0.0.3", "127.0.0.1", 9997); + fs = startInstance("2.2.4", "target/3_1", "127.0.0.1", "127.0.0.1", 9999); + fs2 = startInstance("2.2.4", "target/3_2", "127.0.0.2", "127.0.0.1", 9998); + fs3 = startInstance("2.2.4", "target/3_3", "127.0.0.3", "127.0.0.1", 9997); Thread.sleep(30000); FramedConnWrapper wrap = new FramedConnWrapper("127.0.0.1", 9160); String ks = "CREATE KEYSPACE test " @@ -101,7 +101,7 @@ public void threeNodeTest() throws Exception { assertAllThere("127.0.0.1"); System.out.println("added"); - fs4 = startInstance("2.0.4", "target/3_4", "127.0.0.4", "127.0.0.1", 9996); + fs4 = startInstance("2.2.4", "target/3_4", "127.0.0.4", "127.0.0.1", 9996); Thread.sleep(40000); assertAllThere("127.0.0.4"); }