Oi, Wagno! Como vai?
Respondendo suas dúvidas:
- Streaming no Gradio: sim, via funções geradoras que usam
yield (ex.: gr.ChatInterface(stream=True) ou retorno de um generator). - WebSockets nativo: o Gradio usa WebSocket internamente, mas não expõe canal público em tempo real para "push" direto.
- Integração com backend em tempo real: sim. Combine FastAPI + WebSocket ou Redis Pub/Sub para enviar dados em tempo real ao Gradio.
- Se não houver push direto: simule com fila (
queue.Queue, Redis, Kafka) + polling no Gradio, emitindo yield quando houver novo dado. - Sql Server: use pyodbc ou SQLAlchemy.
Veja os exemplos abaixo.
Exemplo 1 — Streaming básico no Gradio
import time, gradio as gr
def stream_answer(question: str):
# **Simula** um modelo que responde em partes
for token in ["Consultando", " base", " de", " dados", "..."]:
time.sleep(0.4)
yield "".join(token)
yield "\n**Pronto!** Consulta finalizada."
demo = gr.ChatInterface(
fn=stream_answer,
title="Streaming simples",
examples=["Como fazer streaming no Gradio?"],
textbox=gr.Textbox(placeholder="Pergunte...")
)
if __name__ == "__main__":
demo.queue().launch()
O que faz: retorna texto em partes (yield) e o Gradio mostra em tempo real.
Exemplo 2 — FastAPI + WebSocket + Gradio (push via fila)
import asyncio, json
from fastapi import FastAPI, WebSocket
import gradio as gr
app = FastAPI()
queue = asyncio.Queue()
@app.websocket("/ws")
async def ws_endpoint(ws: WebSocket):
await ws.accept()
try:
while True:
data = await ws.receive_text()
await queue.put(json.loads(data))
except Exception:
await ws.close()
def stream_realtime(_):
while True:
item = asyncio.get_event_loop().run_until_complete(queue.get())
yield f"**Evento:** {item}"
with gr.Blocks() as demo:
log = gr.Textbox(label="Atualizações", lines=12)
btn = gr.Button("Conectar e ouvir")
btn.click(stream_realtime, inputs=None, outputs=log)
fastapi_app = gr.mount_gradio_app(app, demo, path="/")
# Rode com: uvicorn este_arquivo:fastapi_app --reload
O que faz: o backend envia JSON via WebSocket; o Gradio lê da queue e streama na interface.
Exemplo 3 — Simulação de polling (sem WebSocket)
import time, random, gradio as gr
def watch_events(_):
while True:
event = {"ts": time.time(), "value": random.randint(1, 100)}
yield f"**Novo evento** {event['value']} em {event['ts']}"
time.sleep(1)
gr.ChatInterface(fn=watch_events, textbox=False).launch()
O que faz: simula leitura contínua de dados em tempo real.
Exemplo 4 — Consulta no SQL Server com streaming
import time
import pyodbc
import gradio as gr
CONN_STR = (
"DRIVER={ODBC Driver 17 for SQL Server};"
"SERVER=seu_servidor_sql,1433;"
"DATABASE=sua_base;"
"UID=usuario;PWD=senha;TrustServerCertificate=yes;"
)
def stream_query(sql: str):
with pyodbc.connect(CONN_STR) as conn:
cur = conn.cursor()
cur.execute(sql)
cols = [d[0] for d in cur.description]
yield f"**Colunas:** {cols}"
for row in cur:
time.sleep(0.2)
yield f"**Linha:** {list(row)}"
yield "**Fim da consulta.**"
demo = gr.Interface(fn=stream_query, inputs=gr.Textbox(label="SQL"), outputs="text")
# Exemplo: SELECT TOP 5 name, object_id FROM sys.objects ORDER BY object_id DESC
demo.queue().launch()
O que faz: conecta ao Sql Server e exibe cada linha da consulta em tempo real.
- Use yield no Gradio para streaming imediato.
- Para tempo real, centralize eventos no backend (FastAPI + WebSocket/Redis).
- Para Sql Server, use pyodbc/SQLAlchemy e emita linha a linha.
Espero ter ajudado. Conte com o apoio do Fórum na sua jornada. Fico à disposição.
Abraços e bons estudos!
Caso este post tenha lhe ajudado, por favor, marcar como solucionado