Merge pull request datastax#98 from datastax/java-api-upgrade
Java api upgrade
jacek-lewandowski committed Jul 28, 2014
2 parents c6defbc + 2c9acf0 commit 1be2b29
Showing 14 changed files with 975 additions and 167 deletions.
8 changes: 8 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -6,6 +6,14 @@
- Added implicit wrappers which simplify access to Cassandra related
functionality from StreamingContext and DStream.
- Added a stub for further Spark Streaming integration tests.
* Upgraded Java API. (#98)
  - Refactored existing Java API
  - Added CassandraJavaRDD as a Java counterpart of CassandraRDD
  - Added Java helpers for accessing Spark Streaming related methods
  - Added several integration tests
  - Added a documentation page for the Java API
  - Extended Java API demo
  - Added a lot of API docs
* Ability to register custom TypeConverters. (#32)
* Handle null values in StringConverter. (#79)
* Improved error message when there are no replicas in the local DC. (#69)
1 change: 1 addition & 0 deletions README.md
@@ -43,6 +43,7 @@ The documentation will be generated to `target/scala-2.10/api/`
- [Working with user-defined case classes and tuples](doc/4_mapper.md)
- [Saving datasets to Cassandra](doc/5_saving.md)
- [Customizing the object mapping](doc/6_advanced_mapper.md)
- [Using Connector in Java](doc/7_java_api.md)

## License
This software is available under the [Apache License, Version 2.0](LICENSE).
135 changes: 135 additions & 0 deletions doc/7_java_api.md
@@ -0,0 +1,135 @@
# Documentation

## Using Connector in Java
This section describes how to access the Connector's functionality when you write your program in Java.
It assumes that you are already familiar with the previous sections and understand how
the Connector works.

The best way to use the Connector Java API is to statically import all the methods of `CassandraJavaUtil`.
This utility class is the main entry point of the Connector Java API.
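For illustration, the static import looks like the following (the package path matches this commit's layout; verify it against your Connector version):

```java
// Package path as of this commit; may differ in later Connector versions.
import static com.datastax.spark.connector.CassandraJavaUtil.javaFunctions;
```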

### Accessing Cassandra tables in Java
`CassandraJavaRDD` is the Java counterpart of `CassandraRDD`. It makes it easy to invoke Connector-specific
methods that push selection and projection down to the database. Unlike `CassandraRDD`, however, it extends
`JavaRDD`, which is much better suited to developing Spark applications in Java.

To create a `CassandraJavaRDD`, invoke one of the `cassandraTable` methods of a special
wrapper around `SparkContext`. The wrapper is easily obtained with one of the overloaded `javaFunctions`
methods in `CassandraJavaUtil`.

Example:

```java
JavaRDD<String> cassandraRowsRDD = javaFunctions(sc).cassandraTable("ks", "tab")
        .map(new Function<CassandraRow, String>() {
            @Override
            public String call(CassandraRow cassandraRow) throws Exception {
                return cassandraRow.toString();
            }
        });
System.out.println("Data as CassandraRows: \n" + StringUtils.join(cassandraRowsRDD.toArray(), "\n"));
```

In the above example, the `cassandraTable` method creates a `CassandraJavaRDD` view of the data in `ks.tab`.
The elements of the returned *RDD* are of type `CassandraRow`. If you want to produce an *RDD* of custom beans,
use the overload of `cassandraTable` that also accepts the class of the *RDD*'s elements.

Example:

```java
// First, we define a bean class
public static class Person implements Serializable {
    private Integer id;
    private String name;
    private Date birthDate;

    // Remember to declare a no-args constructor
    public Person() { }

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public Date getBirthDate() { return birthDate; }
    public void setBirthDate(Date birthDate) { this.birthDate = birthDate; }

    // other methods, constructors, etc.
}
```

```java
JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("ks", "people", Person.class)
        .map(new Function<Person, String>() {
            @Override
            public String call(Person person) throws Exception {
                return person.toString();
            }
        });
System.out.println("Data as Person beans: \n" + StringUtils.join(rdd.toArray(), "\n"));
```

In this example, we created a `CassandraJavaRDD` of `Person` elements. When defining bean classes like
`Person`, remember to define a no-args constructor. It does not, however, have to be the only constructor
of the class.

### Using selection and projection on the database side
Once a `CassandraJavaRDD` is created, you may apply selection and projection to it by invoking its `where`
and `select` methods respectively. Their semantics are the same as those of their counterparts
in `CassandraRDD`.

Example:
```java
JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("ks", "tab")
        .select("id").map(new Function<CassandraRow, String>() {
            @Override
            public String call(CassandraRow cassandraRow) throws Exception {
                return cassandraRow.toString();
            }
        });
System.out.println("Data with only 'id' column fetched: \n" + StringUtils.join(rdd.toArray(), "\n"));
```

Example:
```java
JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("ks", "tab")
        .where("name=?", "Anna").map(new Function<CassandraRow, String>() {
            @Override
            public String call(CassandraRow cassandraRow) throws Exception {
                return cassandraRow.toString();
            }
        });
System.out.println("Data filtered by the where clause (name='Anna'): \n" + StringUtils.join(rdd.toArray(), "\n"));
```

### Saving data to Cassandra

The `javaFunctions` method can also be applied to any *RDD* to make the overloaded `saveToCassandra`
methods available. Because Java lacks default argument values and implicit conversions, `saveToCassandra`
is overloaded with a large number of argument combinations for the user's convenience. In the following
example, a `JavaRDD` of `Person` elements is saved to the Cassandra table `ks.people`.

Example:

```java
List<Person> people = Arrays.asList(
        Person.newInstance(1, "John", new Date()),
        Person.newInstance(2, "Anna", new Date()),
        Person.newInstance(3, "Andrew", new Date())
);
JavaRDD<Person> rdd = sc.parallelize(people);
javaFunctions(rdd, Person.class).saveToCassandra("ks", "people");
```
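The `Person.newInstance` calls above refer to a static factory that the earlier bean listing does not show. A minimal sketch of such a factory follows; it is a hypothetical convenience helper assumed for this example, not part of the Connector API:

```java
import java.io.Serializable;
import java.util.Date;

public class Person implements Serializable {
    private Integer id;
    private String name;
    private Date birthDate;

    public Person() { }

    // Hypothetical factory matching the usage in the saveToCassandra example
    public static Person newInstance(Integer id, String name, Date birthDate) {
        Person p = new Person();
        p.setId(id);
        p.setName(name);
        p.setBirthDate(birthDate);
        return p;
    }

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public Date getBirthDate() { return birthDate; }
    public void setBirthDate(Date birthDate) { this.birthDate = birthDate; }
}
```

Any equivalent way of constructing fully populated beans works just as well; the factory only keeps the example list compact.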

### Extensions for Spark Streaming

The main entry point for Spark Streaming in Java is the `JavaStreamingContext` object. As with
`JavaSparkContext`, the `javaFunctions` method gives access to Connector-specific functionality. For example,
we can create an ordinary `CassandraJavaRDD` by invoking the same `cassandraTable` method as for
`SparkContext`. There is nothing streaming-specific in this case: these methods are provided only for
convenience, and they use the `SparkContext` wrapped by `StreamingContext` under the hood.

You may also save the data from a `JavaDStream` to Cassandra. Again, use the `javaFunctions` method to create
a special wrapper around `JavaDStream`, and then invoke one of the `saveToCassandra` methods. A *DStream* is a
sequence of *RDDs*, and when you invoke `saveToCassandra` on it, each *RDD* in that *DStream* is saved to
Cassandra in turn.
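The original page gives no code for this section; the sketch below shows how saving a `JavaDStream` might look, assuming that `javaFunctions(JavaDStream, Class)` mirrors the RDD variant shown earlier. The application name, socket source, host, and table names are all illustrative:

```java
// Hedged sketch: saving a JavaDStream of Person beans to Cassandra.
// Assumes javaFunctions(JavaDStream, Class) exists alongside the RDD variant.
import static com.datastax.spark.connector.CassandraJavaUtil.javaFunctions;

import java.util.Date;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class StreamingSaveDemo {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf()
                .setAppName("streaming-save-demo")
                .set("spark.cassandra.connection.host", "127.0.0.1");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));

        // Any receiver works; a socket source is used here purely for illustration.
        JavaDStream<Person> people = ssc.socketTextStream("localhost", 9999)
                .map(new Function<String, Person>() {
                    @Override
                    public Person call(String name) throws Exception {
                        return Person.newInstance(name.hashCode(), name, new Date());
                    }
                });

        // Each RDD of the DStream is saved to ks.people as it is computed.
        javaFunctions(people, Person.class).saveToCassandra("ks", "people");

        ssc.start();
        ssc.awaitTermination();
    }
}
```

Because a *DStream* is unbounded, `saveToCassandra` keeps writing batches for as long as the streaming context runs.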
@@ -0,0 +1,53 @@
package com.datastax.spark.connector.rdd

import java.net.InetAddress

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.util.{CassandraServer, SparkServer}
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}

import scala.collection.JavaConversions._

class CassandraJavaRDDSpec extends FlatSpec with Matchers with BeforeAndAfter with CassandraServer with SparkServer {

  useCassandraConfig("cassandra-default.yaml.template")
  val conn = CassandraConnector(InetAddress.getByName("127.0.0.1"))

  before {
    conn.withSessionDo { session =>
      session.execute("DROP KEYSPACE IF EXISTS java_api_test")
      session.execute("CREATE KEYSPACE java_api_test WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 }")
      session.execute("CREATE TABLE java_api_test.test_table (key INT, value TEXT, PRIMARY KEY (key))")
      session.execute("CREATE INDEX test_table_idx ON java_api_test.test_table (value)")

      session.execute("INSERT INTO java_api_test.test_table (key, value) VALUES (1, 'one')")
      session.execute("INSERT INTO java_api_test.test_table (key, value) VALUES (2, 'two')")
      session.execute("INSERT INTO java_api_test.test_table (key, value) VALUES (3, 'three')")
    }
  }

  "CassandraJavaRDD" should "allow selecting a subset of columns" in {
    val rows = CassandraJavaUtil.javaFunctions(sc).cassandraTable("java_api_test", "test_table")
      .select("key").toArray()
    assert(rows.size == 3)
    assert(rows.exists(row => !row.contains("value") && row.getInt("key") == 1))
    assert(rows.exists(row => !row.contains("value") && row.getInt("key") == 2))
    assert(rows.exists(row => !row.contains("value") && row.getInt("key") == 3))
  }

  "CassandraJavaRDD" should "return the selected columns" in {
    val rdd = CassandraJavaUtil.javaFunctions(sc).cassandraTable("java_api_test", "test_table")
      .select("key")
    assert(rdd.selectedColumnNames().size == 1)
    assert(rdd.selectedColumnNames().contains("key"))
  }

  "CassandraJavaRDD" should "allow using a where clause to filter records" in {
    val rows = CassandraJavaUtil.javaFunctions(sc).cassandraTable("java_api_test", "test_table")
      .where("value = ?", "two").toArray()
    assert(rows.size == 1)
    assert(rows.exists(row => row.getString("value") == "two" && row.getInt("key") == 2))
  }

}
