atmob_location_client.dart 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. import 'dart:async';
  2. import 'dart:convert';
  3. import 'package:injectable/injectable.dart';
  4. import 'package:location/data/repositories/friends_repository.dart';
  5. import 'package:location/di/get_it.dart';
  6. import 'package:location/socket/base_message.dart';
  7. import 'package:location/socket/socket_constants.dart';
  8. import 'package:location/utils/async_util.dart';
  9. import 'package:location/utils/atmob_log.dart';
  10. import 'package:location/utils/base_expand.dart';
  11. import 'package:web_socket_channel/web_socket_channel.dart';
  12. import '../data/bean/location_info.dart';
  13. import '../data/repositories/account_repository.dart';
  14. import '../data/repositories/contact_repository.dart';
  15. import '../data/repositories/message_repository.dart';
  /// Signature of a callback that receives a list of [LocationInfo] entries.
  ///
  /// Callbacks of this type are registered and unregistered through
  /// `AtmobLocationClient.addLocationListener` and
  /// `AtmobLocationClient.removeLocationListener`.
  16. typedef OnLocationChangeListener = void Function(List<LocationInfo> data);
  /// WebSocket client that receives friend-location pushes and server
  /// "refresh" commands.
  ///
  /// The client connects to `SocketConstants.locationBaseUrl` with the current
  /// `AccountRepository.token` appended, decodes every incoming frame as a
  /// [BaseMessage], forwards refresh commands to the injected repositories and
  /// delivers de-duplicated location batches to the registered
  /// [OnLocationChangeListener]s.
  ///
  /// A connection attempt is started from the constructor; connection
  /// failures and remote disconnects trigger a new attempt.
  @lazySingleton
  class AtmobLocationClient {
    /// Tag used for every log line emitted by this class.
    static const String tag = 'AtmobLocationClient';

    /// Listeners notified for every non-empty location batch.
    ///
    /// The list is static, so it is shared by all instances.
    static final List<OnLocationChangeListener> _locationListeners = [];

    /// The channel currently owned by this client, or `null` when there is none.
    WebSocketChannel? _webSocket;

    /// Subscription to the processed stream of [_webSocket].
    StreamSubscription? _subscription;

    /// Whether the handshake of [_webSocket] completed and it has not been
    /// closed since.
    bool _isConnected = false;

    /// Handle of the connect/retry loop that is currently running, if any.
    CancelableFuture? cancelableFuture;

    FriendsRepository friendsRepository;
    MessageRepository messageRepository;
    ContactRepository contactRepository;
    AccountRepository accountRepository;

    /// Creates the client and immediately tries to connect.
    AtmobLocationClient(this.friendsRepository, this.messageRepository,
        this.contactRepository, this.accountRepository) {
      startWebsocketInternal();
    }

    /// Returns the instance registered in the service locator.
    static AtmobLocationClient getAtmobLocationClient() {
      return getIt.get<AtmobLocationClient>();
    }

    /// Starts (or restarts) the connection of the registered instance.
    static void connectWebSocket() {
      AtmobLog.d(tag, 'connectWebSocket');
      getIt.get<AtmobLocationClient>().startWebsocketInternal();
    }

    /// Stops the connection of the registered instance.
    static void disConnectWebSocket() {
      AtmobLog.d(tag, 'disConnectWebSocket');
      getIt.get<AtmobLocationClient>().stopWebsocketInternal();
    }

    /// Starts a connect/retry loop unless there is no token or the socket is
    /// already connected.
    ///
    /// Any loop that is still running is cancelled first. When the retry helper
    /// finally completes with an error, a fresh loop is started, but only if
    /// that loop is still the current one.
    void startWebsocketInternal() {
      final token = AccountRepository.token;
      if (token == null || token.isEmpty || _isConnected) {
        return;
      }
      cancelableFuture?.cancel();
      // NOTE(review): `4` is presumably the maximum number of attempts and
      // `initialInterval` the first back-off delay; confirm against AsyncUtil.
      final CancelableFuture attempt = AsyncUtil.retryWithExponentialBackoff(
          () => _startConnect(), 4,
          initialInterval: const Duration(seconds: 2));
      cancelableFuture = attempt;
      attempt.catchError((error) {
        AtmobLog.d(tag, '重试最大次数 异常 error:$error');
        // A loop that was stopped or replaced must not bring the connection
        // back to life behind the caller's back.
        if (!identical(cancelableFuture, attempt)) {
          return;
        }
        startWebsocketInternal();
      });
    }

    /// Cancels the running connect/retry loop and releases the connection.
    void stopWebsocketInternal() {
      AtmobLog.d(tag, 'stopWebsocketInternal');
      cancelableFuture?.cancel();
      // Cleared so that a late failure of the cancelled loop is recognised as
      // stale by the handler installed in [startWebsocketInternal].
      cancelableFuture = null;
      _disposePreviousConnection();
    }

    /// Cancels the stream subscription, closes the socket and resets the state.
    void _disposePreviousConnection() {
      // Cancel first: once cancelled, the close below cannot be reported to
      // [_handleDisconnect] and trigger an unwanted reconnect.
      _subscription?.cancel();
      _subscription = null;
      _webSocket?.sink.close();
      _webSocket = null;
      _isConnected = false;
    }

    /// Opens a new channel and wires its stream to the message pipeline.
    ///
    /// Rethrows the handshake error so the retry helper can schedule another
    /// attempt. If this attempt was superseded (another attempt started, or
    /// the client was stopped) while the handshake was pending, it returns
    /// without touching the current state.
    Future<void> _startConnect() async {
      AtmobLog.d(tag, '_startConnect');
      _disposePreviousConnection();
      final webSocket = WebSocketChannel.connect(Uri.parse(
          '${SocketConstants.locationBaseUrl}${AccountRepository.token}'));
      _webSocket = webSocket;
      try {
        await webSocket.ready;
      } catch (e) {
        AtmobLog.d(tag, 'webSocket 连接失败 error:$e');
        if (!identical(_webSocket, webSocket)) {
          // Superseded: whoever replaced this channel already closed it, and
          // this attempt must not drive any further retries.
          return;
        }
        // Release the failed channel now instead of on the next attempt.
        _disposePreviousConnection();
        rethrow;
      }
      if (!identical(_webSocket, webSocket)) {
        // Superseded while waiting for the handshake; the channel was closed
        // by [_disposePreviousConnection], so there is nothing to listen to.
        return;
      }
      AtmobLog.d(tag, 'webSocket 连接成功');
      _isConnected = true;
      _subscription = webSocket.stream
          .map(_decodeMessage)
          .where(_dispatchCommand)
          .map((message) => message?.data)
          .where((data) => data != null)
          .cast<String>()
          .map(_decodeLocations)
          // Collect the pushes of a 5 second window into one batch.
          .bufferTime(Duration(seconds: 5))
          .where((batches) => batches.isNotEmpty)
          .map(_latestLocationPerUser)
          // Every entry of a window may have been rejected as invalid.
          .where((locations) => locations.isNotEmpty)
          .listen(
            _handleLocationMessage,
            onError: _handleError,
            onDone: _handleDisconnect,
          );
    }

    /// Decodes one raw frame into a [BaseMessage].
    ///
    /// Returns `null` (after logging) when the frame is not a JSON object that
    /// [BaseMessage.fromJson] accepts.
    BaseMessage? _decodeMessage(dynamic raw) {
      AtmobLog.d(tag, 'webSocket receive:$raw');
      try {
        Map<String, dynamic> data = jsonDecode(raw);
        return BaseMessage.fromJson(data);
      } catch (e) {
        // Best effort: a malformed frame must not break the whole stream.
        AtmobLog.d(tag, 'BaseMessage jsonDecode error:$e');
      }
      return null;
    }

    /// Forwards refresh commands to the repositories.
    ///
    /// Returns `true` only for batch-location messages, which are the only
    /// ones that continue down the pipeline.
    bool _dispatchCommand(BaseMessage? message) {
      if (message == null || message.cmd == null) {
        return false;
      }
      switch (message.cmd) {
        case SocketConstants.refreshFriendList:
          friendsRepository.refreshFriends();
          break;
        case SocketConstants.refreshFriendRequest:
          friendsRepository.refreshFriendRequestList();
          break;
        case SocketConstants.refreshFriendMessage:
          messageRepository.requestMessageList();
          break;
        case SocketConstants.refreshContact:
          // NOTE(review): this goes through messageRepository although a
          // contactRepository is injected and otherwise unused; confirm which
          // repository is meant to handle the contact refresh.
          messageRepository.refreshContactList();
          break;
        case SocketConstants.refreshMember:
          accountRepository.refreshMemberStatus();
          break;
      }
      return SocketConstants.receiveFriendBatchLocation == message.cmd;
    }

    /// Decodes the payload of a batch-location message.
    ///
    /// Returns `null` (after logging) when the payload is not a JSON array of
    /// objects that [LocationInfo.fromJson] accepts.
    List<LocationInfo>? _decodeLocations(String raw) {
      try {
        List<dynamic> jsonList = jsonDecode(raw);
        return jsonList.map((e) => LocationInfo.fromJson(e)).toList();
      } catch (e) {
        AtmobLog.d(tag, 'List<LocationInfo> jsonDecode error:$e');
      }
      return null;
    }

    /// Merges the batches of one time window, keeping the last entry per user.
    ///
    /// Entries without a user id, or whose longitude or latitude is `0`, are
    /// skipped.
    List<LocationInfo> _latestLocationPerUser(
        Iterable<List<LocationInfo>?> batches) {
      final latestByUserId = <String, LocationInfo>{};
      for (final batch in batches) {
        if (batch == null) {
          continue;
        }
        for (final location in batch) {
          final userId = location.userId;
          if (userId == null ||
              location.longitude == 0 ||
              location.latitude == 0) {
            continue;
          }
          latestByUserId[userId] = location;
        }
      }
      return latestByUserId.values.toList();
    }

    /// Delivers [data] to every registered listener.
    void _handleLocationMessage(List<LocationInfo> data) {
      AtmobLog.d(tag, '接收到位置信息: ${data.length}');
      // Iterate over a snapshot so that a listener may add or remove listeners
      // (including itself) from inside its callback.
      for (final listener in List.of(_locationListeners)) {
        try {
          listener(data);
        } catch (e, stackTrace) {
          // One failing listener must not keep the others from being notified.
          AtmobLog.e(tag, 'location listener error: $e\n$stackTrace');
        }
      }
    }

    // Disconnect handling.
    /// Releases the closed connection and tries to connect again.
    void _handleDisconnect() {
      AtmobLog.e(tag, 'WebSocket 断开连接');
      _disposePreviousConnection();
      startWebsocketInternal();
    }

    /// Logs an error reported by the socket stream.
    void _handleError(Object error) {
      AtmobLog.e(tag, 'WebSocket 错误: $error');
    }

    /// Registers [listener] for location batches.
    static void addLocationListener(OnLocationChangeListener listener) {
      _locationListeners.add(listener);
    }

    /// Unregisters a listener previously passed to [addLocationListener].
    static void removeLocationListener(OnLocationChangeListener listener) {
      _locationListeners.remove(listener);
    }
  }