Commit
expose HadoopConfiguration in SparkContext
skaarthik authored Aug 12, 2016
1 parent 4944319 commit a26d7d3
Showing 14 changed files with 1,192 additions and 945 deletions.
3 changes: 3 additions & 0 deletions csharp/Adapter/Microsoft.Spark.CSharp/Adapter.csproj
@@ -62,6 +62,7 @@
<Compile Include="Configuration\IConfigurationService.cs" />
<Compile Include="Core\Accumulator.cs" />
<Compile Include="Core\Broadcast.cs" />
<Compile Include="Core\HadoopConfiguration.cs" />
<Compile Include="Core\Option.cs" />
<Compile Include="Core\Partitioner.cs" />
<Compile Include="Core\RDDCollector.cs" />
@@ -105,11 +106,13 @@
<Compile Include="Proxy\IDataFrameReaderProxy.cs" />
<Compile Include="Proxy\IDataFrameWriterProxy.cs" />
<Compile Include="Proxy\IDStreamProxy.cs" />
<Compile Include="Proxy\IHadoopConfigurationProxy.cs" />
<Compile Include="Proxy\Ipc\DataFrameIpcProxy.cs" />
<Compile Include="Proxy\Ipc\DataFrameNaFunctionsIpcProxy.cs" />
<Compile Include="Proxy\Ipc\DataFrameReaderIpcProxy.cs" />
<Compile Include="Proxy\Ipc\DataFrameWriterIpcProxy.cs" />
<Compile Include="Proxy\Ipc\DStreamIpcProxy.cs" />
<Compile Include="Proxy\Ipc\HadoopConfigurationIpcProxy.cs" />
<Compile Include="Proxy\Ipc\RDDIpcProxy.cs" />
<Compile Include="Proxy\Ipc\SparkCLRIpcProxy.cs" />
<Compile Include="Proxy\Ipc\SqlContextIpcProxy.cs" />
40 changes: 40 additions & 0 deletions csharp/Adapter/Microsoft.Spark.CSharp/Core/HadoopConfiguration.cs
@@ -0,0 +1,40 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using Microsoft.Spark.CSharp.Proxy;

namespace Microsoft.Spark.CSharp.Core
{
/// <summary>
/// Configuration for Hadoop operations
/// </summary>
public class HadoopConfiguration
{
private readonly IHadoopConfigurationProxy hadoopConfigurationProxy;
internal HadoopConfiguration(IHadoopConfigurationProxy hadoopConfProxy)
{
hadoopConfigurationProxy = hadoopConfProxy;
}

/// <summary>
/// Sets the value of a property in the Hadoop configuration
/// </summary>
/// <param name="name">Name of the property</param>
/// <param name="value">Value of the property</param>
public void Set(string name, string value)
{
hadoopConfigurationProxy.Set(name, value);
}

/// <summary>
/// Gets the value of a property from HadoopConfiguration
/// </summary>
/// <param name="name">Name of the property</param>
/// <param name="defaultValue">Default value if the property is not available in the configuration</param>
/// <returns>Value of the property, or <paramref name="defaultValue"/> if the property is not set</returns>
public string Get(string name, string defaultValue)
{
return hadoopConfigurationProxy.Get(name, defaultValue);
}
}
}
5 changes: 5 additions & 0 deletions csharp/Adapter/Microsoft.Spark.CSharp/Core/SparkContext.cs
@@ -88,6 +88,11 @@ public int DefaultMinPartitions
/// </summary>
public StatusTracker StatusTracker { get { return new StatusTracker(SparkContextProxy.StatusTracker); } }

/// <summary>
/// Configuration for Hadoop usage in Spark
/// </summary>
public HadoopConfiguration HadoopConfiguration { get { return new HadoopConfiguration(SparkContextProxy.HadoopConfiguration); } }

/// <summary>
/// Initializes a SparkContext instance with a specific master, application name, and spark home
/// </summary>
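A minimal usage sketch of the new property (not part of this commit): it assumes an already-created SparkContext `sc`, and uses the standard Hadoop key "fs.defaultFS" with a placeholder value purely for illustration.

// Hypothetical application snippet: `sc` is an existing SparkContext.
HadoopConfiguration hadoopConf = sc.HadoopConfiguration;
hadoopConf.Set("fs.defaultFS", "hdfs://namenode:9000");               // placeholder HDFS URI
string fileSystem = hadoopConf.Get("fs.defaultFS", "file:///");       // falls back to "file:///" only if the key is unset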
11 changes: 11 additions & 0 deletions csharp/Adapter/Microsoft.Spark.CSharp/Proxy/IHadoopConfigurationProxy.cs
@@ -0,0 +1,11 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace Microsoft.Spark.CSharp.Proxy
{
interface IHadoopConfigurationProxy
{
void Set(string name, string value);
string Get(string name, string defaultValue);
}
}
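Because IHadoopConfigurationProxy is a plain two-method interface, an in-memory fake can stand in for the JVM-backed proxy in unit tests. The sketch below is not part of this commit, and the name MockHadoopConfigurationProxy is an assumption.

using System.Collections.Generic;

namespace Microsoft.Spark.CSharp.Proxy
{
    // Hypothetical test double: stores properties in a dictionary instead of calling into the JVM.
    internal class MockHadoopConfigurationProxy : IHadoopConfigurationProxy
    {
        private readonly Dictionary<string, string> values = new Dictionary<string, string>();

        public void Set(string name, string value)
        {
            values[name] = value;
        }

        public string Get(string name, string defaultValue)
        {
            string value;
            return values.TryGetValue(name, out value) ? value : defaultValue;
        }
    }
}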
csharp/Adapter/Microsoft.Spark.CSharp/Proxy/ISparkContextProxy.cs
@@ -27,6 +27,7 @@ internal interface ISparkContextProxy
long StartTime { get; }
int DefaultParallelism { get; }
int DefaultMinPartitions { get; }
IHadoopConfigurationProxy HadoopConfiguration { get; }
void Stop();
IRDDProxy EmptyRDD();
IRDDProxy Parallelize(IEnumerable<byte[]> values, int numSlices);
28 changes: 28 additions & 0 deletions csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/HadoopConfigurationIpcProxy.cs
@@ -0,0 +1,28 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System.Diagnostics.CodeAnalysis;
using Microsoft.Spark.CSharp.Interop.Ipc;

namespace Microsoft.Spark.CSharp.Proxy.Ipc
{
[ExcludeFromCodeCoverage] //IPC calls to JVM validated using validation-enabled samples - unit test coverage not required
internal class HadoopConfigurationIpcProxy : IHadoopConfigurationProxy
{
private readonly JvmObjectReference jvmHadoopConfigurationReference;
public HadoopConfigurationIpcProxy(JvmObjectReference jHadoopConf)
{
jvmHadoopConfigurationReference = jHadoopConf;
}

public void Set(string name, string value)
{
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmHadoopConfigurationReference, "set", new object[] { name, value });
}

public string Get(string name, string defaultValue)
{
return SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmHadoopConfigurationReference, "get", new object[] { name, defaultValue }).ToString();
}
}
}
csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/SparkContextIpcProxy.cs
@@ -111,6 +111,22 @@ public int DefaultMinPartitions
{
get { if (defaultMinPartitions == null) { defaultMinPartitions = (int)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "defaultMinPartitions"); } return (int)defaultMinPartitions; }
}

private IHadoopConfigurationProxy hadoopConfiguration;
public IHadoopConfigurationProxy HadoopConfiguration
{
get
{
return hadoopConfiguration ??
(hadoopConfiguration =
new HadoopConfigurationIpcProxy(
new JvmObjectReference(
(string)
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference,
"hadoopConfiguration"))));
}
}

public void Accumulator(int port)
{
jvmAccumulatorReference = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "accumulator",
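Since SparkContextIpcProxy caches the proxy and HadoopConfiguration simply delegates to it, the wrapper can be exercised against the fake proxy sketched above without starting a JVM. The test below is hypothetical (not part of this commit) and assumes the test assembly can reach the internal constructor, for example via InternalsVisibleTo.

using System.Diagnostics;
using Microsoft.Spark.CSharp.Core;
using Microsoft.Spark.CSharp.Proxy;

internal static class HadoopConfigurationSketchTest
{
    // Hypothetical check: Set/Get round-trip and default-value fallback.
    internal static void Run()
    {
        var conf = new HadoopConfiguration(new MockHadoopConfigurationProxy());
        conf.Set("mapreduce.job.reduces", "4");
        Debug.Assert(conf.Get("mapreduce.job.reduces", "1") == "4");
        Debug.Assert(conf.Get("some.missing.key", "fallback") == "fallback");
    }
}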