import json
import os
import os.path
import re
import secrets
import time
import typing
import uuid
# import hashlib
from concurrent.futures import ThreadPoolExecutor
from os import replace
from pathlib import Path
from unicodedata import category
from urllib.parse import quote, urljoin

import mechanicalsoup
import requests
from cloudscraper import create_scraper
from lxml import etree
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

import config


def getXpathSingle(htmlcode, xpath):
    """
    Evaluate ``xpath`` against ``htmlcode`` parsed as HTML.

    The xpath result list is converted with ``str()`` and then stripped of
    leading/trailing spaces, square brackets and single quotes, so a single
    match comes back as a bare string.
    """
    html = etree.fromstring(htmlcode, etree.HTMLParser())
    return str(html.xpath(xpath)).strip(" ['']")


G_USER_AGENT = r'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.133 Safari/537.36'


def get_html(url, cookies: dict = None, ua: str = None, return_type: str = None, encoding: str = None,
             json_headers=None):
    """
    Core HTTP GET helper.

    :param url: address to fetch (converted with ``str()``).
    :param cookies: optional cookies passed to ``requests.get``.
    :param ua: optional User-Agent; defaults to ``G_USER_AGENT``.
    :param return_type: ``"object"`` returns the response object, ``"content"``
        returns the raw bytes, anything else returns the decoded text.
    :param encoding: text encoding override; defaults to the response's
        ``apparent_encoding``.
    :param json_headers: optional extra headers merged over the defaults.
    :raises Exception: ``'Connect Failed'`` once every retry has failed.
    """
    verify = config.getInstance().cacert_file()
    configProxy = config.getInstance().proxy()
    errors = ""

    headers = {"User-Agent": ua or G_USER_AGENT}  # noqa
    if json_headers is not None:
        headers.update(json_headers)

    for i in range(configProxy.retry):
        try:
            if configProxy.enable:
                proxies = configProxy.proxies()
                result = requests.get(str(url), headers=headers, timeout=configProxy.timeout, proxies=proxies,
                                      verify=verify, cookies=cookies)
            else:
                result = requests.get(str(url), headers=headers, timeout=configProxy.timeout, cookies=cookies)

            if return_type == "object":
                return result
            elif return_type == "content":
                return result.content
            else:
                result.encoding = encoding or result.apparent_encoding
                return result.text
        except Exception as e:
            print("[-]Connect retry {}/{}".format(i + 1, configProxy.retry))
            errors = str(e)

    # Every attempt failed: report the last error, then raise.
    if "getaddrinfo failed" in errors:
        print("[-]Connect Failed! Please Check your proxy config")
        debug = config.getInstance().debug()
        if debug:
            print("[-]" + errors)
    else:
        print("[-]" + errors)
        print('[-]Connect Failed! Please check your Proxy or Network!')
    raise Exception('Connect Failed')


def post_html(url: str, query: dict, headers: dict = None) -> typing.Optional[requests.Response]:
    """
    HTTP POST helper with retries.

    :param url: address to post to.
    :param query: request body, passed as ``data``.
    :param headers: optional headers; ``User-Agent`` is always set to
        ``G_USER_AGENT``. The caller's dict is not modified.
    :return: the response object, or ``None`` once every retry has failed.
    """
    configProxy = config.getInstance().proxy()
    errors = ""
    # Build a new dict so the caller's headers are never mutated.
    headers = {**(headers or {}), "User-Agent": G_USER_AGENT}

    for i in range(configProxy.retry):
        try:
            if configProxy.enable:
                proxies = configProxy.proxies()
                result = requests.post(url, data=query, proxies=proxies, headers=headers,
                                       timeout=configProxy.timeout)
            else:
                result = requests.post(url, data=query, headers=headers, timeout=configProxy.timeout)
            return result
        except Exception as e:
            print("[-]Connect retry {}/{}".format(i + 1, configProxy.retry))
            errors = str(e)
    print("[-]Connect Failed! Please check your Proxy or Network!")
    print("[-]" + errors)
    return None


G_DEFAULT_TIMEOUT = 10  # seconds


class TimeoutHTTPAdapter(HTTPAdapter):
    """HTTPAdapter that applies a default timeout to requests sent without one."""

    def __init__(self, *args, **kwargs):
        # "timeout" is consumed here; HTTPAdapter.__init__ does not receive it.
        self.timeout = kwargs.pop("timeout", G_DEFAULT_TIMEOUT)
        super().__init__(*args, **kwargs)

    def send(self, request, **kwargs):
        # Only fill in the default when the caller did not set a timeout.
        if kwargs.get("timeout") is None:
            kwargs["timeout"] = self.timeout
        return super().send(request, **kwargs)


def _setup_session(session, cookies, configProxy):
    """Apply cookies, retrying timeout adapters and proxy settings to ``session``."""
    if isinstance(cookies, dict) and len(cookies):
        requests.utils.add_dict_to_cookiejar(session.cookies, cookies)
    retries = Retry(total=configProxy.retry, connect=configProxy.retry, backoff_factor=1,
                    status_forcelist=[429, 500, 502, 503, 504])
    session.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
    session.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
    if configProxy.enable:
        session.verify = config.getInstance().cacert_file()
        session.proxies = configProxy.proxies()
    return session


# with keep-alive feature
def get_html_session(url: str = None, cookies: dict = None, ua: str = None, return_type: str = None,
                     encoding: str = None):
    """
    HTTP GET through a ``requests.Session``.

    With an empty ``url`` the configured session itself is returned so it can
    be reused. Otherwise ``return_type`` selects the result: ``"object"`` the
    response, ``"content"`` the bytes, ``"session"`` a ``(response, session)``
    tuple, anything else the text decoded with ``encoding`` (default utf-8).
    Returns ``None`` on a non-ok response or on any request error.
    """
    configProxy = config.getInstance().proxy()
    session = _setup_session(requests.Session(), cookies, configProxy)
    headers = {"User-Agent": ua or G_USER_AGENT}
    session.headers = headers
    try:
        if isinstance(url, str) and len(url):
            result = session.get(str(url))
        else:
            # Empty url: return the reusable session; return_type is not needed.
            return session
        if not result.ok:
            return None
        if return_type == "object":
            return result
        elif return_type == "content":
            return result.content
        elif return_type == "session":
            return result, session
        else:
            result.encoding = encoding or "utf-8"
            return result.text
    except requests.exceptions.ProxyError:
        print("[-]get_html_session() Proxy error! Please check your Proxy")
    except requests.exceptions.RequestException:
        # Other request errors are ignored on purpose; the caller gets None.
        pass
    except Exception as e:
        print(f"[-]get_html_session() failed. {e}")
    return None


def get_html_by_browser(url: str = None, cookies: dict = None, ua: str = None, return_type: str = None,
                        encoding: str = None, use_scraper: bool = False):
    """
    HTTP GET through a ``mechanicalsoup.StatefulBrowser``.

    The browser wraps a cloudscraper session when ``use_scraper`` is true,
    otherwise a plain ``requests.Session``. With an empty ``url`` the browser
    itself is returned. Otherwise ``return_type`` selects the result:
    ``"object"`` the response, ``"content"`` the bytes, ``"browser"`` a
    ``(response, browser)`` tuple, anything else the text decoded with
    ``encoding`` (default utf-8). Returns ``None`` on a non-ok response or on
    error.
    """
    configProxy = config.getInstance().proxy()
    s = create_scraper(browser={'custom': ua or G_USER_AGENT, }) if use_scraper else requests.Session()
    _setup_session(s, cookies, configProxy)
    try:
        browser = mechanicalsoup.StatefulBrowser(user_agent=ua or G_USER_AGENT, session=s)
        if isinstance(url, str) and len(url):
            result = browser.open(url)
        else:
            return browser
        if not result.ok:
            return None

        if return_type == "object":
            return result
        elif return_type == "content":
            return result.content
        elif return_type == "browser":
            return result, browser
        else:
            result.encoding = encoding or "utf-8"
            return result.text
    except requests.exceptions.ProxyError:
        print("[-]get_html_by_browser() Proxy error! Please check your Proxy")
    except Exception as e:
        print(f'[-]get_html_by_browser() Failed! {e}')
    return None


def get_html_by_form(url, form_select: str = None, fields: dict = None, cookies: dict = None, ua: str = None,
                     return_type: str = None, encoding: str = None):
    """
    Open ``url``, fill in a form and submit it.

    :param form_select: selector passed to ``select_form``; when ``None`` the
        browser's default form selection is used.
    :param fields: form field values to set before submitting.
    :param return_type: ``"object"`` returns the submit response,
        ``"content"`` its bytes, ``"browser"`` a ``(response, browser)``
        tuple, anything else the response text decoded with ``encoding``
        (default utf-8).
    :return: see ``return_type``; ``None`` if the page is not ok or on error.
    """
    configProxy = config.getInstance().proxy()
    s = _setup_session(requests.Session(), cookies, configProxy)
    try:
        browser = mechanicalsoup.StatefulBrowser(user_agent=ua or G_USER_AGENT, session=s)
        result = browser.open(url)
        if not result.ok:
            return None
        if form_select is None:
            browser.select_form()
        else:
            browser.select_form(form_select)
        if isinstance(fields, dict):
            for k, v in fields.items():
                browser[k] = v
        response = browser.submit_selected()

        if return_type == "object":
            return response
        elif return_type == "content":
            return response.content
        elif return_type == "browser":
            return response, browser
        else:
            # The encoding must be set on the submit response, which is the
            # object whose text is returned, not on the form page.
            response.encoding = encoding or "utf-8"
            return response.text
    except requests.exceptions.ProxyError:
        print("[-]get_html_by_form() Proxy error! Please check your Proxy")
    except Exception as e:
        print(f'[-]get_html_by_form() Failed! {e}')
    return None


def get_html_by_scraper(url: str = None, cookies: dict = None, ua: str = None, return_type: str = None,
                        encoding: str = None):
    """
    HTTP GET through a cloudscraper session.

    With an empty ``url`` the configured scraper itself is returned so it can
    be reused. Otherwise ``return_type`` selects the result: ``"object"`` the
    response, ``"content"`` the bytes, ``"scraper"`` a ``(response, scraper)``
    tuple, anything else the text decoded with ``encoding`` (default utf-8).
    Returns ``None`` on a non-ok response or on error.
    """
    configProxy = config.getInstance().proxy()
    session = _setup_session(create_scraper(browser={'custom': ua or G_USER_AGENT, }), cookies, configProxy)
    try:
        if isinstance(url, str) and len(url):
            result = session.get(str(url))
        else:
            # Empty url: return the reusable scraper; return_type is not needed.
            return session
        if not result.ok:
            return None
        if return_type == "object":
            return result
        elif return_type == "content":
            return result.content
        elif return_type == "scraper":
            return result, session
        else:
            result.encoding = encoding or "utf-8"
            return result.text
    except requests.exceptions.ProxyError:
        print("[-]get_html_by_scraper() Proxy error! Please check your Proxy")
    except Exception as e:
        print(f"[-]get_html_by_scraper() failed. {e}")
    return None


# def get_javlib_cookie() -> [dict, str]:
#     import cloudscraper
#     switch, proxy, timeout, retry_count, proxytype = config.getInstance().proxy()
#     proxies = get_proxy(proxy, proxytype)
#
#     raw_cookie = {}
#     user_agent = ""
#
#     # Get __cfduid/cf_clearance and user-agent
#     for i in range(retry_count):
#         try:
#             if switch == 1 or switch == '1':
#                 raw_cookie, user_agent = cloudscraper.get_cookie_string(
#                     "http://www.javlibrary.com/",
#                     proxies=proxies
#                 )
#             else:
#                 raw_cookie, user_agent = cloudscraper.get_cookie_string(
#                     "http://www.javlibrary.com/"
#                 )
#         except requests.exceptions.ProxyError:
#             print("[-] ProxyError, retry {}/{}".format(i + 1, retry_count))
#         except cloudscraper.exceptions.CloudflareIUAMError:
#             print("[-] IUAMError, retry {}/{}".format(i + 1, retry_count))
#
#     return raw_cookie, user_agent


def translate(
        src: str,
        target_language: str = "zh_cn",
        engine: str = "google-free",
        app_id: str = "",
        key: str = "",
        delay: int = 0,
) -> str:
    """
    Translate Japanese text (text containing kana) to the target language.

    Text without kana is returned unchanged. On a failed API call an empty
    string is returned.

    :param src: text to translate.
    :param target_language: target language code.
    :param engine: ``"google-free"`` or ``"azure"``.
    :param app_id: unused by the engines implemented here.
    :param key: subscription key for the azure engine.
    :param delay: seconds to sleep after a successful translation.
    :raises ValueError: Non-existent translation engine
    """
    # Chinese sentences containing symbols such as "&" get truncated by Google
    # translate, and translating Chinese to Chinese is pointless, so only text
    # containing Japanese kana is translated.
    if not is_japanese(src):
        return src

    if engine == "google-free":
        gsite = config.getInstance().get_translate_service_site()
        if not re.match(r'^translate\.google\.(com|com\.\w{2}|\w{2})$', gsite):
            gsite = 'translate.google.cn'
        # Percent-encode the text so "&", "#" and similar characters cannot
        # cut the query string short.
        url = (
            f"https://{gsite}/translate_a/single?client=gtx&dt=t&dj=1&ie=UTF-8&sl=auto&tl={target_language}"
            f"&q={quote(src, safe='')}"
        )
        result = get_html(url=url, return_type="object")
        if not result.ok:
            print('[-]Google-free translate web API calling failed.')
            return ''

        trans_result = "".join(i["trans"] for i in result.json()["sentences"])
    elif engine == "azure":
        url = "https://api.cognitive.microsofttranslator.com/translate?api-version=3.0&to=" + target_language
        headers = {
            'Ocp-Apim-Subscription-Key': key,
            'Ocp-Apim-Subscription-Region': "global",
            'Content-type': 'application/json',
            'X-ClientTraceId': str(uuid.uuid4())
        }
        body = json.dumps([{'text': src}])
        result = post_html(url=url, query=body, headers=headers)
        # post_html() returns None once all of its retries have failed.
        if result is None or not result.ok:
            print('[-]Azure translate web API calling failed.')
            return ''
        trans_result = "".join(i["text"] for i in result.json()[0]["translations"])
    else:
        raise ValueError("Non-existent translation engine")

    time.sleep(delay)
    return trans_result


def load_cookies(cookie_json_filename: str):
    """
    Load cookies, used to access members-only content as a logged-in user.

    Only the base name of ``cookie_json_filename`` is used. It is searched
    for, in order, in the current working directory, the home directory,
    ``~/.mdc`` and ``~/.local/share/mdc``.

    :param cookie_json_filename: cookie file name. To obtain the cookies, log
        in to the site, then copy or export them with a browser extension
        (CookieBro or EditThisCookie) or from the site information panel of
        the address bar, and save them as JSON.
    :return: ``(cookies, path_of_the_file_used)``, or ``(None, None)`` when
        the name is empty, no file is found, or the file cannot be read or
        parsed.

    # Example: FC2-755670 url https://javdb9.com/v/vO8Mn
    # JSON file format
    # File name: <site name>.json, for example javdb9.json
    # Content (file encoding: UTF-8):
    {
        "over18":"1",
        "redirect_to":"%2Fv%2FvO8Mn",
        "remember_me_token":"***********",
        "_jdb_session":"************",
        "locale":"zh",
        "__cfduid":"*********",
        "theme":"auto"
    }
    """
    filename = os.path.basename(cookie_json_filename)
    if not len(filename):
        return None, None
    path_search_order = (
        Path.cwd() / filename,
        Path.home() / filename,
        Path.home() / f".mdc/{filename}",
        Path.home() / f".local/share/mdc/{filename}"
    )
    cookies_filename = None
    try:
        for p in path_search_order:
            if p.is_file():
                cookies_filename = str(p.resolve())
                break
        if not cookies_filename:
            return None, None
        return json.loads(Path(cookies_filename).read_text(encoding='utf-8')), cookies_filename
    except Exception:
        # Best effort: an unreadable or malformed cookie file means "no cookies".
        return None, None


def file_modification_days(filename: str) -> int:
    """
    Return the number of whole days since the file was last modified.

    Returns 9999 when ``filename`` is not a file or the computed age is
    negative.
    """
    mfile = Path(filename)
    if not mfile.is_file():
        return 9999
    mtime = int(mfile.stat().st_mtime)
    now = int(time.time())
    days = int((now - mtime) / (24 * 60 * 60))
    if days < 0:
        return 9999
    return days


def file_not_exist_or_empty(filepath) -> bool:
    """Return True when ``filepath`` is not an existing file or has size 0."""
    return not os.path.isfile(filepath) or os.path.getsize(filepath) == 0


def is_japanese(raw: str) -> bool:
    """
    Simple Japanese detection: True when ``raw`` contains a character in the
    ranges U+3040-U+309F, U+30A0-U+30FF or U+FF66-U+FF9F.
    """
    return bool(re.search(r'[\u3040-\u309F\u30A0-\u30FF\uFF66-\uFF9F]', raw, re.UNICODE))


def download_file_with_filename(url: str, filename: str, path: str) -> None:
    """
    Download ``url`` and save it as ``filename`` inside directory ``path``.

    The directory is created when missing; if it cannot be created the
    process exits. Request errors are retried up to the configured retry
    count. An empty response body is reported and nothing is written.

    :raises ValueError: when writing the file fails, or when every retry has
        failed.
    """
    conf = config.getInstance()
    configProxy = conf.proxy()
    headers = {'User-Agent': G_USER_AGENT}

    for i in range(configProxy.retry):
        try:
            if not os.path.exists(path):
                try:
                    os.makedirs(path)
                except OSError:
                    print(f"[-]Fatal error! Can not make folder '{path}'")
                    os._exit(0)
            request_kwargs = {"headers": headers, "timeout": configProxy.timeout}
            if configProxy.enable:
                request_kwargs["proxies"] = configProxy.proxies()
            r = requests.get(url, **request_kwargs)
            # A Response never equals '', so test the downloaded body instead.
            if not r.content:
                print('[-]Movie Download Data not found!')
                return
            with open(os.path.join(path, filename), "wb") as code:
                code.write(r.content)
            return
        except requests.exceptions.RequestException:
            # Covers ConnectionError, ProxyError and ConnectTimeout as well;
            # they are all subclasses of RequestException.
            print('[-]Download : Connect retry ' + str(i + 1) + '/' + str(configProxy.retry))
        except IOError as e:
            raise ValueError(f"[-]Create Directory '{path}' failed!") from e
    print('[-]Connect Failed! Please check your Proxy or Network!')
    raise ValueError('[-]Connect Failed! Please check your Proxy or Network!')


def download_one_file(args) -> typing.Optional[str]:
    """
    Download one file and save it to the given path.

    Takes a single tuple so it can be used with ``map``.

    :param args: ``(url, save_path, json_headers)``. ``save_path`` must
        provide ``.open()`` (e.g. a ``Path``); ``json_headers`` is ``None`` or
        a mapping whose ``'headers'`` entry is passed to ``get_html``.
    :return: ``str(save_path)`` when all bytes were written, otherwise
        ``None``.
    """
    (url, save_path, json_headers) = args
    if json_headers is not None:
        filebytes = get_html(url, return_type='content', json_headers=json_headers['headers'])
    else:
        filebytes = get_html(url, return_type='content')
    if isinstance(filebytes, bytes) and len(filebytes):
        with save_path.open('wb') as fpbyte:
            if len(filebytes) == fpbyte.write(filebytes):
                return str(save_path)
    return None


def parallel_download_files(dn_list: typing.Iterable[typing.Sequence], parallel: int = 0, json_headers=None):
    """
    Download files in parallel using a thread pool.

    Usage example: download two different files with two threads and save
    them to different paths. The target directories need not exist, but write
    permission for the target directories and files is required.

    parallel_download_files([
        ('https://site1/img/p1.jpg', 'C:/temp/img/p1.jpg'),
        ('https://site2/cover/n1.xml', 'C:/tmp/cover/n1.xml')
    ])

    :dn_list: a tuple or list: ((url1, save_fullpath1), (url2, save_fullpath2),)
        where fullpath may be a str or a Path. Entries whose url is not a str
        starting with 'http', or whose path is empty, are skipped.
    :parallel: number of threads in the pool; any value outside 1..199 lets
        the function decide (at most 5).
    :json_headers: passed through to ``download_one_file`` for every entry.
    :return: list with one ``download_one_file`` result per accepted entry.
    """
    mp_args = []
    for url, fullpath in dn_list:
        if url and isinstance(url, str) and url.startswith('http') \
                and fullpath and isinstance(fullpath, (str, Path)) and len(str(fullpath)):
            fullpath = Path(fullpath)
            fullpath.parent.mkdir(parents=True, exist_ok=True)
            mp_args.append((url, fullpath, json_headers))
    if not len(mp_args):
        return []
    if not isinstance(parallel, int) or parallel not in range(1, 200):
        parallel = min(5, len(mp_args))
    with ThreadPoolExecutor(parallel) as pool:
        results = list(pool.map(download_one_file, mp_args))
    return results


def delete_all_elements_in_list(string: str, lists: typing.Iterable[str]):
    """Return a new list with every element equal to ``string`` removed."""
    return [i for i in lists if i != string]


def delete_all_elements_in_str(string_delete: str, string: str):
    """
    Return ``string`` with every occurrence of the single character
    ``string_delete`` removed.

    ``string`` is compared character by character, so a ``string_delete``
    that is not exactly one character never matches and ``string`` is
    returned unchanged.
    """
    if isinstance(string_delete, str) and len(string_delete) == 1:
        return string.replace(string_delete, "")
    return string


# Space count for print-format padding when the content contains Chinese characters
def cnspace(v: str, n: int) -> int:
    """Return ``n`` minus the number of characters in ``v`` of Unicode category 'Lo'."""
    return n - sum(1 for c in v if category(c) == 'Lo')


# Usage: python ./ADC_function.py https://cn.bing.com/
if __name__ == "__main__":
    import sys
    import timeit
    from http.client import HTTPConnection


    def benchmark(t, url):
        print(f"HTTP GET Benchmark times:{t} url:{url}")
        tm = timeit.timeit(f"_ = session1.get('{url}')",
                           "from __main__ import get_html_session;session1=get_html_session()",
                           number=t)
        print(f' *{tm:>10.5f}s get_html_session() Keep-Alive enable')
        tm = timeit.timeit(f"_ = scraper1.get('{url}')",
                           "from __main__ import get_html_by_scraper;scraper1=get_html_by_scraper()",
                           number=t)
        print(f' *{tm:>10.5f}s get_html_by_scraper() Keep-Alive enable')
        tm = timeit.timeit(f"_ = browser1.open('{url}')",
                           "from __main__ import get_html_by_browser;browser1=get_html_by_browser()",
                           number=t)
        print(f' *{tm:>10.5f}s get_html_by_browser() Keep-Alive enable')
        tm = timeit.timeit(f"_ = get_html('{url}')",
                           "from __main__ import get_html",
                           number=t)
        print(f' *{tm:>10.5f}s get_html()')


    t = 100

    # url = "https://www.189.cn/"
    url = "http://www.chinaunicom.com"
    HTTPConnection.debuglevel = 1
    s = get_html_session()
    _ = s.get(url)
    HTTPConnection.debuglevel = 0
    if len(sys.argv) > 1:
        url = sys.argv[1]
    benchmark(t, url)