山pの楽しいお勉強生活

勉強の成果を垂れ流していきます

PySparkのDataFrameでは同名のカラムが許容される

概要

  • PySparkのDataFrameでは同名のカラムが許容される
  • select などカラム名を指定する処理時に例外が発生する
  • カラム名を再定義、別名を付ける事で回避が可能

再現コード

作成時にカラム名が重複

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df99 = spark.createDataFrame(
    [
        [1, 'a', 'b'],
        [2, 'aa', 'bb'],
    ],
    ['id', 'c1', 'c1'],
)
df99.show()
+---+---+---+
| id| c1| c1|
+---+---+---+
|  1|  a|  b|
|  2| aa| bb|
+---+---+---+

※このパターンはあまりない気がする

joinした結果カラム目が重複

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df1 = spark.createDataFrame(
    [
        [1, 'a', 'b'],
        [2, 'aa', 'bb'],
    ],
    ['id', 'c1', 'c2'],
)
df2 = spark.createDataFrame(
    [
        [1, 'x', 'y'],
        [2, 'xx', 'yy'],
    ],
    ['id', 'c1', 'c2'],
)
merged_df = df1.join(df2, 'id')

# 重複カラムの存在は可能
merged_df.show()

# 重複していないカラムのselectは可能
merged_df.select('id').show()

# 重複しているカラムのselectは例外発生
merged_df.select('id', 'c1')
+---+---+---+---+---+
| id| c1| c2| c1| c2|
+---+---+---+---+---+
|  1|  a|  b|  x|  y|
|  2| aa| bb| xx| yy|
+---+---+---+---+---+

+---+
| id|
+---+
|  1|
|  2|
+---+

---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<ipython-input-20-69f4fa3aa508> in <module>
     24 
     25 # 重複しているカラムのselectは例外発生
---> 26 merged_df.select('id', 'c1')

/spark/python/pyspark/sql/dataframe.py in select(self, *cols)
   1419         [Row(name=u'Alice', age=12), Row(name=u'Bob', age=15)]
   1420         """
-> 1421         jdf = self._jdf.select(self._jcols(*cols))
   1422         return DataFrame(jdf, self.sql_ctx)
   1423 

/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    135                 # Hide where the exception came from that shows a non-Pythonic
    136                 # JVM exception message.
--> 137                 raise_from(converted)
    138             else:
    139                 raise

/spark/python/pyspark/sql/utils.py in raise_from(e)

AnalysisException: Reference 'c1' is ambiguous, could be: c1, c1.;

対応方法

toDFでカラムを指定して新しくDataFrameを作る

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df1 = spark.createDataFrame(
    [
        [1, 'a', 'b'],
        [2, 'aa', 'bb'],
    ],
    ['id', 'c1', 'c2'],
)
df2 = spark.createDataFrame(
    [
        [1, 'x', 'y'],
        [2, 'xx', 'yy'],
    ],
    ['id', 'c1', 'c2'],
)
merged_df = df1.join(df2, 'id')

# カラムを指定して新しくDataFrameを作る
df88 = merged_df.toDF('id', 'df1_c1', 'df1_c2', 'df2_c1', 'df2_c2')
df88.show()
+---+------+------+------+------+
| id|df1_c1|df1_c2|df2_c1|df2_c2|
+---+------+------+------+------+
|  1|     a|     b|     x|     y|
|  2|    aa|    bb|    xx|    yy|
+---+------+------+------+------+

https://spark.apache.org/docs/3.0.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.toDF

aliasをつけてselectする

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
df1 = spark.createDataFrame(
    [
        [1, 'a', 'b'],
        [2, 'aa', 'bb'],
    ],
    ['id', 'c1', 'c2'],
)
df2 = spark.createDataFrame(
    [
        [1, 'x', 'y'],
        [2, 'xx', 'yy'],
    ],
    ['id', 'c1', 'c2'],
)
df1 = df1.alias('df1')
df2 = df2.alias('df2')
merged_df = df1.join(df2, 'id')

# showの見た目は変わらない
merged_df.show()

# alias付きで指定すると取得が可能
merged_df.select('id', 'df1.c1', 'df1.c2', 'df2.c1', 'df2.c2').show()
+---+---+---+---+---+
| id| c1| c2| c1| c2|
+---+---+---+---+---+
|  1|  a|  b|  x|  y|
|  2| aa| bb| xx| yy|
+---+---+---+---+---+

+---+---+---+---+---+
| id| c1| c2| c1| c2|
+---+---+---+---+---+
|  1|  a|  b|  x|  y|
|  2| aa| bb| xx| yy|
+---+---+---+---+---+

https://spark.apache.org/docs/3.0.0/api/python/pyspark.sql.html#pyspark.sql.Column.alias

fstringの中でdictionary、setの内包表記を使用する

結論

l = ['a', 'b', 'c']
s1 = f'{ {s:s for s in l} }'  # 中括弧の後にスペースが必要
assert s1 == "{'a': 'a', 'b': 'b', 'c': 'c'}"

s2 = f'{ {s for s in l} }'  # 中括弧の後にスペースが必要
assert s2 == "{'b', 'c', 'a'}"

※setの方は順番は保証されないので↑のassertは失敗する場合がある

詳細とか

fstring内でdictionaryやsetの内包表記を普通に記載すると内包表記の記載がそのまま文字列となる

l = ['a', 'b', 'c']
s3 = f'{{s:s for s in l}}'
assert s3 == "{s:s for s in l}"

s4 = f'{{s for s in l}}'
assert s4 == "{s for s in l}"

これは中括弧は中括弧でエスケープできるという仕様のため。pep-0489で定義されている。

www.python.org

で、どうすれば良いかというとスペースを入れることでエスケープを回避する。こちらについでも同じドキュメント内に記載がある。

www.python.org

Pythonのユニットテストでimportされている変数を上書きする

結論

  • 直接モジュールの変数を上書きすれば良い
  • ただし、importされた時点でそのモジュールの変数として扱われる事に注意

※文字で見ても良くわからないと思うので下記のコードを参照

ケース1(テスト対象に直接変数がimportされている場合)

テスト対象のコード群

# a.py
from define import HOGE
class TargetClass:
    def get_hoge(self):
        return f"aaa_{HOGE}_bbb" # ここのHOGEを入れ替えたい
# define.py
HOGE = 'hoge'

ユニットテスト

import a
from a import TargetClass

def test_hoge():
    a.HOGE = 'piyo' # モジュールをインポートしてモジュールの変数を直接入れ替える
    target_class = TargetClass()
    assert target_class.get_hoge() == 'aaa_piyo_bbb'

ケース2(テスト対象で使用されている別モジュールで直接変数がimportされている場合)

テスト対象のコード群

# a.py
from b import get_hoge
class TargetClass:
    def get_hoge2(self):
        return f"aaa_{get_hoge()}_bbb"
# b.py
from define import HOGE
def get_hoge():
    return HOGE # ここのHOGEを入れ替えたい
# define.py
HOGE = 'hoge'

ユニットテスト

import b
from a import TargetClass

def test_hoge2():
    b.HOGE = 'piyo' # モジュールをインポートしてモジュールの変数を直接入れ替える
    target_class = TargetClass()
    assert target_class.get_hoge2() == 'aaa_piyo_bbb'

※結局やっているのはケース1と同じで直接bをインポートして値を入れ替える

経緯とか

  • ↑に書いた通りのことをやりたかったが調べ方もわからずハマった
  • unittest.patch だと、Mockになってしまうので、違うそうじゃない感。

蛇足

  • ケース2のようにできるのであれば、定義の方を変更できそうに見えるが通常はうまくいかない
  • 下記のユニットテスト2のように、入れ替えたいモジュール(今回であれば b )が呼び出される前であればうまくいくが、一度呼び出すとモジュール内に直接定義されるようで、想定通りにならない

テスト対象コード(ケース2と同じ)

# a.py
from b import get_hoge
class TargetClass:
    def get_hoge2(self):
        return f"aaa_{get_hoge()}_bbb"
# b.py
from define import HOGE
def get_hoge():
    return HOGE # ここのHOGEを入れ替えたい
# define.py
HOGE = 'hoge'

ユニットテスト1(うまくいかないケース)

from a import TargetClass
import define

def test_hoge():
    define.HOGE = 'piyo' # 元々定義されている値を入れ替える
    target_class = TargetClass()
    assert target_class.get_hoge2() == 'aaa_piyo_bbb' # AssertionError

テストケースの from a import TargetClass ここで a.py が読まれて、from b import get_hoge が読まれて、b.pyfrom define import HOGE まで読まれる。 これにより、b内にHOGEが定義されてしまう。 これにより、テストケース内で define.HOGE を置き換えても、b.HOGE は既に定義されているため値が変更されない。

ユニットテスト2(うまくいくがイマイチなケース)

import define
define.HOGE = 'piyo' # テストモジュールの先頭でimportした直後に値を変更する
from a import TargetClass

def test_hoge():
    target_class = TargetClass()
    assert target_class.get_hoge2() == 'aaa_piyo_bbb'

うまく動作するには動作するが、linterに指摘されますし、実際に動かすときには1ファイルのテストだけではないと思われるので、先に実行される他のテストでbがimportされていたら結局同じ事が発生すると思われる。

GitHub ActionsでLOCALEがja_JP.UTF-8のPostgreSQLを使用する方法

結論

  • サービスコンテナでLOCALEに ja_JP.UTF-8 を設定することはできない
  • 自力でLOCALEを追加するイメージを作って、docker run で起動する

Dockerfile

FROM postgres:11.5
RUN localedef -i ja_JP -c -f UTF-8 -A /usr/share/locale/locale.alias ja_JP.UTF-8

GitHub Actionsのワークフロー設定

  • 正しい呼び名がわからず
name: postgresql

on: [push]

jobs:
  postgresql:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: PostgreSQL container build & run
        run: |
          docker build -t postgresql_i .github/workflows
          docker run -d --name postgresql_t -p 5432:5432 -e POSTGRES_PASSWORD=postgres -e LC_ALL=ja_JP.UTF-8 postgresql_i
      - name: wait for db
        run: until docker exec postgresql_t pg_isready -U postgres; do sleep 1; done
      - name: show locale
        run: docker exec postgresql_t psql -U postgres -c "SHOW LC_COLLATE;"

ファイル構成

└ .github
    └ workflows
        ├ Dockerfile
        └postgresql.yml
  • Dockerfileはどこに置くのが良いのかわからず
    • .github/actions みたいなフォルダ作ってそこに置くのが良い?

出力

Run docker exec postgresql_t psql -U postgres -c "SHOW LC_COLLATE;"
 lc_collate  
-------------
 ja_JP.UTF-8
(1 行)

詳細

  • PostgreSQLはデフォルトのLOCALE( en_US.utf8 )だと日本語のソートが想定外の動きをする場合がある
  • Linuxでは設定したいLOCALEである ja_JP.UTF-8 はインストールする必要がある
  • GitHub ActionsでPostgreSQLを使用する場合、サービスコンテナを使用するのが定石な様子
  • サービスコンテナ内で起動前にコマンド実行ができないようなので、LOCALEのインストールができない
  • 自前でimageを作る必要がある
    • ↑の例ではDockerfileをリポジトリ内に置いてbuildしているが、DockerHubなり、GitHub Container Registryなりに置いてそれを使っても良い

参考

PostgreSQLのデフォルトLOCALEでは日本語文字列のソート結果が想定と異なる

概要

  • PostgreSQLをデフォルトのまま使用すると日本語文字列のソート結果が想定と異なる
  • LOCALEを正しく設定することで修正可能
  • 文字列の並び順は LC_COLLATE で制御されるこの設定がデフォルトだと en_US.utf8 となっていることが原因

動作確認環境

対応方法

FROM postgres:11.5
RUN localedef -i ja_JP -c -f UTF-8 -A /usr/share/locale/locale.alias ja_JP.UTF-8
ENV TZ=Asia/Tokyo
ENV LANG=ja_JP.UTF-8
ENV LANGUAGE=ja_JP:ja
ENV LC_ALL=ja_JP.UTF-8

詳細及び再現手順

再現

起動

docker run -d --name postgres_1 postgres:11.5
docker exec -it postgres_1 psql -U postgres

SQL

create table hoge ( id int, value varchar(10));
insert into hoge values
(1, ''),
(2, 'あ(ほげ)'),
(3, ''),
(4, 'い(ふが)')
;
select * from hoge order by value;

結果

postgres=# select * from hoge order by value;
 id |   value    
----+------------
  1 | あ
  3 | い
  2 | あ(ほげ)
  4 | い(ふが)
(4 rows)

LOCALEの確認

postgres=# SHOW LC_COLLATE;
 lc_collate 
------------
 en_US.utf8
(1 row)

対応

Dockerfile

FROM postgres:11.5
RUN localedef -i ja_JP -c -f UTF-8 -A /usr/share/locale/locale.alias ja_JP.UTF-8
ENV TZ=Asia/Tokyo
ENV LANG=ja_JP.UTF-8
ENV LANGUAGE=ja_JP:ja
ENV LC_ALL=ja_JP.UTF-8

起動

docker build . -t yamap55/postgres:11.5_jajp
docker run -d --name postgres_2 yamap55/postgres:11.5_jajp
docker exec -it postgres_2 psql -U postgres

SQL

create table hoge ( id int, value varchar(10));
insert into hoge values
(1, ''),
(2, 'あ(ほげ)'),
(3, ''),
(4, 'い(ふが)')
;
select * from hoge order by value;

結果

postgres=# select * from hoge order by value;
 id |   value    
----+------------
  1 | あ
  2 | あ(ほげ)
  3 | い
  4 | い(ふが)
(4 行)

LOCALEの確認

postgres=# SHOW LC_COLLATE;
 lc_collate  
-------------
 ja_JP.UTF-8
(1 行)

参考

背景とか蛇足とか

MySQLでも試した

起動、接続

docker run --name mysql_1 -e MYSQL_ROOT_PASSWORD=mysql -d mysql
docker exec -it mysql_1 mysql -u root -p -h 127.0.0.1 -D mysql -pmysql

結果

mysql> create table hoge ( id int, value varchar(10));
Query OK, 0 rows affected (0.04 sec)

mysql> insert into hoge values
    -> (1, ''),
    -> (2, ''),
    -> (3, ''),
    -> (4, '')
    -> ;
Query OK, 4 rows affected (0.01 sec)
Records: 4  Duplicates: 0  Warnings: 0

mysql> select * from hoge;
+------+-------+
| id   | value |
+------+-------+
|    1 |       |
|    2 |       |
|    3 |       |
|    4 |       |
+------+-------+
4 rows in set (0.00 sec)

mysql> 

AtCoder用Python環境を作った

概要

最近AtCoderを始めたのでPython用の環境を作りました。 割といい感じにできた気がしましたのでTemplate Repositoryにしています。これから始める方、AtCoder上で書いている方などは是非試してみてください。

一方、まだ私自身が使い込んでいないので、既に環境ある方やAtCoderをある程度やり込んでいる方には不満があるかもしれません。 何かありましたら IssueTwitter などでご連絡頂けたら助かります。

github.com

導入ツール

  • devcontainer
    • VSCode, Git, DockerさえあればOK
  • black
    • 自動フォーマット
  • online-judge-tools
    • VSCode上でテストを実行
  • atcoder-cli
  • atcoder-python-snippets
    • AtCoder用のスニペット
    • あまり洗練されていないので、独自の関数をテンプレートファイルに記載しています
    • 独自のスニペットを追加したい場合には本記事末尾のQAを参照

環境作成(ローカルのVSCodeの場合)

※この文章でわからなければ README を参照してください。

環境作成(GitHub Codespacesの場合)

f:id:yamap_55:20210116171613p:plain
Open with Codespaces
f:id:yamap_55:20210116171759p:plain
New codespaces

※特にCodespaces用の設定はしていないので不具合があったら教えてください

回答までの流れ

VSCode内のターミナルでコマンドを実行します

  1. ログイン
    • acc login
    • oj login https://atcoder.jp/
  2. contestID を取得
    • https://atcoder.jp/contests/abs の場合、 abs
  3. ディレクトリ作成(問題を選択)
    • acc new ${contestID}
    • 例: acc new abs
  4. 回答する問題のディレクトリに移動
    • cd {contestID}/{問題}
  5. 回答を作成
    • main.py に回答を記載
  6. 回答をテスト
    • ojt
    • oj t -c "python main.py" と同じ
  7. 回答を提出
    • acc submit
    • acc s と同じ

使用例

acc new abs
cd abs/practicea/
vi main.py
ojt
acc s

QA

テンプレートファイルを変更したい

templates/py/main.py を変更してください

独自のスニペットを追加したい

  • .vscode/python.code-snippets を作成し、以下のように記載する
{
  "input number": {
    "prefix": ["input-n"],
    "body": ["import sys\nn = int(sys.stdin.buffer.readline())"],
    "description": "input number."
  }
}

Django REST Framework 条件に応じてAPI処理を切り替える方法

全然サンプル見つからなかったので誰かに役に立つかも?と、妥協点はあるもののとりあえず実装としては良さそうなのでメモ

やりたかったこと

  • 条件によって特定のAPI処理全体を切り替えたい
    • 複数発生した場合を考えると if hogeflag: みたいな事はやりたくない
  • この条件というのがログインユーザ毎、特定のデータの場合という事ではなく、環境変数や設定ファイルなど動作する環境毎に切り替えたい。
  • 切り替わったら、その環境では選択されなかった処理は一切使用される事はない

具体的な例とか

  • アプリケーションがパッケージ販売されていて、特定の環境では処理を変更したい
  • 拡張機能などがあり、特定の処理を任意で上書きたい

対応方法

概要

DjangoのMiddlewareを使用する。

Middlewareとは、実際の処理の前後に行わせることができる処理です。hookとか言ったりします。
settings.pyMIDDLEWARE = [] と配列として記載する事で設定しますので、実際のアプリケーションで使われている事が確認できるかと思います。認証処理などもここで行っています。

詳細

# extensions.test_middleware
from django.http import JsonResponse
from rest_framework.request import Request

from extensions.hoge import HogeView

class SimpleMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        response = self.get_response(request)
        return response

    def process_view(self, request, view_func, view_args, view_kwargs):
        # if view_func.__name__ == 'HogeView': # Classで比較する場合
        if request.path == '/api/v1/hogehoge/': # URLPATHで比較する場合
            # DjangoのRequestとDRFのRequestが異なるため変換
            # django.core.handlers.wsgi.WSGIRequest -> rest_framework.request.Request
            req = Request(request)
            res = HogeView.get(HogeView, req)

            # process_viewではdjango.http.Responseを返す必要があり、
            # rest_framework.response.Responseをそのまま返す事ができないため、値を取得して変換している
            return JsonResponse(res.data, status=res.status_code)
        else:
            # Noneを返す事で既存の処理が行われる
            return None
# extensions.hoge.HogeView
from rest_framework.response import Response
from rest_framework.views import APIView
class HogeView(APIView):
    def get(self, request):
        return Response({'aaa': 'bbb'})
# settings.py

MIDDLEWARE = [
    'django.middleware.security.SecurityMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
    'extensions.test_middleware.SimpleMiddleware'
]

コードの実物に近い形

直接クラス名とか書いていたら、各処理にifと書くのと何にも変わらない。 ので、設定系は全部DBに突っ込んで、DBの内容と合致したら設定に従ってリフレクションで処理を呼ぶようにした。

class SimpleMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        return self.get_response(request)

    def process_view(self, request, view_func, view_args, view_kwargs):
        extension = Extension.objects.filter(class_name=view_func.__name__).first()
        if extension:
            _cls = getattr(importlib.import_module(extension.module_name), extension.class_name)
            method = getattr(_cls, request.method.lower())
            return JsonResponse(method(_cls, Request(request)).data, status=res.status_code)

没とした案

urls.pyで対応

実装して動作するところまでは確認

詳細

urlpatterns は先勝ち(ドキュメントは調べてない!)なので、既存処理の設定をする前に独自処理のパターンをどこからか読み込んで設定すれば既存処理を上書きする事ができる。

没とした理由

ユニットテストが大変。

実際にはDBに設定が入っている事を想定していたため、urls.py のurlpatternsの設定する所でDBアクセスする必要がある。

  1. エラー処理の対応が必要
    • url.pyはDjangoの基本機能らしく、 python manage.py migration などでも処理が行われるので、migration前にテーブルがあるわけないのでエラー(ProgramingError)となる。
    • exceptして対応する事も可能は可能。(1度は実装した)
  2. ユニットテストでエラーとなる
    • django.test.TestCaserest_framework.test.APITestCase などのフレームワーク提供のTestCaseを使用する場合に@DB等のDBを使用する旨のデコレータをつけろとエラーになる。
    • これは各テストケース内ではなく、↑のTestCaseを利用する事でDjangoの初期化処理が走るため、url.pyも同様に処理され発生する。
    • 回避したとしたとしても、Djangoの初期処理はテスト全体で一回なので、通常処理の場合と今回の対応を行った場合の2パターンを処理する事ができない。
    • あえてやろうとすると以下の感じになる?(机上の空論)
      • urls.py内でテストかどうかを確認してテストだったらDBにアクセスしない
      • 今回の対応を行う場合のユニットテストsetUpClassDjangoを再起動する

参考