async_util.dart 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. import 'dart:async';
  2. import 'async_typeof.dart';
  3. import 'cancel_future.dart';
  4. class AsyncUtil {
  5. AsyncUtil._();
  6. static CancelableFuture<T> retryWithExponentialBackoff<T>(
  7. FutureCallback<T> callback, int maxRetry, Predicate<dynamic>? predicate) {
  8. const Duration initialInterval = Duration(seconds: 1);
  9. int retryCount = 0;
  10. Timer? timer;
  11. void attempt(Completer<T> completer) {
  12. callback().then((value) {
  13. if (!completer.isCompleted) {
  14. completer.complete(value);
  15. }
  16. }).catchError((error) {
  17. if (retryCount < maxRetry && (predicate == null || predicate(error))) {
  18. retryCount++;
  19. Duration nextInterval = initialInterval * (1 << (retryCount - 1));
  20. timer = Timer(nextInterval, () => attempt(completer));
  21. } else {
  22. if (!completer.isCompleted) {
  23. completer.completeError(error);
  24. }
  25. }
  26. });
  27. }
  28. return CancelableFuture<T>((completer) {
  29. attempt(completer);
  30. }, () {
  31. timer?.cancel();
  32. });
  33. }
  34. static CancelableFuture<T> retryWhen<T>(FutureCallback<T> callback,
  35. int maxRetry, Duration interval, Predicate<dynamic> predicate,
  36. [Duration? timeout]) {
  37. int retryCount = 0;
  38. Timer? timer;
  39. Timer? timeoutTimer;
  40. void attempt(Completer<T> completer) {
  41. callback().then((value) {
  42. if (!completer.isCompleted) {
  43. completer.complete(value);
  44. }
  45. }).catchError((error) {
  46. if ((maxRetry <= 0 || retryCount < maxRetry) && predicate(error)) {
  47. retryCount++;
  48. timer = Timer(interval, () => attempt(completer));
  49. } else {
  50. if (!completer.isCompleted) {
  51. completer.completeError(error);
  52. }
  53. }
  54. });
  55. }
  56. return CancelableFuture<T>((completer) {
  57. if (timeout != null) {
  58. timeoutTimer = Timer(timeout, () {
  59. if (!completer.isCompleted) {
  60. completer.completeError(TimeoutException('Operation timed out'));
  61. }
  62. });
  63. }
  64. attempt(completer);
  65. }, () {
  66. timer?.cancel();
  67. timeoutTimer?.cancel();
  68. });
  69. }
  70. static CancelableFuture<T> retry<T>(
  71. FutureCallback<T> callback, int maxRetry, Duration interval,
  72. [Duration? timeout]) {
  73. return retryWhen(callback, maxRetry, interval, (error) => true, timeout);
  74. }
  75. static CancelableFuture<T> delay<T>(
  76. FutureCallback<T> callback, Duration interval) {
  77. Timer? timer;
  78. return CancelableFuture<T>((completer) {
  79. timer = Timer(interval, () {
  80. callback().then((value) {
  81. if (!completer.isCompleted) {
  82. completer.complete(value);
  83. }
  84. }).catchError((error) {
  85. if (!completer.isCompleted) {
  86. completer.completeError(error);
  87. }
  88. });
  89. });
  90. }, () {
  91. timer?.cancel();
  92. });
  93. }
  94. static StreamController<T> interval<T>(
  95. IntervalCallback<T> callback, Duration interval, int times,
  96. {Duration? delay}) {
  97. Timer? timer;
  98. StreamController<T> controller = StreamController<T>(onCancel: () {
  99. timer?.cancel();
  100. });
  101. int count = 0;
  102. void tick() {
  103. callback(count).then((value) {
  104. controller.add(value);
  105. count++;
  106. if (count < times) {
  107. timer = Timer(interval, tick);
  108. } else {
  109. controller.close();
  110. }
  111. }).catchError((error) {
  112. controller.addError(error);
  113. controller.close();
  114. });
  115. }
  116. if (delay != null && delay > Duration.zero) {
  117. timer = Timer(delay, tick);
  118. } else {
  119. tick();
  120. }
  121. return controller;
  122. }
  123. }