Ticket #3138: dns_check.py

File dns_check.py, 6.3 KB (added by Michael Radziej <mir@…>, 9 years ago)

demo code for a DNS resolving validator

Line 
1# -*- coding: UTF-8 -*-
2
3"""
4Demo code for a DNS resolving validator. You need to rearrange this for your needs.
5
6You'll need dnspython >= 1.4.0 from http://www.dnspython.org/
7
8requires two settings: DNS_LIFETIME (time for a single dns query)
9and DNS_TIMEOUT (timeout for completely resolving one domain name).
10
11To make use of the asynchronous lookup feature, you need to call hasResolvableDomain.prepare()
12for each form field with this validator before validation starts.
13
14
15I don't recommend using this as a validator,
16since DNS can have transient errors.
17
18- mir@noris.de
19"""
20
class hasResolvableDomain(object):
    """Validator that checks whether a field contains DNS-resolvable domains.

    This works for plain domains, for email addresses, and also for a whole
    text with one entry per line.

    NOTE(review): gettext_lazy, validators, dns.rdatatype and is_resolvable
    are used here but are not defined or imported before this class in the
    visible code; the surrounding module has to provide them.

    >>> from kundebunt.popkern.async_dns import AsyncDNSResolver
    >>> resolver=AsyncDNSResolver()
    >>> nl=chr(10)
    >>> validator = hasResolvableDomain(resolver)
    >>> validator("noris.de")
    >>> validator("blazlsprazl.noris.de")
    Traceback (most recent call last):
        ...
    ValidationError: ['blazlsprazl.noris.de: Die angegebene Domain ist im DNS nicht vorhanden.']
    >>> validator("braslfasl@noris.de")
    >>> validator(nl.join(["noris.de","braslfasl@noris.de"]))
    >>> validator("")
    >>> validator("aosidjasoasdioj")
    Traceback (most recent call last):
        ...
    ValidationError: ['aosidjasoasdioj: Die angegebene Domain ist im DNS nicht vorhanden.']
    >>> validator("..")
    Traceback (most recent call last):
        ...
    ValidationError: ['..: Die angegebene Domain ist im DNS nicht vorhanden.']
    >>> validator = hasResolvableDomain(resolver, ["mx"],"fehlermeldung")
    >>> validator("noris.de")
    >>> validator(nl.join(["blablara@noris.de","service.noris.de"]))
    Traceback (most recent call last):
        ...
    ValidationError: ['service.noris.de: fehlermeldung']

    >>> validator = hasResolvableDomain(resolver, subdomain_field_name='subdomain')
    >>> validator('service.noris.net', {'subdomain': 'raxlbratzl'})
    Traceback (most recent call last):
        ...
    ValidationError: ['raxlbratzl.service.noris.net: Die angegebene Domain ist im DNS nicht vorhanden.']
    >>> validator('raxlbratzl.noris.net', {'subdomain': '.'})
    Traceback (most recent call last):
        ...
    ValidationError: ['raxlbratzl.noris.net: Die angegebene Domain ist im DNS nicht vorhanden.']

    >>> validator('noris.net', {'subdomain': 'service'})
    >>> validator('noris.net', {'subdomain': '.service'})
    """

    def __init__(self, resolver, rrtypes=None,
                 error_message=gettext_lazy("Die angegebene Domain ist im DNS nicht vorhanden."),
                 subdomain_field_name=None, allow_wildcard=False):
        """Configure the validator.

        resolver -- object passed on to is_resolvable(); prepare() calls its
            submit(name, rrtype) method.
        rrtypes -- list of record type names understood by
            dns.rdatatype.from_text(); defaults to ["A", "MX"].
        error_message -- text appended to the offending domain in the
            ValidationError message.
        subdomain_field_name -- optional key into all_data; its value is
            prepended to every checked domain as a subdomain.
        allow_wildcard -- if true, a field value of exactly "*" is accepted
            without any DNS lookup.
        """
        self.resolver = resolver
        if rrtypes is None:
            rrtypes = ["A", "MX"]
        self.rrtypes = [dns.rdatatype.from_text(s) for s in rrtypes]
        self.error_message = error_message
        self.subdomain_field_name = subdomain_field_name
        self.allow_wildcard = allow_wildcard

    def _domains_to_check(self, field_data, all_data):
        """Yield every domain that has to be checked for field_data.

        field_data is split into lines; blank lines are skipped. For a line
        containing '@' only the part after the first '@' is used.
        all_data may be None, in which case no subdomain is prepended.
        """
        if self.allow_wildcard and field_data == "*":
            # a lone wildcard is accepted as-is: nothing to look up
            return
        for line in field_data.split("\n"):
            line = line.strip()
            if not line:
                continue
            at_pos = line.find('@')
            if at_pos != -1:
                # it is an email address: keep only the domain part
                domain = line[at_pos + 1:]
            else:
                domain = line
            if self.subdomain_field_name:
                subdomain = ''
                if all_data is not None:
                    subdomain = all_data.get(self.subdomain_field_name, '')
                # a subdomain value starting with '.' is not prepended
                if subdomain and subdomain[0] != '.':
                    domain = "%s.%s" % (subdomain, domain)
            # strip a single leading "." or "*." (but leave ".." / "*.." alone)
            if len(domain) >= 2 and domain[0] == '.' and domain[1] != '.':
                domain = domain[1:]
            elif len(domain) >= 3 and domain[:2] == '*.' and domain[2] != '.':
                domain = domain[2:]
            yield domain

    def __call__(self, field_data, all_data=None):
        """Validate field_data.

        Raises validators.ValidationError for the first domain for which
        is_resolvable() returns a false value; returns None otherwise.
        """
        for domain in self._domains_to_check(field_data, all_data):
            # the trailing dot makes the looked-up name absolute
            resolvable = is_resolvable(domain + ".", self.rrtypes, self.resolver)
            if not resolvable:
                raise validators.ValidationError("%s: %s" % (domain, self.error_message))

    def prepare(self, field_data, all_data=None):
        """Submit the DNS queries to the resolver so that the replies can be ready in __call__."""
        for domain in self._domains_to_check(field_data, all_data):
            for rrtype in self.rrtypes:
                self.resolver.submit(domain + ".", rrtype)
110
111import dns.resolver, threading, time
112from django.conf import settings
113
class AsyncDNSResolver(object):
    """Run DNS queries in background threads and keep their answers.

    Each distinct (domain_string, rdatatype) query is started at most once
    per instance; submit() starts it and resolve() waits for and returns
    the stored answer.
    """

    def __init__(self):
        self.threads = {}  # query tuple -> Thread that was started for it
        self.answers = {}  # answers is modified within threads; apply locking!
        self.lock = threading.Lock()
        # Time of the most recent thread start; set here as well so that
        # _compute_timeout() never hits a missing attribute.
        self.final_submit_time = time.time()

    def submit(self, domain_string, rdatatype):
        """Start a lookup thread for the query unless one was already started.

        Returns immediately; the worker (_run_thread) stores its result in
        self.answers under self.lock.
        """
        query = (domain_string, rdatatype)
        if query in self.threads or query in self.answers:
            return
        thread = threading.Thread(
            target=_run_thread,
            args=(domain_string, rdatatype, self.answers, self.lock),
        )
        # daemon thread: a lookup still running must not keep the process alive
        thread.daemon = True
        thread.start()
        self.final_submit_time = time.time()
        self.threads[query] = thread

    def resolve(self, domain_string, rdatatype):
        """Return the stored answer for the query, submitting it first if needed.

        Waits for a still-running lookup thread for at most
        _compute_timeout() seconds. Returns None if no answer has been
        stored by then, or if the stored answer is None.
        """
        self.submit(domain_string, rdatatype)
        query = (domain_string, rdatatype)
        thread = self.threads[query]
        if thread.is_alive():
            thread.join(self._compute_timeout())
        with self.lock:
            return self.answers.get(query)

    def _compute_timeout(self):
        """Return the seconds left of settings.DNS_LIFETIME since the last submit, never below 0."""
        elapsed = time.time() - self.final_submit_time
        return max(settings.DNS_LIFETIME - elapsed, 0)
144
145
def _run_thread(domain_string, rdatatype, answers, lock):
    """Thread target: run one DNS query and store its result in answers.

    domain_string, rdatatype -- the query; together they form the dict key.
    answers -- dict shared with the caller; written only while holding lock.
    lock -- lock guarding answers.

    Stores None when the query times out, the name does not exist, there is
    no answer, or the name contains an empty label. Any other exception
    propagates out of the thread and no entry is stored.
    """
    resolver = dns.resolver.Resolver()
    resolver.timeout = settings.DNS_TIMEOUT
    resolver.lifetime = settings.DNS_LIFETIME
    try:
        answer = resolver.query(domain_string, rdatatype)
    except (dns.exception.Timeout, dns.resolver.NXDOMAIN, dns.resolver.NoAnswer, dns.name.EmptyLabel):
        answer = None
    with lock:
        answers[(domain_string, rdatatype)] = answer
Back to Top