
import tensorflow as tf
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from tensorflow import keras
from tensorflow.keras import datasets, layers , Sequential, optimizers, losses
from mpl_toolkits.mplot3d import Axes3D
from sklearn.datasets import *
from sklearn.model_selection import *
##################### 神经网络 #####################
# 算法1:感知机收敛算法
# 初始化参数 = , =
# repeat
# 从训练集随机采样一个样本(, )
# 计算感知机的输出 = ( + )
# 如果 ≠ :
# ′=+∗ ∗
# ′ = + ∗
# repeat训练次数达到要求
# 输出:分类网络参数,
# 并行堆叠了 2 个神经元,即 2 个替换了激活函数的感知机, 构成 3 输入节点,
# 2 个输出节点的网络层。其中第一个输出节点的输出:
# 1=(11∗1+21∗2+1∗ +1)
# 第二个输出节点的输出
# 2=(12∗1+22∗2+2∗ +1)
# 输出向量为 = [1, 2]。整个网络层可以通过一次矩阵运算完成:
# O = X@W +
# 在 TensorFlow 中,要实现全连接层,只需要定义好权值张量 W 和偏置张量 b,
# 并利用 TensorFlow 提供的批量矩阵相乘函数 tf.matmul()即可完成网络层的计算。
# 如下代码创建输 入 X 矩阵为 = 2个样本,每个样本的输入特征长度为 = 784,
# 输出节点数为 = 256,故定义权值矩阵 W 的 shape 为[784,256],
# 并采用正态分布初始化 W;偏置向量 b 的 shape 定义为[256],在计算完X@W后相加即可,
# 最终全连接层的输出 O 的 shape 为 [2,256],即 2 个样本的特征,每个特征长度为 256。
# 创建2个样本的W,b张量
x = tf.random.normal([2,784])
w1 = tf.Variable(tf.random.truncated_normal([784, 256], stddev=0.1))
b1 = tf.Variable(tf.zeros([256]))
o1 = tf.matmul(x,w1) + b1 # 线性变换
o1 = tf.nn.relu(o1) # 激活函数
# 通过fc类完成一次全连接层的计算
# layers.Dense(units, activation),只需要指定输出节点数Units和激活函数类型即可。
# 创建4个样本
x = tf.random.normal([4,28*28])
# 创建全连接层,指定输出节点数和激活函数
fc = layers.Dense(512, activation=tf.nn.relu)
h1 = fc(x)
# 获取 Dense 类的权值矩阵
fc.kernel
# 获取 Dense 类的偏置向量
fc.bias
# 在优化参数时,需要获得网络的所有待优化的参数张量列表,
# 可以通过类的trainable_variables来返回待优化参数列表
fc.trainable_variables
# 返回所有参数列表
fc.variables
# 例如:对于输入:[,784] 隐藏层1:[256] 隐藏层2:[128] 隐藏层3:[64] 输出层:[ ,10]的神经网络
# 张量方式实现:
# 隐藏层1张量
w1 = tf.Variable(tf.random.truncated_normal([784, 256], stddev=0.1))
b1 = tf.Variable(tf.zeros([256]))
# 隐藏层2张量
w2 = tf.Variable(tf.random.truncated_normal([256, 128], stddev=0.1))
b2 = tf.Variable(tf.zeros([128]))
# 隐藏层3张量
w3 = tf.Variable(tf.random.truncated_normal([128, 64], stddev=0.1))
b3 = tf.Variable(tf.zeros([64]))
# 输出层张量
w4 = tf.Variable(tf.random.truncated_normal([64, 10], stddev=0.1))
b4 = tf.Variable(tf.zeros([10]))
# 在计算时,只需要按照网络层的顺序,将上一层的输出送入当前层的输入即可,
# 重复直至 最后一层,将输出层的输出作为网络的输出:
with tf.GradientTape() as tape: # 梯度记录器
# x: [b, 28*28]
# 隐藏层 1 前向计算,[b, 28*28] => [b, 256]
h1 = x@w1 + tf.broadcast_to(b1, [x.shape[0], 256])
h1 = tf.nn.relu(h1)
# 隐藏层 2 前向计算,[b, 256] => [b, 128]
h2 = h1@w2 + b2
h2 = tf.nn.relu(h2)
# 隐藏层 3 前向计算,[b, 128] => [b, 64]
h3 = h2@w3 + b3
h3 = tf.nn.relu(h3)
# 输出层前向计算,[b, 64] => [b, 10]
h4 = h3@w4 + b4
# 层方式实现
fc1 = layers.Dense(256, activation=tf.nn.relu) # 隐藏层1
fc2 = layers.Dense(128, activation=tf.nn.relu) # 隐藏层2
fc3 = layers.Dense(64, activation=tf.nn.relu) # 隐藏层3
fc4 = layers.Dense(10, activation=None) # 输出层
x = tf.random.normal([4,28*28])
h1 = fc1(x)
h2 = fc2(h1)
h3 = fc3(h2)
h4 = fc4(h3)
# 通过Sequential容器封装为一个网络类
model = tf.keras.models.Sequential([
layers.Dense(256, activation=tf.nn.relu) , # 创建隐藏层 1
layers.Dense(128, activation=tf.nn.relu) , # 创建隐藏层 2
layers.Dense(64, activation=tf.nn.relu) , # 创建隐藏层 3
layers.Dense(10, activation=None) , # 创建输出层
])
out = model(x) # 前向计算得到输出
# ∗ = ( (),), ∈
# 网络的参数量是衡量网络规模的重要指标。那么怎么计算全连接层的参数量呢?
# 考虑权值矩阵W,偏置b,输入特征长度为,输出特征长度为的网络层,
# 其参数量为 ∗ ,再加上偏置 b 的参数,总参数量为 ∗ + 。
# 对于多层的全连接神经网 络,比如784 → 256 → 128 → 64 → 10,总参数量计算表达式:
# 256 ∗ 784 + 256 + 128 ∗ 256 + 128 + 64 ∗ 128 + 64 + 10 ∗ 64 + 10 = 242762
# 约 242K 个参数量。
# 激活函数
# 构造-6~6 的输入向量
x = tf.linspace(-6.,6.,10)
# ∶= 1/(1+−)
# 通过Sigmoid函数,向量的范围由[−6,6]映射到[0,1]的区间
tf.nn.sigmoid(x)
# () ≔ (0, )
# 通过ReLU激活函数后,负数全部抑制为0,正数得以保留。
tf.nn.relu(x)
# = ≥0 : ∗ <0
# 其中alpha参数即参数,通过 LeakyReLU(alpha)创建LeakyReLU网络层,
# 并设置参数,像Dense层一样将LeakyReLU层放置在网络的合适位置。
tf.nn.leaky_relu(x, alpha=0.1)
# h() = ( − −)/( + −) = 2 ∗ (2) − 1
# 通过 tanh 激活函数,可以看到向量的范围被映射到[−1,1]之间。
tf.nn.tanh(x)
# 输出层设计,以下常见几种输出类型:
# ∈ 输出属于整个实数空间,或者某段普通的实数空间,比如函数值趋势的预测, 年龄的预测问题等
# ∈ [0, 1] 输出值特别地落在[0, 1]的区间,如图片生成,图片像素值一般用[0, 1]表示;
# 或者二分类问题的概率,如硬币正反面的概率预测问题
# ∈ [0, 1], ∑ =1 输出值落在[0, 1]的区间,并且所有输出值之和为 1,常见的如多分类问题,
# 如 MNIST 手写数字图片识别,图片属于 10 个类别的概率之和为 1
# ∈ [−1, 1] 输出值在[-1, 1]之间
# 误差计算,以下常见误差计算类型:
# 均方差误差(Mean Squared Error, MSE)函数把输出向量和真实向量映射到笛卡尔坐标系的两个点上,
# 通过计算这两个点之间的欧式距离(准确地说是欧式距离的平方)来衡量两个向量之间的差距:
# MSE ≔ 1/ ∑(−)2 MSE误差函数的值总是大于等于0,当MSE函数达到最小值0时,
# 输出等于真实标签,此时神经网络的参数达到最优状态。
o = tf.random.normal([2,10]) # 构造网络输出
y_onehot = tf.constant([1,3]) # 构造真实值
y_onehot = tf.one_hot(y_onehot, depth=10)
loss = keras.losses.MSE(y_onehot, o) # 计算均方差
# TensorFlow MSE函数返回的是每个样本的均方差,需要在样本数量上再次平均来获得batch的均方差:
loss = tf.reduce_mean(loss) # 计算batch均方差
# 交叉熵
# 熵在信息学科中也叫信息熵,或者香农熵。熵越大,代表不确定性越大,信息量也就越大。
# 某个分布P()的熵定义为:():=−∑()2()
# 交叉熵(Cross Entropy)的定义: (, ): = −∑()2() = () + (|)
# (|) = ∑()2(()/())
# KL散度是Solomon Kullback和Richard A. Leibler在1951年提出的用于衡量2个分布之间距离的指标,
# = 时, (|)取得最小值 0。
# 特别地,当分类问题中 y 的编码分布采用 one-hot 编码时:() = 0,此时
# (, ) = () + (|) = (|) = ∑(/)
# 最小化交叉熵的过程也是最大化正确类别的预测概率的过程。从这个角度去理解交叉熵损失函数,非常直观易懂。
##################### 油耗预测实例 #####################
# 在线*载下**汽车效能数据集
dataset_path = keras.utils.get_file("auto-mpg.data", "http://archive.ics.uci.edu/ml/machine-learning-databases/auto-mpg/auto-mpg.data")
# 利用 pandas 读取数据集,字段有效能(公里数每加仑),气缸数,排量,马力,重量
# 加速度,型号年份,产地
column_names = ['MPG','Cylinders','Displacement','Horsepower','Weight','Acceleration', 'Model Year', 'Origin']
raw_dataset = pd.read_csv(dataset_path, names=column_names, na_values = "?", comment='\t', sep=" ", skipinitialspace=True)
dataset = raw_dataset.copy()
dataset.isna().sum() # 统计空白数据
dataset = dataset.dropna() # 删除空白数据项
dataset.isna().sum() # 再次统计空白数据
# 处理类别型数据,其中origin列代表了类别1,2,3,分布代表产地:美国、欧洲、日本
# 先弹出(删除并返回)origin 这一列
origin = dataset.pop('Origin')
# 根据origin列来写入新的3个列,并把对应行的值置为1.0
dataset['USA'] = (origin == 1)*1.0
dataset['Europe'] = (origin == 2)*1.0
dataset['Japan'] = (origin == 3)*1.0
# 按着 8:2 的比例切分训练集和测试集:
train_dataset = dataset.sample(frac=0.8, random_state=0)
test_dataset = dataset.drop(train_dataset.index)
# 移动 MPG 油耗效能这一列为真实标签Y,用pop取出来备用
train_labels = train_dataset.pop('MPG')
test_labels = test_dataset.pop('MPG')
# 统计训练集的各个字段数值的均值和标准差,并完成数据的标准化:
# 查看训练集的输入 X 的统计数据
train_stats = train_dataset.describe()
train_stats = train_stats.transpose()
# 标准化数据
def norm(x):
return (x - train_stats['mean']) / train_stats['std']
normed_train_data = norm(train_dataset)
normed_test_data = norm(test_dataset)
# 利用切分的训练集数据构建数据集对象:
# 构建 Dataset 对象
train_db = tf.data.Dataset.from_tensor_slices((normed_train_data.values, train_labels.values))
test_db = tf.data.Dataset.from_tensor_slices((normed_test_data.values, test_labels.values))
# 随机打散,批量化
train_db = train_db.shuffle(100).batch(32)
# 创建网络
# 考虑到Auto MPG数据集规模较小,我们只创建一个3层的全连接层网络来完成MPG值的预测任务。
# 输入 X 的特征共有9种,因此第一层的输入节点数为9。第一层、第二层的输出节点数设计为64,64。
# 由于只有一种预测值,输出层输出节点设计为 1。
# 考虑MPG ∈ R+,因此最后一次的激活函数可以不加,也可以添加ReLU激活函数。‘
# 我们将网络实现为一个自定义网络类,只需要在初始化函数中创建各个子网络层,
# 并在前向计算函数call中实现自定义网络类的计算逻辑即可。
# 自定义网络类继承自 keras.Model 基类,这也是自定义网络类的标准写法,
# 以方便地利用 keras.Model 基类提供的trainable_variables等各种便捷功能:
# 回归网络
class Network(keras.Model):
def __init__(self):
super(Network, self).__init__()
# 创建3个全连接层
self.fc1 = layers.Dense(64, activation='relu')
self.fc2 = layers.Dense(64, activation='relu')
self.fc3 = layers.Dense(1)
# 依次通过3个全连接层
def call(self, inputs, training=None, mask=None):
x = self.fc1(inputs)
x = self.fc2(x)
x = self.fc3(x)
return x
# 训练与测试
# 创建网络类实例
model = Network()
# 通过build函数完成内部张量的创建,其中4为任意的batch数量,9为输入特征长度
model.build(input_shape=(4, 9))
# 打印网络信息
model.summary()
# 创建优化器,指定学习率
optimizer = tf.keras.optimizers.RMSprop(0.001)
# 200个Epoch
for epoch in range(20):
# 遍历一次训练集,step = 总样本数/batch_size = 314/32 = 10
for step, (x,y) in enumerate(train_db):
# 梯度记录器
with tf.GradientTape() as tape:
out = model(x) # 通过网络获得输出
loss = tf.reduce_mean(keras.losses.MSE(y, out)) # 计算MSE
mae_loss = tf.reduce_mean(keras.losses.MAE(y, out)) # 计算MAE
#if step % 10 == 0:
# 打印训练误差
print(epoch, step, float(loss))
# 计算梯度,并更新
grads = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(grads, model.trainable_variables))
# 除了MSE均方差可以用来测试模型的性能,还可以用平均绝对误差(Mean Absolute Error, MAE)来衡量模型的性能,
# 它被定义为:∶= 1/ ∑|−|
##################### 反向传播 #####################
# 导数与梯度
# 当函数的自变量大于一个时,函数的导数拓展为函数值沿着任意∆方向的变化率。
# 导数本身是标量,没有方向,但是导数表征了函数值在某个方向∆的变化率。
# 在这些任意∆方向中,沿着坐标轴的几个方向比较特殊,此时的导数也叫做偏导数(Partial Derivative)。
# ∇L = (∂L/∂1,∂L/∂2,∂L/∂3,...∂n)
# 梯度下降,寻找函数的最小值: ′ = − ∗ ∇L( 为学习率)
# 梯度上升,求解函数的最大值: ′ = + ∗ ∇L( 为学习率)
# 激活函数导数
# Sigmoid:() = 1/1+− , (())/ = (1 − )
# ReLU: () ≔ (0, ), / = 1 ≥0 : 0 <0
# ReLU函数的导数
def derivative(x):
d = np.array(x, copy=True) # 用于保存梯度的张量
d[x < 0] = 0 # 元素为负的导数为0
d[x >= 0] = 1 # 元素为正的元素导数为1
return d
# : = ≥0 : ∗ <0, / = 1 ≥0 : <0
# 其中 p 为 LeakyReLU 的负半段斜率
def derivative(x, p):
d = np.ones_like(x) # 创建梯度张量
d[x < 0] = p # 元素为负的导数为p
return d
# Tanh: h() = ( − −)/( + −) = = 2 ∗ (2) − 1
# h()/ = 1 − h2()
# 损失函数梯度
# 均方差函数梯度: L/ =( −)
# Softmax梯度: / = (1−) 当= : −⋅ 当≠
# 交叉熵梯度:L/z = −
# 全连接层梯度
# 某条连接上面的梯度,只与当前连接的输出节点,对应的真实值节点的标签,以及对应的输入节点有关。
# 我们令 = ( − )(1 − ),表征连接线的终止节点的梯度传播的某种特性。
# ∂L/ = (ok − )(1 − ) =
# 链式法则
x = tf.constant(2.)
w1 = tf.constant(.1)
b1 = tf.constant(1.)
w2 = tf.constant(.2)
b2 = tf.constant(2.)
# 构建梯度记录器
with tf.GradientTape(persistent=True) as tape:
# 非 tf.Variable 类型的张量需要人为设置记录梯度信息
tape.watch([w1, b1, w2, b2])
# 构建2层线性网络
y1 = x * w1 + b1
y2 = y1 * w2 + b2
# 独立求解出各个偏导数
dy2_dy1 = tape.gradient(y2, [y1])[0]
dy1_dw1 = tape.gradient(y1, [w1])[0]
dy2_dw1 = tape.gradient(y2, [w1])[0]
# 验证链式法则
print(dy2_dy1 * dy1_dw1)
print(dy2_dw1)
# 反向传播算法
# 假设输出层节点数为,倒数第二层节点数为,倒数第三层的节点数为
# 输出层: ∂L/ = *
# 倒数第二层:∂L/ = *
# 倒数第三层:∂L/ = *
# Himmelblau 函数优化示例
def himmelblau(x):
# himmelblau函数实现
return (x[0] ** 2 + x[1] - 11) ** 2 + (x[0] + x[1] ** 2 - 7) ** 2
# 用np.meshgrid函数生成二维平面网格点坐标:
x = np.arange(-6, 6, 0.1)
y = np.arange(-6, 6, 0.1)
print('x,y range:', x.shape, y.shape)
# 生成 x-y 平面采样网格点,方便可视化
X, Y = np.meshgrid(x, y)
print('X,Y maps:', X.shape, Y.shape)
# 计算网格点上的函数值
Z = himmelblau([X, Y])
# 绘制 himmelblau 函数曲面
fig = plt.figure('himmelblau')
ax = fig.gca(projection='3d')
ax.plot_surface(X, Y, Z)
ax.view_init(60, -30)
ax.set_xlabel('x')
ax.set_ylabel('y')
plt.show()
# 利用TensorFlow自动求导来求出函数在, 的梯度,并循环迭代更新:
# 参数的初始化值对优化的影响不容忽视,可以通过尝试不同的初始化值,
# 检验函数优化的极小值情况
# [1., 0.], [-4, 0.], [4, 0.]
x = tf.constant([4., 0.]) # 初始化参数
for step in range(200):# 循环优化 200 次
with tf.GradientTape() as tape: #梯度跟踪
tape.watch([x]) # 加入梯度跟踪列表
y = himmelblau(x) # 前向传播
# 反向传播
grads = tape.gradient(y, [x])[0]
# 更新参数,0.01为学习率
x -= 0.01*grads
# 打印优化的极小值
if step % 20 == 19:
print ('step {}: x = {}, f(x) = {}' .format(step, x.numpy(), y.numpy()))
# 反向传播算法示例
N_SAMPLES = 2000 # 采样点数
TEST_SIZE = 0.3 # 测试数量比率
# 利用工具函数直接生成数据集
X, y = make_moons(n_samples = N_SAMPLES, noise=0.2, random_state=100)
# 将 2000 个点按着 7:3 分割为训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=TEST_SIZE, random_state=42)
# 绘制数据集的分布,X 为 2D 坐标,y 为数据点的标签
def make_plot(X, y, plot_name, file_name=None, XX=None, YY=None, preds=None, dark=False):
if (dark):
plt.style.use('dark_background')
else:
sns.set_style("whitegrid")
plt.figure(figsize=(16,12))
axes = plt.gca()
axes.set(xlabel="$x_1#34;, ylabel="$x_2#34;)
plt.title(plot_name, fontsize=30)
plt.subplots_adjust(left=0.20)
plt.subplots_adjust(right=0.80)
if(XX is not None and YY is not None and preds is not None):
plt.contourf(XX, YY, preds.reshape(XX.shape), 25, alpha = 1, cmap=cm.Spectral)
plt.contour(XX, YY, preds.reshape(XX.shape), levels=[.5], cmap="Greys", vmin=0, vmax=.6)
# 绘制散点图,根据标签区分颜色
plt.scatter(X[:, 0], X[:, 1], c=y.ravel(), s=40, cmap=plt.cm.Spectral, edgecolors='none')
plt.savefig('dataset.svg')
# plt.close()
# 调用 make_plot 函数绘制数据的分布,其中 X 为 2D 坐标,y 为标签
make_plot(X, y, "Classification Dataset Visualization ")
plt.show()
# 网络层
class Layer:
# 全连接网络层
def __init__(self, n_input, n_neurons, activation=None, weights=None, bias=None):
"""
:param int n_input: 输入节点数
:param int n_neurons: 输出节点数
:param str activation: 激活函数类型
:param weights: 权值张量,默认类内部生成
:param bias: 偏置,默认类内部生成
"""
# 通过正态分布初始化网络权值,初始化非常重要,不合适的初始化将导致网络不收敛
self.weights = weights if weights is not None else np.random.randn(n_input, n_neurons) * np.sqrt(1 / n_neurons)
self.bias = bias if bias is not None else np.random.rand(n_neurons) * 0.1
self.activation = activation # 激活函数类型,如’sigmoid’
self.last_activation = None # 激活函数的输出值 o
self.error = None # 用于计算当前层的 delta 变量的中间变量
self.delta = None # 记录当前层的 delta 变量,用于计算梯度
# 前向传播 X@W+b
def activate(self, x):
r = np.dot(x, self.weights) + self.bias # 通过激活函数,得到全连接层的输出o
self.last_activation = self._apply_activation(r)
return self.last_activation
# 计算激活函数的输出
def _apply_activation(self, r):
if self.activation is None:
return r # 无激活函数,直接返回
# ReLU激活函数
elif self.activation == 'relu':
return np.maximum(r, 0)
# tanh
elif self.activation == 'tanh':
return np.tanh(r)
# sigmoid
elif self.activation == 'sigmoid':
return 1 / (1 + np.exp(-r))
return r
# 计算激活函数的导数
def apply_activation_derivative(self, r):
# 无激活函数,导数为 1
if self.activation is None:
return np.ones_like(r)
# ReLU函数的导数实现
elif self.activation == 'relu':
grad = np.array(r, copy=True)
grad[r > 0] = 1.
grad[r <= 0] = 0.
return grad
# tanh函数的导数实现
elif self.activation == 'tanh':
return 1 - r ** 2
# Sigmoid函数的导数实现
elif self.activation == 'sigmoid':
return r * (1 - r)
return r
# 网络模型
# 神经网络大类
class NeuralNetwork:
def __init__(self):
self._layers = [] # 网络层对象列表
def add_layer(self, layer): # 追加网络层
self._layers.append(layer)
def feed_forward(self, X): # 前向传播
for layer in self._layers: # 依次通过各个网络层
X = layer.activate(X)
return X
# 网络实例化
nn = NeuralNetwork()
nn.add_layer(Layer(2, 25, 'sigmoid')) # 隐藏层1, 2=>25
nn.add_layer(Layer(25, 50, 'sigmoid')) # 隐藏层2, 25=>50
nn.add_layer(Layer(50, 25, 'sigmoid')) # 隐藏层3, 50=>25
nn.add_layer(Layer(25, 2, 'sigmoid')) # 输出层, 25=>2
# 反向传播算法实现
def backpropagation(self, X, y, learning_rate):
# 前向计算,得到输出值
output = self.feed_forward(X)
for i in reversed(range(len(self._layers))): # 反向循环
layer = self._layers[i] # 得到当前层对象
# 如果是输出层
if layer == self._layers[-1]: # 对于输出层
layer.error = y - output # 计算 2 分类任务的均方差的导数
# 关键步骤:计算最后一层的 delta,参考输出层的梯度公式
layer.delta = layer.error * layer.apply_activation_derivative(output)
else: # 如果是隐藏层
next_layer = self._layers[i + 1] # 得到下一层对象
# 关键步骤:计算隐藏层的 delta,参考隐藏层的梯度公式
layer.error = np.dot(next_layer.weights, next_layer.delta)
layer.delta = layer.error * layer.apply_activation_derivative(layer.last_activation)
# 循环更新权值
for i in range(len(self._layers)):
layer = self._layers[i]
# o_i为上一网络层的输出
o_i = np.atleast_2d(X if i == 0 else self._layers[i - 1].last_activation)
# 梯度下降算法,delta 是公式中的负数,故这里用加号
layer.weights += layer.delta * o_i.T * learning_rate
# 网络训练函数
def train(self, X_train, X_test, y_train, y_test, learning_rate, max_epochs):
# one-hot编码
y_onehot = np.zeros((y_train.shape[0], 2))
y_onehot[np.arange(y_train.shape[0]), y_train] = 1
mses = []
for i in range(max_epochs): #训练1000个epoch
for j in range(len(X_train)): # 一次训练一个样本
self.backpropagation(X_train[j], y_onehot[j], learning_rate)
if i % 10 == 0:
# 打印出 MSE Loss
mse = np.mean(np.square(y_onehot - self.feed_forward(X_train)))
mses.append(mse)
print('Epoch: #%s, MSE: %f' % (i, float(mse)))
# 统计并打印准确率
print('Accuracy: %.2f%%' % (self.accuracy(self.predict(X_test), y_test.flatten()) * 100))
return mses
##################### Keras 高层接口 #####################
# 常见网络层类,我们以 Softmax 层为例,它既可以使用 tf.nn.softmax 函数在前向传播逻辑中完成,
# 也可以通过layers.Softmax(axis)类搭建Softmax 网络层,其中axis参数指定进行softmax运算的维度。
x = tf.constant([2.,1.,0.1])
layer = layers.Softmax(axis=-1) # 创建Softmax层
layer(x) # 调用softmax前向计算
# 网络容器
# 通过Keras提供的网络容器Sequential将多个网络层封装成一个大网络模型,
# 只需要调用网络模型的实例一次即可完成数据从第一层到 最末层的顺序运算。
# Sequential 容器也可以通过 add()方法继续追加新的网络层,实现动态创建网络的功能:
network = Sequential([ # 封装为一个网络
layers.Dense(3, activation=None), # 全连接层 layers.ReLU(),#激活函数层
layers.Dense(2, activation=None), # 全连接层
layers.ReLU() #激活函数层
])
x = tf.random.normal([4,3])
network(x) # 输入从第一层开始,逐层传播至最末层
layers_num = 2 # 堆叠2次
network = Sequential([]) # 先创建空的网络
for _ in range(layers_num):
network.add(layers.Dense(3)) # 添加全连接层
network.add(layers.ReLU())# 添加激活函数层
network.build(input_shape=(None, 4)) # 创建网络参数
network.summary()
# 打印网络的待优化参数名与shape
for p in network.trainable_variables:
print(p.name, p.shape)
# 模型装配
# 在Keras中,有2个比较特殊的类:keras.Model和keras.layers.Layer类。
# 其中Layer类是网络层的母类,定义了网络层的一些常见功能,如添加权值,管理权值列表等。
# Model类是网络的母类,除了具有Layer类的功能,还添加了保存、加载模型,训练与测试模型等便捷功能。
# Sequential也是Model的子类,因此具有Model类的所有功能。
# 创建5层的全连接层网络
network = Sequential(
[layers.Dense(256, activation='relu'),
layers.Dense(128, activation='relu'),
layers.Dense(64, activation='relu'),
layers.Dense(32, activation='relu'),
layers.Dense(10)])
network.build(input_shape=(None, 28*28))
network.summary()
# 创建网络后,正常的流程是通过循环迭代数据集多遍,每次按批产生训练数据,前向计算,
# 然后通过损失函数计算误差值,并反向传播自动计算梯度,更新网络参数。
# 这一部分逻辑由于非常通用,在keras中提供了compile()和fit()函数方便实现上述逻辑。
# 首先通过compile函数指定网络使用的优化器对象,损失函数,评价指标等:
# 采用Adam优化器,学习率为0.01;采用交叉熵损失函数,包含Softmax
# 设置测量指标为准确率
network.compile(
optimizer=optimizers.Adam(lr=0.01),
loss=losses.CategoricalCrossentropy(from_logits=True),
metrics=['accuracy'])
# 模型训练
# 模型装配完成后,即可通过fit()函数送入待训练的数据和验证用的数据集:
# 指定训练集为train_db,验证集为val_db, 训练5个epochs,每2个epoch验证一次
# 返回训练信息保存在history中
# 其中train_db为tf.data.Dataset对象,也可以传入Numpy Array类型的数据;
# validation_data指定用于验证(测试)的数据集和验证的频率validation_freq。
# fit函数会返回训练过程的数据记录history,
# 其中history.history为字典对象,包含了训练过程中的loss,测量指标等记录项
history = network.fit(train_db, epochs=5, validation_data=val_db, validation_freq=2)
# 打印训练记录
history.history
# 模型测试
# 加载一个batch的测试数据
x,y = next(iter(db_test))
print('predict x:', x.shape)
out = network.predict(x)
print(out)
# 通过 Model.evaluate(db)即可循环测试完db数据集上所有样本,并打印出性能指标:
network.evaluate(db_test)
# 模型保存和加载
# 张量方式
# 通过调用Model.save_weights(path)方法即可讲当前的网络参数保存到path文件上:
network.save_weights('weights.ckpt')
# 调用网络对象的load_weights(path)方法即可将指定的模型文件中保存的张量数值写入到当前网络参数中去。
# 删除网络对象
# del network
# 重新创建相同的网络结构
network = Sequential(
[layers.Dense(256, activation='relu'),
layers.Dense(128, activation='relu'),
layers.Dense(64, activation='relu'),
layers.Dense(32, activation='relu'),
layers.Dense(10)])
network.compile(
optimizer=optimizers.Adam(lr=0.01),
loss=tf.losses.CategoricalCrossentropy(from_logits=True),
metrics=['accuracy'])
# 从参数文件中读取数据并写入当前网络
network.load_weights('weights.ckpt')
# 这种保存与加载网络的方式最为轻量级,它需要使用相同的网络结构才能够恢复网络状态,
# 因此一般在拥有网络源文件的情况下使用。
# 网络方式
# 通过Model.save(path)函数可以将模型的结构以及模型的参数保存到一个path文件上,
# 在不需要网络源文件的条件下,通过keras.models.load_model(path)即可恢复网络结构和网络参数。
# 保存模型结构与模型参数到文件
network.save('model.h5')
# 删除网络对象
# del network
# 从文件恢复网络结构与网络参数
network = tf.keras.models.load_model('model.h5')
# SavedModel方式
# 具有平台无关性
# 通过tf.keras.experimental.export_saved_model(network, path)即可将模型以SavedModel方式保存到path目录中:
# 保存模型结构与模型参数到文件
tf.keras.experimental.export_saved_model(network, 'savedmodel')
# 删除网络对象
# del network
# 从文件恢复网络结构与网络参数
network = tf.keras.experimental.load_from_saved_model('savedmodel')
# 自定义类
# 自定义网络层
class MyDense(layers.Layer):
def __init__(self, inp_dim, outp_dim):
super(MyDense, self).__init__()
# 创建权值张量并添加到类管理列表中,设置为需要优化
self.kernel = self.add_weight(
'w',
[inp_dim, outp_dim],
trainable=True)
net = MyDense(4, 3) # 创建输入为4,输出为3节点的自定义层
net.variables, net.trainable_variables
# 通过 tf.Variable 创建的类成员也会自动加入类参数列表
class MyDense(layers.Layer):
def __init__(self, inp_dim, outp_dim):
super(MyDense, self).__init__()
# 创建权值张量并添加到类管理列表中,设置为不需要优化
self.kernel = tf.Variable(
tf.random.normal([inp_dim, outp_dim]),
trainable=False)
# 完成自定义类的初始化工作后,我们来设计自定义类的前项运算逻辑,
# 对于我们这个例子,只需要完成 = @矩阵运算,并通过激活函数即可:
def call(self, inputs, training=None): # 实现自定义类的前向计算逻辑
# X@W
out = inputs @ self.kernel # 执行激活函数运算
out = tf.nn.relu(out)
return out
# 自定义类的前向运算逻辑需要实现在call(inputs, training)函数中,其中inputs代表输入,由用户在调用时传入;
# training参数用于指定模型的状态:training为True时执行训练模式,False时执行测试模式,默认参数为None,即测试模式。
# 由于全连接层的训练模式和测试模式逻辑一致,此处不需要额外处理。
# 对于部份测试模式和训练模式 不一致的网络层,需要根据training参数来设计需要执行的逻辑。
# 自定义网络
# 使用自定义的层
# 自定义网络方式一
network = Sequential([MyDense(784, 256),
MyDense(256, 128),
MyDense(128, 64),
MyDense(64, 32),
MyDense(32, 10)])
network.build(input_shape=(None, 28*28))
network.summary()
# 自定义网络方式二
class MyModel(keras.Model):
# 自定义网络类,继承自Model基类
def __init__(self):
super(MyModel, self).__init__()
# 完成网络内需要的网络层的创建工作
self.fc1 = MyDense(28*28, 256)
self.fc2 = MyDense(256, 128)
self.fc3 = MyDense(128, 64)
self.fc4 = MyDense(64, 32)
self.fc5 = MyDense(32, 10)
# 然后实现自定义网络的前向运算逻辑:
def call(self, inputs, training=None):
x = self.fc1(inputs)
x = self.fc2(x)
x = self.fc3(x)
x = self.fc4(x)
x = self.fc5(x)
return x
# Keras已有模型
# 以ResNet50迁移学习为例,一般将ResNet50去掉最后一层后的网络作为新任务的特征提取子网络,
# 即利用 ImageNet 上面预训练的特征提取方法迁移到我们自定义的数据集上,
# 并根据自定义任务的类别追加一个对应数据类别数的全连接分类层,
# 从而可以在预训练网络的基础上可以快速、高效地学习新任务。
# 加载 ImageNet 预训练网络模型,并去掉最后一层
resnet = keras.applications.ResNet50(weights='imagenet',include_top=False)
resnet.summary()
# 测试网络的输出
x = tf.random.normal([4,224,224,3])
out = resnet(x)
out.shape
# 以100类的分类任务为例,我们在ResNet50基础上重新构建新网络。
# 新建一个池化层(这里的池化层可以理解为维度缩减功能),将特征从[b, 7,7,2048]降维到[b, 2048]:
# 新建池化层
# 利用上一层的输出作为本层的输入,测试其输出
global_average_layer = layers.GlobalAveragePooling2D()
x = tf.random.normal([4,7,7,2048])
out = global_average_layer(x) # 池化层降维
print(out.shape)
# 最后新建一个全连接层,并设置输出节点数为100:
# 新建全连接层
fc = layers.Dense(100)
# 利用上一层的输出作为本层的输入,测试其输出
x = tf.random.normal([4,2048])
out = fc(x)
print(out.shape)
# 在得到预训练的ResNet50特征层和我们新建的池化层、全连接层后,
# 我们重新利用Sequential容器封装成一个新的网络:
# 重新包裹成我们的网络模型
mynet = Sequential([resnet, global_average_layer, fc])
mynet.summary()
# 通过设置 resnet.trainable = False
# 可以选择冻结ResNet部分的网络参数,只训练新建的网络层,从而快速、高效完成网络模型的训练。
# 测试
# 在keras.metrics模块下,提供了较多的常用测量类,
# 如统计平均值的Mean类,统计准确率的Accuracy类,统计余弦相似度的CosineSimilarity类等等。
# 新建平均测量器,适合 Loss 数据
loss_meter = metrics.Mean()
# 通过测量器的update_state函数可以写入新的数据:
# 下述采样代码放置在每个batch 运算完成后,测量器会自动根据采样的数据来统计平均值。
loss_meter.update_state(float(loss))
# 打印统计的平均loss,并清零测量器,为下一step的统计做准备
if step % 100 == 0:
# 打印统计的平均 loss
print(step, 'loss:', loss_meter.result())
loss_meter.reset_states()
# 准确率统计实力
acc_meter = metrics.Accuracy() # 创建准确率测量器
# 需要注意的是,Accuracy类的update_state函数的参数为预测值和真实值,而不是已经计算过的batch的准确率:
# [b, 784] => [b, 10],网络输出值
out = network(x)
# [b, 10] => [b],经过argmax后计算预测值
pred = tf.argmax(out, axis=1)
pred = tf.cast(pred, dtype=tf.int32)
# 根据预测值与真实值写入测量器
acc_meter.update_state(y, pred)
# 读取统计结果
print(step, 'Evaluate Acc:', acc_meter.result().numpy())
acc_meter.reset_states() # 清零测量器
# 可视化
# 模型端
# 创建监控类,监控数据将写入log_dir目录
summary_writer = tf.summary.create_file_writer(log_dir)
with summary_writer.as_default():
# 当前时间戳step上的数据为loss,写入到ID位train-loss对象中
tf.summary.scalar('train-loss', float(loss), step=step)
# 需要注意的是,TensorBoard通过字符串ID来区分不同类别的监控数据,
# 因此对于误差数据,我们将它命名为”train-loss”,其他类的数据不可写入此对象,防止数据污染。
with summary_writer.as_default():
# 写入测试准确率
tf.summary.scalar('test-acc', float(total_correct/total), step=step)
# 可视化测试用的图片,设置最多可视化 9 张图片
tf.summary.image("val-onebyone-images:", val_images, max_outputs=9, step=step)
# 浏览器端
# 输入命令行:tensorboard --logdir .
# 并访问 http://localhost:6006
# 除了监控标量数据和图片数据外,
# TensorBoard还支持通过tf.summary.histogram查看张量的数据直方图分布,
# 以及通过 tf.summary.text 打印文本信息:
with summary_writer.as_default():
# 当前时间戳step上的数据为loss,写入到ID位train-loss对象中
tf.summary.scalar('train-loss', float(loss), step=step)
# 可视化真实标签的直方图分布
tf.summary.histogram('y-hist',y, step=step)
# 查看文本信息
tf.summary.text('loss-text',str(float(loss)))
##################### 过拟合 #####################
# 提前停止
# 算法 1:带Early stopping功能的网络训练算法
# 随机初始化参数
# repeat
# for = 1,..., do
# 随机采样训练batch {( ,)}
# ← − L((), )
# end
# 数个 epoch 后验证一次
# if 验证性能连续数次不提升 do
# 提取停止训练
# end
# until 训练回合数epoch达到要求
#
# 测试性能
# 输出:网络参数与测试性能
#模型设计
# 对于神经网络来说,网络的层数和参数量是网络容量很重要的参考指标,
# 通过减少网络的层数,减少每层中网络参数量的规模可以有效降低网络的容量。
# 反之,如果发现模型欠拟合,需要增大网络的容量,可以通过增加层数,增大每层的参数量等方式实现。
# 正则化
# 对模型的参数添加额外的约束后,优化的目标变为:
# L((),)+∗(),(,)∈
# 其中()表示对网络参数的稀疏性约束函数。一般地,参数的稀疏性约束通过约束参数的范数实现,即:
# () = ∑‖‖
# 新的优化目标除了要最小化原来的损失函数L(,)之外,还需要约束网络参数的稀疏性,
# 优化算法会在降低L(,)的同时,尽可能地迫使网络参数变得稀疏,
# 他们之间的权重关系通过超参数来平衡,较大的意味着网络的稀疏性更重要;
# 较小的则意味着网络的训 练误差更重要。
# 通过选择合适的超参数可以获得较好的训练性能,同时保证网络的稀疏性,从而获得不错的泛化能力。
# 常用的正则化方式有 L0,L1,L2 正则化。
#
# L0正则化是指采用L0范数作为稀疏性惩罚项()的正则化方式,即:
# () = ∑‖‖0
# 其中L0范数‖‖0定义为中非零元素的个数。
# 通过约束∑‖‖0的大小可以迫使网络中的连接权值大部分为0,
# 从而降低网络的实际参数量和网络容量。
# 但是由于L0范数‖‖0并不可导,不能利用梯度下降算法进行优化,在神经网络中使用的并不多。
#
# 采用L1范数作为稀疏性惩罚项()的正则化方式叫做L1正则化,即:
# () = ∑‖‖1
# 其中L1范数‖‖1定义为张量中所有元素的绝对值之和。
# L1正则化也叫Lasso Regularization,它是连续可导的,在神经网络中使用广泛。
#
# 采用L2 范数作为稀疏性惩罚项()的正则化方式叫做L2正则化,即:
# () = ∑‖‖2
# 其中L2范数‖‖2定义为张量中所有元素的平方和。
# L2正则化也叫Ridge Regularization,它和L1正则化一样,也是连续可导的,在神经网络中使用广泛。
# 计算 L2 正则化项
loss_reg = tf.reduce_sum(tf.square(w1)) + tf.reduce_sum(tf.square(w2))
# Dropout
# 在 TensorFlow 中,可以通过tf.nn.dropout(x, rate)函数实现某条连接的Dropout功能,
# 其中rate参数设置断开的概率值:
x = tf.nn.dropout(x, rate=0.5)
# 也可以将 Dropout 作为一个网络层使用,在网络中间插入一个Dropout层
model.add(layers.Dropout(rate=0.5))
# 数据增强:比如对于图片,可以通过旋转、翻转、裁剪或其他方式生成更多数据
# 过拟合问题示例
N_SAMPLES = 2000 # 采样点数
TEST_SIZE = 0.3 # 测试数量比率
x_min = -3.
x_max = 3.
y_min = -2.
y_max = 2.
OUTPUT_DIR = 'OUTPUT'
N_EPOCHS = 10
X, y = make_moons(n_samples = N_SAMPLES, noise=0.25, random_state=100)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = TEST_SIZE, random_state=42)
def make_plot(X, y, plot_name, file_name, XX=None, YY=None, preds=None):
plt.figure()
# sns.set_style("whitegrid")
axes = plt.gca()
axes.set_xlim([x_min,x_max])
axes.set_ylim([y_min,y_max])
axes.set(xlabel="$x_1#34;, ylabel="$x_2#34;)
# 根据网络输出绘制预测曲面
if(XX is not None and YY is not None and preds is not None):
plt.contourf(XX, YY, preds.reshape(XX.shape), 25, alpha = 0.08, cmap=cm.Spectral)
plt.contour(XX, YY, preds.reshape(XX.shape), levels=[.5], cmap="Greys", vmin=0, vmax=.6)
# 绘制正负样本
markers = ['o' if i == 1 else 's' for i in y.ravel()]
plt.scatter(
X[:, 0],
X[:, 1],
c=y.ravel(),
s=20,
cmap=plt.cm.Spectral,
edgecolors='none'
#marker=markers
)
# 保存矢量图
plt.savefig(OUTPUT_DIR+'/'+file_name)
make_plot(X, y, None, "dataset.svg")
for n in range(5): # 构建5种不同层数的网络
model = Sequential()# 创建容器
# 创建第一层
model.add(tf.keras.layers.Dense(8, input_dim=2,activation='relu'))
for _ in range(n): # 添加n层,共n+2层
model.add(tf.keras.layers.Dense(32, activation='relu'))
model.add(tf.keras.layers.Dense(1, activation='sigmoid')) # 创建最末层
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy']) # 模型装配与训练
history = model.fit(X_train, y_train, epochs=N_EPOCHS, verbose=1) # 绘制不同层数的网络决策边界曲线
preds = model.predict_classes(np.c_[XX.ravel(), YY.ravel()])
title = "网络层数({})".format(n)
file = "网络容量%f.png"%(2+n*1)
make_plot(X_train, y_train, title, file, XX, YY, preds)
for n in range(5): # 构建5种不同数量 Dropout 层的网络
model = Sequential()# 创建 # 创建第一层
model.add(tf.keras.layers.Dense(8, input_dim=2,activation='relu'))
counter = 0
for _ in range(5): # 网络层数固定为 5
model.add(tf.keras.layers.Dense(64, activation='relu'))
if counter < n: # 添加n个Dropout层
counter += 1
model.add(layers.Dropout(rate=0.5))
model.add(tf.keras.layers.Dense(1, activation='sigmoid')) # 输出层
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy']) # 模型装配
# 训练
history = model.fit(X_train, y_train, epochs=N_EPOCHS, verbose=1) # 绘制不同 Dropout 层数的决策边界曲线
preds = model.predict_classes(np.c_[XX.ravel(), YY.ravel()])
title = "Dropout({})".format(n)
file = "Dropout%f.png"%(n)
make_plot(X_train, y_train, title, file, XX, YY, preds)
def build_model_with_regularization(_lambda):
# 创建带正则化项的神经网络
model = Sequential()
model.add(Dense(8, input_dim=2,activation='relu')) # 不带正则化项
# 带 L2 正则化项
model.add(Dense(256, activation='relu', kernel_regularizer=regularizers.l2(_lambda)))
# 带 L2 正则化项
model.add(Dense(256, activation='relu', kernel_regularizer=regularizers.l2(_lambda)))
# 带 L2 正则化项
model.add(Dense(256, activation='relu',kernel_regularizer=regularizers.l2(_lambda)))
# 输出层
model.add(Dense(1, activation='sigmoid'))
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy']) # 模型装配
return model
for _lambda in [1e-5,1e-3,1e-1,0.12,0.13]: # 设置不同的正则化系数 # 创建带正则化项的模型
model = build_model_with_regularization(_lambda)
# 模型训练
history = model.fit(X_train, y_train, epochs=N_EPOCHS, verbose=1)
# 绘制权值范围
layer_index = 2
plot_title = "正则化-[lambda = {}]".format(str(_lambda))
file_name = "正则化_" + str(_lambda)
# 绘制网络权值范围图
plot_weights_matrix(model, layer_index, plot_title, file_name) # 绘制不同正则化系数的决策边界线
preds = model.predict_classes(np.c_[XX.ravel(), YY.ravel()])
title = "正则化".format(_lambda)
file = "正则化%f.svg"%_lambda
make_plot(X_train, y_train, title, file, XX, YY, preds)