| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136 |
- //
- // PhotosImageClassifier+Batchs.swift
- // Demo
- //
- // Created by Groot on 2025/4/27.
- //
- import Foundation
- // MARK: - 批处理方法
- extension PhotosImageClassifier {
- /// 按批次处理图片项目
- /// - Parameters:
- /// - types: 分类类型
- /// - items: 图片项目列表
- /// - progressHandler: 进度和批次结果回调处理器
- func processBatches(
- with types: [PhotoImageClassifyType],
- items: [ImageItem],
- batchCompletionHandler: @escaping BatchProgressCompletion
- ) async {
- guard !items.isEmpty else {
- print("Unprocess ImageItems count is Zero --- Finished batch")
- return
- }
- // 计算需要处理的批次总数
- let totalItems = items.count
- let batchCount = Int(ceil(Double(totalItems) / Double(configuration.batchSize)))
- let startTime = Date()
-
- for batchIndex in 0..<batchCount {
- let batchStartTime = Date()
- // 处理当前批次
- let startIndex = batchIndex * configuration.batchSize
- let endIndex = min((batchIndex + 1) * configuration.batchSize, totalItems)
- guard startIndex <= endIndex else { return }
- let currentBatch = Array(items[startIndex..<endIndex])
-
- // 处理当前批次
- let processedItems = await processBatch(with: types, items: currentBatch)
-
- // 是否为最后一个批次
- let isLastBatch = batchIndex == batchCount - 1
-
- let batchDuration = Date().timeIntervalSince(batchStartTime)
- let totalDuration = Date().timeIntervalSince(startTime)
- let progress = Double(batchIndex + 1) / Double(batchCount)
- // 创建进度信息对象
- let progressInfo = ClassificationProgress(
- currentBatch: batchIndex + 1,
- totalBatches: batchCount,
- rate: progress,
- batchDuration: batchDuration,
- totalDuration: totalDuration,
- isLastBatch: isLastBatch
- )
-
- // 回调进度更新和批次处理结果
- await batchCompletionHandler(progressInfo, processedItems)
- }
- }
-
- /// 处理单批次图片项目
- /// - Parameters:
- /// - types: 分类类型
- /// - items: 图片项目列表
- /// - Returns: 处理后的图片项目列表
- func processBatch(with types: [PhotoImageClassifyType], items: [ImageItem]) async -> [ImageItem] {
- return await limitConcurrentProcessing(items: items, maxConcurrent: configuration.maxConcurrentProcessing) { item in
- // 请求图片
- var mutableItem = await self.requestAssetsImageFor(item, options: self.imageRequestOptions)
- // 处理相似图片、人物图片、模糊图片
- self.processImage(with: types, item: &mutableItem)
- // 提取图片信息
- mutableItem.imageInfo = mutableItem.asset.extractAssetInfo()
- return mutableItem
- }
- }
-
- // MARK: - 辅助方法
-
- /// 限制并发任务处理
- /// - Parameters:
- /// - items: 待处理项目
- /// - maxConcurrent: 最大并发数
- /// - processItem: 单项处理函数
- /// - Returns: 处理结果数组
- func limitConcurrentProcessing<T>(
- items: [ImageItem],
- maxConcurrent: Int,
- processItem: @escaping (ImageItem) async -> T?
- ) async -> [T] {
- guard !items.isEmpty else { return [] }
-
- var results: [T] = []
-
- // 使用TaskGroup并发处理
- await withTaskGroup(of: T?.self) { group in
- // 控制进行中的任务数量
- var pendingItems = items[...]
- var runningTasks = 0
-
- // 添加任务的辅助函数
- func addTask(for item: ImageItem) {
- group.addTask {
- return await processItem(item)
- }
- pendingItems = pendingItems.dropFirst()
- runningTasks += 1
- }
-
- // 初始添加任务,不超过最大并发数
- while runningTasks < maxConcurrent, let item = pendingItems.first {
- addTask(for: item)
- }
-
- // 处理完成的任务,并添加新任务
- for await result in group {
- runningTasks -= 1
-
- // 添加有效结果
- if let result = result {
- results.append(result)
- }
-
- // 如果还有待处理项,继续添加任务
- if let nextItem = pendingItems.first, runningTasks < maxConcurrent {
- addTask(for: nextItem)
- }
- }
- }
-
- return results
- }
- }
|