PEP8 PREFIX, AND SOME TYPING ANNOTATION

This commit is contained in:
FatalFurY
2022-02-18 00:01:21 +08:00
parent 123a2a0c73
commit c1568cd64a
2 changed files with 156 additions and 118 deletions

View File

@@ -13,10 +13,8 @@ import multiprocessing
from datetime import datetime, timedelta
from pathlib import Path
from opencc import OpenCC
import config
from ADC_function import file_modification_days, get_html, parallel_download_files
from number_parser import get_number
@@ -47,7 +45,8 @@ def argparse_function(ver: str) -> typing.Tuple[str, str, str, str, bool]:
parser = argparse.ArgumentParser(epilog=f"Load Config file '{conf.ini_path}'.")
parser.add_argument("file", default='', nargs='?', help="Single Movie file path.")
parser.add_argument("-p", "--path", default='', nargs='?', help="Analysis folder path.")
parser.add_argument("-m","--main-mode",default='',nargs='?',help="Main mode. 1:Scraping 2:Organizing 3:Scraping in analysis folder")
parser.add_argument("-m", "--main-mode", default='', nargs='?',
help="Main mode. 1:Scraping 2:Organizing 3:Scraping in analysis folder")
parser.add_argument("-n", "--number", default='', nargs='?', help="Custom file number of single movie file.")
# parser.add_argument("-C", "--config", default='config.ini', nargs='?', help="The config file Path.")
default_logdir = str(Path.home() / '.mlogs')
@@ -55,9 +54,12 @@ def argparse_function(ver: str) -> typing.Tuple[str, str, str, str, bool]:
help=f"""Duplicate stdout and stderr to logfiles in logging folder, default on.
default folder for current user: '{default_logdir}'. Change default folder to an empty file,
or use --log-dir= to turn log off.""")
parser.add_argument("-q","--regex-query",dest='regexstr',default='',nargs='?',help="python re module regex filepath filtering.")
parser.add_argument("-d","--nfo-skip-days",dest='days',default='',nargs='?', help="Override nfo_skip_days value in config.")
parser.add_argument("-c","--stop-counter",dest='cnt',default='',nargs='?', help="Override stop_counter value in config.")
parser.add_argument("-q", "--regex-query", dest='regexstr', default='', nargs='?',
help="python re module regex filepath filtering.")
parser.add_argument("-d", "--nfo-skip-days", dest='days', default='', nargs='?',
help="Override nfo_skip_days value in config.")
parser.add_argument("-c", "--stop-counter", dest='cnt', default='', nargs='?',
help="Override stop_counter value in config.")
parser.add_argument("-i", "--ignore-failed-list", action="store_true", help="Ignore failed list '{}'".format(
os.path.join(os.path.abspath(conf.failed_folder()), 'failed_list.txt')))
parser.add_argument("-a", "--auto-exit", action="store_true",
@@ -70,12 +72,16 @@ is performed. It may help you correct wrong numbers before real job.""")
parser.add_argument("-v", "--version", action="version", version=ver)
args = parser.parse_args()
def get_natural_number_or_none(value):
return int(value) if isinstance(value, str) and value.isnumeric() and int(value) >= 0 else None
def get_str_or_none(value):
return value if isinstance(value, str) and len(value) else None
def get_bool_or_none(value):
return True if isinstance(value, bool) and value else None
config.G_conf_override["common:main_mode"] = get_natural_number_or_none(args.main_mode)
config.G_conf_override["common:source_folder"] = get_str_or_none(args.path)
config.G_conf_override["common:auto_exit"] = get_bool_or_none(args.auto_exit)
@@ -86,43 +92,53 @@ is performed. It may help you correct wrong numbers before real job.""")
return args.file, args.number, args.logdir, args.regexstr, args.zero_op
class OutLogger(object):
def __init__(self, logfile) -> None:
self.term = sys.stdout
self.log = open(logfile, "w", encoding='utf-8', buffering=1)
self.filepath = logfile
def __del__(self):
self.close()
def __enter__(self):
pass
def __exit__(self, *args):
self.close()
def write(self, msg):
self.term.write(msg)
self.log.write(msg)
def flush(self):
self.term.flush()
self.log.flush()
os.fsync(self.log.fileno())
def close(self):
if self.term != None:
if self.term is not None:
sys.stdout = self.term
self.term = None
if self.log != None:
if self.log is not None:
self.log.close()
self.log = None
class ErrLogger(OutLogger):
def __init__(self, logfile) -> None:
self.term = sys.stderr
self.log = open(logfile, "w", encoding='utf-8', buffering=1)
self.filepath = logfile
def close(self):
if self.term != None:
if self.term is not None:
sys.stderr = self.term
self.term = None
if self.log != None:
if self.log is not None:
self.log.close()
self.log = None
@@ -257,13 +273,14 @@ def signal_handler(*args):
print('[!]Ctrl+C detected, Exit.')
sys.exit(9)
def sigdebug_handler(*args):
config.G_conf_override["debug_mode:switch"] = not config.G_conf_override["debug_mode:switch"]
print('[!]Debug {}'.format('On' if config.getInstance().debug() else 'oFF'))
# 新增失败文件列表跳过处理,及.nfo修改天数跳过处理提示跳过视频总数调试模式(-g)下详细被跳过文件,跳过小广告
def movie_lists(source_folder, regexstr):
def movie_lists(source_folder, regexstr: str) -> list[str]:
conf = config.getInstance()
main_mode = conf.main_mode()
debug = conf.debug()
@@ -314,11 +331,12 @@ def movie_lists(source_folder, regexstr):
continue # file is symlink or hardlink(Linux/NTFS/Darwin)
# 调试用0字节样本允许通过去除小于120MB的广告'苍老师强力推荐.mp4'(102.2MB)'黑道总裁.mp4'(98.4MB)'有趣的妹子激情表演.MP4'(95MB)'有趣的臺灣妹妹直播.mp4'(15.1MB)
movie_size = 0 if is_sym else full_name.stat().st_size # 同上 符号链接不取stat()及st_size直接赋0跳过小视频检测
if movie_size > 0 and movie_size < 125829120: # 1024*1024*120=125829120
if 0 < movie_size < 125829120: # 1024*1024*120=125829120
continue
if cliRE and not cliRE.search(absf) or trailerRE.search(full_name.name):
continue
if main_mode == 3 and nfo_skip_days > 0 and file_modification_days(full_name.with_suffix('.nfo')) <= nfo_skip_days:
if main_mode == 3 and nfo_skip_days > 0 and file_modification_days(
full_name.with_suffix('.nfo')) <= nfo_skip_days:
skip_nfo_days_cnt += 1
if debug:
print(f"[!]Skip movie by it's .nfo which modified within {nfo_skip_days} days: '{absf}'")
@@ -328,7 +346,8 @@ def movie_lists(source_folder, regexstr):
if skip_failed_cnt:
print(f"[!]Skip {skip_failed_cnt} movies in failed list '{failed_list_txt_path}'.")
if skip_nfo_days_cnt:
print(f"[!]Skip {skip_nfo_days_cnt} movies in source folder '{source}' who's .nfo modified within {nfo_skip_days} days.")
print(
f"[!]Skip {skip_nfo_days_cnt} movies in source folder '{source}' who's .nfo modified within {nfo_skip_days} days.")
if nfo_skip_days <= 0 or not soft_link or main_mode == 3:
return total
# 软连接方式,已经成功削刮的也需要从成功目录中检查.nfo更新天数跳过N天内更新过的
@@ -354,13 +373,17 @@ def movie_lists(source_folder, regexstr):
if debug:
print(f"[!]Skip file successfully processed within {nfo_skip_days} days: '{f}'")
if len(rm_list):
print(f"[!]Skip {len(rm_list)} movies in success folder '{success_folder}' who's .nfo modified within {nfo_skip_days} days.")
print(
f"[!]Skip {len(rm_list)} movies in success folder '{success_folder}' who's .nfo modified within {nfo_skip_days} days.")
return total
def create_failed_folder(failed_folder):
if not os.path.exists(failed_folder): # 新建failed文件夹
def create_failed_folder(failed_folder: str):
"""
新建failed文件夹
"""
if not os.path.exists(failed_folder):
try:
os.makedirs(failed_folder)
except:
@@ -373,9 +396,7 @@ def rm_empty_folder(path):
deleted = set()
for current_dir, subdirs, files in os.walk(abspath, topdown=False):
try:
still_has_subdirs = any(
_ for subdir in subdirs if os.path.join(current_dir, subdir) not in deleted
)
still_has_subdirs = any(_ for subdir in subdirs if os.path.join(current_dir, subdir) not in deleted)
if not any(files) and not still_has_subdirs and not os.path.samefile(path, current_dir):
os.rmdir(current_dir)
deleted.add(current_dir)
@@ -390,7 +411,7 @@ def create_data_and_move(file_path: str, zero_op, oCC):
n_number = get_number(debug, os.path.basename(file_path))
file_path = os.path.abspath(file_path)
if debug == True:
if debug is True:
print(f"[!] [{n_number}] As Number making data for '{file_path}'")
if zero_op:
return
@@ -455,11 +476,9 @@ def main():
# Parse command line args
single_file_path, custom_number, logdir, regexstr, zero_op = argparse_function(version)
main_mode = conf.main_mode()
folder_path = ""
if not main_mode in (1, 2, 3):
if main_mode not in (1, 2, 3):
print(f"[-]Main mode must be 1 or 2 or 3! You can run '{os.path.basename(sys.argv[0])} --help' for more help.")
sys.exit(4)
@@ -470,7 +489,8 @@ def main():
signal.signal(signal.SIGWINCH, sigdebug_handler)
dupe_stdout_to_logfile(logdir)
platform_total = str(' - ' + platform.platform() + ' \n[*] - ' + platform.machine() + ' - Python-' + platform.python_version())
platform_total = str(
' - ' + platform.platform() + ' \n[*] - ' + platform.machine() + ' - Python-' + platform.python_version())
print('[*]================= Movie Data Capture =================')
print('[*]' + version.center(54))
@@ -507,6 +527,7 @@ def main():
def fmd(f):
return ('https://raw.githubusercontent.com/yoshiko2/Movie_Data_Capture/master/MappingTable/' + f,
Path.home() / '.local' / 'share' / 'mdc' / f)
map_tab = (fmd('mapping_actor.xml'), fmd('mapping_info.xml'), fmd('c_number.json'))
for k, v in map_tab:
if v.exists():
@@ -528,14 +549,15 @@ def main():
try:
oCC = None if ccm == 0 else OpenCC('t2s.json' if ccm == 1 else 's2t.json')
except:
# some OS no OpennCC cpython, try opencc-python-reimplemented.
# some OS no OpenCC cpython, try opencc-python-reimplemented.
# pip uninstall opencc && pip install opencc-python-reimplemented
oCC = None if ccm == 0 else OpenCC('t2s' if ccm == 1 else 's2t')
if not single_file_path == '': # Single File
print('[+]==================== Single File =====================')
if custom_number == '':
create_data_and_move_with_custom_number(single_file_path, get_number(conf.debug(), os.path.basename(single_file_path)), oCC)
create_data_and_move_with_custom_number(single_file_path,
get_number(conf.debug(), os.path.basename(single_file_path)), oCC)
else:
create_data_and_move_with_custom_number(single_file_path, custom_number, oCC)
else:
@@ -558,7 +580,8 @@ def main():
for movie_path in movie_list: # 遍历电影列表 交给core处理
count = count + 1
percentage = str(count / int(count_all) * 100)[:4] + '%'
print('[!] {:>30}{:>21}'.format('- ' + percentage + ' [' + str(count) + '/' + count_all + '] -', time.strftime("%H:%M:%S")))
print('[!] {:>30}{:>21}'.format('- ' + percentage + ' [' + str(count) + '/' + count_all + '] -',
time.strftime("%H:%M:%S")))
create_data_and_move(movie_path, zero_op, oCC)
if count >= stop_count:
print("[!]Stop counter triggered!")

View File

@@ -2,36 +2,37 @@ import os
import re
import sys
import config
import typing
G_spat = re.compile(
"^22-sht\.me|-fhd|_fhd|^fhd_|^fhd-|-hd|_hd|^hd_|^hd-|-sd|_sd|-1080p|_1080p|-720p|_720p|^hhd800\.com@|-uncensored|_uncensored|-leak|_leak",
re.IGNORECASE)
def get_number(debug,file_path: str) -> str:
# """
# >>> from number_parser import get_number
# >>> get_number("/Users/Guest/AV_Data_Capture/snis-829.mp4")
# 'snis-829'
# >>> get_number("/Users/Guest/AV_Data_Capture/snis-829-C.mp4")
# 'snis-829'
# >>> get_number("C:¥Users¥Guest¥snis-829.mp4")
# 'snis-829'
# >>> get_number("C:¥Users¥Guest¥snis-829-C.mp4")
# 'snis-829'
# >>> get_number("./snis-829.mp4")
# 'snis-829'
# >>> get_number("./snis-829-C.mp4")
# 'snis-829'
# >>> get_number(".¥snis-829.mp4")
# 'snis-829'
# >>> get_number(".¥snis-829-C.mp4")
# 'snis-829'
# >>> get_number("snis-829.mp4")
# 'snis-829'
# >>> get_number("snis-829-C.mp4")
# 'snis-829'
# """
def get_number(debug: bool, file_path: str) -> str:
"""
从文件路径中提取番号 from number_parser import get_number
>>> get_number(False, "/Users/Guest/AV_Data_Capture/snis-829.mp4")
'snis-829'
>>> get_number(False, "/Users/Guest/AV_Data_Capture/snis-829-C.mp4")
'snis-829'
>>> get_number(False, "C:¥Users¥Guest¥snis-829.mp4")
'snis-829'
>>> get_number(False, "C:¥Users¥Guest¥snis-829-C.mp4")
'snis-829'
>>> get_number(False, "./snis-829.mp4")
'snis-829'
>>> get_number(False, "./snis-829-C.mp4")
'snis-829'
>>> get_number(False, ".¥snis-829.mp4")
'snis-829'
>>> get_number(False, ".¥snis-829-C.mp4")
'snis-829'
>>> get_number(False, "snis-829.mp4")
'snis-829'
>>> get_number(False, "snis-829-C.mp4")
'snis-829'
"""
filepath = os.path.basename(file_path)
# debug True 和 False 两块代码块合并原因是此模块及函数只涉及字符串计算没有IO操作debug on时输出导致异常信息即可
try:
@@ -78,7 +79,8 @@ G_TAKE_NUM_RULES = {
'heyzo': lambda x: 'HEYZO-' + re.findall(r'heyzo[^\d]*(\d{4})', x, re.I)[0]
}
def get_number_by_dict(filename: str) -> str:
def get_number_by_dict(filename: str) -> typing.Optional[str]:
try:
for k, v in G_TAKE_NUM_RULES.items():
if re.search(k, filename, re.I):
@@ -87,10 +89,13 @@ def get_number_by_dict(filename: str) -> str:
pass
return None
class Cache_uncensored_conf:
prefix = None
def is_empty(self):
return bool(self.prefix is None)
def set(self, v: list):
if not v or not len(v) or not len(v[0]):
raise ValueError('input prefix list empty or None')
@@ -99,13 +104,16 @@ class Cache_uncensored_conf:
for i in v[1:]:
s += f"|{i}.+"
self.prefix = re.compile(s, re.I)
def check(self, number):
if self.prefix is None:
raise ValueError('No init re compile')
return self.prefix.match(number)
G_cache_uncensored_conf = Cache_uncensored_conf()
# ========================================================================是否为无码
def is_uncensored(number):
if re.match(
@@ -118,6 +126,7 @@ r'[\d-]{4,}|\d{6}_\d{2,3}|(cz|gedo|k|n|red-|se)\d{2,4}|heyzo.+|xxx-av-.+|heydoug
G_cache_uncensored_conf.set(config.getInstance().get_uncensored().split(','))
return G_cache_uncensored_conf.check(number)
if __name__ == "__main__":
# import doctest
# doctest.testmod(raise_on_error=True)
@@ -144,9 +153,13 @@ if __name__ == "__main__":
"pacopacomama-093021_539-FHD.mkv", # 新支持片商格式 093021_539 命名规则来自javdb数据源
"sbw99.cc@heyzo_hd_2636_full.mp4"
)
def evprint(evstr):
code = compile(evstr, "<string>", "eval")
print("{1:>20} # '{0}'".format(evstr[18:-2], eval(code)))
for t in test_use_cases:
evprint(f'get_number(True, "{t}")')
@@ -169,6 +182,7 @@ if __name__ == "__main__":
# 示例:
# python3 ./number_parser.py ALL
import subprocess
ES_search_path = "ALL disks"
if sys.argv[1] == "ALL":
if sys.platform == "win32":
@@ -180,7 +194,8 @@ if __name__ == "__main__":
out_list = out_text.splitlines()
elif sys.platform in ("linux", "darwin"):
ES_prog_path = 'locate' if sys.platform == 'linux' else 'glocate'
ES_cmdline = r"{} -b -i --regex '\.mp4$|\.avi$|\.rmvb$|\.wmv$|\.mov$|\.mkv$|\.webm$|\.iso$|\.mpg$|\.m4v$'".format(ES_prog_path)
ES_cmdline = r"{} -b -i --regex '\.mp4$|\.avi$|\.rmvb$|\.wmv$|\.mov$|\.mkv$|\.webm$|\.iso$|\.mpg$|\.m4v$'".format(
ES_prog_path)
out_bytes = subprocess.check_output(ES_cmdline.split(' '))
out_text = out_bytes.decode('utf-8')
out_list = [os.path.basename(line) for line in out_text.splitlines()]