Continuing from the previous article: https://blog.csdn.net/hxcaifly/article/details/86147736
1. Basic components of a graph
A graph is made up of edges and vertices:
- Edge: each edge is a Tuple3<K, K, V> that stores the edge's source vertex ID, target vertex ID, and value.
- Vertex: each vertex is a Tuple2<K, V> that stores the vertex's ID and value.
2. Code definitions of vertices and edges
2.1. Vertex definition
package org.apache.flink.graph;
import org.apache.flink.api.java.tuple.Tuple2;
/**
* Represents a vertex of a graph, consisting of an ID and a value.
* For vertices with no value, use {@link org.apache.flink.types.NullValue} as the value type.
*
* @param <K> the vertex ID type
* @param <V> the vertex value type
*/
public class Vertex<K, V> extends Tuple2<K, V> {
private static final long serialVersionUID = 1L;
public Vertex(){}
public Vertex(K k, V val) {
this.f0 = k;
this.f1 = val;
}
public K getId() {
return this.f0;
}
public V getValue() {
return this.f1;
}
public void setId(K id) {
this.f0 = id;
}
public void setValue(V val) {
this.f1 = val;
}
}
The vertex definition is quite concise: a vertex is just a vertex ID plus a vertex value, so it reuses the Tuple2<K, V> data structure, where K is the type of the vertex ID and V is the type of the vertex value.
2.2. Edge definition
package org.apache.flink.graph;
import org.apache.flink.api.java.tuple.Tuple3;
/**
* An Edge represents a link between two {@link Vertex vertices}.
* For edges with no value, use {@link org.apache.flink.types.NullValue} as the value type.
*
* @param <K> the key type of the source and target vertices
* @param <V> the edge value type
*/
public class Edge<K, V> extends Tuple3<K, K, V> {
private static final long serialVersionUID = 1L;
public Edge(){}
public Edge(K source, K target, V value) {
this.f0 = source;
this.f1 = target;
this.f2 = value;
}
/**
* Reverses the direction of this Edge.
* @return a new Edge, where the source is the original Edge's target
* and the target is the original Edge's source.
*/
public Edge<K, V> reverse() {
return new Edge<>(this.f1, this.f0, this.f2);
}
public void setSource(K source) {
this.f0 = source;
}
public K getSource() {
return this.f0;
}
public void setTarget(K target) {
this.f1 = target;
}
public K getTarget() {
return f1;
}
public void setValue(V value) {
this.f2 = value;
}
public V getValue() {
return f2;
}
}
An edge represents the link between two vertices. How do we record one? Naturally, we need its starting point (the source vertex) and its destination (the target vertex). The link between the two vertices also carries a value, which can mean different things, such as a weight or a distance.
Accordingly, the edge data structure extends Tuple3<K, K, V>, where the two K fields are the source and target vertex keys, and V is the edge value type.
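To make the relationship concrete, here is a minimal, self-contained sketch. The MiniVertex and MiniEdge records are hypothetical stand-ins for Gelly's Tuple-backed classes (so the snippet runs without any Flink dependency), but the reverse() semantics mirror the Edge.reverse() method shown above:

```java
public class EdgeVertexSketch {
    // Hypothetical stand-ins for Gelly's Tuple-backed Vertex and Edge classes.
    record MiniVertex<K, V>(K id, V value) {}

    record MiniEdge<K, V>(K source, K target, V value) {
        // Same semantics as Edge.reverse(): swap source and target, keep the value.
        MiniEdge<K, V> reverse() {
            return new MiniEdge<>(target, source, value);
        }
    }

    public static void main(String[] args) {
        MiniVertex<Long, String> v1 = new MiniVertex<>(1L, "A");
        MiniVertex<Long, String> v2 = new MiniVertex<>(2L, "B");
        // An edge only stores the two vertex IDs plus a value, not the vertices themselves.
        MiniEdge<Long, Double> e = new MiniEdge<>(v1.id(), v2.id(), 0.5);
        MiniEdge<Long, Double> r = e.reverse();
        System.out.println(r.source() + " -> " + r.target()); // prints "2 -> 1"
    }
}
```

Note that, just like in Gelly, the edge references vertices only by their IDs; the vertex values live in the separate vertex collection.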
2.3. Graph definition
We already know that a graph consists of edges and vertices, so the definition of the Graph class is not hard to imagine.
package org.apache.flink.graph;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
/**
* Represents a graph consisting of {@link Edge edges} and {@link Vertex vertices}.
*
* @see org.apache.flink.graph.Edge
* @see org.apache.flink.graph.Vertex
*
* @param <K> the key type for vertices
* @param <VV> the vertex value type
* @param <EV> the edge value type
*/
@SuppressWarnings("serial")
public class Graph<K, VV, EV> {
// 1. The execution environment
private final ExecutionEnvironment context;
// 2. The graph's vertex DataSet
private final DataSet<Vertex<K, VV>> vertices;
// 3. The graph's edge DataSet
private final DataSet<Edge<K, EV>> edges;
// Methods omitted
}
The code above shows the member-variable part of the Graph definition.
- ExecutionEnvironment: the graph's execution context. Graph computation is itself a standalone job, and it relies on this context for execution configuration and related settings.
- DataSet<Vertex<K, VV>>: the vertex DataSet.
- DataSet<Edge<K, EV>>: the edge DataSet.
The Graph class defines many construction methods, mainly to accommodate reading graph datasets from different kinds of sources: for example, reading from a CSV file, or, when only an edge dataset is provided, extracting all vertices from the edge data. This part may well be extended as requirements change, since the goal is to let different developers ingest data sources in a variety of ways.
Some of the construction methods are listed below:
/**
* Creates a graph from two DataSets: vertices and edges.
*
* @param vertices a DataSet of vertices.
* @param edges a DataSet of edges.
* @param context the flink execution environment.
*/
protected Graph(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) {
this.vertices = vertices;
this.edges = edges;
this.context = context;
}
/**
* Creates a graph from a Collection of vertices and a Collection of edges.
*
* @param vertices a Collection of vertices.
* @param edges a Collection of edges.
* @param context the flink execution environment.
* @return the newly created graph.
*/
public static <K, VV, EV> Graph<K, VV, EV> fromCollection(Collection<Vertex<K, VV>> vertices,
Collection<Edge<K, EV>> edges, ExecutionEnvironment context) {
return fromDataSet(context.fromCollection(vertices),
context.fromCollection(edges), context);
}
/**
* Creates a graph from a Collection of edges.
* Vertices are created automatically and their values are set to
* NullValue.
*
* @param edges a Collection of edges.
* @param context the flink execution environment.
* @return the newly created graph.
*/
public static <K, EV> Graph<K, NullValue, EV> fromCollection(Collection<Edge<K, EV>> edges,
ExecutionEnvironment context) {
return fromDataSet(context.fromCollection(edges), context);
}
/**
* Creates a graph from a Collection of edges, initializing vertex values with the given map function.
*
* @param edges a Collection of edges.
* @param vertexValueInitializer a map function that initializes the vertex values.
* It allows to apply a map transformation on the vertex ID to produce an initial vertex value.
* @param context the flink execution environment.
* @return the newly created graph.
*/
public static <K, VV, EV> Graph<K, VV, EV> fromCollection(Collection<Edge<K, EV>> edges,
final MapFunction<K, VV> vertexValueInitializer, ExecutionEnvironment context) {
return fromDataSet(context.fromCollection(edges), vertexValueInitializer, context);
}
/**
* Creates a graph from a DataSet of vertices and a DataSet of edges.
*
* @param vertices a DataSet of vertices.
* @param edges a DataSet of edges.
* @param context the flink execution environment.
* @return the newly created graph.
*/
public static <K, VV, EV> Graph<K, VV, EV> fromDataSet(DataSet<Vertex<K, VV>> vertices,
DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) {
return new Graph<>(vertices, edges, context);
}
/**
* Creates a graph from a DataSet of edges. Vertices are created automatically
* and their values are set to NullValue.
*
* @param edges a DataSet of edges.
* @param context the flink execution environment.
* @return the newly created graph.
*/
public static <K, EV> Graph<K, NullValue, EV> fromDataSet(
DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) {
DataSet<Vertex<K, NullValue>> vertices = edges
.flatMap(new EmitSrcAndTarget<>())
.name("Source and target IDs")
.distinct()
.name("IDs");
return new Graph<>(vertices, edges, context);
}
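The edges-only fromDataSet variant above derives the vertex set from the edges: EmitSrcAndTarget flat-maps each edge to its two endpoint IDs, and distinct() deduplicates them. That idea can be sketched in plain Java (no Flink dependency; the class and method names here are made up for illustration):

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class VerticesFromEdges {
    // Each long[] holds {sourceId, targetId}, mimicking an Edge<Long, NullValue>.
    static Set<Long> vertexIdsFromEdges(List<long[]> edges) {
        // Emit both endpoint IDs of every edge, then deduplicate --
        // the same effect as EmitSrcAndTarget followed by distinct() on the DataSet.
        Set<Long> ids = new LinkedHashSet<>();
        for (long[] e : edges) {
            ids.add(e[0]); // source ID
            ids.add(e[1]); // target ID
        }
        return ids;
    }

    public static void main(String[] args) {
        List<long[]> edges = new ArrayList<>();
        edges.add(new long[]{1L, 2L});
        edges.add(new long[]{1L, 3L});
        System.out.println(vertexIdsFromEdges(edges)); // [1, 2, 3]
    }
}
```

In the real DataSet version the deduplication runs distributed, of course; the sketch only shows the logical transformation.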
The Graph class also defines a number of basic graph operations, such as removing vertices or reversing the direction of all edges. The full source is long, so there is no need to go through every method here; we will get familiar with these operations later through concrete use cases.
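As one example of such an operation, reversing a whole graph amounts to reversing each edge while keeping the vertex set unchanged. A plain-Java sketch of that logic (hypothetical mini types, not the Flink API):

```java
import java.util.List;
import java.util.stream.Collectors;

public class ReverseGraphSketch {
    // A minimal edge: {source, target}. Reversing a graph reverses every edge.
    record E(long src, long trg) {}

    static List<E> reverse(List<E> edges) {
        // Swap source and target of each edge; the vertex set stays the same.
        return edges.stream()
                .map(e -> new E(e.trg(), e.src()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(reverse(List.of(new E(1, 2), new E(1, 3))));
        // prints [E[src=2, trg=1], E[src=3, trg=1]]
    }
}
```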
3. A simple graph-manipulation example
package org.apache.flink.graph.examples;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.EdgeJoinFunction;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Triplet;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.examples.data.EuclideanGraphData;
import java.io.Serializable;
/**
* This example shows how to use Gelly's {@link Graph#getTriplets()} and
* {@link Graph#joinWithEdges(DataSet, EdgeJoinFunction)} methods.
*
* <p>Given a directed, unweighted graph whose vertex values represent points in a plane,
* the program returns a weighted graph in which each edge weight equals the Euclidean
* distance between the values of its source and target vertices.
*
* <p>The input files are plain text files and must be formatted as follows:
* <ul>
* <li> Vertices are represented by their ID and a value consisting of two comma-separated
* doubles, one vertex per line.
* For example: <code>1,1.0,1.0\n2,2.0,2.0\n3,3.0,3.0\n</code> defines a dataset of three vertices.
* <li> Edges are represented by a comma-separated pair srcVertexId,trgVertexId,
* one edge per line.
* For example: <code>1,2\n1,3\n</code> defines the two edges 1-2 and 1-3.
* </ul>
*
* <p>Usage <code>EuclideanGraphWeighing <vertex path> <edge path> <result path></code><br>
* If no parameters are provided, the program is run with default data from
* {@link EuclideanGraphData}.
*/
@SuppressWarnings("serial")
public class EuclideanGraphWeighing implements ProgramDescription {
public static void main(String[] args) throws Exception {
if (!parseParameters(args)) {
return;
}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 1. Read the vertex data
DataSet<Vertex<Long, Point>> vertices = getVerticesDataSet(env);
// 2. Read the edge data
DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
// 3. Build the graph
Graph<Long, Point, Double> graph = Graph.fromDataSet(vertices, edges, env);
// 4. The edge values will be the Euclidean distance between source and target vertices
DataSet<Tuple3<Long, Long, Double>> edgesWithEuclideanWeight = graph.getTriplets()
.map(new MapFunction<Triplet<Long, Point, Double>, Tuple3<Long, Long, Double>>() {
@Override
public Tuple3<Long, Long, Double> map(Triplet<Long, Point, Double> triplet)
throws Exception {
Vertex<Long, Point> srcVertex = triplet.getSrcVertex();
Vertex<Long, Point> trgVertex = triplet.getTrgVertex();
return new Tuple3<>(srcVertex.getId(), trgVertex.getId(),
srcVertex.getValue().euclideanDistance(trgVertex.getValue()));
}
});
// 5. The resulting graph
Graph<Long, Point, Double> resultedGraph = graph.joinWithEdges(edgesWithEuclideanWeight,
new EdgeJoinFunction<Double, Double>() {
public Double edgeJoin(Double edgeValue, Double inputValue) {
return inputValue;
}
});
// Recover the edges from the resulting graph
DataSet<Edge<Long, Double>> result = resultedGraph.getEdges();
// emit result
if (fileOutput) {
result.writeAsCsv(outputPath, "\n", ",");
// since file sinks are lazy, we trigger the execution explicitly
env.execute("Euclidean Graph Weighing Example");
} else {
result.print();
}
}
@Override
public String getDescription() {
return "Weighing a graph by computing the Euclidean distance " +
"between its vertices";
}
// *************************************************************************
// Data types
// *************************************************************************
/**
* A simple data structure representing a two-dimensional point.
*/
public static class Point implements Serializable {
public double x, y;
public Point() {}
public Point(double x, double y) {
this.x = x;
this.y = y;
}
public double euclideanDistance(Point other) {
return Math.sqrt((x - other.x) * (x - other.x) + (y - other.y) * (y - other.y));
}
@Override
public String toString() {
return x + " " + y;
}
}
// ******************************************************************************************************************
// Utility methods
// ******************************************************************************************************************
private static boolean fileOutput = false;
private static String verticesInputPath = null;
private static String edgesInputPath = null;
private static String outputPath = null;
/**
* Parses the command-line parameters.
* @param args the command-line arguments
* @return true if the program should continue, false otherwise
*/
private static boolean parseParameters(String[] args) {
if (args.length > 0) {
if (args.length == 3) {
fileOutput = true;
verticesInputPath = args[0];
edgesInputPath = args[1];
outputPath = args[2];
} else {
System.err.println("Usage: EuclideanGraphWeighing <input vertices path> <input edges path>" +
" <output path>");
return false;
}
} else {
System.out.println("Executing Euclidean Graph Weighing example with default parameters and built-in default data.");
System.out.println("Provide parameters to read input data from files.");
System.out.println("See the documentation for the correct format of input files.");
}
return true;
}
/**
* Loads the vertex data.
* @param env the execution environment
* @return the vertex DataSet
*/
private static DataSet<Vertex<Long, Point>> getVerticesDataSet(ExecutionEnvironment env) {
// 1. If input file paths were provided
if (fileOutput) {
// Read the CSV data and map each record to a Vertex
return env.readCsvFile(verticesInputPath)
.lineDelimiter("\n")
.types(Long.class, Double.class, Double.class)
.map(new MapFunction<Tuple3<Long, Double, Double>, Vertex<Long, Point>>() {
@Override
public Vertex<Long, Point> map(Tuple3<Long, Double, Double> value) throws Exception {
return new Vertex<>(value.f0, new Point(value.f1, value.f2));
}
});
} else {
// Read the built-in default data
return EuclideanGraphData.getDefaultVertexDataSet(env);
}
}
private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
if (fileOutput) {
return env.readCsvFile(edgesInputPath)
.lineDelimiter("\n")
.types(Long.class, Long.class)
.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() {
@Override
public Edge<Long, Double> map(Tuple2<Long, Long> tuple2) throws Exception {
return new Edge<>(tuple2.f0, tuple2.f1, 0.0);
}
});
} else {
return EuclideanGraphData.getDefaultEdgeDataSet(env);
}
}
}
The code above relies mainly on the graph's joinWithEdges operation:
/**
* Joins the edge DataSet with an input DataSet on the composite key of both
* source and target IDs, and applies a user-defined transformation
* to the values of the matched records.
* The first two fields of the input DataSet are used as the join keys.
*
* @param inputDataSet the DataSet to join with.
* The first two fields of the Tuple3 are used as the composite join key,
* and the third field is passed as a parameter to the transformation function.
* @param edgeJoinFunction the transformation function to apply.
* The first parameter is the current edge value and the second parameter is the
* value of the matched Tuple3 from the input DataSet.
* @param <T> the type of the third field of the input Tuple3 DataSet.
* @return a new Graph, where the edge values have been updated according to the edgeJoinFunction
*/
public <T> Graph<K, VV, EV> joinWithEdges(DataSet<Tuple3<K, K, T>> inputDataSet,
final EdgeJoinFunction<EV, T> edgeJoinFunction) {
DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
.coGroup(inputDataSet).where(0, 1).equalTo(0, 1)
.with(new ApplyCoGroupToEdgeValues<>(edgeJoinFunction))
.name("Join with edges");
return new Graph<>(this.vertices, resultedEdges, this.context);
}
The logic executed by the coGroup operation is defined in ApplyCoGroupToEdgeValues, a static nested class inside the Graph class:
private static final class ApplyCoGroupToEdgeValues<K, EV, T>
implements CoGroupFunction<Edge<K, EV>, Tuple3<K, K, T>, Edge<K, EV>> {
private Edge<K, EV> output = new Edge<>();
private EdgeJoinFunction<EV, T> edgeJoinFunction;
public ApplyCoGroupToEdgeValues(EdgeJoinFunction<EV, T> mapper) {
this.edgeJoinFunction = mapper;
}
@Override
public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Tuple3<K, K, T>> input,
Collector<Edge<K, EV>> collector) throws Exception {
final Iterator<Edge<K, EV>> edgesIterator = edges.iterator();
final Iterator<Tuple3<K, K, T>> inputIterator = input.iterator();
if (edgesIterator.hasNext()) {
if (inputIterator.hasNext()) {
final Tuple3<K, K, T> inputNext = inputIterator.next();
output.f0 = inputNext.f0;
output.f1 = inputNext.f1;
output.f2 = edgeJoinFunction.edgeJoin(edgesIterator.next().f2, inputNext.f2);
collector.collect(output);
} else {
collector.collect(edgesIterator.next());
}
}
}
}
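The coGroup above effectively performs an outer-style join keyed on (source, target): when a matching input tuple exists, the join function produces the new edge value; otherwise the original edge passes through unchanged. A plain-Java sketch of that semantics (hypothetical names, no Flink dependency):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class JoinWithEdgesSketch {
    // A simple edge record: a (source, target) key plus a double value.
    record E(long src, long trg, double val) {}

    // Mimics joinWithEdges: input values are keyed by (src, trg); matched edges
    // get edgeJoin(oldValue, inputValue), unmatched edges pass through unchanged.
    static List<E> joinWithEdges(List<E> edges, Map<List<Long>, Double> input,
                                 BinaryOperator<Double> edgeJoin) {
        List<E> result = new ArrayList<>();
        for (E e : edges) {
            Double matched = input.get(List.of(e.src(), e.trg()));
            result.add(matched == null
                    ? e
                    : new E(e.src(), e.trg(), edgeJoin.apply(e.val(), matched)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<E> edges = List.of(new E(1, 2, 0.0), new E(2, 3, 0.0));
        Map<List<Long>, Double> weights = new HashMap<>();
        weights.put(List.of(1L, 2L), 1.41); // new weight for edge 1 -> 2 only
        // As in the example: the join function simply takes the input value.
        List<E> out = joinWithEdges(edges, weights, (oldV, newV) -> newV);
        System.out.println(out.get(0).val() + " " + out.get(1).val()); // 1.41 0.0
    }
}
```

In the example program the join function discards the old edge value and keeps the input value, which is exactly the (oldV, newV) -> newV lambda used here.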
The final output of the program is:
(1,4,4.242640687119285)
(2,5,4.242640687119285)
(4,6,2.8284271247461903)
(7,9,2.8284271247461903)
(1,2,1.4142135623730951)
(2,3,1.4142135623730951)
(3,5,2.8284271247461903)
(5,7,2.8284271247461903)
(5,9,5.656854249492381)
(6,8,2.8284271247461903)
(7,8,1.4142135623730951)
(2,4,2.8284271247461903)
(4,5,1.4142135623730951)
(8,9,1.4142135623730951)
(6,7,1.4142135623730951)
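As a sanity check on one output row: judging from the distances above, vertex i in the default data lies at point (i.0, i.0) (an inference from the output, not verified against EuclideanGraphData). Then the weight of edge (1,2) should be the distance between (1.0, 1.0) and (2.0, 2.0), i.e. sqrt(2):

```java
public class DistanceCheck {
    // Same formula as Point.euclideanDistance in the example above.
    static double euclideanDistance(double x1, double y1, double x2, double y2) {
        return Math.sqrt((x1 - x2) * (x1 - x2) + (y1 - y2) * (y1 - y2));
    }

    public static void main(String[] args) {
        // Distance between (1.0, 1.0) and (2.0, 2.0), matching the (1,2,...) output row.
        System.out.println(euclideanDistance(1.0, 1.0, 2.0, 2.0)); // 1.4142135623730951
    }
}
```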
Working with graphs is actually quite simple: once the graph's data sources are clearly defined, the subsequent operations are much like operating on ordinary Tuple-structured data.
Note: for the source code of the example above, see the flink-gelly-examples module in the Flink source tree.