import json import sys import threading import time import warnings from libs.proxy import Proxy from log.print_log import PrintLog from rds_model.db_redis import DbRedis from rds_model.rds_user_video_list import RdsUserVideoList from web_cookie import Cookie from web_dy import * start_time = time.time() # -- coding: utf-8 --** def set_score(flag): rds = RdsUserVideoList() if flag == 'success': data_score = rds.get_score() if data_score is None: data_score = '1@@@1@@@0' else: data_score = data_score.split('@@@') total, success, fail = int(data_score[0]), int(data_score[1]), int(data_score[2]) success = success + 1 data_score = str(total) + '@@@' + str(success) + '@@@' + str(fail) rds.record_score(data_score) elif flag == 'fail': data_score = rds.get_score() if data_score is None: data_score = '1@@@0@@@1' else: data_score = data_score.split('@@@') total, success, fail = int(data_score[0]), int(data_score[1]), int(data_score[2]) fail = fail + 1 data_score = str(total) + '@@@' + str(success) + '@@@' + str(fail) rds.record_score(data_score) elif flag == 'all': data_score = rds.get_score() if data_score is None: data_score = '1@@@0@@@0' else: data_score = data_score.split('@@@') total, success, fail = int(data_score[0]), int(data_score[1]), int(data_score[2]) total = total + 1 data_score = str(total) + '@@@' + str(success) + '@@@' + str(fail) rds.record_score(data_score) return None def get_signature(url=None,method='_signature'): with open('/mnt/shop_live_scraper/signature.js', 'r', encoding='utf-8') as f: b = f.read() c = execjs.compile(b) # url=url.replace('%28','(').replace('%29',')').replace('%2C',',') d = c.call(method, url.replace('\n','')) return d def get_ua_ck(): ua_list=[ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36", "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.25 Safari/537.36 Core/1.70.3870.400 QQBrowser/10.8.4405.400" ] ck_list=[ 'ttwid=1%7CTVzdM0P0u-8dtsmh6c-EaQEtBoTSOs_MG85FAg07AbA%7C1631502013%7C66442d8594de8e93ad18b73f3dfe0c94ed864c3d932824bcde9918b5be172321; passport_csrf_token=866923f1a32045fd82e47053158402a2', 'ttwid=1%7CGPDDu9-w3RGs2Pcd0wRlvLYoktpDt-v8LP5ZMyb1NBM%7C1630319594%7Cffb8de47e6da87dcfd76349b5ad34aa1f9b9d4332261a3a8436b932a893366c1; passport_csrf_token=79284b8777a7a54f3066cefef9af539e', 'ttwid=1%7CGsfqc7NpdOg4N-U-VX7Q77KsWjVTZ7gxLNifsisj8YE%7C1631618570%7Cafbb13a27fd1c2d7a8245454b1e0d7cd654d80848a320933a25d9ef77638c18c; passport_csrf_token=84911c9af94040a99cc10416bd27533d', 'ttwid=1%7C82FGr05YUOReYUB301ao_erqOQ3ilbXZdEy0tkMsdXY%7C1631863641%7C1dcebe643a96f00841a3b490db60de886bfe07ff3d276e509717abc4e1681ba6; passport_csrf_token=494ae3fffe00328101fd40e050ce49db', 'ttwid=1%7CwfnX3T9LY4_60iGoQNzyqYe5ahILFeRxfMuZ1pdgXf8%7C1632724192%7Cb613fddc0b533d5578dad4d5f9290705fdc6432aa854d492f4761d164dd3fdd5; passport_csrf_token=4a8afba333103b033e537003b72ee91b' ] return random.choice(ua_list),random.choice(ck_list) def get_user_videos(sec_user_id, max_cursor=0, count=20): ua,ck=get_ua_ck() # ua="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" url='https://www.douyin.com/aweme/v1/web/aweme/post/?' param={ 'device_platform': 'webapp', 'aid': '6383', 'channel': 'channel_pc_web', 'sec_user_id': sec_user_id, 'max_cursor': str(max_cursor), 'count': str(count), 'publish_video_strategy_type': '2', 'version_code': '170400', 'version_name': '17.4.0', 'cookie_enabled': 'true', 'screen_width': '1920', 'screen_height': '1080', 'browser_language': 'zh-CN', 'browser_platform': 'Win32', 'browser_name': 'Mozilla', 'browser_version':ua.replace('Mozilla/',''), 'browser_online': 'true', "source" : "channel_pc_web" } url = url + parse.urlencode(param) _signature = get_signature(url) url+='&_signature='+quote(_signature) ck = Cookie.get() if ck is None: print('获取cookie失败') return None headers = { "authority": "www.douyin.com", "method": "GET", "path": str(url).replace('https://www.douyin.com',''), "scheme": "https", "accept": "application/json, text/plain, */*", # "accept-encoding": "gzip, deflate, br", "accept-language": "zh-CN,zh;q=0.9", "cookie": ck, "referer": "https://www.douyin.com/user/{sec_user_id}?enter_method=search_result&enter_from=search_result".format(sec_user_id=sec_user_id), "user-agent":ua, "withcredentials": "true", "sec-ch-ua" : '"Google Chrome";v="93", " Not;A Brand";v="99", "Chromium";v="93"', "sec-ch-ua-mobile" : "?0", "sec-ch-ua-platform" : "Windows", "sec-fetch-dest" : "empty", "sec-fetch-mode" : "cors", "sec-fetch-site" : "same-origin" } if ck: headers['cookie']=ck retry = 0 response_json = None while True: if retry > 20: Cookie.del_cookie(ck) break retry += 1 proxy = Proxy.get() proxies = { "http": "http://" + proxy, "https": "http://" + proxy } try: response = requests.get( url, headers=headers, proxies=proxies, timeout=8 ) if (response.status_code == 200) and (response.text is not None) and (response.text != ''): # print(response) response_json = response.json() if (response_json.get('aweme_list') is not None): print( time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' 数据获取成功!' + '\n' + str(sec_user_id) ) break else: print( time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' 数据获取失败!' + '\n' + str(sec_user_id) + '\n' + response.text + Proxy.proxy_info ) else: print( time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' 爬取http连接失败!' + str(response.status_code) + '\n' + Proxy.proxy_info + '\n' + str(sec_user_id) + '\n' + '爬取结果:' + str(response) + '\n' ) time.sleep(1) except requests.exceptions.ProxyError as e: print( time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' 代理过期!' + str(e) + '\n' + str(sec_user_id) + '\n' + Proxy.proxy_info ) Proxy.del_proxy(proxy) pass except requests.exceptions.ConnectTimeout as e: print( time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' ConnectTimeout!' + str(e) + '\n' + str(sec_user_id) + '\n' + Proxy.proxy_info ) Proxy.del_proxy(proxy) pass except Exception as e: print( time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' 请求抛出异常!' + str(e) + '\n' + str(e.__traceback__.tb_lineno) + '\n' + str(sec_user_id) + '\n' + Proxy.proxy_info ) pass return response_json def scrape(): rds = RdsUserVideoList() user_info = rds.get_request_param() if user_info is None: return None sec_user_id = str(user_info) print( time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' ' + str(sec_user_id) ) try: videos = get_user_videos(sec_user_id=sec_user_id,max_cursor=0,count=50) if videos is None: # rds.push_request_id(sec_user_id) print( time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' 数据获取失败!响应数据为空!' + '\n' + str(sec_user_id) + '\n' ) sys.exit(0) if isinstance(videos, dict): awemes = videos.get('aweme_list') else: # print(videos) awemes = None if awemes: set_score('success') data = str(sec_user_id) + '@@@' + json.dumps(videos) rds.push_data_list(data) else: set_score('fail') except Exception as e: set_score('fail') # rds.push_request_id(sec_user_id) print( time.strftime("%H:%M:%S", time.localtime()) + ' ' + str(sec_user_id) + '数据异常:' + str(e) ) sys.exit(0) if __name__ == '__main__': print("主方法开始执行") # 并行线程数 threading_count = int(sys.argv[1]) rds = RdsUserVideoList() warnings.filterwarnings("ignore") print( time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' ' + ' 开始执行,用户队列长度:' + str(rds.get_len()) ) while True: sys.stdout.flush() # 减去主线程 active_count = threading.active_count() - 1 increment = threading_count - active_count while increment > 0: sys.stdout.flush() # scrape() task = threading.Thread(target=scrape, args=()) task.start() # 准备就绪, 等待cpu执行 increment = increment - 1 current_time = time.time() if current_time - start_time > 3600: print( time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' 主方法执行终止' ) sys.exit(0) time.sleep(1)