Слияние кода завершено, страница обновится автоматически
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package domain
import (
"sync"
"time"
"github.com/hanchuanchuan/goInception/store/tikv/oracle"
log "github.com/sirupsen/logrus"
)
type checkResult int
const (
// ResultSucc means schemaValidator's check is passing.
ResultSucc checkResult = iota
// ResultFail means schemaValidator's check is fail.
ResultFail
// ResultUnknown means schemaValidator doesn't know the check would be success or fail.
ResultUnknown
)
// SchemaValidator is the interface for checking the validity of schema version.
type SchemaValidator interface {
// Update the schema validator, add a new item, delete the expired deltaSchemaInfos.
// The latest schemaVer is valid within leaseGrantTime plus lease duration.
// Add the changed table IDs to the new schema information,
// which is produced when the oldSchemaVer is updated to the newSchemaVer.
Update(leaseGrantTime uint64, oldSchemaVer, newSchemaVer int64, changedTableIDs []int64)
// Check is it valid for a transaction to use schemaVer and related tables, at timestamp txnTS.
Check(txnTS uint64, schemaVer int64, relatedTableIDs []int64) checkResult
// Stop stops checking the valid of transaction.
Stop()
// Restart restarts the schema validator after it is stopped.
Restart()
// Reset resets SchemaValidator to initial state.
Reset()
}
type deltaSchemaInfo struct {
schemaVersion int64
relatedTableIDs []int64
}
type schemaValidator struct {
isStarted bool
mux sync.RWMutex
lease time.Duration
latestSchemaVer int64
latestSchemaExpire time.Time
// deltaSchemaInfos is a queue that maintain the history of changes.
deltaSchemaInfos []deltaSchemaInfo
}
// NewSchemaValidator returns a SchemaValidator structure.
func NewSchemaValidator(lease time.Duration) SchemaValidator {
return &schemaValidator{
isStarted: true,
lease: lease,
deltaSchemaInfos: make([]deltaSchemaInfo, 0, maxNumberOfDiffsToLoad),
}
}
func (s *schemaValidator) Stop() {
log.Info("[domain-ddl] the schema validator stops")
s.mux.Lock()
defer s.mux.Unlock()
s.isStarted = false
s.latestSchemaVer = 0
s.deltaSchemaInfos = make([]deltaSchemaInfo, 0, maxNumberOfDiffsToLoad)
}
func (s *schemaValidator) Restart() {
log.Info("[domain-ddl] the schema validator restarts")
s.mux.Lock()
defer s.mux.Unlock()
s.isStarted = true
}
func (s *schemaValidator) Reset() {
s.mux.Lock()
defer s.mux.Unlock()
s.isStarted = true
s.latestSchemaVer = 0
s.deltaSchemaInfos = make([]deltaSchemaInfo, 0, maxNumberOfDiffsToLoad)
}
func (s *schemaValidator) Update(leaseGrantTS uint64, oldVer, currVer int64, changedTableIDs []int64) {
s.mux.Lock()
defer s.mux.Unlock()
if !s.isStarted {
log.Infof("[domain-ddl] the schema validator stopped before updating")
return
}
// Renew the lease.
s.latestSchemaVer = currVer
leaseGrantTime := oracle.GetTimeFromTS(leaseGrantTS)
leaseExpire := leaseGrantTime.Add(s.lease - time.Millisecond)
s.latestSchemaExpire = leaseExpire
// Update the schema deltaItem information.
if currVer != oldVer {
log.Debugf("[domain-ddl] update schema validator, old ver %d, curr ver %d, changed IDs %v", oldVer, currVer, changedTableIDs)
s.enqueue(currVer, changedTableIDs)
}
}
func hasRelatedTableID(relatedTableIDs, updateTableIDs []int64) bool {
for _, tblID := range updateTableIDs {
for _, relatedTblID := range relatedTableIDs {
if tblID == relatedTblID {
return true
}
}
}
return false
}
// isRelatedTablesChanged returns the result whether relatedTableIDs is changed
// from usedVer to the latest schema version.
// NOTE, this function should be called under lock!
func (s *schemaValidator) isRelatedTablesChanged(currVer int64, tableIDs []int64) bool {
if len(s.deltaSchemaInfos) == 0 {
log.Infof("[domain-ddl] schema change history is empty, checking %d", currVer)
return true
}
newerDeltas := s.findNewerDeltas(currVer)
if len(newerDeltas) == len(s.deltaSchemaInfos) {
log.Infof("[domain-ddl] the schema version %d is much older than the latest version %d", currVer, s.latestSchemaVer)
return true
}
for _, item := range newerDeltas {
if hasRelatedTableID(item.relatedTableIDs, tableIDs) {
return true
}
}
return false
}
func (s *schemaValidator) findNewerDeltas(currVer int64) []deltaSchemaInfo {
q := s.deltaSchemaInfos
pos := len(q)
for i := len(q) - 1; i >= 0 && q[i].schemaVersion > currVer; i-- {
pos = i
}
return q[pos:]
}
// Check checks schema validity, returns true if use schemaVer and related tables at txnTS is legal.
func (s *schemaValidator) Check(txnTS uint64, schemaVer int64, relatedTableIDs []int64) checkResult {
s.mux.RLock()
defer s.mux.RUnlock()
if !s.isStarted {
log.Infof("[domain-ddl] the schema validator stopped before checking")
return ResultFail
}
if s.lease == 0 {
return ResultSucc
}
// Schema changed, result decided by whether related tables change.
if schemaVer < s.latestSchemaVer {
// The DDL relatedTableIDs is empty.
if len(relatedTableIDs) == 0 {
log.Infof("[domain-ddl] the related table ID is empty, current schema version %d, latest schema version %d",
schemaVer, s.latestSchemaVer)
return ResultFail
}
if s.isRelatedTablesChanged(schemaVer, relatedTableIDs) {
return ResultFail
}
return ResultSucc
}
// Schema unchanged, maybe success or the schema validator is unavailable.
t := oracle.GetTimeFromTS(txnTS)
if t.After(s.latestSchemaExpire) {
return ResultUnknown
}
return ResultSucc
}
func (s *schemaValidator) enqueue(schemaVersion int64, relatedTableIDs []int64) {
s.deltaSchemaInfos = append(s.deltaSchemaInfos, deltaSchemaInfo{schemaVersion, relatedTableIDs})
if len(s.deltaSchemaInfos) > maxNumberOfDiffsToLoad {
s.deltaSchemaInfos = s.deltaSchemaInfos[1:]
}
}
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )