#!/usr/bin/python3 #coding=utf-8 #-*- coding: UTF-8 -* from elasticsearch import Elasticsearch from elasticsearch.helpers import bulk class ES: def __init__(self): self.index = 'sc_kwai_barrages' self.type = 'sc_kwai_barrage' self.connection = Elasticsearch('es-cn-0pp19srqi00047s72.elasticsearch.aliyuncs.com:9200', http_auth=('elastic', 'Kuxuan666!')) def batch_insert(self, batch_data): actions = [] # 收到弹幕 if batch_data.get('user'): for data in batch_data.get('user'): action = { "_index": self.index, "_type": self.type, "_source": { "live_stream_id": batch_data.get('live_stream_id'), "type": 1, "principal_id": data.get('user').get('principalId'), "user_name": data.get('user').get('userName'), "content": data.get('content'), "created_at": batch_data.get('created_at') } } actions.append(action) # 收到礼物 if batch_data.get('gift'): for data in batch_data.get('gift'): action = { "_index": self.index, "_type": self.type, "_source": { "live_stream_id": batch_data.get('live_stream_id'), "type": 2, "principal_id": data.get('user').get('principalId'), "user_name": data.get('user').get('userName'), "gift_id": data.get('giftId'), "batch_size": data.get('batchSize'), "combo_count": data.get('comboCount'), "rank": data.get('rank'), "created_at": batch_data.get('created_at') } } actions.append(action) # 收到点亮 if batch_data.get('like'): for data in batch_data.get('like'): action = { "_index": self.index, "_type": self.type, "_source": { "live_stream_id": batch_data.get('live_stream_id'), "type": 3, "principal_id": data.get('user').get('principalId'), "user_name": data.get('user').get('userName'), "created_at": batch_data.get('created_at') } } actions.append(action) if len(actions) == 0: return res, _ = bulk(self.connection, actions, index = self.index, raise_on_error=True)