Ticket #1237: pool.py

File pool.py, 5.6 KB (added by junzhang.jn@…, 9 years ago)
Line 
1# -*- coding: gb2312 -*-
2#
3# author: junzhang.jn@gmail.com
4
5import threading,thread
6import time
7import sys,traceback
8
9class DBPoolWithThread:
10    # @param factory      PooledObjectFactory object create factory
11    # @param freetime     integer             when object's idle time > freetime , check_thread will destory it
12    # @param threadsafety integer             like dbapi2.0£¬
13    #                                               0 = Threads may not share the module.
14    #                                               1 = Threads may share the module, but not connections.
15    #                                               2 = Threads may share the module and connections.
16    #                                               3 = Threads may share the module, connections and cursors.
17    def __init__( self , factory , freetime = 30 , threadsafety = 1 ):
18        self.factory = factory
19        self.threadsafety = threadsafety
20        if threadsafety == 0:
21            raise RuntimeError( 'threadsafety is %d , you cannot use connectionpool for this db driver' % threadsafety )
22        self.idlepool = {}   # { threadid: [] , }
23        self.lock = threading.Lock()
24        self.freetime = freetime
25        self.idleTime = {} # { obj: ( threadid , time ) , }
26        self.deleted = False
27        self.exitEvent = threading.Event()
28        self.thread = threading.Thread( target=DBPoolWithThread.check_thread , args = ( self , ) )
29        self.thread.start()
30   
31    def __container_get_obj( self ):
32        try:
33            self.lock.acquire()
34            threadid = thread.get_ident()
35            obj = None
36            try:
37                # 1.1 get object in current thread
38                li = None
39                try:
40                    li = self.idlepool[ threadid ]
41                    # 1.2 get first object
42                    obj = li.pop(0)
43                    if len( li ) == 0:
44                        del self.idlepool[ threadid ]
45                except KeyError , e:
46                    # not find object list in current thread
47                    if self.threadsafety == 1:
48                        return None
49                except IndexError , e :
50                    # this exception should not occured.
51                    del self.idlepool[ threadid ]
52               
53                # 1.3 not found object in current thread
54                if obj == None and self.threadsafety > 1:
55                    # 2 get object from other thread, threadsafety MUST > 1
56                    for key , cons in self.idlepool.items():
57                        obj = cons.pop(0)
58                        if len( cons ) == 0:
59                            del self.idlepool[ key ]
60                        break
61            except :
62                pass
63            if obj and self.idleTime.has_key( obj ):
64                del self.idleTime[ obj ]    # clear idle timer for return object
65            return obj
66        finally:
67            self.lock.release()
68   
69    def __container_put_obj( self , obj ):
70        try:
71            self.lock.acquire()
72            threadid = thread.get_ident()
73            try:
74                li = self.idlepool[ threadid ]
75            except KeyError , e:
76                li = []
77                self.idlepool[ threadid ] =  li
78            li.append( obj )
79            self.idleTime[ obj ] = ( threadid , time.time() )
80        finally:
81            self.lock.release()
82   
83    def borrow_object( self ):
84        i = 0
85        while i < 5:
86            obj = self.__container_get_obj()
87           
88            if not obj :
89                obj = self.factory.create_object()
90                break
91           
92            if self.factory.validate_object( obj ):
93                # object is valid
94                break
95            else:
96                # not valid
97                self.factory.destroy_object( obj )# destroy invalid object
98                obj = None
99            i += 1
100        return obj
101   
102    def return_object( self , obj ):
103        if obj:
104            self.__container_put_obj( obj )
105   
106    def __del__( self ):
107        try:
108            self.lock.acquire()
109            self.exitEvent.set()
110            for key , cons in self.idlepool.items():
111                for obj in cons:
112                    self.factory.destroy_object( obj )
113        finally:
114            self.lock.release()
115   
116    def check_thread( this ):
117        while True:
118            this.exitEvent.wait( this.freetime )    # wait interval time
119            if this.exitEvent.isSet():
120                break
121            if this.freetime:
122                this.lock.acquire()
123                try:
124                    threadid = thread.get_ident()
125                    for k,v in this.idleTime.items():
126                        if time.time() - v[1] > this.freetime: # timeout will destroy
127                            li = this.idlepool[ v[0] ]
128                            li.remove( k )
129                            this.factory.destroy_object( k )
130                            if len( li ) == 0:
131                                del this.idlepool[ v[0] ] # delete list
132                            del this.idleTime[ k ]
133                finally:
134                    this.lock.release()
135    check_thread = staticmethod( check_thread )
136   
137# object create factory interface
138class PooledObjectFactory:
139    def __init__( self ):
140        pass
141       
142    def create_object( self ):
143        raise NotImplementedError()
144   
145    def destroy_object( self , obj ):
146        raise NotImplementedError()
147   
148    def validate_object( self , obj ):
149        raise NotImplementedError()
Back to Top