日度归档:2020-07-21

gomicro-02-集成grpc和etcd

一 go micro 示例

1.0 说明

本章的 go micro 示例使用 etcd 为服务发现机制, grpc 为通信协议,并且基于go1.13版本,使用go mod管理包。

项目目录结构:

microdemo 
 .
├── common                      通用文件夹
│   ├── code                    项目状态码文件夹
│   │   └── commonCode.go
│   └── config              项目通用配置文件夹
│       └── commonConfig.go
├── go.mod
├── go.sum
├── simple                      简单示例服务文件夹
│   ├── handler             简单示例服务的服务句柄文件夹
│   │   └── simpleHandler
│   │       └── simpleHandler.go
│   ├── main.go             simple主文件
│   ├── proto                   简单示例服务的grpc协议文件夹
│   │   └── simpleProto
│   │       ├── simple.pb.go
│   │       ├── simple.pb.micro.go
│   │       └── simple.proto
│   └── start.sh
└── web                         web服务文件夹
    ├── handler
    │   └── simpleHandler
    │       └── simpleHandler.go
    ├── main.go         web主文件
    └── start.sh

11 directories, 13 files   

贴士:本项目基于etcd,必须先启动etcd!!!

1.1 创建基础配置

创建一个名为 microdemo 的项目

go mod init microdemo           

commonCode.go:

package code

type res struct {
    Code int
    Msg string
}

var OK *res
var SERERR *res
var DBERR *res
var INFOERR *res
var FILTERERR *res
var INFONOTFOUND *res

func init() {

    // 正确请求
    OK = &res{1000, "成功"}

    // 数据校验 3
    FILTERERR = &res{ 3001, "校验未通过", }

    // 资源状态 4
    INFONOTFOUND = &res{4001, "资源不存在",}

    // 服务器状态 5
    SERERR = &res{5001, "服务器错误",}
    DBERR = &res{5002, "数据库错误",}

}

commonConfig.go

package config

import "fmt"

var ENV = "TEST"

var EtcdAddr []string = []string{
    "127.0.0.1:2379",
    "127.0.0.1:2379",
    "127.0.0.1:2379",
}

var RedisAddr string    = "127.0.0.1"
var RedisPort string    = "6379"
var RedisDB string      = "0"

var FastDfsAddr string  = "127.0.0.1"
var FastDfsPort string  = "9090"

var StaticAddr string = "http://" + FastDfsAddr + ":" + FastDfsPort + "/"

func init() {

    if ENV == "PROD" {
        fmt.Println("执行生产环境配置")
    }

}

1.2 创建第一个微服务:simple

simple服务只是一个微服务的简单示例。

生成协议文件:

# simple/proto/simpleProto/simple.proto
syntax = "proto3";

option go_package="./;simple";

package simpleProto;

service SimpleService {
    rpc SimpleFunc(SimpleRequest) returns (SimpleResponse) {}
}

message SimpleRequest {
    int32 id = 1;
}

message SimpleResponse {
    int32 code = 1;
    string msg = 2;
}

# 生成go协议文件
cd simple/proto/simpleProto
protoc simple.proto --proto_path=. --go_out=. --micro_out=.

书写句柄函数:即本服务具体做什么

// simple/handler/simplerHandler/simplerHandler.go
package simpleHandler

import (
    "context"
    "microdemo/simple/proto/simpleProto"
)

// 简单微服务
type SimpleService struct{}
func (s *SimpleService) SimpleFunc(ctx context.Context, req *simpleProto.SimpleRequest, rsp *simpleProto.SimpleResponse) error {

    // 执行业务操作....

    // 返回业务数据给web服务
    rsp.Code = 1
    rsp.Msg = "成功"
    return nil
}

main文件:

package main

import (
    "github.com/micro/go-micro"
    "github.com/micro/go-micro/registry"
    "github.com/micro/go-micro/service/grpc"
    "github.com/micro/go-micro/util/log"
    "github.com/micro/go-plugins/registry/etcdv3"
    "microdemo/simple/handler/simpleHandler"
    "microdemo/simple/proto/simpleProto"
    "microdemo/common/config"
)

func main() {

    // 替换micro默认的服务发现框架consul为etcd
    reg := etcdv3.NewRegistry(func(op *registry.Options){
        op.Addrs = config.EtcdAddr
    })

    // 创建服务
    service := grpc.NewService(
        micro.Name("demo.srv.simple"),
        micro.Registry(reg),
        micro.Version("latest"),
        micro.Address(":" + "30066"),
    )
    service.Init()

    // 注册服务句柄
    err := simpleProto.RegisterSimpleServiceHandler(service.Server(), new(simpleHandler.SimpleService))
    if err != nil {
        log.Errorf("注册句柄错误:", err)
        return
    }

    // 运行服务
    if err := service.Run(); err != nil {
        log.Errorf("运行服务错误:", err)
        return
    }
}

1.3 创建第二个微服务:web

handler句柄文件:

// web/handler/simpleHandler/simpleHandler.go
package simpleHandler

import (
    "context"
    "encoding/json"
    "fmt"
    "github.com/julienschmidt/httprouter"
    "github.com/micro/go-micro/service/grpc"
    "microdemo/simple/proto/simpleProto"
    "net/http"
)

// 简单微服务方法
func Simple(w http.ResponseWriter, r *http.Request, p httprouter.Params) {

    fmt.Println("参数:", p)

    //  grpc 服务初始化
    service := grpc.NewService()
    service.Init()

    // 获取服务句柄
    simpleClient := simpleProto.NewSimpleService("demo.srv.simple", service.Client())

    // 调用服务
    rsp, err := simpleClient.SimpleFunc(context.TODO(), &simpleProto.SimpleRequest{})
    if err != nil {
        fmt.Println("调用服务错误:", err)
        http.Error(w, err.Error(), 500)
    }

    // 创建返回给前端的数据
    result := map[string]interface{}{
        "code":     rsp.Code,
        "msg":      rsp.Msg,
    }
    if err := json.NewEncoder(w).Encode(result); err != nil {
        http.Error(w, err.Error(), 500)
    }
}

main.go

package main

import (
        "github.com/julienschmidt/httprouter"
        "github.com/micro/go-micro/registry"
        "github.com/micro/go-micro/util/log"
        "github.com/micro/go-micro/web"
        "github.com/micro/go-plugins/registry/etcdv3"
        "microdemo/account/config"
        "microdemo/web/handler/simpleHandler"
        "net/http"
)

func main() {

        // 替换micro默认的服务发现框架consul为etcd
        reg := etcdv3.NewRegistry(func(op *registry.Options){
                op.Addrs = config.EtcdAddr
        })

        // 创建web服务
        service := web.NewService(
                web.Name("demo.web.web"),
                web.Registry(reg),
                web.Version("latest"),
                web.Address(":" + "3000"),
        )
        if err := service.Init(); err != nil {
                log.Error("服务初始化错误:", err)
        }

        // 创建路由
        router := httprouter.New()
        router.NotFound = http.FileServer(http.Dir("public"))

        // 测试路由
        router.GET("/simple/:id", simpleHandler.Simple)

        service.Handle("/", router)

        // 运行服务
        if err := service.Run(); err != nil {
                log.Error("服务运行错误:", err)
        }
}

1.4 服务启动与访问

启动etcd后,启动服务:

# 启动simple服务
cd simple
go run main.go --registry=etcd --registry_address=127.0.0.1:2379

# 启动web服务
cd web
go run main.go --registry=etcd --registry_address=127.0.0.1:2379

访问:

localhost:3000/simple/10001

若出现错误,mod中修改为google.golang.org/grpc v1.26.0

/Users/arsen/go/pkg/mod/github.com/coreos/etcd@v3.3.17+incompatible/clientv3/balancer/resolver/endpoint/endpoint.go:114:78: undefined: resolver.BuildOption

gomicro-01-概述和使用

一 go micro简介

Go Micro是基于Golang的微服务开发框架,该框架解决了构建云本地系统的关键需求,提供了分布式系统开发需要的核心库,包含了RPC与事件驱动的通信机制。

Go Micro隐藏了分布式系统的复杂性,将微服务体系内的技术转换为了一组工具集合,且符合可插拔的设计哲学,开发人员可以利用它快速构建系统组件,并能依据需求剥离默认实现并实现定制。

Go Micro核心特性:

  • 服务发现(Service Discovery):自动服务注册与名称解析,默认的服务发现系统是Consul
  • 负载均衡(Load Balancing):在服务发现之上构建了负载均衡机制,对服务请求分发的均匀分布,并且在发生问题时进行重试
  • 消息编码(Message Encoding:支持基于内容类型(content-type)动态编码消息,content-type默认包含proto-rpc和json-rpc。
  • Request/Response:RPC通信基于支持双向流的请求/响应方式,提供有抽象的同步通信机制,默认的传输协议是http/1.1,而tls下使用http2协议。
  • 异步消息(Async Messaging):发布订阅(PubSub)等功能内置在异步通信与事件驱动架构中
  • 可插拔接口(Pluggable Interfaces) – Go Micro为每个分布式系统抽象出接口。因此,Go Micro的接口都是可插拔的

二 go micro安装

micro安装步骤:

# 安装依赖插件 protobuf
go get -u github.com/golang/protobuf/{proto,protoc-gen-go}
go get -u github.com/micro/protoc-gen-micro

# 安装 micro库,该库用于生成micro命令,micro命令可以用来快速生成基于go-micro的项目:
go get -u -v github.com/micro/micro
cd $GOPATH
go install github.com/micro/micro                    

# 测试
micro

三 micro 的 new 和 run 命令

micro new   # 相对于$GOPATH创建一个新的微服务
            # 参数 --namespace "test"   服务的命名空间
            # 参数 --type "srv"         服务类型,常用的有 srv api web fnc
            # 参数 --fqdn               服务正式的全定义
            # 参数 --alias              别名是在指定时作为组合名的一部分使用的短名称

micro run   # 运行这个微服务

注意:new默认创建的项目是以rpc为通信协议、mdns为服务发现的,基本不具备生产价值。笔者在下一章使用 go micro 手动创建项目,集成了grpc、etcd。

三 项目结构

.
├── client
│   └── client.go
├── go.mod
├── go.sum
├── protoes
│   ├── build.sh
│   ├── hello.pb.go
│   ├── hello.pb.micro.go
│   └── hello.proto
├── readme.md
└── server
    └── server.go

3 directories, 9 files

四 hello world

protoes/hello.proto:

syntax = "proto3";

option go_package="./;hello";

service Greeter {
    rpc Hello(HelloRequest) returns (HelloResponse) {}
}
message HelloRequest {
    string name = 1;
}

message HelloResponse {
    string greeting = 2;
}

生成proto的go文件:

protoc --proto_path=$GOPATH/src:. --micro_out=. --go_out=. hello.proto

服务端:

server/server.go

package main

import (
    "context"
    "fmt"

    micro "github.com/micro/go-micro"
    proto "mygoproject/gomirco" //这里写你的proto文件放置路劲
)

type Greeter struct{}

func (g *Greeter) Hello(ctx context.Context, req *proto.HelloRequest, rsp *proto.HelloResponse) error {
    rsp.Greeting = "Hello " + req.Name
    return nil
}

func main() {
    // Create a new service. Optionally include some options here.
    service := micro.NewService(
        micro.Name("greeter"),
    )

    // Init will parse the command line flags.
    service.Init()

    // Register handler
    proto.RegisterGreeterHandler(service.Server(), new(Greeter))

    // Run the server
    if err := service.Run(); err != nil {
        fmt.Println(err)
    }
}

客户端:

client/client.go

package main

import (
    "context"
    "fmt"

    micro "github.com/micro/go-micro"
    proto "mygoproject/gomirco" //这里写你的proto文件放置路劲
)


func main() {
    // Create a new service. Optionally include some options here.
    service := micro.NewService(micro.Name("greeter.client"))
    service.Init()

    // Create new greeter client
    greeter := proto.NewGreeterService("greeter", service.Client())

    // Call the greeter
    rsp, err := greeter.Hello(context.TODO(), &proto.HelloRequest{Name: "John"})
    if err != nil {
        fmt.Println(err)
    }

    // Print response
    fmt.Println(rsp.Greeting)
}

五 运行

创建go mod

go mod init test
go mod tidy

运行server

go run server/server.go

运行client

go run client/client.go

etcd-06-事务

一 etcd中的事务

二 示例

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/coreos/etcd/clientv3"
    "github.com/coreos/etcd/mvcc/mvccpb"
)

func main() {
  //连接
    cli, err := connect()
    if err != nil {
        return
    }
    defer cli.Close()

  // 第一步:加锁(创建租约,确保租约不过期,使用租约抢占key)

    // 申请一个5秒租约
    lease := clientv3.NewLease(cli)
    leaseR, err := lease.Grant(context.TODO(), 5)
    if err != nil {
        fmt.Println("lease err:", err)
        return
    }

    // 第三步中的释放锁 准备一个用于取消自动续租的context
    ctx, cancelFunc := context.WithCancel(context.TODO())
    defer cancelFunc()
    defer lease.Revoke(context.TODO(), leaseR.ID)       // 释放租约

    // 自动续租 返回值是个只读的chan,因为写入只能是etcd实现
    keepChan, err := lease.KeepAlive(ctx, leaseR.ID )
    if err != nil {
        fmt.Println("keep err:", err)
        return
    }
    // 启动一个协程去消费chan的应答
    go func(){
        for {
            select {
            case keepR := <- keepChan:
                if keepChan == nil {        // 此时系统异常或者主动取消context
                    fmt.Println("租约失效")
                    goto END
                } else {        // 每秒续租一次
                    fmt.Println("收到自动续租应答:", keepR.ID)
                }
            }
        }
    END:
    }()

    // 使用事务判断key是否存在;判断其
    key := "/cron/lock/jobX"
    kv := clientv3.NewKV(cli)
    txn := kv.Txn(context.TODO())       // 分布式事务
    txn.If(clientv3.Compare(clientv3.CreateRevision(key), "=", 0)).
        Then(clientv3.OpPut(key, "xxx", clientv3.WithLease(leaseR.ID))).        // 一般这里val记录是哪个ID抢到
        Else(clientv3.OpGet(key))       // 否则抢锁失败

    // 提交事务
    txnR, err := txn.Commit()
    if err != nil {
        fmt.Println("txn失败:", err)
        return
    }
    // 判断是否抢到了锁
    if !txnR.Succeeded {
        fmt.Println("没抢到锁,锁已被占用;", string(txnR.Responses[0].GetResponseRange().Kvs[0].Value))
        return
    }

    // 第二步:业务代码书写
    fmt.Println("模拟处理任务")
    time.Sleep(time.Second * 5)

   // 第三步:释放锁(取消续租,释放租约) 
} 

func connect() (client *clientv3.Client, err error) {
    client, err = clientv3.New(clientv3.Config{
        Endpoints:   []string{"127.0.0.1:2379", "127.0.0.1:12379", "127.0.0.1:22379"},
        DialTimeout: time.Second * 5,
    })
    if err != nil {
        fmt.Println("connect err:", err)
        return nil, err
    }
    return client, err
}

多次执行上述方法,观察结果

etcd-05-监听

1.3 示例2 watch机制

package main

import (
    "context"
    "fmt"
    "github.com/coreos/etcd/clientv3"
    "time"
    "github.com/coreos/etcd/mvcc/mvccpb"
)

func connect() (client *clientv3.Client, err error){

    client, err = clientv3.New(clientv3.Config{
        Endpoints:   []string{"127.0.0.1:2379", "127.0.0.1:22379", "127.0.0.1:32379"},
        DialTimeout: 5 * time.Second,
    })

    if err != nil {
        fmt.Println("connect err:", err)
        return nil, err
    }

    return client, err
}

func main() {

    // 连接
    cli, err := connect()
    defer cli.Close()
    if err != nil {
        return
    }

    // 获取etcd读写对象
    kv := clientv3.NewKV(cli)

    // 模拟变化
    go func() {
        for {
            kv.Put(context.TODO(), "/cron/jobs/job7", "job7")
            kv.Delete(context.TODO(), "/cron/jobs/job7")
            time.Sleep(1 * time.Second)
        }
    }()

    // 获取当前值
    getR, err := kv.Get(context.TODO(),  "/cron/jobs/job7")
    if err != nil {
        fmt.Println("get err:", err)
        return
    }
    if len(getR.Kvs) != 0 {     // key存在
        fmt.Println("当前值:", string(getR.Kvs[0].Value))
    }

    // 监听后续变化: revision是当前etcd集群事务ID,该ID是单调递增
    wathStartRevision := getR.Header.Revision + 1
    watcher := clientv3.NewWatcher(cli)         // 创建wathcer
    fmt.Println("从该版本向后监听:", wathStartRevision)
    watchChan := watcher.Watch(context.TODO(),  "/cron/jobs/job7", clientv3.WithRev(wathStartRevision))

    // 如果有变化,则会将变化丢到watchChan
    for watchResult := range watchChan{
        for _, event := range watchResult.Events {
            switch event.Type {
            case mvccpb.PUT:
                fmt.Println("修改为:", string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)
            case mvccpb.DELETE:
                fmt.Println("删除了Revision:", event.Kv.ModRevision )
            }
        }
    }
}

如果要取消监听,同样是通过取消contex来实现:

ctx, cancelFunc := context.WithTimeout(context.TODO(), 5 * time.Second)
// 5秒后执行退出函数
time.AfterFunc(5 * time.Second, func(){
    cancelFunc()
})
watchChan := watcher.Watch(ctx,  "/cron/jobs/job7", clientv3.WithRev(wathStartRevision))

etcd-04-租约

一 租约机制(自动过期)

package main

import (
    "context"
    "fmt"
    "github.com/coreos/etcd/clientv3"
    "time"
)

func connect() (client *clientv3.Client, err error){

    client, err = clientv3.New(clientv3.Config{
        Endpoints:   []string{"127.0.0.1:2379", "127.0.0.1:22379", "127.0.0.1:32379"},
        DialTimeout: 5 * time.Second,
    })

    if err != nil {
        fmt.Println("connect err:", err)
        return nil, err
    }

    return client, err
}

func main() {

    // 连接
    cli, err := connect()
    defer cli.Close()
    if err != nil {
        return
    }

    // 获取etcd读写对象
    kv := clientv3.NewKV(cli)

    // 申请一个10秒租约
    lease := clientv3.NewLease(cli)
    leaseR, err := lease.Grant(context.TODO(), 10)
    if err != nil {
        fmt.Println("lease err:", err)
        return
    }

    // 使用该租约put一个kv
    putR, err := kv.Put(context.TODO(), "/cron/lock/job1", "10001", clientv3.WithLease(leaseR.ID))
    if err != nil {
        fmt.Println("put err:", err)
        return
    }
    fmt.Println("写入成功:", putR.Header.Revision)

    // 定时查看key是否过期
    for {
        getR, err := kv.Get(context.TODO(), "/cron/lock/job1")
        if err != nil {
            fmt.Println("get err:", err)
            return
        }
        if getR.Count == 0 {
            fmt.Println("key过期")
            break
        } else {
            fmt.Println("还未过期")
            time.Sleep(2 * time.Second)
        }
    }
}

1.3 租约续租

我们希望能够续约,并能根据需要删除:

package main

import (
    "context"
    "fmt"
    "github.com/coreos/etcd/clientv3"
    "time"
)

func connect() (client *clientv3.Client, err error){

    client, err = clientv3.New(clientv3.Config{
        Endpoints:   []string{"127.0.0.1:2379", "127.0.0.1:22379", "127.0.0.1:32379"},
        DialTimeout: 5 * time.Second,
    })

    if err != nil {
        fmt.Println("connect err:", err)
        return nil, err
    }

    return client, err
}

func main() {

    // 连接
    cli, err := connect()
    defer cli.Close()
    if err != nil {
        return
    }

    // 获取etcd读写对象
    kv := clientv3.NewKV(cli)

    // 申请一个10秒租约
    lease := clientv3.NewLease(cli)
    leaseR, err := lease.Grant(context.TODO(), 10)
    if err != nil {
        fmt.Println("lease err:", err)
        return
    }

    // 自动续租 返回值是个只读的chan,因为写入只能是etcd实现
    keepChan, err := lease.KeepAlive(context.TODO(), leaseR.ID )
    if err != nil {
        fmt.Println("keep err:", err)
        return
    }
    // 启动一个协程去消费chan的应答
    go func(){
        for {
            select {
            case keepR := <- keepChan:
                if keepChan == nil {        // 此时系统异常或者主动取消context
                    fmt.Println("租约失效")
                    goto END
                } else {        // 每秒续租一次
                    fmt.Println("收到自动续租应答:", keepR.ID)
                }
            }
        }
        END:
    }()

    // 使用该租约put一个kv
    putR, err := kv.Put(context.TODO(), "/cron/lock/job1", "10001", clientv3.WithLease(leaseR.ID))
    if err != nil {
        fmt.Println("put err:", err)
        return
    }
    fmt.Println("写入成功:", putR.Header.Revision)

    // 定时查看key是否过期
    for {
        getR, err := kv.Get(context.TODO(), "/cron/lock/job1")
        if err != nil {
            fmt.Println("get err:", err)
            return
        }
        if getR.Count == 0 {
            fmt.Println("key过期")
            break
        } else {
            fmt.Println("还未过期")
            time.Sleep(2 * time.Second)
        }
    }
}

如果我们要主动让context取消,则会让租约失效,现在定义一个5秒后取消的context:

// 续租了5秒,然后手动停止续租,即总共有15秒生命
ctx, _ := context.WithTimeout(context.TODO(), 5 * time.Second)

// 自动续租 返回值是个只读的chan,因为写入只能是etcd实现
keepChan, err := lease.KeepAlive(ctx, leaseR.ID )

// 启动一个协程去消费chan的应答
go func() {
        for keepR := range keepChan {
            if keepR != nil {
                fmt.Println("收到自动续租应答:", keepR.ID)
            } else {
                fmt.Println("租约失效")
            }
        }
    }()

etcd-03-增删改查

一 golang对etcd的增删改查

package main

import (
    "context"
    "fmt"
    "github.com/coreos/etcd/clientv3"
    "time"
)

func connect() (client *clientv3.Client, err error){

    client, err = clientv3.New(clientv3.Config{
         // etcd的集群数组,我们这里只有1个
        Endpoints:   []string{"127.0.0.1:2379", "127.0.0.1:22379", "127.0.0.1:32379"},
        DialTimeout: 5 * time.Second,
    })

    if err != nil {
        fmt.Println("connect err:", err)
        return nil, err
    }

    return client, err
}

func main() {

    // 连接
    cli, err := connect()
    defer cli.Close()
    if err != nil {
        return
    }

    // 获取etcd读写对象
    kv := clientv3.NewKV(cli)

    // 添加键值对
    r1, err := kv.Put(context.TODO(), "/lesson/math", "100")        // 添加数学课程为100分
    if err != nil {
        fmt.Println("put key1 err:", err)
        return
    }

    // 继续添加键值对
    r2, err := kv.Put(context.TODO(), "/lesson/music", "50")        // 添加音乐课程为50分
    if err != nil {
        fmt.Println("put key2 err:", err)
        return
    }

    fmt.Println("添加结果r1: ", r1)         // &{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:9 raft_term:2  <nil>}
    fmt.Println("添加结果r2: ", r2)         // &{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:10 raft_term:2  <nil>}

    // 获取整个 /lesson目录下的数据
    getAll, err := kv.Get(context.TODO(), "/lesson/", clientv3.WithPrefix())
    if err != nil {
        fmt.Println("select all err: ", err)
        return
    }
    // [key:"/lesson/math" create_revision:9 mod_revision:25 version:8 value:"100"  key:"/lesson/music" create_revision:26 mod_revision:26 version:1 value:"50" ]
    fmt.Println("查询所有:", getAll.Kvs)

    // 删除键值对,如果添加参数:clientv3.WithPrevKV(),则delResult结果中包含删除前的结果:PrevKvs
    delResult, err := kv.Delete(context.TODO(), "/lesson/music")
    if err != nil {
        fmt.Println("del key2 err:", err)
        return
    }
    fmt.Println("删除r2结果:", delResult)

    // 修改键值对,修改仍然是Put
    updRerulst, err := kv.Put(context.TODO(), "/lesson/math", "30")
    if err != nil {
        fmt.Println("upd key2 err:", err)
        return
    }
    fmt.Println("修改r1结果:", updRerulst)

    // 查询当前r1的值 该函数支持重载,第三个参数都是 clientv3.With***,用来限制返回结果
    getR1, err := kv.Get(context.TODO(), "/lesson/math")
    if err != nil {
        fmt.Println("select r1 err: ", err)
        return
    }
    //  [key:"/lesson/math" create_revision:9 mod_revision:13 version:3 value:"100" ]
    fmt.Println("查询r1结果:", getR1.Kvs)   

    // 查询被删除的r2的值
    getR2, err := kv.Get(context.TODO(), "/lesson/music")
    if err != nil {
        fmt.Println("select r2 err: ", err)
        return
    }
    //  []
    fmt.Println("查询r2结果:", getR2.Kvs)   
}

二 批量操作

  • 批量删除:kv.Delete(context.TODO(), "/lesson/", clientv3.WithPrevfix())
  • 批量按顺序删除,并删除2个:kv.Delete(context.TODO(), "/lesson/lesson1", clientv3.WithFromKey(),clientv3.WithLimit(2))

三 使用OP操作代替原有的增删改查

    // 创建Op
    putOp := clientv3.OpPut("/cron/jobs/job8", "888")
    // 执行Op
    opR, err := kv.Do(context.TODO(), putOp)
    if err != nil {
        fmt.Println("putOp err:", err)
        return
    }
    fmt.Println("写入Revision:", opR.Put().Header.Revision)

    // 创建Op
    getOp := clientv3.OpGet("/cron/jobs/job8")
    // 执行Op
    opR2, err := kv.Do(context.TODO(), getOp)
    if err != nil {
        fmt.Println("getOp err:", err)
        return
    }
    fmt.Println("获取Revisoon:", opR2.Get().Kvs[0].ModRevision)
    fmt.Println("获取Value:", opR2.Get().Kvs[0].Value)

四 注意事项

遇到如下错误,grpc使用 v1.26.0

../../../../go/pkg/mod/github.com/coreos/etcd@v3.3.25+incompatible/clientv3/balancer/picker/roundrobin_balanced.go:55:54: undefined: balancer.PickOptions

etcd-02-服务发现

一 etcd实现服务发现

etcd是一个采用HTTP协议的健/值对存储系统,它是一个分布式和功能层次配置系统,可用于构建服务发现系统。对比庞大的conusl和Zookeeper,etcd系统本身极为简单(因为仅仅是一个分布式kv存储),但是他需要搭配一些第三方工具才可以实现服务发现功能。

现在,我们有一个地方来存储服务相关信息,我们还需要一个工具可以自动发送信息给etcd。但在这之后,为什么我们还需要手动把数据发送给etcd呢?即使我们希望手动将信息发送给etcd,我们通常情况下也不会知道是什么信息。记住这一点,服务可能会被部署到一台运行最少数量容器的服务器上,并且随机分配一个端口。理想情况下,这个工具应该监视所有节点上的Docker容器,并且每当有新容器运行或者现有的一个容器停止的时候更新etcd,其中的一个可以帮助我们达成目标的工具就是Registrator。

Registrator通过检查容器在线或者停止运行状态自动注册和去注册服务,它目前支持etcd、Consul和SkyDNS 2。

Registrator与etcd是一个简单但是功能强大的组合,可以运行很多先进的技术。每当我们打开一个容器,所有数据将被存储在etcd并传播到集群中的所有节点。我们将决定什么信息是我们的。

我们还需要一种方法来创建配置文件,与数据都存储在etcd,通过运行一些命令来创建这些配置文件。

Confd是一个轻量级的配置管理工具,常见的用法是通过使用存储在etcd、consul和其他一些数据登记处的数据保持配置文件的最新状态,它也可以用来在配置文件改变时重新加载应用程序。换句话说,我们可以用存储在etcd(或者其他注册中心)的信息来重新配置所有服务。

最后的组合如图所示:

当etcd、Registrator和Confd结合时,可以获得一个简单而强大的方法来自动化操作我们所有的服务发现和需要的配置。这个组合还展示了“小”工具正确组合的有效性,这三个小东西可以如我们所愿正好完成我们需要达到的目标,若范围稍微小一些,我们将无法完成我们面前的目标,而另一方面如果他们设计时考虑到更大的范围,我们将引入不必要的复杂性和服务器资源开销。

etcd-01-etcd概述

一 etcd简介

1.1 etcd是什么

etcd是一个分布式KV存储库,内部采用Raft协议作为一致性算法选举leader,同步key-value,其特性是:高可用,强一致。

集群一般采取大多数模型(quorum)来选举leader,即集群需要2N+1个节点,这时总能产生1个leader,多个follower。etcd也不例外,每个etcd cluster都由若干个member组成,每个member是一个独立运行的etcd实例,单机上也可以运行多个member。

在正常运行的状态下,集群中会有一个 leader,其余的 member 都是 followers。leader 向 followers 同步日志,保证数据在各个 member 都有副本。leader 还会定时向所有的 member 发送心跳报文,如果在规定的时间里 follower 没有收到心跳,就会重新进行选举。客户端所有的请求都会先发送给 leader,leader 向所有的 followers 同步日志,等收到超过半数的确认后就把该日志存储到磁盘,并返回响应客户端。

每个 etcd 服务有三大主要部分组成:

  • raft 实现
  • WAL (Write Ahead Log)预写日志,日志存储:在本地磁盘(–data-dir)上存储日志内容(wal file)和快照(snapshot)
  • 数据的存储和索引

etcd调用阶段:

  • 阶段1:调用者调用leader,leader会将kv数据存储在日志中,并利用实时算法raft进行复制
  • 阶段2:当复制给了N+1个节点后,本地提交,返回给客户端,最后leader异步通知follower完成通知

注意:日志只要复制给了大多数就不会丢。

raft日志概念:

  • replication:日志在leader生成,向follower复制,最终达到各个节点日志序列一致
  • term:任期,重新选举产生的leader,其term单调递增
  • log index:日志行在日志序列的下标

二 etcd安装

2.1 Linux安装

启动

## 启动:强制其监听在公网端口
nohup ./etcd --listen-client-urls 'http://0.0.0.0:2379' --advertise-client-urls  'http://0.0.0.0:2379' &

## 查看日志,确认etcd是否启动成功
less nohup.out

2.2 Mac安装

安装etcd:

brew search etcd
brew install etcd

运行:

etcd

2.3 启动后的一些默认显示

在启动etcd后,会显示一些配置信息:

  • etcdserver: name = default name表示节点名称,默认为default
  • data-dir:保存日志和快照的目录,默认为当前工作目录default.etcd/
  • 通信相关:
    • 在http://localhost:2380和集群中其他节点通信
    • 在http://localhost:2379提供HTTP API服务,供客户端交互。等会配置webui就是这个地址
  • etcdserver: heartbeat = 100ms leader发送心跳到followers的间隔时间
  • etcdserver: election = 1000ms 重新投票的超时时间,如果follow在该时间间隔没有收到心跳包,会触发重新投票,默认为1000ms

2.4 etcd webui

这里使用了一个nodejs开发的web:

git clone https://github.com/henszey/etcd-browser.git
cd etcd-browser/
vim server.js  

var etcdHost = process.env.ETCD_HOST || '127.0.0.1';  # etcd 主机IP
var etcdPort = process.env.ETCD_PORT || 4001;          # etcd 主机端口
var serverPort = process.env.SERVER_PORT || 8000;      # etcd-browser 监听端口

# 启动
node server.js

三 etcd客户端操作

常用命令:

ETCDCTL_API=3 ./etcdctl # 查看所有命令
ETCDCTL_API=3 ./etcdctl put "hello" "world"
ETCDCTL_API=3 ./etcdctl get "hello"

# 顺序存储的键可以使用前缀模糊查询
ETCDCTL_API=3 ./etcdctl put "/users/user1" "zs"
ETCDCTL_API=3 ./etcdctl put "/users/user2" "ls"
ETCDCTL_API=3 ./etcdctl get "/users/" --prefix      # 查询全部该前缀
ETCDCTL_API=3 ./etcdctl watch "/users/" --prefix    # 监听该前缀数据变化,此时另起命令行操作数据,则当前命令行能监听到

微服务-服务发现简介

一 服务发现

1.1 服务发现出现的缘由

因为一套微服务架构中有很多个服务需要管理,管理几百个服务所使用的端口列表是一大挑战,我们应该部署无需指定端口的服务,并让Docker为我们分配一个随机端口。
那么问题就演变成了我们需要发现端口号,让别人知道。为了能够定位服务,需要下面2个步骤:

  • 服务注册:该步骤存储的信息至少包括正在运行的服务的主机和端口信息
  • 服务发现:该步骤允许其他用户可以发现在服务注册阶段存储的信息。

微服务的框架体系中,服务发现是不能不提的一个模块。客户端的一个接口需要调用多个服务,客户端必须知道所有服务的网络位置,以往的做法是使用配置文件,或者配置在数据库中,这就出现了一些问题:

  • 需要配置N个服务的网络位置,加大配置的复杂性
  • 服务的网络位置变化,都需要改变每个调用者的配置
  • 集群的情况下,难以做负载(反向代理的方式除外)

有了服务发现模块,多个微服务把当前自己的网络位置注册 到服务发现模块(这里注册的意思就是告诉),服务发现就以K-V的方式记录下,K一般是服务名,V就是 IP:PORT。服务发现模块定时的轮询查看这些服务能不能访问的了(这就是健康检查)。客户端在调用服务A-N的 时候,就跑去服务发现模块问下它们的网络位置,然后再调用它们的服务。

客户端完全不需要记录这些服务网络位置,客户端和服务端完全解耦!

1.2 需要额外考虑的地方

如果一个服务停止工作并部署/注册了一个新的服务实例,那么该服务是否应该注销呢?当有相同服务的多个副本时咋办?我们该如何做负载均衡呢?如果一个服务器宕机了咋办?所有这些问题都与注册和发现阶段紧密关联。现在,我们限定只在服务发现的范围里(常见的名字,围绕上述步骤)以及用于服务发现任务的工具,它们中的大多数采用了高可用的分布式键/值存储,这就是服务发现工具需要实现的功能。

1.3 服务发现工具

服务发现背后的基本思想是对于服务的每一个新实例(或应用程序),能够识别当前环境和存储相关信息。存储的注册表信息本身通常采用键/值对的格式,由于服务发现经常用于分布式系统,所以要求这些信息可伸缩、支持容错和分布式集群中的所有节点。这种存储的主要用途是给所有感兴趣的各方提供最起码诸如服务IP地址和端口这样的信息,用于它们之间的相互通讯,这些数据还经常扩展到其它类型的信息服务发现工具倾向于提供某种形式的API,用于服务自身的注册以及服务信息的查找。

比方说我们有两个服务,一个是提供方,另一个是第一个服务的消费者,一旦部署了服务提供方,就需要在服务发现注册表中存储其信息。接着,当消费者试图访问服务提供者时,它首先查询服务注册表,使用获取到的IP地址和端口来调用服务提供者。为了与注册表中的服务提供方的具体实现解耦,我们常常采用某种代理服务。这样消费者总是向固定IP地址的代理请求信息,代理再依次使用服务发现来查找服务提供方信息并重定向请求,在本文中我们稍后通过反向代理来实现。现在重要的是要理解基于三种角色(服务消费者、提供者和代理)的服务发现流程。

服务发现工具要查找的是数据,至少我们应该能够找出服务在哪里?服务是否健康和可用?配置是什么样的?既然我们正在多台服务器上构建一个分布式系统,那么该工具需要足够健壮,保证其中一个节点的宕机不会危及数据,同时,每个节点应该有完全相同的数据副本,进一步地,我们希望能够以任何顺序启动服务、杀死服务或者替换服务的新版本,我们还应该能够重新配置服务并且查看到数据相应的变化。

二 常用的服务发现技术

2.0 常见服务发现技术列表

  • zookeeper:历史最悠久,起源于Hadoop,成熟、健壮、生态丰富,已经被大量公司使用。但是其过于复杂、重量级,后续诞生了许多替代品
  • etcd:是一个采用http协议的jkv存储系统,搭配第三方工具后可以提供服务发现功能:Registrator、Confd
  • Eureka:
  • consul:

特点对比:

注:CAP原则又称CAP定理,指的是在一个分布式系统中,一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)。CAP 原则指的是,这三个要素最多只能同时实现两点,不可能三者兼顾。

2.1 健康检查对比

  • consul:非常详细,如检查内存占用是否到达90%、文件系统空间是否不足
  • Zookeeper、etcd在失去了和服务进程连接的情况下任务不健康
  • Euraka需要显式配置健康检查

2.2 多数据中心支持对比

  • consul:使用WAN的Gossip协议,完成了跨数据中心同步,其他产品则需要额外开发工具链

2.3 CAP理论取舍对比

  • consul、Eureka:典型的CA,适合分布式服务,服务发现的可用性优先级较高,consul更能提供更高的可用性、保证KV stor的一致性
  • zookeeper、etcd:CP类型,牺牲可用性,在服务发现场景优势较弱

2.4 kv支持对比

只有Eureka不支持。

2.5 跨语言支持对比

  • consul:支持http1.1接入,还支持标准的REST服务api,还提供了DNS支持
  • etc:支持http1.1接入,还支持grpc
  • Zookeeper:跨语言支持较弱
  • Euraka:一般通过sidecar的方式提供多语言客户端接入支持

2.6 watch支持对比(客户端观察到服务提供者变化)

  • consul:使用长轮询方式实现变化感知
  • etcd:使用长轮询方式实现变化感知
  • Zookeeper:支持服务端推送变化
  • Eureka:1.0版本使用长轮询方式实现变化感知,2.0版本计划支持服务端推送变化

2.7 自身集群监控

除了Zookeeper,其他都默认支持metrics,可以搜集并报警这些度量信息达到监控目的

2.8 其他

Java著名微服务架构体系SpringCloud对上述四者都提供了集成,但对Consul支持较为完善。

参考地址:http://dockone.io/article/667

protobuf-03-go与protobuf

一 安装Go语言编译protobuf环境

# 安装Go语言的proto API接口:
go get -v -u github.com/golang/protobuf/proto

# 安装protoc-gen-go插件:这是个go程序
go get -v -u github.com/golang/protobuf/protoc-gen-go

# 开启go mod版本的golang:拷贝命令,该命令位于 go/bin/
cp protoc-gen-go /usr/local/bin/ 

# 低版本golang:需要编译该源码为可执行文件,然后拷贝该文件
cd /gopath/github.com/golang/protobuf/protoc-gen-go/           
go build 

二 编译proto文件

新建一个golang项目,项目根目录创建protoes/hello.proto文件,内容如下:

syntax = "proto3";               

package protoes;                  // 指定包

message HelloRequest {
  string name = 1;                 // 1-4分别是键对应的数字id
  int32 u_count = 2;     
}

编译hello.proto:

cd protoes
protoc --go_out=./ *.proto      # 在当前目录生成了文件 hello.pb.go

当用protocol buffer编译器来运行.proto文件时,编译器将生成所选择语言的代码,包括获取、设置字段值,将消息序列化到一个输出流中,以及从一个输入流中解析消息:

  • C++:编译器会为每个.proto文件生成一个.h文件和一个.cc文件,.proto文件中的每一个消息有一个对应的类
  • Python:.proto文件中的每个消息类型生成一个含有静态描述符的模块,该模块与一个元类(metaclass)在运行时被用来创建所需的Python数据访问类
  • Go:编译器会为每个消息类型生成了一个.pb.go文件

hello.pb.go主要内容如下:

package protoes

import (
    fmt "fmt"
    proto "github.com/golang/protobuf/proto"
    math "math"
)

// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf

const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package

type HelloRequest struct {
    Name                 string   `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
    UCount               int32    `protobuf:"varint,2,opt,name=u_count,json=uCount,proto3" json:"u_count,omitempty"`
    XXX_NoUnkeyedLiteral struct{} `json:"-"`
    XXX_unrecognized     []byte   `json:"-"`
    XXX_sizecache        int32    `json:"-"`
}

三 使用Go代码获取数据

package main

import (
    "fmt"
    "github.com/golang/protobuf/proto"
    "test/protoes"                          // test是go mod的项目名
)

func main() {

    HelloRequest := protoes.HelloRequest{
        Name: *proto.String("lisi"),
        UCount: *proto.Int32(17),
    }

    // 序列化
    data, err := proto.Marshal(&HelloRequest)
    if err != nil {
        fmt.Println("marshal error:", err)
        return
    }
    fmt.Println("marshal data:", data)      // 一串流数据

    // 反序列化
    var list protoes.HelloRequest
    err = proto.Unmarshal(data, &list)
    if err != nil {
        fmt.Println("unmarshal error:", err)
        return
    }
    fmt.Println("Name:", list.GetName())        // lisi
    fmt.Println("UCount:", list.GetUCount())    // 17

}