ZooKeeper を使った Service Discovery入門
ZooKeeper
ZooKeeper
とは、CoreOSのetcd
や Hashicorpのconsul
等とよく並び称される、
いわゆるCordination Serviceツールです。
詳しく知りたい方は、公式ドキュメントを、とりあえずおおまかな特徴を抑えたいという方は こちらの記事が非常によくまとまっていて わかりやすかったので、ご参照頂ければと思います。
尚、本エントリーではZooKeeperの説明は(ほとんど)行いません。 予めご了承下さい。
Service Discovery
従来、Service Discovery
といえば service discovery protocol (SDP)や、DNS-SD など、
インフラ、ネットワーク周りのプロトコルやそれらが提供する機能を指すことが多かったかと思います。
昨今は Micoservices
という、サービスを細かく分け、それぞれが情報を通信しあって全体を作り上げていく
アーキテクチャが主流になりつつこともあり、アプリケーションの領域でも語られるようになってきました。
Microservices の世界では、Continuous Integration や Immutable Infrastracture といったキーワードが象徴するように、 常にアプリケーションを取り巻く環境が変化します。 (一応補足すると、 Immutable とは変化しない、ということではなく、常に0から作り直し、状態を持たないという意味です)
そういった中で、環境の変化、状態を管理・保持するのがアプリケーションにおける Service Discovery であり、 その際に利用されるのが、etcd や consul 、そして ZooKeeper です。
余談ですが、私が最近気になっているGo製の Microservices フレームワーク go-micro
では、
consul を使っています。
consul や go-micro に関しては、また別の機会に投稿します。
ZooKeeper を使って Service Discoveryしてみる
ようやく本題です。ZooKeeper は 同じく Apacheプロダクトの Kafka
や Spark
、それ以外にも
本当に色々なところで使われています。
ただし、そういったミドルウェアのバックエンドとして使っていることはあっても、 実際に自分が開発するアプリケーションから ZooKeeper を直接操作をした事が無い、という方は結構多いのではないでしょうか。 かく言う私もそうでした。
という訳で、今回直接 ZooKeeper を操作して、簡単な Service Discovery を実現してみたいと思います。 以後 Go のサンプルコード交えて説明します。全体はこちらにあげています。
今回作るサービスは、 名前を渡すと挨拶をしてくれる Greet サービスです。
順を追って説明します。
サービスの起動 ①Pete
まずサービス Pete を起動します。 起動すると以下の様なメッセージが表示されます。
SERVER_NAME=Pete HTTP_PORT=8001 go run server/server.go 2016/01/12 21:32:45 Connected to 192.168.99.100:32772 2016/01/12 21:32:45 Authenticated: id=95156352395182231, timeout=5000 Listen 8001
コードを見てみます。
Server (Pete)
const Node = "/greet" conn, _, _ := zk.Connect([]string{zkServer}, time.Second*5) create := func() error { var err error // try creating ephemeral node _, err = conn.Create(Node, []byte(httpPort), zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) return err } // ---- Pete は 書込に成功するので if以下には入らない ---- if create() != nil { // watch ephemeral node event. another, _, eventChan, _ := conn.GetW(Node) fmt.Println("Now listen", string(another)) loop: for { event := <-eventChan if event.Type == zk.EventNodeDeleted || event.Type.String() == "Unknown" { // retry creating ephemeral node if create() != nil { break loop } } } }
Pete はZooKeeper上の /greet
というエフェメラルノードに書込を試みます。
エフェメラルノードとは、和訳すると「一時ノード」です。 ZooKeeper のエフェメラルノードは、作成時に確立したセッションが有効な間だけ存在します。
Pete が起動した段階では /greet
は存在しないため書込に成功します。
書き込む内容は、クライアントがアクセスするのに必要な、IPやポートの情報です。
ただし、今回はクラアント、サーバーともにローカル上で動作させるのでポート情報のみ書き込んでいます。
書き込み後、httpのListenを開始します。
func listen() { fmt.Println("Listen", httpPort) http.HandleFunc("/", greet) http.ListenAndServe(":"+httpPort, nil) } func greet(w http.ResponseWriter, r *http.Request) { name := r.FormValue("name") fmt.Println("Greeted in", name) fmt.Fprintf(w, "Hello %s! I'm %s", name, serverName) }
特に難しいところはないと思います。
サービスの起動 ②Brian
次にサービス Brian を起動します。 メッセージが少し変わります。 少しわかりにくいですが、既に8001で別サービスがListenしている旨を表示します。
SERVER_NAME=Brian HTTP_PORT=8002 go run server/server.go 2016/01/12 21:33:40 Connected to 192.168.99.100:32772 2016/01/12 21:33:40 Authenticated: id=95156352395182232, timeout=5000 Now listen 8001
コードは先程と同じです。
Server (Brian)
// ---- Brian は 書込に失敗するので ノード変更イベントの監視 ---- if create() != nil { // watch ephemeral node event. another, _, eventChan, _ := conn.GetW(Node) fmt.Println("Now listen", string(another)) loop: for { event := <-eventChan if event.Type == zk.EventNodeDeleted || event.Type.String() == "Unknown" { // retry creating ephemeral node if create() != nil { break loop } } } }
Brian も同じくZooKeeper上の /greet
に書込を試みます。
ただし、/greet
は既に Pete が書き込みを行っているため書込に失敗します。
Brianは以後はforの中で、この /greet
ノードの変更イベントを監視しつつ待機状態となります。
クライアントの起動
次にクライアントを起動します。 名前の入力を求めるメッセージが表示されます。
go run client/client.go Enter your name. >
クライアントは、ZooKeeper 上の /greet
の読み込みます。
また、同時に/greet
ノードの変更イベントの監視を別の goroutine で行います。
監視のコードはほぼサーバー側と同じです。
その後、入力を受け付け、enter を押されたらサービスに問合せに行って 受け取った結果を取得して出力します。
Client
func main() { zkServer = os.Getenv("ZK_SERVER") if zkServer == "" { panic("ZK_SERVER is required") } zkConn, _, _ = zk.Connect([]string{zkServer}, time.Second*5) zkConn.SetLogger(new(NoLog)) go watch() var s scanner.Scanner s.Init(os.Stdin) for { fmt.Print("Enter your name.\n> ") s.Scan() fmt.Println(call(s.TokenText())) } } func call(name string) string { var p string for { if p = discover(false); p != "" { break } } resp, err := http.Get(fmt.Sprintf("http://localhost:%s/?name=%s", p, name)) if err != nil { panic(err) } defer resp.Body.Close() b, _ := ioutil.ReadAll(resp.Body) return string(b) } func discover(change bool) string { servicePortMtx.Lock() defer servicePortMtx.Unlock() if change || servicePort == "" { p, _, _ := zkConn.Get(Node) servicePort = string(p) } return servicePort } func watch() { var eventChan <-chan zk.Event _, _, eventChan, _ = zkConn.GetW(Node) for { event := <-eventChan if event.Type == zk.EventNodeDeleted || event.Type.String() == "Unknown" { discover(true) } _, _, eventChan, _ = zkConn.GetW(Node) } }
Pete が挨拶
Client
Enter your name. > John Hello John! I'm Pete
Server(Pete)
Greeted John
Pete が挨拶してくれました。
Pete サービスを落とす
Ctrl+C で Pete を落とします。
Server(Pete)
^Cexit status 2
しばらくすると Brian が立ち上がった旨が表示されます。
Server(Brian)
Listen 8002
Brian が挨拶
Client
Enter your name. > Paul Hello Paul! I'm Brian
Server(Brian)
Greeted Paul
想定通り、 Brian が Pete に替わって挨拶してくれました!
まとめ
いかがでしたでしょうか。
今回の実装では実用レベルには程遠いですが、 ZooKeeper を使った Service Discovery の おおよそのイメージは掴んで頂けたのではと思います。
もう少しきちんと実装したうえでの実サービスでの導入も 妄想 検討しているので、
実現の際にはまた記事にしたいと思います。