转载

Agari使用Airbnb的Airflow实现更智能计划任务的实践

【编者按】这是一篇由 Siddharth Anand 撰写的文章,他是Agari公司的数据架构师。本文是Agari使用Airbnb的Airflow实现更智能计划任务的实践,Airbnb的开源项目Airflow是一种用于数据管道的工作流调度。

Agari使用Airbnb的Airflow实现更智能计划任务的实践 工作流调度程序是一个负责让工作流在可靠并可扩展方法中周期性执行的系统。工作流调度程序是无处不在的,例如,任何有数据仓库的公司都有一个通常用于报告的专门的数据库,该数据库使用工作流调度程序夜以继日地加载到数据库。比如像Agari这样的公司更感兴趣的是可以使用工作流调度程序更可靠地执行复杂而关键的”大”数据科学工作!Agari,是一家电子邮件安保公司,拦截钓鱼网站的问题,正越来越多地利用数据科学、机器学习和大数据的业务尤其出现在如Linkedln、Google和Facebook这样的数据驱动公司,以满足迅速增长的数据和建模需求。

在之前的 文章 中,我描述了我们如何利用AWS在Agari中建立一个可扩展的数据管道。在这篇文章中,我将讨论我们使用工作流调度来提高我们数据管道可靠性的的需求,以提供之前文章的管道作为工作示例。

Agari使用Airbnb的Airflow实现更智能计划任务的实践

工作流调度程序 @Agari – 一个机智的Cron  

(译者注,Cron:在Linux中,我们经常用到 cron 服务器来根据配置文件约定的时间来执行特定的作务。-来自百度百科)

在写以前的文章时,我们仍然使用Linux cron 来计划我们周期性的工作,并且我们需要一个工作流调度程序(又称为DAG)。为什么?在我之前的文章中,我描述了我们如何加载并处理本地收集器中的数据(即存在于我们企业级客户的数据中心里的收集器)。当我们周期性加载数据时,Cron是个很好的第一解决方案,但它不能完全满足我们的需要我们需要一个执行引擎还要做如下工作:

  • 提供一个简单的方式去创建一个新DAG,并且管理已存在的DAG;
  • 开始周期性加载涉及DAG任务的数据;
  • 多次重试任务来解决间歇性问题;
  • 成功或失败的DAG执行都通过电子邮件报告;
  • 提供引人注目的UI设计让人一目了然;
  • 提供集中日志-一个用来收集日志的中心位置供配置管理;
  • 提供强大的CLI用于自动易于集成;
  • 提供状态捕获功能;
  • 对于任何运行,我们能够知道用于运行的输入和配置文件。这在用于评分和分类目的的模型应用程序中是特别重要的。当我们修改我们的模型,我们需要一种方法来挑选一个特别的模型版本满足诊断和归因的需要。

使用Cron时,一个开发者需要写一个程序用于Cron调用。开发者不仅需要写代码来定义和执行DAG,也需要负责控制日志、配置文件管理、指标及见解、故障处理(比如重试失败任务或者对长时间见运行的任务提示超时)、报告(比如把成功或失败通过电子邮件报告),以及状态捕获。不久,每个开发人员都在重复操作。DAG调度程序还考虑到一些辅助需求-比如开发者只需要定义DAG就可以了。

初识Airflow

今年夏天早些时候,我正在寻找一个好的DAG调度程序, Airbnb  开始使用DAG调度程序, Airflow ——它满足了我们上述的所有需求。

创建DAG

Airflow提供一个非常容易定义DAG的机制:一个开发者使用Python 脚本定义他的DAG。然后自动加载这个DAG到DAG引擎,为他的首次运行进行调度。修改一个DAG就像修改Python 脚本一样容易。这使得开发人员更快投入到Airflow架构设计中。

Agari使用Airbnb的Airflow实现更智能计划任务的实践

一旦你的DAG被加载到引擎中,你将会在Airflow主页中看到它。在这个页面,你可以很容易地通过on/off键隐藏你的DAG—这是非常实用的,如果你的一个下游系统正处于长期维护中的话。尽管Airflow能处理故障,有时最好还是隐藏DAG以避免不必要的错误提示。在如下截图中,那“cousin domains”DAG正是被禁用的。

Agari使用Airbnb的Airflow实现更智能计划任务的实践

DAG调度

Airflow为你的DAG提供了一些观点。首先是图形视图,它通过执行2个 Spark作业开始了运行:第一个将一些未经任何处理的控制文件从Avro转换为以日期划分的Parquet文件,第二个运行聚集并标识上特别的日期(比如运行日期)。当第二个Spark把他的输出写到S3,S3“对象已创建”,通知就会被发送到一个SQS队列中。

Agari使用Airbnb的Airflow实现更智能计划任务的实践

下一个任务(即 check_for_sqs_message_branch_condition )提供了其他DAG调度程序所显现不出来的很好的特性—分支条件任务。这个类型任务允许DAG中的各种路径中的其中一个向一个特定任务执行下去。在我们的例子中,如果我们检查并发现SQS中没有数据,我们会放弃继续进行并且发送一封通知SQS中数据丢失的通知邮件!如果一切正常,那么消息将在SQS中显示,我们将继续进行我们管道中的主要工作!这涉及到几个更多的任务:

  • wait_for_new_data_in_db
  • 确保新生成的数据正在被成功地写入数据库
  • wait_for_empty_queue
  • 等待SQS队列清空
  • send_email_notification_flow_successful
  • 查询数据库中导出记录的数量
  • 把数量放在一个“成功”邮件中并发送给工程师

随着时间的推移,我们从根据Airflow的树形图迅速进掌握运行的状态。在下面的图片中,垂直列着的方格表示的是一个DAG在一天里运行的所有任务。以7月26日这天的数据为例,所有的方块都是绿色表示运行全部成功!

Agari使用Airbnb的Airflow实现更智能计划任务的实践

Airflow命令行界面

Airflow还有一个非常强大的命令界面,一是我们使用自动化,一个是强大的命令,“backfill”,、允许我们在几天内重复运行一个DAG。在下面这个树形图中,我们看到一个一个运行30天的backfill。有几天是完成的(例如7月26 到 30日),一些是正在进行中的(例如7月31日、8月1日、8月2日、8月3)和一些尚未被计划的(例如8月16日)。当Airflow可以基于定义DAG时间有限选择的原则时,它可以同时进行几个任务,它基于定义时间有限选择的原则时(比如前期的任务必须在运行执行当前期任务之前成功完成)。

Agari使用Airbnb的Airflow实现更智能计划任务的实践

DAG度量和见解

对于每一个DAG执行,Airflow都可以捕捉它的运行状态,包括所有参数和配置文件,然后提供给你运行状态。我们可以利用这个运行状态来捕获信息,比如我们在使用自己管道中机器学习所需要的不同模型版本这个能帮助我们进行问题诊断和归因。

Agari使用Airbnb的Airflow实现更智能计划任务的实践

在管道执行方面,我们关心管道加速。在下面的甘特图中,我们可以看到我们的Spark工作任务需要多久以便让它加速。此外,我们的wait_for_empty_queue阶段可以被一些积极自动扩展功能进行加速。例如,我们一般一次超出输入者4个单位,一旦我们一次超出8个单位,或者增加最大ASG域范围,比如从20增加到40,这样我们可以减少我们管道中这个阶段所费时间。

Agari使用Airbnb的Airflow实现更智能计划任务的实践

我们也关心运行的时间变化。例如,通常运行一个任务需要30分钟,但是时间会有很大差异么?正如Task Duration 图中所示,在两个阶段中,这两个spark作业时间有很大的不同。在这两个任务中的时间差异就会导致完成全部工作的时间差异很大。因此,这个图很清晰地告诉了为了运行时间更可预测,如果我们要根据速度和可扩展性增强,我们该在哪里花时间。一旦我们解决了这个问题,我们可以考虑转向另个Airflow特征:SLAs (Service-level Agreements)。

Agari使用Airbnb的Airflow实现更智能计划任务的实践

DAG 配置文件

Airflow的另一个特性是变量。变量让我们能够通过一个我们的DAG的Admin屏幕来完成特定环境(如Prod、QA、Dev)的配置文件。这个配置从我们的GIT Repo中拿出来,然后放到UI和Airflow Metadata数据库中排列整齐。它也能够允许我们在通信过程中做出改变而不需要进入Git检查变化和等待部署。

Agari使用Airbnb的Airflow实现更智能计划任务的实践

更多优良特性

Airflow允许你指定任务池,任务优先级和强大的CLI,这些我们会在自动化中利用到。

为什么使用Airflow?

作为一个管理员,Airflow很容易设置(比如你只想通过设置PIP来减轻任务)它有很棒的UI。它的开发者很人性化,因为它允许一个开发者建立简单的DAG并且在几分钟内测试。它是如何与领先的解决方案如Spotify’s Luigi、LinkedIn’s Azkaban和Oozie相比较的?之前在LinkedIn工作时使用过Azkaban,我曾想要一个具有很UI功能的DAG调度程序,至少与Azkaban的持平。Spotify’s Luigi的UI并不好用。然而,Azkaban需要一些构建自动化然后把一些甚至简单但相关的DAG压缩到一个ZIP文件中。这个zip文件压缩了包含树结构表现形式的代码和配置文件的目录,修改DAG需要通过树形配置。Oozie,至少当我上次使用它,需要在XML文件定义DAG——这使得甚至简单的DAG成为一场噩梦。Spotify的Luigi 和Airbnb的 Airflow都在一个简单文件中提供DAG定义,两者都利用Python。另一个要求是DAG调度程序需要是cloud-friendly的。因为Luigi和Airflow都是在云环境中产生的,这样少了一个让人头痛的烦恼。简而言之,我想要Azkaban的UI复杂度和Luigi的云友好、DAG管理和易于定义—Airbnb的Airflow正是那个正确的混合。

我们修改后的架构如下显示:

Agari使用Airbnb的Airflow实现更智能计划任务的实践

警告

值得注意的是:提出Airflow只是几个月前刚刚开始,它仍是个正在进行中的工作。它很有前景,一个专业并且有能力的团队和一个小但是日益成长的社区。作为一个早期采用者,Agari致力于这个成功的项目,无论是在克服bug中做报告,还是提议特性和增强特性,或者代码库都有贡献。

原文链接: How Agari Uses Airbnb's Airflow As A Smarter Cron (译者/王苇棋 审校/Wendy 责编/仲浩)

译者简介: 王苇棋,硕士毕业于香港浸会大学,关注数据挖掘和信息安全。

正文到此结束
Loading...