(编辑:jimmy 日期: 2025/10/29 浏览:2)
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# *************************************
# @Time : 2019/8/12
# @Author : Zhang Fan
# @Desc : Library
# @File : MyDatabases.py
# @Update : 2019/8/23
# *************************************
import elasticsearch
import phoenixdb
import pysolr
import pymysql
class MyELS(object):
"""
===================================================================
===================== MyELS =========================
===================================================================
"""
def __init__(self):
self.els_conn = None
def connect_to_els(self, host, port):
"""
连接到ElasticSearch服务器.
"""
self.els_conn = elasticsearch.Elasticsearch([{'host': host, 'port': port}])
print('Executing : Connect To Elastic Search | %s' % self.els_conn)
def get_els_data(self, query, index):
"""
获取ElasticSearch数据
"""
print('Executing : Search | %s' % query)
try:
rst = self.els_conn.search(index=index, q=query)
return rst['hits']
except Exception as e:
print('Elastic Search Error | %s' % e)
raise Exception(e)
class MyPhoenix(object):
"""
===================================================================
===================== MyPhoenix ======================
===================================================================
"""
def __init__(self):
self.phoenix_conn = None
self.phoenix_cursor = None
def connect_to_phoenix(self, host, port=8765):
"""
连接到phoenix服务器
"""
address = 'http://{0}:{1}/'.format(host, port)
print('Executing : Connect To Phoenix | %s' % address)
self.phoenix_conn = phoenixdb.connect(address, autocommit=True)
self.phoenix_cursor = self.phoenix_conn.cursor()
def set_schema(self, sql, schema):
"""
设置schema
"""
pre_sub, sub, fol_sub = sql.upper().partition('FROM')
fol_sub = ' ' + schema + '.' + fol_sub.strip()
new_sql = ''.join([pre_sub, sub, fol_sub])
return new_sql
def execute_phoenix_sql(self, sql):
"""
执行sql语句
"""
# sql = self.set_schema(sql, schema)
print('Executing : Execute | %s' % sql)
self.phoenix_cursor.execute(sql)
def get_from_phoenix(self, sql):
"""
获取phoenix数据
"""
# sql = self.set_schema(sql, schema)
print('Executing : Query | %s' % sql)
try:
self.phoenix_cursor.execute(sql)
except Exception as e:
print('Phoenix Error | %s' % e)
raise Exception(e)
return self.phoenix_cursor.fetchall()
def disconnect_from_phoenix(self):
"""
断开phoenix连接
"""
print('Executing : Disconnect From HBase')
self.phoenix_cursor.close()
self.phoenix_conn.close()
class MySolr(object):
"""
===================================================================
===================== MySolr =========================
===================================================================
"""
def __init__(self):
self.solr_conn = None
self.base_url = None
def connect_to_solr(self, address, selector):
"""连接到solr服务器.
"""
self.base_url = 'http://{0}/solr/{1}/'.format(address, selector)
self.solr_conn = pysolr.Solr(self.base_url)
print('Executing : Connect To Solr | %s' % self.base_url)
def get_solr_data(self, query):
"""
获取solr数据
"""
results = list()
print('Executing : Search | %s' % query)
try:
items = self.solr_conn.search(query)
for item in items:
results.append(item)
except Exception as e:
print('Solr Error | %s' % e)
raise Exception(e)
return results
def add_solr_data(self, data):
"""
添加solr数据
"""
print('Executing : add | %s' % data)
try:
self.solr_conn.add([data])
self.solr_conn.commit()
except Exception as e:
print('Solr Error | %s' % e)
raise Exception(e)
def del_solr_byId(self, data):
"""
删除solr数据
"""
print('Executing : del | %s' % data)
try:
self.solr_conn.delete(id=data)
self.solr_conn.commit()
except Exception as e:
print('Solr Error | %s' % e)
raise Exception(e)
if __name__ == '__main__':
print('This is test.')
ms = MySolr()
me = MyELS()
mp = MyPhoenix()
以上就是Python 调用 ES、Solr、Phoenix的示例代码的详细内容,更多关于Python 调用 ES、Solr、Phoenix的资料请关注其它相关文章!