client.py

import requests

# Warning: this is a heavy process -- the payload built below is roughly
# 1.4 MB of digits and is sent in a single POST request.

# Build the payload with one join rather than repeated ``+=`` so the cost
# stays linear in the number of pieces on every Python implementation.
data = "".join(str(i) for i in range(1, 250000))

# Post the payload to the streaming endpoint and print what comes back.
r = requests.post('http://0.0.0.0:8000/stream', data=data)
print(r.text)

server.py

from sanic import Sanic
from sanic.blueprints import Blueprint
from sanic.response import stream, text
from sanic.views import HTTPMethodView
from sanic.views import stream as stream_decorator


# Application object that the routes in this module are registered on.
app = Sanic("Example")
# Blueprint that carries the ``/bp_stream`` route; it is attached to ``app``
# near the end of the module.
bp = Blueprint("bp_example")


class SimpleView(HTTPMethodView):
    """Class-based view that echoes a streamed POST body back as text."""

    @stream_decorator
    async def post(self, request):
        """Read the whole request stream and return it as one text response.

        Chunks are collected as raw bytes and decoded once at the end.
        Decoding each chunk separately would raise ``UnicodeDecodeError``
        whenever a multi-byte UTF-8 character is split across two chunks.

        :param request: incoming request whose ``stream.get()`` yields
            ``bytes`` chunks and finally ``None``.
        :return: text response containing the decoded body.
        :raises UnicodeDecodeError: if the complete body is not valid UTF-8.
        """
        chunks = []
        while True:
            body = await request.stream.get()
            # ``None`` is the end-of-stream marker.
            if body is None:
                break
            chunks.append(body)
        return text(b"".join(chunks).decode("utf-8"))


@app.post("/stream", stream=True)
async def handler(request):
    """Stream the request body back with every ``"1"`` replaced by ``"A"``.

    The body is transformed chunk by chunk as it arrives. An incremental
    UTF-8 decoder is used so that a multi-byte character split across two
    chunks is carried over to the next chunk instead of raising
    ``UnicodeDecodeError``.

    :param request: incoming request whose ``stream.get()`` yields
        ``bytes`` chunks and finally ``None``.
    :return: streaming response driven by the inner ``streaming`` coroutine.
    """
    # Function-scope import keeps this handler self-contained.
    import codecs

    async def streaming(response):
        decoder = codecs.getincrementaldecoder("utf-8")()
        while True:
            body = await request.stream.get()
            # ``None`` is the end-of-stream marker.
            if body is None:
                break
            piece = decoder.decode(body).replace("1", "A")
            # A chunk holding only part of a character decodes to "" for
            # now; there is nothing to send until the rest arrives.
            if piece:
                await response.write(piece)
        # Flush the decoder: raises UnicodeDecodeError if the body ended in
        # the middle of a multi-byte sequence, as invalid input should.
        tail = decoder.decode(b"", final=True).replace("1", "A")
        if tail:
            await response.write(tail)

    return stream(streaming)


@bp.put("/bp_stream", stream=True)
async def bp_handler(request):
    """Read the streamed PUT body and return it with ``"1"`` replaced by ``"A"``.

    Chunks are collected as raw bytes and decoded once at the end.
    Decoding each chunk separately would raise ``UnicodeDecodeError``
    whenever a multi-byte UTF-8 character is split across two chunks.

    :param request: incoming request whose ``stream.get()`` yields
        ``bytes`` chunks and finally ``None``.
    :return: text response containing the transformed body.
    :raises UnicodeDecodeError: if the complete body is not valid UTF-8.
    """
    chunks = []
    while True:
        body = await request.stream.get()
        # ``None`` is the end-of-stream marker.
        if body is None:
            break
        chunks.append(body)
    result = b"".join(chunks).decode("utf-8").replace("1", "A")
    return text(result)


async def post_handler(request):
    """Read the whole request stream and return it unchanged as text.

    Chunks are collected as raw bytes and decoded once at the end.
    Decoding each chunk separately would raise ``UnicodeDecodeError``
    whenever a multi-byte UTF-8 character is split across two chunks.

    NOTE(review): no route registration for this handler is visible in
    this module -- confirm whether it is wired up elsewhere.

    :param request: incoming request whose ``stream.get()`` yields
        ``bytes`` chunks and finally ``None``.
    :return: text response containing the decoded body.
    :raises UnicodeDecodeError: if the complete body is not valid UTF-8.
    """
    chunks = []
    while True:
        body = await request.stream.get()
        # ``None`` is the end-of-stream marker.
        if body is None:
            break
        chunks.append(body)
    return text(b"".join(chunks).decode("utf-8"))


# Attach the class-based view and the blueprint to the application.
app.add_route(SimpleView.as_view(), "/method_view")
app.blueprint(bp)


if __name__ == "__main__":
    # Start the server on host 0.0.0.0, port 8000 when run as a script.
    app.run(
        host="0.0.0.0",
        port=8000,
    )

Related articles

amending request object

from random import randint from sanic import Sanic from sanic.response import text app = Sanic("Example")

authorized sanic

# -*- coding: utf-8 -*- from functools import wraps from sanic import Sanic from sanic.response import json

sanic blueprints

from sanic import Blueprint, Sanic from sanic.response import file, json app = Sanic("Example") blueprint = Blueprint("bp_example", url_prefix="/my_blueprint") blueprint2 = Blueprint("bp_example2", url_prefix="/my_blueprint2")

fastapi request directly

from fastapi import FastAPI, Request app = FastAPI() @app.get("/items/{item_id}") def read_root(item_id: str, request: Request):

fastapi websockets

from fastapi import FastAPI, WebSocket from fastapi.responses import HTMLResponse app = FastAPI()

simple fastapi wsgi

from fastapi import FastAPI from fastapi.middleware.wsgi import WSGIMiddleware from flask import Flask, escape, request

matplotlib multicursor

Showing a cursor on multiple plots simultaneously. This example generates three axes split over two different figures. On hovering the cursor over data in one subplot, the values of that data point are shown in all axes.

polygon selector

import matplotlib.pyplot as plt from matplotlib.widgets import PolygonSelector # To create the polygon programmatically fig, ax = plt.subplots() fig.show() selector = PolygonSelector(ax, lambda *args: None)

matplotlib radio buttons

Using radio buttons to choose properties of your plot. Radio buttons let you choose between multiple options in a visualization. In this case, the buttons let the user choose one of the three different sine waves to be shown in the plot.

sanic delayed response

from asyncio import sleep from sanic import Sanic, response app = Sanic("DelayedResponseApp", strict_slashes=True) app.config.AUTO_EXTEND = False