"""
tenso.serve: Quick-start utilities for spinning up Tenso-optimized servers.
Provides helpers that wrap FastAPI/gRPC with sensible defaults for
high-throughput tensor serving: correct content types, worker pool sizing
based on CPU/GPU resources, and built-in Tenso request/response handling.
Example::
from tenso.serve import create_app, run
app = create_app()
@app.tenso_endpoint("/infer")
def infer(tensor):
return tensor * 2.0
run(app, workers=4)
"""
import asyncio
import inspect
import logging
import os
from typing import Any, Callable, Optional
logger = logging.getLogger("tenso.serve")
def _detect_workers() -> int:
"""Determine optimal worker count based on available resources."""
cpu_count = os.cpu_count() or 1
# For tensor serving, use fewer workers than CPUs to leave room
# for numpy/torch compute threads within each worker.
return max(1, min(cpu_count // 2, 8))
class TensoApp:
    """
    A lightweight wrapper around FastAPI pre-configured for Tenso serving.

    Automatically handles Tenso binary request parsing and response
    serialization for registered endpoints.

    Parameters
    ----------
    title : str
        Title reported by the underlying FastAPI application.
    check_integrity : bool
        App-wide default for XXH3 integrity checking on endpoints.
    compress_response : bool
        Stored on the instance but not read here; presumably consumed
        by the response layer elsewhere -- TODO confirm.
    """

    def __init__(
        self,
        title: str = "Tenso Server",
        check_integrity: bool = False,
        compress_response: bool = False,
    ):
        # Imported lazily so importing tenso.serve does not require fastapi.
        from fastapi import FastAPI

        self._app = FastAPI(title=title)
        self._check_integrity = check_integrity
        self._compress_response = compress_response
        # Paths registered via tenso_endpoint; reported by the health check.
        self._endpoints: list = []

    @property
    def app(self):
        """The underlying FastAPI application instance."""
        return self._app

    def tenso_endpoint(
        self,
        path: str,
        method: str = "POST",
        check_integrity: Optional[bool] = None,
    ) -> Callable:
        """
        Decorator to register a function as a Tenso-powered endpoint.

        The decorated function receives a deserialized tensor/bundle
        and should return a numpy array or dict. The return value is
        automatically serialized as a TensoResponse.

        Parameters
        ----------
        path : str
            The URL path for the endpoint.
        method : str
            HTTP method (default: POST).
        check_integrity : bool, optional
            Override per-endpoint integrity checking.
        """
        # Per-endpoint override falls back to the app-wide default.
        integrity = (
            check_integrity if check_integrity is not None else self._check_integrity
        )

        def decorator(func: Callable) -> Callable:
            from fastapi import Request
            from .fastapi import TensoResponse, get_tenso_data

            is_async = asyncio.iscoroutinefunction(func)

            async def endpoint_handler(request: Request) -> TensoResponse:
                tensor = await get_tenso_data(request)
                if is_async:
                    result = await func(tensor)
                else:
                    # Run sync handlers in the default executor so tensor
                    # compute does not block the event loop.
                    loop = asyncio.get_running_loop()
                    result = await loop.run_in_executor(None, func, tensor)
                return TensoResponse(result, check_integrity=integrity)

            # Give each route the user function's identity; otherwise every
            # route is named "endpoint_handler" and FastAPI generates
            # colliding OpenAPI operation ids. Deliberately NOT using
            # functools.wraps: it sets __wrapped__, which would make
            # FastAPI's signature inspection see func's signature instead
            # of (request: Request) and break request injection.
            endpoint_handler.__name__ = func.__name__
            endpoint_handler.__doc__ = func.__doc__

            self._app.add_api_route(
                path,
                endpoint_handler,
                methods=[method.upper()],
            )
            self._endpoints.append(path)
            return func

        return decorator

    def add_health_check(self, path: str = "/health"):
        """Add a simple health check endpoint at *path*."""

        @self._app.get(path)
        def health():
            return {"status": "ok", "endpoints": self._endpoints}
def create_app(
    title: str = "Tenso Server",
    check_integrity: bool = False,
    health_check: bool = True,
) -> TensoApp:
    """
    Create a new TensoApp with sensible defaults.

    Parameters
    ----------
    title : str
        Server title.
    check_integrity : bool
        Enable XXH3 integrity checking on all endpoints.
    health_check : bool
        Add a /health endpoint.

    Returns
    -------
    TensoApp
        The configured application.
    """
    application = TensoApp(title=title, check_integrity=check_integrity)
    if health_check:
        application.add_health_check()
    return application
def run(
    app: "TensoApp | str",
    host: str = "0.0.0.0",
    port: int = 8000,
    workers: Optional[int] = None,
    log_level: str = "info",
):
    """
    Run a TensoApp using uvicorn with optimized settings.

    Parameters
    ----------
    app : TensoApp or str
        The TensoApp to serve, or an application import string such as
        ``"mymodule:app"``. uvicorn only supports true multi-process
        serving (``workers > 1``) when given an import string.
    host : str
        Bind address.
    port : int
        Bind port.
    workers : int, optional
        Number of worker processes. Auto-detected if None.
    log_level : str
        Uvicorn log level.
    """
    import uvicorn

    if workers is None:
        workers = _detect_workers()

    # uvicorn honors ``workers`` only for import-string targets; with an
    # in-memory app object it silently falls back to a single process.
    # Warn instead of failing so existing single-process callers keep working.
    target = app.app if isinstance(app, TensoApp) else app
    if workers > 1 and not isinstance(target, str):
        logger.warning(
            "workers=%d requested, but uvicorn needs an application import "
            "string for multi-process serving; running a single worker. "
            "Pass an import string (e.g. 'mymodule:app') to run() instead.",
            workers,
        )

    uvicorn.run(
        target,
        host=host,
        port=port,
        workers=workers,
        log_level=log_level,
        limit_concurrency=1000,
        timeout_keep_alive=30,
    )
def run_grpc(
    servicer_class: Any,
    port: int = 50051,
    max_workers: Optional[int] = None,
    max_message_length: int = 256 * 1024 * 1024,
):
    """
    Run a gRPC server with Tenso-optimized settings.

    Blocks until the server terminates.

    Parameters
    ----------
    servicer_class : class
        A TensorInferenceServicer subclass instance.
    port : int
        gRPC listen port.
    max_workers : int, optional
        Thread pool size. Auto-detected if None.
    max_message_length : int
        Maximum message size (default 256MB for large tensors).
    """
    from concurrent import futures

    import grpc

    from .grpc import tenso_msg_pb2_grpc

    if max_workers is None:
        # Doubled relative to the HTTP worker heuristic — gRPC handler
        # threads presumably spend much of their time blocked on I/O.
        max_workers = _detect_workers() * 2

    channel_options = [
        ("grpc.max_send_message_length", max_message_length),
        ("grpc.max_receive_message_length", max_message_length),
    ]
    pool = futures.ThreadPoolExecutor(max_workers=max_workers)
    grpc_server = grpc.server(pool, options=channel_options)
    tenso_msg_pb2_grpc.add_TensorInferenceServicer_to_server(
        servicer_class, grpc_server
    )
    grpc_server.add_insecure_port(f"[::]:{port}")
    grpc_server.start()
    logger.info(
        "Tenso gRPC server started on port %d with %d workers", port, max_workers
    )
    grpc_server.wait_for_termination()