Просмотр исходного кода

[new]增加socket连接以及好友定位消息刷新等功能

zk 10 месяцев назад
Родитель
Сommit
94969d7532

+ 1 - 1
android/app/build.gradle

@@ -28,7 +28,7 @@ android {
 
         ndk {
             //noinspection ChromeOsAbiSupport
-            abiFilters "arm64-v8a"
+//            abiFilters "arm64-v8a"
         }
     }
 

+ 2 - 2
lib/data/consts/constants.dart

@@ -1,5 +1,3 @@
-import '../../utils/mmkv_util.dart';
-
 class Constants {
   Constants._();
 
@@ -17,6 +15,8 @@ class Constants {
 
   static const String _prodBaseUrl = "http://loc-api.v8dashen.com";
 
+  static const String locationClientUrl =
+      "ws://loc-api.v8dashen.com/websocket/";
 
   static String baseUrl = getBaseUrl();
 

+ 5 - 1
lib/data/repositories/account_repository.dart

@@ -11,6 +11,7 @@ import 'package:location/data/bean/member_status_info.dart';
 import 'package:location/data/bean/user_info.dart';
 import 'package:location/data/consts/constants.dart';
 import 'package:location/data/consts/error_code.dart';
+import 'package:location/socket/atmob_location_client.dart';
 import 'package:location/data/repositories/friends_repository.dart';
 import 'package:location/di/get_it.dart';
 import 'package:location/resource/string.gen.dart';
@@ -111,8 +112,9 @@ class AccountRepository {
     KVUtil.putString(keyAccountLoginPhoneNum, phoneNum);
     KVUtil.putString(keyAccountLoginToken, authToken);
 
-    refreshMemberStatus();
+    AtmobLocationClient.connectWebSocket();
 
+    refreshMemberStatus();
     friendsRepository.refreshFriends();
   }
 
@@ -121,6 +123,8 @@ class AccountRepository {
 
     refreshMemberHandler?.cancel();
 
+    AtmobLocationClient.disConnectWebSocket();
+
     KVUtil.putString(keyAccountLoginPhoneNum, null);
     KVUtil.putString(keyAccountLoginToken, null);
     KVUtil.putString(keyAccountLoginUserId, null);

+ 9 - 0
lib/data/repositories/contact_repository.dart

@@ -0,0 +1,9 @@
+import 'package:injectable/injectable.dart';
+import 'package:location/data/api/atmob_api.dart';
+
+@lazySingleton
+class ContactRepository {
+  final AtmobApi atmobApi;
+
+  ContactRepository(this.atmobApi);
+}

+ 24 - 1
lib/data/repositories/friends_repository.dart

@@ -2,9 +2,10 @@ import 'package:get/get.dart';
 import 'package:injectable/injectable.dart';
 import 'package:location/data/api/atmob_api.dart';
 import 'package:location/data/api/request/friends_list_request.dart';
+import 'package:location/data/bean/location_info.dart';
 import 'package:location/di/get_it.dart';
 import 'package:location/utils/http_handler.dart';
-
+import '../../socket/atmob_location_client.dart';
 import '../../utils/atmob_log.dart';
 import '../api/response/friends_list_response.dart';
 import '../bean/user_info.dart';
@@ -20,6 +21,26 @@ class FriendsRepository {
     AtmobLog.d(tag, '$tag....init');
 
     refreshFriends();
+
+    AtmobLocationClient.addLocationListener((location) {
+      updateFriendsLocation(location);
+    });
+  }
+
+  void updateFriendsLocation(List<LocationInfo> location) {
+    AtmobLog.d('zk', 'updateFriendsLocation');
+    if (friendsList.isEmpty || location.isEmpty) {
+      return;
+    }
+    final idUserMap = <String, UserInfo>{
+      for (final user in friendsList) user.id: user
+    };
+    for (var datum in location) {
+      final user = idUserMap[datum.userId];
+      if (user != null) {
+        user.lastLocation.value = datum;
+      }
+    }
   }
 
   void clearFriends() {
@@ -47,4 +68,6 @@ class FriendsRepository {
   static FriendsRepository getInstance() {
     return getIt.get<FriendsRepository>();
   }
+
+  void refreshFriendRequestList() {}
 }

+ 13 - 0
lib/data/repositories/message_repository.dart

@@ -0,0 +1,13 @@
+import 'package:injectable/injectable.dart';
+import 'package:location/data/api/atmob_api.dart';
+
+@lazySingleton
+class MessageRepository {
+  final AtmobApi atmobApi;
+
+  MessageRepository(this.atmobApi);
+
+  void requestMessageList() {}
+
+  void refreshContactList() {}
+}

+ 14 - 0
lib/di/get_it.config.dart

@@ -14,13 +14,16 @@ import 'package:injectable/injectable.dart' as _i526;
 
 import '../data/api/atmob_api.dart' as _i243;
 import '../data/repositories/account_repository.dart' as _i20;
+import '../data/repositories/contact_repository.dart' as _i850;
 import '../data/repositories/friends_repository.dart' as _i1053;
+import '../data/repositories/message_repository.dart' as _i791;
 import '../module/add_friend/add_friend_dialog_controller.dart' as _i897;
 import '../module/browser/browser_controller.dart' as _i923;
 import '../module/login/login_controller.dart' as _i1008;
 import '../module/main/main_controller.dart' as _i731;
 import '../module/mine/mine_controller.dart' as _i732;
 import '../module/splash/splash_controller.dart' as _i973;
+import '../socket/atmob_location_client.dart' as _i220;
 import 'network_module.dart' as _i567;
 
 extension GetItInjectableX on _i174.GetIt {
@@ -46,9 +49,20 @@ extension GetItInjectableX on _i174.GetIt {
         () => _i20.AccountRepository(gh<_i243.AtmobApi>()));
     gh.lazySingleton<_i1053.FriendsRepository>(
         () => _i1053.FriendsRepository(gh<_i243.AtmobApi>()));
+    gh.lazySingleton<_i791.MessageRepository>(
+        () => _i791.MessageRepository(gh<_i243.AtmobApi>()));
+    gh.lazySingleton<_i850.ContactRepository>(
+        () => _i850.ContactRepository(gh<_i243.AtmobApi>()));
+    gh.lazySingleton<_i220.AtmobLocationClient>(() => _i220.AtmobLocationClient(
+          gh<_i1053.FriendsRepository>(),
+          gh<_i791.MessageRepository>(),
+          gh<_i850.ContactRepository>(),
+          gh<_i20.AccountRepository>(),
+        ));
     gh.factory<_i731.MainController>(() => _i731.MainController(
           gh<_i1053.FriendsRepository>(),
           gh<_i20.AccountRepository>(),
+          gh<_i220.AtmobLocationClient>(),
         ));
     gh.factory<_i1008.LoginController>(
         () => _i1008.LoginController(gh<_i20.AccountRepository>()));

+ 3 - 2
lib/module/main/main_controller.dart

@@ -11,8 +11,8 @@ import 'package:location/data/repositories/account_repository.dart';
 import 'package:location/data/repositories/friends_repository.dart';
 import 'package:flutter_map/flutter_map.dart';
 import 'package:location/sdk/map/map_helper.dart';
-import 'package:location/utils/atmob_log.dart';
 import 'package:location/utils/base_expand.dart';
+import '../../socket/atmob_location_client.dart';
 import '../../dialog/add_friend_dialog.dart';
 import '../../dialog/check_loation_permission_dialog.dart';
 import '../../dialog/location_permission_dialog.dart';
@@ -30,7 +30,8 @@ class MainController extends BaseController {
 
   UserInfo get mineUserInfo => accountRepository.mineUserInfo.value;
 
-  MainController(this.friendsRepository, this.accountRepository);
+  MainController(this.friendsRepository, this.accountRepository,
+      AtmobLocationClient atmobLocationClient);
 
   final Rxn<UserInfo> _selectedFriend = Rxn<UserInfo>();
 

+ 5 - 3
lib/module/main/main_friend_item.dart

@@ -6,6 +6,7 @@ import 'package:location/resource/assets.gen.dart';
 import 'package:location/resource/colors.gen.dart';
 import 'package:location/resource/string.gen.dart';
 import 'package:location/utils/common_expand.dart';
+import 'package:location/widget/relative_time_text.dart';
 import '../../utils/common_style.dart';
 import '../../utils/common_util.dart';
 import '../../widget/marquee_text.dart';
@@ -103,9 +104,10 @@ Widget mainSelectedFriendItem(UserInfo userInfo) {
                             color: '#202020'.color),
                       ),
                       SizedBox(width: 7.w),
-                      Text(
-                          time2TimeDesc(
-                              userInfo.lastLocation.value?.lastUpdateTime),
+                      RelativeTimeText(
+                          timestamp:
+                              userInfo.lastLocation.value?.lastUpdateTime,
+                          updateInterval: Duration(minutes: 1),
                           style: TextStyle(
                               fontSize: 12.sp, color: '#A7A7A7'.color)),
                       Spacer(),

+ 206 - 0
lib/socket/atmob_location_client.dart

@@ -0,0 +1,206 @@
+import 'dart:async';
+import 'dart:convert';
+
+import 'package:injectable/injectable.dart';
+import 'package:location/data/repositories/friends_repository.dart';
+import 'package:location/di/get_it.dart';
+import 'package:location/socket/base_message.dart';
+import 'package:location/socket/socket_constants.dart';
+import 'package:location/utils/async_util.dart';
+import 'package:location/utils/atmob_log.dart';
+import 'package:location/utils/base_expand.dart';
+import 'package:web_socket_channel/web_socket_channel.dart';
+import '../data/bean/location_info.dart';
+import '../data/repositories/account_repository.dart';
+import '../data/repositories/contact_repository.dart';
+import '../data/repositories/message_repository.dart';
+
+typedef OnLocationChangeListener = void Function(List<LocationInfo> data);
+
+@lazySingleton
+class AtmobLocationClient {
+  static final String tag = 'AtmobLocationClient';
+
+  static final List<OnLocationChangeListener> _locationListeners = [];
+
+  WebSocketChannel? _webSocket;
+  StreamSubscription? _subscription;
+  bool _isConnecting = false;
+  CancelableFuture? cancelableFuture;
+
+  FriendsRepository friendsRepository;
+  MessageRepository messageRepository;
+  ContactRepository contactRepository;
+  AccountRepository accountRepository;
+
+  AtmobLocationClient(this.friendsRepository, this.messageRepository,
+      this.contactRepository, this.accountRepository) {
+    startWebsocketInternal();
+  }
+
+  static AtmobLocationClient getAtmobLocationClient() {
+    return getIt.get<AtmobLocationClient>();
+  }
+
+  static void connectWebSocket() {
+    AtmobLog.d(tag, 'connectWebSocket');
+    AtmobLocationClient client = getIt.get<AtmobLocationClient>();
+    client.startWebsocketInternal();
+  }
+
+  static void disConnectWebSocket() {
+    AtmobLog.d(tag, 'disConnectWebSocket');
+    AtmobLocationClient client = getIt.get<AtmobLocationClient>();
+    client.stopWebsocketInternal();
+  }
+
+  void startWebsocketInternal() {
+    if (AccountRepository.token == null ||
+        AccountRepository.token?.isEmpty == true ||
+        _isConnecting) {
+      return;
+    }
+    cancelableFuture?.cancel();
+
+    cancelableFuture = AsyncUtil.retryWithExponentialBackoff(
+        () => _startConnect(), 4,
+        initialInterval: Duration(seconds: 2));
+    cancelableFuture!.catchError((error) {
+      AtmobLog.d(tag, '重试最大次数 异常 error:$error');
+      startWebsocketInternal();
+    });
+  }
+
+  void stopWebsocketInternal() {
+    AtmobLog.d(tag, 'stopWebsocketInternal');
+    cancelableFuture?.cancel();
+    _disposePreviousConnection();
+  }
+
+  void _disposePreviousConnection() {
+    _webSocket?.sink.close();
+    _webSocket = null;
+
+    _subscription?.cancel();
+    _subscription = null;
+
+    _isConnecting = false;
+  }
+
+  Future<void> _startConnect() async {
+    AtmobLog.d(tag, '_startConnect');
+
+    _disposePreviousConnection();
+
+    final webSocket = WebSocketChannel.connect(Uri.parse(
+        '${SocketConstants.locationBaseUrl}${AccountRepository.token}'));
+
+    _webSocket = webSocket;
+
+    try {
+      await webSocket.ready;
+      AtmobLog.d(tag, 'webSocket 连接成功');
+      _isConnecting = true;
+    } catch (e) {
+      AtmobLog.d(tag, 'webSocket 连接失败 error:$e');
+      rethrow;
+    }
+
+    _subscription = webSocket.stream
+        .map((s) {
+          AtmobLog.d(tag, 'webSocket receive:$s');
+          try {
+            Map<String, dynamic> data = jsonDecode(s);
+            return BaseMessage.fromJson(data);
+          } catch (e) {
+            AtmobLog.d(tag, 'BaseMessage jsonDecode error:$e');
+          }
+          return null;
+        })
+        .where((message) {
+          if (message == null || message.cmd == null) {
+            return false;
+          }
+          switch (message.cmd) {
+            case SocketConstants.refreshFriendList:
+              friendsRepository.refreshFriends();
+              break;
+            case SocketConstants.refreshFriendRequest:
+              friendsRepository.refreshFriendRequestList();
+              break;
+            case SocketConstants.refreshFriendMessage:
+              messageRepository.requestMessageList();
+              break;
+            case SocketConstants.refreshContact:
+              messageRepository.refreshContactList();
+              break;
+            case SocketConstants.refreshMember:
+              accountRepository.refreshMemberStatus();
+              break;
+          }
+          return SocketConstants.receiveFriendBatchLocation == message.cmd;
+        })
+        .map((message) => message?.data)
+        .where((data) => data != null)
+        .cast<String>()
+        .map((s) {
+          try {
+            List<dynamic> jsonList = jsonDecode(s);
+            return jsonList.map((e) => LocationInfo.fromJson(e)).toList();
+          } catch (e) {
+            AtmobLog.d(tag, 'List<LocationInfo> jsonDecode error:$e');
+          }
+        })
+        .bufferTime(Duration(seconds: 5))
+        .where((locationInfos) => locationInfos.isNotEmpty)
+        .map((lists) {
+          Map<String, LocationInfo> idLocation = {};
+          for (var list in lists) {
+            if (list == null || list.isEmpty) {
+              continue;
+            }
+            for (var location in list) {
+              String? userId = location.userId;
+              if (userId == null ||
+                  location.longitude == 0 ||
+                  location.latitude == 0) {
+                continue;
+              }
+              idLocation[userId] = location;
+            }
+          }
+          return idLocation.values.toList();
+        })
+        .listen(
+          _handleLocationMessage,
+          onError: _handleError,
+          onDone: _handleDisconnect,
+        );
+  }
+
+  void _handleLocationMessage(List<LocationInfo> data) {
+    AtmobLog.d(tag, '接收到位置信息: ${data.length}');
+    for (var listener in _locationListeners) {
+      listener(data);
+    }
+  }
+
+  // 断开处理
+  void _handleDisconnect() {
+    AtmobLog.e(tag, 'WebSocket 断开连接');
+    _isConnecting = false;
+    startWebsocketInternal();
+  }
+
+  void _handleError(error) {
+    AtmobLog.e(tag, 'WebSocket 错误: $error');
+  }
+
+  static void addLocationListener(OnLocationChangeListener listener) {
+    _locationListeners.add(listener);
+  }
+
+  static void removeLocationListener(OnLocationChangeListener listener) {
+    _locationListeners.remove(listener);
+  }
+}

+ 19 - 0
lib/socket/base_message.dart

@@ -0,0 +1,19 @@
+import 'package:json_annotation/json_annotation.dart';
+
+part 'base_message.g.dart';
+
+@JsonSerializable()
+class BaseMessage {
+  @JsonKey(name: 'cmd')
+  final String? cmd;
+
+  @JsonKey(name: 'body')
+  final String? data;
+
+  BaseMessage(this.cmd, this.data);
+
+  factory BaseMessage.fromJson(Map<String, dynamic> json) =>
+      _$BaseMessageFromJson(json);
+
+  Map<String, dynamic> toJson() => _$BaseMessageToJson(this);
+}

+ 18 - 0
lib/socket/base_message.g.dart

@@ -0,0 +1,18 @@
+// GENERATED CODE - DO NOT MODIFY BY HAND
+
+part of 'base_message.dart';
+
+// **************************************************************************
+// JsonSerializableGenerator
+// **************************************************************************
+
+BaseMessage _$BaseMessageFromJson(Map<String, dynamic> json) => BaseMessage(
+      json['cmd'] as String?,
+      json['body'] as String?,
+    );
+
+Map<String, dynamic> _$BaseMessageToJson(BaseMessage instance) =>
+    <String, dynamic>{
+      'cmd': instance.cmd,
+      'body': instance.data,
+    };

+ 16 - 0
lib/socket/socket_constants.dart

@@ -0,0 +1,16 @@
+import '../data/consts/constants.dart';
+
+class SocketConstants {
+  static const String locationBaseUrl = Constants.locationClientUrl;
+
+  static const String refreshFriendList = 'd.refresh.friend.list'; // 刷新好友列表
+  static const String refreshFriendRequest =
+      'd.refresh.friend.request'; // 刷新好友请求
+  static const String refreshFriendMessage =
+      'd.refresh.friend.message'; // 刷新好友消息
+  static const String refreshContact = 'd.refresh.contact'; // 刷新联系人
+  static const String refreshMember = 'd.refresh.member'; // 刷新会员信息
+
+  static const String receiveFriendBatchLocation =
+      'd.location.batch'; //批量接收好友位置信息
+}

+ 2 - 2
lib/utils/async_util.dart

@@ -15,8 +15,8 @@ class AsyncUtil {
 
   static CancelableFuture<T> retryWithExponentialBackoff<T>(
       FutureCallback<T> callback, int maxRetry,
-      {Predicate<dynamic>? predicate}) {
-    const Duration initialInterval = Duration(seconds: 1);
+      {Duration initialInterval = const Duration(seconds: 1),
+      Predicate<dynamic>? predicate}) {
     int retryCount = 0;
     Timer? timer;
 

+ 43 - 0
lib/utils/base_expand.dart

@@ -1,3 +1,5 @@
+import 'dart:async';
+
 extension LetExtension<T> on T {
   /// 类似 Kotlin 的 let 函数,允许对任意对象执行代码块
   R let<R>(R Function(T it) block) {
@@ -23,3 +25,44 @@ extension AlsoExtension<T> on T {
 extension RunExtension<T> on T {
   R run<R>(R Function(T it) block) => block(this);
 }
+
+extension StreamBufferTimeExtension<T> on Stream<T> {
+  /// 将流中的事件按时间窗口缓冲,每隔 [duration] 时间发送一次缓冲列表
+  Stream<List<T>> bufferTime(Duration duration) {
+    StreamController<List<T>>? controller;
+    List<T> buffer = [];
+    Timer? timer;
+
+    controller = StreamController<List<T>>(
+      onListen: () {
+        timer = Timer.periodic(duration, (_) {
+          if (buffer.isNotEmpty) {
+            controller?.add(List.from(buffer));
+            buffer.clear();
+          }
+        });
+        // 监听原始流,收集事件到缓冲区
+        listen(
+          (event) => buffer.add(event),
+          onError: (error) => controller?.addError(error),
+          onDone: () {
+            timer?.cancel();
+            // 流结束时发送剩余缓冲事件
+            if (buffer.isNotEmpty) {
+              controller?.add(List.from(buffer));
+              buffer.clear();
+            }
+            controller?.close();
+          },
+        );
+      },
+      onCancel: () {
+        timer?.cancel();
+        buffer.clear();
+      },
+    );
+
+    // 返回控制器对应的 Stream,而不是控制器本身
+    return controller.stream;
+  }
+}

+ 48 - 0
lib/widget/relative_time_text.dart

@@ -0,0 +1,48 @@
+import 'dart:async';
+
+import 'package:flutter/cupertino.dart';
+
+import '../utils/common_util.dart';
+
+class RelativeTimeText extends StatefulWidget {
+  final int? timestamp;
+  final Duration updateInterval;
+  final TextStyle? style;
+
+  const RelativeTimeText({
+    super.key,
+    required this.timestamp,
+    this.updateInterval = const Duration(minutes: 1),
+    this.style,
+  });
+
+  @override
+  State<RelativeTimeText> createState() => _RelativeTimeTextState();
+}
+
+class _RelativeTimeTextState extends State<RelativeTimeText> {
+  late Timer _timer;
+
+  @override
+  void initState() {
+    super.initState();
+    _timer = Timer.periodic(widget.updateInterval, (_) => _updateTime());
+  }
+
+  void _updateTime() {
+    if (mounted) {
+      setState(() {});
+    }
+  }
+
+  @override
+  void dispose() {
+    _timer.cancel();
+    super.dispose();
+  }
+
+  @override
+  Widget build(BuildContext context) {
+    return Text(time2TimeDesc(widget.timestamp), style: widget.style);
+  }
+}

+ 0 - 1
plugins/map/lib/src/widget/map_controller.dart

@@ -42,7 +42,6 @@ class MapController extends MapOverlaysInterface {
       'method': MapConstants.methodUpdateOrAddMarkers,
       'args': serialized
     };
-    debugPrint("updateMarkers...params==>$params");
     if (_channel != null) {
       _executeMethod(params);
     } else {

+ 0 - 1
plugins/map_amap_android/android/src/main/java/com/atmob/map_amap_android/overlays/marker/MarkersController.java

@@ -135,7 +135,6 @@ public class MarkersController implements MyMethodCallHandler, AMap.OnMarkerClic
             result.error("-1", "updateFriendMarkers.list is empty", null);
             return;
         }
-        LogUtil.i(TAG, "updateMarkers=list==>" + list.size());
         for (MakerInfo makerInfo : list) {
             updateMarker(makerInfo);
         }

+ 1 - 1
pubspec.lock

@@ -1063,7 +1063,7 @@ packages:
     source: hosted
     version: "0.1.6"
   web_socket_channel:
-    dependency: transitive
+    dependency: "direct main"
     description:
       name: web_socket_channel
       sha256: "0b8e2457400d8a859b7b2030786835a28a8e80836ef64402abef392ff4f1d0e5"

+ 2 - 1
pubspec.yaml

@@ -81,7 +81,8 @@ dependencies:
   #跑马灯
   marquee: 2.3.0
 
-
+  #socket连接
+  web_socket_channel: 3.0.2
 
   ######################地图########################
   flutter_map: