import json
import random
import threading
import time

from xlog03 import *
from rds_model.db_redis import DbRedis
from log.print_log import PrintLog
import douyin_class

redisModel = DbRedis.connect()
redisDyModel = DbRedis.douyin_connect()
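
# Shared Redis layout (inferred from the keys used below):
#   IpProxyHash                            hash: "host:port" -> JSON with an "expired_at" unix timestamp
#   DOUYIN_REGISTER_QUEUE                  list: registered device/account JSON blobs
#   SL:List:Douyin:BarrageUserScrapeQueue  list: user ids waiting to be scraped
#   SL:List:Douyin:BarrageUserDataQueue    list: raw user-info responses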

def get_scraper_proxy(proxy_key):
    """Pick a random, unexpired proxy from the pool hash, evicting stale entries."""
    proxy_dict = redisModel.hgetall(proxy_key)
    if not proxy_dict:
        return None
    proxy_list = list(proxy_dict)
    now = int(time.time())
    while proxy_list:
        proxy = random.choice(proxy_list)
        proxy_info = proxy_dict.get(proxy)
        if proxy_info is None:
            proxy_list.remove(proxy)
            continue
        proxy_info = json.loads(proxy_info)
        # Treat a missing timestamp as already expired
        expire_at = int(proxy_info.get('expired_at', 0))
        # Evict expired proxies from the pool
        if expire_at <= now:
            redisModel.hdel(proxy_key, proxy)
            proxy_list.remove(proxy)
            continue
        return proxy
    return None
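
# Illustrative pool entry this helper expects (values are made up):
#   HSET IpProxyHash "1.2.3.4:8888" '{"expired_at": 1700000000}'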

def scraper():
    rkey = 'DOUYIN_REGISTER_QUEUE'
    pKey = 'IpProxyHash'
    uKey = 'SL:List:Douyin:BarrageUserScrapeQueue'
    sKey = 'SL:List:Douyin:BarrageUserDataQueue'

    cur_time = int(time.time())
    while True:
        # Each worker winds down after roughly 270 seconds
        now_time = int(time.time())
        if (now_time - cur_time) > 270:
            print('thread_' + threading.current_thread().name + ' finished')
            break

        # Idle until both the device queue and the user-id queue have work
        if redisDyModel.llen(rkey) == 0:
            time.sleep(2)
            continue
        if redisDyModel.llen(uKey) == 0:
            time.sleep(2)
            continue

        json_data = redisDyModel.rpop(rkey)
        str_time = time.strftime("%H:%M:%S", time.localtime())
        try:
            proxy = get_scraper_proxy(pKey)
            if proxy:
                proxies = {
                    "http": "http://" + proxy,
                    "https": "http://" + proxy
                }
            else:
                # No usable proxy right now: requeue the device data and retry
                redisDyModel.lpush(rkey, json_data)
                time.sleep(2)
                continue

            print(str_time + ' scraping via proxy: ' + str(proxies))
            PrintLog.print(str_time + ' scraping via proxy: ' + str(proxies))

            douApi = douyin_class.DouYinApi('', proxies)

            dict_data = json.loads(json_data)
            # Note: the udid value is stored under the 'uuid' key
            device_id, iid, udid, openudid, cookie = (
                dict_data['device_id'], dict_data['iid'], dict_data['uuid'],
                dict_data['openudid'], dict_data['cookie']
            )
            douApi.init_device_ids(device_id, iid, udid, openudid)

            user_id = redisDyModel.rpop(uKey)
            if user_id is None:
                # Another worker drained the queue between the length check and the pop
                redisDyModel.lpush(rkey, json_data)
                continue

            response = douApi.get_user_info(user_id)
            if len(response.text) > 0 and response.json()['status_code'] == 0 and response.json()['user']:
                print(str_time + ' user_id:' + str(user_id) + ' scrape succeeded')
                PrintLog.print(str_time + ' user_id:' + str(user_id) + ' scrape succeeded')
                redisDyModel.lpush(sKey, response.text)
                redisDyModel.lpush(rkey, json_data)
            else:
                print(str_time + ' user_id:' + str(user_id) + ' scrape failed')
                PrintLog.print(str_time + ' user_id:' + str(user_id) + ' scrape failed')
                # Requeue the device data until it has failed ten times
                scraper_time = dict_data['times']
                if scraper_time < 10:
                    dict_data['times'] += 1
                    redisDyModel.lpush(rkey, json.dumps(dict_data))
            time.sleep(1)
        except Exception as e:
            print(str_time + ' error: ' + str(e))
            PrintLog.print(str_time + ' error: ' + str(e))
            # Put the device data back so it is not lost on an unexpected failure
            redisDyModel.lpush(rkey, json_data)
            continue
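
# Illustrative shape of one DOUYIN_REGISTER_QUEUE entry, based on the fields
# parsed in scraper() (values are placeholders):
#   {"device_id": "...", "iid": "...", "uuid": "...", "openudid": "...",
#    "cookie": "...", "times": 0}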

if __name__ == '__main__':
    import warnings
    warnings.filterwarnings("ignore")

    print('=========== scraper started ===========')
    PrintLog.print('=========== scraper started ===========')

    threading_count = 10
    for i in range(threading_count):
        # Pass the name as a string: threading.Thread falls back to a default
        # name for falsy values, so name=0 would be silently ignored
        task = threading.Thread(target=scraper, name=str(i))
        task.start()  # Ready to run; waits for the scheduler to pick it up
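
    # The workers are non-daemon threads, so the process keeps running until
    # every worker reaches its 270-second cutoff and returns.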