技術備忘記

日々調べたこととかとりとめなく

ZooKeeper を使った Service Discovery入門

ZooKeeper

ZooKeeperとは、CoreOSのetcd や Hashicorpのconsul等とよく並び称される、 いわゆるCordination Serviceツールです。

詳しく知りたい方は、公式ドキュメントを、とりあえずおおまかな特徴を抑えたいという方は こちらの記事が非常によくまとまっていて わかりやすかったので、ご参照頂ければと思います。

尚、本エントリーではZooKeeperの説明は(ほとんど)行いません。 予めご了承下さい。

Service Discovery

従来、Service Discovery といえば service discovery protocol (SDP)や、DNS-SD など、 インフラ、ネットワーク周りのプロトコルやそれらが提供する機能を指すことが多かったかと思います。

昨今は Micoservices という、サービスを細かく分け、それぞれが情報を通信しあって全体を作り上げていく アーキテクチャが主流になりつつこともあり、アプリケーションの領域でも語られるようになってきました。

Microservices の世界では、Continuous Integration や Immutable Infrastracture といったキーワードが象徴するように、 常にアプリケーションを取り巻く環境が変化します。 (一応補足すると、 Immutable とは変化しない、ということではなく、常に0から作り直し、状態を持たないという意味です)

そういった中で、環境の変化、状態を管理・保持するのがアプリケーションにおける Service Discovery であり、 その際に利用されるのが、etcd や consul 、そして ZooKeeper です。

余談ですが、私が最近気になっているGo製の Microservices フレームワーク go-microでは、 consul を使っています。

consul や go-micro に関しては、また別の機会に投稿します。

ZooKeeper を使って Service Discoveryしてみる

ようやく本題です。ZooKeeper は 同じく Apacheプロダクトの KafkaSpark、それ以外にも 本当に色々なところで使われています。

ただし、そういったミドルウェアのバックエンドとして使っていることはあっても、 実際に自分が開発するアプリケーションから ZooKeeper を直接操作をした事が無い、という方は結構多いのではないでしょうか。 かく言う私もそうでした。

という訳で、今回直接 ZooKeeper を操作して、簡単な Service Discovery を実現してみたいと思います。 以後 Go のサンプルコード交えて説明します。全体はこちらにあげています。

今回作るサービスは、 名前を渡すと挨拶をしてくれる Greet サービスです。

f:id:junchang1031:20160115184236p:plain

順を追って説明します。

サービスの起動 ①Pete

まずサービス Pete を起動します。 起動すると以下の様なメッセージが表示されます。

SERVER_NAME=Pete HTTP_PORT=8001 go run server/server.go
2016/01/12 21:32:45 Connected to 192.168.99.100:32772
2016/01/12 21:32:45 Authenticated: id=95156352395182231, timeout=5000
Listen 8001

コードを見てみます。

Server (Pete)

   const Node = "/greet"

    conn, _, _ := zk.Connect([]string{zkServer}, time.Second*5)

    create := func() error {
        var err error
        // try creating ephemeral node
        _, err = conn.Create(Node, []byte(httpPort), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
        return err
    }

    // ---- Pete は 書込に成功するので if以下には入らない ----
    if create() != nil {
        // watch ephemeral node event.
        another, _, eventChan, _ := conn.GetW(Node)
        fmt.Println("Now listen", string(another))
    loop:
        for {
            event := <-eventChan
            if event.Type == zk.EventNodeDeleted || event.Type.String() == "Unknown" {
                // retry creating ephemeral node
                if create() != nil {
                    break loop
                }
            }
        }
    }

Pete はZooKeeper上の /greet というエフェメラルノードに書込を試みます。

エフェメラルノードとは、和訳すると「一時ノード」です。 ZooKeeper のエフェメラルノードは、作成時に確立したセッションが有効な間だけ存在します。

Pete が起動した段階では /greet は存在しないため書込に成功します。 書き込む内容は、クライアントがアクセスするのに必要な、IPやポートの情報です。 ただし、今回はクラアント、サーバーともにローカル上で動作させるのでポート情報のみ書き込んでいます。

書き込み後、httpのListenを開始します。

func listen() {
    fmt.Println("Listen", httpPort)
    http.HandleFunc("/", greet)
    http.ListenAndServe(":"+httpPort, nil)
}

func greet(w http.ResponseWriter, r *http.Request) {
    name := r.FormValue("name")
    fmt.Println("Greeted in", name)
    fmt.Fprintf(w, "Hello %s! I'm %s", name, serverName)
}

特に難しいところはないと思います。

サービスの起動 ②Brian

次にサービス Brian を起動します。 メッセージが少し変わります。 少しわかりにくいですが、既に8001で別サービスがListenしている旨を表示します。

SERVER_NAME=Brian HTTP_PORT=8002 go run server/server.go
2016/01/12 21:33:40 Connected to 192.168.99.100:32772
2016/01/12 21:33:40 Authenticated: id=95156352395182232, timeout=5000
Now listen 8001

コードは先程と同じです。

Server (Brian)

   // ---- Brian は 書込に失敗するので ノード変更イベントの監視 ----
    if create() != nil {
        // watch ephemeral node event.
        another, _, eventChan, _ := conn.GetW(Node)
        fmt.Println("Now listen", string(another))
    loop:
        for {
            event := <-eventChan
            if event.Type == zk.EventNodeDeleted || event.Type.String() == "Unknown" {
                // retry creating ephemeral node
                if create() != nil {
                    break loop
                }
            }
        }
    }

Brian も同じくZooKeeper上の /greet に書込を試みます。 ただし、/greet は既に Pete が書き込みを行っているため書込に失敗します。 Brianは以後はforの中で、この /greetノードの変更イベントを監視しつつ待機状態となります。

クライアントの起動

次にクライアントを起動します。 名前の入力を求めるメッセージが表示されます。

go run client/client.go
Enter your name.
>

クライアントは、ZooKeeper 上の /greet の読み込みます。 また、同時に/greetノードの変更イベントの監視を別の goroutine で行います。 監視のコードはほぼサーバー側と同じです。

その後、入力を受け付け、enter を押されたらサービスに問合せに行って 受け取った結果を取得して出力します。

Client

func main() {
    zkServer = os.Getenv("ZK_SERVER")
    if zkServer == "" {
        panic("ZK_SERVER is required")
    }
    zkConn, _, _ = zk.Connect([]string{zkServer}, time.Second*5)
    zkConn.SetLogger(new(NoLog))

    go watch()

    var s scanner.Scanner
    s.Init(os.Stdin)
    for {
        fmt.Print("Enter your name.\n> ")
        s.Scan()
        fmt.Println(call(s.TokenText()))
    }
}

func call(name string) string {
    var p string
    for {
        if p = discover(false); p != "" {
            break
        }
    }
    resp, err := http.Get(fmt.Sprintf("http://localhost:%s/?name=%s", p, name))
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    b, _ := ioutil.ReadAll(resp.Body)

    return string(b)
}

func discover(change bool) string {
    servicePortMtx.Lock()
    defer servicePortMtx.Unlock()

    if change || servicePort == "" {
        p, _, _ := zkConn.Get(Node)
        servicePort = string(p)
    }

    return servicePort
}

func watch() {
    var eventChan <-chan zk.Event
    _, _, eventChan, _ = zkConn.GetW(Node)
    for {
        event := <-eventChan
        if event.Type == zk.EventNodeDeleted || event.Type.String() == "Unknown" {
            discover(true)
        }
        _, _, eventChan, _ = zkConn.GetW(Node)
    }
}

Pete が挨拶

Client

Enter your name.
> John
Hello John! I'm Pete

Server(Pete)

Greeted John

Pete が挨拶してくれました。

Pete サービスを落とす

Ctrl+C で Pete を落とします。

Server(Pete)

^Cexit status 2

しばらくすると Brian が立ち上がった旨が表示されます。

Server(Brian)

Listen 8002

Brian が挨拶

Client

Enter your name.
> Paul
Hello Paul! I'm Brian

Server(Brian)

Greeted Paul

想定通り、 Brian が Pete に替わって挨拶してくれました!

まとめ

いかがでしたでしょうか。

今回の実装では実用レベルには程遠いですが、 ZooKeeper を使った Service Discovery の おおよそのイメージは掴んで頂けたのではと思います。

もう少しきちんと実装したうえでの実サービスでの導入も 妄想 検討しているので、 実現の際にはまた記事にしたいと思います。