무회blog

200806, test 중 , keras_bert 본문

Python/machineLearning

200806, test 중 , keras_bert

최무회 2020. 8. 6. 21:03

https://blog.csdn.net/asialee_bird/article/details/102747435

https://github.com/bojone/bert_in_keras/blob/master/sentiment.py

https://search.gitee.com/?q=keras%20bert&skin=rec&type=repository

https://www.kesci.com/home/project/5e78a11198d4a8002d2c52bc

 

keras bert - Gitee

语言: 全部 排序: 最佳匹配

search.gitee.com

 

bojone/bert_in_keras

在Keras下微调Bert的一些例子;some examples of bert in keras. Contribute to bojone/bert_in_keras development by creating an account on GitHub.

github.com

 

Bert文本分类(基于keras-bert实现)_Asia-Lee的博客-CSDN博客

目录 一、Bert 预训练模型准备 二、Bert 模型文本分类 1、数据准备 2、代码实现 3、分类过程与结果 一、Bert 预训练模型准备 中文预训练模型下载      当Bert遇上Keras:这可能是Bert最简单的打开姿势…

blog.csdn.net

import json
import numpy as np
import pandas as pd
from random import choice
from keras_bert import load_trained_model_from_checkpoint, Tokenizer
import re, os
import codecs
from Cleaning_Text import Cleaning_Text
# -------------------------------------------------------
import codecs, gc
from sklearn.model_selection import KFold
from keras.metrics import top_k_categorical_accuracy
from keras.layers import *
from keras.callbacks import *
from keras.models import Model
import keras.backend as K
from keras.optimizers import Adam
from keras.utils import to_categorical

print(dir(Model))
# -------------------------------------------------------
print('there is bert ')

maxlen = 100

config_path = './../multi_cased_L-12_H-768_A-12/bert_config.json'
checkpoint_path = './../multi_cased_L-12_H-768_A-12/bert_model.ckpt'
dict_path = './../multi_cased_L-12_H-768_A-12/vocab.txt'


read_path='./../../04-srcTest/test_data/test_allData_5000.xlsx'

df=pd.read_excel(read_path)
df = df[['subMenu', 'content']].sample(1000)
df2 = df.copy()
type(df2.subMenu.values)
label_name = set(df2.subMenu.values)
label_name = sorted(list(label_name))
print(label_name)

labels = []
subMenu = df.subMenu.tolist()
for i, j in enumerate(subMenu):
    for k, s in enumerate(label_name):
        if j == s:
            labels.append(str(k))
            k += 1

print(labels)
print(len(labels))

## 라벨 적용
df2['labels'] = labels
df2 = df2[['labels', 'subMenu', 'content']]
# print(df2)
print(df2.head(5))


type(df2.content.values)
datas = df2.content.values
len(datas)

data = []
for i in datas:
    ts = i
    ts = Cleaning_Text.text_cleaning(ts)
    ts = ts.split('\n')
    data.append(ts)

#  content 데이터에 [SEP] 적용
dt = []
for i, j in enumerate(data):
    dts = []
    for k in j:
        ts = k
        #         ts =  ts + ' [SEP] '
        dts.append(ts)
    dt.append(dts)

print('dt',dt)

#  content 데이터에 [CLS] 적용
dtt = dt
# dtt = ['[CLS]' + Cleaning_Text.listToText(dt[x]) for x in range(len(dt))]
dtt[2]
df2['cts'] = dtt
df2 = df2[['labels', 'subMenu', 'cts']]
df3 = df2.copy()

df3['id'] = [x for x in range(500)]

# df3 = df3[['id','subMenu','cts','labels']]
df3 = df3.rename(columns={'subMenu':'type', 'cts':'contents'})

df3 = df3[['id', 'type', 'contents', 'labels']].astype(str)
print(df3)

train_df=df3[:400].astype(str)
test_df= df3[400:].astype(str)

# train_df['labels'] = train_df.labels.values
# test_df['labels'] = test_df.labels.values
print(type(train_df.labels.values))


# --------------------------------------------------------------------------
# 将词表中的词编号转换为字典
token_dict = {}
with codecs.open(dict_path, 'r', 'utf8') as reader:
    print('reader, ' , reader)
    for line in reader:
        token = line.strip()
        token_dict[token] = len(token_dict)


# 重写tokenizer
class OurTokenizer(Tokenizer):
    def _tokenize(self, text):
        R = []
        for c in text:
            if c in self._token_dict:
                R.append(c)
            elif self._is_space(c):
                R.append('[unused1]')  # 用[unused1]来表示空格类字符
            else:
                R.append('[UNK]')  # 不在列表的字符用[UNK]表示
        return R
tokenizer = OurTokenizer(token_dict)
# --------------------------------------------------------------------------
#让每条文本的长度相同,用0填充
def seq_padding(X, padding=0):
    L = [len(x) for x in X]
    ML = max(L)
    return np.array([
        np.concatenate([x, [padding] * (ML - len(x))]) if len(x) < ML else x for x in X])
# --------------------------------------------------------------------------
# data_generator只是一种为了节约内存的数据方式
class data_generator:
    def __init__(self, data, batch_size=32, shuffle=True):
        self.data = data
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.steps = len(self.data) // self.batch_size
        if len(self.data) % self.batch_size != 0:
            self.steps += 1

    def __len__(self):
        return self.steps

    def __iter__(self):
        while True:
            idxs = list(range(len(self.data)))

            if self.shuffle:
                np.random.shuffle(idxs)

            X1, X2, Y = [], [], []
            for i in idxs:
                d = self.data[i]
                text = d[0][:maxlen]
                x1, x2 = tokenizer.encode(first=text)
                y = d[1]
                X1.append(x1)
                X2.append(x2)
                Y.append([y])
                if len(X1) == self.batch_size or i == idxs[-1]:
                    X1 = seq_padding(X1)
                    X2 = seq_padding(X2)
                    Y = seq_padding(Y)
                    yield [X1, X2], Y[:, 0, :]
                    [X1, X2, Y] = [], [], []

# --------------------------------------------------------------------------
#计算top-k正确率,当预测值的前k个值中存在目标类别即认为预测正确
def acc_top2(y_true, y_pred):
    return top_k_categorical_accuracy(y_true, y_pred, k=2)

# --------------------------------------------------------------------------
# bert模型设置
def build_bert(nclass):
    bert_model = load_trained_model_from_checkpoint(config_path, checkpoint_path, seq_len=None)  # 加载预训练模型

    for l in bert_model.layers:
        l.trainable = True

    x1_in = Input(shape=(None,))
    x2_in = Input(shape=(None,))

    x = bert_model([x1_in, x2_in])
    x = Lambda(lambda x: x[:, 0])(x)  # 取出[CLS]对应的向量用来做分类
    p = Dense(nclass, activation='softmax')(x)

    model = Model([x1_in, x2_in], p)
    model.compile(loss='categorical_crossentropy',
                  optimizer=Adam(1e-5),  # 用足够小的学习率
                  metrics=['accuracy', acc_top2])
    print(model.summary())
    return model
# --------------------------------------------------------------------------
# 训练数据、测试数据和标签转化为模型输入格式
DATA_LIST = []
for data_row in train_df.iloc[:].itertuples():
    print(len(data_row.labels))
    print(data_row)
    print(type(data_row))
    print('-'*50)
    # DATA_LIST.append((data_row.contents, to_categorical(data_row.labels, 3)))

    DATA_LIST.append((data_row.contents, to_categorical(data_row.labels)))
DATA_LIST = np.array(DATA_LIST)

DATA_LIST_TEST = []
for data_row in test_df.iloc[:].itertuples():
    # DATA_LIST_TEST.append((data_row.contents, to_categorical(0, 3)))
    DATA_LIST_TEST.append((data_row.contents, to_categorical(data_row.labels)))
DATA_LIST_TEST = np.array(DATA_LIST_TEST)

# --------------------------------------------------------------------------

# 交叉验证训练和测试模型
def run_cv(nfold, data, data_labels, data_test):
    kf = KFold(n_splits=nfold, shuffle=True, random_state=520).split(data)
    train_model_pred = np.zeros((len(data), 3))
    test_model_pred = np.zeros((len(data_test), 3))

    for i, (train_fold, test_fold) in enumerate(kf):
        X_train, X_valid, = data[train_fold, :], data[test_fold, :]

        model = build_bert(3)
        early_stopping = EarlyStopping(monitor='val_acc', patience=3)  # 早停法,防止过拟合
        plateau = ReduceLROnPlateau(monitor="val_acc", verbose=1, mode='max', factor=0.5,
                                    patience=2)  # 当评价指标不在提升时,减少学习率
        checkpoint = ModelCheckpoint('./bert_dump/' + str(i) + '.hdf5', monitor='val_acc', verbose=2,
                                     save_best_only=True, mode='max', save_weights_only=True)  # 保存最好的模型

        train_D = data_generator(X_train, shuffle=True)
        valid_D = data_generator(X_valid, shuffle=True)
        test_D = data_generator(data_test, shuffle=False)
        # 模型训练
        model.fit_generator(
            train_D.__iter__(),
            steps_per_epoch=len(train_D),
            epochs=5,
            validation_data=valid_D.__iter__(),
            validation_steps=len(valid_D),
            callbacks=[early_stopping, plateau, checkpoint],
        )

        # model.load_weights('./bert_dump/' + str(i) + '.hdf5')

        # return model
        train_model_pred[test_fold, :] = model.predict_generator(valid_D.__iter__(), steps=len(valid_D), verbose=1)
        test_model_pred += model.predict_generator(test_D.__iter__(), steps=len(test_D), verbose=1)

        del model
        gc.collect()  # 清理内存
        K.clear_session()  # clear_session就是清除一个session
        # break

    return train_model_pred, test_model_pred

# --------------------------------------------------------------------------

# n折交叉验证
train_model_pred, test_model_pred = run_cv(2, DATA_LIST, None, DATA_LIST_TEST)

test_pred = [np.argmax(x) for x in test_model_pred]

# 将测试集预测结果写入文件
output = pd.DataFrame({'id': test_df.id, 'sentiment': test_pred})
output.to_csv('./results.csv', index=None)




Comments