Ticket #3138: dns_check.py

File dns_check.py, 6.3 KB (added by Michael Radziej <mir@…>, 18 years ago)

demo code for a DNS resolving validator

Line 
1# -*- coding: UTF-8 -*-
2
3"""
4Demo code for a DNS resolving validator. You need to rearrange this for your needs.
5
6you'll need dnspython >= 1.4.0 from http://www.dnspython.org/
7
8requires two settings: DNS_LIFETIME (time for a single dns query)
9and DNS_TIMEOUT (timeout for completely resolving one domain name).
10
11to make use of the asynchronous lookup feature, you need to call hasResolvableDomain.prepare()
12for each form field with this validator before validation starts.
13
14
15I don't recommend to use this as a validator
16since DNS can have transient errors.
17
18- mir@noris.de
19"""
20
21class hasResolvableDomain(object):
22 """Testet, ob ein Feld eine DNS-auflösbare Domain enthält.
23 Das funktioniert für Domains, Email-Adressen, auch für einen ganzen Text,
24 bei dem in jeder Zeile ein Eintrag steht.
25
26 >>> from kundebunt.popkern.async_dns import AsyncDNSResolver
27 >>> resolver=AsyncDNSResolver()
28 >>> nl=chr(10)
29 >>> validator = hasResolvableDomain(resolver)
30 >>> validator("noris.de")
31 >>> validator("blazlsprazl.noris.de")
32 Traceback (most recent call last):
33 ...
34 ValidationError: ['blazlsprazl.noris.de: Die angegebene Domain ist im DNS nicht vorhanden.']
35 >>> validator("braslfasl@noris.de")
36 >>> validator(nl.join(["noris.de","braslfasl@noris.de"]))
37 >>> validator("")
38 >>> validator("aosidjasoasdioj")
39 Traceback (most recent call last):
40 ...
41 ValidationError: ['aosidjasoasdioj: Die angegebene Domain ist im DNS nicht vorhanden.']
42 >>> validator("..")
43 Traceback (most recent call last):
44 ...
45 ValidationError: ['..: Die angegebene Domain ist im DNS nicht vorhanden.']
46 >>> validator = hasResolvableDomain(resolver, ["mx"],"fehlermeldung")
47 >>> validator("noris.de")
48 >>> validator(nl.join(["blablara@noris.de","service.noris.de"]))
49 Traceback (most recent call last):
50 ...
51 ValidationError: ['service.noris.de: fehlermeldung']
52
53 >>> validator = hasResolvableDomain(resolver, subdomain_field_name='subdomain')
54 >>> validator('service.noris.net', {'subdomain': 'raxlbratzl'})
55 Traceback (most recent call last):
56 ...
57 ValidationError: ['raxlbratzl.service.noris.net: Die angegebene Domain ist im DNS nicht vorhanden.']
58 >>> validator('raxlbratzl.noris.net', {'subdomain': '.'})
59 Traceback (most recent call last):
60 ...
61 ValidationError: ['raxlbratzl.noris.net: Die angegebene Domain ist im DNS nicht vorhanden.']
62
63 >>> validator('noris.net', {'subdomain': 'service'})
64 >>> validator('noris.net', {'subdomain': '.service'})
65 """
66
67 def __init__(self, resolver, rrtypes = None, error_message=gettext_lazy("Die angegebene Domain ist im DNS nicht vorhanden."), subdomain_field_name=None, allow_wildcard=False):
68 self.resolver = resolver
69 if rrtypes == None:
70 rrtypes = ["A", "MX"]
71 self.rrtypes = [dns.rdatatype.from_text(s) for s in rrtypes]
72 self.error_message = error_message
73 self.subdomain_field_name = subdomain_field_name
74 self.allow_wildcard = allow_wildcard
75
76 def _domains_to_check(self, field_data, all_data):
77 """generiert alle zu überprüfenden Domains."""
78 if not self.allow_wildcard or field_data!="*":
79 for line in field_data.split("\n"):
80 line = line.strip()
81 if line:
82 at_pos = line.find('@')
83 if at_pos != -1:
84 # es ist eine email-adresse
85 domain = line[at_pos+1:]
86 else:
87 domain = line
88 if self.subdomain_field_name:
89 subdomain = all_data.get(self.subdomain_field_name,'')
90 if subdomain and subdomain[0] != '.':
91 domain = "%s.%s" % (subdomain, domain)
92 if len(domain)>=2 and domain[0] == '.' and domain[1] != '.':
93 domain = domain[1:]
94 elif len(domain)>=3 and domain[:2] == '*.' and domain[2] != '.':
95 domain = domain[2:]
96 yield domain
97
98
99 def __call__(self, field_data, all_data=None):
100 for domain in self._domains_to_check(field_data, all_data):
101 resolvable = is_resolvable(domain + ".", self.rrtypes, self.resolver)
102 if not resolvable:
103 raise validators.ValidationError, "%s: %s" % (domain,self.error_message)
104
105 def prepare(self, field_data, all_data=None):
106 """submits the dns query to the resolver so that the reply can be ready in __call__"""
107 for domain in self._domains_to_check(field_data, all_data):
108 for rrtype in self.rrtypes:
109 self.resolver.submit(domain + ".", rrtype)
110
111import dns.resolver, threading, time
112from django.conf import settings
113
114class AsyncDNSResolver(object):
115 def __init__(self):
116 self.threads = {}
117 self.answers = {} # answers is modified within threads; apply locking!
118 self.lock = threading.Lock()
119
120 def submit(self, domain_string, rdatatype):
121 query = (domain_string, rdatatype)
122 if not query in self.threads and not query in self.answers:
123 thread = threading.Thread(target=_run_thread, args=(domain_string, rdatatype, self.answers, self.lock))
124 thread.setDaemon(True)
125 thread.start()
126 self.final_submit_time = time.time()
127 self.threads[query] = thread
128
129
130 def resolve(self, domain_string, rdatatype):
131 self.submit(domain_string, rdatatype)
132 query = (domain_string, rdatatype)
133 if self.threads[query].isAlive():
134 self.threads[query].join(self._compute_timeout())
135 self.lock.acquire()
136 try:
137 answer = self.answers.get(query)
138 finally:
139 self.lock.release()
140 return answer
141
142 def _compute_timeout(self):
143 return max(settings.DNS_LIFETIME - (time.time() - self.final_submit_time) , 0)
144
145
146def _run_thread(domain_string, rdatatype, answers, lock):
147 resolver = dns.resolver.Resolver()
148 resolver.timeout = settings.DNS_TIMEOUT
149 resolver.lifetime = settings.DNS_LIFETIME
150 try:
151 answer = resolver.query(domain_string, rdatatype)
152 except (dns.exception.Timeout, dns.resolver.NXDOMAIN, dns.resolver.NoAnswer, dns.name.EmptyLabel):
153 answer = None
154 lock.acquire()
155 try:
156 answers[(domain_string, rdatatype)] = answer
157 finally:
158 lock.release()
Back to Top