diff --git a/ADC_function.py b/ADC_function.py index 2219219..4fcbec1 100644 --- a/ADC_function.py +++ b/ADC_function.py @@ -1,55 +1,57 @@ -from os import replace -import requests -# import hashlib -from pathlib import Path -import secrets +# build-in lib import os.path import os +import re import uuid import json import time -from lxml import etree -import re -import config import typing -from urllib.parse import urljoin -import mechanicalsoup -from requests.adapters import HTTPAdapter -from urllib3.util.retry import Retry -from cloudscraper import create_scraper -from concurrent.futures import ThreadPoolExecutor from unicodedata import category +from concurrent.futures import ThreadPoolExecutor + +# third party lib +import requests +from requests.adapters import HTTPAdapter +import mechanicalsoup +from pathlib import Path +from urllib3.util.retry import Retry +from lxml import etree +from cloudscraper import create_scraper + +# project wide +import config -def getXpathSingle(htmlcode, xpath): - html = etree.fromstring(htmlcode, etree.HTMLParser()) +def get_xpath_single(html_code: str, xpath): + html = etree.fromstring(html_code, etree.HTMLParser()) result1 = str(html.xpath(xpath)).strip(" ['']") return result1 G_USER_AGENT = r'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.133 Safari/537.36' -def get_html(url, cookies: dict = None, ua: str = None, return_type: str = None, encoding: str = None, json_headers = None): + +def get_html(url, cookies: dict = None, ua: str = None, return_type: str = None, encoding: str = None, json_headers=None): """ 网页请求核心函数 """ verify = config.getInstance().cacert_file() - configProxy = config.getInstance().proxy() + config_proxy = config.getInstance().proxy() errors = "" headers = {"User-Agent": ua or G_USER_AGENT} # noqa - if json_headers != None: + if json_headers is not None: headers.update(json_headers) - for i in range(configProxy.retry): + for i in range(config_proxy.retry): try: - if configProxy.enable: - proxies = configProxy.proxies() - result = requests.get(str(url), headers=headers, timeout=configProxy.timeout, proxies=proxies, + if config_proxy.enable: + proxies = config_proxy.proxies() + result = requests.get(str(url), headers=headers, timeout=config_proxy.timeout, proxies=proxies, verify=verify, cookies=cookies) else: - result = requests.get(str(url), headers=headers, timeout=configProxy.timeout, cookies=cookies) + result = requests.get(str(url), headers=headers, timeout=config_proxy.timeout, cookies=cookies) if return_type == "object": return result @@ -59,7 +61,7 @@ def get_html(url, cookies: dict = None, ua: str = None, return_type: str = None, result.encoding = encoding or result.apparent_encoding return result.text except Exception as e: - print("[-]Connect retry {}/{}".format(i + 1, configProxy.retry)) + print("[-]Connect retry {}/{}".format(i + 1, config_proxy.retry)) errors = str(e) if "getaddrinfo failed" in errors: print("[-]Connect Failed! Please Check your proxy config") @@ -71,8 +73,9 @@ def get_html(url, cookies: dict = None, ua: str = None, return_type: str = None, print('[-]Connect Failed! 
Please check your Proxy or Network!') raise Exception('Connect Failed') + def post_html(url: str, query: dict, headers: dict = None) -> requests.Response: - configProxy = config.getInstance().proxy() + config_proxy = config.getInstance().proxy() errors = "" headers_ua = {"User-Agent": G_USER_AGENT} if headers is None: @@ -80,16 +83,16 @@ def post_html(url: str, query: dict, headers: dict = None) -> requests.Response: else: headers.update(headers_ua) - for i in range(configProxy.retry): + for i in range(config_proxy.retry): try: - if configProxy.enable: - proxies = configProxy.proxies() - result = requests.post(url, data=query, proxies=proxies, headers=headers, timeout=configProxy.timeout) + if config_proxy.enable: + proxies = config_proxy.proxies() + result = requests.post(url, data=query, proxies=proxies, headers=headers, timeout=config_proxy.timeout) else: - result = requests.post(url, data=query, headers=headers, timeout=configProxy.timeout) + result = requests.post(url, data=query, headers=headers, timeout=config_proxy.timeout) return result except Exception as e: - print("[-]Connect retry {}/{}".format(i + 1, configProxy.retry)) + print("[-]Connect retry {}/{}".format(i + 1, config_proxy.retry)) errors = str(e) print("[-]Connect Failed! Please check your Proxy or Network!") print("[-]" + errors) @@ -116,17 +119,17 @@ class TimeoutHTTPAdapter(HTTPAdapter): # with keep-alive feature def get_html_session(url: str = None, cookies: dict = None, ua: str = None, return_type: str = None, encoding: str = None): - configProxy = config.getInstance().proxy() + config_proxy = config.getInstance().proxy() session = requests.Session() if isinstance(cookies, dict) and len(cookies): requests.utils.add_dict_to_cookiejar(session.cookies, cookies) - retries = Retry(total=configProxy.retry, connect=configProxy.retry, backoff_factor=1, + retries = Retry(total=config_proxy.retry, connect=config_proxy.retry, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504]) - session.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout)) - session.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout)) - if configProxy.enable: + session.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=config_proxy.timeout)) + session.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=config_proxy.timeout)) + if config_proxy.enable: session.verify = config.getInstance().cacert_file() - session.proxies = configProxy.proxies() + session.proxies = config_proxy.proxies() headers = {"User-Agent": ua or G_USER_AGENT} session.headers = headers try: @@ -156,17 +159,17 @@ def get_html_session(url: str = None, cookies: dict = None, ua: str = None, retu def get_html_by_browser(url: str = None, cookies: dict = None, ua: str = None, return_type: str = None, encoding: str = None, use_scraper: bool = False): - configProxy = config.getInstance().proxy() + config_proxy = config.getInstance().proxy() s = create_scraper(browser={'custom': ua or G_USER_AGENT, }) if use_scraper else requests.Session() if isinstance(cookies, dict) and len(cookies): requests.utils.add_dict_to_cookiejar(s.cookies, cookies) - retries = Retry(total=configProxy.retry, connect=configProxy.retry, backoff_factor=1, + retries = Retry(total=config_proxy.retry, connect=config_proxy.retry, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504]) - s.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout)) - s.mount("http://", 
TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout)) - if configProxy.enable: + s.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=config_proxy.timeout)) + s.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=config_proxy.timeout)) + if config_proxy.enable: s.verify = config.getInstance().cacert_file() - s.proxies = configProxy.proxies() + s.proxies = config_proxy.proxies() try: browser = mechanicalsoup.StatefulBrowser(user_agent=ua or G_USER_AGENT, session=s) if isinstance(url, str) and len(url): @@ -194,17 +197,17 @@ def get_html_by_browser(url: str = None, cookies: dict = None, ua: str = None, r def get_html_by_form(url, form_select: str = None, fields: dict = None, cookies: dict = None, ua: str = None, return_type: str = None, encoding: str = None): - configProxy = config.getInstance().proxy() + config_proxy = config.getInstance().proxy() s = requests.Session() if isinstance(cookies, dict) and len(cookies): requests.utils.add_dict_to_cookiejar(s.cookies, cookies) - retries = Retry(total=configProxy.retry, connect=configProxy.retry, backoff_factor=1, + retries = Retry(total=config_proxy.retry, connect=config_proxy.retry, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504]) - s.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout)) - s.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout)) - if configProxy.enable: + s.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=config_proxy.timeout)) + s.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=config_proxy.timeout)) + if config_proxy.enable: s.verify = config.getInstance().cacert_file() - s.proxies = configProxy.proxies() + s.proxies = config_proxy.proxies() try: browser = mechanicalsoup.StatefulBrowser(user_agent=ua or G_USER_AGENT, session=s) result = browser.open(url) @@ -234,17 +237,17 @@ def get_html_by_form(url, form_select: str = None, fields: dict = None, cookies: def get_html_by_scraper(url: str = None, cookies: dict = None, ua: str = None, return_type: str = None, encoding: str = None): - configProxy = config.getInstance().proxy() + config_proxy = config.getInstance().proxy() session = create_scraper(browser={'custom': ua or G_USER_AGENT, }) if isinstance(cookies, dict) and len(cookies): requests.utils.add_dict_to_cookiejar(session.cookies, cookies) - retries = Retry(total=configProxy.retry, connect=configProxy.retry, backoff_factor=1, + retries = Retry(total=config_proxy.retry, connect=config_proxy.retry, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504]) - session.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout)) - session.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout)) - if configProxy.enable: + session.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=config_proxy.timeout)) + session.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=config_proxy.timeout)) + if config_proxy.enable: session.verify = config.getInstance().cacert_file() - session.proxies = configProxy.proxies() + session.proxies = config_proxy.proxies() try: if isinstance(url, str) and len(url): result = session.get(str(url)) @@ -347,7 +350,7 @@ def translate( return trans_result -def load_cookies(cookie_json_filename: str): +def load_cookies(cookie_json_filename: str) -> typing.Tuple[typing.Optional[dict], typing.Optional[str]]: """ 加载cookie,用于以会员方式访问非游客内容 @@ -415,55 +418,16 @@ def 
is_japanese(raw: str) -> bool: return bool(re.search(r'[\u3040-\u309F\u30A0-\u30FF\uFF66-\uFF9F]', raw, re.UNICODE)) -# Usage: python ./ADC_function.py https://cn.bing.com/ -if __name__ == "__main__": - import sys, timeit - from http.client import HTTPConnection - - - def benchmark(t, url): - print(f"HTTP GET Benchmark times:{t} url:{url}") - tm = timeit.timeit(f"_ = session1.get('{url}')", - "from __main__ import get_html_session;session1=get_html_session()", - number=t) - print(f' *{tm:>10.5f}s get_html_session() Keep-Alive enable') - tm = timeit.timeit(f"_ = scraper1.get('{url}')", - "from __main__ import get_html_by_scraper;scraper1=get_html_by_scraper()", - number=t) - print(f' *{tm:>10.5f}s get_html_by_scraper() Keep-Alive enable') - tm = timeit.timeit(f"_ = browser1.open('{url}')", - "from __main__ import get_html_by_browser;browser1=get_html_by_browser()", - number=t) - print(f' *{tm:>10.5f}s get_html_by_browser() Keep-Alive enable') - tm = timeit.timeit(f"_ = get_html('{url}')", - "from __main__ import get_html", - number=t) - print(f' *{tm:>10.5f}s get_html()') - - - t = 100 - - # url = "https://www.189.cn/" - url = "http://www.chinaunicom.com" - HTTPConnection.debuglevel = 1 - s = get_html_session() - _ = s.get(url) - HTTPConnection.debuglevel = 0 - if len(sys.argv) > 1: - url = sys.argv[1] - benchmark(t, url) - - def download_file_with_filename(url: str, filename: str, path: str) -> None: """ download file save to give path with given name from given url """ conf = config.getInstance() - configProxy = conf.proxy() + config_proxy = conf.proxy() - for i in range(configProxy.retry): + for i in range(config_proxy.retry): try: - if configProxy.enable: + if config_proxy.enable: if not os.path.exists(path): try: os.makedirs(path) @@ -491,18 +455,18 @@ def download_file_with_filename(url: str, filename: str, path: str) -> None: with open(os.path.join(path, filename), "wb") as code: code.write(r) return - except requests.exceptions.RequestException: - i += 1 - print('[-]Download : Connect retry ' + str(i) + '/' + str(configProxy.retry)) - except requests.exceptions.ConnectionError: - i += 1 - print('[-]Download : Connect retry ' + str(i) + '/' + str(configProxy.retry)) except requests.exceptions.ProxyError: i += 1 - print('[-]Download : Connect retry ' + str(i) + '/' + str(configProxy.retry)) + print('[-]Download : Connect retry ' + str(i) + '/' + str(config_proxy.retry)) except requests.exceptions.ConnectTimeout: i += 1 - print('[-]Download : Connect retry ' + str(i) + '/' + str(configProxy.retry)) + print('[-]Download : Connect retry ' + str(i) + '/' + str(config_proxy.retry)) + except requests.exceptions.ConnectionError: + i += 1 + print('[-]Download : Connect retry ' + str(i) + '/' + str(config_proxy.retry)) + except requests.exceptions.RequestException: + i += 1 + print('[-]Download : Connect retry ' + str(i) + '/' + str(config_proxy.retry)) except IOError: raise ValueError(f"[-]Create Directory '{path}' failed!") return @@ -518,7 +482,7 @@ def download_one_file(args) -> str: """ (url, save_path, json_headers) = args - if json_headers != None: + if json_headers is not None: filebytes = get_html(url, return_type='content', json_headers=json_headers['headers']) else: filebytes = get_html(url, return_type='content') @@ -574,10 +538,57 @@ def delete_all_elements_in_str(string_delete: str, string: str): """ for i in string: if i == string_delete: - string = string.replace(i,"") + string = string.replace(i, "") return string # print format空格填充对齐内容包含中文时的空格计算 -def cnspace(v: str, n: int) 
-> int: +def cn_space(v: str, n: int) -> int: return n - [category(c) for c in v].count('Lo') + + +""" +Usage: python ./ADC_function.py https://cn.bing.com/ +Purpose: benchmark get_html_session + benchmark get_html_by_scraper + benchmark get_html_by_browser + benchmark get_html +TODO: may be this should move to unittest directory +""" +if __name__ == "__main__": + import sys, timeit + from http.client import HTTPConnection + + + def benchmark(times: int, url): + print(f"HTTP GET Benchmark times:{times} url:{url}") + tm = timeit.timeit(f"_ = session1.get('{url}')", + "from __main__ import get_html_session;session1=get_html_session()", + number=times) + print(f' *{tm:>10.5f}s get_html_session() Keep-Alive enable') + tm = timeit.timeit(f"_ = scraper1.get('{url}')", + "from __main__ import get_html_by_scraper;scraper1=get_html_by_scraper()", + number=times) + print(f' *{tm:>10.5f}s get_html_by_scraper() Keep-Alive enable') + tm = timeit.timeit(f"_ = browser1.open('{url}')", + "from __main__ import get_html_by_browser;browser1=get_html_by_browser()", + number=times) + print(f' *{tm:>10.5f}s get_html_by_browser() Keep-Alive enable') + tm = timeit.timeit(f"_ = get_html('{url}')", + "from __main__ import get_html", + number=times) + print(f' *{tm:>10.5f}s get_html()') + + + # target_url = "https://www.189.cn/" + target_url = "http://www.chinaunicom.com" + HTTPConnection.debuglevel = 1 + html_session = get_html_session() + _ = html_session.get(target_url) + HTTPConnection.debuglevel = 0 + + # times + t = 100 + if len(sys.argv) > 1: + target_url = sys.argv[1] + benchmark(t, target_url) diff --git a/config.py b/config.py index 745b33b..30efb81 100644 --- a/config.py +++ b/config.py @@ -563,8 +563,10 @@ class IniProxy(): self.proxytype = proxytype def proxies(self): - ''' 获得代理参数,默认http代理 - ''' + """ + 获得代理参数,默认http代理 + get proxy params, use http proxy for default + """ if self.address: if self.proxytype in self.SUPPORT_PROXY_TYPE: proxies = {"http": self.proxytype + "://" + self.address, diff --git a/core.py b/core.py index c8364ef..4a4b714 100644 --- a/core.py +++ b/core.py @@ -692,7 +692,7 @@ def debug_print(data: json): if i == 'extrafanart': print('[+] -', "%-19s" % i, ':', len(v), 'links') continue - print(f'[+] - {i:<{cnspace(i,19)}} : {v}') + print(f'[+] - {i:<{cn_space(i, 19)}} : {v}') print("[+] ------- DEBUG INFO -------") except: diff --git a/scraper.py b/scraper.py index 32228c1..9f3ed0c 100644 --- a/scraper.py +++ b/scraper.py @@ -1,12 +1,22 @@ +# build-in lib import json import secrets -import config -from lxml import etree from pathlib import Path -from ADC_function import delete_all_elements_in_list, delete_all_elements_in_str, file_modification_days, load_cookies, translate +# third party lib +from lxml import etree + +# project wide definitions +import config +from ADC_function import (translate, + load_cookies, + file_modification_days, + delete_all_elements_in_str, + delete_all_elements_in_list + ) from scrapinglib.api import search + def get_data_from_json(file_number, oCC, specified_source, specified_url): """ iterate through all services and fetch the data 从JSON返回元数据 @@ -21,11 +31,11 @@ def get_data_from_json(file_number, oCC, specified_source, specified_url): # TODO 准备参数 # - 清理 ADC_function, webcrawler - proxies = None - configProxy = conf.proxy() - if configProxy.enable: - proxies = configProxy.proxies() - + proxies: dict = None + config_proxy = conf.proxy() + if config_proxy.enable: + proxies = config_proxy.proxies() + javdb_sites = conf.javdb_sites().split(',') for i in 
javdb_sites:
         javdb_sites[javdb_sites.index(i)] = "javdb" + i
@@ -43,14 +53,16 @@ def get_data_from_json(file_number, oCC, specified_source, specified_url):
                 has_json = True
                 break
             elif cdays != 9999:
-                print(f'[!]Cookies file {cookies_filepath} was updated {cdays} days ago, it will not be used for HTTP requests.')
+                print(
+                    f'[!]Cookies file {cookies_filepath} was updated {cdays} days ago, it will not be used for HTTP requests.')

     if not has_json:
+        # pick the javdb mirror with secrets.choice(): unlike random.choice(), it does not follow a predictable PRNG seed
         javdb_site = secrets.choice(javdb_sites)
         javdb_cookies = None
-        cacert =None
+        ca_cert = None
         if conf.cacert_file():
-            cacert = conf.cacert_file()
+            ca_cert = conf.cacert_file()

-        json_data = search(file_number, sources, proxies=proxies, verify=cacert,
+        json_data = search(file_number, sources, proxies=proxies, verify=ca_cert,
                            dbsite=javdb_site, dbcookies=javdb_cookies,
@@ -181,18 +193,21 @@ def get_data_from_json(file_number, oCC, specified_source, specified_url):
     if oCC:
         cc_vars = conf.cc_convert_vars().split(",")
         ccm = conf.cc_convert_mode()
-        def convert_list(mapping_data,language,vars):
+
+        def convert_list(mapping_data, language, vars):
             total = []
             for i in vars:
                 if len(mapping_data.xpath('a[contains(@keyword, $name)]/@' + language, name=f",{i},")) != 0:
                     i = mapping_data.xpath('a[contains(@keyword, $name)]/@' + language, name=f",{i},")[0]
                 total.append(i)
             return total
-        def convert(mapping_data,language,vars):
+
+        def convert(mapping_data, language, vars):
             if len(mapping_data.xpath('a[contains(@keyword, $name)]/@' + language, name=vars)) != 0:
                 return mapping_data.xpath('a[contains(@keyword, $name)]/@' + language, name=vars)[0]
             else:
                 raise IndexError('keyword not found')
+
         for cc in cc_vars:
             if json_data[cc] == "" or len(json_data[cc]) == 0:
                 continue
@@ -239,7 +254,7 @@ def get_data_from_json(file_number, oCC, specified_source, specified_url):
         except:
             pass

-    naming_rule=""
+    naming_rule = ""
     for i in conf.naming_rule().split("+"):
         if i not in json_data:
             naming_rule += i.strip("'").strip('"')
@@ -254,17 +269,17 @@ def special_characters_replacement(text) -> str:
     if not isinstance(text, str):
         return text
-    return (text.replace('\\', '∖'). # U+2216 SET MINUS @ Basic Multilingual Plane
-            replace('/', '∕'). # U+2215 DIVISION SLASH @ Basic Multilingual Plane
-            replace(':', '꞉'). # U+A789 MODIFIER LETTER COLON @ Latin Extended-D
-            replace('*', '∗'). # U+2217 ASTERISK OPERATOR @ Basic Multilingual Plane
-            replace('?', '？'). # U+FF1F FULLWIDTH QUESTION MARK @ Basic Multilingual Plane
-            replace('"', '＂'). # U+FF02 FULLWIDTH QUOTATION MARK @ Basic Multilingual Plane
-            replace('<', 'ᐸ'). # U+1438 CANADIAN SYLLABICS PA @ Basic Multilingual Plane
-            replace('>', 'ᐳ'). # U+1433 CANADIAN SYLLABICS PO @ Basic Multilingual Plane
-            replace('|', 'ǀ'). # U+01C0 LATIN LETTER DENTAL CLICK @ Basic Multilingual Plane
-            replace('&lsquo;', '‘'). # U+02018 LEFT SINGLE QUOTATION MARK
-            replace('&rsquo;', '’'). # U+02019 RIGHT SINGLE QUOTATION MARK
-            replace('&hellip;','…').
-            replace('&amp;', '&')
+    return (text.replace('\\', '∖').  # U+2216 SET MINUS @ Basic Multilingual Plane
+            replace('/', '∕').  # U+2215 DIVISION SLASH @ Basic Multilingual Plane
+            replace(':', '꞉').  # U+A789 MODIFIER LETTER COLON @ Latin Extended-D
+            replace('*', '∗').  # U+2217 ASTERISK OPERATOR @ Basic Multilingual Plane
+            replace('?', '？').  # U+FF1F FULLWIDTH QUESTION MARK @ Basic Multilingual Plane
+            replace('"', '＂').  # U+FF02 FULLWIDTH QUOTATION MARK @ Basic Multilingual Plane
+            replace('<', 'ᐸ').  # U+1438 CANADIAN SYLLABICS PA @ Basic Multilingual Plane
+            replace('>', 'ᐳ').  # U+1433 CANADIAN SYLLABICS PO @ Basic Multilingual Plane
+            replace('|', 'ǀ').  # U+01C0 LATIN LETTER DENTAL CLICK @ Basic Multilingual Plane
+            replace('&lsquo;', '‘').  # U+02018 LEFT SINGLE QUOTATION MARK
+            replace('&rsquo;', '’').  # U+02019 RIGHT SINGLE QUOTATION MARK
+            replace('&hellip;', '…').
+            replace('&amp;', '&')
             )
diff --git a/scrapinglib/api.py b/scrapinglib/api.py
index 3429107..8b175a1 100644
--- a/scrapinglib/api.py
+++ b/scrapinglib/api.py
@@ -46,11 +46,11 @@ def getSupportedSources(tag='adult'):
         return ','.join(sc.general_full_sources)


-class Scraping():
+class Scraping:
     """
     """
-    adult_full_sources = ['javlibrary', 'javdb', 'javbus', 'airav', 'fanza', 'xcity', 'jav321',
-                        'mgstage', 'fc2', 'avsox', 'dlsite', 'carib', 'madou', 'mv91',
+    adult_full_sources = ['javlibrary', 'javdb', 'javbus', 'airav', 'fanza', 'xcity', 'jav321',
+                          'mgstage', 'fc2', 'avsox', 'dlsite', 'carib', 'madou', 'mv91',
                           'getchu', 'gcolle'
                           ]
     adult_func_mapping = {
@@ -72,7 +72,7 @@ class Scraping():
         'javlibrary': Javlibrary().scrape,
     }

-    general_full_sources = ['tmdb','imdb']
+    general_full_sources = ['tmdb', 'imdb']
     general_func_mapping = {
         'tmdb': Tmdb().scrape,
         'imdb': Imdb().scrape,
@@ -199,7 +199,8 @@ class Scraping():
             sources = self.adult_full_sources
         else:
             sources = c_sources.split(',')
-        def insert(sources,source):
+
+        def insert(sources, source):
             if source in sources:
                 sources.insert(0, sources.pop(sources.index(source)))
             return sources
diff --git a/xlog.py b/xlog.py
index 956a77b..a91423a 100755
--- a/xlog.py
+++ b/xlog.py
@@ -1,4 +1,3 @@
-
 import os
 import sys
 import time
@@ -18,7 +17,8 @@
 INFO = 20
 DEBUG = 10
 NOTSET = 0
-class Logger():
+
+class Logger:
     def __init__(self, name, buffer_size=0, file_name=None, roll_num=1):
         self.err_color = '\033[0m'
         self.warn_color = '\033[0m'
@@ -28,7 +28,7 @@ class Logger():
         self.name = str(name)
         self.file_max_size = 1024 * 1024
         self.buffer_lock = threading.Lock()
-        self.buffer = {} # id => line
+        self.buffer = {}  # id => line
         self.buffer_size = buffer_size
         self.last_no = 0
         self.min_level = NOTSET
@@ -107,7 +107,7 @@ class Logger():
             if not os.path.isfile(old_name):
                 continue

-            #self.info("roll_log %s -> %s", old_name, new_name)
+            # self.info("roll_log %s -> %s", old_name, new_name)
             shutil.move(old_name, new_name)

         shutil.move(self.log_filename, self.log_filename + ".1")
@@ -157,7 +157,8 @@ class Logger():
                 if buffer_len > self.buffer_size:
                     del self.buffer[self.last_no - self.buffer_size]
         except Exception as e:
-            string = '%s - [%s]LOG_EXCEPT: %s, Except:%s\n%s' % (time.ctime()[4:-5], level, fmt % args, e, traceback.format_exc())
+            string = '%s - [%s]LOG_EXCEPT: %s, Except:%s\n%s' % (
+                time.ctime()[4:-5], level, fmt % args, e, traceback.format_exc())
             self.last_no += 1
             self.buffer[self.last_no] = string
             buffer_len = len(self.buffer)
@@ -202,7 +203,7 @@ class Logger():
     def tofile(self, fmt, *args, **kwargs):
         self.log_to_file('@', self.warn_color, fmt, *args, **kwargs)

-    #=================================================================
+    # =================================================================
     def set_buffer_size(self, set_size):
         self.buffer_lock.acquire()
         self.buffer_size = set_size
@@ -255,8 +256,10 @@ class Logger():
             print(("Except stack:%s" % traceback.format_exc()))
             return ""
+

 loggerDict = {}
+

 def getLogger(name=None, buffer_size=0, file_name=None, roll_num=1):
     global loggerDict, default_log
     if name is None:
@@ -279,29 +282,38 @@ def getLogger(name=None, buffer_size=0, file_name=None, roll_num=1):
         default_log = logger_instance
     return logger_instance
+

 default_log = getLogger()
+

 def debg(fmt, *args, **kwargs):
     default_log.debug(fmt, *args, **kwargs)
+

 def info(fmt, *args, **kwargs):
     default_log.info(fmt, *args, **kwargs)
+

 def warn(fmt, *args, **kwargs):
     default_log.warning(fmt, *args, **kwargs)
+

 def erro(fmt, *args, **kwargs):
     default_log.error(fmt, *args, **kwargs)
+

 def excp(fmt, *args, **kwargs):
     default_log.exception(fmt, *args, **kwargs)
+

 def crit(fmt, *args, **kwargs):
     default_log.critical(fmt, *args, **kwargs)
+

 def tofile(fmt, *args, **kwargs):
     default_log.tofile(fmt, *args, **kwargs)
+

 if __name__ == '__main__':
     log_file = os.path.join(os.path.dirname(sys.argv[0]), "test.log")
     getLogger().set_file(log_file)
@@ -313,7 +325,6 @@ if __name__ == '__main__':
     tofile("write to file only")

     try:
-        1/0
+        1 / 0
     except Exception as e:
         excp("An error has occurred")
-
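
The sketches below are reviewer illustrations of patterns this patch touches; none of this code is part of the diff itself.

get_html_session(), get_html_by_browser(), get_html_by_form() and get_html_by_scraper() all repeat the same mount/Retry wiring that this patch renames around (config_proxy.retry, config_proxy.timeout). The following is a minimal sketch of that pattern in isolation: SimpleTimeoutAdapter and build_session are illustrative names, not the project's TimeoutHTTPAdapter, and the retry/timeout defaults are placeholders rather than values read from config.py.

# Minimal sketch of the Retry + default-timeout adapter pattern shared by the
# get_html_* helpers. Illustrative only; names and defaults are assumptions.
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


class SimpleTimeoutAdapter(HTTPAdapter):
    """HTTPAdapter that applies a default timeout when the caller passes none."""

    def __init__(self, *args, timeout=10, **kwargs):
        self._timeout = timeout
        super().__init__(*args, **kwargs)

    def send(self, request, **kwargs):
        if kwargs.get("timeout") is None:
            kwargs["timeout"] = self._timeout
        return super().send(request, **kwargs)


def build_session(retry: int = 3, timeout: int = 10) -> requests.Session:
    # One adapter instance handles both schemes; Retry backs off on 429/5xx.
    session = requests.Session()
    retries = Retry(total=retry, connect=retry, backoff_factor=1,
                    status_forcelist=[429, 500, 502, 503, 504])
    adapter = SimpleTimeoutAdapter(max_retries=retries, timeout=timeout)
    session.mount("https://", adapter)
    session.mount("http://", adapter)
    return session


session = build_session()
# resp = session.get("https://example.com")  # keep-alive, retries, default timeout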
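
download_file_with_filename() now lists its except clauses from most specific to most general (ProxyError, ConnectTimeout, ConnectionError, RequestException). Every clause currently prints the same retry message, so behaviour is unchanged, but the order matters as soon as the handlers diverge: ProxyError and ConnectTimeout derive from RequestException, so with the old general-first order the specific clauses could never run. The toy classify() helper below is illustrative only and does not exist in the project.

# Specific-before-general except ordering, as in the reordered download loop.
import requests


def classify(exc: Exception) -> str:
    try:
        raise exc
    except requests.exceptions.ProxyError:
        return "proxy"
    except requests.exceptions.ConnectTimeout:
        return "timeout"
    except requests.exceptions.ConnectionError:
        return "connection"
    except requests.exceptions.RequestException:
        return "other request error"


print(classify(requests.exceptions.ProxyError()))      # proxy
print(classify(requests.exceptions.ConnectTimeout()))  # timeout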
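
cn_space() (renamed from cnspace) narrows a pad width by one column for every 'Lo' (Letter, other) character, so f-string alignment stays visually even when keys contain double-width CJK glyphs, as in core.py's debug_print(). A self-contained sketch follows; the sample dictionary keys are invented for illustration.

# Aligned console output with CJK keys; mirrors the renamed helper.
from unicodedata import category


def cn_space(v: str, n: int) -> int:
    # Each 'Lo' character renders two cells wide, so shrink the pad width by one per glyph.
    return n - [category(c) for c in v].count('Lo')


for key, value in {"title": "demo", "标题": "示例"}.items():
    # Without cn_space the second row would be pushed right by the wide glyphs.
    print(f'[+] - {key:<{cn_space(key, 19)}} : {value}')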
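
The new comment above secrets.choice(javdb_sites) refers to the difference between the random and secrets modules: random.choice() is reproducible once the Mersenne Twister seed is known, while secrets.choice() draws from the operating system's CSPRNG and cannot be seeded. Illustrative snippet only; the mirror names are placeholders.

# Seeded PRNG pick vs. CSPRNG pick.
import random
import secrets

mirrors = ["javdb1", "javdb2", "javdb3"]

random.seed(42)
print(random.choice(mirrors))   # same pick on every run with seed 42
print(secrets.choice(mirrors))  # unpredictable, not seed-based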