山pの楽しいお勉強生活

勉強の成果を垂れ流していきます

PySparkでread時に型が変わる

概要

PySparkで保存前はstringで、読み込むとintegerにカラムの型が変わっている現象に遭遇した。

原因としてはpartitionByで指定したカラムの型は自動的に推測されるため。

パーティションのカラムのデータタイプは自動的に推測されることに注意してください。現在のところ、数学的なデータタイプ、日付、タイムスタンプおよび文字列のタイプがサポートされます。パーティションカラムのデータタイプの自動的な推測をされたくない場合があるかも知れません。

再現コード

from pyspark.sql.types import *
schema = StructType([
  StructField('ID', StringType(), False),
  StructField('STR', StringType(), False),
])
df = spark.createDataFrame([['1','20190717']], schema)
df.printSchema()
df.write.mode('overwrite').partitionBy('STR').parquet('s3a://xxxxxx/data/test')
df = spark.read.parquet('s3a://xxxxxx/data/test')
df.printSchema()

出力

root
 |-- ID: string (nullable = false)
 |-- STR: string (nullable = false)

root
 |-- ID: string (nullable = true)
 |-- STR: integer (nullable = true)

対策

どれがいいのかよくわからないが、システム作ってる現状では自動推測されてもあまり嬉しい事ないので、config設定で対応。

documentにあるようにconfigで設定

spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
df = spark.read.parquet('s3a://xxxxxx/data/tes')

stackoverflowをみると毎回設定する必要あると書いてあるが、特にそんな事はなかった。

読み込み後に変換

from pyspark.sql import functions as f
df = df.withColumn("STR", f.col("STR").cast(StringType()))

schema再定義

df = spark.createDataFrame(spark.read.parquet('s3a://xxxxxx/data/test').rdd, schema)

参考