この投稿は Advent Calendar 2018 の20日目の記事です。
先日掲載された 新卒エンジニアが初めてのVue.jsとFirebaseで社内サイネージを作った話 のバックエンド側の話になります。
はじめに
つい先日弊社ECサイト @cosme shopping にて弊社としては初の試みとなる年1回の大セール @cosme Beauty Day 2018 を開催しました。
当日はいくつかのトラブルはありましたが、1日あたりの過去最高の売り上げを記録しイベントの目標は達成することができました。
そんな中、イベントの経過を見守るためのツールとして売り上げ金額を社内サイネージに表示するシステムを開発したので、多少のエピソードを交えながらその話をしようと思います。
開発の依頼
開発の依頼は「売り上げをサイネージに表示するものを作って」の一言だけでした。
ディレクターがつくこともなく、明確にプロジェクトとして管理されることもなく、極秘プロジェクトでもないわけですが社内全体でもこのプロジェクトを知る人はごく一部だけのひっそりとした状況から開発はスタートしています。
そんなスタートの社内ツールなので、仮に開発がうまくいかなくてもイベント当日のユーザーさんの体験を損なうわけではないので、気軽に出来る開発と捉えてプレッシャーのない良い状態でスタートを切れました。
何をつくるか
気楽なスタートを切ったとは言え、これを作り上げるには以下のようなハードルがありました。
- あまり時間を掛けられない。
担当している他のプロジェクトの時間を割くことは出来ませんでした。 - 売り上げデータに関する知識がない。
担当外のため、売り上げデータを取り巻く環境や基盤を知っている人間がチームにいない。 - 売り上げデータに深く関わりのあるチームの協力は得られない。
ECサイトと決済基盤の担当はすでにイベントに向けての対応でスケジュールに1mmの空きもなかったため、何か頼み事をすることできない。
つまり、依頼を受けた段階で分かっていたことは「大きなことはできない」ということだけでした。
うりあげクン
今回のシステムは社内では「うりあげクン」という親しみを込めた名前で呼ばれることになりました。
語呂は似ていますが、某コンビニの人気おつまみとは一切関係ありません。
チームのメンバーを集めて最初にしたことはシステムの名前付けでした。
メンバー間での呼び方が「サイネージのやつ」「売り上げを表示するやつ」みたいに入り乱れていたので、プログラマが知るべき97のことのお気に入りのエッセイ 命を吹き込む魔法に倣って、真っ先に統一した呼び名をつけることにしました。
そこでついた名前が「うりあげクン」!!
まだどんなものを作るか具体的に決まっていない状態での名前付けながらも、最終的にはそのまま社内に浸透していきました。
うりあげクンの構成
フロント部分は先日の記事に書かれている通りなので、ここではバックエンドの話をしていきます。
弊社の売り上げデータの全てはデータベースに溜め込まれています。
が、イベント当日はデータベースの負荷が最大の懸念でもあったので、社内ツールごときが負荷を増やすわけにはいきません。
たまたまECサイトと決済基盤のシステム連携に Apache Kafka が導入されようとしており、やりとりされるKafkaメッセージに必要最低限の売り上げデータが含まれていることが分かったので、まずはKafkaのConsumerとしてサービスを作ることが決定しました。
直近で組織変更があり、チームメンバーが新しく抱えるプロジェクトにSpringで作られたサービスが多く、チームメンバーは未経験者がほとんどだったのでConsumer自体の実装は勉強がてら Spring Boot を採用しました。
また、リアルタイムに売り上げ金額をフロント画面に表示するための基盤として Firebase Realtime Database を採用しています。
最終的にはバックエンドの構成は以下のようになりました。
何の変哲もない至ってシンプルな構成です。
またKafkaでやりとりされる売り上げデータのメッセージフォーマットは以下のようなものでした。
(実際にはもっと複雑ですが、本記事向けにかなり簡素なものにしています)
{
"billing_id": "20181203-1200xxxxxxxxxxxxx", // 請求番号、先頭の数字は請求日時情報
"amount": 10000, // 売り上げ金額
}
実装の話
ようやくここから少しずつ具体的な実装の話に入っていきます。
全てを書くと結構な量になるためポイントのみを書いていますが、全体を通してとくに難しいことは一切していません。
Spring Boot
まずはSpringプロジェクトの雛形を SPRING INITIALIZR で作成します。
おおむね以下の内容でプロジェクトを作成しています。
- Spring Boot 2.0.5
- Gradle + Java 11
- Dependencies
- Lombok (ボイラープレートコードの自動生成)
- Spring Kafka (Spring for Apache Kafka)
その他、INITIALIZRでは追加できなかった依存関係を build.gradle
に追加していきます。
LombokについてはINITIALIZRのデフォルトのままでは正常に動作しなかったため、バージョンを明記するように変更しています。
dependencies {
...
implementation('com.google.firebase:firebase-admin:5.10.0')
compileOnly('org.projectlombok:lombok:1.18.2') # バージョン明記
}
Apache kafka Consumer
まずはKafkaのConsumerを実装していきます。
情報源としてググって出てきた実装サンプルでなんとなくの実装イメージを作り、あとは Spring Kafkaのリファレンス や Spring Kafka や Kafka Clients、 Auto Configure のソースコードをチラ見しながら実装していました。
Kafka Consumerの設定
まずはKafkaのトピックを購読するための設定を application.yml
に登録しておきます。
購読対象のKafkaサーバーやトピック、メッセージ購読の挙動を設定しています。
spring:
kafka:
bootstrap-servers:
- example.com:9092
client-id: uriage-consumer
template:
default-topic: invoice
consumer:
group-id: uriage-consumer-group
auto-offset-reset: earliest
enable-auto-commit: true
Kafka Consumerの実装
最初にKafkaのメッセージを表すエンティティクラスを作成します。
フィールドとその setter/getter
があるだけの何の変哲もないただのBeanです。
@ToString
@JsonIgnoreProperties(ignoreUnknown = true)
public class InvoiceMessage {
@Setter
@Getter
@JsonProperty("billing_id")
private String billingId;
@Setter
@Getter
private long amount;
}
このエンティティクラスをKafkaメッセージとして受け取るListenerの実装を行います。
ひとまずは、Kafkaメッセージを購読できているかどうかの確認をするだけなのでログ出力しか実装していません。
@KafkaListener(
id = "{spring.kafka.client-id}",
topics = "{spring.kafka.template.default-topic}",
errorHandler = "listenerErrorHandler"
)
public void subscribe(
InvoiceMessage invoiceMessage,
@Header(
name = KafkaHeaders.RECEIVED_MESSAGE_KEY,
required = false,
defaultValue = "undefined"
) String key,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
@Header(KafkaHeaders.OFFSET) long offset,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
@Header(KafkaHeaders.TIMESTAMP_TYPE) String timestampType
) {
log.info(
"key = {}, topic = {}, partition = {}, offset = {}, timestamp = {}, timestampType = {}, message = {}",
key, topic, partition, offset, Instant.ofEpochMilli(timestamp), timestampType, invoiceMessage
);
}
Producer側でKafkaメッセージにキーを指定していない場合、Listenerがエラーになるため、 KafkaHeaders.RECEIVED_MESSAGE_KEY
のみデフォルト値を設定しています。
では、上記のKafka Listenerの初期化処理を実装してみます。
@EnableKafka
@Configuration
public class KafkaConsumerInitializer {
@Autowired
private final KafkaConsumerProperties properties;
@Autowired
public KafkaConsumerInitializer(KafkaConsumerProperties properties) {
this.properties = properties;
}
@Bean
public ConsumerFactory<String, KafkaMessageEntity> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(
properties.getProps(),
properties.getKeyDeserializer,
properties.getValueDeserializer
);
}
@Bean
public KafkaListenerContainerFactory<
ConcurrentMessageListenerContainer<String, InvoiceMessage>
> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, InvoiceMessage> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
/**
* Listener実行中のエラーハンドラー
*/
@Bean
public ConsumerAwareListenerErrorHandler listenerErrorHandler() {
return (message, execption, consumer) -> {
MessageHeaders headers = message.getHeaders();
log.error(
"Kafka Listener error occurred : error message = {}",
execption.getMessage()
);
consumer.seek(
new TopicPartition(
headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)
),
headers.get(KafkaHeaders.OFFSET, Long.class) + 1
);
return null;
};
}
}
KafkaConsumerProperties
クラスはただ application.yml
のKafka設定を取得して返したり、値が無ければデフォルト値を返すようにしているだけのシンプルな実装なので割愛します。
内部的には org.springframework.boot.autoconfigure.kafka.KafkaProperties
を利用した委譲クラスのような構成になっています。
また、エラーハンドラーは今回の緩い要件では、メッセージの処理中にエラーが発生したとしてもConsumerとして他のKafkaメッセージの購読は継続させられるように、メッセージトピック内のオフセットは進めるようにしてみました。
厳格な商用サービスでは許容されないので真似はしないでください。
Deserializerの拡張
前述のKafka Consumerの初期化では特別なことはしていませんが、Kafkaメッセージのデシリアライズだけは手を加えています。
不正なフォーマットのKafkaメッセージを購読した際、 メッセージのデシリアライズに標準の org.springframework.kafka.support.serializer.JsonDeserializer
を使っていると例外が発生し、Consumerは強制終了してしまいます。
現システムにそんな問題があったわけではないのですが、今回は多少のデータの取りこぼしが発生しても致命的にはならないため、処理は継続しつつエラーログだけを出力するという緩い実装に変更しました。
public class SafeJsonDeserializer<T> extends JsonDeserializer<T> {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public SafeJsonDeserializer() {
super();
}
public SafeJsonDeserializer(ObjectMapper objectMapper) {
super(objectMapper);
}
public SafeJsonDeserializer(Class<T> targetType) {
super(targetType);
}
@Override
public T deserialize(String topic, Headers headers, byte[] data) {
try {
if (data == null || data.length == 0) {
throw new SerializationException("Can't deserialize data [ null ] from topic [" + topic + " ]");
}
return super.deserialize(topic, headers, data);
} catch (SerializationException e) {
log.error("Deserialize error [ {} ]", e.getMessage());
return null;
}
}
@Override
public T deserialize(String topic, byte[] data) {
// 省略
}
}
あとはこのクラスを先述の ConsumerFactory
の初期化時に登録しておくだけです。
以上でKafka Consumerの実装はだいたい完了しました。
Firebase Realtime Database
次はFirebase Realtime Databaseの実装に入っていきます。
Firebase WebコンソールでRealtime Databaseプロジェクトを作成
Firebase WebコンソールでFirebaseプロジェクトを作成し、データベースとしてFirebase Realtime Databaseを選択しておきます。
また、コンソール上でサービスアカウントの秘密鍵を生成し、 application.yml
にそのパスを設定しておきます。
spring:
# ...省略
firebase:
credential:
path: "/path/to/firebase-private-key.json"
Firebaseの初期化
秘密鍵の設定が済んだら、あとはその鍵ファイルを使ってFirebaseを初期化していきます。
まずはFirebaseの設定のバインディングクラスを実装し、
@ConfigurationProperties(prefix = "spring.firebase")
public class FirebaseProperties {
@Getter
private final Credential credential = new Credential();
public static class Credential {
@Setter
@Getter
private String path;
}
}
あとは鍵ファイルを読み込んで、
@Configuration
@EnableConfigurationProperties(FirebaseProperties.class)
public class FirebaseCredential {
private final FirebaseProperties properties;
private GoogleCredentials credentials;
public FirebaseCredential(FirebaseProperties properties) {
this.properties = properties;
}
@PostConstruct
public synchronized void initialize() {
if (credentials != null) {
return;
}
try (InputStream serviceAccount = new FileInputStream(properties.getCredential().getPath())) {
credentials = GoogleCredentials.fromStream(serviceAccount);
} catch (IOException e) {
throw new GoogleCredentialsInitializeException(e.getMessage(), e.getCause());
}
}
@Bean
public GoogleCredentials getCredentials() {
return credentials;
}
}
Firebaseを初期化するだけです。
@Component
public class FirebaseDataSource {
private final FirebaseProperties properties;
private final GoogleCredentials credentials;
private FirebaseApp app;
@Autowired
public FirebaseDataSource(
FirebaseProperties properties,
GoogleCredentials credentials
) {
this.properties = properties;
this.credentials = credentials;
}
@PostConstruct
public synchronized void init() {
if (app != null) {
return;
}
FirebaseOptions options = new FirebaseOptions.Builder()
.setCredentials(credentials)
.setDatabaseUrl("https://xxxx.firebaseio.com")
.build();
this.app = FirebaseApp.initializeApp(options);
}
@Bean
@Scope("prototype")
public FirebaseDatabase getDatabaseClient() {
return FirebaseDatabase.getInstance(this.app);
}
}
ここでのポイントは、 @PostConstruct
アノテーションを使って、DIコンテナへのクラス登録時に鍵情報やFirebaseを初期化している箇所です。
これでSpringアプリケーションのライフサイクルの中で一度だけ初期化が実行されるようにしました。
(明示的にメソッドを呼び出すこともできますが、2回目以降の初期化処理はスキップされます)
Firebaseの初期化はこれだけで完了です。
とても簡単ですね!!
データのストア
あとはConsumerで購読したKafkaメッセージから売り上げ金額を集計し、最終的にはFirebaseに保存していくだけです。
売り上げ金額の集計ロジックは後述します。
ここではKafkaから受け取ったメッセージ InvoiceMessage
クラスから売り上げ集計ロジックを通るまでに Invoice
クラスに集計に必要なデータを暗黙的に詰め替えています。
クラスから金額を取得する部分ではどちらも差がないので詳細は省きます。
なお、下記の実装では以下のようなドキュメントをFirebaseに保存することになります!!
// Firebaseドキュメント
{
"sales": {
"total_sales": {
"amount": 1234567890, // 売り上げ金額
"orders": 1000 // 注文数
}
}
}
それではRepositoryクラスの実装を見ていきます。
Firebaseのデータベース参照 DatabaseReference
を取得し、その参照に対して開始したトランザクションの中で売り上げデータの更新を行っています。
イベント当日には多数の注文が見込まれるため、それぞれの更新が同時に実行されることで売り上げデータが破損しないようにトランザクションでケアする必要がありました。
トランザクションの中で現在の値を TotalSalesFirebaseEntity
オブジェクト経由で取得し、そこに Invoice
から取得した売り上げ金額を加算し、再登録を行っています。
public class FirebaseSalesRepository {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final FirebaseDataSource dataSource;
@Autowired
public FirebaseSalesRepository(FirebaseDataSource dataSource) {
this.dataSource = dataSource;
}
public void store(Invoice invoice) {
DatabaseReference ref = dataSource.getDatabaseClient().getReference("sales");
ref.child("total_sales").runTransaction(
new Transaction.Handler() {
@Override
public Transaction.Result doTransaction(MutableData currentData) {
TotalSalesFirebaseEntity entity = currentData.getValue(TotalSalesFirebaseEntity.class);
// 初めての登録処理
if (entity == null) {
currentData.setValue(
new TotalSalesFirebaseEntity(invoice.getProductsPrice().getValue(), 1)
);
return Transaction.success(currentData);
}
// 2回目以降の登録処理
currentData.setValue(
new TotalSalesFirebaseEntity(
entity.getAmount() + invoice.getProductsPrice().getValue(),
entity.getNumOfOrders() + 1
)
);
return Transaction.success(currentData);
}
@Override
public void onComplete(DatabaseError error, boolean committed, DataSnapshot currentData) {
if (error != null || committed == false) {
log.error(
"Total sales transaction failed abnormally: error = {}, committed = {}, invoice = {}",
error, committed, invoice
);
return;
}
log.info("Transaction completed [{}]", invoice);
}
}
);
}
}
TotalSalesFirebaseEntity
クラスは以下のような実装となっています。
public class TotalSalesFirebaseEntity {
/**
* Public fields needed by Firebase
*/
@Setter(onMethod = @__({ @Exclude }))
@Getter(onMethod = @__({ @Exclude }))
@PropertyName("amount")
public long amount;
@Setter(onMethod = @__({ @Exclude }))
@Getter(onMethod = @__({ @Exclude }))
@PropertyName("orders")
public long numOfOrders;
public TotalSalesFirebaseEntity() {
// Needed by Firebase
}
public TotalSalesFirebaseEntity(@Nonnull long amount, @Nonnull long numOfOrders) {
this.amount = amount;
this.numOfOrders = numOfOrders;
}
}
引数なしのコンストラクタはFirebaseの規約上必須です。
また、各インスタンスのフィールドは private
ではなく、 public
にしておく必要があります。
各フィールドの onMethod = @__({ @Exclude })
は、Lombokによって自動生成される setter/getter
に com.google.firebase.database.Exclude
アノテーションを付与する記述です。
FirebaseではどうやらBeanクラスの public
なフィールド名や setter/getter
名からFirebaseドキュメントのパラメーターを自動でマッピングされるようです。
通常のLombokの挙動だと
public long getNumOfOrders() {
return numOfOrders;
}
という getter
が自動生成されることで、Firebaseのオブジェクトマッピング処理によって numOfOrders
というプロパティが自動で定義されてしまいます。
Exclude
アノテーションを付与しない場合、以下のようなFirebaseドキュメントがオプジェクトマッピング処理によって自動生成されてしまいます。
{
"sales": {
"total_sales": {
"amount": 1234567890,
"orders": 1000,
"numOfOrders": 1000 // 意図せず自動生成されるパラメーター
}
}
}
この挙動を抑止するのが @Exclude
アノテーションの役割です。
Lombokのメソッド自動生成処理の中で明示的に @Exclude
を付与することで意図しないパラメーターが生成されないようにしています。
これでFirebaseにデータを登録する処理は完成しました。
売り上げ集計ロジック
残るは Kafkaメッセージの購読部分とFirebaseへのデータ登録部分を繋ぐだけですが、ここは至ってシンプルです。
Kafkaメッセージから生成された Invoice
オブジェクトがBeauty Dayのセール集計対象かどうかを判定し、セール対象であればFirebaseの売り上げデータに加算していくだけのものになります。
@Service
public final class BeautyDaySalesCalculateService {
private final FirebaseSalesRepository salesRepository;
@Autowired
public BeautyDaySalesCalculateService(FirebaseSalesRepository salesRepository) {
this.salesRepository = salesRepository;
}
public void run(Invoice invoice) {
if (invoice.isBeautyDaySales() == false) {
return;
}
salesRepository.store(invoice);
}
}
invoice.isBeautyDaySales()
では、注文の日時(当日以外の一部商品の事前予約販売期間)や売れた商品などいくつかの情報からBeauty Dayセール対象であるかどうかを判定しています。
詳細は本記事の趣旨ではないため割愛いたします。
Serviceクラスが完成すれば、あとはKafka ListenerからServiceクラスの呼び出しを行うだけです。
@Autowired
private BeautyDaySalesCalculateService service;
@KafkaListener(...)
public void subscribe(
InvoiceMessage invoiceMessage,
...
) {
log.info(...);
service.run(Invoice.of(invoiceMessage));
}
ここまで実装してしまえば、あとはSpring Bootアプリケーションを起動するだけです。
Consumerが起動することで、Kafkaメッセージの購読が始まり、売り上げデータの集計処理が開始されます。
まとめ
長々と書いてきましたがいかがだったでしょうか?
1つ1つ見ていくと特別に難しい実装はなかったかと思います。
実際のうりあげクンは、Firebaseのスキーマに総売り上げだけでなくDailyの売り上げを集計していたり、Kafkaメッセージのもっと詳細なパラメーターを取り扱っていたりと、本記事の説明よりももう少し複雑な実装になっています。けれど、使い切りのツールということもあって手抜きはしましたが、Spring BootやFirebaseを使った開発は初めてだったにも関わらず実質3-4日程度で実装は終わっています。
もともと時間を掛けられないプロジェクトだったにも関わらず、短期間でリアルタイムな売り上げ表示を実現できたのはSpring Bootの生産性の高さとFirebaseのお手軽さのおかげです。
実は開発当初は新しい世代のFirebase Cloud Firestoreを採用していましたが、テスト用のKafka Producerを作成しFirestoreのパフォーマンスを計測したところ、トランザクションが有効な状態で約70件/secの書き込みでデータ更新の取りこぼしが発生してしまいました。
(良くドキュメントを見たらトランザクションが有効な場合、同じデータには1件/secの書き込み性能しか保証されていませんでした…)
急遽Firebase Realtime Databaseで作り直しパフォーマンスを計測したところ、瞬間的には約5000件/secでも取りこぼしなく売り上げデータの集計ができていることを確認しました。
これだけの性能が出ればイベント当日でも問題なしと判断し、1プロセスのConsumerだけで本番に臨んでいます。
結果、当日は何の問題もなく売り上げ金額をリアルタイムにアイスタイルの社員に届けることができました!!
※ Firebase Realtime Databaseは1000write/sec以上の運用はシャーディングによるスケールアウトが推奨されています。
社内の反応
イベントの売り上げ状況を確認するための補助程度の気持ちでいましたが、実際イベントを迎えると社内での注目度は高く、金額の桁が上がる瞬間などはみんなでサイネージを見ながら盛り上がるなど、かなりの反響がありました。
けっこう適当に作ってしまったサービスですが、イベント中の社員の一体感の形成に大きく貢献できたのはうれしい誤算でした。
こうして自分のサービスが実際に使われ、喜んでもらえるという経験はサービス開発の醍醐味ですね!!
来年に向けて
来年もBeauty Dayは開催される予定です。
こちらのプレスリリースのコメントにもあるように、 今回は日本だけが対象でしたが次回以降は グローバル展開
も視野に入れています。
そうなると、
- 通貨の違い
- 時差
などを考慮しないといけなくなり今回のように単純にはいけそうにありません。
そして、なによりも大きく変わるのは社内の期待ではないかと思います。
今回はひっそりとほとんどの人に知られることなく開発していたのでプレッシャーは何も感じていませんでした。
けれど、これだけ社内の注目を浴びてしまったからには次回は気楽にはいけないなと…
とりあえず、次回はさらに進化したうりあげクンを作れるようにこれからも技術力に磨きを掛けていこうと思います!
最後までお付き合い頂きありがとうございました!!