179 lines
4.0 KiB
Go
Raw Normal View History

// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package ddl
import (
"time"
"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/terror"
)
// handleBgJobQueue processes the first job in the background job queue, if any.
//
// It returns nil immediately when the ddl worker is closed. Otherwise all of
// the following happens inside one new transaction:
//   - checkOwner is called with bgJobFlag; if it reports ErrNotOwner the
//     transaction ends without doing any work and without an error;
//   - the first background job is fetched; a nil job means there is nothing
//     to do;
//   - the job is run, then either finished (when job.IsFinished() is true) or
//     written back to the queue;
//   - the owner's LastUpdateTS is refreshed and the owner is stored again.
//
// Any error from these steps is returned wrapped with errors.Trace.
func (d *ddl) handleBgJobQueue() error {
	if d.isClosed() {
		return nil
	}

	err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
		t := meta.NewMeta(txn)
		owner, err := d.checkOwner(t, bgJobFlag)
		if terror.ErrorEqual(err, ErrNotOwner) {
			// Not the owner is an expected state, not a failure.
			return nil
		}
		if err != nil {
			return errors.Trace(err)
		}

		// The job only lives for the duration of this transaction, so it is
		// declared here rather than captured from the enclosing function.
		job, err := d.getFirstBgJob(t)
		if err != nil {
			return errors.Trace(err)
		}
		if job == nil {
			return nil
		}

		d.runBgJob(t, job)
		if job.IsFinished() {
			err = d.finishBgJob(t, job)
		} else {
			err = d.updateBgJob(t, job)
		}
		if err != nil {
			return errors.Trace(err)
		}

		owner.LastUpdateTS = time.Now().UnixNano()
		return errors.Trace(t.SetBgJobOwner(owner))
	})
	// errors.Trace is applied to possibly-nil errors elsewhere in this file,
	// so no explicit nil check is needed here.
	return errors.Trace(err)
}
// runBgJob executes one background job and records the outcome on the job.
//
// The job is first marked as running. Drop-schema and drop-table jobs are
// dispatched to delReorgSchema and delReorgTable respectively; any other type
// is marked cancelled and treated as an error. When an error occurred, its
// text is stored in job.Error and job.ErrorCount is incremented; the error is
// logged unless the job ended up in the cancelled state.
func (d *ddl) runBgJob(t *meta.Meta, job *model.Job) {
	job.State = model.JobRunning

	var runErr error
	switch job.Type {
	case model.ActionDropSchema:
		runErr = d.delReorgSchema(t, job)
	case model.ActionDropTable:
		runErr = d.delReorgTable(t, job)
	default:
		// Mark the job cancelled before formatting it into the error.
		job.State = model.JobCancelled
		runErr = errors.Errorf("invalid background job %v", job)
	}

	if runErr == nil {
		return
	}

	if job.State != model.JobCancelled {
		log.Errorf("run background job err %v", errors.ErrorStack(runErr))
	}
	job.Error = runErr.Error()
	job.ErrorCount++
}
// prepareBgJob builds a background job from ddlJob and appends it to the
// background job queue in a new transaction.
//
// Only ID, SchemaID, TableID, Type and Args are copied from ddlJob; every
// other field of the new job keeps its zero value. The returned error is the
// transaction's error wrapped with errors.Trace.
func (d *ddl) prepareBgJob(ddlJob *model.Job) error {
	bgJob := new(model.Job)
	bgJob.ID = ddlJob.ID
	bgJob.SchemaID = ddlJob.SchemaID
	bgJob.TableID = ddlJob.TableID
	bgJob.Type = ddlJob.Type
	bgJob.Args = ddlJob.Args

	enqueue := func(txn kv.Transaction) error {
		return errors.Trace(meta.NewMeta(txn).EnQueueBgJob(bgJob))
	}
	return errors.Trace(kv.RunInNewTxn(d.store, true, enqueue))
}
// startBgJob signals the background worker through d.bgJobCh when tp is a
// drop-schema or drop-table action. All other action types are ignored.
func (d *ddl) startBgJob(tp model.ActionType) {
	if tp == model.ActionDropSchema || tp == model.ActionDropTable {
		asyncNotify(d.bgJobCh)
	}
}
// getFirstBgJob returns the background job stored at index 0 of the queue,
// together with the (traced) error reported by the meta layer.
func (d *ddl) getFirstBgJob(t *meta.Meta) (*model.Job, error) {
	const headIndex = 0

	head, err := t.GetBgJob(headIndex)
	return head, errors.Trace(err)
}
// updateBgJob overwrites the background job at index 0 of the queue with job
// and returns the meta layer's error wrapped with errors.Trace.
func (d *ddl) updateBgJob(t *meta.Meta, job *model.Job) error {
	const headIndex = 0

	return errors.Trace(t.UpdateBgJob(headIndex, job))
}
// finishBgJob finishes a background job: it logs the job, removes one job
// from the background queue via DeQueueBgJob (the dequeued value itself is
// discarded), and then records job in the background job history.
// The first failing step's error is returned wrapped with errors.Trace.
func (d *ddl) finishBgJob(t *meta.Meta, job *model.Job) error {
	log.Warnf("[ddl] finish background job %v", job)

	_, err := t.DeQueueBgJob()
	if err != nil {
		return errors.Trace(err)
	}

	return errors.Trace(t.AddHistoryBgJob(job))
}
// onBackgroundWorker is the background worker loop. It calls
// handleBgJobQueue whenever the periodic ticker fires or a notification
// arrives on d.bgJobCh, and exits when d.quitCh is signalled. d.wait.Done is
// called on exit. Errors from handleBgJobQueue are logged and do not stop
// the loop.
func (d *ddl) onBackgroundWorker() {
	defer d.wait.Done()

	// Owner timeout is checked with 4 * lease, so the owner status is
	// refreshed every 2 * lease; when lease is 0 the default of 10s is used.
	interval := chooseLeaseTime(2*d.lease, 10*time.Second)
	tick := time.NewTicker(interval)
	defer tick.Stop()

	for {
		select {
		case <-d.quitCh:
			return
		case <-tick.C:
			log.Debugf("[ddl] wait %s to check background job status again", interval)
		case <-d.bgJobCh:
			// Explicit wake-up: fall through to process the queue.
		}

		if err := d.handleBgJobQueue(); err != nil {
			log.Errorf("[ddl] handle background job err %v", errors.ErrorStack(err))
		}
	}
}