图神经网络 | (5) GraphSAGE实战

近期买了一本图神经网络的入门书,最近几篇博客对书中的一些实战案例进行整理,具体的理论和原理部分可以自行查阅该书,该书购买链接:《深入浅出的图神经网络》

该书配套代码

本节我们通过代码来介绍GraphSAGE以加深读者对相关知识的理解,如书中介绍的那样,GraphSAGE包括两方面,一是对邻居的采样;二是对邻居的聚合操作。

目录

1. 对邻居采样

2. 对邻居聚合

3. 定义SageGCN

4. GraphSage模型

5. 数据预处理

6. 主程序


1. 对邻居采样

首先来看下对邻居的采样方法,为了实现更高效低地采样,可以将节点及其邻居存放在一起,即维护一个节点与其邻居对应关系的表。可以通过两个函数sampling和multihop_sampling来实现采样的具体操作。其中sampling是进行一阶采样,根据源节点采样指定数量的邻居节点,multihop_sampling则是利用sampling实现多阶采样的功能。

def sampling(src_nodes, sample_num, neighbor_table):
    """根据源节点采样指定数量的邻居节点,注意使用的是有放回的采样;
    某个节点的邻居节点数量少于采样数量时,采样结果出现重复的节点

    Arguments:
        src_nodes {list, ndarray} -- 源节点列表
        sample_num {int} -- 需要采样的节点数
        neighbor_table {dict} -- 节点到其邻居节点的映射表,邻接矩阵

    Returns:
        np.ndarray -- 采样结果构成的列表
    """
    results = []
    for sid in src_nodes:
        # 从节点的邻居中进行有放回地进行采样 
        res = np.random.choice(neighbor_table[sid], size=(sample_num,))
        results.append(res)
    return np.asarray(results).flatten() #拉伸为1维


def multihop_sampling(src_nodes, sample_nums, neighbor_table):
    """根据源节点进行多阶采样

    Arguments:
        src_nodes {list, np.ndarray} -- 源节点id
        sample_nums {list of int} -- 每一阶需要采样的个数
        neighbor_table {dict} -- 节点到其邻居节点的映射 /邻接矩阵

    Returns:
        [list of ndarray] -- 每一阶采样的结果
    """
    sampling_result = [src_nodes] #首先包含源节点 
    for k, hopk_num in enumerate(sample_nums): #先对源节点进行1阶采样 在与源节点距离为1的节点中采样hopk_num个节点; 再对源节点进行2阶采样,即对源节点的所有1阶邻居进行1阶采样
        hopk_result = sampling(sampling_result[k], hopk_num, neighbor_table)
        sampling_result.append(hopk_result) #追加源节点的1阶邻居 和 2阶邻居(2层网络,代表采样到2阶)
    return sampling_result

这样采样得到的结果仅仅是节点的ID,还需要根据节点ID去查询每个节点的特征(向量),以进行聚合操作更新特征。

 

2. 对邻居聚合

接下里根据两种聚类算子对邻居的状态(特征向量)进行聚合:

1)平均/加和聚合算子

                                    Agg^{sum} = \sigma(SUM\{Wh_j+b,\forall v_j \in N(v_i)})

2)池化聚合算子

                                   Agg^{pool} = MAX\{\sigma(Wh_j+b),\forall v_j \in N(v_i)\}

计算过程定义在forward函数中,输入neighbor_feature表示每个节点需要聚合的邻接节点的特征(向量),维度为N_{src} \times N_{neighbor} \times D_{in},其中N_{src}表示源节点的数量,N_{neighbor}表示邻居节点的数量,D_{in}表示输入的特征维度。将这些邻居节点的特征经过一个线性变换得到隐层特征,就可以沿着N_{neighbor}维度进行聚合操作了,包括求和、均值和最大值,得到维度N_{src} \times D_{in}的输出。

class NeighborAggregator(nn.Module):
    def __init__(self, input_dim, output_dim,
                 use_bias=False, aggr_method="mean"):
        """聚合节点邻居

        Args:
            input_dim: 输入特征的维度
            output_dim: 输出特征的维度
            use_bias: 是否使用偏置 (default: {False})
            aggr_method: 邻居聚合方式 (default: {mean})
        """
        super(NeighborAggregator, self).__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.use_bias = use_bias
        self.aggr_method = aggr_method
        self.weight = nn.Parameter(torch.Tensor(input_dim, output_dim))
        if self.use_bias:
            self.bias = nn.Parameter(torch.Tensor(self.output_dim))
        self.reset_parameters() #自定义参数初始化

    def reset_parameters(self):
        init.kaiming_uniform_(self.weight)
        if self.use_bias:
            init.zeros_(self.bias)

    def forward(self, neighbor_feature):
        if self.aggr_method == "mean":
            aggr_neighbor = neighbor_feature.mean(dim=1)
        elif self.aggr_method == "sum":
            aggr_neighbor = neighbor_feature.sum(dim=1)
        elif self.aggr_method == "max":
            aggr_neighbor = neighbor_feature.max(dim=1)
        else:
            raise ValueError("Unknown aggr type, expected sum, max, or mean, but got {}"
                             .format(self.aggr_method))

        neighbor_hidden = torch.matmul(aggr_neighbor, self.weight) #先聚合再做线性变换
        if self.use_bias:
            neighbor_hidden += self.bias

        return neighbor_hidden

    def extra_repr(self):
        return 'in_features={}, out_features={}, aggr_method={}'.format(
            self.input_dim, self.output_dim, self.aggr_method)

 

3. 定义SageGCN

基于邻居聚合的结果对中心节点的特征进行更新。更新的方式是将邻居节点聚合的特征和经过线性变换的中心节点的特征进行求和或级联,再经过一个激活函数,得到更新后的特征。

class SageGCN(nn.Module):
    def __init__(self, input_dim, hidden_dim,
                 activation=F.relu,
                 aggr_neighbor_method="mean",
                 aggr_hidden_method="sum"):
        """SageGCN层定义

        Args:
            input_dim: 输入特征的维度
            hidden_dim: 隐层特征的维度,
                当aggr_hidden_method=sum, 输出维度为hidden_dim
                当aggr_hidden_method=concat, 输出维度为hidden_dim*2
            activation: 激活函数
            aggr_neighbor_method: 邻居特征聚合方法,["mean", "sum", "max"]
            aggr_hidden_method: 节点特征的更新方法,["sum", "concat"]
        """
        super(SageGCN, self).__init__()
        assert aggr_neighbor_method in ["mean", "sum", "max"]
        assert aggr_hidden_method in ["sum", "concat"]
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.aggr_neighbor_method = aggr_neighbor_method
        self.aggr_hidden_method = aggr_hidden_method
        self.activation = activation
        
        self.aggregator = NeighborAggregator(input_dim, hidden_dim,
                                             aggr_method=aggr_neighbor_method)
        self.weight = nn.Parameter(torch.Tensor(input_dim, hidden_dim))
        self.reset_parameters() #自定义参数初始化方式

    def reset_parameters(self):
        init.kaiming_uniform_(self.weight)

    def forward(self, src_node_features, neighbor_node_features):
        # 得到邻居节点的聚合特征(经过线性变换)
        neighbor_hidden = self.aggregator(neighbor_node_features)
        #对中心节点的特征作线性变换
        self_hidden = torch.matmul(src_node_features, self.weight)

        #对中心节点的特征和邻居节点的聚合特征进行求和或拼接
        if self.aggr_hidden_method == "sum":
            hidden = self_hidden + neighbor_hidden
        elif self.aggr_hidden_method == "concat":
            hidden = torch.cat([self_hidden, neighbor_hidden], dim=1)
        else:
            raise ValueError("Expected sum or concat, got {}"
                             .format(self.aggr_hidden))
        #通过激活函数
        if self.activation: 
            return self.activation(hidden)
        else:
            return hidden

    def extra_repr(self):
        output_dim = self.hidden_dim if self.aggr_hidden_method == "sum" else self.hidden_dim * 2
        return 'in_features={}, out_features={}, aggr_hidden_method={}'.format(
            self.input_dim, output_dim, self.aggr_hidden_method)

4. GraphSage模型

基于前面定义的采样和节点特征的更新方式,就可以实现书中介绍的计算节点嵌入的方法。下面定义一个两层的模型,隐层节点数为64,假设每一阶的采样节点数都为10,那么计算中心节点的输出可以通过以下代码实现。其中前向传播时传入的参数node_features_list是一个列表,其中第1个元素表示源节点的特征,其后的元素表示每阶采样得到的节点特征。

class GraphSage(nn.Module):
    def __init__(self, input_dim, hidden_dim,
                 num_neighbors_list):
        super(GraphSage, self).__init__()
        self.input_dim = input_dim     #1433
        self.hidden_dim = hidden_dim    #[128,7]
        self.num_neighbors_list = num_neighbors_list #[10,10] 两层 1阶节点采样10个 2阶节点采样10个
        self.num_layers = len(num_neighbors_list) #2层
        self.gcn = nn.ModuleList()
        self.gcn.append(SageGCN(input_dim, hidden_dim[0]))
        for index in range(0, len(hidden_dim) - 2):
            self.gcn.append(SageGCN(hidden_dim[index], hidden_dim[index + 1]))
        self.gcn.append(SageGCN(hidden_dim[-2], hidden_dim[-1], activation=None))

    def forward(self, node_features_list):
        hidden = node_features_list      #[[源节点对应的特征],[1阶邻居对应的特征],[2阶邻居对应的特征]]
        for l in range(self.num_layers):
            next_hidden = []
            gcn = self.gcn[l]
            for hop in range(self.num_layers - l):
                src_node_features = hidden[hop]   #源节点对应的特征  (batch_size,feature_size=1433)
                src_node_num = len(src_node_features) #batch_size
                neighbor_node_features = hidden[hop + 1] \
                    .view((src_node_num, self.num_neighbors_list[hop], -1)) #(batch_size*10,feature_size) -> (batch_size,10,feature_size)
                h = gcn(src_node_features, neighbor_node_features) #(batch_size,hidden_size=128)
                next_hidden.append(h)
            hidden = next_hidden
        return hidden[0]  #(batch_size,hidden_size=7)

    def extra_repr(self):
        return 'in_features={}, num_neighbors_list={}'.format(
            self.input_dim, self.num_neighbors_list
        )

5. 数据预处理

实现和GCN实战中相同的数据集,不同在于,之前的GCN实战需要计算归一化的拉普拉斯矩阵\widetilde{L}_{sym},参与GCN的计算,GraphSage模型不需要计算拉普拉斯矩阵,通过对邻居节点特征进行聚合操作来更新中心节点特征;之前GCN实战是全图(full-batch)训练方式,即一个epoch进行一次更细,GraphSage模型基于采样邻居节点的策略改造成以节点为中心的小批量(mini-batch)训练方式。

我们使用的是Cora数据集,该数据集由2708篇论文,以及它们之间的引用关系构成的5429条边构成。这些论文根据主题划分为7类,分别是神经网络、强化学习、规则学习、概率方法、遗传算法、理论研究、案例相关。每篇论文的特征(向量)通过词袋模型得到,维度为1433(词典大小),每一维表示一个词,1表示该词在该论文中出现,0表示未出现。

首先定义类CoraData来对数据进行预处理,主要包括下载数据、规范化数据并进行缓存以备重复使用。最终得到的数据形式包括如下几个部分:

1)X:图中节点的特征,维度为N*D,即2708*1433(每个节点表示一条数据/一篇论文)

2)Y:节点对应的标签,包括7个类别。

3)adjacency:邻接矩阵,维度N*N(2708*2708),类型为scipy.sparse.coo_matrix

4)train_mask、val_mask、test_mask:与节点数相同的掩码,用于划分训练集、验证集、 测试集。

注意:我们把每条数据/每篇论文表示为图中的一个节点,和之前的深度学习数据集不同,以前我们假设数据之间是独立同分布的,在这里论文间都有引用关系,也就是每个数据都是有关联的,之前的假设不再适用。所以,我们把这种有关联的数据表示为图中节点,边表示数据之间的关系。这是一个典型的图数据。
 

import os
import os.path as osp
import pickle
import numpy as np
import itertools
import scipy.sparse as sp
import urllib
from collections import namedtuple


Data = namedtuple('Data', ['x', 'y', 'adjacency_dict',
                           'train_mask', 'val_mask', 'test_mask'])


class CoraData(object):
    download_url = "https://raw.githubusercontent.com/kimiyoung/planetoid/master/data"
    filenames = ["ind.cora.{}".format(name) for name in
                 ['x', 'tx', 'allx', 'y', 'ty', 'ally', 'graph', 'test.index']]

    def __init__(self, data_root="cora", rebuild=False):
        """Cora数据,包括数据下载,处理,加载等功能
        当数据的缓存文件存在时,将使用缓存文件,否则将下载、进行处理,并缓存到磁盘

        处理之后的数据可以通过属性 .data 获得,它将返回一个数据对象,包括如下几部分:
            * x: 节点的特征,维度为 2708 * 1433,类型为 np.ndarray
            * y: 节点的标签,总共包括7个类别,类型为 np.ndarray
            * adjacency_dict: 邻接信息,,类型为 dict
            * train_mask: 训练集掩码向量,维度为 2708,当节点属于训练集时,相应位置为True,否则False
            * val_mask: 验证集掩码向量,维度为 2708,当节点属于验证集时,相应位置为True,否则False
            * test_mask: 测试集掩码向量,维度为 2708,当节点属于测试集时,相应位置为True,否则False

        Args:
        -------
            data_root: string, optional
                存放数据的目录,原始数据路径: {data_root}/raw
                缓存数据路径: {data_root}/processed_cora.pkl
            rebuild: boolean, optional
                是否需要重新构建数据集,当设为True时,如果存在缓存数据也会重建数据

        """
        self.data_root = data_root
        save_file = osp.join(self.data_root, "processed_cora.pkl")
        if osp.exists(save_file) and not rebuild:
            print("Using Cached file: {}".format(save_file))
            self._data = pickle.load(open(save_file, "rb"))
        else:
            self.maybe_download()
            self._data = self.process_data()
            with open(save_file, "wb") as f:
                pickle.dump(self.data, f)
            print("Cached file: {}".format(save_file))

    @property
    def data(self):
        """返回Data数据对象,包括x, y, adjacency, train_mask, val_mask, test_mask"""
        return self._data

    def process_data(self):
        """
        处理数据,得到节点特征和标签,邻接矩阵,训练集、验证集以及测试集
        引用自:https://github.com/rusty1s/pytorch_geometric
        """
        print("Process data ...")
        _, tx, allx, y, ty, ally, graph, test_index = [self.read_data(
            osp.join(self.data_root, "raw", name)) for name in self.filenames]
        train_index = np.arange(y.shape[0])
        val_index = np.arange(y.shape[0], y.shape[0] + 500)
        sorted_test_index = sorted(test_index)

        x = np.concatenate((allx, tx), axis=0)
        y = np.concatenate((ally, ty), axis=0).argmax(axis=1)

        x[test_index] = x[sorted_test_index]
        y[test_index] = y[sorted_test_index]
        num_nodes = x.shape[0]

        train_mask = np.zeros(num_nodes, dtype=np.bool)
        val_mask = np.zeros(num_nodes, dtype=np.bool)
        test_mask = np.zeros(num_nodes, dtype=np.bool)
        train_mask[train_index] = True
        val_mask[val_index] = True
        test_mask[test_index] = True
        adjacency_dict = graph
        print("Node's feature shape: ", x.shape)
        print("Node's label shape: ", y.shape)
        print("Adjacency's shape: ", len(adjacency_dict))
        print("Number of training nodes: ", train_mask.sum())
        print("Number of validation nodes: ", val_mask.sum())
        print("Number of test nodes: ", test_mask.sum())

        return Data(x=x, y=y, adjacency_dict=adjacency_dict,
                    train_mask=train_mask, val_mask=val_mask, test_mask=test_mask)

    def maybe_download(self):
        save_path = os.path.join(self.data_root, "raw")
        for name in self.filenames:
            if not osp.exists(osp.join(save_path, name)):
                self.download_data(
                    "{}/{}".format(self.download_url, name), save_path)

    @staticmethod
    def build_adjacency(adj_dict):
        """根据邻接表创建邻接矩阵"""
        edge_index = []
        num_nodes = len(adj_dict)
        for src, dst in adj_dict.items():
            edge_index.extend([src, v] for v in dst)
            edge_index.extend([v, src] for v in dst)
        # 去除重复的边
        edge_index = list(k for k, _ in itertools.groupby(sorted(edge_index)))
        edge_index = np.asarray(edge_index)
        adjacency = sp.coo_matrix((np.ones(len(edge_index)),
                                   (edge_index[:, 0], edge_index[:, 1])),
                                  shape=(num_nodes, num_nodes), dtype="float32")
        return adjacency

    @staticmethod
    def read_data(path):
        """使用不同的方式读取原始数据以进一步处理"""
        name = osp.basename(path)
        if name == "ind.cora.test.index":
            out = np.genfromtxt(path, dtype="int64")
            return out
        else:
            out = pickle.load(open(path, "rb"), encoding="latin1")
            out = out.toarray() if hasattr(out, "toarray") else out
            return out

    @staticmethod
    def download_data(url, save_path):
        """数据下载工具,当原始数据不存在时将会进行下载"""
        if not os.path.exists(save_path):
            os.makedirs(save_path)
        data = urllib.request.urlopen(url)
        filename = os.path.split(url)[-1]

        with open(os.path.join(save_path, filename), 'wb') as f:
            f.write(data.read())

        return True

6. 主程序

"""
基于Cora的GraphSage示例
"""
import torch

import numpy as np
import torch.nn as nn
import torch.optim as optim
from net import GraphSage
from data import CoraData
from sampling import multihop_sampling

from collections import namedtuple
INPUT_DIM = 1433    # 输入维度 (节点/样本特征向量维度)
# Note: 采样的邻居阶数需要与GCN的层数保持一致
HIDDEN_DIM = [128, 7]   # 隐藏单元节点数  两层
NUM_NEIGHBORS_LIST = [10, 10]   # 每阶/每层采样邻居的节点数
assert len(HIDDEN_DIM) == len(NUM_NEIGHBORS_LIST)

BTACH_SIZE = 16     # 批处理大小
EPOCHS = 20
NUM_BATCH_PER_EPOCH = 20    # 每个epoch循环的批次数
LEARNING_RATE = 0.01    # 学习率
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

Data = namedtuple('Data', ['x', 'y', 'adjacency_dict',
                           'train_mask', 'val_mask', 'test_mask'])

data = CoraData().data #获取预处理数据
x = data.x / data.x.sum(1, keepdims=True)  # 归一化数据,使得每一行和为1

train_index = np.where(data.train_mask)[0] #训练节点/数据对应的索引
train_label = data.y[train_index] #训练节点/数据对应的标签
test_index = np.where(data.test_mask)[0] #测试节点/数据对应的索引

model = GraphSage(input_dim=INPUT_DIM, hidden_dim=HIDDEN_DIM,
                  num_neighbors_list=NUM_NEIGHBORS_LIST).to(DEVICE)
print(model)
criterion = nn.CrossEntropyLoss().to(DEVICE) #多分类交叉熵损失函数
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE, weight_decay=5e-4) #Adam优化器


def train():
    model.train() #训练模式
    for e in range(EPOCHS):
        for batch in range(NUM_BATCH_PER_EPOCH):
            batch_src_index = np.random.choice(train_index, size=(BTACH_SIZE,)) #随机选择BTACH_SIZE个训练节点 作为源节点
            batch_src_label = torch.from_numpy(train_label[batch_src_index]).long().to(DEVICE) #这些训练节点对应的标签
            
            #对源节点进行0-K阶(k等于网络层数 k=2)采样
            batch_sampling_result = multihop_sampling(batch_src_index, NUM_NEIGHBORS_LIST, data.adjacency_dict) #[[源节点索引列表],[源节点的1阶邻居索引]=10,[源节点的2阶邻居索引]=10]
            batch_sampling_x = [torch.from_numpy(x[idx]).float().to(DEVICE) for idx in batch_sampling_result] #获取采样的节点对应的特征向量
            batch_train_logits = model(batch_sampling_x) #获取模型的输出 (BATCH_SIZE,hidden_size[-1]=7)
            loss = criterion(batch_train_logits, batch_src_label)
            optimizer.zero_grad() #清空梯度
            loss.backward()  # 反向传播计算参数的梯度
            optimizer.step()  # 使用优化方法进行梯度更新
            print("Epoch {:03d} Batch {:03d} Loss: {:.4f}".format(e, batch, loss.item()))
        test() #每一epoch做一次测试


def test():
    model.eval() #测试模型
    with torch.no_grad():
        test_sampling_result = multihop_sampling(test_index, NUM_NEIGHBORS_LIST, data.adjacency_dict) #对测试节点进行0-K阶(k等于网络层数 k=2)采样[[测试节点索引列表],[源节点的1阶邻居索引]=10,[源节点的2阶邻居索引]=10]
        test_x = [torch.from_numpy(x[idx]).float().to(DEVICE) for idx in test_sampling_result]#获取采样的节点对应的特征向量
        test_logits = model(test_x) #得到模型输出 (len(test_index),hidden_size[-1]=7)
        test_label = torch.from_numpy(data.y[test_index]).long().to(DEVICE) #测试节点对应的标签
        predict_y = test_logits.max(1)[1] #预测标签 对预测结果按行取argmax
        accuarcy = torch.eq(predict_y, test_label).float().mean().item() #计算在测试节点/数据上的准确率
        print("Test Accuracy: ", accuarcy) 


if __name__ == '__main__':
    train()

 


版权声明:本文为sdu_hao原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。