回答

收藏

分布式事务如何在线程环境中与同一个数据库建立多个连接?

技术问答 技术问答 229 人阅读 | 0 人回复 | 2023-09-14

我试图确定分布式事务中多个数据库连接的行为。
7 a( n8 `+ U5 C我有一个运行很长时间的过程,它产生了一系列线程,然后每个线程负责管理其DB连接等。所有这些都在事务作用域内运行,并且每个线程都通过一个DependentTransaction对象加入到事务中。2 t/ j. |+ O0 P5 }5 x
当我并行执行此过程时,我遇到了几个问题,即似乎存在某种阻止查询在事务上同时执行的块。- g6 p: j3 H' h1 w1 b, x  v% |
我想知道的是事务协调器如何处理从多个连接到同一数据库的查询,甚至是否建议跨线程传递连接对象?
. C7 @+ Z: Z5 r* U# Q, J' k我读过MS
  i' x( Y4 V+ L- r& qSQL在每个事务中只允许一个连接,但是我显然能够在同一事务中创建和初始化一个以上连接到同一DB的连接。打开连接时,如果没有获取“另一个会话正在使用的事务上下文”异常,我将无法并行执行线程。结果是连接必须等待执行,而不是同时运行,最终代码运行完成,但是由于存在锁定问题,因此对应用程序进行线程化没有任何好处。
2 g2 I' R4 r; w- l/ v该代码看起来像这样。
. o* b/ \; @" s- k5 g6 f4 W. s    Sub StartThreads()
3 Z( y2 f) O9 M* Y/ c9 v3 c( q+ [        Using Scope As New TransactionScope
2 y5 a7 o; X. v$ \0 f            Dim TL(100) As Tasks.Task
: g# q# A! v9 {( O            Dim dTx As DependentTransaction( Y+ z. K6 u& w% k! y+ Y5 T; e
            For i As Int32 = 0 To 1007 R/ P. X0 C3 _
                Dim A(1) As Object
; H5 P! i" A) I9 m0 q                dTx = CType(Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete), DependentTransaction)
. {/ B/ Z/ V* `0 W+ ~! [1 E                'A(0) = some_other_data/ c$ t) B  ?8 y3 S, I
                A(1) = dTx 'the Dependent Transaction
0 r/ M2 p4 t! U1 e0 {9 _  p                TL(i) = Tasks.Task.Factory.StartNew(AddressOf Me.ProcessData, A) 'Start the thread and add it to the array
5 P5 g. F" G+ c( G! P' `            Next. D- R; \3 |3 j2 B
            Tasks.Task.WaitAll(TL) 'Wait for threads to finish: }! F0 [4 J$ B4 |3 P/ F3 R" `7 a
            Scope.Complete()
% K% n  O9 B! n4 `' e        End Using- K2 Y4 U2 w) y* a2 J# @0 \
    End Sub4 T# b8 t# S- j2 c4 C4 N$ z
    Dim TransLock As New Object6 W4 b# W' V; k! C8 M
    Sub ProcessData(ByVal A As Object)
& ^$ Z- \3 H5 k) `        Dim DTX As DependentTransaction = A(1)$ l( h2 V3 t4 q
        Dim Trans As Transactions.TransactionScope+ O! U) u: E7 j" r+ r
        Dim I As Int32; }+ `; O: F8 k- }/ Q
        Do While True
# h0 c+ h( h6 z# B0 U* j6 x8 c- I# n            Try
, J$ L! |5 t9 K. t5 w, d0 E( n                SyncLock (TransLock)
  Z0 U  I3 n/ I5 R/ E                    Trans = New Transactions.TransactionScope(DTX, TimeSpan.FromMinutes(1))
% Q; G5 m: Q% _; }8 N( b; F                End SyncLock- x1 D* V- X& W5 c0 A
                Exit Do- I( t  M+ A, t
            Catch ex As TransactionAbortedException* ^, i* B" y; y+ F& a
                If ex.ToString.Contains("Failure while attempting to promote transaction") Then
3 O/ @& z* e/ z                ElseIf ex.Message = "The transaction has aborted." Then
, W8 f$ ?0 q1 q: ^                    Throw New Exception(ex.ToString)
- y1 k: A# _1 @/ I                    Exit Sub! y8 ^3 O7 t" u# G2 W
                End If. R, Q% y5 l; H9 ^7 m/ `" z# J
                I += 1# d: s. S/ v8 @. l
                If I > 5 Then
. e) w0 ^3 a% F. n" Y) {; j                    Throw New Exception(ex.ToString)
+ R1 O- ]9 F* R7 T( v8 K8 T                End If
9 Y6 Q1 D! |4 \( R$ o            Catch ex As Exception; G7 `; }1 L7 h% O- m
            End Try
- p) [+ u8 h, N) W  k2 @4 }            Thread.Sleep(10)7 C! Y7 p* {2 u! Y
        Loop
7 Y' ]. k7 f6 b0 ^        Using Trans
5 M4 y' B: f0 ~2 ?, V8 D            Using DALS As New DAC.DALScope: b2 q1 U. Z- q
                Do While True; T9 @* O5 A( r5 w! N: {6 a$ L% o
                    Try0 U* l. G: D$ l1 [
                        SyncLock (TransLock): K% w$ W3 |9 h" r: h1 j
                            'This opens two connection to the same DB for later use.
& i% L7 f* }: p2 |% q) q2 x3 f                            DALS.CurrentDAL.OpenConnection(DAC.DAL.ConnectionList.FirstConnection): D( y! t8 o: |
                            DALS.CurrentDAL.OpenConnection(DAC.DAL.ConnectionList.SecondConnection)
& a) f  c. ?0 o: x, i                        End SyncLock
6 f7 r, H8 r! o                        Exit Do
  \' T5 G! o, e, C' e# s                    Catch ex As Exception
% o) Q+ X. {' x# k% l1 F5 b                        'This is usually where I find the bottleneck
& p2 c! Y# }9 [- L" u6 G. z2 Q1 l9 A                        '"Transaction context in use by another session" is the exception that I get- T7 x: g) c2 p0 C
                        Thread.Sleep(100)  U8 s& F- i8 |8 b! ?4 F. x
                    End Try
5 G9 q4 e3 I' a5 S                Loop0 b- g2 [# p  `1 k2 e' m
                '*****************4 n/ p8 L6 d, z, c
                'Do some work here! T- H0 n5 m7 [8 I
                '*****************6 o4 x. W! X2 J
                Trans.Complete()6 Z. T7 N' d3 x+ f6 M# {% C/ D
            End Using
) p- _, k. P2 l6 H/ r. ~% _" ?        End Using
* d2 @/ t3 Y; l; i6 Z* s( f/ Y        DTX.Complete()
% Z7 o% @+ W- }3 N, N9 _    End Sub9 G4 v, @3 S" R9 {( @2 m
编辑( V; e5 Y  X- u+ f! Z  i% @
我的测试最终表明,这是无法完成的。即使存在多个连接或使用相同的连接,事务中的所有请求或问题也会被顺序处理。: j3 K% t* y% a  o6 V# T
也许他们将来会改变这种行为。' _- j. p* T9 Y+ ]* ]/ p
               
- t1 y0 }4 O  Q/ i7 [解决方案:
9 D4 C1 ?6 \5 x6 u+ ?                ! \, u. B! i, b  }. h

/ v0 n0 t0 S: a! F9 X: ~  Q3 _0 ^- m; M. p( t1 |( x, [
                首先,您必须将在这里和那里读到的有关SQL Server事务的内容分为两种不同的情况:本地和分布式。5 @' s: z6 E% y1 `/ x1 i
本地SQL事务* M0 |8 H, s( X7 n5 T! Y7 P$ n& b
SQL Server只允许在每个本地事务上执行一个请求。 1 {' ^6 P) L8 ^" C
默认情况下,只有一个会话可以注册本地事务。使用sp_getbindtoken和sp_bindsession可以将多个会话注册到本地事务中。会话仍然限于在任何时间仅执行一个请求。
- h( [7 ~* }/ s. _1 g使用多个活动结果集(MARS),一个会话可以执行多个请求。所有请求都必须注册在同一本地事务中。5 g. L2 i, ?: O) }0 _5 o( x
0 h: A' B* K$ w& }! g* K& J% Z7 E
分布式事务
$ N" U: `; w1 h多个会话可以将其本地事务注册到单个分布式事务中。 3 _) M6 \7 D- \6 f
每个会话仍在本地交易中注册,但要遵守上述本地交易的所有限制
2 V5 [: q0 u' u4 I6 j' G分布式事务中注册的本地事务受分布式事务协调的两阶段提交的约束
& F* r' a" q# `: E2 m注册到分布式事务中的实例上的所有本地事务仍然是 独立的 本地事务,这主要意味着它们具有冲突的锁命名空间。. R! ~+ I: t6 I; C' J  Q1 E2 V7 E
2 g! x  k4 p  s, R: _
因此,当客户端创建.Net( Z  f" Y, {) Q+ z$ I$ ^
TransactionScope并在此事务范围下,它在同一服务器上执行多个请求时,这些请求都是注册在分布式事务中的所有本地事务。一个简单的例子:
$ K1 n" L# L% G5 k3 m; l$ jclass Program; r0 D& [" L7 v
    {
. h1 L  ~  W0 _5 \  _# u9 n        static string sqlBatch = @"
- h8 O/ ]- z0 h+ @% a, ]set nocount on;
' y8 l7 \8 [/ T9 B: ]; u# e- G5 qdeclare @i int;
- @! S8 z, ]6 B5 b% Fset @i = 0;
5 [0 s- X5 t, p% ewhile @i 创建一个虚拟测试表:
+ @  m3 d8 o1 [2 L1 O' u9 Wcreate table test (id int not null identity(1,1) primary key, a varchar(100));, [6 o7 @- K/ `& L
并运行示例中的代码。您将看到两个请求并行执行,每个请求浪费表中的100k行,然后在事务范围完成时都提交。因此,您看到的问题与SQL1 ]# |% g$ Z6 @' p
Server或TransactionScope都不相关,它们可以轻松处理您描述的情况。而且,该代码非常简单明了,并且不需要创建依赖事务,进行克隆或促进事务。, e' j' r6 m! ^/ P. t& ?$ Y
更新
; K) E7 t' Y3 p6 Q% ?. d/ Y使用显式线程和相关事务:' t/ D+ {% T; v8 `+ W1 l6 o- {
private class ThreadState9 i# {3 U, n0 M7 l% E& C4 M+ s
    {3 R6 d1 M4 f( |; V, ?$ f/ n1 F- s7 `
        public DependentTransaction Transaction {get; set;}
: S" Y, |; Q2 p. y% p6 j9 n        public EventWaitHandle Done {get; set;}
: S3 r$ T7 q6 [9 T+ E; Y        public SqlConnection Connection { get; set; }" i# N( k3 m( A6 N2 J
    }
; p# `" Y. u8 U9 x1 A    static void Main(string[] args); D# G0 ?6 u* V& f5 x+ j/ M
    {+ C: h; k# [, V# _
        try
( Q% D) W# q3 ?4 B/ y( u' [! u        {4 g5 @4 {3 M9 [! ]) W2 P
            TransactionOptions to = new TransactionOptions();
, K  R; U9 X  k" T; W            to.IsolationLevel = IsolationLevel.ReadCommitted;
; s) X. a9 Q9 Z3 F            using (TransactionScope scp = new TransactionScope(TransactionScopeOption.Required, to))
, ~6 d$ X8 L: a; S" u3 q            {
# p+ Q' ?! y+ C* u9 i2 C" P                ThreadState stateA = new ThreadState . }9 ~1 h+ \; s; @
                {& ^+ n! p# d2 Q0 J
                    Transaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete),& B  I- q5 M# n! \, o
                    Done = new AutoResetEvent(false),
' T5 E( `- C& h; D5 o3 ^                    Connection = new SqlConnection(Settings.Default.connString),
" i0 k) x9 C/ }9 n                };
* r- O# S+ s. m! p9 t1 H. V. @6 ]                stateA.Connection.Open();; u: w8 J2 y& H  j2 H1 f! y8 ?+ }1 Q
                ThreadState stateB = new ThreadState6 p5 ]$ L! ]4 w0 O' s
                {/ z6 \) O& }( C' i
                    Transaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete),* s/ x5 }! O& i
                    Done = new AutoResetEvent(false),3 V; b- C" @+ H; @
                    Connection = new SqlConnection(Settings.Default.connString),
% k' }9 g0 q! t6 O5 @) y                };
8 z4 F' @' u- Y2 e$ m2 E                stateB.Connection.Open();
4 m( L6 y6 _3 i$ M& f$ y                ThreadPool.QueueUserWorkItem(new WaitCallback(Worker), stateA);% K4 p7 X! ~1 M) C" c" C; m: O/ W
                ThreadPool.QueueUserWorkItem(new WaitCallback(Worker), stateB);2 q" h5 B5 D% `" o
                WaitHandle.WaitAll(new WaitHandle[] { stateA.Done, stateB.Done });+ {8 h( i% m0 {8 I3 d
                scp.Complete();
2 c* Y* t4 C( Q. ^7 h* o                //TODO: dispose the open connections
7 v: E% E/ U5 ?! \% j0 t1 ^; X            }
# _& H0 P& R: {5 J        }4 C- I2 j, T6 N7 n
        catch (Exception e)& d5 ^4 `2 F" f4 q3 C# p: b
        {
& \4 Y: I: ]0 [$ u# P3 a: Y            Console.Error.Write(e);- z8 U, ?* ^, q
        }
8 `" G( |  v: N4 f    }
$ K; F' k; h' u- H! c3 C    private static void Worker(object args)
" Q/ d4 `* Z' ]# w    {) |6 E% N$ C) \4 m1 I* E$ {
        Debug.Assert(args is ThreadState);) R/ E1 }! o, S6 M5 G, y
        ThreadState state = (ThreadState) args;
5 u' D, y: u$ O; o' ]0 C1 u9 l        try5 z4 |: G0 c  |0 U
        {- l4 @/ b3 k) D8 F, T8 u4 |
            using (TransactionScope scp = new TransactionScope(state.Transaction))! I. I: [; J/ e
            {
" Y. ?7 j# U  w$ \: o1 q                SqlCommand cmd = new SqlCommand(sqlBatch, state.Connection);
$ _9 m$ Z! R# ~                cmd.ExecuteNonQuery();
% b- ^+ C1 X9 b3 j                scp.Complete();
2 g6 ^0 H& I# ~! g+ T            }
! O' m% `; q; o$ N7 t+ M            state.Transaction.Complete();& |7 N  Y! h$ m5 P- J" U- R
        }+ p5 ?- `6 Y$ J, ?" ]
        catch (Exception e)
. |+ {/ [. g: H. E4 T% m2 C        {
& B- r- E9 [& z" \- U; u; S            Console.Error.WriteLine(e);; x4 c7 C5 `. M) i! t9 I
            state.Transaction.Rollback();; o. ^8 g+ \- m& p& T
        }
% f/ S7 Z" b1 \  D, C        finally
4 L% w; w. Q( g5 W        {
/ D8 F6 h. u0 E+ V0 B            state.Done.Set();
& w1 e8 |3 o; g/ ^9 }! s# `        }
5 m( I1 p# X% K, G    }
分享到:
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则