473 lines
9.2 KiB
Go
Raw Normal View History

package zkhelper
import (
"errors"
"fmt"
"path"
"strings"
"sync"
"time"
etcderr "github.com/coreos/etcd/error"
"github.com/coreos/go-etcd/etcd"
zk "github.com/ngaut/go-zookeeper/zk"
"github.com/ngaut/log"
"github.com/ngaut/pools"
)
var (
singleInstanceLock sync.Mutex
etcdInstance *etcdImpl
)
type PooledEtcdClient struct {
c *etcd.Client
}
func (c *PooledEtcdClient) Close() {
}
func (e *etcdImpl) Seq2Str(seq int64) string {
return fmt.Sprintf("%d", seq)
}
type etcdImpl struct {
sync.Mutex
cluster string
pool *pools.ResourcePool
indexMap map[string]uint64
}
func convertToZkError(err error) error {
//todo: convert other errors
if ec, ok := err.(*etcd.EtcdError); ok {
switch ec.ErrorCode {
case etcderr.EcodeKeyNotFound:
return zk.ErrNoNode
case etcderr.EcodeNotFile:
case etcderr.EcodeNotDir:
case etcderr.EcodeNodeExist:
return zk.ErrNodeExists
case etcderr.EcodeDirNotEmpty:
return zk.ErrNotEmpty
}
}
return err
}
func convertToZkEvent(watchPath string, resp *etcd.Response, err error) zk.Event {
//log.Infof("convert event from path:%s, %+v, %+v", watchPath, resp, resp.Node.Key)
var e zk.Event
if err != nil {
e.Err = convertToZkError(err)
e.State = zk.StateDisconnected
return e
}
e.State = zk.StateConnected
e.Path = resp.Node.Key
if len(resp.Node.Key) > len(watchPath) {
e.Type = zk.EventNodeChildrenChanged
return e
}
switch resp.Action {
case "set":
e.Type = zk.EventNodeDataChanged
case "delete":
e.Type = zk.EventNodeDeleted
case "update":
e.Type = zk.EventNodeDataChanged
case "create":
e.Type = zk.EventNodeCreated
case "expire":
e.Type = zk.EventNotWatching
}
return e
}
func NewEtcdConn(zkAddr string) (Conn, error) {
singleInstanceLock.Lock()
defer singleInstanceLock.Unlock()
if etcdInstance != nil {
return etcdInstance, nil
}
p := pools.NewResourcePool(func() (pools.Resource, error) {
cluster := strings.Split(zkAddr, ",")
for i, addr := range cluster {
if !strings.HasPrefix(addr, "http://") {
cluster[i] = "http://" + addr
}
}
newClient := etcd.NewClient(cluster)
newClient.SetConsistency(etcd.STRONG_CONSISTENCY)
return &PooledEtcdClient{c: newClient}, nil
}, 10, 10, 0)
etcdInstance = &etcdImpl{
cluster: zkAddr,
pool: p,
indexMap: make(map[string]uint64),
}
log.Infof("new etcd %s", zkAddr)
if etcdInstance == nil {
return nil, errors.New("unknown error")
}
return etcdInstance, nil
}
func (e *etcdImpl) Get(key string) (data []byte, stat zk.Stat, err error) {
conn, err := e.pool.Get()
if err != nil {
return nil, nil, err
}
defer e.pool.Put(conn)
c := conn.(*PooledEtcdClient).c
resp, err := c.Get(key, true, false)
if resp == nil {
return nil, nil, convertToZkError(err)
}
return []byte(resp.Node.Value), nil, nil
}
func (e *etcdImpl) setIndex(key string, index uint64) {
e.Lock()
defer e.Unlock()
e.indexMap[key] = index
}
func (e *etcdImpl) getIndex(key string) uint64 {
e.Lock()
defer e.Unlock()
index := e.indexMap[key]
return index
}
func (e *etcdImpl) watch(key string, children bool) (resp *etcd.Response, stat zk.Stat, watch <-chan zk.Event, err error) {
conn, err := e.pool.Get()
if err != nil {
return nil, nil, nil, err
}
defer e.pool.Put(conn)
c := conn.(*PooledEtcdClient).c
index := e.getIndex(key)
resp, err = c.Get(key, true, true)
if resp == nil {
return nil, nil, nil, convertToZkError(err)
}
if index < resp.Node.ModifiedIndex {
index = resp.Node.ModifiedIndex
}
for _, n := range resp.Node.Nodes {
if n.ModifiedIndex > index {
index = n.ModifiedIndex
}
}
log.Info("try watch", key)
ch := make(chan zk.Event, 100)
originVal := resp.Node.Value
go func() {
defer func() {
e.setIndex(key, index)
}()
for {
conn, err := e.pool.Get()
if err != nil {
log.Error(err)
return
}
c := conn.(*PooledEtcdClient).c
resp, err := c.Watch(key, index, children, nil, nil)
e.pool.Put(conn)
if err != nil {
if ec, ok := err.(*etcd.EtcdError); ok {
if ec.ErrorCode == etcderr.EcodeEventIndexCleared {
index++
continue
}
}
log.Warning("watch", err)
ch <- convertToZkEvent(key, resp, err)
return
}
if key == resp.Node.Key && originVal == string(resp.Node.Value) { //keep alive event
index++
continue
}
ch <- convertToZkEvent(key, resp, err)
//update index
if index <= resp.Node.ModifiedIndex {
index = resp.Node.ModifiedIndex + 1
} else {
index++
}
return
}
}()
return resp, nil, ch, nil
}
func (e *etcdImpl) GetW(key string) (data []byte, stat zk.Stat, watch <-chan zk.Event, err error) {
resp, stat, watch, err := e.watch(key, false)
if err != nil {
return
}
return []byte(resp.Node.Value), stat, watch, nil
}
func (e *etcdImpl) Children(key string) (children []string, stat zk.Stat, err error) {
conn, err := e.pool.Get()
if err != nil {
return nil, nil, err
}
defer e.pool.Put(conn)
c := conn.(*PooledEtcdClient).c
resp, err := c.Get(key, true, false)
if resp == nil {
return nil, nil, convertToZkError(err)
}
for _, c := range resp.Node.Nodes {
children = append(children, path.Base(c.Key))
}
return
}
func (e *etcdImpl) ChildrenW(key string) (children []string, stat zk.Stat, watch <-chan zk.Event, err error) {
resp, stat, watch, err := e.watch(key, true)
if err != nil {
return nil, stat, nil, convertToZkError(err)
}
for _, c := range resp.Node.Nodes {
children = append(children, path.Base(c.Key))
}
return children, stat, watch, nil
}
func (e *etcdImpl) Exists(key string) (exist bool, stat zk.Stat, err error) {
conn, err := e.pool.Get()
if err != nil {
return false, nil, err
}
defer e.pool.Put(conn)
c := conn.(*PooledEtcdClient).c
_, err = c.Get(key, true, false)
if err == nil {
return true, nil, nil
}
if ec, ok := err.(*etcd.EtcdError); ok {
if ec.ErrorCode == etcderr.EcodeKeyNotFound {
return false, nil, nil
}
}
return false, nil, convertToZkError(err)
}
func (e *etcdImpl) ExistsW(key string) (exist bool, stat zk.Stat, watch <-chan zk.Event, err error) {
_, stat, watch, err = e.watch(key, false)
if err != nil {
return false, nil, nil, convertToZkError(err)
}
return true, nil, watch, nil
}
const MAX_TTL = 365 * 24 * 60 * 60
func (e *etcdImpl) doKeepAlive(key string, ttl uint64) error {
conn, err := e.pool.Get()
if err != nil {
return err
}
defer e.pool.Put(conn)
c := conn.(*PooledEtcdClient).c
resp, err := c.Get(key, false, false)
if err != nil {
log.Error(err)
return err
}
if resp.Node.Dir {
return fmt.Errorf("can not set ttl to directory", key)
}
//log.Info("keep alive ", key)
resp, err = c.CompareAndSwap(key, resp.Node.Value, ttl, resp.Node.Value, resp.Node.ModifiedIndex)
if err == nil {
return nil
}
if ec, ok := err.(*etcd.EtcdError); ok && ec.ErrorCode == etcderr.EcodeTestFailed {
return nil
}
return err
}
//todo:add test for keepAlive
func (e *etcdImpl) keepAlive(key string, ttl uint64) {
go func() {
for {
time.Sleep(1 * time.Second)
err := e.doKeepAlive(key, ttl)
if err != nil {
log.Error(err)
return
}
}
}()
}
func (e *etcdImpl) Create(wholekey string, value []byte, flags int32, aclv []zk.ACL) (keyCreated string, err error) {
seq := (flags & zk.FlagSequence) != 0
tmp := (flags & zk.FlagEphemeral) != 0
ttl := uint64(MAX_TTL)
if tmp {
ttl = 5
}
var resp *etcd.Response
conn, err := e.pool.Get()
if err != nil {
return "", err
}
defer e.pool.Put(conn)
c := conn.(*PooledEtcdClient).c
fn := c.Create
log.Info("create", wholekey)
if seq {
wholekey = path.Dir(wholekey)
fn = c.CreateInOrder
} else {
for _, v := range aclv {
if v.Perms == PERM_DIRECTORY {
log.Info("etcdImpl:create directory", wholekey)
fn = nil
resp, err = c.CreateDir(wholekey, uint64(ttl))
if err != nil {
return "", convertToZkError(err)
}
}
}
}
if fn == nil {
if tmp {
e.keepAlive(wholekey, ttl)
}
return resp.Node.Key, nil
}
resp, err = fn(wholekey, string(value), uint64(ttl))
if err != nil {
return "", convertToZkError(err)
}
if tmp {
e.keepAlive(resp.Node.Key, ttl)
}
return resp.Node.Key, nil
}
func (e *etcdImpl) Set(key string, value []byte, version int32) (stat zk.Stat, err error) {
if version == 0 {
return nil, errors.New("invalid version")
}
conn, err := e.pool.Get()
if err != nil {
return nil, err
}
defer e.pool.Put(conn)
c := conn.(*PooledEtcdClient).c
resp, err := c.Get(key, true, false)
if resp == nil {
return nil, convertToZkError(err)
}
_, err = c.Set(key, string(value), uint64(resp.Node.TTL))
return nil, convertToZkError(err)
}
func (e *etcdImpl) Delete(key string, version int32) (err error) {
//todo: handle version
conn, err := e.pool.Get()
if err != nil {
return err
}
defer e.pool.Put(conn)
c := conn.(*PooledEtcdClient).c
resp, err := c.Get(key, true, false)
if resp == nil {
return convertToZkError(err)
}
if resp.Node.Dir {
_, err = c.DeleteDir(key)
} else {
_, err = c.Delete(key, false)
}
return convertToZkError(err)
}
func (e *etcdImpl) GetACL(key string) ([]zk.ACL, zk.Stat, error) {
return nil, nil, nil
}
func (e *etcdImpl) SetACL(key string, aclv []zk.ACL, version int32) (zk.Stat, error) {
return nil, nil
}
func (e *etcdImpl) Close() {
//how to implement this
}