010-82449668

EN 中文

Alluxio 源码完整解析 | 你不知道的开源数据编排系统(下)

回顾

在《Alluxio-源码解析-上》主要讲述了Alluxio本地环境搭建,源码项目结构,服务进程的启动流程和服务间RPC调用。

本篇将在上篇的基础上,继续为大家讲述Alluxio中重点类详解,Alluxio中Block底层读写流程,Alluxio Client调用流程和 Alluxio内置的轻量级调度框架。

Part 1 重点类讲述

1.1 Journaled

Journaled接口定义可被Journaled持久化维护的通用方法,通过JournalEntryIterable#getJournalEntryIterator获取Journal元素遍历信息,该接口提供默认checkpoint方法。Journaled接口继承Checkpointed、JournalEntryIterable,定义的方法包括:

  • getJournalEntryIterator:获取Journal所有元素;
  • getCheckpointName:获取checkpoint class类名称;
  • writeToCheckpoint:持久化写入所有状态的checkpoint;
  • restoreFromCheckpoint:checkpoint恢复;
  • processJournalEntry:处理指定的Journal元素,Journal处理核心方法;
  • resetState:重置Journal状态;
  • applyAndJournal:对Journal元素执行和应用Journal操作。

1.2 UnderFileSystem

Alluxio管理和适配数据在底层各个存储系统执行操作,实现UnderFileSystem接口的底层存储可以作为Alluxio的合法UFS。

1.2.1. 类图

UnderFileSystem的类图如下所示,主要由抽象类BaseUnderFileSystem实现,而BaseUnderFileSystem下主要分为两大类:

  • ConsistentUnderFileSystem:具备一致性的UFS实现,主要包括:LocalUnderFileSystem、HdfsUnderFileSystem、CephFSUnderFileSystem等;
  • ObjectUnderFileSystem:对象存储UFS实现,主要包括:S3AUnderFileSystem、COSUnderFileSystem、OSSUnderFileSystem等。

1.2.2.  接口方法

在UnderFileSystem中有两类接口API:

  • 存储系统通用操作,如:创建/删除文件,文件重命名;
  • 处理数据持久化最终一致性的操作(eventual consistency),如:解决当AlluxioMaster维护元数据成功时,但执行UFS操作失败的问题。

1.2.2.1. 存储系统操作

  • create:指定path路径,在UFS中创建数据文件(父目录不存在会自动创建),可通过CreateOptions设置创建文件的用户组和ACL策略;
  • deleteDirectory:删除指定目录,可通过DeleteOptions设置删除的策略和遍历方式;
  • deleteFile:删除指定文件;
  • getDirectoryStatus:获取UFS指定目录状态,需传入已存在的目录文件;
  • getFileStatus:获取UFS指定文件状态;
  • getStatus:获取UFS状态,可指定目录或文件;
  • isDirectory:判断指定路径在UFS是否是目录;
  • open:打开UFS上指定文件,可通过OpenOptions设置文件打开参数;
  • renameDirectory:UFS上指定目录重命名;
  • renameFile:UFS上指定文件重命名;
  • exists:判断指定的文件或目录是否存在;
  • getAclPair:获取UFS的ACL策略;
  • getBlockSizeByte:获取指定目录下UFS的每个Block文件大小,单位bytes;
  • getFileLocations:获取指定路径在UFS关联的存储Location列表;
  • getFingerprint:计算并获取指定路径的文件标识(指纹),文件标识(指纹)的计算必须是确定且不可变的;
  • getOperationMode:获取底层UFS的操作模式,Alluxio的底层存储可以由多种类型UFS组成,该方法用来确定底层UFS的操作模式,例子:底层UFS为:hdfs://ns1/,hdfs://ns2/,则返回结果:{hdfs://ns1/:NO_ACCESS,hdfs://ns2/:READ_WRITE};
  • getPhysicalStores:获取所有UFS类型,包括数据结构和对应权限
  • getSpace:通过制定SpaceType获取UFS中指定路径的存储空间信息,SpaceType包括:SPACE_TOTAL、SPACE_FREE、SPACE_USED;
  • getUnderFSType:获取UFS类型,如hdfs;
  • isFile:判断文件文件在UFS是否存在;
  • isObjectStorage:判断UFS是否是对象存储;
  • isSeekable:判断UFS是否支持搜索;
  • listStatus:指定UFS路径下的文件状态列表,该列表不保证顺序,可通过ListOptions设置是否支持遍历;
  • mkdirs:在UFS上创建指定目录,可通过MkdirsOptions设置目录创建规则,如ACL和递归父目录创建;
  • setAclEntries:指定路径,设置UFS的ALC策略集合;
  • setMode:指定路径,设置UFS ALC Mode,如0777;
  • setOwner:指定路径,设置UFS ALC的user和group;
  • supportsFlush:判断UFS是否支持文件Flush;
  • supportsActiveSync:判断UFS是否支持ActiveSync(访问内部文件共享),ActiveSync相关的接口包括:getActiveSyncInfo、startSync、stopSync、startActiveSyncPolling、stopActiveSyncPolling。

1.2.2.2.  最终一致性操作

  • createNonexistingFile:创建不存在的文件,若文件存在,则退出;
  • deleteExistingDirectory:删除指定目录;
  • deleteExistingFile:删除指定文件;
  • getExistingDirectoryStatus:获取UFS指定目录状态;
  • getExistingFileStatus:获取UFS指定文件状态;
  • getExistingStatus:获取UFS状态,可指定目录或文件;
  • isExistingDirectory:判断指定路径在UFS是否是目录;
  • openExistingFile:打开UFS上指定文件,可通过OpenOptions设置文件打开参数;
  • renameRenamableDirectory:UFS上指定目录重命名;
  • renameRenamableFile:UFS上指定文件重命名。

1.2.2.3. 其他操作

  • cleanup:当数据文件创建时没有正常的成功结束或被抛弃处理,则对底层UFS清理;
  • connectFromMaster:指定AlluxioMaster主机地址,建立指定Master与UFS连接;
  • connectFromWorker:指定AlluxioWorker主机地址,建立指定Worker与UFS连接;
  • resolveUri:给定Alluxio基础URI和路径,返回拼装后的Alluxio路径。

1.3 UfsManager

Alluxio中对底层UFS(Under FileSystem)管理操作的通用统一接口类定义,定义的接口方法包括:

  • addMount:UFS挂载到Alluxio,该方法仅针对Alluxio处理,不对底层UFS操作;
  • removeMount:移除Alluxio中的UFS挂载;
  • get:根据mountId获取挂载的UFS信息;
  • getRoot:获取Alluxio上挂载的根目录信息;
  • getJournal:获取Journal的Location地址;

其中AbstractUfsManager抽象类对UFS管理接口进行基本实现。

1.3.1. UfsClient

维护底层UFS的Client连接信息和其他相关UFS的描述信息,基于UfsClient实现Alluxio对UnderFileSystem的操作。

1.4 BlockClient

BlockClient抽象类定义调用方对Block基本的读写操作,其类图示意如下,主要包括:BlockWriter、BlockReader。

读写Block的定义的方法类:

1.5 DefaultFileSystemMaster

Master服务维护所有FileSystem(文件系统)元数据变更的管理操作,DefaultFileSystemMaster内部基于InodeTree维护文件系统结构,并将InodeTree持久化到日志文件(journal);除此之外,其内部维护多个管理操作,如:InodeLockManager、MasterUfsManager、MountTable等;

备注:DefaultFil1.5.  DefaultFileSystemMastereSystemMaster的启动start方法详情前面所述内容。

1.5.1. 接口方法

FileSystemMaster接口定义master中针对FS的操作方法,DefaultFileSystemMaster继承FileSystemMaster,其接口方法主要包括:

  • cleanupUfs:周期性清理底层UFS;
  • getFileId:基于Alluxio路径URI 获取文件ID,若文件不缓存Alluxio,则调用UFS获取;
  • getFileInfo:根据文件ID获取文件详情,该接口仅对内部服务开放,不对用户直接开放;
  • getPersistenceState:根据文件ID,获取该文件的持久化状态;
  • listStatus:指定Alluxio路径,获取文件状态列表;
  • checkAccess:校验指定Alluxio路径的权限;
  • checkConsistency:校验指定Alluxio路径的文件数据一致性;
  • completeFile:关闭/结束指定Alluxio路径,关闭后,则该文件不可写;
  • createFile:基于指定Alluxio文件路径,创建文件FileInfo;
  • getNewBlockIdForFile:指定Alluxio文件路径,获取下个待操作Block文件的Block ID;
  • getMountPointInfoSummary:获取Alluxio中mount(挂载)路径的快照信息;
  • getDisplayMountPointInfo:获取Alluxio用户展示的Mount信息;
  • delete:删除指定Alluxio路径的文件元信息;
  • getFileBlockInfoList:获取指定Alluxio路径下的所有Block列表信息;
  • getInAlluxioFiles:获取Alluxio中所有的文件列表路径;
  • getInMemoryFiles:获取Alluxio中所有缓存在内存的文件列表路径;
  • createDirectory:创建Alluxio对应的目录,并返回目录ID;
  • rename:Alluxio中文件重命名操作的元数据变更;
  • free:指定Alluxio目录下,释放所有alluxio缓存的block文件信息,支持目录下遍历的文件释放;
  • getPath:根据指定FileId获取Alluxio URI路径;
  • getPinIdList:获取被固定的inode id列表;
  • getUfsAddress:获取master所需的UFS地址;
  • getUfsInfo:根据挂载ID获取对应UFS信息;
  • getLostFiles:获取worker节点丢失的文件列表;
  • mount:核心操作,将UFS路径挂载在Alluxio指定路径;
  • unmount:取消指定Alluxio路径上的UFS挂载;
  • updateMount:更新指定Alluxio路径挂载信息;
  • setAcl:设置Alluxio路径ACL;
  • updateUfsMode:设置底层UFS Mode;
  • validateInodeBlocks:验证inode block信息是否具备完整性;
  • workerHeartbeat:指定worker ID,通知对应worker进行文件的存储持久化;
  • getWorkerInfoList:获取所有worker节点信息列表;
  • getTimeSeries:获取alluxio master中元数据存储的时间版本信息;

1.6 DefaultBlockWorker

1.6.1. 接口

Worker Server针对Block的管理操作,实现接口类:BlockWorker,其接口方法主要包括:

  • getWorkerId:获取worker id;
  • abortBlock:丢弃session中临时创建的block文件;
  • accessBlock:访问指定session和block id下的block信息,该方法可能会在block缓存释放被访问;
  • commitBlock:提交block到Alluxio的管理空间,待提交的block必须是临时的,当block提交成功之前,block是不支持读写访问;
  • commitBlockInUfs:将block提交到UFS持久化;
  • createBlock:在Alluxio管理空间创建block,基于BlockWriter类可对block进行写操作,在block commit提交之前都是临时的;
  • getTempBlockMeta:获取临时block元数据;
  • createBlockWriter:基于session和block id创建BlockWriter,用于block的写操作;
  • getReport:获取worker与master周期性心跳的报告;
  • getStoreMeta:获取整个block存储的元数据信息,包括block中每个存储目录映射和每层存储的容量情况;
  • getStoreMetaFull:与getStoreMeta相似,但包括完整的blockId列表,获取代价更高;
  • getVolatileBlockMeta:根据指定blockId获取block元数据信息;
  • lockBlock:对block进行加锁操作;
  • moveBlock:将block从当前存储Location移动到目标Location;当前仅支持分层存储移动;
  • moveBlockToMedium:block移动并指定对应的存储介质类型(MediumType);
  • createBlockReader:创建BlockReader进行Block读操作,可读取Alluxio Block和 UFS Block;
  • createUfsBlockReader:创建BlockReader进行UFS Block读操作;
  • removeBlock:从Allxuio管理空间移除Block;
  • requestSpace:为指定block获取存储空间,该block必须为临时block;
  • unlockBlock:对block去除锁操作;
  • asyncCache:提交异步缓存请求进行异步的缓存管理;
  • updatePinList:更新底层block存储占用的pin列表;
  • getFileInfo:基于指定file id获取文件信息。

1.6.2. TieredBlockStore

BlockStore定义block的存储接口,用于管理本地block存储,其接口核心目的:具体实现BlockWorker中定义的方法类,接口如下:

TieredBlockStore是BlockStore的实现类,实现了Alluxio中核心功能点:分层存储,使得对应的存储对象可基于block形式进行分层存储管理,并对外暴露提供API进行block管理。TieredBlockStore中内置分配算法确定新block的存取和旧block的释放,基于BlockMetadataManager维护分层存储状态、block读写锁管理等元数据信息。
TieredBlockStore是线程安全的,所有基于block级别的操作都需要调用BlockLockManager来获取对应的读写锁,保证该block下的元数据操作和I/O操作是线程安全的。任何block的元数据操作都需要基于BlockMetadataManager来获取元数据的ReentrantReadWriteLock 读写锁。
Allocator接口定义Alluxio中数据管理的分配策略,接口方法:allocateBlockWithView,目前内部有三种实现类:

  • RoundRobinAllocator:基于round-robin轮训分配,默认从最高层开始分配,当最高层存储空间不足则会到下一层,该分配策略不支持指定存储具体的分层。
  • MaxFreeAllocator:分配到存储中最大剩余空间,当没有指定具体存储分层,默认从最高层开始分配;
  • GreedyAllocator:返回满足存储block大小的第一层存储空间,是存储分配的示例类;

其中BlockStoreLocation定义存储block的location地址和分层信息,描述了三个存储维度:存储层别名、对应存储层目录地址,存储层介质信息。

1.6.2.1. createBlock

当存在可用空间(space)时,基于block分配算法创建临时block;特别的:创建block不会触发其他block的销毁释放,通过BlockMetadataAllocatorView 获取只读的Block元数据信息,为Allocator调度提供数据来源,Allocator分配调度后返回StorageDirView对象并创建TempBlockMeta 并通过BlockMetadataManager管理。存储分配后的元数据会基于createBlockFile方法持久化到Block元文件。


Allocator接口定义Alluxio中数据管理的分配策略,接口方法:allocateBlockWithView,目前内部有三种实现类:

  • RoundRobinAllocator:基于round-robin轮询分配,默认从最高层开始分配,当最高层存储空间不足则会到下一层,该分配策略不支持指定存储具体的分层。
  • MaxFreeAllocator:分配到存储中最大剩余空间,当没有指定具体存储分层,默认从最高层开始分配;
  • GreedyAllocator:返回满足存储block大小的第一层存储空间,是存储分配的示例类;

其中BlockStoreLocation定义存储block的location地址和分层信息,描述了三个存储维度:存储层别名、对应存储层目录地址,存储层介质信息。

1.6.2.2.  freeSpace

同步方法执行Block缓存存储空间执行立刻删除释放,当所有存储分层的空间释放操作结束后才能支持新Block创建。根据BlockMetadataEvictorView 获取Block存储中可移除的Block信息。判断当前缓存存储中是否满足最小连续空间和最小可用空间,若同时满足,则不进行后续空间清理操作;若不满足,则遍历Block信息,判断是否可清理,若可以清理,则删除对应的Block文件及元数据,通过BlockStoreEventListener事件监听器同步Block释放操作。


BlockStoreEventListener 监听BlockStore中元数据变化成功结束的触发事件,主要包括的接口方法类:

  • onAccessBlock:访问Block 事件触发;
  • onAbortBlock:清理和释放临时Block 事件触发;
  • onCommitBlock:提交临时Block并关联Block的存储信息BlockStoreLocation 事件触发;
  • onMoveBlockByClient:基于Client移动Block的BlockStoreLocation 事件触发;
  • onMoveBlockByWorker:基于Worker移动Block的BlockStoreLocation 事件触发;
  • onRemoveBlockByClient:基于Client移除并释放Block的BlockStoreLocation 事件触发;
  • onRemoveBlock:移除并释放Block 事件触发;
  • onBlockLost:Block丢失 事件触发;
  • onStorageLost:存储目录丢失 事件触发。

1.7 PlanDefinition

Alluxio中内置轻量级角度系统的Job执行计划定义,有两个核心部分,1. PlanDefinition#selectExecutors:该方法在Master节点调用,用于选择执行任务的AlluxioJobWorker,2.PlanDefinition#runTask:在JobWorker中运行指定作业计划。PlanDefinition 主要包括的作业定义实现有:
MoveDefinition:在FileSystemMaster校验层级上触发Block的移动操作;

  • ReplicateDefinition:在FileSystemMaster校验层级上触发Block的复制操作;
  • EvictDefinition:在FileSystemMaster校验层级上触发Block释放操作;
  • PersistDefinition:将Alluxio Block缓存存储持久化到底层UFS;
  • CompactDefinition:在指定目录下降结构化表的数据文件进行压缩;
  • MigrateDefinition:Block移动,源和目标Block可以挂载在不同的UFS节点;
  • LoadDefinition:实现简单的Block文件的Load操作。

1.7.1. TaskExecutorManager

管理JobWorker Task执行器,真正的执行任务通过线程池调用TaskExecutor#run,而TaskExecutor#run底层通过PlanDefinition#runTask 实现;同时TaskExecutorManager内部也管理Task的执行容量和Task生命周期管理,如:获取执行的线程池,对任务执行限流/解除限流,任务启停。

 

Part 2 Block读写操作

2.1 读操作

BlockWorker RPC服务提供的客户端的读操作,大致流程如下:

  • BlockWorkerClientServiceHandler.readBlock方法定义Block读取,默认创建请求参数StreamObserverresponseObserver 创建 CallStreamObserver;若支持零拷贝,则使用DataMessageServerStreamObserver
  • 基于CallStreamObserver 创建BlockReadHandler,并调用BlockReadHandler#onReady 开启数据读取,基于线程池提交创建DataReader线程执行;
  • DataReader是Alluxio用于I/O数据读取的线程类,封装了核心的Alluxio读操作逻辑,(1).获取Alluxio数据输入流DataBuffer;(2)调用CallStreamObserver.onNext触发和监听数据流读取;
  • DataReader获取DataBuffer是整个读取处理的核心逻辑,判断数据读取来源:Local、UFS,是否进行Block移动实现短路读;- 创建打开Block,若请求需要加速(promote=true)则操作BlockWorker.moveBlock,将Block移动到存储更高层;- 调用DefaultBlockWorker#createBlockReader 创建BlockReader,判断本地Worker是否可以直接访问,若支持则返回LocalFileBlockReader;若为UFS中,则调用UnderFileSystemBlockReader;- 调用BlockReader.transferTo 读取数据,并将I/O封装为NettyDataBuffer返回。

 


2.1.1. UnderFileSystemBlockReader

UnderFileSystemBlockReader 类实现直接从UFS读取并将读取的信息缓存到读取的Worker Block中,大致流程如下:

  • UfsInputStreamCache.acquire 根据ufs、路径、blockId获取输入流InputStream,若InputStream在缓存中直接获取,若不存在,则根据ufs.openExistingFile 获取底层UFS的文件输入流InputStream;
  • 获取并更新BlockWriter,判断是否存在有对应Block存在,不存在则调用BlockStore.createBlock新建临时Block,并返回对应BlockWriter;
  • 根据第一步骤获取的输入流InputStream和参数offset读取文件,读取的数据:(1).通过BlockWriter写入Block缓存对应Worker;(2).返回调用方读取信息。

备注:

  • LocalFileBlockReader:基于FileChannel.map方法的I/O操作读取文本文件信息
  • RemoteBlockReader:基于远端的Worker(非本地Worker)读取,暂不支持;
  • DelegatingBlockReader根据不同的使用场景,判断和选择使用的BlockReader实现类。

2.1.2. ShortCircuitBlockReadHandler

ShortCircuitBlockReadHandler类是RPC服务实现提供短路读能力,首先Grpc的StreamObserver(观察者模式),一次onNext调用说明一次消息读取,大致的执行流程:

  • 根据OpenLocalBlockRequest获取是否进行加速读取,若加速(promote=true)则调用BlockWorker.moveBlock将存储移动更高层存储分层;
  • 调用BlockWorker.lockBlock 获取Block的读写操作锁,最后BlockWorker.accessBlock获取访问Block

2.2 写操作

BlockWorker RPC服务提供的客户端的写操作,大致流程如下:

  • BlockWorkerClientServiceHandler.writeBlock方法定义Block写入,默认创建请求参数StreamObserverresponseObserver 创建 CallStreamObserver;若支持零拷贝,则使用  BlockWorkerClientServiceHandler;
  • 基于CallStreamObserver 创建DelegationWriteHandler,并调用DelegationWriteHandler#onCancel关闭数据写操作;调用onNext方法进行数据流监听写操作;
  • DelegationWriteHandler 根据请求Command类型获取对应的AbstractWriteHandler 实现类:- ALLUXIO_BLOCK:BlockWriteHandler,数据仅写入Alluxio Block,基于BlockWriter实现写操作;- UFS_FILE:UfsFileWriteHandler,数据仅写入UFS,基于UFS Client创建目录文件并进行I/O操作;- UFS_FALLBACK_BLOCK:UfsFallbackBlockWriteHandler,先基于BlockWriteHandler写入Alluxio Block再写入UFS;

 

AbstractWriteHandler 抽象类关系如下:

2.2.1.  LocalFileBlockWriter

基于本地Worker写入Block文件信息,调用FileChannel.map

2.2.2.  ShortCircuitBlockWriteHandler

ShortCircuitBlockWriterHandler实现短路读的创建本地Block能力,基于onNext调用,大致执行流程:

  • 若仅申请空间资源,则基于BlockWorker.requestSpace 获取Block创建的请求空间资源;
  • 若需创建临时Block,则调用BlockWorker.createBlock创建Block并返回对应Block路径。

 

Part 3 Catalog管理

AlluxioCatalog进行Alluxio中Catalog管理对象,封装和维护了Alluxio中注册的DB信息及各个DB下的Table等元数据信息,其基本的方法操作如下,包括:获取数据库db信息,db元数据同步,db绑定/解绑等操作。

  • attachDatabase:将绑定的db元数据信息维护在内存中并同步持久化到Journal中;
  • syncDatabase:会基于底层udb获取最新元数据database信息,如Hive则调用HMS客户端接口方法IMetaStoreClient#getDatabase获取数据库信息。

 

Part 4 Client操作

4.1 ClientClient接口

抽象定义Alluxio中Client操作,其继承和实现类如下所示,封装了对接各个组件的RPC接口:

  • FileSystemMasterClient:封装 FileSystemMasterClientServiceHandler 相关RPC调用,进行元数据管理操作
  • BlockMasterClient:封装 BlockMasterClientServiceHandler相关RPC调用,进行Block管理操作
  • TableMasterClient:封装 TableMasterClientServiceHandler 相关RPC调用,进行Alluxio Table Catalog管理操作
  • MetaMasterClient:封装 MetaMasterClientServiceHandler 相关RPC调用
  • MetaMasterConfigClient:封装 MetaMasterConfigurationServiceHandler 相关RPC调用
  • JobMasterClient:封装JobMasterClientServiceHandler 相关RPC调用,进行Alluxio Job的调用操作;

4.1.1. FileSystem

Client中定义的文件系统操作接口类,用于元数据管理和数据管理,用户可根据其实现类BaseFileSystem 扩展Client文件操作行为。


FileSystem 中定义的接口方法主要包括以下几类:

  • checkAccess:检查指定路径权限;
  • createDirectory:基于AlluxioURI 创建文件目录;
  • createFile:基于AlluxioURI 创建数据文件;
  • delete:基于AlluxioURI 删除指定文件/目录;
  • exists:基于AlluxioURI判断指定文件/目录是否存在;
  • free:基于AlluxioURI 释放Alluxio空间,但不删除UFS数据文件了;
  • listStatus:列出AlluxioURI文件/目录信息;
  • mount/updateMount/unmount:挂载/更新/取消挂载指定AlluxioURI目录;
  • openFile:打开并读取AlluxioURI文件输入流;
  • persist:将Alluxio中缓存的数据异步持久化底层UFS;
  • rename:Alluxio文件重命名。

4.1.2. FileSystemContext

维护Alluxio基于Client进行文件系统操作的上下文信息,通常的,一个Client JVM进程会使用同个FileSystem连接Alluxio,因此Client对象会在不同线程中共享。FileSystemContext 只有当用户需要个性化配置和认证时才被创建,线程共享的Client会针对FileSystemContext维护独立的线程空间,FileSystemContext 线程不共享(线程安全)会增加Client连接的资源使用,因此当用户停止Alluxio操作后,需要关闭FileSystemContext释放资源。

4.1.3. FileInStream/FileOutStream

Client中定义基于Alluxio文件操作的输入/输出流,如下所示:

  • 输出流:AlluxioFileOutStream,Alluxio输出流写入,底层操作BlockOutStream
  • 输入流:AlluxioFileInStream:Alluxio输入流读取,封装了本地/远端节点数据读取,或者直接基于底层UFS;底层操作BlockInStream,LocalCacheFileInStream,AlluxioHdfsInputStream

4.2 AbstractShell

Client的功能可以通过Shell对外提供操作,AbstractShell抽象类定义Alluxio中Shell命令操作,其继承子类包括:

  • FileSystemShell:Alluxio Shell文件操作入口类;
  • FileSystemAdminShell:Alluxio文件系统管理操作;
  • CollectInfo:Alluxio中从所有Woker节点采集信息命令;
  • TableShell:Alluxio表管理操作;
  • JobShell:Alluxio执行job管理操作。

4.2.1. CatCommand

以CatCommand为例,简述Alluxio Client进行文件读取的大致流程如下:

  • FileSystemShell接收shell命令,执行"cat"打开文件操作,调用CatCommand.run命令,shell命令支持正则和多目录,对每个指定目录执行自定义实现的runPlainPath操作;
  • CatCommand#runPlainPath 方法通过getStatus判断文件类型,若为目录则退出,若为文件则基于FileSystem打开文件获取客户端输入流对象FileInStream(AlluxioFileInStream);
  • 基于AlluxioFileInStream#read读取文件内容,URIStatus维护Alluxio中目录和文件元数据快照信息,基于URIStatus获取指定Alluxio文件对应Block信息,通过Client AlluxioBlockStore中维护的Block信息获取BlockInStream(Block输入流);
  • 基于BlockInStream调用输入流读取操作,底层基于Block的数据读取接口DataReader实现,基于DataReader读取Block详情下述的Block读操作。

4.2.2. TouchCommand

以TouchCommand为例,简述Alluxio Client进行文件写入的大致流程如下:

  • FileSystemShell接收shell命令,执行"touch"打开文件操作,调用TouchCommand.run命令,shell命令支持正则和多目录,对每个指定目录执行自定义实现的runPlainPath操作;
  • TouchCommand#runPlainPath 方法调用FileSystem.createFile 创建文件并在结束后关闭该连接;
  • FileSystem.createFile的方法详解如下:- 基于FileSystemMasterClient获取FileSystemMasterClientServiceHandler 远程的RPC连接信息;- 基于FileSystemMasterClient 调用RPC接口创建数据文件(createFile),将新建Alluxio文件元数据信息同步Alluxio Master;- FileSystem新建Client端的Alluxio文件输出流对象:AlluxioFileOutStream,其底层调用Block的DataWriter对象进行文件处理;- 输出流完成后,执行AlluxioFileOutStream#close方法,调用FileSystemMasterClient#completeFile 判断是否已执行完成,最终基于RPC接口实现completeFile;

 

Part 5 轻量级调度

Alluxio内部基于AlluxioJobMaster和AlluxioJobWoker实现轻量级内置的Alluxio操作调度,Master负责作业的调度管理,而Worker真正执行作业操作。

5.1 调度管理

由前文AlluxioJobMaster启动流程可知,AlluxioJobMaster在启动时会触发JobMaster Server启动,JobMaster内部维护执行计划(plan)的管理追踪器:PlanTracker,用于创建、移除、访问任务作业集合,每个作业都有对应的PlanCoordinator用于分布式作业执行协调。外部服务可通过HTTP和RPC方式调用JobMaster.run 方法根据作业配置(JobConfig)启动并进行作业调度(同步/线程安全的)。JobConfig 定义作业配置接口,分为两类:PlanConfig(单作业执行)、WorkflowConfig(一组作业流执行)。
JobMaster中作业调度管理的大致流程如下:

  • 外部接口可调用JobMaster.run方法触发作业执行,以Plan作业类型为例,调用PlanTracker执行run方法;
  • PlanTracker先校验并移除已完成的作业,并基于PlanCoordinator创建新的作业实例并启动该作业实例;
  • PlanCoordinator作业启动流程:- 基于JobConfig获取对应的PlanDefinition;- 根据可用的Worker列表和PlanDefinition,调用selectExecutors方法获取待执行作业Worker列表;- 调用CommandManager提交作业,将作业及待执行作业worker列表信息维护在内存队列中;
  • 最后,Job Master和Job Worker节点通过RPC心跳检测,下发具体的作业信息给Worker执行。

5.2 作业执行

由前文AlluxioJobWorker启动流程可知,AlluxioJobWorker启动时会触发心跳检测线程CommandHandlingExecutor,对接收到的作业执行调度处理,每个作业启动一个线程执行,作业执行大致流程如下:

  • CommandHandlingExecutor线程启动与JobMaster进行心跳检测,基于JobMasterClient.heartbeat方法获取所有的待执行作业列表;
  • 遍历待执行作业列表,从线程池调用CommandHandler.run线程类执行作业调度,包括的作业类型:启动、取消、注册作业;
  • CommandHandler启动作业会调用TaskExecutorManager 执行作业,以Future执行TaskExecutor 进行线程级别作业调度;
  • TaskExecutor真正执行作业调度:- 对应作业参数进行反序列化操作;- 根据PlanDefinitionRegistry 获取执行Job的PlanDefinition并调动runTask执行作业;


以PersistDefinition为例,大致说明Job Executor操作,将Alluxio Block存储持久化到底层UFS:

  • 获取Alluxio的数据存储URI,读取对应的数据输入流in;
  • 获取指定的UFS目标路径,根据UfsClient判断该路径是否存在,若不存在则创建,并基于UnderFileSystem创建输出流out;
  • 根据I/O操作工具类,将数据从数据流拷贝输出流,持久化到UFS。

Alluxio在数据索引和模型分发中的核心价值与应用

在当前的技术环境下,搜索、推荐、广告、大模型、自动驾驶等领域的业务依赖于海量数据的处理和复杂模型的训练。这些任务通常涉及从用户行为数据和社交网络数据中提取大量信息,进行模型训练和推理。这一过程需要强大的数据分发能力,尤其是在多个服务器同时拉取同一份数据时,更是考验基础设施的性能。

南方科技大学分享:大数据技术如何赋能大模型训练及开发

南方科技大学是深圳在中国高等教育改革发展的时代背景下创建的一所高起点、高定位的公办新型研究型大学。2022年2月14日,教育部等三部委公布第二轮“双一流”建设高校及建设学科名单,南方科技大学及数学学科入选“双一流”建设高校及建设学科名单。

Shopee 在 Alluxio 加速 AI 训练的实践与探索

Shopee是东南亚领航电商平台,覆盖新加坡、马来西亚、菲律宾、泰国、越南、巴西等十余个市场,同时在中国深圳、上海和香港设立跨境业务办公室。2023年Shopee总订单量达82亿,2024年第二季度总订单量同比增长40%,增势强劲。