Loading the data, cnews_loader.py:
# coding: utf-8

import sys
from collections import Counter

import numpy as np
import tensorflow.contrib.keras as kr

if sys.version_info[0] > 2:
    is_py3 = True
else:
    reload(sys)
    sys.setdefaultencoding("utf-8")
    is_py3 = False


def native_word(word, encoding='utf-8'):
    """If a model trained under Python 3 is used under Python 2,
    this function can be called to convert the character encoding."""
    if not is_py3:
        return word.encode(encoding)
    else:
        return word


def native_content(content):
    if not is_py3:
        return content.decode('utf-8')
    else:
        return content


def open_file(filename, mode='r'):
    """
    Common file helper that works under both Python 2 and Python 3.
    mode: 'r' or 'w' for read or write
    """
    if is_py3:
        return open(filename, mode, encoding='utf-8', errors='ignore')
    else:
        return open(filename, mode)


def read_file(filename):
    """Read the data file."""
    contents, labels = [], []
    with open_file(filename) as f:
        for line in f:
            try:
                # each line holds the topic label and the text, tab-separated
                label, content = line.strip().split('\t')
                if content:
                    # split the text into single characters,
                    # one character per list element
                    contents.append(list(native_content(content)))
                    labels.append(native_content(label))
            except:
                pass
    return contents, labels


def build_vocab(train_dir, vocab_dir, vocab_size=5000):
    """Build the vocabulary from the training set and store it."""
    data_train, _ = read_file(train_dir)

    all_data = []
    for content in data_train:
        all_data.extend(content)

    counter = Counter(all_data)
    count_pairs = counter.most_common(vocab_size - 1)
    words, _ = list(zip(*count_pairs))
    # prepend a <PAD> token used to pad all texts to the same length
    words = ['<PAD>'] + list(words)
    open_file(vocab_dir, mode='w').write('\n'.join(words) + '\n')


def read_vocab(vocab_dir):
    """Read the vocabulary."""
    # words = open_file(vocab_dir).read().strip().split('\n')
    with open_file(vocab_dir) as fp:
        # under Python 2, convert every value to unicode; strip() removes
        # leading/trailing characters (whitespace/newlines by default)
        words = [native_content(_.strip()) for _ in fp.readlines()]
    # turn the words into a dict: key is the word, value is its line number
    word_to_id = dict(zip(words, range(len(words))))
    return words, word_to_id


def read_category():
    """Read the (fixed) list of categories."""
    categories = ['体育', '财经', '房产', '家居', '教育', '科技', '时尚', '时政', '游戏', '娱乐']
    categories = [native_content(x) for x in categories]
    cat_to_id = dict(zip(categories, range(len(categories))))
    return categories, cat_to_id


def to_words(content, words):
    """Convert content given as ids back to text."""
    return ''.join(words[x] for x in content)


def process_file(filename, word_to_id, cat_to_id, max_length=600):
    """Convert a file to its id representation."""
    contents, labels = read_file(filename)
    # contents is a 2-D structure: each row is the character list of one text,
    # e.g. ['收', '评', ':', '沪', '基']; labels holds the matching topics

    data_id, label_id = [], []
    for i in range(len(contents)):
        data_id.append([word_to_id[x] for x in contents[i] if x in word_to_id])
        label_id.append(cat_to_id[labels[i]])

    # use pad_sequences from keras to pad every text to a fixed length:
    # texts longer than 600 keep their last 600 ids (the front is cut off),
    # shorter ones are zero-padded at the front
    x_pad = kr.preprocessing.sequence.pad_sequences(data_id, max_length)
    # convert the labels to a one-hot representation
    y_pad = kr.utils.to_categorical(label_id, num_classes=len(cat_to_id))
    return x_pad, y_pad


def batch_iter(x, y, batch_size=64):
    """Generate batches of data."""
    data_len = len(x)
    num_batch = int((data_len - 1) / batch_size) + 1

    indices = np.random.permutation(np.arange(data_len))
    x_shuffle = x[indices]
    y_shuffle = y[indices]

    for i in range(num_batch):
        start_id = i * batch_size
        end_id = min((i + 1) * batch_size, data_len)
        yield x_shuffle[start_id:end_id], y_shuffle[start_id:end_id]
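Pieced together, the loader pipeline looks like this (a minimal usage sketch, not part of the original scripts; it assumes the cnews data files already exist under data/cnews/, matching the paths used in main.py below):

from data.cnews_loader import build_vocab, read_vocab, read_category, process_file, batch_iter

# build the character vocabulary once, then load it together with the categories
build_vocab('data/cnews/cnews.train.txt', 'data/cnews/cnews.vocab.txt', 5000)
words, word_to_id = read_vocab('data/cnews/cnews.vocab.txt')
categories, cat_to_id = read_category()

# every text becomes a row of 600 character ids, every label a one-hot vector
x, y = process_file('data/cnews/cnews.train.txt', word_to_id, cat_to_id, 600)
print(x.shape, y.shape)       # (num_samples, 600), (num_samples, 10)

for x_batch, y_batch in batch_iter(x, y, batch_size=64):
    print(x_batch.shape)      # at most (64, 600); the last batch can be smaller
    break

Note that pad_sequences pads and truncates at the front by default, which is why texts longer than 600 characters keep their tail while shorter ones are zero-padded at the start.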
The CNN model, cnn_model.py:
# coding: utf-8

import tensorflow as tf


class TCNNConfig(object):
    """CNN configuration parameters."""

    embedding_dim = 64       # dimension of the word vectors
    seq_length = 600         # sequence length
    num_classes = 10         # number of classes
    num_filters = 256        # number of convolution kernels
    kernel_size = 5          # convolution kernel size
    vocab_size = 5000        # vocabulary size

    hidden_dim = 128         # neurons in the fully connected layer
    dropout_keep_prob = 0.5  # dropout keep probability
    learning_rate = 1e-3     # learning rate

    batch_size = 64          # training batch size
    num_epochs = 10          # total number of epochs

    print_per_batch = 100    # print results every this many batches
    save_per_batch = 10      # write to tensorboard every this many batches


class TextCNN(object):
    """CNN model for text classification."""

    def __init__(self, config):
        self.config = config

        # the three inputs to feed
        self.input_x = tf.placeholder(tf.int32, [None, self.config.seq_length], name='input_x')
        self.input_y = tf.placeholder(tf.float32, [None, self.config.num_classes], name='input_y')
        self.keep_prob = tf.placeholder(tf.float32, name='keep_prob')

        self.cnn()

    def cnn(self):
        """Build the CNN model."""
        # word embedding lookup.
        # Force this op onto the CPU: by default TensorFlow tries to place ops
        # on the GPU when one is present, but the embedding op does not support
        # GPU execution here, so without pinning it to the CPU the program
        # would raise an error.
        with tf.device('/cpu:0'):
            embedding = tf.get_variable('embedding', [self.config.vocab_size, self.config.embedding_dim])
            # pick the rows of the embedding tensor indexed by self.input_x
            embedding_inputs = tf.nn.embedding_lookup(embedding, self.input_x)

        with tf.name_scope("cnn"):
            # CNN layer
            conv = tf.layers.conv1d(embedding_inputs, self.config.num_filters, self.config.kernel_size, name='conv')
            # global max pooling layer (reduces over the sequence dimension)
            gmp = tf.reduce_max(conv, reduction_indices=[1], name='gmp')

        with tf.name_scope("score"):
            # fully connected layer, followed by dropout and relu activation
            fc = tf.layers.dense(gmp, self.config.hidden_dim, name='fc1')
            fc = tf.contrib.layers.dropout(fc, self.keep_prob)
            fc = tf.nn.relu(fc)

            # classifier
            self.logits = tf.layers.dense(fc, self.config.num_classes, name='fc2')
            self.y_pred_cls = tf.argmax(tf.nn.softmax(self.logits), 1)  # predicted class

        with tf.name_scope("optimize"):
            # loss function: cross entropy
            cross_entropy = tf.nn.softmax_cross_entropy_with_logits(logits=self.logits, labels=self.input_y)
            self.loss = tf.reduce_mean(cross_entropy)
            # optimizer
            self.optim = tf.train.AdamOptimizer(learning_rate=self.config.learning_rate).minimize(self.loss)

        with tf.name_scope("accuracy"):
            # accuracy; tf.cast() converts correct_pred to tf.float32
            correct_pred = tf.equal(tf.argmax(self.input_y, 1), self.y_pred_cls)
            self.acc = tf.reduce_mean(tf.cast(correct_pred, tf.float32))
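For comparison, the same embedding → 1-D convolution → global max-pooling → dense stack can be written with the Keras layer API. The following is an illustrative sketch, not the original project's code; it assumes a TensorFlow version that ships tf.keras, and it lets compile() stand in for the hand-built softmax cross-entropy loss and Adam optimizer above:

import tensorflow as tf

def build_text_cnn(vocab_size=5000, embedding_dim=64, seq_length=600,
                   num_filters=256, kernel_size=5, hidden_dim=128,
                   num_classes=10, dropout_rate=0.5):
    # mirrors TextCNN.cnn(): embedding -> conv1d -> global max pool
    # -> dense -> dropout -> relu -> softmax classifier
    model = tf.keras.Sequential([
        tf.keras.layers.Embedding(vocab_size, embedding_dim, input_length=seq_length),
        tf.keras.layers.Conv1D(num_filters, kernel_size),
        tf.keras.layers.GlobalMaxPooling1D(),
        tf.keras.layers.Dense(hidden_dim),                          # fc1 (linear)
        tf.keras.layers.Dropout(dropout_rate),                      # keep_prob 0.5 == rate 0.5
        tf.keras.layers.Activation('relu'),
        tf.keras.layers.Dense(num_classes, activation='softmax'),  # fc2
    ])
    model.compile(optimizer=tf.keras.optimizers.Adam(1e-3),
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])
    return model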
Training the network, main.py:
#!/usr/bin/python
# -*- coding: utf-8 -*-

from __future__ import print_function

import os
import sys
import time
from datetime import timedelta

import numpy as np
import tensorflow as tf
from sklearn import metrics

from cnn_model import TCNNConfig, TextCNN
from data.cnews_loader import read_vocab, read_category, batch_iter, process_file, build_vocab

base_dir = 'data/cnews'
train_dir = os.path.join(base_dir, 'cnews.test.txt')  # cnews.train.txt
test_dir = os.path.join(base_dir, 'cnews.test.txt')   # cnews.test.txt
val_dir = os.path.join(base_dir, 'cnews.test.txt')
vocab_dir = os.path.join(base_dir, 'cnews.vocab.txt')

save_dir = 'checkpoints/textcnn'
save_path = os.path.join(save_dir, 'best_validation')  # where the best validation result is saved


def get_time_dif(start_time):
    """Get the elapsed time."""
    end_time = time.time()
    time_dif = end_time - start_time
    return timedelta(seconds=int(round(time_dif)))


def feed_data(x_batch, y_batch, keep_prob):
    feed_dict = {
        model.input_x: x_batch,
        model.input_y: y_batch,
        model.keep_prob: keep_prob
    }
    return feed_dict


def evaluate(sess, x_, y_):
    """Evaluate accuracy and loss on the given data."""
    data_len = len(x_)
    batch_eval = batch_iter(x_, y_, 128)
    total_loss = 0.0
    total_acc = 0.0
    for x_batch, y_batch in batch_eval:
        batch_len = len(x_batch)
        feed_dict = feed_data(x_batch, y_batch, 1.0)
        loss, acc = sess.run([model.loss, model.acc], feed_dict=feed_dict)
        total_loss += loss * batch_len
        total_acc += acc * batch_len

    return total_loss / data_len, total_acc / data_len


def train():
    # this is where the values for visualization are stored, so during every
    # training run we can watch the parameters change here
    print("Configuring TensorBoard and Saver...")
    # configure TensorBoard; when re-training, delete the tensorboard folder
    # first, otherwise the new graphs overwrite the old ones
    tensorboard_dir = 'tensorboard/textcnn'
    if not os.path.exists(tensorboard_dir):
        os.makedirs(tensorboard_dir)

    tf.summary.scalar("loss", model.loss)
    tf.summary.scalar("accuracy", model.acc)
    merged_summary = tf.summary.merge_all()
    writer = tf.summary.FileWriter(tensorboard_dir)

    # configure the Saver: this is the checkpoint location where the model
    # parameters live, so training can be resumed or the best model kept
    saver = tf.train.Saver()
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)

    print("Loading training and validation data...")
    # load the training and validation sets
    start_time = time.time()
    x_train, y_train = process_file(train_dir, word_to_id, cat_to_id, config.seq_length)
    # word_to_id maps characters to ids, cat_to_id maps topics to ids;
    # x_train holds the id sequence of every text, y_train the matching topics
    x_val, y_val = process_file(val_dir, word_to_id, cat_to_id, config.seq_length)
    time_dif = get_time_dif(start_time)
    print("Time usage:", time_dif)

    # create the session
    session = tf.Session()
    session.run(tf.global_variables_initializer())
    writer.add_graph(session.graph)

    print('Training and evaluating...')
    start_time = time.time()
    total_batch = 0              # total number of batches
    best_acc_val = 0.0           # best validation accuracy so far
    last_improved = 0            # batch of the last improvement
    require_improvement = 1000   # stop early after 1000 batches without improvement

    flag = False
    for epoch in range(config.num_epochs):
        print('Epoch:', epoch + 1)
        batch_train = batch_iter(x_train, y_train, config.batch_size)
        for x_batch, y_batch in batch_train:
            # bundle the data, the labels and keep_prob as the model's feed
            feed_dict = feed_data(x_batch, y_batch, config.dropout_keep_prob)
            loss_train, acc_train = session.run([model.loss, model.acc], feed_dict=feed_dict)
            # print("x_batch is {}".format(x_batch.shape))

            if total_batch % config.save_per_batch == 0:
                # periodically write the training results to the tensorboard scalars
                s = session.run(merged_summary, feed_dict=feed_dict)
                writer.add_summary(s, total_batch)

            if total_batch % config.print_per_batch == 0:
                # periodically report performance on the training and validation sets
                feed_dict[model.keep_prob] = 1.0
                loss_train, acc_train = session.run([model.loss, model.acc], feed_dict=feed_dict)
                loss_val, acc_val = evaluate(session, x_val, y_val)  # todo

                if acc_val > best_acc_val:
                    # save the best result
                    best_acc_val = acc_val
                    last_improved = total_batch
                    saver.save(sess=session, save_path=save_path)
                    improved_str = '*'
                else:
                    improved_str = ''

                time_dif = get_time_dif(start_time)
                msg = 'Iter: {0:>6}, Train Loss: {1:>6.2}, Train Acc: {2:>7.2%},' \
                      + ' Val Loss: {3:>6.2}, Val Acc: {4:>7.2%}, Time: {5} {6}'
                print(msg.format(total_batch, loss_train, acc_train, loss_val, acc_val, time_dif, improved_str))

            # run the optimizer: this is where training actually happens; the
            # ops it depends on are resolved backwards from this node
            session.run(model.optim, feed_dict=feed_dict)
            total_batch += 1

            if total_batch - last_improved > require_improvement or acc_val > 0.98:
                # validation accuracy has not improved for a long time: stop early
                print("No optimization for a long time, auto-stopping...")
                flag = True
                break  # leave the inner loop
        if flag:  # same as above: leave the outer loop too
            break


def test():
    print("Loading test data...")
    start_time = time.time()
    x_test, y_test = process_file(test_dir, word_to_id, cat_to_id, config.seq_length)
    print(y_test)

    session = tf.Session()
    session.run(tf.global_variables_initializer())
    saver = tf.train.Saver()
    saver.restore(sess=session, save_path=save_path)  # load the saved model

    print('Testing...')
    loss_test, acc_test = evaluate(session, x_test, y_test)
    msg = 'Test Loss: {0:>6.2}, Test Acc: {1:>7.2%}'
    print(msg.format(loss_test, acc_test))

    batch_size = 128
    data_len = len(x_test)
    num_batch = int((data_len - 1) / batch_size) + 1

    y_test_cls = np.argmax(y_test, 1)
    y_pred_cls = np.zeros(shape=len(x_test), dtype=np.int32)  # holds the predictions
    for i in range(num_batch):  # process batch by batch
        start_id = i * batch_size
        end_id = min((i + 1) * batch_size, data_len)
        feed_dict = {
            model.input_x: x_test[start_id:end_id],
            model.keep_prob: 1.0
        }
        y_pred_cls[start_id:end_id] = session.run(model.y_pred_cls, feed_dict=feed_dict)

    # evaluation
    print("Precision, Recall and F1-Score...")
    print(metrics.classification_report(y_test_cls, y_pred_cls, target_names=categories))

    # confusion matrix
    print("Confusion Matrix...")
    cm = metrics.confusion_matrix(y_test_cls, y_pred_cls)
    print(cm)

    time_dif = get_time_dif(start_time)
    print("Time usage:", time_dif)


if __name__ == '__main__':
    config = TCNNConfig()  # 1. load the configuration; instantiating the class
                           # on the right yields the config object on the left
    if not os.path.exists(vocab_dir):  # rebuild the vocabulary if it does not exist yet
        build_vocab(train_dir, vocab_dir, config.vocab_size)
    categories, cat_to_id = read_category()  # build the category list;
    # cat_to_id is a dict that assigns an id to every category
    words, word_to_id = read_vocab(vocab_dir)
    # words is the list of lines of the vocab file (one character per line);
    # word_to_id is a dict mapping each of them to its line number
    config.vocab_size = len(words)  # the actual number of characters
    model = TextCNN(config)

    option = 'test'
    if option == 'train':
        train()
    else:
        test()
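Whether the script trains or tests is hard-coded in the option variable at the bottom. An optional variant (a sketch, not in the original article) reads the choice from the command line instead; sys is already imported at the top of main.py:

# replacement for the last four lines of the __main__ block
if len(sys.argv) != 2 or sys.argv[1] not in ['train', 'test']:
    raise ValueError("usage: python main.py [train / test]")
if sys.argv[1] == 'train':
    train()
else:
    test()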
Making predictions, predict.py:
# coding: utf-8

from __future__ import print_function

import os

import tensorflow as tf
import tensorflow.contrib.keras as kr  # pad_sequences from keras pads texts to a fixed length

from cnn_model import TCNNConfig, TextCNN
from data.cnews_loader import read_category, read_vocab

try:
    bool(type(unicode))
except NameError:
    unicode = str

base_dir = 'data/cnews'
vocab_dir = os.path.join(base_dir, 'cnews.vocab.txt')

save_dir = 'checkpoints/textcnn'
save_path = os.path.join(save_dir, 'best_validation')  # where the best validation result is saved


class CnnModel:
    def __init__(self):
        self.config = TCNNConfig()
        self.categories, self.cat_to_id = read_category()
        self.words, self.word_to_id = read_vocab(vocab_dir)
        self.config.vocab_size = len(self.words)
        self.model = TextCNN(self.config)

        self.session = tf.Session()
        self.session.run(tf.global_variables_initializer())
        saver = tf.train.Saver()
        saver.restore(sess=self.session, save_path=save_path)  # load the saved model

    def predict(self, message):
        # a model trained under either Python 2 or 3 can be run under 2 or 3
        content = unicode(message)
        data = [self.word_to_id[x] for x in content if x in self.word_to_id]

        feed_dict = {
            self.model.input_x: kr.preprocessing.sequence.pad_sequences([data], self.config.seq_length),
            self.model.keep_prob: 1.0  # keep probability 1: no dropout at inference
        }

        y_pred_cls = self.session.run(self.model.y_pred_cls, feed_dict=feed_dict)
        return self.categories[y_pred_cls[0]]  # y_pred_cls is an array, so take element 0


if __name__ == '__main__':
    cnn_model = CnnModel()
    test_demo = ['有意思!细菌会发“垃圾邮件”基因从捕食者病毒中窃取遗传物质',
                 'ROARINGWILD 是由六名在校大学生成立于2010年5月4日的深圳本土原创品牌。一直以来,坚持原创设计,以街头风格为主。']
    for i in test_demo:
        print(cnn_model.predict(i))
Output (the two demo texts are classified as 科技 "technology" and 时尚 "fashion"):

科技
时尚