山pの楽しいお勉強生活

勉強の成果を垂れ流していきます

PySparkのDataFrameでは同名のカラムが許容される

概要

  • PySparkのDataFrameでは同名のカラムが許容される
  • select などカラム名を指定する処理時に例外が発生する
  • カラム名を再定義、別名を付ける事で回避が可能

再現コード

作成時にカラム名が重複

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df99 = spark.createDataFrame(
    [
        [1, 'a', 'b'],
        [2, 'aa', 'bb'],
    ],
    ['id', 'c1', 'c1'],
)
df99.show()
+---+---+---+
| id| c1| c1|
+---+---+---+
|  1|  a|  b|
|  2| aa| bb|
+---+---+---+

※このパターンはあまりない気がする

joinした結果カラム目が重複

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df1 = spark.createDataFrame(
    [
        [1, 'a', 'b'],
        [2, 'aa', 'bb'],
    ],
    ['id', 'c1', 'c2'],
)
df2 = spark.createDataFrame(
    [
        [1, 'x', 'y'],
        [2, 'xx', 'yy'],
    ],
    ['id', 'c1', 'c2'],
)
merged_df = df1.join(df2, 'id')

# 重複カラムの存在は可能
merged_df.show()

# 重複していないカラムのselectは可能
merged_df.select('id').show()

# 重複しているカラムのselectは例外発生
merged_df.select('id', 'c1')
+---+---+---+---+---+
| id| c1| c2| c1| c2|
+---+---+---+---+---+
|  1|  a|  b|  x|  y|
|  2| aa| bb| xx| yy|
+---+---+---+---+---+

+---+
| id|
+---+
|  1|
|  2|
+---+

---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<ipython-input-20-69f4fa3aa508> in <module>
     24 
     25 # 重複しているカラムのselectは例外発生
---> 26 merged_df.select('id', 'c1')

/spark/python/pyspark/sql/dataframe.py in select(self, *cols)
   1419         [Row(name=u'Alice', age=12), Row(name=u'Bob', age=15)]
   1420         """
-> 1421         jdf = self._jdf.select(self._jcols(*cols))
   1422         return DataFrame(jdf, self.sql_ctx)
   1423 

/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    135                 # Hide where the exception came from that shows a non-Pythonic
    136                 # JVM exception message.
--> 137                 raise_from(converted)
    138             else:
    139                 raise

/spark/python/pyspark/sql/utils.py in raise_from(e)

AnalysisException: Reference 'c1' is ambiguous, could be: c1, c1.;

対応方法

toDFでカラムを指定して新しくDataFrameを作る

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df1 = spark.createDataFrame(
    [
        [1, 'a', 'b'],
        [2, 'aa', 'bb'],
    ],
    ['id', 'c1', 'c2'],
)
df2 = spark.createDataFrame(
    [
        [1, 'x', 'y'],
        [2, 'xx', 'yy'],
    ],
    ['id', 'c1', 'c2'],
)
merged_df = df1.join(df2, 'id')

# カラムを指定して新しくDataFrameを作る
df88 = merged_df.toDF('id', 'df1_c1', 'df1_c2', 'df2_c1', 'df2_c2')
df88.show()
+---+------+------+------+------+
| id|df1_c1|df1_c2|df2_c1|df2_c2|
+---+------+------+------+------+
|  1|     a|     b|     x|     y|
|  2|    aa|    bb|    xx|    yy|
+---+------+------+------+------+

https://spark.apache.org/docs/3.0.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.toDF

aliasをつけてselectする

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
df1 = spark.createDataFrame(
    [
        [1, 'a', 'b'],
        [2, 'aa', 'bb'],
    ],
    ['id', 'c1', 'c2'],
)
df2 = spark.createDataFrame(
    [
        [1, 'x', 'y'],
        [2, 'xx', 'yy'],
    ],
    ['id', 'c1', 'c2'],
)
df1 = df1.alias('df1')
df2 = df2.alias('df2')
merged_df = df1.join(df2, 'id')

# showの見た目は変わらない
merged_df.show()

# alias付きで指定すると取得が可能
merged_df.select('id', 'df1.c1', 'df1.c2', 'df2.c1', 'df2.c2').show()
+---+---+---+---+---+
| id| c1| c2| c1| c2|
+---+---+---+---+---+
|  1|  a|  b|  x|  y|
|  2| aa| bb| xx| yy|
+---+---+---+---+---+

+---+---+---+---+---+
| id| c1| c2| c1| c2|
+---+---+---+---+---+
|  1|  a|  b|  x|  y|
|  2| aa| bb| xx| yy|
+---+---+---+---+---+

https://spark.apache.org/docs/3.0.0/api/python/pyspark.sql.html#pyspark.sql.Column.alias