1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/hanchuanchuan-goInception

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Это зеркальный репозиторий, синхронизируется ежедневно с исходного репозитория.
Клонировать/Скачать
column.go 14 КБ
Копировать Редактировать Исходные данные Просмотреть построчно История
hanchuanchuan Отправлено 6 лет назад 27f3c5a
// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package ddl
import (
"github.com/hanchuanchuan/goInception/ast"
"github.com/hanchuanchuan/goInception/ddl/util"
"github.com/hanchuanchuan/goInception/infoschema"
"github.com/hanchuanchuan/goInception/meta"
"github.com/hanchuanchuan/goInception/model"
"github.com/pingcap/errors"
log "github.com/sirupsen/logrus"
)
// adjustColumnInfoInAddColumn moves a newly added column into its final position.
//
// While the column is not yet public it is kept as the last entry of
// tblInfo.Columns. This function relocates that last entry to index offset,
// renumbers the Offset field of every column from offset onwards, and applies
// the same renumbering to the index columns that referenced the shifted columns.
func adjustColumnInfoInAddColumn(tblInfo *model.TableInfo, offset int) {
	cols := tblInfo.Columns
	last := len(cols) - 1

	// Layout: [0, offset) unchanged, then the added column, then the old [offset, last).
	reordered := make([]*model.ColumnInfo, 0, len(cols))
	reordered = append(reordered, cols[:offset]...)
	reordered = append(reordered, cols[last])
	reordered = append(reordered, cols[offset:last]...)

	// Renumber the columns that were pushed one slot to the right and remember
	// the old offset -> new offset mapping for the index fix-up below.
	shifted := make(map[int]int)
	for k, c := range reordered[offset+1:] {
		pos := offset + 1 + k
		shifted[c.Offset] = pos
		c.Offset = pos
	}
	reordered[offset].Offset = offset

	// Propagate the renumbering to index columns.
	// TODO: There may be some corner cases for index column offsets, we may check this later.
	for _, index := range tblInfo.Indices {
		for _, ic := range index.Columns {
			if moved, found := shifted[ic.Offset]; found {
				ic.Offset = moved
			}
		}
	}

	tblInfo.Columns = reordered
}
// adjustColumnInfoInDropColumn moves the column being dropped out of the way.
//
// The column at index offset is relocated to the end of tblInfo.Columns and its
// Offset becomes the last index; every column that followed it moves one slot to
// the left. Index columns that referenced a shifted column are renumbered to match.
func adjustColumnInfoInDropColumn(tblInfo *model.TableInfo, offset int) {
	cols := tblInfo.Columns
	last := len(cols) - 1

	// Shift the trailing columns left by one, recording old offset -> new offset.
	shifted := make(map[int]int)
	for k, c := range cols[offset+1:] {
		pos := offset + k
		shifted[c.Offset] = pos
		c.Offset = pos
	}
	cols[offset].Offset = last

	// Propagate the renumbering to index columns.
	// TODO: There may be some corner cases for index column offsets, we may check this later.
	for _, index := range tblInfo.Indices {
		for _, ic := range index.Columns {
			if moved, found := shifted[ic.Offset]; found {
				ic.Offset = moved
			}
		}
	}

	// Layout: [0, offset) unchanged, then the old (offset, last], then the dropped column.
	reordered := make([]*model.ColumnInfo, 0, len(cols))
	reordered = append(reordered, cols[:offset]...)
	reordered = append(reordered, cols[offset+1:]...)
	reordered = append(reordered, cols[offset])
	tblInfo.Columns = reordered
}
// createColumnInfo registers colInfo on tblInfo as a new column in StateNone.
//
// It returns colInfo together with the position the column should finally
// occupy: 0 for FIRST, the relative column's Offset + 1 for AFTER, and the
// current column count otherwise. The column itself is appended at the end of
// tblInfo.Columns with Offset set to that last index; it is expected to be moved
// to the returned position later, when it becomes public.
//
// An ErrColumnNotExists error is returned when the AFTER column cannot be found;
// in that case tblInfo is left untouched.
func createColumnInfo(tblInfo *model.TableInfo, colInfo *model.ColumnInfo, pos *ast.ColumnPosition) (*model.ColumnInfo, int, error) {
	existing := tblInfo.Columns
	count := len(existing)

	// Resolve the requested final position.
	var position int
	switch pos.Tp {
	case ast.ColumnPositionFirst:
		position = 0
	case ast.ColumnPositionAfter:
		anchor := model.FindColumnInfo(existing, pos.RelativeColumn.Name.L)
		if anchor == nil {
			return nil, 0, infoschema.ErrColumnNotExists.GenWithStackByArgs(pos.RelativeColumn, tblInfo.Name)
		}
		// Insert right behind the referenced column.
		position = anchor.Offset + 1
	default:
		position = count
	}

	colInfo.ID = allocateColumnID(tblInfo)
	colInfo.State = model.StateNone
	// To support adding a column asynchronously, its offset is the last slot, so
	// that the original column offsets can still be used to read values from a row.
	colInfo.Offset = count

	// Build a fresh slice (never appending into the old backing array) with the
	// new column at the end.
	extended := make([]*model.ColumnInfo, count+1)
	copy(extended, existing)
	extended[count] = colInfo
	tblInfo.Columns = extended

	return colInfo, position, nil
}
// onAddColumn performs one step of an ADD COLUMN job.
//
// On the first run the column does not exist yet: it is created via
// createColumnInfo and, when the target offset is non-zero, the job arguments
// are rewritten to carry that offset. Each call then advances the column by one
// schema state: none -> delete only -> write only -> write reorganization ->
// public. On the last transition the column is moved to its final position, the
// job is finished and an ActionAddColumn event is sent.
//
// The job is cancelled when its arguments cannot be decoded, when a public
// column with the same name already exists, when the column cannot be created,
// or when the column count check fails.
func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
	tblInfo, err := getTableInfo(t, job, job.SchemaID)
	if err != nil {
		return ver, errors.Trace(err)
	}

	// gofail: var errorBeforeDecodeArgs bool
	// if errorBeforeDecodeArgs {
	// 	return ver, errors.New("occur an error before decode args")
	// }

	var (
		offset int
		col    = &model.ColumnInfo{}
		pos    = &ast.ColumnPosition{}
	)
	if err = job.DecodeArgs(col, pos, &offset); err != nil {
		job.State = model.JobStateCancelled
		return ver, errors.Trace(err)
	}

	columnInfo := model.FindColumnInfo(tblInfo.Columns, col.Name.L)
	switch {
	case columnInfo == nil:
		// First run of the job: the column still has to be created.
		columnInfo, offset, err = createColumnInfo(tblInfo, col, pos)
		if err != nil {
			job.State = model.JobStateCancelled
			return ver, errors.Trace(err)
		}
		log.Infof("[ddl] add column, run DDL job %s, column info %#v, offset %d", job, columnInfo, offset)
		// Set offset arg to job.
		if offset != 0 {
			job.Args = []interface{}{columnInfo, pos, offset}
		}
		if err = checkAddColumnTooManyColumns(len(tblInfo.Columns)); err != nil {
			job.State = model.JobStateCancelled
			return ver, errors.Trace(err)
		}
	case columnInfo.State == model.StatePublic:
		// We already have a column with the same column name.
		job.State = model.JobStateCancelled
		return ver, infoschema.ErrColumnExists.GenWithStackByArgs(col.Name)
	}

	prevState := columnInfo.State
	switch prevState {
	case model.StateNone:
		// none -> delete only
		job.SchemaState = model.StateDeleteOnly
		columnInfo.State = model.StateDeleteOnly
		ver, err = updateVersionAndTableInfo(t, job, tblInfo, prevState != columnInfo.State)
	case model.StateDeleteOnly:
		// delete only -> write only
		job.SchemaState = model.StateWriteOnly
		columnInfo.State = model.StateWriteOnly
		ver, err = updateVersionAndTableInfo(t, job, tblInfo, prevState != columnInfo.State)
	case model.StateWriteOnly:
		// write only -> reorganization
		job.SchemaState = model.StateWriteReorganization
		columnInfo.State = model.StateWriteReorganization
		ver, err = updateVersionAndTableInfo(t, job, tblInfo, prevState != columnInfo.State)
	case model.StateWriteReorganization:
		// reorganization -> public: move the column to its final position first.
		adjustColumnInfoInAddColumn(tblInfo, offset)
		columnInfo.State = model.StatePublic
		ver, err = updateVersionAndTableInfo(t, job, tblInfo, prevState != columnInfo.State)
		if err != nil {
			return ver, errors.Trace(err)
		}

		// Finish this job.
		job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
		asyncNotifyEvent(d, &util.Event{Tp: model.ActionAddColumn, TableInfo: tblInfo, ColumnInfo: columnInfo})
	default:
		err = ErrInvalidColumnState.GenWithStack("invalid column state %v", columnInfo.State)
	}

	return ver, errors.Trace(err)
}
// onDropColumn performs one step of a DROP COLUMN job.
//
// Each call advances the column by one schema state: public -> write only ->
// delete only -> delete reorganization -> none. On the first transition the
// column is moved to the end of tblInfo.Columns; on the last one that trailing
// entry is removed and the job is finished.
//
// The job is cancelled when its arguments cannot be decoded, when the column
// does not exist, or when isDroppableColumn rejects the column.
func onDropColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
	schemaID := job.SchemaID
	tblInfo, err := getTableInfo(t, job, schemaID)
	if err != nil {
		return ver, errors.Trace(err)
	}

	var colName model.CIStr
	err = job.DecodeArgs(&colName)
	if err != nil {
		job.State = model.JobStateCancelled
		return ver, errors.Trace(err)
	}

	colInfo := model.FindColumnInfo(tblInfo.Columns, colName.L)
	if colInfo == nil {
		job.State = model.JobStateCancelled
		return ver, ErrCantDropFieldOrKey.GenWithStack("column %s doesn't exist", colName)
	}
	if err = isDroppableColumn(tblInfo, colName); err != nil {
		job.State = model.JobStateCancelled
		return ver, errors.Trace(err)
	}

	originalState := colInfo.State
	switch colInfo.State {
	case model.StatePublic:
		// public -> write only
		job.SchemaState = model.StateWriteOnly
		colInfo.State = model.StateWriteOnly
		// Set this column's offset to the last and reset all following columns' offsets.
		adjustColumnInfoInDropColumn(tblInfo, colInfo.Offset)
		ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != colInfo.State)
	case model.StateWriteOnly:
		// write only -> delete only
		job.SchemaState = model.StateDeleteOnly
		colInfo.State = model.StateDeleteOnly
		ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != colInfo.State)
	case model.StateDeleteOnly:
		// delete only -> reorganization
		job.SchemaState = model.StateDeleteReorganization
		colInfo.State = model.StateDeleteReorganization
		ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != colInfo.State)
	case model.StateDeleteReorganization:
		// reorganization -> absent
		// All reorganization jobs are done, drop this column. The column was moved to
		// the end of the slice on the public -> write only step, so cut the last entry.
		tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-1]
		colInfo.State = model.StateNone
		ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != colInfo.State)
		if err != nil {
			return ver, errors.Trace(err)
		}

		// Finish this job.
		job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
	default:
		// The switch is on the column's state, so that is the value worth reporting;
		// tblInfo.State does not explain why this branch was reached. The error class
		// is deliberately left unchanged so callers matching on it keep working.
		err = ErrInvalidTableState.GenWithStack("invalid column state %v", colInfo.State)
	}

	return ver, errors.Trace(err)
}
// onSetDefaultValue decodes the replacement column definition from the job
// arguments and applies it through updateColumn, using the decoded column's own
// name to locate the existing column. The job is cancelled when decoding fails.
func onSetDefaultValue(t *meta.Meta, job *model.Job) (ver int64, _ error) {
	var decoded model.ColumnInfo
	if err := job.DecodeArgs(&decoded); err != nil {
		job.State = model.JobStateCancelled
		return ver, errors.Trace(err)
	}

	return updateColumn(t, job, &decoded, &decoded.Name)
}
// onModifyColumn decodes the new column definition, the old column name and the
// requested column position from the job arguments and delegates the actual work
// to doModifyColumn. The job is cancelled when decoding fails.
func onModifyColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
	var (
		decoded  model.ColumnInfo
		previous model.CIStr
		position ast.ColumnPosition
	)
	if err := job.DecodeArgs(&decoded, &previous, &position); err != nil {
		job.State = model.JobStateCancelled
		return ver, errors.Trace(err)
	}

	return doModifyColumn(t, job, &decoded, &previous, &position)
}
// doModifyColumn replaces the column named oldName with newCol and, when pos
// asks for it, moves the column to a new position.
//
// newCol inherits the Offset and State of the column it replaces. When the
// position changes, the columns between the old and new slots are shifted in
// place and every affected Offset is renumbered. Index columns that refer to a
// changed column get the new name and offset. On success the job is finished.
//
// The job is cancelled when the old column is missing or not public, when the
// new name is already taken, when the AFTER column is the modified column itself
// or is missing / not public, or when persisting the table info fails.
func doModifyColumn(t *meta.Meta, job *model.Job, newCol *model.ColumnInfo, oldName *model.CIStr, pos *ast.ColumnPosition) (ver int64, _ error) {
	tblInfo, err := getTableInfo(t, job, job.SchemaID)
	if err != nil {
		return ver, errors.Trace(err)
	}

	oldCol := model.FindColumnInfo(tblInfo.Columns, oldName.L)
	if oldCol == nil || oldCol.State != model.StatePublic {
		job.State = model.JobStateCancelled
		return ver, infoschema.ErrColumnNotExists.GenWithStackByArgs(oldName, tblInfo.Name)
	}

	// A rename must not collide with another existing column.
	if newCol.Name.L != oldName.L {
		if clash := model.FindColumnInfo(tblInfo.Columns, newCol.Name.L); clash != nil {
			job.State = model.JobStateCancelled
			return ver, infoschema.ErrColumnExists.GenWithStackByArgs(newCol.Name)
		}
	}

	// gofail: var uninitializedOffsetAndState bool
	// if uninitializedOffsetAndState {
	// 	if newCol.State != model.StatePublic {
	// 		return ver, errors.New("the column state is wrong")
	// 	}
	// }

	// We need the latest column's offset and state. This information can be obtained from the store.
	newCol.Offset = oldCol.Offset
	newCol.State = oldCol.State

	// Work out where the column should end up.
	from, to := oldCol.Offset, oldCol.Offset
	switch pos.Tp {
	case ast.ColumnPositionFirst:
		to = 0
	case ast.ColumnPositionAfter:
		if oldName.L == pos.RelativeColumn.Name.L {
			// `alter table tableName modify column b int after b` will return ver,ErrColumnNotExists.
			job.State = model.JobStateCancelled
			return ver, infoschema.ErrColumnNotExists.GenWithStackByArgs(oldName, tblInfo.Name)
		}

		anchor := model.FindColumnInfo(tblInfo.Columns, pos.RelativeColumn.Name.L)
		if anchor == nil || anchor.State != model.StatePublic {
			job.State = model.JobStateCancelled
			return ver, infoschema.ErrColumnNotExists.GenWithStackByArgs(pos.RelativeColumn, tblInfo.Name)
		}

		// Moving left lands right behind the anchor; moving right takes the
		// anchor's slot because the anchor itself shifts one position left.
		to = anchor.Offset
		if anchor.Offset < from {
			to = anchor.Offset + 1
		}
	}

	// Columns whose name and/or offset changed, keyed by the name index columns still carry.
	changed := map[string]*model.ColumnInfo{oldName.L: newCol}
	if to != from {
		cols := tblInfo.Columns
		// Reorder columns in place.
		if to < from {
			copy(cols[to+1:], cols[to:from])
		} else {
			copy(cols[from:], cols[from+1:to+1])
		}
		cols[to] = newCol

		for i, c := range cols {
			if c.Offset == i {
				continue
			}
			changed[c.Name.L] = c
			c.Offset = i
		}
	} else {
		tblInfo.Columns[to] = newCol
	}

	// Change offset and name in indices.
	for _, index := range tblInfo.Indices {
		for _, ic := range index.Columns {
			if updated, found := changed[ic.Name.L]; found {
				ic.Name = updated.Name
				ic.Offset = updated.Offset
			}
		}
	}

	ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
	if err != nil {
		job.State = model.JobStateCancelled
		return ver, errors.Trace(err)
	}

	job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
	return ver, nil
}
// updateColumn overwrites the public column named oldColName with the contents
// of newCol, persists the table info and finishes the job.
//
// The whole ColumnInfo value is copied over the existing one, so newCol must
// already carry every field the column should keep.
//
// The job is cancelled, and an error returned, when the column named oldColName
// does not exist or is not public, or when persisting the table info fails.
func updateColumn(t *meta.Meta, job *model.Job, newCol *model.ColumnInfo, oldColName *model.CIStr) (ver int64, _ error) {
	tblInfo, err := getTableInfo(t, job, job.SchemaID)
	if err != nil {
		return ver, errors.Trace(err)
	}

	oldCol := model.FindColumnInfo(tblInfo.Columns, oldColName.L)
	if oldCol == nil || oldCol.State != model.StatePublic {
		job.State = model.JobStateCancelled
		// Report the name that was actually looked up; newCol.Name may differ from it.
		return ver, infoschema.ErrColumnNotExists.GenWithStackByArgs(*oldColName, tblInfo.Name)
	}
	*oldCol = *newCol

	ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
	if err != nil {
		job.State = model.JobStateCancelled
		return ver, errors.Trace(err)
	}

	job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
	return ver, nil
}
// isColumnWithIndex reports whether any index in indices contains a column whose
// lower-cased name (Name.L) equals colName.
func isColumnWithIndex(colName string, indices []*model.IndexInfo) bool {
	for i := range indices {
		indexCols := indices[i].Columns
		for j := range indexCols {
			if indexCols[j].Name.L == colName {
				return true
			}
		}
	}
	return false
}
// allocateColumnID increments tblInfo.MaxColumnID and returns the new value as
// the ID for a new column.
func allocateColumnID(tblInfo *model.TableInfo) int64 {
	next := tblInfo.MaxColumnID + 1
	tblInfo.MaxColumnID = next
	return next
}
// checkAddColumnTooManyColumns returns errTooManyFields when the given column
// count is greater than TableColumnCountLimit, and nil otherwise.
func checkAddColumnTooManyColumns(oldCols int) error {
	if oldCols <= TableColumnCountLimit {
		return nil
	}
	return errTooManyFields
}

Комментарий ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://gitlife.ru/oschina-mirror/hanchuanchuan-goInception.git
git@gitlife.ru:oschina-mirror/hanchuanchuan-goInception.git
oschina-mirror
hanchuanchuan-goInception
hanchuanchuan-goInception
v1.2.2