在Python中使用PostgreSQL

本文概述

  • 使用SQL数据库
  • 用Python连接到数据库
  • 一个更复杂的例子
在这篇文章中, 你将发现:
  • PostgreSQL和Python教程
  • 使用SQL数据库
  • 用Python连接到数据库
    • 尝试一些复杂的查询
    • 剖析此功能
诸如PostgreSQL之类的数据库需要用户身份验证才能访问, 并且特定于给定的数据库结构。一旦可以访问数据库, 就可以采用类似的技术。
本模块说明了一些基本技术, 可用于连接和使用关系数据库中的数据, 在本例中为PostgreSQL, 它是几个基于SQL的数据库之一。如果你想了解有关关系数据库管理系统(RDBMS)的系统理论方面的更多信息, 那么” 数据库系统和概念” 是一个很好的资源。
使用SQL数据库在Python中使用数据库依赖于能够在SQL中编写数据库查询。实际上, 测试Python脚本的一种好方法是首先在第三方程序(例如DBeaver)中测试SQL命令。
【在Python中使用PostgreSQL】SQLZOO是你当前可以练习这些技能的好地方
基础在SQL语言中, 数据存储在表中。表通常嵌套在称为” 模式” 的组中。一个数据库中可能有多个架构。
$$ \ text {数据库} \ supset \ text {Schema} \ supset \ text {Table} $$
示例:假设你有一个数据库, 其中包含数据和机器学习问题的结果, 例如预测不良警察事件。结果存储在” 结果” 模式中。特定的模型指标位于” 评估” 表中。要查看所有此类结果:
SQL从results.evaluations中选择*;
Decomposing this query, you have several essential elements: `select`. What task you will be doing, `select`. Next, what do you want to return? If everything uses `*`. If you only want one column `your_colunm_name`. Thus `select *` or `select your_column_name`. Next, where are your data stored: `from`? Specify a `schema.table`.Putting this together, you have the general form: `select * from schema.table; ` Note: it is best to end your SQL statements with a "`; `".Now suppose you want only the results for a specific model id, parameter, and metric, and parameter, you can use a `where` statement to specify:```sqlselect * from results.evaluationswhere model_id=971and metric='false negatives@' and parameter='75.0';

用Python连接到数据库连接数据库的一种简单方法是使用Python。在这里, 你首先需要一个凭证文件, 例如example_psql.py:
PGHOST=your_database_hostPGDATABASE=your_database_namePGUSER=your_database_usernamePGPASSWORD=your_database_secret_password

接下来, 你将需要导入几个软件包:
import psycopg2import sys, osimport numpy as npimport pandas as pdimport example_psql as credsimport pandas.io.sql as psql

最后, 数据库连接可以相对简单:
## ****** LOAD PSQL DATABASE ***** ### Set up a connection to the postgres server.conn_string = "host="+ creds.PGHOST +" port="+ "5432" +" dbname="+ creds.PGDATABASE +" user=" + creds.PGUSER \+" password="+ creds.PGPASSWORDconn=psycopg2.connect(conn_string)print("Connected!")# Create a cursor objectcursor = conn.cursor()def load_data(schema, table):sql_command = "SELECT * FROM {}.{}; ".format(str(schema), str(table))print (sql_command)# Load the datadata = http://www.srcmini.com/pd.read_sql(sql_command, conn)print(data.shape)return (data)

为了强调此代码的作用, 它使用你的凭据连接到你的数据库, 并返回你查询的数据, 即schema.table中的select *;作为熊猫数据框。然后, 你可以可视化或分析此数据, 就像将任何数据从CSV加载到Pandas中一样。
一个更复杂的例子除了简单的连接, 你还可以在单??独的脚本中使用一系列功能来连接数据库。该脚本位于setup / setup_environment.py中:
#!/usr/bin/env pythonimport osimport yamlfrom sqlalchemy import create_engineimport logginglog = logging.getLogger(__name__)def get_database():try:engine = get_connection_from_profile()log.info("Connected to PostgreSQL database!")except IOError:log.exception("Failed to get database connection!")return None, 'fail'return enginedef get_connection_from_profile(config_file_name="default_profile.yaml"):"""Sets up database connection from config file.Input:config_file_name: File containing PGHOST, PGUSER, PGPASSWORD, PGDATABASE, PGPORT, which are thecredentials for the PostgreSQL database"""with open(config_file_name, 'r') as f:vals = yaml.load(f)if not ('PGHOST' in vals.keys() and'PGUSER' in vals.keys() and'PGPASSWORD' in vals.keys() and'PGDATABASE' in vals.keys() and'PGPORT' in vals.keys()):raise Exception('Bad config file: ' + config_file_name)return get_engine(vals['PGDATABASE'], vals['PGUSER'], vals['PGHOST'], vals['PGPORT'], vals['PGPASSWORD'])def get_engine(db, user, host, port, passwd):"""Get SQLalchemy engine using credentials.Input:db: database nameuser: Usernamehost: Hostname of the database serverport: Port numberpasswd: Password for the database"""url = 'postgresql://{user}:{passwd}@{host}:{port}/{db}'.format(user=user, passwd=passwd, host=host, port=port, db=db)engine = create_engine(url, pool_size = 50)return engine

有了此脚本后, 我们可以使用新脚本连接到数据库:
import sysimport osimport pandas as pdimport subprocessimport argparseimport pdbimport picklefrom setup import setup_environment# Make PostgreSQL Connectionengine = setup_environment.get_database()try:con = engine.raw_connection()con.cursor().execute("SET SCHEMA '{}'".format('your_schema_name'))except:pass

注意:在此代码示例中, 你希望将” your_schema_name” 替换为架构的特定名称, 例如” models” 架构。
尝试一些复杂的查询:
现在, 你已经建立了数据库连接, 你可以尝试一个复杂的查询, 例如为使用Sci-kit Learn构建的模型结果返回pickle文件(一种存储数据的Python方法)。
def get_pickle_best_models(timestamp, metric, parameter=None, number=25, directory="results/"):"""--------------------------------------------------------Get the PICKLE FILE of the best modelsby the specified timestamp and given metricRETURNS the PICKLE FILE to a DIRECTORY--------------------------------------------------------ARGUMENTS:timestamp:models run on or after given timestampexample: '2018-09-02'metric:metric to be optimizedexample: 'precision@'parameter:parameter value or threshold if anydefault=Noneexample: '10.0'number:maximum number of desired resultsdefault = 25--------------------------------------------------------"""if parameter is None:query = ("SELECT pickle_blob, run_timeFROM \(SELECT evaluations.model_id, run_time \FROM results.evaluations JOIN results.models \ON evaluations.model_id=models.model_id \WHERE run_time > = '{}' \AND value is not null \AND metric = '{}' \ORDER BY value DESC LIMIT {}) \AS top_models \INNER JOIN results.data \ON top_models.model_id=data.model_id ; " ).format(timestamp, metric, number)elif parameter is not None:query = ("SELECT pickle_blob, run_timeFROM \(SELECT evaluations.model_id, run_time \FROM results.evaluations JOIN results.models \ON evaluations.model_id=models.model_id \WHERE run_time > = '{}' \AND value is not null \AND metric = '{}' \AND parameter = '{}' \ORDER BY value DESC LIMIT {}) \AS top_models \INNER JOIN results.data \ON top_models.model_id=data.model_id ; " ).format(timestamp, metric, parameter, number)df_models = pd.read_sql(query, con=con)N = len(df_models['pickle_blob'])for file_number in range(0, N):pickle_file = pickle.loads(df_models['pickle_blob'].iloc[file_number])file_name = df_models['run_time'].apply(lambda x: str(x).replace(' ', 'T')).iloc[file_number]if parameter is None:full_file_name = "police_eis_results_"+"top_"+metric+"any"+"_"+file_name+".pkl"elif parameter is not None:full_file_name = "police_eis_results_"+"top_"+metric+parameter+"_"+file_name+".pkl"file_path = directory+full_file_namepickle.dump(pickle_file, open( file_path, "wb" ) )return None

剖析此功能:
除去一些细节, 你便具有以下一般流程
def get_pickle_best_models(timestamp, metric, parameter=None, number=25, directory="results/"):if parameter is None:# Do Query WITHOUT Parameterelif parameter is not None:# Do Query WITH Parameter# Run Query and Store Results as Pandas Data Framedf_models = pd.read_sql(query, con=con)# Loop Over Dataframe to Save Pickle Files for Different Model Run Times# (Code Here)

编写Python查询如示例所示, 你将SQL查询编写为字符串:
query = ("Long SQL Query as a String")

在这里, 你可以使用.format(var)方法将变量插入查询中。这样, 你可以根据传递给函数的各种参数来系统地更改查询。上述查询之一的完整格式如下:
query = ("SELECT pickle_blob, run_timeFROM \(SELECT evaluations.model_id, run_time \FROM results.evaluations JOIN results.models \ON evaluations.model_id=models.model_id \WHERE run_time > = '{}' \AND value is not null \AND metric = '{}' \AND parameter = '{}' \ORDER BY value DESC LIMIT {}) \AS top_models \INNER JOIN results.data \ON top_models.model_id=data.model_id ; " ).format(timestamp, metric, parameter, number)

注意查询的格式。在python代码中, 你需要使用\中断查询行, 并使用格式函数'{}’ 。format(x)将变量插入脚本。格式括号是否需要用单引号引起来, 具体取决于你是否尝试将字符串值或数字值传递给查询。从Python操作中抽象查询, 然后剩下一个SQL查询。下面, 你可以尝试一些示例格式的示例值, 以表示时间戳, 度量, 参数和结果数的Python格式:
SELECT pickle_blob, run_timeFROM(SELECT evaluations.model_id, run_timeFROM results.evaluations JOIN results.modelsON evaluations.model_id=models.model_idWHERE run_time > = '2018-09-02'AND value is not nullAND metric = 'precision@'AND parameter = '10.0'ORDER BY value DESC LIMIT 10)AS top_modelsINNER JOIN results.dataON top_models.model_id=data.model_id ;

你还回什么上面的查询是一个更复杂的示例, 但很有价值。首先, 你要返回什么:
  • pickle_blob
  • 运行
*例如。 SELECT pickle_blob, 运行时
这些数据从哪里来?
  • 通过子查询对results.data表进行联接, 对result.evaluations和results.models进行联接。
在上面的示例中, 你需要模型的pickle_blob和模型的run_time。你还需要根据某些条件获得最佳模型。但是, 所有这些属性都位于结果模式中的离散表中。因此, 你必须对这些表进行几次连接。
从技术上讲, 你正在执行所谓的子查询。请注意主查询SELECT pickle_blob的第一行, run_time FROM, 其中FROM引用括号中的子查询(SELECT … LIMIT 10), 你使用别名AS top_models对其进行引用。
在这里, top_models是一个任意名称, 仅在查询中定义和使用。你可以将top_models替换为x或y或my_best_model_ever。该名称无所谓, 只要它与其他表名和保留的SQL语句(例如select)是离散的即可。它仅引用我们的子查询的结果。
本质上, 子查询返回联接的result.models和results.evaluations表的选择性版本。你可以使用ON的top_models.model_id = data.model_id对top_models子查询使用top_models子查询来进行INNER JOIN。
你的子查询是什么?
SELECT evaluations.model_id, run_timeFROM results.evaluations JOIN results.modelsON evaluations.model_id=models.model_idWHERE run_time > = '2018-09-02'AND value is not nullAND metric = 'precision@'AND parameter = '10.0'ORDER BY value DESC LIMIT 10

子查询还对在model_id上联接的result.evaluations和results.models表进行联接。它从联接的表中返回model_id和run_time。
条件:在这里, 我们只需要符合特定条件的模型:
  • 时间戳记WHERE运行时间> =’ 2018-09-02′
  • value列上的结果为非null:AND值不为null
  • 指标AND指标=’ precision @’
  • 参数AND参数= ’ 10 .0′
  • 前n个结果ORDER BY值DESC LIMIT 10
如你所见, 尽管此查询包含许多元素, 但是你可以使用Python操纵查询。最后, 你可以利用数据库的功能和Python的功能来构建动态代码。
总结
做得好!你现在知道了Python中的数据库, 而PostgreSQL数据库中的示例很少。此外, 你可以自己进行自我学习并找到学习资源。如果你有兴趣一般地了解有关数据库主题的更多信息, 请阅读srcmini的教程” 在SQL中执行Python / R” 。

    推荐阅读