replace browser with session in some places

lededev
2021-11-01 03:49:35 +08:00
parent 0fe1b2fcac
commit 3b498d32ca
6 changed files with 215 additions and 122 deletions
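
Summary of the change: crawler helpers that previously handed back a mechanicalsoup StatefulBrowser now go through a new get_html_session() helper in ADC_function.py, which builds one requests.Session (retries, proxy, timeout, User-Agent) and reuses it for follow-up requests over HTTP keep-alive. A minimal usage sketch of the two call patterns this commit introduces (the example URLs below are placeholders, not from the project):

    from ADC_function import get_html_session

    # Fetch a page and keep the underlying requests.Session for follow-up requests
    result, session = get_html_session('https://example.com/page', return_type='session')
    if result is not None:
        detail = session.get('https://example.com/detail')  # reuses the keep-alive connection

    # Call with no url to get back just the pre-configured, reusable session object
    session = get_html_session()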

View File

@@ -98,59 +98,113 @@ class TimeoutHTTPAdapter(HTTPAdapter):
kwargs["timeout"] = self.timeout kwargs["timeout"] = self.timeout
return super().send(request, **kwargs) return super().send(request, **kwargs)
def get_html_by_browser(url, cookies: dict = None, ua: str = None, return_type: str = None):
# with keep-alive feature
def get_html_session(url:str = None, cookies: dict = None, ua: str = None, return_type: str = None):
configProxy = config.getInstance().proxy()
session = requests.Session()
if isinstance(cookies, dict) and len(cookies):
requests.utils.add_dict_to_cookiejar(session.cookies, cookies)
retries = Retry(total=configProxy.retry, connect=configProxy.retry, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
session.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
session.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
if configProxy.enable:
session.verify = config.getInstance().cacert_file()
session.proxies = configProxy.proxies()
headers = {"User-Agent": ua or G_USER_AGENT}
session.headers = headers
try:
if isinstance(url, str) and len(url):
result = session.get(str(url))
else: # 空url参数直接返回可重用session对象无需设置return_type
return session
if not result.ok:
return None
if return_type == "object":
return result
elif return_type == "content":
return result.content
elif return_type == "session":
return result, session
else:
result.encoding = "utf-8"
return result.text
except requests.exceptions.ProxyError:
print("[-]get_html_session() Proxy error! Please check your Proxy")
except Exception as e:
print(f"[-]get_html_session() failed. {e}")
return None
def get_html_by_browser(url:str = None, cookies: dict = None, ua: str = None, return_type: str = None):
configProxy = config.getInstance().proxy() configProxy = config.getInstance().proxy()
s = requests.Session() s = requests.Session()
if isinstance(cookies, dict) and len(cookies): if isinstance(cookies, dict) and len(cookies):
requests.utils.add_dict_to_cookiejar(s.cookies, cookies) requests.utils.add_dict_to_cookiejar(s.cookies, cookies)
retries = Retry(connect=configProxy.retry, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504]) retries = Retry(total=configProxy.retry, connect=configProxy.retry, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
s.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout)) s.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
s.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout)) s.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
if configProxy.enable: if configProxy.enable:
s.verify = config.getInstance().cacert_file()
s.proxies = configProxy.proxies() s.proxies = configProxy.proxies()
browser = mechanicalsoup.StatefulBrowser(user_agent=ua or G_USER_AGENT, session=s) try:
result = browser.open(url) browser = mechanicalsoup.StatefulBrowser(user_agent=ua or G_USER_AGENT, session=s)
if not result.ok: if isinstance(url, str) and len(url):
return '' result = browser.open(url)
result.encoding = "utf-8" else:
if return_type == "object": return browser
return result if not result.ok:
elif return_type == "content": return None
return result.content result.encoding = "utf-8"
elif return_type == "browser": if return_type == "object":
return result, browser return result
else: elif return_type == "content":
return result.text return result.content
elif return_type == "browser":
return result, browser
else:
return result.text
except requests.exceptions.ProxyError:
print("[-]get_html_by_browser() Proxy error! Please check your Proxy")
except Exception as e:
print(f'[-]get_html_by_browser() Failed! {e}')
return None
def get_html_by_form(url, form_select: str = None, fields: dict = None, cookies: dict = None, ua: str = None, return_type: str = None): def get_html_by_form(url, form_select: str = None, fields: dict = None, cookies: dict = None, ua: str = None, return_type: str = None):
configProxy = config.getInstance().proxy() configProxy = config.getInstance().proxy()
s = requests.Session() s = requests.Session()
if isinstance(cookies, dict) and len(cookies): if isinstance(cookies, dict) and len(cookies):
requests.utils.add_dict_to_cookiejar(s.cookies, cookies) requests.utils.add_dict_to_cookiejar(s.cookies, cookies)
retries = Retry(connect=configProxy.retry, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504]) retries = Retry(total=configProxy.retry, connect=configProxy.retry, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
s.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout)) s.mount("https://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
s.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout)) s.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=configProxy.timeout))
if configProxy.enable: if configProxy.enable:
s.verify = config.getInstance().cacert_file()
s.proxies = configProxy.proxies() s.proxies = configProxy.proxies()
browser = mechanicalsoup.StatefulBrowser(user_agent=ua or G_USER_AGENT, session=s) try:
result = browser.open(url) browser = mechanicalsoup.StatefulBrowser(user_agent=ua or G_USER_AGENT, session=s)
if not result.ok: result = browser.open(url)
return '' if not result.ok:
form = browser.select_form() if form_select is None else browser.select_form(form_select) return None
if isinstance(fields, dict): form = browser.select_form() if form_select is None else browser.select_form(form_select)
for k, v in fields.items(): if isinstance(fields, dict):
browser[k] = v for k, v in fields.items():
response = browser.submit_selected() browser[k] = v
response.encoding = "utf-8" response = browser.submit_selected()
if return_type == "object": response.encoding = "utf-8"
return response if return_type == "object":
elif return_type == "content": return response
return response.content elif return_type == "content":
elif return_type == "browser": return response.content
return response, browser elif return_type == "browser":
else: return response, browser
return response.text else:
return response.text
except requests.exceptions.ProxyError:
print("[-]get_html_by_form() Proxy error! Please check your Proxy")
except Exception as e:
print(f'[-]get_html_by_form() Failed! {e}')
return None
# def get_javlib_cookie() -> [dict, str]: # def get_javlib_cookie() -> [dict, str]:
@@ -645,3 +699,33 @@ def file_not_exist_or_empty(filepath) -> bool:
 # simple check for Japanese text
 def is_japanese(s) -> bool:
     return bool(re.search(r'[\u3040-\u309F\u30A0-\u30FF\uFF66-\uFF9F]', s, re.UNICODE))
+if __name__ == "__main__":
+    import sys, timeit
+    from http.client import HTTPConnection
+    s = get_html_session()
+    def benchmark(t, url):
+        print(f"HTTP GET Benchmark times:{t} url:{url}")
+        tm = timeit.timeit(f"_ = session1.get('{url}')",
+                           "from __main__ import get_html_session;session1=get_html_session()",
+                           number=t)
+        print(f'===={tm:2.5f}s get_html_session() Keep-Alive enable====')
+        tm = timeit.timeit(f"_ = browser1.open('{url}')",
+                           "from __main__ import get_html_by_browser;browser1=get_html_by_browser()",
+                           number=t)
+        print(f'===={tm:2.5f}s get_html_by_browser() Keep-Alive enable====')
+        tm = timeit.timeit(f"_ = get_html('{url}')",
+                           "from __main__ import get_html",
+                           number=t)
+        print(f'===={tm:2.5f}s get_html() ====')
+    t = 100
+    #url = "https://www.189.cn/"
+    url = "http://www.chinaunicom.com"
+    HTTPConnection.debuglevel = 1
+    _ = s.get(url)
+    HTTPConnection.debuglevel = 0
+    # Usage: python ./ADC_function.py https://cn.bing.com/
+    if len(sys.argv)>1:
+        url = sys.argv[1]
+    benchmark(t, url)

View File

@@ -328,6 +328,6 @@ def special_characters_replacement(text) -> str:
         replace('|', 'ǀ'). # U+01C0 LATIN LETTER DENTAL CLICK @ Basic Multilingual Plane
         replace('‘', ''). # U+02018 LEFT SINGLE QUOTATION MARK
         replace('’', ''). # U+02019 RIGHT SINGLE QUOTATION MARK
-        replace('&', '').
-        replace('…','')
+        replace('…','').
+        replace('&', '')
         )

View File

@@ -6,17 +6,16 @@ import re
 from ADC_function import *
 from WebCrawler.storyline import getStoryline
+G_SITE = 'https://www.caribbeancom.com'
 def main(number: str) -> json:
     try:
-        # the actor photo feature is not used yet; temporarily commented out for speed, use get_html() instead
-        #r, browser = get_html_by_browser('https://www.caribbeancom.com/moviepages/'+number+'/index.html',
-        #                                 return_type='browser')
-        #if not r.ok:
-        #    raise ValueError("page not found")
-        #htmlcode = str(browser.page)
-        htmlbyte = get_html('https://www.caribbeancom.com/moviepages/'+number+'/index.html', return_type='content')
-        htmlcode = htmlbyte.decode('euc-jp')
-        if not htmlcode or '<title>404' in htmlcode or 'class="movie-info section"' not in htmlcode:
+        url = f'{G_SITE}/moviepages/{number}/index.html'
+        result, session = get_html_session(url, return_type='session')
+        htmlcode = result.content.decode('euc-jp')
+        if not result or not htmlcode or '<title>404' in htmlcode or 'class="movie-info section"' not in htmlcode:
             raise ValueError("page not found")
         lx = html.fromstring(htmlcode)
@@ -32,13 +31,13 @@ def main(number: str) -> json:
             'actor': get_actor(lx),
             'release': get_release(lx),
             'number': number,
-            'cover': 'https://www.caribbeancom.com/moviepages/' + number + '/images/l_l.jpg',
+            'cover': f'{G_SITE}/moviepages/{number}/images/l_l.jpg',
             'tag': get_tag(lx),
             'extrafanart': get_extrafanart(lx),
             'label': get_series(lx),
             'imagecut': 1,
-            # 'actor_photo': get_actor_photo(browser),
-            'website': 'https://www.caribbeancom.com/moviepages/' + number + '/index.html',
+            # 'actor_photo': get_actor_photo(lx, session),
+            'website': f'{G_SITE}/moviepages/{number}/index.html',
             'source': 'carib.py',
             'series': get_series(lx),
         }
@@ -101,24 +100,25 @@ def get_series(lx: html.HtmlElement) -> str:
     return ''
 def get_runtime(lx: html.HtmlElement) -> str:
-    return str(lx.xpath( "//span[@class='spec-content']/span[@itemprop='duration']/text()")[0]).strip()
+    return str(lx.xpath("//span[@class='spec-content']/span[@itemprop='duration']/text()")[0]).strip()
-def get_actor_photo(browser):
-    htmla = browser.page.select('#moviepages > div > div:nth-child(1) > div.movie-info.section > ul > li:nth-child(1) > span.spec-content > a')
+def get_actor_photo(lx, session):
+    htmla = lx.xpath("//*[@id='moviepages']/div[@class='container']/div[@class='inner-container']/div[@class='movie-info section']/ul/li[@class='movie-spec']/span[@class='spec-content']/a[@itemprop='actor']")
+    names = lx.xpath("//*[@id='moviepages']/div[@class='container']/div[@class='inner-container']/div[@class='movie-info section']/ul/li[@class='movie-spec']/span[@class='spec-content']/a[@itemprop='actor']/span[@itemprop='name']/text()")
     t = {}
-    for a in htmla:
-        if a.text.strip() == '':
+    for i, name in enumerate(names):
+        if name.strip() == '':
             continue
-        p = {a.text.strip(): a['href']}
+        p = {name.strip(): htmla[i].attrib['href']}
         t.update(p)
     o = {}
     for k, v in t.items():
         if '/search_act/' not in v:
             continue
-        r = browser.open_relative(v)
+        r = session.get(urljoin(G_SITE, v))
         if not r.ok:
             continue
-        html = browser.page.prettify()
+        html = r.text
         pos = html.find('.full-bg')
         if pos<0:
             continue
@@ -126,7 +126,7 @@ def get_actor_photo(browser):
         cssBGjpgs = re.findall(r'background: url\((.+\.jpg)', css, re.I)
         if not cssBGjpgs or not len(cssBGjpgs[0]):
             continue
-        p = {k: urljoin(browser.url, cssBGjpgs[0])}
+        p = {k: urljoin(r.url, cssBGjpgs[0])}
         o.update(p)
     return o

View File

@@ -118,8 +118,15 @@ def main_uncensored(number):
 def main(number):
     try:
         try:
+            url = "https://www." + secrets.choice([
+                'buscdn.fun', 'busdmm.fun', 'busfan.fun', 'busjav.fun',
+                'cdnbus.fun',
+                'dmmbus.fun', 'dmmsee.fun',
+                'fanbus.us',
+                'seedmm.fun',
+            ]) + "/"
             try:
-                htmlcode = get_html('https://www.fanbus.us/' + number)
+                htmlcode = get_html(url + number)
             except:
                 htmlcode = get_html('https://www.javbus.com/' + number)
             if "<title>404 Page Not Found" in htmlcode:

View File

@@ -4,7 +4,6 @@ import re
 from lxml import etree
 import json
 from ADC_function import *
-from mechanicalsoup.stateful_browser import StatefulBrowser
 from WebCrawler.storyline import getStoryline
 # import io
 # sys.stdout = io.TextIOWrapper(sys.stdout.buffer, errors = 'replace', line_buffering = True)
@@ -30,8 +29,8 @@ def getActor(html):
         idx = idx + 1
     return r
-def getaphoto(url, browser):
-    html_page = browser.open_relative(url).text if isinstance(browser, StatefulBrowser) else get_html(url)
+def getaphoto(url, session):
+    html_page = session.get(url).text if isinstance(session, requests.Session) else get_html(url)
     img_prether = re.compile(r'<span class\=\"avatar\" style\=\"background\-image\: url\((.*?)\)')
     img_url = img_prether.findall(html_page)
     if img_url:
@@ -39,7 +38,7 @@ def getaphoto(url, browser):
     else:
         return ''
-def getActorPhoto(html, javdb_site, browser): #//*[@id="star_qdt"]/li/a/img
+def getActorPhoto(html, javdb_site, session):
     actorall = html.xpath('//strong[contains(text(),"演員:")]/../span/a[starts-with(@href,"/actors/")]')
     if not actorall:
         return {}
@@ -47,7 +46,7 @@ def getActorPhoto(html, javdb_site, browser): #//*[@id="star_qdt"]/li/a/img
     actor_photo = {}
     for i in actorall:
         if i.text in a:
-            actor_photo[i.text] = getaphoto(urljoin(f'https://{javdb_site}.com', i.attrib['href']), browser)
+            actor_photo[i.text] = getaphoto(urljoin(f'https://{javdb_site}.com', i.attrib['href']), session)
     return actor_photo
 def getStudio(a, html):
@@ -178,15 +177,6 @@ def getDirector(html):
     result1 = str(html.xpath('//strong[contains(text(),"導演")]/../span/text()')).strip(" ['']")
     result2 = str(html.xpath('//strong[contains(text(),"導演")]/../span/a/text()')).strip(" ['']")
     return str(result1 + result2).strip('+').replace("', '", '').replace('"', '')
-def getOutline0(number): # get storyline; the airav.wiki site now returns 404, so the function is renamed for now and should be deleted if the site cannot be restored
-    try:
-        htmlcode = get_html('https://cn.airav.wiki/video/' + number)
-        from WebCrawler.airav import getOutline as airav_getOutline
-        result = airav_getOutline(htmlcode)
-        return result
-    except:
-        pass
-    return ''
 def getOutline(number, title): # get storyline; queried concurrently in multiple processes
     return getStoryline(number,title)
 def getSeries(html):
@@ -224,11 +214,11 @@ def main(number):
         javdb_site = secrets.choice(javdb_sites)
     if debug:
         print(f'[!]javdb:select site {javdb_site}')
-    browser = None
+    session = None
     try:
         javdb_url = 'https://' + javdb_site + '.com/search?q=' + number + '&f=all'
-        res, browser = get_html_by_browser(javdb_url, cookies=javdb_cookies, return_type='browser')
-        if not res.ok:
+        res, session = get_html_session(javdb_url, cookies=javdb_cookies, return_type='session')
+        if not res:
             raise
         query_result = res.text
     except:
@@ -251,8 +241,9 @@ def main(number):
raise ValueError("number not found") raise ValueError("number not found")
correct_url = urls[0] correct_url = urls[0]
try: try:
if isinstance(browser, StatefulBrowser): # get faster benefit from http keep-alive if isinstance(session, requests.Session): # get faster benefit from http keep-alive
detail_page = browser.open_relative(correct_url).text javdb_detail_url = urljoin(res.url, correct_url)
detail_page = session.get(javdb_detail_url).text
else: else:
javdb_detail_url = 'https://' + javdb_site + '.com' + correct_url javdb_detail_url = 'https://' + javdb_site + '.com' + correct_url
detail_page = get_html(javdb_detail_url, cookies=javdb_cookies) detail_page = get_html(javdb_detail_url, cookies=javdb_cookies)
@@ -303,8 +294,8 @@ def main(number):
         'tag': getTag(lx),
         'label': getLabel(lx),
         'year': getYear(detail_page), # str(re.search('\d{4}',getRelease(a)).group()),
-        # 'actor_photo': getActorPhoto(lx, javdb_site, browser),
-        'website': 'https://javdb.com' + correct_url,
+        # 'actor_photo': getActorPhoto(lx, javdb_site, session),
+        'website': urljoin('https://javdb.com', correct_url),
         'source': 'javdb.py',
         'series': getSeries(lx),

View File

@@ -4,6 +4,7 @@ import re
 import json
 import builtins
 from ADC_function import *
+from lxml.html import fromstring
 from multiprocessing import Pool
 from multiprocessing.dummy import Pool as ThreadPool
 from difflib import SequenceMatcher
@@ -110,24 +111,30 @@ def _getStoryline_mp(site, number, title, debug):
 def getStoryline_airav(number, debug):
     try:
-        number_up = number
         site = secrets.choice(('airav.cc','airav4.club'))
         url = f'https://{site}/searchresults.aspx?Search={number}&Type=0'
-        res, browser = get_html_by_browser(url, return_type='browser')
-        if not res.ok:
-            raise ValueError(f"get_html_by_browser('{url}') failed")
-        avs = browser.page.select_one('div.resultcontent > ul > li:nth-child(1) > div')
-        if number_up not in avs.a.h3.text.upper():
+        res, session = get_html_session(url, return_type='session')
+        if not res:
+            raise ValueError(f"get_html_by_session('{url}') failed")
+        lx = fromstring(res.text)
+        urls = lx.xpath('//div[@class="resultcontent"]/ul/li/div/a[@class="ga_click"]/@href')
+        txts = lx.xpath('//div[@class="resultcontent"]/ul/li/div/a[@class="ga_click"]/h3[@class="one_name ga_name"]/text()')
+        detail_url = None
+        for i, txt in enumerate(txts):
+            if re.search(number, txt, re.I):
+                detail_url = urljoin(res.url, urls[i])
+                break
+        if detail_url is None:
             raise ValueError("number not found")
-        detail_url = avs.a['href']
-        res = browser.open_relative(detail_url)
+        res = session.get(detail_url)
         if not res.ok:
-            raise ValueError(f"browser.open_relative('{detail_url}') failed")
-        t = browser.page.select_one('head > title').text
-        airav_number = str(re.findall(r'^\s*\[(.*?)]', t)[0]).upper()
-        if number.upper() != airav_number:
+            raise ValueError(f"session.get('{detail_url}') failed")
+        lx = fromstring(res.text)
+        t = str(lx.xpath('/html/head/title/text()')[0]).strip()
+        airav_number = str(re.findall(r'^\s*\[(.*?)]', t)[0])
+        if not re.search(number, airav_number, re.I):
             raise ValueError(f"page number ->[{airav_number}] not match")
-        desc = browser.page.select_one('li.introduction > span').text.strip()
+        desc = str(lx.xpath('//span[@id="ContentPlaceHolder1_Label2"]/text()')[0]).strip()
         return desc
     except Exception as e:
         if debug:
@@ -140,9 +147,9 @@ def getStoryline_airavwiki(number, debug):
     try:
         kwd = number[:6] if re.match(r'\d{6}[\-_]\d{2,3}', number) else number
         url = f'https://www.airav.wiki/api/video/list?lang=zh-TW&lng=zh-TW&search={kwd}'
-        result, browser = get_html_by_browser(url, return_type='browser')
-        if not result.ok:
-            raise ValueError(f"get_html_by_browser('{url}','{number}') failed")
+        result, session = get_html_session(url, return_type='session')
+        if not result:
+            raise ValueError(f"get_html_session('{url}','{number}') failed")
         j = json.loads(result.content)
         if int(j.get('count')) == 0:
             raise ValueError("number not found")
@@ -150,12 +157,12 @@ def getStoryline_airavwiki(number, debug):
for r in j["result"]: for r in j["result"]:
n = r['barcode'] n = r['barcode']
if re.search(number, n, re.I): if re.search(number, n, re.I):
link = f'/api/video/barcode/{n}?lng=zh-TW' link = urljoin(result.url, f'/api/video/barcode/{n}?lng=zh-TW')
break break
if link is None: if link is None:
raise ValueError("number not found") raise ValueError("number not found")
result = browser.open_relative(link) result = session.get(link)
if not result.ok or not re.search(number, browser.url, re.I): if not result.ok or not re.search(number, result.url, re.I):
raise ValueError("detail page not found") raise ValueError("detail page not found")
j = json.loads(result.content) j = json.loads(result.content)
if int(j.get('count')) != 1: if int(j.get('count')) != 1:
@@ -221,7 +228,7 @@ def getStoryline_avno1(number, debug): # get storyline from avno1.cc
                                   form_select='div.wrapper > div.header > div.search > form',
                                   fields = {'kw' : number},
                                   return_type = 'browser')
-        if not result.ok:
+        if not result:
             raise ValueError(f"get_html_by_form('{url}','{number}') failed")
         s = browser.page.select('div.type_movie > div > ul > li > div')
         for div in s:
@@ -261,41 +268,45 @@ def getStoryline_amazon(q_title, number, debug):
     if not isinstance(q_title, str) or not len(q_title):
         return None
     try:
-        amazon_cookie, _ = load_cookies('amazon.json')
-        cookie = amazon_cookie if isinstance(amazon_cookie, dict) else None
+        cookie, cookies_filepath = load_cookies('amazon.json')
         url = "https://www.amazon.co.jp/s?k=" + q_title
-        res, browser = get_html_by_browser(url, cookies=cookie, return_type='browser')
-        if not res.ok:
-            raise ValueError("get_html_by_browser() failed")
-        lks = browser.links(r'/black-curtain/save-eligibility/black-curtain')
-        if isinstance(lks, list) and len(lks):
-            browser.follow_link(lks[0])
+        res, session = get_html_session(url, cookies=cookie, return_type='session')
+        if not res:
+            raise ValueError("get_html_session() failed")
+        lx = fromstring(res.text)
+        lks = lx.xpath('//a[contains(@href, "/black-curtain/save-eligibility/black-curtain")]/@href')
+        if len(lks) and lks[0].startswith('/'):
+            res = session.get(urljoin(res.url, lks[0]))
             cookie = None
-        html = etree.fromstring(str(browser.page), etree.HTMLParser())
-        titles = html.xpath("//span[contains(@class,'a-color-base a-text-normal')]/text()")
-        urls = html.xpath("//span[contains(@class,'a-color-base a-text-normal')]/../@href")
+        lx = fromstring(res.text)
+        titles = lx.xpath("//span[contains(@class,'a-color-base a-text-normal')]/text()")
+        urls = lx.xpath("//span[contains(@class,'a-color-base a-text-normal')]/../@href")
         if not len(urls) or len(urls) != len(titles):
             raise ValueError("titles not found")
         idx = amazon_select_one(titles, q_title, number, debug)
         if not isinstance(idx, int) or idx < 0:
             raise ValueError("title and number not found")
-        furl = urls[idx]
-        r = browser.open_relative(furl)
-        if not r.ok:
+        furl = urljoin(res.url, urls[idx])
+        res = session.get(furl)
+        if not res.ok:
             raise ValueError("browser.open_relative()) failed.")
-        lks = browser.links(r'/black-curtain/save-eligibility/black-curtain')
-        if isinstance(lks, list) and len(lks):
-            browser.follow_link(lks[0])
+        lx = fromstring(res.text)
+        lks = lx.xpath('//a[contains(@href, "/black-curtain/save-eligibility/black-curtain")]/@href')
+        if len(lks) and lks[0].startswith('/'):
+            res = session.get(urljoin(res.url, lks[0]))
             cookie = None
-        ama_t = browser.page.select_one('#productDescription > p').text.replace('\n',' ').strip()
-        ama_t = re.sub(r'審査番号:\d+', '', ama_t)
+        lx = fromstring(res.text)
+        div = lx.xpath('//*[@id="productDescription"]')[0]
+        ama_t = ' '.join([e.text.strip() for e in div if not re.search('Comment|h3', str(e.tag), re.I) and isinstance(e.text, str)])
+        ama_t = re.sub(r'審査番号:\d+', '', ama_t).strip()
         if cookie is None:
-            # the auto-created cookies file goes at the end of the search path list (lowest priority); users with an amazon.co.jp account can export cookies from their browser into an earlier search path
+            # delete invalid cookies, whether user-created or auto-created, to avoid repeated failures
+            Path(cookies_filepath).unlink(missing_ok=True)
+            # the auto-created cookies file goes at the end of the search path list (lowest priority); users with an amazon.co.jp account can export cookies from their browser into an earlier search path
             ama_save = Path.home() / ".local/share/avdc/amazon.json"
             ama_save.parent.mkdir(parents=True, exist_ok=True)
-            ama_save.write_text(json.dumps(browser.session.cookies.get_dict(), sort_keys=True, indent=4), encoding='utf-8')
+            ama_save.write_text(json.dumps(session.cookies.get_dict(), sort_keys=True, indent=4), encoding='utf-8')
         return ama_t