Store Livestream Scraping Python Script

douyin_user_scraper_in.py 4.8KB

import requests
import random
import json
import threading
import time                    # used below; imported explicitly instead of relying on xlog03's wildcard import
from urllib import parse       # same: parse.urlencode is used in parse_params()

from xlog03 import *
from rds_model.db_redis import DbRedis
from log.print_log import PrintLog

# Shared Redis connections: one for the proxy pool, one for the Douyin queues.
redisModel = DbRedis.connect()
redisDyModel = DbRedis.douyin_connect()
def get_user_info(user_id, proxies):
    # Query the Douyin profile endpoint for a single user, impersonating the iOS client.
    params = {
        'user_id': user_id,
        'version_code': '9.8.1',
        'js_sdk_version': '1.47.2.2',
        'app_name': 'aweme',
        'app_version': '9.8.1',
        'channel': 'App%20Store',
        'mcc_mnc': 46002,
        'aid': 1128,
        'screen_width': 640,
        'os_api': 18,
        'ac': 'WIFI',
        'os_version': '13.3.1',
        'device_platform': 'iphone',
        'build_number': 98107,
        'device_type': 'iPhone8,4',
        'address_book_access': 1
    }
    url = 'https://api3-normal-c-lf.amemv.com/aweme/v1/user/profile/self/?'
    douyin_url = parse_params(url, params)
    response = http_get(douyin_url, proxies)
    return response
def parse_params(url, params):
    if params is None:
        params = {}
    if not url.endswith('?'):
        url = url + '?'
    common_params = parse.urlencode(params)
    douyin_url = url + common_params
    return douyin_url
def http_get(douyin_url, proxies):
    if proxies:
        resp = requests.get(douyin_url, proxies=proxies, verify=False, timeout=10)
    else:
        resp = requests.get(douyin_url, verify=False, timeout=10)
    return resp
def get_scraper_proxy(proxy_key):
    # Pick a random, non-expired proxy from the proxy-pool hash; expired entries are pruned.
    proxy_dict = redisModel.hgetall(proxy_key)
    if (proxy_dict is None) or (len(proxy_dict) == 0):
        return
    proxy_list = list(proxy_dict)
    now = int(time.time())
    while True:
        if not proxy_list:
            # Every candidate turned out to be expired or invalid.
            return
        proxy = random.choice(proxy_list)
        proxy_info = proxy_dict.get(proxy)
        if proxy_info is None:
            proxy_list.remove(proxy)
            continue
        proxy_info = json.loads(proxy_info)
        expire_at = int(proxy_info.get('expired_at'))
        # Drop expired proxies from the pool and try another one.
        if expire_at <= now:
            redisModel.hdel(proxy_key, proxy)
            proxy_list.remove(proxy)
            continue
        return proxy
def scraper():
    pKey = 'IpProxyHash'                             # proxy pool hash
    uKey = 'SL:List:Douyin:BarrageUserScrapeQueue'   # pending user_id queue
    sKey = 'SL:List:Douyin:BarrageUserDataQueue'     # scraped profile data queue
    hKey = 'SL:Hash:Douyin:BarrageUserScrapeRecord'  # per-user retry counter

    cur_time = int(time.time())
    while True:
        # Each worker thread exits after roughly 270 seconds.
        now_time = int(time.time())
        if (now_time - cur_time) > 270:
            print('thread_' + threading.current_thread().name + ' finished')
            break

        user_id = redisDyModel.rpop(uKey)
        if not user_id:
            time.sleep(2)
            continue

        str_time = time.strftime("%H:%M:%S", time.localtime())
        try:
            proxy = get_scraper_proxy(pKey)
            if proxy:
                proxies = {
                    "http": "http://" + proxy,
                    "https": "http://" + proxy
                }
            else:
                # No live proxy available right now: put the user_id back and wait.
                redisDyModel.lpush(uKey, user_id)
                time.sleep(2)
                continue

            print(str_time + ' using proxy: ' + str(proxies))
            PrintLog.print(str_time + ' using proxy: ' + str(proxies))

            response = get_user_info(user_id, proxies)

            if len(response.text) > 0 and response.json()['status_code'] == 0 and response.json()['user']:
                print(str_time + ' user_id:' + str(user_id) + ' scrape succeeded')
                PrintLog.print(str_time + ' user_id:' + str(user_id) + ' scrape succeeded')
                # Push the raw profile JSON onto the data queue and clear the retry record.
                redisDyModel.lpush(sKey, response.text)
                redisDyModel.hdel(hKey, user_id)
            else:
                print(str_time + ' user_id:' + str(user_id) + ' scrape failed')
                PrintLog.print(str_time + ' user_id:' + str(user_id) + ' scrape failed')
                # Track failures per user_id; give up after 10 attempts.
                record_json = redisDyModel.hget(hKey, user_id)
                if record_json:
                    record_dict = json.loads(record_json)
                    if record_dict['times'] < 10:
                        record_dict['times'] += 1
                        redisDyModel.hset(hKey, user_id, json.dumps(record_dict))
                    else:
                        redisDyModel.hdel(hKey, user_id)
                else:
                    record_dict = {'times': 1}
                    redisDyModel.hset(hKey, user_id, json.dumps(record_dict))
            time.sleep(1)
        except Exception as e:
            print(str_time + ' error: ' + str(e))
            PrintLog.print(str_time + ' error: ' + str(e))
            # On an unexpected error, put the user_id back on the queue for another try.
            redisDyModel.lpush(uKey, user_id)
            continue
if __name__ == '__main__':
    import warnings
    warnings.filterwarnings("ignore")

    print('=========== Scraper started ===========')
    PrintLog.print('=========== Scraper started ===========')

    # Spawn 50 worker threads; each runs scraper() until its time window elapses.
    threading_count = 50
    for i in range(0, threading_count):
        task = threading.Thread(target=scraper, name=str(i))
        task.start()  # ready to run; waits to be scheduled
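
The script relies on two things that live outside this file: a proxy pool kept in the IpProxyHash hash and a queue of user IDs in SL:List:Douyin:BarrageUserScrapeQueue. Below is a minimal sketch of how a producer might feed both and how a downstream consumer could read the result queue. It assumes a plain redis-py client with decode_responses=True (whatever DbRedis actually wraps), infers the proxy record layout from the expired_at check in get_scraper_proxy(), and uses placeholder host, port, and user IDs.

import json
import time

import redis

r = redis.Redis(host='127.0.0.1', port=6379, db=0, decode_responses=True)

# Register a proxy in the pool; get_scraper_proxy() only inspects the 'expired_at' field,
# so any extra fields the real pool writer stores are ignored here.
r.hset('IpProxyHash', '1.2.3.4:8888', json.dumps({'expired_at': int(time.time()) + 600}))

# Seed the scrape queue with user_ids (placeholders); the workers rpop from this list.
for user_id in ['123456789', '987654321']:
    r.lpush('SL:List:Douyin:BarrageUserScrapeQueue', user_id)

# Downstream consumer: each element on the data queue is the raw profile response text.
raw = r.rpop('SL:List:Douyin:BarrageUserDataQueue')
if raw:
    profile = json.loads(raw)
    # status_code and user are the fields the worker already checks before enqueueing.
    print(profile.get('status_code'), list(profile.get('user', {}).keys())[:5])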