diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Adapter.csproj b/csharp/Adapter/Microsoft.Spark.CSharp/Adapter.csproj index 741588ac..330e7506 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Adapter.csproj +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Adapter.csproj @@ -62,6 +62,7 @@ + @@ -105,11 +106,13 @@ + + diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Core/HadoopConfiguration.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Core/HadoopConfiguration.cs new file mode 100644 index 00000000..a93c3ecf --- /dev/null +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Core/HadoopConfiguration.cs @@ -0,0 +1,40 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using Microsoft.Spark.CSharp.Proxy; + +namespace Microsoft.Spark.CSharp.Core +{ + /// + /// Configuration for Hadoop operations + /// + public class HadoopConfiguration + { + private readonly IHadoopConfigurationProxy hadoopConfigurationProxy; + internal HadoopConfiguration(IHadoopConfigurationProxy hadoopConfProxy) + { + hadoopConfigurationProxy = hadoopConfProxy; + } + + /// + /// Sets a property value to HadoopConfiguration + /// + /// Name of the property + /// Value of the property + public void Set(string name, string value) + { + hadoopConfigurationProxy.Set(name, value); + } + + /// + /// Gets the value of a property from HadoopConfiguration + /// + /// Name of the property + /// Default value if the property is not available in the configuration + /// + public string Get(string name, string defaultValue) + { + return hadoopConfigurationProxy.Get(name, defaultValue); + } + } +} diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Core/SparkContext.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Core/SparkContext.cs index 9b362306..f16220c0 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Core/SparkContext.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Core/SparkContext.cs @@ -88,6 +88,11 @@ public int DefaultMinPartitions /// public StatusTracker StatusTracker { get { return new StatusTracker(SparkContextProxy.StatusTracker); } } + /// + /// Configuration for Hadoop usage in Spark + /// + public HadoopConfiguration HadoopConfiguration { get { return new HadoopConfiguration(SparkContextProxy.HadoopConfiguration); }} + /// /// Initializes a SparkContext instance with a specific master, application name, and spark home /// diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/IHadoopConfigurationProxy.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/IHadoopConfigurationProxy.cs new file mode 100644 index 00000000..97a3072a --- /dev/null +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/IHadoopConfigurationProxy.cs @@ -0,0 +1,11 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. 
+ +namespace Microsoft.Spark.CSharp.Proxy +{ + interface IHadoopConfigurationProxy + { + void Set(string name, string value); + string Get(string name, string defaultValue); + } +} diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/ISparkContextProxy.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/ISparkContextProxy.cs index 51691b5a..51324332 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/ISparkContextProxy.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/ISparkContextProxy.cs @@ -27,6 +27,7 @@ internal interface ISparkContextProxy long StartTime { get; } int DefaultParallelism { get; } int DefaultMinPartitions { get; } + IHadoopConfigurationProxy HadoopConfiguration { get; } void Stop(); IRDDProxy EmptyRDD(); IRDDProxy Parallelize(IEnumerable values, int numSlices); diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/HadoopConfigurationIpcProxy.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/HadoopConfigurationIpcProxy.cs new file mode 100644 index 00000000..404cfafe --- /dev/null +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/HadoopConfigurationIpcProxy.cs @@ -0,0 +1,28 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System.Diagnostics.CodeAnalysis; +using Microsoft.Spark.CSharp.Interop.Ipc; + +namespace Microsoft.Spark.CSharp.Proxy.Ipc +{ + [ExcludeFromCodeCoverage] //IPC calls to JVM validated using validation-enabled samples - unit test coverage not reqiured + internal class HadoopConfigurationIpcProxy : IHadoopConfigurationProxy + { + private readonly JvmObjectReference jvmHadoopConfigurationReference; + public HadoopConfigurationIpcProxy(JvmObjectReference jHadoopConf) + { + jvmHadoopConfigurationReference = jHadoopConf; + } + + public void Set(string name, string value) + { + SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmHadoopConfigurationReference, "set", new object[] { name, value }); + } + + public string Get(string name, string defaultvalue) + { + return SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmHadoopConfigurationReference, "get", new object[] { name, defaultvalue }).ToString(); + } + } +} diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/SparkContextIpcProxy.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/SparkContextIpcProxy.cs index 621ecffa..6521b8d9 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/SparkContextIpcProxy.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/SparkContextIpcProxy.cs @@ -111,6 +111,22 @@ public int DefaultMinPartitions { get { if (defaultMinPartitions == null) { defaultMinPartitions = (int)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "defaultMinPartitions"); } return (int)defaultMinPartitions; } } + + private IHadoopConfigurationProxy hadoopConfiguration; + public IHadoopConfigurationProxy HadoopConfiguration + { + get + { + return hadoopConfiguration ?? 
+ (hadoopConfiguration = + new HadoopConfigurationIpcProxy( + new JvmObjectReference( + (string) + SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, + "hadoopConfiguration")))); + } + } + public void Accumulator(int port) { jvmAccumulatorReference = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "accumulator", diff --git a/csharp/Adapter/documentation/Microsoft.Spark.CSharp.Adapter.Doc.XML b/csharp/Adapter/documentation/Microsoft.Spark.CSharp.Adapter.Doc.XML index 7c974d22..606a6150 100644 --- a/csharp/Adapter/documentation/Microsoft.Spark.CSharp.Adapter.Doc.XML +++ b/csharp/Adapter/documentation/Microsoft.Spark.CSharp.Adapter.Doc.XML @@ -10,21 +10,6 @@ to be used in SparkCLR runtime - - - Helps getting config settings to be used in SparkCLR runtime - - - - - The full path of the CSharp external backend worker process executable. - - - - - The port number used for communicating with the CSharp external backend worker process. - - Default configuration for SparkCLR jobs. @@ -88,6 +73,21 @@ Indicates service is running in Yarn + + + Helps getting config settings to be used in SparkCLR runtime + + + + + The port number used for communicating with the CSharp external backend worker process. + + + + + The full path of the CSharp external backend worker process executable. + + A shared variable that can be accumulated, i.e., has a commutative and associative "add" @@ -127,6 +127,12 @@ The Identity of the accumulator The value of the accumulator + + + Gets or sets the value of the accumulator; only usable in driver program + + + Adds a term to this accumulator's value @@ -148,12 +154,6 @@ A string representation of the current accumulator - - - Gets or sets the value of the accumulator; only usable in driver program - - - An AccumulatorParam that uses the + operators to add values. Designed for simple types @@ -216,16 +216,23 @@ The type of element in Broadcast + + + Return the broadcasted value + + Delete cached copies of this broadcast on the executors. - + - Return the broadcasted value + Sets value to HadoopConfiguration + Name of the property + Value of the property @@ -245,6 +252,11 @@ The value to be associated with the new instance. + + + Indicates whether the option value is defined. + + Returns the value of the option if Option.IsDefined is TRUE; @@ -252,11 +264,6 @@ - - - Indicates whether the option value is defined. - - An object that defines how the elements in a key-value pair RDD are partitioned by key. @@ -293,11 +300,6 @@ Used for collect operation on RDD - - - Interface for collect operation on RDD - - Extra functions available on RDDs of Doubles through an implicit conversion. @@ -415,6 +417,11 @@ + + + Interface for collect operation on RDD + + Extra functions available on RDDs of (key, value) pairs where the key is sortable through @@ -1052,6 +1059,14 @@ path to sequence file (None by default) + + + These classes are defined explicitly and marked as [Serializable]instead of using anonymous method as delegate to + prevent C# compiler from generating private anonymous type that is not serializable. Since the delegate has to be + serialized and sent to the Spark workers for execution, it is necessary to have the type marked [Serializable]. + These classes are to work around the limitation on the serializability of compiler generated types + + Converts a collection to a list where the element type is Option(T) type. 
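As a quick driver-side illustration of the HadoopConfiguration API introduced by this change, here is a minimal sketch (not part of the patch itself). Only the `SparkContext.HadoopConfiguration` property and its `Set(name, value)` / `Get(name, defaultValue)` methods come from this diff; the `SparkConf` setup, the application name, and the `fs.defaultFS` key/value are illustrative assumptions.

```csharp
// Minimal sketch of the new SparkContext.HadoopConfiguration property.
// Set(name, value) and Get(name, defaultValue) come from this change;
// the SparkConf setup and the "fs.defaultFS" key/value are illustrative assumptions.
using Microsoft.Spark.CSharp.Core;

class HadoopConfigurationSketch
{
    static void Main(string[] args)
    {
        var sparkContext = new SparkContext(new SparkConf().SetAppName("HadoopConfDemo"));

        // Set a Hadoop property before running jobs that touch Hadoop-backed storage.
        sparkContext.HadoopConfiguration.Set("fs.defaultFS", "hdfs://namenode:9000");

        // Read it back; the second argument is returned when the key is not set.
        var fsUri = sparkContext.HadoopConfiguration.Get("fs.defaultFS", "file:///");
        System.Console.WriteLine(fsUri);

        sparkContext.Stop();
    }
}
```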
@@ -1061,14 +1076,6 @@ The element type in the collection A list that use Option(T) as element type - - - These classes are defined explicitly and marked as [Serializable]instead of using anonymous method as delegate to - prevent C# compiler from generating private anonymous type that is not serializable. Since the delegate has to be - serialized and sent to the Spark workers for execution, it is necessary to have the type marked [Serializable]. - These classes are to work around the limitation on the serializability of compiler generated types - - Wraps C#-based transformations that can be executed within a stage. It helps avoid unnecessary Ser/De of data between @@ -1076,6 +1083,50 @@ + + + Return a new RDD by applying a function to each partition of this RDD, + while tracking the index of the original partition. + + The element type + The function to be applied to each partition + Indicates if it preserves partition parameters + A new RDD + + + + This class is defined explicitly instead of using anonymous method as delegate to prevent C# compiler from generating + private anonymous type that is not serializable. Since the delegate has to be serialized and sent to the Spark workers + for execution, it is necessary to have the type marked [Serializable]. This class is to work around the limitation + on the serializability of compiler generated types + + + + + A bounded priority queue implemented with max binary heap. + + Construction steps: + 1. Build a Max Heap of the first k elements. + 2. For each element after the kth element, compare it with the root of the max heap, + a. If the element is less than the root, replace root with this element, heapify. + b. Else ignore it. + + + + + Constructor of PriorityQueue type. + + + + + Inserts the specified element into this priority queue. + + + + + A class represents a profiler + + Represents a Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, @@ -1095,6 +1146,21 @@ Indicates whether the RDD is checkpointed. + + + Return whether this RDD has been cached or not + + + + + Return whether this RDD has been checkpointed or not + + + + + Return the name of this RDD. + + Persist this RDD with the default storage level . @@ -1651,7 +1717,7 @@ n is the number of partitions. So there may exist gaps, but this method won't trigger a spark job, which is different from - >>> sc.Parallelize(new string[] { "a", "b", "c", "d" }, 1).ZipWithIndex().Collect() + >>> sc.Parallelize(new string[] { "a", "b", "c", "d" }, 1).ZipWithIndex().Collect() [('a', 0), ('b', 1), ('c', 4), ('d', 2), ('e', 5)] @@ -1705,44 +1771,6 @@ the seed for the Random number generator A random sub-sample of the RDD without replacement. - - - Return whether this RDD has been cached or not - - - - - Return whether this RDD has been checkpointed or not - - - - - Return the name of this RDD. - - - - - Return a new RDD by applying a function to each partition of this RDD, - while tracking the index of the original partition. - - The element type - The function to be applied to each partition - Indicates if it preserves partition parameters - A new RDD - - - - This class is defined explicitly instead of using anonymous method as delegate to prevent C# compiler from generating - private anonymous type that is not serializable. Since the delegate has to be serialized and sent to the Spark workers - for execution, it is necessary to have the type marked [Serializable]. 
This class is to work around the limitation - on the serializability of compiler generated types - - - - - A class represents a profiler - - Some useful utility functions for RDD{string} @@ -1953,6 +1981,46 @@ Get existing SparkContext + + + Return a copy of this JavaSparkContext's configuration. The configuration ''cannot'' be changed at runtime. + + + + + The version of Spark on which this application is running. + + + + + Return the epoch time when the Spark Context was started. + + + + + Default level of parallelism to use when not given by user (e.g. for reduce tasks) + + + + + Default min number of partitions for Hadoop RDDs when not given by user + + + + + Get SPARK_USER for user who is running SparkContext. + + + + + Return :class:`StatusTracker` object + + + + + Configuration for Hadoop usage in Spark + + Initializes a SparkContext instance with a specific master, application name, and spark home @@ -2295,36 +2363,6 @@ Cancel all jobs that have been scheduled or are running. - - - The version of Spark on which this application is running. - - - - - Return the epoch time when the Spark Context was started. - - - - - Default level of parallelism to use when not given by user (e.g. for reduce tasks) - - - - - Default min number of partitions for Hadoop RDDs when not given by user - - - - - Get SPARK_USER for user who is running SparkContext. - - - - - Return :class:`StatusTracker` object - - A class for tracking the statistics of a set of numbers (count, mean and variance) in a numerically @@ -2370,14 +2408,6 @@ - - - Returns a string that represents this StatCounter. - - - A string that represents this StatCounter. - - Gets the count number of this StatCounter @@ -2423,6 +2453,14 @@ Return the sample standard deviation of the values, which corrects for bias in estimating the variance by dividing by N-1 instead of N. + + + Returns a string that represents this StatCounter. + + + A string that represents this StatCounter. + + Low-level status reporting APIs for monitoring job and stage progress. @@ -2691,26 +2729,21 @@ Adjust checkCount adaptively according to current weak reference objects count - + - Maximum running duration for checking thread each time - - - - - It can be an expensive operation. ** Do not use ** unless there is a real need for this method + Sleep time for checking thread - - + - Sleep time for checking thread + Maximum running duration for checking thread each time - + - Contains everything needed to setup an environment for using C# with Spark + It can be an expensive operation. ** Do not use ** unless there is a real need for this method + @@ -2929,6 +2962,11 @@ The stream to write The string to write + + + Contains everything needed to setup an environment for using C# with Spark + + ByteBuf delimits a section of a ByteBufChunk. @@ -2956,6 +2994,66 @@ 0 == readerIndex == writerIndex == capacity + + + Gets the underlying array. + + + + + Gets the total number of elements in the range delimited by the ByteBuf. + + + + + Gets the position of the first element in the range delimited + by the ByteBuf, relative to the start of the original array. + + + + + Returns the ByteBuf chunk that contains this ByteBuf. + + + + + Returns the number of readable bytes which is equal to (writerIndex - readerIndex). + + + + + Returns the number of writable bytes which is equal to (capacity - writerIndex). + + + + + Gets the underlying unsafe array. 
+ + + + + Gets or sets the readerIndex of this ByteBuf + + + + + + Gets or sets the writerIndex of this ByteBuf + + + + + + Returns the position of the readerIndex element in the range delimited + by the ByteBuf, relative to the start of the original array. + + + + + Returns the position of the readerIndex element in the range delimited + by the ByteBuf, relative to the start of the original array. + + Sets the readerIndex and writerIndex of this buffer to 0. @@ -3031,90 +3129,78 @@ Creates an empty ByteBuf with error status. - - - Gets the underlying array. - - - - - Gets the total number of elements in the range delimited by the ByteBuf. - - - + - Gets the position of the first element in the range delimited - by the ByteBuf, relative to the start of the original array. + ByteBufChunk represents a memory blocks that can be allocated from + .Net heap (managed code) or process heap(unsafe code) - + - Returns the ByteBuf chunk that contains this ByteBuf. + The ByteBufChunkList that contains this ByteBufChunk - + - Returns the number of readable bytes which is equal to (writerIndex - readerIndex). + The previous ByteBufChunk in linked like list - + - Returns the number of writable bytes which is equal to (capacity - writerIndex). + The next ByteBufChunk in linked like list - + - Gets the underlying unsafe array. + Finalizer. - + - Gets or sets the readerIndex of this ByteBuf + Returns the underlying array that is used for managed code. - - + - Gets or sets the writerIndex of this ByteBuf + Returns the buffer Id that registered as RIO buffer. + Only apply to unsafe ByteBufChunk - - + - Returns the position of the readerIndex element in the range delimited - by the ByteBuf, relative to the start of the original array. + Returns the unused bytes in this chunk. - + - Returns the position of the readerIndex element in the range delimited - by the ByteBuf, relative to the start of the original array. + Indicates whether this ByteBufChunk is disposed. - + - ByteBufChunk represents a memory blocks that can be allocated from - .Net heap (managed code) or process heap(unsafe code) + Indicates whether the underlying buffer array is a unsafe type array. + The unsafe array is used for PInvoke with native code. - + - The ByteBufChunkList that contains this ByteBufChunk + Returns the ByteBufPool that this ByteBufChunk belongs to. - + - The previous ByteBufChunk in linked like list + Returns the size of the ByteBufChunk - + - The next ByteBufChunk in linked like list + Returns the percentage of the current usage of the chunk - + - Finalizer. + Returns the IntPtr that points to beginning of the cached heap block. + This is used for PInvoke with native code. @@ -3161,54 +3247,6 @@ Implementation of the Dispose pattern. - - - Returns the underlying array that is used for managed code. - - - - - Returns the buffer Id that registered as RIO buffer. - Only apply to unsafe ByteBufChunk - - - - - Returns the unused bytes in this chunk. - - - - - Indicates whether this ByteBufChunk is disposed. - - - - - Indicates whether the underlying buffer array is a unsafe type array. - The unsafe array is used for PInvoke with native code. - - - - - Returns the ByteBufPool that this ByteBufChunk belongs to. - - - - - Returns the size of the ByteBufChunk - - - - - Returns the percentage of the current usage of the chunk - - - - - Returns the IntPtr that points to beginning of the cached heap block. - This is used for PInvoke with native code. - - Segment struct delimits a section of a byte chunk. 
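The ByteBuf members documented above define the buffer in terms of a readerIndex, a writerIndex, and a capacity. Purely as an illustration of that arithmetic, a minimal sketch follows; this is not the Mobius ByteBuf implementation, and the type and member names are placeholders.

```csharp
// Illustrative-only model of the readerIndex/writerIndex/capacity arithmetic
// described for ByteBuf; names and layout here are placeholders, not Mobius types.
struct IndexedBufferSketch
{
    public int ReaderIndex;  // next position to read from
    public int WriterIndex;  // next position to write to
    public int Capacity;     // size of the section delimited by the buffer

    // Readable bytes are those written but not yet read: writerIndex - readerIndex.
    public int ReadableBytes => WriterIndex - ReaderIndex;

    // Writable bytes are the remaining room: capacity - writerIndex.
    public int WritableBytes => Capacity - WriterIndex;

    // Clear resets the readerIndex and writerIndex to 0, as documented above.
    public void Clear() => ReaderIndex = WriterIndex = 0;
}
```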
@@ -3312,28 +3350,6 @@ Used to caculate chunk size and ensures it is a multiple of a segment size Indicates whether allocates memory from process's heap - - - Allocates a ByteBuf from this ByteBufPool to use. - - A ByteBuf contained in this ByteBufPool - - - - Deallocates a ByteBuf back to this ByteBufPool. - - The ByteBuf to be release. - - - - Gets a readable string for this ByteBufPool - - - - - Returns the chunk numbers in each queue. - - Gets the default byte buffer pool instance for managed memory. @@ -3362,75 +3378,34 @@ Returns the size of a ByteChunk in this ByteBufPool - + - A simple wrapper of System.Net.Sockets.Socket class. + Allocates a ByteBuf from this ByteBufPool to use. + A ByteBuf contained in this ByteBufPool - + - ISocketWrapper interface defines the common methods to operate a socket (traditional socket or - Windows Registered IO socket) + Deallocates a ByteBuf back to this ByteBufPool. + The ByteBuf to be release. - + - Accepts a incoming connection request. + Gets a readable string for this ByteBufPool - A ISocket instance used to send and receive data - + - Close the ISocket connections and releases all associated resources. + Returns the chunk numbers in each queue. - + - Establishes a connection to a remote host that is specified by an IP address and a port number + A simple wrapper of System.Net.Sockets.Socket class. - The IP address of the remote host - The port number of the remote host - - - Returns a stream used to send and receive data. - - The underlying Stream instance that be used to send and receive data - - - - Starts listening for incoming connections requests - - The maximum length of the pending connections queue. - - - - Receives network data from this socket, and returns a ByteBuf that contains the received data. - - A ByteBuf object that contains received data. - - - - Sends data to this socket with a ByteBuf object that contains data to be sent. - - A ByteBuf object that contains data to be sent - - - - Indicates whether there are data that has been received from the network and is available to be read. - - - - - Returns the local endpoint. - - - - - Returns the remote endpoint - - - + Default constructor that creates a new instance of DefaultSocket class which represents a traditional socket (System.Net.Socket.Socket). @@ -3525,6 +3500,69 @@ Returns the remote endpoint if it has one. + + + ISocketWrapper interface defines the common methods to operate a socket (traditional socket or + Windows Registered IO socket) + + + + + Accepts a incoming connection request. + + A ISocket instance used to send and receive data + + + + Close the ISocket connections and releases all associated resources. + + + + + Establishes a connection to a remote host that is specified by an IP address and a port number + + The IP address of the remote host + The port number of the remote host + + + + Returns a stream used to send and receive data. + + The underlying Stream instance that be used to send and receive data + + + + Starts listening for incoming connections requests + + The maximum length of the pending connections queue. + + + + Receives network data from this socket, and returns a ByteBuf that contains the received data. + + A ByteBuf object that contains received data. + + + + Sends data to this socket with a ByteBuf object that contains data to be sent. + + A ByteBuf object that contains data to be sent + + + + Indicates whether there are data that has been received from the network and is available to be read. + + + + + Returns the local endpoint. 
+ + + + + Returns the remote endpoint + + RioNative class imports and initializes RIOSock.dll for use with RIO socket APIs. @@ -3547,6 +3585,11 @@ it must be called before calling EnsureRioLoaded() + + + Gets the connection table that contains all connections. + + Ensures that the native dll of RIO socket is loaded and initialized. @@ -3562,11 +3605,6 @@ Initializes RIOSock native library. - - - Gets the connection table that contains all connections. - - The RioResult structure contains data used to indicate request completion results used with RIO socket @@ -3594,6 +3632,26 @@ Finalizer + + + Indicates whether there are data that has been received from the network and is available to be read. + + + + + Returns the local endpoint. + + + + + Returns the remote endpoint + + + + + Returns the handle of native socket. + + Accepts a incoming connection request. @@ -3687,26 +3745,6 @@ Generates a unique key from a GUID. - - - Indicates whether there are data that has been received from the network and is available to be read. - - - - - Returns the local endpoint. - - - - - Returns the remote endpoint - - - - - Returns the handle of native socket. - - SaeaSocketWrapper class is a wrapper of a socket that use SocketAsyncEventArgs class @@ -3729,6 +3767,21 @@ Finalizer. + + + Indicates whether there are data that has been received from the network and is available to be read. + + + + + Returns the local endpoint. + + + + + Returns the remote endpoint + + Accepts a incoming connection request. @@ -3815,21 +3868,6 @@ If all of the data has NOT been sent, then it calls PostSend to send more data. - - - Indicates whether there are data that has been received from the network and is available to be read. - - - - - Returns the local endpoint. - - - - - Returns the remote endpoint - - Provides the underlying stream of data for network access. @@ -3848,6 +3886,36 @@ a RioSocketWrapper object + + + Indicates that data can be read from the stream. + This property always returns + + + + + Indicates that the stream can seek a specific location in the stream. + This property always returns + + + + + Indicates that data can be written to the stream. + This property always returns + + + + + The length of data available on the stream. + Always throws . + + + + + Gets or sets the position in the stream. + Always throws . + + Flushes data from the stream. This is meaningless for us, so it does nothing. @@ -3889,36 +3957,6 @@ Offset into the buffer from where we'll start writing. Number of bytes to write. - - - Indicates that data can be read from the stream. - This property always returns - - - - - Indicates that the stream can seek a specific location in the stream. - This property always returns - - - - - Indicates that data can be written to the stream. - This property always returns - - - - - The length of data available on the stream. - Always throws . - - - - - Gets or sets the position in the stream. - Always throws . - - SockDataToken class is used to associate with the SocketAsyncEventArgs object. @@ -3960,6 +3998,11 @@ only the application is running on a Windows OS that supports Registered IO socket. + + + Set socket wrapper type only for internal use (unit test) + + Creates a ISocket instance based on the configuration and OS version. @@ -3975,11 +4018,6 @@ Indicates whether current OS supports RIO socket. - - - Set socket wrapper type only for internal use (unit test) - - SocketWrapperType defines the socket wrapper type be used in transport. 
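The socket-wrapper members documented above describe a connect, get-stream, send/receive, close lifecycle. A hedged sketch of that client flow follows; the interface and member names below are hypothetical placeholders for illustration only, not the Mobius ISocketWrapper API.

```csharp
// Hypothetical client-side flow mirroring the documented socket-wrapper contract:
// connect to a remote host, obtain a stream, write data, then close.
// All names below are placeholders for illustration; they are not Mobius types.
using System.IO;
using System.Net;
using System.Text;

interface IByteSocketSketch
{
    void Connect(IPAddress remoteAddress, int port); // connect by IP address and port
    Stream GetStream();                              // stream used to send and receive data
    void Close();                                    // release all associated resources
}

static class SocketClientSketch
{
    public static void SendHello(IByteSocketSketch socket, IPAddress remoteAddress, int port)
    {
        socket.Connect(remoteAddress, port);
        using (var stream = socket.GetStream())
        {
            var payload = Encoding.UTF8.GetBytes("hello");
            stream.Write(payload, 0, payload.Length);
        }
        socket.Close();
    }
}
```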
@@ -4167,162 +4205,162 @@ Right now it just prints out the messages to Console - - - Defines a logger what be used in service - - - + Get an instance of ILoggerService by a given type of logger The type of a logger to return An instance of ILoggerService - + Logs a message at debug level. The message to be logged - + Logs a message at debug level with a format string. The format string The array of arguments - + Logs a message at info level. The message to be logged - + Logs a message at info level with a format string. The format string The array of arguments - + Logs a message at warning level. The message to be logged - + Logs a message at warning level with a format string. The format string The array of arguments - + Logs a fatal message. The message to be logged - + Logs a fatal message with a format string. The format string The array of arguments - + Logs a error message. The message to be logged - + Logs a error message with a format string. The format string The array of arguments - + Logs an exception The exception to be logged - + + + Defines a logger what be used in service + + + Get an instance of ILoggerService by a given type of logger The type of a logger to return An instance of ILoggerService - + Logs a message at debug level. The message to be logged - + Logs a message at debug level with a format string. The format string The array of arguments - + Logs a message at info level. The message to be logged - + Logs a message at info level with a format string. The format string The array of arguments - + Logs a message at warning level. The message to be logged - + Logs a message at warning level with a format string. The format string The array of arguments - + Logs a fatal message. The message to be logged - + Logs a fatal message with a format string. The format string The array of arguments - + Logs a error message. The message to be logged - + Logs a error message with a format string. The format string The array of arguments - + Logs an exception @@ -4665,6 +4703,27 @@ See also http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame + + + Represents the content of the DataFrame as an RDD of Rows. + + + + + Returns true if the collect and take methods can be run locally (without any Spark executors). + + + + + Returns the schema of this DataFrame. + + + + + Returns a column for a given column name. + + The name of column + Registers this DataFrame as a temporary table using the given name. The lifetime of this @@ -5220,27 +5279,6 @@ SaveMode specified by mode, and a set of options. - - - Represents the content of the DataFrame as an RDD of Rows. - - - - - Returns true if the collect and take methods can be run locally (without any Spark executors). - - - - - Returns the schema of this DataFrame. - - - - - Returns a column for a given column name. - - The name of column - The type of join operation for DataFrame @@ -5661,470 +5699,135 @@ It supports running both SQL and HiveQL commands. - - - The entry point for working with structured data (rows and columns) in Spark. - Allows the creation of [[DataFrame]] objects as well as the execution of SQL queries. - - - + - Creates a SqlContext + Creates a HiveContext - + - Get the existing SQLContext or create a new one with given SparkContext. + Invalidate and refresh all the cached the metadata of the given table. + For performance reasons, Spark SQL or the external data source library it uses + might cache certain metadata about a table, such as the location of blocks. 
+ When those change outside of Spark SQL, users should call this function to invalidate the cache. - - + - + - Returns a new SQLContext as new session, that has separate SQLConf, - registered temporary tables and UDFs, but shared SparkContext and table cache. + Used for SerDe of Python objects - - + - Returns the value of Spark SQL configuration property for the given key. - If the key is not set, returns defaultValue. + Unpickles objects from byte[] - - + - + - Sets the given Spark SQL configuration property. + Used by Unpickler to unpickle pickled objects. It is also used to construct a Row (C# representation of pickled objects). - - - + - Returns a DataFrameReader that can be used to read data in as a DataFrame. + Schema of the DataFrame currently being processed - + - Loads a dataframe the source path using the given schema and options + Indicates if Schema is already set during construction of this type - - - - - + - Creates a from a RDD containing array of object using the given schema. + Arguments used to construct this typ - RDD containing array of object. The array acts as a row and items within the array act as columns which the schema is specified in . - The schema of DataFrame. - - + - Registers the given as a temporary table in the catalog. - Temporary tables exist only during the lifetime of this instance of SqlContext. + Schema of the values - - - + - Remove the temp table from catalog. + Returns a string that represents the current object. - + A string that represents the current object. - + - Returns the specified table as a + Used by Unpickler - do not use to construct Row. Use GetRow() method - + - + - Returns a containing names of tables in the given database. - If is not specified, the current database will be used. - The returned DataFrame has two columns: 'tableName' and 'isTemporary' (a column with bool - type indicating if a table is a temporary one or not). + Used to construct a Row - Name of the database to use. Default to the current database. - Note: This is only applicable to HiveContext. - + - Returns a list of names of tables in the database + Represents one row of output from a relational operator. - Name of the database to use. Default to the current database. - Note: This is only applicable to HiveContext. - - + - Caches the specified table in-memory. + Number of elements in the Row. - + elements count in this row - + - Removes the specified table from the in-memory cache. + Schema for the row. - - + - Removes all cached tables from the in-memory cache. + Returns the value at position i. - + - Returns true if the table is currently cached in-memory. + Returns the value of a given columnName. - - - + - Executes a SQL query using Spark, returning the result as a DataFrame. The dialect that is used for SQL parsing can be configured with 'spark.sql.dialect' + Returns the value at position i, the return value will be cast to type T. - - - + - Loads a JSON file (one object per line), returning the result as a DataFrame - It goes through the entire dataset once to determine the schema. + Returns the value of a given columnName, the return value will be cast to type T. - path to JSON file - - + - Loads a JSON file (one object per line) and applies the given schema + DataFrame Built-in functions - path to JSON file - schema to use - - + - Loads text file with the specific column delimited using the given schema + Creates a Column of any literal value. 
- path to text file - schema to use - delimiter to use - + The given literal value + A new Column is created to represent the literal value - + - Loads a text file (one object per line), returning the result as a DataFrame + Returns a Column based on the given column name. - path to text file - delimited to use - indicates if the text file has a header row - indicates if every row has to be read to infer the schema; if false, columns will be strings - + The name of column specified + The column for the given name - + - Register UDF with no input argument, e.g: - SqlContext.RegisterFunction<bool>("MyFilter", () => true); - sqlContext.Sql("SELECT * FROM MyTable where MyFilter()"); - - - - - - - - Register UDF with 1 input argument, e.g: - SqlContext.RegisterFunction<bool, string>("MyFilter", (arg1) => arg1 != null); - sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1)"); - - - - - - - - - Register UDF with 2 input arguments, e.g: - SqlContext.RegisterFunction<bool, string, string>("MyFilter", (arg1, arg2) => arg1 != null && arg2 != null); - sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2)"); - - - - - - - - - - Register UDF with 3 input arguments, e.g: - SqlContext.RegisterFunction<bool, string, string, string>("MyFilter", (arg1, arg2, arg3) => arg1 != null && arg2 != null && arg3 != null); - sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, columnName3)"); - - - - - - - - - - - Register UDF with 4 input arguments, e.g: - SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg4) => arg1 != null && arg2 != null && ... && arg3 != null); - sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName4)"); - - - - - - - - - - - - Register UDF with 5 input arguments, e.g: - SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg5) => arg1 != null && arg2 != null && ... && arg5 != null); - sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName5)"); - - - - - - - - - - - - - Register UDF with 6 input arguments, e.g: - SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg6) => arg1 != null && arg2 != null && ... && arg6 != null); - sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName6)"); - - - - - - - - - - - - - - Register UDF with 7 input arguments, e.g: - SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg7) => arg1 != null && arg2 != null && ... && arg7 != null); - sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName7)"); - - - - - - - - - - - - - - - Register UDF with 8 input arguments, e.g: - SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg8) => arg1 != null && arg2 != null && ... && arg8 != null); - sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName8)"); - - - - - - - - - - - - - - - - Register UDF with 9 input arguments, e.g: - SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg9) => arg1 != null && arg2 != null && ... 
&& arg9 != null); - sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName9)"); - - - - - - - - - - - - - - - - - Register UDF with 10 input arguments, e.g: - SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg10) => arg1 != null && arg2 != null && ... && arg10 != null); - sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName10)"); - - - - - - - - - - - - - - - - - - Creates a HiveContext - - - - - - Invalidate and refresh all the cached the metadata of the given table. - For performance reasons, Spark SQL or the external data source library it uses - might cache certain metadata about a table, such as the location of blocks. - When those change outside of Spark SQL, users should call this function to invalidate the cache. - - - - - - Used for SerDe of Python objects - - - - - Unpickles objects from byte[] - - - - - - - Used by Unpickler to unpickle pickled objects. It is also used to construct a Row (C# representation of pickled objects). - - - - - Schema of the DataFrame currently being processed - - - - - Indicates if Schema is already set during construction of this type - - - - - Arguments used to construct this typ - - - - - Schema of the values - - - - - Returns a string that represents the current object. - - A string that represents the current object. - - - - Used by Unpickler - do not use to construct Row. Use GetRow() method - - - - - - - Used to construct a Row - - - - - - Represents one row of output from a relational operator. - - - - - Number of elements in the Row. - - elements count in this row - - - - Schema for the row. - - - - - Returns the value at position i. - - - - - Returns the value of a given columnName. - - - - - Returns the value at position i, the return value will be cast to type T. - - - - - Returns the value of a given columnName, the return value will be cast to type T. - - - - - DataFrame Built-in functions - - - - - Creates a Column of any literal value. - - The given literal value - A new Column is created to represent the literal value - - - - Returns a Column based on the given column name. - - The name of column specified - The column for the given name - - - - Returns a Column based on the given column name. + Returns a Column based on the given column name. The name of column specified The column for the given name @@ -6755,70 +6458,389 @@ - + + + SaveMode is used to specify the expected behavior of saving a DataFrame to a data source. + + + + + Append mode means that when saving a DataFrame to a data source, if data/table already exists, + contents of the DataFrame are expected to be appended to existing data. + + + + + Overwrite mode means that when saving a DataFrame to a data source, + if data/table already exists, existing data is expected to be overwritten by the contents of + the DataFrame. + + + + + ErrorIfExists mode means that when saving a DataFrame to a data source, if data already exists, + an exception is expected to be thrown. + + + + + Ignore mode means that when saving a DataFrame to a data source, if data already exists, + the save operation is expected to not save the contents of the DataFrame and to not + change the existing data. + + + + + For SaveMode.ErrorIfExists, the corresponding literal string in spark is "error" or "default". 
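The SaveMode summaries above pair each enum value with a Spark save-mode behavior, and the member documented just below returns the corresponding string. A small sketch of that mapping follows; only the ErrorIfExists literal ("error"/"default") is stated explicitly above, and the remaining strings plus the Microsoft.Spark.CSharp.Sql namespace are assumptions based on Spark's usual conventions.

```csharp
// Sketch of the SaveMode -> Spark literal mapping; only ErrorIfExists -> "error"
// is stated in the documentation above, the other strings are assumptions.
using Microsoft.Spark.CSharp.Sql;

static class SaveModeLiteralSketch
{
    public static string ToSparkLiteral(SaveMode mode)
    {
        switch (mode)
        {
            case SaveMode.Append: return "append";       // append to existing data
            case SaveMode.Overwrite: return "overwrite"; // replace existing data
            case SaveMode.Ignore: return "ignore";       // skip the save if data exists
            case SaveMode.ErrorIfExists:
            default: return "error";                     // throw if data already exists
        }
    }
}
```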
+ + + + + Gets the string for the value of SaveMode + + The given SaveMode + The string that represents the given SaveMode + + + + The entry point for working with structured data (rows and columns) in Spark. + Allows the creation of [[DataFrame]] objects as well as the execution of SQL queries. + + + + + Creates a SqlContext + + + + + + Get the existing SQLContext or create a new one with given SparkContext. + + + + + + + Returns a new SQLContext as new session, that has separate SQLConf, + registered temporary tables and UDFs, but shared SparkContext and table cache. + + + + + + Returns the value of Spark SQL configuration property for the given key. + If the key is not set, returns defaultValue. + + + + + + + + Sets the given Spark SQL configuration property. + + + + + + + Returns a DataFrameReader that can be used to read data in as a DataFrame. + + + + + Loads a dataframe the source path using the given schema and options + + + + + + + + + Creates a from a RDD containing array of object using the given schema. + + RDD containing array of object. The array acts as a row and items within the array act as columns which the schema is specified in . + The schema of DataFrame. + + + + + Registers the given as a temporary table in the catalog. + Temporary tables exist only during the lifetime of this instance of SqlContext. + + + + + + + Remove the temp table from catalog. + + + + + + Returns the specified table as a + + + + + + + Returns a containing names of tables in the given database. + If is not specified, the current database will be used. + The returned DataFrame has two columns: 'tableName' and 'isTemporary' (a column with bool + type indicating if a table is a temporary one or not). + + Name of the database to use. Default to the current database. + Note: This is only applicable to HiveContext. + + + + + Returns a list of names of tables in the database + + Name of the database to use. Default to the current database. + Note: This is only applicable to HiveContext. + + + + + Caches the specified table in-memory. + + + + + + Removes the specified table from the in-memory cache. + + + + + + Removes all cached tables from the in-memory cache. + + + + + Returns true if the table is currently cached in-memory. + + + + + + + Executes a SQL query using Spark, returning the result as a DataFrame. The dialect that is used for SQL parsing can be configured with 'spark.sql.dialect' + + + + + + + Loads a JSON file (one object per line), returning the result as a DataFrame + It goes through the entire dataset once to determine the schema. 
+ + path to JSON file + + + + + Loads a JSON file (one object per line) and applies the given schema + + path to JSON file + schema to use + + + + + Loads text file with the specific column delimited using the given schema + + path to text file + schema to use + delimiter to use + + + + + Loads a text file (one object per line), returning the result as a DataFrame + + path to text file + delimited to use + indicates if the text file has a header row + indicates if every row has to be read to infer the schema; if false, columns will be strings + + + + + Register UDF with no input argument, e.g: + SqlContext.RegisterFunction<bool>("MyFilter", () => true); + sqlContext.Sql("SELECT * FROM MyTable where MyFilter()"); + + + + + + + + Register UDF with 1 input argument, e.g: + SqlContext.RegisterFunction<bool, string>("MyFilter", (arg1) => arg1 != null); + sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1)"); + + + + + + + + + Register UDF with 2 input arguments, e.g: + SqlContext.RegisterFunction<bool, string, string>("MyFilter", (arg1, arg2) => arg1 != null && arg2 != null); + sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2)"); + + + + + + + + + + Register UDF with 3 input arguments, e.g: + SqlContext.RegisterFunction<bool, string, string, string>("MyFilter", (arg1, arg2, arg3) => arg1 != null && arg2 != null && arg3 != null); + sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, columnName3)"); + + + + + + + + + - SaveMode is used to specify the expected behavior of saving a DataFrame to a data source. + Register UDF with 4 input arguments, e.g: + SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg4) => arg1 != null && arg2 != null && ... && arg3 != null); + sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName4)"); + + + + + + + - + - Append mode means that when saving a DataFrame to a data source, if data/table already exists, - contents of the DataFrame are expected to be appended to existing data. + Register UDF with 5 input arguments, e.g: + SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg5) => arg1 != null && arg2 != null && ... && arg5 != null); + sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName5)"); + + + + + + + + - + - Overwrite mode means that when saving a DataFrame to a data source, - if data/table already exists, existing data is expected to be overwritten by the contents of - the DataFrame. + Register UDF with 6 input arguments, e.g: + SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg6) => arg1 != null && arg2 != null && ... && arg6 != null); + sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName6)"); + + + + + + + + + - + - ErrorIfExists mode means that when saving a DataFrame to a data source, if data already exists, - an exception is expected to be thrown. + Register UDF with 7 input arguments, e.g: + SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg7) => arg1 != null && arg2 != null && ... 
&& arg7 != null); + sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName7)"); + + + + + + + + + + - + - Ignore mode means that when saving a DataFrame to a data source, if data already exists, - the save operation is expected to not save the contents of the DataFrame and to not - change the existing data. + Register UDF with 8 input arguments, e.g: + SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg8) => arg1 != null && arg2 != null && ... && arg8 != null); + sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName8)"); + + + + + + + + + + + - + - For SaveMode.ErrorIfExists, the corresponding literal string in spark is "error" or "default". + Register UDF with 9 input arguments, e.g: + SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg9) => arg1 != null && arg2 != null && ... && arg9 != null); + sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName9)"); + + + + + + + + + + + + - + - Gets the string for the value of SaveMode + Register UDF with 10 input arguments, e.g: + SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg10) => arg1 != null && arg2 != null && ... && arg10 != null); + sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName10)"); - The given SaveMode - The string that represents the given SaveMode + + + + + + + + + + + + + The base type of all Spark SQL data types. - - - Parses a Json string to construct a DataType. - - The Json string to be parsed - The new DataType instance from the Json string - - - - Parse a JToken object to construct a DataType. - - The JToken object to be parsed - The new DataType instance from the Json string - Not implemented for "udt" type - - Trim "Type" in the end from class name, ToLower() to align with Scala. @@ -6839,6 +6861,22 @@ The compact JSON representation of this data type. + + + Parses a Json string to construct a DataType. + + The Json string to be parsed + The new DataType instance from the Json string + + + + Parse a JToken object to construct a DataType. + + The JToken object to be parsed + The new DataType instance from the Json string + Not implemented for "udt" type + + An internal type used to represent a simple type. @@ -6953,20 +6991,6 @@ The data type for collections of multiple values. - - - Initializes a ArrayType instance with a specific DataType and specifying if the array has null values. - - The data type of values - Indicates if values have null values - - - - Constructs a ArrayType from a Json object - - The Json object used to construct a ArrayType - A new ArrayType instance - Gets the DataType of each element in the array @@ -6977,11 +7001,25 @@ Returns whether the array can contain null (None) values + + + Initializes a ArrayType instance with a specific DataType and specifying if the array has null values. + + The data type of values + Indicates if values have null values + Readable string representation for the type. + + + Constructs a ArrayType from a Json object + + The Json object used to construct a ArrayType + A new ArrayType instance + The data type for Maps. Not implemented yet. @@ -7000,22 +7038,6 @@ A field inside a StructType. 
- - - Initializes a StructField instance with a specific name, data type, nullable, and metadata - - The name of this field - The data type of this field - Indicates if values of this field can be null values - The metadata of this field - - - - Constructs a StructField from a Json object - - The Json object used to construct a StructField - A new StructField instance - The name of this field. @@ -7036,23 +7058,49 @@ The metadata of this field. The metadata should be preserved during transformation if the content of the column is not modified, e.g, in selection. + + + Initializes a StructField instance with a specific name, data type, nullable, and metadata + + The name of this field + The data type of this field + Indicates if values of this field can be null values + The metadata of this field + Returns a readable string that represents the type. + + + Constructs a StructField from a Json object + + The Json object used to construct a StructField + A new StructField instance + Struct type, consisting of a list of StructField This is the data type representing a Row + + + Gets a list of StructField. + + Initializes a StructType instance with a specific collection of SructField object. The collection that holds StructField objects + + + Returns a readable string that joins all s together. + + Constructs a StructType from a Json object @@ -7060,19 +7108,14 @@ The Json object used to construct a StructType A new StructType instance - - - Gets a list of StructField. - - - + - Returns a readable string that joins all s together. + An input stream that always returns the same RDD on each timestep. Useful for testing. - + - An input stream that always returns the same RDD on each timestep. Useful for testing. + Construct a ConstantInputDStream instance. @@ -7096,6 +7139,11 @@ + + + Return the slideDuration in seconds of this DStream + + Return a new DStream in which each RDD has a single element @@ -7353,16 +7401,6 @@ number of partitions of each RDD in the new DStream. - - - Return the slideDuration in seconds of this DStream - - - - - Construct a ConstantInputDStream instance. - - Following classes are defined explicitly instead of using anonymous method as delegate to prevent C# compiler from generating diff --git a/csharp/Adapter/documentation/Mobius_API_Documentation.md b/csharp/Adapter/documentation/Mobius_API_Documentation.md index a0c5db03..b1adc9f8 100644 --- a/csharp/Adapter/documentation/Mobius_API_Documentation.md +++ b/csharp/Adapter/documentation/Mobius_API_Documentation.md @@ -142,13 +142,6 @@ Used for collect operation on RDD -###Microsoft.Spark.CSharp.Core.IRDDCollector -####Summary - - - Interface for collect operation on RDD - - ###Microsoft.Spark.CSharp.Core.DoubleRDDFunctions ####Summary @@ -163,6 +156,13 @@ --- +###Microsoft.Spark.CSharp.Core.IRDDCollector +####Summary + + + Interface for collect operation on RDD + + ###Microsoft.Spark.CSharp.Core.OrderedRDDFunctions ####Summary @@ -210,20 +210,22 @@ --- -###Microsoft.Spark.CSharp.Core.RDD`1 +###Microsoft.Spark.CSharp.Core.PriorityQueue`1 ####Summary - Represents a Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, - partitioned collection of elements that can be operated on in parallel + A bounded priority queue implemented with max binary heap. - See also http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD + Construction steps: + 1. Build a Max Heap of the first k elements. + 2. 
For each element after the kth element, compare it with the root of the max heap, + a. If the element is less than the root, replace root with this element, heapify. + b. Else ignore it. - Type of the RDD ####Methods -
NameDescription
CachePersist this RDD with the default storage level .
PersistSet this RDD's storage level to persist its values across operations after the first time it is computed. This can only be used to assign a new storage level if the RDD does not have a storage level set yet. If no storage level is specified defaults to . sc.Parallelize(new string[] {"b", "a", "c").Persist().isCached True
UnpersistMark the RDD as non-persistent, and remove all blocks for it from memory and disk.
CheckpointMark this RDD for checkpointing. It will be saved to a file inside the checkpoint directory set with ) and all references to its parent RDDs will be removed. This function must be called before any job has been executed on this RDD. It is strongly recommended that this RDD is persisted in memory, otherwise saving it on a file will require recomputation.
GetNumPartitionsReturns the number of partitions of this RDD.
Map``1Return a new RDD by applying a function to each element of this RDD. sc.Parallelize(new string[]{"b", "a", "c"}, 1).Map(x => new KeyValuePair<string, int>(x, 1)).Collect() [('a', 1), ('b', 1), ('c', 1)]
FlatMap``1Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results. sc.Parallelize(new int[] {2, 3, 4}, 1).FlatMap(x => Enumerable.Range(1, x - 1)).Collect() [1, 1, 1, 2, 2, 3]
MapPartitions``1Return a new RDD by applying a function to each partition of this RDD. sc.Parallelize(new int[] {1, 2, 3, 4}, 2).MapPartitions(iter => new[]{iter.Sum(x => (x as decimal?))}).Collect() [3, 7]
MapPartitionsWithIndex``1Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition. sc.Parallelize(new int[]{1, 2, 3, 4}, 4).MapPartitionsWithIndex<double>((pid, iter) => (double)pid).Sum() 6
FilterReturn a new RDD containing only the elements that satisfy a predicate. sc.Parallelize(new int[]{1, 2, 3, 4, 5}, 1).Filter(x => x % 2 == 0).Collect() [2, 4]
DistinctReturn a new RDD containing the distinct elements in this RDD. >>> sc.Parallelize(new int[] {1, 1, 2, 3}, 1).Distinct().Collect() [1, 2, 3]
SampleReturn a sampled subset of this RDD. var rdd = sc.Parallelize(Enumerable.Range(0, 100), 4) 6 <= rdd.Sample(False, 0.1, 81).count() <= 14 true
RandomSplitRandomly splits this RDD with the provided weights. var rdd = sc.Parallelize(Enumerable.Range(0, 500), 1) var rdds = rdd.RandomSplit(new double[] {2, 3}, 17) 150 < rdds[0].Count() < 250 250 < rdds[1].Count() < 350
TakeSampleReturn a fixed-size sampled subset of this RDD. var rdd = sc.Parallelize(Enumerable.Range(0, 10), 2) rdd.TakeSample(true, 20, 1).Length 20 rdd.TakeSample(false, 5, 2).Length 5 rdd.TakeSample(false, 15, 3).Length 10
ComputeFractionForSampleSizeReturns a sampling rate that guarantees a sample of size >= sampleSizeLowerBound 99.99% of the time. How the sampling rate is determined: Let p = num / total, where num is the sample size and total is the total number of data points in the RDD. We're trying to compute q > p such that - when sampling with replacement, we're drawing each data point with prob_i ~ Pois(q), where we want to guarantee Pr[s < num] < 0.0001 for s = sum(prob_i for i from 0 to total), i.e. the failure rate of not having a sufficiently large sample < 0.0001. Setting q = p + 5 * sqrt(p/total) is sufficient to guarantee 0.9999 success rate for num > 12, but we need a slightly larger q (9 empirically determined). - when sampling without replacement, we're drawing each data point with prob_i ~ Binomial(total, fraction) and our choice of q guarantees 1-delta, or 0.9999 success rate, where success rate is defined the same as in sampling with replacement.
UnionReturn the union of this RDD and another one. var rdd = sc.Parallelize(new int[] { 1, 1, 2, 3 }, 1) rdd.Union(rdd).Collect() [1, 1, 2, 3, 1, 1, 2, 3]
IntersectionReturn the intersection of this RDD and another one. The output will not contain any duplicate elements, even if the input RDDs did. Note that this method performs a shuffle internally. var rdd1 = sc.Parallelize(new int[] { 1, 10, 2, 3, 4, 5 }, 1) var rdd2 = sc.Parallelize(new int[] { 1, 6, 2, 3, 7, 8 }, 1) rdd1.Intersection(rdd2).Collect() [1, 2, 3]
GlomReturn an RDD created by coalescing all elements within each partition into a list. var rdd = sc.Parallelize(new int[] { 1, 2, 3, 4 }, 2) rdd.Glom().Collect() [[1, 2], [3, 4]]
Cartesian``1Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of elements (a, b) where a is in self and b is in other. var rdd = sc.Parallelize(new int[] { 1, 2 }, 1) rdd.Cartesian(rdd).Collect() [(1, 1), (1, 2), (2, 1), (2, 2)]
GroupBy``1Return an RDD of grouped items. Each group consists of a key and a sequence of elements mapping to that key. The ordering of elements within each group is not guaranteed, and may even differ each time the resulting RDD is evaluated. Note: This operation may be very expensive. If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]] or [[PairRDDFunctions.reduceByKey]] will provide much better performance. var rdd = sc.Parallelize(new int[] { 1, 1, 2, 3, 5, 8 }, 1) var result = rdd.GroupBy(x => x % 2).Collect() [(0, [2, 8]), (1, [1, 1, 3, 5])]
PipeReturn an RDD created by piping elements to a forked external process. >>> sc.Parallelize(new char[] { '1', '2', '3', '4' }, 1).Pipe("cat").Collect() [u'1', u'2', u'3', u'4']
ForeachApplies a function to all elements of this RDD. sc.Parallelize(new int[] { 1, 2, 3, 4, 5 }, 1).Foreach(x => Console.Write(x))
ForeachPartitionApplies a function to each partition of this RDD. sc.Parallelize(new int[] { 1, 2, 3, 4, 5 }, 1).ForeachPartition(iter => { foreach (var x in iter) Console.Write(x + " "); })
CollectReturn a list that contains all of the elements in this RDD.
ReduceReduces the elements of this RDD using the specified commutative and associative binary operator. sc.Parallelize(new int[] { 1, 2, 3, 4, 5 }, 1).Reduce((x, y) => x + y) 15
TreeReduceReduces the elements of this RDD in a multi-level tree pattern. var rdd = sc.Parallelize(new int[] { -5, -4, -3, -2, -1, 1, 2, 3, 4 }, 10) rdd.TreeReduce((x, y) => x + y) -5 rdd.TreeReduce((x, y) => x + y, 2) -5 rdd.TreeReduce((x, y) => x + y, 5) -5 rdd.TreeReduce((x, y) => x + y, 10) -5
FoldAggregate the elements of each partition, and then the results for all the partitions, using a given associative and commutative function and a neutral "zero value." The function op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2. This behaves somewhat differently from fold operations implemented for non-distributed collections in functional languages like Scala. This fold operation may be applied to partitions individually, and those partial results are then folded into the final result, rather than applying the fold to each element sequentially in some defined ordering. For functions that are not commutative, the result may differ from that of a fold applied to a non-distributed collection. sc.Parallelize(new int[] { 1, 2, 3, 4, 5 }, 1).Fold(0, (x, y) => x + y) 15
Aggregate``1Aggregate the elements of each partition, and then the results for all the partitions, using the given combine functions and a neutral "zero value." Both functions are allowed to modify and return their first argument to avoid object allocation; however, they should not modify their second argument. The first function (seqOp) can return a different result type, U, than the type of this RDD. Thus, we need one operation for merging a T into a U and one operation for merging two Us. sc.Parallelize(new int[] { 1, 2, 3, 4 }, 1).Aggregate(0, (x, y) => x + y, (x, y) => x + y) 10
TreeAggregate``1Aggregates the elements of this RDD in a multi-level tree pattern. sc.Parallelize(new int[] { 1, 2, 3, 4 }, 1).TreeAggregate(0, (x, y) => x + y, (x, y) => x + y) 10
CountReturn the number of elements in this RDD.
CountByValueReturn the count of each unique value in this RDD as a dictionary of (value, count) pairs. sc.Parallelize(new int[] { 1, 2, 1, 2, 2 }, 2).CountByValue() [(1, 2), (2, 3)]
TakeTake the first num elements of the RDD. It works by first scanning one partition, and using the results from that partition to estimate the number of additional partitions needed to satisfy the limit. Translated from the Scala implementation in RDD#take(). sc.Parallelize(new int[] { 2, 3, 4, 5, 6 }, 2).Cache().Take(2) [2, 3] sc.Parallelize(new int[] { 2, 3, 4, 5, 6 }, 2).Take(10) [2, 3, 4, 5, 6] sc.Parallelize(Enumerable.Range(0, 100), 100).Filter(x => x > 90).Take(3) [91, 92, 93]
FirstReturn the first element in this RDD. >>> sc.Parallelize(new int[] { 2, 3, 4 }, 2).First() 2
IsEmptyReturns true if and only if the RDD contains no elements at all. Note that an RDD may be empty even when it has at least 1 partition. sc.Parallelize(new int[0], 1).IsEmpty() true sc.Parallelize(new int[] {1}).IsEmpty() false
SubtractReturn each value in this RDD that is not contained in the other RDD. var x = sc.Parallelize(new int[] { 1, 2, 3, 4 }, 1) var y = sc.Parallelize(new int[] { 3 }, 1) x.Subtract(y).Collect() [1, 2, 4]
KeyBy``1Creates tuples of the elements in this RDD by applying the given function. sc.Parallelize(new int[] { 1, 2, 3, 4 }, 1).KeyBy(x => x * x).Collect() [(1, 1), (4, 2), (9, 3), (16, 4)]
RepartitionReturn a new RDD that has exactly numPartitions partitions. Can increase or decrease the level of parallelism in this RDD. Internally, this uses a shuffle to redistribute data. If you are decreasing the number of partitions in this RDD, consider using `Coalesce`, which can avoid performing a shuffle. var rdd = sc.Parallelize(new int[] { 1, 2, 3, 4, 5, 6, 7 }, 4) rdd.Glom().Collect().Length 4 rdd.Repartition(2).Glom().Collect().Length 2
CoalesceReturn a new RDD that is reduced into `numPartitions` partitions. sc.Parallelize(new int[] { 1, 2, 3, 4, 5 }, 3).Glom().Collect().Length 3 >>> sc.Parallelize(new int[] { 1, 2, 3, 4, 5 }, 3).Coalesce(1).Glom().Collect().Length 1
Zip``1Zips this RDD with another one, returning key-value pairs with the first element in each RDD, second element in each RDD, etc. Assumes that the two RDDs have the same number of partitions and the same number of elements in each partition (e.g. one was made through a map on the other). var x = sc.Parallelize(Enumerable.Range(0, 5), 1) var y = sc.Parallelize(Enumerable.Range(1000, 5), 1) x.Zip(y).Collect() [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
ZipWithIndexZips this RDD with its element indices. The ordering is first based on the partition index and then the ordering of items within each partition. So the first item in the first partition gets index 0, and the last item in the last partition receives the largest index. This method needs to trigger a spark job when this RDD contains more than one partition. sc.Parallelize(new string[] { "a", "b", "c", "d" }, 3).ZipWithIndex().Collect() [('a', 0), ('b', 1), ('c', 2), ('d', 3)]
ZipWithUniqueIdZips this RDD with generated unique Long ids. Items in the kth partition will get ids k, n+k, 2*n+k, ..., where n is the number of partitions. So there may exist gaps, but this method won't trigger a spark job, which is different from ZipWithIndex. sc.Parallelize(new string[] { "a", "b", "c", "d", "e" }, 3).ZipWithUniqueId().Collect() [('a', 0), ('b', 1), ('c', 4), ('d', 2), ('e', 5)]
SetNameAssign a name to this RDD. >>> rdd1 = sc.parallelize([1, 2]) >>> rdd1.setName('RDD1').name() u'RDD1'
ToDebugStringA description of this RDD and its recursive dependencies for debugging.
GetStorageLevelGet the RDD's current storage level. >>> rdd1 = sc.parallelize([1,2]) >>> rdd1.getStorageLevel() StorageLevel(False, False, False, False, 1) >>> print(rdd1.getStorageLevel()) Serialized 1x Replicated
ToLocalIteratorReturn an iterator that contains all of the elements in this RDD. The iterator will consume as much memory as the largest partition in this RDD. sc.Parallelize(Enumerable.Range(0, 10), 1).ToLocalIterator() [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
RandomSampleWithRangeInternal method exposed for Random Splits in DataFrames. Samples an RDD given a probability range.
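The transformation and action entries above compose into pipelines in the usual way. A minimal sketch (assuming `sc` is an already-initialized SparkContext; the helper name is illustrative only):

```csharp
using System.Linq;
using Microsoft.Spark.CSharp.Core;

static class RddUsageSketch
{
    // Chains Filter, Map and Reduce as described in the rows above.
    internal static int SumOfEvenSquares(SparkContext sc)
    {
        RDD<int> numbers = sc.Parallelize(Enumerable.Range(1, 10), 2); // two partitions
        return numbers
            .Filter(x => x % 2 == 0)  // keep 2, 4, 6, 8, 10
            .Map(x => x * x)          // square them: 4, 16, 36, 64, 100
            .Reduce((x, y) => x + y); // 220
    }
}
```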
+
NameDescription
OfferInserts the specified element into this priority queue.
--- @@ -235,6 +237,24 @@ A class represents a profiler +###Microsoft.Spark.CSharp.Core.RDD`1 +####Summary + + + Represents a Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, + partitioned collection of elements that can be operated on in parallel + + See also http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD + + Type of the RDD + +####Methods + +
NameDescription
CachePersist this RDD with the default storage level .
PersistSet this RDD's storage level to persist its values across operations after the first time it is computed. This can only be used to assign a new storage level if the RDD does not have a storage level set yet. If no storage level is specified, the default storage level is used. sc.Parallelize(new string[] {"b", "a", "c"}).Persist().IsCached true
UnpersistMark the RDD as non-persistent, and remove all blocks for it from memory and disk.
CheckpointMark this RDD for checkpointing. It will be saved to a file inside the checkpoint directory (set with SetCheckpointDir), and all references to its parent RDDs will be removed. This function must be called before any job has been executed on this RDD. It is strongly recommended that this RDD is persisted in memory; otherwise, saving it to a file will require recomputation.
GetNumPartitionsReturns the number of partitions of this RDD.
Map``1Return a new RDD by applying a function to each element of this RDD. sc.Parallelize(new string[]{"b", "a", "c"}, 1).Map(x => new KeyValuePair<string, int>(x, 1)).Collect() [('a', 1), ('b', 1), ('c', 1)]
FlatMap``1Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results. sc.Parallelize(new int[] {2, 3, 4}, 1).FlatMap(x => Enumerable.Range(1, x - 1)).Collect() [1, 1, 1, 2, 2, 3]
MapPartitions``1Return a new RDD by applying a function to each partition of this RDD. sc.Parallelize(new int[] {1, 2, 3, 4}, 2).MapPartitions(iter => new[]{iter.Sum(x => (decimal?)x)}).Collect() [3, 7]
MapPartitionsWithIndex``1Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition. sc.Parallelize(new int[]{1, 2, 3, 4}, 4).MapPartitionsWithIndex<double>((pid, iter) => (double)pid).Sum() 6
FilterReturn a new RDD containing only the elements that satisfy a predicate. sc.Parallelize(new int[]{1, 2, 3, 4, 5}, 1).Filter(x => x % 2 == 0).Collect() [2, 4]
DistinctReturn a new RDD containing the distinct elements in this RDD. >>> sc.Parallelize(new int[] {1, 1, 2, 3}, 1).Distinct().Collect() [1, 2, 3]
SampleReturn a sampled subset of this RDD. var rdd = sc.Parallelize(Enumerable.Range(0, 100), 4) 6 <= rdd.Sample(false, 0.1, 81).Count() <= 14 true
RandomSplitRandomly splits this RDD with the provided weights. var rdd = sc.Parallelize(Enumerable.Range(0, 500), 1) var rdds = rdd.RandomSplit(new double[] {2, 3}, 17) 150 < rdds[0].Count() < 250 250 < rdds[1].Count() < 350
TakeSampleReturn a fixed-size sampled subset of this RDD. var rdd = sc.Parallelize(Enumerable.Range(0, 10), 2) rdd.TakeSample(true, 20, 1).Length 20 rdd.TakeSample(false, 5, 2).Length 5 rdd.TakeSample(false, 15, 3).Length 10
ComputeFractionForSampleSizeReturns a sampling rate that guarantees a sample of size >= sampleSizeLowerBound 99.99% of the time. How the sampling rate is determined: Let p = num / total, where num is the sample size and total is the total number of data points in the RDD. We're trying to compute q > p such that - when sampling with replacement, we're drawing each data point with prob_i ~ Pois(q), where we want to guarantee Pr[s < num] < 0.0001 for s = sum(prob_i for i from 0 to total), i.e. the failure rate of not having a sufficiently large sample < 0.0001. Setting q = p + 5 * sqrt(p/total) is sufficient to guarantee 0.9999 success rate for num > 12, but we need a slightly larger q (9 empirically determined). - when sampling without replacement, we're drawing each data point with prob_i ~ Binomial(total, fraction) and our choice of q guarantees 1-delta, or 0.9999 success rate, where success rate is defined the same as in sampling with replacement.
UnionReturn the union of this RDD and another one. var rdd = sc.Parallelize(new int[] { 1, 1, 2, 3 }, 1) rdd.Union(rdd).Collect() [1, 1, 2, 3, 1, 1, 2, 3]
IntersectionReturn the intersection of this RDD and another one. The output will not contain any duplicate elements, even if the input RDDs did. Note that this method performs a shuffle internally. var rdd1 = sc.Parallelize(new int[] { 1, 10, 2, 3, 4, 5 }, 1) var rdd2 = sc.Parallelize(new int[] { 1, 6, 2, 3, 7, 8 }, 1) rdd1.Intersection(rdd2).Collect() [1, 2, 3]
GlomReturn an RDD created by coalescing all elements within each partition into a list. var rdd = sc.Parallelize(new int[] { 1, 2, 3, 4 }, 2) rdd.Glom().Collect() [[1, 2], [3, 4]]
Cartesian``1Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of elements (a, b) where a is in self and b is in other. var rdd = sc.Parallelize(new int[] { 1, 2 }, 1) rdd.Cartesian(rdd).Collect() [(1, 1), (1, 2), (2, 1), (2, 2)]
GroupBy``1Return an RDD of grouped items. Each group consists of a key and a sequence of elements mapping to that key. The ordering of elements within each group is not guaranteed, and may even differ each time the resulting RDD is evaluated. Note: This operation may be very expensive. If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]] or [[PairRDDFunctions.reduceByKey]] will provide much better performance. var rdd = sc.Parallelize(new int[] { 1, 1, 2, 3, 5, 8 }, 1) var result = rdd.GroupBy(x => x % 2).Collect() [(0, [2, 8]), (1, [1, 1, 3, 5])]
PipeReturn an RDD created by piping elements to a forked external process. >>> sc.Parallelize(new char[] { '1', '2', '3', '4' }, 1).Pipe("cat").Collect() [u'1', u'2', u'3', u'4']
ForeachApplies a function to all elements of this RDD. sc.Parallelize(new int[] { 1, 2, 3, 4, 5 }, 1).Foreach(x => Console.Write(x))
ForeachPartitionApplies a function to each partition of this RDD. sc.Parallelize(new int[] { 1, 2, 3, 4, 5 }, 1).ForeachPartition(iter => { foreach (var x in iter) Console.Write(x + " "); })
CollectReturn a list that contains all of the elements in this RDD.
ReduceReduces the elements of this RDD using the specified commutative and associative binary operator. sc.Parallelize(new int[] { 1, 2, 3, 4, 5 }, 1).Reduce((x, y) => x + y) 15
TreeReduceReduces the elements of this RDD in a multi-level tree pattern. var rdd = sc.Parallelize(new int[] { -5, -4, -3, -2, -1, 1, 2, 3, 4 }, 10) rdd.TreeReduce((x, y) => x + y) -5 rdd.TreeReduce((x, y) => x + y, 2) -5 rdd.TreeReduce((x, y) => x + y, 5) -5 rdd.TreeReduce((x, y) => x + y, 10) -5
FoldAggregate the elements of each partition, and then the results for all the partitions, using a given associative and commutative function and a neutral "zero value." The function op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2. This behaves somewhat differently from fold operations implemented for non-distributed collections in functional languages like Scala. This fold operation may be applied to partitions individually, and those partial results are then folded into the final result, rather than applying the fold to each element sequentially in some defined ordering. For functions that are not commutative, the result may differ from that of a fold applied to a non-distributed collection. sc.Parallelize(new int[] { 1, 2, 3, 4, 5 }, 1).Fold(0, (x, y) => x + y) 15
Aggregate``1Aggregate the elements of each partition, and then the results for all the partitions, using the given combine functions and a neutral "zero value." Both functions are allowed to modify and return their first argument to avoid object allocation; however, they should not modify their second argument. The first function (seqOp) can return a different result type, U, than the type of this RDD. Thus, we need one operation for merging a T into a U and one operation for merging two Us (a worked sketch computing an average with Aggregate follows this table). sc.Parallelize(new int[] { 1, 2, 3, 4 }, 1).Aggregate(0, (x, y) => x + y, (x, y) => x + y) 10
TreeAggregate``1Aggregates the elements of this RDD in a multi-level tree pattern. sc.Parallelize(new int[] { 1, 2, 3, 4 }, 1).TreeAggregate(0, (x, y) => x + y, (x, y) => x + y) 10
CountReturn the number of elements in this RDD.
CountByValueReturn the count of each unique value in this RDD as a dictionary of (value, count) pairs. sc.Parallelize(new int[] { 1, 2, 1, 2, 2 }, 2).CountByValue() [(1, 2), (2, 3)]
TakeTake the first num elements of the RDD. It works by first scanning one partition, and using the results from that partition to estimate the number of additional partitions needed to satisfy the limit. Translated from the Scala implementation in RDD#take(). sc.Parallelize(new int[] { 2, 3, 4, 5, 6 }, 2).Cache().Take(2) [2, 3] sc.Parallelize(new int[] { 2, 3, 4, 5, 6 }, 2).Take(10) [2, 3, 4, 5, 6] sc.Parallelize(Enumerable.Range(0, 100), 100).Filter(x => x > 90).Take(3) [91, 92, 93]
FirstReturn the first element in this RDD. >>> sc.Parallelize(new int[] { 2, 3, 4 }, 2).First() 2
IsEmptyReturns true if and only if the RDD contains no elements at all. Note that an RDD may be empty even when it has at least 1 partition. sc.Parallelize(new int[0], 1).IsEmpty() true sc.Parallelize(new int[] {1}).IsEmpty() false
SubtractReturn each value in this RDD that is not contained in the other RDD. var x = sc.Parallelize(new int[] { 1, 2, 3, 4 }, 1) var y = sc.Parallelize(new int[] { 3 }, 1) x.Subtract(y).Collect() [1, 2, 4]
KeyBy``1Creates tuples of the elements in this RDD by applying the given function. sc.Parallelize(new int[] { 1, 2, 3, 4 }, 1).KeyBy(x => x * x).Collect() [(1, 1), (4, 2), (9, 3), (16, 4)]
RepartitionReturn a new RDD that has exactly numPartitions partitions. Can increase or decrease the level of parallelism in this RDD. Internally, this uses a shuffle to redistribute data. If you are decreasing the number of partitions in this RDD, consider using `Coalesce`, which can avoid performing a shuffle. var rdd = sc.Parallelize(new int[] { 1, 2, 3, 4, 5, 6, 7 }, 4) rdd.Glom().Collect().Length 4 rdd.Repartition(2).Glom().Collect().Length 2
CoalesceReturn a new RDD that is reduced into `numPartitions` partitions. sc.Parallelize(new int[] { 1, 2, 3, 4, 5 }, 3).Glom().Collect().Length 3 >>> sc.Parallelize(new int[] { 1, 2, 3, 4, 5 }, 3).Coalesce(1).Glom().Collect().Length 1
Zip``1Zips this RDD with another one, returning key-value pairs with the first element in each RDD, second element in each RDD, etc. Assumes that the two RDDs have the same number of partitions and the same number of elements in each partition (e.g. one was made through a map on the other). var x = sc.Parallelize(Enumerable.Range(0, 5), 1) var y = sc.Parallelize(Enumerable.Range(1000, 5), 1) x.Zip(y).Collect() [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
ZipWithIndexZips this RDD with its element indices. The ordering is first based on the partition index and then the ordering of items within each partition. So the first item in the first partition gets index 0, and the last item in the last partition receives the largest index. This method needs to trigger a spark job when this RDD contains more than one partition. sc.Parallelize(new string[] { "a", "b", "c", "d" }, 3).ZipWithIndex().Collect() [('a', 0), ('b', 1), ('c', 2), ('d', 3)]
ZipWithUniqueIdZips this RDD with generated unique Long ids. Items in the kth partition will get ids k, n+k, 2*n+k, ..., where n is the number of partitions. So there may exist gaps, but this method won't trigger a spark job, which is different from ZipWithIndex. sc.Parallelize(new string[] { "a", "b", "c", "d", "e" }, 3).ZipWithUniqueId().Collect() [('a', 0), ('b', 1), ('c', 4), ('d', 2), ('e', 5)]
SetNameAssign a name to this RDD. >>> rdd1 = sc.parallelize([1, 2]) >>> rdd1.setName('RDD1').name() u'RDD1'
ToDebugStringA description of this RDD and its recursive dependencies for debugging.
GetStorageLevelGet the RDD's current storage level. >>> rdd1 = sc.parallelize([1,2]) >>> rdd1.getStorageLevel() StorageLevel(False, False, False, False, 1) >>> print(rdd1.getStorageLevel()) Serialized 1x Replicated
ToLocalIteratorReturn an iterator that contains all of the elements in this RDD. The iterator will consume as much memory as the largest partition in this RDD. sc.Parallelize(Enumerable.Range(0, 10), 1).ToLocalIterator() [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
RandomSampleWithRangeInternal method exposed for Random Splits in DataFrames. Samples an RDD given a probability range.
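The Aggregate entry above allows the intermediate type U to differ from the RDD's element type, which is what makes patterns such as averaging possible in a single pass. A hedged sketch (assuming `sc` is an initialized SparkContext; the Tuple-based accumulator is purely illustrative):

```csharp
using System;
using Microsoft.Spark.CSharp.Core;

static class AggregateSketch
{
    // Average of an RDD<int> via Aggregate: the zero value and intermediate
    // results are (sum, count) tuples rather than plain ints.
    internal static double Average(SparkContext sc)
    {
        RDD<int> values = sc.Parallelize(new[] { 1, 2, 3, 4 }, 2);
        Tuple<int, int> sumAndCount = values.Aggregate(
            Tuple.Create(0, 0),                                            // zero value: (sum, count)
            (acc, x) => Tuple.Create(acc.Item1 + x, acc.Item2 + 1),        // seqOp: merge an int into a tuple
            (a, b) => Tuple.Create(a.Item1 + b.Item1, a.Item2 + b.Item2)); // combOp: merge two tuples
        return (double)sumAndCount.Item1 / sumAndCount.Item2;              // 2.5
    }
}
```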
+ +--- + + ###Microsoft.Spark.CSharp.Core.StringRDDFunctions ####Summary @@ -293,7 +313,7 @@ ####Methods -
NameDescription
GetActiveSparkContextGet existing SparkContext
TextFileRead a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings.
Parallelize``1Distribute a local collection to form an RDD. sc.Parallelize(new int[] {0, 2, 3, 4, 6}, 5).Glom().Collect() [[0], [2], [3], [4], [6]]
EmptyRDDCreate an RDD that has no partitions or elements.
WholeTextFilesRead a directory of text files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. Each file is read as a single record and returned in a key-value pair, where the key is the path of each file and the value is the content of each file. For example, if you have the following files: {{{ hdfs://a-hdfs-path/part-00000 hdfs://a-hdfs-path/part-00001 ... hdfs://a-hdfs-path/part-nnnnn }}} Do {{{ RDD<KeyValuePair<string, string>> rdd = sparkContext.WholeTextFiles("hdfs://a-hdfs-path") }}} then `rdd` contains {{{ (a-hdfs-path/part-00000, its content) (a-hdfs-path/part-00001, its content) ... (a-hdfs-path/part-nnnnn, its content) }}} Small files are preferred; large files are also allowed but may cause bad performance. The minPartitions parameter is a suggested minimal number of splits for the input data.
BinaryFilesRead a directory of binary files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI as a byte array. Each file is read as a single record and returned in a key-value pair, where the key is the path of each file and the value is the content of each file. For example, if you have the following files: {{{ hdfs://a-hdfs-path/part-00000 hdfs://a-hdfs-path/part-00001 ... hdfs://a-hdfs-path/part-nnnnn }}} Do {{{ RDD<KeyValuePair<string, byte[]>> rdd = sparkContext.BinaryFiles("hdfs://a-hdfs-path") }}} then `rdd` contains {{{ (a-hdfs-path/part-00000, its content) (a-hdfs-path/part-00001, its content) ... (a-hdfs-path/part-nnnnn, its content) }}} Note: small files are preferred; very large files may cause bad performance. The minPartitions parameter is a suggested minimal number of splits for the input data.
SequenceFileRead a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. The mechanism is as follows: 1. A Java RDD is created from the SequenceFile or other InputFormat, and the key and value Writable classes 2. Serialization is attempted via Pyrolite pickling 3. If this fails, the fallback is to call 'toString' on each key and value 4. PickleSerializer is used to deserialize pickled objects on the Python side
NewAPIHadoopFileRead a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. The mechanism is the same as for sc.sequenceFile. A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java
NewAPIHadoopRDDRead a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration, which is passed in as a Python dict. This will be converted into a Configuration in Java. The mechanism is the same as for sc.sequenceFile.
HadoopFileRead an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. The mechanism is the same as for sc.sequenceFile. A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java.
HadoopRDDRead an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration, which is passed in as a Python dict. This will be converted into a Configuration in Java. The mechanism is the same as for sc.sequenceFile.
Union``1Build the union of a list of RDDs. This supports unions() of RDDs with different serialized formats, although this forces them to be reserialized using the default serializer: >>> path = os.path.join(tempdir, "union-text.txt") >>> with open(path, "w") as testFile: ... _ = testFile.write("Hello") >>> textFile = sc.textFile(path) >>> textFile.collect() [u'Hello'] >>> parallelized = sc.parallelize(["World!"]) >>> sorted(sc.union([textFile, parallelized]).collect()) [u'Hello', 'World!']
Broadcast``1Broadcast a read-only variable to the cluster, returning a Broadcast object for reading it in distributed functions. The variable will be sent to each cluster only once.
Accumulator``1Create an with the given initial value, using a given helper object to define how to add values of the data type if provided. Default AccumulatorParams are used for integers and floating-point numbers if you do not provide one. For other types, a custom AccumulatorParam can be used.
StopShut down the SparkContext.
AddFileAdd a file to be downloaded with this Spark job on every node. The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs, use `SparkFiles.get(fileName)` to find its download location.
SetCheckpointDirSet the directory under which RDDs are going to be checkpointed. The directory must be a HDFS path if running on a cluster.
SetJobGroupAssigns a group ID to all the jobs started by this thread until the group ID is set to a different value or cleared. Often, a unit of execution in an application consists of multiple Spark actions or jobs. Application programmers can use this method to group all those jobs together and give a group description. Once set, the Spark web UI will associate such jobs with this group. The application can also use [[org.apache.spark.api.java.JavaSparkContext.cancelJobGroup]] to cancel all running jobs in this group. For example, {{{ // In the main thread: sc.setJobGroup("some_job_to_cancel", "some job description"); rdd.map(...).count(); // In a separate thread: sc.cancelJobGroup("some_job_to_cancel"); }}} If interruptOnCancel is set to true for the job group, then job cancellation will result in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208, where HDFS may respond to Thread.interrupt() by marking nodes as dead.
SetLocalPropertySet a local property that affects jobs submitted from this thread, such as the Spark fair scheduler pool.
GetLocalPropertyGet a local property set in this thread, or null if it is missing. See [[org.apache.spark.api.java.JavaSparkContext.setLocalProperty]].
SetLogLevelControl our logLevel. This overrides any user-defined log settings. @param logLevel The desired log level as a string. Valid log levels include: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
CancelJobGroupCancel active jobs for the specified group. See for more information.
CancelAllJobsCancel all jobs that have been scheduled or are running.
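The file-reading entries above (TextFile, WholeTextFiles, BinaryFiles) return RDDs that can be processed with the usual RDD operations. A minimal sketch (assuming `sc` is an initialized SparkContext; the HDFS path is hypothetical):

```csharp
using System;
using Microsoft.Spark.CSharp.Core;

static class FileReadSketch
{
    internal static void CountErrorLines(SparkContext sc)
    {
        // Hypothetical input path; any HDFS/local/Hadoop-supported URI works here.
        RDD<string> lines = sc.TextFile("hdfs://a-hdfs-path/app.log", 4);
        long errors = lines.Filter(line => line.Contains("ERROR")).Count();
        Console.WriteLine("error lines: {0}", errors);
    }
}
```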
+
NameDescription
GetActiveSparkContextGet existing SparkContext
GetConfReturn a copy of this JavaSparkContext's configuration. The configuration ''cannot'' be changed at runtime.
TextFileRead a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings.
Parallelize``1Distribute a local collection to form an RDD. sc.Parallelize(new int[] {0, 2, 3, 4, 6}, 5).Glom().Collect() [[0], [2], [3], [4], [6]]
EmptyRDDCreate an RDD that has no partitions or elements.
WholeTextFilesRead a directory of text files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. Each file is read as a single record and returned in a key-value pair, where the key is the path of each file and the value is the content of each file. For example, if you have the following files: {{{ hdfs://a-hdfs-path/part-00000 hdfs://a-hdfs-path/part-00001 ... hdfs://a-hdfs-path/part-nnnnn }}} Do {{{ RDD<KeyValuePair<string, string>> rdd = sparkContext.WholeTextFiles("hdfs://a-hdfs-path") }}} then `rdd` contains {{{ (a-hdfs-path/part-00000, its content) (a-hdfs-path/part-00001, its content) ... (a-hdfs-path/part-nnnnn, its content) }}} Small files are preferred; large files are also allowed but may cause bad performance. The minPartitions parameter is a suggested minimal number of splits for the input data.
BinaryFilesRead a directory of binary files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI as a byte array. Each file is read as a single record and returned in a key-value pair, where the key is the path of each file and the value is the content of each file. For example, if you have the following files: {{{ hdfs://a-hdfs-path/part-00000 hdfs://a-hdfs-path/part-00001 ... hdfs://a-hdfs-path/part-nnnnn }}} Do {{{ RDD<KeyValuePair<string, byte[]>> rdd = sparkContext.BinaryFiles("hdfs://a-hdfs-path") }}} then `rdd` contains {{{ (a-hdfs-path/part-00000, its content) (a-hdfs-path/part-00001, its content) ... (a-hdfs-path/part-nnnnn, its content) }}} Note: small files are preferred; very large files may cause bad performance. The minPartitions parameter is a suggested minimal number of splits for the input data.
SequenceFileRead a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. The mechanism is as follows: 1. A Java RDD is created from the SequenceFile or other InputFormat, and the key and value Writable classes 2. Serialization is attempted via Pyrolite pickling 3. If this fails, the fallback is to call 'toString' on each key and value 4. PickleSerializer is used to deserialize pickled objects on the Python side
NewAPIHadoopFileRead a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. The mechanism is the same as for sc.sequenceFile. A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java
NewAPIHadoopRDDRead a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration, which is passed in as a Python dict. This will be converted into a Configuration in Java. The mechanism is the same as for sc.sequenceFile.
HadoopFileRead an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. The mechanism is the same as for sc.sequenceFile. A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java.
HadoopRDDRead an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration, which is passed in as a Python dict. This will be converted into a Configuration in Java. The mechanism is the same as for sc.sequenceFile.
Union``1Build the union of a list of RDDs. This supports unions() of RDDs with different serialized formats, although this forces them to be reserialized using the default serializer: >>> path = os.path.join(tempdir, "union-text.txt") >>> with open(path, "w") as testFile: ... _ = testFile.write("Hello") >>> textFile = sc.textFile(path) >>> textFile.collect() [u'Hello'] >>> parallelized = sc.parallelize(["World!"]) >>> sorted(sc.union([textFile, parallelized]).collect()) [u'Hello', 'World!']
Broadcast``1Broadcast a read-only variable to the cluster, returning a Broadcast object for reading it in distributed functions. The variable will be sent to each cluster only once.
Accumulator``1Create an with the given initial value, using a given helper object to define how to add values of the data type if provided. Default AccumulatorParams are used for integers and floating-point numbers if you do not provide one. For other types, a custom AccumulatorParam can be used.
StopShut down the SparkContext.
AddFileAdd a file to be downloaded with this Spark job on every node. The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs, use `SparkFiles.get(fileName)` to find its download location.
SetCheckpointDirSet the directory under which RDDs are going to be checkpointed. The directory must be a HDFS path if running on a cluster.
SetJobGroupAssigns a group ID to all the jobs started by this thread until the group ID is set to a different value or cleared. Often, a unit of execution in an application consists of multiple Spark actions or jobs. Application programmers can use this method to group all those jobs together and give a group description. Once set, the Spark web UI will associate such jobs with this group. The application can also use [[org.apache.spark.api.java.JavaSparkContext.cancelJobGroup]] to cancel all running jobs in this group. For example, {{{ // In the main thread: sc.setJobGroup("some_job_to_cancel", "some job description"); rdd.map(...).count(); // In a separate thread: sc.cancelJobGroup("some_job_to_cancel"); }}} If interruptOnCancel is set to true for the job group, then job cancellation will result in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208, where HDFS may respond to Thread.interrupt() by marking nodes as dead.
SetLocalPropertySet a local property that affects jobs submitted from this thread, such as the Spark fair scheduler pool.
GetLocalPropertyGet a local property set in this thread, or null if it is missing. See [[org.apache.spark.api.java.JavaSparkContext.setLocalProperty]].
SetLogLevelControl our logLevel. This overrides any user-defined log settings. @param logLevel The desired log level as a string. Valid log levels include: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
CancelJobGroupCancel active jobs for the specified group. See for more information.
CancelAllJobsCancel all jobs that have been scheduled or are running.
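The SetJobGroup/CancelJobGroup contract described above translates directly to C#. A hedged sketch (the group id mirrors the documented example; the use of Task for the cancelling thread is illustrative, not part of the API):

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Spark.CSharp.Core;

static class JobGroupSketch
{
    internal static void RunAndCancel(SparkContext sc)
    {
        // In the thread that submits jobs: tag subsequent jobs with a group id.
        sc.SetJobGroup("some_job_to_cancel", "some job description");

        // From a separate thread: cancel everything submitted under that group id.
        Task.Delay(TimeSpan.FromSeconds(5))
            .ContinueWith(_ => sc.CancelJobGroup("some_job_to_cancel"));

        // This action belongs to the group set above and is cancelled when the
        // continuation fires (or completes normally if it finishes first).
        sc.Parallelize(Enumerable.Range(0, 1000000), 8).Map(x => x * 2).Count();
    }
}
```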
--- @@ -629,21 +649,6 @@ --- -###Microsoft.Spark.CSharp.Sql.SqlContext -####Summary - - - The entry point for working with structured data (rows and columns) in Spark. - Allows the creation of [[DataFrame]] objects as well as the execution of SQL queries. - - -####Methods - -
NameDescription
GetOrCreateGet the existing SQLContext or create a new one with given SparkContext.
NewSessionReturns a new SQLContext as new session, that has separate SQLConf, registered temporary tables and UDFs, but shared SparkContext and table cache.
GetConfReturns the value of Spark SQL configuration property for the given key. If the key is not set, returns defaultValue.
SetConfSets the given Spark SQL configuration property.
ReadReturns a DataFrameReader that can be used to read data in as a DataFrame.
ReadDataFrameLoads a DataFrame from the given source path using the given schema and options
CreateDataFrameCreates a from a RDD containing array of object using the given schema.
RegisterDataFrameAsTableRegisters the given as a temporary table in the catalog. Temporary tables exist only during the lifetime of this instance of SqlContext.
DropTempTableRemove the temp table from catalog.
TableReturns the specified table as a
TablesReturns a containing names of tables in the given database. If is not specified, the current database will be used. The returned DataFrame has two columns: 'tableName' and 'isTemporary' (a column with bool type indicating if a table is a temporary one or not).
TableNamesReturns a list of names of tables in the database
CacheTableCaches the specified table in-memory.
UncacheTableRemoves the specified table from the in-memory cache.
ClearCacheRemoves all cached tables from the in-memory cache.
IsCachedReturns true if the table is currently cached in-memory.
SqlExecutes a SQL query using Spark, returning the result as a DataFrame. The dialect that is used for SQL parsing can be configured with 'spark.sql.dialect'
JsonFileLoads a JSON file (one object per line), returning the result as a DataFrame. It goes through the entire dataset once to determine the schema.
JsonFileLoads a JSON file (one object per line) and applies the given schema
TextFileLoads a text file with the specified column delimiter, applying the given schema
TextFileLoads a text file (one object per line), returning the result as a DataFrame
RegisterFunction``1Register UDF with no input argument, e.g: SqlContext.RegisterFunction<bool>("MyFilter", () => true); sqlContext.Sql("SELECT * FROM MyTable where MyFilter()");
RegisterFunction``2Register UDF with 1 input argument, e.g: SqlContext.RegisterFunction<bool, string>("MyFilter", (arg1) => arg1 != null); sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1)");
RegisterFunction``3Register UDF with 2 input arguments, e.g: SqlContext.RegisterFunction<bool, string, string>("MyFilter", (arg1, arg2) => arg1 != null && arg2 != null); sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2)");
RegisterFunction``4Register UDF with 3 input arguments, e.g: SqlContext.RegisterFunction<bool, string, string, string>("MyFilter", (arg1, arg2, arg3) => arg1 != null && arg2 != null && arg3 != null); sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, columnName3)");
RegisterFunction``5Register UDF with 4 input arguments, e.g: SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg4) => arg1 != null && arg2 != null && ... && arg4 != null); sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName4)");
RegisterFunction``6Register UDF with 5 input arguments, e.g: SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg5) => arg1 != null && arg2 != null && ... && arg5 != null); sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName5)");
RegisterFunction``7Register UDF with 6 input arguments, e.g: SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg6) => arg1 != null && arg2 != null && ... && arg6 != null); sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName6)");
RegisterFunction``8Register UDF with 7 input arguments, e.g: SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg7) => arg1 != null && arg2 != null && ... && arg7 != null); sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName7)");
RegisterFunction``9Register UDF with 8 input arguments, e.g: SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg8) => arg1 != null && arg2 != null && ... && arg8 != null); sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName8)");
RegisterFunction``10Register UDF with 9 input arguments, e.g: SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg9) => arg1 != null && arg2 != null && ... && arg9 != null); sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName9)");
RegisterFunction``11Register UDF with 10 input arguments, e.g: SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg10) => arg1 != null && arg2 != null && ... && arg10 != null); sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName10)");
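Several SqlContext entries above are typically used together: load a DataFrame, register it as a temporary table, then query it with SQL. A minimal sketch (assuming `sqlContext` is an existing SqlContext; the JSON path is hypothetical and Show is used only to print the result):

```csharp
using Microsoft.Spark.CSharp.Sql;

static class SqlContextSketch
{
    internal static void QueryJson(SqlContext sqlContext)
    {
        // Hypothetical input file; one JSON object per line, schema is inferred.
        DataFrame people = sqlContext.JsonFile("hdfs://a-hdfs-path/people.json");

        // Register as a temporary table, then query it with SQL.
        sqlContext.RegisterDataFrameAsTable(people, "people");
        DataFrame adults = sqlContext.Sql("SELECT name, age FROM people WHERE age >= 18");
        adults.Show();
    }
}
```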
- ---- - - ###Microsoft.Spark.CSharp.Sql.PythonSerDe ####Summary @@ -728,6 +733,21 @@ --- +###Microsoft.Spark.CSharp.Sql.SqlContext +####Summary + + + The entry point for working with structured data (rows and columns) in Spark. + Allows the creation of [[DataFrame]] objects as well as the execution of SQL queries. + + +####Methods + +
NameDescription
GetOrCreateGet the existing SQLContext or create a new one with given SparkContext.
NewSessionReturns a new SQLContext as new session, that has separate SQLConf, registered temporary tables and UDFs, but shared SparkContext and table cache.
GetConfReturns the value of Spark SQL configuration property for the given key. If the key is not set, returns defaultValue.
SetConfSets the given Spark SQL configuration property.
ReadReturns a DataFrameReader that can be used to read data in as a DataFrame.
ReadDataFrameLoads a DataFrame from the given source path using the given schema and options
CreateDataFrameCreates a from a RDD containing array of object using the given schema.
RegisterDataFrameAsTableRegisters the given as a temporary table in the catalog. Temporary tables exist only during the lifetime of this instance of SqlContext.
DropTempTableRemove the temp table from catalog.
TableReturns the specified table as a
TablesReturns a containing names of tables in the given database. If is not specified, the current database will be used. The returned DataFrame has two columns: 'tableName' and 'isTemporary' (a column with bool type indicating if a table is a temporary one or not).
TableNamesReturns a list of names of tables in the database
CacheTableCaches the specified table in-memory.
UncacheTableRemoves the specified table from the in-memory cache.
ClearCacheRemoves all cached tables from the in-memory cache.
IsCachedReturns true if the table is currently cached in-memory.
SqlExecutes a SQL query using Spark, returning the result as a DataFrame. The dialect that is used for SQL parsing can be configured with 'spark.sql.dialect'
JsonFileLoads a JSON file (one object per line), returning the result as a DataFrame. It goes through the entire dataset once to determine the schema.
JsonFileLoads a JSON file (one object per line) and applies the given schema
TextFileLoads a text file with the specified column delimiter, applying the given schema
TextFileLoads a text file (one object per line), returning the result as a DataFrame
RegisterFunction``1Register UDF with no input argument, e.g: SqlContext.RegisterFunction<bool>("MyFilter", () => true); sqlContext.Sql("SELECT * FROM MyTable where MyFilter()");
RegisterFunction``2Register UDF with 1 input argument, e.g: SqlContext.RegisterFunction<bool, string>("MyFilter", (arg1) => arg1 != null); sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1)");
RegisterFunction``3Register UDF with 2 input arguments, e.g: SqlContext.RegisterFunction<bool, string, string>("MyFilter", (arg1, arg2) => arg1 != null && arg2 != null); sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2)");
RegisterFunction``4Register UDF with 3 input arguments, e.g: SqlContext.RegisterFunction<bool, string, string, string>("MyFilter", (arg1, arg2, arg3) => arg1 != null && arg2 != null && arg3 != null); sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, columnName3)");
RegisterFunction``5Register UDF with 4 input arguments, e.g: SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg4) => arg1 != null && arg2 != null && ... && arg4 != null); sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName4)");
RegisterFunction``6Register UDF with 5 input arguments, e.g: SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg5) => arg1 != null && arg2 != null && ... && arg5 != null); sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName5)");
RegisterFunction``7Register UDF with 6 input arguments, e.g: SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg6) => arg1 != null && arg2 != null && ... && arg6 != null); sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName6)");
RegisterFunction``8Register UDF with 7 input arguments, e.g: SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg7) => arg1 != null && arg2 != null && ... && arg7 != null); sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName7)");
RegisterFunction``9Register UDF with 8 input arguments, e.g: SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg8) => arg1 != null && arg2 != null && ... && arg8 != null); sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName8)");
RegisterFunction``10Register UDF with 9 input arguments, e.g: SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg9) => arg1 != null && arg2 != null && ... && arg9 != null); sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName9)");
RegisterFunction``11Register UDF with 10 input arguments, e.g: SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg10) => arg1 != null && arg2 != null && ... && arg10 != null); sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName10)");
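The RegisterFunction overloads above all follow the pattern shown in their inline examples: the first type parameter is the UDF's return type and the remaining type parameters are its argument types. A short sketch registering a one-argument UDF and using it from SQL (assumes `sqlContext` is an existing SqlContext and that a table named MyTable has already been registered):

```csharp
using Microsoft.Spark.CSharp.Sql;

static class UdfSketch
{
    internal static DataFrame FilterWithUdf(SqlContext sqlContext)
    {
        // Register a UDF named MyFilter that takes one string argument and returns bool.
        sqlContext.RegisterFunction<bool, string>("MyFilter", arg1 => arg1 != null);

        // The registered name can then be used directly in SQL text.
        return sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1)");
    }
}
```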
+ +--- + + ###Microsoft.Spark.CSharp.Sql.DataType ####Summary diff --git a/csharp/AdapterTest/AdapterTest.csproj b/csharp/AdapterTest/AdapterTest.csproj index 3ede6096..dfe00ea8 100644 --- a/csharp/AdapterTest/AdapterTest.csproj +++ b/csharp/AdapterTest/AdapterTest.csproj @@ -77,6 +77,7 @@ + diff --git a/csharp/AdapterTest/HadoopConfigurationTest.cs b/csharp/AdapterTest/HadoopConfigurationTest.cs new file mode 100644 index 00000000..6eeb1c8f --- /dev/null +++ b/csharp/AdapterTest/HadoopConfigurationTest.cs @@ -0,0 +1,43 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using Microsoft.Spark.CSharp.Core; +using Microsoft.Spark.CSharp.Proxy; +using Moq; +using NUnit.Framework; + +namespace AdapterTest +{ + [TestFixture] + class HadoopConfigurationTest + { + private string name, value; + + [Test] + public void TestGetterSetter() + { + Mock hadoopConfProxy = new Mock(); + + hadoopConfProxy.Setup(m => m.Get(It.IsAny(), It.IsAny())).Returns("valueofproperty"); + + hadoopConfProxy.Setup(m => m.Set(It.IsAny(), It.IsAny())) + .Callback(ValueSetter); + + var hadoopConf = new HadoopConfiguration(hadoopConfProxy.Object); + + var returnValue = hadoopConf.Get("somename", "somedefaultvalue"); + Assert.AreEqual("valueofproperty", returnValue); + + hadoopConf.Set("propertyname", "propertyvalue"); + + Assert.AreEqual("propertyname", name); + Assert.AreEqual("propertyvalue", value); + } + + private void ValueSetter(string name, string value) + { + this.name = name; + this.value = value; + } + } +} diff --git a/csharp/AdapterTest/Mocks/MockSparkContextProxy.cs b/csharp/AdapterTest/Mocks/MockSparkContextProxy.cs index 4b665280..edf8a7c5 100644 --- a/csharp/AdapterTest/Mocks/MockSparkContextProxy.cs +++ b/csharp/AdapterTest/Mocks/MockSparkContextProxy.cs @@ -249,6 +249,14 @@ public int AccumulatorServerPort } } + public IHadoopConfigurationProxy HadoopConfiguration + { + get + { + throw new NotImplementedException(); + } + } + public void Accumulator(int port) { accumuatorServerPort = port; diff --git a/csharp/AdapterTest/SparkContextTest.cs b/csharp/AdapterTest/SparkContextTest.cs index 728b5950..d0005683 100644 --- a/csharp/AdapterTest/SparkContextTest.cs +++ b/csharp/AdapterTest/SparkContextTest.cs @@ -139,6 +139,19 @@ public void TestSparkContextStatusTrackerProperty() Assert.IsNotNull(statusTracker); } + [Test] + public void TestSparkContextHadoopConfigurationProperty() + { + Mock hadoopConfProxy = new Mock(); + Mock sparkContextProxy = new Mock(); + sparkContextProxy.Setup(m => m.HadoopConfiguration).Returns(hadoopConfProxy.Object); + SparkContext sc = new SparkContext(sparkContextProxy.Object, null); + + var hadoopConf = sc.HadoopConfiguration; + + Assert.IsNotNull(hadoopConf); + } + [Test] public void TestCancelAllJobs() { diff --git a/csharp/Samples/Microsoft.Spark.CSharp/SparkContextSamples.cs b/csharp/Samples/Microsoft.Spark.CSharp/SparkContextSamples.cs index 2769134e..99a88c59 100644 --- a/csharp/Samples/Microsoft.Spark.CSharp/SparkContextSamples.cs +++ b/csharp/Samples/Microsoft.Spark.CSharp/SparkContextSamples.cs @@ -19,6 +19,7 @@ class SparkContextSamples internal class BroadcastHelper { private readonly Broadcast broadcastVar; + internal BroadcastHelper(Broadcast broadcastVar) { this.broadcastVar = broadcastVar; @@ -37,26 +38,26 @@ internal static void SparkContextBroadcastSample() foreach (var value in b.Value) { Console.Write(value + " "); - } + } Console.WriteLine(); if 
(SparkCLRSamples.Configuration.IsValidationEnabled) { - CollectionAssert.AreEqual(new[] { 1, 2, 3, 4, 5 }, b.Value); + CollectionAssert.AreEqual(new[] {1, 2, 3, 4, 5}, b.Value); } - RDD rdd = SparkCLRSamples.SparkContext.Parallelize(new[] { 0, 0 }, 1); + RDD rdd = SparkCLRSamples.SparkContext.Parallelize(new[] {0, 0}, 1); var r = rdd.FlatMap(new BroadcastHelper(b).Execute).Collect(); foreach (var value in r) { Console.Write(value + " "); - } + } Console.WriteLine(); if (SparkCLRSamples.Configuration.IsValidationEnabled) { // each item in rdd is mapped to broadcast value. - CollectionAssert.AreEqual(new[] { 1, 2, 3, 4, 5, 1, 2, 3, 4, 5 }, r); + CollectionAssert.AreEqual(new[] {1, 2, 3, 4, 5, 1, 2, 3, 4, 5}, r); } } @@ -65,6 +66,7 @@ internal class AccumulatorHelper { private Accumulator accumulator; private bool async; + internal AccumulatorHelper(Accumulator accumulator, bool async = false) { this.accumulator = accumulator; @@ -97,8 +99,9 @@ internal static void SparkContextAccumulatorSample() var a = SparkCLRSamples.SparkContext.Accumulator(100); var b = SparkCLRSamples.SparkContext.Accumulator(100); - SparkCLRSamples.SparkContext.Parallelize(new[] { 1, 2, 3, 4 }, 3).Foreach(new AccumulatorHelper(a).Execute); - SparkCLRSamples.SparkContext.Parallelize(new[] { 1, 2, 3, 4 }, 3).Foreach(new AccumulatorHelper(b, true).Execute); + SparkCLRSamples.SparkContext.Parallelize(new[] {1, 2, 3, 4}, 3).Foreach(new AccumulatorHelper(a).Execute); + SparkCLRSamples.SparkContext.Parallelize(new[] {1, 2, 3, 4}, 3) + .Foreach(new AccumulatorHelper(b, true).Execute); Console.WriteLine("accumulator value, a: {0}, b: {1}", a.Value, b.Value); if (SparkCLRSamples.Configuration.IsValidationEnabled) @@ -117,7 +120,7 @@ internal static void SparkContextSample() Console.WriteLine(SparkCLRSamples.SparkContext.StartTime); Console.WriteLine(SparkCLRSamples.SparkContext.DefaultParallelism); Console.WriteLine(SparkCLRSamples.SparkContext.DefaultMinPartitions); - + StatusTracker StatusTracker = SparkCLRSamples.SparkContext.StatusTracker; //var file = Path.GetTempFileName(); @@ -130,7 +133,7 @@ internal static void SparkContextSample() SparkCLRSamples.SparkContext.SetLogLevel("DEBUG"); //SparkCLRSamples.SparkContext.SetJobGroup("SampleGroupId", "Sample Description"); SparkCLRSamples.SparkContext.SetLocalProperty("SampleKey", "SampleValue"); - + Console.WriteLine(SparkCLRSamples.SparkContext.GetLocalProperty("SampleKey")); SparkCLRSamples.SparkContext.CancelJobGroup("SampleGroupId"); SparkCLRSamples.SparkContext.CancelAllJobs(); @@ -139,15 +142,32 @@ internal static void SparkContextSample() [Sample] internal static void SparkContextUnionSample() { - var rdd1 = SparkCLRSamples.SparkContext.Parallelize(new int[] { 1, 1, 2, 3 }, 1); - var rdd2 = SparkCLRSamples.SparkContext.Parallelize(new int[] { 1, 1, 2, 3 }, 1); + var rdd1 = SparkCLRSamples.SparkContext.Parallelize(new int[] {1, 1, 2, 3}, 1); + var rdd2 = SparkCLRSamples.SparkContext.Parallelize(new int[] {1, 1, 2, 3}, 1); - var union = SparkCLRSamples.SparkContext.Union(new[] { rdd1, rdd2 }).Collect(); + var union = SparkCLRSamples.SparkContext.Union(new[] {rdd1, rdd2}).Collect(); Console.WriteLine(string.Join(",", union)); if (SparkCLRSamples.Configuration.IsValidationEnabled) { - CollectionAssert.AreEqual(new[] { 1, 1, 2, 3, 1, 1, 2, 3 }, union); + CollectionAssert.AreEqual(new[] {1, 1, 2, 3, 1, 1, 2, 3}, union); + } + } + + [Sample] + internal static void SparkContextHadoopConfigurationSample() + { + var hadoopConf = SparkCLRSamples.SparkContext.HadoopConfiguration; + 
var initialValue = hadoopConf.Get("testproperty", "defaultvalue"); + + hadoopConf.Set("testproperty", "testvalue"); + + var finalValue = hadoopConf.Get("testproperty", "defaultvalue"); + + if (SparkCLRSamples.Configuration.IsValidationEnabled) + { + Assert.AreEqual("defaultvalue", initialValue); + Assert.AreEqual("testvalue", finalValue); } } }
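For reference, the HadoopConfiguration property exercised by the sample above is typically used to adjust Hadoop input/output settings before reading or writing files. A minimal usage sketch (assuming `sc` is an initialized SparkContext; the property name and value are illustrative only):

```csharp
using System;
using Microsoft.Spark.CSharp.Core;

static class HadoopConfigurationSketch
{
    internal static void ConfigureAndRead(SparkContext sc)
    {
        HadoopConfiguration hadoopConf = sc.HadoopConfiguration;

        // Illustrative property name; any Hadoop configuration key understood by
        // the underlying input/output format can be set this way.
        hadoopConf.Set("some.hadoop.property", "some-value");

        // Reads issued after this point see the updated configuration.
        Console.WriteLine(hadoopConf.Get("some.hadoop.property", "default-value")); // some-value
    }
}
```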