Слияние кода завершено, страница обновится автоматически
package session
import (
"bytes"
"context"
"fmt"
"strconv"
"strings"
"sync"
"unicode/utf8"
mysqlDriver "github.com/go-sql-driver/mysql"
"github.com/hanchuanchuan/goInception/ast"
"github.com/hanchuanchuan/goInception/mysql"
log "github.com/sirupsen/logrus"
)
// chanBackup 备份channal数据,用来传递备份的sql等信息
type chanBackup struct {
// values []interface{}
// 库名
dbname string
values []interface{}
record *Record
}
func (s *session) processChanBackup(wg *sync.WaitGroup) {
for {
r := <-s.chBackupRecord
if r == nil {
s.flushBackupRecord(s.lastBackupTable, s.myRecord)
wg.Done()
break
}
// flush标志. 不能在外面调用flush函数,会导致线程并发操作,写入数据错误
// 如数据尚未进入到ch通道,此时调用flush,数据无法正确入库
if r.values == nil {
s.flushBackupRecord(r.dbname, r.record)
} else {
s.writeBackupRecord(r.dbname, r.record, r.values)
}
}
}
func (s *session) runBackup(ctx context.Context) {
var wg sync.WaitGroup
wg.Add(1)
s.chBackupRecord = make(chan *chanBackup, 50)
go s.processChanBackup(&wg)
defer func() {
close(s.chBackupRecord)
wg.Wait()
// 清空临时的库名
s.lastBackupTable = ""
}()
for _, record := range s.recordSets.All() {
if s.checkSqlIsDML(record) || s.checkSqlIsDDL(record) {
s.myRecord = record
longDataType := s.mysqlCreateBackupTable(record)
// errno := s.mysqlCreateBackupTable(record)
// if errno == 2 {
// break
// }
if record.TableInfo == nil {
s.appendErrorNo(ErrNotFoundTableInfo)
} else {
s.mysqlBackupSql(record, longDataType)
}
if s.hasError() {
break
}
}
// // 进程Killed
// if err := checkClose(ctx); err != nil {
// log.Warn("Killed: ", err)
// s.AppendErrorMessage("Operation has been killed!")
// break
// }
}
}
// 解析的sql写入缓存,并定期入库
func (s *session) writeBackupRecord(dbname string, record *Record, values []interface{}) {
s.insertBuffer = append(s.insertBuffer, values...)
// 每500行insert提交一次
if len(s.insertBuffer) >= 500*11 {
s.flushBackupRecord(dbname, record)
}
}
// flush用以写入当前insert缓存,并清空缓存.
func (s *session) flushBackupRecord(dbname string, record *Record) {
// log.Info("flush ", len(s.insertBuffer))
if len(s.insertBuffer) > 0 {
const backupRecordColumnCount int = 11
const rowSQL = "(?,?,?,?,?,?,?,?,?,?,NOW(),?),"
tableName := fmt.Sprintf("`%s`.`%s`", dbname, remoteBackupTable)
sql := "insert into %s values%s"
values := strings.TrimRight(
strings.Repeat(rowSQL, len(s.insertBuffer)/backupRecordColumnCount), ",")
err := s.backupdb.Exec(fmt.Sprintf(sql, tableName, values),
s.insertBuffer...).Error
if err != nil {
log.Error(err)
if myErr, ok := err.(*mysqlDriver.MySQLError); ok {
s.recordSets.MaxLevel = 2
record.StageStatus = StatusBackupFail
record.appendErrorMessage(myErr.Message)
}
}
// s.BackupTotalRows += len(s.insertBuffer) / backupRecordColumnCount
// s.SetMyProcessInfo(record.Sql, time.Now(),
// float64(s.BackupTotalRows)/float64(s.TotalChangeRows))
s.insertBuffer = nil
}
}
func (s *session) mysqlExecuteBackupSqlForDDL(record *Record) {
if record.DDLRollback == "" {
return
}
var buf strings.Builder
buf.WriteString("INSERT INTO ")
dbname := s.getRemoteBackupDBName(record)
buf.WriteString(fmt.Sprintf("`%s`.`%s`", dbname, record.TableInfo.Name))
buf.WriteString("(rollback_statement, opid_time) VALUES('")
buf.WriteString(HTMLEscapeString(record.DDLRollback))
buf.WriteString("','")
buf.WriteString(record.OPID)
buf.WriteString("')")
sql := buf.String()
if err := s.backupdb.Exec(sql).Error; err != nil {
log.Errorf("con:%d %v sql:%s", s.sessionVars.ConnectionID, err, sql)
if myErr, ok := err.(*mysqlDriver.MySQLError); ok {
s.appendErrorMessage(myErr.Message)
} else {
s.appendErrorMessage(err.Error())
}
record.StageStatus = StatusBackupFail
}
record.StageStatus = StatusBackupOK
}
// mysqlExecuteBackupInfoInsertSql 写入备份记录表
// longDataType 为true表示字段类型已更新,否则为text,需要在写入时自动截断
func (s *session) mysqlExecuteBackupInfoInsertSql(record *Record, longDataType bool) int {
record.OPID = makeOPIDByTime(record.ExecTimestamp, record.ThreadId, record.SeqNo)
typeStr := "UNKNOWN"
switch record.Type.(type) {
case *ast.InsertStmt:
typeStr = "INSERT"
case *ast.DeleteStmt:
typeStr = "DELETE"
case *ast.UpdateStmt:
typeStr = "UPDATE"
case *ast.CreateDatabaseStmt:
typeStr = "CREATEDB"
case *ast.CreateTableStmt:
typeStr = "CREATETABLE"
case *ast.AlterTableStmt:
typeStr = "ALTERTABLE"
case *ast.DropTableStmt:
typeStr = "DROPTABLE"
case *ast.RenameTableStmt:
typeStr = "RENAMETABLE"
case *ast.CreateIndexStmt:
typeStr = "CREATEINDEX"
case *ast.DropIndexStmt:
typeStr = "DROPINDEX"
default:
log.Warning("类型未知: ", record.Type)
}
sql_stmt := HTMLEscapeString(record.Sql)
// 已更新sql_statement类型为mediumtext
// longDataType 为true表示字段类型已更新,否则为text,需要在写入时自动截断
// 最大可存储65535个字节(64KB-1)
if !longDataType && len(sql_stmt) > (1<<16)-1 {
s.appendWarning(ErrDataTooLong, "sql_statement", 1)
sql_stmt = sql_stmt[:(1<<16)-4]
// 如果误截取了utf8字符,则往前找最后一个有效字符
for {
ch, _ := utf8.DecodeLastRuneInString(sql_stmt)
if ch != utf8.RuneError {
break
} else {
sql_stmt = sql_stmt[:len(sql_stmt)-1]
}
}
sql_stmt = sql_stmt + "..."
}
values := []interface{}{
record.OPID,
record.StartFile,
strconv.Itoa(record.StartPosition),
record.EndFile,
strconv.Itoa(record.EndPosition),
sql_stmt,
s.opt.Host,
record.TableInfo.Schema,
record.TableInfo.Name,
strconv.Itoa(s.opt.Port),
typeStr,
}
dbName := s.getRemoteBackupDBName(record)
if s.lastBackupTable == "" {
s.lastBackupTable = dbName
}
// 库名改变时强制flush
if s.lastBackupTable != dbName {
s.chBackupRecord <- &chanBackup{
dbname: s.lastBackupTable,
record: record,
values: nil,
}
s.lastBackupTable = dbName
}
s.chBackupRecord <- &chanBackup{
dbname: dbName,
record: record,
values: values,
}
return 0
}
func (s *session) mysqlCreateSqlBackupTable(dbname string) string {
// if not exists
buf := bytes.NewBufferString("CREATE TABLE ")
buf.WriteString(fmt.Sprintf("`%s`.`%s`", dbname, remoteBackupTable))
buf.WriteString("(")
buf.WriteString("opid_time varchar(50),")
buf.WriteString("start_binlog_file varchar(512),")
buf.WriteString("start_binlog_pos int,")
buf.WriteString("end_binlog_file varchar(512),")
buf.WriteString("end_binlog_pos int,")
buf.WriteString("sql_statement mediumtext,")
buf.WriteString("host VARCHAR(64),")
buf.WriteString("dbname VARCHAR(64),")
buf.WriteString("tablename VARCHAR(64),")
buf.WriteString("port INT,")
buf.WriteString("time TIMESTAMP,")
buf.WriteString("type VARCHAR(20),")
buf.WriteString("PRIMARY KEY(opid_time)")
buf.WriteString(")ENGINE INNODB DEFAULT CHARSET UTF8MB4;")
return buf.String()
}
func (s *session) mysqlCreateSqlFromTableInfo(dbname string, ti *TableInfo) string {
buf := bytes.NewBufferString("CREATE TABLE if not exists ")
buf.WriteString(fmt.Sprintf("`%s`.`%s`", dbname, ti.Name))
buf.WriteString("(")
buf.WriteString("id bigint auto_increment primary key, ")
buf.WriteString("rollback_statement mediumtext, ")
buf.WriteString("opid_time varchar(50)")
buf.WriteString(") ENGINE INNODB DEFAULT CHARSET UTF8MB4;")
return buf.String()
}
func (s *session) getRemoteBackupDBName(record *Record) string {
if record.BackupDBName != "" {
return record.BackupDBName
}
v := fmt.Sprintf("%s_%d_%s", s.opt.Host, s.opt.Port, record.TableInfo.Schema)
if len(v) > mysql.MaxDatabaseNameLength {
v = v[len(v)-mysql.MaxDatabaseNameLength:]
// s.AppendErrorNo(ER_TOO_LONG_BAKDB_NAME, s.opt.host, s.opt.port, record.TableInfo.Schema)
// return ""
}
v = strings.Replace(v, "-", "_", -1)
v = strings.Replace(v, ".", "_", -1)
record.BackupDBName = v
return record.BackupDBName
}
// mysqlCreateBackupTable 创建备份表.
// 如果备份表的表结构是旧表结构,即sql_statement字段类型为text,则返回false,否则返回true
// longDataType 为true表示字段类型已更新,否则为text,需要在写入时自动截断
func (s *session) mysqlCreateBackupTable(record *Record) (longDataType bool) {
if record.TableInfo == nil {
return
}
backupDBName := s.getRemoteBackupDBName(record)
if backupDBName == "" {
return
}
if record.TableInfo.IsCreated {
// 返回longDataType值
key := fmt.Sprintf("%s.%s", backupDBName, remoteBackupTable)
if v, ok := s.backupTableCacheList[key]; ok {
return v
}
return
}
if _, ok := s.backupDBCacheList[backupDBName]; !ok {
sql := fmt.Sprintf("create database if not exists `%s`;", backupDBName)
if err := s.backupdb.Exec(sql).Error; err != nil {
log.Errorf("con:%d %v", s.sessionVars.ConnectionID, err)
if myErr, ok := err.(*mysqlDriver.MySQLError); ok {
if myErr.Number != 1007 { /*ER_DB_CREATE_EXISTS*/
s.appendErrorMessage(myErr.Message)
return
}
} else {
s.appendErrorMessage(err.Error())
return
}
}
s.backupDBCacheList[backupDBName] = true
}
key := fmt.Sprintf("%s.%s", backupDBName, record.TableInfo.Name)
if _, ok := s.backupTableCacheList[key]; !ok {
createSql := s.mysqlCreateSqlFromTableInfo(backupDBName, record.TableInfo)
if err := s.backupdb.Exec(createSql).Error; err != nil {
log.Errorf("con:%d %v", s.sessionVars.ConnectionID, err)
if myErr, ok := err.(*mysqlDriver.MySQLError); ok {
if myErr.Number != 1050 { /*ER_TABLE_EXISTS_ERROR*/
s.appendErrorMessage(myErr.Message)
return
}
} else {
s.appendErrorMessage(err.Error())
return
}
}
s.backupTableCacheList[key] = true
}
key = fmt.Sprintf("%s.%s", backupDBName, remoteBackupTable)
if _, ok := s.backupTableCacheList[key]; !ok {
createSql := s.mysqlCreateSqlBackupTable(backupDBName)
if err := s.backupdb.Exec(createSql).Error; err != nil {
if myErr, ok := err.(*mysqlDriver.MySQLError); ok {
if myErr.Number != 1050 { /*ER_TABLE_EXISTS_ERROR*/
log.Errorf("con:%d %v", s.sessionVars.ConnectionID, err)
s.appendErrorMessage(myErr.Message)
return
} else {
// 获取sql_statement字段类型,用以兼容类型为text的旧表结构
longDataType = s.checkBackupTableSqlStmtColumnType(backupDBName)
}
} else {
s.appendErrorMessage(err.Error())
return
}
} else {
longDataType = true
}
s.backupTableCacheList[key] = longDataType
}
record.TableInfo.IsCreated = true
return
}
// checkBackupTableSqlStmtColumnType 检查sql_statement字段类型,用以兼容类型为text的旧表结构
func (s *session) checkBackupTableSqlStmtColumnType(dbname string) (longDataType bool) {
// 获取sql_statement字段类型,用以兼容类型为text的旧表结构
sql := fmt.Sprintf(`select DATA_TYPE from information_schema.columns
where table_schema='%s' and table_name='%s' and column_name='sql_statement';`,
dbname, remoteBackupTable)
var res string
rows, err2 := s.backupdb.DB().Query(sql)
if err2 != nil {
log.Errorf("con:%d %v", s.sessionVars.ConnectionID, err2)
if myErr, ok := err2.(*mysqlDriver.MySQLError); ok {
s.appendErrorMessage(myErr.Message)
} else {
s.appendErrorMessage(err2.Error())
}
}
if rows != nil {
defer rows.Close()
for rows.Next() {
rows.Scan(&res)
}
return res != "text"
}
return
}
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )