// Copyright 2015 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // See the License for the specific language governing permissions and // limitations under the License. package ddl import ( "time" "github.com/hanchuanchuan/goInception/kv" "github.com/hanchuanchuan/goInception/meta" "github.com/hanchuanchuan/goInception/model" "github.com/hanchuanchuan/goInception/types" . "github.com/pingcap/check" "golang.org/x/net/context" ) type testCtxKeyType int func (k testCtxKeyType) String() string { return "test_ctx_key" } const testCtxKey testCtxKeyType = 0 func (s *testDDLSuite) TestReorg(c *C) { store := testCreateStore(c, "test_reorg") defer store.Close() d := testNewDDL(context.Background(), nil, store, nil, nil, testLease) defer d.Stop() time.Sleep(testLease) ctx := testNewContext(d) ctx.SetValue(testCtxKey, 1) c.Assert(ctx.Value(testCtxKey), Equals, 1) ctx.ClearValue(testCtxKey) err := ctx.NewTxn() c.Assert(err, IsNil) ctx.Txn().Set([]byte("a"), []byte("b")) err = ctx.Txn().Rollback() c.Assert(err, IsNil) err = ctx.NewTxn() c.Assert(err, IsNil) ctx.Txn().Set([]byte("a"), []byte("b")) err = ctx.Txn().Commit(context.Background()) c.Assert(err, IsNil) rowCount := int64(10) handle := int64(100) f := func() error { d.generalWorker().reorgCtx.setRowCount(rowCount) d.generalWorker().reorgCtx.setNextHandle(handle) time.Sleep(1*ReorgWaitTimeout + 100*time.Millisecond) return nil } job := &model.Job{ ID: 1, SnapshotVer: 1, // Make sure it is not zero. So the reorgInfo's first is false. } err = ctx.NewTxn() c.Assert(err, IsNil) m := meta.NewMeta(ctx.Txn()) rInfo := &reorgInfo{ Job: job, } err = d.generalWorker().runReorgJob(m, rInfo, d.lease, f) c.Assert(err, NotNil) // The longest to wait for 5 seconds to make sure the function of f is returned. for i := 0; i < 1000; i++ { time.Sleep(5 * time.Millisecond) err = d.generalWorker().runReorgJob(m, rInfo, d.lease, f) if err == nil { c.Assert(job.RowCount, Equals, rowCount) c.Assert(d.generalWorker().reorgCtx.rowCount, Equals, int64(0)) // Test whether reorgInfo's Handle is update. err = ctx.Txn().Commit(context.Background()) c.Assert(err, IsNil) err = ctx.NewTxn() c.Assert(err, IsNil) m = meta.NewMeta(ctx.Txn()) info, err1 := getReorgInfo(d.ddlCtx, m, job, nil) c.Assert(err1, IsNil) c.Assert(info.StartHandle, Equals, handle) c.Assert(d.generalWorker().reorgCtx.doneHandle, Equals, int64(0)) break } } c.Assert(err, IsNil) d.Stop() err = d.generalWorker().runReorgJob(m, rInfo, d.lease, func() error { time.Sleep(4 * testLease) return nil }) c.Assert(err, NotNil) err = ctx.Txn().Commit(context.Background()) c.Assert(err, IsNil) d.start(context.Background(), nil) job = &model.Job{ ID: 2, SchemaID: 1, Type: model.ActionCreateSchema, Args: []interface{}{model.NewCIStr("test")}, SnapshotVer: 1, // Make sure it is not zero. So the reorgInfo's first is false. } var info *reorgInfo err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { t := meta.NewMeta(txn) var err1 error info, err1 = getReorgInfo(d.ddlCtx, t, job, nil) c.Assert(err1, IsNil) err1 = info.UpdateReorgMeta(txn, 1, 0, 0) c.Assert(err1, IsNil) return nil }) c.Assert(err, IsNil) err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { t := meta.NewMeta(txn) var err1 error info, err1 = getReorgInfo(d.ddlCtx, t, job, nil) c.Assert(err1, IsNil) c.Assert(info.StartHandle, Greater, int64(0)) return nil }) c.Assert(err, IsNil) } func (s *testDDLSuite) TestReorgOwner(c *C) { store := testCreateStore(c, "test_reorg_owner") defer store.Close() d1 := testNewDDL(context.Background(), nil, store, nil, nil, testLease) defer d1.Stop() ctx := testNewContext(d1) testCheckOwner(c, d1, true) d2 := testNewDDL(context.Background(), nil, store, nil, nil, testLease) defer d2.Stop() dbInfo := testSchemaInfo(c, d1, "test") testCreateSchema(c, ctx, d1, dbInfo) tblInfo := testTableInfo(c, d1, "t", 3) testCreateTable(c, ctx, d1, dbInfo, tblInfo) t := testGetTable(c, d1, dbInfo.ID, tblInfo.ID) num := 10 for i := 0; i < num; i++ { _, err := t.AddRecord(ctx, types.MakeDatums(i, i, i), false) c.Assert(err, IsNil) } err := ctx.Txn().Commit(context.Background()) c.Assert(err, IsNil) tc := &TestDDLCallback{} tc.onJobRunBefore = func(job *model.Job) { if job.SchemaState == model.StateDeleteReorganization { d1.Stop() } } d1.SetHook(tc) testDropSchema(c, ctx, d1, dbInfo) err = kv.RunInNewTxn(d1.store, false, func(txn kv.Transaction) error { t := meta.NewMeta(txn) db, err1 := t.GetDatabase(dbInfo.ID) c.Assert(err1, IsNil) c.Assert(db, IsNil) return nil }) c.Assert(err, IsNil) }