Models

Model

Parent class for all BlendSQL Models.

Source code in blendsql/models/model_base.py
class ModelBase:
    """Parent class for all BlendSQL Models."""

    def __init__(
        self,
        model_name_or_path: str,
        base_url: str | None = None,
        api_key: str = "N/A",
        extra_body: dict | None = None,
        chat_template_kwargs: dict | None = None,
        caching: bool = False,
        **kwargs,
    ):
        from openai import AsyncOpenAI

        self.model_name_or_path = model_name_or_path
        self.caching = caching
        self.extra_body = extra_body or dict()
        self.client = AsyncOpenAI(base_url=base_url, api_key=api_key)
        self.chat_template_kwargs = chat_template_kwargs or {}
        # A "chat_template_kwargs" entry in extra_body takes precedence over the argument.
        if "chat_template_kwargs" in self.extra_body:
            self.chat_template_kwargs = self.extra_body.pop("chat_template_kwargs")
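        # Responses are cached on disk in the user cache dir, one diskcache file per model.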
        self.cache = Cache(
            Path(platformdirs.user_cache_dir("blendsql"))
            / f"{self.model_name_or_path}.diskcache"
        )
        self._session: aiohttp.ClientSession | None = None

        # Initialize counters
        self.prompt_tokens: int = 0
        self.completion_tokens: int = 0
        self.cached_tokens: int = 0
        self.num_generation_calls: int = 0
        self.num_cache_hits: int = 0

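    # One lazily created aiohttp session is shared across requests (subclasses use
    # it, e.g., to fetch and encode image URLs) and is torn down by close().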
    async def _get_session(self) -> aiohttp.ClientSession:
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession()
        return self._session

    async def close(self):
        if self._session and not self._session.closed:
            await self._session.close()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

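    # Streams a single chat completion, accumulating text deltas into a buffer;
    # a set cancel_event aborts mid-stream and returns a partial, completed=False result.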
    async def generate(
        self, item: GenerationItem, cancel_event: asyncio.Event | None = None
    ):
        buffer = ""
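        # Default max_tokens comes from the MAX_TOKENS_KEY env var; keys in
        # self.extra_body override it (the right operand of | wins).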
        extra_body = {
            "max_tokens": int(os.getenv(MAX_TOKENS_KEY, DEFAULT_MAX_TOKENS))
        } | self.extra_body

        messages, extra_body = await self._format_inputs(extra_body, item)

        stream = await self.client.chat.completions.create(
            model=self.model_name_or_path,
            messages=messages,
            stream=True,
            stream_options={"include_usage": True},
            extra_body=extra_body,
        )
        self.num_generation_calls += 1

        try:
            async for chunk in stream:
                if cancel_event and cancel_event.is_set():
                    return GenerationResult(item.identifier, buffer, completed=False)

                if chunk.choices and chunk.choices[0].delta.content:
                    buffer += chunk.choices[0].delta.content

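                # With stream_options={"include_usage": True}, the final chunk reports token usage.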
                if hasattr(chunk, "usage") and chunk.usage is not None:
                    self.prompt_tokens += chunk.usage.prompt_tokens
                    self.completion_tokens += chunk.usage.completion_tokens
                    if chunk.usage.prompt_tokens_details is not None:
                        self.cached_tokens += (
                            chunk.usage.prompt_tokens_details.cached_tokens
                        )

        finally:
            await stream.close()

        add_to_global_history(
            f"[USER]{item.prompt}[/USER]\n\n[ASSISTANT]{buffer}[/ASSISTANT]"
        )
        return GenerationResult(item.identifier, buffer, completed=True)

    def _create_key(
        self, *args, funcs: Sequence[Callable] | None = None, **kwargs
    ) -> str:
        """Generates a hash to use in diskcache Cache.
        This way, we don't need to send our prompts to the same Model
        if our context of Model + args + kwargs is the same.

        Returns:
            md5 hash used as key in diskcache
        """
        hasher = hashlib.md5()
        params_str = ""
        if len(kwargs) > 0:
            params_str += str(sorted([(k, str(v)) for k, v in kwargs.items()]))
        if len(args) > 0:
            params_str += str([arg for arg in args])
        if funcs:
            params_str += "\n".join([dedent(inspect.getsource(func)) for func in funcs])
        combined_str = "{}||{}".format(
            f"{self.model_name_or_path}||{type(self)}",
            params_str,
        ).encode()
        hasher.update(combined_str)
        return hasher.hexdigest()

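    # Returns (response, key): response is None on a miss, so the caller can run
    # generation and store the fresh value in self.cache under the same key.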
    def check_cache(
        self, *args, funcs: Sequence[Callable] | None = None, **kwargs
    ) -> Tuple[Any, str]:
        response: dict[str, str] | None = None
        key: str = self._create_key(funcs=funcs, *args, **kwargs)
        if key in self.cache:
            self.num_cache_hits += 1
            logger.debug(
                Color.model_or_data_update(
                    f"Using model cache ({self.num_cache_hits})..."
                )
            )
            response = self.cache.get(key)  # type: ignore
        return (response, key)

    def reset_stats(self):
        self.prompt_tokens = 0
        self.completion_tokens = 0
        self.cached_tokens = 0
        self.num_generation_calls = 0
        self.num_cache_hits = 0

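Because `check_cache` returns both the cached value and the key it hashed, a caller can round-trip the cache explicitly. A minimal sketch of that contract, assuming the `VLLM` subclass below and that `blendsql` is installed (no server is contacted; only the on-disk cache and counters are exercised):

```python
from blendsql.models import VLLM

model = VLLM("RedHatAI/gemma-3-12b-it-quantized.w4a16", caching=True)

# First lookup misses: nothing has been stored under this (model, args, kwargs) key yet.
response, key = model.check_cache("Translate to French: hello", temperature=0.0)
assert response is None

# Store a value under the returned key; diskcache.Cache supports item assignment.
model.cache[key] = "bonjour"

# The same arguments hash to the same key, so the second lookup hits.
response, _ = model.check_cache("Translate to French: hello", temperature=0.0)
assert response == "bonjour" and model.num_cache_hits == 1
```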
VLLM

Bases: ModelBase

Class for vLLM endpoints.

Parameters:

    model_name_or_path (str): Name of the model. Required.
    base_url (str | None): Base URL for HTTP requests; None resolves to "http://localhost:8000/v1/".

Examples:

```python
from blendsql.models import VLLM

model = VLLM("RedHatAI/gemma-3-12b-it-quantized.w4a16", base_url="http://localhost:8000/v1/")
```
Source code in blendsql/models/vllm.py
class VLLM(ModelBase):
    """Class for vLLM endpoints.

    Args:
        model_name_or_path: Name of the model
        base_url: Base URL for http requests. Defaults to "http://localhost:8000/v1/"

    Examples:
        ```python
        from blendsql.models import VLLM

        model = VLLM("RedHatAI/gemma-3-12b-it-quantized.w4a16", base_url="http://localhost:8000/v1/")
        ```
    """

    def __init__(
        self, *args, api_key: str | None = None, base_url: str | None = None, **kwargs
    ) -> None:
        api_key = api_key or "N/A"
        base_url = base_url or "http://localhost:8000/v1/"
        # model_name_or_path is forwarded positionally via *args.
        super().__init__(*args, api_key=api_key, base_url=base_url, **kwargs)

    async def _format_inputs(
        self, extra_body: dict, item: GenerationItem
    ) -> tuple[list[dict], dict]:
        if len(item.image_urls) > 0:
            content = [{"type": "text", "text": item.prompt}]
            for image_url in item.image_urls:
                session = await self._get_session()
                encoded = await openai_compatible_image_url(image_url, session)
                content.append({"type": "image_url", "image_url": {"url": encoded}})
            messages = [{"role": "user", "content": content}]
        else:
            messages = [{"role": "user", "content": item.prompt}]

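        # Grammar-constrained decoding: both the older guided_* keys and the newer
        # structured_outputs field are populated, presumably to cover multiple vLLM versions.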
        if item.grammar:
            extra_body |= {
                "guided_decoding_backend": "guidance",
                "guided_grammar": item.grammar,
                "structured_outputs": {"grammar": item.grammar},
            }
        return messages, extra_body
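
When `item.grammar` is set, vLLM constrains generation to strings the grammar accepts. A hedged sketch of a grammar-constrained call, assuming a vLLM server is already running at the default base URL, that the grammar string follows the Lark-style EBNF vLLM's guided decoding accepts, and that `GenerationItem` takes the `identifier`/`prompt`/`image_urls`/`grammar` fields which `generate` and `_format_inputs` read (its real constructor and import path are not shown above):

```python
import asyncio

from blendsql.models import VLLM

# Hypothetical: GenerationItem's import path and constructor are assumptions
# based on the attributes accessed in ModelBase.generate.
from blendsql.models.model_base import GenerationItem

# A Lark-style grammar restricting the model to a bare "yes" or "no".
YES_NO_GRAMMAR = 'start: "yes" | "no"'

async def main():
    # The async context manager guarantees the shared aiohttp session is closed.
    async with VLLM("RedHatAI/gemma-3-12b-it-quantized.w4a16") as model:
        item = GenerationItem(
            identifier="q0",
            prompt="Is Paris the capital of France? Answer yes or no.",
            image_urls=[],
            grammar=YES_NO_GRAMMAR,
        )
        result = await model.generate(item)
        print(result.completed)

asyncio.run(main())
```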