// Copyright 2016 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // See the License for the specific language governing permissions and // limitations under the License. package domain import ( "math/rand" "time" "github.com/hanchuanchuan/goInception/util/testleak" . "github.com/pingcap/check" ) type leaseGrantItem struct { leaseGrantTS uint64 oldVer int64 schemaVer int64 } func (*testSuite) TestSchemaValidator(c *C) { defer testleak.AfterTest(c)() lease := 10 * time.Millisecond leaseGrantCh := make(chan leaseGrantItem) oracleCh := make(chan uint64) exit := make(chan struct{}) go serverFunc(lease, leaseGrantCh, oracleCh, exit) validator := NewSchemaValidator(lease).(*schemaValidator) for i := 0; i < 10; i++ { delay := time.Duration(100+rand.Intn(900)) * time.Microsecond time.Sleep(delay) // Reload can run arbitrarily, at any time. reload(validator, leaseGrantCh, 0) } // Take a lease, check it's valid. item := <-leaseGrantCh validator.Update(item.leaseGrantTS, item.oldVer, item.schemaVer, []int64{10}) valid := validator.Check(item.leaseGrantTS, item.schemaVer, []int64{10}) c.Assert(valid, Equals, ResultSucc) // Stop the validator, validator's items value is nil. validator.Stop() isTablesChanged := validator.isRelatedTablesChanged(item.schemaVer, []int64{10}) c.Assert(isTablesChanged, IsTrue) valid = validator.Check(item.leaseGrantTS, item.schemaVer, []int64{10}) c.Assert(valid, Equals, ResultFail) validator.Restart() // Sleep for a long time, check schema is invalid. time.Sleep(lease) ts := <-oracleCh valid = validator.Check(ts, item.schemaVer, []int64{10}) c.Assert(valid, Equals, ResultUnknown) currVer := reload(validator, leaseGrantCh, 0) valid = validator.Check(ts, item.schemaVer, nil) c.Assert(valid, Equals, ResultFail) valid = validator.Check(ts, item.schemaVer, []int64{0}) c.Assert(valid, Equals, ResultFail) // Check the latest schema version must changed. c.Assert(item.schemaVer, Less, validator.latestSchemaVer) // Make sure newItem's version is bigger than currVer. time.Sleep(lease * 2) newItem := <-leaseGrantCh // Update current schema version to newItem's version and the delta table IDs is 1, 2, 3. validator.Update(ts, currVer, newItem.schemaVer, []int64{1, 2, 3}) // Make sure the updated table IDs don't be covered with the same schema version. validator.Update(ts, newItem.schemaVer, newItem.schemaVer, nil) isTablesChanged = validator.isRelatedTablesChanged(currVer, nil) c.Assert(isTablesChanged, IsFalse) isTablesChanged = validator.isRelatedTablesChanged(currVer, []int64{2}) c.Assert(isTablesChanged, IsTrue) // The current schema version is older than the oldest schema version. isTablesChanged = validator.isRelatedTablesChanged(-1, nil) c.Assert(isTablesChanged, IsTrue) // All schema versions is expired. ts = uint64(time.Now().Add(lease).UnixNano()) valid = validator.Check(ts, newItem.schemaVer, nil) c.Assert(valid, Equals, ResultUnknown) close(exit) time.Sleep(time.Millisecond) } func reload(validator SchemaValidator, leaseGrantCh chan leaseGrantItem, ids ...int64) int64 { item := <-leaseGrantCh validator.Update(item.leaseGrantTS, item.oldVer, item.schemaVer, ids) return item.schemaVer } // serverFunc plays the role as a remote server, runs in a separate goroutine. // It can grant lease and provide timestamp oracle. // Caller should communicate with it through channel to mock network. func serverFunc(lease time.Duration, requireLease chan leaseGrantItem, oracleCh chan uint64, exit chan struct{}) { var version int64 leaseTS := uint64(time.Now().UnixNano()) ticker := time.NewTicker(lease) for { select { case now := <-ticker.C: version++ leaseTS = uint64(now.UnixNano()) case requireLease <- leaseGrantItem{ leaseGrantTS: leaseTS, oldVer: version - 1, schemaVer: version, }: case oracleCh <- uint64(time.Now().UnixNano()): case <-exit: return } } }