KafkaUtils.createDirectStream() Parameters Explained

Reposted from: KafkaUtils.createDirectStream()参数详解 - 海贼王一样的男人 - 博客园

KafkaUtils.createDirectStream creates a Kafka-backed DStream. It takes three parameters: the StreamingContext (ssc), a LocationStrategy, and a ConsumerStrategy.

LocationStrategies provides three strategies: PreferBrokers, PreferConsistent, and PreferFixed. See the source excerpt below for details.
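Putting the three parameters together, a minimal sketch of creating a direct stream could look like the following. The broker list, topic name, and group id are placeholders, not values from the original article:

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object DirectStreamDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("direct-stream-demo").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Kafka parameters used on the driver; executors receive the same
    // params with minor automatic modifications.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "host1:9092,host2:9092", // placeholder brokers
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "demo-group",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,              // 1. the StreamingContext
      PreferConsistent, // 2. the LocationStrategy
      Subscribe[String, String](Seq("demo-topic"), kafkaParams) // 3. the ConsumerStrategy
    )

    stream.map(record => (record.key, record.value)).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```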


```scala
/**
 * :: Experimental :: object to obtain instances of [[LocationStrategy]]
 */
@Experimental
object LocationStrategies {

  /**
   * :: Experimental ::
   * Use this only if your executors are on the same nodes as your Kafka brokers.
   */
  @Experimental
  def PreferBrokers: LocationStrategy =
    org.apache.spark.streaming.kafka010.PreferBrokers

  /**
   * :: Experimental ::
   * Use this in most cases, it will consistently distribute partitions across all executors.
   */
  @Experimental
  def PreferConsistent: LocationStrategy =
    org.apache.spark.streaming.kafka010.PreferConsistent

  /**
   * :: Experimental ::
   * Use this to place particular TopicPartitions on particular hosts if your load is uneven.
   * Any TopicPartition not specified in the map will use a consistent location.
   */
  @Experimental
  def PreferFixed(hostMap: collection.Map[TopicPartition, String]): LocationStrategy =
    new PreferFixed(new ju.HashMap[TopicPartition, String](hostMap.asJava))

  /**
   * :: Experimental ::
   * Use this to place particular TopicPartitions on particular hosts if your load is uneven.
   * Any TopicPartition not specified in the map will use a consistent location.
   */
  @Experimental
  def PreferFixed(hostMap: ju.Map[TopicPartition, String]): LocationStrategy =
    new PreferFixed(hostMap)
}
```
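As a sketch of the PreferFixed case: pinning a hot partition to a specific executor host, while every other partition falls back to the consistent strategy. The topic and host names here are hypothetical:

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.LocationStrategies

// Pin partition 0 of "demo-topic" (placeholder) to one executor host;
// any TopicPartition absent from the map uses PreferConsistent placement.
val hostMap = Map(new TopicPartition("demo-topic", 0) -> "executor-host-1")
val locationStrategy = LocationStrategies.PreferFixed(hostMap)
```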

ConsumerStrategies provides three consumer strategies: Subscribe, SubscribePattern, and Assign, i.e. subscription versus manual assignment.

Subscribe assigns partitions to consumers automatically: an internal algorithm distributes topic-partitions evenly and optimally across the consumers in the same group.

Assign specifies the topic-partitions to consume manually and explicitly; it is not governed by group.id, so the configured group has no effect on partition assignment.


```scala
  /**
   * :: Experimental ::
   * Subscribe to a collection of topics.
   * @param topics collection of topics to subscribe
   * @param kafkaParams Kafka
   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
   * configuration parameters</a> to be used on driver. The same params will be used on executors,
   * with minor automatic modifications applied.
   * Requires "bootstrap.servers" to be set
   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
   * @param offsets: offsets to begin at on initial startup. If no offset is given for a
   * TopicPartition, the committed offset (if applicable) or kafka param
   * auto.offset.reset will be used.
   */
  @Experimental
  def Subscribe[K, V](
      topics: Iterable[jl.String],
      kafkaParams: collection.Map[String, Object],
      offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = {
    new Subscribe[K, V](
      new ju.ArrayList(topics.asJavaCollection),
      new ju.HashMap[String, Object](kafkaParams.asJava),
      new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
  }

  /** :: Experimental ::
   * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
   * The pattern matching will be done periodically against topics existing at the time of check.
   * @param pattern pattern to subscribe to
   * @param kafkaParams Kafka
   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
   * configuration parameters</a> to be used on driver. The same params will be used on executors,
   * with minor automatic modifications applied.
   * Requires "bootstrap.servers" to be set
   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
   * @param offsets: offsets to begin at on initial startup. If no offset is given for a
   * TopicPartition, the committed offset (if applicable) or kafka param
   * auto.offset.reset will be used.
   */
  @Experimental
  def SubscribePattern[K, V](
      pattern: ju.regex.Pattern,
      kafkaParams: collection.Map[String, Object],
      offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = {
    new SubscribePattern[K, V](
      pattern,
      new ju.HashMap[String, Object](kafkaParams.asJava),
      new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
  }

  /**
   * :: Experimental ::
   * Assign a fixed collection of TopicPartitions
   * @param topicPartitions collection of TopicPartitions to assign
   * @param kafkaParams Kafka
   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
   * configuration parameters</a> to be used on driver. The same params will be used on executors,
   * with minor automatic modifications applied.
   * Requires "bootstrap.servers" to be set
   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
   * @param offsets: offsets to begin at on initial startup. If no offset is given for a
   * TopicPartition, the committed offset (if applicable) or kafka param
   * auto.offset.reset will be used.
   */
  @Experimental
  def Assign[K, V](
      topicPartitions: Iterable[TopicPartition],
      kafkaParams: collection.Map[String, Object],
      offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = {
    new Assign[K, V](
      new ju.ArrayList(topicPartitions.asJavaCollection),
      new ju.HashMap[String, Object](kafkaParams.asJava),
      new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
  }
```
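For example, a sketch of using Assign to resume two partitions from explicit starting offsets. The topic name and offset values are placeholders, and `kafkaParams` is assumed to be a `Map[String, Object]` of consumer settings as shown earlier:

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.ConsumerStrategies

val topicPartitions = Seq(
  new TopicPartition("demo-topic", 0),
  new TopicPartition("demo-topic", 1)
)

// Start partition 0 at offset 100 and partition 1 at offset 200.
// With Assign, group.id plays no role in which partitions are consumed.
val offsets = Map(
  new TopicPartition("demo-topic", 0) -> 100L,
  new TopicPartition("demo-topic", 1) -> 200L
)

val strategy = ConsumerStrategies.Assign[String, String](topicPartitions, kafkaParams, offsets)
```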

A common compile error when calling these methods is `Cannot resolve overloaded method`.

Cause: the arguments passed in do not match any of the overloads. Check the parameter types.
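One typical way to hit this error, as a hypothetical sketch: passing offset values of the wrong element type, e.g. `Int` where the overload expects `Long` (Scala does not widen numeric types inside a type argument like `Map[TopicPartition, Int]`):

```scala
// Does not compile: Map(tp -> 100) is Map[TopicPartition, Int], and no
// overload of Subscribe accepts that type.
// val bad = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, Map(tp -> 100))

// Compiles: the Long literal makes the offsets a Map[TopicPartition, Long],
// matching the Scala-collection overload.
val good = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, Map(tp -> 100L))
```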


Copyright notice: this is an original article by theminer, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.