我们可以看到 gorilla/websocket中的examples中有一个聊天室的demo。
10年积累的成都做网站、成都网站建设经验,可以快速应对客户对网站的新想法和需求。提供各种问题对应的解决方案。让选择我们的客户得到更好、更有力的网络服务。我虽然不认识你,你也不认识我。但先网站设计制作后付款的网站建设流程,更有宣州免费网站建设让你可以放心的选择与我们合作。
我们进入该项目可以看到里面有这样的一些内容
按照官方的运行方式来运行这个项目
在浏览器中打开8080端口,可以看到该项目可以被成功运行了。
就是这样一个简单的demo。
然后我们去看一下它的具体实现。
在这个项目中首先定义了一个hub的结构体:
这个结构体中,clients代表所有已经注册的用户,broadcast管道会存储客户端发送来的信息。 register是一个*Client类型的管道,用于存储新注册的用户,unregister管道反之。
我们打开main.go,main函数的源码为:
在这里首先会新开一个goroutine,去跑hub的run方法,run方法中一个死循环,不停地去轮询hub中的内容
如果取到了新用户,就加入到clients中,如果取到了信息,就循环所有的client,将信息写到client.send中。
我们看到在请求路径为根的时候,它会请求一个函数,而这个函数就是将home.html发送到客户端。
而在请求路径为“/ws”的时候,他会执行一个serveWS的函数。
每当一个新的用户进来之后,首先将连接升级为长连接,然后将当前的client写到register中,由hub.run函数去做处理。然后开启两个goroutine,一个去读client中发送来的数据,一个将数据写入到所有的client中,去发送给用户。
这就是整个聊天室的实现原理。
继续进入下一个初始化
n点虐 Service, err = nebnet.NewNebService(n)
if err != nil {
logging.CLog().WithFields(logrus.Fields{
"err": err,
}).Fatal("Failed to setup net service.")
}
netservice有两个成员
type NebServicestruct {
node *Node
dispatcher *Dispatcher
}
跳出stup()函数
先进入start()函数看一看
if err := n点虐 Service.Start(); err != nil {
logging.CLog().WithFields(logrus.Fields{
"err": err,
}).Fatal("Failed to start net service.")
}
进入netservice.start()
func (ns *NebService) Start() error {
logging.CLog().Info("Starting NebService...")
// start dispatcher.
ns.dispatcher.Start()
// start node.
if err := ns.node.Start(); err != nil {
ns.dispatcher.Stop()
logging.CLog().WithFields(logrus.Fields{
"err": err,
}).Error("Failed to start NebService.")
return err
}
logging.CLog().Info("Started NebService.")
return nil
}
可以看到第一个start()的函数是dispatcher.start()
进入dispatch.start()
func (dp *Dispatcher) Start() {
logging.CLog().Info("Starting NebService Dispatcher...")
go dp.loop()
}
然后就出现一个新的线程、goruntime
go dp.loop()
进入该线程,看它干了些什么
timerChan := time.NewTicker(time.Second).C
for {
select {
case -timerChan:
metricsDispatcherCached.Update(int64(len(dp.receivedMessageCh)))
case -dp.quitCh:
logging.CLog().Info("Stoped NebService Dispatcher.")
return
case msg := -dp.receivedMessageCh:
msgType := msg.MessageType()
v, _ := dp.subscribersMap.Load(msgType)
if v == nil {
continue
}
m, _ := v.(*sync.Map)
m.Range(func(key, valueinterface{}) bool {
select {
case key.(*Subscriber).msgChan - msg:
default:
logging.VLog().WithFields(logrus.Fields{
"msgType": msgType,
}).Warn("timeout to dispatch message.")
}
return true
})
}
}
一个有点长的循环
metricsDispatcherCached.Update(int64(len(dp.receivedMessageCh)))一秒钟刷新一次缓冲区
case msg := -dp.receivedMessageCh:
msgType := msg.MessageType()如果能取出dp.receivedMessageCh
msgType := msg.MessageType()首先判断取出的信息类型
v, _ := dp.subscribersMap.Load(msgType)
if v == nil {
continue
}
根据类型取出相应的map
如果取不出,那么使用continue结束这个case
m, _ := v.(*sync.Map)
断言
m.Range(func(key, valueinterface{}) bool {
select {
case key.(*Subscriber).msgChan - msg:
default:
logging.VLog().WithFields(logrus.Fields{
"msgType": msgType,
}).Warn("timeout to dispa+tch message.")
}
return true
})
将msg推入其他管道里面去。其他goruntime会循环等待该
智能合约调用是实现一个 DApp 的关键,一个完整的 DApp 包括前端、后端、智能合约及区块 链系统,智能合约的调用是连接区块链与前后端的关键。
我们先来了解一下智能合约调用的基础原理。智能合约运行在以太坊节点的 EVM 中。因此要 想调用合约必须要访问某个节点。
以后端程序为例,后端服务若想连接节点有两种可能,一种是双 方在同一主机,此时后端连接节点可以采用 本地 IPC(Inter-Process Communication,进 程间通信)机制,也可以采用 RPC(Remote Procedure Call,远程过程调用)机制;另 一种情况是双方不在同一台主机,此时只能采用 RPC 机制进行通信。
提到 RPC, 读者应该对 Geth 启动参数有点印象,Geth 启动时可以选择开启 RPC 服务,对应的 默认服务端口是 8545。。
接着,我们来了解一下智能合约运行的过程。
智能合约的运行过程是后端服务连接某节点,将 智能合约的调用(交易)发送给节点,节点在验证了交易的合法性后进行全网广播,被矿工打包到 区块中代表此交易得到确认,至此交易才算完成。
就像数据库一样,每个区块链平台都会提供主流 开发语言的 SDK(Software Development Kit,软件开发工具包),由于 Geth 本身就是用 Go 语言 编写的,因此若想使用 Go 语言连接节点、发交易,直接在工程内导入 go-ethereum(Geth 源码) 包就可以了,剩下的问题就是流程和 API 的事情了。
总结一下,智能合约被调用的两个关键点是节点和 SDK。
由于 IPC 要求后端与节点必须在同一主机,所以很多时候开发者都会采用 RPC 模式。除了 RPC,以太坊也为开发者提供了 json- rpc 接口,本文就不展开讨论了。
接下来介绍如何使用 Go 语言,借助 go-ethereum 源码库来实现智能合约的调用。这是有固定 步骤的,我们先来说一下总体步骤,以下面的合约为例。
步骤 01:编译合约,获取合约 ABI(Application Binary Interface,应用二进制接口)。 单击【ABI】按钮拷贝合约 ABI 信息,将其粘贴到文件 calldemo.abi 中(可使用 Go 语言IDE 创建该文件,文件名可自定义,后缀最好使用 abi)。
最好能将 calldemo.abi 单独保存在一个目录下,输入“ls”命令只能看到 calldemo.abi 文件,参 考效果如下:
步骤 02:获得合约地址。注意要将合约部署到 Geth 节点。因此 Environment 选择为 Web3 Provider。
在【Environment】选项框中选择“Web3 Provider”,然后单击【Deploy】按钮。
部署后,获得合约地址为:0xa09209c28AEf59a4653b905792a9a910E78E7407。
步骤 03:利用 abigen 工具(Geth 工具包内的可执行程序)编译智能合约为 Go 代码。abigen 工具的作用是将 abi 文件转换为 Go 代码,命令如下:
其中各参数的含义如下。 (1)abi:是指定传入的 abi 文件。 (2)type:是指定输出文件中的基本结构类型。 (3)pkg:指定输出文件 package 名称。 (4)out:指定输出文件名。 执行后,将在代码目录下看到 funcdemo.go 文件,读者可以打开该文件欣赏一下,注意不要修改它。
步骤 04:创建 main.go,填入如下代码。 注意代码中 HexToAddress 函数内要传入该合约部署后的地址,此地址在步骤 01 中获得。
步骤 04:设置 go mod,以便工程自动识别。
前面有所提及,若要使用 Go 语言调用智能合约,需要下载 go-ethereum 工程,可以使用下面 的指令:
该指令会自动将 go-ethereum 下载到“$GOPATH/src/github点抗 /ethereum/go-ethereum”,这样还算 不错。不过,Go 语言自 1.11 版本后,增加了 module 管理工程的模式。只要设置好了 go mod,下载 依赖工程的事情就不必关心了。
接下来设置 module 生效和 GOPROXY,命令如下:
在项目工程内,执行初始化,calldemo 可以自定义名称。
步骤 05:运行代码。执行代码,将看到下面的效果,以及最终输出的 2020。
上述输出信息中,可以看到 Go 语言会自动下载依赖文件,这就是 go mod 的神奇之处。看到 2020,相信读者也知道运行结果是正确的了。
package p2p
import (
"context"
"errors"
"time"
net "gx/ipfs/QmPjvxTpVH8qJyQDnxnsxF9kv9jezKD1kozz1hs3fCGsNh/go-libp2p-net"
manet "gx/ipfs/QmV6FjemM1K8oXjrvuq3wuVWWoU2TLDPmNnKrxHzY3v6Ai/go-multiaddr-net"
ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"
pro "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
pstore "gx/ipfs/QmZR2XWVVBCtbgBWnQhWk2xcQfaR3W8faQPriAiaaj7rsr/go-libp2p-peerstore"
p2phost "gx/ipfs/Qmb8T6YBBsjYsVGfrihQLfCJveczZnneSBqBKkYEBWDjge/go-libp2p-host"
peer "gx/ipfs/QmdVrMn1LhB4ybb8hMVaMLXnA8XRSewMnK6YqXKXoTcRvN/go-libp2p-peer"
)
//P2P结构保存当前正在运行的流/监听器的信息
// P2P structure holds information on currently running streams/listeners
type P2P struct {
//监听器
Listeners ListenerRegistry
//数据流
Streams StreamRegistry
//节点ID
identity peer.ID
//节点地址
peerHost p2phost.Host
//一个线程安全的对等节点存储
peerstore pstore.Peerstore
}
//创建一个新的p2p结构
// NewP2P creates new P2P struct
//这个新的p2p结构不包含p2p结构中的监听器和数据流
func NewP2P(identity peer.ID, peerHost p2phost.Host, peerstore pstore.Peerstore) *P2P {
return P2P{
identity: identity,
peerHost: peerHost,
peerstore: peerstore,
}
}
//新建一个数据流 工具方法 构建一个有节点id,内容和协议的流
func (p2p P2P) newStreamTo(ctx2 context.Context, p peer.ID, protocol string) (net.Stream, error) {
//30s 后会自动timeout
ctx, cancel := context.WithTimeout(ctx2, time.Second 30) //TODO: configurable?
defer cancel()
err := p2p.peerHost.Connect(ctx, pstore.PeerInfo{ID: p})
if err != nil {
return nil, err
}
return p2p.peerHost.NewStream(ctx2, p, pro.ID(protocol))
}
//对话为远程监听器创建新的P2P流
//创建一个新的p2p流实现对对话的监听
// Dial creates new P2P stream to a remote listener
//Multiaddr是一种跨协议、跨平台的表示格式的互联网地址。它强调明确性和自我描述。
//对内接收
func (p2p P2P) Dial(ctx context.Context, addr ma.Multiaddr, peer peer.ID, proto string, bindAddr ma.Multiaddr) ( ListenerInfo, error) {
//获取一些节点信息 network, host, nil
lnet, _, err := manet.DialArgs(bindAddr)
if err != nil {
return nil, err
}
//监听信息
listenerInfo := ListenerInfo{
//节点身份
Identity: p2p.identity,
////应用程序协议标识符。
Protocol: proto,
}
//调用newStreamTo 通过ctx(内容) peer(节点id) proto(协议标识符) 参数获取一个新的数据流
remote, err := p2p.newStreamTo(ctx, peer, proto)
if err != nil {
return nil, err
}
//network协议标识
switch lnet {
//network为"tcp", "tcp4", "tcp6"
case "tcp", "tcp4", "tcp6":
//从监听器获取新的信息 nla.Listener, nil
listener, err := manet.Listen(bindAddr)
if err != nil {
if err2 := remote.Reset(); err2 != nil {
return nil, err2
}
return nil, err
}
//将获取的新信息保存到listenerInfo
listenerInfo.Address = listener.Multiaddr()
listenerInfo.Closer = listener
listenerInfo.Running = true
//开启接受
go p2p.doAccept(listenerInfo, remote, listener)
default:
return nil, errors.New("unsupported protocol: " + lnet)
}
return listenerInfo, nil
}
//
func (p2p *P2P) doAccept(listenerInfo *ListenerInfo, remote net.Stream, listener manet.Listener) {
//关闭侦听器并删除流处理程序
defer listener.Close()
//Returns a Multiaddr friendly Conn
//一个有好的 Multiaddr 连接
local, err := listener.Accept()
if err != nil {
return
}
stream := StreamInfo{
//连接协议
Protocol: listenerInfo.Protocol,
//定位节点
LocalPeer: listenerInfo.Identity,
//定位节点地址
LocalAddr: listenerInfo.Address,
//远程节点
RemotePeer: remote.Conn().RemotePeer(),
//远程节点地址
RemoteAddr: remote.Conn().RemoteMultiaddr(),
//定位
Local: local,
//远程
Remote: remote,
//注册码
Registry: p2p.Streams,
}
//注册连接信息
p2p.Streams.Register(stream)
//开启节点广播
stream.startStreaming()
}
//侦听器将流处理程序包装到侦听器中
// Listener wraps stream handler into a listener
type Listener interface {
Accept() (net.Stream, error)
Close() error
}
//P2PListener保存关于侦听器的信息
// P2PListener holds information on a listener
type P2PListener struct {
peerHost p2phost.Host
conCh chan net.Stream
proto pro.ID
ctx context.Context
cancel func()
}
//等待侦听器的连接
// Accept waits for a connection from the listener
func (il *P2PListener) Accept() (net.Stream, error) {
select {
case c := -il.conCh:
return c, nil
case -il.ctx.Done():
return nil, il.ctx.Err()
}
}
//关闭侦听器并删除流处理程序
// Close closes the listener and removes stream handler
func (il *P2PListener) Close() error {
il.cancel()
il.peerHost.RemoveStreamHandler(il.proto)
return nil
}
// Listen创建新的P2PListener
// Listen creates new P2PListener
func (p2p P2P) registerStreamHandler(ctx2 context.Context, protocol string) ( P2PListener, error) {
ctx, cancel := context.WithCancel(ctx2)
list := P2PListener{
peerHost: p2p.peerHost,
proto: pro.ID(protocol),
conCh: make(chan net.Stream),
ctx: ctx,
cancel: cancel,
}
p2p.peerHost.SetStreamHandler(list.proto, func(s net.Stream) {
select {
case list.conCh - s:
case -ctx.Done():
s.Reset()
}
})
return list, nil
}
// NewListener创建新的p2p侦听器
// NewListener creates new p2p listener
//对外广播
func (p2p P2P) NewListener(ctx context.Context, proto string, addr ma.Multiaddr) ( ListenerInfo, error) {
//调用registerStreamHandler 构造一个新的listener
listener, err := p2p.registerStreamHandler(ctx, proto)
if err != nil {
return nil, err
}
//构造新的listenerInfo
listenerInfo := ListenerInfo{
Identity: p2p.identity,
Protocol: proto,
Address: addr,
Closer: listener,
Running: true,
Registry: p2p.Listeners,
}
go p2p.acceptStreams(listenerInfo, listener)
//注册连接信息
p2p.Listeners.Register(listenerInfo)
return listenerInfo, nil
}
//接受流
func (p2p *P2P) acceptStreams(listenerInfo *ListenerInfo, listener Listener) {
for listenerInfo.Running {
//一个有好的 远程 连接
remote, err := listener.Accept()
if err != nil {
listener.Close()
break
}
}
//取消注册表中的p2p侦听器
p2p.Listeners.Deregister(listenerInfo.Protocol)
}
// CheckProtoExists检查是否注册了协议处理程序
// mux处理程序
// CheckProtoExists checks whether a protocol handler is registered to
// mux handler
func (p2p *P2P) CheckProtoExists(proto string) bool {
protos := p2p.peerHost.Mux().Protocols()
for _, p := range protos {
if p != proto {
continue
}
return true
}
return false
}