// Copyright 2015 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // See the License for the specific language governing permissions and // limitations under the License. package ddl import ( "fmt" "strconv" "strings" "github.com/hanchuanchuan/goInception/ddl/util" "github.com/hanchuanchuan/goInception/infoschema" "github.com/hanchuanchuan/goInception/kv" "github.com/hanchuanchuan/goInception/meta" "github.com/hanchuanchuan/goInception/meta/autoid" "github.com/hanchuanchuan/goInception/model" "github.com/hanchuanchuan/goInception/table" "github.com/hanchuanchuan/goInception/tablecodec" "github.com/pingcap/errors" log "github.com/sirupsen/logrus" ) func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) { schemaID := job.SchemaID tbInfo := &model.TableInfo{} if err := job.DecodeArgs(tbInfo); err != nil { // Invalid arguments, cancel this job. job.State = model.JobStateCancelled return ver, errors.Trace(err) } tbInfo.State = model.StateNone err := checkTableNotExists(t, job, schemaID, tbInfo.Name.L) if err != nil { return ver, errors.Trace(err) } if tbInfo.Partition != nil { err = checkAddPartitionTooManyPartitions(len(tbInfo.Partition.Definitions)) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } } ver, err = updateSchemaVersion(t, job) if err != nil { return ver, errors.Trace(err) } switch tbInfo.State { case model.StateNone: // none -> public tbInfo.State = model.StatePublic tbInfo.UpdateTS = t.StartTS err = t.CreateTable(schemaID, tbInfo) if err != nil { return ver, errors.Trace(err) } if EnableSplitTableRegion { // TODO: Add restrictions to this operation. go splitTableRegion(d.store, tbInfo.ID) } // Finish this job. job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tbInfo) asyncNotifyEvent(d, &util.Event{Tp: model.ActionCreateTable, TableInfo: tbInfo}) return ver, nil default: return ver, ErrInvalidTableState.GenWithStack("invalid table state %v", tbInfo.State) } } func onDropTable(t *meta.Meta, job *model.Job) (ver int64, _ error) { schemaID := job.SchemaID tableID := job.TableID // Check this table's database. tblInfo, err := t.GetTable(schemaID, tableID) if err != nil { if meta.ErrDBNotExists.Equal(err) { job.State = model.JobStateCancelled return ver, errors.Trace(infoschema.ErrDatabaseNotExists.GenWithStackByArgs( fmt.Sprintf("(Schema ID %d)", schemaID), )) } return ver, errors.Trace(err) } // Check the table. if tblInfo == nil { job.State = model.JobStateCancelled return ver, errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs( fmt.Sprintf("(Schema ID %d)", schemaID), fmt.Sprintf("(Table ID %d)", tableID), )) } originalState := job.SchemaState switch tblInfo.State { case model.StatePublic: // public -> write only job.SchemaState = model.StateWriteOnly tblInfo.State = model.StateWriteOnly ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != tblInfo.State) case model.StateWriteOnly: // write only -> delete only job.SchemaState = model.StateDeleteOnly tblInfo.State = model.StateDeleteOnly ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != tblInfo.State) case model.StateDeleteOnly: tblInfo.State = model.StateNone ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != tblInfo.State) if err != nil { return ver, errors.Trace(err) } if err = t.DropTable(job.SchemaID, tableID, true); err != nil { break } // Finish this job. job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo) startKey := tablecodec.EncodeTablePrefix(tableID) job.Args = append(job.Args, startKey, getPartitionIDs(tblInfo)) default: err = ErrInvalidTableState.GenWithStack("invalid table state %v", tblInfo.State) } return ver, errors.Trace(err) } type splitableStore interface { SplitRegion(splitKey kv.Key) error } func splitTableRegion(store kv.Storage, tableID int64) { s, ok := store.(splitableStore) if !ok { return } tableStartKey := tablecodec.GenTablePrefix(tableID) if err := s.SplitRegion(tableStartKey); err != nil { // It will be automatically split by TiKV later. log.Warnf("[ddl] splitting table region failed %v", errors.ErrorStack(err)) } } func getTable(store kv.Storage, schemaID int64, tblInfo *model.TableInfo) (table.Table, error) { alloc := autoid.NewAllocator(store, tblInfo.GetDBID(schemaID)) tbl, err := table.TableFromMeta(alloc, tblInfo) return tbl, errors.Trace(err) } func getTableInfo(t *meta.Meta, job *model.Job, schemaID int64) (*model.TableInfo, error) { tableID := job.TableID tblInfo, err := t.GetTable(schemaID, tableID) if err != nil { if meta.ErrDBNotExists.Equal(err) { job.State = model.JobStateCancelled return nil, errors.Trace(infoschema.ErrDatabaseNotExists.GenWithStackByArgs( fmt.Sprintf("(Schema ID %d)", schemaID), )) } return nil, errors.Trace(err) } else if tblInfo == nil { job.State = model.JobStateCancelled return nil, errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs( fmt.Sprintf("(Schema ID %d)", schemaID), fmt.Sprintf("(Table ID %d)", tableID), )) } if tblInfo.State != model.StatePublic { job.State = model.JobStateCancelled return nil, ErrInvalidTableState.GenWithStack("table %s is not in public, but %s", tblInfo.Name, tblInfo.State) } return tblInfo, nil } // onTruncateTable delete old table meta, and creates a new table identical to old table except for table ID. // As all the old data is encoded with old table ID, it can not be accessed any more. // A background job will be created to delete old data. func onTruncateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) { schemaID := job.SchemaID tableID := job.TableID var newTableID int64 err := job.DecodeArgs(&newTableID) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } tblInfo, err := getTableInfo(t, job, schemaID) if err != nil { return ver, errors.Trace(err) } err = t.DropTable(schemaID, tblInfo.ID, true) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } // We use the new partition ID because all the old data is encoded with the old partition ID, it can not be accessed anymore. var oldPartitionIDs []int64 if tblInfo.GetPartitionInfo() != nil { oldPartitionIDs = getPartitionIDs(tblInfo) for _, def := range tblInfo.Partition.Definitions { var pid int64 pid, err = t.GenGlobalID() if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } def.ID = pid } } tblInfo.ID = newTableID err = t.CreateTable(schemaID, tblInfo) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } ver, err = updateSchemaVersion(t, job) if err != nil { return ver, errors.Trace(err) } job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo) asyncNotifyEvent(d, &util.Event{Tp: model.ActionTruncateTable, TableInfo: tblInfo}) startKey := tablecodec.EncodeTablePrefix(tableID) job.Args = []interface{}{startKey, oldPartitionIDs} return ver, nil } func onRebaseAutoID(store kv.Storage, t *meta.Meta, job *model.Job) (ver int64, _ error) { schemaID := job.SchemaID var newBase int64 err := job.DecodeArgs(&newBase) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } tblInfo, err := getTableInfo(t, job, schemaID) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } tblInfo.AutoIncID = newBase tbl, err := getTable(store, schemaID, tblInfo) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } // The operation of the minus 1 to make sure that the current value doesn't be used, // the next Alloc operation will get this value. // Its behavior is consistent with MySQL. err = tbl.RebaseAutoID(nil, tblInfo.AutoIncID-1, false) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } ver, err = updateVersionAndTableInfo(t, job, tblInfo, true) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo) return ver, nil } func onShardRowID(t *meta.Meta, job *model.Job) (ver int64, _ error) { var shardRowIDBits uint64 err := job.DecodeArgs(&shardRowIDBits) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } tblInfo, err := getTableInfo(t, job, job.SchemaID) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } tblInfo.ShardRowIDBits = shardRowIDBits ver, err = updateVersionAndTableInfo(t, job, tblInfo, true) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo) return ver, nil } func onRenameTable(t *meta.Meta, job *model.Job) (ver int64, _ error) { var oldSchemaID int64 var tableName model.CIStr if err := job.DecodeArgs(&oldSchemaID, &tableName); err != nil { // Invalid arguments, cancel this job. job.State = model.JobStateCancelled return ver, errors.Trace(err) } tblInfo, err := getTableInfo(t, job, oldSchemaID) if err != nil { return ver, errors.Trace(err) } newSchemaID := job.SchemaID err = checkTableNotExists(t, job, newSchemaID, tableName.L) if err != nil { return ver, errors.Trace(err) } var baseID int64 shouldDelAutoID := false if newSchemaID != oldSchemaID { shouldDelAutoID = true baseID, err = t.GetAutoTableID(tblInfo.GetDBID(oldSchemaID), tblInfo.ID) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } // It's compatible with old version. // TODO: Remove it. tblInfo.OldSchemaID = 0 } err = t.DropTable(oldSchemaID, tblInfo.ID, shouldDelAutoID) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } tblInfo.Name = tableName err = t.CreateTable(newSchemaID, tblInfo) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } // Update the table's auto-increment ID. if newSchemaID != oldSchemaID { _, err = t.GenAutoTableID(newSchemaID, tblInfo.ID, baseID) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } } ver, err = updateSchemaVersion(t, job) if err != nil { return ver, errors.Trace(err) } job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo) return ver, nil } func onModifyTableComment(t *meta.Meta, job *model.Job) (ver int64, _ error) { var comment string if err := job.DecodeArgs(&comment); err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } tblInfo, err := getTableInfo(t, job, job.SchemaID) if err != nil { return ver, errors.Trace(err) } tblInfo.Comment = comment ver, err = updateVersionAndTableInfo(t, job, tblInfo, true) if err != nil { return ver, errors.Trace(err) } job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo) return ver, nil } func checkTableNotExists(t *meta.Meta, job *model.Job, schemaID int64, tableName string) error { // Check this table's database. tables, err := t.ListTables(schemaID) if err != nil { if meta.ErrDBNotExists.Equal(err) { job.State = model.JobStateCancelled return infoschema.ErrDatabaseNotExists.GenWithStackByArgs("") } return errors.Trace(err) } // Check the table. for _, tbl := range tables { if tbl.Name.L == tableName { // This table already exists and can't be created, we should cancel this job now. job.State = model.JobStateCancelled return infoschema.ErrTableExists.GenWithStackByArgs(tbl.Name) } } return nil } // updateVersionAndTableInfo updates the schema version and the table information. func updateVersionAndTableInfo(t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, shouldUpdateVer bool) ( ver int64, err error) { if shouldUpdateVer { ver, err = updateSchemaVersion(t, job) if err != nil { return 0, errors.Trace(err) } } if tblInfo.State == model.StatePublic { tblInfo.UpdateTS = t.StartTS } return ver, t.UpdateTable(job.SchemaID, tblInfo) } // TODO: It may have the issue when two clients concurrently add partitions to a table. func onAddTablePartition(t *meta.Meta, job *model.Job) (ver int64, _ error) { partInfo := &model.PartitionInfo{} err := job.DecodeArgs(&partInfo) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } tblInfo, err := getTableInfo(t, job, job.SchemaID) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } err = checkAddPartitionTooManyPartitions(len(tblInfo.Partition.Definitions) + len(partInfo.Definitions)) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } err = checkPartitionNameUnique(tblInfo, partInfo) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } updatePartitionInfo(partInfo, tblInfo) ver, err = updateVersionAndTableInfo(t, job, tblInfo, true) if err != nil { return ver, errors.Trace(err) } // Finish this job. job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo) return ver, errors.Trace(err) } func updatePartitionInfo(partitionInfo *model.PartitionInfo, tblInfo *model.TableInfo) { parInfo := &model.PartitionInfo{} oldDefs, newDefs := tblInfo.Partition.Definitions, partitionInfo.Definitions parInfo.Definitions = make([]model.PartitionDefinition, 0, len(newDefs)+len(oldDefs)) parInfo.Definitions = append(parInfo.Definitions, oldDefs...) parInfo.Definitions = append(parInfo.Definitions, newDefs...) tblInfo.Partition.Definitions = parInfo.Definitions } // checkAddPartitionValue values less than value must be strictly increasing for each partition. func checkAddPartitionValue(meta *model.TableInfo, part *model.PartitionInfo) error { if meta.Partition.Type == model.PartitionTypeRange { newDefs, oldDefs := part.Definitions, meta.Partition.Definitions rangeValue := oldDefs[len(oldDefs)-1].LessThan[0] if strings.EqualFold(rangeValue, "MAXVALUE") { return errors.Trace(ErrPartitionMaxvalue) } currentRangeValue, err := strconv.Atoi(rangeValue) if err != nil { return errors.Trace(err) } for i := 0; i < len(newDefs); i++ { ifMaxvalue := strings.EqualFold(newDefs[i].LessThan[0], "MAXVALUE") if ifMaxvalue && i == len(newDefs)-1 { return nil } else if ifMaxvalue && i != len(newDefs)-1 { return errors.Trace(ErrPartitionMaxvalue) } nextRangeValue, err := strconv.Atoi(newDefs[i].LessThan[0]) if err != nil { return errors.Trace(err) } if nextRangeValue <= currentRangeValue { return errors.Trace(ErrRangeNotIncreasing) } currentRangeValue = nextRangeValue } } return nil }