#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""Drain a queue of live-room requests and fetch each room's promotions.

Usage::

    <script> HEAT THREAD_COUNT

``HEAT`` is passed to the request-list helper to select which queue to read
and is echoed into every stored payload; ``THREAD_COUNT`` is the maximum
number of worker threads kept running at once. The dispatcher stops handing
out work ``MAX_RUNTIME_SECONDS`` after the module was loaded.
"""
import json
import sys
import threading
import time

from libs.live_promotions import LivePromotions
from log.print_log import PrintLog
from rds_model.rds_live_promotions_request_list import RdsLivePromotionsRequestList

# Wall-clock time at module load; the dispatcher measures its runtime from it.
start_time = time.time()

# The dispatcher exits once it has been running for this many seconds.
MAX_RUNTIME_SECONDS = 1800
# Pause after finding the queue empty, before the next polling round.
EMPTY_QUEUE_SLEEP_SECONDS = 0.1
# Pause between two polling rounds of the dispatcher loop.
POLL_INTERVAL_SECONDS = 0.01


def _timestamp(fmt="%Y-%m-%d %H:%M:%S"):
    """Return the current local time formatted with *fmt*."""
    return time.strftime(fmt, time.localtime())


def _log_invalid_request(request_data_str):
    """Log that a queue entry could not be used, together with its raw text."""
    PrintLog.print(_timestamp() + '请求数据数据异常!' + '\n' + request_data_str)


def scrape(heat, request_data_str):
    """Fetch the promotions for one queued request and store the result.

    Args:
        heat: Value echoed into the log line and into the stored payload's
            ``extra.heat`` field.
        request_data_str: JSON text of the request. It must decode to an
            object that has both a ``uid`` and a ``room_id`` entry.

    Returns:
        None. The function returns early, storing nothing, when the request
        is malformed or when the response has no ``promotions`` (missing or
        empty). Any exception raised while fetching or storing is logged and
        recorded via ``record_score(0)`` rather than propagated.
    """
    rds_list = RdsLivePromotionsRequestList()

    # A malformed queue entry must not kill the worker with an uncaught
    # traceback; treat it like any other unusable request.
    try:
        request_data = json.loads(request_data_str)
    except ValueError:
        request_data = None
    if not isinstance(request_data, dict):
        _log_invalid_request(request_data_str)
        return

    uid = request_data.get('uid')
    room_id = request_data.get('room_id')
    if uid is None or room_id is None:
        _log_invalid_request(request_data_str)
        return

    room_id = str(room_id)
    uid = str(uid)
    PrintLog.print(
        _timestamp("%H:%M:%S") + ' ' + str(heat) + ' ' + room_id + ' ' + uid
    )

    try:
        response = LivePromotions.get_data(uid, room_id)
        response_json = response.json()

        # Nothing to store when the room lists no products; no score is
        # recorded in that case.
        promotions = response_json.get('promotions')
        if promotions is None or len(promotions) == 0:
            return

        data = json.dumps({
            "data": response_json,
            "extra": {
                'room_id': room_id,
                'uid': uid,
                'heat': heat,
            }
        })
        rds_list.record_score(1)
        rds_list.push_data_list(data)
    except Exception as e:
        # Worker boundary: record the failure and log it instead of letting
        # the thread die with a traceback.
        rds_list.record_score(0)
        PrintLog.print(
            _timestamp() + '抛出异常!' + str(e) + '\n' + request_data_str
        )


def main():
    """Dispatch queued requests to worker threads until the time budget ends.

    Reads ``HEAT`` and ``THREAD_COUNT`` from ``sys.argv``, then repeatedly
    tops the pool of running ``scrape`` threads up to ``THREAD_COUNT`` with
    requests taken from the queue.
    """
    print("主方法开始执行")
    if len(sys.argv) < 3:
        sys.exit('usage: ' + sys.argv[0] + ' HEAT THREAD_COUNT')

    heat = int(sys.argv[1])
    # Maximum number of worker threads running in parallel.
    threading_count = int(sys.argv[2])

    rds = RdsLivePromotionsRequestList()
    print(
        _timestamp()
        + ' 爬取直播商品队列长度为:'
        + str(heat)
        + ' '
        + str(rds.get_len(heat))
    )

    while True:
        sys.stdout.flush()

        # Do not count the main thread itself.
        # NOTE(review): assumes every other live thread is a scrape worker.
        active_count = threading.active_count() - 1
        free_slots = threading_count - active_count

        for _ in range(free_slots):
            sys.stdout.flush()
            request_data_str = rds.get_request_params(heat)
            if request_data_str is None:
                # Queue is empty: back off briefly and end this round.
                time.sleep(EMPTY_QUEUE_SLEEP_SECONDS)
                break
            task = threading.Thread(
                target=scrape, args=(heat, request_data_str)
            )
            task.start()

        if time.time() - start_time > MAX_RUNTIME_SECONDS:
            print(_timestamp() + ' 主方法执行终止')
            sys.exit(0)
        time.sleep(POLL_INTERVAL_SECONDS)


if __name__ == "__main__":
    main()