PySpark DataFrames allow duplicate column names
Overview

PySpark allows a DataFrame to hold multiple columns with the same name, most often as the result of a join. The duplicates themselves cause no error, but selecting one of them by name raises an AnalysisException. This note reproduces the behavior and shows two workarounds.
Reproduction code
Duplicate column names at creation time
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df99 = spark.createDataFrame(
    [
        [1, 'a', 'b'],
        [2, 'aa', 'bb'],
    ],
    ['id', 'c1', 'c1'],
)
df99.show()
+---+---+---+
| id| c1| c1|
+---+---+---+
|  1|  a|  b|
|  2| aa| bb|
+---+---+---+
Note: this pattern probably doesn't come up very often in practice.
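Even with a creation-time duplicate, the column can no longer be selected by its name. As a quick check, here is a sketch continuing from the df99 example above; selecting 'c1' should raise the same ambiguity error that the join example below demonstrates in full:

# Sketch, continuing from the df99 example above.
# The duplicate column exists, but selecting it by name is ambiguous.
df99.select('id').show()   # fine: 'id' is unique

try:
    df99.select('c1')      # AnalysisException: Reference 'c1' is ambiguous
except Exception as e:
    print(e)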
Duplicate column names after a join
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame(
    [
        [1, 'a', 'b'],
        [2, 'aa', 'bb'],
    ],
    ['id', 'c1', 'c2'],
)
df2 = spark.createDataFrame(
    [
        [1, 'x', 'y'],
        [2, 'xx', 'yy'],
    ],
    ['id', 'c1', 'c2'],
)

merged_df = df1.join(df2, 'id')

# A DataFrame with duplicate columns is allowed
merged_df.show()

# Selecting a non-duplicated column works
merged_df.select('id').show()

# Selecting a duplicated column raises an exception
merged_df.select('id', 'c1')
+---+---+---+---+---+
| id| c1| c2| c1| c2|
+---+---+---+---+---+
|  1|  a|  b|  x|  y|
|  2| aa| bb| xx| yy|
+---+---+---+---+---+

+---+
| id|
+---+
|  1|
|  2|
+---+

---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<ipython-input-20-69f4fa3aa508> in <module>
     24
     25 # Selecting a duplicated column raises an exception
---> 26 merged_df.select('id', 'c1')

/spark/python/pyspark/sql/dataframe.py in select(self, *cols)
   1419         [Row(name=u'Alice', age=12), Row(name=u'Bob', age=15)]
   1420         """
-> 1421         jdf = self._jdf.select(self._jcols(*cols))
   1422         return DataFrame(jdf, self.sql_ctx)
   1423

/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306
   1307         for temp_arg in temp_args:

/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    135                 # Hide where the exception came from that shows a non-Pythonic
    136                 # JVM exception message.
--> 137                 raise_from(converted)
    138             else:
    139                 raise

/spark/python/pyspark/sql/utils.py in raise_from(e)

AnalysisException: Reference 'c1' is ambiguous, could be: c1, c1.;
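As an aside, the ambiguity only bites when the column is referenced by its string name. Referencing the column through its source DataFrame still resolves; a minimal sketch, continuing from the df1/df2/merged_df variables above:

# Sketch: disambiguate by referencing the column through its source DataFrame.
# This bypasses the string-name lookup that triggered the AnalysisException.
merged_df.select('id', df1['c1'], df2['c1']).show()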
Workarounds
Create a new DataFrame with toDF, specifying all column names
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame(
    [
        [1, 'a', 'b'],
        [2, 'aa', 'bb'],
    ],
    ['id', 'c1', 'c2'],
)
df2 = spark.createDataFrame(
    [
        [1, 'x', 'y'],
        [2, 'xx', 'yy'],
    ],
    ['id', 'c1', 'c2'],
)

merged_df = df1.join(df2, 'id')

# Create a new DataFrame, specifying all column names
df88 = merged_df.toDF('id', 'df1_c1', 'df1_c2', 'df2_c1', 'df2_c2')
df88.show()
+---+------+------+------+------+
| id|df1_c1|df1_c2|df2_c1|df2_c2|
+---+------+------+------+------+
|  1|     a|     b|     x|     y|
|  2|    aa|    bb|    xx|    yy|
+---+------+------+------+------+
https://spark.apache.org/docs/3.0.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.toDF
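Listing every name by hand gets tedious with wide tables. A minimal sketch of building the same name list programmatically, assuming (as here) that 'id' is the only join key so the joined columns come out as id, then df1's remaining columns, then df2's:

# Sketch: build prefixed column names instead of writing them out by hand.
# Assumes the join key 'id' comes first, followed by df1's columns, then df2's.
new_names = (
    ['id']
    + ['df1_{}'.format(c) for c in df1.columns if c != 'id']
    + ['df2_{}'.format(c) for c in df2.columns if c != 'id']
)
df88 = merged_df.toDF(*new_names)
df88.show()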
Attach aliases and select
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame(
    [
        [1, 'a', 'b'],
        [2, 'aa', 'bb'],
    ],
    ['id', 'c1', 'c2'],
)
df2 = spark.createDataFrame(
    [
        [1, 'x', 'y'],
        [2, 'xx', 'yy'],
    ],
    ['id', 'c1', 'c2'],
)

df1 = df1.alias('df1')
df2 = df2.alias('df2')

merged_df = df1.join(df2, 'id')

# show() looks the same as before
merged_df.show()

# Qualifying the names with the aliases makes the select work
merged_df.select('id', 'df1.c1', 'df1.c2', 'df2.c1', 'df2.c2').show()
+---+---+---+---+---+
| id| c1| c2| c1| c2|
+---+---+---+---+---+
|  1|  a|  b|  x|  y|
|  2| aa| bb| xx| yy|
+---+---+---+---+---+

+---+---+---+---+---+
| id| c1| c2| c1| c2|
+---+---+---+---+---+
|  1|  a|  b|  x|  y|
|  2| aa| bb| xx| yy|
+---+---+---+---+---+
https://spark.apache.org/docs/3.0.0/api/python/pyspark.sql.html#pyspark.sql.Column.alias
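Note that the aliased select above still returns duplicated c1/c2 names in its result. If unique output names are wanted, the qualified references can be renamed at the same time; a sketch using the F import from the snippet above:

# Sketch: rename while selecting, so the result has unique column names.
renamed_df = merged_df.select(
    'id',
    F.col('df1.c1').alias('df1_c1'),
    F.col('df1.c2').alias('df1_c2'),
    F.col('df2.c1').alias('df2_c1'),
    F.col('df2.c2').alias('df2_c2'),
)
renamed_df.show()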