山pの楽しいお勉強生活

勉強の成果を垂れ流していきます

PySparkで日付毎にデータを蓄積する際のdf.write.modeについて

概要

PySparkでpartitionByで日付毎に分けてデータを保存している場合、どのように追記していけば良いのか。

先にまとめ

appendの方がメリットは多いが、チェック忘れると重複登録されるデメリットが怖い。
とはいえ、overwriteも他のデータ消えるデメリットも怖いので、一長一短か。

説明用コード

path = 's3://..../hoge/'
df = spark.createDataFrame(
  [
    ['a',1,'hoge','20190101'], ['b',2,'huga','20190201'], ['c',3,'piyo','20190301']
  ],
  ['id','integer','string','date']
)
(
  df
  .write
  .mode('overwrite')
  .partitionBy('date')
  .parquet(path)
)

前提

  • partitionByを指定してwriteを行うと、指定したPATHの下に「partitionByで指定したカラム名=値」というディレクトリが設定されて保存される。
    • 上記のコードでいうと「's3://..../hoge/date=20190101/'」の下にparquetが作成される。(dateカラムは保存されない)
  • modeをappendでwriteを行うと、追記で保存される。
    • 上記のコード実行後にdateが20190401 のみ のdataframeを保存すると、date=20190401フォルダ下に保存される。
    • 追記なので重複は考慮されない
      • 上記のコード実行後にdateが20190101 のみ のdataframeを保存すると、date=20190101のデータが重複して保存される
  • modeをoverwriteでwriteを行うと、上書きで保存される。
    • 上記のコード実行後にdateが20190401 のみ のdataframeを保存すると、date=20190401フォルダ下に保存される。
      • dateが20190101などのデータは削除される

詳細と具体例

日付毎にデータが蓄積されており、1月から3月までデータが蓄積されているとする。 その場合に、4月のみのDataframeがある場合に、どのようにwriteするとうまく保存できるか。

っというか、modeはappendとoverwriteどちらが良いか。

appendの場合

メリット

  • 特に何も気にせずwriteすれば良し
  • 複数日分(複数のpartitionにまたがる場合)であっても問題はない

デメリット

  • 同コードを再実行した場合には2重に登録される。
  • appendってあまり使わない気がする。

デメリットの解決策

  • 実行前に存在確認を行う

コード

from pyspark.sql import functions as f
path = 's3://..../hoge/'

if not spark.read.parquet(path).filter(f.col('date') == '20190401').rdd.isEmpty():
    raise Exception('登録済みです : {}'.format('20190401'))

df = spark.createDataFrame(
  [
    ['d',4,'hogehoge','20190401']
  ],
  ['id','integer','string','date']
)
(
  df
  .write
  .mode('append')
  .partitionBy('date')
  .parquet(path)
)

overwriteの場合

メリット

  • 開発時に楽
    • 実行後に消したりする必要がない。
  • 挙動がわかりやすい?(慣れているだけか?)

デメリット

  • 読み込み時と書き込み時にPATHを意識(変更)する必要がある。
  • 複数日分(複数のpartitionにまたがる場合)では、filterしてそれぞれ保存する必要がある。
    • そんな処理はしたくないので、この場合はappend一択。

コード

from pyspark.sql import functions as f
import os
path = 's3://..../hoge/'

# partitionByを含めたPATHを作成
path = os.path.join(path, 'date=20190401')

df = spark.createDataFrame(
  [
    ['d',4,'hogehoge','20190401']
  ],
  ['id','integer','string','date']
)

# PATHで直接指定した場合でも、DataFrameにdateカラムが存在して問題ない ※
# (上位フォルダを指定して読みこんだ場合にはparitionに設定されている値が優先される)
(
  df
  .write
  .mode('overwrite')
#  .partitionBy('date') # partitionByは指定しない
  .parquet(path)
)

※partitionされている場合に、paritionカラムの値と保存値については別記事にします。