以下是一个基于PyTorch实现Transformer模型的简单示例代码,并对每个步骤进行了详细的注释。请注意,这个示例主要着重于模型的实现,不包括数据处理和训练部分。
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
# 定义位置编码器
class PositionalEncoder(nn.Module):
    """Add fixed sinusoidal position encodings to a batch of embeddings.

    A table of shape ``(1, max_len, d_model)`` is computed once and stored
    as the buffer ``pe``. Even feature columns hold ``sin`` values and odd
    feature columns hold ``cos`` values of ``position * div_term``.

    Args:
        d_model: Size of the feature dimension. Odd sizes are supported.
        max_len: Number of positions for which encodings are precomputed.
    """

    def __init__(self, d_model, max_len=512):
        super(PositionalEncoder, self).__init__()
        self.d_model = d_model
        self.max_len = max_len

        # One row per position, one column per feature.
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        # Geometric progression of frequencies, one per sin/cos column pair.
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)
        )
        pe[:, 0::2] = torch.sin(position * div_term)
        # With an odd d_model there is one fewer odd column than even
        # columns, so drop the surplus frequency instead of failing on a
        # shape mismatch.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])

        # Leading axis of size 1 lets the table broadcast over the batch.
        # A buffer follows the module across devices without being trained.
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """Scale ``x`` by ``sqrt(d_model)`` and add the position encodings.

        Args:
            x: Tensor of shape ``(batch_size, seq_len, d_model)``.

        Returns:
            Tensor with the same shape as ``x``.

        Raises:
            ValueError: If ``seq_len`` exceeds the number of precomputed
                positions.
        """
        seq_len = x.size(1)
        available = self.pe.size(1)
        if seq_len > available:
            raise ValueError(
                f"sequence length {seq_len} exceeds the {available} positions "
                "precomputed by PositionalEncoder; increase max_len"
            )
        # The sqrt(d_model) factor is the embedding scaling used by the
        # original Transformer; it is applied before the encodings are added.
        return x * math.sqrt(self.d_model) + self.pe[:, :seq_len]
# 定义多头注意力机制
class MultiHeadAttention(nn.Module):
    """Scaled dot-product attention split across several heads.

    Args:
        d_model: Feature size of the inputs and of the output.
        num_heads: Number of attention heads; must divide ``d_model``.
    """

    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        assert d_model % num_heads == 0, "d_model必须被num_heads整除"
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads  # features handled by each head

        # Learned projections for queries, keys, values and the merged output.
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

    def _split_heads(self, projected, batch_size):
        """Reshape ``(batch, seq, d_model)`` to ``(batch, heads, seq, head_dim)``."""
        return projected.view(
            batch_size, -1, self.num_heads, self.head_dim
        ).transpose(1, 2)

    def forward(self, query, key, value, mask=None):
        """Attend from ``query`` positions to ``key``/``value`` positions.

        Args:
            query: Tensor of shape ``(batch, query_len, d_model)``.
            key: Tensor of shape ``(batch, key_len, d_model)``.
            value: Tensor of shape ``(batch, key_len, d_model)``.
            mask: Optional tensor broadcastable to
                ``(batch, num_heads, query_len, key_len)``. Entries equal to
                0 mark key positions that must not be attended to.

        Returns:
            Tensor of shape ``(batch, query_len, d_model)``.
        """
        batch_size = query.size(0)

        Q = self._split_heads(self.W_q(query), batch_size)
        K = self._split_heads(self.W_k(key), batch_size)
        V = self._split_heads(self.W_v(value), batch_size)

        # Dividing by sqrt(head_dim) keeps the logits in a range where the
        # softmax is not saturated.
        attention_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.head_dim)

        blocked = None
        if mask is not None:
            blocked = mask == 0
            attention_scores = attention_scores.masked_fill(blocked, float('-inf'))

        attention_weights = F.softmax(attention_scores, dim=-1)

        if blocked is not None:
            # A query row whose keys are all blocked is a softmax over only
            # -inf, i.e. NaN. Give such rows zero weights so the NaN does not
            # leak into the output and every later layer.
            empty_rows = blocked.expand_as(attention_scores).all(dim=-1, keepdim=True)
            attention_weights = attention_weights.masked_fill(empty_rows, 0.0)

        attention_output = torch.matmul(attention_weights, V)
        # Merge the heads back: (batch, heads, seq, head_dim) -> (batch, seq, d_model).
        attention_output = attention_output.transpose(1, 2).contiguous().view(
            batch_size, -1, self.d_model
        )
        return self.W_o(attention_output)
# 定义前向传播层
class FeedForward(nn.Module):
    """Two-layer feed-forward network: Linear -> ReLU -> Linear.

    Args:
        d_model: Input and output feature size.
        d_ff: Hidden feature size between the two linear layers.
    """

    def __init__(self, d_model, d_ff):
        super().__init__()
        self.d_model, self.d_ff = d_model, d_ff
        # Expand to the hidden width, then project back to d_model.
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        """Return ``linear2(relu(linear1(x)))``; the last dimension stays ``d_model``."""
        hidden = F.relu(self.linear1(x))
        return self.linear2(hidden)
# 定义一个Transformer模型
class Transformer(nn.Module):
    """Stack of encoder layers: self-attention and feed-forward sub-layers.

    Each layer is stored as a four-element ``nn.ModuleList`` holding, in
    order, the attention module, its LayerNorm, the feed-forward module and
    its LayerNorm. Every sub-layer output is added to its input and then
    normalised.

    Args:
        d_model: Feature size of the input and output.
        num_heads: Number of attention heads per layer.
        d_ff: Hidden size of each feed-forward sub-layer.
        num_layers: Number of stacked encoder layers.
    """

    def __init__(self, d_model, num_heads, d_ff, num_layers):
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_ff = d_ff
        self.num_layers = num_layers

        # Built layer by layer; sub-modules are created in the order
        # attention, norm, feed-forward, norm.
        self.encoder_layers = nn.ModuleList()
        for _ in range(num_layers):
            self.encoder_layers.append(
                nn.ModuleList([
                    MultiHeadAttention(d_model, num_heads),
                    nn.LayerNorm(d_model),
                    FeedForward(d_model, d_ff),
                    nn.LayerNorm(d_model),
                ])
            )

    def forward(self, src, mask=None):
        """Run ``src`` through every encoder layer.

        Args:
            src: Input tensor; the demo in this file uses shape
                ``(batch_size, seq_len, d_model)``.
            mask: Optional mask handed unchanged to each attention module.

        Returns:
            Tensor with the same shape as ``src``.
        """
        hidden = src
        for idx in range(self.num_layers):
            attention, norm1, feed_forward, norm2 = self.encoder_layers[idx]
            # Self-attention sub-layer: residual add, then LayerNorm.
            hidden = norm1(hidden + attention(hidden, hidden, hidden, mask=mask))
            # Feed-forward sub-layer: residual add, then LayerNorm.
            hidden = norm2(hidden + feed_forward(hidden))
        return hidden
# 测试Transformer模型
if __name__ == "__main__":
    # Smoke test: random input of shape (batch_size, seq_len, d_model).
    input_tensor = torch.randn((16, 20, 512))
    transformer_model = Transformer(
        d_model=512,
        num_heads=8,
        d_ff=2048,
        num_layers=6,
    )
    output = transformer_model(input_tensor)
    # Expected: torch.Size([16, 20, 512])
    print(output.shape)
以下是Longformer的PyTorch实现版本
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
# 定义位置编码器
class PositionalEncoder(nn.Module):
    """Sinusoidal position encoder that adds a fixed table to its input.

    The table has shape ``(1, max_len, d_model)`` and is registered as the
    buffer ``pe``: ``sin`` in the even feature columns and ``cos`` in the odd
    feature columns of ``position * div_term``.

    Args:
        d_model: Size of the feature dimension. Odd sizes are supported.
        max_len: Number of positions for which encodings are precomputed.
    """

    def __init__(self, d_model, max_len=512):
        super(PositionalEncoder, self).__init__()
        self.d_model = d_model
        self.max_len = max_len

        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)
        )
        angles = position * div_term  # (max_len, ceil(d_model / 2))

        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # An odd d_model has one fewer odd column than even columns; trim
        # the last frequency so the assignment shapes agree.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Extra leading axis so the table broadcasts over the batch.
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """Scale ``x`` by ``sqrt(d_model)`` and add the position encodings.

        Args:
            x: Tensor of shape ``(batch_size, seq_len, d_model)``.

        Returns:
            Tensor with the same shape as ``x``.

        Raises:
            ValueError: If ``seq_len`` exceeds the number of precomputed
                positions.
        """
        seq_len = x.size(1)
        table_len = self.pe.size(1)
        if seq_len > table_len:
            raise ValueError(
                f"sequence length {seq_len} exceeds the {table_len} positions "
                "precomputed by PositionalEncoder; increase max_len"
            )
        scaled = x * math.sqrt(self.d_model)
        return scaled + self.pe[:, :seq_len]
# 定义局部注意力机制
class LocalAttention(nn.Module):
    """Single-head self-attention limited to a sliding window of neighbours.

    Position ``i`` may attend to positions ``j`` with
    ``|i - j| <= local_window``. An optional padding mask additionally
    hides padded key positions.

    Args:
        d_model: Feature size of the input.
        local_window: Number of neighbours visible on each side of a position.
    """

    def __init__(self, d_model, local_window):
        super(LocalAttention, self).__init__()
        self.d_model = d_model
        self.local_window = local_window
        # Single head, default (seq_len, batch, d_model) layout.
        self.attention = nn.MultiheadAttention(d_model, 1)

    def forward(self, x, mask=None):
        """Apply windowed self-attention to ``x``.

        Args:
            x: Tensor of shape ``(batch_size, seq_len, d_model)``.
            mask: Optional tensor of shape ``(batch_size, seq_len)``; nonzero
                marks a real token, zero marks padding.

        Returns:
            The tuple produced by ``nn.MultiheadAttention``: the attention
            output in ``(seq_len, batch_size, d_model)`` layout, followed by
            the attention weights.

        Raises:
            ValueError: If ``mask`` is given with a shape other than
                ``(batch_size, seq_len)``.
        """
        batch_size, seq_len, _ = x.size()

        # Band matrix built without a Python loop: allowed[i, j] is True
        # when key j lies inside the window around query i.
        positions = torch.arange(seq_len, device=x.device)
        offsets = positions.unsqueeze(0) - positions.unsqueeze(1)
        allowed = offsets.abs() <= self.local_window  # (seq_len, seq_len)

        if mask is not None:
            if mask.shape != (batch_size, seq_len):
                raise ValueError(
                    f"mask must have shape ({batch_size}, {seq_len}), "
                    f"got {tuple(mask.shape)}"
                )
            key_is_real = mask.to(x.device) != 0
            # (1, L, L) & (B, 1, L) -> (B, L, L): inside the window AND not padding.
            allowed = allowed.unsqueeze(0) & key_is_real.unsqueeze(1)

        # Every query may always see itself. Without this a query whose whole
        # window is padding has no visible key and its softmax row is NaN.
        allowed = allowed | torch.eye(seq_len, dtype=torch.bool, device=x.device)

        # nn.MultiheadAttention takes this as attn_mask, where True means
        # "do not attend", hence the inversion. key_padding_mask is the wrong
        # slot: it expects a (batch, seq_len) mask. With one head a
        # (batch, L, L) attn_mask matches the required (batch * heads, L, L).
        seq_first = x.permute(1, 0, 2)
        return self.attention(seq_first, seq_first, seq_first, attn_mask=~allowed)
# 定义Longformer模型
class Longformer(nn.Module):
    """Encoder stack that uses windowed (local) self-attention.

    One ``LocalAttention`` module is created and reused by every layer, so
    its weights are shared across layers. Each layer otherwise owns a
    LayerNorm, a Linear -> ReLU -> Linear feed-forward path and a second
    LayerNorm, stored in that order inside ``encoder_layers``.

    Args:
        d_model: Feature size of the input and output.
        num_heads: Stored on the instance; the local attention layer is
            constructed from ``d_model`` and ``local_window`` only.
        d_ff: Hidden size of each feed-forward path.
        num_layers: Number of encoder layers.
        local_window: Window size handed to the local attention layer.
    """

    def __init__(self, d_model, num_heads, d_ff, num_layers, local_window):
        super(Longformer, self).__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_ff = d_ff
        self.num_layers = num_layers
        self.local_window = local_window

        # Position encoder and the attention module shared by all layers.
        self.positional_encoder = PositionalEncoder(d_model)
        self.local_attention = LocalAttention(d_model, local_window)

        # Per-layer modules: norm, linear, activation, linear, norm.
        self.encoder_layers = nn.ModuleList([
            nn.ModuleList([
                nn.LayerNorm(d_model),
                nn.Linear(d_model, d_ff),
                nn.ReLU(),
                nn.Linear(d_ff, d_model),
                nn.LayerNorm(d_model),
            ])
            for _ in range(num_layers)
        ])

    def forward(self, src, mask=None):
        """Encode ``src`` with position encodings and the stacked layers.

        Args:
            src: Tensor of shape ``(batch_size, seq_len, d_model)``.
            mask: Optional ``(batch_size, seq_len)`` tensor forwarded
                unchanged to the local attention layer. It is never
                modified.

        Returns:
            Tensor of shape ``(batch_size, seq_len, d_model)``.
        """
        x = self.positional_encoder(src)
        for i in range(self.num_layers):
            norm1, linear1, relu, linear2, norm2 = self.encoder_layers[i]

            # The former per-layer ``mask[:, :, :local_window] = 0`` indexed
            # three axes of a two-axis mask (IndexError) and wrote into the
            # caller's tensor, so the mask is now passed through untouched.
            #
            # LocalAttention takes batch-first input and returns a tuple
            # whose first item is in (seq_len, batch, d_model) layout. Pass
            # x as-is and permute only the result back to batch-first so it
            # lines up with the residual.
            attended = self.local_attention(x, mask=mask)[0].permute(1, 0, 2)
            x = norm1(x + attended)

            # Feed-forward path with residual connection, then LayerNorm.
            x = norm2(linear2(relu(linear1(x))) + x)
        return x
# 测试Longformer模型
if __name__ == "__main__":
    # Smoke test: random input of shape (16, 512, 512) with an all-ones
    # mask covering each of the 512 tokens.
    input_tensor = torch.randn((16, 512, 512))
    mask = torch.ones(input_tensor.shape[:2])
    longformer_model = Longformer(
        d_model=512,
        num_heads=8,
        d_ff=2048,
        num_layers=6,
        local_window=128,
    )
    output = longformer_model(input_tensor, mask=mask)
    # Expected: torch.Size([16, 512, 512])
    print(output.shape)

