// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl

import (
	"math"
	"sync/atomic"
	"time"

	"github.com/hanchuanchuan/goInception/distsql"
	"github.com/hanchuanchuan/goInception/kv"
	"github.com/hanchuanchuan/goInception/meta"
	"github.com/hanchuanchuan/goInception/model"
	"github.com/hanchuanchuan/goInception/mysql"
	"github.com/hanchuanchuan/goInception/sessionctx"
	"github.com/hanchuanchuan/goInception/statistics"
	"github.com/hanchuanchuan/goInception/table"
	"github.com/hanchuanchuan/goInception/terror"
	"github.com/hanchuanchuan/goInception/types"
	"github.com/hanchuanchuan/goInception/util/chunk"
	"github.com/hanchuanchuan/goInception/util/mock"
	"github.com/hanchuanchuan/goInception/util/ranger"
	"github.com/pingcap/errors"
	"github.com/pingcap/tipb/go-tipb"
	log "github.com/sirupsen/logrus"
	"golang.org/x/net/context"
)

// reorgCtx is for reorganization.
type reorgCtx struct {
	// doneCh is used to notify.
	// If the reorganization job is done, we use this channel to notify the outer loop.
	// TODO: Now we use a goroutine to simulate reorganization jobs; later we may
	// use a persistent job list.
	doneCh chan error
	// rowCount is used to simulate a job's row count.
	rowCount int64
	// notifyCancelReorgJob is used to notify the backfilling goroutine if the DDL job is cancelled.
	// 0: job is not canceled.
	// 1: job is canceled.
	notifyCancelReorgJob int32
	// doneHandle is used to simulate the handle that has been processed.
	doneHandle int64
}

// newContext gets a context. It is only used for adding a column in the reorganization state.
func newContext(store kv.Storage) sessionctx.Context {
	c := mock.NewContext()
	c.Store = store
	c.GetSessionVars().SetStatusFlag(mysql.ServerStatusAutocommit, false)
	c.GetSessionVars().StmtCtx.TimeZone = time.UTC
	return c
}

const defaultWaitReorgTimeout = 10 * time.Second

// ReorgWaitTimeout is the timeout that the DDL worker waits for in the write-reorganization state.
var ReorgWaitTimeout = 5 * time.Second

func (rc *reorgCtx) notifyReorgCancel() {
	atomic.StoreInt32(&rc.notifyCancelReorgJob, 1)
}

func (rc *reorgCtx) cleanNotifyReorgCancel() {
	atomic.StoreInt32(&rc.notifyCancelReorgJob, 0)
}

func (rc *reorgCtx) isReorgCanceled() bool {
	return atomic.LoadInt32(&rc.notifyCancelReorgJob) == 1
}

func (rc *reorgCtx) setRowCount(count int64) {
	atomic.StoreInt64(&rc.rowCount, count)
}

func (rc *reorgCtx) setNextHandle(doneHandle int64) {
	atomic.StoreInt64(&rc.doneHandle, doneHandle)
}

func (rc *reorgCtx) increaseRowCount(count int64) {
	atomic.AddInt64(&rc.rowCount, count)
}

func (rc *reorgCtx) getRowCountAndHandle() (int64, int64) {
	row := atomic.LoadInt64(&rc.rowCount)
	handle := atomic.LoadInt64(&rc.doneHandle)
	return row, handle
}

func (rc *reorgCtx) clean() {
	rc.setRowCount(0)
	rc.setNextHandle(0)
	rc.doneCh = nil
}

func (w *worker) runReorgJob(t *meta.Meta, reorgInfo *reorgInfo, lease time.Duration, f func() error) error {
	job := reorgInfo.Job
	if w.reorgCtx.doneCh == nil {
		// Start a reorganization job.
		w.wg.Add(1)
		w.reorgCtx.doneCh = make(chan error, 1)
		// Initialize the reorgCtx.
		w.reorgCtx.setRowCount(job.GetRowCount())
		w.reorgCtx.setNextHandle(reorgInfo.StartHandle)
		go func() {
			defer w.wg.Done()
			w.reorgCtx.doneCh <- f()
		}()
	}

	waitTimeout := defaultWaitReorgTimeout
	// If lease is 0, we are using a local storage,
	// and we can wait for the reorganization to be done here.
	// If lease > 0, we don't need to wait here because
	// we should update the job's progress context and try checking again,
	// so we use a very short timeout here.
	if lease > 0 {
		waitTimeout = ReorgWaitTimeout
	}

	// Wait for the reorganization job to be done, or time out.
	select {
	case err := <-w.reorgCtx.doneCh:
		rowCount, _ := w.reorgCtx.getRowCountAndHandle()
		log.Infof("[ddl] run reorg job done, handled %d rows", rowCount)
		// Update the job's RowCount.
		job.SetRowCount(rowCount)
		w.reorgCtx.clean()
		return errors.Trace(err)
	case <-w.quitCh:
		log.Info("[ddl] run reorg job quit")
		w.reorgCtx.setNextHandle(0)
		w.reorgCtx.setRowCount(0)
		// We return errWaitReorgTimeout here too, so that the outer loop will break.
		return errWaitReorgTimeout
	case <-time.After(waitTimeout):
		rowCount, doneHandle := w.reorgCtx.getRowCountAndHandle()
		// Update the job's RowCount.
		job.SetRowCount(rowCount)
		// Update the reorgInfo's handle.
		err := t.UpdateDDLReorgStartHandle(job, doneHandle)
		log.Infof("[ddl] run reorg job wait timeout %v, handled %d rows, current done handle %d, err %v",
			waitTimeout, rowCount, doneHandle, err)
		// On timeout we return; the caller checks the owner and retries waiting for the job to be done.
		return errWaitReorgTimeout
	}
}

func (w *worker) isReorgRunnable(d *ddlCtx) error {
	if isChanClosed(w.quitCh) {
		// The worker is closed, so it can't do the reorganization job.
		return errInvalidWorker.GenWithStack("worker is closed")
	}

	if w.reorgCtx.isReorgCanceled() {
		// The job is cancelled, so it can't be done.
		return errCancelledDDLJob
	}

	if !d.isOwner() {
		// If it's not the owner, we will try again later, so here we just return an error.
		log.Infof("[ddl] the %s is not the job owner", d.uuid)
		return errors.Trace(errNotOwner)
	}
	return nil
}
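// The function below is an illustrative sketch only and is not called by the
// DDL worker; it shows the intended protocol between runReorgJob and a backfill
// goroutine: the goroutine reports progress through the atomic counters above
// and checks the cancel flag between batches. The function name and the batch
// size are assumptions made for this example.
func exampleReorgCtxProtocol(rc *reorgCtx) {
	rc.setRowCount(0)
	rc.setNextHandle(math.MinInt64)
	for batch := int64(0); batch < 3; batch++ {
		if rc.isReorgCanceled() {
			// The job was cancelled; the backfill goroutine should stop here.
			return
		}
		rc.increaseRowCount(256)            // rows handled in this batch
		rc.setNextHandle((batch + 1) * 256) // last handle that has been processed
	}
	rows, handle := rc.getRowCountAndHandle()
	log.Infof("[ddl] example backfill progress: %d rows, done handle %d", rows, handle)
}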
type reorgInfo struct {
	*model.Job

	// StartHandle is the first handle of the table that the index is being added to.
	StartHandle int64
	// EndHandle is the last handle of the table that the index is being added to.
	EndHandle int64
	d         *ddlCtx
	first     bool
	// PhysicalTableID is used for a partitioned table.
	// DDL reorganization for a partitioned table handles the partitions one by one,
	// and PhysicalTableID is used to trace the partition we are currently handling.
	// If the table is not partitioned, PhysicalTableID is the same as the TableID.
	PhysicalTableID int64
}

func constructDescTableScanPB(physicalTableID int64, pbColumnInfos []*tipb.ColumnInfo) *tipb.Executor {
	tblScan := &tipb.TableScan{
		TableId: physicalTableID,
		Columns: pbColumnInfos,
		Desc:    true,
	}
	return &tipb.Executor{Tp: tipb.ExecType_TypeTableScan, TblScan: tblScan}
}

func constructLimitPB(count uint64) *tipb.Executor {
	limitExec := &tipb.Limit{
		Limit: count,
	}
	return &tipb.Executor{Tp: tipb.ExecType_TypeLimit, Limit: limitExec}
}

func buildDescTableScanDAG(startTS uint64, tbl table.PhysicalTable, columns []*model.ColumnInfo, limit uint64) (*tipb.DAGRequest, error) {
	dagReq := &tipb.DAGRequest{}
	dagReq.StartTs = startTS
	_, timeZoneOffset := time.Now().In(time.UTC).Zone()
	dagReq.TimeZoneOffset = int64(timeZoneOffset)
	for i := range columns {
		dagReq.OutputOffsets = append(dagReq.OutputOffsets, uint32(i))
	}
	dagReq.Flags |= model.FlagInSelectStmt

	pbColumnInfos := model.ColumnsToProto(columns, tbl.Meta().PKIsHandle)
	tblScanExec := constructDescTableScanPB(tbl.GetPhysicalID(), pbColumnInfos)
	dagReq.Executors = append(dagReq.Executors, tblScanExec)
	dagReq.Executors = append(dagReq.Executors, constructLimitPB(limit))
	return dagReq, nil
}

func getColumnsTypes(columns []*model.ColumnInfo) []*types.FieldType {
	colTypes := make([]*types.FieldType, 0, len(columns))
	for _, col := range columns {
		colTypes = append(colTypes, &col.FieldType)
	}
	return colTypes
}

// buildDescTableScan builds a desc table scan upon tblInfo.
func (d *ddlCtx) buildDescTableScan(ctx context.Context, startTS uint64, tbl table.PhysicalTable, columns []*model.ColumnInfo, limit uint64) (distsql.SelectResult, error) {
	dagPB, err := buildDescTableScanDAG(startTS, tbl, columns, limit)
	if err != nil {
		return nil, errors.Trace(err)
	}
	ranges := ranger.FullIntRange(false)
	var builder distsql.RequestBuilder
	builder.SetTableRanges(tbl.GetPhysicalID(), ranges, nil).
		SetDAGRequest(dagPB).
		SetKeepOrder(true).
		SetConcurrency(1).SetDesc(true)

	builder.Request.NotFillCache = true
	builder.Request.Priority = kv.PriorityLow

	kvReq, err := builder.Build()
	if err != nil {
		return nil, errors.Trace(err)
	}

	sctx := newContext(d.store)
	result, err := distsql.Select(ctx, sctx, kvReq, getColumnsTypes(columns), statistics.NewQueryFeedback(0, nil, 0, false))
	if err != nil {
		return nil, errors.Trace(err)
	}
	result.Fetch(ctx)
	return result, nil
}

// GetTableMaxRowID gets the last row ID of the table partition.
func (d *ddlCtx) GetTableMaxRowID(startTS uint64, tbl table.PhysicalTable) (maxRowID int64, emptyTable bool, err error) {
	maxRowID = int64(math.MaxInt64)
	var columns []*model.ColumnInfo
	if tbl.Meta().PKIsHandle {
		for _, col := range tbl.Meta().Columns {
			if mysql.HasPriKeyFlag(col.Flag) {
				columns = []*model.ColumnInfo{col}
				break
			}
		}
	} else {
		columns = []*model.ColumnInfo{model.NewExtraHandleColInfo()}
	}

	ctx := context.Background()
	// Build a desc scan of tblInfo whose limit is 1; we can use it to retrieve the last handle of the table.
	result, err := d.buildDescTableScan(ctx, startTS, tbl, columns, 1)
	if err != nil {
		return maxRowID, false, errors.Trace(err)
	}
	defer terror.Call(result.Close)

	chk := chunk.New(getColumnsTypes(columns), 1, 1)
	err = result.Next(ctx, chk)
	if err != nil {
		return maxRowID, false, errors.Trace(err)
	}

	if chk.NumRows() == 0 {
		// The table is empty.
		return maxRowID, true, nil
	}
	row := chk.GetRow(0)
	maxRowID = row.GetInt64(0)
	return maxRowID, false, nil
}

var gofailOnceGuard bool
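// Illustrative sketch only (not referenced elsewhere): the helpers above build
// the DAG executors in a fixed order, with the descending table scan first and
// the limit applied on top of it, which is why a Desc scan with limit 1 returns
// just the row with the largest handle. The function name below is an
// assumption made for this example.
func exampleDescScanExecutors(physicalTableID int64, pbColumnInfos []*tipb.ColumnInfo) []*tipb.Executor {
	return []*tipb.Executor{
		constructDescTableScanPB(physicalTableID, pbColumnInfos), // scan the full range in descending handle order
		constructLimitPB(1),                                      // keep only the first row, i.e. the max handle
	}
}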
// getTableRange gets the start and end handle of a table (or partition).
func getTableRange(d *ddlCtx, tbl table.PhysicalTable, snapshotVer uint64, priority int) (startHandle, endHandle int64, err error) {
	startHandle = math.MinInt64
	endHandle = math.MaxInt64
	// Get the start handle of this partition.
	err = iterateSnapshotRows(d.store, priority, tbl, snapshotVer, math.MinInt64,
		func(h int64, rowKey kv.Key, rawRecord []byte) (bool, error) {
			startHandle = h
			return false, nil
		})
	if err != nil {
		return 0, 0, errors.Trace(err)
	}
	var emptyTable bool
	// Get the end handle of this partition.
	endHandle, emptyTable, err = d.GetTableMaxRowID(snapshotVer, tbl)
	if err != nil {
		return 0, 0, errors.Trace(err)
	}
	if endHandle < startHandle || emptyTable {
		log.Infof("[ddl-reorg] get table range %v endHandle < startHandle partition %d [%d %d]",
			tbl.Meta(), tbl.GetPhysicalID(), endHandle, startHandle)
		endHandle = startHandle
	}
	return
}

func getReorgInfo(d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table) (*reorgInfo, error) {
	var (
		err   error
		start int64 = math.MinInt64
		end   int64 = math.MaxInt64
		pid   int64
		info  reorgInfo
	)

	if job.SnapshotVer == 0 {
		info.first = true
		// Get the current version for reorganization if we don't have one yet.
		var ver kv.Version
		ver, err = d.store.CurrentVersion()
		if err != nil {
			return nil, errors.Trace(err)
		} else if ver.Ver <= 0 {
			return nil, errInvalidStoreVer.GenWithStack("invalid storage current version %d", ver.Ver)
		}
		tblInfo := tbl.Meta()
		pid = tblInfo.ID
		var tb table.PhysicalTable
		if pi := tblInfo.GetPartitionInfo(); pi != nil {
			pid = pi.Definitions[0].ID
			tb = tbl.(table.PartitionedTable).GetPartition(pid)
		} else {
			tb = tbl.(table.PhysicalTable)
		}
		start, end, err = getTableRange(d, tb, ver.Ver, job.Priority)
		if err != nil {
			return nil, errors.Trace(err)
		}
		log.Infof("[ddl-reorg] job %v get partition %d range [%d %d]", job.ID, pid, start, end)

		// gofail: var errorUpdateReorgHandle bool
		// if errorUpdateReorgHandle && !gofailOnceGuard {
		//	// only return error once.
		//	gofailOnceGuard = true
		//	return &info, errors.New("occur an error when update reorg handle.")
		// }
		err = t.UpdateDDLReorgHandle(job, start, end, pid)
		if err != nil {
			return &info, errors.Trace(err)
		}
		// job.SnapshotVer should be updated only after the reorg handle has been persisted.
		job.SnapshotVer = ver.Ver
	} else {
		start, end, pid, err = t.GetDDLReorgHandle(job)
		if err != nil {
			return nil, errors.Trace(err)
		}
	}
	info.Job = job
	info.d = d
	info.StartHandle = start
	info.EndHandle = end
	info.PhysicalTableID = pid

	return &info, errors.Trace(err)
}

func (r *reorgInfo) UpdateReorgMeta(txn kv.Transaction, startHandle, endHandle, physicalTableID int64) error {
	t := meta.NewMeta(txn)
	return errors.Trace(t.UpdateDDLReorgHandle(r.Job, startHandle, endHandle, physicalTableID))
}
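// The helper below is an illustrative sketch and is not used by this package:
// it shows one way a backfill caller might persist its progress with
// UpdateReorgMeta inside the transaction that wrote the batch, so that a
// restarted job can pick the handle back up through GetDDLReorgHandle. The
// function name and the resume semantics described here are assumptions, not
// something defined in this file.
func examplePersistReorgProgress(r *reorgInfo, txn kv.Transaction, doneHandle int64) error {
	// doneHandle becomes the new start handle; the end handle and physical table ID stay unchanged.
	return errors.Trace(r.UpdateReorgMeta(txn, doneHandle, r.EndHandle, r.PhysicalTableID))
}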