回答

收藏

SparkSQL:使用两列的条件总和

技术问答 技术问答 269 人阅读 | 0 人回复 | 2023-09-14

希望您能帮到我。我有一个DF,如下所示:
& K! f8 G0 A; E2 k- yval df = sc.parallelize(Seq(
- k- y4 }" T4 g  (1, "a", "2014-12-01", "2015-01-01", 100),
& S* h7 Y. h2 u$ V9 u- J' f5 p  (2, "a", "2014-12-01", "2015-01-02", 150),
* }5 X0 `& {$ g/ G% N  (3, "a", "2014-12-01", "2015-01-03", 120),   L% `3 N+ V3 c. A) h: w4 Y0 l9 \$ g% h
  (4, "b", "2015-12-15", "2015-01-01", 100)
- L0 q! f  A- F' S* h( F)).toDF("id", "prodId", "dateIns", "dateTrans", "value")6 h/ u- I& o8 o/ J: I
.withColumn("dateIns", to_date($"dateIns")
0 K) R: i# h. M; r& D.withColumn("dateTrans", to_date($"dateTrans"))+ g. O% C, D: [4 L% J! z
我很乐意做一个groupBy prodId并汇总“值”,以将其汇总为由“ dateIns”和“! P! x# M/ Q0 n+ v% h" B; H
dateTrans”列之间的差异所定义的日期范围。特别是,我希望有一种方法来定义一个条件总和,该总和将上述各列之间的预定义最大差之内的所有值相加。即从dateIns开始的10、20、30天之间发生的所有值(’dateTrans’-‘dateIns’/ F# e/ @& Q9 K9 T  X( i- t
spark中是否有任何预定义的聚合函数可以进行条件求和?您是否建议开发aggr。UDF(如果有的话,有什么建议)?我正在使用pySpqrk,但也非常高兴获得Scala解决方案。非常感谢!
! c, K* {% }" K. [  B5 S               
8 A0 l) O( W  J( }: d3 f解决方案:
) n3 v+ w9 X# D# c% S4 g3 U/ ]                # Q# y8 ~  ^) D, V) }5 y' o

7 V+ J$ M& g& V* I0 t- N) y
( A3 @# S1 w- u+ m' u  C, E! g+ J4 D! Z                让您更有趣一点,以便窗口中有一些事件:0 p3 |# N) e2 K8 E8 P. f  F% i) u
val df = sc.parallelize(Seq(
  f  i, {6 P0 \* e! l4 f, R  (1, "a", "2014-12-30", "2015-01-01", 100), 9 x% X- q! \: }6 k* m( v
  (2, "a", "2014-12-21", "2015-01-02", 150),( V7 T8 G  y0 k$ c1 P5 q
  (3, "a", "2014-12-10", "2015-01-03", 120),
* K- c, R* S* U. B$ c  (4, "b", "2014-12-05", "2015-01-01", 100)
( M: r7 m7 U0 t/ r% s)).toDF("id", "prodId", "dateIns", "dateTrans", "value")
/ H9 i2 ?+ {* x0 W: e% R1 y6 D.withColumn("dateIns", to_date($"dateIns"))
7 ]# `/ X! I% ?5 r5 W6 D' q8 q.withColumn("dateTrans", to_date($"dateTrans"))
5 {' W5 ^6 Z* G  z; G& _1 O您所需要的或多或少是这样的:
& F* k# R$ P2 t4 fimport org.apache.spark.sql.functions.{col, datediff, lit, sum}
% |8 j6 U3 Y" `' T1 c1 }  e// Find difference in tens of days * }9 K, \' ~) c3 ]) V3 W7 F
val diff = (datediff(col("dateTrans"), col("dateIns")) / 10)+ [# ^9 b3 S8 C
  .cast("integer") * 10
; N3 f0 J. v9 g" gval dfWithDiff = df.withColumn("diff", diff)$ [4 W8 _7 v2 m+ k& m+ @
val aggregated = dfWithDiff
. }2 t& @( G, G6 K  .where((col("diff") = 0))7 c2 ?1 C* h8 l% o
  .groupBy(col("prodId"), col("diff"))  {4 T( K; d2 Z) H2 a2 ^. T
  .agg(sum(col("value")))" e4 L; r/ ^) Y3 ?
结果
' o7 ~, J# K1 Q3 v4 laggregated.show( U* @  r# T$ w) ~6 h- \; K
// +------+----+----------+, i. `; y4 i8 F6 G* N# i; S1 z
// |prodId|diff|sum(value)|
/ ~& k* p" R5 r1 y# o9 k- w3 W; K// +------+----+----------+
- h+ ~' L" W7 T// |     a|  20|       120|0 K8 P7 T4 v2 g. I
// |     b|  20|       100|! F$ r; U% O7 A% b3 u6 n
// |     a|   0|       100|
* t0 L+ p/ k( j2 }5 G3 `// |     a|  10|       150|- p; W% R: @: C& f( g5 f/ s
// +------+----+----------+
; |9 a7 _+ l7 K; h1 m7 h0 ?3 S0 N其中diff是范围(0-> [0,10),10-> [10,20),…)的下限。如果您删除val并调整了导入,这也将在PySpark中起作用。
* E0 @" m* c0 J7 f编辑 (每列汇总):
' F! O: N" |. G9 F1 ~val exprs = Seq(0, 10,  20).map(x => sum(
' o) m' F( k; N7 l4 ~6 V" M& T  when(col("diff") === lit(x), col("value"))) b/ L4 Y# m; V
    .otherwise(lit(0)))
9 ?- c# b8 a0 v    .alias(x.toString))
  r6 I) e+ S8 x, k( f, `dfWithDiff.groupBy(col("prodId")).agg(exprs.head, exprs.tail: _*).show
& a# S/ I1 Q# A) X! y3 J5 y// +------+---+---+---+6 R1 u6 j; C0 t+ o% `$ [+ Z' j
// |prodId|  0| 10| 20|  J* a" K$ C. g/ N
// +------+---+---+---+
% Q$ `% ]7 o( e4 S: }& r// |     a|100|150|120|
0 O. T8 g2 x& N  b& Y// |     b|  0|  0|100|
: u& s- n$ r8 {! }% q// +------+---+---+---+& E# F% ?  F( Q2 E9 O4 b
与Python等效:
: _# v! T2 p/ t( b; W; Xfrom pyspark.sql.functions import *6 z3 z0 H: y; Y0 J- C+ J6 \
def make_col(x):! x3 p8 x, C/ K9 F2 ~3 f
   cnd = when(col("diff") == lit(x), col("value")).otherwise(lit(0))
2 @' [9 q. W. N   return sum(cnd).alias(str(x))) F2 [# a% ]( `
exprs = [make_col(x) for x in range(0, 30, 10)]
6 j+ ]1 n' a( s) R/ y; {- t7 U2 mdfWithDiff.groupBy(col("prodId")).agg(*exprs).show()
& \: C% ]1 K# F; s& x## +------+---+---+---+
7 O5 E; c2 P3 K, x" A## |prodId|  0| 10| 20|
7 C4 _" q/ Q  v5 l% r& d## +------+---+---+---+
* x$ }$ P  o+ E4 R) G( u+ T/ q## |     a|100|150|120|* s" Z" g5 L0 D8 M: j
## |     b|  0|  0|100|
! W( B% u- M4 ^0 o## +------+---+---+---+
分享到:
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则