[源码解析]|[源码解析] PyTorch 分布式 Autograd (5) ---- 引擎(上)

[源码解析] PyTorch 分布式 Autograd (5) ---- 引擎(上)
目录

  • [源码解析] PyTorch 分布式 Autograd (5) ---- 引擎(上)
    • 0x00 摘要
    • 0x01 支撑系统
      • 1.1 引擎入口
      • 1.2 SendRpcBackward
        • 1.2.1 剖析
        • 1.2.2 定义
        • 1.2.3 构建
        • 1.2.4 grads_
    • 0x02 定义
      • 2.1 定义
      • 2.2 单例
      • 2.3 重要注释
        • 2.3.1 成员变量
        • 2.3.2 构建
        • 2.3.3 GPU to CPU continuations
        • 2.3.4 析构
        • 2.3.5 插入队列
        • 2.3.6 工作线程
    • 0x03 总体执行
    • 0x04 验证节点和边
      • 4.1 gradient_edge
      • 4.2 validate_outputs
      • 4.3 VS 普通 engine
    • 0x05 计算依赖
      • 5.1 总体过程
      • 5.2 第一部分 准备工作
        • 5.2.1 实现
        • 5.2.2 相关
          • 5.2.2.1 sendFunctions
          • 5.2.2.2 outstanding_tasks_
            • GraphTask
            • vania engine
            • dist engine
      • 5.3 第二部分 计算依赖
        • 5.3.1 实现
        • 5.3.2 叶子节点的种类
      • 5.4 第三部分 得到Functions
        • 5.4.1 算法
        • 5.4.2 实现
      • 5.5 小结
    • 0xFF 参考

0x00 摘要 上文已经分析了如何启动/接受反向传播,如何进入分布式autograd 引擎,本文和下文就看看如何分布式引擎如何运作。通过本文的学习,读者可以对 dist.autograd 引擎基本静态架构和总体执行逻辑有所了解。
PyTorch分布式其他文章如下:
深度学习利器之自动微分(1)
深度学习利器之自动微分(2)
[源码解析]深度学习利器之自动微分(3) --- 示例解读
[源码解析]PyTorch如何实现前向传播(1) --- 基础类(上)
[源码解析]PyTorch如何实现前向传播(2) --- 基础类(下)
[源码解析] PyTorch如何实现前向传播(3) --- 具体实现
[源码解析] Pytorch 如何实现后向传播 (1)---- 调用引擎
[源码解析] Pytorch 如何实现后向传播 (2)---- 引擎静态结构
[源码解析] Pytorch 如何实现后向传播 (3)---- 引擎动态逻辑
[源码解析] PyTorch 如何实现后向传播 (4)---- 具体算法
[源码解析] PyTorch 分布式(1)------历史和概述
[源码解析] PyTorch 分布式(2) ----- DataParallel(上)
[源码解析] PyTorch 分布式(3) ----- DataParallel(下)
[源码解析] PyTorch 分布式(4)------分布式应用基础概念
[源码解析] PyTorch分布式(5) ------ DistributedDataParallel 总述&如何使用
[源码解析] PyTorch分布式(6) ---DistributedDataParallel -- 初始化&store
[源码解析] PyTorch 分布式(7) ----- DistributedDataParallel 之进程组
[源码解析] PyTorch 分布式(8) -------- DistributedDataParallel之论文篇
[源码解析] PyTorch 分布式(9) ----- DistributedDataParallel 之初始化
[源码解析] PyTorch 分布式(10)------DistributedDataParallel 之 Reducer静态架构
[源码解析] PyTorch 分布式(11) ----- DistributedDataParallel 之 构建Reducer和Join操作
[源码解析] PyTorch 分布式(12) ----- DistributedDataParallel 之 前向传播
[源码解析] PyTorch 分布式(13) ----- DistributedDataParallel 之 反向传播
[源码解析] PyTorch 分布式 Autograd (1) ---- 设计
[源码解析] PyTorch 分布式 Autograd (2) ---- RPC基础
[源码解析] PyTorch 分布式 Autograd (3) ---- 上下文相关
[源码解析] PyTorch 分布式 Autograd (4) ---- 如何切入引擎
为了更好的说明,本文代码会依据具体情况来进行相应精简。
0x01 支撑系统 我们首先看看一些引擎内部支撑系统。
1.1 引擎入口
引擎入口在 backward 函数中有调用,从 DistEngine::getInstance().execute 进入到引擎,由前文可知,这里是主动调用引擎。
void backward( int64_t context_id, const variable_list& roots, bool retain_graph) { RECORD_FUNCTION( kDistAutogradBackwardProfilingKey, std::vector()); try { DistEngine::getInstance().execute(context_id, roots, retain_graph); } catch (std::exception& e) { throw std::runtime_error(e.what()); } }

1.2 SendRpcBackward
被动调用引擎是从 SendRpcBackward 开始的。SendRpcBackward 是前向传播之中发送行为对应的反向传播算子。DistAutogradContext 存储在一个worker之上的每一个分布式autograd的相关信息,其在分布式 autograd 之中封装前向和后向传播,累积梯度,这避免了多个worker在彼此的梯度上互相影响。在上下文 DistAutogradContext 之中有个成员变量,记录了本 worker 所有发送行为对应的反向传播算子
std::unordered_map sendAutogradFunctions_;

sendAutogradFunctions_ 中的内容都是SendRpcBackward。
1.2.1 剖析 SendRpcBackward 作为分布式autograd实现的一部分,每当我们将RPC从一个节点发送到另一个节点时,我们都会向autograd图添加一个"SendRpcBackward"autograd function。这是一个占位符函数,用于在向后传播时启动当前worker的autograd引擎。此autograd function的边是RPC方法的输入。
在向后传播过程中,此函数将在autograd引擎中排队等待执行,该引擎最终将运行autograd图的其余部分。
SendRpcBackward实际上是本地节点上autograd图的根。我们给出之前的示意图如下:
  • SendRpcBackward不会接收任何 "输入",而是RPC框架将梯度传递给该函数以启动局部autograd计算。
  • SendRpcBackward的input边是RPC方法的输入,就是梯度。
[源码解析]|[源码解析] PyTorch 分布式 Autograd (5) ---- 引擎(上)
文章图片

1.2.2 定义 SendRpcBackward 是 Node 的派生类,因为是 Node,所以有 next_edges,可以看到其新增成员变量是 grads_。
// As part of our distributed autograd implementation, whenever we send an RPC // from one node to another, we add a 'SendRpcBackward' autograd function to the // autograd graph. This is more or less a placeholder function that is used to // kickoff the autograd engine on the current worker on the backward pass. The // edges for this autograd function are the inputs to the RPC method. // // During the backward pass, this function is queued for execution in the // autograd engine which eventually runs the rest of the autograd graph. struct TORCH_API SendRpcBackward : public torch::autograd::Node { public: torch::autograd::variable_list apply( torch::autograd::variable_list&& inputs) override; // SendRpcBackward is actually the root of an autograd graph on the local // node. As a result, it doesn't receive any 'inputs', but rather the RPC // framework passes gradients over to this function to kickoff local autograd // computation. void setGrads(const torch::autograd::variable_list& grads); // Retrieve the grads for the function. const torch::autograd::variable_list& getGrads() const; private: torch::autograd::variable_list grads_; };

1.2.3 构建 在前向传播过程之中,addSendRpcBackward 会构建一个SendRpcBackward,会把其前向传播输入边作为反向传播的输出边设置在 SendRpcBackward 之中
void addSendRpcBackward( const ContextPtr& autogradContext, const AutogradMetadata& autogradMetadata, std::vector& tensors) { // Attach autograd information only for tensors requiring grad. std::vector tensors_with_grad; std::copy_if( tensors.begin(), tensors.end(), std::back_inserter(tensors_with_grad), [](const torch::Tensor& t) { return t.requires_grad(); }); // Attach the appropriate autograd edges. auto grad_fn = std::make_shared(); // 构建了 SendRpcBackward grad_fn->set_next_edges( torch::autograd::collect_next_edges(tensors_with_grad)); // Add the appropriate input metadata for the grad_fn. for (const auto& tensor : tensors_with_grad) { grad_fn->add_input_metadata(tensor); // 添加边 SendRpcBackward }// Record the send autograd function in our current context. // 插入到上下文 autogradContext->addSendFunction(grad_fn, autogradMetadata.autogradMessageId); }

1.2.4 grads_ 之前看到,SendRpcBackward新增成员变量是 grads_,我们看看 grads_ 如何设置和使用?
SendRpcBackward 提供了 set, get 操作。
void SendRpcBackward::setGrads(const torch::autograd::variable_list& grads) { grads_ = grads; }const torch::autograd::variable_list& SendRpcBackward::getGrads() const { return grads_; }

何时会使用?在 torch/csrc/distributed/rpc/request_callback_no_python.cpp 之中有 processBackwardAutogradReq。processBackwardAutogradReq 会:
  1. 使用 sendFunction->setGrads(gradientsCall.getGrads()) 来设置远端传递来的梯度。
  2. 调用 DistEngine::getInstance().executeSendFunctionAsync 来执行引擎开始本地后向计算。
对应了设计中如下文字,也就是被动进入引擎的起点:
SendRpcBackward实际上是本地节点上autograd图的根。因此,它不会接收任何"输入",而是RPC框架将梯度传递给该函数以启动局部autograd计算。
具体代码如下:
void RequestCallbackNoPython::processBackwardAutogradReq( RpcCommandBase& rpc, const int64_t messageId, const c10::intrusive_ptr& responseFuture) const { auto& gradientsCall = static_cast(rpc); const auto& autogradMetadata = https://www.it610.com/article/gradientsCall.getAutogradMetadata(); // Retrieve the appropriate autograd context. auto autogradContext = DistAutogradContainer::getInstance().retrieveContext( autogradMetadata.autogradContextId); // Lookup the appropriate'send' function to enqueue. std::shared_ptr sendFunction = autogradContext->retrieveSendFunction(autogradMetadata.autogradMessageId); // Attach the gradients to the send function. sendFunction->setGrads(gradientsCall.getGrads()); // 这里设置,就是把RPC传来的梯度赋值// Now execute the autograd graph using the "distributed engine." auto execFuture = DistEngine::getInstance().executeSendFunctionAsync( // 这里使用了 grads_ autogradContext, sendFunction, gradientsCall.retainGraph()); // Our response is satisfied when the rpcs come back. execFuture->addCallback([responseFuture, messageId](JitFuture& execFuture) { if (!execFuture.hasError()) { Message m = std::move(PropagateGradientsResp()).toMessage(); m.setId(messageId); responseFuture->markCompleted( IValue(c10::make_intrusive(std::move(m)))); } else { responseFuture->setError(execFuture.exception_ptr()); } }); }

executeSendFunctionAsync 就会用 sendFunction->getGrads() 提取梯度,进行操作。
c10::intrusive_ptr DistEngine::executeSendFunctionAsync( const ContextPtr& autogradContext, const std::shared_ptr& sendFunction, bool retainGraph) {// Typically the local autograd engine ensures stream synchronizations between // nodes in the graph. However, for distributed autograd the sendFunction // inputs might have been retrieved over the wire on a separate stream and the // sendFunction itself runs on a different stream. As a result, we need to // manually synchronize those two streams here. const auto& send_backward_stream = sendFunction->stream(c10::DeviceType::CUDA); if (send_backward_stream) { for (const auto& grad : sendFunction->getGrads()) { // 这里有获取 const auto guard = c10::impl::VirtualGuardImpl{c10::DeviceType::CUDA}; const auto default_stream = guard.getStream(grad.device()); if (send_backward_stream != default_stream) { auto event = c10::Event{c10::DeviceType::CUDA}; event.record(default_stream); send_backward_stream->wait(event); } } }// 省略后续代码

具体如下图:
[源码解析]|[源码解析] PyTorch 分布式 Autograd (5) ---- 引擎(上)
文章图片

0x02 定义 2.1 定义
DistEngine 的定义如下,为了更好讲解,下面删除了部分代码:
class TORCH_API DistEngine { public: // Retrieve the singleton instance. static DistEngine& getInstance(); // Given a list of root variables, start the distributed backwards pass from // these variables and accumulate all the gradients in the current autograd // context on each node. This method is used to kickoff distributed autograd // on a single node. void execute( int64_t context_id, const torch::autograd::variable_list& roots, bool retainGraph); // Given a send function to execute in the autograd engine, ensures we compute // dependencies once for this node and enqueues the send function for execute // in the engine. // This method is used to kick off the autograd computation on a node when it // receives gradients from the corresponding 'recv' method on another node. // The gradients are accumulated in the provided autograd context. c10::intrusive_ptr executeSendFunctionAsync( const ContextPtr& autogradContext, const std::shared_ptr& sendFunction, bool retainGraph); // Number of backward passes currently running for the Distributed Engine. size_t numBackwardPasses() const; // Returns key-value pairs consisting of useful debugging information related // to distributed autograd. std::unordered_map getDebugInfo() const; // Validates the input roots for the backward computations and retrieves the // appropriate root edges and corresponding gradients. Populates root_edges // with the appropriate gradient edges and grads with the gradients for each // edge. void validateRootsAndRetrieveEdges( const torch::autograd::variable_list& roots, torch::autograd::edge_list& rootEdges, torch::autograd::variable_list& grads); // Given the autograd context, root edges and grads, we compute dependencies // for the local node and fill out the provided GraphTask and GraphRoot with // appropriate information for the local autograd engine. // We also determine all leaf nodes(functions) in the graph and accumulate // them in outputEdges. void computeDependencies( const ContextPtr& context, const torch::autograd::edge_list& rootEdges, const torch::autograd::variable_list& grads, const std::shared_ptr& graphRoot, torch::autograd::edge_list& outputEdges, bool retainGraph); // Given a pre-populated GraphTask and a root node, compute the backward pass // for the autograd graph until the graph task ready queue is empty. // // This method assumes that the appropriate GraphTask has already been // initialized appropriately. It will construct a local ready queue to // traverse the GraphTask instead of using the GraphTask embedded // cpu_ready_queue, this is because dist engine might run the same GraphTask // from different SendFunctions concurrently in different threads. The method // will only mark the GraphTask as completed when it needes to, which means it // might not mark as completed for every call as dist engine would like to // keep the GraphTask alive when it not receives all gradients. // // When `incrementOutstandingTasks=false`, the function does not increment // 'outstanding_tasks_' in the appropriate GraphTask. It is assumed we've // already done this before hand for this task (to ensure we don't pre-mark // this graph_task as completed). This is useful in the distributed autograd // case where we need to increment 'outstanding_tasks_' first to indicate the // local autograd engine the graph task is not completed until it receives the // signals from other workers over the network. // // XXX: calling this function assumes that we will have NO GPU nodetasks be // executed for the graph_task, the caller of this function need to ensure // this otherwise there will be undefined behaviors. A correct way to fix this // is to re-design the autograd engine so that GPU worker thread to behave the // same as CPU caller thread, record the operation/thread for the device, and // reuse it in backward. // TODO: 1. Add assert in the dist engine to ensure no GPU NodeTasks during // backward //2. properly setup the thread local ready queue to enable reentrant //backwards void execute_graph_task_until_ready_queue_empty( torch::autograd::NodeTask&& node_task, bool incrementOutstandingTasks = true); // Run the local autograd engine using the provided graphTask and graphRoot // and accumulate the gradients part 'outputEdges' in the provided autograd // context. c10::intrusive_ptr runEngineAndAccumulateGradients( const ContextPtr& autogradContext, const std::shared_ptr& graphRoot, const torch::autograd::edge_list& outputEdges, bool incrementOutStandingTasks = true); // Run after the backward pass is done to appropriately cleanup structures. void cleanupBackwardPass(const ContextPtr& autogradContext); // Global thread to execute CPU continuations. void globalCpuThread( const std::shared_ptr& ready_queue); // Set of autograd context_ids, which we have already initialized for // distributed autograd on this node (e.g.: already computed dependencies) std::unordered_set initializedContextIds_; mutable std::mutex initializedContextIdsLock_; // Reference to local autograd engine. torch::autograd::Engine& engine_; // Ready queue used by the CPU thread in distributed engine. // See Note [GPU to CPU continuations] // 每个 GraphTask都把 global_cpu_ready_queue_ 设置为自己的 cpu_ready_queue_ std::shared_ptr global_cpu_ready_queue_; // See Note [GPU to CPU continuations] std::thread global_cpu_thread_; friend class BackwardPassCleanupGuard; };

2.2 单例
引擎使用了单例模式,这样每个 worker 之中就只有一个单例在运行。
DistEngine& DistEngine::getInstance() { // Leaky singleton to avoid module destructor race. static DistEngine* engine = new DistEngine(); return *engine; }

2.3 重要注释
PyTorch 源码之中有大量详尽的注释,我们挑选一些来看看。
2.3.1 成员变量 代码中定义了两个 CPU 全局相关成员变量,具体如下,均注明需要看 [GPU to CPU continuations] 这个注释。
// Ready queue used by the CPU thread in distributed engine. // See Note [GPU to CPU continuations] std::shared_ptr global_cpu_ready_queue_; // See Note [GPU to CPU continuations] std::thread global_cpu_thread_;

2.3.2 构建 这两个成员变量具体初始化位置是在构建函数之中。
DistEngine::DistEngine() : initializedContextIds_(), engine_(Engine::get_default_engine()), global_cpu_ready_queue_(std::make_shared()), // 这里构建了 global_cpu_thread_( // 这里构建了 &DistEngine::globalCpuThread, this, global_cpu_ready_queue_) { // Note [GPU to CPU continuations] // ~~~~~~~~~~~~~~~~~~~~~~~~~~ // Initialize a single CPU thread to execute continuations from GPU // tasks. The multithreaded structure for the distributed engine works // well only for CPU tasks. If we have an order of tasks like // CPU->GPU->CPU, distributed autograd has no thread to execute the last // CPU task on. To fix this, we introduce a global CPU thread to handle // such situations and it will be responsible for executing these CPU // tasks. The CPU thread has its own ready_queue which is used as the // cpu_ready_queue for all GraphTasks for DistEngine. This ensures all GPU // to CPU continuations are enqueued on this thread. The global CPU thread // simply dequeues tasks from the global queue and calls // "execute_graph_task_until_ready_queue_empty" on a JIT thread to execute the // appropriate task. global_cpu_thread_.detach(); // detach之后就独立运行了 }

2.3.3 GPU to CPU continuations 以下是 GPU to CPU continuations 的翻译和理解。
Continuations 最初应该是在schema语言里面接触过的,后来也看过不少语言用到,这个概念没有找到一个很好的延续概念,暂时使用"延续"这个翻译。
为了执行GPU任务的延续(continuations),所以需要初始化一个单独的CPU线程来处理。分布式引擎的多线程结构仅适用于CPU任务。如果我们有CPU->GPU->CPU这样的任务顺序,分布式 autograd 就没有线程来执行最后一个CPU任务。为了解决这个问题,我们引入了一个全局CPU线程来处理这种情况,它将负责执行这些CPU任务。
CPU线程有自己的就绪队列(ready_queue),它用作DistEngine的所有GraphTask的CPU就绪队列(cpu_ready_queue)。这确保所有GPU到CPU的延续(continuations)都在此线程上排队。全局CPU线程只需将任务从全局队列中取出,并在JIT线程上调用"execute_graph_task_until_ready_queue_empty",以执行相应的任务。
If we have an order of tasks like CPU->GPU->CPU, distributed autograd has no thread to execute the last CPU task on. To fix this, we introduce a global CPU thread to handle such situations and it will be responsible for executing these CPU tasks. The CPU thread has its own ready_queue which is used as the cpu_ready_queue for all GraphTasks for DistEngine. This ensures all GPU to CPU continuations are enqueued on this thread. The global CPU thread simply dequeues tasks from the global queue and calls "execute_graph_task_until_ready_queue_empty" on a JIT thread to execute the appropriate task.

2.3.4 析构 析构函数之中有如下,就是为了引擎结束而做对这两个成员变量做了相关操作。
DistEngine::~DistEngine() { // Ensure we shutdown the CPU thread. TORCH_ASSERT_NO_GIL_WITHOUT_PYTHON_DEP(); global_cpu_ready_queue_->pushShutdownTask(); global_cpu_thread_.join(); }

2.3.5 插入队列 在哪里往 global_cpu_ready_queue_ 插入?在 DistEngine::computeDependencies 里面会有插入。首先,每个 GraphTask 都把 global_cpu_ready_queue_ 设置为 cpu_ready_queue。GraphTask构造函数这里参数在调用时候传入的是 global_cpu_ready_queue_。
void DistEngine::computeDependencies( const ContextPtr& autogradContext, const edge_list& rootEdges, const variable_list& grads, const std::shared_ptr& graphRoot, edge_list& outputEdges, bool retainGraph) {// Build the graph task and graph root. // NOTE: we don't need to build and pass a cpu_ready_queue to GraphTask // as we use execute_graph_task_until_ready_queue_empty, which will build // a separate ReadyQueue for each call. auto graphTask = std::make_shared( /* keep_graph */ retainGraph, /* create_graph */ false, /* depth */ 0, /* cpu_ready_queue */ global_cpu_ready_queue_, /* exit_on_error */ true); // 省略其他 graphTask 初始化// Let autograd context take ownership of the GraphTask. // 上下文里面设置了 GraphTask autogradContext->setGraphTask(std::move(graphTask)); }

所以,如果 GraphTask 最后返回需要 CPU 运行时候,就统一用这个。
2.3.6 工作线程 globalCpuThread 是工作线程,其就是从 ready queue 里面弹出 NodeTask,然后执行。
void DistEngine::globalCpuThread( const std::shared_ptr& ready_queue) {while (true) { NodeTask task = ready_queue->pop(); if (task.isShutdownTask_) { // Need to shutdown this thread. C10_LOG_API_USAGE_ONCE("torch.autograd.thread_shutdown"); break; }auto graphTask = task.base_.lock(); if (graphTask == nullptr) { // GraphTask has expired, ignore and continue processing. continue; }// Launch the execution on a JIT thread. at::launch([this, graphTask, graphRoot = task.fn_, variables = InputBuffer::variables(std::move(task.inputs_))]() mutable { InputBuffer inputs(variables.size()); for (size_t i = 0; i < variables.size(); i++) { inputs.add(i, std::move(variables[i]), c10::nullopt, c10::nullopt); } execute_graph_task_until_ready_queue_empty( /*node_task*/ NodeTask(graphTask, graphRoot, std::move(inputs)), /*incrementOutstandingTasks*/ false); }); } }

0x03 总体执行 总体执行是在 DistEngine::execute 之中完成,具体分为如下步骤:
  • 使用 contextId 得到前向的上下文。
  • 使用 validateRootsAndRetrieveEdges 进行验证。
  • 构造一个GraphRoot,用它来驱动后向传播,可以认为是一个虚拟根
  • 使用 computeDependencies 计算依赖
  • 使用 runEngineAndAccumulateGradients 进行反向传播计算。
  • 使用 clearAndWaitForOutstandingRpcsAsync 等待 RPC 完成。
可以看到,与普通引擎相比较,分布式多了一个计算root边和生成边上梯度信息的过程。因为在普通前向传播过程之中,这些是已经配置好的,但是在分布式计算之中,前向传播是没有计算这些,所以需要在反向传播之前计算出来。
void DistEngine::execute( int64_t contextId, const variable_list& roots, bool retainGraph) { // Retrieve the context for the given context_id. This will throw if the // context_id is invalid. auto autogradContext = DistAutogradContainer::getInstance().retrieveContext(contextId); // Perform initial pre-processing. edge_list rootEdges; variable_list grads; validateRootsAndRetrieveEdges(roots, rootEdges, grads); // 构造一个GraphRoot,用它来驱动后向传播,可以认为是一个虚拟根 std::shared_ptr graphRoot = std::make_shared(rootEdges, grads); edge_list outputEdges; // Compute dependencies locally, starting from all roots and all 'send' // functions. { std::lock_guard guard(initializedContextIdsLock_); // Context should not have been initialized already. TORCH_INTERNAL_ASSERT( initializedContextIds_.find(autogradContext->contextId()) == initializedContextIds_.end()); // 计算依赖 computeDependencies( autogradContext, rootEdges, grads, graphRoot, outputEdges, retainGraph); // Mark the autograd context id as initialized. initializedContextIds_.insert(autogradContext->contextId()); }BackwardPassCleanupGuard guard(autogradContext); // This needs to be blocking and as a result we wait for the future to // complete. runEngineAndAccumulateGradients(autogradContext, graphRoot, outputEdges) ->waitAndThrow(); // 反向传播计算// Wait for all of the outstanding rpcs to complete. autogradContext->clearAndWaitForOutstandingRpcsAsync()->waitAndThrow(); }

0x04 验证节点和边 我们接下来看看如何做验证工作。
validateRootsAndRetrieveEdges 被用来验证节点和边的有效性,具体逻辑是:
  • 验证根节点的有效性,获取根节点的边。
  • 看看根节点是否为空。
  • 根节点是否需要计算梯度。
  • 根节点是否有梯度函数。
  • 计算梯度的边,生成相应的梯度。
  • 调用 validate_outputs 来验证输出。
void DistEngine::validateRootsAndRetrieveEdges( const variable_list& roots, edge_list& rootEdges, variable_list& grads) { TORCH_CHECK(!roots.empty(), "No tensors provided for gradient computation."); TORCH_INTERNAL_ASSERT(rootEdges.empty()); TORCH_INTERNAL_ASSERT(grads.empty()); // Verify roots are all scalar and require gradients. for (const auto& root : roots) { TORCH_CHECK(root.requires_grad(), "requires_grad not set on root"); TORCH_CHECK( root.numel() == 1, // python numel()函数:返回数组中元素的个数 root.name(), " is not a scalar, all roots need to be scalar"); TORCH_CHECK( root.grad_fn(), root.name(), " does not have a valid gradient function."); // Compute the root edges and generate the appropriate gradients. rootEdges.push_back(torch::autograd::impl::gradient_edge(root)); grads.push_back(at::ones_like(root, LEGACY_CONTIGUOUS_MEMORY_FORMAT)); }// Validate rootEdges and grads. validate_outputs( rootEdges, grads, [](const std::string& msg) { return msg; }); }

4.1 gradient_edge
gradient_edge 在本文下面会用到,就是利用一个Variable的梯度和前向传播的输出来构建一个Edge。
Edge gradient_edge(const Variable& self) { // If grad_fn is null (as is the case for a leaf node), we instead // interpret the gradient function to be a gradient accumulator, which will // accumulate its inputs into the grad property of the variable. These // nodes get suppressed in some situations, see "suppress gradient // accumulation" below. Note that only variables which have `requires_grad = // True` can have gradient accumulators.// self.grad_fn() 这里触发了一个调用,得到了一个Node实例 if (const auto& gradient = self.grad_fn()) { return Edge(gradient, self.output_nr()); // self.output_nr() 表示本Edge是function的第n个输入。前向传播时候的第 n 个输出在反向传播时候就是第 n 个输入。 } else { return Edge(grad_accumulator(self), 0); // 0表示本Edge是function的第一个输入 } }

4.2 validate_outputs
其定义在 torch/csrc/autograd/engine.cpp,原生引擎和分布式引擎都会调用。validate_outputs 之中包含了大量的验证代码。
  • 如果梯度数量与边数目不同,则退出。
  • 遍历梯度,对于每个梯度:
    • 获取对应的边,如果边无效,则去下一个梯度。
    • 使用input_metadata 获取输入信息。
    • 如果梯度没有定义,也去下一个梯度。
    • 如果梯度尺寸与输入形状不同,则退出。
    • 对梯度的设备,元数据的设备进行一系列判断。
具体代码如下:
void validate_outputs( const edge_list& edges, variable_list& grads, const std::function& format_error) { if (grads.size() != edges.size()) { std::stringstream ss; ss << "invalid number of gradients - expected "; ss << edges.size() << ", but got " << grads.size(); AT_ERROR(format_error(ss.str())); } for (size_t i = 0; i < grads.size(); i++) { const auto& edge = edges[i]; if (!edge.is_valid()) continue; const auto& metadata = https://www.it610.com/article/edge.function->input_metadata(edge.input_nr); auto& grad = grads[i]; if (!grad.defined()) { // FIXME: TestJit.test_ge_optimized fails this assertion. // std::stringstream ss; // ss << "undefined gradient at index " << i; // AT_ERROR(format_error(ss.str())); continue; } // 如果梯度尺寸与输入形状不同,则退出 if (!grad.sizes().equals(metadata.shape())) { if (!at::is_expandable_to(metadata.shape(), grad.sizes())) { std::stringstream ss; ss << "invalid gradient at index " << i << " - got "; ss << grad.sizes() << " but expected shape compatible with "; ss << metadata.shape(); AT_ERROR(format_error(ss.str())); } grad = at::sum_to(std::move(grad), metadata.shape()); }bool input_is_complex = isComplexType(c10::typeMetaToScalarType(metadata.options().dtype())); bool grad_is_complex = isComplexType(grad.scalar_type()); TORCH_CHECK(isFloatingType(grad.scalar_type()) || (input_is_complex == grad_is_complex)); if (c10::typeMetaToScalarType(metadata.options().dtype()) != grad.scalar_type()) { grad = grad.to(c10::typeMetaToScalarType(metadata.options().dtype())); } if (grad.device() != metadata.device() && grad.dim() == 0) { grad = grad.to(metadata.device()); } if (!is_compatible_type(metadata.options(), grad.options())) { std::stringstream ss; ss << "invalid gradient at index " << i << " - expected type "; ss << metadata.options() << " but got " << grad.options(); AT_ERROR(format_error(ss.str())); } auto grad_device = grad.device(); if (grad_device != metadata.device()) { std::stringstream ss; ss << "invalid gradient at index " << i << " - expected device "; ss << metadata.device() << " but got " << grad_device; AT_ERROR(format_error(ss.str())); } // We should not build graph for Tensors that are not differentiable TORCH_INTERNAL_ASSERT(isDifferentiableType(grad.scalar_type())); } }

4.3 VS 普通 engine
我们和普通引擎进行对比一下校验部分。
普通Engine 之中只调用了 validate_outputs。
auto Engine::execute(const edge_list& roots, const variable_list& inputs, bool keep_graph, bool create_graph, bool accumulate_grad, const edge_list& outputs) -> variable_list {validate_outputs(roots, const_cast(inputs), [](const std::string& msg) { return msg; }); // 省略其他后续代码

因此,对于校验部分,DistEngine 可以总结为:
  • 做校验。
  • 根据 roots 来计算root对应的边和生成对应梯度。
  • 再用validate_outputs验证输出。
0x05 计算依赖 我们回忆一下设计文档中的 FAST模式算法。该算法的关键假设是:当我们运行反向传播时,每个send函数的依赖为 1。换句话说,我们假设我们会从另一个节点通过 RPC 接收梯度。算法如下:
  1. 我们从具有反向传播根的worker开始(所有根都必须是本地的)。
  2. 查找当前Distributed Autograd Context 的所有send函数 。
  3. 从提供的根和我们检索到的所有send函数开始,我们在本地计算依赖项 。
  4. 计算依赖项后,使用提供的根来启动本地 autograd 引擎。
  5. 当 autograd 引擎执行该recv函数时,该recv 函数通过 RPC 将输入梯度发送到适当的worker。每个recv函数都知道目标 worker id,因为它被记录为前向传播的一部分。通过autograd_context_idautograd_message_idrecv函数被发送到远程主机。
  6. 当远程主机收到这个请求时,我们使用 autograd_context_idautograd_message_id来查找适当的send函数。
  7. 如果这是worker第一次收到对给定 autograd_context_id的请求,它将按照上面的第 1-3 点所述在本地计算依赖项。
  8. 然后将在第6点接受到的send方法插入队列,以便在该worker的本地 autograd 引擎上执行。
  9. 最后,我们不是在 Tensor的.grad之上累积梯度,而是在每个Distributed Autograd Context之上分别累积梯度 。梯度存储在Dict[Tensor, Tensor]之中 ,Dict[Tensor, Tensor]基本上是从 Tensor 到其关联梯度的映射,并且可以使用 get_gradients() API检索该映射 。
本章就是对应了算法的前三项,这部分是和普通引擎最大区别之一。
5.1 总体过程
计算依赖分为两大部分,第一部分是做准备工作,第二部分是计算依赖关系,第三部分是根据依赖关系来得到需要计算哪些函数。
我们先给出总体代码和注释,后续会仔细分析。
void DistEngine::computeDependencies( const ContextPtr& autogradContext, const edge_list& rootEdges, const variable_list& grads, const std::shared_ptr& graphRoot, edge_list& outputEdges, bool retainGraph) { TORCH_INTERNAL_ASSERT(graphRoot, "graphRoot is null!"); // 第一部分,准备工作 // 1. 生成一个GraphTask // Build the graph task and graph root. // NOTE: we don't need to build and pass a cpu_ready_queue to GraphTask // as we use execute_graph_task_until_ready_queue_empty, which will build // a separate ReadyQueue for each call. // 不需要给 GraphTask 传一个cpu_ready_queue,因为我们后面使用execute_graph_task_until_ready_queue_empty,在那里会给每一个调用建立一个独立的ReadyQueue auto graphTask = std::make_shared( /* keep_graph */ retainGraph, /* create_graph */ false, /* depth */ 0, /* cpu_ready_queue */ global_cpu_ready_queue_, /* exit_on_error */ true); // Run BFS to traverse the graph locally. The roots of the graph are // GraphRoot and all send functions for this autograd context. std::unordered_set seen; // 记录已经访问过的节点 std::queue queue; // 一个 Node 类型的 queue queue.push(static_cast(graphRoot.get())); // 插入根对应的Nodeauto sendFunctions = autogradContext->sendFunctions(); // 为了获取出边// 2. 获取出边列表 // Add all the send functions to the queue as roots. // 普通状态下,root节点内在反向传播时候,已经有了next edges,但是分布式模式下,出边是在sendFunctions之中 for (const auto& mapEntry : sendFunctions) { // sendFunctions就是出边,之前在 addSendFunction之中被添加 // Increment 'outstanding_tasks_' for GraphTask for each send_function // since we want the local autograd engine to wait for all of them. graphTask->outstanding_tasks_++; // 出边增加 queue.push(mapEntry.second.get()); // 后续用queue来处理,插入的是 SendRpcBackward }// 第二部分,遍历图,计算依赖关系,此时 queue 里面是 root 和 若干 SendRpcBackward edge_list recvBackwardEdges; // Traverse the graph. auto& dependencies = graphTask->dependencies_; // 获取依赖关系 while (!queue.empty()) { // 遍历所有发送边 auto fn = queue.front(); // 得到发送边 queue.pop(); for (const auto& edge : fn->next_edges()) { // 遍历Node(根节点或者SendRpcBackward)的next_edges if (auto nextFn = edge.function.get()) { // 得到一个边 dependencies[nextFn] += 1; // 对应的节点依赖度加一 const bool wasInserted = seen.insert(nextFn).second; // 是否已经访问过 if (wasInserted) { // 如果true,是插入了,就说明之前没有访问过,否则插不进去,是false // Seeing this function for the first time. queue.push(nextFn); // 既然之前没有访问过,就插入到queueif (nextFn->next_edges().empty()) { // 如果这个边本身没有输出边,说明是叶子节点 TORCH_INTERNAL_ASSERT( dynamic_cast(nextFn) || dynamic_cast(nextFn)); // 叶子节点有两种 // We have found a leaf node which should be either AccumulateGrad // or RecvRpcBackward. Record the function // to ensure we don't execute it and instead accumulate the grads on // the autograd context. These functions would be passed in as the // 'outputs' parameter of the vanilla autograd engine.// We don't accumulate any grads in the context for RecvRpcBackward. // RecvRpcBackward is added as an output edge to indicate it is a // leaf node and this helps in properly computing dependencies for // the local autograd graph. Putting RecvRpcBackward in // 'outputEdges' means that this function needs to be executed // (inline with our assumption for FAST mode that all send/recv // functions are valid in the backward pass), and as a result all of //its ancestors need to be executed as well. if (dynamic_cast(nextFn)) { recvBackwardEdges.emplace_back(edge); // 特殊处理 } outputEdges.emplace_back(edge); // 最终输出边 } } } } }// 此时,recvBackwardEdges 里面是RecvRpcBackward,outputEdges 里面是 AccumulateGrad// 以下是第三部分,根据依赖关系找到需要计算那些functions // Now lets compute which functions need to be executed. The algorithm is as // follows: // 1. Create a dummy GraphRoot which points to all 'send' functions for this //context and the original graphRoot. Run 'init_to_execute' with the //outputEdges and the dummy GraphRoot. This ensures we mark //appropriate functions as needed if they are reachable only from a //specific 'send' function locally and not necessarily from the provided //roots. // 2. For all edges in 'outputEdges' which point to 'RecvRpcBackward', mark //those functions as needed for execution. The reason for this is that //'init_to_execute', will mark these as not needed. But 'RecvRpcBackward' //is unique in the sense that we use it as a leaf node in graph to compute //needed execution accurately, but unlike AccumulateGrad, we do need to //execute this function. if (!outputEdges.empty()) { // Compute 'needed execution' starting from all 'send' functions and the // original graphRoot. edge_list edges; // Create some dummy edges (input_nr not important for init_to_execute). for (const auto& mapEntry : sendFunctions) { // 遍历 edges.emplace_back(mapEntry.second, 0); // 得到出边列表 }// Add the original graphRoot as an edge. edges.emplace_back(graphRoot, 0); // root也加入出边列表// Create a dummy GraphRoot and run init_to_execute with it. GraphRoot dummyRoot(edges, {}); // 建立一个虚拟Root // 如果出边不为空,则会调用 init_to_execute对GraphTask进行初始化 graphTask->init_to_execute(dummyRoot, outputEdges, /*accumulate_grad=*/false, /*min_topo_nr=*/0); // exec_info_ 的数据结构是std::unordered_map for (auto& mapEntry : graphTask->exec_info_) { auto& execInfo = mapEntry.second; if (!execInfo.captures_) { // 看看此张量是否在所求梯度的张量路径上 continue; // 如果不在路径之上,就跳到下一个张量 } auto fn = mapEntry.first; // 拿到 Node // There may be nodes other than 'AccumulateGrad', e.g. RecvRPCBackward, // to be captured. if (auto accumulateGradFn = dynamic_cast(fn)) { // 如果是叶子节点 for (auto& capture : *execInfo.captures_) { // 遍历张量路径上的节点 capture.hooks_.push_back( std::make_unique( // 给张量插入Hook std::dynamic_pointer_cast( accumulateGradFn->shared_from_this()), autogradContext)); } } }// Mark all 'RecvRPCBackward' as needing execution. // RecvRPCBackward需要执行 for (const auto& recvBackwardEdge : recvBackwardEdges) { graphTask->exec_info_[recvBackwardEdge.function.get()].needed_ = true; } }// Let autograd context take ownership of the GraphTask. // 设定在上下文之中 autogradContext->setGraphTask(std::move(graphTask)); }

5.2 第一部分 准备工作
5.2.1 实现 因为这里是计算本地的依赖关系,所以遍历需要从 root 和 本地的 SendRpcBackward 开始计算。我们先要先做一些准备工作:
  • 首先生成一个GraphTask,但是不需要给 GraphTask 传一个cpu_ready_queue,因为我们后面使用execute_graph_task_until_ready_queue_empty,在那里会给每一个调用 建立一个独立的ReadyQueue。
  • 其次用 seen 来记录已经访问过的节点。
  • 构建一个 Node 类型的 queue,把根节点插入到queue。
  • 然后从上下文之中拿到出边Functions,放入到 sendFunctions 之中。
    • sendFunctions就是出边,之前在 addSendFunction之中被添加。
    • 普通状态下,root节点内在反向传播时候,已经有了next edges,但是分布式模式下,出边是在sendFunctions之中
  • 遍历出边 sendFunctions,构建出边列表,对于 sendFunctions 中的每一项:
    • GraphTask 出边数目增加 graphTask->outstanding_tasks_++。
    • 在 queue 之中插入 sendFunctions 中的 SendRpcBackward。
    • 最后,queue 里面是 root 和 若干 SendRpcBackward。
5.2.2 相关 实现之中,使用了部分函数或者成员变量,我们选取重点进行介绍。
5.2.2.1 sendFunctions sendFunctions 是获取了上下文的sendAutogradFunctions_,这是一个 std::unordered_map
std::unordered_map DistAutogradContext::sendFunctions() const { std::lock_guard guard(lock_); return sendAutogradFunctions_; }

sendFunctions就是出边,之前在 addSendFunction之中被添加,addSendRpcBackward 会调用 addSendFunction。
5.2.2.2 outstanding_tasks_ 利用 graphTask->outstanding_tasks_++ 把GraphTask 出边数目增加。
GraphTask outstanding_tasks_ 是 GraphTask 的成员变量。
  • outstanding_tasks_ :用来记录当前任务数目,如果数目为0,则说明任务结束了。 如果这个数量不为0,则此GraphTask依然需要运行。
vania engine 在 vania engine 之中就有 outstanding_tasks_。
是待处理 NodeTask的数量,用来判断该GrapTask是否还需要执行,如果数目为0,则说明任务结束了。
  • 当 GraphTask 被创建出来时候,此数值为0。
  • 如果有一个NodeTask被送入到 ReadyQueue,则outstanding_tasks_ 增加 1。
  • 如果在工作线程作执行一次 evaluate_function(task)后,outstanding_tasks的值减1。
  • 如果这个数量不为0,则此GraphTask依然需要运行。
bool GraphTask::completed() { return outstanding_tasks_.load() == 0 || (exit_on_error_ && has_error_.load()); }

NodeTask任务增加时候 outstanding_tasks_ 就加一。
dist engine 在计算依赖时候,遍历 sendFunctions,上下文有几个SendRpcBackward,就把 outstanding_tasks_ 就加几,每多一条出边,就意味着多了一个计算过程。
std::unordered_map DistAutogradContext::sendFunctions() const { std::lock_guard guard(lock_); return sendAutogradFunctions_; }

而执行时候,void DistEngine::execute_graph_task_until_ready_queue_empty 和 Engine::thread_main 都会减少 outstanding_tasks_。
5.3 第二部分 计算依赖
第二部分是遍历图,计算依赖关系。
5.3.1 实现 此时 queue 里面是 root 和 若干 SendRpcBackward,所以接下来就是从 queue 之中不停弹出Node 进行计算。具体逻辑是:
  • 遍历所有发送边(从 queue 之中不停弹出Node ),对于每个Node,遍历Node(根节点或者SendRpcBackward)的next_edges:
    • 如果可以得到一个边,则:
      • 对应的节点依赖度加一。
      • 如果之前没有访问过,就插入到queue。
      • 如果这个边本身没有输出边,说明是叶子节点,叶子节点有两种:AccumulateGrad 或者 RecvRpcBackward。
        • 对于 recvBackwardEdges.emplace_back(edge) 做特殊处理。
        • 插入到最终输出边 outputEdges,注意,RecvRpcBackward 也插入到这里。
这之后,局部变量 recvBackwardEdges 里面是RecvRpcBackward,outputEdges 里面是 AccumulateGrad 和 RecvRpcBackward
5.3.2 叶子节点的种类 有两种叶子节点,所以需要分开处理。
  • AccumulateGrad : 普通叶子节点,就是本地叶子节点。
  • RecvRpcBackward : 在正向图中,是RPC接收节点。
从设计文档之中,有如下对应:"
我们发现了一个叶节点,它应该是AccumulateGrad或RecvRpcBackward。我们记录函数以确保我们不执行它,而是在autograd上下文中累积梯度。这些函数将作为"输出"参数传入到vanilla autograd引擎。
我们没有在RecvRpcBackward上下文积累任何梯度。RecvRpcBackward被添加为输出边,以指示它是叶节点,这有助于正确计算本地autograd graph的依赖关系。将RecvRpcBackward放在"outputEdges"中意味着需要执行此函数(与我们对快速模式的假设一致,即所有send/recv函数在向后传播中都有效),因此也需要执行其所有祖先函数。
比如,对于 work 1, recv 就是叶子节点,是一个RecvRpcBackward,它需要把梯度传递给 worker 0。对于 worker 0,上面的子图,t1, t2 也是叶子节点,都是AccumulateGrad。
[源码解析]|[源码解析] PyTorch 分布式 Autograd (5) ---- 引擎(上)
文章图片

5.4 第三部分 得到Functions
这部分根据依赖关系找到需要计算那些functions
5.4.1 算法 现在让我们计算需要执行哪些函数。算法如下:
    1. 创建一个虚拟GraphRoot,它指向此上下文和原始GraphRoot的所有"发送"函数。使用outputEdges和虚拟GraphRoot来运行"init_to_execute"。这确保我们根据需要标记适当的函数:如果它们只能从本地特定的"发送"函数访问,而不需要从提供的根访问。
    1. 对于"outputEdges"中指向"RecvRpcBackward"的所有边,根据执行需要标记这些函数。原因是"init_to_execute"会将这些标记为不需要。但"RecvRpcBackward"的独特之处在于,我们将其用作图中的叶节点来准确计算所需的执行操作,但与AccumageGrad不同,我们确实需要执行此函数。
具体就是:
  • RecvRpcBackward 需要执行。
  • AccumulateGrad 需要累积梯度。
5.4.2 实现 此时,recvBackwardEdges 里面是RecvRpcBackward,outputEdges 里面是 AccumulateGrad 和 RecvRpcBackward。我们需要根据这些信息来标识后续如何执行。具体实现是:
  • 先计算 AccumulateGrad,如果 outputEdges 不为空,则把 outputEdges 的信息插入到 GraphTask.exec_info_ 之中
    • 构建一个 edge_list edges,就是出边列表。
    • 遍历 sendFunctions,得到输出列表,加入到 edges。
    • root也加入出边列表。
    • 建立一个虚拟Root。
    • 如果出边不为空,则会调用 init_to_execute 对GraphTask进行初始化。
    • 遍历 GraphTask 的 exec_info,exec_info_ 的数据结构是std::unordered_map
      • 看看此张量是否在所求梯度的张量路径上。
      • 如果不在路径之上,就跳到下一个张量。
      • 拿到 exec_info_ 的 Node。
      • 如果 Node 是叶子节点。
        • 遍历张量路径上的节点。
        • 给张量插入Hook。这里是关键,就是 AccumulateGrad 对应的张量加上了 Hook,用来后续累积梯度
  • 遍历 recvBackwardEdges,对于每个 recvBackward,在 GraphTask.exec_info_ 之中对应项之上设止为 "需要执行"。
至此,依赖项处理完毕,所有需要计算的函数信息都位于 GraphTask.exec_info_ 之上,我们在下一篇来看看如何执行。
5.5 小结
我们总结一下计算依赖的逻辑:
  1. computeDependencies 开始计算依赖。
  2. 从 DistAutogradContext 之中获取 sendAutogradFunctions_,把 SendRpcBackward 都放入到 sendFunctions。普通状态下,root节点内在反向传播时候,已经有了next edges,但是分布式模式下,出边是在sendFunctions之中,所以要提取出来,放入下面的 queue。
  3. 遍历 sendFunctions,把 Node 加入到 queue,此时 queue 之中是 root 和 一些 SendRpcBackward。
  4. 遍历 Queue 进行处理,处理结果是两个局部变量 edge_list。 recvBackwardEdges 里面是RecvRpcBackward,outputEdges 里面是 AccumulateGrad 和 RecvRpcBackward,我们需要根据这些信息来标识后续如何执行。
  5. 遍历 recvBackwardEdges 和 outputEdges,把相关信息加入到GraphTask.exec_info_,至此,依赖项处理完毕,所有需要计算的函数信息都位于 GraphTask.exec_info_ 之上。
    1. AccumulateGrad 被加入了 Hook,用来后续累积梯度。
    2. RecvRpcBackward 被设置了需要执行。
computeDependencies + +---------------------------+| 1 | DistAutogradContext|| ||v ||2 |sendAutogradFunctions_ +-------> map > sendFunctions || +---------------------------++ | | 3 vqueue queue+ | 4 | | vrecvBackwardEdges = [RecvRpcBackward 1, RecvRpcBackward 2, ...]outputEdges = [RecvRpcBackward 1, RecvRpcBackward 2, AccumulateGrad 1, AccumulateGrad 2, ...]+ | | 5 vGraphTask.exec_info_

0xFF 参考 Distributed Autograd Design
Remote Reference Protocol
PyTorch 源码解读之分布式训练了解一下?
https://pytorch.org/docs/stable/distributed.html
https://pytorch.apachecn.org/docs/1.7/59.html
https://pytorch.org/docs/stable/distributed.html#module-torch.distributed
https://pytorch.org/docs/master/notes/autograd.html
https://pytorch.org/docs/master/rpc/distributed_autograd.html
https://pytorch.org/docs/master/rpc/rpc.html
https://www.w3cschool.cn/pytorch/pytorch-cdva3buf.html
PyTorch 分布式 Autograd 设计
Getting started with Distributed RPC Framework
Implementing a Parameter Server using Distributed RPC Framework
Combining Distributed DataParallel with Distributed RPC Framework
Profiling RPC-based Workloads
Implementing batch RPC processing
【[源码解析]|[源码解析] PyTorch 分布式 Autograd (5) ---- 引擎(上)】Distributed Pipeline Parallel

    推荐阅读