SparkStreaming中的数据抽象叫做DStream。DStream是抽象类,它把连续的数据流拆成很多的小RDD数据块, 这叫做“微批次”, spark的流式处理, 都是“微批次处理”。 DStream内部实现上有批次处理时间间隔,滑动窗口等机制来保证每个微批次的时间间隔里, 数据流以RDD的形式发送给spark做进一步处理。因此, 在一个为批次的处理时间间隔里, DStream只产生一个RDD。
foreachRDD、foreachPartition和foreach 的区别:
- 首先是作用范围不同,foreachRDD 作用于 DStream中每一个时间间隔的 RDD,foreachPartition 作用于每一个时间间隔的RDD中的每一个 partition,foreach 作用于每一个时间间隔的 RDD 中的每一个元素。
- foreach 与 foreachPartition都是在每个partition中对iterator进行操作,不同的是,foreach是直接在每个partition中直接对iterator执行foreach操作,而传入的function只是在foreach内部使用,而foreachPartition是在每个partition中把iterator给传入的function,让function自己对iterator进行处理(可以避免内存溢出)。
Dstream.foreachRDD介绍:
首先他操作的是Dstream, Dstream是一个由RDD组成的流,foreachRDD是一个输出的操作,它可以操作RDD,比如把RDD的数据写入的数据库,对RDD进行业务逻辑处理,把SparkStream运行得到的结果保存到外部系统比如HDFS、Mysql、Redis等等。
要是想要操作RDD里面的数据,就要用RDD.foreach
foreachRDD算子使用的常见误区:
可以利用Dstream.foreachRDD把数据发送给外部系统。 但是想要正确地, 有效率的使用它, 必须理解一下背后的机制。通常向外部系统写数据需要一个Connection对象(通过它与外部服务器交互)。
- 误区一:在driver上创建连接对象(比如网络连接或数据库连接)
如果在driver上创建连接对象,然后在RDD的算子函数内使用连接对象,那么就意味着需要将连接对象序列化后从driver传递到worker上。而连接对象(比如Connection对象)通常来说是不支持序列化的,此时通常会报序列化的异常(serialization errors)。因此连接对象必须在worker上创建,不要在driver上创建。
dstream.foreachRDD { rdd =>
val connection = createNewConnection() // 数据库连接在driver上执行
rdd.foreach { record =>
connection.send(record) // 在worker上执行
}
}
- 误区二:为每一条记录都创建一个连接对象
通常来说,连接对象的创建和销毁都是很消耗时间的。因此频繁地创建和销毁连接对象,可能会导致降低spark作业的整体性能和吞吐量。
比较正确的做法是:对DStream中的RDD,调用foreachPartition,对RDD中每个分区创建一个连接对象,使用一个连接对象将一个分区内的数据都写入数据库中。这样可以大大减少创建的连接对象的数量。
正确做法一:为每个RDD分区创建一个连接对象
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
}
}
正确做法二:为每个RDD分区使用一个连接池中的连接对象
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// 从数据库连接池中获取连接
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // 用完以后将连接返回给连接池,进行复用
}
}
举例:输出到MySQL数据库中
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import java.sql.DriverManager
/**
* 数据的输出,利用foreachRDD算子将数据输出到指定的位置或将数据保存到某个地方
*/
object ForeachRDDDemo {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.WARN)
//一、初始化程序入口
val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
val ssc: StreamingContext = new StreamingContext(conf,Seconds(3))
//二、获取数据流,即数据源
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop2", 9996)
//三、数据处理
val words: DStream[String] = lines.flatMap(_.split(" "))
val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
val wordCounts: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)
//四、数据输出,将数据保存到MySQL中
wordCounts.foreachRDD{(rdd,time) =>
rdd.foreach{record =>
//为每一条数据都创建了一个连接,连接使用完了就关闭
//频繁的创建和关闭连接。其实对数据性能影响很大。 这个就是可以优化的点,可以考虑将foreach换成foreachPartition
Class.forName("com.mysql.jdbc.Driver")
val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark","root","111111")
val statement = conn.prepareStatement(s"insert into wordcount(ts,word,count) values (?,?,?)")
statement.setLong(1,time.milliseconds.toLong)
statement.setString(2,record._1)
statement.setInt(3,record._2)
statement.execute()
statement.close()
conn.close()
}
}
//五、启动任务
ssc.start()
ssc.awaitTermination()
ssc.stop()
}
}
- 注意:
Dstream和RDD一样是延迟执行,只有遇到action操作才会真正去计算。因此在Dstream的内部RDD必须包含Action操作才能是接受到的数据得到处理。即使代码中包含foreachRDD,但在内部却没有action的RDD,SparkStream只会简单地接受数据数据而不进行处理
版权声明:本文为qq_40078490原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。