Radix.v3 另一款Golang Redis 驱动

Radix.v3 是在 Radix 基础上改造过来的, 主要是借鉴了Go-Redis 的异步方式。当时我们项目 急切需要一个比较好用的Redis驱动,在官网上推荐的只有两个,在比较之后我们选择了radix。在之后 的考察中,我们发现了Go-Redis,并被Go-Redis的性能所折服,打算使用这个ops极高的驱动,但是 它的代码是在是太老了,而且并不适合业务开发。所以我花了1天半在Radix.v2上改造了一番,在底层使 用了异步工作方式,在外层保留了同步调用的接口。这分代码开源了10天左右,人气不高,后来发现了一个 比较严重的bug,意识到不能误人子弟,所以取消了共享,打算在内部改造成熟之后再开放出来。

改造原理

在原来的Radix中一个典型的调用如下:

func (c *Client) Cmd(cmd string, args ...interface{}) *Resp {
    err := c.writeRequest(request{cmd, args})
    if err != nil {
        return NewRespIOErr(err)
    }
    return c.readResp(true)
}

他在底层中,使用的 send && recv 的方式, 这个是同步调用模式,受限于网络延时,单连接最高 的ops取决于两机的ping值,所以一般的bench在1w ops左右。

在Go-Redis中一个调用如下:

func (c *syncClient) Incr(arg0 string) (result int64, err Error) {
    arg0bytes := []byte(arg0)

    var resp Response
    resp, err = c.conn.ServiceRequest(&INCR, [][]byte{arg0bytes})
    if err == nil {
        result = resp.GetNumberValue()
    }
    return result, err

}

他们之间的区别在于Go-Redis返回的是一个Future对象,如果想要拿到结果,则必须再Get一次。这样 做可以将sendrecv 完全分开,tcp 连接处于完全双工的模式进行流处理。具体代码的细节可以参考 Go-Redis的实现。实践表明,这种处理方式单链接可以达到40w ops。

优势与限制

你可能认识到,在Go-Redis中使用单协程,同样也只能达到1w 左右ops。它的优势是协程安全的,并且单 连接能够有效的保证时序性。在Radix中,想要达到高的ops,只能开链接池,在原来的Radix中,连接池 是动态增长的,如果你用1w 协程去打压力,基本上处于不可用状态。有鉴于此,我修改了底层的pool,让他 固定的连接个数为1,之所以还要保持一个pool,则主要是复用连接重连的代码。此外,另外一个无法比拟的 优势在于他的延时不敏感性,我们的服务器部署在3个不同的机房,每个机房不同的ping值在1ms~3ms之间,如 果采用传统的驱动,基本上不能完成正常的业务处理,但是使用这个驱动,则不会受到延时的影响导致性能大幅度降低。

20w有序持久化队列

在开源的队列服务器中,能达到10w级别的队列服务器并不是没有,但是并不是有序的,能保证有序的也有,但 是不能保证持久化,不过利用Radix.v3能够达到这一点。主要的要点是使用BPOP操作来避免无效的POP调 用,第二个要点是要预发送BPOP指令,要达到10w级别,起码要预发送10条指令,第三是要做第二次分片,把 业务分配到不同的协程中去处理。按业务的需求来说,有全局有序,这点是业务决定的,和队列其实没关系。还 有就是用户有序,这点按用户协程分配即可处理。相关代码如下:

func (q *QueueRedis) GetFutureMsg(msg_key []byte) *redis.Future {
    return q.FCmd("BRPOP", msg_key, 1)
}

func (q *QueueMsgProcess) StartToProcess(chan_deep int) error {
    if chan_deep <= 10 {
        chan_deep = 10
    }
    future_chan := make(chan (*redis.Future), chan_deep)
    go q.processMsg(future_chan)
    q.running = true
    msg_key := []byte(q.GetMsgKey())
    go func() {
        defer Logger.Fatal("queue msg process stop to processed", msg_key)
        defer close(future_chan)
        for q.running {
            msg := q.msg_redis.GetFutureMsg(msg_key)
            future_chan <- msg
        }
    }()
    return nil
}

单链接测试结果

测试环境 PING INCR

讨论请加 QQ group: 549675095


Powered by Jekyll and Theme by solid