import 'dart:async'; import 'dart:convert'; import 'dart:typed_data'; class SSEParseUtil { static Stream parse(Stream stream) { return stream.transform(SSETransformer()); } } class Message { final String id; final String event; final String data; final int? retry; Message( {required this.id, required this.event, required this.data, required this.retry}); @override String toString() { return 'Message{id: $id, event: $event, data: $data, retry: $retry}'; } } class SSETransformer extends StreamTransformerBase { @override Stream bind(Stream stream) { return Stream.eventTransformed( stream.map((uint8List) => List.from(uint8List)), (sink) => SSESink(sink), ); } } class SSESink implements EventSink> { static final _eventSeparator = utf8.encode("\n\n"); static const _fieldSeparator = "\n"; static const _dataPrefix = "data:"; static const _dataPrefixR = "data: "; static const _idPrefix = "id:"; static const _idPrefixR = "id: "; static const _eventPrefix = "event:"; static const _eventPrefixR = "event: "; static const _retryPrefix = "retry:"; static const _retryPrefixR = "retry: "; static const _commentPrefix = ":"; static const _commentPrefixR = ": "; final EventSink _eventSink; final List _buffer = []; SSESink(this._eventSink); @override void add(List event) { _buffer.addAll(event); while (true) { final endIndex = _indexOf(_buffer, _eventSeparator); if (endIndex == -1) { break; } final completedEvent = _buffer.sublist(0, endIndex); _buffer.removeRange(0, endIndex + _eventSeparator.length); parseEvent(completedEvent); } } @override void addError(Object error, [StackTrace? stackTrace]) { _eventSink.addError(error, stackTrace); } @override void close() { _eventSink.close(); } int _indexOf(List origin, List target) { for (var i = 0; i < origin.length - target.length; i++) { var found = true; for (var j = 0; j < target.length; j++) { if (origin[i + j] != target[j]) { found = false; break; } } if (found) { return i; } } return -1; } void parseEvent(List completedEvent) { final eventString = utf8.decode(completedEvent); final fields = eventString.split(_fieldSeparator); String? id; String? event; String data = ""; int? retry; for (final field in fields) { final trimmedField = field.trim(); if (trimmedField.isEmpty) { continue; } if (trimmedField.startsWith(_commentPrefix) || trimmedField.startsWith(_commentPrefixR)) { continue; } if (trimmedField.startsWith(_retryPrefixR)) { retry = int.tryParse(trimmedField.substring(_retryPrefixR.length)); } else if (trimmedField.startsWith(_dataPrefixR)) { data += trimmedField.substring(_dataPrefixR.length); } else if (trimmedField.startsWith(_eventPrefixR)) { event = trimmedField.substring(_eventPrefixR.length); } else if (trimmedField.startsWith(_idPrefixR)) { id = trimmedField.substring(_idPrefixR.length); } else if (trimmedField.startsWith(_idPrefix)) { id = trimmedField.substring(_idPrefix.length); } else if (trimmedField.startsWith(_eventPrefix)) { event = trimmedField.substring(_eventPrefix.length); } else if (trimmedField.startsWith(_dataPrefix)) { data += trimmedField.substring(_dataPrefix.length); } else if (trimmedField.startsWith(_retryPrefix)) { retry = int.tryParse(trimmedField.substring(_retryPrefix.length)); } } _eventSink.add( Message(id: id ?? "", event: event ?? "", data: data, retry: retry)); } }