// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl

import (
	"math"
	"sync/atomic"
	"time"

	"github.com/hanchuanchuan/goInception/distsql"
	"github.com/hanchuanchuan/goInception/kv"
	"github.com/hanchuanchuan/goInception/meta"
	"github.com/hanchuanchuan/goInception/model"
	"github.com/hanchuanchuan/goInception/mysql"
	"github.com/hanchuanchuan/goInception/sessionctx"
	"github.com/hanchuanchuan/goInception/statistics"
	"github.com/hanchuanchuan/goInception/table"
	"github.com/hanchuanchuan/goInception/terror"
	"github.com/hanchuanchuan/goInception/types"
	"github.com/hanchuanchuan/goInception/util/chunk"
	"github.com/hanchuanchuan/goInception/util/mock"
	"github.com/hanchuanchuan/goInception/util/ranger"
	"github.com/pingcap/errors"
	"github.com/pingcap/tipb/go-tipb"
	log "github.com/sirupsen/logrus"
	"golang.org/x/net/context"
)

// reorgCtx is for reorganization.
//
// rowCount and doneHandle are read and written with the 64-bit functions of
// sync/atomic. On 32-bit platforms those functions require 64-bit alignment,
// and only the first word of an allocated struct is guaranteed to have it, so
// the two int64 fields are deliberately kept at the very beginning of the
// struct. Do not insert fields in front of them.
type reorgCtx struct {
	// rowCount is used to simulate a job's row count.
	// It must only be accessed atomically.
	rowCount int64
	// doneHandle is used to simulate the handle that has been processed.
	// It must only be accessed atomically.
	doneHandle int64
	// doneCh is used to notify.
	// If the reorganization job is done, we will use this channel to notify outer.
	// TODO: Now we use goroutine to simulate reorganization jobs, later we may
	// use a persistent job list.
	doneCh chan error
	// notifyCancelReorgJob is used to notify the backfilling goroutine if the DDL job is cancelled.
	// 0: job is not canceled.
	// 1: job is canceled.
	// It must only be accessed atomically.
	notifyCancelReorgJob int32
}

// newContext returns a mock session context bound to store, with the
// autocommit status flag cleared and the statement time zone set to UTC.
// It is only used for adding column in reorganization state.
func newContext(store kv.Storage) sessionctx.Context {
	mockCtx := mock.NewContext()
	mockCtx.Store = store

	sessVars := mockCtx.GetSessionVars()
	sessVars.SetStatusFlag(mysql.ServerStatusAutocommit, false)
	sessVars.StmtCtx.TimeZone = time.UTC
	return mockCtx
}

// defaultWaitReorgTimeout is how long runReorgJob waits for the reorganization
// to finish when the lease passed to it is not greater than 0.
const defaultWaitReorgTimeout = 10 * time.Second

// ReorgWaitTimeout is the timeout that wait ddl in write reorganization stage.
// runReorgJob uses it as its wait timeout when the lease is greater than 0.
var ReorgWaitTimeout = 5 * time.Second

// notifyReorgCancel atomically raises the cancel flag (sets it to 1).
func (rc *reorgCtx) notifyReorgCancel() {
	const canceled int32 = 1
	atomic.StoreInt32(&rc.notifyCancelReorgJob, canceled)
}

// cleanNotifyReorgCancel atomically lowers the cancel flag (sets it back to 0).
func (rc *reorgCtx) cleanNotifyReorgCancel() {
	const notCanceled int32 = 0
	atomic.StoreInt32(&rc.notifyCancelReorgJob, notCanceled)
}

// isReorgCanceled reports whether the cancel flag is currently set to 1.
func (rc *reorgCtx) isReorgCanceled() bool {
	flag := atomic.LoadInt32(&rc.notifyCancelReorgJob)
	return flag == 1
}

// setRowCount atomically overwrites the recorded row count with count.
func (rc *reorgCtx) setRowCount(count int64) {
	rowCountAddr := &rc.rowCount
	atomic.StoreInt64(rowCountAddr, count)
}

// setNextHandle atomically overwrites the recorded done handle with doneHandle.
func (rc *reorgCtx) setNextHandle(doneHandle int64) {
	handleAddr := &rc.doneHandle
	atomic.StoreInt64(handleAddr, doneHandle)
}

// increaseRowCount atomically adds count to the recorded row count.
func (rc *reorgCtx) increaseRowCount(count int64) {
	// The updated total is not needed by callers, so it is discarded.
	_ = atomic.AddInt64(&rc.rowCount, count)
}

// getRowCountAndHandle returns the recorded row count followed by the recorded
// done handle. Each value is loaded atomically, but the pair is not read as a
// single atomic snapshot.
func (rc *reorgCtx) getRowCountAndHandle() (int64, int64) {
	return atomic.LoadInt64(&rc.rowCount), atomic.LoadInt64(&rc.doneHandle)
}

// clean resets the row count and the done handle to 0 and drops the done
// channel, so that a nil doneCh again means "no reorganization in flight".
func (rc *reorgCtx) clean() {
	atomic.StoreInt64(&rc.rowCount, 0)
	atomic.StoreInt64(&rc.doneHandle, 0)
	rc.doneCh = nil
}

// runReorgJob runs the reorganization function f in a background goroutine
// and waits a bounded amount of time for it to finish.
//
// If w.reorgCtx.doneCh is nil, no reorganization is in flight: the done channel
// is created, the reorgCtx counters are seeded from the job's row count and
// reorgInfo.StartHandle, and f is started in a goroutine tracked by w.wg.
// If doneCh is non-nil, f is not started again and the call only waits.
//
// The call then blocks until one of the following happens:
//   - f's result arrives on doneCh: the handled row count is stored in the job,
//     the reorgCtx is cleaned and f's error is returned (traced);
//   - a receive on w.quitCh succeeds: the reorgCtx counters are reset to 0 and
//     errWaitReorgTimeout is returned;
//   - the wait times out: the current row count is stored in the job, the
//     current done handle is passed to t.UpdateDDLReorgStartHandle, and
//     errWaitReorgTimeout is returned.
//
// The wait timeout is ReorgWaitTimeout when lease > 0 and
// defaultWaitReorgTimeout otherwise.
func (w *worker) runReorgJob(t *meta.Meta, reorgInfo *reorgInfo, lease time.Duration, f func() error) error {
	job := reorgInfo.Job
	if w.reorgCtx.doneCh == nil {
		// start a reorganization job
		w.wg.Add(1)
		// The channel has capacity 1, so the goroutine below can deliver its
		// result without a receiver being ready.
		w.reorgCtx.doneCh = make(chan error, 1)
		// initial reorgCtx
		w.reorgCtx.setRowCount(job.GetRowCount())
		w.reorgCtx.setNextHandle(reorgInfo.StartHandle)
		go func() {
			defer w.wg.Done()
			w.reorgCtx.doneCh <- f()
		}()
	}

	waitTimeout := defaultWaitReorgTimeout
	// if lease is 0, we are using a local storage,
	// and we can wait the reorganization to be done here.
	// if lease > 0, we don't need to wait here because
	// we should update some job's progress context and try checking again,
	// so we use a very little timeout here.
	if lease > 0 {
		waitTimeout = ReorgWaitTimeout
	}

	// wait reorganization job done or timeout
	select {
	case err := <-w.reorgCtx.doneCh:
		rowCount, _ := w.reorgCtx.getRowCountAndHandle()
		log.Infof("[ddl] run reorg job done, handled %d rows", rowCount)
		// Update a job's RowCount.
		job.SetRowCount(rowCount)
		// Reset the counters and set doneCh back to nil so the next call starts a new job.
		w.reorgCtx.clean()
		return errors.Trace(err)
	case <-w.quitCh:
		log.Info("[ddl] run reorg job quit")
		// Only the counters are reset here; doneCh is left in place.
		w.reorgCtx.setNextHandle(0)
		w.reorgCtx.setRowCount(0)
		// We return errWaitReorgTimeout here too, so that outer loop will break.
		return errWaitReorgTimeout
	case <-time.After(waitTimeout):
		rowCount, doneHandle := w.reorgCtx.getRowCountAndHandle()
		// Update a job's RowCount.
		job.SetRowCount(rowCount)
		// Update a reorgInfo's handle.
		// The returned error is only logged below; it is not returned to the caller.
		err := t.UpdateDDLReorgStartHandle(job, doneHandle)
		log.Infof("[ddl] run reorg job wait timeout %v, handled %d rows, current done handle %d, err %v", waitTimeout, rowCount, doneHandle, err)
		// If timeout, we will return, check the owner and retry to wait job done again.
		return errWaitReorgTimeout
	}
}

// isReorgRunnable reports, as an error, why the reorganization may not
// continue. The checks run in this order and the first failing one wins:
// the worker's quit channel is closed, the job has been cancelled, or d is
// not the owner. It returns nil when none of them applies.
func (w *worker) isReorgRunnable(d *ddlCtx) error {
	switch {
	case isChanClosed(w.quitCh):
		// Worker is closed. So it can't do the reorganizational job.
		return errInvalidWorker.GenWithStack("worker is closed")
	case w.reorgCtx.isReorgCanceled():
		// Job is cancelled. So it can't be done.
		return errCancelledDDLJob
	case !d.isOwner():
		// If it's not the owner, we will try later, so here just returns an error.
		log.Infof("[ddl] the %s not the job owner", d.uuid)
		return errors.Trace(errNotOwner)
	}
	return nil
}

// reorgInfo carries the state of one reorganization: the job it belongs to,
// the handle range to process and the physical table being processed.
// It is built by getReorgInfo.
type reorgInfo struct {
	// Job is the DDL job this reorganization belongs to (embedded).
	*model.Job

	// StartHandle is the first handle of the adding indices table.
	StartHandle int64
	// EndHandle is the last handle of the adding indices table.
	EndHandle int64
	d         *ddlCtx // the ddlCtx that was passed to getReorgInfo
	first     bool    // set by getReorgInfo when job.SnapshotVer was still 0
	// PhysicalTableID is used for partitioned table.
	// DDL reorganize for a partitioned table will handle partitions one by one,
	// PhysicalTableID is used to trace the current partition we are handling.
	// If the table is not partitioned, PhysicalTableID would be TableID.
	PhysicalTableID int64
}

// constructDescTableScanPB wraps a descending table scan over physicalTableID,
// reading pbColumnInfos, into a tipb executor of type TypeTableScan.
func constructDescTableScanPB(physicalTableID int64, pbColumnInfos []*tipb.ColumnInfo) *tipb.Executor {
	return &tipb.Executor{
		Tp: tipb.ExecType_TypeTableScan,
		TblScan: &tipb.TableScan{
			TableId: physicalTableID,
			Columns: pbColumnInfos,
			Desc:    true,
		},
	}
}

// constructLimitPB wraps a limit of count rows into a tipb executor of type
// TypeLimit.
func constructLimitPB(count uint64) *tipb.Executor {
	return &tipb.Executor{
		Tp:    tipb.ExecType_TypeLimit,
		Limit: &tipb.Limit{Limit: count},
	}
}

// buildDescTableScanDAG assembles the DAG request for a descending scan of
// tbl that outputs columns and stops after limit rows. The request consists
// of a table-scan executor followed by a limit executor. The returned error
// is always nil.
func buildDescTableScanDAG(startTS uint64, tbl table.PhysicalTable, columns []*model.ColumnInfo, limit uint64) (*tipb.DAGRequest, error) {
	req := &tipb.DAGRequest{}
	req.StartTs = startTS

	// The request carries the zone offset of the current time converted to UTC.
	_, utcOffset := time.Now().In(time.UTC).Zone()
	req.TimeZoneOffset = int64(utcOffset)

	// Output every scanned column, in the order given by columns.
	for idx := 0; idx < len(columns); idx++ {
		req.OutputOffsets = append(req.OutputOffsets, uint32(idx))
	}
	req.Flags |= model.FlagInSelectStmt

	pbCols := model.ColumnsToProto(columns, tbl.Meta().PKIsHandle)
	scanExec := constructDescTableScanPB(tbl.GetPhysicalID(), pbCols)
	req.Executors = append(req.Executors, scanExec, constructLimitPB(limit))
	return req, nil
}

// getColumnsTypes returns, for each column in columns, a pointer to that
// column's FieldType. The result has the same length and order as columns.
func getColumnsTypes(columns []*model.ColumnInfo) []*types.FieldType {
	fieldTypes := make([]*types.FieldType, len(columns))
	for i := range columns {
		fieldTypes[i] = &columns[i].FieldType
	}
	return fieldTypes
}

// buildDescTableScan builds a desc table scan upon tblInfo.
//
// It builds the DAG request with buildDescTableScanDAG, turns it into a kv
// request covering the full int handle range of tbl's physical ID (keep-order,
// descending, concurrency 1, low priority, not filling the cache), runs it
// through distsql.Select on a fresh context from newContext, and starts
// fetching. The caller owns the returned result.
//
// Any error from building the DAG, building the kv request or issuing the
// select is returned traced, together with a nil result.
func (d *ddlCtx) buildDescTableScan(ctx context.Context, startTS uint64, tbl table.PhysicalTable, columns []*model.ColumnInfo, limit uint64) (distsql.SelectResult, error) {
	dagPB, err := buildDescTableScanDAG(startTS, tbl, columns, limit)
	if err != nil {
		return nil, errors.Trace(err)
	}
	ranges := ranger.FullIntRange(false)
	var builder distsql.RequestBuilder
	builder.SetTableRanges(tbl.GetPhysicalID(), ranges, nil).
		SetDAGRequest(dagPB).
		SetKeepOrder(true).
		SetConcurrency(1).SetDesc(true)

	builder.Request.NotFillCache = true
	builder.Request.Priority = kv.PriorityLow

	kvReq, err := builder.Build()
	// Check the build error before it is overwritten by the Select call below;
	// otherwise a failed build would be silently ignored.
	if err != nil {
		return nil, errors.Trace(err)
	}
	sctx := newContext(d.store)
	result, err := distsql.Select(ctx, sctx, kvReq, getColumnsTypes(columns), statistics.NewQueryFeedback(0, nil, 0, false))
	if err != nil {
		return nil, errors.Trace(err)
	}
	result.Fetch(ctx)
	return result, nil
}

// GetTableMaxRowID gets the last row id of the table partition.
//
// It runs a descending scan limited to one row over the handle column and
// returns the handle of that row. When the scan yields no row, emptyTable is
// true and maxRowID is math.MaxInt64. On error, maxRowID is math.MaxInt64 and
// emptyTable is false.
func (d *ddlCtx) GetTableMaxRowID(startTS uint64, tbl table.PhysicalTable) (maxRowID int64, emptyTable bool, err error) {
	maxRowID = int64(math.MaxInt64)

	// Pick the column that carries the handle: the extra handle column unless
	// the primary key itself is the handle.
	var handleCols []*model.ColumnInfo
	if !tbl.Meta().PKIsHandle {
		handleCols = []*model.ColumnInfo{model.NewExtraHandleColInfo()}
	} else {
		for _, c := range tbl.Meta().Columns {
			if mysql.HasPriKeyFlag(c.Flag) {
				handleCols = []*model.ColumnInfo{c}
				break
			}
		}
	}

	ctx := context.Background()
	// build a desc scan of tblInfo, which limit is 1, we can use it to retrieve the last handle of the table.
	result, err := d.buildDescTableScan(ctx, startTS, tbl, handleCols, 1)
	if err != nil {
		return maxRowID, false, errors.Trace(err)
	}
	defer terror.Call(result.Close)

	chk := chunk.New(getColumnsTypes(handleCols), 1, 1)
	if err = result.Next(ctx, chk); err != nil {
		return maxRowID, false, errors.Trace(err)
	}
	if chk.NumRows() == 0 {
		// empty table
		return maxRowID, true, nil
	}

	lastRow := chk.GetRow(0)
	return lastRow.GetInt64(0), false, nil
}

// gofailOnceGuard is referenced by the gofail failpoint comment in
// getReorgInfo, where it makes the injected error fire only once.
var gofailOnceGuard bool

// getTableRange gets the start and end handle of a table (or partition).
//
// The start handle is the handle of the first row visited by
// iterateSnapshotRows at snapshotVer (math.MinInt64 if no row is visited);
// the end handle comes from GetTableMaxRowID. If the table is empty or the end
// handle is smaller than the start handle, the end handle is set to the start
// handle. On error both handles are returned as 0.
func getTableRange(d *ddlCtx, tbl table.PhysicalTable, snapshotVer uint64, priority int) (startHandle, endHandle int64, err error) {
	startHandle, endHandle = math.MinInt64, math.MaxInt64

	// Get the start handle of this partition: record the first handle and
	// stop iterating right away.
	recordFirst := func(h int64, rowKey kv.Key, rawRecord []byte) (bool, error) {
		startHandle = h
		return false, nil
	}
	if err = iterateSnapshotRows(d.store, priority, tbl, snapshotVer, math.MinInt64, recordFirst); err != nil {
		return 0, 0, errors.Trace(err)
	}

	// Get the end handle of this partition.
	var emptyTable bool
	endHandle, emptyTable, err = d.GetTableMaxRowID(snapshotVer, tbl)
	if err != nil {
		return 0, 0, errors.Trace(err)
	}

	if emptyTable || endHandle < startHandle {
		log.Infof("[ddl-reorg] get table range %v endHandle < startHandle partition %d [%d %d]", tbl.Meta(), tbl.GetPhysicalID(), endHandle, startHandle)
		endHandle = startHandle
	}
	return startHandle, endHandle, nil
}

// getReorgInfo builds the reorgInfo for job.
//
// When job.SnapshotVer is 0 it marks the info as first, reads the store's
// current version, computes the handle range of the table (or of the first
// partition definition for a partitioned table) with getTableRange, persists
// the range and physical table ID with t.UpdateDDLReorgHandle, and only then
// records the version in job.SnapshotVer. Otherwise it loads the previously
// stored range and physical table ID with t.GetDDLReorgHandle.
//
// If persisting the range fails, a non-nil *reorgInfo with only `first` set is
// returned together with the error. Every other failure returns a nil
// *reorgInfo.
func getReorgInfo(d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table) (*reorgInfo, error) {
	var (
		err   error
		start int64 = math.MinInt64
		end   int64 = math.MaxInt64
		pid   int64
		info  reorgInfo
	)

	if job.SnapshotVer == 0 {
		info.first = true
		// get the current version for reorganization if we don't have
		var ver kv.Version
		ver, err = d.store.CurrentVersion()
		if err != nil {
			return nil, errors.Trace(err)
		} else if ver.Ver <= 0 {
			return nil, errInvalidStoreVer.GenWithStack("invalid storage current version %d", ver.Ver)
		}
		tblInfo := tbl.Meta()
		pid = tblInfo.ID
		var tb table.PhysicalTable
		// For a partitioned table, start with the first partition definition;
		// otherwise the table itself is the physical table.
		if pi := tblInfo.GetPartitionInfo(); pi != nil {
			pid = pi.Definitions[0].ID
			tb = tbl.(table.PartitionedTable).GetPartition(pid)
		} else {
			tb = tbl.(table.PhysicalTable)
		}
		start, end, err = getTableRange(d, tb, ver.Ver, job.Priority)
		if err != nil {
			return nil, errors.Trace(err)
		}
		log.Infof("[ddl-reorg] job %v get partition %d range [%d %d]", job.ID, pid, start, end)

		// gofail: var errorUpdateReorgHandle bool
		// if errorUpdateReorgHandle && !gofailOnceGuard {
		//  // only return error once.
		//	gofailOnceGuard = true
		// 	return &info, errors.New("occur an error when update reorg handle.")
		// }
		err = t.UpdateDDLReorgHandle(job, start, end, pid)
		if err != nil {
			// Unlike the earlier failures, this returns the partially filled info.
			return &info, errors.Trace(err)
		}
		// Update info should after data persistent.
		job.SnapshotVer = ver.Ver
	} else {
		start, end, pid, err = t.GetDDLReorgHandle(job)
		if err != nil {
			return nil, errors.Trace(err)
		}
	}
	info.Job = job
	info.d = d
	info.StartHandle = start
	info.EndHandle = end
	info.PhysicalTableID = pid

	return &info, errors.Trace(err)
}

// UpdateReorgMeta stores startHandle, endHandle and physicalTableID as the
// reorganization handle of r.Job, using a meta built on top of txn. The
// underlying error, if any, is returned traced.
func (r *reorgInfo) UpdateReorgMeta(txn kv.Transaction, startHandle, endHandle, physicalTableID int64) error {
	m := meta.NewMeta(txn)
	err := m.UpdateDDLReorgHandle(r.Job, startHandle, endHandle, physicalTableID)
	return errors.Trace(err)
}