SQLAlchemy-将子查询的架构和数据复制到另一个数据库
技术问答
310 人阅读
|
0 人回复
|
2023-09-14
|
我正在尝试将子查询中的数据postgres(from_engine)复制到sqlite数据库。我可以使用以下命令复制表来实现这一目的:
5 p+ y0 n' v/ i: Ysmeta = MetaData(bind=from_engine)table = Table(table_name,smeta,autoload=True)table.metadata.create_all(to_engine)但是,我不确定如何为子查询句子实现相同的功能。
9 c2 ]' n! G7 P2 J1 c7 | x-山迪普
! j5 T! f2 A) O编辑:跟进答案。创建表后,我要创建一个子查询stmt,如下所示:' ?0 l# @' [, \5 o( I
table = Table("newtable",dest_metadata,*columns)stmt = dest_session.query(table).subquery();但是,最后一个stmt最终出现了错误cursor.execute(statement,parameters)sqlalchemy.exc.ProgrammingError:(ProgrammingError)关系“5 P, {. r* p# U
newtable第三行不存在:FROM newtable)AS anon_1/ C* M) k1 X6 Q* Q0 ~* ?: U
% R1 n( N6 a% z: P: u9 ^
解决方案: : v& p) T9 k& t2 E
至少在某些情况下:0 h; l$ k" r6 N5 V! {6 K8 s
[ol]使用column_descriptions查询对象获取各列相关结果的信息。
. j* @, u6 w8 f M& B/ N, i利用这些信息,您可以在另一个数据库中构建创建新表的模式。) x2 n, V/ \9 V& {$ y
运行查询源数据库,并将结果插入新表。
6 b0 q8 a( T; e; y0 R0 X k$ i7 ?3 g[/ol]先设置示例:
+ E7 q- i8 i; d. Z# H* B2 O% rfrom sqlalchemy import create_engine,MetaData,from sqlalchemy import Column,Integer,String,Tablefrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy.orm import sessionmaker# Engine to the database to query the data from# (postgresql)source_engine = create_engine('sqlite:///:memory:',echo=True)SourceSession = sessionmaker(source_engine)# Engine to the database to store the results in# (sqlite)dest_engine = create_engine('sqlite:///:memory:',echo=True)DestSession = sessionmaker(dest_engine)# Create some toy table and fills it with some dataBase = declarative_base()class Pet(Base): __tablename__ = 'pets' id = Column(Integer,primary_key=True) name = Column(String) race = Column(String)Base.metadata.create_all(source_engine)sourceSession = SourceSession()sourceSession.add(Pet(name="Fido",race="cat"))sourceSession.add(Pet(name="Ceasar",race="cat"))sourceSession.add(Pet(name="Rex",race="dog"))sourceSession.commit()现在去有趣的地方:5 c$ t1 X) H$ V& l0 z
# This is the query we want to persist in a new table:query= sourceSession.query(Pet.name,Pet.race).filter_by(race='cat')# Build the schema for the new table# based on the columns that will be returned # by the query:metadata = MetaData(bind=dest_engine)columns = [Column(desc['name'],desc['type']) for desc in query.column_descriptions]column_names = [desc['name'] for desc in query.column_descriptions]table = Table("newtable",metadata,*columns)# Create the new table in the destination databasetable.create(dest_engine)# Finally execute the querydestSession = DestSession()for row in query: destSession.execute(table.insert(row))destSession.commit()最后一个一个循环应该有更有效的方法。但批量插入是另一个主题。 |
|
|
|
|
|