import re
import os
import cv2
import time
import base64
import hashlib
import threading

import requests
import openpyxl as xl
from openpyxl.styles import Alignment

# Fill in your own proxy credentials; every request to the ICP API goes through this proxy.
proxies = {
    'http': 'http://username:passwd@x.x.x.x:port',
    'https': 'http://username:passwd@x.x.x.x:port',
}

# Serializes workbook writes: data_saver() can be called from several worker threads at once,
# and concurrent load/save cycles on the same .xlsx file would corrupt it.
SAVE_LOCK = threading.Lock()

def read_targets_from_file(file_path='target.txt'):
    # One query target (company name, domain, or licence number) per line; blank lines are skipped.
    with open(file_path, 'r', encoding='utf-8') as file:
        targets = [line.strip() for line in file if line.strip()]
    return targets

def request_with_retry(method, url, proxies, timeout=10, retries=3, **kwargs):
    while retries > 0:
        try:
            if method == 'get':
                response = requests.get(url, timeout=timeout, proxies=proxies, **kwargs)
            elif method == 'post':
                response = requests.post(url, timeout=timeout, proxies=proxies, **kwargs)
            else:
                raise ValueError('Invalid method')
            return response
        except requests.exceptions.RequestException as e:
            print(f"Request failed, retrying: {e}")
            retries -= 1
            if retries == 0:
                raise

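# A minimal usage sketch for request_with_retry (illustrative only; the URLs and JSON
# body below are placeholder assumptions, not part of the real query flow). Extra
# keyword arguments such as headers=, data=, or json= are forwarded unchanged to
# requests.get / requests.post:
#
#     resp = request_with_retry('get', 'https://example.com', proxies=None)
#     resp.raise_for_status()
#     data = request_with_retry('post', 'https://example.com/api', proxies=None,
#                               json={'q': 'test'}).json()
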
def process_target(target):
    print(f"Querying: {target}")
    cookie = get_cookies()
    while True:
        try:
            # NOTE: base_header is a module-level global shared by every worker thread,
            # so concurrent queries can overwrite each other's cookie/token/sign headers.
            global base_header
            base_header = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.41 Safari/537.36 Edg/101.0.1210.32',
                'Origin': 'https://beian.miit.gov.cn',
                'Referer': 'https://beian.miit.gov.cn/',
                'Cookie': f'__jsluid_s={cookie}'
            }
            if cookie != -1:
                token = get_token()
                if token != -1:
                    check_data = get_check_pic(token)
                    if check_data != -1:
                        sign = get_sign(check_data, token)
                        p_uuid = check_data['key']
                        if sign != -1:
                            info_data = query_base(target)
                            domain_list = get_beian_info(info_data, p_uuid, token, sign)
                            data_saver(domain_list, target)
                            break
                        else:
                            raise ValueError("Error while obtaining the sign, please retry!")
                    else:
                        raise ValueError("Failed to locate the captcha gap, please retry!")
                else:
                    raise ValueError("Failed to get a token; if this fails repeatedly, close the program and wait a few minutes before retrying!")
            else:
                cookie = get_cookies()
                raise ValueError("Failed to get a cookie, please retry!")
        except Exception as e:
            print(f'{e}\n')
            time.sleep(10)

def main():
    targets = read_targets_from_file()
    threads = []
    # Query each target in its own thread.
    for target in targets:
        t = threading.Thread(target=process_target, args=(target,))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

def query_base(target):
    # Normalize the target: strip scheme/www prefixes, then drop any character that is not
    # Chinese, alphanumeric, or common company-name punctuation (ASCII and fullwidth parentheses).
    info = target.replace(" ", "").replace("https://www.", "").replace("http://www.", "").replace("http://", "")
    info = re.sub(r"[^\u4e00-\u9fa5A-Za-z0-9,\-.()（）]", "", info)
    info_data = {'pageNum': '1', 'pageSize': '40', 'unitName': info}
    return info_data

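# Illustrative example of the normalization above (hypothetical input, not from the
# target list): query_base('https://www.example.com') returns
# {'pageNum': '1', 'pageSize': '40', 'unitName': 'example.com'}.
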
def get_cookies():
    cookie_headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.41 Safari/537.36 Edg/101.0.1210.32'}
    err_num = 0
    while err_num < 3:
        try:
            # The anti-bot cookie '__jsluid_s' is issued on the landing page and must
            # accompany every subsequent API call.
            response = request_with_retry('get', 'https://beian.miit.gov.cn/', headers=cookie_headers, proxies=proxies)
            cookie = requests.utils.dict_from_cookiejar(response.cookies)['__jsluid_s']
            return cookie
        except Exception:
            err_num += 1
            time.sleep(3)
    return -1

def get_token():
    # The auth endpoint expects authKey = md5('testtest' + millisecond timestamp).
    timeStamp = round(time.time() * 1000)
    authSecret = 'testtest' + str(timeStamp)
    authKey = hashlib.md5(authSecret.encode(encoding='UTF-8')).hexdigest()
    auth_data = {'authKey': authKey, 'timeStamp': timeStamp}
    url = 'https://hlwicpfwc.miit.gov.cn/icpproject_query/api/auth'
    try:
        t_response = request_with_retry('post', url, data=auth_data, headers=base_header, proxies=proxies).json()
        token = t_response['params']['bussiness']  # 'bussiness' is the API's own (misspelled) field name
    except Exception:
        return -1
    return token

def get_check_pic(token):
    url = 'https://hlwicpfwc.miit.gov.cn/icpproject_query/api/image/getCheckImage'
    base_header['Accept'] = 'application/json, text/plain, */*'
    base_header.update({'Content-Length': '0', 'token': token})
    try:
        p_request = request_with_retry('post', url, data='', headers=base_header, proxies=proxies).json()
        p_uuid = p_request['params']['uuid']
        big_image = p_request['params']['bigImage']
        small_image = p_request['params']['smallImage']
    except Exception:
        return -1
    # Per-thread file names so concurrent queries do not overwrite each other's captcha images.
    big_path = f'bigImage_{threading.get_ident()}.jpg'
    small_path = f'smallImage_{threading.get_ident()}.jpg'
    with open(big_path, 'wb') as f:
        f.write(base64.b64decode(big_image))
    with open(small_path, 'wb') as f:
        f.write(base64.b64decode(small_image))
    # cv2.imread's second argument is an imread flag, not a colour-conversion code;
    # the original cv2.COLOR_GRAY2RGB was a misuse, so read as plain colour images.
    background_image = cv2.imread(big_path, cv2.IMREAD_COLOR)
    fill_image = cv2.imread(small_path, cv2.IMREAD_COLOR)
    # Slide-captcha solver: template-match the puzzle piece against the background;
    # the x coordinate of the best match is the distance the slider must travel.
    position_match = cv2.matchTemplate(background_image, fill_image, cv2.TM_CCOEFF_NORMED)
    max_loc = cv2.minMaxLoc(position_match)[3][0]  # [3] is maxLoc, [0] its x coordinate
    mouse_length = max_loc + 1
    os.remove(big_path)
    os.remove(small_path)
    check_data = {'key': p_uuid, 'value': mouse_length}
    return check_data

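# A self-contained sketch of the template-matching step used in get_check_pic, run on
# a synthetic image instead of the real captcha (numpy ships with OpenCV). It is pure
# illustration and is never called by the query flow.
def demo_template_match():
    import numpy as np
    rng = np.random.default_rng(0)
    # Random noise background; cut out a 30x30 patch at x=180 to act as the "puzzle piece".
    background = rng.integers(0, 255, size=(100, 300, 3), dtype=np.uint8)
    template = background[30:60, 180:210].copy()
    scores = cv2.matchTemplate(background, template, cv2.TM_CCOEFF_NORMED)
    best_x = cv2.minMaxLoc(scores)[3][0]  # maxLoc is (x, y); take its x coordinate
    print(best_x)  # -> 180, the distance the slider would have to travel
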
def get_sign(check_data, token):
    check_url = 'https://hlwicpfwc.miit.gov.cn/icpproject_query/api/image/checkImage'
    # Content-Length is hard-coded as in the original request flow; requests would normally compute it.
    base_header.update({'Content-Length': '60', 'token': token, 'Content-Type': 'application/json'})
    try:
        pic_sign = request_with_retry('post', check_url, json=check_data, headers=base_header, proxies=proxies).json()
        sign = pic_sign['params']
    except Exception:
        return -1
    return sign

def get_beian_info(info_data, p_uuid, token, sign):
    domain_list = []
    info_url = 'https://hlwicpfwc.miit.gov.cn/icpproject_query/api/icpAbbreviateInfo/queryByCondition'
    # As in get_sign, the Content-Length value is hard-coded rather than computed.
    base_header.update({'Content-Length': '78', 'uuid': p_uuid, 'token': token, 'sign': sign})
    try:
        beian_info = request_with_retry('post', url=info_url, json=info_data, headers=base_header, proxies=proxies).json()
        if not beian_info["success"]:
            print(f'Request error: CODE {beian_info["code"]} MSG {beian_info["msg"]}')
            return domain_list
        domain_total = beian_info['params']['total']
        page_total = beian_info['params']['lastPage']
        end_row = beian_info['params']['endRow']
        info = info_data['unitName']
        print(f"\nQuery target: {info} has {domain_total} registered domains\n")
        for i in range(0, page_total):
            print(f"Fetching page {i + 1}...\n")
            # endRow is the index of the last record on the current page.
            for k in range(0, end_row + 1):
                info_base = beian_info['params']['list'][k]
                domain_name = info_base['domain']
                domain_type = info_base['natureName']
                domain_licence = info_base['mainLicence']
                website_licence = info_base['serviceLicence']
                domain_status = info_base['limitAccess']
                domain_approve_date = info_base['updateRecordTime']
                domain_owner = info_base['unitName']
                try:
                    domain_content_approved = info_base['contentTypeName']
                    if domain_content_approved == "":
                        domain_content_approved = "N/A"
                except KeyError:
                    domain_content_approved = "N/A"
                row_data = (domain_owner, domain_name, domain_licence, website_licence, domain_type,
                            domain_content_approved, domain_status, domain_approve_date)
                domain_list.append(row_data)
            info_data = {'pageNum': i + 2, 'pageSize': '40', 'unitName': info}
            if beian_info['params']['isLastPage'] is True:
                break
            else:
                beian_info = request_with_retry('post', info_url, json=info_data, headers=base_header, proxies=proxies).json()
                end_row = beian_info['params']['endRow']
                time.sleep(3)
    except Exception as e:
        print(f"Unexpected error: {e}")
        return domain_list
    return domain_list


def data_saver(domain_list, target):
    if not domain_list:
        print("The queried target has no ICP records\n")
        return
    print(f"Query results:\n\n{domain_list}\n")
    if os.name == "nt":
        # Resolve the real desktop path from the registry (it may have been relocated).
        import winreg
        subkey = r'Software\Microsoft\Windows\CurrentVersion\Explorer\User Shell Folders'
        key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, subkey, 0)
        desktop_raw = str(winreg.QueryValueEx(key, "Desktop")[0])
        if desktop_raw == r"%USERPROFILE%\Desktop":
            subkey = r'Software\Microsoft\Windows\CurrentVersion\Explorer\Shell Folders'
            key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, subkey, 0)
            desktop_raw = str(winreg.QueryValueEx(key, "Desktop")[0])
        desktop_path = desktop_raw.replace('\\', '/') + "/"
        file_path = f"{desktop_path}ICP_Info.xlsx"
    else:
        file_path = './ICP_Info.xlsx'
    # Only one thread may load/modify/save the shared workbook at a time.
    with SAVE_LOCK:
        if os.path.exists(file_path):
            wb = xl.load_workbook(file_path)
        else:
            wb = xl.Workbook()
        ws = wb.create_sheet(target)
        title_list = ['Registrant', 'Domain', 'ICP Licence No.', 'Website Licence No.', 'Domain Type',
                      'Content Pre-approval', 'Access Restricted', 'Approval Date']
        for col, title in enumerate(title_list, start=1):
            ws.cell(1, col).value = title
        col_width = {'A': 45, 'B': 40, 'C': 22, 'D': 24, 'E': 9, 'F': 15, 'G': 13, 'H': 21}
        for k, v in col_width.items():
            ws.column_dimensions[k].width = v
        ws.freeze_panes = 'A2'
        # Data rows start on row 2, right below the title row.
        for row_idx, row_data in enumerate(domain_list, start=2):
            for col_idx, value in enumerate(row_data, start=1):
                ws.cell(row_idx, col_idx).value = value
        for row in ws.iter_rows():
            for cell in row:
                cell.alignment = Alignment(horizontal='center', vertical='center')
        try:
            wb.save(file_path)
        except PermissionError:
            print("** The results workbook is open in another program and cannot be written. Close it and run again! **\n")
            return -1
    print(f"Results saved to: {file_path}\n")
    return 'OK'

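# Optional sanity check (a sketch; the file name assumes the default used by
# data_saver above). Reads the workbook back and prints a row count per sheet.
def verify_saved_workbook(file_path='./ICP_Info.xlsx'):
    wb = xl.load_workbook(file_path)
    for name in wb.sheetnames:
        # max_row includes the title row, so subtract it from the data-row count.
        print(f"{name}: {max(wb[name].max_row - 1, 0)} record(s)")
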
if __name__ == '__main__':
    main()