Django

Code

Ticket #1237: pool.py

File pool.py, 5.6 kB (added by junzhang.jn@gmail.com, 3 years ago)
Line 
1 # -*- coding: gb2312 -*-
2 #
3 # author: junzhang.jn@gmail.com
4
5 import threading,thread
6 import time
7 import sys,traceback
8
9 class DBPoolWithThread:
10     # @param factory      PooledObjectFactory object create factory
11     # @param freetime     integer             when object's idle time > freetime , check_thread will destory it
12     # @param threadsafety integer             like dbapi2.0��
13     #                                               0 = Threads may not share the module.
14     #                                               1 = Threads may share the module, but not connections.
15     #                                               2 = Threads may share the module and connections.
16     #                                               3 = Threads may share the module, connections and cursors.
17     def __init__( self , factory , freetime = 30 , threadsafety = 1 ):
18         self.factory = factory
19         self.threadsafety = threadsafety
20         if threadsafety == 0:
21             raise RuntimeError( 'threadsafety is %d , you cannot use connectionpool for this db driver' % threadsafety )
22         self.idlepool = {}   # { threadid: [] , }
23         self.lock = threading.Lock()
24         self.freetime = freetime
25         self.idleTime = {} # { obj: ( threadid , time ) , }
26         self.deleted = False
27         self.exitEvent = threading.Event()
28         self.thread = threading.Thread( target=DBPoolWithThread.check_thread , args = ( self , ) )
29         self.thread.start()
30    
31     def __container_get_obj( self ):
32         try:
33             self.lock.acquire()
34             threadid = thread.get_ident()
35             obj = None
36             try:
37                 # 1.1 get object in current thread
38                 li = None
39                 try:
40                     li = self.idlepool[ threadid ]
41                     # 1.2 get first object
42                     obj = li.pop(0)
43                     if len( li ) == 0:
44                         del self.idlepool[ threadid ]
45                 except KeyError , e:
46                     # not find object list in current thread
47                     if self.threadsafety == 1:
48                         return None
49                 except IndexError , e :
50                     # this exception should not occured.
51                     del self.idlepool[ threadid ]
52                
53                 # 1.3 not found object in current thread
54                 if obj == None and self.threadsafety > 1:
55                     # 2 get object from other thread, threadsafety MUST > 1
56                     for key , cons in self.idlepool.items():
57                         obj = cons.pop(0)
58                         if len( cons ) == 0:
59                             del self.idlepool[ key ]
60                         break
61             except :
62                 pass
63             if obj and self.idleTime.has_key( obj ):
64                 del self.idleTime[ obj ]    # clear idle timer for return object
65             return obj
66         finally:
67             self.lock.release()
68    
69     def __container_put_obj( self , obj ):
70         try:
71             self.lock.acquire()
72             threadid = thread.get_ident()
73             try:
74                 li = self.idlepool[ threadid ]
75             except KeyError , e:
76                 li = []
77                 self.idlepool[ threadid ] =  li
78             li.append( obj )
79             self.idleTime[ obj ] = ( threadid , time.time() )
80         finally:
81             self.lock.release()
82    
83     def borrow_object( self ):
84         i = 0
85         while i < 5:
86             obj = self.__container_get_obj()
87            
88             if not obj :
89                 obj = self.factory.create_object()
90                 break
91            
92             if self.factory.validate_object( obj ):
93                 # object is valid
94                 break
95             else:
96                 # not valid
97                 self.factory.destroy_object( obj )# destroy invalid object
98                 obj = None
99             i += 1
100         return obj
101    
102     def return_object( self , obj ):
103         if obj:
104             self.__container_put_obj( obj )
105    
106     def __del__( self ):
107         try:
108             self.lock.acquire()
109             self.exitEvent.set()
110             for key , cons in self.idlepool.items():
111                 for obj in cons:
112                     self.factory.destroy_object( obj )
113         finally:
114             self.lock.release()
115    
116     def check_thread( this ):
117         while True:
118             this.exitEvent.wait( this.freetime )    # wait interval time
119             if this.exitEvent.isSet():
120                 break
121             if this.freetime:
122                 this.lock.acquire()
123                 try:
124                     threadid = thread.get_ident()
125                     for k,v in this.idleTime.items():
126                         if time.time() - v[1] > this.freetime: # timeout will destroy
127                             li = this.idlepool[ v[0] ]
128                             li.remove( k )
129                             this.factory.destroy_object( k )
130                             if len( li ) == 0:
131                                 del this.idlepool[ v[0] ] # delete list
132                             del this.idleTime[ k ]
133                 finally:
134                     this.lock.release()
135     check_thread = staticmethod( check_thread )
136    
137 # object create factory interface
138 class PooledObjectFactory:
139     def __init__( self ):
140         pass
141        
142     def create_object( self ):
143         raise NotImplementedError()
144    
145     def destroy_object( self , obj ):
146         raise NotImplementedError()
147    
148     def validate_object( self , obj ):
149         raise NotImplementedError()