| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142 |
- import 'dart:async';
- import 'dart:convert';
- import 'dart:typed_data';
/// Convenience entry point for decoding a byte stream of server-sent events.
class SSEParseUtil {
  /// Runs [stream] through a fresh [SSETransformer] and returns the
  /// resulting stream of parsed [Message]s.
  static Stream<Message> parse(Stream<Uint8List> stream) =>
      stream.transform(SSETransformer());
}
/// One parsed server-sent event.
class Message {
  /// Creates a message. Every argument must be passed explicitly; [retry]
  /// may be `null`.
  Message({
    required this.id,
    required this.event,
    required this.data,
    required this.retry,
  });

  /// Value of the event's `id` field.
  final String id;

  /// Value of the event's `event` field.
  final String event;

  /// Payload collected from the event's `data` field(s).
  final String data;

  /// Value of the event's `retry` field, or `null` when none was parsed.
  final int? retry;

  /// Renders all four fields in a single line, mainly useful for logging.
  @override
  String toString() =>
      'Message{id: $id, event: $event, data: $data, retry: $retry}';
}
/// Stream transformer that turns raw byte chunks into parsed [Message]s.
///
/// Each listener on the bound stream gets its own [SSESink], so buffered
/// partial events are never shared between subscriptions.
class SSETransformer extends StreamTransformerBase<Uint8List, Message> {
  /// Binds [stream] to a new [SSESink] per subscription.
  ///
  /// The chunks are forwarded as-is: a [Uint8List] already is a `List<int>`,
  /// and [SSESink.add] copies the bytes into its own buffer, so an extra
  /// per-chunk copy here would only cost time and memory.
  @override
  Stream<Message> bind(Stream<Uint8List> stream) {
    return Stream<Message>.eventTransformed(
      stream,
      (EventSink<Message> sink) => SSESink(sink),
    );
  }
}
/// Incremental parser for a `text/event-stream` byte stream.
///
/// Incoming chunks are appended to an internal buffer. Whenever the buffer
/// contains a complete event block (field lines followed by a blank line),
/// the block is removed from the buffer, parsed, and forwarded to the wrapped
/// sink as a [Message].
///
/// Line endings may be `\n` or `\r\n`. Bytes that do not form a complete
/// block when [close] is called are discarded.
class SSESink implements EventSink<List<int>> {
  static const int _lineFeed = 0x0A;
  static const int _carriageReturn = 0x0D;

  /// Downstream sink that receives the parsed messages.
  final EventSink<Message> _eventSink;

  /// Bytes received so far that do not yet form a complete event block.
  final List<int> _buffer = [];

  SSESink(this._eventSink);

  /// Appends [event] (a chunk of raw bytes) to the buffer and emits every
  /// event block that is now complete.
  ///
  /// A chunk may complete zero, one, or several blocks; all of them are
  /// parsed before this method returns.
  @override
  void add(List<int> event) {
    _buffer.addAll(event);
    for (var end = _findEventEnd(); end != -1; end = _findEventEnd()) {
      // `end` is the LF closing the last field line. It is followed either by
      // a bare LF (2 separator bytes) or by CR LF (3 separator bytes).
      final separatorLength = _buffer[end + 1] == _lineFeed ? 2 : 3;
      final completedEvent = _buffer.sublist(0, end);
      // Drop the bytes before parsing so that a block which fails to decode
      // is not retried on every subsequent chunk.
      _buffer.removeRange(0, end + separatorLength);
      parseEvent(completedEvent);
    }
  }

  /// Forwards [error] unchanged to the wrapped sink.
  @override
  void addError(Object error, [StackTrace? stackTrace]) {
    _eventSink.addError(error, stackTrace);
  }

  /// Closes the wrapped sink. Buffered bytes of an unterminated event block
  /// are not emitted.
  @override
  void close() {
    _eventSink.close();
  }

  /// Returns the index of the line feed that terminates the last field line
  /// of the first complete event block in [_buffer], or `-1` if the buffer
  /// holds no complete block yet.
  ///
  /// A block is complete when a line feed is immediately followed by another
  /// line terminator: `\n` or `\r\n`.
  int _findEventEnd() {
    for (var i = _buffer.indexOf(_lineFeed);
        i != -1 && i + 1 < _buffer.length;
        i = _buffer.indexOf(_lineFeed, i + 1)) {
      final next = _buffer[i + 1];
      if (next == _lineFeed) {
        return i;
      }
      if (next == _carriageReturn &&
          i + 2 < _buffer.length &&
          _buffer[i + 2] == _lineFeed) {
        return i;
      }
    }
    return -1;
  }

  /// Parses one event block and forwards it to the wrapped sink.
  ///
  /// [completedEvent] holds the UTF-8 bytes of the block without its
  /// terminating blank line; `utf8.decode` throws a [FormatException] if the
  /// bytes are malformed.
  ///
  /// Parsing rules:
  /// * Each line is trimmed; empty lines and lines starting with `:`
  ///   (comments) are ignored.
  /// * A line is split at its first `:` into field name and value, and a
  ///   single leading space is removed from the value. A line without `:` is
  ///   a field name with an empty value.
  /// * Recognised fields are `data`, `event`, `id` and `retry`; all other
  ///   field names are ignored.
  /// * Multiple `data` lines are joined with `\n`.
  /// * `retry` is parsed with `int.tryParse`, so an unparsable value yields
  ///   `null`.
  ///
  /// A block without any recognised field (for example a comment-only
  /// keep-alive) produces no [Message]. Missing `id` / `event` fields are
  /// reported as empty strings.
  void parseEvent(List<int> completedEvent) {
    final lines = utf8.decode(completedEvent).split('\n');

    String? id;
    String? event;
    int? retry;
    final dataLines = <String>[];
    var sawField = false;

    for (final rawLine in lines) {
      // trim() also removes the `\r` left over from CRLF line endings.
      final line = rawLine.trim();
      if (line.isEmpty || line.startsWith(':')) {
        continue;
      }

      final colon = line.indexOf(':');
      final name = colon == -1 ? line : line.substring(0, colon);
      var value = colon == -1 ? '' : line.substring(colon + 1);
      if (value.startsWith(' ')) {
        value = value.substring(1);
      }

      if (name == 'data') {
        dataLines.add(value);
      } else if (name == 'event') {
        event = value;
      } else if (name == 'id') {
        id = value;
      } else if (name == 'retry') {
        retry = int.tryParse(value);
      } else {
        continue;
      }
      sawField = true;
    }

    if (!sawField) {
      return;
    }

    _eventSink.add(Message(
      id: id ?? '',
      event: event ?? '',
      data: dataLines.join('\n'),
      retry: retry,
    ));
  }
}
|