店播爬取 Python 脚本 (store live-stream scraping script)

douyin_user_scraper_ab.py (3.8KB)
import json
import random
import threading
import time

from xlog03 import *
from rds_model.db_redis import DbRedis
from log.print_log import PrintLog
import douyin_class
# Shared Redis connections, created once at import time and reused by every
# scraper thread.
# NOTE(review): assumed safe for concurrent use across threads — confirm that
# DbRedis.connect()/douyin_connect() return thread-safe clients.
redisModel = DbRedis.connect()            # general Redis: the proxy pool hash lives here
redisDyModel = DbRedis.douyin_connect()   # Douyin Redis: register/user/data queues
  10. def get_scraper_proxy(proxy_key):
  11. proxy_dict = redisModel.hgetall(proxy_key)
  12. if (proxy_dict is None) or (len(proxy_dict) == 0):
  13. return
  14. proxy_list = list(proxy_dict)
  15. now = int(time.time())
  16. while True:
  17. proxy = random.choice(proxy_list)
  18. if proxy is None:
  19. return
  20. proxy_info = proxy_dict.get(proxy)
  21. if proxy_info is None:
  22. continue
  23. proxy_info = json.loads(proxy_info)
  24. expire_at = int(proxy_info.get('expired_at'))
  25. # 删除过期的代理
  26. if expire_at <= now:
  27. redisModel.hdel(proxy_key, proxy)
  28. proxy_list.remove(proxy)
  29. continue
  30. return proxy
  31. def scraper():
  32. rkey = 'DOUYIN_REGISTER_QUEUE'
  33. pKey = 'IpProxyHash'
  34. uKey = 'SL:List:Douyin:BarrageUserScrapeQueue'
  35. sKey = 'SL:List:Douyin:BarrageUserDataQueue'
  36. cur_time = int(time.time())
  37. while True:
  38. now_time = int(time.time())
  39. if (now_time - cur_time) > 270:
  40. print('thrend_' + threading.current_thread().name + ' finish')
  41. break
  42. rllen = redisDyModel.llen(rkey)
  43. if rllen == 0:
  44. time.sleep(2)
  45. continue
  46. ullen = redisDyModel.llen(uKey)
  47. if ullen == 0:
  48. time.sleep(2)
  49. continue
  50. json_data = redisDyModel.rpop(rkey)
  51. str_time = time.strftime("%H:%M:%S", time.localtime())
  52. try:
  53. proxy = get_scraper_proxy(pKey)
  54. if proxy:
  55. proxies = {
  56. "http": "http://" + proxy,
  57. "https": "http://" + proxy
  58. }
  59. else:
  60. time.sleep(2)
  61. continue
  62. print(str_time + ' 爬取代理:' + str(proxies))
  63. PrintLog.print(str_time + ' 爬取代理:' + str(proxies))
  64. douApi = douyin_class.DouYinApi('', proxies)
  65. dict_data = json.loads(json_data)
  66. device_id, iid, udid, openudid, cookie = dict_data['device_id'], dict_data['iid'], dict_data['uuid'], \
  67. dict_data['openudid'], dict_data['cookie']
  68. douApi.init_device_ids(device_id, iid, udid, openudid)
  69. user_id = redisDyModel.rpop(uKey)
  70. response = douApi.get_user_info(user_id)
  71. if len(response.text) > 0 and response.json()['status_code'] == 0 and response.json()['user']:
  72. print(str_time + ' user_id:' + str(user_id) + ' 爬取成功')
  73. PrintLog.print(str_time + ' user_id:' + str(user_id) + ' 爬取成功')
  74. redisDyModel.lpush(sKey, response.text)
  75. redisDyModel.lpush(rkey, json_data)
  76. else:
  77. print(str_time + ' user_id:' + str(user_id) + ' 爬取失败')
  78. PrintLog.print(str_time + ' user_id:' + str(user_id) + ' 爬取失败')
  79. scraper_time = dict_data['times']
  80. if scraper_time < 10:
  81. dict_data['times'] += 1
  82. redisDyModel.lpush(rkey, json.dumps(dict_data))
  83. time.sleep(1)
  84. except Exception as e:
  85. print(str_time + ' 错误:' + str(e))
  86. PrintLog.print(str_time + ' 错误:' + str(e))
  87. redisDyModel.lpush(rkey, json_data)
  88. continue
  89. if __name__ == '__main__':
  90. import warnings
  91. warnings.filterwarnings("ignore")
  92. print('===========爬取程序===========')
  93. PrintLog.print('===========爬取程序===========')
  94. threading_count = 10
  95. for i in range(0, threading_count):
  96. task = threading.Thread(target=scraper, name=i)
  97. task.start() # 准备就绪,等待cpu执行