// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package executor
import (
"math"
"github.com/hanchuanchuan/goInception/ast"
"github.com/hanchuanchuan/goInception/distsql"
"github.com/hanchuanchuan/goInception/infoschema"
"github.com/hanchuanchuan/goInception/kv"
"github.com/hanchuanchuan/goInception/model"
plannercore "github.com/hanchuanchuan/goInception/planner/core"
"github.com/hanchuanchuan/goInception/statistics"
"github.com/hanchuanchuan/goInception/table"
"github.com/hanchuanchuan/goInception/table/tables"
"github.com/hanchuanchuan/goInception/tablecodec"
"github.com/hanchuanchuan/goInception/terror"
"github.com/hanchuanchuan/goInception/types"
"github.com/hanchuanchuan/goInception/util/chunk"
"github.com/hanchuanchuan/goInception/util/ranger"
"github.com/hanchuanchuan/goInception/util/timeutil"
"github.com/pingcap/errors"
"github.com/pingcap/tipb/go-tipb"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
)
var (
_ Executor = &CheckIndexRangeExec{}
_ Executor = &RecoverIndexExec{}
_ Executor = &CleanupIndexExec{}
)
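// These executors implement the "admin" index maintenance statements, e.g.
// (syntax as in upstream TiDB):
//
// ADMIN CHECK INDEX t idx (1, 10), (20, 30);
// ADMIN RECOVER INDEX t idx;
// ADMIN CLEANUP INDEX t idx;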
// CheckIndexRangeExec outputs the index values whose handles fall inside one of
// the given half-open handle ranges [begin, end). It is built from the
// "admin check index" statement with handle ranges.
type CheckIndexRangeExec struct {
baseExecutor
table *model.TableInfo
index *model.IndexInfo
is infoschema.InfoSchema
startKey []types.Datum
handleRanges []ast.HandleRange
srcChunk *chunk.Chunk
result distsql.SelectResult
cols []*model.ColumnInfo
}
// Next implements the Executor Next interface.
func (e *CheckIndexRangeExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
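// The handle column is appended as the last column of the schema.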
handleIdx := e.schema.Len() - 1
for {
err := e.result.Next(ctx, e.srcChunk)
if err != nil {
return errors.Trace(err)
}
if e.srcChunk.NumRows() == 0 {
return nil
}
iter := chunk.NewIterator4Chunk(e.srcChunk)
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
handle := row.GetInt64(handleIdx)
for _, hr := range e.handleRanges {
if handle >= hr.Begin && handle < hr.End {
chk.AppendRow(row)
break
}
}
}
if chk.NumRows() > 0 {
return nil
}
}
}
// Open implements the Executor Open interface.
func (e *CheckIndexRangeExec) Open(ctx context.Context) error {
tCols := e.table.Cols()
for _, ic := range e.index.Columns {
col := tCols[ic.Offset]
e.cols = append(e.cols, col)
}
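// The schema appends an extra handle column after the index columns; reuse
// its type for the handle pseudo-column.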
colTypeForHandle := e.schema.Columns[len(e.cols)].RetType
e.cols = append(e.cols, &model.ColumnInfo{
ID: model.ExtraHandleID,
Name: model.ExtraHandleName,
FieldType: *colTypeForHandle,
})
e.srcChunk = e.newFirstChunk()
dagPB, err := e.buildDAGPB()
if err != nil {
return errors.Trace(err)
}
sc := e.ctx.GetSessionVars().StmtCtx
var builder distsql.RequestBuilder
kvReq, err := builder.SetIndexRanges(sc, e.table.ID, e.index.ID, ranger.FullRange()).
SetDAGRequest(dagPB).
SetKeepOrder(true).
SetFromSessionVars(e.ctx.GetSessionVars()).
Build()
if err != nil {
return errors.Trace(err)
}
e.result, err = distsql.Select(ctx, e.ctx, kvReq, e.retFieldTypes, statistics.NewQueryFeedback(0, nil, 0, false))
if err != nil {
return errors.Trace(err)
}
e.result.Fetch(ctx)
return nil
}
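// buildDAGPB constructs the DAG request for the index scan: it fills in the
// start timestamp, time zone, statement flags and output offsets, and appends
// an IndexScan executor.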
func (e *CheckIndexRangeExec) buildDAGPB() (*tipb.DAGRequest, error) {
dagReq := &tipb.DAGRequest{}
dagReq.StartTs = e.ctx.Txn().StartTS()
dagReq.TimeZoneName, dagReq.TimeZoneOffset = timeutil.Zone(e.ctx.GetSessionVars().Location())
sc := e.ctx.GetSessionVars().StmtCtx
dagReq.Flags = statementContextToFlags(sc)
for i := range e.schema.Columns {
dagReq.OutputOffsets = append(dagReq.OutputOffsets, uint32(i))
}
execPB := e.constructIndexScanPB()
dagReq.Executors = append(dagReq.Executors, execPB)
err := plannercore.SetPBColumnsDefaultValue(e.ctx, dagReq.Executors[0].IdxScan.Columns, e.cols)
if err != nil {
return nil, errors.Trace(err)
}
return dagReq, nil
}
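// constructIndexScanPB builds the IndexScan executor protobuf for the checked index.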
func (e *CheckIndexRangeExec) constructIndexScanPB() *tipb.Executor {
idxExec := &tipb.IndexScan{
TableId: e.table.ID,
IndexId: e.index.ID,
Columns: model.ColumnsToProto(e.cols, e.table.PKIsHandle),
}
return &tipb.Executor{Tp: tipb.ExecType_TypeIndexScan, IdxScan: idxExec}
}
// Close implements the Executor Close interface.
func (e *CheckIndexRangeExec) Close() error {
return nil
}
// RecoverIndexExec represents a recover index executor.
// It is built from the "admin recover index" statement and is used to
// backfill corrupted index entries.
type RecoverIndexExec struct {
baseExecutor
done bool
index table.Index
table table.Table
batchSize int
columns []*model.ColumnInfo
colFieldTypes []*types.FieldType
srcChunk *chunk.Chunk
// The buffers below are reused across batches to reduce allocations.
recoverRows []recoverRows
idxValsBufs [][]types.Datum
idxKeyBufs [][]byte
batchKeys []kv.Key
}
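// columnsTypes lazily builds and caches the field types of the scanned columns.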
func (e *RecoverIndexExec) columnsTypes() []*types.FieldType {
if e.colFieldTypes != nil {
return e.colFieldTypes
}
e.colFieldTypes = make([]*types.FieldType, 0, len(e.columns))
for _, col := range e.columns {
e.colFieldTypes = append(e.colFieldTypes, &col.FieldType)
}
return e.colFieldTypes
}
// Open implements the Executor Open interface.
func (e *RecoverIndexExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return errors.Trace(err)
}
e.srcChunk = chunk.New(e.columnsTypes(), e.initCap, e.maxChunkSize)
e.batchSize = 2048
e.recoverRows = make([]recoverRows, 0, e.batchSize)
e.idxValsBufs = make([][]types.Datum, e.batchSize)
e.idxKeyBufs = make([][]byte, e.batchSize)
return nil
}
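// constructTableScanPB builds the TableScan executor protobuf for the recovered table.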
func (e *RecoverIndexExec) constructTableScanPB(pbColumnInfos []*tipb.ColumnInfo) *tipb.Executor {
tblScan := &tipb.TableScan{
TableId: e.table.Meta().ID,
Columns: pbColumnInfos,
}
return &tipb.Executor{Tp: tipb.ExecType_TypeTableScan, TblScan: tblScan}
}
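// constructLimitPB builds a Limit executor protobuf that caps the scan at count rows.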
func (e *RecoverIndexExec) constructLimitPB(count uint64) *tipb.Executor {
limitExec := &tipb.Limit{
Limit: count,
}
return &tipb.Executor{Tp: tipb.ExecType_TypeLimit, Limit: limitExec}
}
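// buildDAGPB constructs a DAG request consisting of a table scan followed by
// a limit of limitCnt rows, used to fetch table rows in batches.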
func (e *RecoverIndexExec) buildDAGPB(txn kv.Transaction, limitCnt uint64) (*tipb.DAGRequest, error) {
dagReq := &tipb.DAGRequest{}
dagReq.StartTs = txn.StartTS()
dagReq.TimeZoneName, dagReq.TimeZoneOffset = timeutil.Zone(e.ctx.GetSessionVars().Location())
sc := e.ctx.GetSessionVars().StmtCtx
dagReq.Flags = statementContextToFlags(sc)
for i := range e.columns {
dagReq.OutputOffsets = append(dagReq.OutputOffsets, uint32(i))
}
tblInfo := e.table.Meta()
pbColumnInfos := model.ColumnsToProto(e.columns, tblInfo.PKIsHandle)
err := plannercore.SetPBColumnsDefaultValue(e.ctx, pbColumnInfos, e.columns)
if err != nil {
return nil, errors.Trace(err)
}
tblScanExec := e.constructTableScanPB(pbColumnInfos)
dagReq.Executors = append(dagReq.Executors, tblScanExec)
limitExec := e.constructLimitPB(limitCnt)
dagReq.Executors = append(dagReq.Executors, limitExec)
return dagReq, nil
}
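// buildTableScan sends a table scan request that reads rows with handles in
// [startHandle, MaxInt64] in handle order, capped at limitCnt rows.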
func (e *RecoverIndexExec) buildTableScan(ctx context.Context, txn kv.Transaction, t table.Table, startHandle int64, limitCnt uint64) (distsql.SelectResult, error) {
dagPB, err := e.buildDAGPB(txn, limitCnt)
if err != nil {
return nil, errors.Trace(err)
}
tblInfo := e.table.Meta()
ranges := []*ranger.Range{{LowVal: []types.Datum{types.NewIntDatum(startHandle)}, HighVal: []types.Datum{types.NewIntDatum(math.MaxInt64)}}}
var builder distsql.RequestBuilder
kvReq, err := builder.SetTableRanges(tblInfo.ID, ranges, nil).
SetDAGRequest(dagPB).
SetKeepOrder(true).
SetFromSessionVars(e.ctx.GetSessionVars()).
Build()
if err != nil {
return nil, errors.Trace(err)
}
// With limitCnt, the matched rows may all reside in a single region, so set
// the concurrency to 1 to avoid unnecessary region scans.
kvReq.Concurrency = 1
result, err := distsql.Select(ctx, e.ctx, kvReq, e.columnsTypes(), statistics.NewQueryFeedback(0, nil, 0, false))
if err != nil {
return nil, errors.Trace(err)
}
result.Fetch(ctx)
return result, nil
}
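// backfillResult records the progress of one backfill transaction.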
type backfillResult struct {
nextHandle int64
addedCount int64
scanRowCount int64
}
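// backfillIndex recovers the index in batches: each batch scans up to
// batchSize rows in its own transaction and creates the missing index
// entries, until the table scan returns no more rows. It returns the total
// numbers of added and scanned rows.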
func (e *RecoverIndexExec) backfillIndex(ctx context.Context) (int64, int64, error) {
var (
nextHandle = int64(math.MinInt64)
totalAddedCnt = int64(0)
totalScanCnt = int64(0)
lastLogCnt = int64(0)
result backfillResult
)
for {
errInTxn := kv.RunInNewTxn(e.ctx.GetStore(), true, func(txn kv.Transaction) error {
var err error
result, err = e.backfillIndexInTxn(ctx, txn, nextHandle)
return errors.Trace(err)
})
if errInTxn != nil {
return totalAddedCnt, totalScanCnt, errors.Trace(errInTxn)
}
totalAddedCnt += result.addedCount
totalScanCnt += result.scanRowCount
if totalScanCnt-lastLogCnt >= 50000 {
lastLogCnt = totalScanCnt
log.Infof("[recover-index] recover table:%v, index:%v, totalAddedCnt:%v, totalScanCnt:%v, nextHandle: %v",
e.table.Meta().Name.O, e.index.Meta().Name.O, totalAddedCnt, totalScanCnt, result.nextHandle)
}
// No more rows to scan, the backfill is complete.
if result.scanRowCount == 0 {
break
}
nextHandle = result.nextHandle
}
return totalAddedCnt, totalScanCnt, nil
}
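// recoverRows holds one scanned row: its handle, its index column values, and
// whether the row should be skipped because its index entry already exists.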
type recoverRows struct {
handle int64
idxVals []types.Datum
skip bool
}
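// fetchRecoverRows drains the scan result into e.recoverRows, reusing the
// per-row datum buffers, and advances result.nextHandle past the last scanned
// handle. It stops once batchSize rows have been fetched.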
func (e *RecoverIndexExec) fetchRecoverRows(ctx context.Context, srcResult distsql.SelectResult, result *backfillResult) ([]recoverRows, error) {
e.recoverRows = e.recoverRows[:0]
handleIdx := len(e.columns) - 1
result.scanRowCount = 0
for {
err := srcResult.Next(ctx, e.srcChunk)
if err != nil {
return nil, errors.Trace(err)
}
if e.srcChunk.NumRows() == 0 {
break
}
iter := chunk.NewIterator4Chunk(e.srcChunk)
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
if result.scanRowCount >= int64(e.batchSize) {
return e.recoverRows, nil
}
handle := row.GetInt64(handleIdx)
idxVals := extractIdxVals(row, e.idxValsBufs[result.scanRowCount], e.colFieldTypes)
e.idxValsBufs[result.scanRowCount] = idxVals
e.recoverRows = append(e.recoverRows, recoverRows{handle: handle, idxVals: idxVals, skip: false})
result.scanRowCount++
result.nextHandle = handle + 1
}
}
return e.recoverRows, nil
}
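// batchMarkDup generates the index key for every fetched row, batch-gets the
// existing index values, and marks the rows whose keys already exist so they
// are skipped by the backfill.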
func (e *RecoverIndexExec) batchMarkDup(txn kv.Transaction, rows []recoverRows) error {
if len(rows) == 0 {
return nil
}
e.batchKeys = e.batchKeys[:0]
sc := e.ctx.GetSessionVars().StmtCtx
distinctFlags := make([]bool, len(rows))
for i, row := range rows {
idxKey, distinct, err := e.index.GenIndexKey(sc, row.idxVals, row.handle, e.idxKeyBufs[i])
if err != nil {
return errors.Trace(err)
}
e.idxKeyBufs[i] = idxKey
e.batchKeys = append(e.batchKeys, idxKey)
distinctFlags[i] = distinct
}
values, err := kv.BatchGetValues(txn, e.batchKeys)
if err != nil {
return errors.Trace(err)
}
// There are three cases for a fetched row whose index key already exists:
// 1. The unique index key is duplicated and the handles are equal: skip it.
// 2. The unique index key is duplicated but the handles differ: the data is
// inconsistent, so log it and skip it.
// 3. A non-unique index key is duplicated: skip it.
for i, key := range e.batchKeys {
if val, found := values[string(key)]; found {
if distinctFlags[i] {
handle, err1 := tables.DecodeHandle(val)
if err1 != nil {
return errors.Trace(err1)
}
if handle != rows[i].handle {
log.Warnf("[recover-index] The constraint of unique index:%v is broken, handle:%v is not equal handle:%v with idxKey:%v.",
e.index.Meta().Name.O, handle, rows[i].handle, key)
}
}
rows[i].skip = true
}
}
return nil
}
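// backfillIndexInTxn runs one backfill batch inside txn: it scans up to
// batchSize rows starting from startHandle, marks the rows whose index
// entries already exist, then locks each remaining row and creates its index
// entry.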
func (e *RecoverIndexExec) backfillIndexInTxn(ctx context.Context, txn kv.Transaction, startHandle int64) (result backfillResult, err error) {
result.nextHandle = startHandle
srcResult, err := e.buildTableScan(ctx, txn, e.table, startHandle, uint64(e.batchSize))
if err != nil {
return result, errors.Trace(err)
}
defer terror.Call(srcResult.Close)
rows, err := e.fetchRecoverRows(ctx, srcResult, &result)
if err != nil {
return result, errors.Trace(err)
}
err = e.batchMarkDup(txn, rows)
if err != nil {
return result, errors.Trace(err)
}
// Constraints have already been checked by batchMarkDup, so skip the
// constraint check when creating the index entries.
e.ctx.GetSessionVars().StmtCtx.BatchCheck = true
for _, row := range rows {
if row.skip {
continue
}
recordKey := e.table.RecordKey(row.handle)
err := txn.LockKeys(recordKey)
if err != nil {
return result, errors.Trace(err)
}
_, err = e.index.Create(e.ctx, txn, row.idxVals, row.handle)
if err != nil {
return result, errors.Trace(err)
}
result.addedCount++
}
return result, nil
}
// Next implements the Executor Next interface.
func (e *RecoverIndexExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
if e.done {
return nil
}
totalAddedCnt, totalScanCnt, err := e.backfillIndex(ctx)
if err != nil {
return errors.Trace(err)
}
chk.AppendInt64(0, totalAddedCnt)
chk.AppendInt64(1, totalScanCnt)
e.done = true
return nil
}
// CleanupIndexExec represents a cleanup index executor.
// It is built from the "admin cleanup index" statement and is used to delete
// dangling index entries whose rows no longer exist.
type CleanupIndexExec struct {
baseExecutor
done bool
removeCnt uint64
index table.Index
table table.Table
idxCols []*model.ColumnInfo
idxColFieldTypes []*types.FieldType
idxChunk *chunk.Chunk
idxValues map[int64][][]types.Datum
batchSize uint64
batchKeys []kv.Key
idxValsBufs [][]types.Datum
lastIdxKey []byte
scanRowCnt uint64
}
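// getIdxColTypes lazily builds and caches the field types of the index columns.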
func (e *CleanupIndexExec) getIdxColTypes() []*types.FieldType {
if e.idxColFieldTypes != nil {
return e.idxColFieldTypes
}
e.idxColFieldTypes = make([]*types.FieldType, 0, len(e.idxCols))
for _, col := range e.idxCols {
e.idxColFieldTypes = append(e.idxColFieldTypes, &col.FieldType)
}
return e.idxColFieldTypes
}
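// batchGetRecord batch-gets the table records for all handles collected from
// the index scan.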
func (e *CleanupIndexExec) batchGetRecord(txn kv.Transaction) (map[string][]byte, error) {
for handle := range e.idxValues {
e.batchKeys = append(e.batchKeys, e.table.RecordKey(handle))
}
values, err := kv.BatchGetValues(txn, e.batchKeys)
if err != nil {
return nil, errors.Trace(err)
}
return values, nil
}
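// deleteDanglingIdx removes the index entries whose handles have no
// corresponding table record, logging progress every batchSize removals.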
func (e *CleanupIndexExec) deleteDanglingIdx(txn kv.Transaction, values map[string][]byte) error {
for _, k := range e.batchKeys {
if _, found := values[string(k)]; !found {
_, handle, err := tablecodec.DecodeRecordKey(k)
if err != nil {
return errors.Trace(err)
}
for _, idxVals := range e.idxValues[handle] {
if err := e.index.Delete(e.ctx.GetSessionVars().StmtCtx, txn, idxVals, handle); err != nil {
return errors.Trace(err)
}
e.removeCnt++
if e.removeCnt%e.batchSize == 0 {
log.Infof("[cleaning up dangling index] table: %v, index: %v, count: %v.",
e.table.Meta().Name.String(), e.index.Meta().Name.String(), e.removeCnt)
}
}
}
}
return nil
}
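// extractIdxVals copies the index column values (all columns except the
// trailing handle) out of row, reusing idxVals as the destination buffer when
// possible.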
func extractIdxVals(row chunk.Row, idxVals []types.Datum,
fieldTypes []*types.FieldType) []types.Datum {
if idxVals == nil {
idxVals = make([]types.Datum, 0, row.Len()-1)
} else {
idxVals = idxVals[:0]
}
for i := 0; i < row.Len()-1; i++ {
colVal := row.GetDatum(i, fieldTypes[i])
idxVals = append(idxVals, *colVal.Copy())
}
return idxVals
}
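// fetchIndex scans the index starting from e.lastIdxKey, grouping the scanned
// index values by handle into e.idxValues and recording the last seen index
// key in e.lastIdxKey, until the scan is exhausted or batchSize entries have
// been read.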
func (e *CleanupIndexExec) fetchIndex(ctx context.Context, txn kv.Transaction) error {
result, err := e.buildIndexScan(ctx, txn)
if err != nil {
return errors.Trace(err)
}
defer terror.Call(result.Close)
sc := e.ctx.GetSessionVars().StmtCtx
for {
err := result.Next(ctx, e.idxChunk)
if err != nil {
return errors.Trace(err)
}
if e.idxChunk.NumRows() == 0 {
return nil
}
iter := chunk.NewIterator4Chunk(e.idxChunk)
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
handle := row.GetInt64(len(e.idxCols) - 1)
idxVals := extractIdxVals(row, e.idxValsBufs[e.scanRowCnt], e.idxColFieldTypes)
e.idxValsBufs[e.scanRowCnt] = idxVals
e.idxValues[handle] = append(e.idxValues[handle], idxVals)
idxKey, _, err := e.index.GenIndexKey(sc, idxVals, handle, nil)
if err != nil {
return errors.Trace(err)
}
e.scanRowCnt++
e.lastIdxKey = idxKey
if e.scanRowCnt >= e.batchSize {
return nil
}
}
}
}
// Next implements the Executor Next interface.
func (e *CleanupIndexExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
if e.done {
return nil
}
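// Each iteration cleans up one batch of dangling index entries in its own
// transaction, until an index scan returns no rows.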
for {
errInTxn := kv.RunInNewTxn(e.ctx.GetStore(), true, func(txn kv.Transaction) error {
err := e.fetchIndex(ctx, txn)
if err != nil {
return errors.Trace(err)
}
values, err := e.batchGetRecord(txn)
if err != nil {
return errors.Trace(err)
}
err = e.deleteDanglingIdx(txn, values)
if err != nil {
return errors.Trace(err)
}
return nil
})
if errInTxn != nil {
return errors.Trace(errInTxn)
}
if e.scanRowCnt == 0 {
break
}
e.scanRowCnt = 0
e.batchKeys = e.batchKeys[:0]
for k := range e.idxValues {
delete(e.idxValues, k)
}
}
e.done = true
chk.AppendUint64(0, e.removeCnt)
return nil
}
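// buildIndexScan sends an index scan request that resumes just after
// e.lastIdxKey and returns at most batchSize index entries in key order.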
func (e *CleanupIndexExec) buildIndexScan(ctx context.Context, txn kv.Transaction) (distsql.SelectResult, error) {
dagPB, err := e.buildIdxDAGPB(txn)
if err != nil {
return nil, errors.Trace(err)
}
sc := e.ctx.GetSessionVars().StmtCtx
var builder distsql.RequestBuilder
ranges := ranger.FullRange()
kvReq, err := builder.SetIndexRanges(sc, e.table.Meta().ID, e.index.Meta().ID, ranges).
SetDAGRequest(dagPB).
SetKeepOrder(true).
SetFromSessionVars(e.ctx.GetSessionVars()).
Build()
if err != nil {
return nil, errors.Trace(err)
}
// Resume the scan just after the last index key handled in the previous batch.
kvReq.KeyRanges[0].StartKey = kv.Key(e.lastIdxKey).PrefixNext()
kvReq.Concurrency = 1
result, err := distsql.Select(ctx, e.ctx, kvReq, e.getIdxColTypes(), statistics.NewQueryFeedback(0, nil, 0, false))
if err != nil {
return nil, errors.Trace(err)
}
result.Fetch(ctx)
return result, nil
}
// Open implements the Executor Open interface.
func (e *CleanupIndexExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return errors.Trace(err)
}
e.idxChunk = chunk.New(e.getIdxColTypes(), e.initCap, e.maxChunkSize)
e.idxValues = make(map[int64][][]types.Datum, e.batchSize)
e.batchKeys = make([]kv.Key, 0, e.batchSize)
e.idxValsBufs = make([][]types.Datum, e.batchSize)
sc := e.ctx.GetSessionVars().StmtCtx
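// Construct the smallest index key (a single NULL value with the minimal
// handle) as the initial scan position.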
idxKey, _, err := e.index.GenIndexKey(sc, []types.Datum{{}}, math.MinInt64, nil)
if err != nil {
return errors.Trace(err)
}
e.lastIdxKey = idxKey
return nil
}
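// buildIdxDAGPB constructs a DAG request consisting of an index scan followed
// by a limit of batchSize rows.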
func (e *CleanupIndexExec) buildIdxDAGPB(txn kv.Transaction) (*tipb.DAGRequest, error) {
dagReq := &tipb.DAGRequest{}
dagReq.StartTs = txn.StartTS()
dagReq.TimeZoneName, dagReq.TimeZoneOffset = timeutil.Zone(e.ctx.GetSessionVars().Location())
sc := e.ctx.GetSessionVars().StmtCtx
dagReq.Flags = statementContextToFlags(sc)
for i := range e.idxCols {
dagReq.OutputOffsets = append(dagReq.OutputOffsets, uint32(i))
}
execPB := e.constructIndexScanPB()
dagReq.Executors = append(dagReq.Executors, execPB)
err := plannercore.SetPBColumnsDefaultValue(e.ctx, dagReq.Executors[0].IdxScan.Columns, e.idxCols)
if err != nil {
return nil, errors.Trace(err)
}
limitExec := e.constructLimitPB()
dagReq.Executors = append(dagReq.Executors, limitExec)
return dagReq, nil
}
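// constructIndexScanPB builds the IndexScan executor protobuf for the cleaned index.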
func (e *CleanupIndexExec) constructIndexScanPB() *tipb.Executor {
idxExec := &tipb.IndexScan{
TableId: e.table.Meta().ID,
IndexId: e.index.Meta().ID,
Columns: model.ColumnsToProto(e.idxCols, e.table.Meta().PKIsHandle),
}
return &tipb.Executor{Tp: tipb.ExecType_TypeIndexScan, IdxScan: idxExec}
}
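// constructLimitPB builds a Limit executor protobuf that caps each index scan
// batch at batchSize rows.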
func (e *CleanupIndexExec) constructLimitPB() *tipb.Executor {
limitExec := &tipb.Limit{
Limit: e.batchSize,
}
return &tipb.Executor{Tp: tipb.ExecType_TypeLimit, Limit: limitExec}
}
// Close implements the Executor Close interface.
func (e *CleanupIndexExec) Close() error {
return nil
}