diff --git a/ADC_function.py b/ADC_function.py
index de36c73..77f00f7 100644
--- a/ADC_function.py
+++ b/ADC_function.py
@@ -1,6 +1,6 @@
 from os import replace
 import requests
-#import hashlib
+# import hashlib
 from pathlib import Path
 import secrets
 import os.path
@@ -11,6 +11,7 @@ import time
 from lxml import etree
 import re
 import config
+import typing
 from urllib.parse import urljoin
 import mechanicalsoup
 from requests.adapters import HTTPAdapter
@@ -27,8 +28,11 @@ def getXpathSingle(htmlcode, xpath):
 
 G_USER_AGENT = r'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36'
 
-# 网页请求核心
+
 def get_html(url, cookies: dict = None, ua: str = None, return_type: str = None, encoding: str = None):
+    """
+    Core helper for fetching web pages 网页请求核心函数
+    """
     verify = config.getInstance().cacert_file()
     configProxy = config.getInstance().proxy()
     errors = ""
@@ -39,7 +43,8 @@ def get_html(url, cookies: dict = None, ua: str = None, return_type: str = None,
     try:
         if configProxy.enable:
             proxies = configProxy.proxies()
-            result = requests.get(str(url), headers=headers, timeout=configProxy.timeout, proxies=proxies, verify=verify,
+            result = requests.get(str(url), headers=headers, timeout=configProxy.timeout, proxies=proxies,
+                                  verify=verify,
                                   cookies=cookies)
         else:
             result = requests.get(str(url), headers=headers, timeout=configProxy.timeout, cookies=cookies)
@@ -89,7 +94,8 @@ def post_html(url: str, query: dict, headers: dict = None) -> requests.Response:
         print("[-]" + errors)
 
 
-G_DEFAULT_TIMEOUT = 10 # seconds
+G_DEFAULT_TIMEOUT = 10  # seconds
+
 
 class TimeoutHTTPAdapter(HTTPAdapter):
     def __init__(self, *args, **kwargs):
@@ -98,6 +104,7 @@ class TimeoutHTTPAdapter(HTTPAdapter):
         self.timeout = kwargs["timeout"]
         del kwargs["timeout"]
         super().__init__(*args, **kwargs)
+
     def send(self, request, **kwargs):
         timeout = kwargs.get("timeout")
         if timeout is None:
@@ -106,12 +113,14 @@
 
 
 # with keep-alive feature
-def get_html_session(url:str = None, cookies: dict = None, ua: str = None, return_type: str = None, encoding: str = None):
+def get_html_session(url: str = None, cookies: dict = None, ua: str = None, return_type: str = None,
+                     encoding: str = None):
     configProxy = config.getInstance().proxy()
     session = requests.Session()
     if isinstance(cookies, dict) and len(cookies):
         requests.utils.add_dict_to_cookiejar(session.cookies, cookies)
-    retries = Retry(total=configProxy.retry, connect=configProxy.retry, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
+    retries = Retry(total=configProxy.retry, connect=configProxy.retry, backoff_factor=1,
+                    status_forcelist=[429, 500, 502, 503, 504])
     session.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
     session.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
     if configProxy.enable:
@@ -122,7 +131,7 @@ def get_html_session(url:str = None, cookies: dict = None, ua: str = None, retur
     try:
         if isinstance(url, str) and len(url):
             result = session.get(str(url))
-        else: # 空url参数直接返回可重用session对象,无需设置return_type
+        else:  # 空url参数直接返回可重用session对象,无需设置return_type
             return session
         if not result.ok:
             return None
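
For reference, the pattern these hunks reflow (a requests Session with urllib3's Retry mounted through the timeout-defaulting adapter) as a minimal self-contained sketch; the retry count and URL are illustrative, not values from this repo's config:

```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

G_DEFAULT_TIMEOUT = 10  # seconds, same default as the module above


class TimeoutHTTPAdapter(HTTPAdapter):
    """HTTPAdapter that fills in a default timeout when the caller passes none."""

    def __init__(self, *args, **kwargs):
        # pop() keeps HTTPAdapter.__init__ from seeing the extra kwarg
        self.timeout = kwargs.pop("timeout", G_DEFAULT_TIMEOUT)
        super().__init__(*args, **kwargs)

    def send(self, request, **kwargs):
        if kwargs.get("timeout") is None:
            kwargs["timeout"] = self.timeout
        return super().send(request, **kwargs)


session = requests.Session()
# Retry transient failures (HTTP 429/5xx) with exponential backoff.
retries = Retry(total=3, connect=3, backoff_factor=1,
                status_forcelist=[429, 500, 502, 503, 504])
adapter = TimeoutHTTPAdapter(max_retries=retries, timeout=G_DEFAULT_TIMEOUT)
session.mount("https://", adapter)
session.mount("http://", adapter)
response = session.get("https://example.com/")  # retried and timeout-capped automatically
```
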
@@ -142,12 +151,14 @@ def get_html_session(url:str = None, cookies: dict = None, ua: str = None, retur
     return None
 
 
-def get_html_by_browser(url:str = None, cookies: dict = None, ua: str = None, return_type: str = None, encoding: str = None, use_scraper: bool = False):
+def get_html_by_browser(url: str = None, cookies: dict = None, ua: str = None, return_type: str = None,
+                        encoding: str = None, use_scraper: bool = False):
     configProxy = config.getInstance().proxy()
-    s = create_scraper(browser={'custom': ua or G_USER_AGENT,}) if use_scraper else requests.Session()
+    s = create_scraper(browser={'custom': ua or G_USER_AGENT, }) if use_scraper else requests.Session()
     if isinstance(cookies, dict) and len(cookies):
         requests.utils.add_dict_to_cookiejar(s.cookies, cookies)
-    retries = Retry(total=configProxy.retry, connect=configProxy.retry, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
+    retries = Retry(total=configProxy.retry, connect=configProxy.retry, backoff_factor=1,
+                    status_forcelist=[429, 500, 502, 503, 504])
     s.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
     s.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
     if configProxy.enable:
@@ -178,12 +189,14 @@ def get_html_by_browser(url:str = None, cookies: dict = None, ua: str = None, re
     return None
 
 
-def get_html_by_form(url, form_select: str = None, fields: dict = None, cookies: dict = None, ua: str = None, return_type: str = None, encoding: str = None):
+def get_html_by_form(url, form_select: str = None, fields: dict = None, cookies: dict = None, ua: str = None,
+                     return_type: str = None, encoding: str = None):
     configProxy = config.getInstance().proxy()
     s = requests.Session()
     if isinstance(cookies, dict) and len(cookies):
         requests.utils.add_dict_to_cookiejar(s.cookies, cookies)
-    retries = Retry(total=configProxy.retry, connect=configProxy.retry, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
+    retries = Retry(total=configProxy.retry, connect=configProxy.retry, backoff_factor=1,
+                    status_forcelist=[429, 500, 502, 503, 504])
     s.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
     s.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
     if configProxy.enable:
@@ -216,12 +229,14 @@ def get_html_by_form(url, form_select: str = None, fields: dict = None, cookies:
     return None
 
 
-def get_html_by_scraper(url:str = None, cookies: dict = None, ua: str = None, return_type: str = None, encoding: str = None):
+def get_html_by_scraper(url: str = None, cookies: dict = None, ua: str = None, return_type: str = None,
+                        encoding: str = None):
     configProxy = config.getInstance().proxy()
-    session = create_scraper(browser={'custom': ua or G_USER_AGENT,})
+    session = create_scraper(browser={'custom': ua or G_USER_AGENT, })
     if isinstance(cookies, dict) and len(cookies):
         requests.utils.add_dict_to_cookiejar(session.cookies, cookies)
-    retries = Retry(total=configProxy.retry, connect=configProxy.retry, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
+    retries = Retry(total=configProxy.retry, connect=configProxy.retry, backoff_factor=1,
+                    status_forcelist=[429, 500, 502, 503, 504])
     session.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
     session.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
     if configProxy.enable:
@@ -230,7 +245,7 @@ def get_html_by_scraper(url:str = None, cookies: dict = None, ua: str = None, re
     try:
         if isinstance(url, str) and len(url):
            result = session.get(str(url))
-        else: # 空url参数直接返回可重用scraper对象,无需设置return_type
+        else:  # 空url参数直接返回可重用scraper对象,无需设置return_type
            return session
         if not result.ok:
            return None
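
The 空url branches above make get_html_session() and get_html_by_scraper() double as factories: called without a url they return the configured reusable object rather than a response. A hedged sketch of that calling pattern; the URLs are illustrative, and return_type='object' is assumed to mirror get_html()'s convention as used by translate() further down:

```python
from ADC_function import get_html_session

# Without a url, the helper hands back the configured reusable session,
# with retries, timeout, proxy and cookies already mounted.
session = get_html_session(cookies={'locale': 'zh'})
if session is not None:
    r1 = session.get('https://example.com/page1')  # keep-alive connection
    r2 = session.get('https://example.com/page2')  # reused for this request

# With a url it fetches immediately; return_type='object' yields the
# full response object instead of decoded text.
resp = get_html_session('https://example.com/', return_type='object')
```
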
@@ -285,7 +300,12 @@ def translate(
     app_id: str = "",
     key: str = "",
     delay: int = 0,
-):
+) -> str:
+    """
+    Translate Japanese kana to Simplified Chinese
+    翻译日语假名到简体中文
+    :raises ValueError: if the translation engine does not exist
+    """
     trans_result = ""
     # 中文句子如果包含&等符号会被谷歌翻译截断损失内容,而且中文翻译到中文也没有意义,故而忽略,只翻译带有日语假名的
     if not is_japanese(src):
@@ -295,7 +315,7 @@ def translate(
         if not re.match('^translate\.google\.(com|com\.\w{2}|\w{2})$', gsite):
             gsite = 'translate.google.cn'
         url = (
-f"https://{gsite}/translate_a/single?client=gtx&dt=t&dj=1&ie=UTF-8&sl=auto&tl={target_language}&q={src}"
+            f"https://{gsite}/translate_a/single?client=gtx&dt=t&dj=1&ie=UTF-8&sl=auto&tl={target_language}&q={src}"
         )
         result = get_html(url=url, return_type="object")
         if not result.ok:
@@ -324,26 +344,27 @@ f"https://{gsite}/translate_a/single?client=gtx&dt=t&dj=1&ie=UTF-8&sl=auto&tl={t
     return trans_result
 
 
-# 从浏览器中导出网站登录验证信息的cookies,能够以会员方式打开游客无法访问到的页面
-# 示例: FC2-755670 url https://javdb9.com/v/vO8Mn
-# json 文件格式
-# 文件名: 站点名.json,示例 javdb9.json
-# 内容(文件编码:UTF-8):
-'''
-{
-    "over18":"1",
-    "redirect_to":"%2Fv%2FvO8Mn",
-    "remember_me_token":"cbJdeaFpbHMiOnsibWVzc2FnZSI6IklrNVJjbTAzZFVSRVlVaEtPWEpUVFhOVU0yNXhJZz09IiwiZXhwIjoiMjAyMS0wNS0xNVQxMzoyODoxNy4wMDBaIiwicHVyIjoiY29va2llLnJlbWVtYmVyX21lX3Rva2VuIn19--a7131611e844cf75f9db4cd411b635889bff3fe3",
-    "_jdb_session":"asddefqfwfwwrfdsdaAmqKj1%2FvOrDQP4b7h%2BvGp7brvIShi2Y%2FHBUr%2BklApk06TfhBOK3g5gRImZzoi49GINH%2FK49o3W%2FX64ugBiUAcudN9b27Mg6Ohu%2Bx9Z7A4bbqmqCt7XR%2Bao8PRuOjMcdDG5czoYHJCPIPZQFU28Gd7Awc2jc5FM5CoIgSRyaYDy9ulTO7DlavxoNL%2F6OFEL%2FyaA6XUYTB2Gs1kpPiUDqwi854mo5%2FrNxMhTeBK%2BjXciazMtN5KlE5JIOfiWAjNrnx7SV3Hj%2FqPNxRxXFQyEwHr5TZa0Vk1%2FjbwWQ0wcIFfh%2FMLwwqKydAh%2FLndc%2Bmdv3e%2FJ%2BiL2--xhqYnMyVRlxJajdN--u7nl0M7Oe7tZtPd4kIaEbg%3D%3D",
-    "locale":"zh",
-    "__cfduid":"dee27116d98c432a5cabc1fe0e7c2f3c91620479752",
-    "theme":"auto"
-}
-'''
-# 从网站登录后,通过浏览器插件(CookieBro或EdittThisCookie)或者直接在地址栏网站链接信息处都可以复制或者导出cookie内容,
-# 并填写到以上json文件的相应字段中
-def load_cookies(filename):
-    filename = os.path.basename(filename)
+def load_cookies(cookie_json_filename: str):
+    """
+    Load cookies so member-only pages can be opened 加载cookie,用于以会员方式访问非游客内容
+
+    :param cookie_json_filename: cookie file name. To obtain the cookies: after logging in to the site, copy or export them with a browser extension (CookieBro or EditThisCookie) or from the site info next to the address bar, then save them as JSON.
+
+    # Example: FC2-755670 url https://javdb9.com/v/vO8Mn
+    # JSON file format
+    # File name: <site>.json, e.g. javdb9.json
+    # Content (file encoding: UTF-8):
+    {
+      "over18":"1",
+      "redirect_to":"%2Fv%2FvO8Mn",
+      "remember_me_token":"***********",
+      "_jdb_session":"************",
+      "locale":"zh",
+      "__cfduid":"*********",
+      "theme":"auto"
+    }
+    """
+    filename = os.path.basename(cookie_json_filename)
     if not len(filename):
         return None, None
     path_search_order = (
@@ -364,8 +385,11 @@ def load_cookies(filename):
     except:
         return None, None
 
-# 文件修改时间距此时的天数
-def file_modification_days(filename) -> int:
+
+def file_modification_days(filename: str) -> int:
+    """
+    Number of days since the file was last modified 文件修改时间距此时的天数
+    """
     mfile = Path(filename)
     if not mfile.is_file():
         return 9999
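
How load_cookies is meant to combine with the session helpers, as a sketch. It assumes the two-value return (cookie dict plus resolved file path) implied by the `return None, None` failure branch; the site and file names come from the docstring example above:

```python
from ADC_function import load_cookies, get_html_session

# javdb9.json is looked up along the helper's search paths; on any
# failure both values come back as None.
cookies, cookie_file = load_cookies('javdb9.json')
if cookies:
    member_session = get_html_session(cookies=cookies)
    # Pages a guest cannot open are now reachable as a logged-in member.
    html = member_session.get('https://javdb9.com/v/vO8Mn')
```
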
@@ -376,48 +400,61 @@ def file_modification_days(filename) -> int:
         return 9999
     return days
 
+
 def file_not_exist_or_empty(filepath) -> bool:
     return not os.path.isfile(filepath) or os.path.getsize(filepath) == 0
 
-# 日语简单检测
-def is_japanese(s) -> bool:
-    return bool(re.search(r'[\u3040-\u309F\u30A0-\u30FF\uFF66-\uFF9F]', s, re.UNICODE))
+
+def is_japanese(raw: str) -> bool:
+    """
+    Simple Japanese (kana) detection 日语简单检测
+    """
+    return bool(re.search(r'[\u3040-\u309F\u30A0-\u30FF\uFF66-\uFF9F]', raw, re.UNICODE))
 
 
 # Usage: python ./ADC_function.py https://cn.bing.com/
 if __name__ == "__main__":
     import sys, timeit
     from http.client import HTTPConnection
+
+
     def benchmark(t, url):
         print(f"HTTP GET Benchmark times:{t} url:{url}")
         tm = timeit.timeit(f"_ = session1.get('{url}')",
-                       "from __main__ import get_html_session;session1=get_html_session()",
-                       number=t)
+                           "from __main__ import get_html_session;session1=get_html_session()",
+                           number=t)
         print(f' *{tm:>10.5f}s get_html_session() Keep-Alive enable')
         tm = timeit.timeit(f"_ = scraper1.get('{url}')",
-                       "from __main__ import get_html_by_scraper;scraper1=get_html_by_scraper()",
-                       number=t)
+                           "from __main__ import get_html_by_scraper;scraper1=get_html_by_scraper()",
+                           number=t)
         print(f' *{tm:>10.5f}s get_html_by_scraper() Keep-Alive enable')
         tm = timeit.timeit(f"_ = browser1.open('{url}')",
-                       "from __main__ import get_html_by_browser;browser1=get_html_by_browser()",
-                       number=t)
+                           "from __main__ import get_html_by_browser;browser1=get_html_by_browser()",
+                           number=t)
         print(f' *{tm:>10.5f}s get_html_by_browser() Keep-Alive enable')
         tm = timeit.timeit(f"_ = get_html('{url}')",
-                       "from __main__ import get_html",
-                       number=t)
+                           "from __main__ import get_html",
+                           number=t)
         print(f' *{tm:>10.5f}s get_html()')
+
+
     t = 100
-    #url = "https://www.189.cn/"
+
+    # url = "https://www.189.cn/"
     url = "http://www.chinaunicom.com"
     HTTPConnection.debuglevel = 1
     s = get_html_session()
     _ = s.get(url)
     HTTPConnection.debuglevel = 0
-    if len(sys.argv)>1:
+    if len(sys.argv) > 1:
         url = sys.argv[1]
     benchmark(t, url)
 
-def download_file_with_filename(url, filename, path):
+
+def download_file_with_filename(url: str, filename: str, path: str) -> None:
+    """
+    Download a file from the given URL and save it under the given name in the given path
+    """
     conf = config.getInstance()
     configProxy = conf.proxy()
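
The kana ranges matched by is_japanese() above are what gate translate(): CJK ideographs fall outside U+3040-30FF and U+FF66-FF9F, so pure-Chinese strings are returned untranslated. A quick check (sample strings are illustrative):

```python
import re

KANA = re.compile(r'[\u3040-\u309F\u30A0-\u30FF\uFF66-\uFF9F]')

print(bool(KANA.search('これはテストです')))   # True: hiragana/katakana present
print(bool(KANA.search('中文句子,没有假名')))  # False: CJK ideographs only
print(bool(KANA.search('ﾃｽﾄ')))               # True: half-width katakana
```
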
@@ -475,38 +512,55 @@ def download_file_with_filename(url, filename, path):
         raise ValueError('[-]Connect Failed! Please check your Proxy or Network!')
     return
 
-def download_one_file(args):
+
+def download_one_file(args) -> str:
+    """
+    Download a single file from the given URL to the given path;
+    wrapped this way so it can be fed to map()
+    """
+
     def _inner(url: str, save_path: Path):
         filebytes = get_html(url, return_type='content')
         if isinstance(filebytes, bytes) and len(filebytes):
             if len(filebytes) == save_path.open('wb').write(filebytes):
                 return str(save_path)
+
     return _inner(*args)
 
-'''用法示例: 2线程同时下载两个不同文件,并保存到不同路径,路径目录可未创建,但需要具备对目标目录和文件的写权限
-parallel_download_files([
+
+def parallel_download_files(dn_list: typing.Iterable[typing.Sequence], parallel: int = 0):
+    """
+    Download files in parallel 多线程下载文件
+
+    Usage example: download two different files on 2 threads and save them to different paths. The target directories may not exist yet, but write permission on them and the files is required.
+    parallel_download_files([
         ('https://site1/img/p1.jpg', 'C:/temp/img/p1.jpg'),
         ('https://site2/cover/n1.xml', 'C:/tmp/cover/n1.xml')
-])
-'''
-# dn_list 可以是 tuple或者list: ((url1, save_fullpath1),(url2, save_fullpath2),)
-# parallel: 并行下载的线程池线程数,为0则由函数自己决定
-def parallel_download_files(dn_list, parallel: int = 0):
+    ])
+
+    :param dn_list: a tuple or list of pairs: ((url1, save_fullpath1), (url2, save_fullpath2), ...); fullpath may be str or Path
+    :param parallel: number of threads in the download pool; 0 lets the function decide
+    """
     mp_args = []
     for url, fullpath in dn_list:
-        if url and isinstance(url, str) and url.startswith('http') and fullpath and isinstance(fullpath, (str, Path)) and len(str(fullpath)):
+        if url and isinstance(url, str) and url.startswith('http') \
+                and fullpath and isinstance(fullpath, (str, Path)) and len(str(fullpath)):
             fullpath = Path(fullpath)
             fullpath.parent.mkdir(parents=True, exist_ok=True)
             mp_args.append((url, fullpath))
     if not len(mp_args):
         return []
-    if not isinstance(parallel, int) or parallel not in range(1,200):
+    if not isinstance(parallel, int) or parallel not in range(1, 200):
         parallel = min(5, len(mp_args))
     with ThreadPoolExecutor(parallel) as pool:
         results = list(pool.map(download_one_file, mp_args))
     return results
 
-def delete_all_elements_in_list(string,lists):
+
+def delete_all_elements_in_list(string: str, lists: typing.Iterable[str]):
+    """
+    Return a copy of the list with every element equal to the given string removed
+    """
     new_lists = []
     for i in lists:
         if i != string:
diff --git a/Movie_Data_Capture.py b/Movie_Data_Capture.py
index 237eec9..141a241 100644
--- a/Movie_Data_Capture.py
+++ b/Movie_Data_Capture.py
@@ -280,7 +280,7 @@ def sigdebug_handler(*args):
 
 
 # 新增失败文件列表跳过处理,及.nfo修改天数跳过处理,提示跳过视频总数,调试模式(-g)下详细被跳过文件,跳过小广告
-def movie_lists(source_folder, regexstr: str) -> list[str]:
+def movie_lists(source_folder, regexstr: str) -> typing.List[str]:
     conf = config.getInstance()
     main_mode = conf.main_mode()
     debug = conf.debug()
@@ -526,7 +526,10 @@ def main():
     create_failed_folder(conf.failed_folder())
 
     # Download Mapping Table, parallel version
-    def fmd(f):
+    def fmd(f) -> typing.Tuple[str, Path]:
+        """
+        Return the mapping-table download URL and local save path for file name f
+        """
         return ('https://raw.githubusercontent.com/yoshiko2/Movie_Data_Capture/master/MappingTable/' + f,
                 Path.home() / '.local' / 'share' / 'mdc' / f)
diff --git a/WebCrawler/__init__.py b/WebCrawler/__init__.py
index 3ce9fe8..8d291db 100644
--- a/WebCrawler/__init__.py
+++ b/WebCrawler/__init__.py
@@ -38,9 +38,10 @@ def get_data_state(data: dict) -> bool:  # 元数据获取失败检测
     return True
 
 
-def get_data_from_json(file_number, oCC):  # 从JSON返回元数据
+
+def get_data_from_json(file_number, oCC):
     """
-    iterate through all services and fetch the data
+    Iterate through all services and fetch the data 从JSON返回元数据
     """
 
     actor_mapping_data = etree.parse(str(Path.home() / '.local' / 'share' / 'mdc' / 'mapping_actor.xml'))
@@ -332,6 +333,7 @@ def get_data_from_json(file_number, oCC):
     json_data['naming_rule'] = naming_rule
     return json_data
 
+
 def special_characters_replacement(text) -> str:
     if not isinstance(text, str):
         return text
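
The movie_lists annotation swap (list[str] to typing.List[str], backed by the new `import typing`) matters because built-in generics only became subscriptable in Python 3.9; on 3.8 and earlier the def line itself raises. A minimal reproduction:

```python
import typing

# Python <= 3.8 fails at definition time, because annotations are
# evaluated eagerly and list is not subscriptable there:
#     def movie_lists(source_folder, regexstr: str) -> list[str]: ...
#     TypeError: 'type' object is not subscriptable

# The typing form is accepted on every supported version:
def movie_lists_stub(source_folder, regexstr: str) -> typing.List[str]:
    return []
```
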
diff --git a/core.py b/core.py
index 7290b21..15eadb4 100644
--- a/core.py
+++ b/core.py
@@ -371,15 +371,19 @@ def print_files(path, leak_word, c_word, naming_rule, part, cn_sub, json_data, f
             moveFailedFolder(filepath)
         return
 
-# 此函数从gui版copy过来用用
-# 参数说明
-# poster_path
-# thumb_path
-# cn_sub 中文字幕 参数值为 1 0
-# leak 流出 参数值为 1 0
-# uncensored 无码 参数值为 1 0
-# ========================================================================加水印
-def add_mark(poster_path, thumb_path, cn_sub, leak, uncensored, hack):
+
+def add_mark(poster_path, thumb_path, cn_sub, leak, uncensored, hack) -> None:
+    """
+    Add watermarks describing extra properties to the poster and thumbnail 给海报和缩略图加属性水印
+
+    This function was carried over from the GUI version 此函数从gui版copy过来
+    :param poster_path: poster location 海报位置
+    :param thumb_path: thumbnail location 缩略图位置
+    :param cn_sub: Chinese subtitles 中文字幕; any truthy value such as 1 or "1" enables the mark
+    :param leak: leaked 流出; any truthy value enables the mark
+    :param uncensored: uncensored 无码; any truthy value enables the mark
+    :param hack: hacked 破解; any truthy value enables the mark
+    """
     mark_type = ''
     if cn_sub:
         mark_type += ',字幕'
@@ -396,6 +400,7 @@ def add_mark(poster_path, thumb_path, cn_sub, leak, uncensored, hack):
         add_mark_thread(poster_path, cn_sub, leak, uncensored, hack)
     print('[+]Poster Add Mark: ' + mark_type.strip(','))
 
+
 def add_mark_thread(pic_path, cn_sub, leak, uncensored, hack):
     size = 9
     img_pic = Image.open(pic_path)
@@ -414,6 +419,7 @@ def add_mark_thread(pic_path, cn_sub, leak, uncensored, hack):
             add_to_pic(pic_path, img_pic, size, count, 4)
     img_pic.close()
 
+
 def add_to_pic(pic_path, img_pic, size, count, mode):
     mark_pic_path = ''
     pngpath = ''
@@ -455,6 +461,7 @@ def add_to_pic(pic_path, img_pic, size, count, mode):
     img_pic.save(pic_path, quality=95)
 # ========================结束=================================
 
+
 def paste_file_to_folder(filepath, path, number, leak_word, c_word, hack_word):  # 文件路径,番号,后缀,要移动至的位置
     filepath_obj = pathlib.Path(filepath)
     houzhui = filepath_obj.suffix
@@ -546,6 +553,7 @@ def paste_file_to_folder_mode2(filepath, path, multi_part, number, part, leak_wo
         print(f'[-]OS Error errno {oserr.errno}')
         return
 
+
 def get_part(filepath):
     try:
         if re.search('-CD\d+', filepath):
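
To round off the core.py hunks, a sketch of calling the re-documented add_mark(); the paths are illustrative, the flags follow the truthy convention described in the new docstring, and the watermark image assets the function loads internally are assumed to be in place:

```python
from core import add_mark

# Poster/thumb pair for a release with Chinese subtitles that also leaked;
# zero (falsy) leaves the uncensored and hacked marks off.
add_mark('J:/output/ABC-123-poster.jpg', 'J:/output/ABC-123-thumb.jpg',
         cn_sub=1, leak=1, uncensored=0, hack=0)
```
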