Ticket #3138: dns_check.2.py

File dns_check.2.py, 6.7 KB (added by Michael Radziej <mir@…>, 17 years ago)

... adding stuff I forgot before ...

Line 
1# -*- coding: UTF-8 -*-
2
3"""
4Demo code for a DNS resolving validator. You need to rearrange this for your needs.
5
6you'll need dnspython >= 1.4.0 from http://www.dnspython.org/
7
8requires two settings: DNS_LIFETIME (time for a single dns query)
9and DNS_TIMEOUT (timeout for completely resolving one domain name).
10
11to make use of the asynchronous lookup feature, you need to call hasResolvableDomain.prepare()
12for each form field with this validator before validation starts.
13
14
15I don't recommend to use this as a validator
16since DNS can have transient errors.
17
18- mir@noris.de
19"""
20
21import re
22import dns.resolver, dns.name, dns.exception # from dnspython
23
24
25
26def is_resolvable(domain_string, rdatatypes, resolver):
27 """Testet, ob der String in line auflösbar ist, und gibt entsprechend ein Boolean zurück."""
28 for rdatatype in rdatatypes:
29 answer = resolver.resolve(domain_string, rdatatype)
30 if answer != None:
31 return True
32 return False
33
34class hasResolvableDomain(object):
35 """Testet, ob ein Feld eine DNS-auflösbare Domain enthält.
36 Das funktioniert für Domains, Email-Adressen, auch für einen ganzen Text,
37 bei dem in jeder Zeile ein Eintrag steht.
38
39 >>> from kundebunt.popkern.async_dns import AsyncDNSResolver
40 >>> resolver=AsyncDNSResolver()
41 >>> nl=chr(10)
42 >>> validator = hasResolvableDomain(resolver)
43 >>> validator("noris.de")
44 >>> validator("blazlsprazl.noris.de")
45 Traceback (most recent call last):
46 ...
47 ValidationError: ['blazlsprazl.noris.de: Die angegebene Domain ist im DNS nicht vorhanden.']
48 >>> validator("braslfasl@noris.de")
49 >>> validator(nl.join(["noris.de","braslfasl@noris.de"]))
50 >>> validator("")
51 >>> validator("aosidjasoasdioj")
52 Traceback (most recent call last):
53 ...
54 ValidationError: ['aosidjasoasdioj: Die angegebene Domain ist im DNS nicht vorhanden.']
55 >>> validator("..")
56 Traceback (most recent call last):
57 ...
58 ValidationError: ['..: Die angegebene Domain ist im DNS nicht vorhanden.']
59 >>> validator = hasResolvableDomain(resolver, ["mx"],"fehlermeldung")
60 >>> validator("noris.de")
61 >>> validator(nl.join(["blablara@noris.de","service.noris.de"]))
62 Traceback (most recent call last):
63 ...
64 ValidationError: ['service.noris.de: fehlermeldung']
65
66 >>> validator = hasResolvableDomain(resolver, subdomain_field_name='subdomain')
67 >>> validator('service.noris.net', {'subdomain': 'raxlbratzl'})
68 Traceback (most recent call last):
69 ...
70 ValidationError: ['raxlbratzl.service.noris.net: Die angegebene Domain ist im DNS nicht vorhanden.']
71 >>> validator('raxlbratzl.noris.net', {'subdomain': '.'})
72 Traceback (most recent call last):
73 ...
74 ValidationError: ['raxlbratzl.noris.net: Die angegebene Domain ist im DNS nicht vorhanden.']
75
76 >>> validator('noris.net', {'subdomain': 'service'})
77 >>> validator('noris.net', {'subdomain': '.service'})
78 """
79
80 def __init__(self, resolver, rrtypes = None, error_message=gettext_lazy("Die angegebene Domain ist im DNS nicht vorhanden."), subdomain_field_name=None, allow_wildcard=False):
81 self.resolver = resolver
82 if rrtypes == None:
83 rrtypes = ["A", "MX"]
84 self.rrtypes = [dns.rdatatype.from_text(s) for s in rrtypes]
85 self.error_message = error_message
86 self.subdomain_field_name = subdomain_field_name
87 self.allow_wildcard = allow_wildcard
88
89 def _domains_to_check(self, field_data, all_data):
90 """generiert alle zu überprüfenden Domains."""
91 if not self.allow_wildcard or field_data!="*":
92 for line in field_data.split("\n"):
93 line = line.strip()
94 if line:
95 at_pos = line.find('@')
96 if at_pos != -1:
97 # es ist eine email-adresse
98 domain = line[at_pos+1:]
99 else:
100 domain = line
101 if self.subdomain_field_name:
102 subdomain = all_data.get(self.subdomain_field_name,'')
103 if subdomain and subdomain[0] != '.':
104 domain = "%s.%s" % (subdomain, domain)
105 if len(domain)>=2 and domain[0] == '.' and domain[1] != '.':
106 domain = domain[1:]
107 elif len(domain)>=3 and domain[:2] == '*.' and domain[2] != '.':
108 domain = domain[2:]
109 yield domain
110
111
112 def __call__(self, field_data, all_data=None):
113 for domain in self._domains_to_check(field_data, all_data):
114 resolvable = is_resolvable(domain + ".", self.rrtypes, self.resolver)
115 if not resolvable:
116 raise validators.ValidationError, "%s: %s" % (domain,self.error_message)
117
118 def prepare(self, field_data, all_data=None):
119 """submits the dns query to the resolver so that the reply can be ready in __call__"""
120 for domain in self._domains_to_check(field_data, all_data):
121 for rrtype in self.rrtypes:
122 self.resolver.submit(domain + ".", rrtype)
123
124import dns.resolver, threading, time
125from django.conf import settings
126
127
128class AsyncDNSResolver(object):
129 def __init__(self):
130 self.threads = {}
131 self.answers = {} # answers is modified within threads; apply locking!
132 self.lock = threading.Lock()
133
134 def submit(self, domain_string, rdatatype):
135 query = (domain_string, rdatatype)
136 if not query in self.threads and not query in self.answers:
137 thread = threading.Thread(target=_run_thread, args=(domain_string, rdatatype, self.answers, self.lock))
138 thread.setDaemon(True)
139 thread.start()
140 self.final_submit_time = time.time()
141 self.threads[query] = thread
142
143
144 def resolve(self, domain_string, rdatatype):
145 self.submit(domain_string, rdatatype)
146 query = (domain_string, rdatatype)
147 if self.threads[query].isAlive():
148 self.threads[query].join(self._compute_timeout())
149 self.lock.acquire()
150 try:
151 answer = self.answers.get(query)
152 finally:
153 self.lock.release()
154 return answer
155
156 def _compute_timeout(self):
157 return max(settings.DNS_LIFETIME - (time.time() - self.final_submit_time) , 0)
158
159
160def _run_thread(domain_string, rdatatype, answers, lock):
161 resolver = dns.resolver.Resolver()
162 resolver.timeout = settings.DNS_TIMEOUT
163 resolver.lifetime = settings.DNS_LIFETIME
164 try:
165 answer = resolver.query(domain_string, rdatatype)
166 except (dns.exception.Timeout, dns.resolver.NXDOMAIN, dns.resolver.NoAnswer, dns.name.EmptyLabel):
167 answer = None
168 lock.acquire()
169 try:
170 answers[(domain_string, rdatatype)] = answer
171 finally:
172 lock.release()
Back to Top