Merge pull request #876 from naughtyGitCat/master

formatting code under PEP8 and some basic guidelines
This commit is contained in:
Yoshiko2
2022-09-27 22:56:37 +08:00
committed by GitHub
6 changed files with 193 additions and 153 deletions

View File

@@ -1,55 +1,57 @@
from os import replace
import requests
# import hashlib
from pathlib import Path
import secrets
# build-in lib
import os.path
import os
import re
import uuid
import json
import time
from lxml import etree
import re
import config
import typing
from urllib.parse import urljoin
import mechanicalsoup
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from cloudscraper import create_scraper
from concurrent.futures import ThreadPoolExecutor
from unicodedata import category
from concurrent.futures import ThreadPoolExecutor
# third party lib
import requests
from requests.adapters import HTTPAdapter
import mechanicalsoup
from pathlib import Path
from urllib3.util.retry import Retry
from lxml import etree
from cloudscraper import create_scraper
# project wide
import config
def getXpathSingle(htmlcode, xpath):
html = etree.fromstring(htmlcode, etree.HTMLParser())
def get_xpath_single(html_code: str, xpath):
html = etree.fromstring(html_code, etree.HTMLParser())
result1 = str(html.xpath(xpath)).strip(" ['']")
return result1
G_USER_AGENT = r'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.133 Safari/537.36'
def get_html(url, cookies: dict = None, ua: str = None, return_type: str = None, encoding: str = None, json_headers = None):
def get_html(url, cookies: dict = None, ua: str = None, return_type: str = None, encoding: str = None, json_headers=None):
"""
网页请求核心函数
"""
verify = config.getInstance().cacert_file()
configProxy = config.getInstance().proxy()
config_proxy = config.getInstance().proxy()
errors = ""
headers = {"User-Agent": ua or G_USER_AGENT} # noqa
if json_headers != None:
if json_headers is not None:
headers.update(json_headers)
for i in range(configProxy.retry):
for i in range(config_proxy.retry):
try:
if configProxy.enable:
proxies = configProxy.proxies()
result = requests.get(str(url), headers=headers, timeout=configProxy.timeout, proxies=proxies,
if config_proxy.enable:
proxies = config_proxy.proxies()
result = requests.get(str(url), headers=headers, timeout=config_proxy.timeout, proxies=proxies,
verify=verify,
cookies=cookies)
else:
result = requests.get(str(url), headers=headers, timeout=configProxy.timeout, cookies=cookies)
result = requests.get(str(url), headers=headers, timeout=config_proxy.timeout, cookies=cookies)
if return_type == "object":
return result
@@ -59,7 +61,7 @@ def get_html(url, cookies: dict = None, ua: str = None, return_type: str = None,
result.encoding = encoding or result.apparent_encoding
return result.text
except Exception as e:
print("[-]Connect retry {}/{}".format(i + 1, configProxy.retry))
print("[-]Connect retry {}/{}".format(i + 1, config_proxy.retry))
errors = str(e)
if "getaddrinfo failed" in errors:
print("[-]Connect Failed! Please Check your proxy config")
@@ -71,8 +73,9 @@ def get_html(url, cookies: dict = None, ua: str = None, return_type: str = None,
print('[-]Connect Failed! Please check your Proxy or Network!')
raise Exception('Connect Failed')
def post_html(url: str, query: dict, headers: dict = None) -> requests.Response:
configProxy = config.getInstance().proxy()
config_proxy = config.getInstance().proxy()
errors = ""
headers_ua = {"User-Agent": G_USER_AGENT}
if headers is None:
@@ -80,16 +83,16 @@ def post_html(url: str, query: dict, headers: dict = None) -> requests.Response:
else:
headers.update(headers_ua)
for i in range(configProxy.retry):
for i in range(config_proxy.retry):
try:
if configProxy.enable:
proxies = configProxy.proxies()
result = requests.post(url, data=query, proxies=proxies, headers=headers, timeout=configProxy.timeout)
if config_proxy.enable:
proxies = config_proxy.proxies()
result = requests.post(url, data=query, proxies=proxies, headers=headers, timeout=config_proxy.timeout)
else:
result = requests.post(url, data=query, headers=headers, timeout=configProxy.timeout)
result = requests.post(url, data=query, headers=headers, timeout=config_proxy.timeout)
return result
except Exception as e:
print("[-]Connect retry {}/{}".format(i + 1, configProxy.retry))
print("[-]Connect retry {}/{}".format(i + 1, config_proxy.retry))
errors = str(e)
print("[-]Connect Failed! Please check your Proxy or Network!")
print("[-]" + errors)
@@ -116,17 +119,17 @@ class TimeoutHTTPAdapter(HTTPAdapter):
# with keep-alive feature
def get_html_session(url: str = None, cookies: dict = None, ua: str = None, return_type: str = None,
encoding: str = None):
configProxy = config.getInstance().proxy()
config_proxy = config.getInstance().proxy()
session = requests.Session()
if isinstance(cookies, dict) and len(cookies):
requests.utils.add_dict_to_cookiejar(session.cookies, cookies)
retries = Retry(total=configProxy.retry, connect=configProxy.retry, backoff_factor=1,
retries = Retry(total=config_proxy.retry, connect=config_proxy.retry, backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504])
session.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
session.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
if configProxy.enable:
session.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=config_proxy.timeout))
session.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=config_proxy.timeout))
if config_proxy.enable:
session.verify = config.getInstance().cacert_file()
session.proxies = configProxy.proxies()
session.proxies = config_proxy.proxies()
headers = {"User-Agent": ua or G_USER_AGENT}
session.headers = headers
try:
@@ -156,17 +159,17 @@ def get_html_session(url: str = None, cookies: dict = None, ua: str = None, retu
def get_html_by_browser(url: str = None, cookies: dict = None, ua: str = None, return_type: str = None,
encoding: str = None, use_scraper: bool = False):
configProxy = config.getInstance().proxy()
config_proxy = config.getInstance().proxy()
s = create_scraper(browser={'custom': ua or G_USER_AGENT, }) if use_scraper else requests.Session()
if isinstance(cookies, dict) and len(cookies):
requests.utils.add_dict_to_cookiejar(s.cookies, cookies)
retries = Retry(total=configProxy.retry, connect=configProxy.retry, backoff_factor=1,
retries = Retry(total=config_proxy.retry, connect=config_proxy.retry, backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504])
s.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
s.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
if configProxy.enable:
s.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=config_proxy.timeout))
s.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=config_proxy.timeout))
if config_proxy.enable:
s.verify = config.getInstance().cacert_file()
s.proxies = configProxy.proxies()
s.proxies = config_proxy.proxies()
try:
browser = mechanicalsoup.StatefulBrowser(user_agent=ua or G_USER_AGENT, session=s)
if isinstance(url, str) and len(url):
@@ -194,17 +197,17 @@ def get_html_by_browser(url: str = None, cookies: dict = None, ua: str = None, r
def get_html_by_form(url, form_select: str = None, fields: dict = None, cookies: dict = None, ua: str = None,
return_type: str = None, encoding: str = None):
configProxy = config.getInstance().proxy()
config_proxy = config.getInstance().proxy()
s = requests.Session()
if isinstance(cookies, dict) and len(cookies):
requests.utils.add_dict_to_cookiejar(s.cookies, cookies)
retries = Retry(total=configProxy.retry, connect=configProxy.retry, backoff_factor=1,
retries = Retry(total=config_proxy.retry, connect=config_proxy.retry, backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504])
s.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
s.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
if configProxy.enable:
s.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=config_proxy.timeout))
s.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=config_proxy.timeout))
if config_proxy.enable:
s.verify = config.getInstance().cacert_file()
s.proxies = configProxy.proxies()
s.proxies = config_proxy.proxies()
try:
browser = mechanicalsoup.StatefulBrowser(user_agent=ua or G_USER_AGENT, session=s)
result = browser.open(url)
@@ -234,17 +237,17 @@ def get_html_by_form(url, form_select: str = None, fields: dict = None, cookies:
def get_html_by_scraper(url: str = None, cookies: dict = None, ua: str = None, return_type: str = None,
encoding: str = None):
configProxy = config.getInstance().proxy()
config_proxy = config.getInstance().proxy()
session = create_scraper(browser={'custom': ua or G_USER_AGENT, })
if isinstance(cookies, dict) and len(cookies):
requests.utils.add_dict_to_cookiejar(session.cookies, cookies)
retries = Retry(total=configProxy.retry, connect=configProxy.retry, backoff_factor=1,
retries = Retry(total=config_proxy.retry, connect=config_proxy.retry, backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504])
session.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
session.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
if configProxy.enable:
session.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=config_proxy.timeout))
session.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=config_proxy.timeout))
if config_proxy.enable:
session.verify = config.getInstance().cacert_file()
session.proxies = configProxy.proxies()
session.proxies = config_proxy.proxies()
try:
if isinstance(url, str) and len(url):
result = session.get(str(url))
@@ -347,7 +350,7 @@ def translate(
return trans_result
def load_cookies(cookie_json_filename: str):
def load_cookies(cookie_json_filename: str) -> typing.Tuple[typing.Optional[dict], typing.Optional[str]]:
"""
加载cookie,用于以会员方式访问非游客内容
@@ -415,55 +418,16 @@ def is_japanese(raw: str) -> bool:
return bool(re.search(r'[\u3040-\u309F\u30A0-\u30FF\uFF66-\uFF9F]', raw, re.UNICODE))
# Usage: python ./ADC_function.py https://cn.bing.com/
if __name__ == "__main__":
import sys, timeit
from http.client import HTTPConnection
def benchmark(t, url):
print(f"HTTP GET Benchmark times:{t} url:{url}")
tm = timeit.timeit(f"_ = session1.get('{url}')",
"from __main__ import get_html_session;session1=get_html_session()",
number=t)
print(f' *{tm:>10.5f}s get_html_session() Keep-Alive enable')
tm = timeit.timeit(f"_ = scraper1.get('{url}')",
"from __main__ import get_html_by_scraper;scraper1=get_html_by_scraper()",
number=t)
print(f' *{tm:>10.5f}s get_html_by_scraper() Keep-Alive enable')
tm = timeit.timeit(f"_ = browser1.open('{url}')",
"from __main__ import get_html_by_browser;browser1=get_html_by_browser()",
number=t)
print(f' *{tm:>10.5f}s get_html_by_browser() Keep-Alive enable')
tm = timeit.timeit(f"_ = get_html('{url}')",
"from __main__ import get_html",
number=t)
print(f' *{tm:>10.5f}s get_html()')
t = 100
# url = "https://www.189.cn/"
url = "http://www.chinaunicom.com"
HTTPConnection.debuglevel = 1
s = get_html_session()
_ = s.get(url)
HTTPConnection.debuglevel = 0
if len(sys.argv) > 1:
url = sys.argv[1]
benchmark(t, url)
def download_file_with_filename(url: str, filename: str, path: str) -> None:
"""
download file save to give path with given name from given url
"""
conf = config.getInstance()
configProxy = conf.proxy()
config_proxy = conf.proxy()
for i in range(configProxy.retry):
for i in range(config_proxy.retry):
try:
if configProxy.enable:
if config_proxy.enable:
if not os.path.exists(path):
try:
os.makedirs(path)
@@ -491,18 +455,18 @@ def download_file_with_filename(url: str, filename: str, path: str) -> None:
with open(os.path.join(path, filename), "wb") as code:
code.write(r)
return
except requests.exceptions.RequestException:
i += 1
print('[-]Download : Connect retry ' + str(i) + '/' + str(configProxy.retry))
except requests.exceptions.ConnectionError:
i += 1
print('[-]Download : Connect retry ' + str(i) + '/' + str(configProxy.retry))
except requests.exceptions.ProxyError:
i += 1
print('[-]Download : Connect retry ' + str(i) + '/' + str(configProxy.retry))
print('[-]Download : Connect retry ' + str(i) + '/' + str(config_proxy.retry))
except requests.exceptions.ConnectTimeout:
i += 1
print('[-]Download : Connect retry ' + str(i) + '/' + str(configProxy.retry))
print('[-]Download : Connect retry ' + str(i) + '/' + str(config_proxy.retry))
except requests.exceptions.ConnectionError:
i += 1
print('[-]Download : Connect retry ' + str(i) + '/' + str(config_proxy.retry))
except requests.exceptions.RequestException:
i += 1
print('[-]Download : Connect retry ' + str(i) + '/' + str(config_proxy.retry))
except IOError:
raise ValueError(f"[-]Create Directory '{path}' failed!")
return
@@ -518,7 +482,7 @@ def download_one_file(args) -> str:
"""
(url, save_path, json_headers) = args
if json_headers != None:
if json_headers is not None:
filebytes = get_html(url, return_type='content', json_headers=json_headers['headers'])
else:
filebytes = get_html(url, return_type='content')
@@ -574,10 +538,57 @@ def delete_all_elements_in_str(string_delete: str, string: str):
"""
for i in string:
if i == string_delete:
string = string.replace(i,"")
string = string.replace(i, "")
return string
# print format空格填充对齐内容包含中文时的空格计算
def cnspace(v: str, n: int) -> int:
def cn_space(v: str, n: int) -> int:
return n - [category(c) for c in v].count('Lo')
"""
Usage: python ./ADC_function.py https://cn.bing.com/
Purpose: benchmark get_html_session
benchmark get_html_by_scraper
benchmark get_html_by_browser
benchmark get_html
TODO: may be this should move to unittest directory
"""
if __name__ == "__main__":
import sys, timeit
from http.client import HTTPConnection
def benchmark(times: int, url):
print(f"HTTP GET Benchmark times:{times} url:{url}")
tm = timeit.timeit(f"_ = session1.get('{url}')",
"from __main__ import get_html_session;session1=get_html_session()",
number=times)
print(f' *{tm:>10.5f}s get_html_session() Keep-Alive enable')
tm = timeit.timeit(f"_ = scraper1.get('{url}')",
"from __main__ import get_html_by_scraper;scraper1=get_html_by_scraper()",
number=times)
print(f' *{tm:>10.5f}s get_html_by_scraper() Keep-Alive enable')
tm = timeit.timeit(f"_ = browser1.open('{url}')",
"from __main__ import get_html_by_browser;browser1=get_html_by_browser()",
number=times)
print(f' *{tm:>10.5f}s get_html_by_browser() Keep-Alive enable')
tm = timeit.timeit(f"_ = get_html('{url}')",
"from __main__ import get_html",
number=times)
print(f' *{tm:>10.5f}s get_html()')
# target_url = "https://www.189.cn/"
target_url = "http://www.chinaunicom.com"
HTTPConnection.debuglevel = 1
html_session = get_html_session()
_ = html_session.get(target_url)
HTTPConnection.debuglevel = 0
# times
t = 100
if len(sys.argv) > 1:
target_url = sys.argv[1]
benchmark(t, target_url)

View File

@@ -563,8 +563,10 @@ class IniProxy():
self.proxytype = proxytype
def proxies(self):
''' 获得代理参数默认http代理
'''
"""
获得代理参数默认http代理
get proxy params, use http proxy for default
"""
if self.address:
if self.proxytype in self.SUPPORT_PROXY_TYPE:
proxies = {"http": self.proxytype + "://" + self.address,

View File

@@ -692,7 +692,7 @@ def debug_print(data: json):
if i == 'extrafanart':
print('[+] -', "%-19s" % i, ':', len(v), 'links')
continue
print(f'[+] - {i:<{cnspace(i,19)}} : {v}')
print(f'[+] - {i:<{cn_space(i, 19)}} : {v}')
print("[+] ------- DEBUG INFO -------")
except:

View File

@@ -1,12 +1,22 @@
# build-in lib
import json
import secrets
import config
from lxml import etree
from pathlib import Path
from ADC_function import delete_all_elements_in_list, delete_all_elements_in_str, file_modification_days, load_cookies, translate
# third party lib
from lxml import etree
# project wide definitions
import config
from ADC_function import (translate,
load_cookies,
file_modification_days,
delete_all_elements_in_str,
delete_all_elements_in_list
)
from scrapinglib.api import search
def get_data_from_json(file_number, oCC, specified_source, specified_url):
"""
iterate through all services and fetch the data 从JSON返回元数据
@@ -21,11 +31,11 @@ def get_data_from_json(file_number, oCC, specified_source, specified_url):
# TODO 准备参数
# - 清理 ADC_function, webcrawler
proxies = None
configProxy = conf.proxy()
if configProxy.enable:
proxies = configProxy.proxies()
proxies: dict = None
config_proxy = conf.proxy()
if config_proxy.enable:
proxies = config_proxy.proxies()
javdb_sites = conf.javdb_sites().split(',')
for i in javdb_sites:
javdb_sites[javdb_sites.index(i)] = "javdb" + i
@@ -43,14 +53,16 @@ def get_data_from_json(file_number, oCC, specified_source, specified_url):
has_json = True
break
elif cdays != 9999:
print(f'[!]Cookies file {cookies_filepath} was updated {cdays} days ago, it will not be used for HTTP requests.')
print(
f'[!]Cookies file {cookies_filepath} was updated {cdays} days ago, it will not be used for HTTP requests.')
if not has_json:
# get real random site from javdb_sites, because random is not really random when the seed value is known
javdb_site = secrets.choice(javdb_sites)
javdb_cookies = None
cacert =None
ca_cert = None
if conf.cacert_file():
cacert = conf.cacert_file()
ca_cert = conf.cacert_file()
json_data = search(file_number, sources, proxies=proxies, verify=cacert,
dbsite=javdb_site, dbcookies=javdb_cookies,
@@ -181,18 +193,21 @@ def get_data_from_json(file_number, oCC, specified_source, specified_url):
if oCC:
cc_vars = conf.cc_convert_vars().split(",")
ccm = conf.cc_convert_mode()
def convert_list(mapping_data,language,vars):
def convert_list(mapping_data, language, vars):
total = []
for i in vars:
if len(mapping_data.xpath('a[contains(@keyword, $name)]/@' + language, name=f",{i},")) != 0:
i = mapping_data.xpath('a[contains(@keyword, $name)]/@' + language, name=f",{i},")[0]
total.append(i)
return total
def convert(mapping_data,language,vars):
def convert(mapping_data, language, vars):
if len(mapping_data.xpath('a[contains(@keyword, $name)]/@' + language, name=vars)) != 0:
return mapping_data.xpath('a[contains(@keyword, $name)]/@' + language, name=vars)[0]
else:
raise IndexError('keyword not found')
for cc in cc_vars:
if json_data[cc] == "" or len(json_data[cc]) == 0:
continue
@@ -239,7 +254,7 @@ def get_data_from_json(file_number, oCC, specified_source, specified_url):
except:
pass
naming_rule=""
naming_rule = ""
for i in conf.naming_rule().split("+"):
if i not in json_data:
naming_rule += i.strip("'").strip('"')
@@ -254,17 +269,17 @@ def get_data_from_json(file_number, oCC, specified_source, specified_url):
def special_characters_replacement(text) -> str:
if not isinstance(text, str):
return text
return (text.replace('\\', ''). # U+2216 SET MINUS @ Basic Multilingual Plane
replace('/', ''). # U+2215 DIVISION SLASH @ Basic Multilingual Plane
replace(':', ''). # U+A789 MODIFIER LETTER COLON @ Latin Extended-D
replace('*', ''). # U+2217 ASTERISK OPERATOR @ Basic Multilingual Plane
replace('?', ''). # U+FF1F FULLWIDTH QUESTION MARK @ Basic Multilingual Plane
replace('"', ''). # U+FF02 FULLWIDTH QUOTATION MARK @ Basic Multilingual Plane
replace('<', ''). # U+1438 CANADIAN SYLLABICS PA @ Basic Multilingual Plane
replace('>', ''). # U+1433 CANADIAN SYLLABICS PO @ Basic Multilingual Plane
replace('|', 'ǀ'). # U+01C0 LATIN LETTER DENTAL CLICK @ Basic Multilingual Plane
replace('&lsquo;', ''). # U+02018 LEFT SINGLE QUOTATION MARK
replace('&rsquo;', ''). # U+02019 RIGHT SINGLE QUOTATION MARK
replace('&hellip;','').
replace('&amp;', '')
return (text.replace('\\', ''). # U+2216 SET MINUS @ Basic Multilingual Plane
replace('/', ''). # U+2215 DIVISION SLASH @ Basic Multilingual Plane
replace(':', ''). # U+A789 MODIFIER LETTER COLON @ Latin Extended-D
replace('*', ''). # U+2217 ASTERISK OPERATOR @ Basic Multilingual Plane
replace('?', ''). # U+FF1F FULLWIDTH QUESTION MARK @ Basic Multilingual Plane
replace('"', ''). # U+FF02 FULLWIDTH QUOTATION MARK @ Basic Multilingual Plane
replace('<', ''). # U+1438 CANADIAN SYLLABICS PA @ Basic Multilingual Plane
replace('>', ''). # U+1433 CANADIAN SYLLABICS PO @ Basic Multilingual Plane
replace('|', 'ǀ'). # U+01C0 LATIN LETTER DENTAL CLICK @ Basic Multilingual Plane
replace('&lsquo;', ''). # U+02018 LEFT SINGLE QUOTATION MARK
replace('&rsquo;', ''). # U+02019 RIGHT SINGLE QUOTATION MARK
replace('&hellip;', '').
replace('&amp;', '')
)

View File

@@ -46,11 +46,11 @@ def getSupportedSources(tag='adult'):
return ','.join(sc.general_full_sources)
class Scraping():
class Scraping:
"""
"""
adult_full_sources = ['javlibrary', 'javdb', 'javbus', 'airav', 'fanza', 'xcity', 'jav321',
'mgstage', 'fc2', 'avsox', 'dlsite', 'carib', 'madou', 'mv91',
adult_full_sources = ['javlibrary', 'javdb', 'javbus', 'airav', 'fanza', 'xcity', 'jav321',
'mgstage', 'fc2', 'avsox', 'dlsite', 'carib', 'madou', 'mv91',
'getchu', 'gcolle'
]
adult_func_mapping = {
@@ -72,7 +72,7 @@ class Scraping():
'javlibrary': Javlibrary().scrape,
}
general_full_sources = ['tmdb','imdb']
general_full_sources = ['tmdb', 'imdb']
general_func_mapping = {
'tmdb': Tmdb().scrape,
'imdb': Imdb().scrape,
@@ -199,7 +199,8 @@ class Scraping():
sources = self.adult_full_sources
else:
sources = c_sources.split(',')
def insert(sources,source):
def insert(sources, source):
if source in sources:
sources.insert(0, sources.pop(sources.index(source)))
return sources

27
xlog.py
View File

@@ -1,4 +1,3 @@
import os
import sys
import time
@@ -18,7 +17,8 @@ INFO = 20
DEBUG = 10
NOTSET = 0
class Logger():
class Logger:
def __init__(self, name, buffer_size=0, file_name=None, roll_num=1):
self.err_color = '\033[0m'
self.warn_color = '\033[0m'
@@ -28,7 +28,7 @@ class Logger():
self.name = str(name)
self.file_max_size = 1024 * 1024
self.buffer_lock = threading.Lock()
self.buffer = {} # id => line
self.buffer = {} # id => line
self.buffer_size = buffer_size
self.last_no = 0
self.min_level = NOTSET
@@ -107,7 +107,7 @@ class Logger():
if not os.path.isfile(old_name):
continue
#self.info("roll_log %s -> %s", old_name, new_name)
# self.info("roll_log %s -> %s", old_name, new_name)
shutil.move(old_name, new_name)
shutil.move(self.log_filename, self.log_filename + ".1")
@@ -157,7 +157,8 @@ class Logger():
if buffer_len > self.buffer_size:
del self.buffer[self.last_no - self.buffer_size]
except Exception as e:
string = '%s - [%s]LOG_EXCEPT: %s, Except:%s<br> %s' % (time.ctime()[4:-5], level, fmt % args, e, traceback.format_exc())
string = '%s - [%s]LOG_EXCEPT: %s, Except:%s<br> %s' % (
time.ctime()[4:-5], level, fmt % args, e, traceback.format_exc())
self.last_no += 1
self.buffer[self.last_no] = string
buffer_len = len(self.buffer)
@@ -202,7 +203,7 @@ class Logger():
def tofile(self, fmt, *args, **kwargs):
self.log_to_file('@', self.warn_color, fmt, *args, **kwargs)
#=================================================================
# =================================================================
def set_buffer_size(self, set_size):
self.buffer_lock.acquire()
self.buffer_size = set_size
@@ -255,8 +256,10 @@ class Logger():
print(("Except stack:%s" % traceback.format_exc()))
return ""
loggerDict = {}
def getLogger(name=None, buffer_size=0, file_name=None, roll_num=1):
global loggerDict, default_log
if name is None:
@@ -279,29 +282,38 @@ def getLogger(name=None, buffer_size=0, file_name=None, roll_num=1):
default_log = logger_instance
return logger_instance
default_log = getLogger()
def debg(fmt, *args, **kwargs):
default_log.debug(fmt, *args, **kwargs)
def info(fmt, *args, **kwargs):
default_log.info(fmt, *args, **kwargs)
def warn(fmt, *args, **kwargs):
default_log.warning(fmt, *args, **kwargs)
def erro(fmt, *args, **kwargs):
default_log.error(fmt, *args, **kwargs)
def excp(fmt, *args, **kwargs):
default_log.exception(fmt, *args, **kwargs)
def crit(fmt, *args, **kwargs):
default_log.critical(fmt, *args, **kwargs)
def tofile(fmt, *args, **kwargs):
default_log.tofile(fmt, *args, **kwargs)
if __name__ == '__main__':
log_file = os.path.join(os.path.dirname(sys.argv[0]), "test.log")
getLogger().set_file(log_file)
@@ -313,7 +325,6 @@ if __name__ == '__main__':
tofile("write to file only")
try:
1/0
1 / 0
except Exception as e:
excp("An error has occurred")