概述

为应用程序提供一个多线程的运行环境,降低整体资源的消耗、提高系统的整体性能

实现流程

  1. 开发者在主线程封装任务抛给任务队列(创建TaskGroup并通过addTask()添加对应的任务)
  2. 系统选择合适的工作线程,进行任务的分发及执行(通过execute()执行任务组,并指定优先级)
  3. 工作线程将结果返回给主线程

实现任务

  1. 实现任务的函数需要使用装饰器@Concurrent标注,且仅支持在.ets文件中使用。
  2. 从API version 11开始,实现任务的函数需要使用类方法时,该类必须使用装饰器@Sendable装饰器标注,且仅支持在.ets文件中使用。
  3. 任务函数在TaskPool工作线程的执行耗时不能超过3分钟(不包含Promise和async/await异步调用的耗时,例如网络下载、文件读写等I/O任务的耗时),否则会被强制退出。
  4. 实现任务的函数入参需满足序列化支持的类型
mport { taskpool } from '@kit.ArkTS';

@Concurrent
function imageProcessing(dataSlice: ArrayBuffer): ArrayBuffer {
// 步骤1: 具体的图像处理操作及其他耗时操作
return dataSlice;
}

function histogramStatistic(pixelBuffer: ArrayBuffer): void {
// 步骤2: 分成三段
let number: number = pixelBuffer.byteLength / 3;
let buffer1: ArrayBuffer = pixelBuffer.slice(0, number);
let buffer2: ArrayBuffer = pixelBuffer.slice(number, number * 2);
let buffer3: ArrayBuffer = pixelBuffer.slice(number * 2);
// 步骤3: 添加对应的任务,并发调度
let group: taskpool.TaskGroup = new taskpool.TaskGroup();
group.addTask(imageProcessing, buffer1);
group.addTask(imageProcessing, buffer2);
group.addTask(imageProcessing, buffer3);

taskpool.execute(group, taskpool.Priority.HIGH).then((ret: Object) => {
// 步骤4: 指定优先级并执行,结果数组汇总处理
})
}

同步任务示例

  1. 定义并发函数,内部调用同步方法。
  2. 创建任务Task,通过execute()接口执行该任务,并对任务返回的结果进行操作。
  3. 执行并发操作。
// Handle.ets 代码
"use shared"
// 单例
// 实现任务的函数需要使用**类方法**时,该类必须使用装饰器@Sendable装饰器标注
@Sendable
export default class Handle {
private static instance: Handle = new Handle();
static getInstance(): Handle {
// 返回单例对象
return Handle.instance;
}
static syncGet(): void {
// 同步Get方法
}
static syncSet(num: number): number {
// 模拟同步步骤1
console.info("taskpool: this is 1st print!");
// 模拟同步步骤2
console.info("taskpool: this is 2nd print!");
return ++num;
}
static syncSet2(num: number): number {
// 模拟同步步骤1
console.info("taskpool: this is syncSet2 1st print!");
// 模拟同步步骤2
console.info("taskpool: this is syncSet2 2nd print!");
return ++num;
}
}

// Index.ets代码
import { taskpool} from '@kit.ArkTS';
import Handle from './Handle'; // 返回静态句柄
// 步骤1: 定义并发函数,内部调用同步方法
@Concurrent
async function func(num: number): Promise<number> {
// 调用静态类对象中实现的同步等待调用
// 先调用syncSet方法并将其结果作为syncSet2的参数,模拟同步调用逻辑
let tmpNum: number = Handle.syncSet(num);
console.info("this is Child_Thread")
return Handle.syncSet2(tmpNum);
}
// 步骤2: 创建任务并执行
async function asyncGet(): Promise<void> {
// 创建task、task2并传入函数func
let task: taskpool.Task = new taskpool.Task(func, 1);
let task2: taskpool.Task = new taskpool.Task(func, 2);
// 执行task、task2任务,await保证其同步执行
let res: number = await taskpool.execute(task) as number;
let res2: number = await taskpool.execute(task2) as number;
// 打印任务结果
console.info("taskpool: task res is: " + res);
console.info("taskpool: task res2 is: " + res2);
}
@Entry
@Component
struct Index {
@State message: string = 'Hello World';
build() {
Row() {
Column() {
Text(this.message)
.fontSize(50)
.fontWeight(FontWeight.Bold)
.onClick(async () => {
// 步骤3: 执行并发操作
asyncGet();
let num: number = Handle.syncSet(100);
console.info("this is Main_Thread!")
})
}
.width('100%')
.height('100%')
}
}
}

系统对taskpool的管理

  1. 任务较多时会扩容
  2. 长时间没有任务分发时会缩容,减少工作线程数量

生命周期

TaskPool自行管理线程数量,其生命周期由TaskPool统一管理(无需开发者关心)

注意事项

  1. Promise不支持跨线程传递,不能作为concurrent function的返回值
  2. 不支持在TaskPool工作线程中使用AppStorage
  3. 序列化传输的数据量大小限制为16MB
  4. 不同线程中上下文对象不同,因此TaskPool工作线程只能使用线程安全的库

与Worker的异同

taskpool偏向于独立任务,worker偏向于线程

内容 taskpool worker
参数传递机制 标准的结构化克隆算法进行序列化、反序列化,完成参数传递
支持ArrayBuffer转移和SharedArrayBuffer共享
参数传递 直接传递,无需封装,默认进行transfer 消息对象唯一参数,需要自己封装
方法调用 直接将方法传入 Worker线程中进行消息解析并调用对应方法
返回值 异步调用后默认返回 主动发送消息,需在onmessage解析赋值
任务池个数上限 自动管理,无需配置 同个进程下,最多支持同时开启64个Worker
任务执行时长上限 3分钟(不包含Promise和async/await异步调用的耗时),长时任务无执行时长上限 无限制
设置任务的优先级 支持 不支持
负载均衡 支持 不支持
执行任务的取消 支持 不支持
线程复用 支持 不支持
任务延时执行 支持 不支持
设置任务依赖关系 支持 不支持
串行队列 支持 不支持
任务组 支持 不支持

使用场景

  1. 需要设置优先级的任务
  2. 需要频繁取消的任务
  3. 大量或者调度点较分散的任务
  4. IO密集型、三分钟内独立任务的CPU密集型
  5. 同步任务之间相对独立(静态方法,或者单例实现的方法)