import 'dart:async'; import 'dart:convert'; import 'dart:typed_data'; import 'package:flutter/cupertino.dart'; class SSEParseUtil { static Stream parse(Stream stream) { return stream.transform(SSETransformer()); } } class Message { final String id; final String event; final String data; final int? retry; Message( {required this.id, required this.event, required this.data, required this.retry}); @override String toString() { return 'Message{id: $id, event: $event, data: $data, retry: $retry}'; } } class SSETransformer extends StreamTransformerBase { @override Stream bind(Stream stream) { return Stream.eventTransformed( stream.map((bytes) { try { return utf8.decoder.convert(bytes); } catch (e) { debugPrint("$e: $bytes"); } return ""; }), (sink) => SseEventSink(sink), ); } } class SseEventSink implements EventSink { static const _eventSeparator = "\n\n"; static const _fieldSeparator = "\n"; static const _dataPrefix = "data:"; static const _dataPrefixR = "data: "; static const _idPrefix = "id:"; static const _idPrefixR = "id: "; static const _eventPrefix = "event:"; static const _eventPrefixR = "event: "; static const _retryPrefix = "retry:"; static const _retryPrefixR = "retry: "; static const _commentPrefix = ":"; static const _commentPrefixR = ": "; final EventSink _eventSink; String? _id; String? _event; String _data = ""; int? _retry; String buffer = ""; String completedEvent = ""; SseEventSink(this._eventSink); @override void add(String event) { buffer += event; if (buffer.endsWith("\n\n")) { completedEvent = buffer; buffer = ""; parseEvent(); } } @override void addError(Object error, [StackTrace? stackTrace]) { _eventSink.addError(error, stackTrace); } @override void close() { _eventSink.close(); } void parseEvent() { completedEvent = completedEvent.substring(0, completedEvent.length - 2); completedEvent.split("\n").forEach((element) { element = element.trim(); if (element.isEmpty) return; if (element.startsWith(_commentPrefix) || element.startsWith(_commentPrefixR)) { return; } if (element.startsWith(_retryPrefixR)) { _retry = int.tryParse(element.substring(_retryPrefixR.length)); } else if (element.startsWith(_dataPrefixR)) { _data += element.substring(_dataPrefixR.length); } else if (element.startsWith(_eventPrefixR)) { _event = element.substring(_eventPrefixR.length); } else if (element.startsWith(_idPrefixR)) { _id = element.substring(_idPrefixR.length); } else if (element.startsWith(_idPrefix)) { _id = element.substring(_idPrefix.length); } else if (element.startsWith(_eventPrefix)) { _event = element.substring(_eventPrefix.length); } else if (element.startsWith(_dataPrefix)) { _data += element.substring(_dataPrefix.length); } else if (element.startsWith(_retryPrefix)) { _retry = int.tryParse(element.substring(_retryPrefix.length)); } }); _eventSink.add(Message( id: _id ?? "", event: _event ?? "", data: _data, retry: _retry)); _id = null; _event = null; _data = ""; _retry = null; } }