概述
为应用程序提供一个多线程的运行环境,降低整体资源的消耗、提高系统的整体性能
实现流程
- 开发者在主线程封装任务抛给任务队列(创建TaskGroup并通过addTask()添加对应的任务)
- 系统选择合适的工作线程,进行任务的分发及执行(通过execute()执行任务组,并指定优先级)
- 工作线程将结果返回给主线程
实现任务
- 实现任务的函数需要使用装饰器@Concurrent标注,且仅支持在.ets文件中使用。
- 从API version 11开始,实现任务的函数需要使用类方法时,该类必须使用装饰器@Sendable装饰器标注,且仅支持在.ets文件中使用。
- 任务函数在TaskPool工作线程的执行耗时不能超过3分钟(不包含Promise和async/await异步调用的耗时,例如网络下载、文件读写等I/O任务的耗时),否则会被强制退出。
- 实现任务的函数入参需满足序列化支持的类型
mport { taskpool } from '@kit.ArkTS';
@Concurrent function imageProcessing(dataSlice: ArrayBuffer): ArrayBuffer { return dataSlice; }
function histogramStatistic(pixelBuffer: ArrayBuffer): void { let number: number = pixelBuffer.byteLength / 3; let buffer1: ArrayBuffer = pixelBuffer.slice(0, number); let buffer2: ArrayBuffer = pixelBuffer.slice(number, number * 2); let buffer3: ArrayBuffer = pixelBuffer.slice(number * 2); let group: taskpool.TaskGroup = new taskpool.TaskGroup(); group.addTask(imageProcessing, buffer1); group.addTask(imageProcessing, buffer2); group.addTask(imageProcessing, buffer3);
taskpool.execute(group, taskpool.Priority.HIGH).then((ret: Object) => { }) }
|
同步任务示例
- 定义并发函数,内部调用同步方法。
- 创建任务Task,通过execute()接口执行该任务,并对任务返回的结果进行操作。
- 执行并发操作。
"use shared"
@Sendable export default class Handle { private static instance: Handle = new Handle(); static getInstance(): Handle { return Handle.instance; } static syncGet(): void { } static syncSet(num: number): number { console.info("taskpool: this is 1st print!"); console.info("taskpool: this is 2nd print!"); return ++num; } static syncSet2(num: number): number { console.info("taskpool: this is syncSet2 1st print!"); console.info("taskpool: this is syncSet2 2nd print!"); return ++num; } }
import { taskpool} from '@kit.ArkTS'; import Handle from './Handle';
@Concurrent async function func(num: number): Promise<number> { let tmpNum: number = Handle.syncSet(num); console.info("this is Child_Thread") return Handle.syncSet2(tmpNum); }
async function asyncGet(): Promise<void> { let task: taskpool.Task = new taskpool.Task(func, 1); let task2: taskpool.Task = new taskpool.Task(func, 2); let res: number = await taskpool.execute(task) as number; let res2: number = await taskpool.execute(task2) as number; console.info("taskpool: task res is: " + res); console.info("taskpool: task res2 is: " + res2); } @Entry @Component struct Index { @State message: string = 'Hello World'; build() { Row() { Column() { Text(this.message) .fontSize(50) .fontWeight(FontWeight.Bold) .onClick(async () => { asyncGet(); let num: number = Handle.syncSet(100); console.info("this is Main_Thread!") }) } .width('100%') .height('100%') } } }
|
系统对taskpool的管理
- 任务较多时会扩容
- 长时间没有任务分发时会缩容,减少工作线程数量
生命周期
TaskPool自行管理线程数量,其生命周期由TaskPool统一管理(无需开发者关心)
注意事项
- Promise不支持跨线程传递,不能作为concurrent function的返回值
- 不支持在TaskPool工作线程中使用AppStorage
- 序列化传输的数据量大小限制为16MB
- 不同线程中上下文对象不同,因此TaskPool工作线程只能使用线程安全的库
与Worker的异同
taskpool偏向于独立任务,worker偏向于线程
内容 |
taskpool |
worker |
参数传递机制 |
标准的结构化克隆算法进行序列化、反序列化,完成参数传递 支持ArrayBuffer转移和SharedArrayBuffer共享 |
同 |
参数传递 |
直接传递,无需封装,默认进行transfer |
消息对象唯一参数,需要自己封装 |
方法调用 |
直接将方法传入 |
Worker线程中进行消息解析并调用对应方法 |
返回值 |
异步调用后默认返回 |
主动发送消息,需在onmessage解析赋值 |
任务池个数上限 |
自动管理,无需配置 |
同个进程下,最多支持同时开启64个Worker |
任务执行时长上限 |
3分钟(不包含Promise和async/await异步调用的耗时),长时任务无执行时长上限 |
无限制 |
设置任务的优先级 |
支持 |
不支持 |
负载均衡 |
支持 |
不支持 |
执行任务的取消 |
支持 |
不支持 |
线程复用 |
支持 |
不支持 |
任务延时执行 |
支持 |
不支持 |
设置任务依赖关系 |
支持 |
不支持 |
串行队列 |
支持 |
不支持 |
任务组 |
支持 |
不支持 |
使用场景
- 需要设置优先级的任务
- 需要频繁取消的任务
- 大量或者调度点较分散的任务
- IO密集型、三分钟内独立任务的CPU密集型
- 同步任务之间相对独立(静态方法,或者单例实现的方法)