// Copyright 2018 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // See the License for the specific language governing permissions and // limitations under the License. package executor import ( "github.com/hanchuanchuan/goInception/expression" "github.com/hanchuanchuan/goInception/kv" "github.com/hanchuanchuan/goInception/model" "github.com/hanchuanchuan/goInception/mysql" plannercore "github.com/hanchuanchuan/goInception/planner/core" "github.com/hanchuanchuan/goInception/sessionctx" "github.com/hanchuanchuan/goInception/table" "github.com/hanchuanchuan/goInception/table/tables" "github.com/hanchuanchuan/goInception/tablecodec" "github.com/hanchuanchuan/goInception/types" "github.com/hanchuanchuan/goInception/util/chunk" "github.com/hanchuanchuan/goInception/util/codec" "github.com/pingcap/errors" "golang.org/x/net/context" ) func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor { return &PointGetExecutor{ ctx: b.ctx, schema: p.Schema(), tblInfo: p.TblInfo, idxInfo: p.IndexInfo, idxVals: p.IndexValues, handle: p.Handle, startTS: b.getStartTS(), } } // PointGetExecutor executes point select query. type PointGetExecutor struct { ctx sessionctx.Context schema *expression.Schema tps []*types.FieldType tblInfo *model.TableInfo handle int64 idxInfo *model.IndexInfo idxVals []types.Datum startTS uint64 snapshot kv.Snapshot done bool } // Open implements the Executor interface. func (e *PointGetExecutor) Open(context.Context) error { return nil } // Close implements the Executor interface. func (e *PointGetExecutor) Close() error { return nil } // Next implements the Executor interface. func (e *PointGetExecutor) Next(ctx context.Context, chk *chunk.Chunk) error { chk.Reset() if e.done { return nil } e.done = true var err error e.snapshot, err = e.ctx.GetStore().GetSnapshot(kv.Version{Ver: e.startTS}) if err != nil { return errors.Trace(err) } if e.idxInfo != nil { idxKey, err1 := e.encodeIndexKey() if err1 != nil { return errors.Trace(err1) } handleVal, err1 := e.get(idxKey) if err1 != nil && !kv.ErrNotExist.Equal(err1) { return errors.Trace(err1) } if len(handleVal) == 0 { return nil } e.handle, err1 = tables.DecodeHandle(handleVal) if err1 != nil { return errors.Trace(err1) } } key := tablecodec.EncodeRowKeyWithHandle(e.tblInfo.ID, e.handle) val, err := e.get(key) if err != nil && !kv.ErrNotExist.Equal(err) { return errors.Trace(err) } if len(val) == 0 { if e.idxInfo != nil { return kv.ErrNotExist.GenWithStack("inconsistent extra index %s, handle %d not found in table", e.idxInfo.Name.O, e.handle) } return nil } return e.decodeRowValToChunk(val, chk) } func (e *PointGetExecutor) encodeIndexKey() ([]byte, error) { for i := range e.idxVals { colInfo := e.tblInfo.Columns[e.idxInfo.Columns[i].Offset] casted, err := table.CastValue(e.ctx, e.idxVals[i], colInfo) if err != nil { return nil, errors.Trace(err) } e.idxVals[i] = casted } encodedIdxVals, err := codec.EncodeKey(e.ctx.GetSessionVars().StmtCtx, nil, e.idxVals...) if err != nil { return nil, errors.Trace(err) } return tablecodec.EncodeIndexSeekKey(e.tblInfo.ID, e.idxInfo.ID, encodedIdxVals), nil } func (e *PointGetExecutor) get(key kv.Key) (val []byte, err error) { txn := e.ctx.Txn() if txn != nil && txn.Valid() && !txn.IsReadOnly() { return txn.Get(key) } return e.snapshot.Get(key) } func (e *PointGetExecutor) decodeRowValToChunk(rowVal []byte, chk *chunk.Chunk) error { colIDs := make(map[int64]int, e.schema.Len()) for i, col := range e.schema.Columns { colIDs[col.ID] = i } colVals, err := tablecodec.CutRowNew(rowVal, colIDs) if err != nil { return errors.Trace(err) } if colVals == nil { colVals = make([][]byte, len(colIDs)) } decoder := codec.NewDecoder(chk, e.ctx.GetSessionVars().Location()) for id, offset := range colIDs { if e.tblInfo.PKIsHandle && mysql.HasPriKeyFlag(e.schema.Columns[offset].RetType.Flag) { chk.AppendInt64(offset, e.handle) continue } if id == model.ExtraHandleID { chk.AppendInt64(offset, e.handle) continue } if len(colVals[offset]) == 0 { colInfo := getColInfoByID(e.tblInfo, id) d, err1 := table.GetColOriginDefaultValue(e.ctx, colInfo) if err1 != nil { return errors.Trace(err1) } chk.AppendDatum(offset, &d) continue } _, err = decoder.DecodeOne(colVals[offset], offset, e.schema.Columns[offset].RetType) if err != nil { return errors.Trace(err) } } return nil } func getColInfoByID(tbl *model.TableInfo, colID int64) *model.ColumnInfo { for _, col := range tbl.Columns { if col.ID == colID { return col } } return nil } // Schema implements the Executor interface. func (e *PointGetExecutor) Schema() *expression.Schema { return e.schema } func (e *PointGetExecutor) retTypes() []*types.FieldType { if e.tps == nil { e.tps = make([]*types.FieldType, e.schema.Len()) for i := range e.schema.Columns { e.tps[i] = e.schema.Columns[i].RetType } } return e.tps } func (e *PointGetExecutor) newFirstChunk() *chunk.Chunk { return chunk.New(e.retTypes(), 1, 1) }