dataframe添加列 spark dataframe 列操作

本文详细介绍了在spark dataset中使用java更新列值的两种主要方法。首先,通过新列并删除旧列来实现简单的值替换。其次,针对复杂的数据转换需求,重点阐述了如何注册和应用用户自定义函数(udf),包括在dataframe api和spark sql中集成udf的实践,并提供了具体的数据格式高效转换示例,旨在帮助开发者、正确处理spark中的数据更新操作。
在Spark中,数据集(其类型别名DataFrame)是不大量的数据集合。这意味着你不能像操作传统Java集合那样直接并修改其内部元素。当需要“更新”列的值时,实际上是创建一个新的数据集,其下载Java环境下,如何高效且符合Spark范式地更新Dataset中的列值。1. 理解Spark的不可变性
许多初学者尝试通过遍历Dataset中的行并直接修改Row对象来更新数据,例如使用foreach或map操作。然而,这种转换做法是转换错误的,原因如下:不可变性:Row对象本身是不可变的。普遍执行:foreach操作在集群的各个执行器上完成执行,但不会返回一个新的Datase t:非数据。
正确的做法是利用Spark的(Transform) ation)操作,这些操作会返回一个新的数据集,而不会修改原始数据。2. 使用withColumn和drop进行列值替换
运送mn一个新列,然后如果需要创建,使用drop删除旧列。
立即学习“Java免费笔记学习(深“Row”gt;名为yourdataset,并且想要将UPLOADED_ON列替换为新的值(例如,一个常量值)。
import org.apache.spark.sql.Dataset;import org.apache.spark.sql.Row;import static org.apache.spark.sql.functions.lit; //lit函数//假设yourdataset已经加载// Datasetlt;Rowgt;yourdataset = sparkSession.read()....;// 1.一个名为quot;UPLOADED_ON_NEWquot;的新列,其值quot;Any-valuequot;//如果新列名与旧列名相同,底部直接替换Datasetlt;Rowgt;updatedDataset = yourdataset.withColumn(quot;UPLOADED_ON_NEWquot;,lit(quot;Any-valuequot;));// 2.如果需要删除,原始的quot;UPLOADED_ONquot;列updatedDataset = UpdatedDataset.withColumn(quot;UPLOADED_ON_NEWquot;,lit(quot;Any-valuequot;));// 2.如果需要删除,原始的quot;UPLOADED_ONquot;列updatedDataset = UpdatedDataset.withColumn(quot;UPLOADED_ON_NEWquot);的新列,而没有原始的 quot;UPLOADED_ONquot; UpdatedDataset.show() ,withColumn会直接覆盖旧列。例如:yourdataset.withColumn("UPLOADED_ON", lit("New Value")) 会直接将UPLOADED_ON列的所有值更新为"New Value"。lit()函数用于创建字面量(常量)列。3. 使用用户自定义函数 (UDF) 进行复杂转换
当列值的转换逻辑比较复杂,无法导入函数直接实现时,用户自定义函数(UDF Java(或 Scala、Python)逻辑集成到 Spark 的转换操作中。
AppMall应用商店
AI应用商店,提供即时交付、涉及付费的人工智能应用服务 56页
示例场景:日期格式转换
假设UPLOADED_ON列存储的3.1注册UDF
在使用UDF,需要将其注册到SparkSession中。注册时需要事先指定UDF的名称、实现逻辑(通常是Lambda返回表达式)和类型。
import org.apache.spark.sql.SparkSession;import org.apache.spark.sql.types.DataTypes;import org.apache.spark.sql.api.java.UDF1; // 导入UDF1接口import java.text.DateFormat;import java.text.SimpleDateFormat;import java.util.Date;import java.text.ParseException; //导入ParseException//假设sparkSession已经初始化// SparkSession SparkSession = SparkSession.builder().appName(quot;UDFExamplequot;).master(quot;local[*]quot;).getOrCreate();// 注册一个 UDF,用于将日期字符串从 quot;yyyy-MM-ddquot; dateIn -gt; { // UDF 的逻辑实现,这里使用 Lambda 表达式 if (dateIn == null || dateIn.isEmpty()) { return null; } try { DateFormat inputFormatter = 新SimpleDateFormat(quot;yyyy-MM-ddquot;); Date date = inputFormatter.parse(dateIn); // 解析输入日期字符串 DateFormat outputFormatter = new SimpleDateFormat(quot;dd-MM-yyquot;); return outputFormatter.format(date); //格式化为目标字符串 } catch (ParseException e) { // Return null: // dateIn; } } 或者 }, DataTypes.StringType // UDF的返回类型);System.out.println(quot;UDF 'formatDateYYYYMMDDtoDDMMYY'注册成功。quot;);登录后复制
关键点
:UDF1lt;String, Stringgt;表示一个接受一个String参数并返回一个String结果的UDF。根据参数数量,Spark提供了UDF1到UDF22等接口。DataTypes.StringType 阅读更多:UDF 3.2应用UDF到数据集
注册UDF后,就可以在withColumn操作中使用UDF函数来调用它。import org.apache.spark.sql.Dataset;import;import org.apache.spark.sql.Row;import static org.apache.spark.sql.functions.col; // 导入col函数import static org.apache.spark.sql.functions.callUDF; //导入callUDF函数//假设yourdataset已经加载,并且UDF已经注册// Datasetlt;Rowgt; yourdataset = sparkSession.read()....;//使用注册的UDF来”;UPLOADED_ON”; 列,放入结果存入quot;UPLOADED_ON_NEWquot; 列Datasetlt;Rowgt;transformedDataset = yourdataset.withColumn( quot;UPLOADED_ON_NEWquot;, callUDF( quot;formatDateYYYYMMDDtoDDMMYYquot;, // UDF 的名称col(quot;UPLOADED_ONquot;) // 确定UDF 的列 ));//如果需要替换原始列,可以删除旧列并重命名新列transformedDataset =transformedDataset.drop(quot;UPLOADED_ONquot;) .withColumnRenamed(quot;UPLOADED_ON_NEWquot;,quot;UPLOADED_ONquot;);transformedDataset.show();登录后复制3.3 UDF在Spark SQL中的应用
注册的UDF不仅可以在DataFrame中API中使用,也可以在Spark SQL查询中直接调用。这使得UDF在混合使用SQL和DataFrame API的场景非常灵活。
import org.apache.spark.sql.Dataset;import org.apache.spark.sql.Row;import org.apache.spark.sql.SparkSession;//假设sparkSession已经初始化,yourdataset已经加载,并且UDF已经注册// 1. 将Dataset Spark使用 // sqlTransformedDataset = sqlTransformedDataset.drop(quot;UPLOADED_ONquot;) .withColumnRenamed(quot;UPLOADED_ON_NEW";, quot;UPLOADED_ONquot;);sqlTransformedDataset.show();登录后复制4. 注意事项与最实践性能考量:尽管UDF功能强大,但它们通常不如Spark内置函数或表达式优化得好。Spark内置函数(如date_format、to_date等在org.apac he.spark.sql.functions中)可以进行全面层次的优化,因为Spark,应优先使用。类型安全:注册UDF如果UDF的实际返回值类型与注册类型不匹配,可能会导致运行时错误或意外行为。 UDF列化的:ParseException。调试:调试UDF可能比调试普通Spark转换更复杂,因为错误可能会导致Spark数据集中更新列值,核心在于理解其不可可变性并利用Spark的转换操作。对于简单的值替换结合Columndrop是高效的方法。UDF提供了一个强大的扩展机制,允许开发者将任何Java代码集成到Spark的数据处理中。无论是通过DataFrame API的调用UDF还是Spark SQL(UDF) rk 内置函数,只有在内置函数无法满足需求时,再使用UDF:
以上就是Spark Dataset列值更新:Java python java apache app session yy red Python Java scala sql 广泛 String 常量 foreach 字符串 Lambda 接口值类型 map 对象 Spark apache scala Java 之间 Base64 转换压缩文件时出现的问题 理解标准输出缓冲:Python, C, Java, Go理解标准输出缓冲:Python、C、Java 与 Go 的行为差异解析 理解标准输出缓冲:Python、C、Java 和 Go 的异同
