#!/usr/bin/python3
# coding=utf-8
# -*- coding: utf-8 -*-
"""Fetch live-room commodity details for queued requests with a pool of threads.

Usage: script.py HEAT THREAD_COUNT

A request string has the form ``promotion_id@product_id@uid@room_id@exec_time``
where ``exec_time`` is a Unix timestamp in seconds. Each worker thread waits
until ``exec_time``, fetches the commodity detail, optionally schedules a
follow-up request (heat < 2) and publishes the result through
``RdsLiveCommodityDetailRequestList``.
"""
import time
import json
import threading
import sys
import random
from rds_model.rds_live_commodity_detail_request_list import RdsLiveCommodityDetailRequestList
from libs.live_commodity_detail_v1 import LiveCommodityDetailV1
from libs.mysql_dy_live_ifsolt import MysqlDyLiveCommodity
from log.print_log import PrintLog

start_time = time.time()

# Follow-up delays in seconds, chosen from the stored sold_out_at value.
_MIDDLE_LIST_DELAY = 300
_LAST_LIST_DELAY = 1200
# sold_out_at values that route a follow-up to the middle list. Presumably these
# are "not set" placeholders (zero datetime / epoch in UTC+8) -- TODO confirm.
_UNSET_SOLD_OUT_AT = ('0000-00-00 00:00:00', '1970-01-01 08:00:00')


def _is_empty_detail(commodity_detail):
    """Return True if the fetched detail is None, '' or lacks 'real_sell_num'."""
    return (
        commodity_detail is None
        or commodity_detail == ''
        or 'real_sell_num' not in commodity_detail
    )


def _schedule_follow_up(rds_list, promotion_id, product_id, uid, room_id):
    """Queue the next fetch of this product based on its goods row in MySQL.

    Returns False (and queues nothing) when the row's ``revise_price`` is not
    None; the caller then drops the current request. Otherwise pushes a new
    request string stamped ``now + 300`` onto the middle list when
    ``sold_out_at`` is one of ``_UNSET_SOLD_OUT_AT``, or ``now + 1200`` onto the
    last list, and returns True.
    """
    goods_db = MysqlDyLiveCommodity()
    # Last character of uid is passed as a lookup suffix (shard?) -- TODO confirm.
    pix = str(uid)[-1]
    _data_id, sold_out_at, revise_price, _created_at = goods_db.get_goods_info(
        room_id, product_id, pix
    )
    if revise_price is not None:
        return False
    if sold_out_at in _UNSET_SOLD_OUT_AT:
        delay, push = _MIDDLE_LIST_DELAY, rds_list.push_middle_list
    else:
        delay, push = _LAST_LIST_DELAY, rds_list.push_last_list
    news_time = int(time.time()) + delay
    push('@'.join((promotion_id, product_id, uid, room_id, str(news_time))))
    return True


def scrape(heat):
    """Worker loop for one thread; never returns.

    Pops requests for ``heat``, sleeps until each request's ``exec_time``,
    fetches the commodity detail and publishes ``{"data": ..., "extra": ...}``
    as JSON via ``push_v1_data_list`` (recording score 1). An empty detail
    records score 0 and is usually re-queued at the same heat (100/101 of the
    time for heat 0, 9/10 for heat > 0). Any exception while processing records
    score 0 and re-queues the request.

    :param heat: queue tier to consume; also selects the retry odds and whether
        a follow-up request is scheduled (heat < 2).
    """
    while True:
        rds = RdsLiveCommodityDetailRequestList()
        request_data = rds.get_request_params(heat)
        if request_data is None:
            # Nothing queued: back off briefly instead of spinning.
            time.sleep(0.1)
            continue
        rds_list = RdsLiveCommodityDetailRequestList()
        request_split = request_data.split('@')
        if len(request_split) < 5:
            print('数据格式不对' + request_data)
            continue
        promotion_id, product_id, uid, room_id = request_split[:4]
        try:
            exec_time = int(request_split[4])
        except ValueError:
            # A non-numeric timestamp is just another malformed request; report
            # it instead of letting ValueError kill this worker thread.
            print('数据格式不对' + request_data)
            continue
        # Read the clock once: re-reading it after the comparison could yield a
        # negative delay, and time.sleep() would raise outside the try below.
        delay = exec_time - int(time.time())
        if delay > 0:
            time.sleep(delay)
        PrintLog.print(
            time.strftime("%H:%M:%S", time.localtime()) + ' '
            + time.strftime("%H:%M:%S", time.localtime(exec_time)) + ' '
            + str(heat) + ' ' + promotion_id
        )
        try:
            commodity_detail = LiveCommodityDetailV1.get_data(product_id)
            empty = _is_empty_detail(commodity_detail)
            if empty:
                rds_list.record_v1_score(0)
                PrintLog.print(
                    time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                    + ' 数据为空 ' + promotion_id
                )
                # Usually retry at the same heat; on the rare miss fall through
                # so that a follow-up request is still scheduled below.
                if heat == 0:
                    if random.randint(0, 100) > 0:
                        rds_list.add_request_params(request_data, heat)
                        print('首次失败重新插入 ' + request_data)
                        continue
                elif heat > 0:
                    if random.randint(0, 9) > 0:
                        rds_list.add_request_params(request_data, heat)
                        print(str(heat) + '非首次失败重新插入 ' + request_data)
                        continue
            if heat < 2:
                if not _schedule_follow_up(rds_list, promotion_id, product_id, uid, room_id):
                    continue
            if empty:
                continue
            data = json.dumps({
                "data": commodity_detail,
                "extra": {
                    'room_id': room_id,
                    'promotion_id': promotion_id,
                    'product_id': product_id,
                    'uid': uid,
                    'heat': heat,
                },
            })
            rds_list.record_v1_score(1)
            rds_list.push_v1_data_list(data)
        except Exception as e:
            # Worker boundary: never let one request kill the thread; re-queue it.
            rds_list.record_v1_score(0)
            rds_list.add_request_params(request_data, heat)
            PrintLog.print(
                time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                + '抛出异常!' + str(e) + '\n' + request_data
            )


def main():
    """Read HEAT and THREAD_COUNT from argv and keep that many workers running."""
    import warnings
    warnings.filterwarnings("ignore")
    if len(sys.argv) < 3:
        sys.exit('usage: %s HEAT THREAD_COUNT' % sys.argv[0])
    heat = int(sys.argv[1])
    threading_count = int(sys.argv[2])
    rds = RdsLiveCommodityDetailRequestList()
    print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' ' + str(heat)
          + ' 开始执行,商品销量队列长度:' + str(rds.get_len(heat)))
    while True:
        sys.stdout.flush()
        # Running workers = live threads minus the main thread. At most one
        # worker is started per 10 ms tick, so the pool ramps up gradually.
        increment = threading_count - (threading.active_count() - 1)
        if increment > 0:
            task = threading.Thread(target=scrape, args=(heat,))
            print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' 启动线程' + str(increment))
            task.start()  # Ready; runs when the scheduler picks it up.
        time.sleep(0.01)


if __name__ == "__main__":
    main()