回答

收藏

Spark:优化将DataFrame写入SQL Server

技术问答 技术问答 241 人阅读 | 0 人回复 | 2023-09-14

我正在使用以下代码将43列和约2000行DataFrame写入SQL Server的表中:% V/ s& I8 @3 o3 U
dataFrame  .write  .format("jdbc")  .mode("overwrite")  .option("driver","com.microsoft.sqlserver.jdbc.SQLServerDriver")  .option("url",url)  .option("dbtable",tablename)  .option("user",user)  .option("password",password)  .save()不幸的是,虽然它确实适用于小型DataFrame,但它要么很慢,要么很大DataFrame超时。关于如何优化它的任何提示?. r/ H$ j3 r# D. g. C% F+ t# j
我试着设置 rewriteBatchedStatements=true$ y, @; U4 g3 k; h& q
谢谢。( p+ F) i' V$ A' @! ]% O
                                                                2 t7 p" a1 j  i  {) |$ g9 n) Y
    解决方案:                                                                6 c" a7 G6 g0 }
                                                                我们求助于使用azure-sqldb-spark而不是使用Spark默认内置导出功能。这个库给你一个bulkCopyToSqlDB这是一种方法 真正的    批量插入,去 了很多
: K/ O' Q7 d1 T) h% P更快。它比内置功能太实用,但以我的经验还是值得的。
; N# {; k/ x; b9 K6 z我们或多或少地这样使用它:# B/ w" B3 I& f7 |
import com.microsoft.azure.sqldb.spark.config.Configimport com.microsoft.azure.sqldb.spark.connect._import com.microsoft.azure.sqldb.spark.query._val options = Map(  "url"          -> "***", "databaseName" -> "***", "user"         -> "***", "password"     -> "***", "driver"       -> "com.microsoft.sqlserver.jdbc.SQLServerDriver")// first make sure the table exists,with the correct column types// and is properly cleaned up if necessaryval query = dropAndCreateQuery(df,"myTable")val createConfig = Config(options    Map("QueryCustom" -> query))spark.sqlContext.sqlDBQuery(createConfig)val bulkConfig = Config(options    Map(  "dbTable"           -> "myTable", "bulkCopyBatchSize" -> "20000", "bulkCopyTableLock" -> "true", "bulkCopyTimeout"   -> "600"))df.bulkCopyToSqlDB(bulkConfig)如你所见,我们CREATE TABLE您 可以, o+ {! r. r) h9 X0 j1 o6 @4 r
让库创建表,但这样做dataFrame.limit(0).write.sqlDB(config)它仍然非常低效,可能需要你缓存DataFrame,而且不允许你选择SaveMode。
- S! ?8 H! Z2 T' T6 n. x0 m! H也可能有趣:ExclusionRule添加此库sbt我们必须在施工过程中使用它an ,否则assembly任务将失败。. k" A8 [" w" n; t. N
libraryDependencies  = "com.microsoft.azure" % "azure-sqldb-spark" % "1.0.2" excludeAll(  ExclusionRule(organization = "org.apache.spark"))
分享到:
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则