excel导入hive的web工具 #导入MD文档图片#

家资是何物,积帙列梁梠。这篇文章主要讲述excel导入hive的web工具 #导入MD文档图片#相关的知识,希望能为你提供帮助。
实现功能目前有一部分数据是人工处理的,处理后放在一个excel文件中。现需要将这些excel数据导入到大数据平台,供其他部门使用。
本程序提供一个web页面,实现在web页面上传指定格式的excel文件,程序自动将该文件的数据导入到hive中。
模块Web模块(app.py):使用flask提供web交互、流程控制。
Service模块(service.py):流程管理模块,由web模块调用后,对后续的数据转换、数据导入、文件移动等流程,根据不同数据采用不同操作。
文件解析模块(pandas.py):使用Pandas解析excel。
数据导入模块(impala_utils.py):使用impyla将解析后的数据导入impala。实际上使用hdfs更优,但是开发时测试环境没有hdfs入口,所有使用impala。如果后续由其他类似需求,且实现价值比较大,可以在Excel解析模块中使用pandas将数据转换成parquet格式后,在数据导入模块使用hdfs上传至表对应目录。
文件工具模块(file_utils.py):本地文件操作模块,用于上传后清空本地临时目录、将临时目录下的文件移动到永久保存目录等。
【excel导入hive的web工具 #导入MD文档图片#】配置模块(CONF.py):用于配置各个表的临时目录、永久保存目录、表名、列名、表列数等配置信息。
项目结构

excel导入hive的web工具 #导入MD文档图片#

文章图片

Static:用于存放各种静态文件,如css、js、img等。目前里面有个back.jpg,用于做页面的背景。
Templates:用于存放各种模板文件,目前放了index.html和return.html。
Utils:存放各种工具类,除了app.py和service.py以外,其他模块都放在这个目录下。
Venv:虚拟环境文件。
App.py:flask主程序,用于。
Piplist:虚拟环境安装的包,每次安装包后,执行 pip freeze 手动维护文件内容。
Service.py:流程控制。
模块详细介绍 Web模块 Index.html
程序主页面,提供了以下功能:选择数据类型、指定导入分区、上传文件。
数据类型选项为单选框,name为dataType,value为daily和weekly。
< p> < input type="radio" name="dataType"value="https://www.songbingjia.com/android/daily"/> 每日数据 < input type="radio" name="dataType"value="https://www.songbingjia.com/android/weekly"/> 每周数据 < /p>

导入分区选项为文本框,name为partition,value默认值是当前日期(由flask后端渲染),可由用户指定其他分区。
< p> 导入分区:< input type="text" name="partition" value="https://www.songbingjia.com/android/{{ currentDate }}"> (yyyy-MM-dd) < /p>

上传文件和提交按钮如下:
```html/xml
< p>
< input type=file name=file>
< input type=submit value=https://www.songbingjia.com/android/点击上传>
< /p>
### Return.html 用户提交后,用于返回信息的页面文件。仅包含返回提示和返回链接。 **返回信息** result由flask渲染: ```html/xml < br/> {{ result }} < br/>

返回链接 部署后需要修改为实际链接。
```html/xml
< a rel=" nofollow" href=https://www.songbingjia.com/android/" http://127.0.0.1:5000/"> 返回< /a>
### App.py Flask的主程序。 **主要方法为upload_file**,该方法响应web主页面请求,并对get请求返回主页面;对post请求接收文件,并判断用户是否选择了正确的数据类型、分区,判断文件内容是否符合上传规定,如上传文件必须是xlsx格式的,excel必须只有一个sheet,excel列数与目标表列数必须一致。 **upload_file方法**的流程如下: ![image.png](https://s2.51cto.com/images/20210803/1627973944728927.png?x-oss-process=image/watermark,size_14,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_20,type_ZmFuZ3poZW5naGVpdGk=) 1.判断请求是post请求还是get请求,如果是get请求,直接返回主页面;如果是post请求,则获取请求中的file、dataType、partition。 2.判断dataType是否为空,如果为空,说明用户没有选择文件类型,返回提示;否则判断Partition是否为有效的日期格式。 3.如果Partition不是有效时间数据类型,说明用户重新输入了分区信息,但是不是规范的日期格式,返回信息,让用户重新填写;如果是有效时间数据类型,则进入下一步。 4.每个数据会放在不同的临时目录,先判断文件是否是指定的数据格式(xlsx),根据传入的dataType,清空对应的临时目录;并将文件保存到临时目录。 5.检查临时目录下的excel文件是否只有一个sheet,且列数与对应数据类型的列数一致。如果不一致,则返回错误信息;如果一致,则异步调用service.py模块,进行后续导入工作,并向用户返回成功信息。**allowed_file**方法:用于判断上传文件的后缀是否符合指定格式。可在ALLOWED_EXTENSIONS中添加允许接收的文件格式。 **代码实现** ```python import os from flask import Flask, request, render_template from werkzeug.utils import secure_filename from utils import file_utils as fu, pandas_utils as pu, CONF import service import logging from concurrent.futures import ThreadPoolExecutorapp = Flask(__name__)# 接收的文件类型 ALLOWED_EXTENSIONS = set([\'xlsx\'])# 日志系统配置 handler = logging.FileHandler(\'app.log\', encoding=\'UTF-8\') # 设置日志文件,和字符编码 logging_format = logging.Formatter( \'%(asctime)s - %(levelname)s - %(filename)s - %(funcName)s - %(lineno)s - %(message)s\') handler.setFormatter(logging_format) app.logger.addHandler(handler)executor = ThreadPoolExecutor(3)def allowed_file(filename): return \'.\' in filename and \\ filename.rsplit(\'.\', 1)[1] in ALLOWED_EXTENSIONS@app.route(\'/\', methods=[\'GET\', \'POST\']) def upload_file(): try: if request.method == \'POST\': file = request.files[\'file\'] dataType = request.form.get("dataType") partition = request.form.get(\'partition\') if dataType == None: return render_template(\'return.html\', result="未选择数据类型,请确认上传数据属于 每日数据,还是 每周数据 ") if fu.is_vaild_date(partition) == False: return render_template(\'return.html\', result="分区格式错误,请输入正确的日期格式。") if file and allowed_file(file.filename): #filename = secure_filename(file.filename) print(file.filename) filename = fu.check_filename(file.filename) # 清空保存目录,并检查上传的文件(目前只检查是否只有一个sheet) if dataType == "daily": fu.empty_dir(CONF.DAILY_TMP_DIR) file.save(os.path.join(CONF.DAILY_TMP_DIR, filename)) ret = pu.check(CONF.DAILY_TMP_DIR + filename, None, dataType) elif dataType == "weekly": fu.empty_dir(CONF.WEEKLY_TMP_DIR) file.save(os.path.join(CONF.WEEKLY_TMP_DIR, filename)) ret = pu.check(CONF.WEEKLY_TMP_DIR + filename, None, dataType)if ret: #service.import_function_dict[dataType](filename) executor.submit(service.import_function_dict[dataType], filename, partition) return render_template(\'return.html\', result="上传成功,后台正在导入,请稍后在**上查看数据。") else: return render_template(\'return.html\', result="1.该excel包含多个sheet,请上传只有一个sheet的excel。< br> 2.该excel列数与目标表列数不一致,请检查。")elif allowed_file(file.filename) == False: return render_template(\'return.html\', result="不是以xlsx结尾的文件") return render_template(\'index.html\',currentDate=fu.get_date()) except Exception as e: app.logger.exception(\'%s\', e) return e.__str__()if __name__ == \'__main__\': app.run(host=\'0.0.0.0\',port=5000)

Service模块 Service.py
该模块的import_ts_data_daily和import_ts_data_weekly方法用于集成pandas_utils、impala_utils、file_utils,完成将excel导入数仓,并将excel文件移动至永久保存目录下。
提供一个字典供app.py调用,key为app.py中接收到的dataType,value为对应数据类型导入的方法对象,这样在app.py中只需这样调用:
executor.submit(service.import_function_dict[dataType], filename, partition)

字典创建代码为:
#以字段作为函数调用的接口 import_function_dict = {"daily":import_ts_data_daily, "weekly":import_ts_data_weekly}

代码实现
from utils import file_utils as fu, impala_utils as iu, pandas_utils as pu ,CONF from pandas import pandas as pd import time#处理每日文件 def import_ts_data_daily(filename, partition):source_file = CONF.DAILY_TMP_DIR + filename target_path = CONF.DAILY_DATA_DIR table_name = CONF.DAILY_TABLE_NAMEdf: pd.DataFrame = pu.getData(source_file, None) iu.insert_daily_data(df, table_name, partition) fu.move_file(source_file, target_path)#处理每周文件 def import_ts_data_weekly(filename, partition):source_file = CONF.WEEKLY_TMP_DIR + filename target_path = CONF.WEEKLY_DATA_DIR table_name = CONF.WEEKLY_TABLE_NAMEdf: pd.DataFrame = pu.getData(source_file, None) iu.insert_weekly_data(df, table_name, partition) fu.move_file(source_file, target_path)#以字段作为函数调用的接口 import_function_dict = {"daily":import_ts_data_daily, "weekly":import_ts_data_weekly}

文件工具模块 file_utils.py
该模块包含以下方法:
move_file:将文件(绝对路径)移动到另一个目录下(绝对路径)。
该方法会判断目标路径下是否有重名文件,如果有,则会在当前文件名前拼接上当前日期,后拼接小括号加数字 “(1)”。
如:2021-02-07日数据(测试)(1).xlsx
empty_dir:清空目录,用于清空临时目录。
check_filename:用于文件名校验,防止用户上传的文件名包含类似 ‘../’ 的字符。该方法在app.py的upload_file方法中用到。
get_date:用于获取当前日期,格式为 yyyy-MM-dd。
is_vaild_date:用于判断一个字符串是否是一个日期。该方法在app.py的upload_file方法中用与判断用户输入的partition是否是正常日期格式。
代码实现
import os from pathlib import Path import datetime import time #将文件(绝对路径)移动到另一个目录下(绝对路径) def move_file(old_path_str : str, target_path_str : str): #获取文件名 tmp_str = old_path_str.split("/") file_full_name = tmp_str[(len(tmp_str)-1)].split(".")file_name = file_full_name[0] file_suffix = file_full_name[1]#判断原文件是否存在 print("原文件地址:" + old_path_str) if Path(old_path_str).exists() != True: print("原文件不存在") return#判断目标文件是否存在 target_path = Path(target_path_str + "/" + file_name + "." + file_suffix) cur_date = get_date() print(target_path) if target_path.exists(): i = 1 while target_path.exists(): #拼接新文件名 target_path = Path("{}/{}({}).{}".format(target_path_str, cur_date+file_name, i, file_suffix)) print(target_path) i = i + 1 os.rename(old_path_str, target_path)#获取当前时间 def get_date(): curr_time = datetime.datetime.now(); return curr_time.strftime("%Y-%m-%d")#校验时间格式是否规范 def is_vaild_date(date): try: if ":" in date: time.strptime(date, "%Y-%m-%d %H:%M:%S") else: time.strptime(date, "%Y-%m-%d") return True except: return False #清空目录 def empty_dir(path): os.system("DEL C:\\\\Users\\\\wuzixuan\\\\Desktop\\\\测试文件上传\\\\tmp\\\\* /S /Q") print("清空tmp目录")#文件名校验 def check_filename(filename : str): if filename.__contains__("./") or filename.__contains__("../") : filename = "tmp" return filename

文件解析模块 Pandas_utils.py
该模块用于解析文件,目前实现了使用Pandas解析单个sheet的excel。
如果后续业务需要,可添加解析多个sheet的excel、复杂格式的excel,也可添加将excel转换成csv、parquet等功能。
该模块包含以下方法:
Check:该方法用于校验excel是否只有一个sheet,且列数是否与目标表一致,返回boolean类型。该方法在app.py的upload_file方法中用于校验上传文件是否满足导入数仓的要求。
GetDate:该方法传入一个excel文件的绝对路径和sheet名,返回该excel对应sheet的dataframe数据。如果excel只有一个sheet,则sheetName留空。
代码实现
from pandas import pandas as pd from utils import CONF#获取要导入的数据 #如果只有一个sheet,则sheetName留空即可 def getData(fileName,sheetName): if sheetName != None: df:pd.DataFrame = pd.read_excel(fileName, sheet_name=sheetName) else: return pd.read_excel(fileName) return df#校验上传的excel是否只有一个sheet def check(fileName,sheetName,datatype): #判断sheet是否只有一个 if sheetName == None: df:pd.DataFrame = pd.read_excel(fileName, sheet_name=None) if df.keys().__len__() > 1 : return False #判断列数是否与目标表符合 df: pd.DataFrame = pd.read_excel(fileName) if df.columns.__len__() != CONF.TABLE_COLUMS_NUM[datatype]: return False return True

数据导入模块 Impala_utils.py
该模块用于将pandas的dataframe导入impala。
包含以下方法:
get_connect:获取链接。
close_connect:关闭连接。
insert_daily_data:将数据类型为daily的数据导入数仓。该方法会先获取impala连接,然后创建INSERT OVERWRITE TABLE VALUES语句,遍历传入的dataframe,把数据拼接在sql语句中,并执行最终sql,最后关闭连接。由于投诉数据量较小,所以可以使用这种方式,如果是数据量大的其他需求,应该使用将excel转换成csv或parquet,再写入hdfs的方式比较好。
insert_weekly_data:将数据类型为weekly的数据导入数仓。实现与insert_daily_data基本相同。
代码实现
from impala.dbapi import connect from pandas import pandas as pd#获取连接 def get_connect(): conn = connect(host=\'*****\', port=*****, user="***", password="***", auth_mechanism="PLAIN") return conn#关闭连接 def close_connect(conn): conn.close()# 导入每日数据到临时表 def insert_daily_data(df: pd.DataFrame, table_name, partition): conn = get_connect() cur = conn.cursor()sql_start = "INSERT OVERWRITE TABLE {} partition(dt = \'{}\') VALUES ".format(table_name,partition) sql_data = https://www.songbingjia.com/'\' # 遍历dataframe,将每行数据写成sql for i in df.index: row = df.loc[i] ts_dt = row.iloc[0] ts_source = row.iloc[1] user_name = row.iloc[2] menu1 = row.iloc[3] menu2 = row.iloc[4] menu3 = row.iloc[5] department = row.iloc[6] content = row.iloc[7] # 不是最后一行,加逗号 if i < len(df.index) - 1: sql_data += "(\'{0}\',\'{1}\',\'{2}\',\'{3}\',\'{4}\',\'{5}\',\'{6}\',\'{7}\'),".format(ts_dt, ts_source, user_name, menu1, menu2, menu3, department, content) # 最后一行不加逗号 else: sql_data += "(\'{0}\',\'{1}\',\'{2}\',\'{3}\',\'{4}\',\'{5}\',\'{6}\',\'{7}\')".format(ts_dt, ts_source, user_name, menu1, menu2, menu3, department, content) sql_insert = sql_start + sql_data cur.execute(sql_insert) close_connect(conn)# 导入每周数据到临时表 def insert_weekly_data(df: pd.DataFrame, table_name, partition): conn = get_connect() cur = conn.cursor()sql_start = "INSERT OVERWRITE TABLE {} partition(dt = \'{}\') VALUES ".format(table_name,partition) sql_data = https://www.songbingjia.com/'\' # 遍历dataframe,将每行数据写成sql for i in df.index: row = df.loc[i] ts_dt = row.iloc[0] is_late = row.iloc[1] dalei = row.iloc[2] ts_source = row.iloc[3] user_name = row.iloc[4] menu1 = row.iloc[5] menu2 = row.iloc[6] menu3 = row.iloc[7] department = row.iloc[8] status = row.iloc[9] content = row.iloc[10] ts_attitude = row.iloc[11] # 不是最后一行,加逗号 if i < len(df.index) - 1: sql_data += "(\'{0}\',\'{1}\',\'{2}\',\'{3}\',\'{4}\',\'{5}\',\'{6}\',\'{7}\',\'{8}\',\'{9}\',\'{10}\',\'{11}\'),".format(ts_dt, is_late, dalei, ts_source, user_name, menu1, menu2, menu3, department, status, content, ts_attitude) # 最后一行不加逗号 else: sql_data += "(\'{0}\',\'{1}\',\'{2}\',\'{3}\',\'{4}\',\'{5}\',\'{6}\',\'{7}\',\'{8}\',\'{9}\',\'{10}\',\'{11}\')".format(ts_dt, is_late, dalei, ts_source, user_name, menu1, menu2, menu3, department, status, content, ts_attitude) sql_insert = sql_start + sql_data cur.execute(sql_insert) close_connect(conn)

配置模块 CONF.py
配置文件,用于配置每个数据类型的临时目录、保存目录、表名、列名等信息。
以及将每个表的列数存放在字典TABLE_COLUMS_NUM中。该字段会在pandas_utils.py的check方法中用于判断excel的列数是否与目标表列数一致。
代码实现
DAILY_TMP_DIR = "C:/Users/wuzixuan/Desktop/测试文件上传/tmp/" DAILY_DATA_DIR = "C:/Users/wuzixuan/Desktop/测试文件上传/data/ts_daily/" DAILY_TABLE_NAME = "test.ts_zixuantest" DAILY_TABLE_S_NAME = "" DAILY_TABLE_COLUNMS = [\'ts_dt\', \'ts_source\', \'user_name\', \'menu1\', \'menu2\', \'menu3\', \'department\', \'content\']WEEKLY_TMP_DIR = "C:/Users/wuzixuan/Desktop/测试文件上传/tmp/" WEEKLY_DATA_DIR = "C:/Users/wuzixuan/Desktop/测试文件上传/data/ts_weekly/" WEEKLY_TABLE_NAME = "test.ts_weekly_zixuantest" WEEKLY_TABLE_S_NAME = "" WEEKLY_TABLE_COLUNMS = [\'ts_dt\', \'is_late\', \'dalei\', \'ts_source\', \'user_name\', \'menu1\', \'menu2\', \'menu3\', \'department\', \'status\', \'content\', \'ts_attitude\']TABLE_COLUMS_NUM = {"daily":DAILY_TABLE_COLUNMS.__len__(),"weekly":WEEKLY_TABLE_COLUNMS.__len__()}

使用说明
  1. 将excel数据修改为只有一个sheet,并且每日数据与每周数据的字段顺序分别按照以下格式保存:
    每日数据:
    excel导入hive的web工具 #导入MD文档图片#

    文章图片

    每周数据:
    excel导入hive的web工具 #导入MD文档图片#

    文章图片
  2. 使用浏览器访问部署后的连接。
    excel导入hive的web工具 #导入MD文档图片#

    文章图片
  3. 以每日数据为例:勾选每日数据,并确认导入分区(默认是当天分区,可不修改。如果是上传历史数据,如昨天没有上传,想要补传昨日数据,则可以将导入分区的日期修改为昨天)。
    excel导入hive的web工具 #导入MD文档图片#

    文章图片

    点击选择文件,在弹出框中选择要上传的文件,并点击“打开“:
    excel导入hive的web工具 #导入MD文档图片#

    文章图片

    然后点击上传:
    excel导入hive的web工具 #导入MD文档图片#

    文章图片
#导入MD文档图片#

    推荐阅读