店播爬取Python脚本

live_promotion_detail_v1_sale_scraper.py 4.9KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. #!/usr/bin/python3
  2. # coding=utf-8
  3. # -*- coding: utf-8 -*-
  4. import time
  5. import json
  6. import threading
  7. import sys
  8. import random
  9. from rds_model.rds_live_commodity_detail_request_list import RdsLiveCommodityDetailRequestList
  10. from libs.live_commodity_detail_v1 import LiveCommodityDetailV1
  11. from libs.mysql_dy_live_ifsolt import MysqlDyLiveCommodity
  12. from log.print_log import PrintLog
  13. start_time = time.time()
  14. def scrape(heat):
  15. while True:
  16. rds = RdsLiveCommodityDetailRequestList()
  17. request_data = rds.get_request_params(heat)
  18. if request_data is None:
  19. time.sleep(0.1)
  20. continue
  21. rds_list = RdsLiveCommodityDetailRequestList()
  22. request_split = request_data.split('@')
  23. if len(request_split) < 5:
  24. print('数据格式不对' + request_data)
  25. continue
  26. promotion_id = request_split[0]
  27. product_id = request_split[1]
  28. uid = request_split[2]
  29. room_id = request_split[3]
  30. exec_time = int(request_split[4])
  31. if int(time.time()) < exec_time:
  32. sp_ns = exec_time - int(time.time())
  33. time.sleep(sp_ns)
  34. PrintLog.print(
  35. time.strftime("%H:%M:%S", time.localtime()) + ' '
  36. + time.strftime("%H:%M:%S", time.localtime(exec_time)) + ' '
  37. + str(heat) + ' ' + promotion_id
  38. )
  39. try:
  40. commodity_detail = LiveCommodityDetailV1.get_data(product_id)
  41. if (commodity_detail is None) or (commodity_detail == '') or ('real_sell_num' in commodity_detail)==False :
  42. rds_list.record_v1_score(0)
  43. PrintLog.print(
  44. time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' 数据为空 ' + promotion_id
  45. )
  46. if heat==0:
  47. if random.randint(0, 100) > 0:
  48. rds_list.add_request_params(request_data, heat)
  49. print('首次失败重新插入 ' + request_data)
  50. continue
  51. if heat>0:
  52. if random.randint(0, 9) > 0:
  53. rds_list.add_request_params(request_data, heat)
  54. print(str(heat) + '非首次失败重新插入 ' + request_data)
  55. continue
  56. if heat < 2:
  57. goodsDb = MysqlDyLiveCommodity()
  58. pix = str(uid)[-1]
  59. goodsInfo = goodsDb.get_goods_info(room_id, product_id, pix)
  60. dataid,sold_out_at,revise_price,created_at = goodsInfo
  61. if revise_price is not None:
  62. continue
  63. if sold_out_at == '0000-00-00 00:00:00' or sold_out_at == '1970-01-01 08:00:00':
  64. add_t = 300
  65. news_time = int(time.time()) + add_t
  66. next_rds = promotion_id + '@' + product_id + '@' + uid + '@' + room_id + '@' + str(news_time)
  67. rds_list.push_middle_list(next_rds)
  68. else :
  69. add_t = 1200
  70. news_time = int(time.time()) + add_t
  71. next_rds = promotion_id + '@' + product_id + '@' + uid + '@' + room_id + '@' + str(news_time)
  72. rds_list.push_last_list(next_rds)
  73. if (commodity_detail is None) or (commodity_detail == '') or ('real_sell_num' in commodity_detail)==False:
  74. continue
  75. data = json.dumps({
  76. "data": commodity_detail,
  77. "extra": {
  78. 'room_id': room_id,
  79. 'promotion_id': promotion_id,
  80. 'product_id': product_id,
  81. 'uid': uid,
  82. 'heat': heat
  83. }
  84. })
  85. rds_list.record_v1_score(1)
  86. rds_list.push_v1_data_list(data)
  87. except Exception as e:
  88. rds_list.record_v1_score(0)
  89. rds_list.add_request_params(request_data, heat)
  90. PrintLog.print(
  91. time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + '抛出异常!' + str(e) + '\n'
  92. + request_data
  93. )
  94. if __name__ == "__main__":
  95. import warnings
  96. warnings.filterwarnings("ignore")
  97. heat = int(sys.argv[1])
  98. threading_count = int(sys.argv[2])
  99. rds = RdsLiveCommodityDetailRequestList()
  100. print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' ' + str(heat) + ' 开始执行,商品销量队列长度:' + str(rds.get_len(heat)))
  101. while True:
  102. sys.stdout.flush()
  103. # 减去主线程
  104. active_count = threading.active_count() - 1
  105. increment = threading_count - active_count
  106. if increment > 0:
  107. sys.stdout.flush()
  108. task = threading.Thread(target=scrape, args=(heat,))
  109. print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' 启动线程' + str(increment))
  110. task.start() # 准备就绪,等待cpu执行
  111. current_time = time.time()
  112. time.sleep(0.01)