1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
| import argparse import os import socket
import dns.message import dns.rdatatype
class DNSCacheSnooping(object): def __init__(self): self.dns_server = '8.8.8.8'
def set_dns_server(self, nameserver='8.8.8.8'): self.dns_server = nameserver
def detect(self, domain): sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(5)
query = dns.message.make_query(domain, dns.rdatatype.A, dns.rdataclass.IN) query.flags &= ~dns.flags.RD
sock.sendto(query.to_wire(), (self.dns_server, 53))
try: data, _ = sock.recvfrom(1024)
response = dns.message.from_wire(data)
answers = response.answer if len(answers) == 0: print(f'No Cache for [{domain}] in [{self.dns_server}]') else: for ans in answers: print(f'Find Cache: [{ans}] in [{self.dns_server}]')
except socket.timeout: print("DNS query timeout") finally: sock.close()
def main(): parser = argparse.ArgumentParser(description='A simple command-line tool') parser.add_argument('-f', '--domain-file', help='domains in file') parser.add_argument('-d', '--domain', help='single domain') parser.add_argument('-s', '--dns-server', default='8.8.8.8', help='A dns server') args = parser.parse_args()
detector = DNSCacheSnooping() if args.dns_server: detector.set_dns_server(args.dns_server) if args.domain: detector.detect(domain=args.domain) if args.domain_file: if not os.path.exists(args.domain_file): args.domain_file = os.path.join(os.getcwd(), args.domain_file) with open(args.domain_file) as f: for domain in list(f): if domain.strip() != "": detector.detect(domain=domain.strip())
if __name__ == "__main__": main()
|