java.lang.ClassNotFoundException when using the map function of a DataFrame on Spark...

I have a DataFrame "orderedDf" whose schema is as follows:

root
 |-- schoolID: string (nullable = true)
 |-- count(studentID): long (nullable = false)
 |-- count(teacherID): long (nullable = false)
 |-- sum(size): long (nullable = true)
 |-- sum(documentCount): long (nullable = true)
 |-- avg_totalScore: double (nullable = true)

Here is the data in my DataFrame "orderedDf":

+--------+----------------+----------------+---------+------------------+--------------+
|schoolID|count(studentID)|count(teacherID)|sum(size)|sum(documentCount)|avg_totalScore|
+--------+----------------+----------------+---------+------------------+--------------+
|school03|               2|               2|      195|               314|         100.0|
|school02|               2|               2|      193|               330|          94.5|
|school01|               2|               2|      294|               285|          83.4|
|school04|               2|               2|      263|               415|          72.5|
|school05|               2|               2|      263|               415|          62.5|
|school07|               2|               2|      263|               415|          52.5|
|school09|               2|               2|      263|               415|          49.8|
|school08|               2|               2|      263|               415|          42.3|
|school06|               2|               2|      263|               415|          32.5|
+--------+----------------+----------------+---------+------------------+--------------+

As you can see, the column "avg_totalScore" is sorted in descending order.

Now, here is my problem: I want to partition all rows into three groups, like this:

+--------+----------------+----------------+---------+------------------+--------------+
|schoolID|count(studentID)|count(teacherID)|sum(size)|sum(documentCount)|avg_totalScore|
+--------+----------------+----------------+---------+------------------+--------------+
|great   |               2|               2|      195|               314|         100.0|
|great   |               2|               2|      193|               330|          94.5|
|great   |               2|               2|      294|               285|          83.4|
|good    |               2|               2|      263|               415|          72.5|
|good    |               2|               2|      263|               415|          62.5|
|good    |               2|               2|      263|               415|          52.5|
|bad     |               2|               2|      263|               415|          49.8|
|bad     |               2|               2|      263|               415|          42.3|
|bad     |               2|               2|      263|               415|          32.5|
+--------+----------------+----------------+---------+------------------+--------------+

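In other words, I want to divide the schools into three groups according to their "avg_totalScore" (great schools, good schools, and bad schools) in a ratio of 3:3:3.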
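To make the intended split concrete, here is a minimal sketch in plain Scala (no Spark involved). It assumes the rows are already sorted by "avg_totalScore" in descending order and that each row's zero-based position is known. The names ScoreGroups and labelFor are hypothetical, and the one-third and two-thirds cut points are computed with integer arithmetic as an illustration, not as a definitive rule.

```scala
object ScoreGroups {
  // Label for the row at zero-based position `rank` among `total` rows
  // that are sorted by avg_totalScore in descending order.
  def labelFor(rank: Long, total: Long): String = {
    val firstSplit = total / 3             // floor(total / 3)
    val secondSplit = (2 * total + 2) / 3  // ceil(2 * total / 3)
    if (rank < firstSplit) "great"
    else if (rank < secondSplit) "good"
    else "bad"
  }
}
```

With the nine schools shown above, positions 0 to 2 map to "great", 3 to 5 to "good", and 6 to 8 to "bad".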

My solution is as follows:

val num = orderedDf.count()
val first_split_num = math.floor(num * (1.0/3))
val second_split_num = math.ceil(num * (2.0/3))
val accumu = SparkContext.getOrCreate(Configuration.getSparkConf).accumulator(0, "Group Num")
val rdd = orderedDf.map(row => {
  val group = {
    accumu match {
      case a: Accumulator[Int] if a.value <= first_split_num => "great"
      case b: Accumulator[Int] if b.value <= second_split_num => "good"
      case _ => "bad"
    }
  }
  accumu += 1
  Row(group, row(1), row(2), row(3), row(4), row(5), row(6))
})
val result = sqlContext.createDataFrame(rdd,orderedDf.schema)
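As an aside on the grouping logic itself, separate from the exception: Spark accumulators are meant to be added to inside tasks and read on the driver, so using one as a running row counter inside map is fragile. A possible alternative, sketched here under the assumption of the Spark 1.x DataFrame and SQLContext API and of a DataFrame that is already sorted, is to number the rows with RDD.zipWithIndex. The names GroupByRank and labelByRank are hypothetical, and this sketch does not address the ClassNotFoundException.

```scala
import org.apache.spark.sql.{DataFrame, Row, SQLContext}

object GroupByRank {
  // Replaces the first column (schoolID) with "great", "good" or "bad"
  // according to each row's position in the already-sorted DataFrame.
  def labelByRank(sqlContext: SQLContext, orderedDf: DataFrame): DataFrame = {
    val total = orderedDf.count()
    val firstSplit = total / 3             // floor(total / 3)
    val secondSplit = (2 * total + 2) / 3  // ceil(2 * total / 3)

    // zipWithIndex pairs every row with its zero-based position.
    val labelled = orderedDf.rdd.zipWithIndex().map { case (row, idx) =>
      val group =
        if (idx < firstSplit) "great"
        else if (idx < secondSplit) "good"
        else "bad"
      // Keep all remaining columns so the row still matches the schema.
      Row.fromSeq(group +: row.toSeq.tail)
    }
    sqlContext.createDataFrame(labelled, orderedDf.schema)
  }
}
```

It could be called as GroupByRank.labelByRank(sqlContext, orderedDf). Unlike Row(group, row(1), ..., row(6)), which builds seven fields for a six-column schema, Row.fromSeq(group +: row.toSeq.tail) keeps the field count equal to the schema's.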

My solution code above raises no exception by itself, but when I call:

result.collect().foreach(println)

result.show()

I get a ClassNotFoundException, and I don't know why. Can anyone help me? Thank you very much!

Here are the details of the exception:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 44.0 failed 4 times, most recent failure: Lost task 0.3 in stage 44.0 (TID 3644, node1): java.lang.ClassNotFoundException: com.lancoo.ecbdc.business.ComparativeAnalysisBusiness$$anonfun$1
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
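
The class reported as missing, ComparativeAnalysisBusiness$$anonfun$1, is an anonymous-function class generated by the Scala compiler, most likely for the closure passed to map. The trace shows it failing to load on an executor (node1) while a task is being deserialized. One common cause of this pattern, which cannot be confirmed from the trace alone, is that the application jar was never shipped to the executors, for example when the driver is started from an IDE rather than through spark-submit. A minimal sketch under that assumption follows; the names ShipAppJar and withAppJar and any jar path passed in are hypothetical.

```scala
import org.apache.spark.SparkConf

object ShipAppJar {
  // Returns a copy of `base` that lists the application jar in spark.jars,
  // so that executors can load the classes compiled into that jar.
  def withAppJar(base: SparkConf, jarPath: String): SparkConf =
    base.clone.setJars(Seq(jarPath))
}
```

The resulting configuration would have to be used when the SparkContext is first created; SparkContext.getOrCreate returns an existing context unchanged if one is already running.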


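Copyright notice: This is an original article by weixin_39568172, licensed under the CC 4.0 BY-SA license. When reposting, please include a link to the original source and this notice.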