async_util.dart 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. import 'dart:async';
  2. typedef FutureCallback<T> = Future<T> Function();
  3. typedef IntervalCallback<T> = Future<T> Function(int times);
  4. typedef CancelCallback<T> = void Function();
  5. typedef FutureCompleter<T> = void Function(Completer<T> completer);
  6. typedef Predicate<T> = bool Function(T? value);
  7. class AsyncUtil {
  8. AsyncUtil._();
  9. static CancelableFuture<T> retryWithExponentialBackoff<T>(
  10. FutureCallback<T> callback, int maxRetry,
  11. {Duration initialInterval = const Duration(seconds: 1),
  12. Predicate<dynamic>? predicate}) {
  13. int retryCount = 0;
  14. Timer? timer;
  15. void attempt(Completer<T> completer) {
  16. callback().then((value) {
  17. if (!completer.isCompleted) {
  18. completer.complete(value);
  19. }
  20. }).catchError((error) {
  21. if (retryCount < maxRetry && (predicate == null || predicate(error))) {
  22. retryCount++;
  23. Duration nextInterval = initialInterval * (1 << (retryCount - 1));
  24. timer = Timer(nextInterval, () => attempt(completer));
  25. } else {
  26. if (!completer.isCompleted) {
  27. completer.completeError(error);
  28. }
  29. }
  30. });
  31. }
  32. return CancelableFuture<T>((completer) {
  33. attempt(completer);
  34. }, () {
  35. timer?.cancel();
  36. });
  37. }
  38. static CancelableFuture<T> retry<T>(
  39. FutureCallback<T> callback, Duration interval,
  40. {Duration? timeout, int? maxRetry, Predicate<dynamic>? predicate}) {
  41. int retryCount = 0;
  42. Timer? timer;
  43. Timer? timeoutTimer;
  44. void attempt(Completer<T> completer) {
  45. callback().then((value) {
  46. if (!completer.isCompleted) {
  47. completer.complete(value);
  48. }
  49. }).catchError((error) {
  50. if ((maxRetry == null || maxRetry <= 0 || retryCount < maxRetry) &&
  51. (predicate == null || predicate(error))) {
  52. retryCount++;
  53. timer = Timer(interval, () => attempt(completer));
  54. } else {
  55. if (!completer.isCompleted) {
  56. completer.completeError(error);
  57. }
  58. }
  59. });
  60. }
  61. return CancelableFuture<T>((completer) {
  62. if (timeout != null) {
  63. timeoutTimer = Timer(timeout, () {
  64. if (!completer.isCompleted) {
  65. completer.completeError(TimeoutException('Operation timed out'));
  66. }
  67. });
  68. }
  69. attempt(completer);
  70. }, () {
  71. timer?.cancel();
  72. timeoutTimer?.cancel();
  73. });
  74. }
  75. static CancelableFuture<T> delay<T>(
  76. FutureCallback<T> callback, Duration interval) {
  77. Timer? timer;
  78. return CancelableFuture<T>((completer) {
  79. timer = Timer(interval, () {
  80. callback().then((value) {
  81. if (!completer.isCompleted) {
  82. completer.complete(value);
  83. }
  84. }).catchError((error) {
  85. if (!completer.isCompleted) {
  86. completer.completeError(error);
  87. }
  88. });
  89. });
  90. }, () {
  91. timer?.cancel();
  92. });
  93. }
  94. static StreamController<T> interval<T>(
  95. IntervalCallback<T> callback, Duration interval, int times,
  96. {Duration? delay}) {
  97. Timer? timer;
  98. StreamController<T> controller = StreamController<T>(onCancel: () {
  99. timer?.cancel();
  100. });
  101. int count = 0;
  102. void tick() {
  103. callback(count).then((value) {
  104. controller.add(value);
  105. count++;
  106. if (times == -1 || count < times) {
  107. timer = Timer(interval, tick);
  108. } else {
  109. controller.close();
  110. }
  111. }).catchError((error) {
  112. if (!controller.isClosed) {
  113. controller.addError(error);
  114. controller.close();
  115. }
  116. });
  117. }
  118. if (delay != null && delay > Duration.zero) {
  119. timer = Timer(delay, tick);
  120. } else {
  121. tick();
  122. }
  123. return controller;
  124. }
  125. static CancelableFuture<List<AsyncResult<T>>> waitForAll<T>(
  126. Iterable<Future<T>> futures) {
  127. final completers = futures.map((_) => Completer<void>()).toList();
  128. final results = <AsyncResult<T>>[];
  129. bool isCancelled = false;
  130. void attempt(Completer<List<AsyncResult<T>>> completer) async {
  131. for (int i = 0; i < futures.length; i++) {
  132. if (isCancelled) {
  133. completer.completeError(CancelledError());
  134. return;
  135. }
  136. try {
  137. final value = await futures.elementAt(i);
  138. results.add(AsyncResult.success(value));
  139. } catch (e, stackTrace) {
  140. results.add(AsyncResult.failure(e, stackTrace));
  141. } finally {
  142. completers[i].complete();
  143. }
  144. }
  145. await Future.wait(completers.map((c) => c.future));
  146. if (!completer.isCompleted) {
  147. completer.complete(results);
  148. }
  149. }
  150. return CancelableFuture<List<AsyncResult<T>>>((completer) {
  151. attempt(completer);
  152. }, () {
  153. isCancelled = true;
  154. });
  155. }
  156. }
  157. class AsyncResult<T> {
  158. final T? value;
  159. final Object? error;
  160. final StackTrace? stackTrace;
  161. bool get isSuccess => error == null;
  162. AsyncResult.success(this.value)
  163. : error = null,
  164. stackTrace = null;
  165. AsyncResult.failure(this.error, this.stackTrace) : value = null;
  166. void handle({
  167. required void Function(T) onSuccess,
  168. required void Function(Object, StackTrace?) onError,
  169. }) {
  170. if (isSuccess) {
  171. onSuccess(value as T);
  172. } else {
  173. onError(error!, stackTrace);
  174. }
  175. }
  176. }
  177. abstract interface class Cancelable {
  178. void cancel();
  179. }
  180. class CancelableFuture<T> implements Future<T>, Cancelable {
  181. final Completer<T> _completer = Completer<T>();
  182. final CancelCallback? _cancelable;
  183. CancelableFuture(FutureCompleter<T> completer, this._cancelable) {
  184. completer.call(_completer);
  185. }
  186. @override
  187. void cancel() {
  188. _cancelable?.call();
  189. if (!_completer.isCompleted) {
  190. _completer.completeError(CancelledError());
  191. }
  192. }
  193. @override
  194. Stream<T> asStream() => _completer.future.asStream();
  195. @override
  196. Future<T> catchError(Function onError, {bool Function(Object error)? test}) =>
  197. _completer.future.catchError(onError, test: test);
  198. @override
  199. Future<R> then<R>(FutureOr<R> Function(T value) onValue,
  200. {Function? onError}) =>
  201. _completer.future.then(onValue, onError: onError);
  202. @override
  203. Future<T> timeout(Duration timeLimit, {FutureOr<T> Function()? onTimeout}) =>
  204. _completer.future.timeout(timeLimit, onTimeout: onTimeout);
  205. @override
  206. Future<T> whenComplete(FutureOr<void> Function() action) =>
  207. _completer.future.whenComplete(action);
  208. }
  209. class CancelledError extends Error {
  210. @override
  211. String toString() {
  212. return 'Operation was cancelled';
  213. }
  214. }
  215. extension CancelableFutureExtension<T> on Future<T> {
  216. CancelableFuture<T> asCancelable(
  217. FutureCompleter completer, CancelCallback? cancelable) {
  218. CancelableFuture<T> cancelableFuture =
  219. CancelableFuture(completer, cancelable);
  220. then((value) {
  221. if (!cancelableFuture._completer.isCompleted) {
  222. cancelableFuture._completer.complete(value);
  223. }
  224. }).catchError((error) {
  225. if (!cancelableFuture._completer.isCompleted) {
  226. cancelableFuture._completer.completeError(error);
  227. }
  228. });
  229. return cancelableFuture;
  230. }
  231. }