sse_parse_util.dart 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. import 'dart:async';
  2. import 'dart:convert';
  3. import 'dart:typed_data';
  4. import 'package:flutter/cupertino.dart';
  /// Entry point for turning a raw byte stream into parsed SSE messages.
  class SSEParseUtil {
    /// Returns a stream of [Message]s produced by running [stream] through an
    /// [SSETransformer].
    static Stream<Message> parse(Stream<Uint8List> stream) =>
        stream.transform(SSETransformer());
  }
  /// A single parsed server-sent event.
  class Message {
    /// Value of the event's `id` field.
    final String id;

    /// Value of the event's `event` field.
    final String event;

    /// Payload taken from the event's `data` field(s).
    final String data;

    /// Value of the event's `retry` field as an integer, or `null` when none
    /// was supplied.
    final int? retry;

    /// Creates a message; every field must be passed explicitly.
    Message({
      required this.id,
      required this.event,
      required this.data,
      required this.retry,
    });

    /// Renders all four fields in a single-line, human-readable form.
    @override
    String toString() =>
        'Message{id: $id, event: $event, data: $data, retry: $retry}';
  }
  /// Transforms a stream of raw UTF-8 bytes into parsed SSE [Message]s.
  ///
  /// The bytes are decoded to text and the text is fed to an [SseEventSink],
  /// which frames it into events.
  class SSETransformer extends StreamTransformerBase<Uint8List, Message> {
    /// Binds this transformer to [stream] and returns the resulting stream of
    /// [Message]s.
    @override
    Stream<Message> bind(Stream<Uint8List> stream) {
      // One stateful decoder is bound to the whole stream so that a multi-byte
      // UTF-8 character split across two chunks is reassembled instead of
      // failing to decode. `allowMalformed` keeps decoding best-effort:
      // invalid bytes become U+FFFD rather than raising and losing the chunk.
      final Stream<String> text =
          const Utf8Decoder(allowMalformed: true).bind(stream);

      return Stream.eventTransformed(
        text,
        (sink) => SseEventSink(sink),
      );
    }
  }
  /// Frames decoded SSE text into events and forwards each one as a [Message].
  ///
  /// Text may arrive in arbitrary chunks: a chunk can hold part of an event,
  /// exactly one event, or several events. Events are terminated by a blank
  /// line; `\n`, `\r\n` and `\r` are all accepted as line endings.
  class SseEventSink implements EventSink<String> {
    static const _eventSeparator = "\n\n";
    static const _fieldSeparator = "\n";

    final EventSink<Message> _eventSink;

    /// Received text that does not yet form a complete event.
    String buffer = "";

    /// The event block most recently handed to [parseEvent].
    String completedEvent = "";

    /// Creates a sink that emits parsed messages to the given event sink.
    SseEventSink(this._eventSink);

    /// Appends [event] (a chunk of decoded text) and dispatches every event
    /// that is now complete, in order. Any unterminated tail stays in [buffer]
    /// until more text arrives.
    @override
    void add(String event) {
      var text = buffer + event;

      // A trailing '\r' may be the first half of a "\r\n" whose '\n' is still
      // in flight, so keep it out of normalisation until the next chunk.
      var heldBack = "";
      if (text.endsWith("\r")) {
        heldBack = "\r";
        text = text.substring(0, text.length - 1);
      }
      text = text.replaceAll("\r\n", "\n").replaceAll("\r", "\n");

      var start = 0;
      var end = text.indexOf(_eventSeparator, start);
      while (end != -1) {
        // Runs of extra blank lines produce empty blocks; those are not events.
        if (text.substring(start, end).trim().isNotEmpty) {
          completedEvent = text.substring(start, end + _eventSeparator.length);
          parseEvent();
        }
        start = end + _eventSeparator.length;
        end = text.indexOf(_eventSeparator, start);
      }

      buffer = text.substring(start) + heldBack;
    }

    /// Forwards [error] unchanged to the downstream sink.
    @override
    void addError(Object error, [StackTrace? stackTrace]) {
      _eventSink.addError(error, stackTrace);
    }

    /// Closes the downstream sink. Text still in [buffer] that was never
    /// terminated by a blank line is discarded.
    @override
    void close() {
      _eventSink.close();
    }

    /// Parses [completedEvent] and emits exactly one [Message] for it.
    ///
    /// A trailing event separator, if present, is removed first and the
    /// stripped text is stored back in [completedEvent]. Each line is split at
    /// its first ':' into a field name and a value, and a single leading space
    /// of the value is dropped. Lines starting with ':' are comments; unknown
    /// field names are ignored. Multiple `data` lines are joined with '\n'.
    /// A block made only of comments still produces a [Message] whose fields
    /// are empty.
    void parseEvent() {
      var block = completedEvent;
      if (block.endsWith(_eventSeparator)) {
        block = block.substring(0, block.length - _eventSeparator.length);
      }
      completedEvent = block;

      String? id;
      String? eventName;
      int? retry;
      final dataLines = <String>[];

      for (final line in block.split(_fieldSeparator)) {
        if (line.isEmpty || line.startsWith(":")) continue;

        final colon = line.indexOf(":");
        final field = colon == -1 ? line : line.substring(0, colon);
        var value = colon == -1 ? "" : line.substring(colon + 1);
        // Only one leading space is a delimiter; any further whitespace is
        // part of the value and must be kept.
        if (value.startsWith(" ")) value = value.substring(1);

        switch (field) {
          case "data":
            dataLines.add(value);
            break;
          case "event":
            eventName = value;
            break;
          case "id":
            id = value;
            break;
          case "retry":
            // A value that is not an integer leaves any earlier retry intact.
            retry = int.tryParse(value) ?? retry;
            break;
        }
      }

      _eventSink.add(Message(
          id: id ?? "",
          event: eventName ?? "",
          data: dataLines.join(_fieldSeparator),
          retry: retry));
    }
  }
  113. }