Queue: Allow Redis to connect to unix

This commit is contained in:
Andrew Thornton 2019-12-17 19:44:37 +00:00
parent 1013ced326
commit 1fb9104009
No known key found for this signature in database
GPG Key ID: 3CDE74631F13A748
2 changed files with 9 additions and 3 deletions

View File

@ -41,6 +41,7 @@ type RedisQueue struct {
// RedisQueueConfiguration is the configuration for the redis queue // RedisQueueConfiguration is the configuration for the redis queue
type RedisQueueConfiguration struct { type RedisQueueConfiguration struct {
Network string
Addresses string Addresses string
Password string Password string
DBIndex int DBIndex int
@ -88,6 +89,7 @@ func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
return nil, errors.New("no redis host found") return nil, errors.New("no redis host found")
} else if len(dbs) == 1 { } else if len(dbs) == 1 {
queue.client = redis.NewClient(&redis.Options{ queue.client = redis.NewClient(&redis.Options{
Network: config.Network,
Addr: strings.TrimSpace(dbs[0]), // use default Addr Addr: strings.TrimSpace(dbs[0]), // use default Addr
Password: config.Password, // no password set Password: config.Password, // no password set
DB: config.DBIndex, // use default DB DB: config.DBIndex, // use default DB

View File

@ -22,6 +22,7 @@ type queueSettings struct {
BatchLength int BatchLength int
ConnectionString string ConnectionString string
Type string Type string
Network string
Addresses string Addresses string
Password string Password string
QueueName string QueueName string
@ -47,6 +48,7 @@ func CreateQueue(name string, handle queue.HandlerFunc, exemplar interface{}) qu
opts["BatchLength"] = q.BatchLength opts["BatchLength"] = q.BatchLength
opts["DataDir"] = q.DataDir opts["DataDir"] = q.DataDir
opts["Addresses"] = q.Addresses opts["Addresses"] = q.Addresses
opts["Network"] = q.Network
opts["Password"] = q.Password opts["Password"] = q.Password
opts["DBIndex"] = q.DBIndex opts["DBIndex"] = q.DBIndex
opts["QueueName"] = q.QueueName opts["QueueName"] = q.QueueName
@ -111,7 +113,7 @@ func getQueueSettings(name string) queueSettings {
q.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(Queue.BoostWorkers) q.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(Queue.BoostWorkers)
q.QueueName = sec.Key("QUEUE_NAME").MustString(Queue.QueueName) q.QueueName = sec.Key("QUEUE_NAME").MustString(Queue.QueueName)
q.Addresses, q.Password, q.DBIndex, _ = ParseQueueConnStr(q.ConnectionString) q.Network, q.Addresses, q.Password, q.DBIndex, _ = ParseQueueConnStr(q.ConnectionString)
return q return q
} }
@ -128,7 +130,7 @@ func NewQueueService() {
Queue.ConnectionString = sec.Key("CONN_STR").MustString(path.Join(AppDataPath, "")) Queue.ConnectionString = sec.Key("CONN_STR").MustString(path.Join(AppDataPath, ""))
validTypes := queue.RegisteredTypesAsString() validTypes := queue.RegisteredTypesAsString()
Queue.Type = sec.Key("TYPE").In(string(queue.PersistableChannelQueueType), validTypes) Queue.Type = sec.Key("TYPE").In(string(queue.PersistableChannelQueueType), validTypes)
Queue.Addresses, Queue.Password, Queue.DBIndex, _ = ParseQueueConnStr(Queue.ConnectionString) Queue.Network, Queue.Addresses, Queue.Password, Queue.DBIndex, _ = ParseQueueConnStr(Queue.ConnectionString)
Queue.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(true) Queue.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(true)
Queue.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(10) Queue.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(10)
Queue.Timeout = sec.Key("TIMEOUT").MustDuration(GracefulHammerTime + 30*time.Second) Queue.Timeout = sec.Key("TIMEOUT").MustDuration(GracefulHammerTime + 30*time.Second)
@ -183,7 +185,7 @@ func NewQueueService() {
} }
// ParseQueueConnStr parses a queue connection string // ParseQueueConnStr parses a queue connection string
func ParseQueueConnStr(connStr string) (addrs, password string, dbIdx int, err error) { func ParseQueueConnStr(connStr string) (network, addrs, password string, dbIdx int, err error) {
fields := strings.Fields(connStr) fields := strings.Fields(connStr)
for _, f := range fields { for _, f := range fields {
items := strings.SplitN(f, "=", 2) items := strings.SplitN(f, "=", 2)
@ -191,6 +193,8 @@ func ParseQueueConnStr(connStr string) (addrs, password string, dbIdx int, err e
continue continue
} }
switch strings.ToLower(items[0]) { switch strings.ToLower(items[0]) {
case "network":
network = items[1]
case "addrs": case "addrs":
addrs = items[1] addrs = items[1]
case "password": case "password":