PySparkで特定のカラムが全体の最大値であるレコードを取得する
概要
実現はできてはいたものの、もっと良いやり方ないかな?と聞いたら教えてもらったのでメモ。
うまく説明できないのでデータを記載します。
処理前
+----+------+ |name| date| +----+------+ | a|201906| | a|201907| | b|201906| | b|201907| | c|201907| +----+------+
処理後
+----+------+ |name| date| +----+------+ | a|201907| | b|201907| | c|201907| +----+------+
教えてもらった方法
from pyspark.sql import functions as f from pyspark.sql.window import Window as w df = spark.createDataFrame( [['a', '201906'], ['a', '201907'], ['b', '201906'], ['b', '201907'], ['c', '201907']], ['name', 'date'] ) df.show() result_df = ( df .withColumn('max_date', f.max('date').over(w.partitionBy())) .filter(f.col('date') == f.col('max_date')) .drop('max_date') ) result_df.show()
試行錯誤の内容もメモ
当初書いたコードや、途中のコードもメモ
from pyspark.sql import functions as f from pyspark.sql.window import Window as w df = spark.createDataFrame( [['a', '201906'], ['a', '201907'], ['b', '201906'], ['b', '201907'], ['c', '201907']], ['name', 'date'] ) df.show() # 1行ではあるが、一旦actionが走るので遅い気がする。 # また、filterの中でdfを使っているので、dfが定義されている必要がある。(読み込みからメソッドチェーンで繋げない) result_df_2 = df.filter(f.col('date') == df.agg(f.max('date')).first()[0]) result_df_2.show() # 同じwindow関数だったら最大を取るという意図からしてmax使った方がわかりやすい result_df_3 = ( df .withColumn('rank', f.rank().over(w.partitionBy().orderBy(f.col('date').desc()))) .filter(f.col('rank') == 1) .drop('rank') )
Databricksでは日本語は使用しない方が良い
概要
Databricksでファイル名、フォルダ名、引数では日本語を使用できない場合があるので、使用しない方が良い。
詳細
今時当たり前だが、Databricksでは普通に日本語を使用可能。 ファイル名やフォルダ名でもnotebbook単体で普通に使う分には問題ない。
だが、下記に記載するように幾つかの場合で日本語が問題になる事がある。
※ここに記載した事項以外でもあるのかもしれない。
※公式フォーラムでも見つからなかった。
別のノートブックから起動する場合
ファイル名、フォルダ名に日本語が入っていると、他のノートブックから起動できない。
コード例
# 「/Users/xxxxx@example.com/日本語/b」を起動する dbutils.notebook.run('b', 600)
出力
com.databricks.WorkflowException: com.databricks.NotebookExecutionException: Unknown state: Notebook not found: /Users/xxxxx@example.com/work/????/b
引数に日本語の値を渡す場合
引数の「値」に日本語を渡すと正しく渡されない
コード例
- ノートブックA
dbutils.notebook.run('b', 600,{ 'args' : '日本語'})
- ノートブックB
dbutils.widgets.removeAll() dbutils.widgets.text('args', 'default', '引数') print(dbutils.widgets.get('args'))
出力
???
jobs画面でパラメータを見ると「{"args":"???"}」となっているので、この時点で化けている。
対策
- 日本語ファイル名、フォルダ名を使用しない
- 引数の場合は英数字の形に変換し、復元して使用する
import urllib args = urllib.parse.quote('日本語') dbutils.notebook.run('b', 600,{ 'args' : args})
import re import urllib dbutils.widgets.removeAll() dbutils.widgets.text('args', 'default', '引数') args = dbutils.widgets.get('args') # 判定は雑です。 r = re.compile('(%[a-zA-Z0-9]{2})+') if r.match(args): args = urllib.parse.unquote(args) print(args)
PySparkでjsonカラムを縦持ちに変換する
正確には文字列でjsonが入っている時にパースして縦持ちにする方法。
また使いそうだが、すぐ忘れそうなのでメモ
データ
元データ
+---+--------------------------------------------------------------+ |id |json | +---+--------------------------------------------------------------+ |100|[{"label": "男", "value": "1"},{"label": "女", "value": "2"}] | |200|[{"label":"YES", "value":"10"},{"label":"NO", "value":"20"}] | +---+--------------------------------------------------------------+
処理後
+---+-----+-----+ | id|label|value| +---+-----+-----+ |100| 男| 1| |100| 女| 2| |200| YES| 10| |200| NO| 20| +---+-----+-----+
コード
from pyspark.sql import functions as f from pyspark.sql import types as t df = spark.createDataFrame([ ['100','[{"label": "男", "value": "1"}, {"label": "女", "value": "2"}]'], ['200','[{"label":"YES", "value":"10"},{"label":"NO", "value":"20"}]'] ], ['id','json']) df.show(100, False) json_schema = t.ArrayType(t.StructType([t.StructField("label", t.StringType()),t.StructField("value", t.StringType())])) print(json_schema) df = (df .withColumn('json2', f.explode(f.from_json(f.col('json'), json_schema))) .withColumn('label', f.col('json2.label')) .withColumn('value', f.col('json2.value')) .drop('json', 'json2') ) df.show()
メモとか
- f.from_jsonでjsonをパース
- schemaは↓でもとれるようだが、今回の場合は正しく作れなかったため自分で作成
json_schema = spark.read.json(df.rdd.map(lambda row: row.json)).schema
- ↑のコードで作成 :
StructType(List(StructField(label,StringType,true),StructField(value,StringType,true)))
- 自力で作成 :
ArrayType(StructType(List(StructField(label,StringType,true),StructField(value,StringType,true))),true)
- f.explodeはリスト型の場合に行を作ってくれる
- カンマやスペース区切りのデータの場合でもsplitしてから使用する事で、同様に使えそう。
- 通常の?jsonだとf.json_tupleとか便利そう
参考
PySparkでread時に型が変わる
概要
PySparkで保存前はstringで、読み込むとintegerにカラムの型が変わっている現象に遭遇した。
原因としてはpartitionByで指定したカラムの型は自動的に推測されるため。
パーティションのカラムのデータタイプは自動的に推測されることに注意してください。現在のところ、数学的なデータタイプ、日付、タイムスタンプおよび文字列のタイプがサポートされます。パーティションカラムのデータタイプの自動的な推測をされたくない場合があるかも知れません。
再現コード
from pyspark.sql.types import * schema = StructType([ StructField('ID', StringType(), False), StructField('STR', StringType(), False), ]) df = spark.createDataFrame([['1','20190717']], schema) df.printSchema() df.write.mode('overwrite').partitionBy('STR').parquet('s3a://xxxxxx/data/test') df = spark.read.parquet('s3a://xxxxxx/data/test') df.printSchema()
出力
root |-- ID: string (nullable = false) |-- STR: string (nullable = false) root |-- ID: string (nullable = true) |-- STR: integer (nullable = true)
対策
どれがいいのかよくわからないが、システム作ってる現状では自動推測されてもあまり嬉しい事ないので、config設定で対応。
documentにあるようにconfigで設定
spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "false") df = spark.read.parquet('s3a://xxxxxx/data/tes')
※stackoverflowをみると毎回設定する必要あると書いてあるが、特にそんな事はなかった。
読み込み後に変換
from pyspark.sql import functions as f df = df.withColumn("STR", f.col("STR").cast(StringType()))
schema再定義
df = spark.createDataFrame(spark.read.parquet('s3a://xxxxxx/data/test').rdd, schema)
参考
- Parquet ファイル(日本語ドキュメント)
- http://mogile.web.fc2.com/spark/sql-data-sources-parquet.html
- ↑このサイトURLを見ると個人管理っぽいけど、誰が管理しているのだろう??
- Parquet Files(公式ドキュメント)
- Avoid losing data type for the partitioned data when writing from Spark
GreasemonkeyやTampermonkeyで外部CSSを読み込む
表題のまま。 久しぶりにuser script書いたら外部のCSSの読み込み方法がわからなかったのでメモ。 (昔、user scriptをよく書いてた時も外部のCSS読み込んでいたと思うんだけど、こんな方法使ってた記憶がない。。。どうやってたんだろう?)
まとめ
ヘッダでGM_addStyleとGM_getResourceTextを許可して、@resourceでcssを変数に格納。その後、 GM_addStyle(GM_getResourceText("変数"))
で設定する。
具体例
// @grant GM_addStyle // @grant GM_getResourceText // @resource CSS1 https://cdn.jsdelivr.net/npm/flatpickr/dist/flatpickr.min.css GM_addStyle(GM_getResourceText('CSS1'));
詳細
Pythonでjsonを読み込み、出力する際にdateやdatetime型を使用する
出力の時の話はよく記載がありましたが、読み込みの際に変換する方法はあまりなかったのでメモ。
概要
jsonでは日付型というのは定義されていません。(そもそもどう表現する?) そのため、pythonでjsonを読み込みの際に日付が含まれていても文字列になりますし、出力の際にdate型が含まれているとエラーになります。
対応方法としては読み込むためのloadsではobject_hook、出力のためのdumpsはdefaultというオプションがあるので、こちらを設定することで対応が可能です。
対応前のコード
import json data = '{"date" : "2019/02/08"}' d = json.loads(data) type(d['date']) # str
import json from datetime import datetime data = {'date' : datetime.now()} json.dumps(data) # TypeError: Object of type 'datetime' is not JSON serializable
対応後
詳しくは下記のコードを見てください。 出力時は型を調べて文字列に変換するだけなので何の問題もないですが、読み込み時は何らかの条件で日付を判別しdate型に変換しています。 (jsonには明示的に日付を示す構文はないためしょうがない。) ここではキーの末尾が「date」の場合に、date型に変換するようにしています。
import re import json from datetime import datetime, date DATE_FORMAT = '%Y/%m/%d' DATE_KEY = re.compile(r'date$') def _json_parser(dct): for k, v in dct.items(): if re.search(DATE_KEY, k): dct[k] = datetime.strptime(v, DATE_FORMAT).date() return dct json_str = '{"hoge_date" : "2019/01/02", "l" : [{"huga_date" : "2018/12/31"}]}' dct = json.loads(json_str, object_hook=_json_parser) print(dct) print(type(dct['hoge_date'])) # <class 'datetime.date'> print(type(dct['l'][0]['huga_date'])) # <class 'datetime.date'> def json_serial(obj): if isinstance(obj, date): return obj.strftime(DATE_FORMAT) return obj json.dumps(dct, default=json_serial)
参考
S3cmdのProxy設定について
s3cmdを使用して動いていたバッチ処理で、proxy設定を追加しようとしたら色々大変だったお話。
S3cmdとは
公式ページより。
Windows用にはS3Expressというのがあるらしい。
S3Express : Command Line S3 Client and S3 Backup for Windows
まとめ
- 環境変数(http_proxy)ではなく、設定ファイルに記載する
- proxy_host, proxy_port
- 設定ファイルのデフォルトは「~/.s3cfg」
- コマンド実行時に「-c」で設定ファイルを渡すことも可能
- ↑のproxy設定はPython2.7以上が必要
- Python2のバージョン上げたくない場合はPython3を使用することも可能
- S3cmdのV2以降はPython3を使用することも可能。
- python3を使用する場合にはs3cmdを直接実行するのではなく、「python3 s3cmd」的な感じで実行する必要がある。
経緯とか(読む意味なし)
- 会社でproxyを設定しろよという連絡がきた
- (半)自動化されていたとあるシステムが動かない
- S3cmdの処理で落ちている
Problem: error: [Errno 111] Connection refused
- Proxy設定してないのでは?
- export http_proxy=xxxxx
- export https_proxy=yyyy
- 変わらずエラー
- 調べた → 設定ファイルに書く必要がある
- 設定ファイルに書いた
- python2.7以上で動かせと怒られる
- 2.6.6か何かで動いていた
- python2のバージョンは上げたくない。というかpython3にしたい
- s3cmdは2のみ → s3cmdバージョン2で対応しているらしい → s3cmdのバージョンアップ
- python3をデフォルトにしてs3cmd起動
- python2.7以上で動かせと怒られる
- (゚Д゚)ハァ?
- シバンにpython2が指定してある
- python3に引数でs3cmd渡して起動
- OK