山pの楽しいお勉強生活

勉強の成果を垂れ流していきます

PySparkで特定のカラムが全体の最大値であるレコードを取得する

概要

実現はできてはいたものの、もっと良いやり方ないかな?と聞いたら教えてもらったのでメモ。

うまく説明できないのでデータを記載します。

処理前

+----+------+
|name|  date|
+----+------+
|   a|201906|
|   a|201907|
|   b|201906|
|   b|201907|
|   c|201907|
+----+------+

処理後

+----+------+
|name|  date|
+----+------+
|   a|201907|
|   b|201907|
|   c|201907|
+----+------+

教えてもらった方法

from pyspark.sql import functions as f
from pyspark.sql.window import Window as w

df = spark.createDataFrame(
  [['a', '201906'], ['a', '201907'], ['b', '201906'], ['b', '201907'], ['c', '201907']],
  ['name', 'date']
)
df.show()

result_df = (
  df
    .withColumn('max_date', f.max('date').over(w.partitionBy()))
    .filter(f.col('date') == f.col('max_date'))
    .drop('max_date')
)
result_df.show()

試行錯誤の内容もメモ

当初書いたコードや、途中のコードもメモ

from pyspark.sql import functions as f
from pyspark.sql.window import Window as w

df = spark.createDataFrame(
  [['a', '201906'], ['a', '201907'], ['b', '201906'], ['b', '201907'], ['c', '201907']],
  ['name', 'date']
)
df.show()

# 1行ではあるが、一旦actionが走るので遅い気がする。
# また、filterの中でdfを使っているので、dfが定義されている必要がある。(読み込みからメソッドチェーンで繋げない)
result_df_2 = df.filter(f.col('date') == df.agg(f.max('date')).first()[0])
result_df_2.show()

# 同じwindow関数だったら最大を取るという意図からしてmax使った方がわかりやすい
result_df_3 = (
  df
    .withColumn('rank', f.rank().over(w.partitionBy().orderBy(f.col('date').desc())))
    .filter(f.col('rank') == 1)
    .drop('rank')
)

Databricksでは日本語は使用しない方が良い

概要

Databricksでファイル名、フォルダ名、引数では日本語を使用できない場合があるので、使用しない方が良い。

詳細

今時当たり前だが、Databricksでは普通に日本語を使用可能。 ファイル名やフォルダ名でもnotebbook単体で普通に使う分には問題ない。

だが、下記に記載するように幾つかの場合で日本語が問題になる事がある。

※ここに記載した事項以外でもあるのかもしれない。
公式フォーラムでも見つからなかった。

別のノートブックから起動する場合

ファイル名、フォルダ名に日本語が入っていると、他のノートブックから起動できない。

コード例

# 「/Users/xxxxx@example.com/日本語/b」を起動する
dbutils.notebook.run('b', 600)

出力

com.databricks.WorkflowException: com.databricks.NotebookExecutionException: Unknown state: Notebook not found: /Users/xxxxx@example.com/work/????/b

引数に日本語の値を渡す場合

引数の「値」に日本語を渡すと正しく渡されない

コード例

  • ノートブックA
dbutils.notebook.run('b', 600,{ 'args' : '日本語'})
  • ノートブックB
dbutils.widgets.removeAll()
dbutils.widgets.text('args', 'default', '引数')
print(dbutils.widgets.get('args'))

出力

???

jobs画面でパラメータを見ると「{"args":"???"}」となっているので、この時点で化けている。

対策

  • 日本語ファイル名、フォルダ名を使用しない
  • 引数の場合は英数字の形に変換し、復元して使用する
import urllib
args = urllib.parse.quote('日本語')
dbutils.notebook.run('b', 600,{ 'args' : args})
import re
import urllib
dbutils.widgets.removeAll()
dbutils.widgets.text('args', 'default', '引数')
args = dbutils.widgets.get('args')
# 判定は雑です。
r = re.compile('(%[a-zA-Z0-9]{2})+')
if r.match(args):
  args = urllib.parse.unquote(args)
print(args)

PySparkでjsonカラムを縦持ちに変換する

正確には文字列でjsonが入っている時にパースして縦持ちにする方法。
また使いそうだが、すぐ忘れそうなのでメモ

データ

元データ

+---+--------------------------------------------------------------+
|id |json                                                          |
+---+--------------------------------------------------------------+
|100|[{"label": "男", "value": "1"},{"label": "女", "value": "2"}] |
|200|[{"label":"YES", "value":"10"},{"label":"NO", "value":"20"}]  |
+---+--------------------------------------------------------------+

処理後

+---+-----+-----+
| id|label|value|
+---+-----+-----+
|100|   男|    1|
|100|   女|    2|
|200|  YES|   10|
|200|   NO|   20|
+---+-----+-----+

コード

from pyspark.sql import functions as f
from pyspark.sql import types as t
df = spark.createDataFrame([
  ['100','[{"label": "男", "value": "1"}, {"label": "女", "value": "2"}]'],
  ['200','[{"label":"YES", "value":"10"},{"label":"NO", "value":"20"}]']
], ['id','json'])
df.show(100, False)
json_schema = t.ArrayType(t.StructType([t.StructField("label", t.StringType()),t.StructField("value", t.StringType())]))
print(json_schema)
df = (df
      .withColumn('json2', f.explode(f.from_json(f.col('json'), json_schema)))
      .withColumn('label', f.col('json2.label'))
      .withColumn('value', f.col('json2.value'))
      .drop('json', 'json2')
     )
df.show()

メモとか

  • f.from_jsonjsonをパース
  • schemaは↓でもとれるようだが、今回の場合は正しく作れなかったため自分で作成
    • json_schema = spark.read.json(df.rdd.map(lambda row: row.json)).schema
    • ↑のコードで作成 : StructType(List(StructField(label,StringType,true),StructField(value,StringType,true)))
    • 自力で作成 : ArrayType(StructType(List(StructField(label,StringType,true),StructField(value,StringType,true))),true)
  • f.explodeはリスト型の場合に行を作ってくれる
    • カンマやスペース区切りのデータの場合でもsplitしてから使用する事で、同様に使えそう。
  • 通常の?jsonだとf.json_tupleとか便利そう

参考

PySparkでread時に型が変わる

概要

PySparkで保存前はstringで、読み込むとintegerにカラムの型が変わっている現象に遭遇した。

原因としてはpartitionByで指定したカラムの型は自動的に推測されるため。

パーティションのカラムのデータタイプは自動的に推測されることに注意してください。現在のところ、数学的なデータタイプ、日付、タイムスタンプおよび文字列のタイプがサポートされます。パーティションカラムのデータタイプの自動的な推測をされたくない場合があるかも知れません。

再現コード

from pyspark.sql.types import *
schema = StructType([
  StructField('ID', StringType(), False),
  StructField('STR', StringType(), False),
])
df = spark.createDataFrame([['1','20190717']], schema)
df.printSchema()
df.write.mode('overwrite').partitionBy('STR').parquet('s3a://xxxxxx/data/test')
df = spark.read.parquet('s3a://xxxxxx/data/test')
df.printSchema()

出力

root
 |-- ID: string (nullable = false)
 |-- STR: string (nullable = false)

root
 |-- ID: string (nullable = true)
 |-- STR: integer (nullable = true)

対策

どれがいいのかよくわからないが、システム作ってる現状では自動推測されてもあまり嬉しい事ないので、config設定で対応。

documentにあるようにconfigで設定

spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
df = spark.read.parquet('s3a://xxxxxx/data/tes')

stackoverflowをみると毎回設定する必要あると書いてあるが、特にそんな事はなかった。

読み込み後に変換

from pyspark.sql import functions as f
df = df.withColumn("STR", f.col("STR").cast(StringType()))

schema再定義

df = spark.createDataFrame(spark.read.parquet('s3a://xxxxxx/data/test').rdd, schema)

参考

GreasemonkeyやTampermonkeyで外部CSSを読み込む

表題のまま。 久しぶりにuser script書いたら外部のCSSの読み込み方法がわからなかったのでメモ。 (昔、user scriptをよく書いてた時も外部のCSS読み込んでいたと思うんだけど、こんな方法使ってた記憶がない。。。どうやってたんだろう?)

まとめ

ヘッダでGM_addStyleとGM_getResourceTextを許可して、@resourceでcssを変数に格納。その後、 GM_addStyle(GM_getResourceText("変数")) で設定する。

具体例

// @grant        GM_addStyle
// @grant        GM_getResourceText
// @resource     CSS1 https://cdn.jsdelivr.net/npm/flatpickr/dist/flatpickr.min.css

GM_addStyle(GM_getResourceText('CSS1'));

詳細

Pythonでjsonを読み込み、出力する際にdateやdatetime型を使用する

出力の時の話はよく記載がありましたが、読み込みの際に変換する方法はあまりなかったのでメモ。

概要

jsonでは日付型というのは定義されていません。(そもそもどう表現する?) そのため、pythonjsonを読み込みの際に日付が含まれていても文字列になりますし、出力の際にdate型が含まれているとエラーになります。

対応方法としては読み込むためのloadsではobject_hook、出力のためのdumpsはdefaultというオプションがあるので、こちらを設定することで対応が可能です。

対応前のコード

import json
data = '{"date" : "2019/02/08"}'
d = json.loads(data)
type(d['date']) # str
import json
from datetime import datetime
data = {'date' : datetime.now()}
json.dumps(data)
# TypeError: Object of type 'datetime' is not JSON serializable

対応後

詳しくは下記のコードを見てください。 出力時は型を調べて文字列に変換するだけなので何の問題もないですが、読み込み時は何らかの条件で日付を判別しdate型に変換しています。 (jsonには明示的に日付を示す構文はないためしょうがない。) ここではキーの末尾が「date」の場合に、date型に変換するようにしています。

import re
import json
from datetime import datetime, date

DATE_FORMAT = '%Y/%m/%d'
DATE_KEY = re.compile(r'date$')

def _json_parser(dct):
    for k, v in dct.items():
        if re.search(DATE_KEY, k):
            dct[k] = datetime.strptime(v, DATE_FORMAT).date()
    return dct

json_str = '{"hoge_date" : "2019/01/02", "l" : [{"huga_date" : "2018/12/31"}]}'
dct = json.loads(json_str, object_hook=_json_parser)
print(dct)
print(type(dct['hoge_date'])) # <class 'datetime.date'>
print(type(dct['l'][0]['huga_date'])) # <class 'datetime.date'>

def json_serial(obj):
    if isinstance(obj, date):
        return obj.strftime(DATE_FORMAT)
    return obj
json.dumps(dct, default=json_serial)

参考

S3cmdのProxy設定について

s3cmdを使用して動いていたバッチ処理で、proxy設定を追加しようとしたら色々大変だったお話。

S3cmdとは

S3cmd : Command Line S3 Client and Backup for Linux and Mac

公式ページより。

Windows用にはS3Expressというのがあるらしい。

S3Express : Command Line S3 Client and S3 Backup for Windows

まとめ

  • 環境変数(http_proxy)ではなく、設定ファイルに記載する
    • proxy_host, proxy_port
    • 設定ファイルのデフォルトは「~/.s3cfg」
    • コマンド実行時に「-c」で設定ファイルを渡すことも可能
  • ↑のproxy設定はPython2.7以上が必要
  • Python2のバージョン上げたくない場合はPython3を使用することも可能
    • S3cmdのV2以降はPython3を使用することも可能。
  • python3を使用する場合にはs3cmdを直接実行するのではなく、「python3 s3cmd」的な感じで実行する必要がある。
    • 「s3cmd」はPythonスクリプトで先頭に「#!/usr/bin/env python2」とpython2の指定があるため。

経緯とか(読む意味なし)

  • 会社でproxyを設定しろよという連絡がきた
  • (半)自動化されていたとあるシステムが動かない
  • S3cmdの処理で落ちている
    • Problem: error: [Errno 111] Connection refused
  • Proxy設定してないのでは?
    • export http_proxy=xxxxx
    • export https_proxy=yyyy
  • 変わらずエラー
    • 調べた → 設定ファイルに書く必要がある
  • 設定ファイルに書いた
  • python2.7以上で動かせと怒られる
    • 2.6.6か何かで動いていた
  • python2のバージョンは上げたくない。というかpython3にしたい
  • s3cmdは2のみ → s3cmdバージョン2で対応しているらしい → s3cmdのバージョンアップ
  • python3をデフォルトにしてs3cmd起動
  • python2.7以上で動かせと怒られる
    • (゚Д゚)ハァ?
  • シバンにpython2が指定してある
  • python3に引数でs3cmd渡して起動
  • OK

参考URL