import requests
import random
import json
import threading
# time and urllib.parse are used below but were not imported explicitly;
# import them here instead of relying on the wildcard import from xlog03
import time
from urllib import parse

from xlog03 import *
from rds_model.db_redis import DbRedis
from log.print_log import PrintLog

redisModel = DbRedis.connect()
redisDyModel = DbRedis.douyin_connect()
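
# Overall flow:
#   - user ids to scrape are popped from the Redis list SL:List:Douyin:BarrageUserScrapeQueue
#   - successfully scraped profile JSON is pushed to SL:List:Douyin:BarrageUserDataQueue
#   - per-user retry counts are kept in the hash SL:Hash:Douyin:BarrageUserScrapeRecord (at most 10 attempts)
#   - outbound requests go through proxies stored in the hash IpProxyHash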

def get_user_info(user_id, proxies):
    """Request the Douyin profile endpoint for user_id through the given proxies."""
    params = {
        'user_id': user_id,
        'version_code': '9.8.1',
        'js_sdk_version': '1.47.2.2',
        'app_name': 'aweme',
        'app_version': '9.8.1',
        'channel': 'App%20Store',
        'mcc_mnc': 46002,
        'aid': 1128,
        'screen_width': 640,
        'os_api': 18,
        'ac': 'WIFI',
        'os_version': '13.3.1',
        'device_platform': 'iphone',
        'build_number': 98107,
        'device_type': 'iPhone8,4',
        'address_book_access': 1
    }
    url = 'https://api3-normal-c-lf.amemv.com/aweme/v1/user/profile/self/?'
    douyin_url = parse_params(url, params)
    response = http_get(douyin_url, proxies)
    return response

def parse_params(url, params):
    """Append the URL-encoded params to the base url."""
    if params is None:
        params = {}
    if not url.endswith('?'):
        url = url + '?'
    common_params = parse.urlencode(params)
    douyin_url = url + common_params
    return douyin_url

def http_get(douyin_url, proxies):
    """Issue the GET request, optionally through a proxy; SSL verification is disabled."""
    if proxies:
        resp = requests.get(douyin_url, proxies=proxies, verify=False, timeout=10)
    else:
        resp = requests.get(douyin_url, verify=False, timeout=10)
    return resp

def get_scraper_proxy(proxy_key):
    """Pick a random, unexpired proxy from the Redis hash; expired or broken entries are purged."""
    proxy_dict = redisModel.hgetall(proxy_key)
    if not proxy_dict:
        return None
    proxy_list = list(proxy_dict)
    now = int(time.time())
    while proxy_list:
        proxy = random.choice(proxy_list)
        proxy_info = proxy_dict.get(proxy)
        if proxy_info is None:
            proxy_list.remove(proxy)
            continue
        proxy_info = json.loads(proxy_info)
        expire_at = int(proxy_info.get('expired_at'))
        # Delete expired proxies from Redis and drop them from the local candidate list
        if expire_at <= now:
            redisModel.hdel(proxy_key, proxy)
            proxy_list.remove(proxy)
            continue
        return proxy
    # Every candidate was expired or unusable
    return None

def scraper():
    pKey = 'IpProxyHash'
    uKey = 'SL:List:Douyin:BarrageUserScrapeQueue'
    sKey = 'SL:List:Douyin:BarrageUserDataQueue'
    hKey = 'SL:Hash:Douyin:BarrageUserScrapeRecord'

    cur_time = int(time.time())
    while True:
        # Each worker thread exits after roughly 270 seconds
        now_time = int(time.time())
        if (now_time - cur_time) > 270:
            print('thread_' + threading.current_thread().name + ' finished')
            break

        ullen = redisDyModel.llen(uKey)
        if ullen == 0:
            time.sleep(2)
            continue

        str_time = time.strftime("%H:%M:%S", time.localtime())
        user_id = redisDyModel.rpop(uKey)
        if user_id is None:
            # Another thread drained the queue between llen and rpop
            time.sleep(2)
            continue

        try:
            proxy = get_scraper_proxy(pKey)
            if proxy:
                proxies = {
                    "http": "http://" + proxy,
                    "https": "http://" + proxy
                }
            else:
                # No proxy available: requeue the user_id and wait before retrying
                redisDyModel.lpush(uKey, user_id)
                time.sleep(2)
                continue

            print(str_time + ' scraping via proxy: ' + str(proxies))
            PrintLog.print(str_time + ' scraping via proxy: ' + str(proxies))

            response = get_user_info(user_id, proxies)
            if len(response.text) > 0 and response.json()['status_code'] == 0 and response.json()['user']:
                print(str_time + ' user_id: ' + str(user_id) + ' scrape succeeded')
                PrintLog.print(str_time + ' user_id: ' + str(user_id) + ' scrape succeeded')
                # Push the raw profile JSON to the data queue and clear the retry record
                redisDyModel.lpush(sKey, response.text)
                redisDyModel.hdel(hKey, user_id)
            else:
                print(str_time + ' user_id: ' + str(user_id) + ' scrape failed')
                PrintLog.print(str_time + ' user_id: ' + str(user_id) + ' scrape failed')
                # Track failures per user_id; give up after 10 attempts
                record_json = redisDyModel.hget(hKey, user_id)
                if record_json:
                    record_dict = json.loads(record_json)
                    if record_dict['times'] < 10:
                        record_dict['times'] += 1
                        redisDyModel.hset(hKey, user_id, json.dumps(record_dict))
                    else:
                        redisDyModel.hdel(hKey, user_id)
                else:
                    record_dict = {'times': 1}
                    redisDyModel.hset(hKey, user_id, json.dumps(record_dict))

            time.sleep(1)
        except Exception as e:
            print(str_time + ' error: ' + str(e))
            PrintLog.print(str_time + ' error: ' + str(e))
            # Requeue the user_id so the next loop (or another thread) can retry it
            redisDyModel.lpush(uKey, user_id)
            continue

if __name__ == '__main__':
    import warnings
    # requests emits InsecureRequestWarning because verify=False is used above
    warnings.filterwarnings("ignore")

    print('=========== Douyin user scraper ===========')
    PrintLog.print('=========== Douyin user scraper ===========')

    threading_count = 50
    for i in range(0, threading_count):
        task = threading.Thread(target=scraper, name=str(i))
        task.start()  # ready to run, waiting for the CPU to schedule it