import 'dart:async'; extension LetExtension on T { /// 类似 Kotlin 的 let 函数,允许对任意对象执行代码块 R let(R Function(T it) block) { return block(this); } } extension ApplyExtension on T { /// 类似 Kotlin 的 apply 函数,允许对对象执行配置操作,并返回自身 T apply(void Function(T it) block) { block(this); return this; } } extension AlsoExtension on T { T also(void Function(T it) block) { block(this); return this; } } extension RunExtension on T { R run(R Function(T it) block) => block(this); } extension StreamBufferTimeExtension on Stream { /// 将流中的事件按时间窗口缓冲,每隔 [duration] 时间发送一次缓冲列表 Stream> bufferTime(Duration duration) { StreamController>? controller; List buffer = []; Timer? timer; controller = StreamController>( onListen: () { timer = Timer.periodic(duration, (_) { if (buffer.isNotEmpty) { controller?.add(List.from(buffer)); buffer.clear(); } }); // 监听原始流,收集事件到缓冲区 listen( (event) => buffer.add(event), onError: (error) => controller?.addError(error), onDone: () { timer?.cancel(); // 流结束时发送剩余缓冲事件 if (buffer.isNotEmpty) { controller?.add(List.from(buffer)); buffer.clear(); } controller?.close(); }, ); }, onCancel: () { timer?.cancel(); buffer.clear(); }, ); // 返回控制器对应的 Stream,而不是控制器本身 return controller.stream; } }