Ticket #1237: pool.py.2

File pool.py.2, 6.0 KB (added by junzhang.jn@…, 18 years ago)
Line 
1# -*- coding: gb2312 -*-
2#
3# author: junzhang.jn@gmail.com
4
5import threading,thread
6import time
7import sys,traceback
8import atexit
9
10class DBPoolWithThread:
11 # @param factory PooledObjectFactory object create factory
12 # @param freetime integer when object's idle time > freetime , check_thread will destory it
13 # @param threadsafety integer like dbapi2.0£¬
14 # 0 = Threads may not share the module.
15 # 1 = Threads may share the module, but not connections.
16 # 2 = Threads may share the module and connections.
17 # 3 = Threads may share the module, connections and cursors.
18 def __init__( self , factory , freetime = 30 , threadsafety = 1 ):
19 self.factory = factory
20 self.threadsafety = threadsafety
21 if threadsafety == 0:
22 raise RuntimeError( 'threadsafety is %d , you cannot use connectionpool for this db driver' % threadsafety )
23 self.idlepool = {} # { threadid: [] , }
24 self.lock = threading.Lock()
25 self.freetime = freetime
26 self.idleTime = {} # { obj: ( threadid , time ) , }
27 self.deleted = False
28 self.exitEvent = threading.Event()
29 if threadsafety > 1:
30 self.thread = threading.Thread( target=DBPoolWithThread.check_thread , args = ( self , ) )
31 self.thread.start()
32 atexit.register( self._exit )
33
34 def __container_get_obj( self ):
35 try:
36 self.lock.acquire()
37 threadid = thread.get_ident()
38 obj = None
39 try:
40 # 1.1 get object in current thread
41 li = None
42 try:
43 li = self.idlepool[ threadid ]
44 # 1.2 get first object
45 obj = li.pop(0)
46 if len( li ) == 0:
47 del self.idlepool[ threadid ]
48 except KeyError , e:
49 # not find object list in current thread
50 if self.threadsafety == 1:
51 return None
52 except IndexError , e :
53 # this exception should not occured.
54 del self.idlepool[ threadid ]
55
56 # 1.3 not found object in current thread
57 if obj == None and self.threadsafety > 1:
58 # 2 get object from other thread, threadsafety MUST > 1
59 for key , cons in self.idlepool.items():
60 obj = cons.pop(0)
61 if len( cons ) == 0:
62 del self.idlepool[ key ]
63 break
64 except :
65 pass
66 if obj and self.idleTime.has_key( obj ):
67 del self.idleTime[ obj ] # clear idle timer for return object
68 return obj
69 finally:
70 print self.idlepool
71 print self.idleTime
72 self.lock.release()
73
74 def __container_put_obj( self , obj ):
75 try:
76 self.lock.acquire()
77 threadid = thread.get_ident()
78 try:
79 li = self.idlepool[ threadid ]
80 except KeyError , e:
81 li = []
82 self.idlepool[ threadid ] = li
83 li.append( obj )
84 self.idleTime[ obj ] = ( threadid , time.time() )
85 finally:
86 print self.idlepool
87 print self.idleTime
88 self.lock.release()
89
90 def borrow_object( self ):
91 print 'in borrow' , thread.get_ident()
92 i = 0
93 while i < 5:
94 obj = self.__container_get_obj()
95
96 if not obj :
97 obj = self.factory.create_object()
98 break
99
100 if self.factory.validate_object( obj ):
101 # object is valid
102 break
103 else:
104 # not valid
105 self.factory.destroy_object( obj )# destroy invalid object
106 obj = None
107 i += 1
108 return obj
109
110 def return_object( self , obj ):
111 print 'in return' , thread.get_ident()
112 if obj:
113 self.__container_put_obj( obj )
114
115 def __del__( self ):
116 try:
117 self.lock.acquire()
118 self.exitEvent.set()
119 for key , cons in self.idlepool.items():
120 for obj in cons:
121 self.factory.destroy_object( obj )
122 finally:
123 self.lock.release()
124
125 def check_thread( this ):
126 while True:
127 this.exitEvent.wait( this.freetime ) # wait interval time
128 if this.exitEvent.isSet():
129 break
130 if this.freetime:
131 this.lock.acquire()
132 try:
133 for k,v in this.idleTime.items():
134 if time.time() - v[1] > this.freetime: # timeout will destroy
135 li = this.idlepool[ v[0] ]
136 li.remove( k )
137 this.factory.destroy_object( k )
138 if len( li ) == 0:
139 del this.idlepool[ v[0] ] # delete list
140 del this.idleTime[ k ]
141 finally:
142 this.lock.release()
143 check_thread = staticmethod( check_thread )
144
145 def _exit( self ):
146 #print 'in exit'
147 self.exitEvent.set()
148
149# object create factory interface
150class PooledObjectFactory:
151 def __init__( self ):
152 pass
153
154 def create_object( self ):
155 raise NotImplementedError()
156
157 def destroy_object( self , obj ):
158 raise NotImplementedError()
159
160 def validate_object( self , obj ):
161 raise NotImplementedError()
Back to Top