循环神经网络

序列模型

序列数据

实际中很多数据是由时序结构的

比如,电影评分随着时间变化而变化,音乐语言文本和视频都是连续的,或者股价

在时间t观察到$x_t$那么得到T个不独立的随机变量

$x1,....x_T $~p(x)

用条件概率展开

$$ p(a,b)=p(a)p(b|a)=p(b)p(a|b) $$

所有的机器学习模型都是对于p(x)建模,上面是正向,下面是的反向

对条件概率建模

$$ p(x_t|x_1,...x_{t-1})=p(x_t|f(x_1,...x_{t-1})) $$

上式中的f对见过的数据建模,也称自回归模型。即用之前给定的数据预测下面将会出现的数据。

预测方案

方案啊A 马尔科夫假设:

假设当前数据只跟$\tau$个过去数据点相关。列入股价和五年前相关,但和10年前相关性很小。语言也是同理。

$$ p(x_t|x_1,...x_{t-1})=p(x_t|x_{t-\tau},...x_{t-1})=p(x_t|f(x_{t-\tau},...x_{t-1})) $$

所以我们这被转换成一个定长的特征,这样就变成了最简单的回归问题

方案B 潜变量模型

引入潜变量$h_t$来表示过去信息$h_t=f(x_1,...,x_{t-1})$

这样$x_t=p(x_t|h_t)$

import torch
from torch import nn
from d2l import torch as d2l

T = 1000  # 总共产生1000个点
time = torch.arange(1, T + 1, dtype=torch.float32)
x = torch.sin(0.01 * time) + torch.normal(0, 0.2, (T,))
d2l.plot(time, [x], 'time', 'x', xlim=[1, 1000], figsize=(6, 3))
d2l.plt.show()
# 将数据映射为,yt=xt
tau = 4
features = torch.zeros((T - tau, tau))  # 指定张量的尺寸
# 每行存放1,2,3,4    2,3,4,5等数据
for i in range(tau):
    features[:, i] = x[i:T - tau + i]
labels = x[tau:].reshape((-1, 1))
batch_size, n_train = 16, 600
# 只有前n_train个样本用于训练
train_iter = d2l.load_array((features[:n_train], labels[:n_train]), batch_size, is_train=True)


# 初始化网络权重的函数
def init_weights(m):
    if type(m) == nn.Linear:
        nn.init.xavier_uniform_(m.weight)


# 一个简单的多层感知机
def get_net():
    net = nn.Sequential(nn.Linear(4, 10),
                        nn.ReLU(),2
                        nn.Linear(10, 1))
    net.apply(init_weights)
    return net


# 平方损失。注意:MSELoss计算平方误差时不带系数1/2
loss = nn.MSELoss(reduction='none')


# 训练
def train(net, train_iter, loss, epochs, lr):
    trainer = torch.optim.Adam(net.parameters(), lr)
    for epoch in range(epochs):
        for X, y in train_iter:
            trainer.zero_grad()
            l = loss(net(X), y)
            l.sum().backward()
            trainer.step()
        print(f'epoch {epoch + 1}, '
              f'loss: {d2l.evaluate_loss(net, train_iter, loss):f}')


net = get_net()
train(net, train_iter, loss, 5, 0.01)

只能进行短期预测,而不能进行长期预测,预测的越多错的越多

文本预处理

文本预处理的核心思想是如何把词变为能够处理的东西。

import collections
import re
from d2l import torch as d2l

d2l.DATA_HUB['time_machine'] = (d2l.DATA_URL + 'timemachine.txt',
                                '090b5e7e70c295757f55df93cb0a180b9691891a')


def read_time_machine():  # @save
    """将时间机器数据集加载到文本行的列表中"""
    with open(d2l.download('time_machine'), 'r') as f:
        lines = f.readlines()
    # 把各种标点符号全部变成空格,转成小写
    return [re.sub('[^A-Za-z]+', ' ', line).strip().lower() for line in lines]


lines = read_time_machine()
print(f'# 文本总行数: {len(lines)}')
print(lines[0])
print(lines[10])


# 将文本拆分为单词或字符标记
def tokenize(lines, token='word'):
    if token == 'word':
        return [line.split() for line in lines]
    elif token == 'char':
        return [list(line) for line in lines]
    else:
        print("错误未知类型" + token)


tokens = tokenize(lines)
for i in range(11):
    print(tokens[i])


# 构建一个字典也叫做词汇表(vocabulary),用来想字符串的类型标记映射到从0开始的数字索引中
class Vocab:  # @save
    """文本词表"""

    # min_freq代表如果少于传入的次数,就不进行训练。reserved_tokens标记句子结束或者开始
    def __init__(self, tokens=None, min_freq=0, reserved_tokens=None):
        if tokens is None:
            tokens = []
        if reserved_tokens is None:
            reserved_tokens = []
        # 计算出现频率
        counter = count_corpus(tokens)
        # 按出现频率排序(排序是为了加块访问速度)
        self._token_freqs = sorted(counter.items(), key=lambda x: x[1], reverse=True)
        # 未知词元的索引为0,尖括号表示特殊的token
        self.idx_to_token = ['<unk>'] + reserved_tokens
        self.token_to_idx = {token: idx
                             for idx, token in enumerate(self.idx_to_token)}
        for token, freq in self._token_freqs:
            # 如果小于min_freq就扔掉
            if freq < min_freq:
                break
            if token not in self.token_to_idx:
                # idx一般会比较长出现。
                self.idx_to_token.append(token)
                self.token_to_idx[token] = len(self.idx_to_token) - 1

    def __len__(self):
        return len(self.idx_to_token)

    # 给token返回index
    def __getitem__(self, tokens):
        if not isinstance(tokens, (list, tuple)):
            return self.token_to_idx.get(tokens, self.unk)
        return [self.__getitem__(token) for token in tokens]

    # 给下标返回token
    def to_tokens(self, indices):
        if not isinstance(indices, (list, tuple)):
            return self.idx_to_token[indices]
        return [self.idx_to_token[index] for index in indices]

    @property
    def unk(self):  # 未知词元的索引为0
        return 0

    @property
    def token_freqs(self):
        return self._token_freqs


def count_corpus(tokens):  # @save
    """统计词元的频率"""
    # 这里的tokens是1D列表或2D列表
    if len(tokens) == 0 or isinstance(tokens[0], list):
        # 将词元列表展平成一个列表
        tokens = [token for line in tokens for token in line]
    return collections.Counter(tokens)


# 们首先使用时光机器数据集作为语料库来构建词表,然后打印前几个高频词元及其索引。高频词给小下标,低频词给大下标。
vocab = Vocab(tokens)
print(list(vocab.token_to_idx.items())[:10])
# 将每一个文本转换成数字索引列表
for i in [0, 10]:
    print('文本:', tokens[i])
    print('索引:', vocab[tokens[i]])


# 所有的功能打包到一个函数中
def load_corpus_time_machine(max_tokens=-1):  # @save
    """返回时光机器数据集的词元索引列表和词表"""
    lines = read_time_machine()
    tokens = tokenize(lines, 'char')
    vocab = Vocab(tokens)
    # 因为时光机器数据集中的每个文本行不一定是一个句子或一个段落,
    # 所以将所有文本行展平到一个列表中  这里corpus可以理解为一长串的整数
    corpus = [vocab[token] for line in tokens for token in line]
    if max_tokens > 0:
        corpus = corpus[:max_tokens]
    return corpus, vocab


# 做的事情就是把文本序列化
corpus, vocab = load_corpus_time_machine()
len(corpus), len(vocab)

语言模型

给定文本序列$x_1,...,x_t$,语言模型的目标是估计联合概率$p(x1,..,x_T)$

它的应用包括bert,GPT-3,

判断多个序列哪个序列最常见。

$$ p(x,x^{\prime})=p(x)p(x^{\prime}|x)=\frac{n(x)}n\frac{n(x,x^{\prime})}{n(x)} $$

这里n是总词数,n(x),n(x,x')是单个单词和连续单词对出现的词数。(等于预测一个序列出现的概率)这个n(x)是可以消掉的。

很容易拓展到长为3的情况。n(x)/n就是x在文章中出现的比例。

$$ p(x,x',x'')=p(x)p(x'|x)p(x''|x,x')=\frac{n(x)}n\frac{n(x,x')}n\frac{n(x,x',x'')}{n(x)} $$

当它很长时,我们可以用马尔科夫假设。

一元语法就是跟之前的词不相关,二元语法就是跟之前的一个词相关。

$$ \begin{aligned} p(x_1,x_2,x_3,x_4)& =p(x_1)p(x_2)p(x_3)p(x_4) \\ \bullet^{\text{ 一元语法:}}& =\frac{n(x_1)}n\frac{n(x_2)}n\frac{n(x_3)}n\frac{n(x_4)}n \\ \begin{array}{c}p(x_1,x_2,x_3,x_4)\\\text{二元语法:}\end{array}& =p(x_1)p(x_2|x_1)p(x_3|x_2)p(x_4|x_3) \\ &=\frac{n(x_1)}n\frac{n(x_1,x_2)}{n(x_1)}\frac{n(x_2,x_3)}{n(x_2)}\frac{n(x_3,x_4)}{n(x_3)} \\ \bullet\text{ 三元语法: }p(x_1,x_2,x_3,x_4)& =p(x_1)p(x_2|x_1)p(x_3|x_1,x_2)p(x_4|x_2,x_3) \end{aligned} $$

其实语言模型估计文本序列的联合概率。NLP:natural language processing

停用词(Stop Words)是指在信息检索和自然语言处理(NLP)中被过滤掉的一些常用词汇。这些词汇通常在文本中非常常见,但对表达文本的主要内容贡献不大,因此在进行文本分析时常常会被忽略或移除。

import random
import torch
from d2l import torch as d2l
from textprepreprocessing import read_time_machine
from textprepreprocessing import load_corpus_time_machine
from textprepreprocessing import Vocab

tokens = d2l.tokenize(read_time_machine())
# 因为每个文本行不一定是一个句子或一个段落,因此我们把所有文本行拼接到一起
corpus = [token for line in tokens for token in line]
vocab = Vocab(corpus)
print(vocab.token_freqs[:10])  # 可以看到高频词都是虚词

# 拿到一个元素和这个元素的后面一个
bigram_tokens = [pair for pair in zip(corpus[:-1], corpus[1:])]
bigram_vocab = Vocab(bigram_tokens)
print(bigram_vocab.token_freqs[:10])

# 拿到连续的三个元素,越长可以看到越多出现的频率
trigram_tokens = [triple for triple in zip(
    corpus[:-2], corpus[1:-1], corpus[2:])]
trigram_vocab = Vocab(trigram_tokens)
print(trigram_vocab.token_freqs[:10])


# 随机生成一个小批量的数据的特征和标签以供读取。在随机采样中,每个样本都是在原始长序列上任意补货的子序列。

# 这里的num_steps就是tau
def seq_data_iter_random(corpus, batch_size, num_steps):  # @save
    """使用随机抽样生成一个小批量子序列"""
    # 从随机偏移量开始对序列进行分区,随机范围包括num_steps-1
    corpus = corpus[random.randint(0, num_steps - 1):]
    # 减去1,是因为我们需要考虑标签
    num_subseqs = (len(corpus) - 1) // num_steps
    # 长度为num_steps的子序列的起始索引
    initial_indices = list(range(0, num_subseqs * num_steps, num_steps))
    # 在随机抽样的迭代过程中,
    # 来自两个相邻的、随机的、小批量中的子序列不一定在原始序列上相邻
    random.shuffle(initial_indices)

    def data(pos):
        # 返回从pos位置开始的长度为num_steps的序列
        return corpus[pos: pos + num_steps]

    num_batches = num_subseqs // batch_size
    for i in range(0, batch_size * num_batches, batch_size):
        # 在这里,initial_indices包含子序列的随机起始索引
        initial_indices_per_batch = initial_indices[i: i + batch_size]
        X = [data(j) for j in initial_indices_per_batch]
        Y = [data(j + 1) for j in initial_indices_per_batch]
        yield torch.tensor(X), torch.tensor(Y)


# batch中是随机采样的
def seq_data_iter_random(corpus, batch_size, num_steps):  # @save
    """使用随机抽样生成一个小批量子序列"""
    # 从随机偏移量开始对序列进行分区,随机范围包括num_steps-1
    corpus = corpus[random.randint(0, num_steps - 1):]
    # 减去1,是因为我们需要考虑标签
    num_subseqs = (len(corpus) - 1) // num_steps
    # 长度为num_steps的子序列的起始索引
    initial_indices = list(range(0, num_subseqs * num_steps, num_steps))
    # 在随机抽样的迭代过程中,
    # 来自两个相邻的、随机的、小批量中的子序列不一定在原始序列上相邻
    random.shuffle(initial_indices)

    def data(pos):
        # 返回从pos位置开始的长度为num_steps的序列
        return corpus[pos: pos + num_steps]

    num_batches = num_subseqs // batch_size
    for i in range(0, batch_size * num_batches, batch_size):
        # 在这里,initial_indices包含子序列的随机起始索引
        initial_indices_per_batch = initial_indices[i: i + batch_size]
        X = [data(j) for j in initial_indices_per_batch]
        Y = [data(j + 1) for j in initial_indices_per_batch]
        yield torch.tensor(X), torch.tensor(Y)


# 生成0-34的序列
my_seq = list(range(35))
for X, Y in seq_data_iter_random(my_seq, batch_size=2, num_steps=5):
    print('X: ', X, '\nY:', Y)


# 另一个顺序分区的函数
def seq_data_iter_sequential(corpus, batch_size, num_steps):  # @save
    """使用顺序分区生成一个小批量子序列"""
    # 从随机偏移量开始划分序列
    offset = random.randint(0, num_steps)
    num_tokens = ((len(corpus) - offset - 1) // batch_size) * batch_size
    Xs = torch.tensor(corpus[offset: offset + num_tokens])
    Ys = torch.tensor(corpus[offset + 1: offset + 1 + num_tokens])
    Xs, Ys = Xs.reshape(batch_size, -1), Ys.reshape(batch_size, -1)
    num_batches = Xs.shape[1] // num_steps
    for i in range(0, num_steps * num_batches, num_steps):
        X = Xs[:, i: i + num_steps]
        Y = Ys[:, i: i + num_steps]
        yield X, Y


class SeqDataLoader:  # @save
    """加载序列数据的迭代器"""

    def __init__(self, batch_size, num_steps, use_random_iter, max_tokens):
        if use_random_iter:
            self.data_iter_fn = seq_data_iter_random
        else:
            self.data_iter_fn = seq_data_iter_sequential
        self.corpus, self.vocab = load_corpus_time_machine(max_tokens)
        self.batch_size, self.num_steps = batch_size, num_steps

    def __iter__(self):
        return self.data_iter_fn(self.corpus, self.batch_size, self.num_steps)


def load_data_time_machine(batch_size, num_steps,  # @save
                           use_random_iter=False, max_tokens=10000):
    """返回时光机器数据集的迭代器和词表"""
    data_iter = SeqDataLoader(
        batch_size, num_steps, use_random_iter, max_tokens)
    return data_iter, data_iter.vocab

RNN

循环神经网络(recurrent neural network)

更新隐藏状态$\mathbf{h}_t=\phi(\mathbf{W}_{hh}\mathbf{h}_{t-1}+\mathbf{W}_{hx}\mathbf{x}_{t-1}+\mathbf{b}_h)$

输出$\mathbf{o}_t=\phi(\mathbf{W}_{ho}\mathbf{h}_t+\mathbf{b}_o)$

衡量一个语言模型的好坏可以用平均交叉熵

$$ \pi=\frac1n\sum_{i=1}^n-\log p(x_t|x_{t-1},...) $$

p是语言模型预测的概率,$x_t$是真实值

历史原因NLP使用困惑度EXP($\pi$)来衡量平均每次可能的选项。1表示完美,无穷大是最差情况。

梯度剪裁

迭代计算T个时间步上的梯度,在反向传播过程中产生长度为O(T)的矩阵乘法链,导致数值不稳定。

梯度裁剪能有效防止梯度爆炸,(防止梯度不稳定 )如果梯度长度超过$\theta$就拖影回$\theta$

$\mathbf{g}\leftarrow\min\left(1,\frac\theta{\|\mathbf{g}\|}\right)\mathbf{g}$

循环神经网络的输出取决于当下输入和前一时间的隐变量

要用到语言模型时,循环神经网络根据当前词频预测下一次时刻次

通常使用困惑度来衡量模型的好坏

独热编码

回想一下,在train_iter中,每个词元都表示为一个数字索引, 将这些索引直接输入神经网络可能会使学习变得困难。 我们通常将每个词元表示为更具表现力的特征向量。 最简单的表示称为独热编码(one-hot encoding)

从0开始实现

import math
import torch
from torch import nn
from torch.nn import functional as F
from d2l import torch as d2l
from language_model import load_data_time_machine

batch_size, num_steps = 32, 35
# vocab是一个字典用来相互转换词
train_iter, vocab = load_data_time_machine(batch_size, num_steps)

# 我们每次采样的小批量数据是二维张量(批量大小,时间步数)。one_hot函数将这样一个小批量数据转换成三维张量
# 张量的最后一个维度等于词表大小。

# 批量大小是2时间步数是5
# 这里的结果是X = tensor([[0, 1, 2, 3, 4],[5, 6, 7, 8, 9]])
X = torch.arange(10).reshape((2, 5))
# 这里会输出【5,2,28】时间,批量,样本特征长度(词表大小)相当于5个2*28的数据
print(F.one_hot(X.T, 28))


# 初始化模型参数
def get_params(vocab_size, num_hiddens, device):
    num_inputs = num_outputs = vocab_size

    def normal(shape):
        # 均值为0方差为0.01
        return torch.randn(size=shape, device=device) * 0.01

    # 隐藏层参数
    # 输入层到隐藏层
    W_xh = normal((num_inputs, num_hiddens))
    # 隐藏层到隐藏层的权重
    W_hh = normal((num_hiddens, num_hiddens))  # 这个是rnn加的除此之外就是单隐藏层的神经网络
    # 隐藏层从偏置
    b_h = torch.zeros(num_hiddens, device=device)
    # 隐藏变量到输出2层输出层参数
    W_hq = normal((num_hiddens, num_outputs))
    b_q = torch.zeros(num_outputs, device=device)
    # 附加梯度
    params = [W_xh, W_hh, b_h, W_hq, b_q]
    for param in params:
        # 计算梯度
        param.requires_grad_(True)
    return params


# 定义初始的隐藏状态,该函数的返回是一个张量,用0进行填充,形状为(批量大小,隐藏单元数)
def init_rnn_state(batch_size, num_hiddens, device):
    return (torch.zeros((batch_size, num_hiddens), device=device),)


# 定义如何在一个时间步内计算隐藏状态和输出

def rnn(inputs, state, params):
    # inputs的形状:(时间步数量,批量大小,词表大小)
    W_xh, W_hh, b_h, W_hq, b_q = params
    # 初始化的隐藏状态,rnn会接受俩个输入
    H, = state  # 提取单个元组
    outputs = []
    # X的形状:(批量大小,词表大小) 通过便利将时间步数展开了
    for X in inputs:
        H = torch.tanh(torch.mm(X, W_xh)  # 正常的矩阵乘法权重乘输入
                       + torch.mm(H, W_hh)  # 这里是对之前一个网络输出的隐藏层进行计算
                       + b_h)  # 只添加一次偏置
        Y = torch.mm(H, W_hq) + b_q  # 这里的y是当前时刻的预测
        outputs.append(Y)  # 把所有时刻的输出添加到list中 这里的y是批量大小*词表大小
    # 在0的维度拼起来(垂直的方向也)列不变(也就是词表大小),行变乘批量大小乘以时长度
    return torch.cat(outputs, dim=0), (H,)


# 创建类来包装RNN函数

class RNNModelScratch:  # @save
    """从零开始实现的循环神经网络模型"""

    def __init__(self, vocab_size, num_hiddens, device,
                 get_params, init_state, forward_fn):
        self.vocab_size, self.num_hiddens = vocab_size, num_hiddens
        self.params = get_params(vocab_size, num_hiddens, device)
        # 这里是允许自定义状态函数和forward函数
        self.init_state, self.forward_fn = init_state, forward_fn

    # forward函数(这里的x是批量大小*时间步数(一句话的长度))
    def __call__(self, X, state):
        # one_hot的大小是vocab_size,并设置为浮点型
        X = F.one_hot(X.T, self.vocab_size).type(torch.float32)
        return self.forward_fn(X, state, self.params)

    def begin_state(self, batch_size, device):
        # 直接会掉init_state函数
        return self.init_state(batch_size, self.num_hiddens, device)


# 检查输出是否具有正确的形状
num_hiddens = 512
net = RNNModelScratch(len(vocab), num_hiddens, d2l.try_gpu(), get_params,
                      init_rnn_state, rnn)
# 初始化隐藏状态
state = net.begin_state(X.shape[0], d2l.try_gpu())
Y, new_state = net(X.to(d2l.try_gpu()), state)
# 这里的输出为torch.Size([10, 28]) 1 torch.Size([2, 512])
# 表示了预测x中每一个词,每个预测是28分类问题
print(Y.shape, len(new_state), new_state[0].shape)


# 定义预测函数来生成prefix之后的新字符
# 其中num_preds是预测生成单词的数量,net是训练好的模型
def predict_ch8(prefix, num_preds, net, vocab, device):  # @save
    """在prefix后面生成新字符"""
    state = net.begin_state(batch_size=1, device=device)  # 获取初始的隐藏状态,batch_size等于1,就代表对一个字符串作预测
    outputs = [vocab[prefix[0]]]  # 第一个词
    # 每次把output最后一个词存下来,作为下一个词的输入
    get_input = lambda: torch.tensor([outputs[-1]], device=device).reshape((1, 1))
    for y in prefix[1:]:  # 预热期
        # 在这里不关心输出,因为前面标准答案都是给定的
        _, state = net(get_input(), state)  # 这里用的是真实值而不是预测
        outputs.append(vocab[y])
    for _ in range(num_preds):  # 预测num_preds步
        # 每一次就是把前一次的输入作为预测放进来,更新状态
        y, state = net(get_input(), state)
        # 拿出最大的下标放到输出中
        outputs.append(int(y.argmax(dim=1).reshape(1)))
    return ''.join([vocab.idx_to_token[i] for i in outputs])


# 指定前缀为time traveller,并基于这个前缀生成10个后续的字符
print(predict_ch8('time traveller ', 10, net, vocab, d2l.try_gpu()))


# 梯度剪裁
def grad_clipping(net, theta):  # @save
    """裁剪梯度"""
    if isinstance(net, nn.Module):
        # 区分一下实现
        params = [p for p in net.parameters() if p.requires_grad]
    else:
        params = net.params
    # 拿出所有的层,取所有的梯度取平方对所有层求和再开更号。也就是去l2范数
    norm = torch.sqrt(sum(torch.sum((p.grad ** 2)) for p in params))

    if norm > theta:
        for param in params:
            # 裁剪之后 它的所有梯度之和等于theta
            param.grad[:] *= theta / norm


# 定义函数在迭代周期内训练模型
# use_random_iter训练输入参数是否是由连续性的
def train_epoch_ch8(net, train_iter, loss, updater, device, use_random_iter):
    """训练网络一个迭代周期(定义见第8章)"""
    state, timer = None, d2l.Timer()
    metric = d2l.Accumulator(2)  # 训练损失之和,词元数量
    for X, Y in train_iter:
        if state is None or use_random_iter:
            # 在第一次迭代或使用随机抽样时初始化state
            state = net.begin_state(batch_size=X.shape[0], device=device)
        else:
            # 判断类型,如果是连续抽样可以将上一层的状态传递下去
            if isinstance(net, nn.Module) and not isinstance(state, tuple):
                # state对于nn.GRU是个张量
                state.detach_()
            else:
                # state对于nn.LSTM或对于我们从零开始实现的模型是个张量
                for s in state:
                    s.detach_()
        # y拉成向量
        y = Y.T.reshape(-1)
        X, y = X.to(device), y.to(device)
        y_hat, state = net(X, state)  # 传播
        # 计算loss
        l = loss(y_hat, y.long()).mean()  # 就是多分类问题
        if isinstance(updater, torch.optim.Optimizer):
            updater.zero_grad()
            l.backward()
            # 进行梯度裁剪
            grad_clipping(net, 1)
            updater.step()
        else:
            l.backward()
            grad_clipping(net, 1)
            # 因为已经调用了mean函数
            updater(batch_size=1)
        metric.add(l * y.numel(), y.numel())
        # 平均交叉熵
    return math.exp(metric[0] / metric[1]), metric[1] / timer.stop()


#
def train_ch8(net, train_iter, vocab, lr, num_epochs, device,
              use_random_iter=False):
    """训练模型(定义见第8章)"""
    loss = nn.CrossEntropyLoss()
    animator = d2l.Animator(xlabel='epoch', ylabel='perplexity',
                            legend=['train'], xlim=[10, num_epochs])
    # 初始化
    if isinstance(net, nn.Module):
        updater = torch.optim.SGD(net.parameters(), lr)
    else:
        updater = lambda batch_size: d2l.sgd(net.params, lr, batch_size)
    # 包装上面的predict函数
    predict = lambda prefix: predict_ch8(prefix, 50, net, vocab, device)
    # 训练和预测
    for epoch in range(num_epochs):
        ppl, speed = train_epoch_ch8(
            net, train_iter, loss, updater, device, use_random_iter)
        if (epoch + 1) % 10 == 0:
            print(predict('time traveller'))
            animator.add(epoch + 1, [ppl])
    print(f'困惑度 {ppl:.1f}, {speed:.1f} 词元/秒 {str(device)}')
    print(predict('time traveller'))
    print(predict('traveller'))


num_epochs, lr = 500, 1
train_ch8(net, train_iter, vocab, lr, num_epochs, d2l.try_gpu())

上面是正常隐藏层输出的公式,下面是RNN中隐藏层输出的公式

用pytorch实现rnn

import torch
from torch import nn
from torch.nn import functional as F
from d2l import torch as d2l
from language_model import load_data_time_machine
from rnn import train_ch8, predict_ch8

batch_size, num_steps = 32, 35
train_iter, vocab = load_data_time_machine(batch_size, num_steps)
# 定义模型
num_hiddens = 256
rnn_layer = nn.RNN(len(vocab), num_hiddens)
# 初始化隐藏状态,(隐藏层数,批量大小,隐藏单元数)
state = torch.zeros((1, batch_size, num_hiddens))
print(state.shape)
# 通过一个隐藏状态和一个输入,我们可以用更新后的状态计算输出
X = torch.rand(size=(num_steps, batch_size, len(vocab)))
Y, state_new = rnn_layer(X, state)
print(Y.shape, state_new.shape)


# rnn_layer只包含隐藏的循环层,我们还需要创建一个单独的输出层。
class RNNModel(nn.Module):
    """循环神经网络模型"""

    def __init__(self, rnn_layer, vocab_size, **kwargs):
        super(RNNModel, self).__init__(**kwargs)
        self.rnn = rnn_layer
        self.vocab_size = vocab_size
        self.num_hiddens = self.rnn.hidden_size
        # 如果RNN是双向的(之后将介绍),num_directions应该是2,否则应该是1
        if not self.rnn.bidirectional:
            self.num_directions = 1
            self.linear = nn.Linear(self.num_hiddens, self.vocab_size)
        else:
            self.num_directions = 2
            self.linear = nn.Linear(self.num_hiddens * 2, self.vocab_size)

    def forward(self, inputs, state):
        # 转置矩阵之后转长整形,one-hot编码需要整数
        X = F.one_hot(inputs.T.long(), self.vocab_size)
        X = X.to(torch.float32)
        # state就是前一刻的状态,X就是输入
        Y, state = self.rnn(X, state)
        # 全连接层首先将Y的形状改为(时间步数*批量大小,隐藏单元数)
        # 它的输出形状是(时间步数*批量大小,词表大小)。
        output = self.linear(Y.reshape((-1, Y.shape[-1])))
        return output, state

    def begin_state(self, device, batch_size=1):
        if not isinstance(self.rnn, nn.LSTM):
            # nn.GRU以张量作为隐状态
            return torch.zeros((self.num_directions * self.rnn.num_layers,
                                batch_size, self.num_hiddens),
                               device=device)
        else:
            # nn.LSTM以元组作为隐状态
            return (torch.zeros((
                self.num_directions * self.rnn.num_layers,
                batch_size, self.num_hiddens), device=device),
                    torch.zeros((
                        self.num_directions * self.rnn.num_layers,
                        batch_size, self.num_hiddens), device=device))


# 选取模型进行预测
device = d2l.try_gpu()
net = RNNModel(rnn_layer, vocab_size=len(vocab))
net = net.to(device)
predict_ch8('time traveller', 10, net, vocab, device)

# 框架会将小矩阵乘法优化成大矩阵乘法
num_epochs, lr = 500, 1
train_ch8(net, train_iter, vocab, lr, num_epochs, device)
Last modification:August 17, 2024
如果觉得我的文章对你有用,请随意赞赏