diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Streaming/Kafka.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Streaming/Kafka.cs
index c385f427..812d32bb 100644
--- a/csharp/Adapter/Microsoft.Spark.CSharp/Streaming/Kafka.cs
+++ b/csharp/Adapter/Microsoft.Spark.CSharp/Streaming/Kafka.cs
@@ -10,6 +10,7 @@
 using System.IO;
 
 using Microsoft.Spark.CSharp.Core;
+using Microsoft.Spark.CSharp.Interop.Ipc;
 
 namespace Microsoft.Spark.CSharp.Streaming
 {
@@ -83,6 +84,14 @@ public static DStream<KeyValuePair<byte[], byte[]>> CreateStream(StreamingContex
         /// <returns>A DStream object</returns>
         public static DStream<KeyValuePair<byte[], byte[]>> CreateDirectStream(StreamingContext ssc, List<string> topics, Dictionary<string, string> kafkaParams, Dictionary<string, long> fromOffsets)
         {
+            int numPartitions = GetNumPartitionsFromConfig(ssc, topics, kafkaParams);
+            if (numPartitions >= 0 ||
+                ssc.SparkContext.SparkConf.SparkConfProxy.Get("spark.mobius.streaming.kafka.CSharpReader.enabled", "false").ToLower() == "true" ||
+                ssc.SparkContext.SparkConf.SparkConfProxy.GetInt("spark.mobius.streaming.kafka.numReceivers", 0) > 0 ||
+                topics.Any(topic => ssc.SparkContext.SparkConf.SparkConfProxy.GetInt("spark.mobius.streaming.kafka.maxMessagesPerTask." + topic, 0) > 0))
+            {
+                return new DStream<KeyValuePair<byte[], byte[]>>(ssc.streamingContextProxy.DirectKafkaStreamWithRepartition(topics, kafkaParams, fromOffsets, numPartitions, null, null), ssc, SerializedMode.Pair);
+            }
             return new DStream<KeyValuePair<byte[], byte[]>>(ssc.streamingContextProxy.DirectKafkaStream(topics, kafkaParams, fromOffsets), ssc, SerializedMode.Pair);
         }
 
@@ -109,57 +118,18 @@ public static DStream<KeyValuePair<byte[], byte[]>> CreateDirectStream(Streaming
         ///     with Kafka broker(s) (NOT zookeeper servers), specified in host1:port1,host2:port2 form.
         /// </param>
         /// <param name="fromOffsets">Per-topic/partition Kafka offsets defining the (inclusive) starting point of the stream.</param>
-        /// <param name="numPartitions">
-        ///     user hint on how many kafka RDD partitions to create instead of aligning with kafka partitions,
-        ///     unbalanced kafka partitions and/or under-distributed data will be redistributed evenly across
-        ///     a probably larger number of RDD partitions
-        ///     If numPartitions = -1, either repartition based on spark.streaming.kafka.maxRatePerTask or do nothing if config not defined
-        ///     If numPartitions = 0, repartition using original kafka partition count
-        ///     If numPartitions > 0, repartition using this parameter
-        /// </param>
-        /// <returns>A DStream object</returns>
-        public static DStream<KeyValuePair<byte[], byte[]>> CreateDirectStreamWithRepartition(StreamingContext ssc, List<string> topics, Dictionary<string, string> kafkaParams, Dictionary<string, long> fromOffsets, int numPartitions = -1)
-        {
-            return new DStream<KeyValuePair<byte[], byte[]>>(ssc.streamingContextProxy.DirectKafkaStreamWithRepartition(topics, kafkaParams, fromOffsets, numPartitions, null, null), ssc, SerializedMode.Pair);
-        }
-
-        /// <summary>
-        /// Create an input stream that directly pulls messages from a Kafka Broker and specific offset.
-        ///
-        /// This is not a receiver based Kafka input stream, it directly pulls the message from Kafka
-        /// in each batch duration and processed without storing.
-        ///
-        /// This does not use Zookeeper to store offsets. The consumed offsets are tracked
-        /// by the stream itself. For interoperability with Kafka monitoring tools that depend on
-        /// Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
-        /// You can access the offsets used in each batch from the generated RDDs (see
-        /// [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
-        /// To recover from driver failures, you have to enable checkpointing in the StreamingContext.
-        /// The information on consumed offset can be recovered from the checkpoint.
-        /// See the programming guide for details (constraints, etc.).
-        /// </summary>
-        /// <param name="ssc">Spark Streaming Context</param>
-        /// <param name="topics">list of topic_name to consume.</param>
-        /// <param name="kafkaParams">
-        ///     Additional params for Kafka. Requires "metadata.broker.list" or "bootstrap.servers" to be set
-        ///     with Kafka broker(s) (NOT zookeeper servers), specified in host1:port1,host2:port2 form.
-        /// </param>
-        /// <param name="fromOffsets">Per-topic/partition Kafka offsets defining the (inclusive) starting point of the stream.</param>
-        /// <param name="numPartitions">
-        ///     user hint on how many kafka RDD partitions to create instead of aligning with kafka partitions,
-        ///     unbalanced kafka partitions and/or under-distributed data will be redistributed evenly across
-        ///     a probably larger number of RDD partitions
-        ///     If numPartitions = -1, either repartition based on spark.streaming.kafka.maxRatePerTask or do nothing if config not defined
-        ///     If numPartitions = 0, repartition using original kafka partition count
-        ///     If numPartitions > 0, repartition using this parameter
-        /// </param>
         /// <param name="readFunc">user function to process the kafka data.</param>
         /// <returns>A DStream object</returns>
-        public static DStream<T> CreateDirectStreamWithRepartitionAndReadFunc<T>(StreamingContext ssc, List<string> topics, Dictionary<string, string> kafkaParams, Dictionary<string, long> fromOffsets,
-            int numPartitions, Func<int, IEnumerable<KeyValuePair<byte[], byte[]>>, IEnumerable<T>> readFunc)
+        public static DStream<T> CreateDirectStream<T>(StreamingContext ssc, List<string> topics, Dictionary<string, string> kafkaParams, Dictionary<string, long> fromOffsets, Func<int, IEnumerable<KeyValuePair<byte[], byte[]>>, IEnumerable<T>> readFunc)
         {
-            var mapPartitionsWithIndexHelper = new MapPartitionsWithIndexHelper<KeyValuePair<byte[], byte[]>, T>(readFunc, true);
+            int numPartitions = GetNumPartitionsFromConfig(ssc, topics, kafkaParams);
+            if (ssc.SparkContext.SparkConf.SparkConfProxy.GetInt("spark.mobius.streaming.kafka.numReceivers", 0) <= 0)
+            {
+                var dstream = new DStream<KeyValuePair<byte[], byte[]>>(ssc.streamingContextProxy.DirectKafkaStreamWithRepartition(topics, kafkaParams, fromOffsets, numPartitions, null, null), ssc, SerializedMode.Pair);
+                return dstream.MapPartitionsWithIndex(readFunc, true);
+            }
+
+            var mapPartitionsWithIndexHelper = new MapPartitionsWithIndexHelper<KeyValuePair<byte[], byte[]>, T>(readFunc, true);
             var transformHelper = new TransformHelper<KeyValuePair<byte[], byte[]>, T>(mapPartitionsWithIndexHelper.Execute);
             var transformDynamicHelper = new TransformDynamicHelper<KeyValuePair<byte[], byte[]>, T>(transformHelper.Execute);
             Func<double, RDD<dynamic>, RDD<dynamic>> func = transformDynamicHelper.Execute;
@@ -170,5 +140,105 @@ public static DStream<T> CreateDirectStreamWithRepartitionAndReadFunc<T>(Streami
             string serializationMode = SerializedMode.Pair.ToString();
             return new DStream<T>(ssc.streamingContextProxy.DirectKafkaStreamWithRepartition(topics, kafkaParams, fromOffsets, numPartitions, readFuncBytes, serializationMode), ssc);
         }
+
+        /// <summary>
+        /// Create an OffsetRange from Kafka messages when the CSharpReader is enabled
+        /// </summary>
+        /// <param name="input"></param>
+        /// <returns></returns>
+        public static OffsetRange GetOffsetRange(IEnumerable<KeyValuePair<byte[], byte[]>> input)
+        {
+            int count = 2;
+            int i = 0;
+            var offsetRange = new KeyValuePair<byte[], byte[]>[count];
+            foreach (var message in input)
+            {
+                offsetRange[i++ % count] = message;
+                if (i > count)
+                    break;
+            }
+
+            if (i != count)
+            {
+                throw new ArgumentException("Expecting kafka OffsetRange metadata.");
+            }
+
+            var topicAndClusterId = SerDe.ToString(offsetRange[0].Key);
+            var topic = topicAndClusterId.Split(',')[0];
+            var clusterId = topicAndClusterId.Split(',')[1];
+            var partition = SerDe.ToInt(offsetRange[0].Value);
+            var fromOffset = SerDe.ReadLong(new MemoryStream(offsetRange[1].Key));
+            var untilOffset = SerDe.ReadLong(new MemoryStream(offsetRange[1].Value));
+
+            return new OffsetRange(topic, clusterId, partition, fromOffset, untilOffset);
+        }
+
+        /// <summary>
+        /// topics should contain only one topic if repartitioning to a configured numPartitions is requested
+        /// TODO: move to scala and merge into DynamicPartitionKafkaRDD.getPartitions to remove above limitation
+        /// </summary>
+        /// <param name="ssc"></param>
+        /// <param name="topics"></param>
+        /// <param name="kafkaParams"></param>
+        /// <returns></returns>
+        private static int GetNumPartitionsFromConfig(StreamingContext ssc, List<string> topics, Dictionary<string, string> kafkaParams)
+        {
+            if (topics == null || topics.Count == 0)
+                return -1;
+
+            string clusterId = kafkaParams.ContainsKey("cluster.id") ? "." + kafkaParams["cluster.id"] : null;
+            return ssc.SparkContext.SparkConf.SparkConfProxy.GetInt("spark.mobius.streaming.kafka.numPartitions." + topics[0] + clusterId, -1);
+        }
+    }
+
+    /// <summary>
+    /// Kafka offset range
+    /// </summary>
+    public class OffsetRange
+    {
+        private readonly string topic;
+        private readonly string clusterId;
+        private readonly int partition;
+        private readonly long fromOffset;
+        private readonly long untilOffset;
+
+        /// <summary>
+        /// Topic
+        /// </summary>
+        public string Topic { get { return topic; } }
+        /// <summary>
+        /// ClusterId
+        /// </summary>
+        public string ClusterId { get { return clusterId; } }
+        /// <summary>
+        /// Partition
+        /// </summary>
+        public int Partition { get { return partition; } }
+        /// <summary>
+        /// FromOffset
+        /// </summary>
+        public long FromOffset { get { return fromOffset; } }
+        /// <summary>
+        /// Until Offset
+        /// </summary>
+        public long UntilOffset { get { return untilOffset; } }
+
+        internal OffsetRange(string topic, string clusterId, int partition, long fromOffset, long untilOffset)
+        {
+            this.topic = topic;
+            this.clusterId = clusterId;
+            this.partition = partition;
+            this.fromOffset = fromOffset;
+            this.untilOffset = untilOffset;
+        }
+
+        /// <summary>
+        /// OffsetRange string format
+        /// </summary>
+        /// <returns></returns>
+        public override string ToString()
+        {
+            return string.Format("Kafka OffsetRange: topic {0} cluster {1} partition {2} from {3} until {4}", topic, clusterId, partition, fromOffset, untilOffset);
+        }
     }
 }
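With this patch the repartition hint moves out of the method signature and into configuration: GetNumPartitionsFromConfig looks up spark.mobius.streaming.kafka.numPartitions.<topic> (optionally suffixed with the cluster.id Kafka param), and CreateDirectStream routes through DirectKafkaStreamWithRepartition when that key resolves to a value >= 0, or when CSharpReader.enabled is true, numReceivers is positive, or a per-topic maxMessagesPerTask is configured. Below is a minimal driver-side sketch of the consolidated call; the topic name, broker list, partition count, and the ForeachRDD body are illustrative placeholders, not part of the patch.

    using System;
    using System.Collections.Generic;
    using Microsoft.Spark.CSharp.Core;
    using Microsoft.Spark.CSharp.Streaming;

    class DirectKafkaStreamSketch
    {
        static void Main()
        {
            var conf = new SparkConf();
            // Replaces the old numPartitions argument of CreateDirectStreamWithRepartition;
            // read back by GetNumPartitionsFromConfig at stream-creation time.
            conf.Set("spark.mobius.streaming.kafka.numPartitions.myTopic", "32");

            var sc = new SparkContext(conf);
            var ssc = new StreamingContext(sc, 2000L);

            var kafkaParams = new Dictionary<string, string>
            {
                { "metadata.broker.list", "broker1:9092,broker2:9092" },
                { "auto.offset.reset", "smallest" }
            };

            // Single entry point after the patch; the repartitioning path is taken here only
            // because the numPartitions config above resolves to a value >= 0.
            var stream = KafkaUtils.CreateDirectStream(
                ssc, new List<string> { "myTopic" }, kafkaParams, new Dictionary<string, long>());

            stream.ForeachRDD((time, rdd) => Console.WriteLine("batch at {0}: {1} records", time, rdd.Count()));

            ssc.Start();
            ssc.AwaitTermination();
        }
    }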
diff --git a/csharp/Adapter/documentation/Microsoft.Spark.CSharp.Adapter.Doc.XML b/csharp/Adapter/documentation/Microsoft.Spark.CSharp.Adapter.Doc.XML
index 09ba9f9b..7c974d22 100644
--- a/csharp/Adapter/documentation/Microsoft.Spark.CSharp.Adapter.Doc.XML
+++ b/csharp/Adapter/documentation/Microsoft.Spark.CSharp.Adapter.Doc.XML
@@ -7455,7 +7455,7 @@
             Per-topic/partition Kafka offsets defining the (inclusive) starting point of the stream.
             A DStream object
-
+
             Create an input stream that directly pulls messages from a Kafka Broker and specific offset.
@@ -7479,50 +7479,61 @@
             with Kafka broker(s) (NOT zookeeper servers), specified in host1:port1,host2:port2 form.
             Per-topic/partition Kafka offsets defining the (inclusive) starting point of the stream.
-
-            user hint on how many kafka RDD partitions to create instead of aligning with kafka partitions,
-            unbalanced kafka partitions and/or under-distributed data will be redistributed evenly across
-            a probably larger number of RDD partitions
-            If numPartitions = -1, either repartition based on spark.streaming.kafka.maxRatePerTask or do nothing if config not defined
-            If numPartitions = 0, repartition using original kafka partition count
-            If numPartitions > 0, repartition using this parameter
-
+            user function to process the kafka data.
             A DStream object
-
+
-            Create an input stream that directly pulls messages from a Kafka Broker and specific offset.
-
-            This is not a receiver based Kafka input stream, it directly pulls the message from Kafka
-            in each batch duration and processed without storing.
-
-            This does not use Zookeeper to store offsets. The consumed offsets are tracked
-            by the stream itself. For interoperability with Kafka monitoring tools that depend on
-            Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
-            You can access the offsets used in each batch from the generated RDDs (see
-            [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
-            To recover from driver failures, you have to enable checkpointing in the StreamingContext.
-            The information on consumed offset can be recovered from the checkpoint.
-            See the programming guide for details (constraints, etc.).
-
+            Create an OffsetRange from Kafka messages when the CSharpReader is enabled
-            Spark Streaming Context
-            list of topic_name to consume.
-
-            Additional params for Kafka. Requires "metadata.broker.list" or "bootstrap.servers" to be set
-            with Kafka broker(s) (NOT zookeeper servers), specified in host1:port1,host2:port2 form.
-
-            Per-topic/partition Kafka offsets defining the (inclusive) starting point of the stream.
-
-            user hint on how many kafka RDD partitions to create instead of aligning with kafka partitions,
-            unbalanced kafka partitions and/or under-distributed data will be redistributed evenly across
-            a probably larger number of RDD partitions
-            If numPartitions = -1, either repartition based on spark.streaming.kafka.maxRatePerTask or do nothing if config not defined
-            If numPartitions = 0, repartition using original kafka partition count
-            If numPartitions > 0, repartition using this parameter
-
-            user function to process the kafka data.
-            A DStream object
+
+
+
+
+
+            topics should contain only one topic if repartitioning to a configured numPartitions is requested
+            TODO: move to scala and merge into DynamicPartitionKafkaRDD.getPartitions to remove above limitation
+
+
+
+
+
+
+
+
+            Kafka offset range
+
+
+
+
+            Topic
+
+
+
+
+            ClusterId
+
+
+
+
+            Partition
+
+
+
+
+            FromOffset
+
+
+
+
+            Until Offset
+
+
+
+
+            OffsetRange string format
+
+
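The generic CreateDirectStream overload (whose generated documentation is updated above) replaces CreateDirectStreamWithRepartitionAndReadFunc: the read function is now the last parameter and numPartitions is gone. A minimal sketch of calling it, reusing the ssc and kafkaParams variables from the previous sketch and assuming the payload values are UTF-8 text; both assumptions are illustrative only.

    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using Microsoft.Spark.CSharp.Streaming;

    static class TypedDirectStreamSketch
    {
        // readFunc receives the partition index and the raw KeyValuePair<byte[], byte[]> records
        // and returns whatever element type T the resulting DStream<T> should carry.
        static DStream<string> CreateTextStream(StreamingContext ssc, Dictionary<string, string> kafkaParams)
        {
            return KafkaUtils.CreateDirectStream(
                ssc,
                new List<string> { "myTopic" },
                kafkaParams,
                new Dictionary<string, long>(),
                (int pid, IEnumerable<KeyValuePair<byte[], byte[]>> records) =>
                    records.Select(kv => Encoding.UTF8.GetString(kv.Value)));   // decode payload bytes to strings
        }
    }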
diff --git a/csharp/Adapter/documentation/Mobius_API_Documentation.md b/csharp/Adapter/documentation/Mobius_API_Documentation.md
index a4fe1677..a0c5db03 100644
--- a/csharp/Adapter/documentation/Mobius_API_Documentation.md
+++ b/csharp/Adapter/documentation/Mobius_API_Documentation.md
@@ -985,7 +985,21 @@
 ####Methods
 
-<table><tr><th>Name</th><th>Description</th></tr><tr><td>CreateStream</td><td>Create an input stream that pulls messages from a Kafka Broker.</td></tr><tr><td>CreateStream</td><td>Create an input stream that pulls messages from a Kafka Broker.</td></tr><tr><td>CreateDirectStream</td><td>Create an input stream that directly pulls messages from a Kafka Broker and specific offset. This is not a receiver based Kafka input stream, it directly pulls the message from Kafka in each batch duration and processed without storing. This does not use Zookeeper to store offsets. The consumed offsets are tracked by the stream itself. For interoperability with Kafka monitoring tools that depend on Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. You can access the offsets used in each batch from the generated RDDs (see [[org.apache.spark.streaming.kafka.HasOffsetRanges]]). To recover from driver failures, you have to enable checkpointing in the StreamingContext. The information on consumed offset can be recovered from the checkpoint. See the programming guide for details (constraints, etc.).</td></tr><tr><td>CreateDirectStreamWithRepartition</td><td>Create an input stream that directly pulls messages from a Kafka Broker and specific offset. This is not a receiver based Kafka input stream, it directly pulls the message from Kafka in each batch duration and processed without storing. This does not use Zookeeper to store offsets. The consumed offsets are tracked by the stream itself. For interoperability with Kafka monitoring tools that depend on Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. You can access the offsets used in each batch from the generated RDDs (see [[org.apache.spark.streaming.kafka.HasOffsetRanges]]). To recover from driver failures, you have to enable checkpointing in the StreamingContext. The information on consumed offset can be recovered from the checkpoint. See the programming guide for details (constraints, etc.).</td></tr><tr><td>CreateDirectStreamWithRepartitionAndReadFunc``1</td><td>Create an input stream that directly pulls messages from a Kafka Broker and specific offset. This is not a receiver based Kafka input stream, it directly pulls the message from Kafka in each batch duration and processed without storing. This does not use Zookeeper to store offsets. The consumed offsets are tracked by the stream itself. For interoperability with Kafka monitoring tools that depend on Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. You can access the offsets used in each batch from the generated RDDs (see [[org.apache.spark.streaming.kafka.HasOffsetRanges]]). To recover from driver failures, you have to enable checkpointing in the StreamingContext. The information on consumed offset can be recovered from the checkpoint. See the programming guide for details (constraints, etc.).</td></tr></table>
+<table><tr><th>Name</th><th>Description</th></tr><tr><td>CreateStream</td><td>Create an input stream that pulls messages from a Kafka Broker.</td></tr><tr><td>CreateStream</td><td>Create an input stream that pulls messages from a Kafka Broker.</td></tr><tr><td>CreateDirectStream</td><td>Create an input stream that directly pulls messages from a Kafka Broker and specific offset. This is not a receiver based Kafka input stream, it directly pulls the message from Kafka in each batch duration and processed without storing. This does not use Zookeeper to store offsets. The consumed offsets are tracked by the stream itself. For interoperability with Kafka monitoring tools that depend on Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. You can access the offsets used in each batch from the generated RDDs (see [[org.apache.spark.streaming.kafka.HasOffsetRanges]]). To recover from driver failures, you have to enable checkpointing in the StreamingContext. The information on consumed offset can be recovered from the checkpoint. See the programming guide for details (constraints, etc.).</td></tr><tr><td>CreateDirectStream``1</td><td>Create an input stream that directly pulls messages from a Kafka Broker and specific offset. This is not a receiver based Kafka input stream, it directly pulls the message from Kafka in each batch duration and processed without storing. This does not use Zookeeper to store offsets. The consumed offsets are tracked by the stream itself. For interoperability with Kafka monitoring tools that depend on Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. You can access the offsets used in each batch from the generated RDDs (see [[org.apache.spark.streaming.kafka.HasOffsetRanges]]). To recover from driver failures, you have to enable checkpointing in the StreamingContext. The information on consumed offset can be recovered from the checkpoint. See the programming guide for details (constraints, etc.).</td></tr><tr><td>GetOffsetRange</td><td>Create an OffsetRange from Kafka messages when the CSharpReader is enabled</td></tr><tr><td>GetNumPartitionsFromConfig</td><td>topics should contain only one topic if repartitioning to a configured numPartitions is requested. TODO: move to scala and merge into DynamicPartitionKafkaRDD.getPartitions to remove above limitation</td></tr></table>
+
+---
+
+
+###Microsoft.Spark.CSharp.Streaming.OffsetRange
+####Summary
+
+        Kafka offset range
+
+####Methods
+
+<table><tr><th>Name</th><th>Description</th></tr><tr><td>ToString</td><td>OffsetRange string format</td></tr></table>
 ---
 
diff --git a/csharp/AdapterTest/Mocks/MockSparkConfProxy.cs b/csharp/AdapterTest/Mocks/MockSparkConfProxy.cs
index c39480dd..2f8bd99b 100644
--- a/csharp/AdapterTest/Mocks/MockSparkConfProxy.cs
+++ b/csharp/AdapterTest/Mocks/MockSparkConfProxy.cs
@@ -36,6 +36,11 @@ public void SetSparkHome(string sparkHome)
         public void Set(string key, string value)
         {
             stringConfDictionary[key] = value;
+            int i;
+            if (int.TryParse(value, out i))
+            {
+                intConfDictionary[key] = i;
+            }
         }
 
         public int GetInt(string key, int defaultValue)
diff --git a/csharp/AdapterTest/StreamingContextTest.cs b/csharp/AdapterTest/StreamingContextTest.cs
index c6306a7e..402d7e65 100644
--- a/csharp/AdapterTest/StreamingContextTest.cs
+++ b/csharp/AdapterTest/StreamingContextTest.cs
@@ -5,6 +5,7 @@
 using System.Collections.Generic;
 using System.IO;
 using System.Net;
+using System.Text;
 using AdapterTest.Mocks;
 using Microsoft.Spark.CSharp.Core;
 using Microsoft.Spark.CSharp.Streaming;
@@ -37,16 +38,26 @@ public void TestStreamingContext()
             var directKafkaStream = KafkaUtils.CreateDirectStream(ssc, new List<string> { "testTopic2" }, new Dictionary<string, string>(), new Dictionary<string, long>());
             Assert.IsNotNull(directKafkaStream.DStreamProxy);
 
-            var directKafkaStreamWithRepartition = KafkaUtils.CreateDirectStreamWithRepartition(ssc, new List<string> { "testTopic3" }, new Dictionary<string, string>(), new Dictionary<string, long>(), 10);
+            ssc.SparkContext.SparkConf.Set("spark.mobius.streaming.kafka.numPartitions.testTopic3", "10");
+
+            var directKafkaStreamWithRepartition = KafkaUtils.CreateDirectStream(ssc, new List<string> { "testTopic3" }, new Dictionary<string, string>(), new Dictionary<string, long>());
             Assert.IsNotNull(directKafkaStreamWithRepartition.DStreamProxy);
 
-            var directKafkaStreamWithRepartitionAndReadFunc = KafkaUtils.CreateDirectStreamWithRepartitionAndReadFunc(
+            var directKafkaStreamWithRepartitionAndReadFunc = KafkaUtils.CreateDirectStream(
+                ssc,
+                new List<string> { "testTopic3" },
+                new Dictionary<string, string>(), new Dictionary<string, long>(),
+                (int pid, IEnumerable<KeyValuePair<byte[], byte[]>> input) => { return input; });
+            Assert.IsNotNull(directKafkaStreamWithRepartitionAndReadFunc);
+
+            ssc.SparkContext.SparkConf.Set("spark.mobius.streaming.kafka.numReceivers", "10");
+
+            var directKafkaReceiver = KafkaUtils.CreateDirectStream(
                 ssc,
                 new List<string> { "testTopic3" },
                 new Dictionary<string, string>(), new Dictionary<string, long>(),
-                10,
-                (int pid, IEnumerable<KeyValuePair<byte[], byte[]>> input) => { return input;});
-            Assert.IsNotNull(directKafkaStreamWithRepartitionAndReadFunc.DStreamProxy);
+                (int pid, IEnumerable<KeyValuePair<byte[], byte[]>> input) => { return input; });
+            Assert.IsNotNull(directKafkaReceiver.DStreamProxy);
 
             var union = ssc.Union(textFile, socketStream);
             Assert.IsNotNull(union.DStreamProxy);
@@ -77,5 +88,28 @@ public void TestStreamingAwaitTimeout()
             ssc.AwaitTerminationOrTimeout(3000);
             ssc.Stop();
         }
+
+        [Test]
+        public void TestStreamingOffsetRange()
+        {
+            byte[] partition = BitConverter.GetBytes(1);
+            Array.Reverse(partition);
+            byte[] fromOffset = BitConverter.GetBytes(2L);
+            Array.Reverse(fromOffset);
+            byte[] untilOffset = BitConverter.GetBytes(3L);
+            Array.Reverse(untilOffset);
+
+            var offsetRange = KafkaUtils.GetOffsetRange(new List<KeyValuePair<byte[], byte[]>>
+            {
+                new KeyValuePair<byte[], byte[]>(Encoding.UTF8.GetBytes("testTopic,testClusterId"), partition),
+                new KeyValuePair<byte[], byte[]>(fromOffset, untilOffset)
+            });
+
+            Assert.AreEqual(offsetRange.Topic, "testTopic");
+            Assert.AreEqual(offsetRange.ClusterId, "testClusterId");
+            Assert.AreEqual(offsetRange.Partition, 1);
+            Assert.AreEqual(offsetRange.FromOffset, 2);
+            Assert.AreEqual(offsetRange.UntilOffset, 3);
+        }
     }
 }
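The Array.Reverse calls in TestStreamingOffsetRange are there because BitConverter emits little-endian bytes on typical hosts, while GetOffsetRange reads the partition id and offsets through SerDe as big-endian (network-order) values, which is the byte order the JVM side writes. A small helper of the kind a test might factor out; it is a hypothetical sketch, not part of the patch:

    using System;

    static class BigEndianBytes
    {
        // Most significant byte first, matching what SerDe.ToInt / SerDe.ReadLong expect to read back.
        public static byte[] FromInt32(int value)
        {
            var bytes = BitConverter.GetBytes(value);
            if (BitConverter.IsLittleEndian) Array.Reverse(bytes);
            return bytes;
        }

        public static byte[] FromInt64(long value)
        {
            var bytes = BitConverter.GetBytes(value);
            if (BitConverter.IsLittleEndian) Array.Reverse(bytes);
            return bytes;
        }
    }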
diff --git a/csharp/Samples/Microsoft.Spark.CSharp/DStreamSamples.cs b/csharp/Samples/Microsoft.Spark.CSharp/DStreamSamples.cs
index 68037587..c438e79e 100644
--- a/csharp/Samples/Microsoft.Spark.CSharp/DStreamSamples.cs
+++ b/csharp/Samples/Microsoft.Spark.CSharp/DStreamSamples.cs
@@ -140,7 +140,8 @@ internal static void DStreamDirectKafkaWithRepartitionSample()
 
             StreamingContext ssc = StreamingContext.GetOrCreate(checkpointPath,
                 () =>
                 {
-                    SparkContext sc = SparkCLRSamples.SparkContext;
+                    var conf = new SparkConf();
+                    SparkContext sc = new SparkContext(conf);
                     StreamingContext context = new StreamingContext(sc, 2000L);
                     context.Checkpoint(checkpointPath);
 
@@ -149,7 +150,8 @@ internal static void DStreamDirectKafkaWithRepartitionSample()
                         {"auto.offset.reset", "smallest"}
                     };
 
-                    var dstream = KafkaUtils.CreateDirectStreamWithRepartition(context, new List<string> { topic }, kafkaParams, new Dictionary<string, long>(), partitions);
+                    conf.Set("spark.mobius.streaming.kafka.numPartitions." + topic, partitions.ToString());
+                    var dstream = KafkaUtils.CreateDirectStream(context, new List<string> { topic }, kafkaParams, new Dictionary<string, long>());
 
                     dstream.ForeachRDD((time, rdd) =>
                     {
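For completeness, the other branch of the generic overload: when spark.mobius.streaming.kafka.numReceivers is set to a positive value, the read function is serialized and handed to DirectKafkaStreamWithRepartition instead of being applied on the driver side through MapPartitionsWithIndex. The sketch below shows how the sample's GetOrCreate callback could opt into that path, building on its conf, context, topic and kafkaParams variables; the value 4 and the pass-through read function are illustrative only. GetOffsetRange, added earlier in this patch, is the helper intended for parsing the per-partition OffsetRange metadata handed to such a read function when the CSharpReader is enabled.

    // Continuation of the sample's GetOrCreate callback (hypothetical, not part of the patch).
    // Route the generic overload through the serialized-readFunc path.
    conf.Set("spark.mobius.streaming.kafka.numReceivers", "4");

    var passThroughStream = KafkaUtils.CreateDirectStream(
        context,
        new List<string> { topic },
        kafkaParams,
        new Dictionary<string, long>(),
        // Pass-through read function, mirroring the lambda used in StreamingContextTest.
        (int pid, IEnumerable<KeyValuePair<byte[], byte[]>> records) => records);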