10.2. 注意力汇聚:Nadaraya-Watson 核回归¶ Open the notebook in SageMaker Studio Lab
上节介绍了框架下的注意力机制的主要成分 图10.1.3: 查询(自主提示)和键(非自主提示)之间的交互形成了注意力汇聚; 注意力汇聚有选择地聚合了值(感官输入)以生成最终的输出。 本节将介绍注意力汇聚的更多细节, 以便从宏观上了解注意力机制在实践中的运作方式。 具体来说,1964年提出的Nadaraya-Watson核回归模型 是一个简单但完整的例子,可以用于演示具有注意力机制的机器学习。
from mxnet import autograd, gluon, np, npx
from mxnet.gluon import nn
from d2l import mxnet as d2l
npx.set_np()
import torch
from torch import nn
from d2l import torch as d2l
import tensorflow as tf
from d2l import tensorflow as d2l
tf.random.set_seed(seed=1322)
import warnings
from d2l import paddle as d2l
warnings.filterwarnings("ignore")
import paddle
from paddle import nn
10.2.1. 生成数据集¶
简单起见,考虑下面这个回归问题: 给定的成对的“输入-输出”数据集 \(\{(x_1, y_1), \ldots, (x_n, y_n)\}\), 如何学习\(f\)来预测任意新输入\(x\)的输出\(\hat{y} = f(x)\)?
根据下面的非线性函数生成一个人工数据集, 其中加入的噪声项为\(\epsilon\):
其中\(\epsilon\)服从均值为\(0\)和标准差为\(0.5\)的正态分布。 在这里生成了\(50\)个训练样本和\(50\)个测试样本。 为了更好地可视化之后的注意力模式,需要将训练样本进行排序。
n_train = 50 # 训练样本数
x_train = np.sort(np.random.rand(n_train) * 5) # 排序后的训练样本
[07:15:08] ../src/storage/storage.cc:196: Using Pooled (Naive) StorageManager for CPU
def f(x):
return 2 * np.sin(x) + x**0.8
y_train = f(x_train) + np.random.normal(0.0, 0.5, (n_train,)) # 训练样本的输出
x_test = np.arange(0, 5, 0.1) # 测试样本
y_truth = f(x_test) # 测试样本的真实输出
n_test = len(x_test) # 测试样本数
n_test
50
n_train = 50 # 训练样本数
x_train, _ = torch.sort(torch.rand(n_train) * 5) # 排序后的训练样本
def f(x):
return 2 * torch.sin(x) + x**0.8
y_train = f(x_train) + torch.normal(0.0, 0.5, (n_train,)) # 训练样本的输出
x_test = torch.arange(0, 5, 0.1) # 测试样本
y_truth = f(x_test) # 测试样本的真实输出
n_test = len(x_test) # 测试样本数
n_test
50
n_train = 50
x_train = tf.sort(tf.random.uniform(shape=(n_train,), maxval=5))
def f(x):
return 2 * tf.sin(x) + x**0.8
y_train = f(x_train) + tf.random.normal((n_train,), 0.0, 0.5) # 训练样本的输出
x_test = tf.range(0, 5, 0.1) # 测试样本
y_truth = f(x_test) # 测试样本的真实输出
n_test = len(x_test) # 测试样本数
n_test
50
n_train = 50 # 训练样本数
x_train = paddle.sort(paddle.rand([n_train]) * 5) # 排序后的训练样本
def f(x):
return 2 * paddle.sin(x) + x**0.8
y_train = f(x_train) + paddle.normal(0.0, 0.5, (n_train,)) # 训练样本的输出
x_test = paddle.arange(0, 5, 0.1, dtype='float32') # 测试样本
y_truth = f(x_test) # 测试样本的真实输出
n_test = len(x_test) # 测试样本数
n_test
50
下面的函数将绘制所有的训练样本(样本由圆圈表示), 不带噪声项的真实数据生成函数\(f\)(标记为“Truth”), 以及学习得到的预测函数(标记为“Pred”)。
def plot_kernel_reg(y_hat):
d2l.plot(x_test, [y_truth, y_hat], 'x', 'y', legend=['Truth', 'Pred'],
xlim=[0, 5], ylim=[-1, 5])
d2l.plt.plot(x_train, y_train, 'o', alpha=0.5);
10.2.2. 平均汇聚¶
先使用最简单的估计器来解决回归问题。 基于平均汇聚来计算所有训练样本输出值的平均值:
如下图所示,这个估计器确实不够聪明。 真实函数\(f\)(“Truth”)和预测函数(“Pred”)相差很大。
y_hat = y_train.mean().repeat(n_test)
plot_kernel_reg(y_hat)
y_hat = torch.repeat_interleave(y_train.mean(), n_test)
plot_kernel_reg(y_hat)
y_hat = tf.repeat(tf.reduce_mean(y_train), repeats=n_test)
plot_kernel_reg(y_hat)
y_hat = paddle.repeat_interleave(y_train.mean(), n_test)
plot_kernel_reg(y_hat)
10.2.3. 非参数注意力汇聚¶
显然,平均汇聚忽略了输入\(x_i\)。 于是Nadaraya (Nadaraya, 1964)和 Watson (Watson, 1964)提出了一个更好的想法, 根据输入的位置对输出\(y_i\)进行加权:
其中\(K\)是核(kernel)。 公式 (10.2.3)所描述的估计器被称为 Nadaraya-Watson核回归(Nadaraya-Watson kernel regression)。 这里不会深入讨论核函数的细节, 但受此启发, 我们可以从 图10.1.3中的注意力机制框架的角度 重写 (10.2.3), 成为一个更加通用的注意力汇聚(attention pooling)公式:
其中\(x\)是查询,\((x_i, y_i)\)是键值对。 比较 (10.2.4)和 (10.2.2), 注意力汇聚是\(y_i\)的加权平均。 将查询\(x\)和键\(x_i\)之间的关系建模为 注意力权重(attention weight)\(\alpha(x, x_i)\), 如 (10.2.4)所示, 这个权重将被分配给每一个对应值\(y_i\)。 对于任何查询,模型在所有键值对注意力权重都是一个有效的概率分布: 它们是非负的,并且总和为1。
为了更好地理解注意力汇聚, 下面考虑一个高斯核(Gaussian kernel),其定义为:
将高斯核代入 (10.2.4)和 (10.2.3)可以得到:
在 (10.2.6)中, 如果一个键\(x_i\)越是接近给定的查询\(x\), 那么分配给这个键对应值\(y_i\)的注意力权重就会越大, 也就“获得了更多的注意力”。
值得注意的是,Nadaraya-Watson核回归是一个非参数模型。 因此, (10.2.6)是 非参数的注意力汇聚(nonparametric attention pooling)模型。 接下来,我们将基于这个非参数的注意力汇聚模型来绘制预测结果。 从绘制的结果会发现新的模型预测线是平滑的,并且比平均汇聚的预测更接近真实。
# X_repeat的形状:(n_test,n_train),
# 每一行都包含着相同的测试输入(例如:同样的查询)
X_repeat = x_test.repeat(n_train).reshape((-1, n_train))
# x_train包含着键。attention_weights的形状:(n_test,n_train),
# 每一行都包含着要在给定的每个查询的值(y_train)之间分配的注意力权重
attention_weights = npx.softmax(-(X_repeat - x_train)**2 / 2)
# y_hat的每个元素都是值的加权平均值,其中的权重是注意力权重
y_hat = np.dot(attention_weights, y_train)
plot_kernel_reg(y_hat)
# X_repeat的形状:(n_test,n_train),
# 每一行都包含着相同的测试输入(例如:同样的查询)
X_repeat = x_test.repeat_interleave(n_train).reshape((-1, n_train))
# x_train包含着键。attention_weights的形状:(n_test,n_train),
# 每一行都包含着要在给定的每个查询的值(y_train)之间分配的注意力权重
attention_weights = nn.functional.softmax(-(X_repeat - x_train)**2 / 2, dim=1)
# y_hat的每个元素都是值的加权平均值,其中的权重是注意力权重
y_hat = torch.matmul(attention_weights, y_train)
plot_kernel_reg(y_hat)
# X_repeat的形状:(n_test,n_train),
# 每一行都包含着相同的测试输入(例如:同样的查询)
X_repeat = tf.repeat(tf.expand_dims(x_train, axis=0), repeats=n_train, axis=0)
# x_train包含着键。attention_weights的形状:(n_test,n_train),
# 每一行都包含着要在给定的每个查询的值(y_train)之间分配的注意力权重
attention_weights = tf.nn.softmax(-(X_repeat - tf.expand_dims(x_train, axis=1))**2/2, axis=1)
# y_hat的每个元素都是值的加权平均值,其中的权重是注意力权重
y_hat = tf.matmul(attention_weights, tf.expand_dims(y_train, axis=1))
plot_kernel_reg(y_hat)
# X_repeat的形状:(n_test,n_train),
# 每一行都包含着相同的测试输入(例如:同样的查询)
X_repeat = x_test.repeat_interleave(n_train).reshape((-1, n_train))
# x_train包含着键。attention_weights的形状:(n_test,n_train),
# 每一行都包含着要在给定的每个查询的值(y_train)之间分配的注意力权重
attention_weights = nn.functional.softmax(-(X_repeat - x_train)**2 / 2, axis=1)
# y_hat的每个元素都是值的加权平均值,其中的权重是注意力权重
y_hat = paddle.matmul(attention_weights, y_train)
plot_kernel_reg(y_hat)
现在来观察注意力的权重。 这里测试数据的输入相当于查询,而训练数据的输入相当于键。 因为两个输入都是经过排序的,因此由观察可知“查询-键”对越接近, 注意力汇聚的注意力权重就越高。
d2l.show_heatmaps(np.expand_dims(np.expand_dims(attention_weights, 0), 0),
xlabel='Sorted training inputs',
ylabel='Sorted testing inputs')
d2l.show_heatmaps(attention_weights.unsqueeze(0).unsqueeze(0),
xlabel='Sorted training inputs',
ylabel='Sorted testing inputs')
d2l.show_heatmaps(tf.expand_dims(
tf.expand_dims(attention_weights, axis=0), axis=0),
xlabel='Sorted training inputs',
ylabel='Sorted testing inputs')
d2l.show_heatmaps(attention_weights.unsqueeze(0).unsqueeze(0),
xlabel='Sorted training inputs',
ylabel='Sorted testing inputs')
10.2.4. 带参数注意力汇聚¶
非参数的Nadaraya-Watson核回归具有一致性(consistency)的优点: 如果有足够的数据,此模型会收敛到最优结果。 尽管如此,我们还是可以轻松地将可学习的参数集成到注意力汇聚中。
例如,与 (10.2.6)略有不同, 在下面的查询\(x\)和键\(x_i\)之间的距离乘以可学习参数\(w\):
本节的余下部分将通过训练这个模型 (10.2.7)来学习注意力汇聚的参数。
10.2.4.1. 批量矩阵乘法¶
为了更有效地计算小批量数据的注意力, 我们可以利用深度学习开发框架中提供的批量矩阵乘法。
假设第一个小批量数据包含\(n\)个矩阵\(\mathbf{X}_1,\ldots, \mathbf{X}_n\), 形状为\(a\times b\), 第二个小批量包含\(n\)个矩阵\(\mathbf{Y}_1, \ldots, \mathbf{Y}_n\), 形状为\(b\times c\)。 它们的批量矩阵乘法得到\(n\)个矩阵 \(\mathbf{X}_1\mathbf{Y}_1, \ldots, \mathbf{X}_n\mathbf{Y}_n\), 形状为\(a\times c\)。 因此,假定两个张量的形状分别是\((n,a,b)\)和\((n,b,c)\), 它们的批量矩阵乘法输出的形状为\((n,a,c)\)。
X = np.ones((2, 1, 4))
Y = np.ones((2, 4, 6))
npx.batch_dot(X, Y).shape
(2, 1, 6)
X = torch.ones((2, 1, 4))
Y = torch.ones((2, 4, 6))
torch.bmm(X, Y).shape
torch.Size([2, 1, 6])
X = tf.ones((2, 1, 4))
Y = tf.ones((2, 4, 6))
tf.matmul(X, Y).shape
TensorShape([2, 1, 6])
X = paddle.ones((2, 1, 4))
Y = paddle.ones((2, 4, 6))
paddle.bmm(X, Y).shape
[2, 1, 6]
在注意力机制的背景中,我们可以使用小批量矩阵乘法来计算小批量数据中的加权平均值。
weights = np.ones((2, 10)) * 0.1
values = np.arange(20).reshape((2, 10))
npx.batch_dot(np.expand_dims(weights, 1), np.expand_dims(values, -1))
array([[[ 4.5]],
[[14.5]]])
weights = torch.ones((2, 10)) * 0.1
values = torch.arange(20.0).reshape((2, 10))
torch.bmm(weights.unsqueeze(1), values.unsqueeze(-1))
tensor([[[ 4.5000]],
[[14.5000]]])
weights = tf.ones((2, 10)) * 0.1
values = tf.reshape(tf.range(20.0), shape = (2, 10))
tf.matmul(tf.expand_dims(weights, axis=1), tf.expand_dims(values, axis=-1)).numpy()
array([[[ 4.5]],
[[14.5]]], dtype=float32)
weights = paddle.ones((2, 10)) * 0.1
values = paddle.arange(20, dtype='float32').reshape((2, 10))
paddle.bmm(weights.unsqueeze(1), values.unsqueeze(-1))
Tensor(shape=[2, 1, 1], dtype=float32, place=Place(cpu), stop_gradient=True,
[[[4.50000000 ]],
[[14.50000000]]])
10.2.4.2. 定义模型¶
基于 (10.2.7)中的 带参数的注意力汇聚,使用小批量矩阵乘法, 定义Nadaraya-Watson核回归的带参数版本为:
class NWKernelRegression(nn.Block):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.w = self.params.get('w', shape=(1,))
def forward(self, queries, keys, values):
# queries和attention_weights的形状为(查询数,“键-值”对数)
queries = queries.repeat(keys.shape[1]).reshape((-1, keys.shape[1]))
self.attention_weights = npx.softmax(
-((queries - keys) * self.w.data())**2 / 2)
# values的形状为(查询数,“键-值”对数)
return npx.batch_dot(np.expand_dims(self.attention_weights, 1),
np.expand_dims(values, -1)).reshape(-1)
class NWKernelRegression(nn.Module):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.w = nn.Parameter(torch.rand((1,), requires_grad=True))
def forward(self, queries, keys, values):
# queries和attention_weights的形状为(查询个数,“键-值”对个数)
queries = queries.repeat_interleave(keys.shape[1]).reshape((-1, keys.shape[1]))
self.attention_weights = nn.functional.softmax(
-((queries - keys) * self.w)**2 / 2, dim=1)
# values的形状为(查询个数,“键-值”对个数)
return torch.bmm(self.attention_weights.unsqueeze(1),
values.unsqueeze(-1)).reshape(-1)
class NWKernelRegression(tf.keras.layers.Layer):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.w = tf.Variable(initial_value=tf.random.uniform(shape=(1,)))
def call(self, queries, keys, values, **kwargs):
# 对于训练,“查询”是x_train。“键”是每个点的训练数据的距离。“值”为'y_train'。
# queries和attention_weights的形状为(查询个数,“键-值”对个数)
queries = tf.repeat(tf.expand_dims(queries, axis=1), repeats=keys.shape[1], axis=1)
self.attention_weights = tf.nn.softmax(-((queries - keys) * self.w)**2 /2, axis =1)
# values的形状为(查询个数,“键-值”对个数)
return tf.squeeze(tf.matmul(tf.expand_dims(self.attention_weights, axis=1), tf.expand_dims(values, axis=-1)))
class NWKernelRegression(nn.Layer):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.w = paddle.create_parameter((1,), dtype='float32')
def forward(self, queries, keys, values):
# queries和attention_weights的形状为(查询个数,“键-值”对个数)
queries = queries.reshape((queries.shape[0], 1)) \
.tile([keys.shape[1]]) \
.reshape((-1, keys.shape[1]))
self.attention_weight = nn.functional.softmax(
-((queries - keys) * self.w)**2 / 2, axis=1)
# values的形状为(查询个数,“键-值”对个数)
return paddle.bmm(self.attention_weight.unsqueeze(1),
values.unsqueeze(-1)).reshape((-1, ))
10.2.4.3. 训练¶
接下来,将训练数据集变换为键和值用于训练注意力模型。 在带参数的注意力汇聚模型中, 任何一个训练样本的输入都会和除自己以外的所有训练样本的“键-值”对进行计算, 从而得到其对应的预测输出。
# X_tile的形状:(n_train,n_train),每一行都包含着相同的训练输入
X_tile = np.tile(x_train, (n_train, 1))
# Y_tile的形状:(n_train,n_train),每一行都包含着相同的训练输出
Y_tile = np.tile(y_train, (n_train, 1))
# keys的形状:('n_train','n_train'-1)
keys = X_tile[(1 - np.eye(n_train)).astype('bool')].reshape((n_train, -1))
# values的形状:('n_train','n_train'-1)
values = Y_tile[(1 - np.eye(n_train)).astype('bool')].reshape((n_train, -1))
# X_tile的形状:(n_train,n_train),每一行都包含着相同的训练输入
X_tile = x_train.repeat((n_train, 1))
# Y_tile的形状:(n_train,n_train),每一行都包含着相同的训练输出
Y_tile = y_train.repeat((n_train, 1))
# keys的形状:('n_train','n_train'-1)
keys = X_tile[(1 - torch.eye(n_train)).type(torch.bool)].reshape((n_train, -1))
# values的形状:('n_train','n_train'-1)
values = Y_tile[(1 - torch.eye(n_train)).type(torch.bool)].reshape((n_train, -1))
# X_tile的形状:(n_train,n_train),每一行都包含着相同的训练输入
X_tile = tf.repeat(tf.expand_dims(x_train, axis=0), repeats=n_train, axis=0)
# Y_tile的形状:(n_train,n_train),每一行都包含着相同的训练输出
Y_tile = tf.repeat(tf.expand_dims(y_train, axis=0), repeats=n_train, axis=0)
# keys的形状:('n_train','n_train'-1)
keys = tf.reshape(X_tile[tf.cast(1 - tf.eye(n_train), dtype=tf.bool)], shape=(n_train, -1))
# values的形状:('n_train','n_train'-1)
values = tf.reshape(Y_tile[tf.cast(1 - tf.eye(n_train), dtype=tf.bool)], shape=(n_train, -1))
# X_tile的形状:(n_train,n_train),每一行都包含着相同的训练输入
X_tile = x_train.tile([n_train, 1])
# Y_tile的形状:(n_train,n_train),每一行都包含着相同的训练输出
Y_tile = y_train.tile([n_train, 1])
# keys的形状:('n_train','n_train'-1)
keys = X_tile[(1 - paddle.eye(n_train)).astype(paddle.bool)].reshape((n_train, -1))
# values的形状:('n_train','n_train'-1)
values = Y_tile[(1 - paddle.eye(n_train)).astype(paddle.bool)].reshape((n_train, -1))
训练带参数的注意力汇聚模型时,使用平方损失函数和随机梯度下降。
net = NWKernelRegression()
net.initialize()
loss = gluon.loss.L2Loss()
trainer = gluon.Trainer(net.collect_params(), 'sgd', {'learning_rate': 0.5})
animator = d2l.Animator(xlabel='epoch', ylabel='loss', xlim=[1, 5])
for epoch in range(5):
with autograd.record():
l = loss(net(x_train, keys, values), y_train)
l.backward()
trainer.step(1)
print(f'epoch {epoch + 1}, loss {float(l.sum()):.6f}')
animator.add(epoch + 1, float(l.sum()))
net = NWKernelRegression()
loss = nn.MSELoss(reduction='none')
trainer = torch.optim.SGD(net.parameters(), lr=0.5)
animator = d2l.Animator(xlabel='epoch', ylabel='loss', xlim=[1, 5])
for epoch in range(5):
trainer.zero_grad()
l = loss(net(x_train, keys, values), y_train)
l.sum().backward()
trainer.step()
print(f'epoch {epoch + 1}, loss {float(l.sum()):.6f}')
animator.add(epoch + 1, float(l.sum()))
net = NWKernelRegression()
loss_object = tf.keras.losses.MeanSquaredError()
optimizer = tf.keras.optimizers.SGD(learning_rate=0.5)
animator = d2l.Animator(xlabel='epoch', ylabel='loss', xlim=[1, 5])
for epoch in range(5):
with tf.GradientTape() as t:
loss = loss_object(y_train, net(x_train, keys, values)) * len(y_train)
grads = t.gradient(loss, net.trainable_variables)
optimizer.apply_gradients(zip(grads, net.trainable_variables))
print(f'epoch {epoch + 1}, loss {float(loss):.6f}')
animator.add(epoch + 1, float(loss))
net = NWKernelRegression()
loss = nn.MSELoss(reduction='none')
trainer = paddle.optimizer.SGD(learning_rate=0.5, parameters=net.parameters())
animator = d2l.Animator(xlabel='epoch', ylabel='loss', xlim=[1, 5])
for epoch in range(5):
trainer.clear_grad()
l = loss(net(x_train, keys, values), y_train)
l.sum().backward()
trainer.step()
print(f'epoch {epoch + 1}, loss {float(l.sum()):.6f}')
animator.add(epoch + 1, float(l.sum()))
如下所示,训练完带参数的注意力汇聚模型后可以发现: 在尝试拟合带噪声的训练数据时, 预测结果绘制的线不如之前非参数模型的平滑。
# keys的形状:(n_test,n_train),每一行包含着相同的训练输入(例如,相同的键)
keys = np.tile(x_train, (n_test, 1))
# value的形状:(n_test,n_train)
values = np.tile(y_train, (n_test, 1))
y_hat = net(x_test, keys, values)
plot_kernel_reg(y_hat)
# keys的形状:(n_test,n_train),每一行包含着相同的训练输入(例如,相同的键)
keys = x_train.repeat((n_test, 1))
# value的形状:(n_test,n_train)
values = y_train.repeat((n_test, 1))
y_hat = net(x_test, keys, values).unsqueeze(1).detach()
plot_kernel_reg(y_hat)
# keys的形状:(n_test,n_train),每一行包含着相同的训练输入(例如,相同的键)
keys = tf.repeat(tf.expand_dims(x_train, axis=0), repeats=n_test, axis=0)
# value的形状:(n_test,n_train)
values = tf.repeat(tf.expand_dims(y_train, axis=0), repeats=n_test, axis=0)
y_hat = net(x_test, keys, values)
plot_kernel_reg(y_hat)
# keys的形状:(n_test,n_train),每一行包含着相同的训练输入(例如,相同的键)
keys = x_train.tile([n_test, 1])
# value的形状:(n_test,n_train)
values = y_train.tile([n_test, 1])
y_hat = net(x_test, keys, values).unsqueeze(1).detach()
plot_kernel_reg(y_hat)
为什么新的模型更不平滑了呢? 下面看一下输出结果的绘制图: 与非参数的注意力汇聚模型相比, 带参数的模型加入可学习的参数后, 曲线在注意力权重较大的区域变得更不平滑。
d2l.show_heatmaps(np.expand_dims(
np.expand_dims(net.attention_weights, 0), 0),
xlabel='Sorted training inputs',
ylabel='Sorted testing inputs')
d2l.show_heatmaps(net.attention_weights.unsqueeze(0).unsqueeze(0),
xlabel='Sorted training inputs',
ylabel='Sorted testing inputs')
d2l.show_heatmaps(tf.expand_dims(
tf.expand_dims(net.attention_weights, axis=0), axis=0),
xlabel='Sorted training inputs',
ylabel='Sorted testing inputs')
d2l.show_heatmaps(net.attention_weight.unsqueeze(0).unsqueeze(0),
xlabel='Sorted training inputs',
ylabel='Sorter testing, inputs')
10.2.5. 小结¶
Nadaraya-Watson核回归是具有注意力机制的机器学习范例。
Nadaraya-Watson核回归的注意力汇聚是对训练数据中输出的加权平均。从注意力的角度来看,分配给每个值的注意力权重取决于将值所对应的键和查询作为输入的函数。
注意力汇聚可以分为非参数型和带参数型。
10.2.6. 练习¶
增加训练数据的样本数量,能否得到更好的非参数的Nadaraya-Watson核回归模型?
在带参数的注意力汇聚的实验中学习得到的参数\(w\)的价值是什么?为什么在可视化注意力权重时,它会使加权区域更加尖锐?
如何将超参数添加到非参数的Nadaraya-Watson核回归中以实现更好地预测结果?
为本节的核回归设计一个新的带参数的注意力汇聚模型。训练这个新模型并可视化其注意力权重。