python graphx_Spark Graphx 实现图中极大团挖掘, 伪并行化算法

背景:

spark graphx并未提供极大团挖掘算法

当下的极大团算法都是串行化的算法,基于Bron–Kerbosch算法

思路:

spark graphx提供了连通图的算法,连通图和极大团都是无向图中的概念,极大团为连通图的子集

利用spark graphx 找出连通图,在从各个连通图中,利用串行化的极大团算法,找出极大团 (伪并行化)

对于关联性较强的图,找出来的连通图非常大,这时串行化的极大团算法,仍然会耗时很久,这里利用剪枝的思想减少样本数据量,但是对于大图,优化空间有限

期待真正的并行化的极大团算法

配置文件:

graph_data_path=hdfs://localhost/graph_data

out_path=hdfs://localhost/clique

ck_path=hdfs://localhost/checkpoint

numIter=50剪枝次数

count=3极大团顶点数大小

algorithm=2极大团算法,1:个人实现 2:jgrapht

percent=90剪枝后的顶点数,占前一次的百分比,如果剪完后,还剩下90%的数据,那么剪枝效率已然不高

spark.master=local

spark.app.name=graph

spark.serializer=org.apache.spark.serializer.KryoSerializer

spark.yarn.executor.memoryOverhead=20480

spark.yarn.driver.memoryOverhead=20480

spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:+UseCompressedOops -XX:+DisableExplicitGC

spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+UseCompressedOops -XX:+DisableExplicitGC

spark.driver.maxResultSize=10g

spark.default.parallelism=60

样本数据:

{"src":"0","dst":"1"}

{"src":"0","dst":"2"}

{"src":"0","dst":"3"}

{"src":"1","dst":"0"}

{"src":"2","dst":"1"}

{"src":"3","dst":"5"}

{"src":"4","dst":"6"}

{"src":"5","dst":"4"}

{"src":"6","dst":"5"}

{"src":"3","dst":"2"}

{"src":"2","dst":"3"}

{"src":"6","dst":"4"}

{"src":"3","dst":"4"}

{"src":"4","dst":"3"}

{"src":"2","dst":"6"}

{"src":"6","dst":"2"}

{"src":"6","dst":"7"}

{"src":"7","dst":"6"}

样本图:

BCW1UuK.png

输出:

0,1,2

0,2,3

3,4,5

4,5,6

代码实现:

import java.util

import java.util.Properties

import org.apache.spark.broadcast.Broadcast

import org.apache.spark.graphx.{Edge, Graph}

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.{Row, SQLContext}

import org.apache.spark.storage.StorageLevel

import org.apache.spark.{SparkConf, SparkContext}

import org.jgrapht.alg.BronKerboschCliqueFinder

import org.jgrapht.graph.{DefaultEdge, SimpleGraph}

import scala.collection.JavaConverters._

import scala.collection.mutable

object ApplicationTitan {

def main(args: Array[String]) {

val prop = new Properties()

prop.load(getClass.getResourceAsStream("/config.properties"))

val graph_data_path = prop.getProperty("graph_data_path")

val out_path = prop.getProperty("out_path")

val ck_path = prop.getProperty("ck_path")

val count = Integer.parseInt(prop.getProperty("count"))

val numIter = Integer.parseInt(prop.getProperty("numIter"))

val algorithm = Integer.parseInt(prop.getProperty("algorithm"))

val percent = Integer.parseInt(prop.getProperty("percent"))

val conf = new SparkConf()

try {

Runtime.getRuntime.exec("hdfs dfs -rm -r " + out_path)

// Runtime.getRuntime.exec("cmd.exe /C rd /s /q " + out_path)

} catch {

case ex: Exception =>

ex.printStackTrace(System.out)

}

prop.stringPropertyNames().asScala.foreach(s => {

if (s.startsWith("spark")) {

conf.set(s, prop.getProperty(s))

}

})

conf.registerKryoClasses(Array(getClass))

val sc = new SparkContext(conf)

sc.setLogLevel("ERROR")

sc.setCheckpointDir(ck_path)

val sqlc = new SQLContext(sc)

try {

val e_df = sqlc.read

// .json(graph_data_path)

.parquet(graph_data_path)

var e_rdd = e_df

.mapPartitions(it => {

it.map({

case Row(dst: String, src: String) =>

val src_long = src.toLong

val dst_long = dst.toLong

if (src_long < dst_long) (src_long, dst_long) else (dst_long, src_long)

})

}).distinct()

e_rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

var bc: Broadcast[Set[Long]] = null

var iter = 0

var bc_size = 0

//剪枝

while (iter <= numIter) {

val temp = e_rdd

.flatMap(x => List((x._1, 1), (x._2, 1)))

.reduceByKey((x, y) => x + y)

.filter(x => x._2 >= count - 1)

.mapPartitions(it => it.map(x => x._1))

val bc_value = temp.collect().toSet

bc = sc.broadcast(bc_value)

e_rdd = e_rdd.filter(x => bc.value.contains(x._1) && bc.value.contains(x._2))

e_rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

iter += 1

if (bc_size != 0 && bc_value.size >= bc_size * percent / 100) {

println("total iter : "+ iter)

iter = Int.MaxValue

}

bc_size = bc_value.size

}

// 构造图

val edge: RDD[Edge[Long]] = e_rdd.mapPartitions(it => it.map(x => Edge(x._1, x._2)))

val graph = Graph.fromEdges(edge, 0, StorageLevel.MEMORY_AND_DISK_SER, StorageLevel.MEMORY_AND_DISK_SER)

//连通图

val cc = graph.connectedComponents().vertices

cc.persist(StorageLevel.MEMORY_AND_DISK_SER)

cc.join(e_rdd)

.mapPartitions(it => it.map(x => ((math.random * 10).toInt.toString.concat(x._2._1.toString), (x._1, x._2._2))))

.aggregateByKey(List[(Long, Long)]())((list, v) => list :+ v, (list1, list2) => list1 ::: list2)

.mapPartitions(it => it.map(x => (x._1.substring(1), x._2)))

.aggregateByKey(List[(Long, Long)]())((list1, list2) => list1 ::: list2, (list3, list4) => list3 ::: list4)

.filter(x => x._2.size >= count - 1)

.flatMap(x => {

if (algorithm == 1)

find(x, count)

else

find2(x, count)

})

.mapPartitions(it => {

it.map({

case set =>

var temp = ""

set.asScala.foreach(x => temp += x + ",")

temp.substring(0, temp.length - 1)

case _ =>

})

})

// .coalesce(1)

.saveAsTextFile(out_path)

}

catch {

case ex: Exception =>

ex.printStackTrace(System.out)

}

sc.stop()

}

//自己实现的极大团算法

def find(x: (String, List[(Long, Long)]), count: Int): mutable.Set[util.Set[String]] = {

println(x._1 + "|s|" + x._2.size)

println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())

val neighbors = new util.HashMap[String, util.Set[String]]

val finder = new CliqueFinder(neighbors, count)

x._2.foreach(r => {

val v1 = r._1.toString

val v2 = r._2.toString

if (neighbors.containsKey(v1)) {

neighbors.get(v1).add(v2)

} else {

val temp = new util.HashSet[String]()

temp.add(v2)

neighbors.put(v1, temp)

}

if (neighbors.containsKey(v2)) {

neighbors.get(v2).add(v1)

} else {

val temp = new util.HashSet[String]()

temp.add(v1)

neighbors.put(v2, temp)

}

})

println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())

finder.findMaxCliques().asScala

}

//jgrapht 中的极大团算法

def find2(x: (String, List[(Long, Long)]), count: Int): Set[util.Set[String]] = {

println(x._1 + "|s|" + x._2.size)

println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())

val to_clique = new SimpleGraph[String, DefaultEdge](classOf[DefaultEdge])

x._2.foreach(r => {

val v1 = r._1.toString

val v2 = r._2.toString

to_clique.addVertex(v1)

to_clique.addVertex(v2)

to_clique.addEdge(v1, v2)

})

val finder = new BronKerboschCliqueFinder(to_clique)

val list = finder.getAllMaximalCliques.asScala

var result = Set[util.Set[String]]()

list.foreach(x => {

if (x.size() >= count)

result = result + x

})

println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())

result

}

}

自己实现的极大团算法:

import java.util.*;

/**

* [@author](https://my.oschina.net/arthor) mopspecial@gmail.com

* [@date](https://my.oschina.net/u/2504391) 2017/7/31

*/

public class CliqueFinder {

private Map> neighbors;

private Set nodes;

private Set> maxCliques = new HashSet<>();

private Integer minSize;

public CliqueFinder(Map> neighbors, Integer minSize) {

this.neighbors = neighbors;

this.nodes = neighbors.keySet();

this.minSize = minSize;

}

private void bk3(Set clique, List candidates, List excluded) {

if (candidates.isEmpty() && excluded.isEmpty()) {

if (!clique.isEmpty() && clique.size() >= minSize) {

maxCliques.add(clique);

}

return;

}

for (String s : degeneracy_order(candidates)) {

List new_candidates = new ArrayList<>(candidates);

new_candidates.retainAll(neighbors.get(s));

List new_excluded = new ArrayList<>(excluded);

new_excluded.retainAll(neighbors.get(s));

Set nextClique = new HashSet<>(clique);

nextClique.add(s);

bk2(nextClique, new_candidates, new_excluded);

candidates.remove(s);

excluded.add(s);

}

}

private void bk2(Set clique, List candidates, List excluded) {

if (candidates.isEmpty() && excluded.isEmpty()) {

if (!clique.isEmpty() && clique.size() >= minSize) {

maxCliques.add(clique);

}

return;

}

String pivot = pick_random(candidates);

if (pivot == null) {

pivot = pick_random(excluded);

}

List tempc = new ArrayList<>(candidates);

tempc.removeAll(neighbors.get(pivot));

for (String s : tempc) {

List new_candidates = new ArrayList<>(candidates);

new_candidates.retainAll(neighbors.get(s));

List new_excluded = new ArrayList<>(excluded);

new_excluded.retainAll(neighbors.get(s));

Set nextClique = new HashSet<>(clique);

nextClique.add(s);

bk2(nextClique, new_candidates, new_excluded);

candidates.remove(s);

excluded.add(s);

}

}

private List degeneracy_order(List innerNodes) {

List result = new ArrayList<>();

Map deg = new HashMap<>();

for (String node : innerNodes) {

deg.put(node, neighbors.get(node).size());

}

while (!deg.isEmpty()) {

Integer min = Collections.min(deg.values());

String minKey = null;

for (String key : deg.keySet()) {

if (deg.get(key).equals(min)) {

minKey = key;

break;

}

}

result.add(minKey);

deg.remove(minKey);

for (String k : neighbors.get(minKey)) {

if (deg.containsKey(k)) {

deg.put(k, deg.get(k) - 1);

}

}

}

return result;

}

private String pick_random(List random) {

if (random != null && !random.isEmpty()) {

return random.get(0);

} else {

return null;

}

}

public Set> findMaxCliques() {

this.bk3(new HashSet<>(), new ArrayList<>(nodes), new ArrayList<>());

return maxCliques;

}

public static void main(String[] args) {

Map> neighbors = new HashMap<>();

neighbors.put("0", new HashSet<>(Arrays.asList("1", "2", "3")));

neighbors.put("1", new HashSet<>(Arrays.asList("0", "2")));

neighbors.put("2", new HashSet<>(Arrays.asList("0", "1", "3", "6")));

neighbors.put("3", new HashSet<>(Arrays.asList("0", "2", "4", "5")));

neighbors.put("4", new HashSet<>(Arrays.asList("3", "5", "6")));

neighbors.put("5", new HashSet<>(Arrays.asList("3", "4", "6")));

neighbors.put("6", new HashSet<>(Arrays.asList("2", "4", "5")));

neighbors.put("7", new HashSet<>(Arrays.asList("6")));

CliqueFinder finder = new CliqueFinder(neighbors, 3);

finder.bk3(new HashSet<>(), new ArrayList<>(neighbors.keySet()), new ArrayList<>());

System.out.println(finder.maxCliques);

}

}


版权声明:本文为weixin_42512474原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。