#!/usr/bin/python3
# coding=utf-8
# -*- coding: utf-8 -*-
"""Multi-threaded worker that fetches live-room promotion data.

Each worker thread repeatedly takes one request payload from
``RdsLivePromotionsRequestList`` for a given ``heat`` value, fetches the
promotions through ``LivePromotions.get_data`` and pushes non-empty results
back through ``RdsLivePromotionsRequestList.push_data_list``.

Usage: ``<script> <heat> <thread_count>``
"""
import time
import json
import threading
import sys

from rds_model.rds_live_promotions_request_list import RdsLivePromotionsRequestList
from libs.live_promotions import LivePromotions
from log.print_log import PrintLog

start_time = time.time()


def _now(fmt="%Y-%m-%d %H:%M:%S"):
    """Return the current local time formatted with *fmt*."""
    return time.strftime(fmt, time.localtime())


def _parse_request(request_data_str):
    """Extract ``(uid, room_id)`` as strings from a JSON request payload.

    Returns ``None`` when the payload cannot be decoded as JSON, does not
    decode to a JSON object, or has no non-null ``uid`` / ``room_id`` value.
    """
    try:
        request_data = json.loads(request_data_str)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError subclass; TypeError is raised
        # for payload types json.loads does not accept.
        return None
    if not isinstance(request_data, dict):
        return None
    uid = request_data.get('uid')
    room_id = request_data.get('room_id')
    if uid is None or room_id is None:
        return None
    return str(uid), str(room_id)


def scrape(heat):
    """Run the worker loop for one thread; never returns normally.

    Per iteration: fetch one request payload for *heat*, validate it, call
    ``LivePromotions.get_data(uid, room_id)`` and, when the response carries
    a non-empty ``promotions`` entry, record a score of 1 and push the JSON
    result. Any exception raised while fetching or pushing is logged and
    recorded with a score of 0.

    Args:
        heat: value passed to ``get_request_params`` and echoed in the
            ``extra`` section of every pushed result.
    """
    while True:
        rds = RdsLivePromotionsRequestList()
        request_data_str = rds.get_request_params(heat)
        if request_data_str is None:
            # Nothing to process right now; wait before polling again.
            time.sleep(1)
            continue

        # NOTE(review): a second instance is used for scoring/pushing;
        # confirm whether `rds` could simply be reused.
        rds_list = RdsLivePromotionsRequestList()

        parsed = _parse_request(request_data_str)
        if parsed is None:
            # Malformed payloads are logged and skipped instead of raising
            # out of the loop and terminating this worker thread.
            PrintLog.print(
                _now() + '请求数据数据异常!' + '\n' + str(request_data_str)
            )
            continue
        uid, room_id = parsed

        PrintLog.print(
            _now("%H:%M:%S") + ' ' + str(heat) + ' ' + room_id + ' ' + uid
        )
        try:
            response = LivePromotions.get_data(uid, room_id)
            response_json = response.json()
            promotions = response_json.get('promotions')
            # Missing or empty promotions: nothing to push, no score recorded.
            if promotions is None or len(promotions) == 0:
                continue
            data = json.dumps({
                "data": response_json,
                "extra": {
                    'room_id': room_id,
                    'uid': uid,
                    'heat': heat,
                }
            })
            rds_list.record_score(1)
            rds_list.push_data_list(data)
        except Exception as e:
            rds_list.record_score(0)
            # str() keeps this log call from raising if the payload is not a str.
            PrintLog.print(
                _now() + '抛出异常!' + str(e) + '\n' + str(request_data_str)
            )


def main():
    """Parse the command line and keep the requested number of workers alive.

    ``sys.argv[1]`` is the heat value and ``sys.argv[2]`` the number of
    parallel worker threads. The loop polls every 10 ms and starts one new
    ``scrape`` thread per poll while fewer than the requested number of
    non-main threads are alive.

    Raises:
        IndexError: if fewer than two command-line arguments are given.
        ValueError: if either argument is not an integer.
    """
    print("主方法开始执行")
    heat = int(sys.argv[1])
    # Number of worker threads to run in parallel.
    threading_count = int(sys.argv[2])
    rds = RdsLivePromotionsRequestList()
    print(
        _now() + ' 爬取直播商品队列长度为:' + str(heat) + ' ' + str(rds.get_len(heat))
    )
    while True:
        sys.stdout.flush()
        # Exclude the main thread from the count.
        active_count = threading.active_count() - 1
        increment = threading_count - active_count
        if increment > 0:
            task = threading.Thread(target=scrape, args=(heat,))
            print(_now() + ' 启动线程' + str(increment))
            task.start()
        time.sleep(0.01)


if __name__ == "__main__":
    main()