山pの楽しいお勉強生活

勉強の成果を垂れ流していきます

PySparkではDataFrameのjoinでorderは維持されない

概要

  • PySparkのDataFrameではjoinした際にorderは維持されない
    • 正確にはshuffleが行われる
  • orderは出力直前に行うのが鉄則

再現コード

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df1 = spark.createDataFrame(
    [
        ['user_01'],
        ['user_02'],
        ['user_03'],
        ['user_04'],
        ['user_05'],
    ],
    ['id'],
)
df2 = spark.createDataFrame(
    [
        ['user_01', '1'],
        ['user_02', '2'],
        ['user_03', '3'],
        ['user_04', '4'],
        ['user_05', '5'],
    ],
    ['id', 'c1']
)
print('order byされているdf1')
df1 = df1.orderBy('id')
df1.show()

print('order byされているdf2')
df2 = df2.orderBy('id')
df2.show()

print('join後')
df1.join(df2, 'id').show()
order byされているdf1
+-------+
|     id|
+-------+
|user_01|
|user_02|
|user_03|
|user_04|
|user_05|
+-------+

order byされているdf2
+-------+---+
|     id| c1|
+-------+---+
|user_01|  1|
|user_02|  2|
|user_03|  3|
|user_04|  4|
|user_05|  5|
+-------+---+

join後
+-------+---+
|     id| c1|
+-------+---+
|user_03|  3|
|user_02|  2|
|user_01|  1|
|user_04|  4|
|user_05|  5|
+-------+---+

参考

PySparkのDataFrameでは同名のカラムが許容される

概要

  • PySparkのDataFrameでは同名のカラムが許容される
  • select などカラム名を指定する処理時に例外が発生する
  • カラム名を再定義、別名を付ける事で回避が可能

再現コード

作成時にカラム名が重複

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df99 = spark.createDataFrame(
    [
        [1, 'a', 'b'],
        [2, 'aa', 'bb'],
    ],
    ['id', 'c1', 'c1'],
)
df99.show()
+---+---+---+
| id| c1| c1|
+---+---+---+
|  1|  a|  b|
|  2| aa| bb|
+---+---+---+

※このパターンはあまりない気がする

joinした結果カラム目が重複

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df1 = spark.createDataFrame(
    [
        [1, 'a', 'b'],
        [2, 'aa', 'bb'],
    ],
    ['id', 'c1', 'c2'],
)
df2 = spark.createDataFrame(
    [
        [1, 'x', 'y'],
        [2, 'xx', 'yy'],
    ],
    ['id', 'c1', 'c2'],
)
merged_df = df1.join(df2, 'id')

# 重複カラムの存在は可能
merged_df.show()

# 重複していないカラムのselectは可能
merged_df.select('id').show()

# 重複しているカラムのselectは例外発生
merged_df.select('id', 'c1')
+---+---+---+---+---+
| id| c1| c2| c1| c2|
+---+---+---+---+---+
|  1|  a|  b|  x|  y|
|  2| aa| bb| xx| yy|
+---+---+---+---+---+

+---+
| id|
+---+
|  1|
|  2|
+---+

---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<ipython-input-20-69f4fa3aa508> in <module>
     24 
     25 # 重複しているカラムのselectは例外発生
---> 26 merged_df.select('id', 'c1')

/spark/python/pyspark/sql/dataframe.py in select(self, *cols)
   1419         [Row(name=u'Alice', age=12), Row(name=u'Bob', age=15)]
   1420         """
-> 1421         jdf = self._jdf.select(self._jcols(*cols))
   1422         return DataFrame(jdf, self.sql_ctx)
   1423 

/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    135                 # Hide where the exception came from that shows a non-Pythonic
    136                 # JVM exception message.
--> 137                 raise_from(converted)
    138             else:
    139                 raise

/spark/python/pyspark/sql/utils.py in raise_from(e)

AnalysisException: Reference 'c1' is ambiguous, could be: c1, c1.;

対応方法

toDFでカラムを指定して新しくDataFrameを作る

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df1 = spark.createDataFrame(
    [
        [1, 'a', 'b'],
        [2, 'aa', 'bb'],
    ],
    ['id', 'c1', 'c2'],
)
df2 = spark.createDataFrame(
    [
        [1, 'x', 'y'],
        [2, 'xx', 'yy'],
    ],
    ['id', 'c1', 'c2'],
)
merged_df = df1.join(df2, 'id')

# カラムを指定して新しくDataFrameを作る
df88 = merged_df.toDF('id', 'df1_c1', 'df1_c2', 'df2_c1', 'df2_c2')
df88.show()
+---+------+------+------+------+
| id|df1_c1|df1_c2|df2_c1|df2_c2|
+---+------+------+------+------+
|  1|     a|     b|     x|     y|
|  2|    aa|    bb|    xx|    yy|
+---+------+------+------+------+

https://spark.apache.org/docs/3.0.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.toDF

aliasをつけてselectする

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
df1 = spark.createDataFrame(
    [
        [1, 'a', 'b'],
        [2, 'aa', 'bb'],
    ],
    ['id', 'c1', 'c2'],
)
df2 = spark.createDataFrame(
    [
        [1, 'x', 'y'],
        [2, 'xx', 'yy'],
    ],
    ['id', 'c1', 'c2'],
)
df1 = df1.alias('df1')
df2 = df2.alias('df2')
merged_df = df1.join(df2, 'id')

# showの見た目は変わらない
merged_df.show()

# alias付きで指定すると取得が可能
merged_df.select('id', 'df1.c1', 'df1.c2', 'df2.c1', 'df2.c2').show()
+---+---+---+---+---+
| id| c1| c2| c1| c2|
+---+---+---+---+---+
|  1|  a|  b|  x|  y|
|  2| aa| bb| xx| yy|
+---+---+---+---+---+

+---+---+---+---+---+
| id| c1| c2| c1| c2|
+---+---+---+---+---+
|  1|  a|  b|  x|  y|
|  2| aa| bb| xx| yy|
+---+---+---+---+---+

https://spark.apache.org/docs/3.0.0/api/python/pyspark.sql.html#pyspark.sql.Column.alias

fstringの中でdictionary、setの内包表記を使用する

結論

l = ['a', 'b', 'c']
s1 = f'{ {s:s for s in l} }'  # 中括弧の後にスペースが必要
assert s1 == "{'a': 'a', 'b': 'b', 'c': 'c'}"

s2 = f'{ {s for s in l} }'  # 中括弧の後にスペースが必要
assert s2 == "{'b', 'c', 'a'}"

※setの方は順番は保証されないので↑のassertは失敗する場合がある

詳細とか

fstring内でdictionaryやsetの内包表記を普通に記載すると内包表記の記載がそのまま文字列となる

l = ['a', 'b', 'c']
s3 = f'{{s:s for s in l}}'
assert s3 == "{s:s for s in l}"

s4 = f'{{s for s in l}}'
assert s4 == "{s for s in l}"

これは中括弧は中括弧でエスケープできるという仕様のため。pep-0489で定義されている。

www.python.org

で、どうすれば良いかというとスペースを入れることでエスケープを回避する。こちらについでも同じドキュメント内に記載がある。

www.python.org

Pythonのユニットテストでimportされている変数を上書きする

結論

  • 直接モジュールの変数を上書きすれば良い
  • ただし、importされた時点でそのモジュールの変数として扱われる事に注意

※文字で見ても良くわからないと思うので下記のコードを参照

ケース1(テスト対象に直接変数がimportされている場合)

テスト対象のコード群

# a.py
from define import HOGE
class TargetClass:
    def get_hoge(self):
        return f"aaa_{HOGE}_bbb" # ここのHOGEを入れ替えたい
# define.py
HOGE = 'hoge'

ユニットテスト

import a
from a import TargetClass

def test_hoge():
    a.HOGE = 'piyo' # モジュールをインポートしてモジュールの変数を直接入れ替える
    target_class = TargetClass()
    assert target_class.get_hoge() == 'aaa_piyo_bbb'

ケース2(テスト対象で使用されている別モジュールで直接変数がimportされている場合)

テスト対象のコード群

# a.py
from b import get_hoge
class TargetClass:
    def get_hoge2(self):
        return f"aaa_{get_hoge()}_bbb"
# b.py
from define import HOGE
def get_hoge():
    return HOGE # ここのHOGEを入れ替えたい
# define.py
HOGE = 'hoge'

ユニットテスト

import b
from a import TargetClass

def test_hoge2():
    b.HOGE = 'piyo' # モジュールをインポートしてモジュールの変数を直接入れ替える
    target_class = TargetClass()
    assert target_class.get_hoge2() == 'aaa_piyo_bbb'

※結局やっているのはケース1と同じで直接bをインポートして値を入れ替える

経緯とか

  • ↑に書いた通りのことをやりたかったが調べ方もわからずハマった
  • unittest.patch だと、Mockになってしまうので、違うそうじゃない感。

蛇足

  • ケース2のようにできるのであれば、定義の方を変更できそうに見えるが通常はうまくいかない
  • 下記のユニットテスト2のように、入れ替えたいモジュール(今回であれば b )が呼び出される前であればうまくいくが、一度呼び出すとモジュール内に直接定義されるようで、想定通りにならない

テスト対象コード(ケース2と同じ)

# a.py
from b import get_hoge
class TargetClass:
    def get_hoge2(self):
        return f"aaa_{get_hoge()}_bbb"
# b.py
from define import HOGE
def get_hoge():
    return HOGE # ここのHOGEを入れ替えたい
# define.py
HOGE = 'hoge'

ユニットテスト1(うまくいかないケース)

from a import TargetClass
import define

def test_hoge():
    define.HOGE = 'piyo' # 元々定義されている値を入れ替える
    target_class = TargetClass()
    assert target_class.get_hoge2() == 'aaa_piyo_bbb' # AssertionError

テストケースの from a import TargetClass ここで a.py が読まれて、from b import get_hoge が読まれて、b.pyfrom define import HOGE まで読まれる。 これにより、b内にHOGEが定義されてしまう。 これにより、テストケース内で define.HOGE を置き換えても、b.HOGE は既に定義されているため値が変更されない。

ユニットテスト2(うまくいくがイマイチなケース)

import define
define.HOGE = 'piyo' # テストモジュールの先頭でimportした直後に値を変更する
from a import TargetClass

def test_hoge():
    target_class = TargetClass()
    assert target_class.get_hoge2() == 'aaa_piyo_bbb'

うまく動作するには動作するが、linterに指摘されますし、実際に動かすときには1ファイルのテストだけではないと思われるので、先に実行される他のテストでbがimportされていたら結局同じ事が発生すると思われる。

GitHub ActionsでLOCALEがja_JP.UTF-8のPostgreSQLを使用する方法

結論

  • サービスコンテナでLOCALEに ja_JP.UTF-8 を設定することはできない
  • 自力でLOCALEを追加するイメージを作って、docker run で起動する

Dockerfile

FROM postgres:11.5
RUN localedef -i ja_JP -c -f UTF-8 -A /usr/share/locale/locale.alias ja_JP.UTF-8

GitHub Actionsのワークフロー設定

  • 正しい呼び名がわからず
name: postgresql

on: [push]

jobs:
  postgresql:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: PostgreSQL container build & run
        run: |
          docker build -t postgresql_i .github/workflows
          docker run -d --name postgresql_t -p 5432:5432 -e POSTGRES_PASSWORD=postgres -e LC_ALL=ja_JP.UTF-8 postgresql_i
      - name: wait for db
        run: until docker exec postgresql_t pg_isready -U postgres; do sleep 1; done
      - name: show locale
        run: docker exec postgresql_t psql -U postgres -c "SHOW LC_COLLATE;"

ファイル構成

└ .github
    └ workflows
        ├ Dockerfile
        └postgresql.yml
  • Dockerfileはどこに置くのが良いのかわからず
    • .github/actions みたいなフォルダ作ってそこに置くのが良い?

出力

Run docker exec postgresql_t psql -U postgres -c "SHOW LC_COLLATE;"
 lc_collate  
-------------
 ja_JP.UTF-8
(1 行)

詳細

  • PostgreSQLはデフォルトのLOCALE( en_US.utf8 )だと日本語のソートが想定外の動きをする場合がある
  • Linuxでは設定したいLOCALEである ja_JP.UTF-8 はインストールする必要がある
  • GitHub ActionsでPostgreSQLを使用する場合、サービスコンテナを使用するのが定石な様子
  • サービスコンテナ内で起動前にコマンド実行ができないようなので、LOCALEのインストールができない
  • 自前でimageを作る必要がある
    • ↑の例ではDockerfileをリポジトリ内に置いてbuildしているが、DockerHubなり、GitHub Container Registryなりに置いてそれを使っても良い

参考

PostgreSQLのデフォルトLOCALEでは日本語文字列のソート結果が想定と異なる

概要

  • PostgreSQLをデフォルトのまま使用すると日本語文字列のソート結果が想定と異なる
  • LOCALEを正しく設定することで修正可能
  • 文字列の並び順は LC_COLLATE で制御されるこの設定がデフォルトだと en_US.utf8 となっていることが原因

動作確認環境

対応方法

FROM postgres:11.5
RUN localedef -i ja_JP -c -f UTF-8 -A /usr/share/locale/locale.alias ja_JP.UTF-8
ENV TZ=Asia/Tokyo
ENV LANG=ja_JP.UTF-8
ENV LANGUAGE=ja_JP:ja
ENV LC_ALL=ja_JP.UTF-8

詳細及び再現手順

再現

起動

docker run -d --name postgres_1 postgres:11.5
docker exec -it postgres_1 psql -U postgres

SQL

create table hoge ( id int, value varchar(10));
insert into hoge values
(1, ''),
(2, 'あ(ほげ)'),
(3, ''),
(4, 'い(ふが)')
;
select * from hoge order by value;

結果

postgres=# select * from hoge order by value;
 id |   value    
----+------------
  1 | あ
  3 | い
  2 | あ(ほげ)
  4 | い(ふが)
(4 rows)

LOCALEの確認

postgres=# SHOW LC_COLLATE;
 lc_collate 
------------
 en_US.utf8
(1 row)

対応

Dockerfile

FROM postgres:11.5
RUN localedef -i ja_JP -c -f UTF-8 -A /usr/share/locale/locale.alias ja_JP.UTF-8
ENV TZ=Asia/Tokyo
ENV LANG=ja_JP.UTF-8
ENV LANGUAGE=ja_JP:ja
ENV LC_ALL=ja_JP.UTF-8

起動

docker build . -t yamap55/postgres:11.5_jajp
docker run -d --name postgres_2 yamap55/postgres:11.5_jajp
docker exec -it postgres_2 psql -U postgres

SQL

create table hoge ( id int, value varchar(10));
insert into hoge values
(1, ''),
(2, 'あ(ほげ)'),
(3, ''),
(4, 'い(ふが)')
;
select * from hoge order by value;

結果

postgres=# select * from hoge order by value;
 id |   value    
----+------------
  1 | あ
  2 | あ(ほげ)
  3 | い
  4 | い(ふが)
(4 行)

LOCALEの確認

postgres=# SHOW LC_COLLATE;
 lc_collate  
-------------
 ja_JP.UTF-8
(1 行)

参考

背景とか蛇足とか

MySQLでも試した

起動、接続

docker run --name mysql_1 -e MYSQL_ROOT_PASSWORD=mysql -d mysql
docker exec -it mysql_1 mysql -u root -p -h 127.0.0.1 -D mysql -pmysql

結果

mysql> create table hoge ( id int, value varchar(10));
Query OK, 0 rows affected (0.04 sec)

mysql> insert into hoge values
    -> (1, ''),
    -> (2, ''),
    -> (3, ''),
    -> (4, '')
    -> ;
Query OK, 4 rows affected (0.01 sec)
Records: 4  Duplicates: 0  Warnings: 0

mysql> select * from hoge;
+------+-------+
| id   | value |
+------+-------+
|    1 |       |
|    2 |       |
|    3 |       |
|    4 |       |
+------+-------+
4 rows in set (0.00 sec)

mysql> 

AtCoder用Python環境を作った

概要

最近AtCoderを始めたのでPython用の環境を作りました。 割といい感じにできた気がしましたのでTemplate Repositoryにしています。これから始める方、AtCoder上で書いている方などは是非試してみてください。

一方、まだ私自身が使い込んでいないので、既に環境ある方やAtCoderをある程度やり込んでいる方には不満があるかもしれません。 何かありましたら IssueTwitter などでご連絡頂けたら助かります。

github.com

導入ツール

  • devcontainer
    • VSCode, Git, DockerさえあればOK
  • black
    • 自動フォーマット
  • online-judge-tools
    • VSCode上でテストを実行
  • atcoder-cli
  • atcoder-python-snippets
    • AtCoder用のスニペット
    • あまり洗練されていないので、独自の関数をテンプレートファイルに記載しています
    • 独自のスニペットを追加したい場合には本記事末尾のQAを参照

環境作成(ローカルのVSCodeの場合)

※この文章でわからなければ README を参照してください。

環境作成(GitHub Codespacesの場合)

f:id:yamap_55:20210116171613p:plain
Open with Codespaces
f:id:yamap_55:20210116171759p:plain
New codespaces

※特にCodespaces用の設定はしていないので不具合があったら教えてください

回答までの流れ

VSCode内のターミナルでコマンドを実行します

  1. ログイン
    • acc login
    • oj login https://atcoder.jp/
  2. contestID を取得
    • https://atcoder.jp/contests/abs の場合、 abs
  3. ディレクトリ作成(問題を選択)
    • acc new ${contestID}
    • 例: acc new abs
  4. 回答する問題のディレクトリに移動
    • cd {contestID}/{問題}
  5. 回答を作成
    • main.py に回答を記載
  6. 回答をテスト
    • ojt
    • oj t -c "python main.py" と同じ
  7. 回答を提出
    • acc submit
    • acc s と同じ

使用例

acc new abs
cd abs/practicea/
vi main.py
ojt
acc s

QA

テンプレートファイルを変更したい

templates/py/main.py を変更してください

独自のスニペットを追加したい

  • .vscode/python.code-snippets を作成し、以下のように記載する
{
  "input number": {
    "prefix": ["input-n"],
    "body": ["import sys\nn = int(sys.stdin.buffer.readline())"],
    "description": "input number."
  }
}