Commit 6473b6e4 authored by Ben Kochie's avatar Ben Kochie

Update redigo library

Use updated upstream vendor for redigo/sentinel libraries.
parent 11d5df02
...@@ -6,7 +6,7 @@ import ( ...@@ -6,7 +6,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/garyburd/redigo/redis" "github.com/gomodule/redigo/redis"
"github.com/jpillora/backoff" "github.com/jpillora/backoff"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
......
...@@ -7,8 +7,8 @@ import ( ...@@ -7,8 +7,8 @@ import (
"net/url" "net/url"
"time" "time"
sentinel "github.com/FZambia/go-sentinel" "github.com/FZambia/sentinel"
"github.com/garyburd/redigo/redis" "github.com/gomodule/redigo/redis"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
......
...@@ -4,7 +4,7 @@ import ( ...@@ -4,7 +4,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/garyburd/redigo/redis" "github.com/gomodule/redigo/redis"
"github.com/rafaeljusto/redigomock" "github.com/rafaeljusto/redigomock"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
......
go-sentinel
===========
Redis Sentinel support for [redigo](https://github.com/garyburd/redigo) library.
**API is unstable and can change at any moment** – use with tools like Glide, Godep etc.
Documentation
-------------
- [API Reference](http://godoc.org/github.com/FZambia/go-sentinel)
Alternative solution
--------------------
You can alternatively configure Haproxy between your application and Redis to proxy requests to Redis master instance if you only need HA:
```
listen redis
server redis-01 127.0.0.1:6380 check port 6380 check inter 2s weight 1 inter 2s downinter 5s rise 10 fall 2
server redis-02 127.0.0.1:6381 check port 6381 check inter 2s weight 1 inter 2s downinter 5s rise 10 fall 2 backup
bind *:6379
mode tcp
option tcpka
option tcplog
option tcp-check
tcp-check send PING\r\n
tcp-check expect string +PONG
tcp-check send info\ replication\r\n
tcp-check expect string role:master
tcp-check send QUIT\r\n
tcp-check expect string +OK
balance roundrobin
```
This way you don't need to use this library.
License
-------
Library is available under the [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.html).
go-sentinel
===========
Redis Sentinel support for [redigo](https://github.com/gomodule/redigo) library.
**API is unstable and can change at any moment** – use with tools like Glide, Godep etc.
Documentation
-------------
- [API Reference](http://godoc.org/github.com/FZambia/sentinel)
License
-------
Library is available under the [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.html).
...@@ -8,7 +8,7 @@ import ( ...@@ -8,7 +8,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/garyburd/redigo/redis" "github.com/gomodule/redigo/redis"
) )
// Sentinel provides a way to add high availability (HA) to Redis Pool using // Sentinel provides a way to add high availability (HA) to Redis Pool using
......
...@@ -29,9 +29,12 @@ import ( ...@@ -29,9 +29,12 @@ import (
"time" "time"
) )
var (
_ ConnWithTimeout = (*conn)(nil)
)
// conn is the low-level implementation of Conn // conn is the low-level implementation of Conn
type conn struct { type conn struct {
// Shared // Shared
mu sync.Mutex mu sync.Mutex
pending int pending int
...@@ -73,10 +76,11 @@ type DialOption struct { ...@@ -73,10 +76,11 @@ type DialOption struct {
type dialOptions struct { type dialOptions struct {
readTimeout time.Duration readTimeout time.Duration
writeTimeout time.Duration writeTimeout time.Duration
dialer *net.Dialer
dial func(network, addr string) (net.Conn, error) dial func(network, addr string) (net.Conn, error)
db int db int
password string password string
dialTLS bool useTLS bool
skipVerify bool skipVerify bool
tlsConfig *tls.Config tlsConfig *tls.Config
} }
...@@ -95,17 +99,27 @@ func DialWriteTimeout(d time.Duration) DialOption { ...@@ -95,17 +99,27 @@ func DialWriteTimeout(d time.Duration) DialOption {
}} }}
} }
// DialConnectTimeout specifies the timeout for connecting to the Redis server. // DialConnectTimeout specifies the timeout for connecting to the Redis server when
// no DialNetDial option is specified.
func DialConnectTimeout(d time.Duration) DialOption { func DialConnectTimeout(d time.Duration) DialOption {
return DialOption{func(do *dialOptions) { return DialOption{func(do *dialOptions) {
dialer := net.Dialer{Timeout: d} do.dialer.Timeout = d
do.dial = dialer.Dial }}
}
// DialKeepAlive specifies the keep-alive period for TCP connections to the Redis server
// when no DialNetDial option is specified.
// If zero, keep-alives are not enabled. If no DialKeepAlive option is specified then
// the default of 5 minutes is used to ensure that half-closed TCP sessions are detected.
func DialKeepAlive(d time.Duration) DialOption {
return DialOption{func(do *dialOptions) {
do.dialer.KeepAlive = d
}} }}
} }
// DialNetDial specifies a custom dial function for creating TCP // DialNetDial specifies a custom dial function for creating TCP
// connections. If this option is left out, then net.Dial is // connections, otherwise a net.Dialer customized via the other options is used.
// used. DialNetDial overrides DialConnectTimeout. // DialNetDial overrides DialConnectTimeout and DialKeepAlive.
func DialNetDial(dial func(network, addr string) (net.Conn, error)) DialOption { func DialNetDial(dial func(network, addr string) (net.Conn, error)) DialOption {
return DialOption{func(do *dialOptions) { return DialOption{func(do *dialOptions) {
do.dial = dial do.dial = dial
...@@ -135,31 +149,49 @@ func DialTLSConfig(c *tls.Config) DialOption { ...@@ -135,31 +149,49 @@ func DialTLSConfig(c *tls.Config) DialOption {
}} }}
} }
// DialTLSSkipVerify to disable server name verification when connecting // DialTLSSkipVerify disables server name verification when connecting over
// over TLS. Has no effect when not dialing a TLS connection. // TLS. Has no effect when not dialing a TLS connection.
func DialTLSSkipVerify(skip bool) DialOption { func DialTLSSkipVerify(skip bool) DialOption {
return DialOption{func(do *dialOptions) { return DialOption{func(do *dialOptions) {
do.skipVerify = skip do.skipVerify = skip
}} }}
} }
// DialUseTLS specifies whether TLS should be used when connecting to the
// server. This option is ignore by DialURL.
func DialUseTLS(useTLS bool) DialOption {
return DialOption{func(do *dialOptions) {
do.useTLS = useTLS
}}
}
// Dial connects to the Redis server at the given network and // Dial connects to the Redis server at the given network and
// address using the specified options. // address using the specified options.
func Dial(network, address string, options ...DialOption) (Conn, error) { func Dial(network, address string, options ...DialOption) (Conn, error) {
do := dialOptions{ do := dialOptions{
dial: net.Dial, dialer: &net.Dialer{
KeepAlive: time.Minute * 5,
},
} }
for _, option := range options { for _, option := range options {
option.f(&do) option.f(&do)
} }
if do.dial == nil {
do.dial = do.dialer.Dial
}
netConn, err := do.dial(network, address) netConn, err := do.dial(network, address)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if do.dialTLS { if do.useTLS {
tlsConfig := cloneTLSClientConfig(do.tlsConfig, do.skipVerify) var tlsConfig *tls.Config
if do.tlsConfig == nil {
tlsConfig = &tls.Config{InsecureSkipVerify: do.skipVerify}
} else {
tlsConfig = cloneTLSConfig(do.tlsConfig)
}
if tlsConfig.ServerName == "" { if tlsConfig.ServerName == "" {
host, _, err := net.SplitHostPort(address) host, _, err := net.SplitHostPort(address)
if err != nil { if err != nil {
...@@ -202,10 +234,6 @@ func Dial(network, address string, options ...DialOption) (Conn, error) { ...@@ -202,10 +234,6 @@ func Dial(network, address string, options ...DialOption) (Conn, error) {
return c, nil return c, nil
} }
func dialTLS(do *dialOptions) {
do.dialTLS = true
}
var pathDBRegexp = regexp.MustCompile(`/(\d*)\z`) var pathDBRegexp = regexp.MustCompile(`/(\d*)\z`)
// DialURL connects to a Redis server at the given URL using the Redis // DialURL connects to a Redis server at the given URL using the Redis
...@@ -257,9 +285,7 @@ func DialURL(rawurl string, options ...DialOption) (Conn, error) { ...@@ -257,9 +285,7 @@ func DialURL(rawurl string, options ...DialOption) (Conn, error) {
return nil, fmt.Errorf("invalid database: %s", u.Path[1:]) return nil, fmt.Errorf("invalid database: %s", u.Path[1:])
} }
if u.Scheme == "rediss" { options = append(options, DialUseTLS(u.Scheme == "rediss"))
options = append([]DialOption{{dialTLS}}, options...)
}
return Dial("tcp", address, options...) return Dial("tcp", address, options...)
} }
...@@ -344,39 +370,55 @@ func (c *conn) writeFloat64(n float64) error { ...@@ -344,39 +370,55 @@ func (c *conn) writeFloat64(n float64) error {
return c.writeBytes(strconv.AppendFloat(c.numScratch[:0], n, 'g', -1, 64)) return c.writeBytes(strconv.AppendFloat(c.numScratch[:0], n, 'g', -1, 64))
} }
func (c *conn) writeCommand(cmd string, args []interface{}) (err error) { func (c *conn) writeCommand(cmd string, args []interface{}) error {
c.writeLen('*', 1+len(args)) c.writeLen('*', 1+len(args))
err = c.writeString(cmd) if err := c.writeString(cmd); err != nil {
return err
}
for _, arg := range args { for _, arg := range args {
if err != nil { if err := c.writeArg(arg, true); err != nil {
break return err
} }
switch arg := arg.(type) { }
case string: return nil
err = c.writeString(arg) }
case []byte:
err = c.writeBytes(arg) func (c *conn) writeArg(arg interface{}, argumentTypeOK bool) (err error) {
case int: switch arg := arg.(type) {
err = c.writeInt64(int64(arg)) case string:
case int64: return c.writeString(arg)
err = c.writeInt64(arg) case []byte:
case float64: return c.writeBytes(arg)
err = c.writeFloat64(arg) case int:
case bool: return c.writeInt64(int64(arg))
if arg { case int64:
err = c.writeString("1") return c.writeInt64(arg)
} else { case float64:
err = c.writeString("0") return c.writeFloat64(arg)
} case bool:
case nil: if arg {
err = c.writeString("") return c.writeString("1")
default: } else {
var buf bytes.Buffer return c.writeString("0")
fmt.Fprint(&buf, arg) }
err = c.writeBytes(buf.Bytes()) case nil:
return c.writeString("")
case Argument:
if argumentTypeOK {
return c.writeArg(arg.RedisArg(), false)
} }
// See comment in default clause below.
var buf bytes.Buffer
fmt.Fprint(&buf, arg)
return c.writeBytes(buf.Bytes())
default:
// This default clause is intended to handle builtin numeric types.
// The function should return an error for other types, but this is not
// done for compatibility with previous versions of the package.
var buf bytes.Buffer
fmt.Fprint(&buf, arg)
return c.writeBytes(buf.Bytes())
} }
return err
} }
type protocolError string type protocolError string
...@@ -538,10 +580,17 @@ func (c *conn) Flush() error { ...@@ -538,10 +580,17 @@ func (c *conn) Flush() error {
return nil return nil
} }
func (c *conn) Receive() (reply interface{}, err error) { func (c *conn) Receive() (interface{}, error) {
if c.readTimeout != 0 { return c.ReceiveWithTimeout(c.readTimeout)
c.conn.SetReadDeadline(time.Now().Add(c.readTimeout)) }
func (c *conn) ReceiveWithTimeout(timeout time.Duration) (reply interface{}, err error) {
var deadline time.Time
if timeout != 0 {
deadline = time.Now().Add(timeout)
} }
c.conn.SetReadDeadline(deadline)
if reply, err = c.readReply(); err != nil { if reply, err = c.readReply(); err != nil {
return nil, c.fatal(err) return nil, c.fatal(err)
} }
...@@ -564,6 +613,10 @@ func (c *conn) Receive() (reply interface{}, err error) { ...@@ -564,6 +613,10 @@ func (c *conn) Receive() (reply interface{}, err error) {
} }
func (c *conn) Do(cmd string, args ...interface{}) (interface{}, error) { func (c *conn) Do(cmd string, args ...interface{}) (interface{}, error) {
return c.DoWithTimeout(c.readTimeout, cmd, args...)
}
func (c *conn) DoWithTimeout(readTimeout time.Duration, cmd string, args ...interface{}) (interface{}, error) {
c.mu.Lock() c.mu.Lock()
pending := c.pending pending := c.pending
c.pending = 0 c.pending = 0
...@@ -587,9 +640,11 @@ func (c *conn) Do(cmd string, args ...interface{}) (interface{}, error) { ...@@ -587,9 +640,11 @@ func (c *conn) Do(cmd string, args ...interface{}) (interface{}, error) {
return nil, c.fatal(err) return nil, c.fatal(err)
} }
if c.readTimeout != 0 { var deadline time.Time
c.conn.SetReadDeadline(time.Now().Add(c.readTimeout)) if readTimeout != 0 {
deadline = time.Now().Add(readTimeout)
} }
c.conn.SetReadDeadline(deadline)
if cmd == "" { if cmd == "" {
reply := make([]interface{}, pending) reply := make([]interface{}, pending)
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
// Package redis is a client for the Redis database. // Package redis is a client for the Redis database.
// //
// The Redigo FAQ (https://github.com/garyburd/redigo/wiki/FAQ) contains more // The Redigo FAQ (https://github.com/gomodule/redigo/wiki/FAQ) contains more
// documentation about this package. // documentation about this package.
// //
// Connections // Connections
...@@ -38,7 +38,7 @@ ...@@ -38,7 +38,7 @@
// //
// n, err := conn.Do("APPEND", "key", "value") // n, err := conn.Do("APPEND", "key", "value")
// //
// The Do method converts command arguments to binary strings for transmission // The Do method converts command arguments to bulk strings for transmission
// to the server as follows: // to the server as follows:
// //
// Go Type Conversion // Go Type Conversion
...@@ -48,7 +48,7 @@ ...@@ -48,7 +48,7 @@
// float64 strconv.FormatFloat(v, 'g', -1, 64) // float64 strconv.FormatFloat(v, 'g', -1, 64)
// bool true -> "1", false -> "0" // bool true -> "1", false -> "0"
// nil "" // nil ""
// all other types fmt.Print(v) // all other types fmt.Fprint(w, v)
// //
// Redis command reply types are represented using the following Go types: // Redis command reply types are represented using the following Go types:
// //
...@@ -174,4 +174,4 @@ ...@@ -174,4 +174,4 @@
// non-recoverable error such as a network error or protocol parsing error. If // non-recoverable error such as a network error or protocol parsing error. If
// Err() returns a non-nil value, then the connection is not usable and should // Err() returns a non-nil value, then the connection is not usable and should
// be closed. // be closed.
package redis // import "github.com/garyburd/redigo/redis" package redis // import "github.com/gomodule/redigo/redis"
...@@ -4,11 +4,7 @@ package redis ...@@ -4,11 +4,7 @@ package redis
import "crypto/tls" import "crypto/tls"
// similar cloneTLSClientConfig in the stdlib, but also honor skipVerify for the nil case func cloneTLSConfig(cfg *tls.Config) *tls.Config {
func cloneTLSClientConfig(cfg *tls.Config, skipVerify bool) *tls.Config {
if cfg == nil {
return &tls.Config{InsecureSkipVerify: skipVerify}
}
return &tls.Config{ return &tls.Config{
Rand: cfg.Rand, Rand: cfg.Rand,
Time: cfg.Time, Time: cfg.Time,
......
// +build go1.7 // +build go1.7,!go1.8
package redis package redis
import "crypto/tls" import "crypto/tls"
// similar cloneTLSClientConfig in the stdlib, but also honor skipVerify for the nil case func cloneTLSConfig(cfg *tls.Config) *tls.Config {
func cloneTLSClientConfig(cfg *tls.Config, skipVerify bool) *tls.Config {
if cfg == nil {
return &tls.Config{InsecureSkipVerify: skipVerify}
}
return &tls.Config{ return &tls.Config{
Rand: cfg.Rand, Rand: cfg.Rand,
Time: cfg.Time, Time: cfg.Time,
......
// +build go1.8
package redis
import "crypto/tls"
func cloneTLSConfig(cfg *tls.Config) *tls.Config {
return cfg.Clone()
}
...@@ -18,6 +18,11 @@ import ( ...@@ -18,6 +18,11 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"log" "log"
"time"
)
var (
_ ConnWithTimeout = (*loggingConn)(nil)
) )
// NewLoggingConn returns a logging wrapper around a connection. // NewLoggingConn returns a logging wrapper around a connection.
...@@ -104,6 +109,12 @@ func (c *loggingConn) Do(commandName string, args ...interface{}) (interface{}, ...@@ -104,6 +109,12 @@ func (c *loggingConn) Do(commandName string, args ...interface{}) (interface{},
return reply, err return reply, err
} }
func (c *loggingConn) DoWithTimeout(timeout time.Duration, commandName string, args ...interface{}) (interface{}, error) {
reply, err := DoWithTimeout(c.Conn, timeout, commandName, args...)
c.print("DoWithTimeout", commandName, args, reply, err)
return reply, err
}
func (c *loggingConn) Send(commandName string, args ...interface{}) error { func (c *loggingConn) Send(commandName string, args ...interface{}) error {
err := c.Conn.Send(commandName, args...) err := c.Conn.Send(commandName, args...)
c.print("Send", commandName, args, nil, err) c.print("Send", commandName, args, nil, err)
...@@ -115,3 +126,9 @@ func (c *loggingConn) Receive() (interface{}, error) { ...@@ -115,3 +126,9 @@ func (c *loggingConn) Receive() (interface{}, error) {
c.print("Receive", "", nil, reply, err) c.print("Receive", "", nil, reply, err)
return reply, err return reply, err
} }
func (c *loggingConn) ReceiveWithTimeout(timeout time.Duration) (interface{}, error) {
reply, err := ReceiveWithTimeout(c.Conn, timeout)
c.print("ReceiveWithTimeout", "", nil, reply, err)
return reply, err
}
...@@ -16,16 +16,21 @@ package redis ...@@ -16,16 +16,21 @@ package redis
import ( import (
"bytes" "bytes"
"container/list"
"crypto/rand" "crypto/rand"
"crypto/sha1" "crypto/sha1"
"errors" "errors"
"io" "io"
"strconv" "strconv"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/garyburd/redigo/internal" "github.com/gomodule/redigo/internal"
)
var (
_ ConnWithTimeout = (*activeConn)(nil)
_ ConnWithTimeout = (*errorConn)(nil)
) )
var nowFunc = time.Now // for testing var nowFunc = time.Now // for testing
...@@ -96,7 +101,7 @@ var ( ...@@ -96,7 +101,7 @@ var (
// return nil, err // return nil, err
// } // }
// return c, nil // return c, nil
// } // },
// } // }
// //
// Use the TestOnBorrow function to check the health of an idle connection // Use the TestOnBorrow function to check the health of an idle connection
...@@ -115,7 +120,6 @@ var ( ...@@ -115,7 +120,6 @@ var (
// } // }
// //
type Pool struct { type Pool struct {
// Dial is an application supplied function for creating and configuring a // Dial is an application supplied function for creating and configuring a
// connection. // connection.
// //
...@@ -146,19 +150,17 @@ type Pool struct { ...@@ -146,19 +150,17 @@ type Pool struct {
// for a connection to be returned to the pool before returning. // for a connection to be returned to the pool before returning.
Wait bool Wait bool
// mu protects fields defined below. // Close connections older than this duration. If the value is zero, then
mu sync.Mutex // the pool does not close connections based on age.
cond *sync.Cond MaxConnLifetime time.Duration
closed bool
active int
// Stack of idleConn with most recently used at the front. chInitialized uint32 // set to 1 when field ch is initialized
idle list.List
}
type idleConn struct { mu sync.Mutex // mu protects the following fields
c Conn closed bool // set to true when the pool is closed.
t time.Time active int // the number of open connections in the pool
ch chan struct{} // limits open connections when p.Wait is true
idle idleList // idle connections
} }
// NewPool creates a new pool. // NewPool creates a new pool.
...@@ -174,14 +176,36 @@ func NewPool(newFn func() (Conn, error), maxIdle int) *Pool { ...@@ -174,14 +176,36 @@ func NewPool(newFn func() (Conn, error), maxIdle int) *Pool {
// getting an underlying connection, then the connection Err, Do, Send, Flush // getting an underlying connection, then the connection Err, Do, Send, Flush
// and Receive methods return that error. // and Receive methods return that error.
func (p *Pool) Get() Conn { func (p *Pool) Get() Conn {
c, err := p.get() pc, err := p.get(nil)
if err != nil { if err != nil {
return errorConnection{err} return errorConn{err}
} }
return &pooledConnection{p: p, c: c} return &activeConn{p: p, pc: pc}
} }
// ActiveCount returns the number of active connections in the pool. // PoolStats contains pool statistics.
type PoolStats struct {
// ActiveCount is the number of connections in the pool. The count includes
// idle connections and connections in use.
ActiveCount int
// IdleCount is the number of idle connections in the pool.
IdleCount int
}
// Stats returns pool's statistics.
func (p *Pool) Stats() PoolStats {
p.mu.Lock()
stats := PoolStats{
ActiveCount: p.active,
IdleCount: p.idle.count,
}
p.mu.Unlock()
return stats
}
// ActiveCount returns the number of connections in the pool. The count
// includes idle connections and connections in use.
func (p *Pool) ActiveCount() int { func (p *Pool) ActiveCount() int {
p.mu.Lock() p.mu.Lock()
active := p.active active := p.active
...@@ -189,141 +213,164 @@ func (p *Pool) ActiveCount() int { ...@@ -189,141 +213,164 @@ func (p *Pool) ActiveCount() int {
return active return active
} }
// IdleCount returns the number of idle connections in the pool.
func (p *Pool) IdleCount() int {
p.mu.Lock()
idle := p.idle.count
p.mu.Unlock()
return idle
}
// Close releases the resources used by the pool. // Close releases the resources used by the pool.
func (p *Pool) Close() error { func (p *Pool) Close() error {
p.mu.Lock() p.mu.Lock()
idle := p.idle if p.closed {
p.idle.Init() p.mu.Unlock()
return nil
}
p.closed = true p.closed = true
p.active -= idle.Len() p.active -= p.idle.count
if p.cond != nil { pc := p.idle.front
p.cond.Broadcast() p.idle.count = 0
p.idle.front, p.idle.back = nil, nil
if p.ch != nil {
close(p.ch)
} }
p.mu.Unlock() p.mu.Unlock()
for e := idle.Front(); e != nil; e = e.Next() { for ; pc != nil; pc = pc.next {
e.Value.(idleConn).c.Close() pc.c.Close()
} }
return nil return nil
} }
// release decrements the active count and signals waiters. The caller must func (p *Pool) lazyInit() {
// hold p.mu during the call. // Fast path.
func (p *Pool) release() { if atomic.LoadUint32(&p.chInitialized) == 1 {
p.active -= 1 return
if p.cond != nil {
p.cond.Signal()
} }
// Slow path.
p.mu.Lock()
if p.chInitialized == 0 {
p.ch = make(chan struct{}, p.MaxActive)
if p.closed {
close(p.ch)
} else {
for i := 0; i < p.MaxActive; i++ {
p.ch <- struct{}{}
}
}
atomic.StoreUint32(&p.chInitialized, 1)
}
p.mu.Unlock()
} }
// get prunes stale connections and returns a connection from the idle list or // get prunes stale connections and returns a connection from the idle list or
// creates a new connection. // creates a new connection.
func (p *Pool) get() (Conn, error) { func (p *Pool) get(ctx interface {
p.mu.Lock() Done() <-chan struct{}
Err() error
// Prune stale connections. }) (*poolConn, error) {
if timeout := p.IdleTimeout; timeout > 0 { // Handle limit for p.Wait == true.
for i, n := 0, p.idle.Len(); i < n; i++ { if p.Wait && p.MaxActive > 0 {
e := p.idle.Back() p.lazyInit()
if e == nil { if ctx == nil {
break <-p.ch
} } else {
ic := e.Value.(idleConn) select {
if ic.t.Add(timeout).After(nowFunc()) { case <-p.ch:
break case <-ctx.Done():
return nil, ctx.Err()
} }
p.idle.Remove(e)
p.release()
p.mu.Unlock()
ic.c.Close()
p.mu.Lock()
} }
} }
for { p.mu.Lock()
// Get idle connection.
for i, n := 0, p.idle.Len(); i < n; i++ { // Prune stale connections at the back of the idle list.
e := p.idle.Front() if p.IdleTimeout > 0 {
if e == nil { n := p.idle.count
break for i := 0; i < n && p.idle.back != nil && p.idle.back.t.Add(p.IdleTimeout).Before(nowFunc()); i++ {
} pc := p.idle.back
ic := e.Value.(idleConn) p.idle.popBack()
p.idle.Remove(e)
test := p.TestOnBorrow
p.mu.Unlock() p.mu.Unlock()
if test == nil || test(ic.c, ic.t) == nil { pc.c.Close()
return ic.c, nil
}
ic.c.Close()
p.mu.Lock() p.mu.Lock()
p.release() p.active--
} }
}
// Check for pool closed before dialing a new connection. // Get idle connection from the front of idle list.
for p.idle.front != nil {
if p.closed { pc := p.idle.front
p.mu.Unlock() p.idle.popFront()
return nil, errors.New("redigo: get on closed pool") p.mu.Unlock()
if (p.TestOnBorrow == nil || p.TestOnBorrow(pc.c, pc.t) == nil) &&
(p.MaxConnLifetime == 0 || nowFunc().Sub(pc.created) < p.MaxConnLifetime) {
return pc, nil
} }
pc.c.Close()
p.mu.Lock()
p.active--
}
// Dial new connection if under limit. // Check for pool closed before dialing a new connection.
if p.closed {
if p.MaxActive == 0 || p.active < p.MaxActive { p.mu.Unlock()
dial := p.Dial return nil, errors.New("redigo: get on closed pool")
p.active += 1 }
p.mu.Unlock()
c, err := dial()
if err != nil {
p.mu.Lock()
p.release()
p.mu.Unlock()
c = nil
}
return c, err
}
if !p.Wait { // Handle limit for p.Wait == false.
p.mu.Unlock() if !p.Wait && p.MaxActive > 0 && p.active >= p.MaxActive {
return nil, ErrPoolExhausted p.mu.Unlock()
} return nil, ErrPoolExhausted
}
if p.cond == nil { p.active++
p.cond = sync.NewCond(&p.mu) p.mu.Unlock()
c, err := p.Dial()
if err != nil {
c = nil
p.mu.Lock()
p.active--
if p.ch != nil && !p.closed {
p.ch <- struct{}{}
} }
p.cond.Wait() p.mu.Unlock()
} }
return &poolConn{c: c, created: nowFunc()}, err
} }
func (p *Pool) put(c Conn, forceClose bool) error { func (p *Pool) put(pc *poolConn, forceClose bool) error {
err := c.Err()
p.mu.Lock() p.mu.Lock()
if !p.closed && err == nil && !forceClose { if !p.closed && !forceClose {
p.idle.PushFront(idleConn{t: nowFunc(), c: c}) pc.t = nowFunc()
if p.idle.Len() > p.MaxIdle { p.idle.pushFront(pc)
c = p.idle.Remove(p.idle.Back()).(idleConn).c if p.idle.count > p.MaxIdle {
pc = p.idle.back
p.idle.popBack()
} else { } else {
c = nil pc = nil
} }
} }
if c == nil { if pc != nil {
if p.cond != nil {
p.cond.Signal()
}
p.mu.Unlock() p.mu.Unlock()
return nil pc.c.Close()
p.mu.Lock()
p.active--
} }
p.release() if p.ch != nil && !p.closed {
p.ch <- struct{}{}
}
p.mu.Unlock() p.mu.Unlock()
return c.Close() return nil
} }
type pooledConnection struct { type activeConn struct {
p *Pool p *Pool
c Conn pc *poolConn
state int state int
} }
...@@ -344,73 +391,172 @@ func initSentinel() { ...@@ -344,73 +391,172 @@ func initSentinel() {
} }
} }
func (pc *pooledConnection) Close() error { func (ac *activeConn) Close() error {
c := pc.c pc := ac.pc
if _, ok := c.(errorConnection); ok { if pc == nil {
return nil return nil
} }
pc.c = errorConnection{errConnClosed} ac.pc = nil
if pc.state&internal.MultiState != 0 { if ac.state&internal.MultiState != 0 {
c.Send("DISCARD") pc.c.Send("DISCARD")
pc.state &^= (internal.MultiState | internal.WatchState) ac.state &^= (internal.MultiState | internal.WatchState)
} else if pc.state&internal.WatchState != 0 { } else if ac.state&internal.WatchState != 0 {
c.Send("UNWATCH") pc.c.Send("UNWATCH")
pc.state &^= internal.WatchState ac.state &^= internal.WatchState
} }
if pc.state&internal.SubscribeState != 0 { if ac.state&internal.SubscribeState != 0 {
c.Send("UNSUBSCRIBE") pc.c.Send("UNSUBSCRIBE")
c.Send("PUNSUBSCRIBE") pc.c.Send("PUNSUBSCRIBE")
// To detect the end of the message stream, ask the server to echo // To detect the end of the message stream, ask the server to echo
// a sentinel value and read until we see that value. // a sentinel value and read until we see that value.
sentinelOnce.Do(initSentinel) sentinelOnce.Do(initSentinel)
c.Send("ECHO", sentinel) pc.c.Send("ECHO", sentinel)
c.Flush() pc.c.Flush()
for { for {
p, err := c.Receive() p, err := pc.c.Receive()
if err != nil { if err != nil {
break break
} }
if p, ok := p.([]byte); ok && bytes.Equal(p, sentinel) { if p, ok := p.([]byte); ok && bytes.Equal(p, sentinel) {
pc.state &^= internal.SubscribeState ac.state &^= internal.SubscribeState
break break
} }
} }
} }
c.Do("") pc.c.Do("")
pc.p.put(c, pc.state != 0) ac.p.put(pc, ac.state != 0 || pc.c.Err() != nil)
return nil return nil
} }
func (pc *pooledConnection) Err() error { func (ac *activeConn) Err() error {
pc := ac.pc
if pc == nil {
return errConnClosed
}
return pc.c.Err() return pc.c.Err()
} }
func (pc *pooledConnection) Do(commandName string, args ...interface{}) (reply interface{}, err error) { func (ac *activeConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
pc := ac.pc
if pc == nil {
return nil, errConnClosed
}
ci := internal.LookupCommandInfo(commandName) ci := internal.LookupCommandInfo(commandName)
pc.state = (pc.state | ci.Set) &^ ci.Clear ac.state = (ac.state | ci.Set) &^ ci.Clear
return pc.c.Do(commandName, args...) return pc.c.Do(commandName, args...)
} }
func (pc *pooledConnection) Send(commandName string, args ...interface{}) error { func (ac *activeConn) DoWithTimeout(timeout time.Duration, commandName string, args ...interface{}) (reply interface{}, err error) {
pc := ac.pc
if pc == nil {
return nil, errConnClosed
}
cwt, ok := pc.c.(ConnWithTimeout)
if !ok {
return nil, errTimeoutNotSupported
}
ci := internal.LookupCommandInfo(commandName) ci := internal.LookupCommandInfo(commandName)
pc.state = (pc.state | ci.Set) &^ ci.Clear ac.state = (ac.state | ci.Set) &^ ci.Clear
return cwt.DoWithTimeout(timeout, commandName, args...)
}
func (ac *activeConn) Send(commandName string, args ...interface{}) error {
pc := ac.pc
if pc == nil {
return errConnClosed
}
ci := internal.LookupCommandInfo(commandName)
ac.state = (ac.state | ci.Set) &^ ci.Clear
return pc.c.Send(commandName, args...) return pc.c.Send(commandName, args...)
} }
func (pc *pooledConnection) Flush() error { func (ac *activeConn) Flush() error {
pc := ac.pc
if pc == nil {
return errConnClosed
}
return pc.c.Flush() return pc.c.Flush()
} }
func (pc *pooledConnection) Receive() (reply interface{}, err error) { func (ac *activeConn) Receive() (reply interface{}, err error) {
pc := ac.pc
if pc == nil {
return nil, errConnClosed
}
return pc.c.Receive() return pc.c.Receive()
} }
type errorConnection struct{ err error } func (ac *activeConn) ReceiveWithTimeout(timeout time.Duration) (reply interface{}, err error) {
pc := ac.pc
if pc == nil {
return nil, errConnClosed
}
cwt, ok := pc.c.(ConnWithTimeout)
if !ok {
return nil, errTimeoutNotSupported
}
return cwt.ReceiveWithTimeout(timeout)
}
type errorConn struct{ err error }
func (ec errorConn) Do(string, ...interface{}) (interface{}, error) { return nil, ec.err }
func (ec errorConn) DoWithTimeout(time.Duration, string, ...interface{}) (interface{}, error) {
return nil, ec.err
}
func (ec errorConn) Send(string, ...interface{}) error { return ec.err }
func (ec errorConn) Err() error { return ec.err }
func (ec errorConn) Close() error { return nil }
func (ec errorConn) Flush() error { return ec.err }
func (ec errorConn) Receive() (interface{}, error) { return nil, ec.err }
func (ec errorConn) ReceiveWithTimeout(time.Duration) (interface{}, error) { return nil, ec.err }
type idleList struct {
count int
front, back *poolConn
}
type poolConn struct {
c Conn
t time.Time
created time.Time
next, prev *poolConn
}
func (l *idleList) pushFront(pc *poolConn) {
pc.next = l.front
pc.prev = nil
if l.count == 0 {
l.back = pc
} else {
l.front.prev = pc
}
l.front = pc
l.count++
return
}
func (l *idleList) popFront() {
pc := l.front
l.count--
if l.count == 0 {
l.front, l.back = nil, nil
} else {
pc.next.prev = nil
l.front = pc.next
}
pc.next, pc.prev = nil, nil
}
func (ec errorConnection) Do(string, ...interface{}) (interface{}, error) { return nil, ec.err } func (l *idleList) popBack() {
func (ec errorConnection) Send(string, ...interface{}) error { return ec.err } pc := l.back
func (ec errorConnection) Err() error { return ec.err } l.count--
func (ec errorConnection) Close() error { return ec.err } if l.count == 0 {
func (ec errorConnection) Flush() error { return ec.err } l.front, l.back = nil, nil
func (ec errorConnection) Receive() (interface{}, error) { return nil, ec.err } } else {
pc.prev.next = nil
l.back = pc.prev
}
pc.next, pc.prev = nil, nil
}
// Copyright 2012 Gary Burd // Copyright 2018 Gary Burd
// //
// Licensed under the Apache License, Version 2.0 (the "License"): you may // Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain // not use this file except in compliance with the License. You may obtain
...@@ -12,30 +12,24 @@ ...@@ -12,30 +12,24 @@
// License for the specific language governing permissions and limitations // License for the specific language governing permissions and limitations
// under the License. // under the License.
package redis // +build go1.7
// Error represents an error returned in a command reply.
type Error string
func (err Error) Error() string { return string(err) }
// Conn represents a connection to a Redis server.
type Conn interface {
// Close closes the connection.
Close() error
// Err returns a non-nil value when the connection is not usable.
Err() error
// Do sends a command to the server and returns the received reply. package redis
Do(commandName string, args ...interface{}) (reply interface{}, err error)
// Send writes the command to the client's output buffer.
Send(commandName string, args ...interface{}) error
// Flush flushes the output buffer to the Redis server. import "context"
Flush() error
// Receive receives a single reply from the Redis server // GetContext gets a connection using the provided context.
Receive() (reply interface{}, err error) //
// The provided Context must be non-nil. If the context expires before the
// connection is complete, an error is returned. Any expiration on the context
// will not affect the returned connection.
//
// If the function completes without error, then the application must close the
// returned connection.
func (p *Pool) GetContext(ctx context.Context) (Conn, error) {
pc, err := p.get(ctx)
if err != nil {
return errorConn{err}, err
}
return &activeConn{p: p, pc: pc}, nil
} }
...@@ -14,11 +14,13 @@ ...@@ -14,11 +14,13 @@
package redis package redis
import "errors" import (
"errors"
"time"
)
// Subscription represents a subscribe or unsubscribe notification. // Subscription represents a subscribe or unsubscribe notification.
type Subscription struct { type Subscription struct {
// Kind is "subscribe", "unsubscribe", "psubscribe" or "punsubscribe" // Kind is "subscribe", "unsubscribe", "psubscribe" or "punsubscribe"
Kind string Kind string
...@@ -31,23 +33,12 @@ type Subscription struct { ...@@ -31,23 +33,12 @@ type Subscription struct {
// Message represents a message notification. // Message represents a message notification.
type Message struct { type Message struct {
// The originating channel. // The originating channel.
Channel string Channel string
// The message data. // The matched pattern, if any
Data []byte
}
// PMessage represents a pmessage notification.
type PMessage struct {
// The matched pattern.
Pattern string Pattern string
// The originating channel.
Channel string
// The message data. // The message data.
Data []byte Data []byte
} }
...@@ -94,16 +85,29 @@ func (c PubSubConn) PUnsubscribe(channel ...interface{}) error { ...@@ -94,16 +85,29 @@ func (c PubSubConn) PUnsubscribe(channel ...interface{}) error {
} }
// Ping sends a PING to the server with the specified data. // Ping sends a PING to the server with the specified data.
//
// The connection must be subscribed to at least one channel or pattern when
// calling this method.
func (c PubSubConn) Ping(data string) error { func (c PubSubConn) Ping(data string) error {
c.Conn.Send("PING", data) c.Conn.Send("PING", data)
return c.Conn.Flush() return c.Conn.Flush()
} }
// Receive returns a pushed message as a Subscription, Message, PMessage, Pong // Receive returns a pushed message as a Subscription, Message, Pong or error.
// or error. The return value is intended to be used directly in a type switch // The return value is intended to be used directly in a type switch as
// as illustrated in the PubSubConn example. // illustrated in the PubSubConn example.
func (c PubSubConn) Receive() interface{} { func (c PubSubConn) Receive() interface{} {
reply, err := Values(c.Conn.Receive()) return c.receiveInternal(c.Conn.Receive())
}
// ReceiveWithTimeout is like Receive, but it allows the application to
// override the connection's default timeout.
func (c PubSubConn) ReceiveWithTimeout(timeout time.Duration) interface{} {
return c.receiveInternal(ReceiveWithTimeout(c.Conn, timeout))
}
func (c PubSubConn) receiveInternal(replyArg interface{}, errArg error) interface{} {
reply, err := Values(replyArg, errArg)
if err != nil { if err != nil {
return err return err
} }
...@@ -122,11 +126,11 @@ func (c PubSubConn) Receive() interface{} { ...@@ -122,11 +126,11 @@ func (c PubSubConn) Receive() interface{} {
} }
return m return m
case "pmessage": case "pmessage":
var pm PMessage var m Message
if _, err := Scan(reply, &pm.Pattern, &pm.Channel, &pm.Data); err != nil { if _, err := Scan(reply, &m.Pattern, &m.Channel, &m.Data); err != nil {
return err return err
} }
return pm return m
case "subscribe", "psubscribe", "unsubscribe", "punsubscribe": case "subscribe", "psubscribe", "unsubscribe", "punsubscribe":
s := Subscription{Kind: kind} s := Subscription{Kind: kind}
if _, err := Scan(reply, &s.Channel, &s.Count); err != nil { if _, err := Scan(reply, &s.Channel, &s.Count); err != nil {
......
// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package redis
import (
"errors"
"time"
)
// Error represents an error returned in a command reply.
type Error string
func (err Error) Error() string { return string(err) }
// Conn represents a connection to a Redis server.
type Conn interface {
// Close closes the connection.
Close() error
// Err returns a non-nil value when the connection is not usable.
Err() error
// Do sends a command to the server and returns the received reply.
Do(commandName string, args ...interface{}) (reply interface{}, err error)
// Send writes the command to the client's output buffer.
Send(commandName string, args ...interface{}) error
// Flush flushes the output buffer to the Redis server.
Flush() error
// Receive receives a single reply from the Redis server
Receive() (reply interface{}, err error)
}
// Argument is the interface implemented by an object which wants to control how
// the object is converted to Redis bulk strings.
type Argument interface {
// RedisArg returns a value to be encoded as a bulk string per the
// conversions listed in the section 'Executing Commands'.
// Implementations should typically return a []byte or string.
RedisArg() interface{}
}
// Scanner is implemented by an object which wants to control its value is
// interpreted when read from Redis.
type Scanner interface {
// RedisScan assigns a value from a Redis value. The argument src is one of
// the reply types listed in the section `Executing Commands`.
//
// An error should be returned if the value cannot be stored without
// loss of information.
RedisScan(src interface{}) error
}
// ConnWithTimeout is an optional interface that allows the caller to override
// a connection's default read timeout. This interface is useful for executing
// the BLPOP, BRPOP, BRPOPLPUSH, XREAD and other commands that block at the
// server.
//
// A connection's default read timeout is set with the DialReadTimeout dial
// option. Applications should rely on the default timeout for commands that do
// not block at the server.
//
// All of the Conn implementations in this package satisfy the ConnWithTimeout
// interface.
//
// Use the DoWithTimeout and ReceiveWithTimeout helper functions to simplify
// use of this interface.
type ConnWithTimeout interface {
Conn
// Do sends a command to the server and returns the received reply.
// The timeout overrides the read timeout set when dialing the
// connection.
DoWithTimeout(timeout time.Duration, commandName string, args ...interface{}) (reply interface{}, err error)
// Receive receives a single reply from the Redis server. The timeout
// overrides the read timeout set when dialing the connection.
ReceiveWithTimeout(timeout time.Duration) (reply interface{}, err error)
}
var errTimeoutNotSupported = errors.New("redis: connection does not support ConnWithTimeout")
// DoWithTimeout executes a Redis command with the specified read timeout. If
// the connection does not satisfy the ConnWithTimeout interface, then an error
// is returned.
func DoWithTimeout(c Conn, timeout time.Duration, cmd string, args ...interface{}) (interface{}, error) {
cwt, ok := c.(ConnWithTimeout)
if !ok {
return nil, errTimeoutNotSupported
}
return cwt.DoWithTimeout(timeout, cmd, args...)
}
// ReceiveWithTimeout receives a reply with the specified read timeout. If the
// connection does not satisfy the ConnWithTimeout interface, then an error is
// returned.
func ReceiveWithTimeout(c Conn, timeout time.Duration) (interface{}, error) {
cwt, ok := c.(ConnWithTimeout)
if !ok {
return nil, errTimeoutNotSupported
}
return cwt.ReceiveWithTimeout(timeout)
}
...@@ -243,34 +243,67 @@ func Values(reply interface{}, err error) ([]interface{}, error) { ...@@ -243,34 +243,67 @@ func Values(reply interface{}, err error) ([]interface{}, error) {
return nil, fmt.Errorf("redigo: unexpected type for Values, got type %T", reply) return nil, fmt.Errorf("redigo: unexpected type for Values, got type %T", reply)
} }
// Strings is a helper that converts an array command reply to a []string. If func sliceHelper(reply interface{}, err error, name string, makeSlice func(int), assign func(int, interface{}) error) error {
// err is not equal to nil, then Strings returns nil, err. Nil array items are
// converted to "" in the output slice. Strings returns an error if an array
// item is not a bulk string or nil.
func Strings(reply interface{}, err error) ([]string, error) {
if err != nil { if err != nil {
return nil, err return err
} }
switch reply := reply.(type) { switch reply := reply.(type) {
case []interface{}: case []interface{}:
result := make([]string, len(reply)) makeSlice(len(reply))
for i := range reply { for i := range reply {
if reply[i] == nil { if reply[i] == nil {
continue continue
} }
p, ok := reply[i].([]byte) if err := assign(i, reply[i]); err != nil {
if !ok { return err
return nil, fmt.Errorf("redigo: unexpected element type for Strings, got type %T", reply[i])
} }
result[i] = string(p)
} }
return result, nil return nil
case nil: case nil:
return nil, ErrNil return ErrNil
case Error: case Error:
return nil, reply return reply
} }
return nil, fmt.Errorf("redigo: unexpected type for Strings, got type %T", reply) return fmt.Errorf("redigo: unexpected type for %s, got type %T", name, reply)
}
// Float64s is a helper that converts an array command reply to a []float64. If
// err is not equal to nil, then Float64s returns nil, err. Nil array items are
// converted to 0 in the output slice. Floats64 returns an error if an array
// item is not a bulk string or nil.
func Float64s(reply interface{}, err error) ([]float64, error) {
var result []float64
err = sliceHelper(reply, err, "Float64s", func(n int) { result = make([]float64, n) }, func(i int, v interface{}) error {
p, ok := v.([]byte)
if !ok {
return fmt.Errorf("redigo: unexpected element type for Floats64, got type %T", v)
}
f, err := strconv.ParseFloat(string(p), 64)
result[i] = f
return err
})
return result, err
}
// Strings is a helper that converts an array command reply to a []string. If
// err is not equal to nil, then Strings returns nil, err. Nil array items are
// converted to "" in the output slice. Strings returns an error if an array
// item is not a bulk string or nil.
func Strings(reply interface{}, err error) ([]string, error) {
var result []string
err = sliceHelper(reply, err, "Strings", func(n int) { result = make([]string, n) }, func(i int, v interface{}) error {
switch v := v.(type) {
case string:
result[i] = v
return nil
case []byte:
result[i] = string(v)
return nil
default:
return fmt.Errorf("redigo: unexpected element type for Strings, got type %T", v)
}
})
return result, err
} }
// ByteSlices is a helper that converts an array command reply to a [][]byte. // ByteSlices is a helper that converts an array command reply to a [][]byte.
...@@ -278,43 +311,64 @@ func Strings(reply interface{}, err error) ([]string, error) { ...@@ -278,43 +311,64 @@ func Strings(reply interface{}, err error) ([]string, error) {
// items are stay nil. ByteSlices returns an error if an array item is not a // items are stay nil. ByteSlices returns an error if an array item is not a
// bulk string or nil. // bulk string or nil.
func ByteSlices(reply interface{}, err error) ([][]byte, error) { func ByteSlices(reply interface{}, err error) ([][]byte, error) {
if err != nil { var result [][]byte
return nil, err err = sliceHelper(reply, err, "ByteSlices", func(n int) { result = make([][]byte, n) }, func(i int, v interface{}) error {
} p, ok := v.([]byte)
switch reply := reply.(type) { if !ok {
case []interface{}: return fmt.Errorf("redigo: unexpected element type for ByteSlices, got type %T", v)
result := make([][]byte, len(reply))
for i := range reply {
if reply[i] == nil {
continue
}
p, ok := reply[i].([]byte)
if !ok {
return nil, fmt.Errorf("redigo: unexpected element type for ByteSlices, got type %T", reply[i])
}
result[i] = p
} }
return result, nil result[i] = p
case nil: return nil
return nil, ErrNil })
case Error: return result, err
return nil, reply }
}
return nil, fmt.Errorf("redigo: unexpected type for ByteSlices, got type %T", reply) // Int64s is a helper that converts an array command reply to a []int64.
// If err is not equal to nil, then Int64s returns nil, err. Nil array
// items are stay nil. Int64s returns an error if an array item is not a
// bulk string or nil.
func Int64s(reply interface{}, err error) ([]int64, error) {
var result []int64
err = sliceHelper(reply, err, "Int64s", func(n int) { result = make([]int64, n) }, func(i int, v interface{}) error {
switch v := v.(type) {
case int64:
result[i] = v
return nil
case []byte:
n, err := strconv.ParseInt(string(v), 10, 64)
result[i] = n
return err
default:
return fmt.Errorf("redigo: unexpected element type for Int64s, got type %T", v)
}
})
return result, err
} }
// Ints is a helper that converts an array command reply to a []int. If // Ints is a helper that converts an array command reply to a []in.
// err is not equal to nil, then Ints returns nil, err. // If err is not equal to nil, then Ints returns nil, err. Nil array
// items are stay nil. Ints returns an error if an array item is not a
// bulk string or nil.
func Ints(reply interface{}, err error) ([]int, error) { func Ints(reply interface{}, err error) ([]int, error) {
var ints []int var result []int
values, err := Values(reply, err) err = sliceHelper(reply, err, "Ints", func(n int) { result = make([]int, n) }, func(i int, v interface{}) error {
if err != nil { switch v := v.(type) {
return ints, err case int64:
} n := int(v)
if err := ScanSlice(values, &ints); err != nil { if int64(n) != v {
return ints, err return strconv.ErrRange
} }
return ints, nil result[i] = n
return nil
case []byte:
n, err := strconv.Atoi(string(v))
result[i] = n
return err
default:
return fmt.Errorf("redigo: unexpected element type for Ints, got type %T", v)
}
})
return result, err
} }
// StringMap is a helper that converts an array of strings (alternating key, value) // StringMap is a helper that converts an array of strings (alternating key, value)
...@@ -333,7 +387,7 @@ func StringMap(result interface{}, err error) (map[string]string, error) { ...@@ -333,7 +387,7 @@ func StringMap(result interface{}, err error) (map[string]string, error) {
key, okKey := values[i].([]byte) key, okKey := values[i].([]byte)
value, okValue := values[i+1].([]byte) value, okValue := values[i+1].([]byte)
if !okKey || !okValue { if !okKey || !okValue {
return nil, errors.New("redigo: ScanMap key not a bulk string value") return nil, errors.New("redigo: StringMap key not a bulk string value")
} }
m[string(key)] = string(value) m[string(key)] = string(value)
} }
...@@ -355,7 +409,7 @@ func IntMap(result interface{}, err error) (map[string]int, error) { ...@@ -355,7 +409,7 @@ func IntMap(result interface{}, err error) (map[string]int, error) {
for i := 0; i < len(values); i += 2 { for i := 0; i < len(values); i += 2 {
key, ok := values[i].([]byte) key, ok := values[i].([]byte)
if !ok { if !ok {
return nil, errors.New("redigo: ScanMap key not a bulk string value") return nil, errors.New("redigo: IntMap key not a bulk string value")
} }
value, err := Int(values[i+1], nil) value, err := Int(values[i+1], nil)
if err != nil { if err != nil {
...@@ -381,7 +435,7 @@ func Int64Map(result interface{}, err error) (map[string]int64, error) { ...@@ -381,7 +435,7 @@ func Int64Map(result interface{}, err error) (map[string]int64, error) {
for i := 0; i < len(values); i += 2 { for i := 0; i < len(values); i += 2 {
key, ok := values[i].([]byte) key, ok := values[i].([]byte)
if !ok { if !ok {
return nil, errors.New("redigo: ScanMap key not a bulk string value") return nil, errors.New("redigo: Int64Map key not a bulk string value")
} }
value, err := Int64(values[i+1], nil) value, err := Int64(values[i+1], nil)
if err != nil { if err != nil {
...@@ -391,3 +445,35 @@ func Int64Map(result interface{}, err error) (map[string]int64, error) { ...@@ -391,3 +445,35 @@ func Int64Map(result interface{}, err error) (map[string]int64, error) {
} }
return m, nil return m, nil
} }
// Positions is a helper that converts an array of positions (lat, long)
// into a [][2]float64. The GEOPOS command returns replies in this format.
func Positions(result interface{}, err error) ([]*[2]float64, error) {
values, err := Values(result, err)
if err != nil {
return nil, err
}
positions := make([]*[2]float64, len(values))
for i := range values {
if values[i] == nil {
continue
}
p, ok := values[i].([]interface{})
if !ok {
return nil, fmt.Errorf("redigo: unexpected element type for interface slice, got type %T", values[i])
}
if len(p) != 2 {
return nil, fmt.Errorf("redigo: unexpected number of values for a member position, got %d", len(p))
}
lat, err := Float64(p[0], nil)
if err != nil {
return nil, err
}
long, err := Float64(p[1], nil)
if err != nil {
return nil, err
}
positions[i] = &[2]float64{lat, long}
}
return positions, nil
}
...@@ -110,6 +110,25 @@ func convertAssignInt(d reflect.Value, s int64) (err error) { ...@@ -110,6 +110,25 @@ func convertAssignInt(d reflect.Value, s int64) (err error) {
} }
func convertAssignValue(d reflect.Value, s interface{}) (err error) { func convertAssignValue(d reflect.Value, s interface{}) (err error) {
if d.Kind() != reflect.Ptr {
if d.CanAddr() {
d2 := d.Addr()
if d2.CanInterface() {
if scanner, ok := d2.Interface().(Scanner); ok {
return scanner.RedisScan(s)
}
}
}
} else if d.CanInterface() {
// Already a reflect.Ptr
if d.IsNil() {
d.Set(reflect.New(d.Type().Elem()))
}
if scanner, ok := d.Interface().(Scanner); ok {
return scanner.RedisScan(s)
}
}
switch s := s.(type) { switch s := s.(type) {
case []byte: case []byte:
err = convertAssignBulkString(d, s) err = convertAssignBulkString(d, s)
...@@ -135,11 +154,15 @@ func convertAssignArray(d reflect.Value, s []interface{}) error { ...@@ -135,11 +154,15 @@ func convertAssignArray(d reflect.Value, s []interface{}) error {
} }
func convertAssign(d interface{}, s interface{}) (err error) { func convertAssign(d interface{}, s interface{}) (err error) {
if scanner, ok := d.(Scanner); ok {
return scanner.RedisScan(s)
}
// Handle the most common destination types using type switches and // Handle the most common destination types using type switches and
// fall back to reflection for all other types. // fall back to reflection for all other types.
switch s := s.(type) { switch s := s.(type) {
case nil: case nil:
// ingore // ignore
case []byte: case []byte:
switch d := d.(type) { switch d := d.(type) {
case *string: case *string:
...@@ -186,7 +209,11 @@ func convertAssign(d interface{}, s interface{}) (err error) { ...@@ -186,7 +209,11 @@ func convertAssign(d interface{}, s interface{}) (err error) {
case string: case string:
switch d := d.(type) { switch d := d.(type) {
case *string: case *string:
*d = string(s) *d = s
case *interface{}:
*d = s
case nil:
// skip value
default: default:
err = cannotConvert(reflect.ValueOf(d), s) err = cannotConvert(reflect.ValueOf(d), s)
} }
...@@ -215,6 +242,8 @@ func convertAssign(d interface{}, s interface{}) (err error) { ...@@ -215,6 +242,8 @@ func convertAssign(d interface{}, s interface{}) (err error) {
// Scan copies from src to the values pointed at by dest. // Scan copies from src to the values pointed at by dest.
// //
// Scan uses RedisScan if available otherwise:
//
// The values pointed at by dest must be an integer, float, boolean, string, // The values pointed at by dest must be an integer, float, boolean, string,
// []byte, interface{} or slices of these types. Scan uses the standard strconv // []byte, interface{} or slices of these types. Scan uses the standard strconv
// package to convert bulk strings to numeric and boolean types. // package to convert bulk strings to numeric and boolean types.
...@@ -355,6 +384,7 @@ var errScanStructValue = errors.New("redigo.ScanStruct: value must be non-nil po ...@@ -355,6 +384,7 @@ var errScanStructValue = errors.New("redigo.ScanStruct: value must be non-nil po
// //
// Fields with the tag redis:"-" are ignored. // Fields with the tag redis:"-" are ignored.
// //
// Each field uses RedisScan if available otherwise:
// Integer, float, boolean, string and []byte fields are supported. Scan uses the // Integer, float, boolean, string and []byte fields are supported. Scan uses the
// standard strconv package to convert bulk string values to numeric and // standard strconv package to convert bulk string values to numeric and
// boolean types. // boolean types.
......
...@@ -55,6 +55,11 @@ func (s *Script) args(spec string, keysAndArgs []interface{}) []interface{} { ...@@ -55,6 +55,11 @@ func (s *Script) args(spec string, keysAndArgs []interface{}) []interface{} {
return args return args
} }
// Hash returns the script hash.
func (s *Script) Hash() string {
return s.hash
}
// Do evaluates the script. Under the covers, Do optimistically evaluates the // Do evaluates the script. Under the covers, Do optimistically evaluates the
// script using the EVALSHA command. If the command fails because the script is // script using the EVALSHA command. If the command fails because the script is
// not loaded, then Do evaluates the script using the EVAL command (thus // not loaded, then Do evaluates the script using the EVAL command (thus
......
...@@ -11,12 +11,12 @@ ...@@ -11,12 +11,12 @@
"versionExact": "master" "versionExact": "master"
}, },
{ {
"checksumSHA1": "hBqmLpr3P88FPgBk27hzpa5N0mM=", "checksumSHA1": "Ky/NqKY9oiS7XQdgUDp/wBWwDtI=",
"path": "github.com/FZambia/go-sentinel", "path": "github.com/FZambia/sentinel",
"revision": "76bd05e8e22f9f8f5e1dd6d3a85b7951da7cce57", "revision": "e69a8dc549bb0aabdc63fbfd9e8bfe971a368153",
"revisionTime": "2017-12-04T08:54:13Z", "revisionTime": "2018-03-27T06:17:58Z",
"version": "master", "version": "v1",
"versionExact": "master" "versionExact": "v1.0.0"
}, },
{ {
"checksumSHA1": "0rido7hYHQtfq3UJzVT5LClLAWc=", "checksumSHA1": "0rido7hYHQtfq3UJzVT5LClLAWc=",
...@@ -60,22 +60,6 @@ ...@@ -60,22 +60,6 @@
"version": "v3", "version": "v3",
"versionExact": "v3.2.0" "versionExact": "v3.2.0"
}, },
{
"checksumSHA1": "4F1jSQIKlCaSecor3wzxxf/0aso=",
"path": "github.com/garyburd/redigo/internal",
"revision": "9c11da706d9b7902c6da69c592f75637793fe121",
"revisionTime": "2018-03-14T22:34:43Z",
"version": "v2",
"versionExact": "v2.0.0"
},
{
"checksumSHA1": "81OSg/NapmTaRpSS+oYsPVE0b1Y=",
"path": "github.com/garyburd/redigo/redis",
"revision": "0d253a66e6e1349f4581d6d2b300ee434ee2da9f",
"revisionTime": "2017-02-16T21:49:44Z",
"version": "v2",
"versionExact": "v2.0.0"
},
{ {
"checksumSHA1": "hL8smC/vjdkuE1twM8TKpuTiOmw=", "checksumSHA1": "hL8smC/vjdkuE1twM8TKpuTiOmw=",
"path": "github.com/getsentry/raven-go", "path": "github.com/getsentry/raven-go",
...@@ -140,6 +124,22 @@ ...@@ -140,6 +124,22 @@
"version": "v1", "version": "v1",
"versionExact": "v1.2.0" "versionExact": "v1.2.0"
}, },
{
"checksumSHA1": "w3QCCIYHgZzIXQ+xTl7oLfFrXHs=",
"path": "github.com/gomodule/redigo/internal",
"revision": "9c11da706d9b7902c6da69c592f75637793fe121",
"revisionTime": "2018-03-14T22:34:43Z",
"version": "v2",
"versionExact": "v2.0.0"
},
{
"checksumSHA1": "HgOVOUtWUYHcNe8aipwuEZ7YLow=",
"path": "github.com/gomodule/redigo/redis",
"revision": "9c11da706d9b7902c6da69c592f75637793fe121",
"revisionTime": "2018-03-14T22:34:43Z",
"version": "v2",
"versionExact": "v2.0.0"
},
{ {
"checksumSHA1": "VKx/YhlIAbIJ9dXAxCCHOkjom4U=", "checksumSHA1": "VKx/YhlIAbIJ9dXAxCCHOkjom4U=",
"comment": "v1.0.0-39-ge8f0f8a", "comment": "v1.0.0-39-ge8f0f8a",
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment