Skip to content

Commit

Permalink
Add DataFrame API: UnionAll, Subtract, Drop, DropNa
Browse files Browse the repository at this point in the history
  • Loading branch information
jayjaywg committed Nov 6, 2015
1 parent d0694d2 commit ebc177b
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ internal interface IDataFrameProxy
IDataFrameProxy Join(IDataFrameProxy otherScalaDataFrameReference, string[] joinColumnNames);
IDataFrameProxy Join(IDataFrameProxy otherScalaDataFrameReference, IColumnProxy scalaColumnReference, string joinType);
IDataFrameProxy Intersect(IDataFrameProxy otherScalaDataFrameReference);
/// <summary>Unions this DataFrame with another, keeping duplicate rows (SQL UNION ALL).</summary>
IDataFrameProxy UnionAll(IDataFrameProxy otherScalaDataFrameReference);
/// <summary>Returns rows present in this DataFrame but not in the other (SQL EXCEPT).</summary>
IDataFrameProxy Subtract(IDataFrameProxy otherScalaDataFrameReference);
/// <summary>Returns a DataFrame with the named column removed.</summary>
IDataFrameProxy Drop(string columnName);
/// <summary>Drops rows containing null values; see DataFrame.DropNa for parameter semantics.</summary>
IDataFrameProxy DropNa(string how, int? thresh, string[] subset);
}

internal interface IUDFProxy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,18 @@ public IColumnProxy GetColumn(string columnName)

/// <summary>
/// Projects the DataFrame onto the given columns via the JVM-side "select".
/// </summary>
/// <param name="columnName">First (required) column name.</param>
/// <param name="columnNames">Additional column names; may be empty.</param>
/// <returns>Proxy for the projected DataFrame.</returns>
public IDataFrameProxy Select(string columnName, string[] columnNames)
{
    // When columnNames is empty, pass an empty array to the JVM instead of
    // calling SQLUtils.toSeq (toSeq is only needed to marshal a non-empty list).
    var parameters = columnNames.Length > 0 ?
        new object[]
        {
            columnName,
            new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.api.csharp.SQLUtils", "toSeq", new object[] { columnNames }))
        } :
        new object[] { columnName, new string[0] };

    return new DataFrameIpcProxy(new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
        jvmDataFrameReference,
        "select",
        parameters)),
        sqlContextProxy);
}

Expand Down Expand Up @@ -218,13 +222,62 @@ public IDataFrameProxy Join(IDataFrameProxy otherScalaDataFrameReference, IColum
}

/// <summary>
/// Intersects with another DataFrame by invoking the JVM-side "intersect".
/// </summary>
/// <param name="otherScalaDataFrameReference">Proxy of the other DataFrame.</param>
/// <returns>Proxy for the intersected DataFrame.</returns>
public IDataFrameProxy Intersect(IDataFrameProxy otherScalaDataFrameReference)
{
    return
        new DataFrameIpcProxy(new JvmObjectReference(
            SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
                jvmDataFrameReference, "intersect",
                new object[] { (otherScalaDataFrameReference as DataFrameIpcProxy).jvmDataFrameReference }).ToString()), sqlContextProxy);
}

/// <summary>
/// Unions with another DataFrame (keeping duplicates) by invoking the JVM-side "unionAll".
/// </summary>
/// <param name="otherScalaDataFrameReference">Proxy of the other DataFrame.</param>
/// <returns>Proxy for the unioned DataFrame.</returns>
public IDataFrameProxy UnionAll(IDataFrameProxy otherScalaDataFrameReference)
{
    // Unwrap the other proxy's JVM reference, call unionAll, and wrap the result.
    var otherJvmReference = (otherScalaDataFrameReference as DataFrameIpcProxy).jvmDataFrameReference;
    var resultReferenceId = SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
        jvmDataFrameReference, "unionAll", new object[] { otherJvmReference }).ToString();
    return new DataFrameIpcProxy(new JvmObjectReference(resultReferenceId), sqlContextProxy);
}

/// <summary>
/// Subtracts another DataFrame from this one by invoking the JVM-side "except".
/// </summary>
/// <param name="otherScalaDataFrameReference">Proxy of the DataFrame to subtract.</param>
/// <returns>Proxy for the subtracted DataFrame.</returns>
public IDataFrameProxy Subtract(IDataFrameProxy otherScalaDataFrameReference)
{
    // Scala's DataFrame names this operation "except"; the C# API exposes it as Subtract.
    var otherJvmReference = (otherScalaDataFrameReference as DataFrameIpcProxy).jvmDataFrameReference;
    var resultReferenceId = SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
        jvmDataFrameReference, "except", new object[] { otherJvmReference }).ToString();
    return new DataFrameIpcProxy(new JvmObjectReference(resultReferenceId), sqlContextProxy);
}

/// <summary>
/// Drops the named column by invoking the JVM-side "drop".
/// </summary>
/// <param name="columnName">Name of the column to drop.</param>
/// <returns>Proxy for the DataFrame without the column.</returns>
public IDataFrameProxy Drop(string columnName)
{
    var resultReferenceId = SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
        jvmDataFrameReference, "drop", new object[] { columnName }).ToString();
    return new DataFrameIpcProxy(new JvmObjectReference(resultReferenceId), sqlContextProxy);
}

/// <summary>
/// Drops rows containing null values via the JVM-side DataFrameNaFunctions ("na" then "drop").
/// </summary>
/// <param name="how">'any' or 'all'. Only used to derive the default thresh.</param>
/// <param name="thresh">Minimum number of non-null values a row must have to be kept;
/// when null it is derived from <paramref name="how"/>.</param>
/// <param name="subset">Column names to consider; null or empty means all columns.</param>
/// <returns>Proxy for the DataFrame with null-containing rows dropped.</returns>
public IDataFrameProxy DropNa(string how, int? thresh, string[] subset)
{
    if (how != "any" && how != "all")
        throw new ArgumentException(string.Format(@"how ({0}) should be 'any' or 'all'.", how));

    // Normalize: a null OR empty subset means "consider all columns".
    // (The original computed the full column list for an empty subset but then
    // discarded it via `subset ?? columnNames`, since an empty array is non-null.)
    string[] columns = (subset == null || subset.Length == 0)
        ? GetSchema().GetStructTypeFields().Select(f => f.GetStructFieldName().ToString()).ToArray()
        : subset;

    // Default thresh mirrors `how`: 'any' requires every considered column to be
    // non-null; 'all' requires at least one non-null value.
    if (thresh == null)
        thresh = how == "any" ? columns.Length : 1;

    var dataFrameNaRef = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
        jvmDataFrameReference, "na"));

    return
        new DataFrameIpcProxy(new JvmObjectReference(
            SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
                dataFrameNaRef, "drop",
                new object[] { thresh, columns }).ToString()), sqlContextProxy);
}
}

internal class UDFIpcProxy : IUDFProxy
Expand Down
59 changes: 56 additions & 3 deletions csharp/Adapter/Microsoft.Spark.CSharp/Sql/DataFrame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -272,15 +272,68 @@ public DataFrame Join(DataFrame otherDataFrame, Column joinExpression, JoinType
}

/// <summary>
/// Intersect with another DataFrame.
/// This is equivalent to `INTERSECT` in SQL.
/// </summary>
/// <param name="otherDataFrame">DataFrame to intersect with.</param>
/// <returns>Intersected DataFrame.</returns>
public DataFrame Intersect(DataFrame otherDataFrame)
{
    return
        new DataFrame(dataFrameProxy.Intersect(otherDataFrame.dataFrameProxy), sparkContext);
}

/// <summary>
/// Union with another DataFrame WITHOUT removing duplicated rows.
/// This is equivalent to `UNION ALL` in SQL.
/// </summary>
/// <param name="otherDataFrame">DataFrame to union all with.</param>
/// <returns>Unioned DataFrame.</returns>
public DataFrame UnionAll(DataFrame otherDataFrame)
{
    // Delegate to the proxy layer; the union itself happens on the JVM side.
    var unionedProxy = dataFrameProxy.UnionAll(otherDataFrame.dataFrameProxy);
    return new DataFrame(unionedProxy, sparkContext);
}

/// <summary>
/// Returns a new DataFrame containing rows in this frame but not in another frame.
/// This is equivalent to `EXCEPT` in SQL.
/// </summary>
/// <param name="otherDataFrame">DataFrame to subtract from this frame.</param>
/// <returns>A new DataFrame containing rows in this frame but not in another frame.</returns>
public DataFrame Subtract(DataFrame otherDataFrame)
{
    // Delegate to the proxy layer; the set difference happens on the JVM side.
    var subtractedProxy = dataFrameProxy.Subtract(otherDataFrame.dataFrameProxy);
    return new DataFrame(subtractedProxy, sparkContext);
}

/// <summary>
/// Returns a new DataFrame with a column dropped.
/// </summary>
/// <param name="columnName">Name of the column to drop.</param>
/// <returns>A new DataFrame that drops the specified column.</returns>
public DataFrame Drop(string columnName)
{
    return
        new DataFrame(dataFrameProxy.Drop(columnName), sparkContext);
}

/// <summary>
/// Returns a new DataFrame omitting rows with null values.
/// </summary>
/// <param name="how">'any' or 'all'.
/// If 'any', drop a row if it contains any nulls.
/// If 'all', drop a row only if all its values are null.</param>
/// <param name="thresh">Default null.
/// If specified, drop rows that have less than `thresh` non-null values.
/// This overwrites the `how` parameter.</param>
/// <param name="subset">Optional list of column names to consider.</param>
/// <returns>A new DataFrame omitting rows with null values.</returns>
public DataFrame DropNa(string how = "any", int? thresh = null, string[] subset = null)
{
    return
        new DataFrame(dataFrameProxy.DropNa(how, thresh, subset), sparkContext);
}
}

//TODO - complete impl
Expand Down
56 changes: 53 additions & 3 deletions csharp/Samples/Microsoft.Spark.CSharp/DataFrameSamples.cs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ internal static void DFProjectionFilterDSLSample()
}

/// <summary>
/// Sample to join DataFrame using DSL
/// Sample to join DataFrames using DSL
/// </summary>
[Sample]
internal static void DFJoinSample()
Expand All @@ -224,7 +224,7 @@ internal static void DFJoinSample()
}

/// <summary>
/// Sample to intersect DataFrame using DSL
/// Sample to intersect DataFrames using DSL.
/// </summary>
[Sample]
internal static void DFIntersectSample()
Expand All @@ -237,7 +237,57 @@ internal static void DFIntersectSample()
}

/// <summary>
/// Sample to perform aggregatoin on DataFrame using DSL
/// Sample to unionAll DataFrames using DSL.
/// </summary>
[Sample]
internal static void DFUnionAllSample()
{
var peopleDataFrame1 = GetSqlContext().JsonFile(SparkCLRSamples.Configuration.GetInputDataPath(PeopleJson));
var peopleDataFrame2 = peopleDataFrame1.Filter("name = 'Bill'");

var unioned = peopleDataFrame1.UnionAll(peopleDataFrame2);
unioned.Show();
}

/// <summary>
/// Sample to subtract a DataFrame from another DataFrame using DSL.
/// </summary>
[Sample]
internal static void DFSubtractSample()
{
    // Load everyone, filter the rows named Bill, then subtract to keep the rest.
    var allPeople = GetSqlContext().JsonFile(SparkCLRSamples.Configuration.GetInputDataPath(PeopleJson));
    var bills = allPeople.Filter("name = 'Bill'");

    var everyoneButBill = allPeople.Subtract(bills);
    everyoneButBill.Show();
}

/// <summary>
/// Sample to drop a column from a DataFrame using DSL.
/// </summary>
[Sample]
internal static void DFDropSample()
{
    var peopleDataFrame = GetSqlContext().JsonFile(SparkCLRSamples.Configuration.GetInputDataPath(PeopleJson));

    var dropped = peopleDataFrame.Drop("name");
    dropped.Show();
}

/// <summary>
/// Sample to drop rows containing null values from a DataFrame using DSL.
/// </summary>
[Sample]
internal static void DFDropNaSample()
{
    var peopleDataFrame = GetSqlContext().JsonFile(SparkCLRSamples.Configuration.GetInputDataPath(PeopleJson));

    var dropped = peopleDataFrame.DropNa(thresh: 2, subset: new []{"name", "address"});
    dropped.Show();
}

/// <summary>
/// Sample to perform aggregation on DataFrame using DSL
/// </summary>
[Sample]
internal static void DFAggregateDSLSample()
Expand Down
3 changes: 2 additions & 1 deletion csharp/Samples/Microsoft.Spark.CSharp/data/people.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{"id":"123", "name":"Bill", "age":34, "address":{"city":"Columbus","state":"Ohio"}}
{"id":"456", "name":"Steve", "age":14, "address":{"city":null, "state":"California"}}
{"id":"789", "name":"Bill", "age":43, "address":{"city":"Seattle","state":"Washington"}}
{ "id": "789", "name": "Bill", "age": 43, "address": { "city": "Seattle", "state": "Washington" } }
{ "id": "531", "name": "Satya", "age": 46, "address": null }

0 comments on commit ebc177b

Please sign in to comment.