I am trying to insert data from a Spark RDD into an Ignite cache. I am using Ignite 2.2 and Spark 2.1.
As a first step, I create the cache in a separate Scala script, as follows:
```scala
import java.util

import scala.annotation.meta.field

import org.apache.ignite.Ignition
import org.apache.ignite.cache.{CacheAtomicityMode, CacheMode}
import org.apache.ignite.cache.query.annotations.QuerySqlField
import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder

object Create_Ignite_Cache {

  case class Custom_Class(
    @(QuerySqlField @field)(index = true) a: String,
    @(QuerySqlField @field)(index = true) b: String,
    @(QuerySqlField @field)(index = true) c: String,
    @(QuerySqlField @field)(index = true) d: String,
    @(QuerySqlField @field)(index = true) e: String,
    @(QuerySqlField @field)(index = true) f: String,
    @(QuerySqlField @field)(index = true) g: String,
    @(QuerySqlField @field)(index = true) h: String
  )

  def main(args: Array[String]): Unit = {
    val spi = new TcpDiscoverySpi
    val ipFinder = new TcpDiscoveryMulticastIpFinder
    val addresses = new util.ArrayList[String]
    addresses.add("127.0.0.1:48500..48520")
    ipFinder.setAddresses(addresses)
    spi.setIpFinder(ipFinder)

    val cfg = new IgniteConfiguration().setDiscoverySpi(spi).setClientMode(true)

    val cache_conf = new CacheConfiguration[String, Custom_Class]()
      .setCacheMode(CacheMode.PARTITIONED)
      .setAtomicityMode(CacheAtomicityMode.ATOMIC)
      .setBackups(1)
      .setIndexedTypes(classOf[String], classOf[Custom_Class])
      .setName("Spark_Ignite")

    val ignite = Ignition.getOrStart(cfg)
    ignite.getOrCreateCache(cache_conf)
    System.out.println("[INFO] CACHE CREATED")
    ignite.close()
  }
}
```

Next, I run a Spark application that inserts the content into the cache through an IgniteRDD:
```scala
import java.util

import scala.annotation.meta.field

import org.apache.ignite.cache.query.annotations.QuerySqlField
import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
import org.apache.ignite.spark.{IgniteContext, IgniteRDD}
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder
import org.apache.spark.rdd.RDD

object Spark_Streaming_Processing {

  case class Custom_Class(
    @(QuerySqlField @field)(index = true) a: String,
    @(QuerySqlField @field)(index = true) b: String,
    @(QuerySqlField @field)(index = true) c: String,
    @(QuerySqlField @field)(index = true) d: String,
    @(QuerySqlField @field)(index = true) e: String,
    @(QuerySqlField @field)(index = true) f: String,
    @(QuerySqlField @field)(index = true) g: String,
    @(QuerySqlField @field)(index = true) h: String
  )

  def main(args: Array[String]): Unit = {
    // ... Spark setup producing sqlContext elided ...

    // START IGNITE CONTEXT
    val addresses = new util.ArrayList[String]()
    addresses.add("127.0.0.1:48500..48520")

    val igniteContext: IgniteContext = new IgniteContext(sqlContext.sparkContext, () =>
      new IgniteConfiguration()
        .setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(
          new TcpDiscoveryVmIpFinder().setAddresses(addresses)))
        .setCacheConfiguration(new CacheConfiguration[String, Custom_Class]()
          .setName("Spark_Ignite")
          .setBackups(1)
          .setIndexedTypes(classOf[String], classOf[Custom_Class])),
      true)

    println(igniteContext.ignite().cacheNames())

    val ignite_cache_rdd: IgniteRDD[String, Custom_Class] =
      igniteContext.fromCache[String, Custom_Class]("Spark_Ignite")

    // RDD with data, which as you can see has the correct datatypes as parameters
    val processed_Pair: RDD[(String, Custom_Class)] = (...)

    ignite_cache_rdd.savePairs(processed_Pair)
  }
}
```

As you can see, the two classes are exactly the same.
After the application runs successfully, I can see with ignitevisor that the cache contains 63 records, as shown in the console snapshot.
However, if I try to run a SQL query against the cache like this:
```scala
ignite_cache_rdd.sql("select * from Custom_Class").show(truncate = false)
```

I get back an empty table.
The same thing happens if I query the server from an external SQL client.
Strangely, if I do not create the cache beforehand and just run the Spark application, the IgniteContext creates the cache (since it does not exist yet), and then I can see the records in the query!
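That asymmetry would be consistent with "get or create" semantics: a supplied cache configuration only takes effect when the cache is actually created, so whichever process creates the cache first pins its SQL type configuration. This is my assumption about the behavior, not something confirmed above; the sketch below is a toy stdlib-only model of that semantics, not Ignite API:

```scala
import scala.collection.mutable

// Toy model of "get or create" semantics: the configuration passed in is
// only applied when the cache is created; an existing cache keeps the
// configuration it was created with.
object GetOrCreateModel {
  private val valueTypes = mutable.Map[String, String]()

  // Returns the effective value-type name for the cache; the existing
  // entry wins over the one supplied by a later caller.
  def getOrCreateCache(name: String, valueType: String): String =
    valueTypes.getOrElseUpdate(name, valueType)

  def main(args: Array[String]): Unit = {
    // The standalone script creates the cache first...
    getOrCreateCache("Spark_Ignite", "Create_Ignite_Cache$Custom_Class")
    // ...so a later getOrCreate with a different configuration is ignored:
    val effective =
      getOrCreateCache("Spark_Ignite", "Spark_Streaming_Processing$Custom_Class")
    println(effective) // Create_Ignite_Cache$Custom_Class
  }
}
```

Under this model, the Spark application's `setCacheConfiguration(...)` would be silently ignored whenever `Spark_Ignite` already exists, which matches what I observe.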
What could the problem be?
As far as I can tell, the key and value data types are exactly the same, so I should be able to see the records when querying.
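One detail worth double-checking (again an assumption on my part): because each `Custom_Class` is declared inside a different enclosing object, the two case classes compile to different JVM classes even though their fields match field-for-field. A minimal, Ignite-free sketch with hypothetical object names:

```scala
// Two structurally identical case classes nested in different enclosing
// objects still have different fully-qualified class names.
object Create_Ignite_Cache_Demo    { case class Custom_Class(a: String) }
object Spark_Streaming_Demo        { case class Custom_Class(a: String) }

object NameCheck {
  def main(args: Array[String]): Unit = {
    val n1 = classOf[Create_Ignite_Cache_Demo.Custom_Class].getName
    val n2 = classOf[Spark_Streaming_Demo.Custom_Class].getName
    println(n1)       // Create_Ignite_Cache_Demo$Custom_Class
    println(n2)       // Spark_Streaming_Demo$Custom_Class
    println(n1 == n2) // false
  }
}
```

If Ignite keys its SQL type metadata on the value class name, "identical" fields would not be enough: the class registered by the standalone script and the class written by the Spark application would be two distinct types.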
Thank you for your time.
Solution: