| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267 |
- import 'dart:async';
- typedef FutureCallback<T> = Future<T> Function(); // Zero-argument function that returns a Future<T>; used as the unit of work by AsyncUtil.retry/delay.
- typedef IntervalCallback<T> = Future<T> Function(int times); // Unit of work for AsyncUtil.interval; it is invoked there with the number of ticks that have already succeeded.
- typedef CancelCallback<T> = void Function(); // Hook run by CancelableFuture.cancel(); note the type parameter T does not appear in the signature.
- typedef FutureCompleter<T> = void Function(Completer<T> completer); // Receives the Completer<T> owned by a CancelableFuture and is expected to complete it.
- typedef Predicate<T> = bool Function(T? value); // Boolean test over a nullable T; the retry helpers call it with the caught error to decide whether to retry.
/// Static helpers for retrying, delaying, repeating and aggregating
/// asynchronous work. The class cannot be instantiated.
class AsyncUtil {
  AsyncUtil._();

  /// Runs [callback] and, when it fails, retries it with a delay that doubles
  /// after every failure.
  ///
  /// The wait before retry number `n` (1-based) is
  /// `initialInterval * 2^(n - 1)`. At most [maxRetry] retries are made after
  /// the first attempt. When [predicate] is supplied, a failure is retried
  /// only if `predicate(error)` returns `true`; otherwise the returned future
  /// completes with that error (and its stack trace).
  ///
  /// Cancelling the returned future stops any pending retry timer, and a
  /// failure of an attempt that was already in flight no longer schedules
  /// further retries.
  static CancelableFuture<T> retryWithExponentialBackoff<T>(
      FutureCallback<T> callback, int maxRetry,
      {Duration initialInterval = const Duration(seconds: 1),
      Predicate<dynamic>? predicate}) {
    int retryCount = 0;
    Timer? timer;

    void attempt(Completer<T> completer) {
      // Future.sync turns a synchronous throw from callback into a failed
      // future so that it goes through the same retry path.
      Future<T>.sync(callback).then<void>((value) {
        if (!completer.isCompleted) {
          completer.complete(value);
        }
      }, onError: (Object error, StackTrace stackTrace) {
        // Already completed means cancelled: do not schedule anything more.
        if (completer.isCompleted) {
          return;
        }
        if (retryCount < maxRetry && (predicate == null || predicate(error))) {
          retryCount++;
          final Duration nextInterval =
              initialInterval * (1 << (retryCount - 1));
          timer = Timer(nextInterval, () => attempt(completer));
        } else {
          completer.completeError(error, stackTrace);
        }
      });
    }

    return CancelableFuture<T>((completer) {
      attempt(completer);
    }, () {
      timer?.cancel();
    });
  }

  /// Runs [callback] and retries it every [interval] while it keeps failing.
  ///
  /// * [maxRetry]: maximum number of retries; `null` or a value `<= 0` means
  ///   no limit.
  /// * [predicate]: when supplied, a failure is retried only if
  ///   `predicate(error)` returns `true`.
  /// * [timeout]: when supplied, the returned future completes with a
  ///   [TimeoutException] once this much time has passed since the call, and
  ///   no further retries are scheduled.
  ///
  /// Both timers are released as soon as the operation finishes, times out
  /// or is cancelled.
  static CancelableFuture<T> retry<T>(
      FutureCallback<T> callback, Duration interval,
      {Duration? timeout, int? maxRetry, Predicate<dynamic>? predicate}) {
    int retryCount = 0;
    Timer? timer;
    Timer? timeoutTimer;

    void attempt(Completer<T> completer) {
      Future<T>.sync(callback).then<void>((value) {
        if (!completer.isCompleted) {
          // Finished in time: the timeout timer is no longer needed.
          timeoutTimer?.cancel();
          completer.complete(value);
        }
      }, onError: (Object error, StackTrace stackTrace) {
        // Completed by a timeout or a cancel while this attempt was in
        // flight: stop here instead of scheduling another retry.
        if (completer.isCompleted) {
          return;
        }
        if ((maxRetry == null || maxRetry <= 0 || retryCount < maxRetry) &&
            (predicate == null || predicate(error))) {
          retryCount++;
          timer = Timer(interval, () => attempt(completer));
        } else {
          timeoutTimer?.cancel();
          completer.completeError(error, stackTrace);
        }
      });
    }

    return CancelableFuture<T>((completer) {
      if (timeout != null) {
        timeoutTimer = Timer(timeout, () {
          // Drop any retry that is waiting on its timer.
          timer?.cancel();
          if (!completer.isCompleted) {
            completer.completeError(TimeoutException('Operation timed out'));
          }
        });
      }
      attempt(completer);
    }, () {
      timer?.cancel();
      timeoutTimer?.cancel();
    });
  }

  /// Waits for [interval] and then runs [callback], completing the returned
  /// future with its value or its error (stack trace included).
  ///
  /// Cancelling the returned future before the timer fires prevents
  /// [callback] from being invoked.
  static CancelableFuture<T> delay<T>(
      FutureCallback<T> callback, Duration interval) {
    Timer? timer;
    return CancelableFuture<T>((completer) {
      timer = Timer(interval, () {
        Future<T>.sync(callback).then<void>((value) {
          if (!completer.isCompleted) {
            completer.complete(value);
          }
        }, onError: (Object error, StackTrace stackTrace) {
          if (!completer.isCompleted) {
            completer.completeError(error, stackTrace);
          }
        });
      });
    }, () {
      timer?.cancel();
    });
  }

  /// Repeatedly runs [callback] and publishes each result on the returned
  /// controller's stream.
  ///
  /// [callback] receives the number of ticks that have already succeeded
  /// (starting at 0). After each success the next run is scheduled
  /// [interval] later, until [times] results have been emitted; `-1` means
  /// repeat without limit. The first run happens immediately unless a
  /// positive [delay] is given. A failing [callback] adds the error to the
  /// stream and closes it.
  ///
  /// Once the listener cancels its subscription, or the controller has been
  /// closed, no further events are added and no further runs are scheduled,
  /// even if a run was in flight at that moment.
  static StreamController<T> interval<T>(
      IntervalCallback<T> callback, Duration interval, int times,
      {Duration? delay}) {
    Timer? timer;
    bool cancelled = false;
    final StreamController<T> controller = StreamController<T>(onCancel: () {
      cancelled = true;
      timer?.cancel();
    });
    int count = 0;

    void tick() {
      Future<T>.sync(() => callback(count)).then<void>((value) {
        if (cancelled || controller.isClosed) {
          return;
        }
        controller.add(value);
        count++;
        if (times == -1 || count < times) {
          timer = Timer(interval, tick);
        } else {
          controller.close();
        }
      }, onError: (Object error, StackTrace stackTrace) {
        if (cancelled || controller.isClosed) {
          return;
        }
        controller.addError(error, stackTrace);
        controller.close();
      });
    }

    if (delay != null && delay > Duration.zero) {
      timer = Timer(delay, tick);
    } else {
      tick();
    }
    return controller;
  }

  /// Waits for every future in [futures] and reports each outcome as an
  /// [AsyncResult], in the same order as the input.
  ///
  /// Failures of individual futures never fail the returned future; they are
  /// captured as [AsyncResult.failure] entries. Cancelling the returned
  /// future completes it with [CancelledError]; the underlying futures
  /// themselves keep running.
  static CancelableFuture<List<AsyncResult<T>>> waitForAll<T>(
      Iterable<Future<T>> futures) {
    // Snapshot once: iterating a lazy iterable several times could create
    // new futures on every pass.
    final List<Future<T>> pending = futures.toList();

    return CancelableFuture<List<AsyncResult<T>>>((completer) {
      // Attach handlers to every future up front so that none of them can
      // fail without a listener while others are still pending.
      final Iterable<Future<AsyncResult<T>>> outcomes =
          pending.map((future) => future.then<AsyncResult<T>>(
                (value) => AsyncResult<T>.success(value),
                onError: (Object error, StackTrace stackTrace) =>
                    AsyncResult<T>.failure(error, stackTrace),
              ));
      Future.wait(outcomes).then<void>((results) {
        // A cancel may already have completed the future with an error.
        if (!completer.isCompleted) {
          completer.complete(results);
        }
      });
    },
        // Nothing to release here: cancel() itself completes the future.
        null);
  }
}
/// Outcome of one asynchronous operation: either a value or an error with an
/// optional stack trace.
class AsyncResult<T> {
  /// Value carried by a result built with [AsyncResult.success]; `null` for
  /// results built with [AsyncResult.failure].
  final T? value;

  /// Error carried by a result built with [AsyncResult.failure]; `null` for
  /// results built with [AsyncResult.success].
  final Object? error;

  /// Stack trace passed to [AsyncResult.failure], if any.
  final StackTrace? stackTrace;

  /// Builds a successful result holding [value].
  AsyncResult.success(this.value)
      : stackTrace = null,
        error = null;

  /// Builds a failed result holding [error] and [stackTrace].
  AsyncResult.failure(this.error, this.stackTrace) : value = null;

  /// Whether this result counts as a success, i.e. [error] is `null`.
  bool get isSuccess {
    return error == null;
  }

  /// Dispatches to [onSuccess] with the value when [isSuccess] is `true`,
  /// otherwise to [onError] with the error and stack trace.
  void handle({
    required void Function(T) onSuccess,
    required void Function(Object, StackTrace?) onError,
  }) {
    if (!isSuccess) {
      onError(error!, stackTrace);
      return;
    }
    onSuccess(value as T);
  }
}
- abstract interface class Cancelable { // Contract for operations that expose a cancel() method.
- void cancel(); // Requests cancellation; what that means is defined by the implementer.
- }
/// A [Future] backed by an internal [Completer] that can additionally be
/// cancelled through [cancel].
class CancelableFuture<T> implements Future<T>, Cancelable {
  /// Completer whose future every [Future] member delegates to.
  final Completer<T> _completer = Completer<T>();

  /// Optional hook run at the start of every [cancel] call.
  final CancelCallback? _cancelable;

  /// Creates the future and immediately hands the internal completer to
  /// [completer], which is responsible for completing it.
  CancelableFuture(FutureCompleter<T> completer, this._cancelable) {
    completer(_completer);
  }

  /// Runs the cancel hook (if any) and then, unless the future has already
  /// completed, completes it with a [CancelledError].
  @override
  void cancel() {
    _cancelable?.call();
    if (_completer.isCompleted) {
      return;
    }
    _completer.completeError(CancelledError());
  }

  @override
  Stream<T> asStream() {
    return _completer.future.asStream();
  }

  @override
  Future<T> catchError(Function onError, {bool Function(Object error)? test}) {
    return _completer.future.catchError(onError, test: test);
  }

  @override
  Future<R> then<R>(FutureOr<R> Function(T value) onValue,
      {Function? onError}) {
    return _completer.future.then(onValue, onError: onError);
  }

  @override
  Future<T> timeout(Duration timeLimit, {FutureOr<T> Function()? onTimeout}) {
    return _completer.future.timeout(timeLimit, onTimeout: onTimeout);
  }

  @override
  Future<T> whenComplete(FutureOr<void> Function() action) {
    return _completer.future.whenComplete(action);
  }
}
/// Error used to complete a [CancelableFuture] that was cancelled.
class CancelledError extends Error {
  /// Fixed human-readable description of the cancellation.
  @override
  String toString() => 'Operation was cancelled';
}
/// Adds [asCancelable] to every [Future].
extension CancelableFutureExtension<T> on Future<T> {
  /// Wraps this future in a [CancelableFuture].
  ///
  /// [completer] is invoked right away with the wrapper's internal completer
  /// and [cancelable] is run whenever the wrapper is cancelled. The wrapper
  /// completes with this future's value or error (original stack trace
  /// included) unless it has already been completed, for example by a cancel
  /// or by [completer] itself.
  CancelableFuture<T> asCancelable(
      FutureCompleter completer, CancelCallback? cancelable) {
    final CancelableFuture<T> cancelableFuture =
        CancelableFuture<T>(completer, cancelable);
    then<void>((value) {
      if (!cancelableFuture._completer.isCompleted) {
        cancelableFuture._completer.complete(value);
      }
    }, onError: (Object error, StackTrace stackTrace) {
      // Forward the stack trace as well, so the failure can still be traced
      // back to where it was raised.
      if (!cancelableFuture._completer.isCompleted) {
        cancelableFuture._completer.completeError(error, stackTrace);
      }
    });
    return cancelableFuture;
  }
}
|