// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package admin

import (
	"fmt"
	"io"
	"sort"

	"github.com/hanchuanchuan/goInception/expression"
	"github.com/hanchuanchuan/goInception/kv"
	"github.com/hanchuanchuan/goInception/meta"
	"github.com/hanchuanchuan/goInception/model"
	"github.com/hanchuanchuan/goInception/mysql"
	"github.com/hanchuanchuan/goInception/sessionctx"
	"github.com/hanchuanchuan/goInception/sessionctx/stmtctx"
	"github.com/hanchuanchuan/goInception/table"
	"github.com/hanchuanchuan/goInception/table/tables"
	"github.com/hanchuanchuan/goInception/tablecodec"
	"github.com/hanchuanchuan/goInception/terror"
	"github.com/hanchuanchuan/goInception/types"
	"github.com/hanchuanchuan/goInception/util"
	"github.com/hanchuanchuan/goInception/util/chunk"
	"github.com/hanchuanchuan/goInception/util/sqlexec"
	"github.com/pingcap/errors"
	log "github.com/sirupsen/logrus"
)

// DDLInfo is for DDL information.
type DDLInfo struct {
	SchemaVer   int64
	ReorgHandle int64        // It's only used for DDL information.
	Jobs        []*model.Job // It's the currently running jobs.
}

// GetDDLInfo returns DDL information.
func GetDDLInfo(txn kv.Transaction) (*DDLInfo, error) {
	var err error
	info := &DDLInfo{}
	t := meta.NewMeta(txn)

	info.Jobs = make([]*model.Job, 0, 2)
	job, err := t.GetDDLJobByIdx(0)
	if err != nil {
		return nil, errors.Trace(err)
	}
	if job != nil {
		info.Jobs = append(info.Jobs, job)
	}
	addIdxJob, err := t.GetDDLJobByIdx(0, meta.AddIndexJobListKey)
	if err != nil {
		return nil, errors.Trace(err)
	}
	if addIdxJob != nil {
		info.Jobs = append(info.Jobs, addIdxJob)
	}

	info.SchemaVer, err = t.GetSchemaVersion()
	if err != nil {
		return nil, errors.Trace(err)
	}
	if addIdxJob == nil {
		return info, nil
	}

	info.ReorgHandle, _, _, err = t.GetDDLReorgHandle(addIdxJob)
	if err != nil {
		return nil, errors.Trace(err)
	}

	return info, nil
}

// CancelJobs cancels the DDL jobs.
func CancelJobs(txn kv.Transaction, ids []int64) ([]error, error) {
	if len(ids) == 0 {
		return nil, nil
	}

	jobs, err := GetDDLJobs(txn)
	if err != nil {
		return nil, errors.Trace(err)
	}

	errs := make([]error, len(ids))
	t := meta.NewMeta(txn)
	for i, id := range ids {
		found := false
		for j, job := range jobs {
			if id != job.ID {
				log.Debugf("the job ID %d that needs to be canceled isn't equal to current job ID %d", id, job.ID)
				continue
			}
			found = true
			// These states can't be cancelled.
			if job.IsDone() || job.IsSynced() {
				errs[i] = errors.New("This job is finished, so can't be cancelled")
				continue
			}
			// If the state is rolling back, it means the work is cleaning the data after cancelling the job.
			if job.IsCancelled() || job.IsRollingback() || job.IsRollbackDone() {
				continue
			}
			job.State = model.JobStateCancelling
			// Make sure RawArgs isn't overwritten.
			err := job.DecodeArgs(job.RawArgs)
			if err != nil {
				errs[i] = errors.Trace(err)
				continue
			}
			if job.Type == model.ActionAddIndex {
				err = t.UpdateDDLJob(int64(j), job, true, meta.AddIndexJobListKey)
			} else {
				err = t.UpdateDDLJob(int64(j), job, true)
			}
			if err != nil {
				errs[i] = errors.Trace(err)
			}
		}
		if !found {
			errs[i] = errors.New("Can't find this job")
		}
	}
	return errs, nil
}

func getDDLJobsInQueue(t *meta.Meta, jobListKey meta.JobListKeyType) ([]*model.Job, error) {
	cnt, err := t.DDLJobQueueLen(jobListKey)
	if err != nil {
		return nil, errors.Trace(err)
	}
	jobs := make([]*model.Job, cnt)
	for i := range jobs {
		jobs[i], err = t.GetDDLJobByIdx(int64(i), jobListKey)
		if err != nil {
			return nil, errors.Trace(err)
		}
	}
	return jobs, nil
}

// GetDDLJobs get all DDL jobs and sorts jobs by job.ID.
func GetDDLJobs(txn kv.Transaction) ([]*model.Job, error) {
	t := meta.NewMeta(txn)
	generalJobs, err := getDDLJobsInQueue(t, meta.DefaultJobListKey)
	if err != nil {
		return nil, errors.Trace(err)
	}
	addIdxJobs, err := getDDLJobsInQueue(t, meta.AddIndexJobListKey)
	if err != nil {
		return nil, errors.Trace(err)
	}
	jobs := append(generalJobs, addIdxJobs...)
	sort.Sort(jobArray(jobs))
	return jobs, nil
}

type jobArray []*model.Job

func (v jobArray) Len() int {
	return len(v)
}

func (v jobArray) Less(i, j int) bool {
	return v[i].ID < v[j].ID
}

func (v jobArray) Swap(i, j int) {
	v[i], v[j] = v[j], v[i]
}

// MaxHistoryJobs is exported for testing.
const MaxHistoryJobs = 10

// DefNumHistoryJobs is default value of the default number of history job
const DefNumHistoryJobs = 10

// GetHistoryDDLJobs returns the DDL history jobs and an error.
// The maximum count of history jobs is num.
func GetHistoryDDLJobs(txn kv.Transaction, maxNumJobs int) ([]*model.Job, error) {
	t := meta.NewMeta(txn)
	jobs, err := t.GetAllHistoryDDLJobs()
	if err != nil {
		return nil, errors.Trace(err)
	}

	jobsLen := len(jobs)
	if jobsLen > maxNumJobs {
		start := jobsLen - maxNumJobs
		jobs = jobs[start:]
	}
	jobsLen = len(jobs)
	ret := make([]*model.Job, 0, jobsLen)
	for i := jobsLen - 1; i >= 0; i-- {
		ret = append(ret, jobs[i])
	}
	return ret, nil
}

func nextIndexVals(data []types.Datum) []types.Datum {
	// Add 0x0 to the end of data.
	return append(data, types.Datum{})
}

// RecordData is the record data composed of a handle and values.
type RecordData struct {
	Handle int64
	Values []types.Datum
}

func getCount(ctx sessionctx.Context, sql string) (int64, error) {
	rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(ctx, sql)
	if err != nil {
		return 0, errors.Trace(err)
	}
	if len(rows) != 1 {
		return 0, errors.Errorf("can not get count, sql %s result rows %d", sql, len(rows))
	}
	return rows[0].GetInt64(0), nil
}

// CheckIndicesCount compares indices count with table count.
// It returns nil if the count from the index is equal to the count from the table columns,
// otherwise it returns an error with a different information.
func CheckIndicesCount(ctx sessionctx.Context, dbName, tableName string, indices []string) error {
	// Add `` for some names like `table name`.
	sql := fmt.Sprintf("SELECT COUNT(*) FROM `%s`.`%s`", dbName, tableName)
	tblCnt, err := getCount(ctx, sql)
	if err != nil {
		return errors.Trace(err)
	}
	for _, idx := range indices {
		sql = fmt.Sprintf("SELECT COUNT(*) FROM `%s`.`%s` USE INDEX(`%s`)", dbName, tableName, idx)
		idxCnt, err := getCount(ctx, sql)
		if err != nil {
			return errors.Trace(err)
		}
		if tblCnt != idxCnt {
			return errors.Errorf("table count %d != index(%s) count %d", tblCnt, idx, idxCnt)
		}
	}

	return nil
}

// ScanIndexData scans the index handles and values in a limited number, according to the index information.
// It returns data and the next startVals until it doesn't have data, then returns data is nil and
// the next startVals is the values which can't get data. If startVals = nil and limit = -1,
// it returns the index data of the whole.
func ScanIndexData(sc *stmtctx.StatementContext, txn kv.Transaction, kvIndex table.Index, startVals []types.Datum, limit int64) (
	[]*RecordData, []types.Datum, error) {
	it, _, err := kvIndex.Seek(sc, txn, startVals)
	if err != nil {
		return nil, nil, errors.Trace(err)
	}
	defer it.Close()

	var idxRows []*RecordData
	var curVals []types.Datum
	for limit != 0 {
		val, h, err1 := it.Next()
		if terror.ErrorEqual(err1, io.EOF) {
			return idxRows, nextIndexVals(curVals), nil
		} else if err1 != nil {
			return nil, nil, errors.Trace(err1)
		}
		idxRows = append(idxRows, &RecordData{Handle: h, Values: val})
		limit--
		curVals = val
	}

	nextVals, _, err := it.Next()
	if terror.ErrorEqual(err, io.EOF) {
		return idxRows, nextIndexVals(curVals), nil
	} else if err != nil {
		return nil, nil, errors.Trace(err)
	}

	return idxRows, nextVals, nil
}

// CompareIndexData compares index data one by one.
// It returns nil if the data from the index is equal to the data from the table columns,
// otherwise it returns an error with a different set of records.
// genExprs is use to calculate the virtual generate column.
func CompareIndexData(sessCtx sessionctx.Context, txn kv.Transaction, t table.Table, idx table.Index, genExprs map[string]expression.Expression) error {
	err := checkIndexAndRecord(sessCtx, txn, t, idx, genExprs)
	if err != nil {
		return errors.Trace(err)
	}

	return CheckRecordAndIndex(sessCtx, txn, t, idx, genExprs)
}

func getIndexFieldTypes(t table.Table, idx table.Index) ([]*types.FieldType, error) {
	idxColumns := idx.Meta().Columns
	tblColumns := t.Meta().Columns
	fieldTypes := make([]*types.FieldType, 0, len(idxColumns))
	for _, col := range idxColumns {
		colInfo := model.FindColumnInfo(tblColumns, col.Name.L)
		if colInfo == nil {
			return nil, errors.Errorf("index col:%v not found in table:%v", col.Name.String(), t.Meta().Name.String())
		}

		fieldTypes = append(fieldTypes, &colInfo.FieldType)
	}
	return fieldTypes, nil
}

// adjustDatumKind treats KindString as KindBytes.
func adjustDatumKind(vals1, vals2 []types.Datum) {
	if len(vals1) != len(vals2) {
		return
	}

	for i, val1 := range vals1 {
		val2 := vals2[i]
		if val1.Kind() != val2.Kind() {
			if (val1.Kind() == types.KindBytes || val1.Kind() == types.KindString) &&
				(val2.Kind() == types.KindBytes || val2.Kind() == types.KindString) {
				vals1[i].SetBytes(val1.GetBytes())
				vals2[i].SetBytes(val2.GetBytes())
			}
		}
	}
}

func checkIndexAndRecord(sessCtx sessionctx.Context, txn kv.Transaction, t table.Table, idx table.Index, genExprs map[string]expression.Expression) error {
	it, err := idx.SeekFirst(txn)
	if err != nil {
		return errors.Trace(err)
	}
	defer it.Close()

	cols := make([]*table.Column, len(idx.Meta().Columns))
	for i, col := range idx.Meta().Columns {
		cols[i] = t.Cols()[col.Offset]
	}

	fieldTypes, err := getIndexFieldTypes(t, idx)
	if err != nil {
		return errors.Trace(err)
	}
	sc := sessCtx.GetSessionVars().StmtCtx
	for {
		vals1, h, err := it.Next()
		if terror.ErrorEqual(err, io.EOF) {
			break
		} else if err != nil {
			return errors.Trace(err)
		}

		vals1, err = tablecodec.UnflattenDatums(vals1, fieldTypes, sessCtx.GetSessionVars().Location())
		if err != nil {
			return errors.Trace(err)
		}
		vals2, err := rowWithCols(sessCtx, txn, t, h, cols, genExprs)
		vals2 = tables.TruncateIndexValuesIfNeeded(t.Meta(), idx.Meta(), vals2)
		if kv.ErrNotExist.Equal(err) {
			record := &RecordData{Handle: h, Values: vals1}
			err = ErrDataInConsistent.GenWithStack("index:%#v != record:%#v", record, nil)
		}
		if err != nil {
			return errors.Trace(err)
		}
		adjustDatumKind(vals1, vals2)
		if !compareDatumSlice(sc, vals1, vals2) {
			record1 := &RecordData{Handle: h, Values: vals1}
			record2 := &RecordData{Handle: h, Values: vals2}
			return ErrDataInConsistent.GenWithStack("index:%#v != record:%#v", record1, record2)
		}
	}

	return nil
}

func compareDatumSlice(sc *stmtctx.StatementContext, val1s, val2s []types.Datum) bool {
	if len(val1s) != len(val2s) {
		return false
	}
	for i, v := range val1s {
		res, err := v.CompareDatum(sc, &val2s[i])
		if err != nil || res != 0 {
			return false
		}
	}
	return true
}

// CheckRecordAndIndex is exported for testing.
func CheckRecordAndIndex(sessCtx sessionctx.Context, txn kv.Transaction, t table.Table, idx table.Index, genExprs map[string]expression.Expression) error {
	sc := sessCtx.GetSessionVars().StmtCtx
	cols := make([]*table.Column, len(idx.Meta().Columns))
	for i, col := range idx.Meta().Columns {
		cols[i] = t.Cols()[col.Offset]
	}

	startKey := t.RecordKey(0)
	filterFunc := func(h1 int64, vals1 []types.Datum, cols []*table.Column) (bool, error) {
		for i, val := range vals1 {
			col := cols[i]
			if val.IsNull() {
				if mysql.HasNotNullFlag(col.Flag) {
					return false, errors.New("Miss")
				}
				// NULL value is regarded as its default value.
				colDefVal, err := table.GetColOriginDefaultValue(sessCtx, col.ToInfo())
				if err != nil {
					return false, errors.Trace(err)
				}
				vals1[i] = colDefVal
			}
		}
		isExist, h2, err := idx.Exist(sc, txn, vals1, h1)
		if kv.ErrKeyExists.Equal(err) {
			record1 := &RecordData{Handle: h1, Values: vals1}
			record2 := &RecordData{Handle: h2, Values: vals1}
			return false, ErrDataInConsistent.GenWithStack("index:%#v != record:%#v", record2, record1)
		}
		if err != nil {
			return false, errors.Trace(err)
		}
		if !isExist {
			record := &RecordData{Handle: h1, Values: vals1}
			return false, ErrDataInConsistent.GenWithStack("index:%#v != record:%#v", nil, record)
		}

		return true, nil
	}
	err := iterRecords(sessCtx, txn, t, startKey, cols, filterFunc, genExprs)

	if err != nil {
		return errors.Trace(err)
	}

	return nil
}

func scanTableData(sessCtx sessionctx.Context, retriever kv.Retriever, t table.Table, cols []*table.Column, startHandle, limit int64) (
	[]*RecordData, int64, error) {
	var records []*RecordData

	startKey := t.RecordKey(startHandle)
	filterFunc := func(h int64, d []types.Datum, cols []*table.Column) (bool, error) {
		if limit != 0 {
			r := &RecordData{
				Handle: h,
				Values: d,
			}
			records = append(records, r)
			limit--
			return true, nil
		}
		return false, nil
	}
	err := iterRecords(sessCtx, retriever, t, startKey, cols, filterFunc, nil)
	if err != nil {
		return nil, 0, errors.Trace(err)
	}

	if len(records) == 0 {
		return records, startHandle, nil
	}

	nextHandle := records[len(records)-1].Handle + 1

	return records, nextHandle, nil
}

// ScanTableRecord scans table row handles and column values in a limited number.
// It returns data and the next startHandle until it doesn't have data, then returns data is nil and
// the next startHandle is the handle which can't get data. If startHandle = 0 and limit = -1,
// it returns the table data of the whole.
func ScanTableRecord(sessCtx sessionctx.Context, retriever kv.Retriever, t table.Table, startHandle, limit int64) (
	[]*RecordData, int64, error) {
	return scanTableData(sessCtx, retriever, t, t.Cols(), startHandle, limit)
}

// ScanSnapshotTableRecord scans the ver version of the table data in a limited number.
// It returns data and the next startHandle until it doesn't have data, then returns data is nil and
// the next startHandle is the handle which can't get data. If startHandle = 0 and limit = -1,
// it returns the table data of the whole.
func ScanSnapshotTableRecord(sessCtx sessionctx.Context, store kv.Storage, ver kv.Version, t table.Table, startHandle, limit int64) (
	[]*RecordData, int64, error) {
	snap, err := store.GetSnapshot(ver)
	if err != nil {
		return nil, 0, errors.Trace(err)
	}

	records, nextHandle, err := ScanTableRecord(sessCtx, snap, t, startHandle, limit)

	return records, nextHandle, errors.Trace(err)
}

// CompareTableRecord compares data and the corresponding table data one by one.
// It returns nil if data is equal to the data that scans from table, otherwise
// it returns an error with a different set of records. If exact is false, only compares handle.
func CompareTableRecord(sessCtx sessionctx.Context, txn kv.Transaction, t table.Table, data []*RecordData, exact bool) error {
	m := make(map[int64][]types.Datum, len(data))
	for _, r := range data {
		if _, ok := m[r.Handle]; ok {
			return errRepeatHandle.GenWithStack("handle:%d is repeated in data", r.Handle)
		}
		m[r.Handle] = r.Values
	}

	startKey := t.RecordKey(0)
	sc := sessCtx.GetSessionVars().StmtCtx
	filterFunc := func(h int64, vals []types.Datum, cols []*table.Column) (bool, error) {
		vals2, ok := m[h]
		if !ok {
			record := &RecordData{Handle: h, Values: vals}
			return false, ErrDataInConsistent.GenWithStack("data:%#v != record:%#v", nil, record)
		}
		if !exact {
			delete(m, h)
			return true, nil
		}

		if !compareDatumSlice(sc, vals, vals2) {
			record1 := &RecordData{Handle: h, Values: vals2}
			record2 := &RecordData{Handle: h, Values: vals}
			return false, ErrDataInConsistent.GenWithStack("data:%#v != record:%#v", record1, record2)
		}

		delete(m, h)

		return true, nil
	}
	err := iterRecords(sessCtx, txn, t, startKey, t.Cols(), filterFunc, nil)
	if err != nil {
		return errors.Trace(err)
	}

	for h, vals := range m {
		record := &RecordData{Handle: h, Values: vals}
		return ErrDataInConsistent.GenWithStack("data:%#v != record:%#v", record, nil)
	}

	return nil
}

// genExprs use to calculate generated column value.
func rowWithCols(sessCtx sessionctx.Context, txn kv.Retriever, t table.Table, h int64, cols []*table.Column, genExprs map[string]expression.Expression) ([]types.Datum, error) {
	key := t.RecordKey(h)
	value, err := txn.Get(key)
	genColFlag := false
	if err != nil {
		return nil, errors.Trace(err)
	}
	v := make([]types.Datum, len(cols))
	colTps := make(map[int64]*types.FieldType, len(cols))
	for i, col := range cols {
		if col == nil {
			continue
		}
		if col.State != model.StatePublic {
			return nil, errInvalidColumnState.GenWithStack("Cannot use none public column - %v", cols)
		}
		if col.IsPKHandleColumn(t.Meta()) {
			if mysql.HasUnsignedFlag(col.Flag) {
				v[i].SetUint64(uint64(h))
			} else {
				v[i].SetInt64(h)
			}
			continue
		}
		// If have virtual generate column , decode all columns.
		if col.IsGenerated() && col.GeneratedStored == false {
			genColFlag = true
		}
		colTps[col.ID] = &col.FieldType
	}
	// if have virtual generate column, decode all columns
	if genColFlag {
		for _, c := range t.Cols() {
			if c.State != model.StatePublic {
				continue
			}
			colTps[c.ID] = &c.FieldType
		}
	}

	rowMap, err := tablecodec.DecodeRow(value, colTps, sessCtx.GetSessionVars().Location())
	if err != nil {
		return nil, errors.Trace(err)
	}

	if genColFlag && genExprs != nil {
		err = fillGenColData(sessCtx, rowMap, t, cols, genExprs)
		if err != nil {
			return v, errors.Trace(err)
		}
	}

	for i, col := range cols {
		if col == nil {
			continue
		}
		if col.State != model.StatePublic {
			// TODO: check this
			return nil, errInvalidColumnState.GenWithStack("Cannot use none public column - %v", cols)
		}
		if col.IsPKHandleColumn(t.Meta()) {
			continue
		}
		ri, ok := rowMap[col.ID]
		if !ok {
			if mysql.HasNotNullFlag(col.Flag) {
				return nil, errors.New("Miss")
			}
			// NULL value is regarded as its default value.
			colDefVal, err := table.GetColOriginDefaultValue(sessCtx, col.ToInfo())
			if err != nil {
				return nil, errors.Trace(err)
			}
			v[i] = colDefVal
			continue
		}
		v[i] = ri
	}
	return v, nil
}

// genExprs use to calculate generated column value.
func iterRecords(sessCtx sessionctx.Context, retriever kv.Retriever, t table.Table, startKey kv.Key, cols []*table.Column,
	fn table.RecordIterFunc, genExprs map[string]expression.Expression) error {
	it, err := retriever.Seek(startKey)
	if err != nil {
		return errors.Trace(err)
	}
	defer it.Close()

	if !it.Valid() {
		return nil
	}

	log.Debugf("startKey:%q, key:%q, value:%q", startKey, it.Key(), it.Value())

	genColFlag := false
	colMap := make(map[int64]*types.FieldType, len(cols))
	for _, col := range cols {
		if col.IsGenerated() && col.GeneratedStored == false {
			genColFlag = true
			break
		}
		colMap[col.ID] = &col.FieldType
	}
	if genColFlag {
		for _, col := range t.Cols() {
			colMap[col.ID] = &col.FieldType
		}
	}

	prefix := t.RecordPrefix()
	for it.Valid() && it.Key().HasPrefix(prefix) {
		// first kv pair is row lock information.
		// TODO: check valid lock
		// get row handle
		handle, err := tablecodec.DecodeRowKey(it.Key())
		if err != nil {
			return errors.Trace(err)
		}

		rowMap, err := tablecodec.DecodeRow(it.Value(), colMap, sessCtx.GetSessionVars().Location())
		if err != nil {
			return errors.Trace(err)
		}

		if genColFlag && genExprs != nil {
			err = fillGenColData(sessCtx, rowMap, t, cols, genExprs)
			if err != nil {
				return errors.Trace(err)
			}
		}

		data := make([]types.Datum, 0, len(cols))
		for _, col := range cols {
			if col.IsPKHandleColumn(t.Meta()) {
				if mysql.HasUnsignedFlag(col.Flag) {
					data = append(data, types.NewUintDatum(uint64(handle)))
				} else {
					data = append(data, types.NewIntDatum(handle))
				}
			} else {
				data = append(data, rowMap[col.ID])
			}
		}
		more, err := fn(handle, data, cols)
		if !more || err != nil {
			return errors.Trace(err)
		}

		rk := t.RecordKey(handle)
		err = kv.NextUntil(it, util.RowKeyPrefixFilter(rk))
		if err != nil {
			return errors.Trace(err)
		}
	}

	return nil
}

// genExprs use to calculate generated column value.
func fillGenColData(sessCtx sessionctx.Context, rowMap map[int64]types.Datum, t table.Table, cols []*table.Column, genExprs map[string]expression.Expression) error {
	tableInfo := t.Meta()
	row := make([]types.Datum, len(t.Cols()))
	for _, col := range t.Cols() {
		ri, ok := rowMap[col.ID]
		if ok {
			row[col.Offset] = ri
		}
	}

	var err error
	for _, col := range cols {
		if !col.IsGenerated() || col.GeneratedStored == true {
			continue
		}
		genColumnName := model.GetTableColumnID(tableInfo, col.ColumnInfo)
		if expr, ok := genExprs[genColumnName]; ok {
			var val types.Datum
			val, err = expr.Eval(chunk.MutRowFromDatums(row).ToRow())
			if err != nil {
				return errors.Trace(err)
			}
			val, err = table.CastValue(sessCtx, val, col.ToInfo())
			if err != nil {
				return errors.Trace(err)
			}
			rowMap[col.ID] = val
		}
	}
	return nil
}

// admin error codes.
const (
	codeDataNotEqual       terror.ErrCode = 1
	codeRepeatHandle                      = 2
	codeInvalidColumnState                = 3
)

var (
	// ErrDataInConsistent indicate that meets inconsistent data.
	ErrDataInConsistent   = terror.ClassAdmin.New(codeDataNotEqual, "data isn't equal")
	errRepeatHandle       = terror.ClassAdmin.New(codeRepeatHandle, "handle is repeated")
	errInvalidColumnState = terror.ClassAdmin.New(codeInvalidColumnState, "invalid column state")
)