山pの楽しいお勉強生活

勉強の成果を垂れ流していきます

GitHubのマージ済のブランチをGitHub Actionsで定期的に削除する

追記(2019/10/28)

GitHubリポジトリ設定にマージ完了後にブランチを削除する設定がありました。(2019/07/31に追加されたらしいです。)よって、GitHub Actionsなんて使用せずにリポジトリの設定を変更しましょう!

help.github.com


はじめに

プルリク後のブランチ削除忘れってありますよね。気づいたらマージ済みのブランチが山のようにあったり。

で、今までは「GitHubのマージ済のブランチをCircleCIで定期的に削除する」を参考にCircleCIを使用していたのですが、GitHub Actionsでできるのでは?と思ったので調査したメモです。

面倒な設定やGitHub連携なども不要なので、プルリクを使うリポジトリには必ず入れる位でも良いかも?

一言で

branch-cleanup-action」を見てください

手順とか

  • GitHub Actionsを有効にする
  • リポジトリのメニューに「Actions」が表示されていることを確認
  • Actions → Set up a workflow yourselftをクリック
    • 既にGitHub Actionsで何か動いている場合には「Actions → New workflow → Set up a workflow yourselft」
  • ファイル名を「pull_request-branch_cleanup.yml」に変更(main.ymlでも問題はないが。。。)
  • コードを以下のように設定
on:
  pull_request:
    types: [closed]
name: Branch Cleanup
jobs:
  build:
    name: branch-cleanup-action
    runs-on: ubuntu-latest
    steps:
    - name: branch-cleanup
      uses: jessfraz/branch-cleanup-action@master
      env:
        GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
  • Start commit → コミットメッセージ入力 → Commit new file

※この設定自体をプルリクにしたい場合には、「 .github/workflows/pull_request-branch_cleanup.yml 」に上記の設定書いてプルリクすればよろし。

参考URL

setuptoolsがsetup.cfgを読んでくれない

現象

Pythonでsetuptoolsを使用してwheelファイルを作成しようとしたが、自動的に読んでくれるはずの「setup.cfg」を読んでくれない。

※setup.pyで「setup(name='hoge')」などと指定すると正しく動作する
※正確には、license_fileなどで存在しないファイルを指定するとエラーになるので読んでくれているが、wheelファイル作成時に使用してくれない。

結論

setuptoolsのバージョンが低い

※setuptools 30.3.0から読んでくれるようになったらしい

確認

pip list
pip list -o # 更新があるライブラリのみ

アップデート

pip install --upgrade setuptools

何故発生したか

  • setuptoolsはPythonに付随してくるので、最新版のPythonを使用すればこの事象は発生しない。
    • 意図的にバージョン下げれば発生する?(動かない気もするが。)
  • 今回はvenvでPython3.5.2を使用していたため、3.5.2に付随したsetuptoolsのバージョンが仮想環境に入り、本事象が発生した。
    • 正確にはpipに依存するsetuptoolsか?

今後の対応

最初に環境を作る際に以下のようにアップグレードを行う!

mkdir hoge
cd hoge
pyenv local 3.5.2
python -m venv .venv
.venv\Scripts\activate.bat
python -m pip install --upgrade pip setuptools

Windowsでpyenv(pyenv-win)

pyenvを使おうと思って公式見たらpyenv-win使えと書いてあった。

If you're on Windows, consider using @kirankotari's pyenv-win fork. (pyenv does not work on windows outside the Windows Subsystem for Linux)

使い方も含めてメモ

pyenvとは

  • Pythonのバージョンを切り替えるツール
    • それ以外の事をしないというのが混乱しないためのポイント。
  • ライブラリ管理的な事もできるようだが、そちらはPython公式がサポートしているvenvを使用する
    • この記事の最後に書いた。

前提

  • python(pip)がインストール済み

インストール

詳細はpyenv-winのGitHubをみてください。

  • install
pip install pyenv-win --target %USERPROFILE%/.pyenv
  • PATHに追加
    • 既存のpythonよりも前に追加する必要がある
      • 管理者権限を持っておらず、管理者権限のPATHにpythonがあるPATHが設定してあるとpyenv云々の前に使用するpythonが決まってしまう。
      • ↑この場合、打つ手はなさそう。
    • %USERPROFILE%\.pyenv\pyenv-win\bin
    • %USERPROFILE%\.pyenv\pyenv-win\shims
      • この時点だと「shims」は空
    • %USERPROFILE%/.pyenv 」は「 C:\Users\yamap_55\.pyenv 」となる。
  • 別のコマンドプロンプト起動
    • セッションを切り替えないと環境変数が有効とならないため
  • PATHが通っていることを確認
    • pyenv --version
  • 設定
    • pyenv rehash
    • バージョンが切り替わらないなど何かあったらrehash

使い方

  • インストール可能なバージョン確認
pyenv install --list
pyenv install 3.5.2
  • pythonのバージョン切り替え
pyenv local 3.5.2

どうやって切り替えている?

  • pythonが呼ばれる
  • PATHを辿る
  • shims下にpython.batを見つける
    • shimsの前にpythonがあるとpyenvは起動せず、バージョンが切り替わらない
    • shims内が空だとpyenvは起動せず、バージョンが切り替わらない
  • python.batからpyenvが呼ばれる
  • 「.python-version」から指定されたpythonのバージョンを調べ、pythonが実行される
    • ここはコード見ていないので想定。

メモとか

  • pyenvがある状態での仮想環境作成
mkdir project
cd project
pyenv install 3.5.2
pyenv local 3.5.2
python -m venv .venv
.venv\Scripts\activate

JDK 1.8.0_221を管理者権限がないWindowsにインストールする

今時Java8?管理者権限がないとかどういうこと?とか色々言いたい事はありますが、やらなきゃいけない事もあるのです。

同内容の記事がWeb上で見受けられますが、落とし穴いっぱいなので改めて記事にしています。

手順

  • jdkを取得
  • 7zipインストール
    • 管理者権限が必要www(矛盾)
    • ↓の注意点に記載しています。
  • jdk-8u221-windows-x64.exe」を解凍
    • ↓「x」は「-」は付けない、「-o」の後はスペースなし
    • C:\work\20190911\7zip\7z.exe x C:\work\20190911\a\jdk-8u221-windows-x64.exe -oC:\work\20190911\a\o
  • 111ファイルからtool.zip(JDKの実体)を取得
    • バージョンによってはexe展開後にtool.zipが存在することもあるようです。
    • 111は cab ファイルなので展開が可能。
    • extrac32 C:\work\20190911\a\o\.rsrc\1033\JAVA_CAB10\111
  • tool.zipを展開
    • zipの展開方法は省略
  • jarが.packファイルとなっているらしいので、jarに変換
    • cd C:\work\20190911\tools
    • for /r %x in (*.pack) do .\bin\unpack200 -r "%x" "%~dx%~px%~nx.jar"
  • 確認
    • C:\work\20190911\tools\bin\java -version
  • toolsフォルダを任意のPATHに移動
    • move tools C:\tools\java\jdk-8u2212

※PATHやJAVA_HOMEの設定は省略(既にJavaがインストールされており、システム環境変数のPATHに設定されていると、ユーザ環境変数では上書きする事はできないので注意)

注意点

  • 7zipがインストールできない
  • jdkのexeを展開後に111が展開できない
    • 111はzipではなく cab です。
      • extrac32を利用して展開が可能です。
    • よく見るとディレクトリ名に書いてありますが、参考サイトをみるとzipとなっている所もあります。。。
  • jdkのexeを解凍後にtool.zipがない
    • tool.zipがあるかどうかはjdkのバージョンによって異なるようです。

参考

databricksでcreate tableを行った際にParquet does not support timestamp. See HIVE-6384が発生する

エラー詳細

stacktraceは長いので割愛

com.databricks.backend.common.rpc.DatabricksExceptions$SQLExecutionException:
org.apache.spark.sql.AnalysisException:
org.apache.hadoop.hive.ql.metadata.HiveException:
java.lang.UnsupportedOperationException:
Parquet does not support timestamp. See HIVE-6384;

発生事象

databricksで下記のcreate table文を実行した際に上記のエラーが発生する。

%sql
create external table if not exists
  yamap55.hoge(
    HOGE_ID string comment 'HOGE_ID',
    HOGE_TIMESTAMP timestamp comment 'HOGE_TIMESTAMP'
  )
  comment
    'hoge'
  partitioned by (
    TARGET_DATE string comment 'TARGET_DATE'
  )
  stored as
    parquet
  location
    's3a://aaaaa/bbbbbb/data/hoge'
  tblproperties
    (
      'parquet.compress'='SNAPPY'
    )
;

再現する最小構成↓

%sql
create table if not exists
  yamap55.hoge(
    HOGE_TIMESTAMP timestamp comment 'HOGE_TIMESTAMP'
  )
  stored as
    parquet
;

回避方法

詳細は長くなるので先に回避方法を記載。

  • 通常のcreate table文を使用する事で回避が可能です。

具体的には以下

%sql
create table if not exists
  yamap55.hoge(
    HOGE_ID string comment 'HOGE_ID',
    HOGE_TIMESTAMP timestamp comment 'HOGE_TIMESTAMP',
    TARGET_DATE string comment 'TARGET_DATE'
  )
  using
    parquet
  comment
    'hoge'
  partitioned by (
    TARGET_DATE
  )
  location
    's3a://aaaaa/bbbbbb/data/hoge'
  tblproperties
    (
      'parquet.compress'='SNAPPY'
    )
;

注意事項

  • hive形式との差分が多いので構文に注意(下記は一例)
    • 「external」の記載は不要(詳細は下記「通常版のcreate tableを実行する事による懸念とか」参照)
    • column定義にpartitionで使用する列も記載
    • 「stored as」 → 「using」
    • 「using」の記載順
  • 外部テーブル指定(external)がないが、「location」を指定する事で暗黙的に外部テーブルとして扱われる。
    • テーブルをdropしてもS3にデータは残る

原因とか詳細の調査

細かい事はよくわからなかった。誰か教えてください。

以下は推測とか。

通常版のcreate tableを実行する事による懸念とか

  • 「external」が通常版の構文にはないが、外部テーブルを使用したい
    • locationを指定する事で暗黙的に外部テーブルとして扱われるため問題ない。(tableのdropなどで試した)

参考URL

直接は関係なかったけどメモ的なURL

PySparkで日付毎にデータを蓄積する際のdf.write.modeについて

概要

PySparkでpartitionByで日付毎に分けてデータを保存している場合、どのように追記していけば良いのか。

先にまとめ

appendの方がメリットは多いが、チェック忘れると重複登録されるデメリットが怖い。
とはいえ、overwriteも他のデータ消えるデメリットも怖いので、一長一短か。

説明用コード

path = 's3://..../hoge/'
df = spark.createDataFrame(
  [
    ['a',1,'hoge','20190101'], ['b',2,'huga','20190201'], ['c',3,'piyo','20190301']
  ],
  ['id','integer','string','date']
)
(
  df
  .write
  .mode('overwrite')
  .partitionBy('date')
  .parquet(path)
)

前提

  • partitionByを指定してwriteを行うと、指定したPATHの下に「partitionByで指定したカラム名=値」というディレクトリが設定されて保存される。
    • 上記のコードでいうと「's3://..../hoge/date=20190101/'」の下にparquetが作成される。(dateカラムは保存されない)
  • modeをappendでwriteを行うと、追記で保存される。
    • 上記のコード実行後にdateが20190401 のみ のdataframeを保存すると、date=20190401フォルダ下に保存される。
    • 追記なので重複は考慮されない
      • 上記のコード実行後にdateが20190101 のみ のdataframeを保存すると、date=20190101のデータが重複して保存される
  • modeをoverwriteでwriteを行うと、上書きで保存される。
    • 上記のコード実行後にdateが20190401 のみ のdataframeを保存すると、date=20190401フォルダ下に保存される。
      • dateが20190101などのデータは削除される

詳細と具体例

日付毎にデータが蓄積されており、1月から3月までデータが蓄積されているとする。 その場合に、4月のみのDataframeがある場合に、どのようにwriteするとうまく保存できるか。

っというか、modeはappendとoverwriteどちらが良いか。

appendの場合

メリット

  • 特に何も気にせずwriteすれば良し
  • 複数日分(複数のpartitionにまたがる場合)であっても問題はない

デメリット

  • 同コードを再実行した場合には2重に登録される。
  • appendってあまり使わない気がする。

デメリットの解決策

  • 実行前に存在確認を行う

コード

from pyspark.sql import functions as f
path = 's3://..../hoge/'

if not spark.read.parquet(path).filter(f.col('date') == '20190401').rdd.isEmpty():
    raise Exception('登録済みです : {}'.format('20190401'))

df = spark.createDataFrame(
  [
    ['d',4,'hogehoge','20190401']
  ],
  ['id','integer','string','date']
)
(
  df
  .write
  .mode('append')
  .partitionBy('date')
  .parquet(path)
)

overwriteの場合

メリット

  • 開発時に楽
    • 実行後に消したりする必要がない。
  • 挙動がわかりやすい?(慣れているだけか?)

デメリット

  • 読み込み時と書き込み時にPATHを意識(変更)する必要がある。
  • 複数日分(複数のpartitionにまたがる場合)では、filterしてそれぞれ保存する必要がある。
    • そんな処理はしたくないので、この場合はappend一択。

コード

from pyspark.sql import functions as f
import os
path = 's3://..../hoge/'

# partitionByを含めたPATHを作成
path = os.path.join(path, 'date=20190401')

df = spark.createDataFrame(
  [
    ['d',4,'hogehoge','20190401']
  ],
  ['id','integer','string','date']
)

# PATHで直接指定した場合でも、DataFrameにdateカラムが存在して問題ない ※
# (上位フォルダを指定して読みこんだ場合にはparitionに設定されている値が優先される)
(
  df
  .write
  .mode('overwrite')
#  .partitionBy('date') # partitionByは指定しない
  .parquet(path)
)

※partitionされている場合に、paritionカラムの値と保存値については別記事にします。

PySparkで特定のカラムが全体の最大値であるレコードを取得する

概要

実現はできてはいたものの、もっと良いやり方ないかな?と聞いたら教えてもらったのでメモ。

うまく説明できないのでデータを記載します。

処理前

+----+------+
|name|  date|
+----+------+
|   a|201906|
|   a|201907|
|   b|201906|
|   b|201907|
|   c|201907|
+----+------+

処理後

+----+------+
|name|  date|
+----+------+
|   a|201907|
|   b|201907|
|   c|201907|
+----+------+

教えてもらった方法

from pyspark.sql import functions as f
from pyspark.sql.window import Window as w

df = spark.createDataFrame(
  [['a', '201906'], ['a', '201907'], ['b', '201906'], ['b', '201907'], ['c', '201907']],
  ['name', 'date']
)
df.show()

result_df = (
  df
    .withColumn('max_date', f.max('date').over(w.partitionBy()))
    .filter(f.col('date') == f.col('max_date'))
    .drop('max_date')
)
result_df.show()

試行錯誤の内容もメモ

当初書いたコードや、途中のコードもメモ

from pyspark.sql import functions as f
from pyspark.sql.window import Window as w

df = spark.createDataFrame(
  [['a', '201906'], ['a', '201907'], ['b', '201906'], ['b', '201907'], ['c', '201907']],
  ['name', 'date']
)
df.show()

# 1行ではあるが、一旦actionが走るので遅い気がする。
# また、filterの中でdfを使っているので、dfが定義されている必要がある。(読み込みからメソッドチェーンで繋げない)
result_df_2 = df.filter(f.col('date') == df.agg(f.max('date')).first()[0])
result_df_2.show()

# 同じwindow関数だったら最大を取るという意図からしてmax使った方がわかりやすい
result_df_3 = (
  df
    .withColumn('rank', f.rank().over(w.partitionBy().orderBy(f.col('date').desc())))
    .filter(f.col('rank') == 1)
    .drop('rank')
)