import 'dart:async';
import 'dart:convert';
import 'package:flutter_map/src/entity/map_location.dart';
import 'package:injectable/injectable.dart';
import 'package:location/data/repositories/friends_repository.dart';
import 'package:location/di/get_it.dart';
import 'package:location/socket/base_message.dart';
import 'package:location/socket/socket_constants.dart';
import 'package:location/utils/async_util.dart';
import 'package:location/utils/atmob_log.dart';
import 'package:location/utils/base_expand.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import '../data/bean/location_info.dart';
import '../data/repositories/account_repository.dart';
import '../data/repositories/contact_repository.dart';
import '../data/repositories/message_repository.dart';
import 'location_message.dart';

/// Callback invoked with each merged batch of friend locations.
typedef OnLocationChangeListener = void Function(List data);

/// WebSocket client that receives server push commands and friend location
/// batches, and uploads the local device location.
///
/// Refresh commands are forwarded to the matching repository; location
/// batches are buffered, de-duplicated per user id and handed to the
/// listeners registered through [addLocationListener].
@lazySingleton
class AtmobLocationClient {
  /// Log tag used for every message emitted by this class.
  static const String tag = 'AtmobLocationClient';

  /// Listeners notified on every merged location batch.
  static final List<OnLocationChangeListener> _locationListeners = [];

  /// The current socket, or `null` when no connection attempt is active.
  WebSocketChannel? _webSocket;

  /// Subscription on the processed message stream of [_webSocket].
  StreamSubscription? _subscription;

  /// Despite the name, this is `true` once the socket handshake has completed
  /// and `false` again after a dispose or disconnect.
  bool _isConnecting = false;

  /// Handle on the in-flight connect-with-retry operation, if any.
  CancelableFuture? cancelableFuture;

  /// Creates the client and immediately tries to connect.
  AtmobLocationClient() {
    startWebsocketInternal();
  }

  /// Returns the instance registered in the service locator.
  static AtmobLocationClient getAtmobLocationClient() {
    return getIt.get<AtmobLocationClient>();
  }

  /// Starts (or restarts) the connection on the registered instance.
  static void connectWebSocket() {
    AtmobLog.d(tag, 'connectWebSocket');
    getIt.get<AtmobLocationClient>().startWebsocketInternal();
  }

  /// Stops the connection on the registered instance.
  static void disConnectWebSocket() {
    AtmobLog.d(tag, 'disConnectWebSocket');
    getIt.get<AtmobLocationClient>().stopWebsocketInternal();
  }

  /// Begins connecting unless there is no account token or the socket is
  /// already connected.
  ///
  /// Any previous attempt is cancelled first. When the retry helper finally
  /// reports an error, the whole procedure is started again.
  void startWebsocketInternal() {
    final token = AccountRepository.token;
    if (token == null || token.isEmpty || _isConnecting) {
      return;
    }

    cancelableFuture?.cancel();
    // NOTE(review): the positional `4` is presumably the maximum number of
    // attempts of the retry helper - confirm against AsyncUtil.
    final CancelableFuture attempt = AsyncUtil.retryWithExponentialBackoff(
      _startConnect,
      4,
      initialInterval: const Duration(seconds: 2),
    );
    cancelableFuture = attempt;
    attempt.catchError((error) {
      AtmobLog.d(tag, '重试最大次数 异常 error:$error');
      startWebsocketInternal();
    });
  }

  /// Cancels any pending connection attempt and tears down the socket.
  void stopWebsocketInternal() {
    AtmobLog.d(tag, 'stopWebsocketInternal');
    cancelableFuture?.cancel();
    _disposePreviousConnection();
  }

  /// Closes the current socket, cancels its subscription and resets the
  /// connection state.
  void _disposePreviousConnection() {
    _webSocket?.sink.close();
    _webSocket = null;
    _subscription?.cancel();
    _subscription = null;
    _isConnecting = false;
  }

  /// Opens a new socket, waits for the handshake and wires up the message
  /// pipeline.
  ///
  /// Rethrows the handshake error so the retry helper can schedule another
  /// attempt.
  Future<void> _startConnect() async {
    AtmobLog.d(tag, '_startConnect');
    _disposePreviousConnection();

    final webSocket = WebSocketChannel.connect(Uri.parse(
        '${SocketConstants.locationBaseUrl}${AccountRepository.token}'));
    _webSocket = webSocket;

    try {
      await webSocket.ready;
    } catch (e) {
      AtmobLog.d(tag, 'webSocket 连接失败 error:$e');
      // Drop the dead socket right away so uploads do not target it, but
      // only if a newer attempt has not replaced it in the meantime.
      if (identical(_webSocket, webSocket)) {
        _disposePreviousConnection();
      }
      rethrow;
    }

    // While awaiting the handshake another start/stop call may have replaced
    // or cleared the socket. Subscribing now would leak a listener and mark
    // the client connected on a socket nobody owns.
    if (!identical(_webSocket, webSocket)) {
      AtmobLog.d(tag, '_startConnect superseded, closing stale socket');
      webSocket.sink.close();
      return;
    }

    AtmobLog.d(tag, 'webSocket 连接成功');
    _isConnecting = true;

    _subscription = webSocket.stream
        .map(_decodeMessage)
        .where(_routeCommand)
        .map((message) => message?.data)
        .where((data) => data != null)
        .cast()
        .map(_decodeLocations)
        // Collect the decoded batches over five seconds before merging.
        .bufferTime(const Duration(seconds: 5))
        .where((batches) => batches.isNotEmpty)
        .map(_mergeLatestLocations)
        .listen(
          _handleLocationMessage,
          onError: _handleError,
          onDone: _handleDisconnect,
        );
  }

  /// Parses a raw socket frame into a [BaseMessage].
  ///
  /// Returns `null` (after logging) when the frame cannot be decoded.
  BaseMessage? _decodeMessage(dynamic raw) {
    AtmobLog.d(tag, 'webSocket receive:$raw');
    try {
      final Map<String, dynamic> json = jsonDecode(raw);
      return BaseMessage.fromJson(json);
    } catch (e) {
      AtmobLog.d(tag, 'BaseMessage jsonDecode error:$e');
      return null;
    }
  }

  /// Triggers the repository refresh that matches the command of [message].
  ///
  /// Returns whether [message] is a friend location batch, i.e. whether it
  /// should continue down the location pipeline.
  bool _routeCommand(BaseMessage? message) {
    final cmd = message?.cmd;
    if (cmd == null) {
      return false;
    }

    switch (cmd) {
      case SocketConstants.refreshFriendList:
        FriendsRepository.getInstance().refreshFriends();
        break;
      case SocketConstants.refreshFriendRequest:
        FriendsRepository.getInstance().refreshFriendRequestList();
        break;
      case SocketConstants.refreshFriendMessage:
        MessageRepository.getInstance().requestMessageList();
        break;
      case SocketConstants.refreshContact:
        ContactRepository.getInstance().refreshContactList();
        break;
      case SocketConstants.refreshMember:
        AccountRepository.getInstance().refreshMemberStatus();
        break;
    }
    return cmd == SocketConstants.receiveFriendBatchLocation;
  }

  /// Decodes the JSON array payload of a location message.
  ///
  /// Returns `null` (after logging) when the payload cannot be decoded.
  List<LocationInfo>? _decodeLocations(dynamic payload) {
    try {
      final List jsonList = jsonDecode(payload);
      return jsonList.map((e) => LocationInfo.fromJson(e)).toList();
    } catch (e) {
      AtmobLog.d(tag, 'List jsonDecode error:$e');
      return null;
    }
  }

  /// Flattens buffered [batches] and keeps the last location seen per user.
  ///
  /// Entries without a user id, or whose latitude or longitude is 0, are
  /// skipped.
  List<LocationInfo> _mergeLatestLocations(List<List<LocationInfo>?> batches) {
    final latestByUser = <String, LocationInfo>{};
    for (final batch in batches) {
      if (batch == null || batch.isEmpty) {
        continue;
      }
      for (final location in batch) {
        final String? userId = location.userId;
        if (userId == null ||
            location.longitude == 0 ||
            location.latitude == 0) {
          continue;
        }
        latestByUser[userId] = location;
      }
    }
    return latestByUser.values.toList();
  }

  /// Delivers a merged location batch to every registered listener.
  void _handleLocationMessage(List data) {
    AtmobLog.d(tag, '接收到位置信息: ${data.length}');
    // Iterate over a snapshot so a listener may add or remove listeners
    // (including itself) from inside its callback.
    for (final listener in List.of(_locationListeners)) {
      listener(data);
    }
  }

  /// Handles the socket stream finishing: marks the client disconnected and
  /// starts reconnecting.
  void _handleDisconnect() {
    AtmobLog.e(tag, 'WebSocket 断开连接');
    _isConnecting = false;
    startWebsocketInternal();
  }

  /// Logs an error emitted by the socket stream.
  void _handleError(error) {
    AtmobLog.e(tag, 'WebSocket 错误: $error');
  }

  /// Registers [listener] for merged location batches.
  static void addLocationListener(OnLocationChangeListener listener) {
    _locationListeners.add(listener);
  }

  /// Unregisters a listener added with [addLocationListener].
  static void removeLocationListener(OnLocationChangeListener listener) {
    _locationListeners.remove(listener);
  }

  /// Writes [msg] to the socket, doing nothing when there is no socket.
  void _sendMessage(String msg) {
    final socket = _webSocket;
    if (socket == null) {
      return;
    }
    socket.sink.add(msg);
    AtmobLog.d(tag, 'send location: $msg');
  }

  /// Uploads [location] to the server.
  ///
  /// Nothing is sent when there is no socket or when both coordinates are 0.
  void uploadLocation(MapLocation location) {
    // NOTE(review): the receive side drops a location when EITHER coordinate
    // is 0, while this only skips when BOTH are 0 - confirm which is intended.
    final isOrigin = location.latitude == 0 && location.longitude == 0;
    if (_webSocket == null || isOrigin) {
      return;
    }
    _sendMessage(LocationMessage.obtainMessage(location));
  }
}