回答

收藏

SQL查询针对Apache Ignite缓存返回空结果

技术问答 技术问答 209 人阅读 | 0 人回复 | 2023-09-12

我正在尝试从Spark RDD向Ignite缓存执行插入操作。我在用2.2版本的Ignite和2.1版本的Spark。
1 I! W0 e$ M/ e9 c1 x" G我想做的第一步是单独做scala脚本中创建缓存,如下所示:# O2 ^6 Z# l) Q* b3 ]
object Create_Ignite_Cache {case class Custom_Class(                 @(QuerySqlField @field)(index = true)   a: String,                  @(QuerySqlField @field)(index = true)  b: String,                  @(QuerySqlField @field)(index = true)  c: String,                  @(QuerySqlField @field)(index = true)  d: String,                  @(QuerySqlField @field)(index = true)  e: String,                  @(QuerySqlField @field)(index = true)  f: String,                                                                        @(QuerySqlField @field)(index = true)  g: String,                  @(QuerySqlField @field)(index = true)  h: String              def main(args: Array[String]): Unit = { val spi = new TcpDiscoverySpi val ipFinder = new TcpDiscoveryMulticastIpFinder val adresses = new util.ArrayList[String] adresses.add("127.0.0.1:48500..48520") ipFinder.setAddresses(adresses) spi.setIpFinder(ipFinder) val cfg = new IgniteConfiguration().setDiscoverySpi(spi).setClientMode(true) val cache_conf = new CacheConfiguration[String,Custom_Class]().setCacheMode(CacheMode.PARTITIONED).setAtomicityMode(CacheAtomicityMode.ATOMIC).setBackups(1).setIndexedTypes(classOf[String],classOf[Custom_Class]).setName("Spark_Ignite") val ignite = Ignition.getOrStart(cfg) ignite.getOrCreateCache(cache_conf) System.out.println("[INFO] CACHE CREATED") ignite.close()}}接下来,我操作了一个Spark应用程序,将igniteRDD将内容插入缓存:
7 m7 }6 j; z* l. D3 uobject Spark_Streaming_Processing { case class Custom_Class                 @(QuerySqlField @field)(index = true) a: String,                     @(QuerySqlField @field)(index = true) b: String,                     @(QuerySqlField @field)(index = true) c: String,                     @(QuerySqlField @field)(index = true) d: String,                     @(QuerySqlField @field)(index = true) e: String,                                                                                    @(QuerySqlField @field)(index = true) f: String,                     @(QuerySqlField @field)(index = true) g: String,                     @(QuerySqlField @field)(index = true) h: String                  START IGNITE CONTEXT  val addresses=new util.ArrayList[String]()  addresses.add("127.0.0.1:48500..48520")  val igniteContext:IgniteContext=new IgniteContext(sqlContext.sparkContext,()=>    new IgniteConfiguration().setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(new TcpDiscoveryVmIpFinder().setAddresses(addresses))      ).setCacheConfiguration(new CacheConfiguration[String,Custom_Class]()       .setName("Spark_Ignite").setBackups(1).setIndexedTypes(classOf[String],classOf[Custom_Class]))    ,true)  println(igniteContext.ignite().cacheNames())  val ignite_cache_rdd:IgniteRDD[String,Custom_Class] =igniteContext.fromCache[String,Custom_Class]("Spark_Ignite")  val processed_Pair:RDD[(String,Custom_Class)]=(...)// rdd with data,which as you can see has the correct datatypes as parameters  ignite_cache_rdd.savePairs(processed_PairRDD)}  }可见这些类是完全一样的。; S. H# p" a1 r4 Q
应用程序成功运行后,我可以ignitevisor该缓存包含63个记录,如控制台上的快照所见。
" y# E+ G5 G: N0 l& z但是,如果我试图执行高速缓存sql查询如下:& ]; R/ f8 g* x* e( z( @
                        ignite_cache_rdd.sql("select * from Custom_Class").show(truncate = false)我得到了一个空表。# _/ b# v" r. n$ q& A( K  t/ P# o
假如我穿过外面sql同样的事情也会发生在服务器查询中。
% H" N; _' h/ Z2 z2 W2 t$ O9 P奇怪的是,如果我不创建缓存先验,而是运行Spark应用程序,然后IgniteContext会创建缓存(如果不存在),然后5 {  @8 T; M" ~2 j5 `* s& K
我可以在查询中看到记录!
/ [2 `; a1 x7 G: n/ d% f1 q这可能是什么问题?+ i5 A' K  a! B, j/ H+ s# M
据我所知,键和值的数据类型完全相同,所以我应该能够在查询时看到它们。
& K5 E# U) t4 h- W& |& c0 i谢谢你的时间。
% @; X# t& l( y8 e7 S; N2 i2 p                                                               
, Y+ _5 j1 [5 L1 S1 |    解决方案:
分享到:
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则