本地搭建DNS服务器(同时支持设置TTL 响应A CNAME NS MX请求格式)(基于python实现)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
import logging
import socket
from dns.resolver import Resolver
from dnslib import DNSRecord, QTYPE, RR, A, CNAME, NS, MX, DNSHeader
# 代码说明
# 提供的代码是用 Python 实现的 DNS 服务器的示例。它使用该socket模块创建一个 UDP 套接字并将其绑定到端口 53,这是 DNS 的默认端口。服务器侦听传入的 DNS 查询,使用dnslib库解析它们,并处理不同类型的 DNS 查询。
# 以下是代码中主要组件的细分:
# 导入必要的模块:
# logging:用于记录服务器活动
# socket: 用于创建套接字并与之交互
# Resolverfrom dns.resolver: 用于解析 DNS 查询
# 各种类 ( DNSRecord, QTYPE, RR, A, CNAME, NS, MX, DNSHeader) 来自dnslib:用于构建 DNS 响应记录
# 设置 DNS 解析器:
# 创建一个实例Resolver。
# 将解析器的名称服务器设置为8.8.8.8和8.8.4.4(Google 的公共 DNS 服务器)。
# 定义字典来存储域映射:
# domain_ip_map:存储域到 IP 地址的映射。
# domain_cname_map:存储域到 CNAME 的映射。
# domain_ns_map:存储域到 NS(名称服务器)的映射。
# domain_mx_map:存储域到 MX(邮件交换)的映射。
# 定义辅助函数:
# get_ip_from_domain(domain)domain_ip_map:通过检查字典并在必要时执行 DNS 查询来检索给定域的 IP 地址。
# get_cname_from_domain(domain)domain_cname_map:通过检查字典并在必要时执行 DNS 查询来检索给定域的 CNAME(规范名称) 。
# get_ns_from_domain(domain)domain_ns_map:通过检查字典并在必要时执行 DNS 查询来检索给定域的 NS(名称服务器) 。
# get_mx_from_domain(domain)domain_mx_map:通过检查字典并在必要时执行 DNS 查询来检索给定域的 MX(邮件交换)记录。
# reply_for_not_found(income_record): 构造一个DNS响应记录,表示没有找到请求的域或查询类型。
# reply_for_A(income_record, ip, ttl=None): 为请求的域和 IP 地址构造一个带有 A(地址)记录的 DNS 响应记录。
# reply_for_CNAME(income_record, cname, ttl=None): 为请求的域和 CNAME 值构造一个带有 CNAME(规范名称)记录的 DNS 响应记录。
# reply_for_NS(income_record, ns, ttl=None): 为所请求的域和 NS 值构造一个带有 NS(名称服务器)记录的 DNS 响应记录。
# reply_for_MX(income_record, mx_records, ttl=None): 为请求的域和 MX 值构造一个带有 MX(邮件交换)记录的 DNS 响应记录。
# 定义dns_handler函数:
# 为每个传入的 DNS 查询调用此函数。
# 它解析查询,确定查询类型(A、CNAME、NS、MX),并调用适当的辅助函数来处理查询。
# 如果在相应的域映射字典中找到匹配项,则会构建 DNS 响应记录并将其发送回客户端。
# 如果未找到匹配项,则发送“未找到”响应。
# 启动 DNS 服务器:
# 创建一个 UDP 套接字并将其绑定到端口 53。
# 输入服务器等待传入消息的无限循环。
# 对于每条传入消息,dns_handler都会调用该函数来处理查询并发送响应。

# =================================================================
# =================================================================
# important!!!!important!!!!important!!!!
# 运行代码说明:
# 1.根据项目要求,代码实现了可以更改ttl(在方法dns_handler可以更改,默认为60)
# 2.这是一个ip地址为127.0.0.1的本地dns服务器,并且实现了四种RR格式的DNS查询服务,同时对于每种RR格式还在本地DNS服务器中保留了一些映射
# (这些映射在以下的比如domain_ip_map字典中可见)
# 访问DNS服务器时,首先会查看本地缓存中(字典)是否有满足查询条件的条目,如果有则返回,没有就向远端的8.8.8.8DNS服务器发送查询请求
# 3.测试代码:
# 第一步:运行代码启动服务器
# 第二步:在计算器的终端(cmd)使用类似于dig @127.0.0.1 example.com MX的指令进行查询
# dig是mac计算器的查询指令,windows系统需要下载,@127.0.0.1是该dns服务器的ip地址,example.com是需要查询的域名,MX是RR格式
# 如上所示,可以根据自己的需要进行更改查询请求
# 请注意,要运行此 DNS 服务器,您需要dnslib安装库 ( pip install dnslib)。此外,请确保您具有绑定到端口 53 的必要权限,因为它是特权端口(通常需要管理权限)。
dns_resolver = Resolver()
dns_resolver.nameservers = ["8.8.8.8", "8.8.4.4"]

# Dictionary to store domain to IP mappings
domain_ip_map = {
"example.com": "192.0.2.1",
"example.net": "203.0.113.2",
# Add more mappings as needed
}

# Dictionary to store domain to CNAME mappings
domain_cname_map = {
"www.example.com": "example.com",
# Add more mappings as needed
}
# dig @127.0.0.1 www.example.com CNAME

# Dictionary to store domain to NS mappings
domain_ns_map = {
"example.com": "ns1.example.com",
# Add more mappings as needed
}

domain_mx_map = {
"example.com": [(10, "mail.example.com")],
# Add more mappings as needed
}


def get_ip_from_domain(domain):
domain = domain.lower().strip()
if domain in domain_ip_map:
return domain_ip_map[domain]
try:
return dns_resolver.query(domain, 'A')[0].to_text()
except:
return None

def get_cname_from_domain(domain):
domain = domain.lower().strip()
if domain in domain_cname_map:
return domain_cname_map[domain]
try:
return dns_resolver.query(domain, 'CNAME')[0].to_text()
except:
return None

def get_ns_from_domain(domain):
domain = domain.lower().strip()
if domain in domain_ns_map:
return domain_ns_map[domain]
try:
return dns_resolver.query(domain, 'NS')[0].to_text()
except:
return None

def get_mx_from_domain(domain):
domain = domain.lower().strip()
if domain in domain_mx_map:
return domain_mx_map[domain]
try:
response = dns_resolver.query(domain, 'MX')
mx_records = [(r.preference, r.exchange.to_text()) for r in response]
return mx_records
except:
return None

def reply_for_not_found(income_record):
header = DNSHeader(id=income_record.header.id, bitmap=income_record.header.bitmap, qr=1)
header.set_rcode(0)
record = DNSRecord(header, q=income_record.q)
return record

def reply_for_A(income_record, ip, ttl=None):
r_data = A(ip)
header = DNSHeader(id=income_record.header.id, bitmap=income_record.header.bitmap, qr=1)
domain = income_record.q.qname
query_type_int = QTYPE.reverse.get('A') or income_record.q.qtype
record = DNSRecord(header, q=income_record.q, a=RR(domain, query_type_int, rdata=r_data, ttl=ttl))
return record

def reply_for_CNAME(income_record, cname, ttl=None):
r_data = CNAME(cname)
header = DNSHeader(id=income_record.header.id, bitmap=income_record.header.bitmap, qr=1)
domain = income_record.q.qname
query_type_int = QTYPE.reverse.get('CNAME') or income_record.q.qtype
record = DNSRecord(header, q=income_record.q, a=RR(domain, query_type_int, rdata=r_data, ttl=ttl))
return record

def reply_for_NS(income_record, ns, ttl=None):
r_data = NS(ns)
header = DNSHeader(id=income_record.header.id, bitmap=income_record.header.bitmap, qr=1)
domain = income_record.q.qname
query_type_int = QTYPE.reverse.get('NS') or income_record.q.qtype
record = DNSRecord(header, q=income_record.q, a=RR(domain, query_type_int, rdata=r_data, ttl=ttl))
return record

def reply_for_MX(income_record, mx_records, ttl=None):
#label 标签指代的是mx_dns_server的名字
r_data = [MX(preference=preference, label=exchange) for preference, exchange in mx_records]
header = DNSHeader(id=income_record.header.id, bitmap=income_record.header.bitmap, qr=1)
domain = income_record.q.qname
query_type_int = QTYPE.reverse.get('MX') or income_record.q.qtype
record = DNSRecord(header, q=income_record.q)
for mx in r_data:
record.add_answer(RR(domain, QTYPE.MX, rdata=mx, ttl=ttl))
return record
# dig @127.0.0.1 example.com MX
def dns_handler(s, message, address):
try:
income_record = DNSRecord.parse(message)
except:
logging.error('from %s, parse error' % address)
return

try:
qtype = QTYPE.get(income_record.q.qtype)
except:
qtype = 'unknown'

domain = str(income_record.q.qname).strip('.')
info = '%s -- %s, from %s' % (qtype, domain, address)

if qtype == 'A':
ip = get_ip_from_domain(domain)
if ip:
response = reply_for_A(income_record, ip=ip, ttl=60)
s.sendto(response.pack(), address)
return logging.info(info)

if qtype == 'CNAME':
cname = get_cname_from_domain(domain)
if cname:
response = reply_for_CNAME(income_record, cname=cname, ttl=60)
s.sendto(response.pack(), address)
return logging.info(info)

if qtype == 'NS':
ns = get_ns_from_domain(domain)
if ns:
response = reply_for_NS(income_record, ns=ns, ttl=60)
s.sendto(response.pack(), address)
return logging.info(info)

if qtype == 'MX':
mx = get_mx_from_domain(domain)
if mx:
response = reply_for_MX(income_record, mx, ttl=60)
s.sendto(response.pack(), address)
return logging.info(info)

response = reply_for_not_found(income_record)
s.sendto(response.pack(), address)
logging.info(info)

if __name__ == '__main__':
udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp_sock.bind(('', 53))
logging.info('DNS server is started')
while True:
message, address = udp_sock.recvfrom(8192)
dns_handler(udp_sock, message, address)