[Flink Graph Computing Source Code Analysis]: Flink's Graph Storage Data Structures

Continuing from the previous post: https://blog.csdn.net/hxcaifly/article/details/86147736

1. Basic Components of a Graph

A graph is made up of edges and vertices:

  • Edge: an edge. Each edge is a Tuple3<K, K, V> that stores the ID of the edge's source vertex, the ID of its target vertex, and the edge's value.
  • Vertex: a vertex. Each vertex is a Tuple2<K, V> that stores the vertex ID and the vertex value.

2. Code Definitions of Vertices and Edges

2.1. Vertex Definition

package org.apache.flink.graph;

import org.apache.flink.api.java.tuple.Tuple2;

/**
 * Represents a vertex of a graph, made up of an ID and a value.
 * For vertices with no value, use {@link org.apache.flink.types.NullValue} as the value type.
 *
 * @param <K> the vertex ID type
 * @param <V> the vertex value type
 */
public class Vertex<K, V> extends Tuple2<K, V> {

	private static final long serialVersionUID = 1L;

	public Vertex(){}

	public Vertex(K k, V val) {
		this.f0 = k;
		this.f1 = val;
	}

	public K getId() {
		return this.f0;
	}

	public V getValue() {
		return this.f1;
	}

	public void setId(K id) {
		this.f0 = id;
	}

	public void setValue(V val) {
		this.f1 = val;
	}
}

The vertex definition is quite concise: a vertex consists of a vertex ID and a vertex value, so it simply reuses the Tuple2<K, V> data structure, where K is the type of the vertex ID and V is the type of the vertex value.
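
For instance, here is a minimal sketch of constructing and reading a vertex (the class name and the sample values are illustrative, not taken from the Flink source):

import org.apache.flink.graph.Vertex;

public class VertexDemo {
	public static void main(String[] args) {
		// A vertex with a Long ID and a String value.
		Vertex<Long, String> v = new Vertex<>(1L, "flink");
		System.out.println(v.getId() + " -> " + v.getValue()); // prints: 1 -> flink
		// Since Vertex extends Tuple2, the inherited tuple fields work as well.
		System.out.println(v.f0 + " -> " + v.f1); // prints: 1 -> flink
	}
}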

2.2. Edge Definition

package org.apache.flink.graph;

import org.apache.flink.api.java.tuple.Tuple3;

/**
 * An edge represents a link between two {@link Vertex vertices}.
 * For edges with no value, use {@link org.apache.flink.types.NullValue} as the value type.
 *
 * @param <K> the key type of the source and target vertices
 * @param <V> the edge value type
 */
public class Edge<K, V> extends Tuple3<K, K, V> {

	private static final long serialVersionUID = 1L;

	public Edge(){}

	public Edge(K source, K target, V value) {
		this.f0 = source;
		this.f1 = target;
		this.f2 = value;
	}

	/**
	 * Reverses the direction of this Edge.
	 * @return a new Edge, where the source is the original Edge's target
	 * and the target is the original Edge's source.
	 */
	public Edge<K, V> reverse() {
		return new Edge<>(this.f1, this.f0, this.f2);
	}

	public void setSource(K source) {
		this.f0 = source;
	}

	public K getSource() {
		return this.f0;
	}

	public void setTarget(K target) {
		this.f1 = target;
	}

	public K getTarget() {
		return f1;
	}

	public void setValue(V value) {
		this.f2 = value;
	}

	public V getValue() {
		return f2;
	}
}

An edge represents a link between two vertices. How do we record an edge? Naturally, we need to record its starting point (the source vertex) and its end point (the target vertex). The link between the two vertices also carries a value, which can mean different things, for example a weight or a distance.

So the edge data structure extends Tuple3<K, K, V>, where K is the key type shared by the source and target vertices and V is the edge value type.
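
As a quick illustration, here is a minimal sketch of constructing an edge and reversing it (the class name and the sample values are illustrative):

import org.apache.flink.graph.Edge;

public class EdgeDemo {
	public static void main(String[] args) {
		// An edge from vertex 1 to vertex 2 with weight 0.5.
		Edge<Long, Double> e = new Edge<>(1L, 2L, 0.5);
		// reverse() returns a new Edge with source and target swapped.
		Edge<Long, Double> r = e.reverse();
		System.out.println(r.getSource() + " -> " + r.getTarget()); // prints: 2 -> 1
	}
}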

2.3. Graph Definition

We already know that a graph (Graph) is made up of edges and vertices, so it is not hard to imagine how the Graph class is defined.

package org.apache.flink.graph;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

/**
 * Represents a graph consisting of {@link Edge edges} and {@link Vertex vertices}.
 *
 * @see org.apache.flink.graph.Edge
 * @see org.apache.flink.graph.Vertex
 *
 * @param <K>  the key type for vertices
 * @param <VV> the vertex value type
 * @param <EV> the edge value type
 */
@SuppressWarnings("serial")
public class Graph<K, VV, EV> {

	// 1. The execution environment
	private final ExecutionEnvironment context;

	// 2. The graph's vertex data set
	private final DataSet<Vertex<K, VV>> vertices;

	// 3. The graph's edge data set
	private final DataSet<Edge<K, EV>> edges;

	// Method operations omitted
}

The code above shows the member variables of the Graph class.

  • ExecutionEnvironment: the graph's execution context. Graph computation is itself a standalone job, and it relies on this context to obtain the relevant execution configuration.
  • DataSet<Vertex<K, VV>>: the vertex data set.
  • DataSet<Edge<K, EV>>: the edge data set.

The Graph class defines many construction methods, mainly to support reading graph data sets from different kinds of sources: for example reading from a CSV file, or, when only an edge data set is provided, deriving all of the vertices from the edge data. This area is likely to be extended as requirements evolve, since the whole point is to let different developers ingest data sources in a variety of ways.
Some of the construction functions are listed below:

	/**
	 * Creates a graph from two DataSets: vertices and edges.
	 *
	 * @param vertices a DataSet of vertices.
	 * @param edges a DataSet of edges.
	 * @param context the flink execution environment.
	 */
	protected Graph(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) {
		this.vertices = vertices;
		this.edges = edges;
		this.context = context;
	}

	/**
	 * Creates a graph from a Collection of vertices and a Collection of edges.
	 *
	 * @param vertices a Collection of vertices.
	 * @param edges a Collection of edges.
	 * @param context the flink execution environment.
	 * @return the newly created graph.
	 */
	public static <K, VV, EV> Graph<K, VV, EV> fromCollection(Collection<Vertex<K, VV>> vertices,
			Collection<Edge<K, EV>> edges, ExecutionEnvironment context) {

		return fromDataSet(context.fromCollection(vertices),
				context.fromCollection(edges), context);
	}

	/**
	 * Creates a graph from a Collection of edges.
	 * Vertices are created automatically from the edges and their values
	 * are set to NullValue.
	 *
	 * @param edges a Collection of edges.
	 * @param context the flink execution environment.
	 * @return the newly created graph.
	 */
	public static <K, EV> Graph<K, NullValue, EV> fromCollection(Collection<Edge<K, EV>> edges,
			ExecutionEnvironment context) {

		return fromDataSet(context.fromCollection(edges), context);
	}

	/**
	 * Creates a graph from a Collection of edges. Vertices are created automatically.
	 *
	 * @param edges a Collection of edges.
	 * @param vertexValueInitializer a map function that initializes the vertex values.
	 * It allows to apply a map transformation on the vertex ID to produce an initial vertex value.
	 * @param context the flink execution environment.
	 * @return the newly created graph.
	 */
	public static <K, VV, EV> Graph<K, VV, EV> fromCollection(Collection<Edge<K, EV>> edges,
			final MapFunction<K, VV> vertexValueInitializer, ExecutionEnvironment context) {

		return fromDataSet(context.fromCollection(edges), vertexValueInitializer, context);
	}

	/**
	 * Creates a graph from a DataSet of vertices and a DataSet of edges.
	 *
	 * @param vertices a DataSet of vertices.
	 * @param edges a DataSet of edges.
	 * @param context the flink execution environment.
	 * @return the newly created graph.
	 */
	public static <K, VV, EV> Graph<K, VV, EV> fromDataSet(DataSet<Vertex<K, VV>> vertices,
			DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) {

		return new Graph<>(vertices, edges, context);
	}

	/**
	 * Creates a graph from a DataSet of edges. Vertices are created
	 * automatically and their values are set to NullValue.
	 *
	 * @param edges a DataSet of edges.
	 * @param context the flink execution environment.
	 * @return the newly created graph.
	 */
	public static <K, EV> Graph<K, NullValue, EV> fromDataSet(
			DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) {

		DataSet<Vertex<K, NullValue>> vertices = edges
			.flatMap(new EmitSrcAndTarget<>())
				.name("Source and target IDs")
			.distinct()
				.name("IDs");

		return new Graph<>(vertices, edges, context);
	}
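
To make the edge-only path above concrete, here is a minimal sketch of building a graph from a plain edge list and letting Gelly derive the vertex set (the class name and the sample data are illustrative):

import java.util.Arrays;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.types.NullValue;

public class FromEdgesDemo {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Only edges are provided; the source and target IDs are emitted,
		// deduplicated, and the vertex values default to NullValue.
		Graph<Long, NullValue, Double> graph = Graph.fromCollection(
				Arrays.asList(new Edge<>(1L, 2L, 1.0), new Edge<>(2L, 3L, 2.0)), env);

		// Expect three vertices: 1, 2 and 3, each with a NullValue value.
		graph.getVertices().print();
	}
}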

The Graph class also defines a number of basic graph operations, such as removing a vertex or reversing the direction of all edges. Since the full source is quite long, there is no need to go through every method here; we will get familiar with the relevant operations through concrete examples below.

3. A Simple Graph Operation Example

package org.apache.flink.graph.examples;

import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.EdgeJoinFunction;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Triplet;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.examples.data.EuclideanGraphData;

import java.io.Serializable;

/**
 * This example shows how to use Gelly's {@link Graph#getTriplets()} and
 * {@link Graph#joinWithEdges(DataSet, EdgeJoinFunction)} methods.
 *
 * <p>Given a directed, unweighted graph whose vertex values represent points in a plane,
 * it returns a weighted graph where each edge weight is equal to the Euclidean distance
 * between the source and the target vertex values.
 *
 * <p>Input files are plain text files and must be formatted as follows:
 * <ul>
 * 	<li>Vertices are represented by their vertex ID and vertex value and are
 * 	separated by newlines; the value consists of two doubles separated by commas.
 * 	For example: <code>1,1.0,1.0\n2,2.0,2.0\n3,3.0,3.0\n</code> defines a data set of three vertices.
 * 	<li>Edges are represented by pairs of srcVertexId,trgVertexId separated by commas,
 * 	with the edges themselves separated by newlines.
 * 	For example: <code>1,2\n1,3\n</code> defines the two edges 1-2 and 1-3.
 * </ul>
 *
 * <p>Usage <code>EuclideanGraphWeighing &lt;vertex path&gt; &lt;edge path&gt; &lt;result path&gt;</code><br>
 * If no parameters are provided, the program is run with default data from
 * {@link EuclideanGraphData}.
 */
@SuppressWarnings("serial")
public class EuclideanGraphWeighing implements ProgramDescription {

	public static void main(String[] args) throws Exception {

		if (!parseParameters(args)) {
			return;
		}

		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// 1. Read the vertex data
		DataSet<Vertex<Long, Point>> vertices = getVerticesDataSet(env);

		// 2. Read the edge data
		DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);

		// 3. Build the graph
		Graph<Long, Point, Double> graph = Graph.fromDataSet(vertices, edges, env);

		// 4. The edge values will be the Euclidean distance between the source and target vertices
		DataSet<Tuple3<Long, Long, Double>> edgesWithEuclideanWeight = graph.getTriplets()
				.map(new MapFunction<Triplet<Long, Point, Double>, Tuple3<Long, Long, Double>>() {

					@Override
					public Tuple3<Long, Long, Double> map(Triplet<Long, Point, Double> triplet)
							throws Exception {

						Vertex<Long, Point> srcVertex = triplet.getSrcVertex();
						Vertex<Long, Point> trgVertex = triplet.getTrgVertex();

						return new Tuple3<>(srcVertex.getId(), trgVertex.getId(),
							srcVertex.getValue().euclideanDistance(trgVertex.getValue()));
					}
				});
        // 5. The resulting graph
		Graph<Long, Point, Double> resultedGraph = graph.joinWithEdges(edgesWithEuclideanWeight,
				new EdgeJoinFunction<Double, Double>() {

					public Double edgeJoin(Double edgeValue, Double inputValue) {
						return inputValue;
					}
				});

		// Recover the edges from the final result graph
		DataSet<Edge<Long, Double>> result = resultedGraph.getEdges();

		// emit result
		if (fileOutput) {
			result.writeAsCsv(outputPath, "\n", ",");

			// since file sinks are lazy, we trigger the execution explicitly
			env.execute("Euclidean Graph Weighing Example");
		} else {
			result.print();
		}

	}

	@Override
	public String getDescription() {
		return "Weighing a graph by computing the Euclidean distance " +
				"between its vertices";
	}

	// *************************************************************************
	//     Data types
	// *************************************************************************

	/**
	 * A simple two-dimensional point data structure.
	 */
	public static class Point implements Serializable {

		public double x, y;

		public Point() {}

		public Point(double x, double y) {
			this.x = x;
			this.y = y;
		}

		public double euclideanDistance(Point other) {
			return Math.sqrt((x - other.x) * (x - other.x) + (y - other.y) * (y - other.y));
		}

		@Override
		public String toString() {
			return x + " " + y;
		}
	}

	// ******************************************************************************************************************
	// Utility methods
	// ******************************************************************************************************************

	private static boolean fileOutput = false;

	private static String verticesInputPath = null;

	private static String edgesInputPath = null;

	private static String outputPath = null;

	/**
	 * Parses the command-line arguments.
	 *
	 * @param args the program arguments
	 * @return true if execution should continue, false otherwise
	 */
	private static boolean parseParameters(String[] args) {

		if (args.length > 0) {
			if (args.length == 3) {
				fileOutput = true;
				verticesInputPath = args[0];
				edgesInputPath = args[1];
				outputPath = args[2];
			} else {
				System.out.println("Executing Euclidean Graph Weighing example with default parameters and built-in default data.");
				System.out.println("Provide parameters to read input data from files.");
				System.out.println("See the documentation for the correct format of input files.");
				System.err.println("Usage: EuclideanGraphWeighing <input vertices path> <input edges path>" +
						" <output path>");
				return false;
			}
		}
		return true;
	}

	/**
	 * Loads the vertex data set.
	 *
	 * @param env the flink execution environment
	 * @return the vertex DataSet
	 */
	private static DataSet<Vertex<Long, Point>> getVerticesDataSet(ExecutionEnvironment env) {
		// 1. If input file paths were provided, read from the external files
		if (fileOutput) {
			// Read the CSV file and map each record to a Vertex.
			return env.readCsvFile(verticesInputPath)
					.lineDelimiter("\n")
					.types(Long.class, Double.class, Double.class)
					.map(new MapFunction<Tuple3<Long, Double, Double>, Vertex<Long, Point>>() {

						@Override
						public Vertex<Long, Point> map(Tuple3<Long, Double, Double> value) throws Exception {
							return new Vertex<>(value.f0, new Point(value.f1, value.f2));
						}
					});
		} else {
			// Read the built-in default data
			return EuclideanGraphData.getDefaultVertexDataSet(env);
		}
	}

	private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
		if (fileOutput) {
			return env.readCsvFile(edgesInputPath)
					.lineDelimiter("\n")
					.types(Long.class, Long.class)
					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() {

						@Override
						public Edge<Long, Double> map(Tuple2<Long, Long> tuple2) throws Exception {
							return new Edge<>(tuple2.f0, tuple2.f1, 0.0);
						}
					});
		} else {
			return EuclideanGraphData.getDefaultEdgeDataSet(env);
		}
	}
}

The code above relies mainly on the graph's joinWithEdges operation:

	/**
	 * Joins the edge DataSet with an input DataSet on the composite key of both
	 * source and target IDs and applies a user-defined transformation to the
	 * values of the matched records. The first two fields of the input DataSet
	 * are used as the join keys.
	 *
	 * @param inputDataSet the input DataSet to join with.
	 * The first two fields of its Tuple3 form the composite join key, while the
	 * third field is passed as a parameter to the transformation function.
	 * @param edgeJoinFunction the transformation function to apply.
	 * Its first parameter is the current edge value and its second parameter is
	 * the value of the matched Tuple3 from the input DataSet.
	 * @param <T> the type of the third field of the input Tuple3 DataSet.
	 * @return a new Graph whose edge values have been updated by the edgeJoinFunction
	 */
	public <T> Graph<K, VV, EV> joinWithEdges(DataSet<Tuple3<K, K, T>> inputDataSet,
			final EdgeJoinFunction<EV, T> edgeJoinFunction) {

		DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
				.coGroup(inputDataSet).where(0, 1).equalTo(0, 1)
				.with(new ApplyCoGroupToEdgeValues<>(edgeJoinFunction))
					.name("Join with edges");
		return new Graph<>(this.vertices, resultedEdges, this.context);
	}

The logic that the coGroup operation executes is defined in ApplyCoGroupToEdgeValues, a static inner class inside the Graph class file:

	private static final class ApplyCoGroupToEdgeValues<K, EV, T>
	implements CoGroupFunction<Edge<K, EV>, Tuple3<K, K, T>, Edge<K, EV>> {
		private Edge<K, EV> output = new Edge<>();

		private EdgeJoinFunction<EV, T> edgeJoinFunction;

		public ApplyCoGroupToEdgeValues(EdgeJoinFunction<EV, T> mapper) {
			this.edgeJoinFunction = mapper;
		}

		@Override
		public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Tuple3<K, K, T>> input,
				Collector<Edge<K, EV>> collector) throws Exception {

			final Iterator<Edge<K, EV>> edgesIterator = edges.iterator();
			final Iterator<Tuple3<K, K, T>> inputIterator = input.iterator();

			if (edgesIterator.hasNext()) {
				if (inputIterator.hasNext()) {
					final Tuple3<K, K, T> inputNext = inputIterator.next();

					output.f0 = inputNext.f0;
					output.f1 = inputNext.f1;
					output.f2 = edgeJoinFunction.edgeJoin(edgesIterator.next().f2, inputNext.f2);
					collector.collect(output);
				} else {
					collector.collect(edgesIterator.next());
				}
			}
		}
	}
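
Note that using coGroup rather than a plain join gives joinWithEdges left-outer-join semantics: an edge with no matching Tuple3 in the input is emitted unchanged (the else branch above). Here is a minimal sketch of that behavior (the class name and the sample data are illustrative):

import java.util.Arrays;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.EdgeJoinFunction;
import org.apache.flink.graph.Graph;
import org.apache.flink.types.NullValue;

public class JoinWithEdgesDemo {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		Graph<Long, NullValue, Double> graph = Graph.fromCollection(
				Arrays.asList(new Edge<>(1L, 2L, 0.0), new Edge<>(2L, 3L, 0.0)), env);

		// The input only matches edge (1,2); edge (2,3) keeps its old value.
		Graph<Long, NullValue, Double> joined = graph.joinWithEdges(
				env.fromElements(new Tuple3<>(1L, 2L, 42.0)),
				new EdgeJoinFunction<Double, Double>() {

					public Double edgeJoin(Double edgeValue, Double inputValue) {
						return inputValue;
					}
				});

		// Expect (1,2,42.0) and (2,3,0.0)
		joined.getEdges().print();
	}
}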

Running the EuclideanGraphWeighing example produces the following result:

(1,4,4.242640687119285)
(2,5,4.242640687119285)
(4,6,2.8284271247461903)
(7,9,2.8284271247461903)
(1,2,1.4142135623730951)
(2,3,1.4142135623730951)
(3,5,2.8284271247461903)
(5,7,2.8284271247461903)
(5,9,5.656854249492381)
(6,8,2.8284271247461903)
(7,8,1.4142135623730951)
(2,4,2.8284271247461903)
(4,5,1.4142135623730951)
(8,9,1.4142135623730951)
(6,7,1.4142135623730951)
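
A quick sanity check on the first row: the default data set appears to place vertex i at coordinates (i.0, i.0), matching the format example in the javadoc above, so the weight of edge (1,4) should be sqrt((4-1)^2 + (4-1)^2) = sqrt(18) ≈ 4.2426, which is exactly the value printed.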

Working with graphs is actually quite simple: once the graph's data sources are clearly defined, the subsequent operations feel just like working with ordinary Tuple data.

Note: for the full source of the example above, see the flink-gelly-examples module in the Flink source tree.


Copyright notice: this is an original article by hxcaifly, released under the CC 4.0 BY-SA license. When reposting, please include a link to the original article together with this notice.