PEP8: tidy blank lines

PEP8: tidy variable names
add __main__ usage comment
isolate global and local variable names
naughtyGitCat
2022-09-16 17:18:17 +08:00
parent 8446489b68
commit daedd3071c
4 changed files with 143 additions and 118 deletions

ADC_function.py

@@ -1,55 +1,57 @@
-from os import replace
-import requests
-# import hashlib
-from pathlib import Path
-import secrets
+# build-in lib
 import os.path
-import os
 import re
 import uuid
 import json
 import time
-from lxml import etree
-import re
-import config
 import typing
-from urllib.parse import urljoin
-import mechanicalsoup
-from requests.adapters import HTTPAdapter
-from urllib3.util.retry import Retry
-from cloudscraper import create_scraper
-from concurrent.futures import ThreadPoolExecutor
 from unicodedata import category
 from concurrent.futures import ThreadPoolExecutor
+
+# third party lib
+import requests
+from requests.adapters import HTTPAdapter
+import mechanicalsoup
+from pathlib import Path
+from urllib3.util.retry import Retry
+from lxml import etree
+from cloudscraper import create_scraper
+
+# project wide
+import config
 
 
-def getXpathSingle(htmlcode, xpath):
-    html = etree.fromstring(htmlcode, etree.HTMLParser())
+def get_xpath_single(html_code: str, xpath):
+    html = etree.fromstring(html_code, etree.HTMLParser())
     result1 = str(html.xpath(xpath)).strip(" ['']")
     return result1
 
 
 G_USER_AGENT = r'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.133 Safari/537.36'
 
 
-def get_html(url, cookies: dict = None, ua: str = None, return_type: str = None, encoding: str = None, json_headers = None):
+def get_html(url, cookies: dict = None, ua: str = None, return_type: str = None, encoding: str = None, json_headers=None):
     """
     Core function for web page requests
     """
     verify = config.getInstance().cacert_file()
-    configProxy = config.getInstance().proxy()
+    config_proxy = config.getInstance().proxy()
     errors = ""
     headers = {"User-Agent": ua or G_USER_AGENT}  # noqa
-    if json_headers != None:
+    if json_headers is not None:
         headers.update(json_headers)
-    for i in range(configProxy.retry):
+    for i in range(config_proxy.retry):
         try:
-            if configProxy.enable:
-                proxies = configProxy.proxies()
-                result = requests.get(str(url), headers=headers, timeout=configProxy.timeout, proxies=proxies,
+            if config_proxy.enable:
+                proxies = config_proxy.proxies()
+                result = requests.get(str(url), headers=headers, timeout=config_proxy.timeout, proxies=proxies,
                                       verify=verify,
                                       cookies=cookies)
             else:
-                result = requests.get(str(url), headers=headers, timeout=configProxy.timeout, cookies=cookies)
+                result = requests.get(str(url), headers=headers, timeout=config_proxy.timeout, cookies=cookies)
 
             if return_type == "object":
                 return result
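A note on the `json_headers != None` → `json_headers is not None` change above: PEP 8 (rule E711) requires `is` / `is not` for comparisons against the `None` singleton, because `==` and `!=` dispatch to a type's `__eq__`, which user code can override, while `is` compares object identity and cannot be fooled. A minimal sketch of the failure mode (the `Always` class is hypothetical, purely for illustration):

    class Always:
        # hypothetical type whose __eq__ claims equality with everything
        def __eq__(self, other):
            return True

    obj = Always()
    print(obj == None)      # True  -- __eq__ lies
    print(obj != None)      # False -- an `!= None` guard would wrongly treat obj as None
    print(obj is not None)  # True  -- identity check is reliable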
@@ -59,7 +61,7 @@ def get_html(url, cookies: dict = None, ua: str = None, return_type: str = None,
                 result.encoding = encoding or result.apparent_encoding
                 return result.text
         except Exception as e:
-            print("[-]Connect retry {}/{}".format(i + 1, configProxy.retry))
+            print("[-]Connect retry {}/{}".format(i + 1, config_proxy.retry))
             errors = str(e)
     if "getaddrinfo failed" in errors:
         print("[-]Connect Failed! Please Check your proxy config")
@@ -71,8 +73,9 @@ def get_html(url, cookies: dict = None, ua: str = None, return_type: str = None,
     print('[-]Connect Failed! Please check your Proxy or Network!')
     raise Exception('Connect Failed')
 
+
 def post_html(url: str, query: dict, headers: dict = None) -> requests.Response:
-    configProxy = config.getInstance().proxy()
+    config_proxy = config.getInstance().proxy()
     errors = ""
     headers_ua = {"User-Agent": G_USER_AGENT}
     if headers is None:
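The line-count change in this hunk (8 → 9) is presumably one added blank line before `def post_html(...)`: PEP 8 (E302) wants top-level definitions separated from surrounding code by two blank lines, which matches the "blank lines" part of the commit message. In sketch form:

    def first():
        ...


    def second():  # exactly two blank lines above, per PEP 8 E302
        ...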
@@ -80,16 +83,16 @@ def post_html(url: str, query: dict, headers: dict = None) -> requests.Response:
     else:
         headers.update(headers_ua)
-    for i in range(configProxy.retry):
+    for i in range(config_proxy.retry):
         try:
-            if configProxy.enable:
-                proxies = configProxy.proxies()
-                result = requests.post(url, data=query, proxies=proxies, headers=headers, timeout=configProxy.timeout)
+            if config_proxy.enable:
+                proxies = config_proxy.proxies()
+                result = requests.post(url, data=query, proxies=proxies, headers=headers, timeout=config_proxy.timeout)
             else:
-                result = requests.post(url, data=query, headers=headers, timeout=configProxy.timeout)
+                result = requests.post(url, data=query, headers=headers, timeout=config_proxy.timeout)
             return result
         except Exception as e:
-            print("[-]Connect retry {}/{}".format(i + 1, configProxy.retry))
+            print("[-]Connect retry {}/{}".format(i + 1, config_proxy.retry))
             errors = str(e)
     print("[-]Connect Failed! Please check your Proxy or Network!")
     print("[-]" + errors)
@@ -116,17 +119,17 @@ class TimeoutHTTPAdapter(HTTPAdapter):
 # with keep-alive feature
 def get_html_session(url: str = None, cookies: dict = None, ua: str = None, return_type: str = None,
                      encoding: str = None):
-    configProxy = config.getInstance().proxy()
+    config_proxy = config.getInstance().proxy()
     session = requests.Session()
     if isinstance(cookies, dict) and len(cookies):
         requests.utils.add_dict_to_cookiejar(session.cookies, cookies)
-    retries = Retry(total=configProxy.retry, connect=configProxy.retry, backoff_factor=1,
+    retries = Retry(total=config_proxy.retry, connect=config_proxy.retry, backoff_factor=1,
                     status_forcelist=[429, 500, 502, 503, 504])
-    session.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
-    session.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
-    if configProxy.enable:
+    session.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=config_proxy.timeout))
+    session.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=config_proxy.timeout))
+    if config_proxy.enable:
         session.verify = config.getInstance().cacert_file()
-        session.proxies = configProxy.proxies()
+        session.proxies = config_proxy.proxies()
     headers = {"User-Agent": ua or G_USER_AGENT}
     session.headers = headers
     try:
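The header of the hunk above references `class TimeoutHTTPAdapter(HTTPAdapter)`, whose body falls outside this diff. A minimal sketch of how such an adapter is commonly written, assuming it does nothing more than inject a default timeout into every request sent through the session it is mounted on:

    from requests.adapters import HTTPAdapter

    class TimeoutHTTPAdapter(HTTPAdapter):
        """HTTPAdapter that applies a default timeout to every request."""

        def __init__(self, *args, timeout=5, **kwargs):
            self.timeout = timeout
            super().__init__(*args, **kwargs)

        def send(self, request, **kwargs):
            # fill in the timeout only when the caller did not pass one explicitly
            if kwargs.get("timeout") is None:
                kwargs["timeout"] = self.timeout
            return super().send(request, **kwargs)

Mounting it on both `https://` and `http://` replaces requests' default adapters, so the `Retry` policy (up to `config_proxy.retry` attempts with exponential backoff, retrying on HTTP 429/500/502/503/504) and the timeout apply to everything the session sends.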
@@ -156,17 +159,17 @@ def get_html_session(url: str = None, cookies: dict = None, ua: str = None, retu
 def get_html_by_browser(url: str = None, cookies: dict = None, ua: str = None, return_type: str = None,
                         encoding: str = None, use_scraper: bool = False):
-    configProxy = config.getInstance().proxy()
+    config_proxy = config.getInstance().proxy()
     s = create_scraper(browser={'custom': ua or G_USER_AGENT, }) if use_scraper else requests.Session()
     if isinstance(cookies, dict) and len(cookies):
         requests.utils.add_dict_to_cookiejar(s.cookies, cookies)
-    retries = Retry(total=configProxy.retry, connect=configProxy.retry, backoff_factor=1,
+    retries = Retry(total=config_proxy.retry, connect=config_proxy.retry, backoff_factor=1,
                     status_forcelist=[429, 500, 502, 503, 504])
-    s.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
-    s.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
-    if configProxy.enable:
+    s.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=config_proxy.timeout))
+    s.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=config_proxy.timeout))
+    if config_proxy.enable:
         s.verify = config.getInstance().cacert_file()
-        s.proxies = configProxy.proxies()
+        s.proxies = config_proxy.proxies()
     try:
         browser = mechanicalsoup.StatefulBrowser(user_agent=ua or G_USER_AGENT, session=s)
         if isinstance(url, str) and len(url):
@@ -194,17 +197,17 @@ def get_html_by_browser(url: str = None, cookies: dict = None, ua: str = None, r
 def get_html_by_form(url, form_select: str = None, fields: dict = None, cookies: dict = None, ua: str = None,
                      return_type: str = None, encoding: str = None):
-    configProxy = config.getInstance().proxy()
+    config_proxy = config.getInstance().proxy()
     s = requests.Session()
     if isinstance(cookies, dict) and len(cookies):
         requests.utils.add_dict_to_cookiejar(s.cookies, cookies)
-    retries = Retry(total=configProxy.retry, connect=configProxy.retry, backoff_factor=1,
+    retries = Retry(total=config_proxy.retry, connect=config_proxy.retry, backoff_factor=1,
                     status_forcelist=[429, 500, 502, 503, 504])
-    s.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
-    s.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
-    if configProxy.enable:
+    s.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=config_proxy.timeout))
+    s.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=config_proxy.timeout))
+    if config_proxy.enable:
         s.verify = config.getInstance().cacert_file()
-        s.proxies = configProxy.proxies()
+        s.proxies = config_proxy.proxies()
     try:
         browser = mechanicalsoup.StatefulBrowser(user_agent=ua or G_USER_AGENT, session=s)
         result = browser.open(url)
@@ -234,17 +237,17 @@ def get_html_by_form(url, form_select: str = None, fields: dict = None, cookies:
 def get_html_by_scraper(url: str = None, cookies: dict = None, ua: str = None, return_type: str = None,
                         encoding: str = None):
-    configProxy = config.getInstance().proxy()
+    config_proxy = config.getInstance().proxy()
     session = create_scraper(browser={'custom': ua or G_USER_AGENT, })
     if isinstance(cookies, dict) and len(cookies):
         requests.utils.add_dict_to_cookiejar(session.cookies, cookies)
-    retries = Retry(total=configProxy.retry, connect=configProxy.retry, backoff_factor=1,
+    retries = Retry(total=config_proxy.retry, connect=config_proxy.retry, backoff_factor=1,
                     status_forcelist=[429, 500, 502, 503, 504])
-    session.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
-    session.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
-    if configProxy.enable:
+    session.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=config_proxy.timeout))
+    session.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=config_proxy.timeout))
+    if config_proxy.enable:
         session.verify = config.getInstance().cacert_file()
-        session.proxies = configProxy.proxies()
+        session.proxies = config_proxy.proxies()
     try:
         if isinstance(url, str) and len(url):
             result = session.get(str(url))
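`get_html_session`, `get_html_by_browser`, `get_html_by_form`, and `get_html_by_scraper` repeat the same Retry/mount/proxy wiring, which is why this rename had to be applied four times. A hedged refactoring sketch, not part of this commit (the helper name `_mount_retries` is hypothetical; it assumes the file's existing `Retry` and `TimeoutHTTPAdapter` imports):

    def _mount_retries(session, config_proxy, cacert_file=None):
        # shared wiring used by all four get_html_* variants (hypothetical helper)
        retries = Retry(total=config_proxy.retry, connect=config_proxy.retry, backoff_factor=1,
                        status_forcelist=[429, 500, 502, 503, 504])
        session.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=config_proxy.timeout))
        session.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=config_proxy.timeout))
        if config_proxy.enable:
            session.verify = cacert_file
            session.proxies = config_proxy.proxies()
        return session

Each variant would then reduce to creating its session (plain `requests.Session()`, cloudscraper, or mechanicalsoup) and calling the helper.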
@@ -415,55 +418,16 @@ def is_japanese(raw: str) -> bool:
     return bool(re.search(r'[\u3040-\u309F\u30A0-\u30FF\uFF66-\uFF9F]', raw, re.UNICODE))
 
-# Usage: python ./ADC_function.py https://cn.bing.com/
-if __name__ == "__main__":
-    import sys, timeit
-    from http.client import HTTPConnection
-
-    def benchmark(t, url):
-        print(f"HTTP GET Benchmark times:{t} url:{url}")
-        tm = timeit.timeit(f"_ = session1.get('{url}')",
-                           "from __main__ import get_html_session;session1=get_html_session()",
-                           number=t)
-        print(f' *{tm:>10.5f}s get_html_session() Keep-Alive enable')
-        tm = timeit.timeit(f"_ = scraper1.get('{url}')",
-                           "from __main__ import get_html_by_scraper;scraper1=get_html_by_scraper()",
-                           number=t)
-        print(f' *{tm:>10.5f}s get_html_by_scraper() Keep-Alive enable')
-        tm = timeit.timeit(f"_ = browser1.open('{url}')",
-                           "from __main__ import get_html_by_browser;browser1=get_html_by_browser()",
-                           number=t)
-        print(f' *{tm:>10.5f}s get_html_by_browser() Keep-Alive enable')
-        tm = timeit.timeit(f"_ = get_html('{url}')",
-                           "from __main__ import get_html",
-                           number=t)
-        print(f' *{tm:>10.5f}s get_html()')
-
-    t = 100
-    # url = "https://www.189.cn/"
-    url = "http://www.chinaunicom.com"
-    HTTPConnection.debuglevel = 1
-    s = get_html_session()
-    _ = s.get(url)
-    HTTPConnection.debuglevel = 0
-    if len(sys.argv) > 1:
-        url = sys.argv[1]
-    benchmark(t, url)
 
 def download_file_with_filename(url: str, filename: str, path: str) -> None:
     """
     download a file from the given url and save it under the given name in the given path
     """
     conf = config.getInstance()
-    configProxy = conf.proxy()
+    config_proxy = conf.proxy()
-    for i in range(configProxy.retry):
+    for i in range(config_proxy.retry):
         try:
-            if configProxy.enable:
+            if config_proxy.enable:
                 if not os.path.exists(path):
                     try:
                         os.makedirs(path)
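A side note on the `if not os.path.exists(path)` guard around `os.makedirs(path)` kept as context here: the check-then-create pair can race when several download threads target the same directory. On Python 3, `exist_ok=True` handles that atomically (a suggestion, not part of this commit):

    os.makedirs(path, exist_ok=True)  # no exists() pre-check; raises only on real I/O errors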
@@ -491,18 +455,18 @@ def download_file_with_filename(url: str, filename: str, path: str) -> None:
             with open(os.path.join(path, filename), "wb") as code:
                 code.write(r)
             return
-        except requests.exceptions.RequestException:
-            i += 1
-            print('[-]Download : Connect retry ' + str(i) + '/' + str(configProxy.retry))
-        except requests.exceptions.ConnectionError:
-            i += 1
-            print('[-]Download : Connect retry ' + str(i) + '/' + str(configProxy.retry))
         except requests.exceptions.ProxyError:
             i += 1
-            print('[-]Download : Connect retry ' + str(i) + '/' + str(configProxy.retry))
+            print('[-]Download : Connect retry ' + str(i) + '/' + str(config_proxy.retry))
         except requests.exceptions.ConnectTimeout:
             i += 1
-            print('[-]Download : Connect retry ' + str(i) + '/' + str(configProxy.retry))
+            print('[-]Download : Connect retry ' + str(i) + '/' + str(config_proxy.retry))
+        except requests.exceptions.ConnectionError:
+            i += 1
+            print('[-]Download : Connect retry ' + str(i) + '/' + str(config_proxy.retry))
+        except requests.exceptions.RequestException:
+            i += 1
+            print('[-]Download : Connect retry ' + str(i) + '/' + str(config_proxy.retry))
         except IOError:
             raise ValueError(f"[-]Create Directory '{path}' failed!")
     return
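The reordering in this hunk matters in principle: `except` clauses are tried top to bottom, and `requests.exceptions.RequestException` is the base class of `ConnectionError`, which in turn is the base of `ProxyError` and `ConnectTimeout`. With the base class listed first, as in the old code, the more specific handlers below it were unreachable. Here every handler happens to do the same thing, so behavior is unchanged, but specific-before-general is the safe convention. A minimal demonstration of the rule:

    class Base(Exception): ...
    class Specific(Base): ...

    try:
        raise Specific()
    except Base:
        print("base handler wins")   # runs: clauses are checked in order
    except Specific:
        print("never reached")       # dead code when Base is listed first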
@@ -518,7 +482,7 @@ def download_one_file(args) -> str:
     """
     (url, save_path, json_headers) = args
-    if json_headers != None:
+    if json_headers is not None:
         filebytes = get_html(url, return_type='content', json_headers=json_headers['headers'])
     else:
         filebytes = get_html(url, return_type='content')
@@ -574,10 +538,57 @@ def delete_all_elements_in_str(string_delete: str, string: str):
     """
     for i in string:
         if i == string_delete:
-            string = string.replace(i,"")
+            string = string.replace(i, "")
     return string
 
 
 # space-width calculation for print-format padding/alignment when the content contains Chinese (full-width) characters
-def cnspace(v: str, n: int) -> int:
+def cn_space(v: str, n: int) -> int:
     return n - [category(c) for c in v].count('Lo')
+
+
+"""
+Usage: python ./ADC_function.py https://cn.bing.com/
+Purpose: benchmark get_html_session
+         benchmark get_html_by_scraper
+         benchmark get_html_by_browser
+         benchmark get_html
+TODO: maybe this should move to the unittest directory
+"""
+if __name__ == "__main__":
+    import sys, timeit
+    from http.client import HTTPConnection
+
+    def benchmark(times: int, url):
+        print(f"HTTP GET Benchmark times:{times} url:{url}")
+        tm = timeit.timeit(f"_ = session1.get('{url}')",
+                           "from __main__ import get_html_session;session1=get_html_session()",
+                           number=times)
+        print(f' *{tm:>10.5f}s get_html_session() Keep-Alive enable')
+        tm = timeit.timeit(f"_ = scraper1.get('{url}')",
+                           "from __main__ import get_html_by_scraper;scraper1=get_html_by_scraper()",
+                           number=times)
+        print(f' *{tm:>10.5f}s get_html_by_scraper() Keep-Alive enable')
+        tm = timeit.timeit(f"_ = browser1.open('{url}')",
+                           "from __main__ import get_html_by_browser;browser1=get_html_by_browser()",
+                           number=times)
+        print(f' *{tm:>10.5f}s get_html_by_browser() Keep-Alive enable')
+        tm = timeit.timeit(f"_ = get_html('{url}')",
+                           "from __main__ import get_html",
+                           number=times)
+        print(f' *{tm:>10.5f}s get_html()')
+
+    # target_url = "https://www.189.cn/"
+    target_url = "http://www.chinaunicom.com"
+    HTTPConnection.debuglevel = 1
+    html_session = get_html_session()
+    _ = html_session.get(target_url)
+    HTTPConnection.debuglevel = 0
+    # times
+    t = 100
+    if len(sys.argv) > 1:
+        target_url = sys.argv[1]
+    benchmark(t, target_url)
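The relocated benchmark uses `timeit.timeit(stmt, setup, number=times)` with import-string setups so that building each session is excluded from the measured time, and the single request made under `HTTPConnection.debuglevel = 1` prints the raw request/response headers once, which lets you confirm the keep-alive behavior being measured. Since Python 3.5 the same timing can be written without embedding the URL into the statement string, via the `globals=` parameter — a sketch, not part of this commit:

    import timeit

    tm = timeit.timeit("_ = session1.get(target_url)",
                       globals={"session1": get_html_session(), "target_url": target_url},
                       number=t)
    print(f' *{tm:>10.5f}s get_html_session() via globals=')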