使用HoneyBee在Swift中实现高级并发

在Swift中设计, 测试和维护并发算法非常困难, 正确获取详细信息对于应用程序的成功至关重要。并发算法(也称为并行编程)是一种算法, 旨在同时执行多个(也许很多)操作, 以利用更多的硬件资源并减少总体执行时间。
在Apple平台上, 编写并发算法的传统方法是NSOperation。 NSOperation的设计邀请程序员将并发算法细分为各个长时间运行的异步任务。每个任务将在其自己的NSOperation子类中定义, 并且这些类的实例将通过目标API组合在一起, 以在运行时创建部分任务顺序。这种设计并发算法的方法是苹果平台七年来的最新技术。
苹果在2014年推出了Grand Central Dispatch(GCD), 这是在表达并发操作方面迈出的重要一步。 GCD以及伴随它并为其提供动力的新语言功能块, 提供了一种在发起异步请求后立即紧凑地描述异步响应处理程序的方法。不再鼓励程序员在多个NSOperation子类中的多个文件中分布并发任务的定义。现在, 可以在单个方法内编写整个并发算法。表达能力和类型安全性的提高是概念上的重大转变。这种编写方式的典型算法如下所示:

func processImageData(completion: (result: Image?, error: Error?) -> Void) { loadWebResource("dataprofile.txt") { (dataResource, error) in guard let dataResource = dataResource else { completion(nil, error) return } loadWebResource("imagedata.dat") { (imageResource, error) in guard let imageResource = imageResource else { completion(nil, error) return } decodeImage(dataResource, imageResource) { (imageTmp, error) in guard let imageTmp = imageTmp else { completion(nil, error) return } dewarpAndCleanupImage(imageTmp) { imageResult in guard let imageResult = imageResult else { completion(nil, error) return } completion(imageResult, nil) } } } } }

让我们对该算法进行一些分解。函数processImageData是一个异步函数, 它自己进行四个异步调用以完成其工作。四种异步调用以对基于块的异步处理最自然的方式彼此嵌套。结果块每个都有一个可选的Error参数, 除一个以外的所有块都包含一个附加的可选参数, 表示aysnc操作的结果。
上面的代码块的形状对于大多数Swift开发人员来说似乎都很熟悉。但是这种方法有什么问题呢?以下痛点清单可能会同样熟悉。
  • 嵌套代码块的这种” 厄运金字塔” 形状很快就会变得笨拙。如果再添加两个异步操作会怎样?四个?有条件操作呢?重试行为或资源限制保护如何?现实世界中的代码从未像博客文章中的示例那样简洁明了。 “ 厄运金字塔” 效应很容易导致代码难以阅读, 难以维护并且易于出现错误。
  • 上面示例中的错误处理尝试(尽管是Swifty)实际上是不完整的。程序员已经假定, 两参数的Objective-C风格的异步回调块将始终提供两个参数之一。他们永远不会同时为零。这不是一个安全的假设。并发算法因难以编写和调试而闻名, 而毫无根据的假设也是其中的一部分。对于打算在现实世界中运行的任何并发算法, 完整而正确的错误处理是不可避免的。
  • 进一步思考一下, 也许编写被调用异步函数的程序员并不像你那样有原则。如果在某些情况下被调用函数无法回调怎么办?还是回电不止一次?在这种情况下processImageData的正确性会发生什么?专业人士不会冒险。关键任务功能必须正确, 即使它们依赖于第三方编写的功能。
  • 也许最引人注目的是, 所考虑的异步算法是次优构造的。前两个异步操作都是远程资源的下载。即使它们没有相互依赖性, 上述算法也会按顺序而不是并行执行下载。原因很明显。嵌套块语法助长了这种浪费。竞争激烈的市场不会容忍不必要的落后。如果你的应用程序无法尽快执行其异步操作, 则另一个应用程序将会执行。
我们如何做得更好? HoneyBee是一个Future / Promises库, 它使Swift并发编程变得容易, 富有表现力和安全。让我们用HoneyBee重写上述异步算法, 然后检查结果:
func processImageData(completion: (result: Image?, error: Error?) -> Void) { HoneyBee.start() .setErrorHandler { completion(nil, $0) } .branch { stem in stem.chain(loadWebResource =< < "dataprofile.txt") + stem.chain(loadWebResource =< < "imagedata.dat") } .chain(decodeImage) .chain(dewarpAndCleanupImage) .chain { completion($0, nil) } }

此实施开始的第一行是新的HoneyBee配方。第二行建立默认的错误处理程序。在HoneyBee配方中, 错误处理不是可选的。如果可能出现问题, 则算法必须对其进行处理。第三行打开一个分支, 允许并行执行。 loadWebResource的两个链将并行执行, 其结果将合并(第5行)。这两个已加载资源的组合值将转发到DecodeImage, 依此类推, 直到调用完成为止。
让我们遍历以上痛点列表, 看看HoneyBee如何改进此代码。现在, 维护此功能非常容易。 HoneyBee配方看起来像它所表达的算法。该代码可读, 易懂且可快速修改。 HoneyBee的设计可确保任何错误的指令顺序都将导致编译时错误, 而不是运行时错误。现在, 该功能不易受到错误和人为错误的影响。
所有可能的运行时错误均已得到完全处理。 HoneyBee支持的每个功能签名(其中有38个)确保得到充分处理。在我们的示例中, Objective-C样式的两参数回调将产生一个非空错误, 该错误将被路由到错误处理程序, 或者将产生一个非空值, 该值将沿着链进行下去, 或者如果两者值为nil HoneyBee将生成一个错误, 解释函数回调未履行其约定。
HoneyBee还会处理函数回调被调用次数的合同正确性。如果函数无法调用其回调, 则HoneyBee会产生描述性故障。如果该函数多次调用其回调, 则HoneyBee将取消辅助调用和日志警告。这两种故障响应(以及其他故障响应)都可以根据程序员的个人需求进行定制。
希望已经很明显, 这种形式的processImageData可以正确并行化资源下载, 以提供最佳性能。 HoneyBee最强的设计目标之一是, 配方应看起来像它所表达的算法。
好多了。对?但是HoneyBee提供了更多功能。
警告:下一个案例研究不适合胆小的人。请考虑以下问题描述:你的移动应用程序使用CoreData保持其状态。你有一个称为Media的NSManagedObject模型, 该模型表示上载到后端服务器的媒体资产。允许用户一次选择数十个媒体项目, 并将它们批量上传到后端系统。首先通过引用字符串表示媒体, 该字符串必须转换为Media对象。幸运的是, 你的应用程序已经包含一个辅助方法, 该方法可以执行以下操作:
func export(_ mediaRef: String, completion: @escaping (Media?, Error?) -> Void) { // transcoding stuff completion(Media(context: managedObjectContext), nil) }

将媒体引用转换为媒体对象后, 必须将媒体项目上载到后端。再次, 你有一个辅助功能准备好进行网络工作。
func upload(_ media: Media, completion: @escaping (Error?) -> Void) { // network stuff completion(nil) }

由于允许用户一次选择数十个媒体项目, 因此UX设计人员已指定了相当可靠的有关上传进度的反馈。需求已精简为以下四个功能:
/// Called if anything goes wrong in the upload func errorHandler(_ error: Error) { // do the right thing }/// Called once per mediaRef, after either a successful or unsuccessful upload func singleUploadCompletion(_ mediaRef: String) { // update a progress indicator }/// Called once per successful upload func singleUploadSuccess(_ media: Media) { // do celebratory things } /// Called if the entire batch was considered to be uploaded successfully. func totalProcessSuccess() { // declare victory }

但是, 由于你的应用程序获取的媒体参考有时会过期, 因此如果至少有一半的上传成功, 则业务经理决定向用户发送” 成功” 消息。也就是说, 如果少于一半的上载尝试失败, 则并发进程应声明胜利, 并调用totalProcessSuccess。这是作为开发人员交付给你的规范。但是, 作为一名经验丰富的程序员, 你会意识到必须满足更多要求。
当然, 企业希望批量上传尽快发生, 因此串行上传是不可能的。上载必须并行执行。
但是不要太多。如果你只是不加选择地使整个批次异步, 那么数十个并发上载将淹没移动NIC(网络接口卡), 并且实际上上载的速度将比串行执行的速度慢, 而不是更快。
移动网络连接不被认为是稳定的。即使是短暂的交易也可能仅由于网络连接的更改而失败。为了真正声明上传失败, 我们需要至少重试一次。
重试策略不应包含导出操作, 因为它不会遭受短暂故障。
导出过程是受计算限制的, 因此必须在主线程之外执行。
因为导出是受计算限制的, 所以与其他上载过程相比, 导出应具有更少的并发实例数, 以免影响处理器。
上面描述的四个回调函数都会更新UI, 因此必须在主线程上全部调用。
【使用HoneyBee在Swift中实现高级并发】媒体是一个NSManagedObject, 它来自NSManagedObjectContext, 并且具有必须遵守的自己的线程要求。
这个问题说明似乎有点晦涩吗?如果你发现将来存在此类潜伏的问题, 请不要感到惊讶。我在自己的作品中遇到了这样的情况。首先, 让我们尝试使用传统工具解决此问题。扣起来, 这将不会很漂亮。
/// An enum describing specific problems that the algorithm might encounter. enum UploadingError : Error { case invalidResponse case tooManyFailures }/// A semaphore to prevent flooding the NIC let outerLimit = DispatchSemaphore(value: 4) /// A semaphore to prevent thrashing the processor let exportLimit = DispatchSemaphore(value: 1) /// The number of times to retry the upload if it fails let uploadRetries = 1 /// Dispatch group to keep track of when the entire process is finished let fullProcessDispatchGroup = DispatchGroup() /// How many of the uploads fully completed. var uploadSuccesses = 0// this notify block is called when the full process has completed. fullProcessDispatchGroup.notify(queue: DispatchQueue.main) { let successRate = Float(uploadSuccesses) / Float(mediaReferences.count) if successRate > 0.5 { totalProcessSuccess() } else { errorHandler(UploadingError.tooManyFailures) } }// start in the background DispatchQueue.global().async { for mediaRef in mediaReferences { // alert the group that we're starting a process fullProcessDispatchGroup.enter() // wait until it's safe to start uploading outerLimit.wait()/// common cleanup operations needed later func finalizeMediaRef() { singleUploadCompletion(mediaRef) fullProcessDispatchGroup.leave() outerLimit.signal() }// wait until it's safe to start exporting exportLimit.wait() export(mediaRef) { (media, error) in // allow another export to begin exportLimit.signal() if let error = error { DispatchQueue.main.async { errorHandler(error) finalizeMediaRef() } } else { guard let media = media else { DispatchQueue.main.async { errorHandler(UploadingError.invalidResponse) finalizeMediaRef() } return } // the export was successfulvar uploadAttempts = 0 /// define the upload process and its retry behavior func doUpload() { // respect Media's threading requirements managedObjectContext.perform { upload(media) { error in if let error = error { if uploadAttempts < uploadRetries { uploadAttempts += 1 doUpload() // retry } else { DispatchQueue.main.async { // too many upload failures errorHandler(error) finalizeMediaRef() } } } else { DispatchQueue.main.async { uploadSuccesses += 1 singleUploadSuccess(media) finalizeMediaRef() } } } } } // kick off the first upload doUpload() } } } }

哇!不加评论, 大约有75行。你是否一直遵循推理?如果你在新工作的第一周遇到这个怪物, 你会感觉如何?你是否准备维护或修改它?你知道它是否包含错误?是否包含错误?
现在, 考虑HoneyBee的替代方案:
HoneyBee.start(on: DispatchQueue.main) .setErrorHandler(errorHandler) .insert(mediaReferences) .setBlockPerformer(DispatchQueue.global()) .each(limit: 4, acceptableFailure: .ratio(0.5)) { elem in elem.finally { link in link.setBlockPerformer(DispatchQueue.main) .chain(singleUploadCompletion) } .limit(1) { link in link.chain(export) } .setBlockPerformer(managedObjectContext) .retry(1) { link in link.chain(upload) // subject to transient failure } .setBlockPerformer(DispatchQueue.main) .chain(singleUploadSuccess) } .setBlockPerformer(DispatchQueue.main) .drop() .chain(totalProcessSuccess)

这种形式对你有何影响?让我们一步一步地研究它。在第一行, 我们从主线程开始启动HoneyBee配方。从主线程开始, 我们确保所有错误都将传递到主线程上的errorHandler(第2行)。第3行将mediaReferences数组插入到流程链中。接下来, 我们切换到全局后台队列以准备一些并行性。在第5行, 我们开始对每个mediaReferences进行并行迭代。我们将此并行性限制为最多4个并发操作。我们还声明, 如果至少有一半子链成功(不出错), 则完整迭代将被视为成功。第6行声明了一个finally链接, 无论下面的子链是成功还是失败, 该链接都会被调用。在finally链接上, 我们切换到主线程(第7行)并调用singleUploadCompletion(第8行)。在第10行, 我们在导出操作(第11行)周围将最大并行化设置为1(单次执行)。第13行切换到我们ManagedObjectContext实例拥有的专用队列。第14行声明了一次上载操作的重试尝试(第15行)。第17行再次切换到主线程, 第18行调用singleUploadSuccess。到执行时间线20时, 所有并行迭代均已完成。如果少于一半的迭代失败, 则第20行最后一次切换到主队列(回想每一个都在后台队列中运行), 第21行丢弃入站值(still mediaReferences), 第22行调用totalProcessSuccess。
HoneyBee表单更清晰, 更干净且更易于阅读, 更不用说易于维护了。如果需要循环将Media对象重新集成到类似于map函数的数组中, 那么该算法的长格式会发生什么?进行更改后, 你对如何满足所有算法要求有多大信心?在HoneyBee表单中, 此更改将是用地图替换每个地图以采用并行地图功能。 (是的, 它也减少了。)
HoneyBee是Swift的功能强大的期货库, 它使编写异步和并发算法变得更加容易, 安全和更具表现力。在本文中, 我们已经了解了HoneyBee如何使你的算法更易于维护, 更正确和更快。 HoneyBee还支持其他关键异步范例, 例如重试支持, 多个错误处理程序, 资源保护和收集处理(映射, 过滤和精简的异步形式)。有关功能的完整列表, 请访问网站。要了解更多信息或提出问题, 请访问全新的社区论坛。
附录:确保异步功能的合同正确性确保功能的合同正确性是计算机科学的基本宗旨。如此之多, 以至于几乎所有现代编译器都进行了检查, 以确保声明返回值的函数仅返回一次。返回少于一次或多次以上被视为错误, 并适当地阻止了完整编译。
但是, 这种编译器帮助通常不适用于异步函数。考虑以下(有趣的)示例:
func generateIcecream(from int: Int, completion: (String) -> Void) { if int > 5 { if int < 20 { completion("Chocolate") } else if int < 10 { completion("Strawberry") } completion("Pistachio") } else if int < 2 { completion("Vanilla") } }

generateIcecream函数接受一个I??nt并异步返回一个String。尽管它包含一些明显的问题, 但迅速的编译器欣然接受上述形式为正确的形式。给定某些输入, 此函数可能将完成调用称为零, 一或两次。使用异步功能的程序员通常会在自己的工作中回顾此问题的示例。我们能做什么?当然, 我们可以将代码重构得更整洁(带有范围用例的开关将在这里起作用)。但是有时功能复杂性很难降低。如果编译器可以像定期返回函数那样帮助我们验证正确性, 会更好吗?
事实证明有办法。遵守以下Swifty咒语:
func generateIcecream(from int: Int, completion: (String) -> Void) { let finalResult: String defer { completion(finalResult) } let completion: Void = Void() defer { completion }if int > 5 { if int < 20 { completion("Chocolate") } else if int < 10 { completion("Strawberry") } // else completion("Pistachio") } else if int < 2 { completion("Vanilla") } }

在此函数顶部插入的四行代码迫使编译器验证完成回调仅被调用一次, 这意味着该函数不再编译。这是怎么回事?在第一行中, 我们声明但不初始化最终希望该函数产生的结果。通过将其保留为未定义状态, 我们确保必须先将其分配给它一次, 然后再声明它, 以确保不能将其分配给两次。第二行是一个defer, 将作为此函数的最终操作执行。在函数的其余部分将其分配给finalResult之后, 它将调用finalResult的完成块。第3行创建了一个称为完成的新常量, 该常量遮盖了回调参数。新的完成类型为Void, 它不声明任何公共API。此行确保在此行之后对完成的任何使用都将是编译器错误。第2行上的defer是完成块的唯一允许使用。第4行删除了一个编译器警告, 否则该警告将关于未使用新的完成常数。
因此, 我们已经成功地迫使swift编译器报告该异步功能未履行其合同。让我们逐步进行纠正。首先, 让我们用对finalResult的分配替换所有对回调的直接访问。
func generateIcecream(from int: Int, completion: (String) -> Void) { let finalResult: String defer { completion(finalResult) } let completion: Void = Void() defer { completion } if int > 5 { if int < 20 { finalResult = "Chocolate" } else if int < 10 { finalResult ="Strawberry" } // else finalResult = "Pistachio" } else if int < 2 { finalResult = "Vanilla" } }

现在, 编译器报告了两个问题:
error: AsyncCorrectness.playground:1:8: error: constant 'finalResult' used before being initialized defer { completion(finalResult) } ^ error: AsyncCorrectness.playground:11:3: error: immutable value 'finalResult' may only be initialized once finalResult = "Pistachio"

不出所料, 该函数具有将finalResult分配零次的途径, 以及具有多次分配它的途径。我们解决以下问题:
func generateIcecream(from int: Int, completion: (String) -> Void) { let finalResult: String defer { completion(finalResult) } let completion: Void = Void() defer { completion } if int > 5 { if int < 20 { finalResult = "Chocolate" } else if int < 10 { finalResult ="Strawberry" } else { finalResult = "Pistachio" } } else if int < 2 { finalResult = "Vanilla" } else { finalResult = "Neapolitan" } }

“ 开心果” 已移至适当的else子句, 我们意识到我们未能涵盖一般情况-当然是” 那不勒斯” 。
刚刚描述的模式可以很容易地进行调整, 以返回可选值, 可选错误或复杂的类型, 例如常见的Result枚举。通过强制编译器验证回调仅被调用一次, 我们可以断言异步函数的正确性和完整性。

    推荐阅读