ImageKnifePro 源码解读(二):请求调度与并发控制
ImageKnifePro 把线程调度交给了 HarmonyOS 的 FFRT(Function Flow Runtime),一个内核级的协程式任务框架。相比 ArkTS taskpool 的"每个 task 独占线程直到返回",FFRT 的并发队列允许更灵活的线程复用,配合 Detach 异步分离机制,网络 I/O 阶段不再占用并发位。 构造函数里创建了三条 FFRT 队列,各有不同用途和并发上限。 taskQueue 是 apiQueue 也是 mainQueue 通过 排队队列 加载阶段的合并 key 是 被合并的请求在 Detach 是调度层最关键的设计。 如果不做 Detach,子线程需要用 Detach 的做法是:fetch 成功后拦截器立即调 当 RCP 下载完成时, 加载完成后检查 解码完成后 所有主线程回调( 文件缓存写入在 每当一个任务完成,主线程回调的最后一行是 Pop 出的请求可能已经过期——组件在排队期间被销毁, Detach 改变了并发位的语义。没有 Detach 时,8 个并发位对应的是"正在等待网络响应"的任务数。有了 Detach,并发位对应的是"正在进行加载前期处理(缓存检查、请求构建)"的任务数。网络 I/O 阶段的请求数量没有上限——它们在 Detach 后就释放了并发位,系统可以立刻从队列中取出下一个请求开始处理。 取消操作在 Detach 场景下需要额外注意。 以上就是本篇内容的所有了~有什么问题欢迎在评论区提出 项目地址:ImageKnifePro一、TaskWorkerFFRT 与三条队列
TaskWorkerFFRT 继承自抽象基类 TaskWorker,编译时通过 IMAGEKNIFEC_USING_FFRT_API 宏决定使用 FFRT 实现还是 NAPI 实现。TaskWorker::GetInstance() 返回静态局部变量,全进程单例。TaskWorkerFFRT::TaskWorkerFFRT()
{
CreateApiQueue();
mainQueue_ = ffrt_get_main_queue();
}ffrt_queue_concurrent 类型,名称为 "ImageKnifePro TaskQueue",默认最大并发 8。所有图片的网络下载、文件缓存读写、图片解码、图形变换任务都提交到这条队列。CreateTaskQueue() 在首次 PushTask(AsyncTask) 时延迟创建——出于避免不必要初始化开销的考虑,如果应用启动后一段时间内没有图片加载请求,这条队列就不会被创建。SetMaxConcurrency() 可以在创建前调整上限。ffrt_queue_concurrent 类型,名称为 "ImageKnifePro ApiQueue",最大并发固定 4。它在构造函数中立即创建,用于轻量异步调用——那些不需要绑定 ImageKnifeTaskInternal 的通用函数,比如 DNS 预解析、缓存文件写入等。ffrt_get_main_queue() 获取,是 HarmonyOS 提供的主线程队列。所有需要回主线程执行的操作——UI 渲染、加载回调触发、DispatchNextJob()——都通过 ffrt_queue_submit(mainQueue_, ...) 提交到这条队列。void TaskWorkerFFRT::CreateTaskQueue(uint64_t maxConcurrency)
{
ffrt_queue_attr_t queue_attr;
(void)ffrt_queue_attr_init(&queue_attr);
ffrt_queue_attr_set_max_concurrency(&queue_attr, maxConcurrency);
taskQueue_ = ffrt_queue_create(ffrt_queue_concurrent, "ImageKnifePro TaskQueue", &queue_attr);
ffrt_queue_attr_destroy(&queue_attr);
}PushTask() 的实现用了 FFRT 的 C 接口。它先创建一个 TaskData 结构体(包含 execute 回调、complete 回调和 task 对象),通过 FunctionWrapper() 包装成 FFRT 可识别的 ffrt_function_header_t,然后调 ffrt_queue_submit() 提交。ffrt_alloc_auto_managed_function_storage_base() 分配的内存在任务完成后自动释放,不需要手动管理。
二、四级优先级队列
DefaultJobQueue 内部维护四个 std::queue:placeholder_、highQueue_、normalQueue_、lowQueue_。Pop() 的优先级顺序是 placeholder -> high -> normal -> low。void DefaultJobQueue::Add(Job job) {
std::lock_guard<std::mutex> guard(queueLock_);
if (job.type == ImageRequestType::PLACE_SRC) {
placeholder_.push(job);
return;
}
if (job.request->GetImageKnifeOption()->priority == Priority::HIGH) {
highQueue_.push(job);
} else if (...priority == Priority::MEDIUM) {
normalQueue_.push(job);
} else {
lowQueue_.push(job);
}
}placeholder_ 队列是占位图专用,优先于所有业务优先级出队。这意味着即使主图请求排满了 high 队列,占位图仍然能最先被执行,保证用户在等待主图加载时至少能看到一个反馈。出于"占位图必须比主图先出现"的交互需求,把它单独提到最高优先级而不是复用 high 队列是合理的。Add() 和 Pop() 都用 std::mutex 保护。加锁的原因是 Add() 可能在主线程调用(Enqueue() 路径),也可能在子线程调用(RequeueCanceledJob() 路径),两个线程同时操作 std::queue 会导致未定义行为。三、请求去重——两阶段合并
ImageKnifeDispatcher 使用两个 JobListsMap 做请求合并:loadingJobMap_ 对应加载阶段(下载),decodingJobMap_ 对应解码阶段。JobListsMap 内部是一个 mutex 保护的 unordered_map<string, shared_ptr<list<Job>>>。bool ImageKnifeDispatcher::JobListsMap::Emplace(const std::string &key, IJobQueue::Job &&job)
{
std::lock_guard<std::mutex> guard(lock_);
auto it = jobListsMap_.find(key);
if (it == jobListsMap_.end()) {
auto jobList = std::make_shared<std::list<IJobQueue::Job>>();
jobList->emplace_back(std::move(job));
jobListsMap_.emplace(key, std::move(jobList));
return true; // 首次出现,需要启动新任务
} else {
job.request->GetImageInfo().isMergedRequest = true;
it->second->emplace_back(std::move(job));
return false; // 已合并,不需要新任务
}
}loadingKey(基于图片 URL),解码阶段的 key 是 memoryKey(基于 URL + 变换参数 + 降采样 + 动图标识等)。两阶段合并的意义在于:两个请求即使最终的 memoryKey 不同(比如变换参数不同),只要下载同一张图片就能共享下载结果。下载完成后进入解码阶段,这时按 memoryKey 再合并一次,变换参数相同的请求共享解码结果。ImageInfo 上标记 isMergedRequest = true,任务完成时 MergedRequestUpdateImageInfo() 将 HTTP 状态码、磁盘检查耗时、解码耗时、图片宽高等信息同步给每个被合并的请求。Enqueue() 的主流程:先尝试内存缓存,命中直接返回。未命中则检查 loadingJobMap_.Size() >= maxRequests(默认 8),超限就将 job 推入排队队列等待。四、Detach 异步分离
DownloadInterceptorDefault 使用 RCP 发起异步请求,HMS_Rcp_Fetch() 提交后立即返回,下载结果通过 ResponseCallback 在系统 I/O 线程中异步回调。promise/future 阻塞等待下载完成。这个线程在整个网络 I/O 期间被占用却什么也不做——一张图片从发出 HTTP 请求到收到完整响应可能需要 100-300ms,8 个并发位很快被网络等待耗尽。if (IsDownloadDetachEnabled()) {
Detach(task);
} else {
bool res = data->waitDownload.get_future().get();
delete data;
return res;
}LoadInterceptor::Detach(task),在 task 内部标记为已分离状态。Process 方法在 ExecuteResolveFunction 返回后检查 IsDetached(),如果为 true 则直接返回,不再继续后续拦截器,也不等待下载结果。当前工作线程释放回 FFRT 线程池,可以立刻处理下一个加载任务。IsDownloadDetachEnabled() 检查 API 版本是否大于 13,静态缓存结果避免重复查询。低版本设备退化为 promise/future 阻塞等待。ResponseCallback 在系统 I/O 线程中被调用。它解析 HTTP 响应,检查状态码,填充 task->product.imageBuffer,然后调 OnComplete(task, result)。void FinishLoadChain(shared_ptr<ImageKnifeTaskInternal> &task, const bool &result)
{
TaskWorker::GetInstance()->PushTask(
[](shared_ptr<ImageKnifeTaskInternal> task) {
ImageKnifeDispatcher::GetInstance().OnTaskComplete(
task, IJobQueue::EndPhase::LOADING);
}, nullptr, task, ImageKnifeQOS::USER_INITIATED);
}OnComplete() 先完成 CRC32 校验(如果请求设置了校验值),校验失败则清空 imageBuffer。如果下载失败且配置了 fallbackUrls,会在当前线程中重试下一个 URL。全部 URL 耗尽仍然失败,交给责任链中的下一个 LoadInterceptor。最终通过 FinishLoadChain() 将 task 重新提交到 FFRT 的 taskQueue_,回到正常的加载完成流程。
五、ExecuteTask 的任务流转
ExecuteTask() 构造一个 lambda 闭包,捕获 this 指针和 task 的 shared_ptr,调 TaskWorker::PushTask() 提交到 taskQueue_。FFRT 在子线程中执行这个闭包,闭包内调用 LoadImageSource(),依次经过内存缓存拦截器、文件缓存拦截器、下载拦截器的责任链。task->IsDetached()。如果没有分离(本地图片或低版本设备),直接在当前子线程调 OnTaskComplete(task, EndPhase::LOADING),从 loadingJobMap_ 取出 jobList。如果任务成功且需要解码,通过 decodingJobMap_.Emplace() 插入解码阶段的合并 map,再提交一个 PrepareDecodeTask 到 taskQueue_。OnTaskComplete(task, EndPhase::DECODING) 调用 CompleteJobList()。这个方法遍历 jobList 中的每个 job,区分"取消/过期/成功/失败"四种状态分别处理。被取消的合并请求需要重新入队(RequeueCanceledJob()),因为主请求的取消不应连带终止其他独立请求。onLoadSuccess、onLoadFailed、onLoadCancel、DisplayImage)被收集到一个 FuncList 中,通过 TaskWorker::ToMainThread() 一次性提交到 mainQueue_ 批量执行,减少线程切换次数。TaskWorker::GetInstance()->ToMainThread([task, jobList](void *data) {
FuncList *funcList = static_cast<FuncList *>(data);
for (auto &func : *funcList) {
func(task);
}
delete funcList;
ImageKnifeDispatcher::GetInstance().DispatchNextJob();
}, funcList);CompleteJobList() 返回后由 WriteFileCacheAndResolvePromise() 单独执行,不在主线程上操作,避免阻塞 UI。六、DispatchNextJob 的调度循环
DispatchNextJob()。这个方法检查 loadingJobMap_.Size() < maxRequests,如果有空位就从排队队列 Pop 一个 job 出来。requestState 变成了 DESTROY。这种情况下不执行加载,触发 ProcessCanceledRequest 后继续 Pop 下一个,直到找到一个有效请求或队列清空。CancelInterceptor() 获取 task 当前所在的拦截器,调用其 Cancel() 方法。DownloadInterceptorDefault::Cancel() 通过 HMS_Rcp_CancelRequest() 取消正在进行的 HTTP 请求。但取消和 ResponseCallback 之间存在竞态:Cancel 和 Fetch 完成可能并发。代码用 cancelMutex_ 和 rcpRequestCanceled 标志位解决了这个问题。fetch 之后先加锁记录 rcpRequest,如果发现 rcpRequestCanceled 已经为 true(说明 Cancel 先到),就补调一次 Cancel。两把锁分开操作——sessionLock_ 保护全局 session 接口调用,cancelMutex_ 保护单个任务的取消状态——避免了嵌套加锁导致的死锁风险。