393 lines
9.7 KiB
Go
Raw Normal View History

// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package ddl
import (
"time"
"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/terror"
)
func (d *ddl) startDDLJob(ctx context.Context, job *model.Job) error {
// for every DDL, we must commit current transaction.
if err := ctx.FinishTxn(false); err != nil {
return errors.Trace(err)
}
// Create a new job and queue it.
err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
var err error
job.ID, err = t.GenGlobalID()
if err != nil {
return errors.Trace(err)
}
err = t.EnQueueDDLJob(job)
return errors.Trace(err)
})
if err != nil {
return errors.Trace(err)
}
// notice worker that we push a new job and wait the job done.
asyncNotify(d.ddlJobCh)
log.Warnf("[ddl] start DDL job %v", job)
jobID := job.ID
var historyJob *model.Job
// for a job from start to end, the state of it will be none -> delete only -> write only -> reorganization -> public
// for every state changes, we will wait as lease 2 * lease time, so here the ticker check is 10 * lease.
ticker := time.NewTicker(chooseLeaseTime(10*d.lease, 10*time.Second))
defer ticker.Stop()
for {
select {
case <-d.ddlJobDoneCh:
case <-ticker.C:
}
historyJob, err = d.getHistoryDDLJob(jobID)
if err != nil {
log.Errorf("[ddl] get history DDL job err %v, check again", err)
continue
} else if historyJob == nil {
log.Warnf("[ddl] DDL job %d is not in history, maybe not run", jobID)
continue
}
// if a job is a history table, the state must be JobDone or JobCancel.
if historyJob.State == model.JobDone {
return nil
}
return errors.Errorf(historyJob.Error)
}
}
func (d *ddl) getHistoryDDLJob(id int64) (*model.Job, error) {
var job *model.Job
err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
var err1 error
job, err1 = t.GetHistoryDDLJob(id)
return errors.Trace(err1)
})
return job, errors.Trace(err)
}
func asyncNotify(ch chan struct{}) {
select {
case ch <- struct{}{}:
default:
}
}
func (d *ddl) checkOwner(t *meta.Meta, flag JobType) (*model.Owner, error) {
var owner *model.Owner
var err error
switch flag {
case ddlJobFlag:
owner, err = t.GetDDLJobOwner()
case bgJobFlag:
owner, err = t.GetBgJobOwner()
default:
err = errInvalidJobFlag
}
if err != nil {
return nil, errors.Trace(err)
}
if owner == nil {
owner = &model.Owner{}
// try to set onwer
owner.OwnerID = d.uuid
}
now := time.Now().UnixNano()
// we must wait 2 * lease time to guarantee other servers update the schema,
// the owner will update its owner status every 2 * lease time, so here we use
// 4 * lease to check its timeout.
maxTimeout := int64(4 * d.lease)
if owner.OwnerID == d.uuid || now-owner.LastUpdateTS > maxTimeout {
owner.OwnerID = d.uuid
owner.LastUpdateTS = now
// update status.
switch flag {
case ddlJobFlag:
err = t.SetDDLJobOwner(owner)
case bgJobFlag:
err = t.SetBgJobOwner(owner)
}
if err != nil {
return nil, errors.Trace(err)
}
log.Debugf("[ddl] become %s job owner %s", flag, owner.OwnerID)
}
if owner.OwnerID != d.uuid {
log.Debugf("[ddl] not %s job owner, owner is %s", flag, owner.OwnerID)
return nil, errors.Trace(ErrNotOwner)
}
return owner, nil
}
func (d *ddl) getFirstDDLJob(t *meta.Meta) (*model.Job, error) {
job, err := t.GetDDLJob(0)
return job, errors.Trace(err)
}
// every time we enter another state except final state, we must call this function.
func (d *ddl) updateDDLJob(t *meta.Meta, job *model.Job) error {
err := t.UpdateDDLJob(0, job)
return errors.Trace(err)
}
func (d *ddl) finishDDLJob(t *meta.Meta, job *model.Job) error {
log.Warnf("[ddl] finish DDL job %v", job)
// done, notice and run next job.
_, err := t.DeQueueDDLJob()
if err != nil {
return errors.Trace(err)
}
switch job.Type {
case model.ActionDropSchema, model.ActionDropTable:
if err = d.prepareBgJob(job); err != nil {
return errors.Trace(err)
}
}
err = t.AddHistoryDDLJob(job)
return errors.Trace(err)
}
// ErrNotOwner means we are not owner and can't handle DDL jobs.
var ErrNotOwner = errors.New("DDL: not owner")
// ErrWorkerClosed means we have already closed the DDL worker.
var ErrWorkerClosed = errors.New("DDL: worker is closed")
var errInvalidJobFlag = errors.New("DDL: invalid job flag")
// JobType is job type, including ddl/background.
type JobType int
const (
ddlJobFlag = iota + 1
bgJobFlag
)
func (j JobType) String() string {
switch j {
case ddlJobFlag:
return "ddl"
case bgJobFlag:
return "background"
}
return "unknown"
}
func (d *ddl) handleDDLJobQueue() error {
for {
if d.isClosed() {
return nil
}
waitTime := 2 * d.lease
var job *model.Job
err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
owner, err := d.checkOwner(t, ddlJobFlag)
if terror.ErrorEqual(err, ErrNotOwner) {
// we are not owner, return and retry checking later.
return nil
} else if err != nil {
return errors.Trace(err)
}
// become the owner
// get the first job and run
job, err = d.getFirstDDLJob(t)
if job == nil || err != nil {
return errors.Trace(err)
}
if job.IsRunning() {
// if we enter a new state, crash when waiting 2 * lease time, and restart quickly,
// we may run the job immediately again, but we don't wait enough 2 * lease time to
// let other servers update the schema.
// so here we must check the elapsed time from last update, if < 2 * lease, we must
// wait again.
elapsed := time.Duration(time.Now().UnixNano() - job.LastUpdateTS)
if elapsed > 0 && elapsed < waitTime {
log.Warnf("[ddl] the elapsed time from last update is %s < %s, wait again", elapsed, waitTime)
waitTime -= elapsed
return nil
}
}
log.Warnf("[ddl] run DDL job %v", job)
d.hook.OnJobRunBefore(job)
// if run job meets error, we will save this error in job Error
// and retry later if the job is not cancelled.
d.runDDLJob(t, job)
if job.IsFinished() {
err = d.finishDDLJob(t, job)
} else {
err = d.updateDDLJob(t, job)
}
if err != nil {
return errors.Trace(err)
}
// running job may cost some time, so here we must update owner status to
// prevent other become the owner.
owner.LastUpdateTS = time.Now().UnixNano()
err = t.SetDDLJobOwner(owner)
return errors.Trace(err)
})
if err != nil {
return errors.Trace(err)
} else if job == nil {
// no job now, return and retry get later.
return nil
}
d.hook.OnJobUpdated(job)
// here means the job enters another state (delete only, write only, public, etc...) or is cancelled.
// if the job is done or still running, we will wait 2 * lease time to guarantee other servers to update
// the newest schema.
if job.State == model.JobRunning || job.State == model.JobDone {
d.waitSchemaChanged(waitTime)
}
if job.IsFinished() {
d.startBgJob(job.Type)
asyncNotify(d.ddlJobDoneCh)
}
}
}
func chooseLeaseTime(n1 time.Duration, n2 time.Duration) time.Duration {
if n1 > 0 {
return n1
}
return n2
}
// onDDLWorker is for async online schema change, it will try to become the owner first,
// then wait or pull the job queue to handle a schema change job.
func (d *ddl) onDDLWorker() {
defer d.wait.Done()
// we use 4 * lease time to check owner's timeout, so here, we will update owner's status
// every 2 * lease time, if lease is 0, we will use default 10s.
checkTime := chooseLeaseTime(2*d.lease, 10*time.Second)
ticker := time.NewTicker(checkTime)
defer ticker.Stop()
for {
select {
case <-ticker.C:
log.Debugf("[ddl] wait %s to check DDL status again", checkTime)
case <-d.ddlJobCh:
case <-d.quitCh:
return
}
err := d.handleDDLJobQueue()
if err != nil {
log.Errorf("[ddl] handle ddl job err %v", errors.ErrorStack(err))
}
}
}
func (d *ddl) runDDLJob(t *meta.Meta, job *model.Job) {
if job.IsFinished() {
return
}
job.State = model.JobRunning
var err error
switch job.Type {
case model.ActionCreateSchema:
err = d.onCreateSchema(t, job)
case model.ActionDropSchema:
err = d.onDropSchema(t, job)
case model.ActionCreateTable:
err = d.onCreateTable(t, job)
case model.ActionDropTable:
err = d.onDropTable(t, job)
case model.ActionAddColumn:
err = d.onAddColumn(t, job)
case model.ActionDropColumn:
err = d.onDropColumn(t, job)
case model.ActionAddIndex:
err = d.onCreateIndex(t, job)
case model.ActionDropIndex:
err = d.onDropIndex(t, job)
default:
// invalid job, cancel it.
job.State = model.JobCancelled
err = errors.Errorf("invalid ddl job %v", job)
}
// saves error in job, so that others can know error happens.
if err != nil {
// if job is not cancelled, we should log this error.
if job.State != model.JobCancelled {
log.Errorf("run ddl job err %v", errors.ErrorStack(err))
}
job.Error = err.Error()
job.ErrorCount++
}
}
// for every lease seconds, we will re-update the whole schema, so we will wait 2 * lease time
// to guarantee that all servers have already updated schema.
func (d *ddl) waitSchemaChanged(waitTime time.Duration) {
if waitTime == 0 {
return
}
select {
case <-time.After(waitTime):
case <-d.quitCh:
}
}