/**
* :: Experimental ::
* Subscribe to a collection of topics.
* @param topics collection of topics to subscribe
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsets: offsets to begin at on initial startup. If no offset is given for a
* TopicPartition, the committed offset (if applicable) or kafka param
* auto.offset.reset will be used.
*/
@ Experimental
def Subscribe[K, V](
topics : Iterable[jl.String],
kafkaParams : collection.Map[String, Object],
offsets : collection.Map[TopicPartition, Long]) : ConsumerStrategy[K, V] = {
new Subscribe[K, V](
new ju.ArrayList(topics.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l = > new jl.Long(l)).asJava))
}
/** :: Experimental ::
* Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
* The pattern matching will be done periodically against topics existing at the time of check.
* @param pattern pattern to subscribe to
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsets: offsets to begin at on initial startup. If no offset is given for a
* TopicPartition, the committed offset (if applicable) or kafka param
* auto.offset.reset will be used.
*/
@ Experimental
def SubscribePattern[K, V](
pattern : ju.regex.Pattern,
kafkaParams : collection.Map[String, Object],
offsets : collection.Map[TopicPartition, Long]) : ConsumerStrategy[K, V] = {
new SubscribePattern[K, V](
pattern,
new ju.HashMap[String, Object](kafkaParams.asJava),
new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l = > new jl.Long(l)).asJava))
}
/**
* :: Experimental ::
* Assign a fixed collection of TopicPartitions
* @param topicPartitions collection of TopicPartitions to assign
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsets: offsets to begin at on initial startup. If no offset is given for a
* TopicPartition, the committed offset (if applicable) or kafka param
* auto.offset.reset will be used.
*/
@ Experimental
def Assign[K, V](
topicPartitions : Iterable[TopicPartition],
kafkaParams : collection.Map[String, Object],
offsets : collection.Map[TopicPartition, Long]) : ConsumerStrategy[K, V] = {
new Assign[K, V](
new ju.ArrayList(topicPartitions.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l = > new jl.Long(l)).asJava))
}
|