1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/hanchuanchuan-goInception

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Это зеркальный репозиторий, синхронизируется ежедневно с исходного репозитория.
Клонировать/Скачать
partition.go 14 КБ
Копировать Редактировать Исходные данные Просмотреть построчно История
hanchuanchuan Отправлено 4 лет назад f5c39b3
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package ddl
import (
"bytes"
"fmt"
"strconv"
"strings"
"github.com/hanchuanchuan/goInception/ast"
"github.com/hanchuanchuan/goInception/expression"
"github.com/hanchuanchuan/goInception/meta"
"github.com/hanchuanchuan/goInception/model"
"github.com/hanchuanchuan/goInception/mysql"
"github.com/hanchuanchuan/goInception/parser/opcode"
"github.com/hanchuanchuan/goInception/sessionctx"
"github.com/hanchuanchuan/goInception/table"
"github.com/hanchuanchuan/goInception/types"
"github.com/hanchuanchuan/goInception/util/chunk"
"github.com/pingcap/errors"
)
const (
partitionMaxValue = "MAXVALUE"
primarykey = "PRIMARY KEY"
)
// buildTablePartitionInfo builds partition info and checks for some errors.
func buildTablePartitionInfo(ctx sessionctx.Context, d *ddl, s *ast.CreateTableStmt) (*model.PartitionInfo, error) {
if s.Partition == nil {
return nil, nil
}
pi := &model.PartitionInfo{
Type: s.Partition.Tp,
Enable: ctx.GetSessionVars().EnableTablePartition,
}
if s.Partition.Expr != nil {
buf := new(bytes.Buffer)
s.Partition.Expr.Format(buf)
pi.Expr = buf.String()
} else if s.Partition.ColumnNames != nil {
pi.Columns = make([]model.CIStr, 0, len(s.Partition.ColumnNames))
for _, cn := range s.Partition.ColumnNames {
pi.Columns = append(pi.Columns, cn.Name)
}
}
for _, def := range s.Partition.Definitions {
comment, _ := def.Comment()
// TODO: generate multiple global ID for paritions, reduce the times of obtaining the global ID from the storage.
pid, err := d.genGlobalID()
if err != nil {
return nil, errors.Trace(err)
}
piDef := model.PartitionDefinition{
Name: def.Name,
ID: pid,
Comment: comment,
}
if s.Partition.Tp == model.PartitionTypeRange {
buf := new(bytes.Buffer)
// Range columns partitions support multi-column partitions.
for _, expr := range def.Clause.(*ast.PartitionDefinitionClauseLessThan).Exprs {
expr.Format(buf)
piDef.LessThan = append(piDef.LessThan, buf.String())
buf.Reset()
}
pi.Definitions = append(pi.Definitions, piDef)
}
}
return pi, nil
}
func checkPartitionNameUnique(tbInfo *model.TableInfo, pi *model.PartitionInfo) error {
partNames := make(map[string]struct{})
if tbInfo.Partition != nil {
oldPars := tbInfo.Partition.Definitions
for _, oldPar := range oldPars {
partNames[oldPar.Name.L] = struct{}{}
}
}
newPars := pi.Definitions
for _, newPar := range newPars {
if _, ok := partNames[newPar.Name.L]; ok {
return ErrSameNamePartition.GenWithStackByArgs(newPar.Name)
}
partNames[newPar.Name.L] = struct{}{}
}
return nil
}
// checkPartitionFuncValid checks partition function validly.
func checkPartitionFuncValid(ctx sessionctx.Context, tblInfo *model.TableInfo, expr ast.ExprNode) error {
switch v := expr.(type) {
case *ast.FuncCastExpr, *ast.CaseExpr:
return errors.Trace(ErrPartitionFunctionIsNotAllowed)
case *ast.FuncCallExpr:
// check function which allowed in partitioning expressions
// see https://dev.mysql.com/doc/mysql-partitioning-excerpt/5.7/en/partitioning-limitations-functions.html
switch v.FnName.L {
case ast.Abs, ast.Ceiling, ast.DateDiff, ast.Day, ast.DayOfMonth, ast.DayOfWeek, ast.DayOfYear, ast.Extract, ast.Floor,
ast.Hour, ast.MicroSecond, ast.Minute, ast.Mod, ast.Month, ast.Quarter, ast.Second, ast.TimeToSec, ast.ToDays,
ast.ToSeconds, ast.Weekday, ast.Year, ast.YearWeek:
return nil
case ast.UnixTimestamp:
if len(v.Args) == 1 {
col, err := expression.RewriteSimpleExprWithTableInfo(ctx, tblInfo, v.Args[0])
if err != nil {
return errors.Trace(err)
}
if col.GetType().Tp != mysql.TypeTimestamp {
return errors.Trace(errWrongExprInPartitionFunc)
}
return nil
}
}
return errors.Trace(ErrPartitionFunctionIsNotAllowed)
case *ast.BinaryOperationExpr:
// The DIV operator (opcode.IntDiv) is also supported; the / operator ( opcode.Div ) is not permitted.
// see https://dev.mysql.com/doc/refman/5.7/en/partitioning-limitations.html
switch v.Op {
case opcode.Or, opcode.And, opcode.Xor, opcode.LeftShift, opcode.RightShift, opcode.BitNeg, opcode.Div:
return errors.Trace(ErrPartitionFunctionIsNotAllowed)
}
return nil
case *ast.UnaryOperationExpr:
if v.Op == opcode.BitNeg {
return errors.Trace(ErrPartitionFunctionIsNotAllowed)
}
}
return nil
}
// checkPartitionFuncType checks partition function return type.
func checkPartitionFuncType(ctx sessionctx.Context, s *ast.CreateTableStmt, cols []*table.Column, tblInfo *model.TableInfo) error {
if s.Partition.Expr == nil {
return nil
}
buf := new(bytes.Buffer)
s.Partition.Expr.Format(buf)
exprStr := buf.String()
if s.Partition.Tp == model.PartitionTypeRange {
// if partition by columnExpr, check the column type
if _, ok := s.Partition.Expr.(*ast.ColumnNameExpr); ok {
for _, col := range cols {
name := strings.Replace(col.Name.String(), ".", "`.`", -1)
// Range partitioning key supported types: tinyint, smallint, mediumint, int and bigint.
if !validRangePartitionType(col) && fmt.Sprintf("`%s`", name) == exprStr {
return errors.Trace(ErrNotAllowedTypeInPartition.GenWithStackByArgs(exprStr))
}
}
}
}
e, err := expression.ParseSimpleExprWithTableInfo(ctx, buf.String(), tblInfo)
if err != nil {
return errors.Trace(err)
}
if e.GetType().EvalType() == types.ETInt {
return nil
}
return ErrPartitionFuncNotAllowed.GenWithStackByArgs("PARTITION")
}
// checkCreatePartitionValue checks whether `less than value` is strictly increasing for each partition.
// Side effect: it may simplify the partition range definition from a constant expression to an integer.
func checkCreatePartitionValue(ctx sessionctx.Context, tblInfo *model.TableInfo, pi *model.PartitionInfo, cols []*table.Column) error {
defs := pi.Definitions
if len(defs) <= 1 {
return nil
}
if strings.EqualFold(defs[len(defs)-1].LessThan[0], partitionMaxValue) {
defs = defs[:len(defs)-1]
}
isUnsignedBigint := isRangePartitionColUnsignedBigint(cols, pi)
var prevRangeValue interface{}
for i := 0; i < len(defs); i++ {
if strings.EqualFold(defs[i].LessThan[0], partitionMaxValue) {
return errors.Trace(ErrPartitionMaxvalue)
}
currentRangeValue, fromExpr, err := getRangeValue(ctx, tblInfo, defs[i].LessThan[0], isUnsignedBigint)
if err != nil {
return errors.Trace(err)
}
if fromExpr {
// Constant fold the expression.
defs[i].LessThan[0] = fmt.Sprintf("%d", currentRangeValue)
}
if i == 0 {
prevRangeValue = currentRangeValue
continue
}
if isUnsignedBigint {
if currentRangeValue.(uint64) <= prevRangeValue.(uint64) {
return errors.Trace(ErrRangeNotIncreasing)
}
} else {
if currentRangeValue.(int64) <= prevRangeValue.(int64) {
return errors.Trace(ErrRangeNotIncreasing)
}
}
prevRangeValue = currentRangeValue
}
return nil
}
// getRangeValue gets an integer from the range value string.
// The returned boolean value indicates whether the input string is a constant expression.
func getRangeValue(ctx sessionctx.Context, tblInfo *model.TableInfo, str string, unsignedBigint bool) (interface{}, bool, error) {
// Unsigned bigint was converted to uint64 handle.
if unsignedBigint {
if value, err := strconv.ParseUint(str, 10, 64); err == nil {
return value, false, nil
}
if e, err1 := expression.ParseSimpleExprWithTableInfo(ctx, str, tblInfo); err1 == nil {
res, isNull, err2 := e.EvalInt(ctx, chunk.Row{})
if err2 == nil && isNull == false {
return uint64(res), true, nil
}
}
} else {
if value, err := strconv.ParseInt(str, 10, 64); err == nil {
return value, false, nil
}
// The range value maybe not an integer, it could be a constant expression.
// For example, the following two cases are the same:
// PARTITION p0 VALUES LESS THAN (TO_SECONDS('2004-01-01'))
// PARTITION p0 VALUES LESS THAN (63340531200)
if e, err1 := expression.ParseSimpleExprWithTableInfo(ctx, str, tblInfo); err1 == nil {
res, isNull, err2 := e.EvalInt(ctx, chunk.Row{})
if err2 == nil && isNull == false {
return res, true, nil
}
}
}
return 0, false, ErrNotAllowedTypeInPartition.GenWithStackByArgs(str)
}
// validRangePartitionType checks the type supported by the range partitioning key.
func validRangePartitionType(col *table.Column) bool {
switch col.FieldType.EvalType() {
case types.ETInt:
return true
default:
return false
}
}
// checkDropTablePartition checks if the partition exists and does not allow deleting the last existing partition in the table.
func checkDropTablePartition(meta *model.TableInfo, partLowerNames []string) error {
pi := meta.Partition
oldDefs := pi.Definitions
for _, pn := range partLowerNames {
found := false
for _, def := range oldDefs {
if def.Name.L == pn {
found = true
break
}
}
if !found {
return errors.Trace(ErrDropPartitionNonExistent.GenWithStackByArgs(pn))
}
}
if len(oldDefs) == len(partLowerNames) {
return errors.Trace(ErrDropLastPartition)
}
return nil
}
// removePartitionInfo each ddl job deletes a partition.
func removePartitionInfo(tblInfo *model.TableInfo, partName string) int64 {
oldDefs := tblInfo.Partition.Definitions
newDefs := make([]model.PartitionDefinition, 0, len(oldDefs)-1)
var pid int64
for i := 0; i < len(oldDefs); i++ {
if !strings.EqualFold(oldDefs[i].Name.L, strings.ToLower(partName)) {
continue
}
pid = oldDefs[i].ID
newDefs = append(oldDefs[:i], oldDefs[i+1:]...)
break
}
tblInfo.Partition.Definitions = newDefs
return pid
}
// onDropTablePartition deletes old partition meta.
func onDropTablePartition(t *meta.Meta, job *model.Job) (ver int64, _ error) {
var partName string
if err := job.DecodeArgs(&partName); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
var partNames []string
if partName == "" {
if err := job.DecodeArgs(&partNames); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
} else {
partNames = []string{strings.ToLower(partName)}
}
tblInfo, err := getTableInfo(t, job, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}
// If an error occurs, it returns that it cannot delete all partitions or that the partition doesn't exist.
err = checkDropTablePartition(tblInfo, partNames)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
physicalTableID := removePartitionInfo(tblInfo, partName)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
}
// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
// A background job will be created to delete old partition data.
job.Args = []interface{}{physicalTableID}
return ver, nil
}
func checkAddPartitionTooManyPartitions(piDefs int) error {
if piDefs > PartitionCountLimit {
return ErrTooManyPartitions
}
return nil
}
func getPartitionIDs(table *model.TableInfo) []int64 {
if table.GetPartitionInfo() == nil {
return []int64{}
}
physicalTableIDs := make([]int64, 0, len(table.Partition.Definitions))
for _, def := range table.Partition.Definitions {
physicalTableIDs = append(physicalTableIDs, def.ID)
}
return physicalTableIDs
}
// checkRangePartitioningKeysConstraints checks that the range partitioning key is included in the table constraint.
func checkRangePartitioningKeysConstraints(ctx sessionctx.Context, s *ast.CreateTableStmt, tblInfo *model.TableInfo, constraints []*ast.Constraint) error {
// Returns directly if there is no constraint in the partition table.
// TODO: Remove the test 's.Partition.Expr == nil' when we support 'PARTITION BY RANGE COLUMNS'
if len(constraints) == 0 || s.Partition.Expr == nil {
return nil
}
// Extract the column names in table constraints to []map[string]struct{}.
consColNames := extractConstraintsColumnNames(constraints)
// Parse partitioning key, extract the column names in the partitioning key to slice.
buf := new(bytes.Buffer)
s.Partition.Expr.Format(buf)
var partkeys []string
e, err := expression.ParseSimpleExprWithTableInfo(ctx, buf.String(), tblInfo)
if err != nil {
return errors.Trace(err)
}
cols := expression.ExtractColumns(e)
for _, col := range cols {
partkeys = append(partkeys, col.ColName.L)
}
// Checks that the partitioning key is included in the constraint.
for _, con := range consColNames {
// Every unique key on the table must use every column in the table's partitioning expression.
// See https://dev.mysql.com/doc/refman/5.7/en/partitioning-limitations-partitioning-keys-unique-keys.html.
if !checkConstraintIncludePartKey(partkeys, con) {
return ErrUniqueKeyNeedAllFieldsInPf.GenWithStackByArgs(primarykey)
}
}
return nil
}
// extractConstraintsColumnNames extract the column names in table constraints to []map[string]struct{}.
func extractConstraintsColumnNames(cons []*ast.Constraint) []map[string]struct{} {
var constraints []map[string]struct{}
for _, v := range cons {
if v.Tp == ast.ConstraintUniq || v.Tp == ast.ConstraintPrimaryKey {
uniKeys := make(map[string]struct{})
for _, key := range v.Keys {
uniKeys[key.Column.Name.L] = struct{}{}
}
// Extract every unique key and primary key.
if len(uniKeys) != 0 {
constraints = append(constraints, uniKeys)
}
}
}
return constraints
}
// checkConstraintIncludePartKey checks that the partitioning key is included in the constraint.
func checkConstraintIncludePartKey(partkeys []string, constraints map[string]struct{}) bool {
for _, pk := range partkeys {
if _, ok := constraints[pk]; !ok {
return false
}
}
return true
}
// isRangePartitionColUnsignedBigint returns true if the partitioning key column type is unsigned bigint type.
func isRangePartitionColUnsignedBigint(cols []*table.Column, pi *model.PartitionInfo) bool {
for _, col := range cols {
isUnsigned := col.Tp == mysql.TypeLonglong && mysql.HasUnsignedFlag(col.Flag)
if isUnsigned && strings.Contains(strings.ToLower(pi.Expr), col.Name.L) {
return true
}
}
return false
}

Комментарий ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://gitlife.ru/oschina-mirror/hanchuanchuan-goInception.git
git@gitlife.ru:oschina-mirror/hanchuanchuan-goInception.git
oschina-mirror
hanchuanchuan-goInception
hanchuanchuan-goInception
v1.2.4