成都网站建设设计

将想法与焦点和您一起共享

Mongodb代理程序如何实现

这篇文章主要介绍“MongoDB代理程序如何实现”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“Mongodb代理程序如何实现”文章能帮助大家解决问题。

创新互联是一家集网站建设,二道企业网站建设,二道品牌网站建设,网站定制,二道网站建设报价,网络营销,网络优化,二道网站推广为一体的创新建站企业,帮助传统企业提升企业形象加强企业竞争力。可充分满足这一群体相比中小企业更为丰富、高端、多元的互联网需求。同时我们时刻保持专业、时尚、前沿,时刻以成就客户成长自我,坚持不断学习、思考、沉淀、净化自己,让我们为更多的企业打造出实用型网站。

根据一贯的风格,我们先来梳理下项目目录结构,结构如下:

|__ bin/                   # 用于存放编译后生成的二进制文件

|__ config/                # 用于存放配置文件

|__ connection/            # 存放连接相关的文件

|   |__ proxy.go           # 代理组件

|   |__ pool.go            # 连接池组件

|   |__ repl_set.go        # 复制集组件

|   |__ conn.go            # 连接对象组件

|__ internal/              # 存放 mongo 内部协议相关文件

|   |__ auth.go            # 握手鉴权组件

|   |__ protocol.go        # 协议解析组件

|   |__ request.go         # 请求重写组件

|   |__ response.go        # 响应重写组件

|__ statistics/            # 存放指标统计上报组件

|__ test/                  # 存放各种语言驱动测试代码的文件夹

|__ utils/                 # 工具函数文件夹

|__ glide.yaml             # 依赖包配置文件

|__ main.go                # 入口文件

proxy 实现

最简单的 proxy 实现套路就像下面这样:

// main.go

func main() {

  // 传入配置参数,实例化一个代理对象

  p := NewProxy(conf)

  // 卡住,循环接受客户端请求

  p.LoopAccept()

}

接着来实现 NewProxy、LoopAccept 方法:

// connection/proxy.go

type Proxy struct {

  sync.RWMutex

  listener            net.Listener

  writePool, readPool *pool

}

func NewProxy(conf config.UserConf) *Proxy {

  // 开始监听本地端口

  listener, err := net.Listen("tcp", ":"+conf.GetString("port"))

  if err != nil {

    log.Fatalln(err)

  }

  p := &Proxy{

    listener: listener,

  }

  // 实例化连接池

  p.readPool, p.writePool, err = newPool(p)

  if err != nil {

    panic(err)

  }

  return p 

}

func (p *Proxy) LoopAccept() {

  for {

    client, err := p.listener.Accept()

    go func(c net.Conn) {

      defer c.Close()

      // 一个连接在多次 messageHandler 中共用一个 Reader 对象

      cr := bufio.NewReader(c)

      // 因为一个连接可能会进行多次读或写操作

      for {

        // 将客户端请求代理给服务端,服务端响应代理回客户端

        // 同时中间对请求或响应进行重写操作

        err := p.messageHandler(cr, c)

        if err != nil {

          // 只要出现错误,就执行到上面的 defer c.Close() 来关闭连接

          return

        }

      }

    }(client)

  }

}

接着来实现核心逻辑 messageHandler:

// connection/proxy.go

func (p *Proxy) messageHandler(cr *bufio.Reader, c net.Conn) error {

  // 对请求报文进行解析操作

  req, err := internal.Decode(clientReader)

  if err != nil {

        return errors.New("decode error")

    }

  // 将客户端请求发送给数据库服务器

  res, err := p.clientToServer(req)

  if err != nil {

    return errors.New("request error")

  }

  // 将数据库服务器响应返回给客户端

  return res.WriteTo(c)

}

func (p *Proxy) clientToServer(req *internal.Message) (*internal.Message, error) {

  var server net.Conn

  // 如果是读操作,就从读池中取出连接

  if req.IsReadOp() {

    host := req.GetHost()

    // 某些读操作需要发送到指定的读库上,所以需要传 host,来获取指定读库连接

    server = p.readPool.Acquire(host)

  // 反之,写操作从写池中取出连接

  } else {

    // 由于写库只有一个,所以不用传 host 参数了

    server = p.writePool.Acquire()

  }

  // 将客户端请求发送给数据库服务器

  err := req.WriteTo(server)

  if err != nil {

    return nil, err

  }

  // 获取解析数据库服务器响应

  res, err := internal.Decode(bufio.NewReader(server))

  return res, err

}

大致逻辑就是,客户端通过代理把请求发给服务端,服务端响应也通过代理响应回客户端。

------------  request  -----------  request  ------------

|          | --------> |         | --------> |          |

|  client  |           |  proxy  |           | repl_set |

|          | <-------- |         | <-------- |          |

------------  response -----------  response ------------

呐——,当然还有非常多的细节,由于篇幅原因不得不省略...

pool 实现

由 proxy 的代码逻辑来看,我们取读或写库连接是通过读或写池的 Acquire 方法来取的:

// connection/pool.go

type pool struct {

  sync.RWMutex

  connCh   chan net.Conn

  newConn  func(string) (net.Conn, error)

  freeConn func(net.Conn) error

}

func (p *pool) Acquire(opts ...interface{}) (net.Conn, error) {

  host := ""

  if len(opts) > 0 {

    host, _ = (opts[0]).(string)

  }

  chLen := len(p.connCh)

  // 从 channel 中遍历剩余数量的 conn

  for i := 0; i < chLen; i++ {

    select {

    case conn, ok := <- ch:

      if ok {

        if len(host) > 0 {

          if conn.RemoteAddr().String() == host {

            return conn, nil

          }

          // 没有找到对应 host 的 conn,则把 conn 重新放回 channel

          // 你可以简单理解为只是执行了 p.connCh <- conn 操作

          p.freeConn(conn)

        } else {

          return conn, nil

        }

      }

    // 避免数量不足而导致 channel 阻塞等待

    default:

    }

  }

  // 若还没有从 channel 中取到 conn,则立马 new 一个

  conn, err := p.newConn(host)

  if err != nil {

    return nil, err

  }

  return conn, nil

}

池的实现大致就是实现了一个循环队列,连接从池中取,取出的连接在使用完后,可以放回池中。

关于“Mongodb代理程序如何实现”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识,可以关注创新互联行业资讯频道,小编每天都会为大家更新不同的知识点。


分享文章:Mongodb代理程序如何实现
当前路径:http://chengdu.cdxwcx.cn/article/jgidjo.html