ytakeです
2月15日、16日と開催されたDevelopers Summit 2018 「デブサミ」にて、
「Apache Kafkaによるスケーラブルアプリケーション開発」という講演で、
弊社で導入しているApache Kafkaについて話しました。
Apache Kafkaについての発表は、去年のbuilderscon2017でも一部取り上げていたテーマで、
去年のPHPカンファレンス2017の内容からPHPでの利用ポイントを省き、
Kafka Streamを追加してお話させていただきました。
弊社ではログなどの分析基盤のほか、
アプリケーションのメッセージブローカーとしても実際に利用しており、
今後もいろんなアプリケーションで活用されていくでしょう!、という状況です。
今回は発表内容にある実装例を簡単に紹介していきます。
Producer
Producerのサンプルとして、今回Goで実装したサンプルコードを用意しました。
ProducerはKafkaに対してメッセージを送信するクライアントでしかないため、
どんな言語で実装しても構いません。
istyle-inc/go-example-developers-summit
GoのApache Kafkaクライアントといえば、 sarama を利用することが多いですが、
サンプルでは折角なのでconfluentのライブラリを利用してみました。 confluent-kafka-go
これを利用するには、librdkafkaのインストールが必要になりますので、
事前にビルドしておきましょう。
$ git clone https://github.com/edenhill/librdkafka.git
$ cd librdkafka
$ ./configure --prefix /usr
$ make
$ sudo make install
Goからの利用自体は非常にシンプルで簡単に組み込むことができます
func (kc *KafkaClient) PrepareProducer() {
p, err := kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": *kc.BootstrapServers,
"broker.address.family": "v4",
"queue.buffering.max.messages": "1000",
"client.id": "testingClient",
})
if err != nil {
panic(err)
}
go func() {
for e := range p.Events() {
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
kc.Logger.Info(fmt.Sprintf("Delivery failed: %v\n", ev.TopicPartition))
} else {
kc.Logger.Info(fmt.Sprintf("Delivered message to %v\n", ev.TopicPartition))
}
}
}
}()
}
サンプルではjsonで作ったサンプルのアプリケーションデータを送信するようになっています。
func (kc *KafkaClient) ProduceMessages(msgs []string) {
topic := KafkaTopic
p := kc.ClientConnector
for _, msg := range msgs {
p.Produce(
&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte(msg),
}, nil)
}
p.Flush(15 * 1000)
}
動かす場合は環境に合わせて変更してください
https://github.com/istyle-inc/go-example-developers-summit/blob/master/config/config.go
ログの送信、というよりも実際のアプリケーションで利用するようなサンプルデータをjsonで送信されます。
Kafka Streams
Kafka StreamsはKafkaのtopicに対して、ストリーム処理を実行させることができます。
topicにメッセージが格納されたら、処理を実行し、結果を他のtopicに格納する、という具合で、
受信するConsumerで実行するのではなく、Kafkaだけで完結する機能で、
弊社でも受信したデータをさまざなtopicに分割したり、
受信したメッセージに値を付け加えたり、といった処理で利用しています。
Kafkaだけで簡単な集計などもできますので、アプリケーションや分析処理では重宝する場面も多いのではないかと思います。
Kafka Streamsの簡単なサンプルも今回用意しました。
Producerのサンプルと合わせて利用することができます。
実装内容を簡単に紹介します。
developers-summit-kafka-streams
といっても処理自体は非常に簡単です。
package jp.co.istyle
import java.util.Properties
import org.apache.kafka.common.serialization._
import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream.{KStream, ValueMapper}
object ElementAppenderStreamApplication {
def main(args: Array[String]) {
println("Kafka Streams Sample.")
val config: Properties = {
val prop = new Properties()
// 引数で指定した*.properties読み込み
prop.load(new java.io.FileInputStream(args(0).toString))
prop.put(StreamsConfig.APPLICATION_ID_CONFIG, prop.getProperty("sample.app_id"))
prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, prop.getProperty("sample.bootstrap.servers"))
prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
// exactly once
prop.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE)
prop
}
val ft: String = config.getProperty("sample.stream.topic")
val tt: String = config.getProperty("sample.streamed.topic")
println("stream topic: from " + ft)
println("to " + tt)
val stringSerde: Serde[String] = Serdes.String()
val builder: StreamsBuilder = new StreamsBuilder
val rawStream: KStream[String, String] = builder.stream(ft)
val mapStream: KStream[String, String] = rawStream.mapValues(new ValueMapper[String, String] {
override def apply(value: String): String = new ElementAppender(value).append()
})
mapStream.to(stringSerde, stringSerde, tt)
val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
streams.start()
}
}
mainはKafka Streamsで実行されるメソッドで、
実行時のコマンド引数には args でアクセスすることができます。
サンプルでは、引数でプロパティファイルを指定して、実行対象のtopicなどを変更できるようにしています。
サンプルは、topicに格納されたメッセージすべてに、趣味 “Developers Summit” を追記するようになっています。
package jp.co.istyle
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization.write
class ElementAppender(messageBody: String) {
implicit val formats = DefaultFormats
def append():String = {
val js = parse(messageBody)
val v = js.asInstanceOf[JObject]
write(js.replace(List("Interests"), JString(new String((v \ "Interests").values.toString + ",Developers Summit"))))
}
}
実装したコードは sbt assembly
でjarファイルを生成しますので、
これを kafka-run-class コマンドで実行させます。
実行すると、前述したサンプルProducerからの送信されたメッセージに対して処理が実行されます。
両サンプルを実行すると、Apache Kafkaの機能がわかるようになっています。
これにさらにKafka Connectを使うことで、Elasticsearchなどにメッセージを転送して、
検索などが可能になります。
Kafka Connect
confluentを利用している環境であれば、Elasticsearchへの接続は簡単です。
たとえば、 /etc/schema-registry/elasticsearch-connect.properties
として
下記のファイルを作成します。
bootstrap.servers=192.168.10.10:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
rest.port=8093
plugin.path=/usr/share/java/
converterをjsonConverterにし、topicのメッセージをそのままelasticsearchに挿入するようになります。
次にelasticsearchへの接続情報を /etc/kafka-connect-elasticsearch/sample-es.properties
に記述します。
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=sample.append_employees
key.ignore=true
connection.url=http://localhost:9200
schema.ignore=true
type.name=kafka-connect
上記のものは、先に紹介したKafka Streamsのサンプルに合わせてありますので、
そのまま実行するとストリーム処理後のデータがelasticsearchのindexに格納され、
様々なアプリケーション(例えばwebサービスや管理画面など)からデータの検索が行えるようになります。
これらのミドルウェアを組み合わせることによって、
Event Sourcing + CQRSを取り入れるヒントになるのではないかと思います。
弊社ではDatabaseのデータをConsumerで結合し、
パーティションで分散させてCassandraに格納、という流れで利用しており、
このほかにも現在は、発表でも少し触れたKappa Archiectureライクに
Apache Kafka -> Spark Streaming -> Apache Cassandraで分析処理などを実装しています。
PHPアプリケーションで利用する場合のサンプルは、PHPカンファレンス2017でも簡単に紹介したサンプルを参照ください。
ytake/laravel-presto-kafka-demo
このサンプルでは、PHPアプリケーション(Laravel) からApache Kafkaにメッセージを送信し、
elasticsearch経由でPHPアプリケーションから検索する、というサンプルと、
Apache Kakfaのtopicを Presto で他のデータベースと結合し、
様々なデータを出力する例を紹介しています。
おまけのPresto with PHP
今回、Prestoのお話も少しさせていただきました。
PrestoはREST APIを利用してデータ操作が可能となっており、
拙作のライブラリで簡単に操作ができるようになっています。
LaravelにPrestoを組み込んでみよう
Eloquentなどを使わずとも簡単に利用することができます。
もっとも簡単な例を紹介します。
ServiceProvider
設定ファイルは config/presto.php
として下記のように記述します
<?php
return [
'connections' => [
'presto_test' => [
'host' => 'http://127.0.0.1:8080/',
'catalog' => 'default',
],
],
];
次にサービスプロバイダでインスタンスの生成方法を指定します。
<?php
declare(strict_types=1);
namespace App\Providers;
use Illuminate\Foundation\Application;
use Illuminate\Support\ServiceProvider;
use Ytake\PrestoClient\ClientSession;
/**
* Class AppServiceProvider
*/
class AppServiceProvider extends ServiceProvider
{
/**
* Register any application services.
*
* @return void
*/
public function register()
{
this->app->bind(ClientSession::class, function (Applicationapp) {
prestoConfig =app['config']->get('presto');
connectionConfig =prestoConfig['connections']['presto_test'];
return new ClientSession(connectionConfig['host'],connectionConfig['catalog']);
});
}
}
最後にクライアントクラスを作成します。
<?php
declare(strict_types=1);
namespace App\Foundation\Presto;
use Ytake\PrestoClient\FixData;
use Ytake\PrestoClient\ClientSession;
use Ytake\PrestoClient\ResultsSession;
use Ytake\PrestoClient\StatementClient;
/**
* Class PrestoClient
*/
class PrestoClient
{
/** @var ClientSession */
protected session;
/**
* PrestoClient constructor.
*
* @param ClientSessionsession
*/
public function __construct(ClientSession session)
{this->session = session;
}
/**
* @param stringquery
*
* @return array
*/
public function query(string query): array
{result = [];
client = new StatementClient(this->session, query);resultSession = new ResultsSession(client);yieldResult = resultSession->execute()->yieldResults();
/** @var \Ytake\PrestoClient\QueryResultrow */
foreach (yieldResult asrow) {
foreach (row->yieldData() asyieldRow) {
if (yieldRow instanceof FixData) {result[] = yieldRow;
}
}
}
returnresult;
}
}
これでコンストラクタの型宣言で \App\Foundation\Presto\PrestoClient
クラスがあれば、
Laravelがインスタンス生成を行い、アプリケーションから利用可能になります。
処理速度や負荷でユーザーに提供するようなサービスではPrestoを使うのは難しいですが、
サービスの成長に不可欠なデータ分析などでは重宝しますので、
管理ツールなどで重宝することができると思いますので、
さいごに
今回のデブサミでは、みなさんのアプリケーション開発、サービス成長に役立つ例を紹介できたのではないかと思います。
ぜひチャレンジしてみてください!
最後の最後に
2018年3月8日、長らくお待たせした Laravel Meetup Tokyo Vol.10 を弊社で開催します。
10回目で PHPerKaigi の前日という素晴らしいPHP Week!
今回は特別に転職ドラフトさまがスポンサーとなり、なんとあのビールが提供されます!!!!
お楽しみに!
2018年3月9日、10日に開催される PHPerKaigi にて、弊社エンジニアの久保田とわたくし竹澤(ytake)が登壇します。
両名とも弊社の一部サービスで導入しているHHVM/Hackについてお話しします。
竹澤有貴 (@ex_takezawa) Hackで作るマイクロフレームワーク 2018/03/10(土) 13:50
久保田賢二朗 HackのAsyncCurlで死んだ話 2018/03/10(土) 16:30
またPHPerKaigiの二日目、Track Bで開催されるInteractive Round TableではLaravel相談会を竹澤が担当します。
弊社でも比較的大きな規模のアプリケーションでLaravelを使っていたり、
自分自身もライブラリを公開していたりしますので、
設計技法やチーム開発でのノウハウなど質問していただいても構いません。
ぜひ、気軽にご参加下さい!!!
そしてそして、 HHVM/Hack勉強会 Vol.1 、
弊社でも導入しだしたこともあり、いろんなお話も聞ければと思い開催します!!
そんなHackやApache Kafkaや、Apache Sparkを使った分析処理、
Cassandraなどを使ったアプリケーション開発(Go、Scala)を経験しててみたいエンジニアを募集中です!