Skip to content

Core API

culicidaelab.core

Core components of the CulicidaeLab library.

This module provides the base classes, configuration management, and resource handling functionalities that form the foundation of the library. It exports key classes and functions for convenient access from other parts of the application.

Attributes:

Name Type Description
__all__ list[str]

A list of the public objects of this module.

__all__ = ['BasePredictor', 'BaseProvider', 'WeightsManagerProtocol', 'ConfigManager', 'CulicidaeLabConfig', 'PredictorConfig', 'DatasetConfig', 'ProviderConfig', 'SpeciesModel', 'SpeciesConfig', 'ProviderService', 'ResourceManager', 'ResourceManagerError', 'Settings', 'get_settings', 'download_file', 'str_to_bgr'] module-attribute
BasePredictor

Abstract base class for all predictors.

This class defines the common interface for all predictors (e.g., detector, segmenter, classifier). It relies on the main Settings object for configuration and a WeightsManager for model file management.

Parameters:

Name Type Description Default
settings Settings

The main Settings object for the library.

required
predictor_type str

The key for this predictor in the configuration (e.g., 'classifier').

required
weights_manager WeightsManagerProtocol

An object conforming to the WeightsManagerProtocol.

required
load_model bool

If True, loads the model immediately upon initialization.

False

Attributes:

Name Type Description
settings Settings

The main settings object.

predictor_type str

The type of the predictor.

weights_manager WeightsManagerProtocol

The manager responsible for providing model weights (stored on the instance as the private attribute `_weights_manager`).

Source code in culicidaelab/core/base_predictor.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
class BasePredictor(Generic[PredictionType, GroundTruthType], ABC):
    """Abstract base class for all predictors.

    This class defines the common interface for all predictors (e.g., detector,
    segmenter, classifier). It relies on the main Settings object for
    configuration and a WeightsManager for model file management.

    Args:
        settings (Settings): The main Settings object for the library.
        predictor_type (str): The key for this predictor in the configuration
            (e.g., 'classifier').
        weights_manager (WeightsManagerProtocol): An object conforming to the
            WeightsManagerProtocol.
        load_model (bool): If True, loads the model immediately upon initialization.

    Attributes:
        settings (Settings): The main settings object.
        predictor_type (str): The type of the predictor.
        _weights_manager (WeightsManagerProtocol): The manager responsible for
            providing model weights. It is stored privately; the resolved
            weights location is exposed through the `model_path` property.
    """

    def __init__(
        self,
        settings: Settings,
        predictor_type: str,
        weights_manager: WeightsManagerProtocol,
        load_model: bool = False,
    ) -> None:
        """Initializes the predictor.

        The weights manager is asked for the weights of `predictor_type`
        first, then the predictor configuration is read from the settings.

        Raises:
            ValueError: If the configuration for the specified `predictor_type`
                is not found in the settings.
            RuntimeError: If `load_model` is True and loading the model fails.
        """
        self.settings = settings
        self.predictor_type = predictor_type

        self._weights_manager = weights_manager
        self._model_path = self._weights_manager.ensure_weights(self.predictor_type)
        self._config: PredictorConfig = self._get_predictor_config()

        # The model itself is created lazily by `load_model()`.
        self._model = None
        self._model_loaded = False
        self._logger = logging.getLogger(
            f"culicidaelab.predictor.{self.predictor_type}",
        )

        if load_model:
            self.load_model()

    def __call__(self, input_data: np.ndarray, **kwargs: Any) -> Any:
        """Convenience method that calls `predict()`, loading the model first if needed."""
        if not self._model_loaded:
            self.load_model()
        return self.predict(input_data, **kwargs)

    def __enter__(self):
        """Context manager entry: ensures the model is loaded and returns `self`."""
        if not self._model_loaded:
            self.load_model()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Context manager exit.

        Does nothing: the model stays loaded and, because None is returned,
        any exception raised inside the `with` body propagates. Use
        `model_context()` when the model should be unloaded on exit.
        """
        pass

    @property
    def config(self) -> PredictorConfig:
        """Get the predictor configuration Pydantic model."""
        return self._config

    @property
    def model_loaded(self) -> bool:
        """Check if the model is loaded."""
        return self._model_loaded

    @property
    def model_path(self) -> Path:
        """Gets the path to the model weights file."""
        return self._model_path

    @contextmanager
    def model_context(self):
        """A context manager for temporary model loading.

        Ensures the model is loaded upon entering the context and unloaded
        upon exiting. A model that was already loaded before entering is left
        loaded on exit. This is useful for managing memory in pipelines.

        Yields:
            BasePredictor: The predictor instance itself.

        Example:
            >>> with predictor.model_context():
            ...     predictions = predictor.predict(data)
        """
        was_loaded = self._model_loaded
        try:
            if not was_loaded:
                self.load_model()
            yield self
        finally:
            # Only undo a load performed by this context.
            if not was_loaded and self._model_loaded:
                self.unload_model()

    def evaluate(
        self,
        ground_truth: GroundTruthType,
        prediction: PredictionType | None = None,
        input_data: np.ndarray | None = None,
        **predict_kwargs: Any,
    ) -> dict[str, float]:
        """Evaluate a prediction against a ground truth.

        Either `prediction` or `input_data` must be provided. If `prediction`
        is provided, it is used directly. If `prediction` is None, `input_data`
        is used to generate a new prediction; the model is loaded first when
        it is not loaded yet.

        Args:
            ground_truth (GroundTruthType): The ground truth annotation.
            prediction (PredictionType, optional): A pre-computed prediction.
            input_data (np.ndarray, optional): Input data to generate a
                prediction from, if one isn't provided.
            **predict_kwargs (Any): Additional arguments passed to the `predict`
                method.

        Returns:
            dict[str, float]: Dictionary containing evaluation metrics for a
            single item.

        Raises:
            ValueError: If neither `prediction` nor `input_data` is provided.
            RuntimeError: If a prediction has to be generated and the model
                fails to load.
        """
        if prediction is None:
            if input_data is None:
                raise ValueError(
                    "Either 'prediction' or 'input_data' must be provided.",
                )
            # Same lazy loading as `__call__` and `predict_batch`, so that
            # evaluating from raw input works on a freshly built predictor.
            if not self._model_loaded:
                self.load_model()
            prediction = self.predict(input_data, **predict_kwargs)
        return self._evaluate_from_prediction(
            prediction=prediction,
            ground_truth=ground_truth,
        )

    def evaluate_batch(
        self,
        ground_truth_batch: list[GroundTruthType],
        predictions_batch: list[PredictionType] | None = None,
        input_data_batch: list[np.ndarray] | None = None,
        num_workers: int = 4,
        show_progress: bool = True,
        **predict_kwargs: Any,
    ) -> dict[str, float]:
        """Evaluate on a batch of items using parallel processing.

        Either `predictions_batch` or `input_data_batch` must be provided.

        Args:
            ground_truth_batch (list[GroundTruthType]): List of corresponding
                ground truth annotations.
            predictions_batch (list[PredictionType], optional): A pre-computed
                list of predictions.
            input_data_batch (list[np.ndarray], optional): List of input data
                to generate predictions from.
            num_workers (int): Number of parallel workers for calculating metrics.
            show_progress (bool): Whether to show a progress bar.
            **predict_kwargs (Any): Additional arguments passed to `predict_batch`.

        Returns:
            dict[str, float]: Dictionary containing aggregated evaluation metrics.

        Raises:
            ValueError: If neither `predictions_batch` nor `input_data_batch`
                is provided, or if the number of predictions does not match
                the number of ground truths.
        """
        if predictions_batch is None:
            if input_data_batch is not None:
                predictions_batch = self.predict_batch(
                    input_data_batch,
                    show_progress=show_progress,
                    **predict_kwargs,
                )
            else:
                raise ValueError(
                    "Either 'predictions_batch' or 'input_data_batch' must be provided.",
                )

        if len(predictions_batch) != len(ground_truth_batch):
            raise ValueError(
                f"Number of predictions ({len(predictions_batch)}) must match "
                f"number of ground truths ({len(ground_truth_batch)}).",
            )

        per_item_metrics = self._calculate_metrics_parallel(
            predictions_batch,
            ground_truth_batch,
            num_workers,
            show_progress,
        )
        aggregated_metrics = self._aggregate_metrics(per_item_metrics)
        final_report = self._finalize_evaluation_report(
            aggregated_metrics,
            predictions_batch,
            ground_truth_batch,
        )
        return final_report

    def get_model_info(self) -> dict[str, Any]:
        """Gets information about the predictor and its model.

        Returns:
            dict[str, Any]: A dictionary with the keys `predictor_type`,
            `model_path` (as a string), `model_loaded` and `config` (the
            dumped predictor configuration).
        """
        return {
            "predictor_type": self.predictor_type,
            "model_path": str(self._model_path),
            "model_loaded": self._model_loaded,
            "config": self.config.model_dump(),
        }

    def load_model(self) -> None:
        """Loads the model if it is not already loaded.

        This is a convenience wrapper around `_load_model` that prevents
        reloading.

        Raises:
            RuntimeError: If model loading fails. The original exception is
                attached as the cause.
        """
        if self._model_loaded:
            self._logger.info(f"Model for {self.predictor_type} already loaded")
            return

        try:
            self._logger.info(
                f"Loading model for {self.predictor_type} from {self._model_path}",
            )
            self._load_model()
            self._model_loaded = True
            self._logger.info(f"Successfully loaded model for {self.predictor_type}")
        except Exception as e:
            self._logger.error(f"Failed to load model for {self.predictor_type}: {e}")
            raise RuntimeError(
                f"Failed to load model for {self.predictor_type}: {e}",
            ) from e

    def predict_batch(
        self,
        input_data_batch: list[np.ndarray],
        show_progress: bool = True,
        **kwargs: Any,
    ) -> list[PredictionType]:
        """Makes predictions on a batch of inputs.

        This base implementation processes items serially. Subclasses with
        native batching capabilities SHOULD override this method.

        Args:
            input_data_batch (list[np.ndarray]): List of input data to make
                predictions on.
            show_progress (bool): Whether to show a progress bar.
            **kwargs (Any): Additional arguments passed to each `predict` call.

        Returns:
            list[PredictionType]: List of predictions. Empty when the input
            batch is empty (the model is not loaded in that case).

        Raises:
            RuntimeError: If model fails to load or predict.
        """
        if not input_data_batch:
            return []

        if not self._model_loaded:
            self.load_model()
            # Defensive re-check, e.g. for a subclass whose `load_model`
            # override returns without marking the model as loaded.
            if not self._model_loaded:
                raise RuntimeError("Failed to load model for batch prediction")

        # Pick the progress-bar flavour matching the running environment.
        in_notebook = "ipykernel" in sys.modules
        tqdm_iterator = tqdm_notebook if in_notebook else tqdm_console
        iterator = input_data_batch

        if show_progress:
            iterator = tqdm_iterator(
                input_data_batch,
                desc=f"Predicting batch ({self.predictor_type})",
                leave=False,
            )
        try:
            return [self.predict(item, **kwargs) for item in iterator]
        except Exception as e:
            self._logger.error(f"Batch prediction failed: {e}", exc_info=True)
            raise RuntimeError(f"Batch prediction failed: {e}") from e

    def unload_model(self) -> None:
        """Unloads the model to free memory. Does nothing if no model is loaded."""
        if self._model_loaded:
            self._model = None
            self._model_loaded = False
            self._logger.info(f"Unloaded model for {self.predictor_type}")

    # Abstract Methods
    @abstractmethod
    def _evaluate_from_prediction(
        self,
        prediction: PredictionType,
        ground_truth: GroundTruthType,
    ) -> dict[str, float]:
        """The core metric calculation logic for a single item.

        Args:
            prediction (PredictionType): Model prediction.
            ground_truth (GroundTruthType): Ground truth annotation.

        Returns:
            dict[str, float]: Dictionary containing evaluation metrics.
        """
        pass

    @abstractmethod
    def _load_model(self) -> None:
        """Loads the model from the path specified in the configuration.

        This method must be implemented by child classes. It should handle
        the specifics of loading a model file (e.g., PyTorch, TensorFlow)
        and assign it to an internal attribute like `self._model`.

        Raises:
            RuntimeError: If the model file cannot be found or loaded.
        """
        pass

    @abstractmethod
    def predict(self, input_data: np.ndarray, **kwargs: Any) -> PredictionType:
        """Makes a prediction on a single input data sample.

        Args:
            input_data (np.ndarray): The input data (e.g., an image as a NumPy
                array) to make a prediction on.
            **kwargs (Any): Additional predictor-specific arguments.

        Returns:
            PredictionType: The prediction result, with a format specific to the
            predictor type.

        Raises:
            RuntimeError: If the model is not loaded before calling this method.
        """
        pass

    @abstractmethod
    def visualize(
        self,
        input_data: np.ndarray,
        predictions: PredictionType,
        save_path: str | Path | None = None,
    ) -> np.ndarray:
        """Visualizes the predictions on the input data.

        Args:
            input_data (np.ndarray): The original input data (e.g., an image).
            predictions (PredictionType): The prediction result obtained from
                the `predict` method.
            save_path (str | Path, optional): An optional path to save the
                visualization to a file.

        Returns:
            np.ndarray: A NumPy array representing the visualized image.
        """
        pass

    # Protected Methods
    def _aggregate_metrics(
        self,
        metrics_list: list[dict[str, float]],
    ) -> dict[str, float]:
        """Aggregates metrics from multiple evaluations.

        Empty per-item dictionaries (items whose evaluation failed) are
        ignored. For every metric key seen in at least one item, the result
        holds `<key>_mean` and `<key>_std` computed over the items that
        contain the key, plus a `count` of the non-empty items.

        Args:
            metrics_list (list[dict[str, float]]): Per-item metric dictionaries.

        Returns:
            dict[str, float]: Aggregated metrics, or an empty dictionary when
            there is nothing to aggregate.
        """
        if not metrics_list:
            return {}

        valid_metrics = [m for m in metrics_list if m]
        if not valid_metrics:
            self._logger.warning("No valid metrics found for aggregation")
            return {}

        # Sorted so that the key order of the report is the same on every run
        # (plain set iteration order of strings changes between processes).
        all_keys = sorted({k for m in valid_metrics for k in m})
        aggregated = {}
        for key in all_keys:
            values = [m[key] for m in valid_metrics if key in m]
            if values:
                aggregated[f"{key}_mean"] = float(np.mean(values))
                aggregated[f"{key}_std"] = float(np.std(values))

        aggregated["count"] = len(valid_metrics)
        return aggregated

    def _calculate_metrics_parallel(
        self,
        predictions: list[PredictionType],
        ground_truths: list[GroundTruthType],
        num_workers: int = 4,
        show_progress: bool = True,
    ) -> list[dict[str, float]]:
        """Calculates metrics for individual items in parallel.

        Args:
            predictions (list[PredictionType]): Predictions to score.
            ground_truths (list[GroundTruthType]): Ground truths, index-aligned
                with `predictions`.
            num_workers (int): Size of the thread pool.
            show_progress (bool): Whether to show a progress bar.

        Returns:
            list[dict[str, float]]: One metrics dictionary per input item, in
            input order. An item whose evaluation raised is logged and
            represented by an empty dictionary.
        """
        # Pre-filled with the "failed" placeholder; slots are overwritten by
        # index so the output lines up with the inputs regardless of the
        # order in which the workers finish.
        per_item_metrics: list[dict[str, float]] = [{} for _ in predictions]
        in_notebook = "ipykernel" in sys.modules
        tqdm_iterator = tqdm_notebook if in_notebook else tqdm_console

        with ThreadPoolExecutor(max_workers=num_workers) as executor:
            future_to_idx = {
                executor.submit(
                    self._evaluate_from_prediction,
                    prediction,
                    ground_truths[idx],
                ): idx
                for idx, prediction in enumerate(predictions)
            }
            iterator = as_completed(future_to_idx)
            if show_progress:
                iterator = tqdm_iterator(
                    iterator,
                    total=len(future_to_idx),
                    desc="Calculating metrics",
                )
            for future in iterator:
                idx = future_to_idx[future]
                try:
                    per_item_metrics[idx] = future.result()
                except Exception as e:
                    self._logger.error(
                        f"Error calculating metrics for item {idx}: {e}",
                    )
        return per_item_metrics

    def _finalize_evaluation_report(
        self,
        aggregated_metrics: dict[str, float],
        predictions: list[PredictionType],
        ground_truths: list[GroundTruthType],
    ) -> dict[str, float]:
        """Optional hook to post-process the final evaluation report.

        The base implementation returns `aggregated_metrics` unchanged.
        """
        return aggregated_metrics

    def _get_predictor_config(self) -> PredictorConfig:
        """Gets the configuration for this predictor.

        Looks up `predictors.<predictor_type>` in the settings.

        Returns:
            PredictorConfig: A Pydantic `PredictorConfig` model for this
            predictor instance.

        Raises:
            ValueError: If the configuration is missing or is not a
                `PredictorConfig`.
        """
        config = self.settings.get_config(f"predictors.{self.predictor_type}")
        if not isinstance(config, PredictorConfig):
            raise ValueError(
                f"Configuration for predictor '{self.predictor_type}' not found or is invalid.",
            )
        return config
settings = settings instance-attribute
predictor_type = predictor_type instance-attribute
config: PredictorConfig property

Get the predictor configuration Pydantic model.

model_loaded: bool property

Check if the model is loaded.

model_path: Path property

Gets the path to the model weights file.

__init__(settings: Settings, predictor_type: str, weights_manager: WeightsManagerProtocol, load_model: bool = False)

Initializes the predictor.

Raises:

Type Description
ValueError

If the configuration for the specified predictor_type is not found in the settings.

Source code in culicidaelab/core/base_predictor.py
def __init__(
    self,
    settings: Settings,
    predictor_type: str,
    weights_manager: WeightsManagerProtocol,
    load_model: bool = False,
):
    """Set up the predictor state and optionally load the model.

    The weights manager is asked for the weights of `predictor_type`
    before the predictor configuration is read from the settings.

    Raises:
        ValueError: If the configuration for the specified `predictor_type`
            is not found in the settings.
    """
    self.settings, self.predictor_type = settings, predictor_type

    # Resolve the weights location, then the predictor configuration.
    self._weights_manager = weights_manager
    weights_location = self._weights_manager.ensure_weights(self.predictor_type)
    self._model_path = weights_location
    self._config: PredictorConfig = self._get_predictor_config()

    # No model is held until `load_model()` runs.
    self._model, self._model_loaded = None, False
    logger_name = f"culicidaelab.predictor.{self.predictor_type}"
    self._logger = logging.getLogger(logger_name)

    if not load_model:
        return
    self.load_model()
__call__(input_data: np.ndarray, **kwargs: Any) -> Any

Convenience method that calls predict().

Source code in culicidaelab/core/base_predictor.py
def __call__(self, input_data: np.ndarray, **kwargs: Any) -> Any:
    """Run `predict()` on `input_data`, loading the model first if needed."""
    needs_load = not self._model_loaded
    if needs_load:
        self.load_model()
    result = self.predict(input_data, **kwargs)
    return result
__enter__()

Context manager entry.

Source code in culicidaelab/core/base_predictor.py
def __enter__(self):
    """Enter the context: make sure the model is loaded and return `self`."""
    already_loaded = self._model_loaded
    if not already_loaded:
        self.load_model()
    return self
__exit__(exc_type, exc_val, exc_tb)

Context manager exit.

Source code in culicidaelab/core/base_predictor.py
def __exit__(self, exc_type, exc_val, exc_tb):
    """Exit the context without touching the model.

    Returns None, so an exception raised inside the `with` body propagates.
    """
    return None
model_context()

A context manager for temporary model loading.

Ensures the model is loaded upon entering the context and unloaded upon exiting. This is useful for managing memory in pipelines.

Yields:

Type Description
BasePredictor

The predictor instance itself.

Example

>>> with predictor.model_context():
...     predictions = predictor.predict(data)

Source code in culicidaelab/core/base_predictor.py
@contextmanager
def model_context(self):
    """Keep the model loaded for the duration of a `with` block.

    If the model is not loaded on entry it is loaded, and then unloaded
    again on exit. A model that was already loaded on entry is left as is.
    This is useful for managing memory in pipelines.

    Yields:
        BasePredictor: The predictor instance itself.

    Example:
        >>> with predictor.model_context():
        ...     predictions = predictor.predict(data)
    """
    load_on_entry = not self._model_loaded
    try:
        if load_on_entry:
            self.load_model()
        yield self
    finally:
        # Undo only a load that this context was responsible for.
        if load_on_entry and self._model_loaded:
            self.unload_model()
evaluate(ground_truth: GroundTruthType, prediction: PredictionType | None = None, input_data: np.ndarray | None = None, **predict_kwargs: Any) -> dict[str, float]

Evaluate a prediction against a ground truth.

Either prediction or input_data must be provided. If prediction is provided, it is used directly. If prediction is None, input_data is used to generate a new prediction.

Parameters:

Name Type Description Default
ground_truth GroundTruthType

The ground truth annotation.

required
prediction PredictionType

A pre-computed prediction.

None
input_data ndarray

Input data to generate a prediction from, if one isn't provided.

None
**predict_kwargs Any

Additional arguments passed to the predict method.

{}

Returns:

Type Description
dict[str, float]

Dictionary containing evaluation metrics for a single item.

Raises:

Type Description
ValueError

If neither prediction nor input_data is provided.

Source code in culicidaelab/core/base_predictor.py
def evaluate(
    self,
    ground_truth: GroundTruthType,
    prediction: PredictionType | None = None,
    input_data: np.ndarray | None = None,
    **predict_kwargs: Any,
) -> dict[str, float]:
    """Evaluate a prediction against a ground truth.

    Either `prediction` or `input_data` must be provided. If `prediction`
    is provided, it is used directly. If `prediction` is None, `input_data`
    is used to generate a new prediction; the model is loaded first when
    it is not loaded yet.

    Args:
        ground_truth (GroundTruthType): The ground truth annotation.
        prediction (PredictionType, optional): A pre-computed prediction.
        input_data (np.ndarray, optional): Input data to generate a
            prediction from, if one isn't provided.
        **predict_kwargs (Any): Additional arguments passed to the `predict`
            method.

    Returns:
        dict[str, float]: Dictionary containing evaluation metrics for a
        single item.

    Raises:
        ValueError: If neither `prediction` nor `input_data` is provided.
        RuntimeError: If a prediction has to be generated and the model
            fails to load.
    """
    if prediction is None:
        if input_data is None:
            raise ValueError(
                "Either 'prediction' or 'input_data' must be provided.",
            )
        # Same lazy loading as `__call__` and `predict_batch`, so that
        # evaluating from raw input works on a freshly built predictor.
        if not self._model_loaded:
            self.load_model()
        prediction = self.predict(input_data, **predict_kwargs)
    return self._evaluate_from_prediction(
        prediction=prediction,
        ground_truth=ground_truth,
    )
evaluate_batch(ground_truth_batch: list[GroundTruthType], predictions_batch: list[PredictionType] | None = None, input_data_batch: list[np.ndarray] | None = None, num_workers: int = 4, show_progress: bool = True, **predict_kwargs: Any) -> dict[str, float]

Evaluate on a batch of items using parallel processing.

Either predictions_batch or input_data_batch must be provided.

Parameters:

Name Type Description Default
ground_truth_batch list[GroundTruthType]

List of corresponding ground truth annotations.

required
predictions_batch list[PredictionType]

A pre-computed list of predictions.

None
input_data_batch list[ndarray]

List of input data to generate predictions from.

None
num_workers int

Number of parallel workers for calculating metrics.

4
show_progress bool

Whether to show a progress bar.

True
**predict_kwargs Any

Additional arguments passed to predict_batch.

{}

Returns:

Type Description
dict[str, float]

Dictionary containing aggregated evaluation metrics.

Raises:

Type Description
ValueError

If the number of predictions does not match the number of ground truths.

Source code in culicidaelab/core/base_predictor.py
def evaluate_batch(
    self,
    ground_truth_batch: list[GroundTruthType],
    predictions_batch: list[PredictionType] | None = None,
    input_data_batch: list[np.ndarray] | None = None,
    num_workers: int = 4,
    show_progress: bool = True,
    **predict_kwargs: Any,
) -> dict[str, float]:
    """Score a batch of items and return the aggregated report.

    Predictions are taken from `predictions_batch` when given; otherwise
    they are generated from `input_data_batch` via `predict_batch`. One of
    the two must be provided.

    Args:
        ground_truth_batch (list[GroundTruthType]): List of corresponding
            ground truth annotations.
        predictions_batch (list[PredictionType], optional): A pre-computed
            list of predictions.
        input_data_batch (list[np.ndarray], optional): List of input data
            to generate predictions from.
        num_workers (int): Number of parallel workers for calculating metrics.
        show_progress (bool): Whether to show a progress bar.
        **predict_kwargs (Any): Additional arguments passed to `predict_batch`.

    Returns:
        dict[str, float]: Dictionary containing aggregated evaluation metrics.

    Raises:
        ValueError: If neither `predictions_batch` nor `input_data_batch` is
            provided, or if the number of predictions does not match the
            number of ground truths.
    """
    if predictions_batch is None:
        if input_data_batch is None:
            raise ValueError(
                "Either 'predictions_batch' or 'input_data_batch' must be provided.",
            )
        predictions_batch = self.predict_batch(
            input_data_batch,
            show_progress=show_progress,
            **predict_kwargs,
        )

    # Per-item scoring below pairs items by index, so sizes must agree.
    prediction_count = len(predictions_batch)
    truth_count = len(ground_truth_batch)
    if prediction_count != truth_count:
        raise ValueError(
            f"Number of predictions ({prediction_count}) must match "
            f"number of ground truths ({truth_count}).",
        )

    # Pipeline: per-item metrics -> aggregation -> optional post-processing.
    item_metrics = self._calculate_metrics_parallel(
        predictions_batch,
        ground_truth_batch,
        num_workers,
        show_progress,
    )
    summary = self._aggregate_metrics(item_metrics)
    return self._finalize_evaluation_report(
        summary,
        predictions_batch,
        ground_truth_batch,
    )
get_model_info() -> dict[str, Any]

Gets information about the loaded model.

Returns:

Type Description
dict[str, Any]

A dictionary containing details about the model, such as architecture, path, etc.

Source code in culicidaelab/core/base_predictor.py
def get_model_info(self) -> dict[str, Any]:
    """Summarize the predictor and the state of its model.

    Returns:
        dict[str, Any]: A dictionary with the keys `predictor_type`,
        `model_path` (as a string), `model_loaded` and `config` (the
        dumped predictor configuration).
    """
    info: dict[str, Any] = {}
    info["predictor_type"] = self.predictor_type
    info["model_path"] = str(self._model_path)
    info["model_loaded"] = self._model_loaded
    info["config"] = self.config.model_dump()
    return info
load_model() -> None

Loads the model if it is not already loaded.

This is a convenience wrapper around _load_model that prevents reloading.

Raises:

Type Description
RuntimeError

If model loading fails.

Source code in culicidaelab/core/base_predictor.py
def load_model(self) -> None:
    """Load the model unless it is already loaded.

    Wraps `_load_model` with logging and a guard against reloading, and
    marks the predictor as loaded on success.

    Raises:
        RuntimeError: If model loading fails. The original exception is
            attached as the cause.
    """
    if not self._model_loaded:
        try:
            self._logger.info(
                f"Loading model for {self.predictor_type} from {self._model_path}",
            )
            self._load_model()
            self._model_loaded = True
            self._logger.info(f"Successfully loaded model for {self.predictor_type}")
        except Exception as e:
            self._logger.error(f"Failed to load model for {self.predictor_type}: {e}")
            raise RuntimeError(
                f"Failed to load model for {self.predictor_type}: {e}",
            ) from e
    else:
        # Nothing to do: loading is skipped when the flag is already set.
        self._logger.info(f"Model for {self.predictor_type} already loaded")
predict_batch(input_data_batch: list[np.ndarray], show_progress: bool = True, **kwargs: Any) -> list[PredictionType]

Makes predictions on a batch of inputs.

This base implementation processes items serially. Subclasses with native batching capabilities SHOULD override this method.

Parameters:

Name Type Description Default
input_data_batch list[ndarray]

List of input data to make predictions on.

required
show_progress bool

Whether to show a progress bar.

True
**kwargs Any

Additional arguments passed to each predict call.

{}

Returns:

Type Description
list[PredictionType]

List of predictions.

Raises:

Type Description
RuntimeError

If model fails to load or predict.

Source code in culicidaelab/core/base_predictor.py
def predict_batch(
    self,
    input_data_batch: list[np.ndarray],
    show_progress: bool = True,
    **kwargs: Any,
) -> list[PredictionType]:
    """Run `predict` over every item of a batch, one item at a time.

    This default implementation is serial. Subclasses with native batching
    capabilities SHOULD override this method.

    Args:
        input_data_batch (list[np.ndarray]): Inputs to predict on. An empty
            batch returns an empty list without loading the model.
        show_progress (bool): Whether to wrap the batch in a progress bar.
        **kwargs (Any): Extra arguments forwarded to each `predict` call.

    Returns:
        list[PredictionType]: One prediction per input, in input order.

    Raises:
        RuntimeError: If the model is still not loaded after `load_model`,
            or if any `predict` call raises.
    """
    if not input_data_batch:
        return []

    if not self._model_loaded:
        self.load_model()
    if not self._model_loaded:
        raise RuntimeError("Failed to load model for batch prediction")

    # A loaded ipykernel module is used as the signal that we run in a notebook.
    progress_cls = tqdm_notebook if "ipykernel" in sys.modules else tqdm_console
    samples = (
        progress_cls(
            input_data_batch,
            desc=f"Predicting batch ({self.predictor_type})",
            leave=False,
        )
        if show_progress
        else input_data_batch
    )

    try:
        return [self.predict(sample, **kwargs) for sample in samples]
    except Exception as exc:
        self._logger.error(f"Batch prediction failed: {exc}", exc_info=True)
        raise RuntimeError(f"Batch prediction failed: {exc}") from exc
unload_model() -> None

Unloads the model to free memory.

Source code in culicidaelab/core/base_predictor.py
def unload_model(self) -> None:
    """Drop the reference to the loaded model to free memory.

    Does nothing when no model is currently marked as loaded.
    """
    if not self._model_loaded:
        return

    self._model = None
    self._model_loaded = False
    self._logger.info(f"Unloaded model for {self.predictor_type}")
predict(input_data: np.ndarray, **kwargs: Any) -> PredictionType abstractmethod

Makes a prediction on a single input data sample.

Parameters:

Name Type Description Default
input_data ndarray

The input data (e.g., an image as a NumPy array) to make a prediction on.

required
**kwargs Any

Additional predictor-specific arguments.

{}

Returns:

Name Type Description
PredictionType PredictionType

The prediction result, with a format specific to the predictor type.

Raises:

Type Description
RuntimeError

If the model is not loaded before calling this method.

Source code in culicidaelab/core/base_predictor.py
@abstractmethod
def predict(self, input_data: np.ndarray, **kwargs: Any) -> PredictionType:
    """Produce a prediction for a single input sample.

    This is an abstract hook with no base implementation; every concrete
    predictor must override it.

    Args:
        input_data (np.ndarray): The input data (e.g., an image as a NumPy
            array) to make a prediction on.
        **kwargs (Any): Additional predictor-specific arguments.

    Returns:
        PredictionType: The prediction result; its format is specific to
            the predictor type.

    Raises:
        RuntimeError: Implementations are expected to raise this when the
            model is not loaded before the call.
    """
visualize(input_data: np.ndarray, predictions: PredictionType, save_path: str | Path | None = None) -> np.ndarray abstractmethod

Visualizes the predictions on the input data.

Parameters:

Name Type Description Default
input_data ndarray

The original input data (e.g., an image).

required
predictions PredictionType

The prediction result obtained from the predict method.

required
save_path str | Path

An optional path to save the visualization to a file.

None

Returns:

Type Description
ndarray

np.ndarray: A NumPy array representing the visualized image.

Source code in culicidaelab/core/base_predictor.py
@abstractmethod
def visualize(
    self,
    input_data: np.ndarray,
    predictions: PredictionType,
    save_path: str | Path | None = None,
) -> np.ndarray:
    """Render predictions on top of the input they were computed from.

    This is an abstract hook with no base implementation; every concrete
    predictor must override it.

    Args:
        input_data (np.ndarray): The original input data (e.g., an image).
        predictions (PredictionType): The result returned by `predict`
            for this input.
        save_path (str | Path, optional): Where to write the visualization;
            defaults to None.

    Returns:
        np.ndarray: A NumPy array representing the visualized image.
    """
BaseProvider

Abstract base class for all data and model providers.

Source code in culicidaelab/core/base_provider.py
class BaseProvider(ABC):
    """Abstract base class for all data and model providers."""

    @abstractmethod
    def download_dataset(
        self,
        dataset_name: str,
        save_dir: str | None = None,
        *args: Any,
        **kwargs: Any,
    ) -> Path:
        """Downloads a dataset from a source.

        Args:
            dataset_name (str): The name of the dataset to download.
            save_dir (str, optional): The directory to save the dataset.
                Defaults to None.
            *args: Additional positional arguments.
            **kwargs: Additional keyword arguments to pass to the download method.

        Returns:
            Path: The path to the downloaded dataset.
        """
        raise NotImplementedError("Subclasses must implement this method")

    @abstractmethod
    def download_model_weights(
        self,
        model_type: str,
        *args: Any,
        **kwargs: Any,
    ) -> Path:
        """Downloads model weights and returns the path to them.

        Args:
            model_type (str): The type of model (e.g., 'detection', 'classification').
            *args: Additional positional arguments.
            **kwargs: Additional keyword arguments.

        Returns:
            Path: The path to the model weights file.
        """
        raise NotImplementedError("Subclasses must implement this method")

    @abstractmethod
    def get_provider_name(self) -> str:
        """Gets the unique name of the provider.

        Returns:
            str: A string representing the provider's name (e.g., 'huggingface').
        """
        pass

    @abstractmethod
    def load_dataset(
        self,
        dataset_path: str | Path,
        **kwargs: Any,
    ) -> Any:
        """Loads a dataset from a local path.

        Args:
            dataset_path (str | Path): The local path to the dataset, typically
                returned by `download_dataset`.
            **kwargs: Additional keyword arguments for loading.

        Returns:
            Any: The loaded dataset object (e.g., a Hugging Face Dataset, a
            PyTorch Dataset, or a Pandas DataFrame).
        """
        raise NotImplementedError("Subclasses must implement this method")
download_dataset(dataset_name: str, save_dir: str | None = None, *args: Any, **kwargs: Any) -> Path abstractmethod

Downloads a dataset from a source.

Parameters:

Name Type Description Default
dataset_name str

The name of the dataset to download.

required
save_dir str

The directory to save the dataset. Defaults to None.

None
*args Any

Additional positional arguments.

()
**kwargs Any

Additional keyword arguments to pass to the download method.

{}

Returns:

Name Type Description
Path Path

The path to the downloaded dataset.

Source code in culicidaelab/core/base_provider.py
@abstractmethod
def download_dataset(
    self,
    dataset_name: str,
    save_dir: str | None = None,
    *args: Any,
    **kwargs: Any,
) -> Path:
    """Downloads a dataset from a source.

    Args:
        dataset_name (str): The name of the dataset to download.
        save_dir (str, optional): The directory to save the dataset.
            Defaults to None.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments to pass to the download method.

    Returns:
        Path: The path to the downloaded dataset.
    """
    raise NotImplementedError("Subclasses must implement this method")
download_model_weights(model_type: str, *args: Any, **kwargs: Any) -> Path abstractmethod

Downloads model weights and returns the path to them.

Parameters:

Name Type Description Default
model_type str

The type of model (e.g., 'detection', 'classification').

required
*args Any

Additional positional arguments.

()
**kwargs Any

Additional keyword arguments.

{}

Returns:

Name Type Description
Path Path

The path to the model weights file.

Source code in culicidaelab/core/base_provider.py
@abstractmethod
def download_model_weights(
    self,
    model_type: str,
    *args: Any,
    **kwargs: Any,
) -> Path:
    """Fetch the weights for a model type and report where they are stored.

    This base implementation only raises; concrete providers override it.

    Args:
        model_type (str): The type of model (e.g., 'detection', 'classification').
        *args: Extra positional arguments.
        **kwargs: Extra keyword arguments.

    Returns:
        Path: Location of the model weights file.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    raise NotImplementedError("Subclasses must implement this method")
get_provider_name() -> str abstractmethod

Gets the unique name of the provider.

Returns:

Name Type Description
str str

A string representing the provider's name (e.g., 'huggingface').

Source code in culicidaelab/core/base_provider.py
@abstractmethod
def get_provider_name(self) -> str:
    """Gets the unique name of the provider.

    Returns:
        str: A string representing the provider's name (e.g., 'huggingface').

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    # Raise like the other abstract provider methods; a bare `pass` would
    # hand `None` to a caller that was promised a `str`.
    raise NotImplementedError("Subclasses must implement this method")
load_dataset(dataset_path: str | Path, **kwargs: Any) -> Any abstractmethod

Loads a dataset from a local path.

Parameters:

Name Type Description Default
dataset_path str | Path

The local path to the dataset, typically returned by download_dataset.

required
**kwargs Any

Additional keyword arguments for loading.

{}

Returns:

Name Type Description
Any Any

The loaded dataset object (e.g., a Hugging Face Dataset, a PyTorch Dataset, or a Pandas DataFrame).

Source code in culicidaelab/core/base_provider.py
@abstractmethod
def load_dataset(
    self,
    dataset_path: str | Path,
    **kwargs: Any,
) -> Any:
    """Loads a dataset from a local path.

    Args:
        dataset_path (str | Path): The local path to the dataset, typically
            returned by `download_dataset`.
        **kwargs: Additional keyword arguments for loading.

    Returns:
        Any: The loaded dataset object (e.g., a Hugging Face Dataset, a
        PyTorch Dataset, or a Pandas DataFrame).
    """
    raise NotImplementedError("Subclasses must implement this method")
WeightsManagerProtocol

Defines the interface for any class that manages model weights.

This protocol ensures that core components can work with any weights manager without depending on its concrete implementation.

Source code in culicidaelab/core/weights_manager_protocol.py
class WeightsManagerProtocol(Protocol):
    """Structural interface for objects that manage model weights.

    Core components depend on this protocol rather than on a concrete
    weights manager, so any object offering a compatible `ensure_weights`
    method can be used.
    """

    def ensure_weights(self, predictor_type: str) -> Path:
        """Make sure the weights for a predictor type exist locally.

        An implementation might download missing weights, or simply return
        the path when they are already present.

        Args:
            predictor_type (str): The key for the predictor (e.g., 'classifier').

        Returns:
            Path: The local path to the model weights file.
        """
        ...
ensure_weights(predictor_type: str) -> Path

Ensures weights for a given predictor type are available locally.

This method might download the weights if they are missing or simply return the path if they already exist.

Parameters:

Name Type Description Default
predictor_type str

The key for the predictor (e.g., 'classifier').

required

Returns:

Name Type Description
Path Path

The local path to the model weights file.

Source code in culicidaelab/core/weights_manager_protocol.py
def ensure_weights(self, predictor_type: str) -> Path:
    """Make sure the weights for a predictor type exist locally.

    An implementation might download missing weights, or simply return the
    path when they are already present. This protocol stub has no behavior
    of its own.

    Args:
        predictor_type (str): The key for the predictor (e.g., 'classifier').

    Returns:
        Path: The local path to the model weights file.
    """
    ...
ConfigManager

Handles loading, merging, and validating configurations for the library.

This manager implements a robust loading strategy:

1. Loads default YAML configurations bundled with the library.
2. Loads user-provided YAML configurations from a specified directory.
3. Merges the user's configuration on top of the defaults.
4. Validates the final merged configuration against Pydantic models.

Parameters:

Name Type Description Default
user_config_dir str | Path

Path to a directory containing user-defined YAML configuration files. These will override the defaults.

None

Attributes:

Name Type Description
user_config_dir Path | None

The user configuration directory.

default_config_path Path

The path to the default config directory.

config CulicidaeLabConfig

The validated configuration object.

Source code in culicidaelab/core/config_manager.py
class ConfigManager:
    """Handles loading, merging, and validating configurations for the library.

    This manager implements a robust loading strategy:
    1. Loads default YAML configurations bundled with the library.
    2. Loads user-provided YAML configurations from a specified directory.
    3. Merges the user's configuration on top of the defaults.
    4. Validates the final merged configuration against Pydantic models.

    Args:
        user_config_dir (str | Path, optional): Path to a directory containing
            user-defined YAML configuration files. These will override the defaults.

    Attributes:
        user_config_dir (Path | None): The user configuration directory.
        default_config_path (Path): The path to the default config directory.
        config (CulicidaeLabConfig): The validated configuration object.
    """

    def __init__(self, user_config_dir: str | Path | None = None):
        """Initializes the ConfigManager and eagerly loads the configuration."""
        self.user_config_dir = Path(user_config_dir) if user_config_dir else None
        self.default_config_path = self._get_default_config_path()
        self.config: CulicidaeLabConfig = self._load()

    def get_config(self) -> CulicidaeLabConfig:
        """Returns the fully validated Pydantic configuration object.

        Returns:
            CulicidaeLabConfig: The `CulicidaeLabConfig` Pydantic model instance.
        """
        return self.config

    def instantiate_from_config(self, config_obj: Any, **kwargs: Any) -> Any:
        """Instantiates a Python object from its Pydantic config model.

        The config model must have a `_target_` field specifying the fully
        qualified class path (e.g., 'my_module.my_class.MyClass').

        Args:
            config_obj (Any): A Pydantic model instance (e.g., a predictor config).
            **kwargs (Any): Additional keyword arguments to pass to the object's
                constructor, overriding any existing parameters in the config.

        Returns:
            Any: An instantiated Python object.

        Raises:
            ValueError: If the `_target_` key is not found in the config object.
            ImportError: If the class could not be imported and instantiated.
                The underlying exception is chained as `__cause__`.
        """
        if not hasattr(config_obj, "target_"):
            raise ValueError("Target key '_target_' not found in configuration object")

        target_path = config_obj.target_
        config_params = config_obj.model_dump()
        # The target path selects the class; it is not a constructor argument.
        config_params.pop("target_", None)
        config_params.update(kwargs)

        try:
            module_path, class_name = target_path.rsplit(".", 1)
            module = __import__(module_path, fromlist=[class_name])
            cls = getattr(module, class_name)
            return cls(**config_params)
        except (ValueError, ImportError, AttributeError) as e:
            # Chain explicitly so the root cause is reported as the direct cause.
            raise ImportError(
                f"Could not import and instantiate '{target_path}': {e}",
            ) from e

    def save_config(self, file_path: str | Path) -> None:
        """Saves the current configuration state to a YAML file.

        This is useful for exporting the fully merged and validated config.
        Missing parent directories are created.

        Args:
            file_path (str | Path): The path where the YAML config will be saved.
        """
        path = Path(file_path)
        path.parent.mkdir(parents=True, exist_ok=True)
        config_dict = self.config.model_dump(mode="json")
        OmegaConf.save(config=config_dict, f=path)

    def _get_default_config_path(self) -> Path:
        """Reliably finds the path to the bundled 'conf' directory.

        Returns:
            Path: The 'conf' directory inside the 'culicidaelab' package, or
            the development fallback next to this module's parent package.

        Raises:
            FileNotFoundError: If the package resources cannot be resolved and
                the development fallback directory does not exist.
        """
        try:
            files = resources.files("culicidaelab")
            # Check for Traversable with _path (for installed packages)
            if hasattr(files, "_path"):
                return Path(files._path) / "conf"
            # Otherwise, use string representation (for zip files, etc.)
            else:
                return Path(str(files)) / "conf"
        except (ModuleNotFoundError, FileNotFoundError):
            # Fallback for development mode
            dev_path = Path(__file__).parent.parent / "conf"
            if dev_path.exists():
                return dev_path
            raise FileNotFoundError(
                "Could not find the default 'conf' directory. "
                "Ensure the 'culicidaelab' package is installed correctly or "
                "you are in the project root.",
            )

    def _load(self) -> CulicidaeLabConfig:
        """Executes the full load, merge, and validation process.

        Returns:
            CulicidaeLabConfig: The validated, merged configuration.

        Raises:
            ValidationError: Re-raised after printing a diagnostic when the
                merged configuration does not validate.
        """
        default_config_dict = self._load_config_from_dir(
            cast(Path, self.default_config_path),
        )
        user_config_dict = self._load_config_from_dir(self.user_config_dir)

        # User configs override defaults
        # NOTE(review): relies on `_deep_merge` (defined elsewhere) letting its
        # first argument win over its second - confirm against the helper.
        merged_config = _deep_merge(user_config_dict, default_config_dict)

        try:
            validated_config = CulicidaeLabConfig(**merged_config)
            return validated_config
        except ValidationError as e:
            print(
                "FATAL: Configuration validation failed. Please check your " "YAML files or environment variables.",
            )
            print(e)
            raise

    def _load_config_from_dir(self, config_dir: Path | None) -> ConfigDict:
        """Loads all YAML files from a directory into a nested dictionary.

        The position of each file relative to `config_dir` becomes its key
        path: sub-directories are nesting levels and the file stem is the
        final key. Empty files are skipped; files that fail to load are
        reported with a printed warning and skipped.

        Args:
            config_dir (Path | None): Directory containing YAML config files, or None.

        Returns:
            ConfigDict: A nested dictionary containing the loaded configuration.
            Empty when `config_dir` is None or not a directory.
        """
        config_dict: ConfigDict = {}
        if config_dir is None or not config_dir.is_dir():
            return config_dict

        for yaml_file in config_dir.glob("**/*.yaml"):
            try:
                # Read as UTF-8 explicitly; the platform default encoding can
                # mis-decode non-ASCII content on some systems.
                with yaml_file.open("r", encoding="utf-8") as f:
                    data = yaml.safe_load(f)
                    if data is None:
                        continue

                relative_path = yaml_file.relative_to(config_dir)
                keys = list(relative_path.parts[:-1]) + [relative_path.stem]

                # Walk/create the intermediate levels, then store the payload.
                d = config_dict
                for key in keys[:-1]:
                    d = d.setdefault(key, {})
                d[keys[-1]] = data
            except Exception as e:
                # Best effort: one unreadable file must not abort the whole load.
                print(f"Warning: Could not load or parse {yaml_file}: {e}")
        return config_dict
user_config_dir = Path(user_config_dir) if user_config_dir else None instance-attribute
default_config_path = self._get_default_config_path() instance-attribute
config: CulicidaeLabConfig = self._load() instance-attribute
__init__(user_config_dir: str | Path | None = None)

Initializes the ConfigManager.

Source code in culicidaelab/core/config_manager.py
def __init__(self, user_config_dir: str | Path | None = None):
    """Initializes the ConfigManager."""
    self.user_config_dir = Path(user_config_dir) if user_config_dir else None
    self.default_config_path = self._get_default_config_path()
    self.config: CulicidaeLabConfig = self._load()
get_config() -> CulicidaeLabConfig

Returns the fully validated Pydantic configuration object.

Returns:

Name Type Description
CulicidaeLabConfig CulicidaeLabConfig

The CulicidaeLabConfig Pydantic model instance.

Source code in culicidaelab/core/config_manager.py
def get_config(self) -> CulicidaeLabConfig:
    """Return the validated configuration held by this manager.

    Returns:
        CulicidaeLabConfig: The `CulicidaeLabConfig` Pydantic model instance
            stored in `self.config`.
    """
    validated = self.config
    return validated
instantiate_from_config(config_obj: Any, **kwargs: Any) -> Any

Instantiates a Python object from its Pydantic config model.

The config model must have a _target_ field specifying the fully qualified class path (e.g., 'my_module.my_class.MyClass').

Parameters:

Name Type Description Default
config_obj Any

A Pydantic model instance (e.g., a predictor config).

required
**kwargs Any

Additional keyword arguments to pass to the object's constructor, overriding any existing parameters in the config.

{}

Returns:

Name Type Description
Any Any

An instantiated Python object.

Raises:

Type Description
ValueError

If the _target_ key is not found in the config object.

ImportError

If the class could not be imported and instantiated.

Source code in culicidaelab/core/config_manager.py
def instantiate_from_config(self, config_obj: Any, **kwargs: Any) -> Any:
    """Instantiates a Python object from its Pydantic config model.

    The config model must have a `_target_` field specifying the fully
    qualified class path (e.g., 'my_module.my_class.MyClass').

    Args:
        config_obj (Any): A Pydantic model instance (e.g., a predictor config).
        **kwargs (Any): Additional keyword arguments to pass to the object's
            constructor, overriding any existing parameters in the config.

    Returns:
        Any: An instantiated Python object.

    Raises:
        ValueError: If the `_target_` key is not found in the config object.
        ImportError: If the class could not be imported and instantiated.
            The underlying exception is chained as `__cause__`.
    """
    if not hasattr(config_obj, "target_"):
        raise ValueError("Target key '_target_' not found in configuration object")

    target_path = config_obj.target_
    config_params = config_obj.model_dump()
    # The target path selects the class; it is not a constructor argument.
    config_params.pop("target_", None)
    config_params.update(kwargs)

    try:
        module_path, class_name = target_path.rsplit(".", 1)
        module = __import__(module_path, fromlist=[class_name])
        cls = getattr(module, class_name)
        return cls(**config_params)
    except (ValueError, ImportError, AttributeError) as e:
        # Chain explicitly so the root cause is reported as the direct cause.
        raise ImportError(
            f"Could not import and instantiate '{target_path}': {e}",
        ) from e
save_config(file_path: str | Path) -> None

Saves the current configuration state to a YAML file.

This is useful for exporting the fully merged and validated config.

Parameters:

Name Type Description Default
file_path str | Path

The path where the YAML config will be saved.

required
Source code in culicidaelab/core/config_manager.py
def save_config(self, file_path: str | Path) -> None:
    """Write the current configuration to a YAML file.

    Handy for exporting the fully merged and validated config. Missing
    parent directories of the destination are created first.

    Args:
        file_path (str | Path): The path where the YAML config will be saved.
    """
    destination = Path(file_path)
    destination.parent.mkdir(parents=True, exist_ok=True)
    OmegaConf.save(config=self.config.model_dump(mode="json"), f=destination)
CulicidaeLabConfig

The root Pydantic model for all CulicidaeLab configurations.

It validates the entire configuration structure after loading from YAML files.

Source code in culicidaelab/core/config_models.py
class CulicidaeLabConfig(BaseModel):
    """The root Pydantic model for all CulicidaeLab configurations.

    It validates the entire configuration structure after loading from YAML files.
    Every top-level field declares a default or a default factory.
    """

    # extra="allow": keys not declared below are accepted instead of rejected.
    model_config = ConfigDict(extra="allow")
    # Application-wide settings section.
    app_settings: AppSettings = Field(default_factory=AppSettings)
    # Processing settings section.
    processing: ProcessingConfig = Field(default_factory=ProcessingConfig)
    # Dataset configurations keyed by a string name.
    datasets: dict[str, DatasetConfig] = {}
    # Predictor configurations keyed by a string name (e.g. 'classifier').
    predictors: dict[str, PredictorConfig] = {}
    # Provider configurations keyed by a string name.
    providers: dict[str, ProviderConfig] = {}
    # The whole 'species' section.
    species: SpeciesModel = Field(default_factory=SpeciesModel)
model_config = ConfigDict(extra='allow') class-attribute instance-attribute
app_settings: AppSettings = Field(default_factory=AppSettings) class-attribute instance-attribute
processing: ProcessingConfig = Field(default_factory=ProcessingConfig) class-attribute instance-attribute
datasets: dict[str, DatasetConfig] = {} class-attribute instance-attribute
predictors: dict[str, PredictorConfig] = {} class-attribute instance-attribute
providers: dict[str, ProviderConfig] = {} class-attribute instance-attribute
species: SpeciesModel = Field(default_factory=SpeciesModel) class-attribute instance-attribute
DatasetConfig

Configuration for a single dataset.

Source code in culicidaelab/core/config_models.py
class DatasetConfig(BaseModel):
    """Configuration for a single dataset.

    All five declared fields are required (none has a default).
    """

    # extra="allow": keys not declared below are accepted instead of rejected.
    model_config = ConfigDict(extra="allow")
    # Name of the dataset.
    name: str
    # Dataset location, as a string.
    path: str
    # Dataset format identifier.
    format: str
    # Class labels of the dataset.
    classes: list[str]
    # Name of the provider for this dataset.
    # NOTE(review): presumably a key of the `providers` config section - confirm.
    provider_name: str
model_config = ConfigDict(extra='allow') class-attribute instance-attribute
name: str instance-attribute
path: str instance-attribute
format: str instance-attribute
classes: list[str] instance-attribute
provider_name: str instance-attribute
PredictorConfig

Configuration for a single predictor.

Source code in culicidaelab/core/config_models.py
class PredictorConfig(BaseModel):
    """Configuration for a single predictor.

    `target_` (populated from the `_target_` key) and `model_path` are
    required; most other fields carry defaults.
    """

    # extra="allow": keys not declared below are accepted instead of rejected.
    # NOTE(review): protected_namespaces=() presumably silences pydantic's
    # warning about the `model_`-prefixed field names below - confirm.
    model_config = ConfigDict(extra="allow", protected_namespaces=())
    # Fully qualified class path, read from the `_target_` key.
    target_: str = Field(..., alias="_target_")
    # Location of the model weights, as a string.
    model_path: str
    # Confidence value; defaults to 0.5.
    confidence: float = 0.5
    # Device identifier; defaults to "cpu".
    device: str = "cpu"
    # Free-form predictor parameters.
    params: dict[str, Any] = {}
    repository_id: str | None = None
    filename: str | None = None
    provider_name: str | None = None
    model_arch: str | None = None
    # NOTE(review): unlike the nullable fields above, the next two have no
    # default, so pydantic v2 treats them as required (None must be given
    # explicitly). Confirm this is intentional.
    model_config_path: str | None
    model_config_filename: str | None
    # Visualization settings; built from VisualizationConfig defaults when omitted.
    visualization: VisualizationConfig = Field(default_factory=VisualizationConfig)
model_config = ConfigDict(extra='allow', protected_namespaces=()) class-attribute instance-attribute
target_: str = Field(..., alias='_target_') class-attribute instance-attribute
model_path: str instance-attribute
confidence: float = 0.5 class-attribute instance-attribute
device: str = 'cpu' class-attribute instance-attribute
params: dict[str, Any] = {} class-attribute instance-attribute
repository_id: str | None = None class-attribute instance-attribute
filename: str | None = None class-attribute instance-attribute
provider_name: str | None = None class-attribute instance-attribute
model_arch: str | None = None class-attribute instance-attribute
model_config_path: str | None instance-attribute
model_config_filename: str | None instance-attribute
visualization: VisualizationConfig = Field(default_factory=VisualizationConfig) class-attribute instance-attribute
ProviderConfig

Configuration for a data provider.

Source code in culicidaelab/core/config_models.py
class ProviderConfig(BaseModel):
    """Configuration for a data provider.

    `target_` (populated from the `_target_` key) and `dataset_url` are
    required; `api_key` is optional.
    """

    # extra="allow": keys not declared below are accepted instead of rejected.
    model_config = ConfigDict(extra="allow")
    # Fully qualified class path, read from the `_target_` key.
    target_: str = Field(..., alias="_target_")
    # URL string for the provider's datasets.
    dataset_url: str
    # Optional API key; None when not configured.
    api_key: str | None = None
model_config = ConfigDict(extra='allow') class-attribute instance-attribute
target_: str = Field(..., alias='_target_') class-attribute instance-attribute
dataset_url: str instance-attribute
api_key: str | None = None class-attribute instance-attribute
SpeciesModel

Configuration for the entire 'species' section.

Source code in culicidaelab/core/config_models.py
class SpeciesModel(BaseModel):
    """Configuration for the entire 'species' section.

    Both fields default to empty values, so the section may be omitted.
    """

    # extra="allow": keys not declared below are accepted instead of rejected.
    model_config = ConfigDict(extra="allow")
    # Maps integer class indices to class-name strings.
    species_classes: dict[int, str] = Field(default_factory=dict)
    # Species metadata container; built from SpeciesFiles defaults when omitted.
    species_metadata: SpeciesFiles = Field(default_factory=SpeciesFiles)
model_config = ConfigDict(extra='allow') class-attribute instance-attribute
species_classes: dict[int, str] = Field(default_factory=dict) class-attribute instance-attribute
species_metadata: SpeciesFiles = Field(default_factory=SpeciesFiles) class-attribute instance-attribute
SpeciesConfig

A user-friendly facade for accessing species configuration.

This class acts as an adapter, taking the complex, validated SpeciesModel object and providing simple methods and properties for accessing species data.

Parameters:

Name Type Description Default
config SpeciesModel

A validated SpeciesModel Pydantic object from the main settings.

required

Attributes:

Name Type Description
_config SpeciesModel

The source configuration model.

_species_map dict[int, str]

A mapping of class indices to full species names.

_reverse_species_map dict[str, int]

A reverse mapping of species names to indices.

_metadata_store dict

A store for species metadata.

Source code in culicidaelab/core/species_config.py
class SpeciesConfig:
    """A user-friendly facade over the validated species configuration.

    Acts as an adapter: it takes the `SpeciesModel` object and precomputes
    simple lookup tables so callers can query species data through plain
    methods and properties.

    Args:
        config (SpeciesModel): A validated `SpeciesModel` Pydantic object from
            the main settings.

    Attributes:
        _config (SpeciesModel): The source configuration model.
        _species_map (dict[int, str]): A mapping of class indices to full species names.
        _reverse_species_map (dict[str, int]): A reverse mapping of species
            names to indices.
        _metadata_store (dict): A store for species metadata.
    """

    def __init__(self, config: SpeciesModel):
        """Builds the lookup tables from the species model."""
        self._config = config
        short_to_full = self._config.species_metadata.species_info_mapping

        # A class name without a mapping entry is used as its own full name.
        self._species_map: dict[int, str] = {
            class_idx: short_to_full.get(short_name, short_name)
            for class_idx, short_name in self._config.species_classes.items()
        }
        self._reverse_species_map: dict[str, int] = {
            full_name: class_idx for class_idx, full_name in self._species_map.items()
        }
        self._metadata_store: dict[
            str,
            SingleSpeciesMetadataModel,
        ] = self._config.species_metadata.species_metadata

    @property
    def species_map(self) -> dict[int, str]:
        """Gets the mapping of class indices to full, human-readable species names.

        Example:
            {0: "Aedes aegypti", 1: "Aedes albopictus"}
        """
        return self._species_map

    def get_index_by_species(self, species_name: str) -> int | None:
        """Looks up the class index for a full species name.

        Args:
            species_name (str): The full name of the species.

        Returns:
            int | None: The integer class index, or None if not found.
        """
        reverse_lookup = self._reverse_species_map
        return reverse_lookup.get(species_name)

    def get_species_by_index(self, index: int) -> str | None:
        """Looks up the full species name for a class index.

        Args:
            index (int): The integer class index.

        Returns:
            str | None: The full species name as a string, or None if not found.
        """
        forward_lookup = self._species_map
        return forward_lookup.get(index)

    def get_species_metadata(self, species_name: str) -> dict[str, Any] | None:
        """Returns the stored metadata for one species as a plain dictionary.

        Args:
            species_name (str): The full name of the species (e.g., "Aedes aegypti").

        Returns:
            dict[str, Any] | None: The result of `model_dump()` on the stored
                metadata model, or None if not found.
        """
        entry = self._metadata_store.get(species_name)
        if not entry:
            return None
        return entry.model_dump()

    def list_species_names(self) -> list[str]:
        """Returns a list of all configured full species names.

        Returns:
            list[str]: The keys of the reverse species map, one per species name.
        """
        return list(self._reverse_species_map)
species_map: dict[int, str] property

Gets the mapping of class indices to full, human-readable species names.

Example

{0: "Aedes aegypti", 1: "Aedes albopictus"}

__init__(config: SpeciesModel)

Initializes the species configuration helper.

Source code in culicidaelab/core/species_config.py
def __init__(self, config: SpeciesModel):
    """Builds the species lookup tables from the species configuration.

    Args:
        config (SpeciesModel): The species configuration; its
            `species_classes` and `species_metadata` fields are read here.
    """
    self._config = config
    metadata = config.species_metadata
    display_names = metadata.species_info_mapping

    # A class without an entry in the mapping keeps its raw class name.
    self._species_map: dict[int, str] = {
        idx: display_names.get(class_name, class_name)
        for idx, class_name in config.species_classes.items()
    }
    # Inverse lookup: full species name -> class index.
    self._reverse_species_map: dict[str, int] = {
        full_name: idx for idx, full_name in self._species_map.items()
    }
    self._metadata_store: dict[str, SingleSpeciesMetadataModel] = metadata.species_metadata
get_index_by_species(species_name: str) -> int | None

Gets the class index by its full species name.

Parameters:

Name Type Description Default
species_name str

The full name of the species.

required

Returns:

Type Description
int | None

int | None: The integer class index, or None if not found.

Source code in culicidaelab/core/species_config.py
def get_index_by_species(self, species_name: str) -> int | None:
    """Gets the class index by its full species name.

    Args:
        species_name (str): The full name of the species.

    Returns:
        int | None: The integer class index, or None if not found.
    """
    return self._reverse_species_map.get(species_name)
get_species_by_index(index: int) -> str | None

Gets the full species name by its class index.

Parameters:

Name Type Description Default
index int

The integer class index.

required

Returns:

Type Description
str | None

str | None: The full species name as a string, or None if not found.

Source code in culicidaelab/core/species_config.py
def get_species_by_index(self, index: int) -> str | None:
    """Gets the full species name by its class index.

    Args:
        index (int): The integer class index.

    Returns:
        str | None: The full species name as a string, or None if not found.
    """
    return self._species_map.get(index)
get_species_metadata(species_name: str) -> dict[str, Any] | None

Gets the detailed metadata for a specific species as a dictionary.

Parameters:

Name Type Description Default
species_name str

The full name of the species (e.g., "Aedes aegypti").

required

Returns:

Type Description
dict[str, Any] | None

A dictionary representing the species metadata, or None if not found.

Source code in culicidaelab/core/species_config.py
def get_species_metadata(self, species_name: str) -> dict[str, Any] | None:
    """Gets the detailed metadata for a specific species as a dictionary.

    Args:
        species_name (str): The full name of the species (e.g., "Aedes aegypti").

    Returns:
        dict[str, Any] | None: A dictionary representing the species metadata,
        or None if not found.
    """
    model_object = self._metadata_store.get(species_name)
    return model_object.model_dump() if model_object else None
list_species_names() -> list[str]

Returns a list of all configured full species names.

Returns:

Type Description
list[str]

list[str]: A list of strings, where each string is a species name.

Source code in culicidaelab/core/species_config.py
def list_species_names(self) -> list[str]:
    """Lists every full species name known to this configuration.

    Returns:
        list[str]: A new list holding the keys of the reverse species map,
        in that map's iteration order.
    """
    return [*self._reverse_species_map]
ProviderService

Manages the instantiation and lifecycle of data providers.

Parameters:

Name Type Description Default
settings Settings

The main Settings object for the library.

required

Attributes:

Name Type Description
_settings Settings

The settings instance.

_providers dict[str, BaseProvider]

A cache of instantiated providers.

Source code in culicidaelab/core/provider_service.py
class ProviderService:
    """Creates data providers on demand and keeps one instance per name.

    Args:
        settings (Settings): The main `Settings` object for the library.

    Attributes:
        _settings (Settings): The settings instance used to look up and
            instantiate providers.
        _providers (dict[str, BaseProvider]): Providers created so far,
            keyed by provider name.
    """

    def __init__(self, settings: Settings):
        """Stores the settings object and starts with an empty provider cache."""
        self._settings = settings
        self._providers: dict[str, BaseProvider] = {}

    def get_provider(self, provider_name: str) -> BaseProvider:
        """Returns the provider registered under `provider_name`.

        The first request for a name reads the `providers.<name>` configuration
        entry, instantiates the provider from it, and caches the instance;
        later requests return the cached instance.

        Args:
            provider_name (str): The name of the provider (e.g., 'huggingface').

        Returns:
            BaseProvider: The provider instance for the given name.

        Raises:
            ValueError: If the configuration has no (truthy) entry for the
                provider.
        """
        # Fast path: this provider has already been built.
        if provider_name in self._providers:
            return self._providers[provider_name]

        config_key = f"providers.{provider_name}"
        if not self._settings.get_config(config_key):
            raise ValueError(
                f"Provider '{provider_name}' not found in configuration.",
            )

        # The settings object is passed on so the provider can read configuration itself.
        self._providers[provider_name] = self._settings.instantiate_from_config(
            config_key,
            settings=self._settings,
        )
        return self._providers[provider_name]
__init__(settings: Settings)

Initializes the ProviderService.

Source code in culicidaelab/core/provider_service.py
def __init__(self, settings: Settings):
    """Stores the settings object and starts with an empty provider cache.

    Args:
        settings (Settings): The main `Settings` object for the library.
    """
    # Providers are keyed by name and filled in lazily by `get_provider`.
    self._providers: dict[str, BaseProvider] = {}
    self._settings = settings
get_provider(provider_name: str) -> BaseProvider

Retrieves an instantiated provider by its name.

It looks up the provider's configuration, instantiates it if it hasn't been already, and caches it for future calls.

Parameters:

Name Type Description Default
provider_name str

The name of the provider (e.g., 'huggingface').

required

Returns:

Name Type Description
BaseProvider BaseProvider

An instance of a class that inherits from BaseProvider.

Raises:

Type Description
ValueError

If the provider is not found in the configuration.

Source code in culicidaelab/core/provider_service.py
def get_provider(self, provider_name: str) -> BaseProvider:
    """Returns the provider registered under `provider_name`.

    The first request for a name reads the `providers.<name>` configuration
    entry, instantiates the provider from it, and caches the instance;
    later requests return the cached instance.

    Args:
        provider_name (str): The name of the provider (e.g., 'huggingface').

    Returns:
        BaseProvider: The provider instance for the given name.

    Raises:
        ValueError: If the configuration has no (truthy) entry for the
            provider.
    """
    # Fast path: this provider has already been built.
    if provider_name in self._providers:
        return self._providers[provider_name]

    config_key = f"providers.{provider_name}"
    if not self._settings.get_config(config_key):
        raise ValueError(
            f"Provider '{provider_name}' not found in configuration.",
        )

    # The settings object is passed on so the provider can read configuration itself.
    self._providers[provider_name] = self._settings.instantiate_from_config(
        config_key,
        settings=self._settings,
    )
    return self._providers[provider_name]
ResourceManager

Centralized resource management for models, datasets, and temporary files.

This class provides thread-safe operations for managing application resources, including models, datasets, cache files, and temporary workspaces.

Parameters:

Name Type Description Default
app_name str

Application name used for directory naming. If None, the name is loaded from pyproject.toml.

None
custom_base_dir str | Path

Custom base directory for all resources. If None, system-default paths are used.

None

Attributes:

Name Type Description
app_name str

The application name.

user_data_dir Path

User-specific data directory for persistent storage.

user_cache_dir Path

User-specific cache directory for temporary files.

temp_dir Path

Temporary directory for runtime operations.

model_dir Path

Directory for storing model files.

dataset_dir Path

Directory for storing dataset files.

downloads_dir Path

Directory for downloaded files.

Raises:

Type Description
ResourceManagerError

If initialization fails.

Source code in culicidaelab/core/resource_manager.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
class ResourceManager:
    """Centralized resource management for models, datasets, and temporary files.

    This class provides thread-safe operations for managing application resources,
    including models, datasets, cache files, and temporary workspaces.

    Args:
        app_name (str, optional): Application name used for directory naming.
            If None, the name is loaded from `pyproject.toml`.
        custom_base_dir (str | Path, optional): Custom base directory for all
            resources. If None, system-default paths are used.

    Attributes:
        app_name (str): The application name.
        user_data_dir (Path): User-specific data directory for persistent storage.
        user_cache_dir (Path): User-specific cache directory for temporary files.
        temp_dir (Path): Temporary directory for runtime operations.
        model_dir (Path): Directory for storing model files.
        dataset_dir (Path): Directory for storing dataset files.
        downloads_dir (Path): Directory for downloaded files.

    Raises:
        ResourceManagerError: If initialization fails.
    """

    def __init__(
        self,
        app_name: str | None = None,
        custom_base_dir: str | Path | None = None,
    ):
        """Initializes resource paths with cross-platform compatibility.

        Args:
            app_name (str | None): Application name used for directory naming.
                When falsy, the name is determined by `_determine_app_name`.
            custom_base_dir (str | Path | None): Base directory for the data
                and cache directories. When falsy, platform defaults are used.

        Raises:
            ResourceManagerError: If a managed directory cannot be created.
        """
        # Guards updates to the workspace registry.
        self._lock = Lock()
        # Maps workspace directory name -> workspace path.
        self._workspace_registry: dict[str, Path] = {}
        # The app name must be known first: `_initialize_paths` derives paths from it.
        self.app_name = self._determine_app_name(app_name)
        self._initialize_paths(custom_base_dir)
        # Paths must be assigned before the directories can be created on disk.
        self._initialize_directories()
        logger.info(f"ResourceManager initialized for app: {self.app_name}")
        logger.debug(f"Resource directories: {self.get_all_directories()}")

    def __repr__(self) -> str:
        """String representation of ResourceManager."""
        return f"ResourceManager(app_name='{self.app_name}', " f"user_data_dir='{self.user_data_dir}')"

    @contextmanager
    def temp_workspace(self, prefix: str = "workspace", suffix: str = ""):
        """A context manager for temporary workspaces that auto-cleans on exit.

        Args:
            prefix (str): A prefix for the temporary directory name.
            suffix (str): A suffix for the temporary directory name.

        Yields:
            Path: The path to the temporary workspace.

        Example:
            >>> with resource_manager.temp_workspace("processing") as ws:
            ...     # Use ws for temporary operations
            ...     (ws / "temp.txt").write_text("data")
            # The workspace is automatically removed here.
        """
        workspace = self.create_temp_workspace(prefix, suffix)
        try:
            yield workspace
        finally:
            self.clean_temp_workspace(workspace, force=True)

    def clean_old_files(
        self,
        days: int = 5,
        include_cache: bool = True,
    ) -> dict[str, int]:
        """Cleans up old download and temporary files.

        Args:
            days (int): The number of days after which files are considered old.
            include_cache (bool): Whether to include the cache directory in cleanup.

        Returns:
            dict[str, int]: A dictionary with cleanup statistics.

        Raises:
            ValueError: If `days` is negative.
        """
        if days < 0:
            raise ValueError("Days must be non-negative")

        cleanup_stats = {"downloads_cleaned": 0, "temp_cleaned": 0, "cache_cleaned": 0}
        cutoff_time = time.time() - (days * 86400)

        cleanup_stats["downloads_cleaned"] = self._clean_directory(
            self.downloads_dir,
            cutoff_time,
        )
        cleanup_stats["temp_cleaned"] = self._clean_directory(
            self.temp_dir,
            cutoff_time,
        )
        if include_cache:
            cleanup_stats["cache_cleaned"] = self._clean_directory(
                self.user_cache_dir,
                cutoff_time,
            )

        logger.info(f"Cleanup completed: {cleanup_stats}")
        return cleanup_stats

    def clean_temp_workspace(self, workspace_path: Path, force: bool = False) -> None:
        """Cleans up a temporary workspace.

        Args:
            workspace_path (Path): The path to the workspace to clean.
            force (bool): If True, force remove even if not in a temp directory.

        Raises:
            ResourceManagerError: If cleanup fails.
            ValueError: If the workspace is outside the temp dir and `force=False`.
        """
        try:
            if not force and not self._is_safe_to_delete(workspace_path):
                raise ValueError(
                    "Cannot clean workspace outside of temp directory without force=True",
                )
            if workspace_path.exists():
                if workspace_path.is_dir():
                    shutil.rmtree(workspace_path)
                else:
                    workspace_path.unlink()
                logger.info(f"Cleaned workspace: {workspace_path}")

            with self._lock:
                self._workspace_registry.pop(workspace_path.name, None)
        except Exception as e:
            raise ResourceManagerError(
                f"Failed to clean workspace {workspace_path}: {e}",
            ) from e

    def create_checksum(self, file_path: str | Path, algorithm: str = "md5") -> str:
        """Creates a checksum for a file.

        Args:
            file_path (str | Path): The path to the file.
            algorithm (str): The hashing algorithm to use ('md5', 'sha1', 'sha256').

        Returns:
            str: The hexadecimal checksum string.

        Raises:
            ResourceManagerError: If the file does not exist or creation fails.
        """
        file_path = Path(file_path)
        if not file_path.exists():
            raise ResourceManagerError(f"File does not exist: {file_path}")
        try:
            hash_obj = hashlib.new(algorithm)
            with open(file_path, "rb") as f:
                for chunk in iter(lambda: f.read(4096), b""):
                    hash_obj.update(chunk)
            return hash_obj.hexdigest()
        except Exception as e:
            raise ResourceManagerError(
                f"Failed to create checksum for {file_path}: {e}",
            ) from e

    def create_temp_workspace(
        self,
        prefix: str = "workspace",
        suffix: str = "",
    ) -> Path:
        """Creates a temporary workspace for runtime operations.

        Args:
            prefix (str): A prefix for the temporary directory name.
            suffix (str): A suffix for the temporary directory name.

        Returns:
            Path: The path to the created temporary workspace.

        Raises:
            ResourceManagerError: If workspace creation fails.
        """
        try:
            timestamp = str(int(time.time()))
            pid = str(os.getpid())
            workspace_name = f"{prefix}_{timestamp}_{pid}"
            if suffix:
                workspace_name += f"_{suffix}"

            temp_workspace = self.temp_dir / workspace_name
            temp_workspace.mkdir(parents=True, exist_ok=True)
            with self._lock:
                self._workspace_registry[workspace_name] = temp_workspace
            logger.info(f"Created temporary workspace: {temp_workspace}")
            return temp_workspace
        except Exception as e:
            raise ResourceManagerError(
                f"Failed to create temporary workspace: {e}",
            ) from e

    def get_all_directories(self) -> dict[str, Path]:
        """Gets all managed directories.

        Returns:
            dict[str, Path]: A dictionary mapping directory names to their paths.
        """
        return {
            "user_data_dir": self.user_data_dir,
            "user_cache_dir": self.user_cache_dir,
            "temp_dir": self.temp_dir,
            "model_dir": self.model_dir,
            "dataset_dir": self.dataset_dir,
            "downloads_dir": self.downloads_dir,
            "logs_dir": self.logs_dir,
            "config_dir": self.config_dir,
        }

    def get_cache_path(self, cache_name: str, create_if_missing: bool = True) -> Path:
        """Gets a path for cache files.

        Args:
            cache_name (str): The name of the cache.
            create_if_missing (bool): Whether to create the directory if it
                doesn't exist.

        Returns:
            Path: The path to the cache directory.
        """
        if not cache_name or not cache_name.strip():
            raise ValueError("Cache name cannot be empty")

        cache_path = self.user_cache_dir / self._sanitize_name(cache_name)
        if create_if_missing:
            self._create_directory(cache_path, "cache")
        return cache_path

    def get_dataset_path(
        self,
        dataset_name: str,
        create_if_missing: bool = True,
    ) -> Path:
        """Gets a standardized path for a specific dataset.

        Args:
            dataset_name (str): The name of the dataset.
            create_if_missing (bool): Whether to create the directory if it
                doesn't exist.

        Returns:
            Path: The absolute path to the dataset directory.
        """
        if not dataset_name or not dataset_name.strip():
            raise ValueError("Dataset name cannot be empty")

        dataset_path = self.dataset_dir / self._sanitize_name(dataset_name)
        if create_if_missing:
            self._create_directory(dataset_path, "dataset")
        return dataset_path

    def get_disk_usage(self) -> dict[str, dict[str, int | str]]:
        """Gets disk usage statistics for all managed directories.

        Returns:
            dict: A dictionary with disk usage information for each directory.
        """
        directories = {
            "user_data": self.user_data_dir,
            "cache": self.user_cache_dir,
            "models": self.model_dir,
            "datasets": self.dataset_dir,
            "downloads": self.downloads_dir,
            "temp": self.temp_dir,
        }
        return {name: self._get_directory_size(path) for name, path in directories.items()}

    def get_model_path(self, model_name: str, create_if_missing: bool = True) -> Path:
        """Gets a standardized path for a specific model.

        Args:
            model_name (str): The name of the model.
            create_if_missing (bool): Whether to create the directory if it
                doesn't exist.

        Returns:
            Path: The absolute path to the model directory.
        """
        if not model_name or not model_name.strip():
            raise ValueError("Model name cannot be empty")

        model_path = self.model_dir / self._sanitize_name(model_name)
        if create_if_missing:
            self._create_directory(model_path, "model")
        return model_path

    def verify_checksum(
        self,
        file_path: str | Path,
        expected_checksum: str,
        algorithm: str = "md5",
    ) -> bool:
        """Verifies a file's checksum.

        Args:
            file_path (str | Path): The path to the file.
            expected_checksum (str): The expected checksum value.
            algorithm (str): The hashing algorithm used.

        Returns:
            bool: True if the checksum matches, False otherwise.
        """
        try:
            actual_checksum = self.create_checksum(file_path, algorithm)
            return actual_checksum.lower() == expected_checksum.lower()
        except ResourceManagerError as e:
            logger.error(f"Checksum verification failed: {e}")
            return False

    def _clean_directory(self, directory: Path, cutoff_time: float) -> int:
        """Cleans files older than `cutoff_time` in a directory."""
        cleaned_count = 0
        if not directory.exists():
            return cleaned_count

        try:
            for item in directory.iterdir():
                try:
                    if item.stat().st_mtime < cutoff_time:
                        if item.is_dir():
                            shutil.rmtree(item)
                        else:
                            item.unlink()
                        cleaned_count += 1
                        logger.debug(f"Cleaned old file/directory: {item}")
                except Exception as e:
                    logger.warning(f"Could not remove {item}: {e}")
        except Exception as e:
            logger.error(f"Error cleaning directory {directory}: {e}")
        return cleaned_count

    def _create_directory(self, path: Path, dir_type: str) -> None:
        """Helper method to create a directory."""
        try:
            path.mkdir(parents=True, exist_ok=True)
        except Exception as e:
            raise ResourceManagerError(
                f"Failed to create {dir_type} directory {path}: {e}",
            ) from e

    def _determine_app_name(self, app_name: str | None = None) -> str:
        """Determines the application name from various sources."""
        if app_name:
            return app_name
        try:
            project_root = self._find_project_root()
            config_path = project_root / "pyproject.toml"
            if config_path.exists():
                config = toml.load(str(config_path))
                name = config.get("project", {}).get("name") or config.get("tool", {}).get(
                    "poetry",
                    {},
                ).get("name")
                if name:
                    return name
        except Exception as e:
            logger.warning(f"Could not load app name from pyproject.toml: {e}")
        return "culicidaelab"

    def _find_project_root(self) -> Path:
        """Finds the project root directory."""
        current_path = Path(__file__).resolve()
        indicators = ["pyproject.toml", "setup.py", ".git", "requirements.txt"]
        while current_path.parent != current_path:
            if any((current_path / i).exists() for i in indicators):
                return current_path
            current_path = current_path.parent
        logger.warning("Could not find project root, using module directory")
        return Path(__file__).parent.parent

    def _format_bytes(self, bytes_count: int | float) -> str:
        """Formats bytes into a human-readable string."""
        if bytes_count is None:
            raise ValueError("bytes_count must not be None")
        units = ["B", "KB", "MB", "GB", "TB", "PB"]
        for unit in units:
            if bytes_count < 1024:
                return f"{bytes_count:.1f} {unit}"
            bytes_count /= 1024
        return f"{bytes_count:.1f} {units[-1]}"

    def _get_directory_size(self, path: Path) -> dict[str, int | str]:
        """Gets size information for a directory."""
        if not path.exists():
            return {"size_bytes": 0, "size_human": "0 B", "file_count": 0}

        total_size = 0
        file_count = 0
        try:
            for item in path.rglob("*"):
                if item.is_file():
                    total_size += item.stat().st_size
                    file_count += 1
        except Exception as e:
            logger.warning(f"Error calculating size for {path}: {e}")

        return {
            "size_bytes": total_size,
            "size_human": self._format_bytes(total_size),
            "file_count": file_count,
        }

    def _initialize_directories(self) -> None:
        """Creates necessary directories with proper permissions."""
        directories = self.get_all_directories().values()
        for directory in directories:
            try:
                directory.mkdir(parents=True, exist_ok=True)
                logger.debug(f"Created/verified directory: {directory}")
            except Exception as e:
                raise ResourceManagerError(
                    f"Failed to create directory {directory}: {e}",
                ) from e

        if platform.system() != "Windows":
            self._set_directory_permissions(list(directories))

    def _initialize_paths(self, custom_base_dir: str | Path | None = None) -> None:
        """Initializes all resource paths."""
        if custom_base_dir:
            base_dir = Path(custom_base_dir).resolve()
            self.user_data_dir = base_dir / "data"
            self.user_cache_dir = base_dir / "cache"
        else:
            self.user_data_dir = Path(appdirs.user_data_dir(self.app_name))
            self.user_cache_dir = Path(appdirs.user_cache_dir(self.app_name))

        self.temp_dir = Path(tempfile.gettempdir()) / self.app_name
        self.model_dir = self.user_data_dir / "models"
        self.dataset_dir = self.user_data_dir / "datasets"
        self.downloads_dir = self.user_data_dir / "downloads"
        self.logs_dir = self.user_data_dir / "logs"
        self.config_dir = self.user_data_dir / "config"

    def _is_safe_to_delete(self, path: Path) -> bool:
        """Checks if a path is safe to delete (i.e., within managed dirs)."""
        safe_parents = [self.temp_dir, self.user_cache_dir]
        try:
            resolved_path = path.resolve()
            return any(str(resolved_path).startswith(str(p.resolve())) for p in safe_parents)
        except Exception:
            return False

    def _sanitize_name(self, name: str) -> str:
        """Sanitizes a name for use as a directory/file name."""
        import re

        sanitized = re.sub(r'[<>:"/\\|?*]', "_", name).strip(". ")
        return sanitized or "unnamed"

    def _set_directory_permissions(self, directories: list[Path]) -> None:
        """Sets directory permissions on Unix-like systems (0o700)."""
        try:
            for directory in directories:
                os.chmod(directory, 0o700)
        except Exception as e:
            logger.warning(f"Could not set directory permissions: {e}")
app_name = self._determine_app_name(app_name) instance-attribute
__init__(app_name: str | None = None, custom_base_dir: str | Path | None = None)

Initializes resource paths with cross-platform compatibility.

Source code in culicidaelab/core/resource_manager.py
def __init__(
    self,
    app_name: str | None = None,
    custom_base_dir: str | Path | None = None,
):
    """Initializes resource paths with cross-platform compatibility.

    Args:
        app_name (str | None): Application name used for directory naming.
            When falsy, the name is determined by `_determine_app_name`.
        custom_base_dir (str | Path | None): Base directory passed on to
            `_initialize_paths`.
    """
    # Used as a context manager around workspace-registry updates elsewhere in the class.
    self._lock = Lock()
    # Workspace registry; starts empty.
    self._workspace_registry: dict[str, Path] = {}
    # Resolve the app name before the path and directory setup that follows.
    self.app_name = self._determine_app_name(app_name)
    self._initialize_paths(custom_base_dir)
    self._initialize_directories()
    logger.info(f"ResourceManager initialized for app: {self.app_name}")
    logger.debug(f"Resource directories: {self.get_all_directories()}")
__repr__() -> str

String representation of ResourceManager.

Source code in culicidaelab/core/resource_manager.py
def __repr__(self) -> str:
    """Returns a debug string showing the app name and user data directory."""
    return f"ResourceManager(app_name='{self.app_name}', user_data_dir='{self.user_data_dir}')"
temp_workspace(prefix: str = 'workspace', suffix: str = '')

A context manager for temporary workspaces that auto-cleans on exit.

Parameters:

Name Type Description Default
prefix str

A prefix for the temporary directory name.

'workspace'
suffix str

A suffix for the temporary directory name.

''

Yields:

Name Type Description
Path

The path to the temporary workspace.

Example

with resource_manager.temp_workspace("processing") as ws:
    # Use ws for temporary operations
    (ws / "temp.txt").write_text("data")
# The workspace is automatically removed here.
Source code in culicidaelab/core/resource_manager.py
@contextmanager
def temp_workspace(self, prefix: str = "workspace", suffix: str = ""):
    """Provides a temporary workspace that is removed when the block exits.

    The workspace is created with `create_temp_workspace` and removed with
    `clean_temp_workspace(..., force=True)`, whether the `with` body
    finishes normally or raises.

    Args:
        prefix (str): A prefix for the temporary directory name.
        suffix (str): A suffix for the temporary directory name.

    Yields:
        Path: The path to the temporary workspace.

    Example:
        >>> with resource_manager.temp_workspace("processing") as ws:
        ...     # Use ws for temporary operations
        ...     (ws / "temp.txt").write_text("data")
        # The workspace is automatically removed here.
    """
    scratch_dir = self.create_temp_workspace(prefix, suffix)
    try:
        yield scratch_dir
    finally:
        # force=True is passed so removal does not depend on the location check.
        self.clean_temp_workspace(scratch_dir, force=True)
clean_old_files(days: int = 5, include_cache: bool = True) -> dict[str, int]

Cleans up old download and temporary files.

Parameters:

Name Type Description Default
days int

The number of days after which files are considered old.

5
include_cache bool

Whether to include the cache directory in cleanup.

True

Returns:

Type Description
dict[str, int]

dict[str, int]: A dictionary with cleanup statistics.

Raises:

Type Description
ValueError

If days is negative.

Source code in culicidaelab/core/resource_manager.py
def clean_old_files(
    self,
    days: int = 5,
    include_cache: bool = True,
) -> dict[str, int]:
    """Remove stale files from the downloads, temp and (optionally) cache dirs.

    Each directory is handed to ``_clean_directory`` together with a cutoff
    timestamp of ``now - days * 86400`` seconds.

    Args:
        days (int): The number of days after which files are considered old.
        include_cache (bool): Whether to include the cache directory in cleanup.

    Returns:
        dict[str, int]: Cleanup statistics keyed by ``downloads_cleaned``,
            ``temp_cleaned`` and ``cache_cleaned`` (the latter stays 0 when
            the cache is skipped).

    Raises:
        ValueError: If `days` is negative.
    """
    if days < 0:
        raise ValueError("Days must be non-negative")

    seconds_per_day = 86400
    oldest_allowed = time.time() - (days * seconds_per_day)

    # Values are evaluated in order: downloads first, then temp.
    summary = {
        "downloads_cleaned": self._clean_directory(self.downloads_dir, oldest_allowed),
        "temp_cleaned": self._clean_directory(self.temp_dir, oldest_allowed),
        "cache_cleaned": 0,
    }
    if include_cache:
        summary["cache_cleaned"] = self._clean_directory(
            self.user_cache_dir,
            oldest_allowed,
        )

    logger.info(f"Cleanup completed: {summary}")
    return summary
clean_temp_workspace(workspace_path: Path, force: bool = False) -> None

Cleans up a temporary workspace.

Parameters:

Name Type Description Default
workspace_path Path

The path to the workspace to clean.

required
force bool

If True, force remove even if not in a temp directory.

False

Raises:

Type Description
ResourceManagerError

If cleanup fails.

ValueError

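If the workspace is outside the temp dir and force=False. Note: the check runs inside the method's try block, so callers actually receive this error wrapped in a ResourceManagerError.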

Source code in culicidaelab/core/resource_manager.py
def clean_temp_workspace(self, workspace_path: Path, force: bool = False) -> None:
    """Delete a temporary workspace and drop it from the workspace registry.

    A directory is removed recursively; any other existing path is unlinked.
    A path that does not exist is not an error. In every successful case the
    registry entry keyed by the path's final component is removed if present.

    Args:
        workspace_path (Path): The path to the workspace to clean.
        force (bool): If True, skip the ``_is_safe_to_delete`` check and
            remove the path even if it is not in a temp directory.

    Raises:
        ResourceManagerError: If cleanup fails for any reason. This includes
            the refusal to delete an unsafe path when ``force=False``: the
            ``ValueError`` raised for that case is caught by the same handler
            and re-raised as ``ResourceManagerError`` (available via
            ``__cause__``).
    """
    try:
        allowed = force or self._is_safe_to_delete(workspace_path)
        if not allowed:
            raise ValueError(
                "Cannot clean workspace outside of temp directory without force=True",
            )

        if workspace_path.exists():
            if not workspace_path.is_dir():
                workspace_path.unlink()
            else:
                shutil.rmtree(workspace_path)
            logger.info(f"Cleaned workspace: {workspace_path}")

        # Registry updates are guarded by the instance lock.
        with self._lock:
            self._workspace_registry.pop(workspace_path.name, None)
    except Exception as e:
        raise ResourceManagerError(
            f"Failed to clean workspace {workspace_path}: {e}",
        ) from e
create_checksum(file_path: str | Path, algorithm: str = 'md5') -> str

Creates a checksum for a file.

Parameters:

Name Type Description Default
file_path str | Path

The path to the file.

required
algorithm str

The hashing algorithm to use ('md5', 'sha1', 'sha256').

'md5'

Returns:

Name Type Description
str str

The hexadecimal checksum string.

Raises:

Type Description
ResourceManagerError

If the file does not exist or creation fails.

Source code in culicidaelab/core/resource_manager.py
def create_checksum(self, file_path: str | Path, algorithm: str = "md5") -> str:
    """Creates a checksum for a file.

    Args:
        file_path (str | Path): The path to the file.
        algorithm (str): The hashing algorithm to use ('md5', 'sha1', 'sha256').

    Returns:
        str: The hexadecimal checksum string.

    Raises:
        ResourceManagerError: If the file does not exist or creation fails.
    """
    file_path = Path(file_path)
    if not file_path.exists():
        raise ResourceManagerError(f"File does not exist: {file_path}")
    try:
        hash_obj = hashlib.new(algorithm)
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_obj.update(chunk)
        return hash_obj.hexdigest()
    except Exception as e:
        raise ResourceManagerError(
            f"Failed to create checksum for {file_path}: {e}",
        ) from e
create_temp_workspace(prefix: str = 'workspace', suffix: str = '') -> Path

Creates a temporary workspace for runtime operations.

Parameters:

Name Type Description Default
prefix str

A prefix for the temporary directory name.

'workspace'
suffix str

A suffix for the temporary directory name.

''

Returns:

Name Type Description
Path Path

The path to the created temporary workspace.

Raises:

Type Description
ResourceManagerError

If workspace creation fails.

Source code in culicidaelab/core/resource_manager.py
def create_temp_workspace(
    self,
    prefix: str = "workspace",
    suffix: str = "",
) -> Path:
    """Creates a new, uniquely named temporary workspace for runtime operations.

    The directory name is ``{prefix}_{unix_seconds}_{pid}[_{suffix}]``. Because
    the timestamp only has one-second resolution, two calls made within the
    same second by the same process would otherwise map to the same name and
    silently share (and later delete) one directory. To prevent that, the
    directory is created exclusively and, on a name collision, a numeric
    counter is inserted before the suffix (``..._{pid}_1``, ``..._{pid}_2``,
    ...) until a free name is found.

    Args:
        prefix (str): A prefix for the temporary directory name.
        suffix (str): A suffix for the temporary directory name.

    Returns:
        Path: The path to the created temporary workspace.

    Raises:
        ResourceManagerError: If workspace creation fails.
    """
    try:
        base_name = f"{prefix}_{int(time.time())}_{os.getpid()}"
        tail = f"_{suffix}" if suffix else ""

        # Bounded retry so a pathological filesystem state cannot loop forever.
        for attempt in range(10000):
            counter = f"_{attempt}" if attempt else ""
            workspace_name = f"{base_name}{counter}{tail}"
            temp_workspace = self.temp_dir / workspace_name
            try:
                # exist_ok=False makes creation exclusive: an existing
                # directory belongs to another caller and must not be reused.
                temp_workspace.mkdir(parents=True, exist_ok=False)
            except FileExistsError:
                continue
            break
        else:
            raise FileExistsError(
                f"No unused workspace name available for '{base_name}{tail}'",
            )

        with self._lock:
            self._workspace_registry[workspace_name] = temp_workspace
        logger.info(f"Created temporary workspace: {temp_workspace}")
        return temp_workspace
    except Exception as e:
        raise ResourceManagerError(
            f"Failed to create temporary workspace: {e}",
        ) from e
get_all_directories() -> dict[str, Path]

Gets all managed directories.

Returns:

Type Description
dict[str, Path]

dict[str, Path]: A dictionary mapping directory names to their paths.

Source code in culicidaelab/core/resource_manager.py
def get_all_directories(self) -> dict[str, Path]:
    """Return every directory this manager exposes, keyed by attribute name.

    Returns:
        dict[str, Path]: A dictionary mapping directory names to their paths.
    """
    managed_attrs = (
        "user_data_dir",
        "user_cache_dir",
        "temp_dir",
        "model_dir",
        "dataset_dir",
        "downloads_dir",
        "logs_dir",
        "config_dir",
    )
    # Each key is also the name of the instance attribute holding the path.
    return {attr: getattr(self, attr) for attr in managed_attrs}
get_cache_path(cache_name: str, create_if_missing: bool = True) -> Path

Gets a path for cache files.

Parameters:

Name Type Description Default
cache_name str

The name of the cache.

required
create_if_missing bool

Whether to create the directory if it doesn't exist.

True

Returns:

Name Type Description
Path Path

The path to the cache directory.

Source code in culicidaelab/core/resource_manager.py
def get_cache_path(self, cache_name: str, create_if_missing: bool = True) -> Path:
    """Resolve the directory used for a named cache.

    The name is passed through ``_sanitize_name`` and joined onto the user
    cache directory.

    Args:
        cache_name (str): The name of the cache.
        create_if_missing (bool): Whether to create the directory if it
            doesn't exist.

    Returns:
        Path: The path to the cache directory.

    Raises:
        ValueError: If `cache_name` is empty or only whitespace.
    """
    name_is_usable = bool(cache_name) and bool(cache_name.strip())
    if not name_is_usable:
        raise ValueError("Cache name cannot be empty")

    target = self.user_cache_dir / self._sanitize_name(cache_name)
    if create_if_missing:
        self._create_directory(target, "cache")
    return target
get_dataset_path(dataset_name: str, create_if_missing: bool = True) -> Path

Gets a standardized path for a specific dataset.

Parameters:

Name Type Description Default
dataset_name str

The name of the dataset.

required
create_if_missing bool

Whether to create the directory if it doesn't exist.

True

Returns:

Name Type Description
Path Path

The absolute path to the dataset directory.

Source code in culicidaelab/core/resource_manager.py
def get_dataset_path(
    self,
    dataset_name: str,
    create_if_missing: bool = True,
) -> Path:
    """Resolve the directory that holds a named dataset.

    The name is passed through ``_sanitize_name`` and joined onto the
    dataset directory.

    Args:
        dataset_name (str): The name of the dataset.
        create_if_missing (bool): Whether to create the directory if it
            doesn't exist.

    Returns:
        Path: The path to the dataset directory.

    Raises:
        ValueError: If `dataset_name` is empty or only whitespace.
    """
    name_is_usable = bool(dataset_name) and bool(dataset_name.strip())
    if not name_is_usable:
        raise ValueError("Dataset name cannot be empty")

    target = self.dataset_dir / self._sanitize_name(dataset_name)
    if create_if_missing:
        self._create_directory(target, "dataset")
    return target
get_disk_usage() -> dict[str, dict[str, int | str]]

Gets disk usage statistics for all managed directories.

Returns:

Name Type Description
dict dict[str, dict[str, int | str]]

A dictionary with disk usage information for each directory.

Source code in culicidaelab/core/resource_manager.py
def get_disk_usage(self) -> dict[str, dict[str, int | str]]:
    """Gets disk usage statistics for all managed directories.

    Returns:
        dict: A dictionary with disk usage information for each directory.
    """
    directories = {
        "user_data": self.user_data_dir,
        "cache": self.user_cache_dir,
        "models": self.model_dir,
        "datasets": self.dataset_dir,
        "downloads": self.downloads_dir,
        "temp": self.temp_dir,
    }
    return {name: self._get_directory_size(path) for name, path in directories.items()}
get_model_path(model_name: str, create_if_missing: bool = True) -> Path

Gets a standardized path for a specific model.

Parameters:

Name Type Description Default
model_name str

The name of the model.

required
create_if_missing bool

Whether to create the directory if it doesn't exist.

True

Returns:

Name Type Description
Path Path

The absolute path to the model directory.

Source code in culicidaelab/core/resource_manager.py
def get_model_path(self, model_name: str, create_if_missing: bool = True) -> Path:
    """Resolve the directory that holds a named model.

    The name is passed through ``_sanitize_name`` and joined onto the model
    directory.

    Args:
        model_name (str): The name of the model.
        create_if_missing (bool): Whether to create the directory if it
            doesn't exist.

    Returns:
        Path: The path to the model directory.

    Raises:
        ValueError: If `model_name` is empty or only whitespace.
    """
    name_is_usable = bool(model_name) and bool(model_name.strip())
    if not name_is_usable:
        raise ValueError("Model name cannot be empty")

    target = self.model_dir / self._sanitize_name(model_name)
    if create_if_missing:
        self._create_directory(target, "model")
    return target
verify_checksum(file_path: str | Path, expected_checksum: str, algorithm: str = 'md5') -> bool

Verifies a file's checksum.

Parameters:

Name Type Description Default
file_path str | Path

The path to the file.

required
expected_checksum str

The expected checksum value.

required
algorithm str

The hashing algorithm used.

'md5'

Returns:

Name Type Description
bool bool

True if the checksum matches, False otherwise.

Source code in culicidaelab/core/resource_manager.py
def verify_checksum(
    self,
    file_path: str | Path,
    expected_checksum: str,
    algorithm: str = "md5",
) -> bool:
    """Verifies a file's checksum.

    Args:
        file_path (str | Path): The path to the file.
        expected_checksum (str): The expected checksum value.
        algorithm (str): The hashing algorithm used.

    Returns:
        bool: True if the checksum matches, False otherwise.
    """
    try:
        actual_checksum = self.create_checksum(file_path, algorithm)
        return actual_checksum.lower() == expected_checksum.lower()
    except ResourceManagerError as e:
        logger.error(f"Checksum verification failed: {e}")
        return False
ResourceManagerError

Custom exception for ResourceManager operations.

Source code in culicidaelab/core/resource_manager.py
class ResourceManagerError(Exception):
    """Error type raised by ResourceManager operations when they fail."""
Settings

User-friendly facade for CulicidaeLab configuration management.

This class provides a simple, stable interface to access configuration values, resource directories, and application settings. All actual operations are delegated to a validated configuration object managed by ConfigManager and a ResourceManager.

Source code in culicidaelab/core/settings.py
class Settings:
    """
    User-friendly facade for CulicidaeLab configuration management.

    This class provides a simple, stable interface to access configuration values,
    resource directories, and application settings. All actual operations
    are delegated to a validated configuration object managed by ConfigManager
    and a ResourceManager.
    """

    # NOTE(review): `_instance` and `_lock` are not referenced anywhere in this
    # class body; presumably leftovers from (or hooks for) singleton handling
    # done elsewhere - verify before removing.
    _instance: Optional["Settings"] = None
    _lock = threading.Lock()
    # Class-level default; __init__ sets the instance attribute to True so a
    # repeated __init__ call on the same object becomes a no-op.
    _initialized = False

    def __init__(self, config_dir: str | Path | None = None) -> None:
        """Initializes the Settings facade.

        This loads the configuration using a ConfigManager and sets up a
        ResourceManager for file paths. If this instance has already been
        initialized, the call returns immediately without reloading anything.

        Args:
            config_dir: Optional path to a user-provided configuration directory.
        """
        if self._initialized:
            return

        self._config_manager = ConfigManager(user_config_dir=config_dir)
        self.config: CulicidaeLabConfig = self._config_manager.get_config()
        self._resource_manager = ResourceManager()

        # Cache for species config (lazy loaded)
        self._species_config: SpeciesConfig | None = None

        # Store for singleton check
        self._current_config_dir = self._config_manager.user_config_dir

        self._initialized = True

    # Configuration Access
    def get_config(self, path: str | None = None, default: Any = None) -> Any:
        """Gets a configuration value using a dot-separated path.

        Each path segment is resolved with ``dict.get`` when the current
        object is a dict and with ``getattr`` otherwise.

        Example:
            >>> settings.get_config("predictors.classifier.confidence")

        Args:
            path: A dot-separated string path to the configuration value.
                If None (or empty), returns the entire configuration object.
            default: A default value to return if the path is not found.

        Returns:
            The configuration value, or the default value if not found.
            The default is also returned when the resolved value is None.
        """
        if not path:
            return self.config

        obj = self.config
        try:
            for key in path.split("."):
                if isinstance(obj, dict):
                    # A missing key yields None; the next segment (if any)
                    # then fails with AttributeError and falls to `default`.
                    obj = obj.get(key)
                else:
                    obj = getattr(obj, key)
            return obj if obj is not None else default
        except (AttributeError, KeyError):
            return default

    def set_config(self, path: str, value: Any) -> None:
        """
        Sets a configuration value at a specified dot-separated path.
        This method can traverse both objects (Pydantic models) and dictionaries.

        Note: This modifies the configuration in memory. To make it persistent,
        call `save_config()`.

        Args:
            path: A dot-separated string path to the configuration value.
            value: The new value to set.

        Raises:
            KeyError: If an intermediate path segment resolves to None
                (including a missing dictionary key).
            AttributeError: If an intermediate non-dict object has no
                attribute named after the segment (``getattr`` is called
                without a default).
        """
        keys = path.split(".")
        obj = self.config

        # Walk to the parent of the final segment.
        for key in keys[:-1]:
            if isinstance(obj, dict):
                obj = obj.get(key)
            else:
                obj = getattr(obj, key)

            if obj is None:
                raise KeyError(f"The path part '{key}' in '{path}' was not found.")

        last_key = keys[-1]
        if isinstance(obj, dict):
            obj[last_key] = value
        else:
            setattr(obj, last_key, value)

    def save_config(self, file_path: str | Path | None = None) -> None:
        """Save current configuration to a user config file.

        Args:
            file_path: Optional path to save the configuration file.
                If None, defaults to "culicidaelab_saved.yaml" in the user config directory.

        Raises:
            ValueError: If `file_path` is None and no user config directory
                is set on the ConfigManager.
        """
        if file_path is None:
            if not self._config_manager.user_config_dir:
                raise ValueError("Cannot save config without a specified user config directory.")
            file_path = self._config_manager.user_config_dir / "culicidaelab_saved.yaml"
        self._config_manager.save_config(file_path)

    # Resource Directory Access
    @property
    def model_dir(self) -> Path:
        """Model weights directory."""
        return self._resource_manager.model_dir

    @property
    def weights_dir(self) -> Path:
        """Alias for model_dir."""
        return self.model_dir

    @property
    def dataset_dir(self) -> Path:
        """Datasets directory."""
        return self._resource_manager.dataset_dir

    @property
    def cache_dir(self) -> Path:
        """Cache directory."""
        return self._resource_manager.user_cache_dir

    @property
    def config_dir(self) -> Path:
        """The active user configuration directory.

        Falls back to the ConfigManager's default config path when no user
        config directory is set.
        """
        return self._config_manager.user_config_dir or self._config_manager.default_config_path

    @property
    def species_config(self) -> SpeciesConfig:
        """Species configuration (lazily loaded)."""
        # Built on first access from `config.species`, then reused.
        if self._species_config is None:
            self._species_config = SpeciesConfig(self.config.species)
        return self._species_config

    # Dataset Management
    def get_dataset_path(self, dataset_type: str) -> Path:
        """Gets the standardized path for a specific dataset directory.

        A relative configured path is resolved against `dataset_dir`. The
        directory (and any missing parents) is created as a side effect.

        Args:
            dataset_type: The name of the dataset type (e.g., 'classification').

        Returns:
            The path to the dataset directory.

        Raises:
            ValueError: If `dataset_type` is not present in the configured
                datasets.
        """
        if dataset_type not in self.config.datasets:
            raise ValueError(f"Dataset type '{dataset_type}' not configured.")

        dataset_path_str = self.config.datasets[dataset_type].path
        path = Path(dataset_path_str)
        if not path.is_absolute():
            path = self.dataset_dir / path

        path.mkdir(parents=True, exist_ok=True)
        return path

    def list_datasets(self) -> list[str]:
        """Get list of configured dataset types."""
        return list(self.config.datasets.keys())

    # Model Management
    def get_model_weights_path(self, model_type: str) -> Path:
        """Gets the configured path to a model's weights file.

        A relative configured path is resolved against `model_dir`. Nothing
        is created on disk and the file's existence is not checked.

        Args:
            model_type: The name of the model type (e.g., 'classifier').

        Returns:
            The path to the model weights file.

        Raises:
            ValueError: If `model_type` is not present in the configured
                predictors.
        """
        if model_type not in self.config.predictors:
            raise ValueError(f"Model type '{model_type}' not configured in 'predictors'.")

        weights_file = self.config.predictors[model_type].model_path
        weights_path = Path(weights_file)
        if not weights_path.is_absolute():
            weights_path = self.model_dir / weights_path

        return weights_path

    def list_model_types(self) -> list[str]:
        """Get list of available model types."""
        return list(self.config.predictors.keys())

    def set_model_weights_path(self, model_type: str, weights_path: str | Path) -> None:
        """Set custom weights path for model type.

        The path is stored as a string on the in-memory predictor config.

        Args:
            model_type: The name of the model type (e.g., 'classifier').
            weights_path: The new path to the model weights file.

        Raises:
            ValueError: If `model_type` is not present in the configured
                predictors.
        """
        if model_type not in self.config.predictors:
            raise ValueError(f"Cannot set weights for unconfigured model type: '{model_type}'.")
        self.config.predictors[model_type].model_path = str(weights_path)

    # API Key Management
    def get_api_key(self, provider: str) -> str | None:
        """Get API key for external provider from environment variables.

        Args:
            provider: The name of the provider (e.g., 'kaggle', 'huggingface', 'roboflow').

        Returns:
            The value of the provider's environment variable, or None if the
            variable is unset or the provider is not one of the known names.
        """
        # Provider name -> environment variable that holds its key.
        api_keys = {
            "kaggle": "KAGGLE_API_KEY",
            "huggingface": "HUGGINGFACE_API_KEY",
            "roboflow": "ROBOFLOW_API_KEY",
        }
        if provider in api_keys:
            import os

            return os.getenv(api_keys[provider])
        return None

    # Utility Methods (delegated to ResourceManager)
    @contextmanager
    def temp_workspace(self, prefix: str = "workspace"):
        """Context manager yielding a temporary workspace.

        Delegates to the ResourceManager's ``temp_workspace`` with the given
        prefix and yields whatever that context manager yields.

        Args:
            prefix: A prefix for the temporary directory name.
        """
        with self._resource_manager.temp_workspace(prefix) as workspace:
            yield workspace

    # Instantiation
    def instantiate_from_config(self, config_path: str, **kwargs: Any) -> Any:
        """Instantiates an object from a configuration path.

        This is a convenience method that finds a config object by its path
        and uses the underlying ConfigManager to instantiate it.

        Args:
            config_path: A dot-separated path to the configuration object
                (e.g., "predictors.classifier").
            **kwargs: Additional keyword arguments to pass to the constructor.

        Returns:
            The instantiated object.

        Raises:
            ValueError: If nothing (or a falsy value) is found at
                `config_path`.
        """

        config_obj = self.get_config(config_path)
        if not config_obj:
            raise ValueError(f"No configuration object found at path: {config_path}")
        return self._config_manager.instantiate_from_config(config_obj, **kwargs)
config: CulicidaeLabConfig = self._config_manager.get_config() instance-attribute
model_dir: Path property

Model weights directory.

weights_dir: Path property

Alias for model_dir.

dataset_dir: Path property

Datasets directory.

cache_dir: Path property

Cache directory.

config_dir: Path property

The active user configuration directory.

species_config: SpeciesConfig property

Species configuration (lazily loaded).

__init__(config_dir: str | Path | None = None) -> None

Initializes the Settings facade.

This loads the configuration using a ConfigManager and sets up a ResourceManager for file paths.

Parameters:

Name Type Description Default
config_dir str | Path | None

Optional path to a user-provided configuration directory.

None
Source code in culicidaelab/core/settings.py
def __init__(self, config_dir: str | Path | None = None) -> None:
    """Build the facade: load configuration and set up resource paths.

    A ConfigManager loads the configuration and a ResourceManager is created
    for file paths. Calling this again on an already-initialized instance
    is a no-op.

    Args:
        config_dir: Optional path to a user-provided configuration directory.
    """
    if self._initialized:
        return

    manager = ConfigManager(user_config_dir=config_dir)
    self._config_manager = manager
    self.config: CulicidaeLabConfig = manager.get_config()
    self._resource_manager = ResourceManager()

    # Species config is built lazily on first access.
    self._species_config: SpeciesConfig | None = None

    # Remembered so the singleton accessor can detect a changed config dir.
    self._current_config_dir = manager.user_config_dir

    self._initialized = True
get_config(path: str | None = None, default: Any = None) -> Any

Gets a configuration value using a dot-separated path.

Example

settings.get_config("predictors.classifier.confidence")

Parameters:

Name Type Description Default
path str | None

A dot-separated string path to the configuration value. If None, returns the entire configuration object.

None
default Any

A default value to return if the path is not found.

None

Returns:

Type Description
Any

The configuration value, or the default value if not found.

Source code in culicidaelab/core/settings.py
def get_config(self, path: str | None = None, default: Any = None) -> Any:
    """Gets a configuration value using a dot-separated path.

    Example:
        >>> settings.get_config("predictors.classifier.confidence")

    Args:
        path: A dot-separated string path to the configuration value.
            If None, returns the entire configuration object.
        default: A default value to return if the path is not found.

    Returns:
        The configuration value, or the default value if not found.
    """
    if not path:
        return self.config

    obj = self.config
    try:
        for key in path.split("."):
            if isinstance(obj, dict):
                obj = obj.get(key)
            else:
                obj = getattr(obj, key)
        return obj if obj is not None else default
    except (AttributeError, KeyError):
        return default
set_config(path: str, value: Any) -> None

Sets a configuration value at a specified dot-separated path. This method can traverse both objects (Pydantic models) and dictionaries.

Note: This modifies the configuration in memory. To make it persistent, call save_config().

Parameters:

Name Type Description Default
path str

A dot-separated string path to the configuration value.

required
value Any

The new value to set.

required
Source code in culicidaelab/core/settings.py
def set_config(self, path: str, value: Any) -> None:
    """Assign a configuration value at a dot-separated path.

    Traversal handles both dictionaries (item access) and other objects such
    as Pydantic models (attribute access).

    Note: This modifies the configuration in memory. To make it persistent,
    call `save_config()`.

    Args:
        path: A dot-separated string path to the configuration value.
        value: The new value to set.

    Raises:
        KeyError: If an intermediate path segment resolves to None
            (including a missing dictionary key).
        AttributeError: If an intermediate non-dict object lacks the
            attribute named by the segment.
    """
    *parent_keys, leaf_key = path.split(".")

    container = self.config
    for key in parent_keys:
        container = container.get(key) if isinstance(container, dict) else getattr(container, key)
        if container is None:
            raise KeyError(f"The path part '{key}' in '{path}' was not found.")

    if not isinstance(container, dict):
        setattr(container, leaf_key, value)
        return
    container[leaf_key] = value
save_config(file_path: str | Path | None = None) -> None

Save current configuration to a user config file.

Parameters: file_path (str | Path | None) — Optional path to save the configuration file. If None, defaults to "culicidaelab_saved.yaml" in the user config directory.

Source code in culicidaelab/core/settings.py
def save_config(self, file_path: str | Path | None = None) -> None:
    """Save current configuration to a user config file.
    Args:
        file_path: Optional path to save the configuration file.
            If None, defaults to "culicidaelab_saved.yaml" in the user config directory.
    """
    if file_path is None:
        if not self._config_manager.user_config_dir:
            raise ValueError("Cannot save config without a specified user config directory.")
        file_path = self._config_manager.user_config_dir / "culicidaelab_saved.yaml"
    self._config_manager.save_config(file_path)
get_dataset_path(dataset_type: str) -> Path

Gets the standardized path for a specific dataset directory.

Parameters:

Name Type Description Default
dataset_type str

The name of the dataset type (e.g., 'classification').

required

Returns:

Type Description
Path

An absolute path to the dataset directory.

Source code in culicidaelab/core/settings.py
def get_dataset_path(self, dataset_type: str) -> Path:
    """Resolve (and create) the directory configured for a dataset type.

    A relative configured path is resolved against `dataset_dir`; an
    absolute one is used as is. The directory and any missing parents are
    created as a side effect.

    Args:
        dataset_type: The name of the dataset type (e.g., 'classification').

    Returns:
        The path to the dataset directory.

    Raises:
        ValueError: If `dataset_type` is not present in the configured
            datasets.
    """
    datasets = self.config.datasets
    if dataset_type not in datasets:
        raise ValueError(f"Dataset type '{dataset_type}' not configured.")

    configured = Path(datasets[dataset_type].path)
    location = configured if configured.is_absolute() else self.dataset_dir / configured

    location.mkdir(parents=True, exist_ok=True)
    return location
list_datasets() -> list[str]

Get list of configured dataset types.

Source code in culicidaelab/core/settings.py
def list_datasets(self) -> list[str]:
    """Return the names of all configured dataset types."""
    dataset_names = self.config.datasets.keys()
    return [*dataset_names]
get_model_weights_path(model_type: str) -> Path

Gets the configured path to a model's weights file.

Parameters:

Name Type Description Default
model_type str

The name of the model type (e.g., 'classifier').

required

Returns:

Type Description
Path

The path to the model weights file.

Source code in culicidaelab/core/settings.py
def get_model_weights_path(self, model_type: str) -> Path:
    """Resolve the weights file path configured for a model type.

    A relative configured path is resolved against `model_dir`; an absolute
    one is returned unchanged. Nothing is created on disk and the file's
    existence is not checked.

    Args:
        model_type: The name of the model type (e.g., 'classifier').

    Returns:
        The path to the model weights file.

    Raises:
        ValueError: If `model_type` is not present in the configured
            predictors.
    """
    predictors = self.config.predictors
    if model_type not in predictors:
        raise ValueError(f"Model type '{model_type}' not configured in 'predictors'.")

    configured = Path(predictors[model_type].model_path)
    if configured.is_absolute():
        return configured
    return self.model_dir / configured
list_model_types() -> list[str]

Get list of available model types.

Source code in culicidaelab/core/settings.py
def list_model_types(self) -> list[str]:
    """Return the names of all model types configured under predictors."""
    predictor_names = self.config.predictors.keys()
    return [*predictor_names]
set_model_weights_path(model_type: str, weights_path: str | Path) -> None

Set custom weights path for model type.

Parameters: model_type (str) — The name of the model type (e.g., 'classifier'). weights_path (str | Path) — The new path to the model weights file.

Source code in culicidaelab/core/settings.py
def set_model_weights_path(self, model_type: str, weights_path: str | Path) -> None:
    """Set custom weights path for model type.
    Args:
        model_type: The name of the model type (e.g., 'classifier').
        weights_path: The new path to the model weights file.
    """
    if model_type not in self.config.predictors:
        raise ValueError(f"Cannot set weights for unconfigured model type: '{model_type}'.")
    self.config.predictors[model_type].model_path = str(weights_path)
get_api_key(provider: str) -> str | None

Get API key for external provider from environment variables.

Parameters: provider (str) — The name of the provider (e.g., 'kaggle', 'huggingface', 'roboflow').

Source code in culicidaelab/core/settings.py
def get_api_key(self, provider: str) -> str | None:
    """Get API key for external provider from environment variables.
    Args:
        provider: The name of the provider (e.g., 'kaggle', 'huggingface', 'roboflow').
    """
    api_keys = {
        "kaggle": "KAGGLE_API_KEY",
        "huggingface": "HUGGINGFACE_API_KEY",
        "roboflow": "ROBOFLOW_API_KEY",
    }
    if provider in api_keys:
        import os

        return os.getenv(api_keys[provider])
    return None
temp_workspace(prefix: str = 'workspace')
Source code in culicidaelab/core/settings.py
@contextmanager
def temp_workspace(self, prefix: str = "workspace"):
    """Context manager yielding a temporary workspace.

    Delegates to the ResourceManager's ``temp_workspace`` with the given
    prefix and yields whatever that context manager yields; entering and
    leaving the delegate happens around the caller's ``with`` body.

    Args:
        prefix: A prefix for the temporary directory name.
    """
    delegate = self._resource_manager.temp_workspace(prefix)
    with delegate as scratch_dir:
        yield scratch_dir
instantiate_from_config(config_path: str, **kwargs: Any) -> Any

Instantiates an object from a configuration path.

This is a convenience method that finds a config object by its path and uses the underlying ConfigManager to instantiate it.

Parameters:

Name Type Description Default
config_path str

A dot-separated path to the configuration object (e.g., "predictors.classifier").

required
**kwargs Any

Additional keyword arguments to pass to the constructor.

{}

Returns:

Type Description
Any

The instantiated object.

Source code in culicidaelab/core/settings.py
def instantiate_from_config(self, config_path: str, **kwargs: Any) -> Any:
    """Build an object from the configuration stored at `config_path`.

    The configuration object is fetched with `get_config` and then handed,
    together with any extra keyword arguments, to the ConfigManager's
    `instantiate_from_config`.

    Args:
        config_path: A dot-separated path to the configuration object
            (e.g., "predictors.classifier").
        **kwargs: Additional keyword arguments to pass to the constructor.

    Returns:
        The instantiated object.

    Raises:
        ValueError: If the lookup at `config_path` yields a falsy value.
    """
    target_config = self.get_config(config_path)
    if target_config:
        return self._config_manager.instantiate_from_config(target_config, **kwargs)
    raise ValueError(f"No configuration object found at path: {config_path}")
get_settings(config_dir: str | Path | None = None) -> Settings

Get the Settings singleton instance.

This is the primary way to access Settings throughout the application. If a config_dir is provided that differs from the existing instance, a new instance will be created and returned.

Parameters:

Name Type Description Default
config_dir str | Path | None

Optional path to a user-provided configuration directory.

None

Returns:

Type Description
Settings

The Settings instance.

Source code in culicidaelab/core/settings.py
def get_settings(config_dir: str | Path | None = None) -> Settings:
    """
    Return the shared module-level Settings instance, rebuilding it if needed.

    This is the primary way to access Settings throughout the application.
    The instance is (re)created when none exists yet, or when the resolved
    `config_dir` differs from the `_current_config_dir` recorded on the
    existing instance. The whole check runs while holding `_SETTINGS_LOCK`.

    Args:
        config_dir: Optional path to a user-provided configuration directory.

    Returns:
        The Settings instance.
    """
    global _SETTINGS_INSTANCE
    with _SETTINGS_LOCK:
        if config_dir:
            wanted_dir = Path(config_dir).resolve()
        else:
            wanted_dir = None

        # Rebuild when nothing is cached yet or the requested directory changed.
        is_stale = _SETTINGS_INSTANCE is None or _SETTINGS_INSTANCE._current_config_dir != wanted_dir
        if is_stale:
            _SETTINGS_INSTANCE = Settings(config_dir=config_dir)

        return _SETTINGS_INSTANCE
download_file(url: str, destination: str | Path | None = None, downloads_dir: str | Path | None = None, progress_callback: Callable | None = None, chunk_size: int = 8192, timeout: int = 30, desc: str | None = None) -> Path

Downloads a file from a URL with progress tracking.

Parameters:

Name Type Description Default
url str

The URL of the file to download.

required
destination str | Path

The specific destination path for the file.

None
downloads_dir str | Path

Default directory for downloads.

None
progress_callback Callable

A custom progress callback.

None
chunk_size int

The size of chunks to download in bytes.

8192
timeout int

The timeout for the download request in seconds.

30
desc str

A description for the progress bar.

None

Returns:

Name Type Description
Path Path

The path to the downloaded file.

Raises:

Type Description
ValueError

If the URL is invalid.

RuntimeError

If the download or file write fails.

Source code in culicidaelab/core/utils.py
def download_file(
    url: str,
    destination: str | Path | None = None,
    downloads_dir: str | Path | None = None,
    progress_callback: Callable | None = None,
    chunk_size: int = 8192,
    timeout: int = 30,
    desc: str | None = None,
) -> Path:
    """Downloads a file from a URL with progress tracking.

    Args:
        url (str): The URL of the file to download.
        destination (str | Path, optional): The specific destination path for the file.
        downloads_dir (str | Path, optional): Default directory for downloads.
        progress_callback (Callable, optional): A custom progress callback.
        chunk_size (int): The size of chunks to download in bytes.
        timeout (int): The timeout for the download request in seconds.
        desc (str, optional): A description for the progress bar.

    Returns:
        Path: The path to the downloaded file.

    Raises:
        ValueError: If the URL is invalid.
        RuntimeError: If the download or file write fails.
    """
    if not url or not url.startswith(("http://", "https://")):
        raise ValueError(f"Invalid URL: {url}")

    dest_path = Path(destination) if destination else None
    if dest_path is None:
        base_dir = Path(downloads_dir) if downloads_dir else Path.cwd()
        base_dir.mkdir(parents=True, exist_ok=True)
        filename = url.split("/")[-1]
        dest_path = base_dir / filename

    dest_path.parent.mkdir(parents=True, exist_ok=True)

    try:
        with requests.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()
            total_size = int(response.headers.get("content-length", 0))
            progress_desc = desc or f"Downloading {dest_path.name}"

            with tqdm.tqdm(
                total=total_size,
                unit="iB",
                unit_scale=True,
                desc=progress_desc,
            ) as pbar:
                with open(dest_path, "wb") as file:
                    for chunk in response.iter_content(chunk_size=chunk_size):
                        written_size = file.write(chunk)
                        pbar.update(written_size)
                        if progress_callback:
                            try:
                                progress_callback(pbar.n, total_size)
                            except Exception as cb_err:
                                logging.warning(f"Progress callback error: {cb_err}")
        return dest_path
    except requests.RequestException as e:
        logging.error(f"Download failed for {url}: {e}")
        raise RuntimeError(f"Failed to download file from {url}: {e}") from e
    except OSError as e:
        logging.error(f"File write error for {dest_path}: {e}")
        raise RuntimeError(f"Failed to write file to {dest_path}: {e}") from e
str_to_bgr(str_color: str) -> tuple[int, int, int]

Converts a hexadecimal color string to a BGR tuple.

Parameters:

Name Type Description Default
str_color str

A hex color string in '#RRGGBB' or 'RRGGBB' format.

required

Returns:

Type Description
tuple[int, int, int]

A (B, G, R) tuple of integers, each in the range 0-255.

Raises:

Type Description
ValueError

If the string has an invalid format or invalid characters.

Source code in culicidaelab/core/utils.py
def str_to_bgr(str_color: str) -> tuple[int, int, int]:
    """Converts a hexadecimal color string to a BGR tuple.

    Args:
        str_color (str): A hex color string in '#RRGGBB' or 'RRGGBB' format.

    Returns:
        tuple[int, int, int]: A (B, G, R) tuple of integers, each in 0-255.

    Raises:
        ValueError: If the string has an invalid format or invalid characters.
    """
    hex_color = str_color.lstrip("#")
    if len(hex_color) != 6:
        raise ValueError(f"Invalid hex color string format: '{str_color}'.")

    # Validate explicitly: int(..., 16) also accepts signs and surrounding
    # whitespace, so e.g. "-1" would parse to a negative channel value.
    if any(ch not in "0123456789abcdefABCDEF" for ch in hex_color):
        raise ValueError(f"Invalid characters in hex string: '{str_color}'.")

    r, g, b = (int(hex_color[start : start + 2], 16) for start in (0, 2, 4))
    return (b, g, r)