Слияние кода завершено, страница обновится автоматически
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package executor
import (
"time"
"github.com/hanchuanchuan/goInception/distsql"
"github.com/hanchuanchuan/goInception/model"
plannercore "github.com/hanchuanchuan/goInception/planner/core"
"github.com/hanchuanchuan/goInception/statistics"
"github.com/hanchuanchuan/goInception/table"
"github.com/hanchuanchuan/goInception/util/chunk"
"github.com/hanchuanchuan/goInception/util/ranger"
"github.com/pingcap/errors"
tipb "github.com/pingcap/tipb/go-tipb"
"golang.org/x/net/context"
)
// make sure `TableReaderExecutor` implements `Executor`.
var _ Executor = &TableReaderExecutor{}
// TableReaderExecutor sends DAG request and reads table data from kv layer.
type TableReaderExecutor struct {
baseExecutor
table table.Table
physicalTableID int64
keepOrder bool
desc bool
ranges []*ranger.Range
dagPB *tipb.DAGRequest
// columns are only required by union scan.
columns []*model.ColumnInfo
// resultHandler handles the order of the result. Since (MAXInt64, MAXUint64] stores before [0, MaxInt64] physically
// for unsigned int.
resultHandler *tableResultHandler
streaming bool
feedback *statistics.QueryFeedback
// corColInFilter tells whether there's correlated column in filter.
corColInFilter bool
// corColInAccess tells whether there's correlated column in access conditions.
corColInAccess bool
plans []plannercore.PhysicalPlan
}
// Open initialzes necessary variables for using this executor.
func (e *TableReaderExecutor) Open(ctx context.Context) error {
var err error
if e.corColInFilter {
e.dagPB.Executors, _, err = constructDistExec(e.ctx, e.plans)
if err != nil {
return errors.Trace(err)
}
}
if e.corColInAccess {
ts := e.plans[0].(*plannercore.PhysicalTableScan)
access := ts.AccessCondition
pkTP := ts.Table.GetPkColInfo().FieldType
e.ranges, err = ranger.BuildTableRange(access, e.ctx.GetSessionVars().StmtCtx, &pkTP)
if err != nil {
return errors.Trace(err)
}
}
e.resultHandler = &tableResultHandler{}
firstPartRanges, secondPartRanges := splitRanges(e.ranges, e.keepOrder)
firstResult, err := e.buildResp(ctx, firstPartRanges)
if err != nil {
e.feedback.Invalidate()
return errors.Trace(err)
}
if len(secondPartRanges) == 0 {
e.resultHandler.open(nil, firstResult)
return nil
}
var secondResult distsql.SelectResult
secondResult, err = e.buildResp(ctx, secondPartRanges)
if err != nil {
e.feedback.Invalidate()
return errors.Trace(err)
}
e.resultHandler.open(firstResult, secondResult)
return nil
}
// Next fills data into the chunk passed by its caller.
// The task was actually done by tableReaderHandler.
func (e *TableReaderExecutor) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
if err := e.resultHandler.nextChunk(ctx, chk); err != nil {
e.feedback.Invalidate()
return err
}
return errors.Trace(nil)
}
// Close implements the Executor Close interface.
func (e *TableReaderExecutor) Close() error {
e.ctx.StoreQueryFeedback(e.feedback)
err := e.resultHandler.Close()
return errors.Trace(err)
}
// buildResp first builds request and sends it to tikv using distsql.Select. It uses SelectResut returned by the callee
// to fetch all results.
func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Range) (distsql.SelectResult, error) {
var builder distsql.RequestBuilder
kvReq, err := builder.SetTableRanges(e.physicalTableID, ranges, e.feedback).
SetDAGRequest(e.dagPB).
SetDesc(e.desc).
SetKeepOrder(e.keepOrder).
SetStreaming(e.streaming).
SetFromSessionVars(e.ctx.GetSessionVars()).
Build()
if err != nil {
return nil, errors.Trace(err)
}
result, err := distsql.Select(ctx, e.ctx, kvReq, e.retTypes(), e.feedback)
if err != nil {
return nil, errors.Trace(err)
}
result.Fetch(ctx)
return result, nil
}
type tableResultHandler struct {
// If the pk is unsigned and we have KeepOrder=true.
// optionalResult handles the request whose range is in signed int range.
// result handles the request whose range is exceed signed int range.
// Otherwise, we just set optionalFinished true and the result handles the whole ranges.
optionalResult distsql.SelectResult
result distsql.SelectResult
optionalFinished bool
}
func (tr *tableResultHandler) open(optionalResult, result distsql.SelectResult) {
if optionalResult == nil {
tr.optionalFinished = true
tr.result = result
return
}
tr.optionalResult = optionalResult
tr.result = result
tr.optionalFinished = false
}
func (tr *tableResultHandler) nextChunk(ctx context.Context, chk *chunk.Chunk) error {
if !tr.optionalFinished {
err := tr.optionalResult.Next(ctx, chk)
if err != nil {
return errors.Trace(err)
}
if chk.NumRows() > 0 {
return nil
}
tr.optionalFinished = true
}
return tr.result.Next(ctx, chk)
}
func (tr *tableResultHandler) nextRaw(ctx context.Context) (data []byte, err error) {
if !tr.optionalFinished {
data, err = tr.optionalResult.NextRaw(ctx)
if err != nil {
return nil, errors.Trace(err)
}
if data != nil {
return data, nil
}
tr.optionalFinished = true
}
data, err = tr.result.NextRaw(ctx)
if err != nil {
return nil, errors.Trace(err)
}
return data, nil
}
func (tr *tableResultHandler) Close() error {
err := closeAll(tr.optionalResult, tr.result)
tr.optionalResult, tr.result = nil, nil
return errors.Trace(err)
}
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Комментарий ( 0 )