import tensorflow as tf


class NegativeCosineLayer:
    """Custom layer: in-batch negative sampling followed by cosine similarity.

    Negative-sampling principle:
        query_input.shape = [batch_size, dim]
        doc_input.shape   = [batch_size, dim]
    Each query is assumed to have clicked its paired doc. For every clicked
    item, NEG negative items are sampled at random:
    1. Suppose each positive sample needs NEG negatives.
    2. Sample them in NEG rounds; each round draws batch_size docs, i.e. one
       negative for every positive. shape = [batch_size, dim]
    3. Concatenate the NEG rounds of negatives under the positives.
       shape = [batch_size * (NEG + 1), dim]
    4. Tile the query features to match. shape = [batch_size * (NEG + 1), dim]
    5. Compute the row-wise norm of the query features.
    6. Compute the row-wise norm of the item features.
    7. cosine = query * item / (|query| * |item|)
    """
#
    def __init__(self, NEG, t):
        self.NEG = NEG  # number of negatives sampled per positive
        self.t = t      # temperature (smoothing) parameter for the softmax
    def __call__(self, inputs):
        def _cosine(x):
            query_encoder, doc_encoder = x  # each [batch_size, dim]
            doc_encoder_fd = doc_encoder
            # Draw NEG in-batch negatives per positive by shuffling the doc rows
            # (a shuffled row can occasionally pair a query with its own doc).
            for _ in range(self.NEG):
                ss = tf.gather(doc_encoder, tf.random.shuffle(tf.range(tf.shape(doc_encoder)[0])))
                doc_encoder_fd = tf.concat([doc_encoder_fd, ss], axis=0)
            # doc_encoder_fd: [batch_size * (NEG + 1), dim], positives first.
            query_norm = tf.tile(
                tf.sqrt(tf.reduce_sum(tf.square(query_encoder), axis=1, keepdims=True)),
                [self.NEG + 1, 1])
            doc_norm = tf.sqrt(tf.reduce_sum(tf.square(doc_encoder_fd), axis=1, keepdims=True))
            query_encoder_fd = tf.tile(query_encoder, [self.NEG + 1, 1])
            prod = tf.reduce_sum(
                tf.multiply(query_encoder_fd, doc_encoder_fd, name="sim-multiply"),
                axis=1, keepdims=True)
            norm_prod = tf.multiply(query_norm, doc_norm)
            cos_sim_raw = tf.truediv(prod, norm_prod)  # [batch_size * (NEG + 1), 1]
            # Regroup to [batch_size, NEG + 1]; column 0 holds the positive pair.
            # Dividing by the temperature t smooths/sharpens the distribution
            # (t = 0.05 reproduces the common DSSM scale of 20).
            cos_sim = tf.transpose(tf.reshape(tf.transpose(cos_sim_raw), [self.NEG + 1, -1])) / self.t
            # Softmax over the NEG + 1 candidates of each query (axis=1, not axis=0).
            prob = tf.nn.softmax(cos_sim, axis=1, name="sim-softmax")
            # Probability assigned to the positive doc (column 0).
            hit_prob = tf.slice(prob, [0, 0], [-1, 1], name="sim-slice")
            loss = -tf.reduce_mean(tf.math.log(hit_prob), name="sim-mean")
            return loss
        #
        loss = _cosine([inputs[0], inputs[1]])
        loss = tf.expand_dims(loss, 0)  # rank-1 tensor of shape (1,)
        return loss
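
# A minimal sanity-check sketch, not part of the original layer: the
# slice/log/mean above equals softmax cross-entropy where every row's true
# class is column 0 (the positive doc). The helper name is hypothetical.
def cross_entropy_equivalent(cos_sim):
    # cos_sim: [batch_size, NEG + 1] scaled similarities; column 0 is the positive.
    labels = tf.zeros([tf.shape(cos_sim)[0]], dtype=tf.int32)
    return tf.reduce_mean(
        tf.nn.sparse_softmax_cross_entropy_with_logits(labels=labels, logits=cos_sim))
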
if __name__ == '__main__':
    query_encoder = tf.constant([[1, 2],
                                 [2, 1],
                                 [3, 2],
                                 [4, 1]], dtype=tf.float32)
    doc_encoder = tf.constant([[1, 2],
                               [2, 1],
                               [3, 2],
                               [4, 1]], dtype=tf.float32)
    # NEG and t are required by __init__; t = 0.05 corresponds to a scale of 20.
    loss = NegativeCosineLayer(NEG=3, t=0.05)([query_encoder, doc_encoder])
    print('loss', loss)
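    # A standalone check (hypothetical sizes, reusing doc_encoder above) of the
    # shape bookkeeping in steps 2-3 of the docstring: each round shuffles the
    # in-batch docs to yield one negative per positive, stacked under the positives.
    NEG = 3
    stacked = doc_encoder
    for _ in range(NEG):
        idx = tf.random.shuffle(tf.range(tf.shape(doc_encoder)[0]))
        stacked = tf.concat([stacked, tf.gather(doc_encoder, idx)], axis=0)
    print('stacked docs shape', stacked.shape)  # (batch_size * (NEG + 1), dim) = (16, 2)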