This guide walks through instrumenting a Flask-based inference API with the OneX Observability SDK. You’ll create a monitor, wrap your model with watch(), and use request_context() so each request is traced end-to-end with neural signals, raw payloads, and application responses.

Prerequisites

  • Python 3.8+
  • Flask
  • PyTorch and a model you want to instrument (e.g. Hugging Face Transformers)
  • API key and ingestion endpoint from the OneX Observability Dashboard
Install the SDK with PyTorch support:
pip install "onex-sdk[pytorch]" flask torch transformers

Step 1: Create the monitor

Create a OneXMonitor with your API key and ingestion endpoint. Use the dashboard to get environment-specific endpoints (e.g. development vs. production). Optional config such as request_metadata and payload sampling controls what gets sent to the platform.
from onex import OneXMonitor

monitor = OneXMonitor(
    api_key="your-api-key",
    endpoint="onex-ingestion-endpoint",
    config={
        "payload_sample_items": 5,
        "payload_tensor_sample": 32,
        "request_metadata": {"app": "flask-sentiment-api"},
    },
)
For local debugging, set enable_logging=True to see framework detection and signal export in the terminal.
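For example, a debug-time constructor might look like this (whether enable_logging is a top-level argument or a config key may vary by SDK version — check the Configuration Reference):
from onex import OneXMonitor

# Same monitor as above, with SDK logging enabled for local debugging.
monitor = OneXMonitor(
    api_key="your-api-key",
    endpoint="onex-ingestion-endpoint",
    enable_logging=True,  # logs framework detection and signal export to the terminal
)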

Step 2: Load and watch the model

Load your model as usual, then pass it to monitor.watch(). The SDK auto-detects the framework (PyTorch, TensorFlow, JAX), attaches hooks, and streams signals asynchronously. All inference inside a request_context will be associated with the same request.
from transformers import AutoModelForSequenceClassification, AutoTokenizer
import torch

MODEL_NAME = "nlptown/bert-base-multilingual-uncased-sentiment"
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForSequenceClassification.from_pretrained(
    MODEL_NAME,
    output_hidden_states=True,
    output_attentions=True,
)
model.eval()
model = monitor.watch(model)

Step 3: Wrap routes with request context

Use monitor.request_context() inside each prediction route. Pass the raw request payload (e.g. user input) and any route metadata. Run your model forward inside the context, build the API response, then call ctx.record_response() so the platform receives both neural signals and the application-level response.
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route("/predict", methods=["POST"])
def predict():
    payload = request.get_json(silent=True) or {}
    text = payload.get("text", "")

    with monitor.request_context({"text": text}, metadata={"route": "/predict"}) as ctx:
        inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
        with torch.no_grad():
            outputs = model(**inputs)

        probs = torch.softmax(outputs.logits, dim=-1)
        rating = int(torch.argmax(probs).item() + 1)
        confidence = float(torch.max(probs).item())

        api_response = {"rating": rating, "confidence": confidence, "text": text}
        ctx.record_response(api_response)

    return jsonify(api_response)
  • Raw payload: The first argument to request_context is sent as the request payload event (e.g. {"text": "..."}), so the platform can correlate inputs with neural signals.
  • Metadata: Optional metadata (e.g. route, version) is attached to request/response events.
  • Response: ctx.record_response(api_response) ensures the application response is exported with the same request_id as the neural signals and payload.
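The correlation these bullets describe boils down to stamping every event with one shared request_id. A minimal pure-Python sketch of the pattern — illustrative only, not the SDK's implementation; the events list stands in for the SDK's export queue:

```python
import uuid
from contextlib import contextmanager

events = []  # stand-in for the SDK's export queue

@contextmanager
def request_context(payload, metadata=None):
    # One id ties the payload, neural signals, and response together.
    request_id = str(uuid.uuid4())
    events.append({"type": "payload", "request_id": request_id,
                   "data": payload, "metadata": metadata or {}})

    class Ctx:
        def record_response(self, response):
            events.append({"type": "response", "request_id": request_id,
                           "data": response, "metadata": metadata or {}})

    yield Ctx()

with request_context({"text": "hi"}, metadata={"route": "/predict"}) as ctx:
    ctx.record_response({"rating": 5})

# Both events now carry the same request_id.
assert events[0]["request_id"] == events[1]["request_id"]
```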

Complete example

Putting it together:
from flask import Flask, request, jsonify
import torch
from transformers import AutoModelForSequenceClassification, AutoTokenizer
from onex import OneXMonitor

app = Flask(__name__)

monitor = OneXMonitor(
    api_key="your-api-key",
    endpoint="onex-ingestion-endpoint",
    config={
        "payload_sample_items": 5,
        "payload_tensor_sample": 32,
        "request_metadata": {"app": "flask-sentiment-api"},
    },
)

MODEL_NAME = "nlptown/bert-base-multilingual-uncased-sentiment"
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForSequenceClassification.from_pretrained(
    MODEL_NAME,
    output_hidden_states=True,
    output_attentions=True,
)
model.eval()
model = monitor.watch(model)


@app.route("/predict", methods=["POST"])
def predict():
    payload = request.get_json(silent=True) or {}
    text = payload.get("text", "")

    with monitor.request_context({"text": text}, metadata={"route": "/predict"}) as ctx:
        inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
        with torch.no_grad():
            outputs = model(**inputs)

        probs = torch.softmax(outputs.logits, dim=-1)
        rating = int(torch.argmax(probs).item() + 1)
        confidence = float(torch.max(probs).item())

        api_response = {"rating": rating, "confidence": confidence, "text": text}
        ctx.record_response(api_response)

    return jsonify(api_response)


@app.route("/health", methods=["GET"])
def health():
    return jsonify({"status": "ok"})


if __name__ == "__main__":
    app.run()
Run the app and send a test request:
curl -X POST http://127.0.0.1:5000/predict \
  -H "Content-Type: application/json" \
  -d '{"text": "The product is great and delivery was fast."}'
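The same smoke test from Python, using only the standard library (the URL assumes the default Flask dev server on port 5000):

```python
import json
import urllib.error
import urllib.request

# Build the same request the curl command sends.
body = json.dumps({"text": "The product is great and delivery was fast."}).encode()
req = urllib.request.Request(
    "http://127.0.0.1:5000/predict",
    data=body,
    headers={"Content-Type": "application/json"},
    method="POST",
)

try:
    with urllib.request.urlopen(req, timeout=5) as resp:
        print(json.load(resp))  # {"rating": ..., "confidence": ..., "text": ...}
except urllib.error.URLError:
    print("Request failed — is the Flask app running on port 5000?")
```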

Error handling

If an exception is raised inside request_context, you can still record a failure response so the platform has a full trace. Use a try/except and record_response with a payload that indicates failure (and optionally success=False if you use manual instrumentation).
@app.route("/predict", methods=["POST"])
def predict():
    payload = request.get_json(silent=True) or {}
    text = payload.get("text", "")

    with monitor.request_context({"text": text}, metadata={"route": "/predict"}) as ctx:
        try:
            inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
            with torch.no_grad():
                outputs = model(**inputs)
            # ... build api_response ...
            ctx.record_response(api_response)
            return jsonify(api_response)
        except Exception as e:
            ctx.record_response({"error": str(e), "success": False})
            return jsonify({"error": str(e)}), 500

Graceful shutdown

Call monitor.stop() when the Flask process exits so the SDK flushes outstanding batches and closes cleanly. Using atexit or signal handlers works well:
import atexit
import signal
import sys

def shutdown():
    monitor.stop()

atexit.register(shutdown)

def handle_signal(signum, frame):
    # SystemExit unwinds the stack and runs the atexit handler above,
    # so monitor.stop() is called exactly once.
    sys.exit(0)

signal.signal(signal.SIGTERM, handle_signal)
signal.signal(signal.SIGINT, handle_signal)
If you use a production WSGI server (e.g. Gunicorn), register the same shutdown logic in the worker or master process so it runs before the process exits.
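With Gunicorn, one way to do this is the worker_exit server hook in gunicorn.conf.py — a sketch, assuming your app module is named app and exposes monitor at module level (adjust the import to your project layout):

```python
# gunicorn.conf.py — flush SDK batches when a worker shuts down.

def worker_exit(server, worker):
    # Imported here so the name resolves inside the worker process
    # that actually owns the monitor instance.
    from app import monitor  # hypothetical module path; adjust to your layout
    monitor.stop()
```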

Next steps

  • Configuration: See Configuration Reference for sampling, payload capture, logits/probabilities, and throughput limits.
  • Integration overview: See Integration Guide for manual instrumentation, disabling request/response capture, and attention/metrics options.
  • Signals: See Signals for the events the SDK sends to the platform.