// Copyright 2018 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // See the License for the specific language governing permissions and // limitations under the License. package distsql import ( "math" "github.com/hanchuanchuan/goInception/kv" "github.com/hanchuanchuan/goInception/mysql" "github.com/hanchuanchuan/goInception/sessionctx/stmtctx" "github.com/hanchuanchuan/goInception/sessionctx/variable" "github.com/hanchuanchuan/goInception/statistics" "github.com/hanchuanchuan/goInception/tablecodec" "github.com/hanchuanchuan/goInception/types" "github.com/hanchuanchuan/goInception/util/codec" "github.com/hanchuanchuan/goInception/util/ranger" "github.com/pingcap/errors" "github.com/pingcap/tipb/go-tipb" ) // RequestBuilder is used to build a "kv.Request". // It is called before we issue a kv request by "Select". type RequestBuilder struct { kv.Request err error } // Build builds a "kv.Request". func (builder *RequestBuilder) Build() (*kv.Request, error) { return &builder.Request, errors.Trace(builder.err) } // SetTableRanges sets "KeyRanges" for "kv.Request" by converting "tableRanges" // to "KeyRanges" firstly. func (builder *RequestBuilder) SetTableRanges(tid int64, tableRanges []*ranger.Range, fb *statistics.QueryFeedback) *RequestBuilder { if builder.err != nil { return builder } builder.Request.KeyRanges = TableRangesToKVRanges(tid, tableRanges, fb) return builder } // SetIndexRanges sets "KeyRanges" for "kv.Request" by converting index range // "ranges" to "KeyRanges" firstly. func (builder *RequestBuilder) SetIndexRanges(sc *stmtctx.StatementContext, tid, idxID int64, ranges []*ranger.Range) *RequestBuilder { if builder.err != nil { return builder } builder.Request.KeyRanges, builder.err = IndexRangesToKVRanges(sc, tid, idxID, ranges, nil) return builder } // SetTableHandles sets "KeyRanges" for "kv.Request" by converting table handles // "handles" to "KeyRanges" firstly. func (builder *RequestBuilder) SetTableHandles(tid int64, handles []int64) *RequestBuilder { builder.Request.KeyRanges = TableHandlesToKVRanges(tid, handles) return builder } // SetDAGRequest sets the request type to "ReqTypeDAG" and construct request data. func (builder *RequestBuilder) SetDAGRequest(dag *tipb.DAGRequest) *RequestBuilder { if builder.err != nil { return builder } builder.Request.Tp = kv.ReqTypeDAG builder.Request.StartTs = dag.StartTs builder.Request.Data, builder.err = dag.Marshal() return builder } // SetAnalyzeRequest sets the request type to "ReqTypeAnalyze" and cosntruct request data. func (builder *RequestBuilder) SetAnalyzeRequest(ana *tipb.AnalyzeReq) *RequestBuilder { if builder.err != nil { return builder } builder.Request.Tp = kv.ReqTypeAnalyze builder.Request.StartTs = ana.StartTs builder.Request.Data, builder.err = ana.Marshal() builder.Request.NotFillCache = true builder.Request.IsolationLevel = kv.RC builder.Request.Priority = kv.PriorityLow return builder } // SetChecksumRequest sets the request type to "ReqTypeChecksum" and construct request data. func (builder *RequestBuilder) SetChecksumRequest(checksum *tipb.ChecksumRequest) *RequestBuilder { if builder.err != nil { return builder } builder.Request.Tp = kv.ReqTypeChecksum builder.Request.StartTs = checksum.StartTs builder.Request.Data, builder.err = checksum.Marshal() builder.Request.NotFillCache = true return builder } // SetKeyRanges sets "KeyRanges" for "kv.Request". func (builder *RequestBuilder) SetKeyRanges(keyRanges []kv.KeyRange) *RequestBuilder { builder.Request.KeyRanges = keyRanges return builder } // SetDesc sets "Desc" for "kv.Request". func (builder *RequestBuilder) SetDesc(desc bool) *RequestBuilder { builder.Request.Desc = desc return builder } // SetKeepOrder sets "KeepOrder" for "kv.Request". func (builder *RequestBuilder) SetKeepOrder(order bool) *RequestBuilder { builder.Request.KeepOrder = order return builder } func (builder *RequestBuilder) getIsolationLevel() kv.IsoLevel { switch builder.Tp { case kv.ReqTypeAnalyze: return kv.RC } return kv.SI } func (builder *RequestBuilder) getKVPriority(sv *variable.SessionVars) int { switch sv.StmtCtx.Priority { case mysql.NoPriority, mysql.DelayedPriority: return kv.PriorityNormal case mysql.LowPriority: return kv.PriorityLow case mysql.HighPriority: return kv.PriorityHigh } return kv.PriorityNormal } // SetFromSessionVars sets the following fields for "kv.Request" from session variables: // "Concurrency", "IsolationLevel", "NotFillCache". func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *RequestBuilder { builder.Request.Concurrency = sv.DistSQLScanConcurrency builder.Request.IsolationLevel = builder.getIsolationLevel() builder.Request.NotFillCache = sv.StmtCtx.NotFillCache builder.Request.Priority = builder.getKVPriority(sv) return builder } // SetStreaming sets "Streaming" flag for "kv.Request". func (builder *RequestBuilder) SetStreaming(streaming bool) *RequestBuilder { builder.Request.Streaming = streaming return builder } // SetConcurrency sets "Concurrency" for "kv.Request". func (builder *RequestBuilder) SetConcurrency(concurrency int) *RequestBuilder { builder.Request.Concurrency = concurrency return builder } // TableRangesToKVRanges converts table ranges to "KeyRange". func TableRangesToKVRanges(tid int64, ranges []*ranger.Range, fb *statistics.QueryFeedback) []kv.KeyRange { if fb == nil || fb.Hist() == nil { return tableRangesToKVRangesWithoutSplit(tid, ranges) } ranges = fb.Hist().SplitRange(ranges) krs := make([]kv.KeyRange, 0, len(ranges)) feedbackRanges := make([]*ranger.Range, 0, len(ranges)) for _, ran := range ranges { low := codec.EncodeInt(nil, ran.LowVal[0].GetInt64()) high := codec.EncodeInt(nil, ran.HighVal[0].GetInt64()) if ran.LowExclude { low = []byte(kv.Key(low).PrefixNext()) } // If this range is split by histogram, then the high val will equal to one bucket's upper bound, // since we need to guarantee each range falls inside the exactly one bucket, `PerfixNext` will make the // high value greater than upper bound, so we store the range here. r := &ranger.Range{LowVal: []types.Datum{types.NewBytesDatum(low)}, HighVal: []types.Datum{types.NewBytesDatum(high)}} feedbackRanges = append(feedbackRanges, r) if !ran.HighExclude { high = []byte(kv.Key(high).PrefixNext()) } startKey := tablecodec.EncodeRowKey(tid, low) endKey := tablecodec.EncodeRowKey(tid, high) krs = append(krs, kv.KeyRange{StartKey: startKey, EndKey: endKey}) } fb.StoreRanges(feedbackRanges) return krs } func tableRangesToKVRangesWithoutSplit(tid int64, ranges []*ranger.Range) []kv.KeyRange { krs := make([]kv.KeyRange, 0, len(ranges)) for _, ran := range ranges { low, high := encodeHandleKey(ran) startKey := tablecodec.EncodeRowKey(tid, low) endKey := tablecodec.EncodeRowKey(tid, high) krs = append(krs, kv.KeyRange{StartKey: startKey, EndKey: endKey}) } return krs } func encodeHandleKey(ran *ranger.Range) ([]byte, []byte) { low := codec.EncodeInt(nil, ran.LowVal[0].GetInt64()) high := codec.EncodeInt(nil, ran.HighVal[0].GetInt64()) if ran.LowExclude { low = []byte(kv.Key(low).PrefixNext()) } if !ran.HighExclude { high = []byte(kv.Key(high).PrefixNext()) } return low, high } // TableHandlesToKVRanges converts sorted handle to kv ranges. // For continuous handles, we should merge them to a single key range. func TableHandlesToKVRanges(tid int64, handles []int64) []kv.KeyRange { krs := make([]kv.KeyRange, 0, len(handles)) i := 0 for i < len(handles) { j := i + 1 for ; j < len(handles) && handles[j-1] != math.MaxInt64; j++ { if handles[j] != handles[j-1]+1 { break } } low := codec.EncodeInt(nil, handles[i]) high := codec.EncodeInt(nil, handles[j-1]) high = []byte(kv.Key(high).PrefixNext()) startKey := tablecodec.EncodeRowKey(tid, low) endKey := tablecodec.EncodeRowKey(tid, high) krs = append(krs, kv.KeyRange{StartKey: startKey, EndKey: endKey}) i = j } return krs } // IndexRangesToKVRanges converts index ranges to "KeyRange". func IndexRangesToKVRanges(sc *stmtctx.StatementContext, tid, idxID int64, ranges []*ranger.Range, fb *statistics.QueryFeedback) ([]kv.KeyRange, error) { if fb == nil || fb.Hist() == nil { return indexRangesToKVWithoutSplit(sc, tid, idxID, ranges) } feedbackRanges := make([]*ranger.Range, 0, len(ranges)) for _, ran := range ranges { low, high, err := encodeIndexKey(sc, ran) if err != nil { return nil, errors.Trace(err) } feedbackRanges = append(feedbackRanges, &ranger.Range{LowVal: []types.Datum{types.NewBytesDatum(low)}, HighVal: []types.Datum{types.NewBytesDatum(high)}, LowExclude: false, HighExclude: true}) } feedbackRanges = fb.Hist().SplitRange(feedbackRanges) krs := make([]kv.KeyRange, 0, len(feedbackRanges)) for _, ran := range feedbackRanges { low, high := ran.LowVal[0].GetBytes(), ran.HighVal[0].GetBytes() if ran.LowExclude { low = kv.Key(low).PrefixNext() } ran.LowVal[0].SetBytes(low) // If this range is split by histogram, then the high val will equal to one bucket's upper bound, // since we need to guarantee each range falls inside the exactly one bucket, `PerfixNext` will make the // high value greater than upper bound, so we store the high value here. ran.HighVal[0].SetBytes(high) if !ran.HighExclude { high = kv.Key(high).PrefixNext() } startKey := tablecodec.EncodeIndexSeekKey(tid, idxID, low) endKey := tablecodec.EncodeIndexSeekKey(tid, idxID, high) krs = append(krs, kv.KeyRange{StartKey: startKey, EndKey: endKey}) } fb.StoreRanges(feedbackRanges) return krs, nil } func indexRangesToKVWithoutSplit(sc *stmtctx.StatementContext, tid, idxID int64, ranges []*ranger.Range) ([]kv.KeyRange, error) { krs := make([]kv.KeyRange, 0, len(ranges)) for _, ran := range ranges { low, high, err := encodeIndexKey(sc, ran) if err != nil { return nil, errors.Trace(err) } startKey := tablecodec.EncodeIndexSeekKey(tid, idxID, low) endKey := tablecodec.EncodeIndexSeekKey(tid, idxID, high) krs = append(krs, kv.KeyRange{StartKey: startKey, EndKey: endKey}) } return krs, nil } func encodeIndexKey(sc *stmtctx.StatementContext, ran *ranger.Range) ([]byte, []byte, error) { low, err := codec.EncodeKey(sc, nil, ran.LowVal...) if err != nil { return nil, nil, errors.Trace(err) } if ran.LowExclude { low = []byte(kv.Key(low).PrefixNext()) } high, err := codec.EncodeKey(sc, nil, ran.HighVal...) if err != nil { return nil, nil, errors.Trace(err) } if !ran.HighExclude { high = []byte(kv.Key(high).PrefixNext()) } var hasNull bool for _, highVal := range ran.HighVal { if highVal.IsNull() { hasNull = true break } } if hasNull { // Append 0 to make unique-key range [null, null] to be a scan rather than point-get. high = []byte(kv.Key(high).Next()) } return low, high, nil }