从本地计算机到具有Terraform的Dask集群

本文概述

  • 从本地开始:Dask和LocalCluster
  • 将计算环境与Docker捆绑在一起
  • 定义Dask集群
  • 运行集群
  • 集群上的Scikit-Learn网格搜索
  • 尚待讨论
为了不断改进回购并将其转变为市场领导者, 我们最近决定解决客户服务代理商的挑战。第一步, 创建了带标签的电子邮件转储, 并设定了第一个目标:构建一个POC, 该POC自动标记电子邮件。为此, 必须使用NLP, 并且必须执行冗长的(和贪婪的)网格搜索。如此冗长, 以至于笔记本的4个核心工作了几个小时而没有结果。这就是我决定探索dask及其同级对象时的重点。
在本教程中, 你将探索如何将使用Scikit-Learn进行网格搜索的本地代码带到AWS(EC2)节点的集群中。
从本地开始:Dask和LocalCluster你从数据加载和网格搜索超参数的最小示例开始。该项目的结构可能是:
. ├── data ├── models └── src

在./src中, 你可能会包含一些想要在项目中或更复杂的管道中使用的特殊工具, 函数和类。本教程稍后将显示如何在分布式环境中包括这些工具。
请注意, 它具有Python项目的结构, 并且应在项目的根目录中包含setup.py。
让我们从一个简单的例子开始:
from sklearn.datasets import load_digits from sklearn.svm import SVC from sklearn.model_selection import GridSearchCV # from src import myfoo # An example included from `src`param_space = {'C': [1e-4, 1, 1e4], 'gamma': [1e-3, 1, 1e3], 'class_weight': [None, 'balanced']}model = SVC(kernel='rbf')digits = load_digits()search = GridSearchCV(model, param_space, cv=3) search.fit(digits.data, digits.target)

可以在此处定义的Docker映像中找到该示例的详细说明。你可以通过克隆此存储库并运行以下命令来进行尝试:
docker build . -t dask-example docker run --rm dask-example ./gridsearch_local.py

到现在为止还挺好。但是, 想象一下数据集更大并且超参数的空间更复杂。事情几乎变成不可能在本地计算机上运行。在这一点上, 至少有两种可能的行动方案:
  1. 使用更多的计算能力
  2. 优化搜索和/或更聪明
在本文中, 你将采用前者。将本地计算机扩展到群集的一种看似简单的方法很简单。首先, 留在本地计算机上, 让我们试用LocalCluster。签出gridsearch_local_dask.py, 你可以尝试
docker run -it --rm dask-example ./gridsearch_local_dask.py

感觉已经快了一点, 不是吗?但是, 你需要横向扩展, 并为此目的, 要拥有一组可以使用的EC2节点。主要有两个步骤:
  1. 将计算环境捆绑在Docker映像中
  2. 运行一个dask集群, 其中每个节点都有计算环境
将计算环境与Docker捆绑在一起为了使dask集群正常运行, 每个节点必须具有相同的计算环境。 Docker是实现这一目标的直接方法。要做的方法是定义一个Dockerfile:
FROM continuumio/miniconda3RUN mkdir projectCOPY requirements.txt /project/requirements.txt COPY src/ /project/src COPY setup.py /project/setup.py WORKDIR /project RUN pip install -r requirements.txt

本地requirements.txt和setup.py已加载到映像中。建议在Requirements.txt中包含bokeh;否则dask的网络信息中心将无法正常运行。 Dockerfile可以包括更多步骤, 例如RUN apt-get update & & apt-get install -y build-essential freetds-dev或RUN python -m nltk.downloader punkt。如果./src包含所需的类, 函数等, 请确保包含-e之类的内容。或仅仅是。在requirements.txt中; 这样, 这些依赖关系将在映像中可用。在Dockerfile中包含计算环境所需的所有组件非常重要!
接下来, 应将映像放置在EC2实例可访问的位置。现在是将映像推送到Docker注册表的时候了。在本教程中, 你将使用AWS服务-ECS, 但可以使用其他选项, 例如DockerHub。我假设你已安装awscli, 并且凭据已知。你可以通过以下方式登录到注册表
# Execute from the project's root $(aws ecr get-login --no-include-email) docker build -t image-name . docker tag image-name:latest repo.url/image-name:latest docker push repo.url/image-name:latest

现在是时候设置集群的节点了。
定义Dask集群我们采用声明性方法, 并使用terraform设置群集的节点。请注意, 在此示例中, 你利用了AWS Spots。你可以轻松地更改代码并使用常规的按需实例。这留作练习。你使用两组文件来定义集群:
  • .tf指令:由terraform解析并定义要使用的实例, 标记, 区域等。
  • 设置Shell脚本:在节点上安装所需的工具
.tf文件
【从本地计算机到具有Terraform的Dask集群】使用terraform时, 将读取并连接所有.tf文件。当然还有更多细节。一个好的切入点就是这个。在我们的示例中, 你组织.tf文件的方式如下:
  • terraform.tf:常规设置
  • vars.tf:可以从CLI使用的变量定义
  • Provision.tf:有关如何调用配置脚本的说明
  • resources.tf:资源的定义
  • output.tf:terraform提供的输出的定义
terraform.tf
provider "aws" { region = "eu-west-1" }

vars.tf
variable "instanceType" { type= "string" default = "c5.2xlarge" }variable "spotPrice" { # Not needed for on-demand instances default = "0.1" }variable "contact" { type = "string" default = "d.atariah" }variable "department" { type = "string" default = "My wonderful department" }variable "subnet" { default = "subnet-007" }variable "securityGroup" { type = "string" default = "sg-42" }variable "workersNum" { default = "4" }variable "schedulerPrivateIp" { # We predefine a private IP for the scheduler; it will be used by the workers default = "172.31.36.190" }variable "dockerRegistry" { default = "" }# By defining the AWS keys as variables we can get them from the command line # and pass them to the provisioning scripts variable "awsKey" {} variable "awsPrivateKey" {}

provision.tf
data "template_file" "scheduler_setup" { template = "${file("scheulder_setup.sh")}" # see the shell script bellow vars { # Use the AWS keys passed from the terraform CLI AWS_KEY = "${var.awsKey}" AWS_PRIVATE_KEY = "${var.awsPrivateKey}" DOCKER_REG = "${var.dockerRegistry}" } }data "template_file" "worker_setup" { template = "${file("worker_setup.sh")}" # see the shell script bellow vars { AWS_KEY = "${var.awsKey}" AWS_PRIVATE_KEY = "${var.awsPrivateKey}" DOCKER_REG = "${var.dockerRegistry}" SCHEDULER_IP = "${var.schedulerPrivateIp}" } }

resources.tf这是设置的核心, 在这里你将所有内容放在一起并定义了对AWS Spot的请求。
resource "aws_spot_instance_request" "dask-scheduler" { ami= "ami-4cbe0935" # [1] instance_type= "${var.instanceType}" spot_price= "${var.spotPrice}" wait_for_fulfillment= true key_name= "dask_poc" security_groups= ["${var.securityGroup}"] subnet_id= "${var.subnet}" associate_public_ip_address = true private_ip= "${var.schedulerPrivateIp}" # [2] user_data= "http://www.srcmini.com/${data.template_file.scheduler_setup.rendered}" tags { Name = "${terraform.workspace}-dask-scheduler", Department = "${var.department}", contact = "${var.contact}" } }resource "aws_spot_instance_request" "dask-worker" { count= "${var.workersNum}" # [3] ami= "ami-4cbe0935" # [1] instance_type= "${var.instanceType}" spot_price= "${var.spotPrice}" wait_for_fulfillment= true key_name= "dask_poc" subnet_id= "${var.subnet}" security_groups= ["${var.securityGroup}"] associate_public_ip_address = true user_data= "http://www.srcmini.com/${data.template_file.worker_setup.rendered}" tags { Name = "${terraform.workspace}-dask-worker${count.index}", Department = "${var.department}", contact = "${var.contact}" } }

这里有一些重要的注意事项:
  1. 我使用的AMI是针对eu-west-1的一种, 它是为Docker优化并由Amazon提供的。可以使用其他映像, 但是它们必须支持docker, 这一点很重要。
  2. 定义调度程序的专用IP。在启动工作人员时将需要使用它, 并且比找到IP更容易了解IP。
  3. 指出应雇用多少工人
output.tfterraform允许定义各种输出。与往常一样, 可以在此处找到更多详细信息。
output "scheduler-info" { value = "http://www.srcmini.com/${aws_spot_instance_request.dask-scheduler.public_ip}" }output "workers-info" { value = "http://www.srcmini.com/${join(", ", aws_spot_instance_request.dask-worker.*.public_ip)}" }output "scheduler-status" { value = "http://${aws_spot_instance_request.dask-scheduler.public_ip}:8787/status" }

设置脚本
resources.tf中的user_data字段指示应使用哪些脚本在节点上进行供应。你提供了两个脚本模板, 这些模板将填充terraform中所需的变量。一个用于调度程序的脚本, 另一个用于工人的脚本。
#!/bin/bash# scheduler_setup.shexec > > (tee /var/log/user-data.log|logger -t user-data -s 2> /dev/console) 2> & 1 set -xecho "Installing pip" curl -O https://bootstrap.pypa.io/get-pip.py python get-pip.py --user ~/.local/bin/pip install awscli --upgrade --user echo "Logging in to ECS registry" export AWS_ACCESS_KEY_ID=${AWS_KEY} export AWS_SECRET_ACCESS_KEY=${AWS_PRIVATE_KEY} export AWS_DEFAULT_REGION=eu-west-1 $(~/.local/bin/aws ecr get-login --no-include-email)# Assigning tags to instance derived from spot request # See https://github.com/hashicorp/terraform/issues/3263#issuecomment-284387578 REGION=eu-west-1 INSTANCE_ID=$(curl -s http://169.254.169.254/latest/meta-data/instance-id) SPOT_REQ_ID=$(~/.local/bin/aws --region $REGION ec2 describe-instances --instance-ids "$INSTANCE_ID"--query 'Reservations[0].Instances[0].SpotInstanceRequestId' --output text) if [ "$SPOT_REQ_ID" != "None" ] ; then TAGS=$(~/.local/bin/aws --region $REGION ec2 describe-spot-instance-requests --spot-instance-request-ids "$SPOT_REQ_ID" --query 'SpotInstanceRequests[0].Tags') ~/.local/bin/aws --region $REGION ec2 create-tags --resources "$INSTANCE_ID" --tags "$TAGS" fiecho "Starting docker container from image" docker run -d -it --network host ${DOCKER_REG} /opt/conda/bin/dask-scheduler

除了最后一行外, 工作程序和调度程序的脚本是相同的。对于工人, 你应该有
docker run -d -it --network host ${DOCKER_REG} /opt/conda/bin/dask-worker ${SCHEDULER_IP}:8786

请注意, 你启动dask-worker而不是dask-scheduler, 并且专用于调度程序的专用IP。重要的是要注意– network主机。直观地, 这确保了容器的网络及其对应的主机将是相同的, 因此, 不同主机上的不同容器将能够通信。
运行集群现在, 你可以运行集群。为此, 你需要执行两个命令。首先, terraform init。这是一个准备工具, 并准备好启动节点。接下来, 你必须应用说明。你可以通过调用以下方法进行操作:
TF_VAR_awsKey=YOUR_AWS_KEY \ TF_VAR_awsPrivateKey=YOUR_AWS_PRIVATE_KEY \ terraform apply -var 'workersNum=2' -var 'instanceType="t2.small"' \ -var 'spotPrice=0.2' -var 'schedulerPrivateIp="172.31.36.170"' \ -var 'dockerRegistry="repo.url/image-name:latest"'

请注意, 你将两个环境变量用于AWS密钥。 var.tf中定义的其他变量作为参数传递。完成后, 你可以通过以下方式访问新创建的调度程序节点:ssh -i?/ .aws / key.pem ec2-user @ $(terraform output scheduler-info)。在集群中, 你可以在/var/log/user-data.log中查看日志。你还可以使用docker ps检查正在运行的Docker容器的状态。最后, 如果一切顺利, 你应该可以访问群集的Web界面。可以通过调用terraform输出调度程序状态来找到其地址。
集群上的Scikit-Learn网格搜索你一直在等待的时刻:在dask群集上运行超参数网格搜索。为此, 你可以使用类似于./gridsearch_local_dask.py的代码。只需要更改客户的地址:
#!/usr/bin/env pythonfrom sklearn.datasets import load_digits from sklearn.svm import SVC from sklearn.model_selection import train_test_split as tts from sklearn.metrics import classification_report from distributed import Client, LocalCluster from dask_searchcv import GridSearchCV # from src import myfoo # An example included from `src`def main(): param_space = {'C': [1e-4, 1, 1e4], 'gamma': [1e-3, 1, 1e3], 'class_weight': [None, 'balanced']}model = SVC(kernel='rbf')digits = load_digits()X_train, X_test, y_train, y_test = tts(digits.data, digits.target, test_size=0.3)print("Starting local cluster") client = Client(x.y.z.w:8786) print(client)print("Start searching") search = GridSearchCV(model, param_space, cv=3) search.fit(X_train, y_train)print("Prepare report") print(classification_report( y_true=y_test, y_pred=search.best_estimator_.predict(X_test)) )if __name__ == '__main__': main()

运行此脚本将在dask群集上启动网格搜索。可以在Web仪表板上对其进行监视。如果你在x.y.z.w上有一个正在运行的集群, 则可以尝试一下:
docker run -it --rm -p 8786:8786 dask-example ./gridsearch_cluster_dask.py x.y.z.w

尚待讨论
  • 你可能想要探索terraform工作区;这可以帮助你从同一目录运行多个集群。例如, 当同时运行不同的实验时。
  • 使用Jupyter服务器启用节点, 因此不需要本地笔记本

    推荐阅读