async_util.dart 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. import 'dart:async';
  /// A function that takes no arguments and returns a [Future] of [T].
  typedef FutureCallback<T> = Future<T> Function();

  /// A function that receives an `int` and returns a [Future] of [T].
  ///
  /// [AsyncUtil.interval] invokes it with its running tick counter.
  typedef IntervalCallback<T> = Future<T> Function(int times);

  /// A function that takes no arguments and returns nothing.
  ///
  /// The type parameter [T] does not appear in the function type itself.
  typedef CancelCallback<T> = void Function();

  /// A function that is handed a [Completer] of [T].
  typedef FutureCompleter<T> = void Function(Completer<T> completer);

  /// A test over a nullable [T] value that answers with a `bool`.
  typedef Predicate<T> = bool Function(T? value);
  /// Static helpers for retrying, delaying and periodically repeating
  /// asynchronous callbacks.
  ///
  /// The class only namespaces its static members; the private constructor
  /// prevents instantiation.
  class AsyncUtil {
    AsyncUtil._();

    /// Runs [callback], retrying failures with exponentially growing pauses.
    ///
    /// A failed attempt is retried while fewer than [maxRetry] retries have
    /// been made and [predicate] (when given) returns `true` for the error.
    /// The pause before retry number `n` (1-based) is
    /// `initialInterval * 2^(n - 1)`. Once retries are exhausted or
    /// [predicate] rejects the error, the returned future completes with that
    /// error and its stack trace.
    ///
    /// Cancelling the returned future stops the pending retry timer and keeps
    /// an attempt that is still in flight from scheduling another retry.
    static CancelableFuture<T> retryWithExponentialBackoff<T>(
      FutureCallback<T> callback,
      int maxRetry, {
      Duration initialInterval = const Duration(seconds: 1),
      Predicate<dynamic>? predicate,
    }) {
      var retryCount = 0;
      Timer? retryTimer;

      void attempt(Completer<T> completer) {
        // Future.sync turns a synchronous throw from [callback] into a failed
        // future, so it takes the same retry path as an asynchronous error.
        Future<T>.sync(callback).then<void>(
          (value) {
            if (!completer.isCompleted) completer.complete(value);
          },
          onError: (Object error, StackTrace stackTrace) {
            // The completer is only completed early by cancellation; in that
            // case no further attempt must be scheduled.
            if (completer.isCompleted) return;
            final retryable = retryCount < maxRetry &&
                (predicate == null || predicate(error));
            if (!retryable) {
              completer.completeError(error, stackTrace);
              return;
            }
            retryCount++;
            retryTimer = Timer(
              initialInterval * (1 << (retryCount - 1)),
              () => attempt(completer),
            );
          },
        );
      }

      return CancelableFuture<T>(attempt, () => retryTimer?.cancel());
    }

    /// Runs [callback], retrying failures after a fixed [interval].
    ///
    /// Retries are unlimited when [maxRetry] is `null` or not positive;
    /// otherwise at most [maxRetry] retries are made. When [predicate] is
    /// given, only errors for which it returns `true` are retried. If
    /// [timeout] elapses first, the returned future completes with a
    /// [TimeoutException] and no further attempts are scheduled.
    ///
    /// Both timers are released as soon as the future settles, whether by
    /// success, final failure, timeout or cancellation.
    static CancelableFuture<T> retry<T>(
      FutureCallback<T> callback,
      Duration interval, {
      Duration? timeout,
      int? maxRetry,
      Predicate<dynamic>? predicate,
    }) {
      var retryCount = 0;
      Timer? retryTimer;
      Timer? timeoutTimer;

      void attempt(Completer<T> completer) {
        Future<T>.sync(callback).then<void>(
          (value) {
            if (completer.isCompleted) return;
            // Settled: the timeout must not keep the event loop alive.
            timeoutTimer?.cancel();
            completer.complete(value);
          },
          onError: (Object error, StackTrace stackTrace) {
            // Timed out or cancelled while this attempt was in flight.
            if (completer.isCompleted) return;
            final hasBudget =
                maxRetry == null || maxRetry <= 0 || retryCount < maxRetry;
            if (hasBudget && (predicate == null || predicate(error))) {
              retryCount++;
              retryTimer = Timer(interval, () => attempt(completer));
            } else {
              timeoutTimer?.cancel();
              completer.completeError(error, stackTrace);
            }
          },
        );
      }

      return CancelableFuture<T>((completer) {
        if (timeout != null) {
          timeoutTimer = Timer(timeout, () {
            // Stop a retry that is waiting for its interval to elapse.
            retryTimer?.cancel();
            if (!completer.isCompleted) {
              completer.completeError(TimeoutException('Operation timed out'));
            }
          });
        }
        attempt(completer);
      }, () {
        retryTimer?.cancel();
        timeoutTimer?.cancel();
      });
    }

    /// Invokes [callback] once after [interval] and forwards its outcome.
    ///
    /// Cancelling the returned future before the timer fires prevents
    /// [callback] from being invoked at all.
    static CancelableFuture<T> delay<T>(
      FutureCallback<T> callback,
      Duration interval,
    ) {
      Timer? timer;
      return CancelableFuture<T>((completer) {
        timer = Timer(interval, () {
          // Future.sync keeps a synchronous throw inside the future chain
          // instead of letting it escape from the timer callback.
          Future<T>.sync(callback).then<void>(
            (value) {
              if (!completer.isCompleted) completer.complete(value);
            },
            onError: (Object error, StackTrace stackTrace) {
              if (!completer.isCompleted) {
                completer.completeError(error, stackTrace);
              }
            },
          );
        });
      }, () => timer?.cancel());
    }

    /// Repeatedly invokes [callback] and publishes each result on the returned
    /// controller's stream.
    ///
    /// [callback] receives the number of values emitted so far, starting at
    /// `0`. After each value the next invocation is scheduled [interval]
    /// later, until [times] values have been emitted, at which point the
    /// controller is closed. A [times] of `-1` repeats without limit; any
    /// other value below `1` still results in a single invocation. The first
    /// invocation happens immediately unless a positive [delay] is given.
    ///
    /// An error from [callback] is added to the stream and closes the
    /// controller. Cancelling the stream subscription, or closing the
    /// controller from outside, stops further invocations.
    static StreamController<T> interval<T>(
      IntervalCallback<T> callback,
      Duration interval,
      int times, {
      Duration? delay,
    }) {
      Timer? timer;
      var cancelled = false;
      final controller = StreamController<T>(onCancel: () {
        cancelled = true;
        timer?.cancel();
      });
      var count = 0;

      void tick() {
        if (cancelled || controller.isClosed) return;
        Future<T>.sync(() => callback(count)).then<void>(
          (value) {
            // The subscription may have been cancelled, or the controller
            // closed, while the callback was running; do not reschedule.
            if (cancelled || controller.isClosed) return;
            controller.add(value);
            count++;
            if (times == -1 || count < times) {
              timer = Timer(interval, tick);
            } else {
              controller.close();
            }
          },
          onError: (Object error, StackTrace stackTrace) {
            if (controller.isClosed) return;
            controller.addError(error, stackTrace);
            controller.close();
          },
        );
      }

      if (delay != null && delay > Duration.zero) {
        timer = Timer(delay, tick);
      } else {
        tick();
      }
      return controller;
    }
  }
  /// Contract for anything that can be cancelled.
  abstract interface class Cancelable {
    /// Cancels the operation represented by this object.
    void cancel();
  }
  /// A [Future] backed by an internal [Completer] that can also be cancelled.
  ///
  /// Every [Future] member forwards to the internal completer's future.
  class CancelableFuture<T> implements Future<T>, Cancelable {
    /// Creates the future and immediately passes the internal completer to
    /// [completer].
    ///
    /// The second argument, when non-null, is invoked on every [cancel] call.
    CancelableFuture(FutureCompleter<T> completer, this._cancelable) {
      completer(_completer);
    }

    final Completer<T> _completer = Completer<T>();
    final CancelCallback? _cancelable;

    /// The future all forwarded members operate on.
    Future<T> get _future => _completer.future;

    /// Runs the cancel callback, then completes the future with a
    /// [CancelledError] unless it has already been completed.
    @override
    void cancel() {
      final onCancel = _cancelable;
      if (onCancel != null) onCancel();
      if (_completer.isCompleted) return;
      _completer.completeError(CancelledError());
    }

    @override
    Stream<T> asStream() => _future.asStream();

    @override
    Future<T> catchError(
      Function onError, {
      bool Function(Object error)? test,
    }) =>
        _future.catchError(onError, test: test);

    @override
    Future<R> then<R>(
      FutureOr<R> Function(T value) onValue, {
      Function? onError,
    }) =>
        _future.then(onValue, onError: onError);

    @override
    Future<T> timeout(
      Duration timeLimit, {
      FutureOr<T> Function()? onTimeout,
    }) =>
        _future.timeout(timeLimit, onTimeout: onTimeout);

    @override
    Future<T> whenComplete(FutureOr<void> Function() action) =>
        _future.whenComplete(action);
  }
  /// Error that [CancelableFuture.cancel] completes a still-pending future
  /// with.
  class CancelledError extends Error {
    @override
    String toString() => 'Operation was cancelled';
  }
  /// Adds [asCancelable] to every [Future].
  extension CancelableFutureExtension<T> on Future<T> {
    /// Wraps this future in a [CancelableFuture].
    ///
    /// [completer] is invoked right away with the wrapper's internal
    /// completer, and [cancelable] is run when the wrapper is cancelled. The
    /// wrapper is completed with this future's value, or with its error and
    /// stack trace, unless it has already been completed by [completer] or by
    /// cancellation.
    CancelableFuture<T> asCancelable(
      FutureCompleter completer,
      CancelCallback? cancelable,
    ) {
      final wrapper = CancelableFuture<T>(completer, cancelable);
      final target = wrapper._completer;
      then<void>(
        (value) {
          if (!target.isCompleted) target.complete(value);
        },
        // Take the two-argument form so the original stack trace is forwarded
        // rather than dropped.
        onError: (Object error, StackTrace stackTrace) {
          if (!target.isCompleted) target.completeError(error, stackTrace);
        },
      );
      return wrapper;
    }
  }