I hope you can help me. I have a DataFrame that looks like this:

val df = sc.parallelize(Seq(
  (1, "a", "2014-12-01", "2015-01-01", 100),
  (2, "a", "2014-12-01", "2015-01-02", 150),
  (3, "a", "2014-12-01", "2015-01-03", 120),
  (4, "b", "2015-12-15", "2015-01-01", 100)
)).toDF("id", "prodId", "dateIns", "dateTrans", "value")
  .withColumn("dateIns", to_date($"dateIns"))
  .withColumn("dateTrans", to_date($"dateTrans"))
I would like to do a groupBy on prodId and aggregate "value", summing it over date ranges defined by the difference between the "dateIns" and "dateTrans" columns. In particular, I would like a way to define a conditional sum that adds up all values falling within a predefined maximum difference between those columns, i.e. all values occurring within 10, 20, or 30 days of dateIns ('dateTrans' - 'dateIns').

Is there any predefined aggregate function in Spark that can do such a conditional sum? Would you recommend developing an aggregation UDF (and if so, any suggestions)? I'm using PySpark, but I'd also be very happy to get a Scala solution. Many thanks!

Solution:

To make it a little more interesting, so that there are some events inside the window:
val df = sc.parallelize(Seq(
  (1, "a", "2014-12-30", "2015-01-01", 100),
  (2, "a", "2014-12-21", "2015-01-02", 150),
  (3, "a", "2014-12-10", "2015-01-03", 120),
  (4, "b", "2014-12-05", "2015-01-01", 100)
)).toDF("id", "prodId", "dateIns", "dateTrans", "value")
  .withColumn("dateIns", to_date($"dateIns"))
  .withColumn("dateTrans", to_date($"dateTrans"))
What you need is more or less this:
import org.apache.spark.sql.functions.{col, datediff, lit, sum, when}

// Find the difference in tens of days
val diff = (datediff(col("dateTrans"), col("dateIns")) / 10)
  .cast("integer") * 10

val dfWithDiff = df.withColumn("diff", diff)

val aggregated = dfWithDiff
  .where(col("diff") < 30) // keep only the windows of interest (up to 30 days)
  .groupBy(col("prodId"), col("diff"))
  .agg(sum(col("value")))
Results:
aggregated.show

// +------+----+----------+
// |prodId|diff|sum(value)|
// +------+----+----------+
// |     a|  20|       120|
// |     b|  20|       100|
// |     a|   0|       100|
// |     a|  10|       150|
// +------+----+----------+
where diff is the lower bound of the range (0 -> [0, 10), 10 -> [10, 20), ...). This will also work in PySpark if you remove the val and adjust the imports.
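For reference, a minimal PySpark sketch of the same bucketed aggregation (assuming df is the DataFrame built above with the date columns already converted; the < 30 filter is an assumed cutoff mirroring the Scala version):

from pyspark.sql.functions import col, datediff, sum as sum_

# Bucket the day difference into multiples of ten days
diff = (datediff(col("dateTrans"), col("dateIns")) / 10).cast("integer") * 10

dfWithDiff = df.withColumn("diff", diff)

aggregated = (dfWithDiff
    .where(col("diff") < 30)  # assumed cutoff: only windows up to 30 days
    .groupBy(col("prodId"), col("diff"))
    .agg(sum_(col("value"))))

aggregated.show()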
Edit (aggregation per column):
val exprs = Seq(0, 10, 20).map(x =>
  sum(when(col("diff") === lit(x), col("value")).otherwise(lit(0)))
    .alias(x.toString))

dfWithDiff.groupBy(col("prodId")).agg(exprs.head, exprs.tail: _*).show
// +------+---+---+---+
// |prodId|  0| 10| 20|
// +------+---+---+---+
// |     a|100|150|120|
// |     b|  0|  0|100|
// +------+---+---+---+
Python equivalent:
from pyspark.sql.functions import *

def make_col(x):
    cnd = when(col("diff") == lit(x), col("value")).otherwise(lit(0))
    return sum(cnd).alias(str(x))

exprs = [make_col(x) for x in range(0, 30, 10)]

dfWithDiff.groupBy(col("prodId")).agg(*exprs).show()
## +------+---+---+---+
## |prodId|  0| 10| 20|
## +------+---+---+---+
## |     a|100|150|120|
## |     b|  0|  0|100|
## +------+---+---+---+