はじめに
初めまして、ZealsでMLエンジニアをしている三宅とジョーです!
今回のブログではpandas高速化ライブラリを検証し、その有用性を探っていきます!
目的
- 本ブログでは、Modin・Vaex・Daskの3つの高速化ライブラリを検証する。
- 検証は、処理速度・メモリ使用量の2つの観点からライブラリとしての有用性を図る。
pandasの非効率性
- pandasはデフォルトで1つのプロセスにつき1コア(CPU Core)を使用している。小規模なデータセットでは問題無いが、億単位の大規模なデータフレームにおける計算プロセスにおいては一つのCPU Coreでの処理は荷が重くスピードが落ちる。
- 通常のPCには最低2コアは搭載されていることが知られている。この場合、1つの処理を行う際に1コアでのみ使用すると、その機械が持つ処理能力の50%しか使っていないことになる。これは、コア数が上がれば上がるほど「非効率的」である。
高速化ライブラリ
1. Modin
- Modin公式サイト
- pandasの非効率性を解決すると話題になっているのが
Modin
である。pandasを高速化するライブラリとして知られており、元々Ray projectの配下にあったようだが現在は独立して開発が続いている。 - 下記のようにimportするだけで利用することができ、非常にシンプルである。
import modin.pandas as mdpd
- 従来の処理と、Modinのコア数を増加させる処理は下図のような違いがある。Modinの処理方法(右)はデータフレームを複数に分割することによってそれぞれのCPU coreに計算をさせ、最後に集計させる。下図では、従来のpandasデータフレーム(左)は1つのCPU coreしか用いず10個のタスクを1つのノードで行おうとしている。それに比べてModin(右)は一つのノードが5個のタスクを受け持つことで処理速度を2倍にしているのだ。この処理方法を並列処理と呼ぶ。
- Modinは完璧に処理速度を上げるとは限らず、統計学的計算を行う場合はpandassの方が処理速度が速いことで知られている。Modinの得意分野はデータの読み込みや特定の数値の探索などである。
2. Dask
- Dask公式サイト
Dask
はPythonにおいて並列処理・分散処理などを簡単に扱ってくれるライブラリである。- 大規模なデータを複数のブロックを各ブロックに分け、処理を分散する。これにより、分散されたブロックは1度に全てのデータを読み込む必要はなく、メモリ消費のピーク値をかなり抑えることができる。NumPy、pandas、Scikit-Learnを中心に、他にもTensorFlow・XGBoostなどといったPythonでよく使われるライブラリとかなり近いインターフェイスを提供していることが特徴的である。
- Modinと同様、importするだけで処理速度を高速化する。
import dask.dataframe as dd
Dask
の用途は主に2つである。- Pythonでよく使われるライブラリの使用感で大規模データの解析を効率化させたい時
- カスタムタスクスケジューリング(プログラムを小・中規模のタスクに分割させ、独立なタスクを同時に並列実行すること)を行いたい時
※上記は、(10,10)の行列を4分割し、「各値を二乗し、その平均を取る」というタスクグラフである。Daskは、独立しているタスクを同時に並列実行する仕組みとなっている。(参考:データ分析のための並列処理ライブラリDask - Qiita)
3. Vaex
- Vaex公式サイト
- 高いメモリの効率性を有していることから、pandasの非効率性を解決する鍵となり、Out-of-core(実メモリに収まらないような大規模なデータ)を遅延処理により計算でき、より大規模データの可視化をすることが可能である。2014年から最初のBlogの著者であるMaarten Breddelsを中心に開発されていた。
- Vaex公式ホームページには、以下のように書かれている。
It can calculate statistics such as mean, sum, count, standard deviation etc, on an N-dimensional grid up to a billion (109) objects/rows per second. Visualization is done using histograms, density plots and 3d volume rendering, allowing interactive exploration of big data. Vaex uses memory mapping, a zero memory copy policy, and lazy computations for best performance (no memory wasted).
- こちらもimportするだけで利用することができるが、一度HDFファイル形式で出力し、読み直す必要がある。pandasで作成されたdfはVaexで読みこむことができないため、以下のように変更する必要がある。
import vaex as vx df = vx.read_csv("df.csv", copy_index=False, convert=True)
(参考:vaexを試してみた - Qiita)
- pandas・Modin・Daskと違い、
Vaex
においてはHDFファイルを使用し検証を行っていく事とする。
検証
検証を行うマシンのスペック
- 機種:GCE n2-standard-4
- プロセッサー:vCPU x 4
- メモリー:16 GB
検証方法
- Pythonのプロファイラーを使用
時間:%timeit メモリ使用量:%memit
- 各ライブラリのインポート
import modin.pandas as mdpd import dask.dataframe as dd from dask.distributed import Client import vaex as vx
- Numpyのランダム関数を利用して 10, 000,000 rows × 4 columnsのデータフレームを作成
df = pd.DataFrame({'x': np.random.normal(size=10**7), 'y': np.random.normal(size=10**7),}) df['randNumCol'] = np.random.randint(1, 20, df.shape[0]) df["id"] = df.index df = df[["id","x","y","randNumCol"]] df.to_csv("df.csv", index=False)
プログラムの実行
- CSVファイルの読み込み時間・メモリー使用量を検証
# pandas %timeit df_pandas = pd.read_csv("df.csv") %memit df_pandas = pd.read_csv("df.csv") # 4.15 s ± 6.78 ms per loop (mean ± std. dev. of 3 runs, 3 loops each) # peak memory: 948.36 MiB, increment: 713.32 MiB # Modin %timeit df_modin = mdpd.read_csv("df.csv") %memit df_modin = mdpd.read_csv("df.csv") # 2.23 s ± 16.5 ms per loop (mean ± std. dev. of 3 runs, 3 loops each) # peak memory: 826.16 MiB, increment: 119.2 MiB # Dask %timeit df_dd = dd.read_csv("df.csv").compute() %memit df_dd = dd.read_csv("df.csv").compute() # 3.05 s ± 150 ms per loop (mean ± std. dev. of 3 runs, 3 loops each) # peak memory: 1031.33 MiB, increment: 612.65 MiB # Vaex %timeit df_vaex = vx.read_csv("df.csv", copy_index=False, convert=True) %memit df_vaex = vx.read_csv("df.csv", copy_index=False, convert=True) # 541 ms ± 762 ms per loop (mean ± std. dev. of 3 runs, 3 loops each) # peak memory: 709.36 MiB, increment: 0.71 MiB
- applyメソッドの読み込み時間・メモリー使用量を検証
# Apply Function def custom_func(x, y): return math.cos(x ** 2 + y)
# pandas %timeit apply_pd = df.apply(lambda df: custom_func(df.x, df.y), axis=1) %memit apply_pd = df.apply(lambda df: custom_func(df.x, df.y), axis=1) # 4min 43s ± 211 ms per loop (mean ± std. dev. of 3 runs, 3 loops each) # peak memory: 2060.23 MiB, increment: 1197.95 MiB # Modin %timeit apply_md = df_modin.apply(lambda df_modin: custom_func(df_modin.x, df_modin.y), axis=1) %memit apply_md = df_modin.apply(lambda df_modin: custom_func(df_modin.x, df_modin.y), axis=1) # 2min 35s ± 217 ms per loop (mean ± std. dev. of 3 runs, 3 loops each) # peak memory: 980.82 MiB, increment: 18.34 MiB # Dask %% timeit apply_dd = df_dd.apply(lambda df_dd: custom_func(df_dd.x, df_dd.y), axis=1, meta=('float64')) apply_dd = apply_dd.compute() %% memit apply_dd = df_dd.apply(lambda df_dd: custom_func(df_dd.x, df_dd.y), axis=1, meta=('float64')) apply_dd = apply_dd.compute() # 2min 30s ± 264 ms per loop (mean ± std. dev. of 3 runs, 3 loops each) # peak memory: 1076.31 MiB, increment: 247.43 MiB # Vaex %%timeit apply_vx = df_vaex.apply(custom_func, arguments=[df_vaex.x, df_vaex.y] ,vectorize=False) %%memit apply_vx = df_vaex.apply(custom_func, arguments=[df_vaex.x, df_vaex.y] ,vectorize=False) # 20.6 µs ± 11.4 µs per loop (mean ± std. dev. of 3 runs, 3 loops each) # peak memory: 1465.69 MiB, increment: 0.01 MiB
- groupbyメソッドの読み込み時間・メモリー使用量を検証
# pandas %timeit groupby_pd = df.groupby(['randNumCol']).sum() %memit groupby_pd = df.groupby(['randNumCol']).sum() # 261 ms ± 1.77 ms per loop (mean ± std. dev. of 3 runs, 3 loops each) # peak memory: 920.39 MiB, increment: 353.48 MiB # Modin %timeit groupby_modin = df_modin.groupby('randNumCol').sum() %memit groupby_modin = df_modin.groupby('randNumCol').sum() # 608 ms ± 9.44 ms per loop (mean ± std. dev. of 3 runs, 3 loops each) # peak memory: 859.40 MiB, increment: 38.12 MiB # Dask %timeit groupby_dd = df_dd.groupby('randNumCol').sum().compute() %memit groupby_dd = df_dd.groupby('randNumCol').sum().compute() # 1.4 s ± 3.83 ms per loop (mean ± std. dev. of 3 runs, 3 loops each) # peak memory: 1417.91 MiB, increment: 0.05 MiB # Vaex %timeit groupby_vaex = df_vaex.groupby(['randNumCol'],agg='sum') %memit groupby_vaex = df_vaex.groupby(['randNumCol'],agg='sum') # 56.5 ms ± 1.74 ms per loop (mean ± std. dev. of 3 runs, 3 loops each) # peak memory: 850.87 MiB, increment: 310.39 MiB
- Mergeメソッドの読み込み時間・メモリー使用量を検証
# pandas %timeit merge_pd = df.merge(df_2, on = 'id', how = 'inner') %memit merge_pd = df.merge(df_2, on = 'id', how = 'inner') # 3.95 s ± 5.55 ms per loop (mean ± std. dev. of 3 runs, 3 loops each) # peak memory: 1419.39 MiB, increment: 3.95 MiB # Modin %timeit merge_modin = df_modin.merge(df_modin_2, on='id', how='inner') %memit merge_modin = df_modin.merge(df_modin_2, on='id', how='inner') # Error # Error # Dask %timeit merge_dd =dd.merge(df_dd,df_dd_2,left_index=True,right_index=True,how='inner').compute() %memit merge_dd = dd.merge(df_dd,df_dd_2,left_index=True,right_index=True,how='inner').compute() # Error # Error # Vaex %timeit merge_vaex = df_vaex.join(df_vaex_2, left_on='id', right_on='id', how='inner') %memit merge_vaex = df_vaex.join(df_vaex_2, left_on='id', right_on='id', how='inner') # Error # Error
※Vaexは、merge
がなく join
を代用した
- 簡単な計算の読み込み時間・メモリー使用量を検証
# pandas %timeit df_pandas['z'] = df_pandas['y'] / (df_pandas['x'] * df_pandas['x']) %memit df_pandas['z'] = df_pandas['y'] / (df_pandas['x'] * df_pandas['x']) # 54.1 ms ± 306 µs per loop (mean ± std. dev. of 3 runs, 3 loops each) # peak memory: 1235.12 MiB, increment: 142.59 MiB # Modin %timeit df_modin['z'] = df_modin['y'] / (df_modin['x'] * df_modin['x']) %memit df_modin['z'] = df_modin['y'] / (df_modin['x'] * df_modin['x']) # 1.7 s ± 8.59 ms per loop (mean ± std. dev. of 3 runs, 3 loops each) # peak memory: 2020.81 MiB, increment: 741.03 MiB # Dask %%timeit df_dd['z'] = df_dd['y'] / (df_dd['x'] * df_dd['x']) df_dd.compute() %%memit df_dd['z'] = df_dd['y'] / (df_dd['x'] * df_dd['x']) df_dd.compute() # 2.2 s ± 15.4 ms per loop (mean ± std. dev. of 3 runs, 3 loops each) # peak memory: 1972.04 MiB, increment: 49.03 MiB # Vaex %timeit df_vaex['z'] = df_vaex['y'] / (df_vaex['x'] * df_vaex['x']) %memit df_vaex['z'] = df_vaex['y'] / (df_vaex['x'] * df_vaex['x']) # 50 µs ± 17.9 µs per loop (mean ± std. dev. of 3 runs, 3 loops each)) # peak memory: 1039.48 MiB, increment: 0.07 MiB
- 特定の値を探す際の読み込み時間・メモリー使用量を検証
target_list = [val * 10 for val in range(0, 100)]
# pandas %timeit df[df["id"].isin(target_list)] %memit df[df["id"].isin(target_list)] # 862 ms ± 8.13 ms per loop (mean ± std. dev. of 3 runs, 3 loops each) # peak memory: 1231.79 MiB, increment: 76.31 MiB # Modin %timeit df_modin[df_modin["id"].isin(target_list)] %memit df_modin[df_modin["id"].isin(target_list)] # 328 ms ± 8.44 ms per loop (mean ± std. dev. of 3 runs, 3 loops each) # peak memory: 1231.53 MiB, increment: 76.3 MiB # Dask %timeit df_dd[df_dd["id"].isin(target_list)].compute() %memit df_dd[df_dd["id"].isin(target_list)].compute() # 2.31 s ± 61.6 ms per loop (mean ± std. dev. of 3 runs, 3 loops each) # peak memory: 1136.77 MiB, increment: 0.04 MiB # Vaex %timeit df_vaex[df_vaex["id"].isin(target_list)] %memit df_vaex[df_vaex["id"].isin(target_list)] # 4.23 ms ± 405 µs per loop (mean ± std. dev. of 3 runs, 3 loops each) # peak memory: 1106.74 MiB, increment: 0.0 MiB
ライブラリ毎の処理時間比較まとめ
ライブラリ毎のmemory使用量(MiB)比較まとめ
考察
今回の検証結果を用いて、それぞれのライブラリの特徴を比較しながら考察していく。
処理時間について
Daskは、マルチコア処理に対応しており、Out-of-CoreおよびLazy execution(遅延評価)もサポートしている。
これにより、重い処理(apply)において、シングルコア処理のpandasよりDaskの方が効率的に処理させることができる。
ただし、Daskの並列処理は分割されたPartitionを複数のWorkerに渡して処理させるため、
軽い処理(groupby)や小規模のデータに対しては、並列処理の恩恵より並列処理の管理コストのオーバーヘッドの方が大きい。したがって、これらの検証結果は、pandasよりDaskの方が処理時間がかかる結果になったと考えれる。
遅延評価により、通常の処理の記述だけでは実際の処理が走らないため、データをメモリに乗せて処理させるためには、compute()
を実行する必要がある。今回の検証では、ユーザがデータ分析を行う際の観点から評価するため、それぞれの検証項目に実際のデータを確認できる段階までを計測時間の対象とする。つまり、Daskの各処理は、compute()
の実行処理までの結果とする。
Modinは、マルチコア処理に対応しており、かつBackendはDaskあるいはRayを採用している。今回のBackendはDaskを用いて検証を行った。
検証の結果、ModinはDaskと同等、あるいはそれ以上のパフォーマンスを確認することができる。
ModinはBackendとしてDaskを採用しているものの、DataFrameの分割の仕方によって処理時間の差がでると考えられる。
DaskのDataFramesインターフェースは左図のようにデータを分割するに対して、Modinは右図のようにデータを分割する。
例えば、groupby処理はrandNumColカラムの値の合計を処理するため、Daskは複数に分割されたPartitionにまたがって処理をしなければならないのに対し、Modinは列方向、行方向を自由に分割することができる。この場合は複数のPartitionにまたがって処理する必要がないため、このような検証結果になると考えれれる。
Vaexは、マルチコア処理、メモリマッピング、Lazy execution(遅延評価)に対応しており、またノートパソコンでも大規模なデータを快適に扱えるよう、構造化のバイナリファイル形式(HDF、Arrow、Parquet等)を採用している。
検証の結果、pandasをはじめ他のライブラリより圧倒的なパフォーマンスを示した。ただし、ここで出した数字はVaexの特徴を踏襲しており、ここで詳しく解説していく。
Vaexは、Daskと同様に遅延評価の手法を採用しているが、Daskの遅延評価はデータを処理させるかさせないかの2択に対し、Vaexは、部分的に処理を行わせることによって、実用的で効率良く大規模なデータ処理を実現することができる。
DaskとVaexの遅延評価の違いを具体的な例をあげて解説していく。
Daskの遅延評価(read_csvの場合)
Daskは、compute()
を実行しない限り、メモリ上に実データを載せることなく、このようにそれぞれのカラムのデータタイプのみが表示される。
Vaexの遅延評価(read_csvの場合)
Vaexは、このように先頭の5行と後尾の5行のみを処理して、それ以外の部分は必要なときに処理が行われる。
したがって、データのサイズが1万行でも、1億行でも、read_csv()
では、10行のみの処理コストで済ませる事ができる。Vaexは遅延評価を採用しているものの、このように部分的に処理し、見せ方を工夫することによって、ユーザは遅延評価のことを気にせずに本来のデータ処理に集中することができる。
したがって、pandasや他のライブラリにとっては、ややアンフェアな評価方法となってしまうが、先述した通り ユーザがデータ分析を行う際の観点から評価するため、Vaexのこの遅延評価の特徴を生かしたままの評価数字で比較を行った。
とはいえ、もしVaexを他のライブラリと同様にすべての処理をさせて平等に評価したらどうのような結果になるのかを追加検証として行ってみた。
具体的には、特定の処理をさせたあとの結果をリスト型に変換させるまでの処理時間を計測した。
- find_value
#pandas %timeit -n 3 -r 3 (df[df["id"].isin(target_list)])["x"].tolist() # 839 ms ± 9.85 ms per loop (mean ± std. dev. of 3 runs, 3 loops each) #Modin %timeit -n 3 -r 3 (df_modin[df_modin["id"].isin(target_list)])["x"].tolist() # 381 ms ± 2 ms per loop (mean ± std. dev. of 3 runs, 3 loops each) #Dask %timeit -n 3 -r 3 (df_dd[df_dd["id"].isin(target_list)].compute())["x"].tolist() # 2.25 s ± 10.6 ms per loop (mean ± std. dev. of 3 runs, 3 loops each) #Vaex %timeit -n 3 -r 3 (df_vaex[df_vaex["id"].isin(target_list)])["x"].tolist() # 28.1 ms ± 1.57 ms per loop (mean ± std. dev. of 3 runs, 3 loops each)
他のライブラリは、リスト型への変換の前後はほぼ変わらないのに対して、Vaexは約7倍の処理時間がかかった結果となった(4.23 ms から 28.1 ms に増加)。
これは、Vaexの遅延評価の有無によって約7倍のパフォーマンスの差があることが分かった。また、遅延評価抜きでも、全体を通してまだ圧倒的なパフォーマンスを実現できるのは、メモリマッピングとHDFファイルを採用しているからだと考えれる。
また、HDFファイルの読み込みを他のライブラリにも追加検証として行ってみた。
- read_hdf
#pandas %timeit -n 3 -r 3 pd.read_hdf("df.hdf5", key="df") # 552 ms ± 7.57 ms per loop (mean ± std. dev. of 3 runs, 3 loops each) #Modin %timeit -n 3 -r 3 mdpd.read_hdf("df.hdf5", key="df") # 2.48 s ± 101 ms per loop (mean ± std. dev. of 3 runs, 3 loops each) #Dask %timeit -n 3 -r 3 dd.read_hdf("df.hdf5", key="df", chunksize=10**7).compute() # 2.36 s ± 64 ms per loop (mean ± std. dev. of 3 runs, 3 loops each) #Vaex %timeit -n 3 -r 3 vx.open("df.csv.hdf5") # 4.61 ms ± 3.71 ms per loop (mean ± std. dev. of 3 runs, 3 loops each)
ModinおよびDaskは、CSVファイル読み込み時の処理時間はほぼ変わらないが、pandasは約7.5倍(4.15s から 552ms に)短縮することができた 。
また、Vaexは約100倍(541ms から 4.61ms に)短縮することができた。
これは、Vaexの read_csv()
は初回の実行(HDFファイルが存在しない場合)で内部的にCSVファイルをHDFファイルに変換させる処理も行われるため、541msという数字の結果はその処理時間も含まれる。
Vaexの read_csv()
2回目以降(HDFファイルが存在する場合)の実行は、open()
と同様な処理時間となることを確認することができた。
メモリ使用量について
処理時間と同様、Vaexはpandasや他のライブラリよりメモリ使用量を低く抑えることができる。理由は上記の処理時間についてにも述べたように、遅延評価によるものである。
ただし、groupby処理については、pandasと同じぐらいのメモリ使用量の結果となっている。これは、groupby処理が実際にgroupingおよびaggregateの処理を行う必要があるため、このような結果になったと考えれる。
結論
今回は、{pandas, Modin, Dask, Vaex}
を{read_csv, apply, groupby, merge, basic calculation, find_value}
の6つのメソッドで比較検証した。 開発中のライブラリは特定のメソッド検証を行うことができなかったが、以下のように比較検証を結論づけていく。
各処理において一番処理速度速かった・メモリ使用量が少なかったものを表にした。
- 例
- {read_csv, 処理速度}→Vaexの処理速度が一番速い
- {groupby, メモリ使用量}→Daskのメモリ使用量が一番低い
- Modinは比較的まだ幼いライブラリだ。未だ開発が続き、発達段階であるライブラリのため、全ての処理が加速化されるわけでは無いことが今回の検証でわかった。しかし、read_csvやapplyメソッドにおいて高速化に成功しており、これからの開発が非常に期待できるライブラリである。
- Daskは遅延評価によりcompute()を実行する前まではメモリ使用量を抑えられるが、実行後は非常にメモリを食ってしまう。また、処理速度はread_csv・applyメソッドにおいては高速化に成功したが他の項目はpandasよりも遅い。しかし、Daskのコードはpandasと類比しており、初心者でも簡単に扱える事がメリットとして挙げられる。
- Vaexは独自のHDFフォーマットのデータフレームを使用することにより処理速度を速くしており、メモリの使用量に関しても他のライブラリと比べても安定して低い数値が計測された。pandasの高速化に最も成功したライブラリである。
- 今回紹介した高速化できるライブラリはpandasより高いパフォーマンスを出せたと分かったが、現在完全にpandasの処理を置き換えるライブラリはまだ存在しておらず、かつタスクやデータの規模によってはpandasを使った方が効率的に行えるものあるため、それぞれの特徴を理解した上で使っていくことがベストだと考えられる。
- Zealsでは、最近扱うデータの規模が増加し、大規模なデータを効率的に処理するためにもこのような高速化ライブラリを積極的に選定して使っていきたい、プロダクション環境にも安定的に使えるライブラリやノウハウをまた別の機会で記事にしていきたいと考えている。
参考記事
blog.ikedaosushi.com blog.ikedaosushi.com qiita.com qiita.com modin.readthedocs.io vaex.readthedocs.io dask.org