// Copyright 2013 The ql Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSES/QL-LICENSE file.

// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl

import (
	"fmt"
	"sync"
	"time"

	"github.com/coreos/etcd/clientv3"
	"github.com/hanchuanchuan/goInception/ast"
	"github.com/hanchuanchuan/goInception/ddl/util"
	"github.com/hanchuanchuan/goInception/infoschema"
	"github.com/hanchuanchuan/goInception/kv"
	"github.com/hanchuanchuan/goInception/meta"
	"github.com/hanchuanchuan/goInception/model"
	"github.com/hanchuanchuan/goInception/mysql"
	"github.com/hanchuanchuan/goInception/owner"
	"github.com/hanchuanchuan/goInception/sessionctx"
	"github.com/hanchuanchuan/goInception/sessionctx/binloginfo"
	"github.com/hanchuanchuan/goInception/sessionctx/variable"
	"github.com/hanchuanchuan/goInception/table"
	"github.com/hanchuanchuan/goInception/terror"
	tidbutil "github.com/hanchuanchuan/goInception/util"
	"github.com/ngaut/pools"
	"github.com/pingcap/errors"
	log "github.com/sirupsen/logrus"
	"github.com/twinj/uuid"
	"golang.org/x/net/context"
)

const (
	// currentVersion is for all new DDL jobs.
	currentVersion = 1
	// DDLOwnerKey is the ddl owner path that is saved to etcd, and it's exported for testing.
	DDLOwnerKey = "/tidb/ddl/fg/owner"
	ddlPrompt   = "ddl"

	shardRowIDBitsMax = 15
)

var (
	// TableColumnCountLimit is limit of the number of columns in a table.
	// It's exported for testing.
	TableColumnCountLimit = 512
	// EnableSplitTableRegion is a flag to decide whether to split a new region for
	// a newly created table. It takes effect only if the Storage supports split
	// region.
	EnableSplitTableRegion = false

	// PartitionCountLimit is limit of the number of partitions in a table.
	// Mysql maximum number of partitions is 8192, our maximum number of partitions is 1024.
	// Reference linking https://dev.mysql.com/doc/refman/5.7/en/partitioning-limitations.html.
	PartitionCountLimit = 1024
)

var (
	// errWorkerClosed means we have already closed the DDL worker.
	errInvalidWorker = terror.ClassDDL.New(codeInvalidWorker, "invalid worker")
	// errNotOwner means we are not owner and can't handle DDL jobs.
	errNotOwner              = terror.ClassDDL.New(codeNotOwner, "not Owner")
	errInvalidDDLJob         = terror.ClassDDL.New(codeInvalidDDLJob, "invalid DDL job")
	errCancelledDDLJob       = terror.ClassDDL.New(codeCancelledDDLJob, "cancelled DDL job")
	errInvalidJobFlag        = terror.ClassDDL.New(codeInvalidJobFlag, "invalid job flag")
	errRunMultiSchemaChanges = terror.ClassDDL.New(codeRunMultiSchemaChanges, "can't run multi schema change")
	errWaitReorgTimeout      = terror.ClassDDL.New(codeWaitReorgTimeout, "wait for reorganization timeout")
	errInvalidStoreVer       = terror.ClassDDL.New(codeInvalidStoreVer, "invalid storage current version")

	// We don't support dropping column with index covered now.
	errCantDropColWithIndex    = terror.ClassDDL.New(codeCantDropColWithIndex, "can't drop column with index")
	errUnsupportedAddColumn    = terror.ClassDDL.New(codeUnsupportedAddColumn, "unsupported add column")
	errUnsupportedModifyColumn = terror.ClassDDL.New(codeUnsupportedModifyColumn, "unsupported modify column %s")
	errUnsupportedPKHandle     = terror.ClassDDL.New(codeUnsupportedDropPKHandle,
		"unsupported drop integer primary key")
	errUnsupportedCharset = terror.ClassDDL.New(codeUnsupportedCharset, "unsupported charset %s collate %s")

	errUnsupportedShardRowIDBits = terror.ClassDDL.New(codeUnsupportedShardRowIDBits, "unsupported shard_row_id_bits for table with auto_increment column.")

	errBlobKeyWithoutLength = terror.ClassDDL.New(codeBlobKeyWithoutLength, "index for BLOB/TEXT column must specify a key length")
	errIncorrectPrefixKey   = terror.ClassDDL.New(codeIncorrectPrefixKey, "Incorrect prefix key; the used key part isn't a string, the used length is longer than the key part, or the storage engine doesn't support unique prefix keys")
	errTooLongKey           = terror.ClassDDL.New(codeTooLongKey,
		fmt.Sprintf("Specified key was too long; max key length is %d bytes", maxPrefixLength))
	errKeyColumnDoesNotExits    = terror.ClassDDL.New(codeKeyColumnDoesNotExits, "this key column doesn't exist in table")
	errUnknownTypeLength        = terror.ClassDDL.New(codeUnknownTypeLength, "Unknown length for type tp %d")
	errUnknownFractionLength    = terror.ClassDDL.New(codeUnknownFractionLength, "Unknown Length for type tp %d and fraction %d")
	errInvalidJobVersion        = terror.ClassDDL.New(codeInvalidJobVersion, "DDL job with version %d greater than current %d")
	errFileNotFound             = terror.ClassDDL.New(codeFileNotFound, "Can't find file: './%s/%s.frm'")
	errErrorOnRename            = terror.ClassDDL.New(codeErrorOnRename, "Error on rename of './%s/%s' to './%s/%s'")
	errBadField                 = terror.ClassDDL.New(codeBadField, "Unknown column '%s' in '%s'")
	errInvalidUseOfNull         = terror.ClassDDL.New(codeInvalidUseOfNull, "Invalid use of NULL value")
	errTooManyFields            = terror.ClassDDL.New(codeTooManyFields, "Too many columns")
	errInvalidSplitRegionRanges = terror.ClassDDL.New(codeInvalidRanges, "Failed to split region ranges")
	errReorgPanic               = terror.ClassDDL.New(codeReorgWorkerPanic, "reorg worker panic.")

	// errWrongKeyColumn is for table column cannot be indexed.
	errWrongKeyColumn = terror.ClassDDL.New(codeWrongKeyColumn, mysql.MySQLErrName[mysql.ErrWrongKeyColumn])
	// errUnsupportedOnGeneratedColumn is for unsupported actions on generated columns.
	errUnsupportedOnGeneratedColumn = terror.ClassDDL.New(codeUnsupportedOnGeneratedColumn, mysql.MySQLErrName[mysql.ErrUnsupportedOnGeneratedColumn])
	// errGeneratedColumnNonPrior forbiddens to refer generated column non prior to it.
	errGeneratedColumnNonPrior = terror.ClassDDL.New(codeGeneratedColumnNonPrior, mysql.MySQLErrName[mysql.ErrGeneratedColumnNonPrior])
	// errDependentByGeneratedColumn forbiddens to delete columns which are dependent by generated columns.
	errDependentByGeneratedColumn = terror.ClassDDL.New(codeDependentByGeneratedColumn, mysql.MySQLErrName[mysql.ErrDependentByGeneratedColumn])
	// errJSONUsedAsKey forbiddens to use JSON as key or index.
	errJSONUsedAsKey = terror.ClassDDL.New(codeJSONUsedAsKey, mysql.MySQLErrName[mysql.ErrJSONUsedAsKey])
	// errBlobCantHaveDefault forbiddens to give not null default value to TEXT/BLOB/JSON.
	errBlobCantHaveDefault = terror.ClassDDL.New(codeBlobCantHaveDefault, mysql.MySQLErrName[mysql.ErrBlobCantHaveDefault])
	errTooLongIndexComment = terror.ClassDDL.New(codeErrTooLongIndexComment, mysql.MySQLErrName[mysql.ErrTooLongIndexComment])

	// ErrDupKeyName returns for duplicated key name
	ErrDupKeyName = terror.ClassDDL.New(codeDupKeyName, "duplicate key name")
	// ErrInvalidDBState returns for invalid database state.
	ErrInvalidDBState = terror.ClassDDL.New(codeInvalidDBState, "invalid database state")
	// ErrInvalidTableState returns for invalid Table state.
	ErrInvalidTableState = terror.ClassDDL.New(codeInvalidTableState, "invalid table state")
	// ErrInvalidColumnState returns for invalid column state.
	ErrInvalidColumnState = terror.ClassDDL.New(codeInvalidColumnState, "invalid column state")
	// ErrInvalidIndexState returns for invalid index state.
	ErrInvalidIndexState = terror.ClassDDL.New(codeInvalidIndexState, "invalid index state")
	// ErrInvalidForeignKeyState returns for invalid foreign key state.
	ErrInvalidForeignKeyState = terror.ClassDDL.New(codeInvalidForeignKeyState, "invalid foreign key state")
	// ErrUnsupportedModifyPrimaryKey returns an error when add or drop the primary key.
	// It's exported for testing.
	ErrUnsupportedModifyPrimaryKey = terror.ClassDDL.New(codeUnsupportedModifyPrimaryKey, "unsupported %s primary key")

	// ErrColumnBadNull returns for a bad null value.
	ErrColumnBadNull = terror.ClassDDL.New(codeBadNull, "column cann't be null")
	// ErrCantRemoveAllFields returns for deleting all columns.
	ErrCantRemoveAllFields = terror.ClassDDL.New(codeCantRemoveAllFields, "can't delete all columns with ALTER TABLE")
	// ErrCantDropFieldOrKey returns for dropping a non-existent field or key.
	ErrCantDropFieldOrKey = terror.ClassDDL.New(codeCantDropFieldOrKey, "can't drop field; check that column/key exists")
	// ErrInvalidOnUpdate returns for invalid ON UPDATE clause.
	ErrInvalidOnUpdate = terror.ClassDDL.New(codeInvalidOnUpdate, mysql.MySQLErrName[mysql.ErrInvalidOnUpdate])
	// ErrTooLongIdent returns for too long name of database/table/column/index.
	ErrTooLongIdent = terror.ClassDDL.New(codeTooLongIdent, mysql.MySQLErrName[mysql.ErrTooLongIdent])
	// ErrWrongDBName returns for wrong database name.
	ErrWrongDBName = terror.ClassDDL.New(codeWrongDBName, mysql.MySQLErrName[mysql.ErrWrongDBName])
	// ErrWrongTableName returns for wrong table name.
	ErrWrongTableName = terror.ClassDDL.New(codeWrongTableName, mysql.MySQLErrName[mysql.ErrWrongTableName])
	// ErrWrongColumnName returns for wrong column name.
	ErrWrongColumnName = terror.ClassDDL.New(codeWrongColumnName, mysql.MySQLErrName[mysql.ErrWrongColumnName])
	// ErrTableMustHaveColumns returns for missing column when creating a table.
	ErrTableMustHaveColumns = terror.ClassDDL.New(codeTableMustHaveColumns, mysql.MySQLErrName[mysql.ErrTableMustHaveColumns])
	// ErrWrongNameForIndex returns for wrong index name.
	ErrWrongNameForIndex = terror.ClassDDL.New(codeWrongNameForIndex, mysql.MySQLErrName[mysql.ErrWrongNameForIndex])
	// ErrUnknownCharacterSet returns unknown character set.
	ErrUnknownCharacterSet = terror.ClassDDL.New(codeUnknownCharacterSet, "Unknown character set: '%s'")
	// ErrPrimaryCantHaveNull returns All parts of a PRIMARY KEY must be NOT NULL; if you need NULL in a key, use UNIQUE instead
	ErrPrimaryCantHaveNull = terror.ClassDDL.New(codePrimaryCantHaveNull, mysql.MySQLErrName[mysql.ErrPrimaryCantHaveNull])

	// ErrNotAllowedTypeInPartition returns not allowed type error when creating table partiton with unsupport expression type.
	ErrNotAllowedTypeInPartition = terror.ClassDDL.New(codeErrFieldTypeNotAllowedAsPartitionField, mysql.MySQLErrName[mysql.ErrFieldTypeNotAllowedAsPartitionField])
	// ErrPartitionsMustBeDefined returns each partition must be defined.
	ErrPartitionsMustBeDefined = terror.ClassDDL.New(codePartitionsMustBeDefined, "For RANGE partitions each partition must be defined")
	// ErrPartitionMgmtOnNonpartitioned returns it's not a partition table.
	ErrPartitionMgmtOnNonpartitioned = terror.ClassDDL.New(codePartitionMgmtOnNonpartitioned, "Partition management on a not partitioned table is not possible")
	// ErrDropPartitionNonExistent returns error in list of partition.
	ErrDropPartitionNonExistent = terror.ClassDDL.New(codeDropPartitionNonExistent, " Error in list of partitions to %s")
	// ErrSameNamePartition returns duplicate partition name.
	ErrSameNamePartition = terror.ClassDDL.New(codeSameNamePartition, "Duplicate partition name %s")
	// ErrRangeNotIncreasing returns values less than value must be strictly increasing for each partition.
	ErrRangeNotIncreasing = terror.ClassDDL.New(codeRangeNotIncreasing, "VALUES LESS THAN value must be strictly increasing for each partition")
	// ErrPartitionMaxvalue returns maxvalue can only be used in last partition definition.
	ErrPartitionMaxvalue = terror.ClassDDL.New(codePartitionMaxvalue, "MAXVALUE can only be used in last partition definition")
	// ErrTooManyValues returns cannot have more than one value for this type of partitioning.
	ErrTooManyValues = terror.ClassDDL.New(codeErrTooManyValues, mysql.MySQLErrName[mysql.ErrTooManyValues])
	//ErrDropLastPartition returns cannot remove all partitions, use drop table instead.
	ErrDropLastPartition = terror.ClassDDL.New(codeDropLastPartition, mysql.MySQLErrName[mysql.ErrDropLastPartition])
	//ErrTooManyPartitions returns too many partitions were defined.
	ErrTooManyPartitions = terror.ClassDDL.New(codeTooManyPartitions, mysql.MySQLErrName[mysql.ErrTooManyPartitions])
	//ErrPartitionFunctionIsNotAllowed returns this partition function is not allowed.
	ErrPartitionFunctionIsNotAllowed = terror.ClassDDL.New(codePartitionFunctionIsNotAllowed, mysql.MySQLErrName[mysql.ErrPartitionFunctionIsNotAllowed])
	// ErrPartitionFuncNotAllowed returns partition function returns the wrong type.
	ErrPartitionFuncNotAllowed = terror.ClassDDL.New(codeErrPartitionFuncNotAllowed, mysql.MySQLErrName[mysql.ErrPartitionFuncNotAllowed])
	// ErrUniqueKeyNeedAllFieldsInPf returns must include all columns in the table's partitioning function.
	ErrUniqueKeyNeedAllFieldsInPf = terror.ClassDDL.New(codeUniqueKeyNeedAllFieldsInPf, mysql.MySQLErrName[mysql.ErrUniqueKeyNeedAllFieldsInPf])
	errWrongExprInPartitionFunc   = terror.ClassDDL.New(codeWrongExprInPartitionFunc, mysql.MySQLErrName[mysql.ErrWrongExprInPartitionFunc])
)

// DDL is responsible for updating schema in data store and maintaining in-memory InfoSchema cache.
type DDL interface {
	CreateSchema(ctx sessionctx.Context, name model.CIStr, charsetInfo *ast.CharsetOpt) error
	DropSchema(ctx sessionctx.Context, schema model.CIStr) error
	CreateTable(ctx sessionctx.Context, stmt *ast.CreateTableStmt) error
	CreateTableWithLike(ctx sessionctx.Context, ident, referIdent ast.Ident, ifNotExists bool) error
	DropTable(ctx sessionctx.Context, tableIdent ast.Ident) (err error)
	CreateIndex(ctx sessionctx.Context, tableIdent ast.Ident, unique bool, indexName model.CIStr,
		columnNames []*ast.IndexColName, indexOption *ast.IndexOption) error
	DropIndex(ctx sessionctx.Context, tableIdent ast.Ident, indexName model.CIStr) error
	AlterTable(ctx sessionctx.Context, tableIdent ast.Ident, spec []*ast.AlterTableSpec) error
	TruncateTable(ctx sessionctx.Context, tableIdent ast.Ident) error
	RenameTable(ctx sessionctx.Context, oldTableIdent, newTableIdent ast.Ident) error

	// GetLease returns current schema lease time.
	GetLease() time.Duration
	// Stats returns the DDL statistics.
	Stats(vars *variable.SessionVars) (map[string]interface{}, error)
	// GetScope gets the status variables scope.
	GetScope(status string) variable.ScopeFlag
	// Stop stops DDL worker.
	Stop() error
	// RegisterEventCh registers event channel for ddl.
	RegisterEventCh(chan<- *util.Event)
	// SchemaSyncer gets the schema syncer.
	SchemaSyncer() SchemaSyncer
	// OwnerManager gets the owner manager.
	OwnerManager() owner.Manager
	// GetID gets the ddl ID.
	GetID() string
	// GetTableMaxRowID gets the max row ID of a normal table or a partition.
	GetTableMaxRowID(startTS uint64, tbl table.PhysicalTable) (int64, bool, error)
	// SetBinlogClient sets the binlog client for DDL worker. It's exported for testing.
	SetBinlogClient(interface{})
}

// ddl is used to handle the statements that define the structure or schema of the database.
type ddl struct {
	m          sync.RWMutex
	infoHandle *infoschema.Handle
	quitCh     chan struct{}

	*ddlCtx
	workers map[workerType]*worker
}

// ddlCtx is the context when we use worker to handle DDL jobs.
type ddlCtx struct {
	uuid         string
	store        kv.Storage
	ownerManager owner.Manager
	schemaSyncer SchemaSyncer
	ddlJobDoneCh chan struct{}
	ddlEventCh   chan<- *util.Event
	lease        time.Duration // lease is schema lease.
	binlogCli    interface{}   // binlogCli is used for Binlog.

	// hook may be modified.
	mu struct {
		sync.RWMutex
		hook        Callback
		interceptor Interceptor
	}
}

func (dc *ddlCtx) isOwner() bool {
	isOwner := dc.ownerManager.IsOwner()
	log.Debugf("[ddl] it's the DDL owner %v, self ID %s", isOwner, dc.uuid)
	return isOwner
}

// RegisterEventCh registers passed channel for ddl Event.
func (d *ddl) RegisterEventCh(ch chan<- *util.Event) {
	d.ddlEventCh = ch
}

// asyncNotifyEvent will notify the ddl event to outside world, say statistic handle. When the channel is full, we may
// give up notify and log it.
func asyncNotifyEvent(d *ddlCtx, e *util.Event) {
	if d.ddlEventCh != nil {
		if d.lease == 0 {
			// If lease is 0, it's always used in test.
			select {
			case d.ddlEventCh <- e:
			default:
			}
			return
		}
		for i := 0; i < 10; i++ {
			select {
			case d.ddlEventCh <- e:
				return
			default:
				log.Warnf("Fail to notify event %s.", e)
				time.Sleep(time.Microsecond * 10)
			}
		}
	}
}

// NewDDL creates a new DDL.
func NewDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage,
	infoHandle *infoschema.Handle, hook Callback, lease time.Duration, ctxPool *pools.ResourcePool) DDL {
	return newDDL(ctx, etcdCli, store, infoHandle, hook, lease, ctxPool)
}

func newDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage,
	infoHandle *infoschema.Handle, hook Callback, lease time.Duration, ctxPool *pools.ResourcePool) *ddl {
	if hook == nil {
		hook = &BaseCallback{}
	}
	id := uuid.NewV4().String()
	ctx, cancelFunc := context.WithCancel(ctx)
	var manager owner.Manager
	var syncer SchemaSyncer
	if etcdCli == nil {
		// The etcdCli is nil if the store is localstore which is only used for testing.
		// So we use mockOwnerManager and mockSchemaSyncer.
		manager = owner.NewMockManager(id, cancelFunc)
		syncer = NewMockSchemaSyncer()
	} else {
		manager = owner.NewOwnerManager(etcdCli, ddlPrompt, id, DDLOwnerKey, cancelFunc)
		syncer = NewSchemaSyncer(etcdCli, id)
	}

	ddlCtx := &ddlCtx{
		uuid:         id,
		store:        store,
		lease:        lease,
		ddlJobDoneCh: make(chan struct{}, 1),
		ownerManager: manager,
		schemaSyncer: syncer,
		binlogCli:    binloginfo.GetPumpClient(),
	}
	ddlCtx.mu.hook = hook
	ddlCtx.mu.interceptor = &BaseInterceptor{}
	d := &ddl{
		infoHandle: infoHandle,
		ddlCtx:     ddlCtx,
	}

	d.start(ctx, ctxPool)
	variable.RegisterStatistics(d)

	return d
}

// Stop implements DDL.Stop interface.
func (d *ddl) Stop() error {
	d.m.Lock()
	defer d.m.Unlock()

	d.close()
	log.Infof("[ddl] stop DDL:%s", d.uuid)
	return nil
}

// start campaigns the owner and starts workers.
// ctxPool is used for the worker's delRangeManager and creates sessions.
func (d *ddl) start(ctx context.Context, ctxPool *pools.ResourcePool) {
	log.Infof("[ddl] start DDL:%s, run worker %v", d.uuid, RunWorker)
	d.quitCh = make(chan struct{})

	// If RunWorker is true, we need campaign owner and do DDL job.
	// Otherwise, we needn't do that.
	if RunWorker {
		err := d.ownerManager.CampaignOwner(ctx)
		terror.Log(errors.Trace(err))

		d.workers = make(map[workerType]*worker, 2)
		d.workers[generalWorker] = newWorker(generalWorker, d.store, ctxPool)
		d.workers[addIdxWorker] = newWorker(addIdxWorker, d.store, ctxPool)
		for _, worker := range d.workers {
			worker.wg.Add(1)
			w := worker
			go tidbutil.WithRecovery(
				func() { w.start(d.ddlCtx) },
				func(r interface{}) {
					if r != nil {
						log.Errorf("[ddl-%s] ddl %s meet panic", w, d.uuid)
					}
				})

			// When the start function is called, we will send a fake job to let worker
			// checks owner firstly and try to find whether a job exists and run.
			asyncNotify(worker.ddlJobCh)
		}
	}
}

func (d *ddl) close() {
	if isChanClosed(d.quitCh) {
		return
	}

	startTime := time.Now()
	close(d.quitCh)
	d.ownerManager.Cancel()
	err := d.schemaSyncer.RemoveSelfVersionPath()
	if err != nil {
		log.Errorf("[ddl] remove self version path failed %v", err)
	}

	for _, worker := range d.workers {
		worker.close()
	}
	log.Infof("[ddl] closing DDL:%s takes time %v", d.uuid, time.Since(startTime))
}

// GetLease implements DDL.GetLease interface.
func (d *ddl) GetLease() time.Duration {
	d.m.RLock()
	lease := d.lease
	d.m.RUnlock()
	return lease
}

// GetInformationSchema gets the infoschema binding to d. It's expoted for testing.
func (d *ddl) GetInformationSchema(ctx sessionctx.Context) infoschema.InfoSchema {
	is := d.infoHandle.Get()

	d.mu.RLock()
	defer d.mu.RUnlock()
	return d.mu.interceptor.OnGetInfoSchema(ctx, is)
}

func (d *ddl) genGlobalID() (int64, error) {
	var globalID int64
	err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
		var err error
		globalID, err = meta.NewMeta(txn).GenGlobalID()
		return errors.Trace(err)
	})

	return globalID, errors.Trace(err)
}

func (d *ddl) genGlobalIDs(count int) ([]int64, error) {
	ret := make([]int64, count)
	err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
		var err error
		m := meta.NewMeta(txn)
		for i := 0; i < count; i++ {
			ret[i], err = m.GenGlobalID()
			if err != nil {
				return err
			}
		}
		return nil
	})

	return ret, err
}

// SchemaSyncer implements DDL.SchemaSyncer interface.
func (d *ddl) SchemaSyncer() SchemaSyncer {
	return d.schemaSyncer
}

// OwnerManager implements DDL.OwnerManager interface.
func (d *ddl) OwnerManager() owner.Manager {
	return d.ownerManager
}

// GetID implements DDL.GetID interface.
func (d *ddl) GetID() string {
	return d.uuid
}

func checkJobMaxInterval(job *model.Job) time.Duration {
	// The job of adding index takes more time to process.
	// So it uses the longer time.
	if job.Type == model.ActionAddIndex {
		return 3 * time.Second
	}
	if job.Type == model.ActionCreateTable || job.Type == model.ActionCreateSchema {
		return 500 * time.Millisecond
	}
	return 1 * time.Second
}

func (d *ddl) asyncNotifyWorker(jobTp model.ActionType) {
	// If the workers don't run, we needn't to notify workers.
	if !RunWorker {
		return
	}

	if jobTp == model.ActionAddIndex {
		asyncNotify(d.workers[addIdxWorker].ddlJobCh)
	} else {
		asyncNotify(d.workers[generalWorker].ddlJobCh)
	}
}

func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error {
	// For every DDL, we must commit current transaction.
	if err := ctx.NewTxn(); err != nil {
		return errors.Trace(err)
	}

	// Get a global job ID and put the DDL job in the queue.
	err := d.addDDLJob(ctx, job)
	if err != nil {
		return errors.Trace(err)
	}
	ctx.GetSessionVars().StmtCtx.IsDDLJobInQueue = true

	// Notice worker that we push a new job and wait the job done.
	d.asyncNotifyWorker(job.Type)

	// log.Infof("[ddl] start DDL job %s, Query:%s", job, job.Query)

	var historyJob *model.Job
	jobID := job.ID
	// For a job from start to end, the state of it will be none -> delete only -> write only -> reorganization -> public
	// For every state changes, we will wait as lease 2 * lease time, so here the ticker check is 10 * lease.
	// But we use etcd to speed up, normally it takes less than 0.5s now, so we use 0.5s or 1s or 3s as the max value.
	ticker := time.NewTicker(chooseLeaseTime(10*d.lease, checkJobMaxInterval(job)))
	defer func() {
		ticker.Stop()
	}()
	for {
		select {
		case <-d.ddlJobDoneCh:
		case <-ticker.C:
		}

		historyJob, err = d.getHistoryDDLJob(jobID)
		if err != nil {
			log.Errorf("[ddl] get history DDL job err %v, check again", err)
			continue
		} else if historyJob == nil {
			log.Debugf("[ddl] DDL job ID:%d is not in history, maybe not run", jobID)
			continue
		}

		// If a job is a history job, the state must be JobSynced or JobCancel.
		if historyJob.IsSynced() {
			// log.Infof("[ddl] DDL job ID:%d is finished", jobID)
			return nil
		}

		if historyJob.Error != nil {
			return errors.Trace(historyJob.Error)
		}
		panic("When the state is JobCancel, historyJob.Error should never be nil")
	}
}

func (d *ddl) callHookOnChanged(err error) error {
	d.mu.RLock()
	defer d.mu.RUnlock()

	err = d.mu.hook.OnChanged(err)
	return errors.Trace(err)
}

// SetBinlogClient implements DDL.SetBinlogClient interface.
func (d *ddl) SetBinlogClient(binlogCli interface{}) {
	d.binlogCli = binlogCli
}

// DDL error codes.
const (
	codeInvalidWorker         terror.ErrCode = 1
	codeNotOwner                             = 2
	codeInvalidDDLJob                        = 3
	codeInvalidJobFlag                       = 5
	codeRunMultiSchemaChanges                = 6
	codeWaitReorgTimeout                     = 7
	codeInvalidStoreVer                      = 8
	codeUnknownTypeLength                    = 9
	codeUnknownFractionLength                = 10
	codeInvalidJobVersion                    = 11
	codeCancelledDDLJob                      = 12
	codeInvalidRanges                        = 13
	codeReorgWorkerPanic                     = 14

	codeInvalidDBState         = 100
	codeInvalidTableState      = 101
	codeInvalidColumnState     = 102
	codeInvalidIndexState      = 103
	codeInvalidForeignKeyState = 104

	codeCantDropColWithIndex        = 201
	codeUnsupportedAddColumn        = 202
	codeUnsupportedModifyColumn     = 203
	codeUnsupportedDropPKHandle     = 204
	codeUnsupportedCharset          = 205
	codeUnsupportedModifyPrimaryKey = 206
	codeUnsupportedShardRowIDBits   = 207

	codeFileNotFound                           = 1017
	codeErrorOnRename                          = 1025
	codeBadNull                                = mysql.ErrBadNull
	codeBadField                               = 1054
	codeTooLongIdent                           = 1059
	codeDupKeyName                             = 1061
	codeTooLongKey                             = 1071
	codeKeyColumnDoesNotExits                  = 1072
	codeIncorrectPrefixKey                     = 1089
	codeCantRemoveAllFields                    = 1090
	codeCantDropFieldOrKey                     = 1091
	codeBlobCantHaveDefault                    = 1101
	codeWrongDBName                            = 1102
	codeWrongTableName                         = 1103
	codeTooManyFields                          = 1117
	codeInvalidUseOfNull                       = 1138
	codeWrongColumnName                        = 1166
	codeWrongKeyColumn                         = 1167
	codeBlobKeyWithoutLength                   = 1170
	codeInvalidOnUpdate                        = 1294
	codeUnsupportedOnGeneratedColumn           = 3106
	codeGeneratedColumnNonPrior                = 3107
	codeDependentByGeneratedColumn             = 3108
	codeJSONUsedAsKey                          = 3152
	codeWrongNameForIndex                      = terror.ErrCode(mysql.ErrWrongNameForIndex)
	codeErrTooLongIndexComment                 = terror.ErrCode(mysql.ErrTooLongIndexComment)
	codeUnknownCharacterSet                    = terror.ErrCode(mysql.ErrUnknownCharacterSet)
	codeCantCreateTable                        = terror.ErrCode(mysql.ErrCantCreateTable)
	codeTableMustHaveColumns                   = terror.ErrCode(mysql.ErrTableMustHaveColumns)
	codePartitionsMustBeDefined                = terror.ErrCode(mysql.ErrPartitionsMustBeDefined)
	codePartitionMgmtOnNonpartitioned          = terror.ErrCode(mysql.ErrPartitionMgmtOnNonpartitioned)
	codeDropPartitionNonExistent               = terror.ErrCode(mysql.ErrDropPartitionNonExistent)
	codeSameNamePartition                      = terror.ErrCode(mysql.ErrSameNamePartition)
	codeRangeNotIncreasing                     = terror.ErrCode(mysql.ErrRangeNotIncreasing)
	codePartitionMaxvalue                      = terror.ErrCode(mysql.ErrPartitionMaxvalue)
	codeErrTooManyValues                       = terror.ErrCode(mysql.ErrTooManyValues)
	codeDropLastPartition                      = terror.ErrCode(mysql.ErrDropLastPartition)
	codeTooManyPartitions                      = terror.ErrCode(mysql.ErrTooManyPartitions)
	codePartitionFunctionIsNotAllowed          = terror.ErrCode(mysql.ErrPartitionFunctionIsNotAllowed)
	codeErrPartitionFuncNotAllowed             = terror.ErrCode(mysql.ErrPartitionFuncNotAllowed)
	codeErrFieldTypeNotAllowedAsPartitionField = terror.ErrCode(mysql.ErrFieldTypeNotAllowedAsPartitionField)
	codeUniqueKeyNeedAllFieldsInPf             = terror.ErrCode(mysql.ErrUniqueKeyNeedAllFieldsInPf)
	codePrimaryCantHaveNull                    = terror.ErrCode(mysql.ErrPrimaryCantHaveNull)
	codeWrongExprInPartitionFunc               = terror.ErrCode(mysql.ErrWrongExprInPartitionFunc)
)

func init() {
	ddlMySQLErrCodes := map[terror.ErrCode]uint16{
		codeBadNull:                                mysql.ErrBadNull,
		codeCantRemoveAllFields:                    mysql.ErrCantRemoveAllFields,
		codeCantDropFieldOrKey:                     mysql.ErrCantDropFieldOrKey,
		codeInvalidOnUpdate:                        mysql.ErrInvalidOnUpdate,
		codeBlobKeyWithoutLength:                   mysql.ErrBlobKeyWithoutLength,
		codeIncorrectPrefixKey:                     mysql.ErrWrongSubKey,
		codeTooLongIdent:                           mysql.ErrTooLongIdent,
		codeTooLongKey:                             mysql.ErrTooLongKey,
		codeKeyColumnDoesNotExits:                  mysql.ErrKeyColumnDoesNotExits,
		codeDupKeyName:                             mysql.ErrDupKeyName,
		codeWrongDBName:                            mysql.ErrWrongDBName,
		codeWrongTableName:                         mysql.ErrWrongTableName,
		codeFileNotFound:                           mysql.ErrFileNotFound,
		codeErrorOnRename:                          mysql.ErrErrorOnRename,
		codeBadField:                               mysql.ErrBadField,
		codeInvalidUseOfNull:                       mysql.ErrInvalidUseOfNull,
		codeUnsupportedOnGeneratedColumn:           mysql.ErrUnsupportedOnGeneratedColumn,
		codeGeneratedColumnNonPrior:                mysql.ErrGeneratedColumnNonPrior,
		codeDependentByGeneratedColumn:             mysql.ErrDependentByGeneratedColumn,
		codeJSONUsedAsKey:                          mysql.ErrJSONUsedAsKey,
		codeBlobCantHaveDefault:                    mysql.ErrBlobCantHaveDefault,
		codeWrongColumnName:                        mysql.ErrWrongColumnName,
		codeWrongKeyColumn:                         mysql.ErrWrongKeyColumn,
		codeWrongNameForIndex:                      mysql.ErrWrongNameForIndex,
		codeTableMustHaveColumns:                   mysql.ErrTableMustHaveColumns,
		codeTooManyFields:                          mysql.ErrTooManyFields,
		codeErrTooLongIndexComment:                 mysql.ErrTooLongIndexComment,
		codeUnknownCharacterSet:                    mysql.ErrUnknownCharacterSet,
		codePartitionsMustBeDefined:                mysql.ErrPartitionsMustBeDefined,
		codePartitionMgmtOnNonpartitioned:          mysql.ErrPartitionMgmtOnNonpartitioned,
		codeDropPartitionNonExistent:               mysql.ErrDropPartitionNonExistent,
		codeSameNamePartition:                      mysql.ErrSameNamePartition,
		codeRangeNotIncreasing:                     mysql.ErrRangeNotIncreasing,
		codePartitionMaxvalue:                      mysql.ErrPartitionMaxvalue,
		codeErrTooManyValues:                       mysql.ErrTooManyValues,
		codeDropLastPartition:                      mysql.ErrDropLastPartition,
		codeTooManyPartitions:                      mysql.ErrTooManyPartitions,
		codePartitionFunctionIsNotAllowed:          mysql.ErrPartitionFunctionIsNotAllowed,
		codeErrPartitionFuncNotAllowed:             mysql.ErrPartitionFuncNotAllowed,
		codeErrFieldTypeNotAllowedAsPartitionField: mysql.ErrFieldTypeNotAllowedAsPartitionField,
		codeUniqueKeyNeedAllFieldsInPf:             mysql.ErrUniqueKeyNeedAllFieldsInPf,
		codePrimaryCantHaveNull:                    mysql.ErrPrimaryCantHaveNull,
		codeWrongExprInPartitionFunc:               mysql.ErrWrongExprInPartitionFunc,
	}
	terror.ErrClassToMySQLCodes[terror.ClassDDL] = ddlMySQLErrCodes
}