はじめに
こんにちは!
アイスタイルAdvent Calender2020の13日目の記事を担当するsugatoです。
去年のアドカレは、matplotlibでクチコミデータの推移を紙芝居形式で変化させて遊んでみました。
matplotlibを利用して、20年間のクチコミデータを動的に可視化する
簡単に自己紹介すると、現在は新設されたデータベース統括センターという部署で、全社横断的にデータ分析のための集計や、BIツールとしてTableauの推進を進めていくということをやっています。
アイスタイルが運営しているアットコスメは、1999年に開始されて21年目を迎えているサービスです。
当時はSNSが栄えていなかったので、化粧品を利用したユーザーのダイレクトな声が分かる、という意味で貴重な役割を担っていたと思います。
現在は状況が変わり、気軽に商品の感想をSNSで発信する、という機会も増えていると思います。
そこで「SNSのデータでちょっと遊んでみたいな」と思ったので、アドカレのネタにしてみました。
今回やったことは何かの他ツールでも可視化できると思います。
本当は社内のサービスデータとも組み合わせたかったのですが、行けずじまいでした。
いわゆるやってみた系の記事に近いので、気軽にご覧下さい!
Twitter APIに関して
今回、個人アカウントでTwitter APIを利用しましたが、申請方法はいろんな記事があると思うので割愛します。
また、なぜTwitterにしたのか?というと僕が使い慣れていた、というだけです。
化粧品の利用ユーザーを考えたらインスタの方が良かったなと後悔してます。。笑
個人だと無料で使える分、取ってこれるデータの種類や、APIの利用上限が結構違います。
単語の検索等をする場合に利用できる、Search Tweets
アカウント単位でツイート内容を取得できる、Get Tweet timelines
例えば、「Standard search API」を利用した場合、1回に取得できるツイート数は100件です。
また直近1週間しか遡れず、データ取得時にリツイートやいいねの上限などを設定することもできません。
最初はリツイートやいいねの数が多い、コスメ関係のツイートを取得
そこから拡散される傾向などが見れたらいいな、ぐらいに考えてましたが頓挫しました。笑
アカウント単位で取得すると過去分も取得できるため、今回は@cosme公式(アットコスメ)のツイートを取得しています。
TwitterのデータをBigQueryに定期投入
今回、データをBigQueryに送るにあたり、こちらの記事を参考にさせてもらいました。
PythonでTwitterスクレイピング&データフレーム化
BQに投入したパーティションテーブルの内容
id | text | user | fav | retweet | create_date | execute_time |
---|---|---|---|---|---|---|
tweetのid | tweet内容 | user名 | いいね数 | リツイート数 | ツイート時刻 | バッチ実行時刻 |
こう見ると時間のデータに対して、dateでカラム名を作ってるのが適当過ぎましたね..
BQ上に作成したテーブルの中身
APIからデータを取得すると、実行時点でのリツイートやいいね、が取得されます。
いいね数の遷移を評価するために、バッチの実行タイミングのデータを「execute_time」というカラムで作成しました。
また日時でデータを取得する上で、テーブル自体をパーティション化したいと思ったので、その設定の追加したり、APIの設定はymlで別ファイル化してます。
setting.yml
設定情報ファイル
# API申請によって取得した各種キー
Consumer_Key: hogehoge
Consumer_Secret: hogehoge
Access_Token: hogehoge
Accesss_Token_Secert: hogehoge
# Twitterアカウント名 メンションする際に使用する「@XXXXX」の方
# ymlで管理することで、複数アカウントからもデータが取得可能
twitter_account:
# https://twitter.com/atcosmenet
- '@atcosmenet'
- '@hogehoge'
- '@hogehoge'
import_yaml.py
ymlファイルの読み込み用
import yaml
import sys
import codecs
args = sys.argv
file_path = r'setting.yml'
with codecs.open(file_path, 'r', 'utf-8') as file:
obj = yaml.safe_load(file)
Consumer_Key = obj["Consumer_Key"]
Consumer_Secret = obj["Consumer_Secret"]
Access_Token = obj["Access_Token"]
Accesss_Token_Secert = obj["Accesss_Token_Secert"]
twitter_account = obj["twitter_account"]
twitter_data_to_bq.py
BQにデータを投入するためのファイル
コードの大部分を参考記事から引用させてもらったので、追記部分は別途コメントアウトを追加しています
# -*- coding: utf-8 -*-
import pandas as pd
import pandas_gbq
from requests_oauthlib import OAuth1Session
import json
import datetime, time, sys
from datetime import datetime, date, timedelta
from abc import ABCMeta, abstractmethod
import import_yaml
# ymlファイルに記載したAPIキーを読み込む形に修正
# OAuth認証部分
CK = import_yaml.Consumer_Key # Consumer Key
CS = import_yaml.Consumer_Secret # Consumer Secret
AT = import_yaml.Access_Token # Access Token
AS = import_yaml.Accesss_Token_Secert # Accesss Token Secert
# パーティションテーブル作成用の変数を作成しておく処理を追加
dt_now = datetime.now()
patition_date = dt_now.strftime('%Y%m%d')
execute_time = dt_now.strftime('%Y-%m-%d %H:%M:%S')
PythonでTwitterスクレイピング&データフレーム化 から参考にさせてもらった部分
class TweetsGetter(object):
__metaclass__ = ABCMeta
def __init__(self):
self.session = OAuth1Session(CK, CS, AT, AS)
@abstractmethod
def specifyUrlAndParams(self, keyword):
'''
呼出し先 URL、パラメータを返す
'''
@abstractmethod
def pickupTweet(self, res_text, includeRetweet):
'''
res_text からツイートを取り出し、配列にセットして返却
'''
@abstractmethod
def getLimitContext(self, res_text):
'''
回数制限の情報を取得 (起動時)
'''
def collect(self, total = -1, onlyText = False, includeRetweet = False):
'''
ツイート取得を開始する
'''
#----------------
# 回数制限を確認
#----------------
self.checkLimit()
#----------------
# URL、パラメータ
#----------------
url, params = self.specifyUrlAndParams()
params['include_rts'] = str(includeRetweet).lower()
# include_rts は statuses/user_timeline のパラメータ。search/tweets には無効
#----------------
# ツイート取得
#----------------
cnt = 0
unavailableCnt = 0
while True:
res = self.session.get(url, params = params)
if res.status_code == 503:
# 503 : Service Unavailable
if unavailableCnt > 10:
raise Exception('Twitter API error %d' % res.status_code)
unavailableCnt += 1
print ('Service Unavailable 503')
self.waitUntilReset(time.mktime(datetime.datetime.now().timetuple()) + 30)
continue
unavailableCnt = 0
if res.status_code != 200:
raise Exception('Twitter API error %d' % res.status_code)
tweets = self.pickupTweet(json.loads(res.text))
if len(tweets) == 0:
# len(tweets) != params['count'] としたいが
# count は最大値らしいので判定に使えない。
# ⇒ "== 0" にする
# https://dev.twitter.com/discussions/7513
break
for tweet in tweets:
if (('retweeted_status' in tweet) and (includeRetweet is False)):
pass
else:
if onlyText is True:
yield tweet['text']
else:
yield tweet
cnt += 1
if cnt % 100 == 0:
print ('%d件 ' % cnt)
if total > 0 and cnt >= total:
return
params['max_id'] = tweet['id'] - 1
# ヘッダ確認 (回数制限)
# X-Rate-Limit-Remaining が入ってないことが稀にあるのでチェック
if ('X-Rate-Limit-Remaining' in res.headers and 'X-Rate-Limit-Reset' in res.headers):
if (int(res.headers['X-Rate-Limit-Remaining']) == 0):
self.waitUntilReset(int(res.headers['X-Rate-Limit-Reset']))
self.checkLimit()
else:
print ('not found - X-Rate-Limit-Remaining or X-Rate-Limit-Reset')
self.checkLimit()
def checkLimit(self):
'''
回数制限を問合せ、アクセス可能になるまで wait する
'''
unavailableCnt = 0
while True:
url = "https://api.twitter.com/1.1/application/rate_limit_status.json"
res = self.session.get(url)
if res.status_code == 503:
# 503 : Service Unavailable
if unavailableCnt > 10:
raise Exception('Twitter API error %d' % res.status_code)
unavailableCnt += 1
print ('Service Unavailable 503')
self.waitUntilReset(time.mktime(datetime.datetime.now().timetuple()) + 30)
continue
unavailableCnt = 0
if res.status_code != 200:
raise Exception('Twitter API error %d' % res.status_code)
remaining, reset = self.getLimitContext(json.loads(res.text))
if (remaining == 0):
self.waitUntilReset(reset)
else:
break
def waitUntilReset(self, reset):
'''
reset 時刻まで sleep
'''
seconds = reset - time.mktime(datetime.datetime.now().timetuple())
seconds = max(seconds, 0)
print ('\n =====================')
print (' == waiting %d sec ==' % seconds)
print (' =====================')
sys.stdout.flush()
time.sleep(seconds + 10) # 念のため + 10 秒
@staticmethod
def bySearch(keyword):
return TweetsGetterBySearch(keyword)
@staticmethod
def byUser(screen_name):
return TweetsGetterByUser(screen_name)
class TweetsGetterBySearch(TweetsGetter):
'''
キーワードでツイートを検索
'''
def __init__(self, keyword):
super(TweetsGetterBySearch, self).__init__()
self.keyword = keyword
def specifyUrlAndParams(self):
'''
呼出し先 URL、パラメータを返す
'''
url = 'https://api.twitter.com/1.1/search/tweets.json'
params = {'q':self.keyword, 'count':100}
return url, params
def pickupTweet(self, res_text):
'''
res_text からツイートを取り出し、配列にセットして返却
'''
results = []
for tweet in res_text['statuses']:
results.append(tweet)
return results
def getLimitContext(self, res_text):
'''
回数制限の情報を取得 (起動時)
'''
remaining = res_text['resources']['search']['/search/tweets']['remaining']
reset = res_text['resources']['search']['/search/tweets']['reset']
return int(remaining), int(reset)
class TweetsGetterByUser(TweetsGetter):
'''
ユーザーを指定してツイートを取得
'''
for i in range(0,3):
def __init__(self, screen_name):
super(TweetsGetterByUser, self).__init__()
self.screen_name = import_yaml.twitter_account[i]
self.exclude_replies = True
def specifyUrlAndParams(self):
'''
呼出し先 URL、パラメータを返す
'''
url = 'https://api.twitter.com/1.1/statuses/user_timeline.json'
params = {'screen_name':self.screen_name, 'exclude_replies':self.exclude_replies, 'count':100}
return url, params
def pickupTweet(self, res_text):
'''
res_text からツイートを取り出し、配列にセットして返却
'''
results = []
for tweet in res_text:
results.append(tweet)
return results
def getLimitContext(self, res_text):
'''
回数制限の情報を取得 (起動時)
'''
remaining = res_text['resources']['statuses']['/statuses/user_timeline']['remaining']
reset = res_text['resources']['statuses']['/statuses/user_timeline']['reset']
return int(remaining), int(reset)
if __name__ == '__main__':
# キーワードで取得
# getter = TweetsGetter.bySearch(u'Python')
# ユーザーを指定して取得 (screen_name)
twitter_df = pd.DataFrame(columns=['id', 'text', 'user', 'fav', 'retweet', 'create_date', 'execute_time'])
複数アカウントをymlファイルから読み込む処理及びカラムの追加
for i in range(0,3):
getter = TweetsGetter.byUser(import_yaml.twitter_account[i])
list_text = []
list_id = []
list_user_screenname = []
list_favorite_count = []
list_retweet_count = []
list_created_date = []
list_execute_time = []
for tweet in getter.collect(total = 100):
list_text.append(tweet['text'])
list_id.append(tweet['id'])
list_user_screenname.append(tweet['user']['screen_name'])
# いいねのカラムを追加
list_favorite_count.append(tweet['favorite_count'])
# リツイートのカラムを追加
list_retweet_count.append(tweet['retweet_count'])
# ツイート時間を日本時間に変換
list_created_date.append('{0:%Y-%m-%d %H:%M:%S}'.format(datetime.strptime(tweet['created_at'], "%a %b %d %H:%M:%S %z %Y") + timedelta(hours=9)))
# バッチの実行時刻を追加
list_execute_time.append(execute_time)
df = pd.DataFrame(columns=['id', 'text', 'user', 'fav', 'retweet', 'create_date', 'execute_time'])
df_new = df.assign(id=list_id, text=list_text,
user=list_user_screenname, fav=list_favorite_count,
retweet=list_retweet_count, create_date=list_created_date,
execute_time=list_execute_time
)
twitter_df = twitter_df.append(df_new, ignore_index=False)
# 指定したデータセット配下に、バッチの実行時刻の日付をパーティションテーブルとしてinsertしていく
pandas_gbq.to_gbq(twitter_df.drop_duplicates(), 'データセット名.テーブル名_{}'.format(patition_date), project_id='プロジェクト名', if_exists='append')
Twitterのデータはいつまでお気に入りされるのか?
今回、バッチで1時間単位でAPIを実行されるようにしました。
深夜はそんなに見られないかなと思い、9時~24時まで1時間おきにデータを取得したので、その結果を可視化してみました。
ある程度いいねされてるtweetに限定して、数値を見てみます。
これでも何となく傾向は分かりますが、、
一部の圧倒的にお気に入りされてるtweetに引きずられてしまって読み取りにくいですね。
そこで10時間後のいいね数をMAXとして、累積構成比を出してみます。
すると、大体4時間後には約8割いいねが完了してることが分かりました。
時々100%を越えているのは、その後取り消しされた方がいたためだと思います。
また、初回から100%に達してるtweetは、24時以降にお気に入りされているのかもしれないです。
まとめ
今回、社外のデータの活用をしてみたいなと思ってアドカレのネタに選びましたが、いろいろ制約があってやりたかったことまで十分に時間を割くことができなかったです。
APIの実行上限があるのでその際の取得条件に考慮したり、もし今後SNSのデータを利用する上での考慮ポイントぐらいにはなったかもしれません。
データ分析時の罠として、ガベージイン・ガベージアウトという言葉がありますが、それを改めて感じる機会になりました。
そもそもいいね数とかリツイートの遷移を見ようとしたのは、impressionの遷移がAPIで取れなかったためなので。。
同時に、普段各サービスのデータを作成してくださってるエンジニアの方々に感謝の気持ちを改めて抱きました。
少々長くなりましたが、最後までご覧いただきありがとうございました!
最初に少し書きましたが、BIツールとしてTableauを導入して分析用データマートの作成などをチームでは進めてます。
データエンジニアとしてキャリアを描いていく上で、良いタイミングだと思っているので、もし興味がある方がいたら、一緒にやっていきましょう!
アイスタイル採用ページはこちら