python ETL工具 pyetl

(编辑:jimmy 日期: 2025/5/4 浏览:2)

pyetl是一个纯python开发的ETL框架, 相比sqoop, datax 之类的ETL工具,pyetl可以对每个字段添加udf函数,使得数据转换过程更加灵活,相比专业ETL工具pyetl更轻量,纯python代码操作,更加符合开发人员习惯

安装

pip3 install pyetl

使用示例

数据库表之间数据同步

from pyetl import Task, DatabaseReader, DatabaseWriter
reader = DatabaseReader("sqlite:///db1.sqlite3", table_name="source")
writer = DatabaseWriter("sqlite:///db2.sqlite3", table_name="target")
Task(reader, writer).start()

数据库表到hive表同步

from pyetl import Task, DatabaseReader, HiveWriter2
reader = DatabaseReader("sqlite:///db1.sqlite3", table_name="source")
writer = HiveWriter2("hive://localhost:10000/default", table_name="target")
Task(reader, writer).start()

数据库表同步es

from pyetl import Task, DatabaseReader, ElasticSearchWriter
reader = DatabaseReader("sqlite:///db1.sqlite3", table_name="source")
writer = ElasticSearchWriter(hosts=["localhost"], index_name="tartget")
Task(reader, writer).start()

原始表目标表字段名称不同,需要添加字段映射

添加

# 原始表source包含uuid,full_name字段
reader = DatabaseReader("sqlite:///db.sqlite3", table_name="source")
# 目标表target包含id,name字段
writer = DatabaseWriter("sqlite:///db.sqlite3", table_name="target")
# columns配置目标表和原始表的字段映射关系
columns = {"id": "uuid", "name": "full_name"}
Task(reader, writer, columns=columns).start()

字段的udf映射,对字段进行规则校验、数据标准化、数据清洗等

# functions配置字段的udf映射,如下id转字符串,name去除前后空格
functions={"id": str, "name": lambda x: x.strip()}
Task(reader, writer, columns=columns, functions=functions).start()

继承Task类灵活扩展ETL任务

import json
from pyetl import Task, DatabaseReader, DatabaseWriter

class NewTask(Task):
  reader = DatabaseReader("sqlite:///db.sqlite3", table_name="source")
  writer = DatabaseWriter("sqlite:///db.sqlite3", table_name="target")
  
  def get_columns(self):
    """通过函数的方式生成字段映射配置,使用更灵活"""
    # 以下示例将数据库中的字段映射配置取出后转字典类型返回
    sql = "select columns from task where name='new_task'"
    columns = self.writer.db.read_one(sql)["columns"]
    return json.loads(columns)
   
  def get_functions(self):
    """通过函数的方式生成字段的udf映射"""
    # 以下示例将每个字段类型都转换为字符串
    return {col: str for col in self.columns}
   
  def apply_function(self, record):
    """数据流中对一整条数据的udf"""
    record["flag"] = int(record["id"]) % 2
    return record

  def before(self):
    """任务开始前要执行的操作, 如初始化任务表,创建目标表等"""
    sql = "create table destination_table(id int, name varchar(100))"
    self.writer.db.execute(sql)
  
  def after(self):
    """任务完成后要执行的操作,如更新任务状态等"""
    sql = "update task set status='done' where name='new_task'"
    self.writer.db.execute(sql)

NewTask().start()

目前已实现Reader和Writer列表

  Reader 介绍 DatabaseReader 支持所有关系型数据库的读取 FileReader 结构化文本数据读取,如csv文件 ExcelReader Excel表文件读取

Writer 介绍 DatabaseWriter 支持所有关系型数据库的写入 ElasticSearchWriter 批量写入数据到es索引 HiveWriter 批量插入hive表 HiveWriter2 Load data方式导入hive表(推荐) FileWriter 写入数据到文本文件

项目地址pyetl

总结

一句话新闻

Windows上运行安卓你用过了吗
在去年的5月23日,借助Intel Bridge Technology以及Intel Celadon两项技术的驱动,Intel为PC用户带来了Android On Windows(AOW)平台,并携手国内软件公司腾讯共同推出了腾讯应用宝电脑版,将Windows与安卓两大生态进行了融合,PC的使用体验随即被带入到了一个全新的阶段。