Skip to content

Commit

Permalink
Support Cassandra versions after 2.0.x
Browse files Browse the repository at this point in the history
  • Loading branch information
jancona committed Dec 21, 2015
1 parent 27bad31 commit 922d033
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 61 deletions.
147 changes: 106 additions & 41 deletions farsandra-core/src/main/java/io/teknek/farsandra/Farsandra.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
Expand Down Expand Up @@ -318,18 +320,46 @@ public void start(){
throw new RuntimeException(e);
}
lines = replaceHost(lines);
lines = replaceThisWithThatExpectNMatch(lines,
" - /var/lib/cassandra/data",
" - " + this.instanceName + "/data/data" , 1);
lines = replaceThisWithThatExpectNMatch(lines,
"listen_address: localhost",
"listen_address: " +host , 1);
lines = replaceThisWithThatExpectNMatch(lines,
"commitlog_directory: /var/lib/cassandra/commitlog",
"commitlog_directory: " + this.instanceName + "/data/commitlog" , 1 );
try {
lines = replaceThisWithThatExpectNMatch(lines,
" - /var/lib/cassandra/data",
" - " + this.instanceName + "/data/data" , 1);
} catch (RuntimeException ex) {
lines = replaceThisWithThatExpectNMatch(lines,
"# data_file_directories:",
"data_file_directories:" , 1);
lines = replaceThisWithThatExpectNMatch(lines,
"# - /var/lib/cassandra/data",
" - " + this.instanceName + "/data/data" , 1);
}
lines = replaceThisWithThatExpectNMatch(lines,
"saved_caches_directory: /var/lib/cassandra/saved_caches",
"saved_caches_directory: " + this.instanceName + "/data/saved_caches", 1);
"listen_address: localhost",
"listen_address: " +host , 1);
try {
lines = replaceThisWithThatExpectNMatch(lines,
"commitlog_directory: /var/lib/cassandra/commitlog",
"commitlog_directory: " + this.instanceName + "/data/commitlog" , 1 );
} catch (RuntimeException ex) {
lines = replaceThisWithThatExpectNMatch(lines,
"# commitlog_directory: /var/lib/cassandra/commitlog",
"commitlog_directory: " + this.instanceName + "/data/commitlog" , 1 );
}
try {
lines = replaceThisWithThatExpectNMatch(lines,
"saved_caches_directory: /var/lib/cassandra/saved_caches",
"saved_caches_directory: " + this.instanceName + "/data/saved_caches", 1);
} catch (RuntimeException ex) {
lines = replaceThisWithThatExpectNMatch(lines,
"# saved_caches_directory: /var/lib/cassandra/saved_caches",
"saved_caches_directory: " + this.instanceName + "/data/saved_caches", 1);
}
try {
lines = replaceThisWithThatExpectNMatch(lines,
"start_rpc: false",
"start_rpc: true", 1);
} catch (RuntimeException ex) {
// Only needed for C* 2.2+
}
if (storagePort != null){
lines = replaceThisWithThatExpectNMatch(lines, "storage_port: 7000", "storage_port: "+storagePort, 1 );
}
Expand Down Expand Up @@ -384,9 +414,8 @@ public void start(){
manager.setLaunchArray(launchArray);
manager.go();
}

private final String log4jAppenderConfLine = "log4j.appender.R.File";



/**
* Replaces the default file path of the system log.
* Cassandra comes with a log4j configuration that assumes, by default, there's a /var/log/cassandra directory
Expand All @@ -400,31 +429,67 @@ public void start(){
* @param instanceConfDirectory the instance "conf" directory.
* @param instanceLogDirectory the instance "log" directory.
*/
private void setUpLoggingConf(final File instanceConfDirectory, final File instanceLogDirectory) {
final File log4ServerProperties = new File(instanceConfDirectory, "log4j-server.properties");
final File systemLog = new File(instanceLogDirectory, "system.log");
BufferedWriter writer = null;
try {
final List<String> lines = Files.readAllLines(log4ServerProperties.toPath(), Charset.defaultCharset());
writer = new BufferedWriter(new FileWriter(log4ServerProperties));
for (final String line : lines) {
writer.write(
(line.startsWith(log4jAppenderConfLine)
? log4jAppenderConfLine.concat("=").concat(systemLog.getAbsolutePath())
: line));
writer.newLine();
}
} catch (final IOException exception) {
throw new RuntimeException(exception);
} finally {
if (writer != null) {
try {
writer.close();
} catch (final IOException ignore) {
// Nothing to be done here...
}
}
}
private void setUpLoggingConf(final File instanceConfDirectory,
final File instanceLogDirectory) {
final File log4ServerProperties = new File(instanceConfDirectory,
"log4j-server.properties");
final File systemLog = new File(instanceLogDirectory, "system.log");
if (log4ServerProperties.exists()) {
final String log4jAppenderConfLine = "log4j.appender.R.File";
BufferedWriter writer = null;
try {
final List<String> lines = Files.readAllLines(
log4ServerProperties.toPath(), Charset.defaultCharset());
writer = new BufferedWriter(new FileWriter(log4ServerProperties));
for (final String line : lines) {
writer.write(
(line.startsWith(log4jAppenderConfLine) ? log4jAppenderConfLine
.concat("=").concat(systemLog.getAbsolutePath()) : line));
writer.newLine();
}
} catch (final IOException exception) {
throw new RuntimeException(exception);
} finally {
if (writer != null) {
try {
writer.close();
} catch (final IOException ignore) {
// Nothing to be done here...
}
}
}
} else {
final File logbackXml = new File(instanceConfDirectory, "logback.xml");
// Setting cassandra.logdir would be the clean way to do this, but that requires modifying bin/cassandra, so...
final Pattern filePattern = Pattern.compile("\\s*\\Q<file>${cassandra.logdir}/\\E(?<fileName>[^<]*)\\Q</file>\\E");
BufferedWriter writer = null;
try {
final List<String> lines = Files.readAllLines(
logbackXml.toPath(), Charset.defaultCharset());
writer = new BufferedWriter(new FileWriter(logbackXml));
for (final String line : lines) {
Matcher m = filePattern.matcher(line);
if (m.matches()) {
File logFile = new File(instanceLogDirectory, m.group(1));
writer.write(" <file>" + logFile.getAbsolutePath() + "</file>");
} else {
writer.write(line);
}
writer.newLine();
}
} catch (final IOException exception) {
throw new RuntimeException(exception);
} finally {
if (writer != null) {
try {
writer.close();
} catch (final IOException ignore) {
// Nothing to be done here...
}
}
}

}
}

private List<String> yamlLinesToAppend(List<String> input){
Expand Down Expand Up @@ -495,7 +560,7 @@ public String buildJavaHome() {
return "";
}
}

void delete(File f) {
if (f.isDirectory()) {
for (File c : f.listFiles())
Expand All @@ -517,7 +582,7 @@ public List<String> replaceThisWithThatExpectNMatch(List<String> lines, String m
}
}
if (replaced != expectedMatches){
throw new RuntimeException("looking to make " + expectedMatches +" replacement but made "+replaced
throw new RuntimeException("looking to make " + expectedMatches +" of ('" + match + "')->'(" + replace + "') but made "+replaced
+" . Likely that farsandra does not understand this version of configuration file. ");
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ public class CompareAndSwapTest {

@Test
public void threeNodeTest() throws Exception {
fs1 = UnflushedJoinTest.startInstance("2.0.4", "target/3_1", "127.0.0.1", "127.0.0.1", 9999);
fs2 = UnflushedJoinTest.startInstance("2.0.4", "target/3_2", "127.0.0.2", "127.0.0.1", 9998);
fs3 = UnflushedJoinTest.startInstance("2.0.4", "target/3_3", "127.0.0.3", "127.0.0.1", 9997);
fs1 = UnflushedJoinTest.startInstance("2.2.4", "target/3_1", "127.0.0.1", "127.0.0.1", 9999);
fs2 = UnflushedJoinTest.startInstance("2.2.4", "target/3_2", "127.0.0.2", "127.0.0.1", 9998);
fs3 = UnflushedJoinTest.startInstance("2.2.4", "target/3_3", "127.0.0.3", "127.0.0.1", 9997);

FramedConnWrapper wrap = new FramedConnWrapper("127.0.0.1", 9160);
String ks = "CREATE KEYSPACE test "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void close(){

@Test
public void testShutdownWithLatch() throws InterruptedException {
fs.withVersion("2.0.4");
fs.withVersion("2.2.4");
fs.withCleanInstanceOnStart(true);
fs.withInstanceName("target/3_1");
fs.withCreateConfigurationFiles(true);
Expand All @@ -51,7 +51,7 @@ public void testShutdownWithLatch() throws InterruptedException {
fs.withJmxPort(9999);
fs.appendLineToYaml("#this means nothing");
fs.appendLinesToEnv("#this also does nothing");
fs.withEnvReplacement("#MALLOC_ARENA_MAX=4", "#MALLOC_ARENA_MAX=wombat");
fs.withEnvReplacement("# Per-thread stack size.", "# Per-thread stack size. wombat");
fs.withYamlReplacement("# NOTE:", "# deNOTE:");
final CountDownLatch started = new CountDownLatch(1);
fs.getManager().addOutLineHandler( new LineHandler(){
Expand Down Expand Up @@ -82,7 +82,7 @@ public void handleTermination(int exitValue) {
public void threeNodeTest() throws InterruptedException, InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException {
Farsandra fs = new Farsandra();
{
fs.withVersion("2.0.4");
fs.withVersion("2.2.4");
fs.withCleanInstanceOnStart(true);
fs.withInstanceName("target/3_1");
fs.withCreateConfigurationFiles(true);
Expand Down Expand Up @@ -116,7 +116,7 @@ public void handleTermination(int exitValue) {
Farsandra fs2 = new Farsandra();
{

fs2.withVersion("2.0.4");
fs2.withVersion("2.2.4");
fs2.withCleanInstanceOnStart(true);
fs2.withInstanceName("target/3_2");
fs2.withCreateConfigurationFiles(true);
Expand Down Expand Up @@ -151,7 +151,7 @@ public void handleTermination(int exitValue) {
Farsandra fs3 = new Farsandra();
{

fs3.withVersion("2.0.4");
fs3.withVersion("2.2.4");
fs3.withCleanInstanceOnStart(true);
fs3.withInstanceName("target/3_3");
fs3.withCreateConfigurationFiles(true);
Expand Down Expand Up @@ -222,7 +222,7 @@ public void handleTermination(int exitValue) {

@Test
public void simpleOtherTest() throws InterruptedException{
fs.withVersion("2.0.3");
fs.withVersion("2.2.3");
fs.withCleanInstanceOnStart(true);
fs.withInstanceName("target/1");
fs.withCreateConfigurationFiles(true);
Expand Down Expand Up @@ -266,7 +266,7 @@ public void handleTermination(int exitValue) {

@Test
public void simpleTest() throws InterruptedException {
fs.withVersion("2.0.4");
fs.withVersion("2.2.4");
fs.withCleanInstanceOnStart(true);
fs.withInstanceName("target/1");
fs.withCreateConfigurationFiles(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void close(){

@Test
public void testShutdownWithLatch() throws InterruptedException {
fs.withVersion("2.0.4");
fs.withVersion("2.2.4");
fs.withCleanInstanceOnStart(true);
fs.withInstanceName("target/3_1");
fs.withCreateConfigurationFiles(true);
Expand All @@ -51,7 +51,7 @@ public void testShutdownWithLatch() throws InterruptedException {
fs.withJmxPort(9999);
fs.appendLineToYaml("#this means nothing");
fs.appendLinesToEnv("#this also does nothing");
fs.withEnvReplacement("#MALLOC_ARENA_MAX=4", "#MALLOC_ARENA_MAX=wombat");
fs.withEnvReplacement("# Per-thread stack size.", "# Per-thread stack size. wombat");
fs.withYamlReplacement("# NOTE:", "# deNOTE:");
final CountDownLatch started = new CountDownLatch(1);
fs.getManager().addOutLineHandler( new LineHandler(){
Expand Down Expand Up @@ -79,7 +79,7 @@ public void handleTermination(int exitValue) {

@Test
public void simpleTest() throws InterruptedException {
fs.withVersion("2.0.4");
fs.withVersion("2.2.4");
fs.withCleanInstanceOnStart(true);
fs.withInstanceName("target/1");
fs.withCreateConfigurationFiles(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void close(){

@Test
public void testShutdownWithLatch() throws InterruptedException {
fs.withVersion("2.0.4");
fs.withVersion("2.2.4");
fs.withCleanInstanceOnStart(true);
fs.withInstanceName("target/3_1");
fs.withCreateConfigurationFiles(true);
Expand All @@ -51,7 +51,7 @@ public void testShutdownWithLatch() throws InterruptedException {
fs.withJmxPort(9999);
fs.appendLineToYaml("#this means nothing");
fs.appendLinesToEnv("#this also does nothing");
fs.withEnvReplacement("#MALLOC_ARENA_MAX=4", "#MALLOC_ARENA_MAX=wombat");
fs.withEnvReplacement("# Per-thread stack size.", "# Per-thread stack size. wombat");
fs.withYamlReplacement("# NOTE:", "# deNOTE:");
final CountDownLatch started = new CountDownLatch(1);
fs.getManager().addOutLineHandler( new LineHandler(){
Expand Down Expand Up @@ -79,7 +79,7 @@ public void handleTermination(int exitValue) {

@Test
public void simpleTest() throws InterruptedException {
fs.withVersion("2.0.4");
fs.withVersion("2.2.4");
fs.withCleanInstanceOnStart(true);
fs.withInstanceName("target/1");
fs.withCreateConfigurationFiles(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ public void assertAllThere(String host) throws Exception {

@Test
public void threeNodeTest() throws Exception {
fs = startInstance("2.0.4", "target/3_1", "127.0.0.1", "127.0.0.1", 9999);
fs2 = startInstance("2.0.4", "target/3_2", "127.0.0.2", "127.0.0.1", 9998);
fs3 = startInstance("2.0.4", "target/3_3", "127.0.0.3", "127.0.0.1", 9997);
fs = startInstance("2.2.4", "target/3_1", "127.0.0.1", "127.0.0.1", 9999);
fs2 = startInstance("2.2.4", "target/3_2", "127.0.0.2", "127.0.0.1", 9998);
fs3 = startInstance("2.2.4", "target/3_3", "127.0.0.3", "127.0.0.1", 9997);
Thread.sleep(30000);
FramedConnWrapper wrap = new FramedConnWrapper("127.0.0.1", 9160);
String ks = "CREATE KEYSPACE test "
Expand All @@ -101,7 +101,7 @@ public void threeNodeTest() throws Exception {
assertAllThere("127.0.0.1");

System.out.println("added");
fs4 = startInstance("2.0.4", "target/3_4", "127.0.0.4", "127.0.0.1", 9996);
fs4 = startInstance("2.2.4", "target/3_4", "127.0.0.4", "127.0.0.1", 9996);
Thread.sleep(40000);
assertAllThere("127.0.0.4");
}
Expand Down

0 comments on commit 922d033

Please sign in to comment.