import json
import random
import threading
import time
from urllib import parse

import requests

from xlog03 import *
from rds_model.db_redis import DbRedis
from log.print_log import PrintLog

redisModel = DbRedis.connect()
redisDyModel = DbRedis.douyin_connect()


def get_user_info(user_id, proxies):
    # Fetch a Douyin user profile through the given proxy
    params = {
        'user_id': user_id,
        'version_code': '9.8.1',
        'js_sdk_version': '1.47.2.2',
        'app_name': 'aweme',
        'app_version': '9.8.1',
        'channel': 'App%20Store',
        'mcc_mnc': 46002,
        'aid': 1128,
        'screen_width': 640,
        'os_api': 18,
        'ac': 'WIFI',
        'os_version': '13.3.1',
        'device_platform': 'iphone',
        'build_number': 98107,
        'device_type': 'iPhone8,4',
        'address_book_access': 1
    }
    url = 'https://api3-normal-c-lf.amemv.com/aweme/v1/user/profile/self/?'
    douyin_url = parse_params(url, params)
    response = http_get(douyin_url, proxies)
    return response


def parse_params(url, params):
    # Append URL-encoded query parameters to the base URL
    if params is None:
        params = {}
    if not url.endswith('?'):
        url = url + '?'
    common_params = parse.urlencode(params)
    douyin_url = url + common_params
    return douyin_url


def http_get(douyin_url, proxies):
    # Issue the GET request, optionally through a proxy
    if proxies:
        resp = requests.get(douyin_url, proxies=proxies, verify=False, timeout=10)
    else:
        resp = requests.get(douyin_url, verify=False, timeout=10)
    return resp


def get_scraper_proxy(proxy_key):
    # Pick a random, unexpired proxy from the Redis hash
    proxy_dict = redisModel.hgetall(proxy_key)
    if not proxy_dict:
        return None

    proxy_list = list(proxy_dict)
    now = int(time.time())

    while proxy_list:
        proxy = random.choice(proxy_list)
        proxy_info = proxy_dict.get(proxy)
        if proxy_info is None:
            proxy_list.remove(proxy)
            continue
        proxy_info = json.loads(proxy_info)
        expire_at = int(proxy_info.get('expired_at'))
        # Evict expired proxies
        if expire_at <= now:
            redisModel.hdel(proxy_key, proxy)
            proxy_list.remove(proxy)
            continue
        return proxy

    return None


def scraper():
    pKey = 'IpProxyHash'
    uKey = 'SL:List:Douyin:BarrageUserScrapeQueue'
    sKey = 'SL:List:Douyin:BarrageUserDataQueue'
    hKey = 'SL:Hash:Douyin:BarrageUserScrapeRecord'

    cur_time = int(time.time())
    while True:
        now_time = int(time.time())
        # Each worker exits after roughly 270 seconds
        if (now_time - cur_time) > 270:
            print('thread_' + threading.current_thread().name + ' finished')
            break

        if redisDyModel.llen(uKey) == 0:
            time.sleep(2)
            continue

        str_time = time.strftime("%H:%M:%S", time.localtime())
        user_id = redisDyModel.rpop(uKey)

        try:
            proxy = get_scraper_proxy(pKey)
            if proxy:
                proxies = {
                    "http": "http://" + proxy,
                    "https": "http://" + proxy
                }
            else:
                # No usable proxy right now; requeue the user and wait
                redisDyModel.lpush(uKey, user_id)
                time.sleep(2)
                continue

            print(str_time + ' scraping via proxy: ' + str(proxies))
            PrintLog.print(str_time + ' scraping via proxy: ' + str(proxies))

            response = get_user_info(user_id, proxies)

            if len(response.text) > 0 and response.json()['status_code'] == 0 and response.json().get('user'):
                print(str_time + ' user_id: ' + str(user_id) + ' scraped successfully')
                PrintLog.print(str_time + ' user_id: ' + str(user_id) + ' scraped successfully')

                redisDyModel.lpush(sKey, response.text)
                redisDyModel.hdel(hKey, user_id)
            else:
                print(str_time + ' user_id: ' + str(user_id) + ' scrape failed')
                PrintLog.print(str_time + ' user_id: ' + str(user_id) + ' scrape failed')

                # Track failures per user; give up after 10 attempts
                record_json = redisDyModel.hget(hKey, user_id)
                if record_json:
                    record_dict = json.loads(record_json)
                    if record_dict['times'] < 10:
                        record_dict['times'] += 1
                        redisDyModel.hset(hKey, user_id, json.dumps(record_dict))
                    else:
                        redisDyModel.hdel(hKey, user_id)
                else:
                    record_dict = {'times': 1}
                    redisDyModel.hset(hKey, user_id, json.dumps(record_dict))

            time.sleep(1)
        except Exception as e:
            print(str_time + ' error: ' + str(e))
            PrintLog.print(str_time + ' error: ' + str(e))
            # Requeue the user so another worker can retry
            redisDyModel.lpush(uKey, user_id)
            continue


if __name__ == '__main__':
    import warnings
    warnings.filterwarnings("ignore")

    print('=========== scraper ===========')
    PrintLog.print('=========== scraper ===========')

    threading_count = 50
    for i in range(0, threading_count):
        task = threading.Thread(target=scraper, name=str(i))
        task.start()  # Ready to run; waiting for the CPU to schedule it
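

# ---------------------------------------------------------------------------
# Hedged sketch (not part of the original script): how a producer might seed
# the two Redis structures this scraper consumes. The layout of 'IpProxyHash'
# (field = "host:port" proxy address, value = JSON with an 'expired_at' Unix
# timestamp) is implied by get_scraper_proxy() above; the helper names
# 'seed_proxy' and 'enqueue_user' and the 300-second TTL are illustrative
# assumptions, not part of the original codebase.
# ---------------------------------------------------------------------------
def seed_proxy(proxy_addr, ttl_seconds=300):
    # Register a proxy with an expiry timestamp, matching the JSON shape
    # that get_scraper_proxy() parses and expires against.
    redisModel.hset('IpProxyHash', proxy_addr,
                    json.dumps({'expired_at': int(time.time()) + ttl_seconds}))


def enqueue_user(user_id):
    # Push a user_id onto the scrape queue that scraper() rpop()s from.
    redisDyModel.lpush('SL:List:Douyin:BarrageUserScrapeQueue', str(user_id))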