// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package distsql

import (
	"github.com/hanchuanchuan/goInception/kv"
	"github.com/hanchuanchuan/goInception/sessionctx"
	"github.com/hanchuanchuan/goInception/statistics"
	"github.com/hanchuanchuan/goInception/terror"
	"github.com/hanchuanchuan/goInception/types"
	"github.com/hanchuanchuan/goInception/util/chunk"
	"github.com/hanchuanchuan/goInception/util/codec"
	"github.com/pingcap/errors"
	"github.com/pingcap/tipb/go-tipb"
	"golang.org/x/net/context"
)

// streamResult implements the SelectResult interface.
type streamResult struct {
	resp       kv.Response
	rowLen     int
	fieldTypes []*types.FieldType
	ctx        sessionctx.Context

	// NOTE: curr == nil means stream finish, while len(curr.RowsData) == 0 doesn't.
	curr         *tipb.Chunk
	partialCount int64
	feedback     *statistics.QueryFeedback
}

func (r *streamResult) Fetch(context.Context) {}

func (r *streamResult) Next(ctx context.Context, chk *chunk.Chunk) error {
	chk.Reset()
	maxChunkSize := r.ctx.GetSessionVars().MaxChunkSize
	for chk.NumRows() < maxChunkSize {
		err := r.readDataIfNecessary(ctx)
		if err != nil {
			return errors.Trace(err)
		}
		if r.curr == nil {
			return nil
		}

		err = r.flushToChunk(chk)
		if err != nil {
			return errors.Trace(err)
		}
	}
	return nil
}

// readDataFromResponse read the data to result. Returns true means the resp is finished.
func (r *streamResult) readDataFromResponse(ctx context.Context, resp kv.Response, result *tipb.Chunk) (bool, error) {
	resultSubset, err := resp.Next(ctx)
	if err != nil {
		return false, errors.Trace(err)
	}
	if resultSubset == nil {
		return true, nil
	}

	var stream tipb.StreamResponse
	err = stream.Unmarshal(resultSubset.GetData())
	if err != nil {
		return false, errors.Trace(err)
	}
	if stream.Error != nil {
		return false, errors.Errorf("stream response error: [%d]%s\n", stream.Error.Code, stream.Error.Msg)
	}
	for _, warning := range stream.Warnings {
		r.ctx.GetSessionVars().StmtCtx.AppendWarning(terror.ClassTiKV.New(terror.ErrCode(warning.Code), warning.Msg))
	}

	err = result.Unmarshal(stream.Data)
	if err != nil {
		return false, errors.Trace(err)
	}
	r.feedback.Update(resultSubset.GetStartKey(), stream.OutputCounts)
	r.partialCount++
	return false, nil
}

// readDataIfNecessary ensures there are some data in current chunk. If no more data, r.curr == nil.
func (r *streamResult) readDataIfNecessary(ctx context.Context) error {
	if r.curr != nil && len(r.curr.RowsData) > 0 {
		return nil
	}

	tmp := new(tipb.Chunk)
	finish, err := r.readDataFromResponse(ctx, r.resp, tmp)
	if err != nil {
		return errors.Trace(err)
	}
	if finish {
		r.curr = nil
		return nil
	}
	r.curr = tmp
	return nil
}

func (r *streamResult) flushToChunk(chk *chunk.Chunk) (err error) {
	remainRowsData := r.curr.RowsData
	maxChunkSize := r.ctx.GetSessionVars().MaxChunkSize
	decoder := codec.NewDecoder(chk, r.ctx.GetSessionVars().Location())
	for chk.NumRows() < maxChunkSize && len(remainRowsData) > 0 {
		for i := 0; i < r.rowLen; i++ {
			remainRowsData, err = decoder.DecodeOne(remainRowsData, i, r.fieldTypes[i])
			if err != nil {
				return errors.Trace(err)
			}
		}
	}
	if len(remainRowsData) == 0 {
		r.curr = nil // Current chunk is finished.
	} else {
		r.curr.RowsData = remainRowsData
	}
	return nil
}

func (r *streamResult) NextRaw(ctx context.Context) ([]byte, error) {
	r.partialCount++
	r.feedback.Invalidate()
	resultSubset, err := r.resp.Next(ctx)
	if resultSubset == nil || err != nil {
		return nil, errors.Trace(err)
	}
	return resultSubset.GetData(), errors.Trace(err)
}

func (r *streamResult) Close() error {
	return nil
}