123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
- #!/usr/bin/python3
- # coding=utf-8
- # -*- coding: utf-8 -*-
- import time
- import json
- import threading
- import sys
- from rds_model.rds_live_promotions_request_list import RdsLivePromotionsRequestList
- from libs.live_promotions import LivePromotions
- from log.print_log import PrintLog
# Wall-clock timestamp captured when the module is loaded; the main loop
# compares the current time against it and stops once more than 1800 seconds
# have elapsed.
start_time = time.time()
def scrape(heat, request_data_str):
    """Fetch the promotions for one queued request and push them to the result list.

    Used as a worker-thread target by the main loop. The function never
    raises for bad input or a failed fetch: every failure path is logged via
    ``PrintLog`` and the function simply returns, which ends the worker
    thread.

    Args:
        heat: Heat value of the queue the request came from. It is logged and
            echoed back under ``extra.heat`` in the pushed payload.
        request_data_str: JSON text of the request. It must decode to an
            object that carries both ``uid`` and ``room_id``.

    Returns:
        None. On success the payload ``{"data": <response json>, "extra":
        {...}}`` is pushed with ``push_data_list`` after ``record_score(1)``;
        when the fetch raises, ``record_score(0)`` is recorded instead. A
        response without promotions is skipped without recording a score.
    """
    rds_list = RdsLivePromotionsRequestList()

    # Malformed JSON (ValueError), a non-text payload (TypeError) or a decoded
    # value that is not an object (AttributeError on .get) is treated exactly
    # like a request with missing fields instead of killing the thread with
    # an unlogged traceback.
    try:
        request_data = json.loads(request_data_str)
        uid = request_data.get('uid')
        room_id = request_data.get('room_id')
    except (TypeError, ValueError, AttributeError):
        uid = room_id = None

    if uid is None or room_id is None:
        stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        # f-string formatting keeps the output identical for str payloads and
        # does not blow up if the queue ever hands back bytes.
        PrintLog.print(f"{stamp}请求数据数据异常!\n{request_data_str}")
        return

    room_id = str(room_id)
    uid = str(uid)
    PrintLog.print(time.strftime("%H:%M:%S", time.localtime()) + ' ' + str(heat) + ' ' + room_id + ' ' + uid)

    # Broad catch is deliberate: this is the top-level boundary of a worker
    # thread, and any failure of the fetch/serialise/push sequence is recorded
    # as a zero score and logged together with the offending request.
    try:
        response = LivePromotions.get_data(uid, room_id)
        response_json = response.json()
        promotions = response_json.get('promotions')
        # Missing key or an empty collection: nothing to store.
        if promotions is None or len(promotions) == 0:
            return
        data = json.dumps({
            "data": response_json,
            "extra": {
                'room_id': room_id,
                'uid': uid,
                'heat': heat,
            }
        })
        rds_list.record_score(1)
        rds_list.push_data_list(data)
    except Exception as e:
        rds_list.record_score(0)
        stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        PrintLog.print(f"{stamp}抛出异常!{e}\n{request_data_str}")
if __name__ == "__main__":
    # Usage: <script> <heat> <max parallel worker threads>
    print("主方法开始执行")
    heat = int(sys.argv[1])
    # Number of worker threads allowed to run in parallel.
    threading_count = int(sys.argv[2])

    rds = RdsLivePromotionsRequestList()
    print(
        time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        + ' 爬取直播商品队列长度为:'
        + str(heat)
        + ' '
        + str(rds.get_len(heat))
    )

    while True:
        sys.stdout.flush()

        # Free worker slots: the configured limit minus the threads currently
        # alive, not counting the main thread itself.
        free_slots = threading_count - (threading.active_count() - 1)

        # range() of a non-positive number is empty, so nothing is started
        # while the pool is full.
        for _ in range(free_slots):
            sys.stdout.flush()
            request_data_str = rds.get_request_params(heat)
            if request_data_str is None:
                # Queue is empty: back off briefly and re-evaluate the slots.
                time.sleep(0.1)
                break
            threading.Thread(target=scrape, args=(heat, request_data_str)).start()

        # Stop dispatching once the script has been running for over 1800 s.
        if time.time() - start_time > 1800:
            print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' 主方法执行终止')
            sys.exit(0)

        time.sleep(0.01)
|