Przeglądaj źródła

[Fix]修复SSE解析异常

zhipeng 1 rok temu
rodzic
commit
cdd1a01db5

+ 5 - 1
lib/data/api/atmob_stream_api.c.dart

@@ -22,7 +22,10 @@ class _AtmobStreamApi implements AtmobStreamApi {
   Future<ResponseBody> chat(ChatRequest request) async {
     const _extra = <String, dynamic>{};
     final queryParameters = <String, dynamic>{};
-    final _headers = <String, dynamic>{};
+    final _headers = <String, dynamic>{
+      r'Content-Type': 'application/json',
+      r'Accept': 'text/event-stream'
+    };
     final _data = <String, dynamic>{};
     _data.addAll(request.toJson());
     final _result =
@@ -30,6 +33,7 @@ class _AtmobStreamApi implements AtmobStreamApi {
       method: 'POST',
       headers: _headers,
       extra: _extra,
+      responseType: ResponseType.stream,
     )
             .compose(
               _dio.options,

+ 1 - 1
lib/data/consts/constants.dart

@@ -1,7 +1,7 @@
 class Constants {
   Constants._();
 
-  static const String env = envTest;
+  static const String env = envProd;
 
   static const String envDev = 'dev';
 

+ 1 - 2
lib/module/chat/controller.dart

@@ -6,7 +6,6 @@ import 'package:electronic_assistant/data/bean/chat_item.dart';
 import 'package:electronic_assistant/data/bean/file_chat_item.dart';
 import 'package:electronic_assistant/data/bean/progressing_chat_item.dart';
 import 'package:electronic_assistant/data/bean/reference_chat_item.dart';
-import 'package:electronic_assistant/data/bean/stream_chat_origin_data.dart';
 import 'package:electronic_assistant/data/repositories/account_repository.dart';
 import 'package:electronic_assistant/data/repositories/chat_repository.dart';
 import 'package:electronic_assistant/data/repositories/talk_repository.dart';
@@ -17,6 +16,7 @@ import 'package:get/get.dart';
 import 'package:pull_to_refresh/pull_to_refresh.dart';
 import 'package:uuid/uuid.dart';
 
+import '../../data/bean/stream_chat_origin_data.dart';
 import '../../data/bean/talks.dart';
 import '../../data/consts/error_code.dart';
 import '../../router/app_pages.dart';
@@ -145,7 +145,6 @@ class ChatController extends BaseController {
     chatItems.insert(0, progressingChatItem);
 
     _scrollToBottom();
-
     chatRepository
         .streamChat(chatContent,
             talkId: talkInfo.value?.id, agendaId: agenda.value?.id)

+ 72 - 56
lib/utils/sse_parse_util.dart

@@ -2,6 +2,7 @@ import 'dart:async';
 import 'dart:convert';
 import 'dart:typed_data';
 
+import 'package:connectivity_plus/connectivity_plus.dart';
 import 'package:flutter/cupertino.dart';
 
 class SSEParseUtil {
@@ -32,21 +33,14 @@ class SSETransformer extends StreamTransformerBase<Uint8List, Message> {
   @override
   Stream<Message> bind(Stream<Uint8List> stream) {
     return Stream.eventTransformed(
-      stream.map((bytes) {
-        try {
-          return utf8.decoder.convert(bytes);
-        } catch (e) {
-          debugPrint("$e: $bytes");
-        }
-        return "";
-      }),
-      (sink) => SseEventSink(sink),
+      stream.map((uint8List) => List<int>.from(uint8List)),
+      (sink) => SSESink(sink),
     );
   }
 }
 
-class SseEventSink implements EventSink<String> {
-  static const _eventSeparator = "\n\n";
+class SSESink implements EventSink<List<int>> {
+  static final _eventSeparator = utf8.encode("\n\n");
   static const _fieldSeparator = "\n";
   static const _dataPrefix = "data:";
   static const _dataPrefixR = "data: ";
@@ -61,25 +55,23 @@ class SseEventSink implements EventSink<String> {
 
   final EventSink<Message> _eventSink;
 
-  String? _id;
-  String? _event;
-  String _data = "";
-  int? _retry;
-
-  String buffer = "";
-  String completedEvent = "";
+  final List<int> _buffer = [];
 
-  SseEventSink(this._eventSink);
+  SSESink(this._eventSink);
 
   @override
-  void add(String event) {
-    buffer += event;
+  void add(List<int> event) {
+    _buffer.addAll(event);
 
-    if (buffer.endsWith("\n\n")) {
-      completedEvent = buffer;
-      buffer = "";
-      parseEvent();
+    final endIndex = _indexOf(_buffer, _eventSeparator);
+    if (endIndex == -1) {
+      return;
     }
+
+    final completedEvent = _buffer.sublist(0, endIndex);
+    _buffer.removeRange(0, endIndex + _eventSeparator.length);
+
+    parseEvent(completedEvent);
   }
 
   @override
@@ -92,38 +84,62 @@ class SseEventSink implements EventSink<String> {
     _eventSink.close();
   }
 
-  void parseEvent() {
-    completedEvent = completedEvent.substring(0, completedEvent.length - 2);
-    completedEvent.split("\n").forEach((element) {
-      element = element.trim();
-      if (element.isEmpty) return;
-      if (element.startsWith(_commentPrefix) ||
-          element.startsWith(_commentPrefixR)) {
-        return;
+  int _indexOf(List<int> origin, List<int> target) {
+    for (var i = 0; i < origin.length - target.length; i++) {
+      var found = true;
+      for (var j = 0; j < target.length; j++) {
+        if (origin[i + j] != target[j]) {
+          found = false;
+          break;
+        }
+      }
+      if (found) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  void parseEvent(List<int> completedEvent) {
+    final eventString = utf8.decode(completedEvent);
+    final fields = eventString.split(_fieldSeparator);
+
+    String? id;
+    String? event;
+    String data = "";
+    int? retry;
+
+    for (final field in fields) {
+      final trimmedField = field.trim();
+      if (trimmedField.isEmpty) {
+        continue;
       }
-      if (element.startsWith(_retryPrefixR)) {
-        _retry = int.tryParse(element.substring(_retryPrefixR.length));
-      } else if (element.startsWith(_dataPrefixR)) {
-        _data += element.substring(_dataPrefixR.length);
-      } else if (element.startsWith(_eventPrefixR)) {
-        _event = element.substring(_eventPrefixR.length);
-      } else if (element.startsWith(_idPrefixR)) {
-        _id = element.substring(_idPrefixR.length);
-      } else if (element.startsWith(_idPrefix)) {
-        _id = element.substring(_idPrefix.length);
-      } else if (element.startsWith(_eventPrefix)) {
-        _event = element.substring(_eventPrefix.length);
-      } else if (element.startsWith(_dataPrefix)) {
-        _data += element.substring(_dataPrefix.length);
-      } else if (element.startsWith(_retryPrefix)) {
-        _retry = int.tryParse(element.substring(_retryPrefix.length));
+
+      if (trimmedField.startsWith(_commentPrefix) ||
+          trimmedField.startsWith(_commentPrefixR)) {
+        continue;
       }
-    });
+
+      if (trimmedField.startsWith(_retryPrefixR)) {
+        retry = int.tryParse(trimmedField.substring(_retryPrefixR.length));
+      } else if (trimmedField.startsWith(_dataPrefixR)) {
+        data += trimmedField.substring(_dataPrefixR.length);
+      } else if (trimmedField.startsWith(_eventPrefixR)) {
+        event = trimmedField.substring(_eventPrefixR.length);
+      } else if (trimmedField.startsWith(_idPrefixR)) {
+        id = trimmedField.substring(_idPrefixR.length);
+      } else if (trimmedField.startsWith(_idPrefix)) {
+        id = trimmedField.substring(_idPrefix.length);
+      } else if (trimmedField.startsWith(_eventPrefix)) {
+        event = trimmedField.substring(_eventPrefix.length);
+      } else if (trimmedField.startsWith(_dataPrefix)) {
+        data += trimmedField.substring(_dataPrefix.length);
+      } else if (trimmedField.startsWith(_retryPrefix)) {
+        retry = int.tryParse(trimmedField.substring(_retryPrefix.length));
+      }
+    }
+
     _eventSink.add(Message(
-        id: _id ?? "", event: _event ?? "", data: _data, retry: _retry));
-    _id = null;
-    _event = null;
-    _data = "";
-    _retry = null;
+        id: id ?? "", event: event ?? "", data: data, retry: retry));
   }
-}
+}

+ 2 - 10
pubspec.lock

@@ -250,7 +250,7 @@ packages:
     source: hosted
     version: "1.0.0"
   cupertino_icons:
-    dependency: "direct main"
+    dependency: transitive
     description:
       name: cupertino_icons
       sha256: ba631d1c7f7bef6b729a622b7b752645a2d076dba9976925b8f25725a30e1ee6
@@ -776,16 +776,8 @@ packages:
       url: "https://pub.dev"
     source: hosted
     version: "2.1.0"
-  package_info:
-    dependency: "direct main"
-    description:
-      name: package_info
-      sha256: "6c07d9d82c69e16afeeeeb6866fe43985a20b3b50df243091bfc4a4ad2b03b75"
-      url: "https://pub.dev"
-    source: hosted
-    version: "2.0.2"
   package_info_plus:
-    dependency: transitive
+    dependency: "direct main"
     description:
       name: package_info_plus
       sha256: a75164ade98cb7d24cfd0a13c6408927c6b217fa60dee5a7ff5c116a58f28918