山pの楽しいお勉強生活

勉強の成果を垂れ流していきます

databricksでcreate tableを行った際にParquet does not support timestamp. See HIVE-6384が発生する

エラー詳細

stacktraceは長いので割愛

com.databricks.backend.common.rpc.DatabricksExceptions$SQLExecutionException:
org.apache.spark.sql.AnalysisException:
org.apache.hadoop.hive.ql.metadata.HiveException:
java.lang.UnsupportedOperationException:
Parquet does not support timestamp. See HIVE-6384;

発生事象

databricksで下記のcreate table文を実行した際に上記のエラーが発生する。

%sql
create external table if not exists
  yamap55.hoge(
    HOGE_ID string comment 'HOGE_ID',
    HOGE_TIMESTAMP timestamp comment 'HOGE_TIMESTAMP'
  )
  comment
    'hoge'
  partitioned by (
    TARGET_DATE string comment 'TARGET_DATE'
  )
  stored as
    parquet
  location
    's3a://aaaaa/bbbbbb/data/hoge'
  tblproperties
    (
      'parquet.compress'='SNAPPY'
    )
;

再現する最小構成↓

%sql
create table if not exists
  yamap55.hoge(
    HOGE_TIMESTAMP timestamp comment 'HOGE_TIMESTAMP'
  )
  stored as
    parquet
;

回避方法

詳細は長くなるので先に回避方法を記載。

  • 通常のcreate table文を使用する事で回避が可能です。

具体的には以下

%sql
create table if not exists
  yamap55.hoge(
    HOGE_ID string comment 'HOGE_ID',
    HOGE_TIMESTAMP timestamp comment 'HOGE_TIMESTAMP',
    TARGET_DATE string comment 'TARGET_DATE'
  )
  using
    parquet
  comment
    'hoge'
  partitioned by (
    TARGET_DATE
  )
  location
    's3a://aaaaa/bbbbbb/data/hoge'
  tblproperties
    (
      'parquet.compress'='SNAPPY'
    )
;

注意事項

  • hive形式との差分が多いので構文に注意(下記は一例)
    • 「external」の記載は不要(詳細は下記「通常版のcreate tableを実行する事による懸念とか」参照)
    • column定義にpartitionで使用する列も記載
    • 「stored as」 → 「using」
    • 「using」の記載順
  • 外部テーブル指定(external)がないが、「location」を指定する事で暗黙的に外部テーブルとして扱われる。
    • テーブルをdropしてもS3にデータは残る

原因とか詳細の調査

細かい事はよくわからなかった。誰か教えてください。

以下は推測とか。

通常版のcreate tableを実行する事による懸念とか

  • 「external」が通常版の構文にはないが、外部テーブルを使用したい
    • locationを指定する事で暗黙的に外部テーブルとして扱われるため問題ない。(tableのdropなどで試した)

参考URL

直接は関係なかったけどメモ的なURL

PySparkで日付毎にデータを蓄積する際のdf.write.modeについて

概要

PySparkでpartitionByで日付毎に分けてデータを保存している場合、どのように追記していけば良いのか。

先にまとめ

appendの方がメリットは多いが、チェック忘れると重複登録されるデメリットが怖い。
とはいえ、overwriteも他のデータ消えるデメリットも怖いので、一長一短か。

説明用コード

path = 's3://..../hoge/'
df = spark.createDataFrame(
  [
    ['a',1,'hoge','20190101'], ['b',2,'huga','20190201'], ['c',3,'piyo','20190301']
  ],
  ['id','integer','string','date']
)
(
  df
  .write
  .mode('overwrite')
  .partitionBy('date')
  .parquet(path)
)

前提

  • partitionByを指定してwriteを行うと、指定したPATHの下に「partitionByで指定したカラム名=値」というディレクトリが設定されて保存される。
    • 上記のコードでいうと「's3://..../hoge/date=20190101/'」の下にparquetが作成される。(dateカラムは保存されない)
  • modeをappendでwriteを行うと、追記で保存される。
    • 上記のコード実行後にdateが20190401 のみ のdataframeを保存すると、date=20190401フォルダ下に保存される。
    • 追記なので重複は考慮されない
      • 上記のコード実行後にdateが20190101 のみ のdataframeを保存すると、date=20190101のデータが重複して保存される
  • modeをoverwriteでwriteを行うと、上書きで保存される。
    • 上記のコード実行後にdateが20190401 のみ のdataframeを保存すると、date=20190401フォルダ下に保存される。
      • dateが20190101などのデータは削除される

詳細と具体例

日付毎にデータが蓄積されており、1月から3月までデータが蓄積されているとする。 その場合に、4月のみのDataframeがある場合に、どのようにwriteするとうまく保存できるか。

っというか、modeはappendとoverwriteどちらが良いか。

appendの場合

メリット

  • 特に何も気にせずwriteすれば良し
  • 複数日分(複数のpartitionにまたがる場合)であっても問題はない

デメリット

  • 同コードを再実行した場合には2重に登録される。
  • appendってあまり使わない気がする。

デメリットの解決策

  • 実行前に存在確認を行う

コード

from pyspark.sql import functions as f
path = 's3://..../hoge/'

if not spark.read.parquet(path).filter(f.col('date') == '20190401').rdd.isEmpty():
    raise Exception('登録済みです : {}'.format('20190401'))

df = spark.createDataFrame(
  [
    ['d',4,'hogehoge','20190401']
  ],
  ['id','integer','string','date']
)
(
  df
  .write
  .mode('append')
  .partitionBy('date')
  .parquet(path)
)

overwriteの場合

メリット

  • 開発時に楽
    • 実行後に消したりする必要がない。
  • 挙動がわかりやすい?(慣れているだけか?)

デメリット

  • 読み込み時と書き込み時にPATHを意識(変更)する必要がある。
  • 複数日分(複数のpartitionにまたがる場合)では、filterしてそれぞれ保存する必要がある。
    • そんな処理はしたくないので、この場合はappend一択。

コード

from pyspark.sql import functions as f
import os
path = 's3://..../hoge/'

# partitionByを含めたPATHを作成
path = os.path.join(path, 'date=20190401')

df = spark.createDataFrame(
  [
    ['d',4,'hogehoge','20190401']
  ],
  ['id','integer','string','date']
)

# PATHで直接指定した場合でも、DataFrameにdateカラムが存在して問題ない ※
# (上位フォルダを指定して読みこんだ場合にはparitionに設定されている値が優先される)
(
  df
  .write
  .mode('overwrite')
#  .partitionBy('date') # partitionByは指定しない
  .parquet(path)
)

※partitionされている場合に、paritionカラムの値と保存値については別記事にします。

PySparkで特定のカラムが全体の最大値であるレコードを取得する

概要

実現はできてはいたものの、もっと良いやり方ないかな?と聞いたら教えてもらったのでメモ。

うまく説明できないのでデータを記載します。

処理前

+----+------+
|name|  date|
+----+------+
|   a|201906|
|   a|201907|
|   b|201906|
|   b|201907|
|   c|201907|
+----+------+

処理後

+----+------+
|name|  date|
+----+------+
|   a|201907|
|   b|201907|
|   c|201907|
+----+------+

教えてもらった方法

from pyspark.sql import functions as f
from pyspark.sql.window import Window as w

df = spark.createDataFrame(
  [['a', '201906'], ['a', '201907'], ['b', '201906'], ['b', '201907'], ['c', '201907']],
  ['name', 'date']
)
df.show()

result_df = (
  df
    .withColumn('max_date', f.max('date').over(w.partitionBy()))
    .filter(f.col('date') == f.col('max_date'))
    .drop('max_date')
)
result_df.show()

試行錯誤の内容もメモ

当初書いたコードや、途中のコードもメモ

from pyspark.sql import functions as f
from pyspark.sql.window import Window as w

df = spark.createDataFrame(
  [['a', '201906'], ['a', '201907'], ['b', '201906'], ['b', '201907'], ['c', '201907']],
  ['name', 'date']
)
df.show()

# 1行ではあるが、一旦actionが走るので遅い気がする。
# また、filterの中でdfを使っているので、dfが定義されている必要がある。(読み込みからメソッドチェーンで繋げない)
result_df_2 = df.filter(f.col('date') == df.agg(f.max('date')).first()[0])
result_df_2.show()

# 同じwindow関数だったら最大を取るという意図からしてmax使った方がわかりやすい
result_df_3 = (
  df
    .withColumn('rank', f.rank().over(w.partitionBy().orderBy(f.col('date').desc())))
    .filter(f.col('rank') == 1)
    .drop('rank')
)

Databricksでは日本語は使用しない方が良い

概要

Databricksでファイル名、フォルダ名、引数では日本語を使用できない場合があるので、使用しない方が良い。

詳細

今時当たり前だが、Databricksでは普通に日本語を使用可能。 ファイル名やフォルダ名でもnotebbook単体で普通に使う分には問題ない。

だが、下記に記載するように幾つかの場合で日本語が問題になる事がある。

※ここに記載した事項以外でもあるのかもしれない。
公式フォーラムでも見つからなかった。

別のノートブックから起動する場合

ファイル名、フォルダ名に日本語が入っていると、他のノートブックから起動できない。

コード例

# 「/Users/xxxxx@example.com/日本語/b」を起動する
dbutils.notebook.run('b', 600)

出力

com.databricks.WorkflowException: com.databricks.NotebookExecutionException: Unknown state: Notebook not found: /Users/xxxxx@example.com/work/????/b

引数に日本語の値を渡す場合

引数の「値」に日本語を渡すと正しく渡されない

コード例

  • ノートブックA
dbutils.notebook.run('b', 600,{ 'args' : '日本語'})
  • ノートブックB
dbutils.widgets.removeAll()
dbutils.widgets.text('args', 'default', '引数')
print(dbutils.widgets.get('args'))

出力

???

jobs画面でパラメータを見ると「{"args":"???"}」となっているので、この時点で化けている。

対策

  • 日本語ファイル名、フォルダ名を使用しない
  • 引数の場合は英数字の形に変換し、復元して使用する
import urllib
args = urllib.parse.quote('日本語')
dbutils.notebook.run('b', 600,{ 'args' : args})
import re
import urllib
dbutils.widgets.removeAll()
dbutils.widgets.text('args', 'default', '引数')
args = dbutils.widgets.get('args')
# 判定は雑です。
r = re.compile('(%[a-zA-Z0-9]{2})+')
if r.match(args):
  args = urllib.parse.unquote(args)
print(args)

PySparkでjsonカラムを縦持ちに変換する

正確には文字列でjsonが入っている時にパースして縦持ちにする方法。
また使いそうだが、すぐ忘れそうなのでメモ

データ

元データ

+---+--------------------------------------------------------------+
|id |json                                                          |
+---+--------------------------------------------------------------+
|100|[{"label": "男", "value": "1"},{"label": "女", "value": "2"}] |
|200|[{"label":"YES", "value":"10"},{"label":"NO", "value":"20"}]  |
+---+--------------------------------------------------------------+

処理後

+---+-----+-----+
| id|label|value|
+---+-----+-----+
|100|   男|    1|
|100|   女|    2|
|200|  YES|   10|
|200|   NO|   20|
+---+-----+-----+

コード

from pyspark.sql import functions as f
from pyspark.sql import types as t
df = spark.createDataFrame([
  ['100','[{"label": "男", "value": "1"}, {"label": "女", "value": "2"}]'],
  ['200','[{"label":"YES", "value":"10"},{"label":"NO", "value":"20"}]']
], ['id','json'])
df.show(100, False)
json_schema = t.ArrayType(t.StructType([t.StructField("label", t.StringType()),t.StructField("value", t.StringType())]))
print(json_schema)
df = (df
      .withColumn('json2', f.explode(f.from_json(f.col('json'), json_schema)))
      .withColumn('label', f.col('json2.label'))
      .withColumn('value', f.col('json2.value'))
      .drop('json', 'json2')
     )
df.show()

メモとか

  • f.from_jsonjsonをパース
  • schemaは↓でもとれるようだが、今回の場合は正しく作れなかったため自分で作成
    • json_schema = spark.read.json(df.rdd.map(lambda row: row.json)).schema
    • ↑のコードで作成 : StructType(List(StructField(label,StringType,true),StructField(value,StringType,true)))
    • 自力で作成 : ArrayType(StructType(List(StructField(label,StringType,true),StructField(value,StringType,true))),true)
  • f.explodeはリスト型の場合に行を作ってくれる
    • カンマやスペース区切りのデータの場合でもsplitしてから使用する事で、同様に使えそう。
  • 通常の?jsonだとf.json_tupleとか便利そう

参考

PySparkでread時に型が変わる

概要

PySparkで保存前はstringで、読み込むとintegerにカラムの型が変わっている現象に遭遇した。

原因としてはpartitionByで指定したカラムの型は自動的に推測されるため。

パーティションのカラムのデータタイプは自動的に推測されることに注意してください。現在のところ、数学的なデータタイプ、日付、タイムスタンプおよび文字列のタイプがサポートされます。パーティションカラムのデータタイプの自動的な推測をされたくない場合があるかも知れません。

再現コード

from pyspark.sql.types import *
schema = StructType([
  StructField('ID', StringType(), False),
  StructField('STR', StringType(), False),
])
df = spark.createDataFrame([['1','20190717']], schema)
df.printSchema()
df.write.mode('overwrite').partitionBy('STR').parquet('s3a://xxxxxx/data/test')
df = spark.read.parquet('s3a://xxxxxx/data/test')
df.printSchema()

出力

root
 |-- ID: string (nullable = false)
 |-- STR: string (nullable = false)

root
 |-- ID: string (nullable = true)
 |-- STR: integer (nullable = true)

対策

どれがいいのかよくわからないが、システム作ってる現状では自動推測されてもあまり嬉しい事ないので、config設定で対応。

documentにあるようにconfigで設定

spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
df = spark.read.parquet('s3a://xxxxxx/data/tes')

stackoverflowをみると毎回設定する必要あると書いてあるが、特にそんな事はなかった。

読み込み後に変換

from pyspark.sql import functions as f
df = df.withColumn("STR", f.col("STR").cast(StringType()))

schema再定義

df = spark.createDataFrame(spark.read.parquet('s3a://xxxxxx/data/test').rdd, schema)

参考

GreasemonkeyやTampermonkeyで外部CSSを読み込む

表題のまま。 久しぶりにuser script書いたら外部のCSSの読み込み方法がわからなかったのでメモ。 (昔、user scriptをよく書いてた時も外部のCSS読み込んでいたと思うんだけど、こんな方法使ってた記憶がない。。。どうやってたんだろう?)

まとめ

ヘッダでGM_addStyleとGM_getResourceTextを許可して、@resourceでcssを変数に格納。その後、 GM_addStyle(GM_getResourceText("変数")) で設定する。

具体例

// @grant        GM_addStyle
// @grant        GM_getResourceText
// @resource     CSS1 https://cdn.jsdelivr.net/npm/flatpickr/dist/flatpickr.min.css

GM_addStyle(GM_getResourceText('CSS1'));

詳細