05 DataBricks遍历S3容器

背景:为介绍实战项目,本文先介绍一个简单实例。目标:将csv数据文件同步到Databricks表中
连接s3 当前s3目录
  • flag
    • 主题01_时间戳1.csv
    • 主题02_时间戳2.csv
  • request
    • file_时间戳1
      • data_file.csv
    • file_时间戳2
      • data_file.csv
falg文件中存储的是每个主题文件的csv,每个主题csv中有所需表的列表、以及当前增量行数。request中根据主题文件时间戳可以找到对于时间戳的文件夹。文件夹中包含真正的数据文件。(时间戳精确到秒,所以需要根据flag文件的时间戳确定文件夹的时间戳)
import boto3 import pandas as pd#s3存储桶访问信息 key_id='key_id' secret_key='密钥' my_bucket='容器名'#建立s3临时客户端 client = boto3.client( 's3', aws_access_key_id=key_id, aws_secret_access_key=secret_key, region_name='cn-north-1' )#调用函数列出文件信息 paginator = client.get_paginator('list_objects') # print(paginator) page_iterator = paginator.paginate( Bucket=my_bucket , Delimiter='/', Prefix='EDW_SHARE/Flag/')

遍历s3,并做限制 上传的文件并非都获取,我们只获取当前有用到的。所以需要一个flag表做限制,在flag中的表获取出来。我们先将限制的列表做好。
# #定义空dataframe用于存放flag文件内容 df=pd.DataFrame(data=https://www.it610.com/article/None, index=None, columns=('flag_file','content'), dtype=None, copy=False)# #定义dataframe用于存放已读入的flag文件名,用于过于已经读入过的flag,并将sql查询结果从sparksql支持的dataframe转换为pandas支持的dataframe df1 = spark.sql('select distinct flag_file from cfg.flag_file_info').toPandas()# #将flag依次插入列表 file_list=[i for i in df1['flag_file']]# file_list就是限制获取到s3中的文件列表

开始遍历,并在遍历时过滤不在falg表中的记录。
# 从s3返回的json数组中拆分flag文件名 for page in page_iterator: for key in page['Contents']:if key['Key'] not in file_list:#从返回的json数组中读取flag内容 response1 = client.select_object_content( Bucket=my_bucket, Key=key['Key'], Expression='SELECT SOURCE_NAME FROM S3Object', ExpressionType='SQL', InputSerialization={ 'CSV': { 'FileHeaderInfo': 'USE', 'QuoteCharacter': '"' }}, OutputSerialization={ 'JSON': {} } )for i in response1['Payload']: if 'Records' in i: # print(key['Key']) for content in i['Records']['Payload'].decode('utf-8').replace('{"SOURCE_NAME":','').replace('}','').replace('"','').split('\n'): if content: df.loc[len(df)]=(key['Key'],content)# 将pandas的dataframe转换为sparksql支持的dataframe df=spark.createDataFrame(df) #将dataframe转换为临时表 df.createOrReplaceTempView("file_flag_file_info")

file_flag_file_info获取到的就是需要的文件名。获取到两列,如下
flag_file content
EDW_SHARE/Flag/EDW2CUBE_ALLIANCE_FLAGFILE_20210220072705.csv DW_TEST01
EDW_SHARE/Flag/EDW2CUBE_ALLIANCE_FLAGFILE_20210220072705.csv DW_TEST02
获取到的记录保存在一个表中
spark.sql("insert into cfg.flag_file_info select date_id,flag_file,content,status,create_dt from (select date_format(from_utc_timestamp(current_timestamp(),'UTC+8'),'yyyyMMdd') as date_id,flag_file,content,1 as status,from_utc_timestamp(current_timestamp(),'UTC+8') as create_dt,rank() over(partition by content order by flag_file desc) as rk from file_flag_file_info wherecontent in (select file_name from cfg.file_config_list where is_active=1))t where t.rk=1")

【05 DataBricks遍历S3容器】遍历s3文件目的是为了正确取到数据文件在s3的路径,为之后DataFactory从S3中拿文件做基础。

    推荐阅读