// Copyright 2018 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // See the License for the specific language governing permissions and // limitations under the License. package distsql import ( "github.com/hanchuanchuan/goInception/kv" "github.com/hanchuanchuan/goInception/sessionctx" "github.com/hanchuanchuan/goInception/statistics" "github.com/hanchuanchuan/goInception/terror" "github.com/hanchuanchuan/goInception/types" "github.com/hanchuanchuan/goInception/util/chunk" "github.com/hanchuanchuan/goInception/util/codec" "github.com/pingcap/errors" "github.com/pingcap/tipb/go-tipb" "golang.org/x/net/context" ) var ( _ SelectResult = (*selectResult)(nil) _ SelectResult = (*streamResult)(nil) ) // SelectResult is an iterator of coprocessor partial results. type SelectResult interface { // Fetch fetches partial results from client. Fetch(context.Context) // NextRaw gets the next raw result. NextRaw(context.Context) ([]byte, error) // Next reads the data into chunk. Next(context.Context, *chunk.Chunk) error // Close closes the iterator. Close() error } type resultWithErr struct { result kv.ResultSubset err error } type selectResult struct { label string resp kv.Response results chan resultWithErr closed chan struct{} rowLen int fieldTypes []*types.FieldType ctx sessionctx.Context selectResp *tipb.SelectResponse respChkIdx int feedback *statistics.QueryFeedback partialCount int64 // number of partial results. sqlType string } func (r *selectResult) Fetch(ctx context.Context) { go r.fetch(ctx) } func (r *selectResult) fetch(ctx context.Context) { defer func() { close(r.results) }() for { resultSubset, err := r.resp.Next(ctx) if err != nil { r.results <- resultWithErr{err: errors.Trace(err)} return } if resultSubset == nil { return } select { case r.results <- resultWithErr{result: resultSubset}: case <-r.closed: // If selectResult called Close() already, make fetch goroutine exit. return case <-ctx.Done(): return } } } // NextRaw returns the next raw partial result. func (r *selectResult) NextRaw(ctx context.Context) ([]byte, error) { re := <-r.results r.partialCount++ r.feedback.Invalidate() if re.result == nil || re.err != nil { return nil, errors.Trace(re.err) } return re.result.GetData(), nil } // Next reads data to the chunk. func (r *selectResult) Next(ctx context.Context, chk *chunk.Chunk) error { chk.Reset() for chk.NumRows() < r.ctx.GetSessionVars().MaxChunkSize { if r.selectResp == nil || r.respChkIdx == len(r.selectResp.Chunks) { err := r.getSelectResp() if err != nil || r.selectResp == nil { return errors.Trace(err) } } err := r.readRowsData(chk) if err != nil { return errors.Trace(err) } if len(r.selectResp.Chunks[r.respChkIdx].RowsData) == 0 { r.respChkIdx++ } } return nil } func (r *selectResult) getSelectResp() error { r.respChkIdx = 0 for { re := <-r.results if re.err != nil { return errors.Trace(re.err) } if re.result == nil { r.selectResp = nil return nil } r.selectResp = new(tipb.SelectResponse) err := r.selectResp.Unmarshal(re.result.GetData()) if err != nil { return errors.Trace(err) } if err := r.selectResp.Error; err != nil { return terror.ClassTiKV.New(terror.ErrCode(err.Code), err.Msg) } sc := r.ctx.GetSessionVars().StmtCtx for _, warning := range r.selectResp.Warnings { sc.AppendWarning(terror.ClassTiKV.New(terror.ErrCode(warning.Code), warning.Msg)) } r.feedback.Update(re.result.GetStartKey(), r.selectResp.OutputCounts) r.partialCount++ sc.MergeExecDetails(re.result.GetExecDetails()) if len(r.selectResp.Chunks) == 0 { continue } return nil } } func (r *selectResult) readRowsData(chk *chunk.Chunk) (err error) { rowsData := r.selectResp.Chunks[r.respChkIdx].RowsData maxChunkSize := r.ctx.GetSessionVars().MaxChunkSize decoder := codec.NewDecoder(chk, r.ctx.GetSessionVars().Location()) for chk.NumRows() < maxChunkSize && len(rowsData) > 0 { for i := 0; i < r.rowLen; i++ { rowsData, err = decoder.DecodeOne(rowsData, i, r.fieldTypes[i]) if err != nil { return errors.Trace(err) } } } r.selectResp.Chunks[r.respChkIdx].RowsData = rowsData return nil } // Close closes selectResult. func (r *selectResult) Close() error { // Close this channel tell fetch goroutine to exit. close(r.closed) return r.resp.Close() }