from rds_model.rds_user_shop_promotion_list import RdsUserShopPromotionList

import time
import json
import sys
import threading
import random
import urllib.parse
import requests

from rds_model.db_redis import DbRedis
from libs.Xg04 import X_Gorgon
from libs.proxy import Proxy
from libs.mysql_user_living import *

start_time = time.time()


def get_random(i, random_type=1):
    # random_type 1: numeric string of length i; 8: uppercase letters; otherwise lowercase hex-ish chars
    if random_type == 1:
        return str(random.randint(1 * 10 ** (i - 1), 1 * 10 ** i - 1))
    elif random_type == 8:
        seed = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
        sa = []
        for _ in range(i):
            sa.append(random.choice(seed))
        salt = ''.join(sa)
        return salt
    else:
        seed = "1234567890abcde"
        sa = []
        for _ in range(i):
            sa.append(random.choice(seed))
        salt = ''.join(sa)
        return salt


def get_random_brand_type():
    brand_type = get_random(3, random_type=8) + '-' + get_random(2, random_type=8) + '00'
    return brand_type


def get_mc():
    # Build a random MAC-like address of six hex pairs separated by colons.
    def a():
        seed = "1234567890ABCDEF"
        sa = []
        for _ in range(2):
            sa.append(random.choice(seed))
        salt = ''.join(sa)
        return salt

    k = ''
    for _ in range(6):
        k += a() + ':'
    return k[:-1]


def get_trace():
    trace_list = [
        '00-70f99f2209e0b045dd14266ee1da0468-70f99f2209e0b045-01',
        '00-ce7faf4409b7fcc0ae6135fdd4250468-ce7faf4409b7fcc0-01',
        '00-ce7faf3b09b7fcc0ae6042f1d8100468-ce7faf3b09b7fcc0-01',
        '00-cdd79d2309b7fcc0ae6625a4cb190468-cdd79d2309b7fcc0-01',
        '00-cefde9f009b7fcc0ae6750e1349e0468-cefde9f009b7fcc0-01',
        '00-ced2e6ef09b7fcc0ae67dd7bfe000468-ced2e6ef09b7fcc0-01',
        '00-cefbfeb509b7fcc0ae659396a6ea0468-cefbfeb509b7fcc0-01',
        '00-cefaa25409b7fcc0ae657726a3c30468-cefaa25409b7fcc0-01',
        '00-63eb56f009b7fcc0ae600208011d0468-63eb56f009b7fcc0-01'
    ]
    return random.choice(trace_list)
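
# --- Signing usage sketch (comments only; not executed) ---------------------------
# A minimal sketch of how the X_Gorgon helper from libs.Xg04 is used by the request
# below. Only the call shape and the 'X-Gorgon'/'X-Khronos' keys are taken from this
# file; the example query values are purely illustrative.
#
#   query_params = urllib.parse.urlencode({"device_id": "49388718822", "aid": "1128"})
#   signature = X_Gorgon(query_params, '')          # sign query string plus empty body
#   headers = {
#       "x-Gorgon": signature.get('X-Gorgon'),      # request signature
#       "x-Khronos": signature.get('X-Khronos'),    # timestamp bound to the signature
#   }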

def get_user_shop_promotion_on_shelf_data(user_id, sec_user_id, result):
    domain = 'api3-normal-c-lq.amemv.com'
    # domain = 'aweme.snssdk.com'
    url = 'https://' + domain + '/aweme/v1/promotion/user/promotion/list/?'

    rticket = str(int(time.time() * 1000))
    mc = get_mc()
    udid = '8604' + get_random(11)
    trace_id = get_trace()
    # openudid = '3b22' + str(udid.uuid4())[-12:]

    device_id, iid, udid, openudid, cookie, V1, V2, device_type, device_brand = \
        result[0], result[1], result[3], result[2], result[4], result[8], result[9], result[10], result[11]

    query = {
        "user_id": "2066709087257544",
        # "user_id": str(user_id),
        "sec_user_id": "MS4wLjABAAAASJ-Q7Mjt6chYEEju720IdOKArNsreGnDi8ADd3E64SsbAeqoXRQex9mCDG7u5DaY",
        # "sec_user_id": str(sec_user_id),
        "cursor": "0",
        "count": "1000",
        "column_id": "0",
        "goods_type": "0",
        "shop_version": "1",
        "os_api": "23",
        "device_type": "HUAWEI+MLA-AL10",
        # "device_type": device_type,
        "ssmix": "a",
        "manifest_version_code": "130001",
        "dpi": "480",
        "app_name": "aweme",
        "version_name": "13.0.0",
        "ts": "1633763939",
        # "ts": int(time.time()),
        "cpu_support64": "true",
        "storage_type": "0",
        "app_type": "normal",
        "appTheme": "dark",
        "ac": "wifi",
        "host_abi": "armeabi-v7a",
        "update_version_code": "13009900",
        "channel": "tengxun_new",
        "_rticket": "1633763939688",
        # "_rticket": rticket,
        "device_platform": "android",
        "iid": "1961967166955757",
        # "iid": str(iid),
        "version_code": "130000",
        # "mac_address": mc,
        "mac_address": "50%3A01%3AD9%3A21%3AED%3AC2",
        "cdid": "ab363b15-99db-4ef5-bfb1-834a8e564105",
        "openudid": "291f3ce2efe59345",
        # "openudid": str(openudid),
        "device_id": "49388718822",
        # "device_id": str(device_id),
        "resolution": "1080*1800",
        "os_version": "6.0",
        "language": "zh",
        # "device_brand": device_brand,
        "device_brand": "HUAWEI",
        "aid": "1128"
    }
    query_params = urllib.parse.urlencode(query)
    url = url + query_params
    body = ''

    xGorgon = X_Gorgon(query_params, body)

    # userAgent = f'aweme.snssdk.com/130001 (Linux; U; Android 6.0; zh_CN; {device_type}; Build/{device_type}; Cronet/TTNetVersion:414feb46 2020-09-08 QuicVersion:7aee791b 2020-06-05)'
    userAgent = 'com.ss.android.ugc.aweme/130001 (Linux; U; Android 6.0; zh_CN; HUAWEI+MLA-AL10; Build/HUAWEI+MLA-AL10; Cronet/TTNetVersion:414feb46 2020-09-08 QuicVersion:7aee791b 2020-06-05)'

    headers = {
        'Host': domain,
        'Connection': 'keep-alive',
        'passport-sdk-version': '18',
        'sdk-version': '2',
        'X-SS-DP': '1128',
        # 'x-tt-trace-id': trace_id,
        # 'Upgrade-Insecure-Requests': '1',
        'User-Agent': userAgent,
        # 'accept-encoding': 'gzip, deflate',
        # "x-SS-REQ-TICKET": rticket,
        "x-Gorgon": xGorgon.get('X-Gorgon'),
        "x-Khronos": xGorgon.get('X-Khronos'),
        # "cookie": cookie,
        "accept": "application/json, text/plain, */*",
    }

    retry = 0
    response_json = None
    while True:
        if retry > 5:
            break
        retry += 1

        proxy = Proxy.dailiyun_get()
        proxies = {
            "http": "http://" + proxy,
            "https": "http://" + proxy
        }
        try:
            response = requests.get(
                url,
                headers=headers,
                proxies=proxies,
                timeout=8
            )
            if (response.status_code == 200) and (response.text is not None) and (response.text != ''):
                response_json = response.json()
                if response_json.get('promotions') is not None:
                    print(
                        time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                        + ' Data fetched successfully!'
                        + '\n' + user_id + '\n'
                    )
                    break
                else:
                    print(
                        time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                        + ' Data fetch failed!'
                        + '\n' + user_id + '\n'
                        + response.text
                        + Proxy.proxy_info + '\n'
                    )
            else:
                print(
                    time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                    + ' HTTP connection for scraping failed!'
                    # + str(response.status_code)
                    + str(url) + '\n'
                    + str(headers) + '\n'
                    + Proxy.proxy_info + '\n'
                    + user_id + '\n'
                    + 'Scrape result: ' + str(response) + '\n'
                )
                time.sleep(1)
        except requests.exceptions.ProxyError as e:
            print(
                time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                + ' Proxy expired!'
                + str(e) + '\n'
                + user_id + '\n'
                + Proxy.proxy_info + '\n'
            )
            Proxy.dailiyun_del_proxy(proxy)
        except requests.exceptions.ConnectTimeout as e:
            print(
                time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                + ' ConnectTimeout!'
                + str(e) + '\n'
                + user_id + '\n'
                + Proxy.proxy_info + '\n'
            )
            Proxy.dailiyun_del_proxy(proxy)
        except Exception as e:
            print(
                time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                + ' Request raised an exception!'
                + str(e) + '\n'
                + user_id + '\n'
                + Proxy.proxy_info + '\n'
            )

    return response_json
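
# --- Device record layout (inferred) ----------------------------------------------
# scrape() pops a '@@@'-joined device record from the DOUYIN_SCRAPE_DID_IID_TTREQ_1221
# Redis list and passes the split result into get_user_shop_promotion_on_shelf_data().
# The field order below is inferred from the indexing above; positions 5-7 are not
# used by this script and their meaning is not assumed here.
#
#   result[0]  -> device_id
#   result[1]  -> iid
#   result[2]  -> openudid
#   result[3]  -> udid
#   result[4]  -> cookie
#   result[8]  -> V1
#   result[9]  -> V2
#   result[10] -> device_type
#   result[11] -> device_brand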

def scrape():
    rds_list = RdsUserShopPromotionList()

    # Pull a pending request ("room_id@user_id") from the on-shelf queue.
    room_info = rds_list.get_on_shelf_request_param()
    if room_info is None:
        return None

    # room_info = json.loads(room_info)
    room_info = room_info.split('@')
    room_id = str(room_info[0])
    user_id = str(room_info[1])

    # Look up sec_uid by user_id.
    user_info = MysqlUserLiving().get_user_info(user_id)
    sec_user_id, = user_info

    print(
        time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        + ' user_id:' + str(user_id)
        + ' sec_uid:' + str(sec_user_id)
    )

    key = 'DOUYIN_SCRAPE_DID_IID_TTREQ_1221'
    rdid = DbRedis.connect().rpop(key)
    if rdid:
        result = rdid.split('@@@')
    else:
        return None
    # Push the record back so other workers can reuse it.
    DbRedis.connect().lpush(key, rdid)

    try:
        response_json = get_user_shop_promotion_on_shelf_data(user_id=user_id, sec_user_id=sec_user_id, result=result)

        if response_json is None:
            # rds_list.record_score(0)
            # rds_list.push_request_id(room_id)
            print(
                time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                + ' Data fetch failed! Response data is empty!'
                + '\n' + user_id + '\n'
            )
            sys.exit(0)

        data = json.dumps({
            "data": response_json.get('promotions'),
            "extra": {
                'room_id': room_id,
                'user_id': user_id,
            }
        })

        # rds_list.record_score(1)
        rds_list.push_on_shelf_data_list(data)
    except Exception as e:
        # rds_list.record_score(0)
        # Re-queue the failed request for another attempt.
        rds_list.push_request_id_on_shelf(room_id + '@' + user_id)
        print(
            time.strftime("%H:%M:%S", time.localtime())
            + ' ' + user_id
            + ' data error: ' + str(e)
        )
        sys.exit(0)


if __name__ == "__main__":
    print("Main method started")

    # Number of parallel worker threads.
    threading_count = int(sys.argv[1])
    num = int(sys.argv[2])

    rds = RdsUserShopPromotionList()

    print(
        time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        + ' '
        + ' Starting; on-shelf update queue length: '
        + str(rds.get_on_shelf_len())
    )

    while True:
        sys.stdout.flush()

        # Subtract the main thread from the active count.
        active_count = threading.active_count() - 1
        increment = threading_count - active_count

        while increment > 0:
            sys.stdout.flush()
            # scrape()
            task = threading.Thread(target=scrape, args=())
            task.start()  # ready; waiting for the CPU to schedule it
            increment = increment - 1

        current_time = time.time()
        if current_time - start_time > 3600:
            print(
                time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                + ' Main method terminated'
            )
            sys.exit(0)

        time.sleep(1)
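
# --- Invocation sketch (file name assumed) ----------------------------------------
# The script expects two positional arguments: the number of worker threads to keep
# alive, and a second numeric argument (`num`) that is parsed but not used here. The
# main loop exits on its own after running for one hour. For example:
#
#   python user_shop_promotion_on_shelf_scraper.py 10 0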