Add DataFrame.Write()
tawan0109 committed Dec 14, 2015
1 parent 5fd923f commit 1147585
Showing 16 changed files with 754 additions and 5 deletions.
2 changes: 1 addition & 1 deletion Build.cmd
@@ -7,7 +7,7 @@ call precheck.cmd
if %precheck% == "bad" (goto :eof)

@rem Windows 7/8/10 may not allow powershell scripts by default
-powershell -Command Set-ExecutionPolicy Unrestricted
+powershell -Command Set-ExecutionPolicy -Scope CurrentUser Unrestricted

@rem download build tools
pushd %~dp0
4 changes: 4 additions & 0 deletions csharp/Adapter/Microsoft.Spark.CSharp/Adapter.csproj
@@ -82,8 +82,10 @@
<Compile Include="Interop\Ipc\SerDe.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Proxy\IDataFrameProxy.cs" />
<Compile Include="Proxy\IDataFrameWriterProxy.cs" />
<Compile Include="Proxy\IDStreamProxy.cs" />
<Compile Include="Proxy\Ipc\DataFrameIpcProxy.cs" />
<Compile Include="Proxy\Ipc\DataFrameWriterIpcProxy.cs" />
<Compile Include="Proxy\Ipc\DStreamIpcProxy.cs" />
<Compile Include="Proxy\Ipc\RDDIpcProxy.cs" />
<Compile Include="Proxy\Ipc\SparkCLRIpcProxy.cs" />
@@ -107,8 +109,10 @@
<Compile Include="Services\LoggerServiceFactory.cs" />
<Compile Include="Sql\Column.cs" />
<Compile Include="Sql\DataFrame.cs" />
<Compile Include="Sql\DataFrameWriter.cs" />
<Compile Include="Sql\Row.cs" />
<Compile Include="Sql\Functions.cs" />
<Compile Include="Sql\SaveMode.cs" />
<Compile Include="Sql\SqlContext.cs" />
<Compile Include="Sql\Struct.cs" />
<Compile Include="Sql\UserDefinedFunction.cs" />
csharp/Adapter/Microsoft.Spark.CSharp/Proxy/IDataFrameProxy.cs
@@ -56,6 +56,7 @@ internal interface IDataFrameProxy
void Unpersist(bool blocking = true);
IDataFrameProxy Repartition(int numPartitions);
IDataFrameProxy Sample(bool withReplacement, double fraction, long seed);
IDataFrameWriterProxy Write();
}

internal interface IUDFProxy
csharp/Adapter/Microsoft.Spark.CSharp/Proxy/IDataFrameWriterProxy.cs
@@ -0,0 +1,19 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System.Collections.Generic;

namespace Microsoft.Spark.CSharp.Proxy
{
internal interface IDataFrameWriterProxy
{
void Mode(string saveMode);
void Format(string source);
void Options(Dictionary<string, string> options);
void PartitionBy(params string[] colNames);
void Save();
void InsertInto(string tableName);
void SaveAsTable(string tableName);
void Jdbc(string url, string table, Dictionary<string, string> properties);
}
}
csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/DataFrameIpcProxy.cs
@@ -532,6 +532,12 @@ public IDataFrameProxy Sample(bool withReplacement, double fraction, long seed)
jvmDataFrameReference, "sample",
new object[] { withReplacement, fraction, seed }).ToString()), sqlContextProxy);
}

public IDataFrameWriterProxy Write()
{
return new DataFrameWriterIpcProxy(new JvmObjectReference(
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmDataFrameReference, "write").ToString()));
}
}

internal class UDFIpcProxy : IUDFProxy
csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/DataFrameWriterIpcProxy.cs
@@ -0,0 +1,66 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Collections.Generic;
using Microsoft.Spark.CSharp.Interop.Ipc;

namespace Microsoft.Spark.CSharp.Proxy.Ipc
{
internal class DataFrameWriterIpcProxy : IDataFrameWriterProxy
{
private readonly JvmObjectReference jvmDataFrameWriterReference;

internal DataFrameWriterIpcProxy(JvmObjectReference jvmDataFrameWriterReference)
{
this.jvmDataFrameWriterReference = jvmDataFrameWriterReference;
}

public void Mode(string saveMode)
{
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
jvmDataFrameWriterReference, "mode", new object[] { saveMode });
}

public void Format(string source)
{
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
jvmDataFrameWriterReference, "format", new object[] { source });
}

public void Options(Dictionary<string, string> options)
{
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
jvmDataFrameWriterReference, "options", new object[] { options });
}

public void PartitionBy(params string[] colNames)
{
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
jvmDataFrameWriterReference, "partitionBy", new object[] { colNames });
}

public void Save()
{
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmDataFrameWriterReference, "save");
}

public void InsertInto(string tableName)
{
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
jvmDataFrameWriterReference, "insertInto", new object[] { tableName });
}

public void SaveAsTable(string tableName)
{
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
jvmDataFrameWriterReference, "saveAsTable", new object[] { tableName });
}

public void Jdbc(string url, string table, Dictionary<string, string> properties)
{
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
jvmDataFrameWriterReference, "jdbc", new object[] { url, table, properties });
}
}
}
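Each method above is a thin pass-through: it forwards its arguments over the JVM bridge to the Scala DataFrameWriter held in jvmDataFrameWriterReference, using the lowercase JVM method names. Below is a hedged sketch of how one chained write flows through this proxy layer; the sketch class, the dataFrameProxy argument, and the output path are hypothetical, while the proxy methods are the ones defined in this commit.

using System.Collections.Generic;
using Microsoft.Spark.CSharp.Proxy;

internal static class WriterProxyTraceSketch    // hypothetical illustration only
{
    // dataFrameProxy would be an IDataFrameProxy backed by a JVM DataFrame reference.
    internal static void Trace(IDataFrameProxy dataFrameProxy)
    {
        IDataFrameWriterProxy writerProxy = dataFrameProxy.Write();   // JVM: dataFrame.write()
        writerProxy.Mode("overwrite");                                // JVM: writer.mode("overwrite")
        writerProxy.Format("parquet");                                // JVM: writer.format("parquet")
        writerProxy.Options(new Dictionary<string, string>            // JVM: writer.options(...)
            { { "path", "/tmp/out" } });                              // "/tmp/out" is a placeholder path
        writerProxy.Save();                                           // JVM: writer.save()
    }
}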
8 changes: 8 additions & 0 deletions csharp/Adapter/Microsoft.Spark.CSharp/Sql/DataFrame.cs
@@ -902,6 +902,14 @@ public void Foreach(Action<Row> f)
{
Rdd.Foreach(f);
}

/// <summary>
/// Interface for saving the content of the DataFrame out into external storage.
/// </summary>
public DataFrameWriter Write()
{
return new DataFrameWriter(dataFrameProxy.Write());
}
}

public class JoinType
170 changes: 170 additions & 0 deletions csharp/Adapter/Microsoft.Spark.CSharp/Sql/DataFrameWriter.cs
@@ -0,0 +1,170 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System.Collections.Generic;
using Microsoft.Spark.CSharp.Proxy;

namespace Microsoft.Spark.CSharp.Sql
{
/// <summary>
/// Interface used to write a DataFrame to external storage systems (e.g. file systems,
/// key-value stores, etc). Use DataFrame.Write to access this.
///
/// See also http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter
/// </summary>
public class DataFrameWriter
{
internal IDataFrameWriterProxy DataFrameWriterProxy
{
get { return dataFrameWriterProxy; }
}

private readonly IDataFrameWriterProxy dataFrameWriterProxy;

internal DataFrameWriter(IDataFrameWriterProxy dataFrameWriterProxy)
{
this.dataFrameWriterProxy = dataFrameWriterProxy;
}

/// <summary>
/// Specifies the behavior when data or table already exists. Options include:
/// - `SaveMode.Overwrite`: overwrite the existing data.
/// - `SaveMode.Append`: append the data.
/// - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
/// - `SaveMode.ErrorIfExists`: the default option; throws an exception at runtime.
/// </summary>
public DataFrameWriter Mode(SaveMode saveMode)
{
return Mode(saveMode.GetStringValue());
}

/// <summary>
/// Specifies the behavior when data or table already exists. Options include:
/// - `SaveMode.Overwrite`: overwrite the existing data.
/// - `SaveMode.Append`: append the data.
/// - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
/// - `SaveMode.ErrorIfExists`: the default option; throws an exception at runtime.
/// </summary>
public DataFrameWriter Mode(string saveMode)
{
dataFrameWriterProxy.Mode(saveMode);
return this;
}

/// <summary>
/// Specifies the underlying output data source. Built-in options include "parquet", "json", etc.
/// </summary>
public DataFrameWriter Format(string source)
{
dataFrameWriterProxy.Format(source);
return this;
}

/// <summary>
/// Adds an output option for the underlying data source.
/// </summary>
public DataFrameWriter Option(string key, string value)
{
var options = new Dictionary<string, string>() { { key, value } };
return Options(options);
}

/// <summary>
/// Adds output options for the underlying data source.
/// </summary>
public DataFrameWriter Options(Dictionary<string,string> options)
{
dataFrameWriterProxy.Options(options);
return this;
}

/// <summary>
/// Partitions the output by the given columns on the file system. If specified, the output is
/// laid out on the file system similar to Hive's partitioning scheme.
///
/// This is only applicable for Parquet at the moment.
/// </summary>
public DataFrameWriter PartitionBy(params string[] colNames)
{
dataFrameWriterProxy.PartitionBy(colNames);
return this;
}

/// <summary>
/// Saves the content of the DataFrame at the specified path.
/// </summary>
public void Save(string path)
{
Option("path", path).Save();
}

/// <summary>
/// Saves the content of the DataFrame using the data source, options and save mode configured so far.
/// </summary>
public void Save()
{
dataFrameWriterProxy.Save();
}

/// <summary>
/// Inserts the content of the DataFrame into the specified table. It requires that
/// the schema of the DataFrame is the same as the schema of the table.
/// Because it inserts data into an existing table, the format and options are ignored.
/// </summary>
public void InsertInto(string tableName)
{
dataFrameWriterProxy.InsertInto(tableName);
}

/// <summary>
/// Saves the content of the DataFrame as the specified table.
/// If the table already exists, the behavior of this function depends on the
/// save mode specified by the `Mode` function (the default is to throw an exception).
/// When the mode is `Overwrite`, the schema of the DataFrame does not need to be
/// the same as that of the existing table.
/// When the mode is `Append`, the schema of the DataFrame needs to be
/// the same as that of the existing table, and the format and options are ignored.
/// </summary>
public void SaveAsTable(string tableName)
{
dataFrameWriterProxy.SaveAsTable(tableName);
}

/// <summary>
/// Saves the content of the DataFrame to an external database table via JDBC. If the
/// table already exists in the external database, the behavior of this function depends on the
/// save mode specified by the `Mode` function (the default is to throw an exception).
///
/// Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
/// your external database systems.
/// </summary>
/// <param name="url">JDBC database url of the form `jdbc:subprotocol:subname`</param>
/// <param name="table">Name of the table in the external database.</param>
/// <param name="properties">JDBC database connection arguments, a list of arbitrary string tag/value.
/// Normally at least a "user" and "password" property should be included.</param>
public void Jdbc(string url, string table, Dictionary<string, string> properties)
{
dataFrameWriterProxy.Jdbc(url, table, properties);
}

/// <summary>
/// Saves the content of the DataFrame in JSON format at the specified path.
/// This is equivalent to:
/// Format("json").Save(path)
/// </summary>
public void Json(string path)
{
Format("json").Save(path);
}

/// <summary>
/// Saves the content of the DataFrame in Parquet format at the specified path.
/// This is equivalent to:
/// Format("parquet").Save(path)
/// </summary>
public void Parquet(string path)
{
Format("parquet").Save(path);
}
}
}
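For reference, here is a hedged usage sketch of the public API added in this file, as it might be called from application code. The wrapper class, the DataFrame argument, column name, paths, table name, and JDBC settings are all hypothetical placeholders; the methods and SaveMode values are the ones introduced by this commit.

using System.Collections.Generic;
using Microsoft.Spark.CSharp.Sql;

internal static class DataFrameWriterUsageSketch    // hypothetical illustration only
{
    internal static void WriteExamples(DataFrame df)    // df would come from a SqlContext
    {
        // Parquet output, partitioned by a column, overwriting any existing data.
        df.Write()
            .Format("parquet")
            .Mode(SaveMode.Overwrite)
            .PartitionBy("year")              // hypothetical column name
            .Save("hdfs:///data/events");     // Save(path) is Option("path", path) followed by Save()

        // JSON convenience method: equivalent to Format("json").Save(path).
        df.Write().Mode(SaveMode.Append).Json("hdfs:///data/events-json");

        // Save as a metastore table; ErrorIfExists is the default save mode.
        df.Write().SaveAsTable("events");

        // JDBC output; connection properties normally include at least "user" and "password".
        var props = new Dictionary<string, string> { { "user", "spark" }, { "password", "<secret>" } };
        df.Write().Mode(SaveMode.Append).Jdbc("jdbc:sqlserver://dbhost;databaseName=telemetry", "dbo.events", props);
    }
}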
54 changes: 54 additions & 0 deletions csharp/Adapter/Microsoft.Spark.CSharp/Sql/SaveMode.cs
@@ -0,0 +1,54 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace Microsoft.Spark.CSharp.Sql
{
/// <summary>
/// SaveMode is used to specify the expected behavior of saving a DataFrame to a data source.
/// </summary>
public enum SaveMode
{
/// <summary>
/// Append mode means that when saving a DataFrame to a data source, if data/table already exists,
/// contents of the DataFrame are expected to be appended to existing data.
/// </summary>
Append,

/// <summary>
/// Overwrite mode means that when saving a DataFrame to a data source,
/// if data/table already exists, existing data is expected to be overwritten by the contents of
/// the DataFrame.
/// </summary>
Overwrite,

/// <summary>
/// ErrorIfExists mode means that when saving a DataFrame to a data source, if data already exists,
/// an exception is expected to be thrown.
/// </summary>
ErrorIfExists,

/// <summary>
/// Ignore mode means that when saving a DataFrame to a data source, if data already exists,
/// the save operation is expected to not save the contents of the DataFrame and to not
/// change the existing data.
/// </summary>
Ignore
}

/// <summary>
/// Maps SaveMode values to the literal strings expected by Spark. For SaveMode.ErrorIfExists,
/// the corresponding literal string in Spark is "error" (or "default"); the other modes use their enum names.
/// </summary>
public static class SaveModeExtensions
{
public static string GetStringValue(this SaveMode mode)
{
switch (mode)
{
case SaveMode.ErrorIfExists:
return "error";
default:
return mode.ToString();
}
}
}
}
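The extension method converts the enum into the literal string passed to DataFrameWriter.Mode(string); only ErrorIfExists needs remapping. A small hedged illustration of the resulting values (the sketch class is hypothetical):

using Microsoft.Spark.CSharp.Sql;

internal static class SaveModeSketch    // hypothetical illustration only
{
    internal static void Show()
    {
        // ErrorIfExists is spelled "error" on the Spark side; the other members
        // fall through to mode.ToString() and keep their enum names.
        string error = SaveMode.ErrorIfExists.GetStringValue();   // "error"
        string overwrite = SaveMode.Overwrite.GetStringValue();   // "Overwrite"
        string append = SaveMode.Append.GetStringValue();         // "Append"
        string ignore = SaveMode.Ignore.GetStringValue();         // "Ignore"
    }
}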
1 change: 1 addition & 0 deletions csharp/AdapterTest/AdapterTest.csproj
@@ -61,6 +61,7 @@
<Otherwise />
</Choose>
<ItemGroup>
<Compile Include="DataFrameWriterTest.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="TestWithMoqDemo.cs" />
<Compile Include="Mocks\MockStructTypeProxy.cs" />
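The new DataFrameWriterTest.cs is referenced here, but its body is not shown in this view. As a hedged illustration only, a test along the following lines could verify that the DataFrameWriter facade forwards to its proxy, assuming Moq and NUnit as used by the other tests in this project and assuming the adapter's internals are visible to the test assembly (InternalsVisibleTo); the test and variable names below are hypothetical.

using System.Collections.Generic;
using Microsoft.Spark.CSharp.Proxy;
using Microsoft.Spark.CSharp.Sql;
using Moq;
using NUnit.Framework;

namespace AdapterTest
{
    [TestFixture]
    public class DataFrameWriterTest
    {
        [Test]
        public void TestModeAndFormatForwardToProxy()
        {
            // Mock the proxy layer so no JVM bridge is required.
            var proxy = new Mock<IDataFrameWriterProxy>();
            var writer = new DataFrameWriter(proxy.Object);

            writer.Mode(SaveMode.Overwrite).Format("parquet");

            proxy.Verify(p => p.Mode("Overwrite"), Times.Once());
            proxy.Verify(p => p.Format("parquet"), Times.Once());
        }

        [Test]
        public void TestSaveWithPathSetsPathOption()
        {
            var proxy = new Mock<IDataFrameWriterProxy>();
            var writer = new DataFrameWriter(proxy.Object);

            writer.Save("/tmp/out");   // placeholder path

            proxy.Verify(p => p.Options(It.Is<Dictionary<string, string>>(
                d => d.ContainsKey("path") && d["path"] == "/tmp/out")), Times.Once());
            proxy.Verify(p => p.Save(), Times.Once());
        }
    }
}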