deal with websites behind Cloudflare

lededev
2021-11-02 03:51:31 +08:00
parent 3786f58bb6
commit e564629f16
4 changed files with 99 additions and 27 deletions

ADC_function.py

@@ -14,6 +14,7 @@ from urllib.parse import urljoin
import mechanicalsoup
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from cloudscraper import create_scraper
def getXpathSingle(htmlcode, xpath):
@@ -25,7 +26,7 @@ def getXpathSingle(htmlcode, xpath):
G_USER_AGENT = r'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/95.0.4638.54 Safari/537.36'
# Core function for web page requests
def get_html(url, cookies: dict = None, ua: str = None, return_type: str = None):
def get_html(url, cookies: dict = None, ua: str = None, return_type: str = None, encoding: str = None):
verify = config.getInstance().cacert_file()
configProxy = config.getInstance().proxy()
errors = ""
@@ -41,13 +42,12 @@ def get_html(url, cookies: dict = None, ua: str = None, return_type: str = None)
else:
result = requests.get(str(url), headers=headers, timeout=configProxy.timeout, cookies=cookies)
result.encoding = "utf-8"
if return_type == "object":
return result
elif return_type == "content":
return result.content
else:
result.encoding = encoding or "utf-8"
return result.text
except requests.exceptions.ProxyError:
print("[-]Proxy error! Please check your Proxy")
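For callers, a minimal sketch of the new encoding parameter (the URL and codec below are illustrative, not part of the commit): sites that do not serve UTF-8 can now override the previously hard-coded decode codec.

from ADC_function import get_html

# Hypothetical non-UTF-8 site; before this commit the text was always decoded as UTF-8.
html = get_html("https://www.example.jp/item/42", encoding="euc_jp")
if html is not None:
    print(html[:200])  # page text decoded with EUC-JP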
@@ -100,7 +100,7 @@ class TimeoutHTTPAdapter(HTTPAdapter):
# with keep-alive feature
def get_html_session(url:str = None, cookies: dict = None, ua: str = None, return_type: str = None):
def get_html_session(url:str = None, cookies: dict = None, ua: str = None, return_type: str = None, encoding: str = None):
configProxy = config.getInstance().proxy()
session = requests.Session()
if isinstance(cookies, dict) and len(cookies):
@@ -127,7 +127,7 @@ def get_html_session(url:str = None, cookies: dict = None, ua: str = None, retur
elif return_type == "session":
return result, session
else:
result.encoding = "utf-8"
result.encoding = encoding or "utf-8"
return result.text
except requests.exceptions.ProxyError:
print("[-]get_html_session() Proxy error! Please check your Proxy")
@@ -136,7 +136,7 @@ def get_html_session(url:str = None, cookies: dict = None, ua: str = None, retur
return None
def get_html_by_browser(url:str = None, cookies: dict = None, ua: str = None, return_type: str = None):
def get_html_by_browser(url:str = None, cookies: dict = None, ua: str = None, return_type: str = None, encoding: str = None):
configProxy = config.getInstance().proxy()
s = requests.Session()
if isinstance(cookies, dict) and len(cookies):
@@ -155,7 +155,7 @@ def get_html_by_browser(url:str = None, cookies: dict = None, ua: str = None, re
return browser
if not result.ok:
return None
result.encoding = "utf-8"
if return_type == "object":
return result
elif return_type == "content":
@@ -163,6 +163,7 @@ def get_html_by_browser(url:str = None, cookies: dict = None, ua: str = None, re
elif return_type == "browser":
return result, browser
else:
result.encoding = encoding or "utf-8"
return result.text
except requests.exceptions.ProxyError:
print("[-]get_html_by_browser() Proxy error! Please check your Proxy")
@@ -170,7 +171,8 @@ def get_html_by_browser(url:str = None, cookies: dict = None, ua: str = None, re
print(f'[-]get_html_by_browser() Failed! {e}')
return None
def get_html_by_form(url, form_select: str = None, fields: dict = None, cookies: dict = None, ua: str = None, return_type: str = None):
def get_html_by_form(url, form_select: str = None, fields: dict = None, cookies: dict = None, ua: str = None, return_type: str = None, encoding: str = None):
configProxy = config.getInstance().proxy()
s = requests.Session()
if isinstance(cookies, dict) and len(cookies):
@@ -191,7 +193,7 @@ def get_html_by_form(url, form_select: str = None, fields: dict = None, cookies:
for k, v in fields.items():
browser[k] = v
response = browser.submit_selected()
response.encoding = "utf-8"
if return_type == "object":
return response
elif return_type == "content":
@@ -199,6 +201,7 @@ def get_html_by_form(url, form_select: str = None, fields: dict = None, cookies:
elif return_type == "browser":
return response, browser
else:
response.encoding = encoding or "utf-8"
return response.text
except requests.exceptions.ProxyError:
print("[-]get_html_by_form() Proxy error! Please check your Proxy")
@@ -207,6 +210,40 @@ def get_html_by_form(url, form_select: str = None, fields: dict = None, cookies:
return None
def get_html_by_scraper(url:str = None, cookies: dict = None, ua: str = None, return_type: str = None, encoding: str = None):
configProxy = config.getInstance().proxy()
session = create_scraper(browser={'custom': ua or G_USER_AGENT,})
if isinstance(cookies, dict) and len(cookies):
requests.utils.add_dict_to_cookiejar(session.cookies, cookies)
retries = Retry(total=configProxy.retry, connect=configProxy.retry, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
session.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
session.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
if configProxy.enable:
session.verify = config.getInstance().cacert_file()
session.proxies = configProxy.proxies()
try:
if isinstance(url, str) and len(url):
result = session.get(str(url))
else: # an empty url parameter returns the reusable scraper object directly; no need to set return_type
return session
if not result.ok:
return None
if return_type == "object":
return result
elif return_type == "content":
return result.content
elif return_type == "scraper":
return result, session
else:
result.encoding = encoding or "utf-8"
return result.text
except requests.exceptions.ProxyError:
print("[-]get_html_session() Proxy error! Please check your Proxy")
except Exception as e:
print(f"[-]get_html_session() failed. {e}")
return None
# def get_javlib_cookie() -> [dict, str]:
# import cloudscraper
# switch, proxy, timeout, retry_count, proxytype = config.getInstance().proxy()
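The new get_html_by_scraper() helper wraps cloudscraper so requests to Cloudflare-protected sites can pass the anti-bot challenge page, while keeping the same retry, proxy, and keep-alive setup as get_html_session(). A minimal usage sketch (the URL is illustrative):

from ADC_function import get_html_by_scraper

# Calling without a url returns the reusable scraper session itself (no request is made).
scraper = get_html_by_scraper()
# Calling with a url fetches the page through cloudscraper and returns the decoded text.
text = get_html_by_scraper("https://www.example.com/", encoding="utf-8")
# return_type="scraper" also hands back the session for follow-up requests.
fetched = get_html_by_scraper("https://www.example.com/", return_type="scraper")
if fetched is not None:
    result, session = fetched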
@@ -701,31 +738,35 @@ def is_japanese(s) -> bool:
return bool(re.search(r'[\u3040-\u309F\u30A0-\u30FF\uFF66-\uFF9F]', s, re.UNICODE))
# Usage: python ./ADC_function.py https://cn.bing.com/
if __name__ == "__main__":
import sys, timeit
from http.client import HTTPConnection
s = get_html_session()
def benchmark(t, url):
print(f"HTTP GET Benchmark times:{t} url:{url}")
tm = timeit.timeit(f"_ = session1.get('{url}')",
"from __main__ import get_html_session;session1=get_html_session()",
number=t)
print(f'===={tm:2.5f}s get_html_session() Keep-Alive enable====')
print(f' *{tm:>10.5f}s get_html_session() Keep-Alive enable')
tm = timeit.timeit(f"_ = scraper1.get('{url}')",
"from __main__ import get_html_by_scraper;scraper1=get_html_by_scraper()",
number=t)
print(f' *{tm:>10.5f}s get_html_by_scraper() Keep-Alive enable')
tm = timeit.timeit(f"_ = browser1.open('{url}')",
"from __main__ import get_html_by_browser;browser1=get_html_by_browser()",
number=t)
print(f'===={tm:2.5f}s get_html_by_browser() Keep-Alive enable====')
print(f' *{tm:>10.5f}s get_html_by_browser() Keep-Alive enable')
tm = timeit.timeit(f"_ = get_html('{url}')",
"from __main__ import get_html",
number=t)
print(f'===={tm:2.5f}s get_html() ====')
print(f' *{tm:>10.5f}s get_html()')
t = 100
#url = "https://www.189.cn/"
url = "http://www.chinaunicom.com"
HTTPConnection.debuglevel = 1
s = get_html_session()
_ = s.get(url)
HTTPConnection.debuglevel = 0
# Usage: python ./ADC_function.py https://cn.bing.com/
if len(sys.argv)>1:
url = sys.argv[1]
benchmark(t, url)