PySparkで日付毎にデータを蓄積する際のdf.write.modeについて
概要
PySparkでpartitionByで日付毎に分けてデータを保存している場合、どのように追記していけば良いのか。
先にまとめ
appendの方がメリットは多いが、チェック忘れると重複登録されるデメリットが怖い。
とはいえ、overwriteも他のデータ消えるデメリットも怖いので、一長一短か。
説明用コード
path = 's3://..../hoge/' df = spark.createDataFrame( [ ['a',1,'hoge','20190101'], ['b',2,'huga','20190201'], ['c',3,'piyo','20190301'] ], ['id','integer','string','date'] ) ( df .write .mode('overwrite') .partitionBy('date') .parquet(path) )
前提
- partitionByを指定してwriteを行うと、指定したPATHの下に「partitionByで指定したカラム名=値」というディレクトリが設定されて保存される。
- 上記のコードでいうと「's3://..../hoge/date=20190101/'」の下にparquetが作成される。(dateカラムは保存されない)
- modeをappendでwriteを行うと、追記で保存される。
- 上記のコード実行後にdateが20190401 のみ のdataframeを保存すると、date=20190401フォルダ下に保存される。
- 追記なので重複は考慮されない
- 上記のコード実行後にdateが20190101 のみ のdataframeを保存すると、date=20190101のデータが重複して保存される
- modeをoverwriteでwriteを行うと、上書きで保存される。
- 上記のコード実行後にdateが20190401 のみ のdataframeを保存すると、date=20190401フォルダ下に保存される。
- dateが20190101などのデータは削除される
- 上記のコード実行後にdateが20190401 のみ のdataframeを保存すると、date=20190401フォルダ下に保存される。
詳細と具体例
日付毎にデータが蓄積されており、1月から3月までデータが蓄積されているとする。 その場合に、4月のみのDataframeがある場合に、どのようにwriteするとうまく保存できるか。
っというか、modeはappendとoverwriteどちらが良いか。
appendの場合
メリット
- 特に何も気にせずwriteすれば良し
- 複数日分(複数のpartitionにまたがる場合)であっても問題はない
デメリット
- 同コードを再実行した場合には2重に登録される。
- appendってあまり使わない気がする。
デメリットの解決策
- 実行前に存在確認を行う
コード
from pyspark.sql import functions as f path = 's3://..../hoge/' if not spark.read.parquet(path).filter(f.col('date') == '20190401').rdd.isEmpty(): raise Exception('登録済みです : {}'.format('20190401')) df = spark.createDataFrame( [ ['d',4,'hogehoge','20190401'] ], ['id','integer','string','date'] ) ( df .write .mode('append') .partitionBy('date') .parquet(path) )
overwriteの場合
メリット
- 開発時に楽
- 実行後に消したりする必要がない。
- 挙動がわかりやすい?(慣れているだけか?)
デメリット
- 読み込み時と書き込み時にPATHを意識(変更)する必要がある。
- 複数日分(複数のpartitionにまたがる場合)では、filterしてそれぞれ保存する必要がある。
- そんな処理はしたくないので、この場合はappend一択。
コード
from pyspark.sql import functions as f import os path = 's3://..../hoge/' # partitionByを含めたPATHを作成 path = os.path.join(path, 'date=20190401') df = spark.createDataFrame( [ ['d',4,'hogehoge','20190401'] ], ['id','integer','string','date'] ) # PATHで直接指定した場合でも、DataFrameにdateカラムが存在して問題ない ※ # (上位フォルダを指定して読みこんだ場合にはparitionに設定されている値が優先される) ( df .write .mode('overwrite') # .partitionBy('date') # partitionByは指定しない .parquet(path) )
※partitionされている場合に、paritionカラムの値と保存値については別記事にします。