PEP8 PREFIX, AND SOME TYPING ANNOTATION, FUNCTION COMMENT

This commit is contained in:
FatalFurY
2022-02-23 22:11:45 +08:00
parent 950a4dce13
commit 377a9f308b
4 changed files with 144 additions and 77 deletions

View File

@@ -1,6 +1,6 @@
from os import replace from os import replace
import requests import requests
#import hashlib # import hashlib
from pathlib import Path from pathlib import Path
import secrets import secrets
import os.path import os.path
@@ -11,6 +11,7 @@ import time
from lxml import etree from lxml import etree
import re import re
import config import config
import typing
from urllib.parse import urljoin from urllib.parse import urljoin
import mechanicalsoup import mechanicalsoup
from requests.adapters import HTTPAdapter from requests.adapters import HTTPAdapter
@@ -27,8 +28,11 @@ def getXpathSingle(htmlcode, xpath):
G_USER_AGENT = r'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/95.0.4638.54 Safari/537.36' G_USER_AGENT = r'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/95.0.4638.54 Safari/537.36'
# 网页请求核心
def get_html(url, cookies: dict = None, ua: str = None, return_type: str = None, encoding: str = None): def get_html(url, cookies: dict = None, ua: str = None, return_type: str = None, encoding: str = None):
"""
网页请求核心函数
"""
verify = config.getInstance().cacert_file() verify = config.getInstance().cacert_file()
configProxy = config.getInstance().proxy() configProxy = config.getInstance().proxy()
errors = "" errors = ""
@@ -39,7 +43,8 @@ def get_html(url, cookies: dict = None, ua: str = None, return_type: str = None,
try: try:
if configProxy.enable: if configProxy.enable:
proxies = configProxy.proxies() proxies = configProxy.proxies()
result = requests.get(str(url), headers=headers, timeout=configProxy.timeout, proxies=proxies, verify=verify, result = requests.get(str(url), headers=headers, timeout=configProxy.timeout, proxies=proxies,
verify=verify,
cookies=cookies) cookies=cookies)
else: else:
result = requests.get(str(url), headers=headers, timeout=configProxy.timeout, cookies=cookies) result = requests.get(str(url), headers=headers, timeout=configProxy.timeout, cookies=cookies)
@@ -89,7 +94,8 @@ def post_html(url: str, query: dict, headers: dict = None) -> requests.Response:
print("[-]" + errors) print("[-]" + errors)
G_DEFAULT_TIMEOUT = 10 # seconds G_DEFAULT_TIMEOUT = 10 # seconds
class TimeoutHTTPAdapter(HTTPAdapter): class TimeoutHTTPAdapter(HTTPAdapter):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
@@ -98,6 +104,7 @@ class TimeoutHTTPAdapter(HTTPAdapter):
self.timeout = kwargs["timeout"] self.timeout = kwargs["timeout"]
del kwargs["timeout"] del kwargs["timeout"]
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
def send(self, request, **kwargs): def send(self, request, **kwargs):
timeout = kwargs.get("timeout") timeout = kwargs.get("timeout")
if timeout is None: if timeout is None:
@@ -106,12 +113,14 @@ class TimeoutHTTPAdapter(HTTPAdapter):
# with keep-alive feature # with keep-alive feature
def get_html_session(url:str = None, cookies: dict = None, ua: str = None, return_type: str = None, encoding: str = None): def get_html_session(url: str = None, cookies: dict = None, ua: str = None, return_type: str = None,
encoding: str = None):
configProxy = config.getInstance().proxy() configProxy = config.getInstance().proxy()
session = requests.Session() session = requests.Session()
if isinstance(cookies, dict) and len(cookies): if isinstance(cookies, dict) and len(cookies):
requests.utils.add_dict_to_cookiejar(session.cookies, cookies) requests.utils.add_dict_to_cookiejar(session.cookies, cookies)
retries = Retry(total=configProxy.retry, connect=configProxy.retry, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504]) retries = Retry(total=configProxy.retry, connect=configProxy.retry, backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504])
session.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout)) session.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
session.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout)) session.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
if configProxy.enable: if configProxy.enable:
@@ -122,7 +131,7 @@ def get_html_session(url:str = None, cookies: dict = None, ua: str = None, retur
try: try:
if isinstance(url, str) and len(url): if isinstance(url, str) and len(url):
result = session.get(str(url)) result = session.get(str(url))
else: # 空url参数直接返回可重用session对象无需设置return_type else: # 空url参数直接返回可重用session对象无需设置return_type
return session return session
if not result.ok: if not result.ok:
return None return None
@@ -142,12 +151,14 @@ def get_html_session(url:str = None, cookies: dict = None, ua: str = None, retur
return None return None
def get_html_by_browser(url:str = None, cookies: dict = None, ua: str = None, return_type: str = None, encoding: str = None, use_scraper: bool = False): def get_html_by_browser(url: str = None, cookies: dict = None, ua: str = None, return_type: str = None,
encoding: str = None, use_scraper: bool = False):
configProxy = config.getInstance().proxy() configProxy = config.getInstance().proxy()
s = create_scraper(browser={'custom': ua or G_USER_AGENT,}) if use_scraper else requests.Session() s = create_scraper(browser={'custom': ua or G_USER_AGENT, }) if use_scraper else requests.Session()
if isinstance(cookies, dict) and len(cookies): if isinstance(cookies, dict) and len(cookies):
requests.utils.add_dict_to_cookiejar(s.cookies, cookies) requests.utils.add_dict_to_cookiejar(s.cookies, cookies)
retries = Retry(total=configProxy.retry, connect=configProxy.retry, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504]) retries = Retry(total=configProxy.retry, connect=configProxy.retry, backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504])
s.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout)) s.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
s.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout)) s.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
if configProxy.enable: if configProxy.enable:
@@ -178,12 +189,14 @@ def get_html_by_browser(url:str = None, cookies: dict = None, ua: str = None, re
return None return None
def get_html_by_form(url, form_select: str = None, fields: dict = None, cookies: dict = None, ua: str = None, return_type: str = None, encoding: str = None): def get_html_by_form(url, form_select: str = None, fields: dict = None, cookies: dict = None, ua: str = None,
return_type: str = None, encoding: str = None):
configProxy = config.getInstance().proxy() configProxy = config.getInstance().proxy()
s = requests.Session() s = requests.Session()
if isinstance(cookies, dict) and len(cookies): if isinstance(cookies, dict) and len(cookies):
requests.utils.add_dict_to_cookiejar(s.cookies, cookies) requests.utils.add_dict_to_cookiejar(s.cookies, cookies)
retries = Retry(total=configProxy.retry, connect=configProxy.retry, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504]) retries = Retry(total=configProxy.retry, connect=configProxy.retry, backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504])
s.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout)) s.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
s.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout)) s.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
if configProxy.enable: if configProxy.enable:
@@ -216,12 +229,14 @@ def get_html_by_form(url, form_select: str = None, fields: dict = None, cookies:
return None return None
def get_html_by_scraper(url:str = None, cookies: dict = None, ua: str = None, return_type: str = None, encoding: str = None): def get_html_by_scraper(url: str = None, cookies: dict = None, ua: str = None, return_type: str = None,
encoding: str = None):
configProxy = config.getInstance().proxy() configProxy = config.getInstance().proxy()
session = create_scraper(browser={'custom': ua or G_USER_AGENT,}) session = create_scraper(browser={'custom': ua or G_USER_AGENT, })
if isinstance(cookies, dict) and len(cookies): if isinstance(cookies, dict) and len(cookies):
requests.utils.add_dict_to_cookiejar(session.cookies, cookies) requests.utils.add_dict_to_cookiejar(session.cookies, cookies)
retries = Retry(total=configProxy.retry, connect=configProxy.retry, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504]) retries = Retry(total=configProxy.retry, connect=configProxy.retry, backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504])
session.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout)) session.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
session.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout)) session.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
if configProxy.enable: if configProxy.enable:
@@ -230,7 +245,7 @@ def get_html_by_scraper(url:str = None, cookies: dict = None, ua: str = None, re
try: try:
if isinstance(url, str) and len(url): if isinstance(url, str) and len(url):
result = session.get(str(url)) result = session.get(str(url))
else: # 空url参数直接返回可重用scraper对象无需设置return_type else: # 空url参数直接返回可重用scraper对象无需设置return_type
return session return session
if not result.ok: if not result.ok:
return None return None
@@ -285,7 +300,12 @@ def translate(
app_id: str = "", app_id: str = "",
key: str = "", key: str = "",
delay: int = 0, delay: int = 0,
): ) -> str:
"""
translate japanese kana to simplified chinese
翻译日语假名到简体中文
:raises ValueError: Non-existent translation engine
"""
trans_result = "" trans_result = ""
# 中文句子如果包含&等符号会被谷歌翻译截断损失内容,而且中文翻译到中文也没有意义,故而忽略,只翻译带有日语假名的 # 中文句子如果包含&等符号会被谷歌翻译截断损失内容,而且中文翻译到中文也没有意义,故而忽略,只翻译带有日语假名的
if not is_japanese(src): if not is_japanese(src):
@@ -295,7 +315,7 @@ def translate(
if not re.match('^translate\.google\.(com|com\.\w{2}|\w{2})$', gsite): if not re.match('^translate\.google\.(com|com\.\w{2}|\w{2})$', gsite):
gsite = 'translate.google.cn' gsite = 'translate.google.cn'
url = ( url = (
f"https://{gsite}/translate_a/single?client=gtx&dt=t&dj=1&ie=UTF-8&sl=auto&tl={target_language}&q={src}" f"https://{gsite}/translate_a/single?client=gtx&dt=t&dj=1&ie=UTF-8&sl=auto&tl={target_language}&q={src}"
) )
result = get_html(url=url, return_type="object") result = get_html(url=url, return_type="object")
if not result.ok: if not result.ok:
@@ -324,26 +344,27 @@ f"https://{gsite}/translate_a/single?client=gtx&dt=t&dj=1&ie=UTF-8&sl=auto&tl={t
return trans_result return trans_result
# 从浏览器中导出网站登录验证信息的cookies能够以会员方式打开游客无法访问到的页面 def load_cookies(cookie_json_filename: str):
# 示例: FC2-755670 url https://javdb9.com/v/vO8Mn """
# json 文件格式 加载cookie,用于以会员方式访问非游客内容
# 文件名: 站点名.json示例 javdb9.json
# 内容(文件编码:UTF-8) :filename: cookie文件名。获取cookie方式从网站登录后通过浏览器插件(CookieBro或EdittThisCookie)或者直接在地址栏网站链接信息处都可以复制或者导出cookie内容以JSON方式保存
'''
{ # 示例: FC2-755670 url https://javdb9.com/v/vO8Mn
"over18":"1", # json 文件格式
"redirect_to":"%2Fv%2FvO8Mn", # 文件名: 站点名.json示例 javdb9.json
"remember_me_token":"cbJdeaFpbHMiOnsibWVzc2FnZSI6IklrNVJjbTAzZFVSRVlVaEtPWEpUVFhOVU0yNXhJZz09IiwiZXhwIjoiMjAyMS0wNS0xNVQxMzoyODoxNy4wMDBaIiwicHVyIjoiY29va2llLnJlbWVtYmVyX21lX3Rva2VuIn19--a7131611e844cf75f9db4cd411b635889bff3fe3", # 内容(文件编码:UTF-8)
"_jdb_session":"asddefqfwfwwrfdsdaAmqKj1%2FvOrDQP4b7h%2BvGp7brvIShi2Y%2FHBUr%2BklApk06TfhBOK3g5gRImZzoi49GINH%2FK49o3W%2FX64ugBiUAcudN9b27Mg6Ohu%2Bx9Z7A4bbqmqCt7XR%2Bao8PRuOjMcdDG5czoYHJCPIPZQFU28Gd7Awc2jc5FM5CoIgSRyaYDy9ulTO7DlavxoNL%2F6OFEL%2FyaA6XUYTB2Gs1kpPiUDqwi854mo5%2FrNxMhTeBK%2BjXciazMtN5KlE5JIOfiWAjNrnx7SV3Hj%2FqPNxRxXFQyEwHr5TZa0Vk1%2FjbwWQ0wcIFfh%2FMLwwqKydAh%2FLndc%2Bmdv3e%2FJ%2BiL2--xhqYnMyVRlxJajdN--u7nl0M7Oe7tZtPd4kIaEbg%3D%3D", {
"locale":"zh", "over18":"1",
"__cfduid":"dee27116d98c432a5cabc1fe0e7c2f3c91620479752", "redirect_to":"%2Fv%2FvO8Mn",
"theme":"auto" "remember_me_token":"***********",
} "_jdb_session":"************",
''' "locale":"zh",
# 从网站登录后,通过浏览器插件(CookieBro或EdittThisCookie)或者直接在地址栏网站链接信息处都可以复制或者导出cookie内容 "__cfduid":"*********",
# 并填写到以上json文件的相应字段中 "theme":"auto"
def load_cookies(filename): }
filename = os.path.basename(filename) """
filename = os.path.basename(cookie_json_filename)
if not len(filename): if not len(filename):
return None, None return None, None
path_search_order = ( path_search_order = (
@@ -364,8 +385,11 @@ def load_cookies(filename):
except: except:
return None, None return None, None
# 文件修改时间距此时的天数
def file_modification_days(filename) -> int: def file_modification_days(filename: str) -> int:
"""
文件修改时间距此时的天数
"""
mfile = Path(filename) mfile = Path(filename)
if not mfile.is_file(): if not mfile.is_file():
return 9999 return 9999
@@ -376,48 +400,61 @@ def file_modification_days(filename) -> int:
return 9999 return 9999
return days return days
def file_not_exist_or_empty(filepath) -> bool: def file_not_exist_or_empty(filepath) -> bool:
return not os.path.isfile(filepath) or os.path.getsize(filepath) == 0 return not os.path.isfile(filepath) or os.path.getsize(filepath) == 0
# 日语简单检测
def is_japanese(s) -> bool: def is_japanese(raw: str) -> bool:
return bool(re.search(r'[\u3040-\u309F\u30A0-\u30FF\uFF66-\uFF9F]', s, re.UNICODE)) """
日语简单检测
"""
return bool(re.search(r'[\u3040-\u309F\u30A0-\u30FF\uFF66-\uFF9F]', raw, re.UNICODE))
# Usage: python ./ADC_function.py https://cn.bing.com/ # Usage: python ./ADC_function.py https://cn.bing.com/
if __name__ == "__main__": if __name__ == "__main__":
import sys, timeit import sys, timeit
from http.client import HTTPConnection from http.client import HTTPConnection
def benchmark(t, url): def benchmark(t, url):
print(f"HTTP GET Benchmark times:{t} url:{url}") print(f"HTTP GET Benchmark times:{t} url:{url}")
tm = timeit.timeit(f"_ = session1.get('{url}')", tm = timeit.timeit(f"_ = session1.get('{url}')",
"from __main__ import get_html_session;session1=get_html_session()", "from __main__ import get_html_session;session1=get_html_session()",
number=t) number=t)
print(f' *{tm:>10.5f}s get_html_session() Keep-Alive enable') print(f' *{tm:>10.5f}s get_html_session() Keep-Alive enable')
tm = timeit.timeit(f"_ = scraper1.get('{url}')", tm = timeit.timeit(f"_ = scraper1.get('{url}')",
"from __main__ import get_html_by_scraper;scraper1=get_html_by_scraper()", "from __main__ import get_html_by_scraper;scraper1=get_html_by_scraper()",
number=t) number=t)
print(f' *{tm:>10.5f}s get_html_by_scraper() Keep-Alive enable') print(f' *{tm:>10.5f}s get_html_by_scraper() Keep-Alive enable')
tm = timeit.timeit(f"_ = browser1.open('{url}')", tm = timeit.timeit(f"_ = browser1.open('{url}')",
"from __main__ import get_html_by_browser;browser1=get_html_by_browser()", "from __main__ import get_html_by_browser;browser1=get_html_by_browser()",
number=t) number=t)
print(f' *{tm:>10.5f}s get_html_by_browser() Keep-Alive enable') print(f' *{tm:>10.5f}s get_html_by_browser() Keep-Alive enable')
tm = timeit.timeit(f"_ = get_html('{url}')", tm = timeit.timeit(f"_ = get_html('{url}')",
"from __main__ import get_html", "from __main__ import get_html",
number=t) number=t)
print(f' *{tm:>10.5f}s get_html()') print(f' *{tm:>10.5f}s get_html()')
t = 100 t = 100
#url = "https://www.189.cn/"
# url = "https://www.189.cn/"
url = "http://www.chinaunicom.com" url = "http://www.chinaunicom.com"
HTTPConnection.debuglevel = 1 HTTPConnection.debuglevel = 1
s = get_html_session() s = get_html_session()
_ = s.get(url) _ = s.get(url)
HTTPConnection.debuglevel = 0 HTTPConnection.debuglevel = 0
if len(sys.argv)>1: if len(sys.argv) > 1:
url = sys.argv[1] url = sys.argv[1]
benchmark(t, url) benchmark(t, url)
def download_file_with_filename(url, filename, path):
def download_file_with_filename(url: str, filename: str, path: str) -> None:
"""
download file save to give path with given name from given url
"""
conf = config.getInstance() conf = config.getInstance()
configProxy = conf.proxy() configProxy = conf.proxy()
@@ -475,38 +512,55 @@ def download_file_with_filename(url, filename, path):
raise ValueError('[-]Connect Failed! Please check your Proxy or Network!') raise ValueError('[-]Connect Failed! Please check your Proxy or Network!')
return return
def download_one_file(args):
def download_one_file(args) -> str:
"""
download file save to given path from given url
wrapped for map function
"""
def _inner(url: str, save_path: Path): def _inner(url: str, save_path: Path):
filebytes = get_html(url, return_type='content') filebytes = get_html(url, return_type='content')
if isinstance(filebytes, bytes) and len(filebytes): if isinstance(filebytes, bytes) and len(filebytes):
if len(filebytes) == save_path.open('wb').write(filebytes): if len(filebytes) == save_path.open('wb').write(filebytes):
return str(save_path) return str(save_path)
return _inner(*args) return _inner(*args)
'''用法示例: 2线程同时下载两个不同文件并保存到不同路径路径目录可未创建但需要具备对目标目录和文件的写权限
parallel_download_files([ def parallel_download_files(dn_list: typing.Iterable[typing.Sequence], parallel: int = 0):
"""
download files in parallel 多线程下载文件
用法示例: 2线程同时下载两个不同文件并保存到不同路径路径目录可未创建但需要具备对目标目录和文件的写权限
parallel_download_files([
('https://site1/img/p1.jpg', 'C:/temp/img/p1.jpg'), ('https://site1/img/p1.jpg', 'C:/temp/img/p1.jpg'),
('https://site2/cover/n1.xml', 'C:/tmp/cover/n1.xml') ('https://site2/cover/n1.xml', 'C:/tmp/cover/n1.xml')
]) ])
'''
# dn_list 可以是 tuple或者list: ((url1, save_fullpath1),(url2, save_fullpath2),) :dn_list: 可以是 tuple或者list: ((url1, save_fullpath1),(url2, save_fullpath2),) fullpath可以是str或Path
# parallel: 并行下载的线程池线程数为0则由函数自己决定 :parallel: 并行下载的线程池线程数为0则由函数自己决定
def parallel_download_files(dn_list, parallel: int = 0): """
mp_args = [] mp_args = []
for url, fullpath in dn_list: for url, fullpath in dn_list:
if url and isinstance(url, str) and url.startswith('http') and fullpath and isinstance(fullpath, (str, Path)) and len(str(fullpath)): if url and isinstance(url, str) and url.startswith('http') \
and fullpath and isinstance(fullpath, (str, Path)) and len(str(fullpath)):
fullpath = Path(fullpath) fullpath = Path(fullpath)
fullpath.parent.mkdir(parents=True, exist_ok=True) fullpath.parent.mkdir(parents=True, exist_ok=True)
mp_args.append((url, fullpath)) mp_args.append((url, fullpath))
if not len(mp_args): if not len(mp_args):
return [] return []
if not isinstance(parallel, int) or parallel not in range(1,200): if not isinstance(parallel, int) or parallel not in range(1, 200):
parallel = min(5, len(mp_args)) parallel = min(5, len(mp_args))
with ThreadPoolExecutor(parallel) as pool: with ThreadPoolExecutor(parallel) as pool:
results = list(pool.map(download_one_file, mp_args)) results = list(pool.map(download_one_file, mp_args))
return results return results
def delete_all_elements_in_list(string,lists):
def delete_all_elements_in_list(string: str, lists: typing.Iterable[str]):
"""
delete same string in given list
"""
new_lists = [] new_lists = []
for i in lists: for i in lists:
if i != string: if i != string:

View File

@@ -280,7 +280,7 @@ def sigdebug_handler(*args):
# 新增失败文件列表跳过处理,及.nfo修改天数跳过处理提示跳过视频总数调试模式(-g)下详细被跳过文件,跳过小广告 # 新增失败文件列表跳过处理,及.nfo修改天数跳过处理提示跳过视频总数调试模式(-g)下详细被跳过文件,跳过小广告
def movie_lists(source_folder, regexstr: str) -> list[str]: def movie_lists(source_folder, regexstr: str) -> typing.List[str]:
conf = config.getInstance() conf = config.getInstance()
main_mode = conf.main_mode() main_mode = conf.main_mode()
debug = conf.debug() debug = conf.debug()
@@ -526,7 +526,10 @@ def main():
create_failed_folder(conf.failed_folder()) create_failed_folder(conf.failed_folder())
# Download Mapping Table, parallel version # Download Mapping Table, parallel version
def fmd(f): def fmd(f) -> typing.Tuple[str, Path]:
"""
"""
return ('https://raw.githubusercontent.com/yoshiko2/Movie_Data_Capture/master/MappingTable/' + f, return ('https://raw.githubusercontent.com/yoshiko2/Movie_Data_Capture/master/MappingTable/' + f,
Path.home() / '.local' / 'share' / 'mdc' / f) Path.home() / '.local' / 'share' / 'mdc' / f)

View File

@@ -38,9 +38,10 @@ def get_data_state(data: dict) -> bool: # 元数据获取失败检测
return True return True
def get_data_from_json(file_number, oCC): # 从JSON返回元数据
def get_data_from_json(file_number, oCC):
""" """
iterate through all services and fetch the data iterate through all services and fetch the data 从JSON返回元数据
""" """
actor_mapping_data = etree.parse(str(Path.home() / '.local' / 'share' / 'mdc' / 'mapping_actor.xml')) actor_mapping_data = etree.parse(str(Path.home() / '.local' / 'share' / 'mdc' / 'mapping_actor.xml'))
@@ -331,6 +332,7 @@ def get_data_from_json(file_number, oCC): # 从JSON返回元数据
json_data['naming_rule'] = naming_rule json_data['naming_rule'] = naming_rule
return json_data return json_data
def special_characters_replacement(text) -> str: def special_characters_replacement(text) -> str:
if not isinstance(text, str): if not isinstance(text, str):
return text return text

26
core.py
View File

@@ -371,15 +371,19 @@ def print_files(path, leak_word, c_word, naming_rule, part, cn_sub, json_data, f
moveFailedFolder(filepath) moveFailedFolder(filepath)
return return
# 此函数从gui版copy过来用用
# 参数说明 def add_mark(poster_path, thumb_path, cn_sub, leak, uncensored, hack) -> None:
# poster_path """
# thumb_path add watermark on poster or thumb for describe extra properties 给海报和缩略图加属性水印
# cn_sub 中文字幕 参数值为 1 0
# leak 流出 参数值为 1 0 此函数从gui版copy过来用用
# uncensored 无码 参数值为 1 0
# ========================================================================加水印 :poster_path 海报位置
def add_mark(poster_path, thumb_path, cn_sub, leak, uncensored, hack): :thumb_path 缩略图位置
:cn_sub: 中文字幕 可选值1,"1" 或其他值
:uncensored 无码 可选值1,"1" 或其他值
:hack 破解 可选值1,"1" 或其他值
"""
mark_type = '' mark_type = ''
if cn_sub: if cn_sub:
mark_type += ',字幕' mark_type += ',字幕'
@@ -396,6 +400,7 @@ def add_mark(poster_path, thumb_path, cn_sub, leak, uncensored, hack):
add_mark_thread(poster_path, cn_sub, leak, uncensored, hack) add_mark_thread(poster_path, cn_sub, leak, uncensored, hack)
print('[+]Poster Add Mark: ' + mark_type.strip(',')) print('[+]Poster Add Mark: ' + mark_type.strip(','))
def add_mark_thread(pic_path, cn_sub, leak, uncensored, hack): def add_mark_thread(pic_path, cn_sub, leak, uncensored, hack):
size = 9 size = 9
img_pic = Image.open(pic_path) img_pic = Image.open(pic_path)
@@ -414,6 +419,7 @@ def add_mark_thread(pic_path, cn_sub, leak, uncensored, hack):
add_to_pic(pic_path, img_pic, size, count, 4) add_to_pic(pic_path, img_pic, size, count, 4)
img_pic.close() img_pic.close()
def add_to_pic(pic_path, img_pic, size, count, mode): def add_to_pic(pic_path, img_pic, size, count, mode):
mark_pic_path = '' mark_pic_path = ''
pngpath = '' pngpath = ''
@@ -455,6 +461,7 @@ def add_to_pic(pic_path, img_pic, size, count, mode):
img_pic.save(pic_path, quality=95) img_pic.save(pic_path, quality=95)
# ========================结束================================= # ========================结束=================================
def paste_file_to_folder(filepath, path, number, leak_word, c_word, hack_word): # 文件路径,番号,后缀,要移动至的位置 def paste_file_to_folder(filepath, path, number, leak_word, c_word, hack_word): # 文件路径,番号,后缀,要移动至的位置
filepath_obj = pathlib.Path(filepath) filepath_obj = pathlib.Path(filepath)
houzhui = filepath_obj.suffix houzhui = filepath_obj.suffix
@@ -546,6 +553,7 @@ def paste_file_to_folder_mode2(filepath, path, multi_part, number, part, leak_wo
print(f'[-]OS Error errno {oserr.errno}') print(f'[-]OS Error errno {oserr.errno}')
return return
def get_part(filepath): def get_part(filepath):
try: try:
if re.search('-CD\d+', filepath): if re.search('-CD\d+', filepath):