時系列に考慮したシーケンシャル・カテゴリ特徴量へのログデータの省メモリな変換

はじめに

こんにちは、今回は時系列情報を考慮する必要のあるログデータに対して、メモリ消費を抑えつつ前処理を行う方法について書いていきます。

やりたいこと

このようなユーザーごとの行動ログの入ったデータセットがあったとして、

    userid  itemid  categoryid  timestamp
0        0       3           1 2019-01-04
1        0       4           1 2019-01-08
2        0       4           1 2019-01-19
3        0       5           1 2019-01-02
4        0       7           2 2019-01-17
5        0       8           2 2019-01-07
6        1       0           0 2019-01-06
7        1       1           0 2019-01-14
8        1       2           0 2019-01-20
9        1       6           2 2019-01-01
10       1       7           2 2019-01-12
11       1       8           2 2019-01-18
12       2       3           1 2019-01-16
13       2       4           1 2019-01-15
14       2       5           1 2019-01-10
15       2       5           1 2019-01-13
16       2       6           2 2019-01-03
17       2       7           2 2019-01-05
18       2       8           2 2019-01-11
19       2       8           2 2019-01-21
20       2       9           3 2019-01-09

以下のようなユーザーごとに時間を基準にして並び替えた可変長の系列データや、

各ユーザーが接触したitemid(時系列順)
[[5, 3, 8, 4, 7, 4], 
 [6, 0, 7, 1, 8, 2], 
 [6, 7, 9, 5, 8, 5, 4, 3, 8]]
各ユーザーが接触したcategoryid(時系列順)
[[1, 1, 2, 1, 2, 1], 
 [2, 0, 2, 0, 2, 0], 
 [2, 2, 3, 1, 2, 1, 1, 1, 2]]

以下のような時系列を考慮したカテゴリー変数を作成したいです。 ここではitemid、categoryidに対して最新のレコードを使用するとします。

各ユーザーが接触した最新のitemid
[4, 2, 8]
各ユーザーが接触した最新のcategoryid
[1, 0, 2]

※ ここでは例としてECでどのカテゴリーのどの商品を見たかっていうログを想定しています。各カラムは読み替えてください。

系列データに関しては、このようなリストを作成できれば、例えば Keras (functional API) だとそれぞれ以下のようにpaddingすることで、シーケンシャルデータとして入力することができます。

import tensorflow as tf
inputs = []
inputs.append(tf.keras.preprocessing.sequence.pad_sequences(
    df['itemid'].values.tolist(), padding='post', truncating='post', maxlen=10))
inputs.append(tf.keras.preprocessing.sequence.pad_sequences(
    df['categoryid'].values.tolist(), padding='post', truncating='post', maxlen=10))

Pandasでの書き方

系列データに関しては、pandasでは以下のように書くと、

# ユーザーid, 時系列順に並び替え
df = df.sort_values(by=['userid','timestamp'])
# ユーザー単位でリストとしてgroupby
df = df.groupby('userid').agg(list).reset_index(drop=False)

print('各ユーザーが接触したitemid(時系列順)')
pprint.pprint(df['itemid'].values.tolist())
print('各ユーザーが接触したcategoryid(時系列順)')
pprint.pprint(df['categoryid'].values.tolist()

上記の結果が得られます。

各ユーザーが接触したitemid(時系列順)
[[5, 3, 8, 4, 7, 4], 
 [6, 0, 7, 1, 8, 2], 
 [6, 7, 9, 5, 8, 5, 4, 3, 8]]
各ユーザーが接触したcategoryid(時系列順)
[[1, 1, 2, 1, 2, 1], 
 [2, 0, 2, 0, 2, 0], 
 [2, 2, 3, 1, 2, 1, 1, 1, 2]]

また、カテゴリデータに関しては、

# ユーザー単位で最新のものを取るようにgroupby
df_cate = df.loc[df.groupby('userid')['timestamp'].idxmax()]

print(df_cate)
print('各ユーザーが接触した最新のitemid')
pprint.pprint(df_cate['itemid'].values.tolist())
print('各ユーザーが接触した最新のcategoryid')
pprint.pprint(df_cate['categoryid'].values.tolist())

のように書くことで、上記の結果を得ることができます。

各ユーザーが接触した最新のitemid
[4, 2, 8]
各ユーザーが接触した最新のcategoryid
[1, 0, 2]

Pandasで起こり得る問題

上記データセットが大きくなってくると、Pandasだとメモリエラーを起こし一括での変換はできなくなります。 加えて、データセット自体がメモリに乗り切らない場合も同じく処理できません。 一方で、データセットを分割して処理する場合は各データセットに含まれないレコードの時系列情報を保持する必要があります。

改めてやりたいこと

上記のような背景から、

  • 前処理におけるメモリ消費を抑える
  • データセットの分割した読み込みに対応できる

ような時系列順に系列データを作成できる方法が必要となり対応しました。

以下では具体的な方法に関して書いていきます。

方法

ここでは、時系列情報を保持したリストを作成し、それを元に

  • 時系列を考慮した系列特徴量
  • 時系列を考慮したカテゴリ特徴量

の作成方法について書きます。

その後、分割したデータセットではどのような処理を行うのか説明します。

ソート対象リストの作成

まず、時系列順の操作を行う元となるリストを作成します。 時系列順かつユーザー単位で系列データとして持ちたい値を item その値の時系列情報を timestamp とすると、

[[[item,timestamp],[item,timestamp]...[item,timestamp]],
 [[item,timestamp],[item,timestamp]...[item,timestamp]],
 ...
 [[item,timestamp],[item,timestamp]...[item,timestamp]]]

という3次元のリストを作成します。 ここで、1次元目はユーザーidをインデックスとします。

処理としては以下のようになります。

def create_list(df, user_index_col, sort_col, target_col, user_num):
    """
    :param user_index_col: ユーザーIDのカラム
    :param sort_col: sortに使う値の入っているカラム
    :param target_col: sortしたいカラム
    :param user_num: ユーザー数(エンコーダ等から取得してください)
    """
    inputs = [[] for _ in range(user_num)]
    for _, user_index, sort_value, target_value in df[[user_index_col, sort_col, target_col]].itertuples():
        inputs[user_index].append([target_value, sort_value])

    return inputs

最初に出てきたデータセットに対してこの処理を行うと、

itemid_inputs = create_list(df, user_index_col='userid', sort_col='timestamp', target_col='itemid', user_num=3)
categoryid_inputs = create_list(df, user_index_col='userid', sort_col='timestamp', target_col='categoryid', user_num=3)

print('itemid')
pprint.pprint(itemid_inputs)

print('categoryid')
pprint.pprint(categoryid_inputs)

以下のようなリストが作成されます。

itemid
[[[3, Timestamp('2019-01-04 00:00:00')],
  [4, Timestamp('2019-01-08 00:00:00')],
  [4, Timestamp('2019-01-19 00:00:00')],
  [5, Timestamp('2019-01-02 00:00:00')],
  [7, Timestamp('2019-01-17 00:00:00')],
  [8, Timestamp('2019-01-07 00:00:00')]],
 [[0, Timestamp('2019-01-06 00:00:00')],
  [1, Timestamp('2019-01-14 00:00:00')],
  [2, Timestamp('2019-01-20 00:00:00')],
  [6, Timestamp('2019-01-01 00:00:00')],
  [7, Timestamp('2019-01-12 00:00:00')],
  [8, Timestamp('2019-01-18 00:00:00')]],
 [[3, Timestamp('2019-01-16 00:00:00')],
  [4, Timestamp('2019-01-15 00:00:00')],
  [5, Timestamp('2019-01-10 00:00:00')],
  [5, Timestamp('2019-01-13 00:00:00')],
  [6, Timestamp('2019-01-03 00:00:00')],
  [7, Timestamp('2019-01-05 00:00:00')],
  [8, Timestamp('2019-01-11 00:00:00')],
  [8, Timestamp('2019-01-21 00:00:00')],
  [9, Timestamp('2019-01-09 00:00:00')]]]
categoryid
[[[1, Timestamp('2019-01-04 00:00:00')],
  [1, Timestamp('2019-01-08 00:00:00')],
  [1, Timestamp('2019-01-19 00:00:00')],
  [1, Timestamp('2019-01-02 00:00:00')],
  [2, Timestamp('2019-01-17 00:00:00')],
  [2, Timestamp('2019-01-07 00:00:00')]],
 [[0, Timestamp('2019-01-06 00:00:00')],
  [0, Timestamp('2019-01-14 00:00:00')],
  [0, Timestamp('2019-01-20 00:00:00')],
  [2, Timestamp('2019-01-01 00:00:00')],
  [2, Timestamp('2019-01-12 00:00:00')],
  [2, Timestamp('2019-01-18 00:00:00')]],
 [[1, Timestamp('2019-01-16 00:00:00')],
  [1, Timestamp('2019-01-15 00:00:00')],
  [1, Timestamp('2019-01-10 00:00:00')],
  [1, Timestamp('2019-01-13 00:00:00')],
  [2, Timestamp('2019-01-03 00:00:00')],
  [2, Timestamp('2019-01-05 00:00:00')],
  [2, Timestamp('2019-01-11 00:00:00')],
  [2, Timestamp('2019-01-21 00:00:00')],
  [3, Timestamp('2019-01-09 00:00:00')]]]

時系列順にソート

次に、作成したリストを時系列順にソートする処理を加えます。

def sort_list(inputs, is_descending):
    """
    :param is_descending: 降順かどうか
    """
    return [sorted(i_input, key=lambda i: i[1], reverse=is_descending) for i_input in inputs]

この処理を行うと、

itemid_inputs = sort_list(itemid_inputs, is_descending=False)
categoryid_inputs = sort_list(categoryid_inputs, is_descending=False)

print('itemid')
pprint.pprint(itemid_inputs)

print('categoryid')
pprint.pprint(categoryid_inputs)

以下のように時系列順に並び替えられたリストが作成されます。

itemid
[[[5, Timestamp('2019-01-02 00:00:00')],
  [3, Timestamp('2019-01-04 00:00:00')],
  [8, Timestamp('2019-01-07 00:00:00')],
  [4, Timestamp('2019-01-08 00:00:00')],
  [7, Timestamp('2019-01-17 00:00:00')],
  [4, Timestamp('2019-01-19 00:00:00')]],
 [[6, Timestamp('2019-01-01 00:00:00')],
  [0, Timestamp('2019-01-06 00:00:00')],
  [7, Timestamp('2019-01-12 00:00:00')],
  [1, Timestamp('2019-01-14 00:00:00')],
  [8, Timestamp('2019-01-18 00:00:00')],
  [2, Timestamp('2019-01-20 00:00:00')]],
 [[6, Timestamp('2019-01-03 00:00:00')],
  [7, Timestamp('2019-01-05 00:00:00')],
  [9, Timestamp('2019-01-09 00:00:00')],
  [5, Timestamp('2019-01-10 00:00:00')],
  [8, Timestamp('2019-01-11 00:00:00')],
  [5, Timestamp('2019-01-13 00:00:00')],
  [4, Timestamp('2019-01-15 00:00:00')],
  [3, Timestamp('2019-01-16 00:00:00')],
  [8, Timestamp('2019-01-21 00:00:00')]]]
categoryid
[[[1, Timestamp('2019-01-02 00:00:00')],
  [1, Timestamp('2019-01-04 00:00:00')],
  [2, Timestamp('2019-01-07 00:00:00')],
  [1, Timestamp('2019-01-08 00:00:00')],
  [2, Timestamp('2019-01-17 00:00:00')],
  [1, Timestamp('2019-01-19 00:00:00')]],
 [[2, Timestamp('2019-01-01 00:00:00')],
  [0, Timestamp('2019-01-06 00:00:00')],
  [2, Timestamp('2019-01-12 00:00:00')],
  [0, Timestamp('2019-01-14 00:00:00')],
  [2, Timestamp('2019-01-18 00:00:00')],
  [0, Timestamp('2019-01-20 00:00:00')]],
 [[2, Timestamp('2019-01-03 00:00:00')],
  [2, Timestamp('2019-01-05 00:00:00')],
  [3, Timestamp('2019-01-09 00:00:00')],
  [1, Timestamp('2019-01-10 00:00:00')],
  [2, Timestamp('2019-01-11 00:00:00')],
  [1, Timestamp('2019-01-13 00:00:00')],
  [1, Timestamp('2019-01-15 00:00:00')],
  [1, Timestamp('2019-01-16 00:00:00')],
  [2, Timestamp('2019-01-21 00:00:00')]]]

時系列を考慮した系列データの作成

まず、上記で作成したリストから可変長の系列特徴量(シーケンシャルな特徴量)を作成するための処理は以下になります。

def create_sequential(inputs):
    # リストのうちtimestampのリストを削除
    return [[i[0] for i in i_input] for i_input in inputs]

これを実行すると、

print('各ユーザーが接触したitemid(時系列順)')
pprint.pprint(create_sequential(itemid_inputs))

print('各ユーザーが接触したcategoryid(時系列順)')
pprint.pprint(create_sequential(categoryid_inputs))

求めていた結果を得ることができます。

各ユーザーが接触したitemid(時系列順)
[[5, 3, 8, 4, 7, 4], 
 [6, 0, 7, 1, 8, 2], 
 [6, 7, 9, 5, 8, 5, 4, 3, 8]]

各ユーザーが接触したcategoryid(時系列順)
[[1, 1, 2, 1, 2, 1], 
 [2, 0, 2, 0, 2, 0], 
 [2, 2, 3, 1, 2, 1, 1, 1, 2]]

時系列を考慮したカテゴリデータの作成

次に、上記で作成したリストから各ユーザーの最新のレコードをカテゴリ変数として取得するための処理は以下になります。

def create_category(inputs, n=-1):
    """
    :param n: 時系列順のリストのうち、何番目のものを残すか
    """
    # リストのうちtimestampのリストを削除
    # 時系列順の系列データのうち、n番目のもののみを残す
    return [[i[0] for i in i_input][n] for i_input in inputs]

これを実行すると、

print('各ユーザーが接触した最新のitemid')
pprint.pprint(create_category(itemid_inputs, -1))

print('各ユーザーが接触した最新のcategoryid')
pprint.pprint(create_category(categoryid_inputs, -1))

以下のように求めていた結果を得ることができます。

各ユーザーが接触した最新のitemid
[4, 2, 8]

各ユーザーが接触した最新のcategoryid
[1, 0, 2]

処理まとめ

ここで、上記説明のために分けていた関数を統合すると以下のようになります。

def create_features(
        df, user_index_col, sort_col, target_col, user_num, is_descending, is_sequence, n=-1):
    """
    :param user_index_col: ユーザーIDのカラム
    :param sort_col: sortに使う値の入っているカラム
    :param target_col: sortしたいカラム
    :param user_num: ユーザー数(エンコーダ等から取得してください)
    :param is_descending: 降順かどうか
    :param is_sequence: シーケンシャルかどうか
    :param n: 時系列順のリストのうち、何番目のものを残すか(カテゴリーのみ)
    """
    # リストの作成
    inputs = [[] for _ in range(user_num)]
    for _, user_index, sort_value, target_value in df[[user_index_col, sort_col, target_col]].itertuples():
        inputs[user_index].append([target_value, sort_value])

    # リストのソート
    inputs = [sorted(i_input, key=lambda i: i[1], reverse=is_descending) for i_input in inputs]

    if is_sequence:
        return [[i[0] for i in i_input] for i_input in inputs]
    else:
        return [[i[0] for i in i_input][n] for i_input in inputs]

データを分割して読み込む場合の方法

ここが一番書きたかったところで、上記のように時系列情報を保持するようにリストを作成すると、全データをメモリに乗せることができない場合等、データセットを分割して読み込み分割単位ごとに前処理を行うという場合にも対応できます。

例として、以下のように最初のDataFrameが三つに分割されてdictionaryに格納されたものがあるとして、 (実際にないケースな気がしますが、例示として、、、)

{'df1':    userid  itemid  categoryid  timestamp
0       0       3           1 2019-01-04
1       0       4           1 2019-01-08
2       0       4           1 2019-01-19
3       0       5           1 2019-01-02
4       0       7           2 2019-01-17
5       0       8           2 2019-01-07
6       1       0           0 2019-01-06,
 'df2':     userid  itemid  categoryid  timestamp
7        1       1           0 2019-01-14
8        1       2           0 2019-01-20
9        1       6           2 2019-01-01
10       1       7           2 2019-01-12
11       1       8           2 2019-01-18
12       2       3           1 2019-01-16
13       2       4           1 2019-01-15,
 'df3':     userid  itemid  categoryid  timestamp
14       2       5           1 2019-01-10
15       2       5           1 2019-01-13
16       2       6           2 2019-01-03
17       2       7           2 2019-01-05
18       2       8           2 2019-01-11
19       2       8           2 2019-01-21
20       2       9           3 2019-01-09}

リストに時系列情報を保持しているため、例えば以下のように関数を変更することで処理が可能です。

def create_features_by_datasets(
        df_dict, user_index_col, sort_col, target_col, user_num, is_descending, is_sequence, n=-1):
    inputs = [[] for _ in range(user_num)]

    # データセットの分割単位ごとに対して処理
    for df in df_dict.values():
        for _, user_index, sort_value, target_value in df[[user_index_col, sort_col, target_col]].itertuples():
            inputs[user_index].append([target_value, sort_value])

    inputs = [sorted(i_input, key=lambda i: i[1], reverse=is_descending) for i_input in inputs]

    if is_sequence:
        return [[i[0] for i in i_input] for i_input in inputs]
    else:
        return [[i[0] for i in i_input][n] for i_input in inputs]

以下の処理を行うと、

print('各ユーザーが接触したitemid(時系列順)')
pprint.pprint(create_features_by_datasets(df_dict, user_index_col='userid', sort_col='timestamp', target_col='itemid', user_num=3, is_descending=False, is_sequence=True))
print('各ユーザーが接触した最新のitemid')
pprint.pprint(create_features_by_datasets(df_dict, user_index_col='userid', sort_col='timestamp', target_col='itemid', user_num=3, is_descending=False, is_sequence=False))

結果としては上記と同じものが得られます。

各ユーザーが接触したitemid(時系列順)
[[5, 3, 8, 4, 7, 4], 
 [6, 0, 7, 1, 8, 2],
 [6, 7, 9, 5, 8, 5, 4, 3, 8]]

各ユーザーが接触した最新のitemid
 [4, 2, 8]

時系列情報以外でのソート

また、今回はソートする基準を時系列情報に絞りましたが、他のカラムで、もしくは降順でソートすることも可能です。 上記の処理に対して渡す変数を変更することで、昇順降順、カラムを指定したソートが可能です。

例えば、以下のようなデータセットにおいて、

    userid  itemid  categoryid     score
0        0       3           1  0.730968
1        0       3           1  0.889117
2        0       3           1  0.714828
3        0       4           1  0.430737
4        0       5           1  0.734746
5        0       7           2  0.412346
6        1       0           0  0.660430
7        1       3           1  0.095672
8        1       4           1  0.985072
9        1       5           1  0.629274
10       1       6           2  0.617733
11       1       7           2  0.636219
12       1       8           2  0.246769
13       1       8           2  0.020140
14       2       0           0  0.812525
15       2       1           0  0.671100
16       2       2           0  0.174011
17       2       2           0  0.164321
18       2       3           1  0.783329
19       2       4           1  0.068837
20       2       5           1  0.265281

スコアというカラムがあったとして、それで高い順に系列データやカテゴリデータを作成したい場合は以下のような処理になります。

print('スコア順(itemid)')
pprint.pprint(create_features(df, user_index_col='userid', sort_col='score', target_col='itemid', user_num=3, is_descending=True, is_sequence=True))
print('スコア最大(itemid)')
pprint.pprint(create_features(df, user_index_col='userid', sort_col='score', target_col='itemid', user_num=3, is_descending=True, is_sequence=False, n=0))

結果は以下のようになります。

スコア順(itemid)
[[3, 5, 3, 3, 4, 7], 
 [4, 0, 7, 5, 6, 8, 3, 8], 
 [0, 3, 1, 5, 2, 2, 4]]

スコア最大(itemid)
[3, 4, 0]

終わりに

今回は、時系列情報を考慮しつつ、省メモリにログデータを系列特徴量やカテゴリ特徴量に変換する方法について書きました。 より良い方法あれば、コメントなどで教えていただきたいです。ありがとうございました。