文件摄取是一种ETL应用程序,它逐行读取文件,验证每个行条目,并经常执行某种类型的数据转换。生成的条目放在数据存储中,这些转换后的数据可以被其他应用程序使用。这种类型的处理经常出现在B2B集成领域,零售商的制造商产品更新批量供应产品,金融服务公司之间的证券交易以及内部批处理过程中。事实上,这个用例非常普遍,很难想象一个没有这样做的企业。
传统上,文件摄取作业在批处理环境中以固定的时间表运行,通常在大型机上运行。“夜间批次”在非工作时间运行,新数据可在第二天运行。这就是问题所在。
今天,没有非工作时间,在数字业务时代,客户和下游内部和合作伙伴应用程序需要及时的全天候数据访问。
现代企业应该有一个文件摄取管道,当每个文件都可用时,它会不断处理。通过这种方式,摄取作业可以全天候分布,并且数据可以尽快用于应用程序。管道由持续监视文件系统并检测新文件何时出现的进程触发。这不应该在云中完成吗?您不希望为此类操作管理本地开发的基础架构。
Spring Cloud Data Flow (SCDF)非常适合构建文件摄取数据管道。
Spring Cloud数据流
SCDF的设计初衷是为流和任务工作负载提供一套通用的工具和服务。SCDF数据管道或SCDF用语中的流是由source(一些外部事件和数据源)和一个Sink(消耗或保存结果的东西)组成。您可以选择在其间添加一个或多个处理器。
流是事件驱动的,不断运行,并处理潜在的无限数据集。相反,任务按需执行。它由调度程序启动,由事件触发或手动启动。任务适用于有限的数据集,并在完成后终止。任务可以对少量数据进行操作并且是短暂的。或者它可能是处理大量项目的长期批处理作业。
在这里,流和任务无缝地协同工作。文件摄取部署为一个简单的流,只要出现感兴趣的新文件就会启动任务。使用SCDF,流只需要配置开箱即用的组件。你不写任何代码。该任务是使用 Spring Cloud Task 构建的自定义代码,通常是 Spring Batch 来执行文件处理。SCDF管理流和任务的部署和执行。Spring Cloud Data Flow还处理整个数据管道编排,包括集中管理和监控。
上图是一个通过SFTP上传文件后,使用SCDF分析处理的文件摄取案例。
使用Spring Batch进行生产方案
处理文件,特别是大文件,需要弹性解决方案。例如,如果数据错误阻止处理单个项目或发生系统或网络中断,我们不希望退出数据并再次处理整个文件。由于这些任务运行了很多次,我们需要一种方法来跟踪每个执行,是否成功运行,成功处理了多少项等等。
这些问题属于Spring Batch的范畴。在运行失败的情况下,Spring Batch允许在最后一个成功步骤之后重新启动作业,以便从部分完成的执行中轻松恢复。此外,验证失败的项目可以作为例外跳过和/或处理,允许该过程继续,并且可以根据需要修复少数失败的项目。Spring Batch JobRepository启用了这些功能(和其他功能)。此存储库由数据库提供支持,该数据库可跟踪作业,作业执行和步骤执行等操作。
Spring Cloud Task提供了更轻量级的抽象,以解决相同的一般生命周期问题以及与SCDF的接口。例如,在TaskRepository中跟踪任务执行和状态。任务执行成功或失败,但未提供恢复功能。
简单的任务可能不需要Spring Batch提供的广泛功能,但Spring Batch非常适合文件摄取。SCDF启动使用Spring Cloud Task构建的任务。所以Spring Batch应用程序必须用Spring Cloud包装才能与SCDF一起使用。这并不难,SCDF知道Spring Batch并自动将任务链接到相关的批处理作业。此外,SCDF UI提供了对Spring Batch作业执行状态和相关详细信息的出色可见性。使用Spring Batch,您将享受更高的弹性,能够将复杂流程定义为一系列步骤,并支持其他批处理概念。Spring Batch还提供了许多组件,专门用于从文件读取记录和将项目写入数据库等步骤。
任务启动器 - Spring Cloud Data Flow 1.7中的新功能
为了提供高效且经济高效的文件摄取解决方案,SCDF 1.7引入了 tasklauncher -dataflow接收 器以及sftp源的专用实现,称为 sftp -dataflow 源。sftp源一直作为构建SCDF流的开箱即用组件提供。它监视远程SFTP服务器上的目录,并在出现新文件时触发事件。最近,sftp源被增强,允许它监视多个SFTP位置。我们经常在客户的旧文件处理应用程序中看到这种情况。由于SFTP是文件摄取最常用的来源,因此它是这些增强的第一个目标。
新添加的sftp -dataflow源进一步简化了最近增值的使用。在其中一个配置的sftp位置创建新文件时,源会将其下载到本地文件系统并输出任务启动请求负载。有效内容包含要启动的任务的名称(批处理作业)以及本地文件路径。您可以选择包含任务可能需要的任何其他参数。tasklauncher-dataflow接收器接收任务启动请求,并通过其 REST API将 其发布到SCDF服务器。
它会扩大处理规模吗?
这个架构很简单,但是细节就是魔鬼。SCDF服务器可以在其部署的任何环境中启动任务。例如,如果SCDF在裸机服务器上运行,则每个启动的任务都会创建一个新的JVM进程。在 Cloud Foundry中 ,每个启动的任务都会创建一个新的任务容器。在 Kubernetes中 ,每个启动的任务都会根据要求创建新的pod或Kubernetes作业。
当自动启动任务以响应事件时,如果有人在远程目录中丢弃100个文件会发生什么?平台可以处理所有这些并发任务吗?为解决此问题,SCDF服务器现在具有可 配置的并发任务执行限制 。如果超出该限制,则将拒绝任务启动请求。默认限制为20,但您可以根据需要调整值。SCDF REST API提供此限制以及当前执行任务的数量。该tasklauncher - 数据流 使用该信息,并停止接收新的任务启动请求,而服务器是在它的极限。
本地文件在云中怎么办?
精明的读者可能已经注意到当源和任务在不同的容器中运行时使用本地文件系统的固有挑战,就像Cloud Foundry的情况一样。首先,容器不共享相同的文件系统。另一方面,本地文件系统是短暂的。如果源或任务容器崩溃并重新启动,则文件将消失。
蛮力方法是不依赖于本地文件系统。实际上,此用例的先前版本提供了SFTP连接参数,例如启动请求中的主机,端口和登录凭据以及远程文件位置。该任务负责下载远程文件。这总是一个选择,但不是一个伟大的选择。除了共享凭据和复制代码以允许每个组件连接到同一SFTP服务器的缺点之外,此解决方案还不具备弹性。如果任务容器崩溃,则必须始终通过下载远程文件并摄取其全部内容开始。通过WAN下载一个非常大的文件,重置数据库,并提取每个项目可能是一件很麻烦的事。
使用某种类型的共享文件存储,首选 Cloud Foundry Volume Services 。最新版本的Pivotal Cloud Foundry支持 Pivotal Application Service 中的“批量服务” 。这需要在IAAS层配置NFS存储。设置完成后,可以直接创建应用程序并将其绑定到NFS服务实例。这允许sftp源和在Cloud Foundry中运行的任务共享相同的NFS装载路径,该路径似乎是本地文件系统的一部分,并且在容器重新启动后仍然存在。
请注意,在 PCF 2.3 之前,绑定到NFS服务需要提供配置参数。SCDF目前不支持此功能,因此使用带有SCDF的卷服务需要PCF 2.3或更高版本。
总结
这些新的SCDF功能为在云中运行文件摄取作业提供了灵活有效的解决方案。大多数繁重的工作都是由已启动的任务处理的,这些任务仅在运行时才使用资源。sftp源能够轮询多个远程服务器意味着单个应用程序实例可以执行以前需要多个实例的操作。通过一些小的自定义,单个管道可以处理所有SFTP文件处理任务。即使在繁重的负载下,这些过程也不会压倒您的平台。