Python script for scraping Douyin store livestreams

douyin_user_scraper_in.py 4.8KB

from xlog03 import *
import requests
import random
import json
import threading
import time               # used throughout but never imported explicitly in the original
from urllib import parse  # parse.urlencode() is used in parse_params()
from rds_model.db_redis import DbRedis
from log.print_log import PrintLog

# Shared Redis connections: a general one (proxy pool) and one for the Douyin queues.
redisModel = DbRedis.connect()
redisDyModel = DbRedis.douyin_connect()

def get_user_info(user_id, proxies):
    params = {
        'user_id': user_id,
        'version_code': '9.8.1',
        'js_sdk_version': '1.47.2.2',
        'app_name': 'aweme',
        'app_version': '9.8.1',
        'channel': 'App Store',  # urlencode() escapes spaces itself; the original 'App%20Store' would have been double-encoded
        'mcc_mnc': 46002,
        'aid': 1128,
        'screen_width': 640,
        'os_api': 18,
        'ac': 'WIFI',
        'os_version': '13.3.1',
        'device_platform': 'iphone',
        'build_number': 98107,
        'device_type': 'iPhone8,4',
        'address_book_access': 1
    }
    url = 'https://api3-normal-c-lf.amemv.com/aweme/v1/user/profile/self/?'
    douyin_url = parse_params(url, params)
    response = http_get(douyin_url, proxies)
    return response

def parse_params(url, params):
    if params is None:
        params = {}
    if not url.endswith('?'):
        url = url + '?'
    common_params = parse.urlencode(params)
    douyin_url = url + common_params
    return douyin_url

def http_get(douyin_url, proxies):
    # verify=False disables TLS certificate checks; the matching
    # InsecureRequestWarning is silenced in __main__.
    if proxies:
        resp = requests.get(douyin_url, proxies=proxies, verify=False, timeout=10)
    else:
        resp = requests.get(douyin_url, verify=False, timeout=10)
    return resp

def get_scraper_proxy(proxy_key):
    proxy_dict = redisModel.hgetall(proxy_key)
    if (proxy_dict is None) or (len(proxy_dict) == 0):
        return
    proxy_list = list(proxy_dict)
    now = int(time.time())
    # random.choice() raises IndexError on an empty list (it never returns None),
    # so loop on the list itself instead of the original `while True`.
    while proxy_list:
        proxy = random.choice(proxy_list)
        proxy_info = proxy_dict.get(proxy)
        if proxy_info is None:
            proxy_list.remove(proxy)  # drop it locally, otherwise this would loop forever
            continue
        proxy_info = json.loads(proxy_info)
        expire_at = int(proxy_info.get('expired_at'))
        # delete expired proxies
        if expire_at <= now:
            redisModel.hdel(proxy_key, proxy)
            proxy_list.remove(proxy)
            continue
        return proxy

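get_scraper_proxy() expects each field of the IpProxyHash hash to map a "host:port" string to a JSON blob carrying at least an 'expired_at' unix timestamp. A minimal sketch of seeding a test entry, assuming the DbRedis wrapper sits on a standard redis-py client (the wrapper itself is not shown in this file; host, port, and the proxy address below are placeholders):

import json
import time
import redis  # assumption: the same redis-py library the DbRedis wrapper uses

r = redis.Redis(host='127.0.0.1', port=6379, decode_responses=True)

# One hash field per proxy: "host:port" -> JSON with an expiry timestamp.
# 'expired_at' is the only field get_scraper_proxy() actually reads.
r.hset('IpProxyHash', '1.2.3.4:8888', json.dumps({'expired_at': int(time.time()) + 300}))
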
def scraper():
    pKey = 'IpProxyHash'
    uKey = 'SL:List:Douyin:BarrageUserScrapeQueue'
    sKey = 'SL:List:Douyin:BarrageUserDataQueue'
    hKey = 'SL:Hash:Douyin:BarrageUserScrapeRecord'
    cur_time = int(time.time())
    while True:
        # each worker thread exits after 270 seconds
        now_time = int(time.time())
        if (now_time - cur_time) > 270:
            print('thread_' + threading.current_thread().name + ' finished')
            break
        ullen = redisDyModel.llen(uKey)
        if ullen == 0:
            time.sleep(2)
            continue
        str_time = time.strftime("%H:%M:%S", time.localtime())
        user_id = redisDyModel.rpop(uKey)
        try:
            proxy = get_scraper_proxy(pKey)
            if proxy:
                proxies = {
                    "http": "http://" + proxy,
                    "https": "http://" + proxy
                }
            else:
                time.sleep(2)
                continue
            print(str_time + ' scraping via proxy: ' + str(proxies))
            PrintLog.print(str_time + ' scraping via proxy: ' + str(proxies))
            response = get_user_info(user_id, proxies)
            if len(response.text) > 0 and response.json()['status_code'] == 0 and response.json()['user']:
                print(str_time + ' user_id:' + str(user_id) + ' scraped successfully')
                PrintLog.print(str_time + ' user_id:' + str(user_id) + ' scraped successfully')
                redisDyModel.lpush(sKey, response.text)
                redisDyModel.hdel(hKey, user_id)
            else:
                print(str_time + ' user_id:' + str(user_id) + ' scrape failed')
                PrintLog.print(str_time + ' user_id:' + str(user_id) + ' scrape failed')
                # track failures per user_id; give up after 10 attempts
                record_json = redisDyModel.hget(hKey, user_id)
                if record_json:
                    record_dict = json.loads(record_json)
                    if record_dict['times'] < 10:
                        record_dict['times'] += 1
                        redisDyModel.hset(hKey, user_id, json.dumps(record_dict))
                    else:
                        redisDyModel.hdel(hKey, user_id)
                else:
                    record_dict = {'times': 1}
                    redisDyModel.hset(hKey, user_id, json.dumps(record_dict))
            time.sleep(1)
        except Exception as e:
            print(str_time + ' error: ' + str(e))
            PrintLog.print(str_time + ' error: ' + str(e))
            # re-queue the user_id so it is not lost on transient errors
            redisDyModel.lpush(uKey, user_id)
            continue

if __name__ == '__main__':
    import warnings
    warnings.filterwarnings("ignore")  # silence the InsecureRequestWarning caused by verify=False

    print('=========== scraper started ===========')
    PrintLog.print('=========== scraper started ===========')

    threading_count = 50
    for i in range(0, threading_count):
        task = threading.Thread(target=scraper, name=str(i))
        task.start()  # the thread is now ready and waits for the CPU to schedule it
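
To drive the workers, some other process pushes Douyin user IDs onto the scrape queue and pops raw profile JSON off the data queue. A minimal sketch under the same redis-py assumption as above (the user IDs are placeholders):

import redis

r = redis.Redis(host='127.0.0.1', port=6379, decode_responses=True)

# Enqueue user IDs; the workers rpop from this list.
for uid in ('1234567890', '9876543210'):
    r.lpush('SL:List:Douyin:BarrageUserScrapeQueue', uid)

# Each successful scrape lpushes the raw response body here.
raw = r.rpop('SL:List:Douyin:BarrageUserDataQueue')
if raw:
    print(raw)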