import json
import random
import threading
import time

from xlog03 import *
from rds_model.db_redis import DbRedis
from log.print_log import PrintLog
import douyin_class

redisModel = DbRedis.connect()
redisDyModel = DbRedis.douyin_connect()


def get_scraper_proxy(proxy_key):
    """Pick a random, non-expired proxy from the Redis hash; evict expired entries."""
    proxy_dict = redisModel.hgetall(proxy_key)
    if not proxy_dict:
        return None

    proxy_list = list(proxy_dict)
    now = int(time.time())
    # Loop until a live proxy is found or every candidate has been discarded
    while proxy_list:
        proxy = random.choice(proxy_list)
        proxy_info = proxy_dict.get(proxy)
        if proxy_info is None:
            proxy_list.remove(proxy)
            continue
        proxy_info = json.loads(proxy_info)
        expire_at = int(proxy_info.get('expired_at', 0))
        # Delete expired proxies from Redis and from the local candidate list
        if expire_at <= now:
            redisModel.hdel(proxy_key, proxy)
            proxy_list.remove(proxy)
            continue
        return proxy
    return None


def scraper():
    rKey = 'DOUYIN_REGISTER_QUEUE'
    pKey = 'IpProxyHash'
    uKey = 'SL:List:Douyin:BarrageUserScrapeQueue'
    sKey = 'SL:List:Douyin:BarrageUserDataQueue'

    cur_time = int(time.time())
    while True:
        # Stop this worker after roughly 270 seconds
        now_time = int(time.time())
        if (now_time - cur_time) > 270:
            print('thread_' + threading.current_thread().name + ' finished')
            break

        # Wait until both a registered device and a user id are queued
        if redisDyModel.llen(rKey) == 0:
            time.sleep(2)
            continue
        if redisDyModel.llen(uKey) == 0:
            time.sleep(2)
            continue

        json_data = redisDyModel.rpop(rKey)
        str_time = time.strftime("%H:%M:%S", time.localtime())
        try:
            proxy = get_scraper_proxy(pKey)
            if proxy:
                proxies = {
                    "http": "http://" + proxy,
                    "https": "http://" + proxy
                }
            else:
                # No usable proxy: requeue the device data instead of dropping it
                redisDyModel.lpush(rKey, json_data)
                time.sleep(2)
                continue

            print(str_time + ' scraping via proxy: ' + str(proxies))
            PrintLog.print(str_time + ' scraping via proxy: ' + str(proxies))

            douApi = douyin_class.DouYinApi('', proxies)

            # The registration queue stores the udid under the 'uuid' key
            dict_data = json.loads(json_data)
            device_id, iid, udid, openudid, cookie = dict_data['device_id'], dict_data['iid'], dict_data['uuid'], \
                                                     dict_data['openudid'], dict_data['cookie']
            douApi.init_device_ids(device_id, iid, udid, openudid)

            user_id = redisDyModel.rpop(uKey)
            response = douApi.get_user_info(user_id)

            if len(response.text) > 0 and response.json()['status_code'] == 0 and response.json().get('user'):
                print(str_time + ' user_id:' + str(user_id) + ' scraped successfully')
                PrintLog.print(str_time + ' user_id:' + str(user_id) + ' scraped successfully')
                redisDyModel.lpush(sKey, response.text)
                # The device data is still valid; recycle it for the next request
                redisDyModel.lpush(rKey, json_data)
            else:
                print(str_time + ' user_id:' + str(user_id) + ' scrape failed')
                PrintLog.print(str_time + ' user_id:' + str(user_id) + ' scrape failed')
                # Retry a failing device at most 10 times before discarding it
                if dict_data['times'] < 10:
                    dict_data['times'] += 1
                    redisDyModel.lpush(rKey, json.dumps(dict_data))

            time.sleep(1)
        except Exception as e:
            print(str_time + ' error: ' + str(e))
            PrintLog.print(str_time + ' error: ' + str(e))
            # Requeue the device data so a transient error does not lose it
            redisDyModel.lpush(rKey, json_data)
            continue


if __name__ == '__main__':
    import warnings
    warnings.filterwarnings("ignore")

    print('=========== scraper ===========')
    PrintLog.print('=========== scraper ===========')

    threading_count = 10
    for i in range(threading_count):
        task = threading.Thread(target=scraper, name=str(i))
        task.start()  # ready; the thread waits for the CPU to schedule it
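
# --- Minimal seeding sketch (an assumption, not part of the original script) ---
# Shows the JSON shape that scraper() expects on DOUYIN_REGISTER_QUEUE, inferred
# from the keys read out of dict_data above. All field values are hypothetical
# placeholders; the helper name is illustrative only.
def seed_register_queue_example():
    device = {
        'device_id': '1234567890',  # hypothetical device identifier
        'iid': '0987654321',        # hypothetical install id
        'uuid': 'abcdef',           # stored as 'uuid', consumed as udid
        'openudid': 'fedcba',
        'cookie': '',
        'times': 0,                 # retry counter incremented on scrape failure
    }
    redisDyModel.lpush('DOUYIN_REGISTER_QUEUE', json.dumps(device))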