デブサミ2018 [Apache Kafkaによるスケーラブルアプリケーション開発] で登壇してきました #devsumi

ytakeです

2月15日、16日と開催されたDevelopers Summit 2018 「デブサミ」にて、
「Apache Kafkaによるスケーラブルアプリケーション開発」という講演で、
弊社で導入しているApache Kafkaについて話しました。

Apache Kafkaについての発表は、去年のbuilderscon2017でも一部取り上げていたテーマで、
去年のPHPカンファレンス2017の内容からPHPでの利用ポイントを省き、
Kafka Streamを追加してお話させていただきました。

弊社ではログなどの分析基盤のほか、
アプリケーションのメッセージブローカーとしても実際に利用しており、
今後もいろんなアプリケーションで活用されていくでしょう!、という状況です。

今回は発表内容にある実装例を簡単に紹介していきます。

Producer

Producerのサンプルとして、今回Goで実装したサンプルコードを用意しました。
ProducerはKafkaに対してメッセージを送信するクライアントでしかないため、
どんな言語で実装しても構いません。

istyle-inc/go-example-developers-summit

GoのApache Kafkaクライアントといえば、 sarama を利用することが多いですが、
サンプルでは折角なのでconfluentのライブラリを利用してみました。 confluent-kafka-go

これを利用するには、librdkafkaのインストールが必要になりますので、
事前にビルドしておきましょう。

$ git clone https://github.com/edenhill/librdkafka.git
$ cd librdkafka
$ ./configure --prefix /usr
$ make
$ sudo make install

Goからの利用自体は非常にシンプルで簡単に組み込むことができます

func (kc *KafkaClient) PrepareProducer() {
    p, err := kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers":            *kc.BootstrapServers,
        "broker.address.family":        "v4",
        "queue.buffering.max.messages": "1000",
        "client.id":                    "testingClient",
    })
    if err != nil {
        panic(err)
    }

    go func() {
        for e := range p.Events() {
            switch ev := e.(type) {
            case *kafka.Message:
                if ev.TopicPartition.Error != nil {
                    kc.Logger.Info(fmt.Sprintf("Delivery failed: %v\n", ev.TopicPartition))
                } else {
                    kc.Logger.Info(fmt.Sprintf("Delivered message to %v\n", ev.TopicPartition))
                }
            }
        }
    }()
}

サンプルではjsonで作ったサンプルのアプリケーションデータを送信するようになっています。

func (kc *KafkaClient) ProduceMessages(msgs []string) {
    topic := KafkaTopic
    p := kc.ClientConnector
    for _, msg := range msgs {
        p.Produce(
            &kafka.Message{
                TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
                Value:          []byte(msg),
            }, nil)
    }
    p.Flush(15 * 1000)
}

動かす場合は環境に合わせて変更してください
https://github.com/istyle-inc/go-example-developers-summit/blob/master/config/config.go

ログの送信、というよりも実際のアプリケーションで利用するようなサンプルデータをjsonで送信されます。

Kafka Streams

Kafka StreamsはKafkaのtopicに対して、ストリーム処理を実行させることができます。
topicにメッセージが格納されたら、処理を実行し、結果を他のtopicに格納する、という具合で、
受信するConsumerで実行するのではなく、Kafkaだけで完結する機能で、
弊社でも受信したデータをさまざなtopicに分割したり、
受信したメッセージに値を付け加えたり、といった処理で利用しています。

Kafkaだけで簡単な集計などもできますので、アプリケーションや分析処理では重宝する場面も多いのではないかと思います。

Kafka Streamsの簡単なサンプルも今回用意しました。
Producerのサンプルと合わせて利用することができます。

実装内容を簡単に紹介します。

developers-summit-kafka-streams

といっても処理自体は非常に簡単です。

package jp.co.istyle

import java.util.Properties

import org.apache.kafka.common.serialization._
import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream.{KStream, ValueMapper}

object ElementAppenderStreamApplication {
  def main(args: Array[String]) {

    println("Kafka Streams Sample.")

    val config: Properties = {
      val prop = new Properties()
      // 引数で指定した*.properties読み込み
      prop.load(new java.io.FileInputStream(args(0).toString))
      prop.put(StreamsConfig.APPLICATION_ID_CONFIG, prop.getProperty("sample.app_id"))
      prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, prop.getProperty("sample.bootstrap.servers"))
      prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
      prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
      // exactly once
      prop.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE)
      prop
    }

    val ft: String = config.getProperty("sample.stream.topic")
    val tt: String = config.getProperty("sample.streamed.topic")

    println("stream topic: from " + ft)
    println("to " + tt)

    val stringSerde: Serde[String] = Serdes.String()
    val builder: StreamsBuilder = new StreamsBuilder
    val rawStream: KStream[String, String] = builder.stream(ft)
    val mapStream: KStream[String, String] = rawStream.mapValues(new ValueMapper[String, String] {
      override def apply(value: String): String = new ElementAppender(value).append()
    })
    mapStream.to(stringSerde, stringSerde, tt)
    val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
    streams.start()
  }
}

mainはKafka Streamsで実行されるメソッドで、
実行時のコマンド引数には args でアクセスすることができます。
サンプルでは、引数でプロパティファイルを指定して、実行対象のtopicなどを変更できるようにしています。

サンプルは、topicに格納されたメッセージすべてに、趣味 “Developers Summit” を追記するようになっています。

package jp.co.istyle

import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization.write

class ElementAppender(messageBody: String) {

  implicit val formats = DefaultFormats

  def append():String = {
    val js = parse(messageBody)
    val v = js.asInstanceOf[JObject]
    write(js.replace(List("Interests"), JString(new String((v \ "Interests").values.toString + ",Developers Summit"))))
  }
}

実装したコードは sbt assembly でjarファイルを生成しますので、
これを kafka-run-class コマンドで実行させます。

実行すると、前述したサンプルProducerからの送信されたメッセージに対して処理が実行されます。
両サンプルを実行すると、Apache Kafkaの機能がわかるようになっています。

これにさらにKafka Connectを使うことで、Elasticsearchなどにメッセージを転送して、
検索などが可能になります。

Kafka Connect

confluentを利用している環境であれば、Elasticsearchへの接続は簡単です。

たとえば、 /etc/schema-registry/elasticsearch-connect.properties として
下記のファイルを作成します。

bootstrap.servers=192.168.10.10:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

rest.port=8093
plugin.path=/usr/share/java/

converterをjsonConverterにし、topicのメッセージをそのままelasticsearchに挿入するようになります。

次にelasticsearchへの接続情報を /etc/kafka-connect-elasticsearch/sample-es.properties に記述します。

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=sample.append_employees
key.ignore=true
connection.url=http://localhost:9200

schema.ignore=true
type.name=kafka-connect

上記のものは、先に紹介したKafka Streamsのサンプルに合わせてありますので、
そのまま実行するとストリーム処理後のデータがelasticsearchのindexに格納され、
様々なアプリケーション(例えばwebサービスや管理画面など)からデータの検索が行えるようになります。

これらのミドルウェアを組み合わせることによって、
Event Sourcing + CQRSを取り入れるヒントになるのではないかと思います。

弊社ではDatabaseのデータをConsumerで結合し、
パーティションで分散させてCassandraに格納、という流れで利用しており、
このほかにも現在は、発表でも少し触れたKappa Archiectureライクに
Apache Kafka -> Spark Streaming -> Apache Cassandraで分析処理などを実装しています。

PHPアプリケーションで利用する場合のサンプルは、PHPカンファレンス2017でも簡単に紹介したサンプルを参照ください。

ytake/laravel-presto-kafka-demo

このサンプルでは、PHPアプリケーション(Laravel) からApache Kafkaにメッセージを送信し、
elasticsearch経由でPHPアプリケーションから検索する、というサンプルと、
Apache Kakfaのtopicを Presto で他のデータベースと結合し、
様々なデータを出力する例を紹介しています。

おまけのPresto with PHP

今回、Prestoのお話も少しさせていただきました。
PrestoはREST APIを利用してデータ操作が可能となっており、
拙作のライブラリで簡単に操作ができるようになっています。

ytake/php-presto-client

LaravelにPrestoを組み込んでみよう

Eloquentなどを使わずとも簡単に利用することができます。
もっとも簡単な例を紹介します。

ServiceProvider

設定ファイルは config/presto.php として下記のように記述します

<?php
return [
    'connections' => [
        'presto_test' => [
            'host'    => 'http://127.0.0.1:8080/',
            'catalog' => 'default',
        ],
    ],
];

次にサービスプロバイダでインスタンスの生成方法を指定します。

<?php
declare(strict_types=1);

namespace App\Providers;

use Illuminate\Foundation\Application;
use Illuminate\Support\ServiceProvider;
use Ytake\PrestoClient\ClientSession;

/**
 * Class AppServiceProvider
 */
class AppServiceProvider extends ServiceProvider
{
    /**
     * Register any application services.
     *
     * @return void
     */
    public function register()
    {
        $this->app->bind(ClientSession::class, function (Application $app) {
            $prestoConfig = $app['config']->get('presto');
            $connectionConfig = $prestoConfig['connections']['presto_test'];
            return new ClientSession($connectionConfig['host'], $connectionConfig['catalog']);
        });
    }
}

最後にクライアントクラスを作成します。

<?php
declare(strict_types=1);

namespace App\Foundation\Presto;

use Ytake\PrestoClient\FixData;
use Ytake\PrestoClient\ClientSession;
use Ytake\PrestoClient\ResultsSession;
use Ytake\PrestoClient\StatementClient;

/**
 * Class PrestoClient
 */
class PrestoClient
{
    /** @var ClientSession */
    protected $session;
    /**
     * PrestoClient constructor.
     *
     * @param ClientSession $session
     */
    public function __construct(ClientSession $session)
    {
        $this->session = $session;
    }
    /**
     * @param string $query
     *
     * @return array
     */
    public function query(string $query): array
    {
        $result = [];
        $client = new StatementClient($this->session, $query);
        $resultSession = new ResultsSession($client);
        $yieldResult = $resultSession->execute()->yieldResults();
        /** @var \Ytake\PrestoClient\QueryResult $row */
        foreach ($yieldResult as $row) {
            foreach ($row->yieldData() as $yieldRow) {
                if ($yieldRow instanceof FixData) {
                    $result[] = $yieldRow;
                }
            }
        }
        return $result;
    }
}

これでコンストラクタの型宣言で \App\Foundation\Presto\PrestoClient クラスがあれば、
Laravelがインスタンス生成を行い、アプリケーションから利用可能になります。

処理速度や負荷でユーザーに提供するようなサービスではPrestoを使うのは難しいですが、
サービスの成長に不可欠なデータ分析などでは重宝しますので、
管理ツールなどで重宝することができると思いますので、

さいごに

今回のデブサミでは、みなさんのアプリケーション開発、サービス成長に役立つ例を紹介できたのではないかと思います。
ぜひチャレンジしてみてください!

最後の最後に

2018年3月8日、長らくお待たせした Laravel Meetup Tokyo Vol.10 を弊社で開催します。
10回目で PHPerKaigi の前日という素晴らしいPHP Week!
今回は特別に転職ドラフトさまがスポンサーとなり、なんとあのビールが提供されます!!!!
お楽しみに!

2018年3月9日、10日に開催される PHPerKaigi にて、弊社エンジニアの久保田とわたくし竹澤(ytake)が登壇します。
両名とも弊社の一部サービスで導入しているHHVM/Hackについてお話しします。

竹澤有貴 (@ex_takezawaHackで作るマイクロフレームワーク 2018/03/10(土) 13:50
久保田賢二朗 HackのAsyncCurlで死んだ話 2018/03/10(土) 16:30

またPHPerKaigiの二日目、Track Bで開催されるInteractive Round TableではLaravel相談会を竹澤が担当します。
弊社でも比較的大きな規模のアプリケーションでLaravelを使っていたり、
自分自身もライブラリを公開していたりしますので、
設計技法やチーム開発でのノウハウなど質問していただいても構いません。
ぜひ、気軽にご参加下さい!!!

そしてそして、 HHVM/Hack勉強会 Vol.1
弊社でも導入しだしたこともあり、いろんなお話も聞ければと思い開催します!!

そんなHackやApache Kafkaや、Apache Sparkを使った分析処理、
Cassandraなどを使ったアプリケーション開発(Go、Scala)を経験しててみたいエンジニアを募集中です!

アイスタイルCTO 開発で好んで使う言語はPHP, Hack, Go, Scala 好きな財団はApacheソフトウェア財団 github twitter