Observation API

The Observation API handles mosquito observation data, including recording new observations and retrieving existing observation records.

Router Implementation

Observation Router Module for CulicidaeLab Server API.

This module provides FastAPI router endpoints for managing mosquito observation records. It handles creation and retrieval of observation data, including species identification, location information, and metadata.

The router integrates with the observation service layer to persist and retrieve data from the database while providing proper validation and error handling.

Typical usage example

from fastapi import FastAPI
from backend.routers.observation import router

app = FastAPI()
app.include_router(router, prefix="/api/v1")

Endpoints

POST /observations - Create a new observation record
GET /observations - Retrieve observations with optional filtering
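As a rough illustration of how the two routes resolve once the router is mounted, the sketch below builds request URLs with the standard library. The base URL `http://localhost:8000` and the `/api/v1` prefix are assumptions taken from the usage examples on this page, and `observations_url` is a hypothetical helper, not part of the CulicidaeLab code base.

```python
from urllib.parse import urlencode

# Assumed base URL and prefix; adjust to your deployment.
BASE_URL = "http://localhost:8000"
API_PREFIX = "/api/v1"


def observations_url(**query: object) -> str:
    """Build the /observations URL, dropping query parameters set to None."""
    params = {key: value for key, value in query.items() if value is not None}
    url = f"{BASE_URL}{API_PREFIX}/observations"
    return f"{url}?{urlencode(params)}" if params else url


# POST target (no query string) and a filtered GET target.
print(observations_url())
print(observations_url(species_id="aedes-aegypti", limit=50, offset=0))
```

Both endpoints share the same path; only the HTTP method and the presence of query parameters differ.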

Observation

Complete observation model with unique identifier.

Extends ObservationBase with system-generated fields for storing complete observation records.

model_config = {} class-attribute

ObservationListResponse

Response model for paginated observation lists.

Contains the total count and list of observations for API responses.

model_config = {} class-attribute

create_observation(observation: Observation) -> Observation async

Create a new mosquito observation record.

This endpoint accepts a complete observation record and stores it in the database. The observation should include species identification, location data, and metadata. If no user_id is provided, a UUID will be automatically generated.

Parameters:

Name Type Description Default
observation Observation

Complete observation data including species information, location coordinates, count, and optional metadata. Must conform to the Observation schema with all required fields validated.

required

Returns:

Name Type Description
Observation Observation

The created observation record with assigned ID and any server-generated fields.

Raises:

Type Description
HTTPException

If observation creation fails due to validation errors or database issues. Returns 500 status code for server errors.

Example

from backend.schemas.observation_schemas import Observation, Location

observation_data = Observation(
    species_scientific_name="Aedes aegypti",
    count=5,
    location=Location(lat=40.7128, lng=-74.0060),
    observed_at="2024-01-15T10:30:00Z",
    notes="Found near standing water",
)
result = await create_observation(observation_data)
print(f"Created observation with ID: {result.id}")

Source code in backend\routers\observation.py
@router.post(
    "/observations",
    response_model=Observation,
    status_code=status.HTTP_201_CREATED,
    summary="Submit a new observation",
    description="Submit a complete observation record. Prediction must be done beforehand.",
)
async def create_observation(
    observation: Observation,
) -> Observation:
    """Create a new mosquito observation record.

    This endpoint accepts a complete observation record and stores it in the database.
    The observation should include species identification, location data, and metadata.
    If no user_id is provided, a UUID will be automatically generated.

    Args:
        observation: Complete observation data including species information,
            location coordinates, count, and optional metadata. Must conform
            to the Observation schema with all required fields validated.

    Returns:
        Observation: The created observation record with assigned ID and
            any server-generated fields.

    Raises:
        HTTPException: If observation creation fails due to validation errors
            or database issues. Returns 500 status code for server errors.

    Example:
        >>> from backend.schemas.observation_schemas import Observation, Location
        >>> observation_data = Observation(
        ...     species_scientific_name="Aedes aegypti",
        ...     count=5,
        ...     location=Location(lat=40.7128, lng=-74.0060),
        ...     observed_at="2024-01-15T10:30:00Z",
        ...     notes="Found near standing water"
        ... )
        >>> result = await create_observation(observation_data)
        >>> print(f"Created observation with ID: {result.id}")
    """
    print("\n--- [ROUTER] Received request to CREATE observation ---")
    try:
        # FastAPI has already validated the incoming JSON against the Observation model.
        # All we need to do is add any final business logic.

        # Ensure user_id exists
        if not observation.user_id:
            observation.user_id = str(uuid4())
            print(f"[ROUTER] Generated new user_id: {observation.user_id}")

        print("[ROUTER] Observation data is valid. Calling observation service to save...")
        service = await get_observation_service()
        new_observation = await service.create_observation(observation)
        print(f"[ROUTER] Observation created successfully with ID: {new_observation.id}")
        return new_observation

    except HTTPException:
        # Re-raise exceptions from services or validation
        raise
    except Exception as e:
        # Catch any other unexpected errors
        print(f"[ROUTER] CRITICAL ERROR in /observations: {type(e).__name__} - {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to create observation: {str(e)}",
        )

get_observations(species_id: str | None = None, limit: int = 100, offset: int = 0, user_id: str = 'default_user_id') -> ObservationListResponse async

Retrieve mosquito observations with optional filtering.

This endpoint returns a paginated list of observation records. Results can be filtered by species and user, with configurable pagination limits.

Parameters:

Name Type Description Default
species_id str | None

Optional species identifier to filter observations. If None, returns observations for all species. Should be a valid species UUID or identifier from the database.

None
limit int

Maximum number of observations to return in a single response. The intended range is 1 to 1000; defaults to 100. Values above 1000 are capped at 1000 for performance. The router does not enforce the lower bound.

100
offset int

Number of observations to skip for pagination. Defaults to 0; negative values are clamped to 0. Use with limit for paginated results.

0
user_id str

Identifier for the user whose observations to retrieve. Currently defaults to "default_user_id" but should be replaced with proper authentication in production.

'default_user_id'

Returns:

Name Type Description
ObservationListResponse ObservationListResponse

Paginated response containing the total count of matching observations and the list of observation records. Each observation includes full species, location, and metadata.

Raises:

Type Description
HTTPException

If observation retrieval fails due to database errors or invalid parameters. Returns 500 status code for server errors.

Example

# Get first 50 observations for all species
response = await get_observations(limit=50)
print(f"Total observations: {response.count}")

# Get observations for a specific species with pagination
response = await get_observations(
    species_id="aedes-aegypti-uuid",
    limit=25,
    offset=25,
)
for obs in response.observations:
    print(f"Species: {obs.species_scientific_name}")

# Get observations for a specific user (when auth is implemented)
response = await get_observations(
    user_id="authenticated-user-id",
    limit=10,
)

Source code in backend\routers\observation.py
@router.get(
    "/observations",
    response_model=ObservationListResponse,
    summary="Get observations",
)
async def get_observations(
    species_id: str | None = None,
    limit: int = 100,
    offset: int = 0,
    user_id: str = "default_user_id",  # This should likely be replaced with auth
) -> ObservationListResponse:
    """Retrieve mosquito observations with optional filtering.

    This endpoint returns a paginated list of observation records. Results can be
    filtered by species and user, with configurable pagination limits.

    Args:
        species_id: Optional species identifier to filter observations. If None,
            returns observations for all species. Should be a valid species UUID
            or identifier from the database.
        limit: Maximum number of observations to return in a single response.
            Must be between 1 and 1000. Defaults to 100. Larger values are
            automatically capped at 1000 for performance.
        offset: Number of observations to skip for pagination. Must be non-negative.
            Defaults to 0. Use with limit for paginated results.
        user_id: Identifier for the user whose observations to retrieve. Currently
            defaults to "default_user_id" but should be replaced with proper
            authentication in production.

    Returns:
        ObservationListResponse: Paginated response containing the total count
            of matching observations and the list of observation records.
            Each observation includes full species, location, and metadata.

    Raises:
        HTTPException: If observation retrieval fails due to database errors
            or invalid parameters. Returns 500 status code for server errors.

    Example:
        >>> # Get first 50 observations for all species
        >>> response = await get_observations(limit=50)
        >>> print(f"Total observations: {response.count}")
        >>>
        >>> # Get observations for a specific species with pagination
        >>> response = await get_observations(
        ...     species_id="aedes-aegypti-uuid",
        ...     limit=25,
        ...     offset=25
        ... )
        >>> for obs in response.observations:
        ...     print(f"Species: {obs.species_scientific_name}")
        >>>
        >>> # Get observations for a specific user (when auth is implemented)
        >>> response = await get_observations(
        ...     user_id="authenticated-user-id",
        ...     limit=10
        ... )
    """
    print("\n--- [ROUTER] Received request for /observations (GET) ---")
    print(f"[ROUTER] Params: species_id='{species_id}', limit={limit}, offset={offset}, user_id='{user_id}'")
    try:
        service = await get_observation_service()
        print("[ROUTER] Calling service.get_observations...")
        result = await service.get_observations(
            user_id=user_id,
            species_id=species_id,
            limit=min(limit, 1000),  # Ensure limit is not excessive
            offset=max(offset, 0),  # Ensure offset is non-negative
        )
        print(f"[ROUTER] Retrieved {len(result.observations)} observations.")
        return result
    except HTTPException as http_exc:
        print(f"[ROUTER] Caught HTTPException: {http_exc.status_code} - {http_exc.detail}")
        raise
    except Exception as e:
        print(f"[ROUTER] CRITICAL ERROR in GET /observations: {type(e).__name__} - {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to retrieve observations: {str(e)}",
        )
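The clamping applied in the handler above, together with the offsets a client would request to walk a result set, can be sketched as follows. `clamp_page` and `page_offsets` are hypothetical helpers written for illustration; only the `min(limit, 1000)` and `max(offset, 0)` expressions come from the router source.

```python
def clamp_page(limit: int, offset: int, max_limit: int = 1000) -> tuple[int, int]:
    """Mirror the router's clamping: cap limit at max_limit, floor offset at 0."""
    return min(limit, max_limit), max(offset, 0)


def page_offsets(total: int, limit: int) -> list[int]:
    """Offsets a client would request to walk `total` records in pages of `limit`."""
    if limit < 1:
        raise ValueError("limit must be at least 1")
    return list(range(0, total, limit))


print(clamp_page(5000, -10))
print(page_offsets(total=120, limit=50))
```

Since the router only caps the upper end, a `limit` of 0 or below passes through to the service unchanged; a client that wants predictable paging should validate `limit` itself, as `page_offsets` does here.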

get_observation_service() async

Get or initialize the global observation service instance.

This function implements a singleton pattern for the ObservationService, ensuring that only one instance exists and is properly initialized. If the service hasn't been created yet, it creates a new instance and initializes it.

Returns:

Name Type Description
ObservationService

The global observation service instance, initialized and ready for use.

Example

service = await get_observation_service()
observations = await service.get_observations(limit=10)

Source code in backend\services\observation_service.py
async def get_observation_service():
    """Get or initialize the global observation service instance.

    This function implements a singleton pattern for the ObservationService,
    ensuring that only one instance exists and is properly initialized.
    If the service hasn't been created yet, it creates a new instance
    and initializes it.

    Returns:
        ObservationService: The global observation service instance,
            initialized and ready for use.

    Example:
        >>> service = await get_observation_service()
        >>> observations = await service.get_observations(limit=10)
    """
    global observation_service
    if observation_service is None:
        observation_service = ObservationService()
        await observation_service.initialize()
    return observation_service

Data Schemas

The Observation API uses Pydantic schemas for observation data validation:

backend.schemas.observation_schemas

Pydantic models for the Observation service.

This module defines the schema models used for request/response validation in the Observation service endpoints.

Location

Geographic location model.

Represents latitude and longitude coordinates for observations.

model_config = {} class-attribute

ObservationBase

Base model for observation data.

Contains core observation fields used for creating and validating observation records.

model_config = {} class-attribute

Observation

Complete observation model with unique identifier.

Extends ObservationBase with system-generated fields for storing complete observation records.

model_config = {} class-attribute

ObservationListResponse

Response model for paginated observation lists.

Contains the total count and list of observations for API responses.

model_config = {} class-attribute

Service Layer

The Observation API integrates with observation service layers:

backend.services.observation_service

Observation service for managing mosquito observation data.

This module provides functionality for creating, storing, and retrieving mosquito observation records in the LanceDB database. It handles data validation, transformation, and provides both synchronous and asynchronous methods for observation management.

Example

from backend.services.observation_service import get_observation_service

service = await get_observation_service()
observation = await service.create_observation(obs_data)

ObservationService()

Service for managing mosquito observation data.

This class provides methods for creating new observations, retrieving existing observations with filtering options, and managing the database connection lifecycle.

Attributes:

Name Type Description
table_name str

The name of the database table for observations.

db str

The LanceDB database connection object.

Example

service = ObservationService()
await service.initialize()
observations = await service.get_observations(limit=10)

Initialize the ObservationService with default configuration.

Sets up the service with the observations table name and prepares for database connection initialization.

Example

service = ObservationService()
print(service.table_name)  # "observations"

initialize()

Initialize the database connection and ensure required tables exist.

This method establishes a connection to the LanceDB database through the LanceDB manager and prepares the service for observation operations. The observations table will be created if it doesn't exist.

Returns:

Name Type Description
ObservationService

The initialized service instance for method chaining.

Example

service = ObservationService()
await service.initialize()
print(f"Connected to DB: {service.db is not None}")

create_observation(observation_data: Observation) -> Observation

Create a new observation record in the database.

This method transforms the Pydantic Observation model into the appropriate LanceDB schema format and inserts it into the observations table. It handles JSON serialization for complex fields like metadata and data_source.

Parameters:

Name Type Description Default
observation_data Observation

The observation data to store in the database.

required

Returns:

Name Type Description
Observation Observation

The same observation data that was passed in, confirming successful storage.

Raises:

Type Description
HTTPException

If there's an error saving the observation to the database, an HTTP 500 error is raised with details about the failure.

Example

from backend.schemas.observation_schemas import Observation, Location

obs = Observation(
    id="obs_001",
    species_scientific_name="Aedes aegypti",
    location=Location(lat=40.7128, lng=-74.0060),
    observed_at="2023-06-15T10:30:00Z",
)
result = await service.create_observation(obs)
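The JSON serialization step described for `metadata` and `data_source` might look roughly like the sketch below. The helpers `to_row` and `from_row` are hypothetical, the sample `metadata` content is invented for illustration, and the actual LanceDB column layout is not shown on this page, so treat the record shape as an assumption.

```python
import json
from typing import Any

# Fields the method description names as needing JSON serialization.
JSON_FIELDS = ("metadata", "data_source")


def to_row(record: dict[str, Any]) -> dict[str, Any]:
    """Copy a record, storing nested fields as JSON strings for a flat table."""
    row = dict(record)
    for field in JSON_FIELDS:
        if row.get(field) is not None:
            row[field] = json.dumps(row[field], sort_keys=True)
    return row


def from_row(row: dict[str, Any]) -> dict[str, Any]:
    """Reverse to_row: decode JSON strings back into Python objects."""
    record = dict(row)
    for field in JSON_FIELDS:
        if isinstance(record.get(field), str):
            record[field] = json.loads(record[field])
    return record


record = {
    "id": "obs_001",
    "species_scientific_name": "Aedes aegypti",
    "metadata": {"trap": "BG-Sentinel"},  # Illustrative value only.
    "data_source": None,
}
row = to_row(record)
print(row["metadata"])
```

Keeping the encode and decode steps symmetrical makes it easier to return the same structure from `get_observations` that was accepted by `create_observation`.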

get_observations(user_id: str | None = None, species_id: str | None = None, limit: int = 100, offset: int = 0) -> ObservationListResponse

Retrieve observations with optional filtering by user and species.

This method queries the observations table and returns filtered results based on user ID and/or species. It supports pagination and returns properly formatted Observation objects.

Parameters:

Name Type Description Default
user_id str | None

Filter observations by a specific user ID. If None or "default_user_id", no user filtering is applied.

None
species_id str | None

Filter observations by species scientific name. If None, no species filtering is applied.

None
limit int

Maximum number of observations to return. Defaults to 100.

100
offset int

Number of observations to skip for pagination. Defaults to 0.

0

Returns:

Name Type Description
ObservationListResponse ObservationListResponse

A response object containing the total count and list of matching observations.

Raises:

Type Description
HTTPException

If there's an error retrieving observations from the database, an HTTP 500 error is raised with details about the failure.

Example

# Get recent observations for a specific user
user_obs = await service.get_observations(user_id="user123", limit=50)
print(f"Found {user_obs.count} observations")

# Get Aedes aegypti observations with pagination
aedes_obs = await service.get_observations(
    species_id="Aedes aegypti",
    limit=20,
    offset=40,
)

get_lancedb_manager() -> LanceDBManager async

Get the global LanceDB manager instance.

This function returns the singleton LanceDB manager instance, ensuring it's connected before returning.

Returns:

Type Description
LanceDBManager

The connected LanceDB manager instance.

Example Usage

Create New Observation

import asyncio
from datetime import datetime, timezone

import httpx


async def main() -> None:
    # Field names follow the create_observation docstring example above.
    observation_data = {
        "species_scientific_name": "Aedes aegypti",
        "count": 5,
        "location": {"lat": 40.7128, "lng": -74.0060},
        "observed_at": datetime.now(timezone.utc).isoformat(),
        "notes": "Found in urban area near standing water",
    }

    async with httpx.AsyncClient() as client:
        response = await client.post(
            "http://localhost:8000/api/v1/observations",
            json=observation_data,
        )
        response.raise_for_status()  # Fail loudly on 4xx/5xx responses
        observation = response.json()
        print(f"Created observation ID: {observation['id']}")


asyncio.run(main())

Retrieve Observations

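import asyncio

import httpx


async def main() -> None:
    async with httpx.AsyncClient() as client:
        # Get list of observations with filters. The router accepts species_id,
        # limit, offset and user_id; it defines no date-range parameters.
        response = await client.get(
            "http://localhost:8000/api/v1/observations",
            params={"species_id": "aedes-aegypti", "limit": 50},
        )
        response.raise_for_status()
        observations = response.json()
        print(f"Total observations: {observations['count']}")

        # Get specific observation details.
        # Note: the router documented above defines no /observations/{id} route,
        # so this request only works if your deployment adds one.
        response = await client.get(
            "http://localhost:8000/api/v1/observations/12345"
        )
        observation_detail = response.json()


asyncio.run(main())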

Update Observation

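import asyncio

import httpx


async def main() -> None:
    # Update existing observation.
    # Note: the router documented above defines no PATCH route, so this
    # request only works if your deployment adds an update endpoint.
    update_data = {
        "notes": "Updated notes with additional details",
        "verified": True,
    }

    async with httpx.AsyncClient() as client:
        response = await client.patch(
            "http://localhost:8000/api/v1/observations/12345",
            json=update_data,
        )
        updated_observation = response.json()


asyncio.run(main())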