店播爬取Python脚本

live_promotion_detail_v1_sale_scraper.py 4.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. #!/usr/bin/python3
  2. # coding=utf-8
  3. # -*- coding: utf-8 -*-
  4. import time
  5. import json
  6. import threading
  7. import sys
  8. import random
  9. from rds_model.rds_live_commodity_detail_request_list import RdsLiveCommodityDetailRequestList
  10. from libs.live_commodity_detail_v1 import LiveCommodityDetailV1
  11. from libs.mysql_dy_live_ifsolt import MysqlDyLiveCommodity
  12. from log.print_log import PrintLog
  # Process start time; the __main__ dispatch loop stops after 1800 s of wall-clock time.
  start_time = time.time()

  # sold_out_at placeholder values that select the short (300 s, middle-list)
  # follow-up poll. NOTE(review): assumes the DB layer returns this column as a
  # string, not a datetime — confirm, otherwise these never match.
  _UNSET_SOLD_OUT_AT = ('0000-00-00 00:00:00', '1970-01-01 08:00:00')


  def _parse_request(request_data):
      """Split a queued request string into its fields.

      Returns (promotion_id, product_id, uid, room_id, exec_time) with exec_time
      converted to int, or None when there are fewer than five '@'-separated
      fields or the fifth field is not an integer. Fields past the fifth are
      ignored.
      """
      parts = request_data.split('@')
      if len(parts) < 5:
          return None
      try:
          exec_time = int(parts[4])
      except ValueError:
          return None
      return parts[0], parts[1], parts[2], parts[3], exec_time


  def _is_empty_detail(commodity_detail):
      """Return True when the fetched detail is None, '' or lacks 'real_sell_num'."""
      return (
          commodity_detail is None
          or commodity_detail == ''
          or 'real_sell_num' not in commodity_detail
      )


  def _maybe_requeue(rds_list, request_data, heat):
      """Randomly put a request whose fetch returned no data back on its queue.

      Round 0 re-queues with probability 100/101, later rounds with 9/10; a
      negative heat is never re-queued. Returns True if the request was
      re-queued, in which case the caller should stop processing it.
      """
      if heat == 0:
          if random.randint(0, 100) > 0:
              rds_list.add_request_params(request_data, heat)
              print('首次失败重新插入 ' + request_data)
              return True
      elif heat > 0:
          if random.randint(0, 9) > 0:
              rds_list.add_request_params(request_data, heat)
              print(str(heat) + '非首次失败重新插入 ' + request_data)
              return True
      return False


  def _schedule_next_round(rds_list, promotion_id, product_id, uid, room_id):
      """Queue the follow-up poll for this product; return False to stop processing.

      Looks up the product row via MysqlDyLiveCommodity, passing the last
      character of uid as ``pix`` (presumably a table-shard suffix — confirm).
      A non-None revise_price returns False without scheduling anything.
      Otherwise the same request is re-queued with exec_time = now + 300 s on the
      middle list when sold_out_at is a placeholder, or now + 1200 s on the
      last list.
      """
      goods_db = MysqlDyLiveCommodity()
      pix = str(uid)[-1]
      # NOTE(review): if no row exists and get_goods_info returns None, this
      # unpacking raises and scrape() re-queues the request — confirm intended.
      _, sold_out_at, revise_price, _ = goods_db.get_goods_info(room_id, product_id, pix)
      if revise_price is not None:
          return False
      if sold_out_at in _UNSET_SOLD_OUT_AT:
          delay, push = 300, rds_list.push_middle_list
      else:
          delay, push = 1200, rds_list.push_last_list
      next_at = int(time.time()) + delay
      push('@'.join((promotion_id, product_id, uid, room_id, str(next_at))))
      return True


  def scrape(request_data, heat):
      """Process one queued product request; intended as a worker-thread target.

      Args:
          request_data: 'promotion_id@product_id@uid@room_id@exec_time', where
              exec_time is a Unix timestamp in seconds.
          heat: index of the round/queue the request was taken from
              (0 = first attempt).

      Waits until exec_time, fetches the product detail, schedules the next poll
      for rounds 0 and 1, then pushes the detail plus request metadata as JSON.
      Malformed requests are printed and dropped. Any exception while fetching
      or pushing is logged and the request is put back on its queue. Every exit
      is a plain ``return`` (no sys.exit), so the function is also safe to call
      from the main thread.
      """
      fields = _parse_request(request_data)
      if fields is None:
          print('数据格式不对' + request_data)
          return
      promotion_id, product_id, uid, room_id, exec_time = fields
      rds_list = RdsLiveCommodityDetailRequestList()

      # Not due yet: sleep until the scheduled execution time (never negative).
      wait = exec_time - int(time.time())
      if wait > 0:
          time.sleep(wait)
      PrintLog.print(
          time.strftime("%H:%M:%S", time.localtime()) + ' '
          + time.strftime("%H:%M:%S", time.localtime(exec_time)) + ' '
          + str(heat) + ' ' + promotion_id
      )
      try:
          commodity_detail = LiveCommodityDetailV1.get_data(product_id)
          empty = _is_empty_detail(commodity_detail)
          if empty:
              rds_list.record_v1_score(0)
              PrintLog.print(
                  time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' 数据为空 ' + promotion_id
              )
              if _maybe_requeue(rds_list, request_data, heat):
                  return
          # Only rounds 0 and 1 schedule a follow-up poll. This also runs when the
          # detail was empty but not re-queued, so a follow-up is still queued.
          if heat < 2 and not _schedule_next_round(rds_list, promotion_id, product_id, uid, room_id):
              # NOTE(review): a non-None revise_price also discards a detail that
              # was fetched successfully — confirm this is intended.
              return
          if empty:
              return
          data = json.dumps({
              "data": commodity_detail,
              "extra": {
                  'room_id': room_id,
                  'promotion_id': promotion_id,
                  'product_id': product_id,
                  'uid': uid,
                  'heat': heat
              }
          })
          rds_list.record_v1_score(1)
          rds_list.push_v1_data_list(data)
      except Exception as e:
          # Best-effort worker: log and re-queue so the request is retried later.
          rds_list.record_v1_score(0)
          rds_list.add_request_params(request_data, heat)
          PrintLog.print(
              time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + '抛出异常!' + str(e) + '\n'
              + request_data
          )
  if __name__ == "__main__":
      import warnings
      warnings.filterwarnings("ignore")

      # Usage: <script> HEAT THREAD_COUNT  (both integers, THREAD_COUNT >= 1).
      # sys.exit(str) prints the message to stderr and exits with status 1.
      try:
          heat = int(sys.argv[1])
          threading_count = int(sys.argv[2])
      except (IndexError, ValueError):
          sys.exit('usage: %s HEAT THREAD_COUNT' % sys.argv[0])
      if threading_count < 1:
          # With no worker slots the loop below could never start a thread.
          sys.exit('THREAD_COUNT must be >= 1')

      rds = RdsLiveCommodityDetailRequestList()
      print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' ' + str(heat) + ' 开始执行,商品销量队列长度:' + str(rds.get_len(heat)))
      while True:
          sys.stdout.flush()
          # Free worker slots: configured count minus live threads, excluding
          # the main thread itself.
          free_slots = threading_count - (threading.active_count() - 1)
          for _ in range(free_slots):
              sys.stdout.flush()
              request_data = rds.get_request_params(heat)
              if request_data is None:
                  # Nothing returned from the queue: back off briefly, then re-poll.
                  time.sleep(0.1)
                  break
              threading.Thread(target=scrape, args=(request_data, heat)).start()
          # Stop dispatching after 30 minutes. Interpreter shutdown still waits
          # for the non-daemon worker threads already started.
          if time.time() - start_time > 1800:
              print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' 主方法执行终止')
              sys.exit(0)
          time.sleep(0.01)