| 16 | | def test_sse_middleware_skipped(self): |
| 17 | | from django.middleware.gzip import GZipMiddleware |
| 18 | | from django.http import StreamingHttpResponse |
| 19 | | |
| 20 | | class DummyRequest: |
| 21 | | META = {"HTTP_ACCEPT_ENCODING": "gzip"} |
| 22 | | |
| 23 | | def event_stream(): |
| 24 | | for i in range(3): |
| 25 | | yield f"data: {i}\n\n".encode("utf-8") |
| 26 | | time.sleep(1) |
| 27 | | |
| 28 | | response = StreamingHttpResponse(event_stream(), content_type="text/event-stream") |
| 29 | | middleware = GZipMiddleware(lambda req: response) |
| 30 | | result = middleware(DummyRequest()) |
| 31 | | |
| 32 | | first_chunk = next(result.streaming_content) |
| 33 | | self.assertFalse(first_chunk.startswith(b"\x1f\x8b"), "SSE response should not be gzipped") |
| 34 | | |
| | 16 | run test in pycharm: tests.middleware.test_gzip.GzipMiddlewareTest.test_flush_streaming_compression |