动手深度学习note-6(RNN)

循环神经网络 RNN

模型原理

公式推导

从多层感知机mlp出发

\[ O_t = \phi(X \times W_{xh} + b_h)\tag{1} \]

引入隐藏状态\(h_t\)

  • \(h_t\)包含时间信息,即包含前面的所有输入的信息

\[ h_t = \phi(h_{t-1} \times W_{hh} + X_t \times W_{xh} + b_h) \tag{2} \]

  • 由隐藏状态输出Output

\[ Output:O_t = \phi(h_t \times W_{ho} + b_o) \]

通过对比可以发现,如果对隐藏状态去掉时间序列,那么RNN就退化成了mlp

计算损失函数

再思考RNN的计算公式:当模型进行推理时,给定输入序列,预测下一个字符的概率,因而本质上也是一个分类模型,故而交叉熵在RNN的损失计算中依然适用

  • 平均交叉熵(Average Cross Entropy) \[ \pi = - \frac{1}{n} \sum_{t=1}^{n} \log P(X_t \mid X_{t-1}, \ldots, X_1)\tag{1} \]

然而,由于历史原因,在RNN中我们使用困惑度来刻画模型预测下一个字符的把握

  • 困惑度(Perplexity) \[ P = e^{\pi}\tag{2} \]

现在,我们来考虑模型最理想的形态:

  • 假设模型由已知的输入推测下一个字符出现的概率为1,即模型非常确信应该输出的下一个字符,则由\(P(X_t|X_{t-1})=1\)可得\(\pi = 0\),从而困惑度\(P=e^{0}=1\)

代码实现

文本编码

构建训练数据集、字典

对文本进行预处理,使用正则表达式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# Load Data
def count_corpus(tokens):
    """Return a ``collections.Counter`` with the frequency of every token.

    ``tokens`` is either a flat list of tokens or a list of token lists.
    An empty list, or a list whose first element is itself a list, is
    flattened by one level before counting.
    """
    needs_flattening = len(tokens) == 0 or isinstance(tokens[0], list)
    if needs_flattening:
        # Merge the per-line token lists into one flat list.
        flattened = []
        for line in tokens:
            flattened.extend(line)
        tokens = flattened
    return collections.Counter(tokens)


class Vocab:
    """Vocabulary that maps tokens to integer indices and back."""

    def __init__(self, tokens=None, min_freq=0, reserved_tokens=None):
        """Build the vocabulary from ``tokens``.

        Args:
            tokens: Tokens (flat or nested list) to count; ``None`` means
                an empty vocabulary.
            min_freq: Tokens occurring fewer times than this are left out.
            reserved_tokens: Tokens placed right after ``'<unk>'``,
                regardless of their frequency.
        """
        tokens = [] if tokens is None else tokens
        reserved_tokens = [] if reserved_tokens is None else reserved_tokens

        # Token/frequency pairs, most frequent first.
        frequencies = count_corpus(tokens)
        self._token_freqs = sorted(
            frequencies.items(), key=lambda pair: pair[1], reverse=True)

        # Index 0 is reserved for the unknown token.
        self.idx_to_token = ['<unk>'] + reserved_tokens
        self.token_to_idx = {}
        for position, known in enumerate(self.idx_to_token):
            self.token_to_idx[known] = position

        for token, freq in self._token_freqs:
            if freq < min_freq:
                # Frequencies are sorted descending, so the rest are rarer.
                break
            if token in self.token_to_idx:
                continue
            self.token_to_idx[token] = len(self.idx_to_token)
            self.idx_to_token.append(token)

    def __len__(self):
        """Return the number of entries, including ``'<unk>'``."""
        return len(self.idx_to_token)

    def __getitem__(self, tokens):
        """Return the index of a token, or a list of indices for a list/tuple.

        Tokens that are not in the vocabulary map to ``self.unk``.
        """
        if isinstance(tokens, (list, tuple)):
            return [self.__getitem__(token) for token in tokens]
        return self.token_to_idx.get(tokens, self.unk)

    def to_tokens(self, indices):
        """Return the token for an index, or a list of tokens for a list/tuple."""
        if isinstance(indices, (list, tuple)):
            return [self.idx_to_token[index] for index in indices]
        return self.idx_to_token[indices]

    @property
    def unk(self):
        """Index of the unknown token."""
        return 0

    @property
    def token_freqs(self):
        """List of ``(token, frequency)`` pairs, most frequent first."""
        return self._token_freqs


def get_data(path, mode, pattern):
    """Load a text file and encode it as a sequence of vocabulary indices.

    Every match of ``pattern`` in a line is replaced by a space; the line is
    then stripped, lower-cased and split on whitespace. The resulting words
    are grouped into n-grams according to ``mode`` and a ``Vocab`` is built
    over those n-grams.

    Args:
        path: Path of the text file to read (decoded as UTF-8).
        mode: ``'unigram'``, ``'bigram'`` or ``'trigram'``.
        pattern: Regular expression whose matches are replaced by a space.

    Returns:
        ``(vocab, corpus)``: the ``Vocab`` built over the n-grams and the
        list of vocabulary indices, one per n-gram, in text order.

    Raises:
        ValueError: If ``mode`` is not one of the supported values.
    """
    banners = {
        'unigram': '---本次使用一元语法---',
        'bigram': '---本次使用二元语法---',
        'trigram': '---本次使用三元语法---',
    }
    # Reject unknown modes up front; otherwise ``gram_tokens`` would be
    # unbound further down and fail with a confusing error.
    if mode not in banners:
        raise ValueError(
            f"mode must be one of {sorted(banners)}, got {mode!r}")

    # Explicit encoding so the result does not depend on the platform default.
    with open(path, 'r', encoding='utf-8') as f:
        raw_lines = f.readlines()

    cleaned_lines = [re.sub(pattern, ' ', line).strip().lower()
                     for line in raw_lines]
    tokens = [word for line in cleaned_lines for word in line.split()]

    print(banners[mode])
    if mode == 'unigram':
        gram_tokens = list(tokens)
    elif mode == 'bigram':
        gram_tokens = list(zip(tokens[:-1], tokens[1:]))
    else:
        gram_tokens = list(zip(tokens[:-2], tokens[1:-1], tokens[2:]))

    vocab = Vocab(gram_tokens)
    # Look each n-gram up directly in the mapping. Vocab.__getitem__ treats a
    # tuple as a sequence of separate tokens, so a bigram/trigram tuple would
    # be turned into a list of per-word lookups instead of a single index.
    index_of = vocab.token_to_idx
    corpus = [index_of.get(gram, vocab.unk) for gram in gram_tokens]

    print('文件信息摘要:')
    print(f'总字数:{len(corpus)}')
    print(f'字典大小:{len(vocab.token_freqs)}')
    print(f'高频词:{vocab.token_freqs[:10]}')

    return vocab, corpus


def seq_data_iter_random(corpus, batch_size, num_steps):
    """Yield minibatches of subsequences drawn in random order.

    Args:
        corpus: Sequence of token indices.
        batch_size: Number of subsequences per minibatch.
        num_steps: Length of each subsequence.

    Yields:
        ``(X, Y)`` pairs created with ``tf.constant``. Each holds
        ``batch_size`` windows of ``num_steps`` tokens, and ``Y`` is ``X``
        shifted one token ahead. Leftover subsequences that do not fill a
        whole minibatch are dropped.
    """
    # Drop a random prefix so window boundaries differ between calls.
    corpus = corpus[random.randint(0, num_steps - 1):]
    # The -1 keeps one trailing token available for the shifted labels.
    num_subseqs = (len(corpus) - 1) // num_steps
    initial_indices = list(range(0, num_subseqs * num_steps, num_steps))
    # Shuffle the window starts; without this the windows are emitted in
    # text order and the sampling is not random at all.
    random.shuffle(initial_indices)
    num_batches = num_subseqs // batch_size
    for i in range(0, batch_size * num_batches, batch_size):
        batch_starts = initial_indices[i: i + batch_size]
        X = [corpus[j: j + num_steps] for j in batch_starts]
        Y = [corpus[j + 1: j + num_steps + 1] for j in batch_starts]
        yield tf.constant(X), tf.constant(Y)


def seq_data_iter_sequential(corpus, batch_size, num_steps):
    """Yield minibatches by slicing the corpus into consecutive windows.

    A random offset in ``[0, num_steps]`` is skipped first. The remaining
    tokens, truncated to a multiple of ``batch_size``, are reshaped into
    ``batch_size`` rows, and windows of ``num_steps`` columns are yielded
    from left to right. ``Y`` is ``X`` shifted one token ahead.
    """
    start = random.randint(0, num_steps)
    # Largest multiple of batch_size that still leaves one token for labels.
    usable = ((len(corpus) - start - 1) // batch_size) * batch_size
    features = tf.constant(corpus[start: start + usable])
    labels = tf.constant(corpus[start + 1: start + 1 + usable])
    features = tf.reshape(features, (batch_size, -1))
    labels = tf.reshape(labels, (batch_size, -1))
    window_count = features.shape[1] // num_steps
    for window in range(window_count):
        left = window * num_steps
        right = left + num_steps
        yield features[:, left:right], labels[:, left:right]


class SeqDataLoader:
    """Iterable that generates training minibatches from a text file."""

    def __init__(
            self, batch_size, num_steps, random_iter,
            path, gram_mode, pattern):
        """Load the corpus and select the sampling strategy.

        Args:
            batch_size: Number of subsequences per minibatch.
            num_steps: Length of each subsequence.
            random_iter: If true use ``seq_data_iter_random``, otherwise
                ``seq_data_iter_sequential``.
            path: Path of the text file, forwarded to ``get_data``.
            gram_mode: N-gram mode, forwarded to ``get_data``.
            pattern: Regular expression, forwarded to ``get_data``.
        """
        if random_iter:
            self.method = seq_data_iter_random
        else:
            self.method = seq_data_iter_sequential
        self.batch_size = batch_size
        self.num_steps = num_steps
        self.vocab, self.corpus = get_data(path, gram_mode, pattern)

    def __iter__(self):
        """Return a fresh minibatch generator over the corpus."""
        return self.method(self.corpus, self.batch_size, self.num_steps)

    # The method was originally spelled ``_iter_``, which Python does not
    # recognise as the iterator hook; keep the old name as an alias so
    # existing explicit calls keep working.
    _iter_ = __iter__


def load_data(batch_size, num_steps, random_iter,
              path, gram_mode, pattern):
    """Create a ``SeqDataLoader`` and return it with its vocabulary.

    All arguments are passed through to ``SeqDataLoader`` unchanged.

    Returns:
        ``(loader, vocab)`` where ``vocab`` is ``loader.vocab``.
    """
    loader = SeqDataLoader(
        batch_size=batch_size,
        num_steps=num_steps,
        random_iter=random_iter,
        path=path,
        gram_mode=gram_mode,
        pattern=pattern,
    )
    return loader, loader.vocab


# Initialise variables

# --- Data loading ---
path = 'dataset/time_machine.txt'
gram_mode = 'unigram'  # one of 'unigram', 'bigram', 'trigram'
pattern = r'[^A-Za-z]+'  # regular expression applied to every line of the file

# --- Minibatch generation ---
# An alternative setup (batch_size = 64, num_steps = 5, random_iter = False)
# used to be assigned here and was immediately overwritten by the values
# below; it is kept only as this note because it never took effect.
batch_size = 32
num_steps = 35
random_iter = True  # True: seq_data_iter_random, False: seq_data_iter_sequential

train_iter, vocabulary = load_data(batch_size, num_steps, random_iter,
                                   path, gram_mode, pattern)
  • unigram:一元语法,以一个单词为最小单元(包含很多低频词)
  • bigram&trigram:二元语法与三元语法,减少词典的大小

使用独热编码(one_hot)

构建模型

初始化参数 params

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def get_params(vocab_size, num_hiddens):
    """Create the trainable RNN parameters.

    Weight matrices are drawn from a normal distribution with mean 0 and
    standard deviation 0.01; biases start at zero. Inputs and outputs both
    have ``vocab_size`` units.

    Returns:
        ``[W_xh, W_hh, b_h, W_hq, b_q]`` as ``tf.Variable`` objects of dtype
        ``tf.float32``.
    """
    num_inputs = num_outputs = vocab_size

    def gaussian_variable(shape):
        # Small random initial weights.
        initial = tf.random.normal(
            shape=shape, stddev=0.01, mean=0, dtype=tf.float32)
        return tf.Variable(initial, dtype=tf.float32)

    def zero_variable(size):
        return tf.Variable(tf.zeros(size), dtype=tf.float32)

    # Hidden-layer parameters.
    W_xh = gaussian_variable((num_inputs, num_hiddens))
    W_hh = gaussian_variable((num_hiddens, num_hiddens))
    b_h = zero_variable(num_hiddens)
    # Output-layer parameters.
    W_hq = gaussian_variable((num_hiddens, num_outputs))
    b_q = zero_variable(num_outputs)

    return [W_xh, W_hh, b_h, W_hq, b_q]

初始化隐藏层 H

1
2
3
def init_rnn_state(batch_size, num_hiddens):
    """Return the initial hidden state as a one-element tuple.

    The single element is a zero tensor of shape
    ``(batch_size, num_hiddens)``. The tuple form matches how ``rnn``
    unpacks its ``state`` argument (``H, = state``).
    """
    # The closing parenthesis was missing, which made the definition a
    # syntax error; the trailing comma keeps the result a tuple.
    return (tf.zeros(shape=(batch_size, num_hiddens)), )

根据公式构建前向传播函数

1
2
3
4
5
6
7
8
9
10
11
def rnn(inputs, state, params):
    """Run the RNN forward pass over a sequence, with tanh as activation.

    Args:
        inputs: Iterable of per-time-step inputs. Each element is reshaped
            to ``(-1, W_xh.shape[0])`` before use.
            NOTE(review): presumably one-hot encoded with shape
            (num_steps, batch_size, vocab_size) - confirm against the caller.
        state: One-element tuple ``(H,)`` holding the current hidden state.
        params: ``[W_xh, W_hh, b_h, W_hq, b_q]`` as returned by
            ``get_params``.

    Returns:
        ``(outputs, (H,))``: the per-step outputs concatenated along axis 0,
        and the hidden state after the last step as a one-element tuple.
    """
    W_xh, W_hh, b_h, W_hq, b_q = params
    H, = state
    outputs = []
    for x in inputs:
        X = tf.reshape(x, [-1, W_xh.shape[0]])
        # h_t = tanh(h_{t-1} W_hh + x_t W_xh + b_h)
        H = tf.tanh(tf.matmul(H, W_hh) + tf.matmul(X, W_xh) + b_h)
        # o_t = h_t W_hq + b_q. The bias is added after the matrix product;
        # adding it to W_hq first would broadcast it into the weights.
        Y = tf.matmul(H, W_hq) + b_q
        outputs.append(Y)
    return tf.concat(outputs, axis=0), (H,)

动手深度学习note-6(RNN)
https://blog.potential.icu/2024/03/26/2024-2-26-动手深度学习note-6(RNN)/
Author
Xt-Zhu
Posted on
March 26, 2024
Licensed under