unifying KafkaUtils.CreateDirectStream API
xiongrenyi committed Jul 20, 2016
1 parent 3d541e9 commit 66f6f60
Showing 6 changed files with 233 additions and 97 deletions.
166 changes: 118 additions & 48 deletions csharp/Adapter/Microsoft.Spark.CSharp/Streaming/Kafka.cs
@@ -10,6 +10,7 @@
using System.IO;

using Microsoft.Spark.CSharp.Core;
using Microsoft.Spark.CSharp.Interop.Ipc;

namespace Microsoft.Spark.CSharp.Streaming
{
@@ -83,6 +84,14 @@ public static DStream<KeyValuePair<byte[], byte[]>> CreateStream(StreamingContex
/// <returns>A DStream object</returns>
public static DStream<KeyValuePair<byte[], byte[]>> CreateDirectStream(StreamingContext ssc, List<string> topics, Dictionary<string, string> kafkaParams, Dictionary<string, long> fromOffsets)
{
int numPartitions = GetNumPartitionsFromConfig(ssc, topics, kafkaParams);
if (numPartitions >= 0 ||
ssc.SparkContext.SparkConf.SparkConfProxy.Get("spark.mobius.streaming.kafka.CSharpReader.enabled", "false").ToLower() == "true" ||
ssc.SparkContext.SparkConf.SparkConfProxy.GetInt("spark.mobius.streaming.kafka.numReceivers", 0) > 0 ||
topics.Any(topic => ssc.SparkContext.SparkConf.SparkConfProxy.GetInt("spark.mobius.streaming.kafka.maxMessagesPerTask." + topic, 0) > 0))
{
return new DStream<KeyValuePair<byte[], byte[]>>(ssc.streamingContextProxy.DirectKafkaStreamWithRepartition(topics, kafkaParams, fromOffsets, numPartitions, null, null), ssc, SerializedMode.Pair);
}
return new DStream<KeyValuePair<byte[], byte[]>>(ssc.streamingContextProxy.DirectKafkaStream(topics, kafkaParams, fromOffsets), ssc, SerializedMode.Pair);
}
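
As a usage sketch (not part of this commit), the unified entry point can be called as below; the broker list, topic name, app name, and the millisecond batch-interval constructor are assumptions that may differ by Mobius version. Whether the call goes through DirectKafkaStream or DirectKafkaStreamWithRepartition is now decided inside the method from the spark.mobius.streaming.kafka.* settings checked above, so callers no longer choose between two APIs.

// Requires: using System.Collections.Generic; using Microsoft.Spark.CSharp.Core; using Microsoft.Spark.CSharp.Streaming;
var sparkConf = new SparkConf().SetAppName("KafkaDirectStreamSample");   // hypothetical app name
var sc = new SparkContext(sparkConf);
var ssc = new StreamingContext(sc, 2000L);                               // assumed 2-second batch interval in milliseconds

var topics = new List<string> { "sampleTopic" };                         // hypothetical topic
var kafkaParams = new Dictionary<string, string>
{
    { "metadata.broker.list", "broker1:9092,broker2:9092" }              // hypothetical brokers
};
var fromOffsets = new Dictionary<string, long>();                        // empty: start from the default offsets

DStream<KeyValuePair<byte[], byte[]>> stream =
    KafkaUtils.CreateDirectStream(ssc, topics, kafkaParams, fromOffsets);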

@@ -109,57 +118,18 @@ public static DStream<KeyValuePair<byte[], byte[]>> CreateDirectStream(Streaming
/// with Kafka broker(s) (NOT zookeeper servers), specified in host1:port1,host2:port2 form.
/// </param>
/// <param name="fromOffsets">Per-topic/partition Kafka offsets defining the (inclusive) starting point of the stream.</param>
/// <param name="numPartitions">
/// User hint on how many Kafka RDD partitions to create instead of aligning with the Kafka partitions;
/// unbalanced Kafka partitions and/or under-distributed data will be redistributed evenly across
/// a possibly larger number of RDD partitions.
/// If numPartitions = -1, repartition based on spark.streaming.kafka.maxRatePerTask, or do nothing if that config is not defined.
/// If numPartitions = 0, repartition using the original Kafka partition count.
/// If numPartitions > 0, repartition using this parameter.
/// </param>
/// <returns>A DStream object</returns>
public static DStream<KeyValuePair<byte[], byte[]>> CreateDirectStreamWithRepartition(StreamingContext ssc, List<string> topics, Dictionary<string, string> kafkaParams, Dictionary<string, long> fromOffsets, int numPartitions = -1)
{
return new DStream<KeyValuePair<byte[], byte[]>>(ssc.streamingContextProxy.DirectKafkaStreamWithRepartition(topics, kafkaParams, fromOffsets, numPartitions, null, null), ssc, SerializedMode.Pair);
}

/// <summary>
/// Create an input stream that directly pulls messages from a Kafka Broker and specific offset.
///
/// This is not a receiver-based Kafka input stream; it directly pulls messages from Kafka
/// in each batch duration and processes them without storing them.
///
/// This does not use Zookeeper to store offsets. The consumed offsets are tracked
/// by the stream itself. For interoperability with Kafka monitoring tools that depend on
/// Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
/// You can access the offsets used in each batch from the generated RDDs (see
/// [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
/// To recover from driver failures, you have to enable checkpointing in the StreamingContext.
/// The information on consumed offset can be recovered from the checkpoint.
/// See the programming guide for details (constraints, etc.).
///
/// </summary>
/// <param name="ssc">Spark Streaming Context</param>
/// <param name="topics">list of topic_name to consume.</param>
/// <param name="kafkaParams">
/// Additional params for Kafka. Requires "metadata.broker.list" or "bootstrap.servers" to be set
/// with Kafka broker(s) (NOT zookeeper servers), specified in host1:port1,host2:port2 form.
/// </param>
/// <param name="fromOffsets">Per-topic/partition Kafka offsets defining the (inclusive) starting point of the stream.</param>
/// <param name="numPartitions">
/// User hint on how many Kafka RDD partitions to create instead of aligning with the Kafka partitions;
/// unbalanced Kafka partitions and/or under-distributed data will be redistributed evenly across
/// a possibly larger number of RDD partitions.
/// If numPartitions = -1, repartition based on spark.streaming.kafka.maxRatePerTask, or do nothing if that config is not defined.
/// If numPartitions = 0, repartition using the original Kafka partition count.
/// If numPartitions > 0, repartition using this parameter.
/// </param>
/// <param name="readFunc">user function to process the kafka data.</param>
/// <returns>A DStream object</returns>
public static DStream<T> CreateDirectStreamWithRepartitionAndReadFunc<T>(StreamingContext ssc, List<string> topics, Dictionary<string, string> kafkaParams, Dictionary<string, long> fromOffsets,
int numPartitions, Func<int, IEnumerable<KeyValuePair<byte[], byte[]>>, IEnumerable<T>> readFunc)
public static DStream<T> CreateDirectStream<T>(StreamingContext ssc, List<string> topics, Dictionary<string, string> kafkaParams, Dictionary<string, long> fromOffsets, Func<int, IEnumerable<KeyValuePair<byte[], byte[]>>, IEnumerable<T>> readFunc)
{
var mapPartitionsWithIndexHelper = new MapPartitionsWithIndexHelper<KeyValuePair<byte[], byte[]>, T>(readFunc, true);
int numPartitions = GetNumPartitionsFromConfig(ssc, topics, kafkaParams);
if (ssc.SparkContext.SparkConf.SparkConfProxy.GetInt("spark.mobius.streaming.kafka.numReceivers", 0) <= 0)
{
var dstream = new DStream<KeyValuePair<byte[], byte[]>>(ssc.streamingContextProxy.DirectKafkaStreamWithRepartition(topics, kafkaParams, fromOffsets, numPartitions, null, null), ssc, SerializedMode.Pair);
return dstream.MapPartitionsWithIndex(readFunc, true);
}

var mapPartitionsWithIndexHelper = new MapPartitionsWithIndexHelper<KeyValuePair<byte[], byte[]>, T>(readFunc, true);
var transformHelper = new TransformHelper<KeyValuePair<byte[], byte[]>, T>(mapPartitionsWithIndexHelper.Execute);
var transformDynamicHelper = new TransformDynamicHelper<KeyValuePair<byte[], byte[]>, T>(transformHelper.Execute);
Func<double, RDD<dynamic>, RDD<dynamic>> func = transformDynamicHelper.Execute;
@@ -170,5 +140,105 @@ public static DStream<T> CreateDirectStreamWithRepartitionAndReadFunc<T>(Streami
string serializationMode = SerializedMode.Pair.ToString();
return new DStream<T>(ssc.streamingContextProxy.DirectKafkaStreamWithRepartition(topics, kafkaParams, fromOffsets, numPartitions, readFuncBytes, serializationMode), ssc);
}
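
As a rough sketch of the readFunc overload (the UTF-8 decoding and all names below are illustrative assumptions, not part of this commit): the function receives a partition index and that partition's raw key/value records, and returns the transformed records.

// Requires: using System.Linq; using System.Text;
// Hypothetical readFunc: decode each record's value as a UTF-8 string
Func<int, IEnumerable<KeyValuePair<byte[], byte[]>>, IEnumerable<string>> readFunc =
    (partitionIndex, records) => records.Select(kv => Encoding.UTF8.GetString(kv.Value));

DStream<string> lines =
    KafkaUtils.CreateDirectStream(ssc, topics, kafkaParams, fromOffsets, readFunc);

With spark.mobius.streaming.kafka.numReceivers unset (or <= 0), this is equivalent to calling MapPartitionsWithIndex on the pair stream; otherwise the readFunc is serialized and handed to the proxy's DirectKafkaStreamWithRepartition call, as the code above shows.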

/// <summary>
/// Create an OffsetRange from Kafka messages when the CSharpReader is enabled.
/// </summary>
/// <param name="input">Two Kafka messages carrying OffsetRange metadata: topic/cluster id and partition in the first, from/until offsets in the second.</param>
/// <returns>The decoded OffsetRange</returns>
public static OffsetRange GetOffsetRange(IEnumerable<KeyValuePair<byte[], byte[]>> input)
{
int count = 2;
int i = 0;
var offsetRange = new KeyValuePair<byte[], byte[]>[count];
foreach (var message in input)
{
offsetRange[i++ % count] = message;
if (i > count)
break;
}

if (i != count)
{
throw new ArgumentException("Expecting kafka OffsetRange metadata.");
}

var topicAndClusterId = SerDe.ToString(offsetRange[0].Key);
var topic = topicAndClusterId.Split(',')[0];
var clusterId = topicAndClusterId.Split(',')[1];
var partition = SerDe.ToInt(offsetRange[0].Value);
var fromOffset = SerDe.ReadLong(new MemoryStream(offsetRange[1].Key));
var untilOffset = SerDe.ReadLong(new MemoryStream(offsetRange[1].Value));

return new OffsetRange(topic, clusterId, partition, fromOffset, untilOffset);
}
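
How the metadata records are laid out in a partition when the CSharpReader is enabled is not shown in this diff; assuming the first two records of each partition carry the OffsetRange (an assumption, as are the names and UTF-8 decoding below), a readFunc could recover it like this sketch:

// Sketch only: assumes the first two records of each partition are the OffsetRange metadata
// Requires: using System; using System.Linq; using System.Text;
Func<int, IEnumerable<KeyValuePair<byte[], byte[]>>, IEnumerable<string>> readFuncWithOffsets =
    (partitionIndex, records) =>
    {
        var materialized = records.ToList();
        OffsetRange range = KafkaUtils.GetOffsetRange(materialized.Take(2));
        Console.WriteLine(range);   // logs e.g. "Kafka OffsetRange: topic ... partition ... from ... until ..."
        return materialized.Skip(2).Select(kv => Encoding.UTF8.GetString(kv.Value));
    };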

/// <summary>
/// topics should contain only one topic if you choose to repartition to a configured numPartitions.
/// TODO: move to Scala and merge into DynamicPartitionKafkaRDD.getPartitions to remove the above limitation.
/// </summary>
/// <param name="ssc"></param>
/// <param name="topics"></param>
/// <param name="kafkaParams"></param>
/// <returns></returns>
private static int GetNumPartitionsFromConfig(StreamingContext ssc, List<string> topics, Dictionary<string, string> kafkaParams)
{
if (topics == null || topics.Count == 0)
return -1;

string clusterId = kafkaParams.ContainsKey("cluster.id") ? "." + kafkaParams["cluster.id"] : null;
return ssc.SparkContext.SparkConf.SparkConfProxy.GetInt("spark.mobius.streaming.kafka.numPartitions." + topics[0] + clusterId, -1);
}
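
The lookup key is built from the first topic plus an optional cluster id, so the setting would be supplied like the following sketch (topic name, cluster id, and partition count are hypothetical; assumes SparkConf.Set(key, value)):

// Read by GetNumPartitionsFromConfig when kafkaParams has no "cluster.id"
sparkConf.Set("spark.mobius.streaming.kafka.numPartitions.sampleTopic", "32");

// With kafkaParams["cluster.id"] = "c1" the key gains a suffix:
sparkConf.Set("spark.mobius.streaming.kafka.numPartitions.sampleTopic.c1", "32");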
}

/// <summary>
/// Kafka offset range
/// </summary>
public class OffsetRange
{
private readonly string topic;
private readonly string clusterId;
private readonly int partition;
private readonly long fromOffset;
private readonly long untilOffset;

/// <summary>
/// Topic
/// </summary>
public string Topic { get { return topic; } }
/// <summary>
/// ClusterId
/// </summary>
public string ClusterId { get { return clusterId; } }
/// <summary>
/// Partition
/// </summary>
public int Partition { get { return partition; } }
/// <summary>
/// FromOffset
/// </summary>
public long FromOffset { get { return fromOffset; } }
/// <summary>
/// Until Offset
/// </summary>
public long UntilOffset { get { return untilOffset; } }

internal OffsetRange(string topic, string clusterId, int partition, long fromOffset, long untilOffset)
{
this.topic = topic;
this.clusterId = clusterId;
this.partition = partition;
this.fromOffset = fromOffset;
this.untilOffset = untilOffset;
}

/// <summary>
/// OffsetRange string format
/// </summary>
/// <returns></returns>
public override string ToString()
{
return string.Format("Kafka OffsetRange: topic {0} cluster {1} partition {2} from {3} until {4}", topic, clusterId, partition, fromOffset, untilOffset);
}
}
}
93 changes: 52 additions & 41 deletions csharp/Adapter/documentation/Microsoft.Spark.CSharp.Adapter.Doc.XML


0 comments on commit 66f6f60
