mysql数据库关闭连接spark_sparksql 连接读取MySQL数据库

1. 从oracle 官方网站  https://dev.mysql.com/downloads/connector/j/ 下载mysql-connector 驱动,一般是一个rpm包。

2.  部署mysql-connector 驱动

在spark中使用此驱动连接mysql数据库时,不要在spark 集群上使用rpm  -ivh 方式安装,因为你会发现这样安装后找不到很多部署文档中提到的mysql-connector的jar包。其实只需要在windows中通过解压程序对rpm包解压即可(可能的话要多解压几层),在其中的cpio压缩文件中包含目录,其中的java目录中包含此jar包。

将此jar包传到spark集群服务器上,一般可以放在SPAKR_HOME/jars目录中,然后设置CLASSPATH 环境变量(可参考mysql-connector的官方安装指导)

export CLASSPATH=/path/mysql-connector-***.jar:$CLASSPATH

3. 代码连接mysql数据库

from pyspark.sql import SparkSession

try:

sc.stop()

except:

pass

spk = SparkSession.builder.master("spark://192.168.12.7:7077").appName("spark1").getOrCreate()

print(spk)

sc11=spk.sparkContext

print(sc11)

jdbcdf = spk.read.format('jdbc').option('url',"jdbc:mysql://192.168.8.61:3306/sparkdb?user=spark&password=passwrod")\

.option('dbtable','class').load()

print(jdbcdf)

#jdbcdf.select('id','name','score').write.format('parquet').save("src/main/resources/sparkdb")

#jdbcdf.select('id','name','score').write.format('json').save("src/main/resources/sparkdb")

jdbcdf.select('id','name','score').write.saveAsTable("sparktb3")

jdbcsql=spk.sql("select * from sparktb3")

jdbcsql.show()

4. 保存数据到mysql数据库

dforacle.write.format('jdbc').mode("overwrite").options(

url="jdbc:mysql://192.168.8.61:3306/sparkdb",

dbtable='tablename',

user='spark',

password='password').save()

dfmysql=spk.read.format('jdbc').options(

url="jdbc:mysql://192.168.8.61:3306/sparkdb",

dbtable='tablename',

user='spark',

password='password').load()

dfmysql.show(3)

dfmysql.createOrReplaceTempView('fy_ls')

spk.sql("select * from fy_ls").show(30)


版权声明:本文为weixin_36238982原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。