pydantic_ai.models.gemini

Custom interface to the generativelanguage.googleapis.com API using HTTPX and Pydantic.

The Google SDK for interacting with the generativelanguage.googleapis.com API, google-generativeai, reads like it was written by a Java developer who thought they knew everything about OOP, spent 30 minutes trying to learn Python, gave up and decided to build the library to prove how horrible Python is. It also doesn't use httpx for HTTP requests, and it tries to implement tool calling itself without using Pydantic or an equivalent for validation.

We therefore implement support for the API directly.

Despite these shortcomings, the Gemini model is actually quite powerful and very fast.

Setup

For details on how to set up authentication with this model, see model configuration for Gemini.

GeminiModelName module-attribute

GeminiModelName = Literal[
    "gemini-1.5-flash",
    "gemini-1.5-flash-8b",
    "gemini-1.5-pro",
    "gemini-1.0-pro",
    "gemini-2.0-flash-exp",
]

Named Gemini models.

See the Gemini API docs for a full list.
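
Because GeminiModelName is a Literal, the allowed names can be inspected at runtime with typing.get_args. The following is a minimal sketch, assuming a local copy of the alias (so the snippet runs without pydantic_ai installed) and a hypothetical helper named is_known_model that is not part of the library:

```python
from typing import Literal, get_args

# Local mirror of the alias documented above, so the snippet is self-contained.
GeminiModelName = Literal[
    "gemini-1.5-flash",
    "gemini-1.5-flash-8b",
    "gemini-1.5-pro",
    "gemini-1.0-pro",
    "gemini-2.0-flash-exp",
]


def is_known_model(name: str) -> bool:
    """Hypothetical helper: True if `name` is one of the literal values."""
    return name in get_args(GeminiModelName)
```

A type checker enforces the Literal statically; a runtime check like this is only useful for names that arrive from configuration or user input.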

GeminiModel dataclass

Bases: Model

A model that uses Gemini via the generativelanguage.googleapis.com API.

This is implemented from scratch rather than using a dedicated SDK; good API documentation is available at https://ai.google.dev/api.

Apart from __init__, all methods are private or match those of the base class.

Source code in pydantic_ai_slim/pydantic_ai/models/gemini.py
@dataclass(init=False)
class GeminiModel(Model):
    """A model that uses Gemini via `generativelanguage.googleapis.com` API.

    This is implemented from scratch rather than using a dedicated SDK, good API documentation is
    available [here](https://ai.google.dev/api).

    Apart from `__init__`, all methods are private or match those of the base class.
    """

    model_name: GeminiModelName
    auth: AuthProtocol
    http_client: AsyncHTTPClient
    url: str

    def __init__(
        self,
        model_name: GeminiModelName,
        *,
        api_key: str | None = None,
        http_client: AsyncHTTPClient | None = None,
        url_template: str = 'https://generativelanguage.googleapis.com/v1beta/models/{model}:',
    ):
        """Initialize a Gemini model.

        Args:
            model_name: The name of the model to use.
            api_key: The API key to use for authentication, if not provided, the `GEMINI_API_KEY` environment variable
                will be used if available.
            http_client: An existing `httpx.AsyncClient` to use for making HTTP requests.
            url_template: The URL template to use for making requests, you shouldn't need to change this,
                docs [here](https://ai.google.dev/gemini-api/docs/quickstart?lang=rest#make-first-request),
                `model` is substituted with the model name, and `function` is added to the end of the URL.
        """
        self.model_name = model_name
        if api_key is None:
            if env_api_key := os.getenv('GEMINI_API_KEY'):
                api_key = env_api_key
            else:
                raise exceptions.UserError('API key must be provided or set in the GEMINI_API_KEY environment variable')
        self.auth = ApiKeyAuth(api_key)
        self.http_client = http_client or cached_async_http_client()
        self.url = url_template.format(model=model_name)

    async def agent_model(
        self,
        *,
        function_tools: list[ToolDefinition],
        allow_text_result: bool,
        result_tools: list[ToolDefinition],
    ) -> GeminiAgentModel:
        return GeminiAgentModel(
            http_client=self.http_client,
            model_name=self.model_name,
            auth=self.auth,
            url=self.url,
            function_tools=function_tools,
            allow_text_result=allow_text_result,
            result_tools=result_tools,
        )

    def name(self) -> str:
        return f'google-gla:{self.model_name}'

__init__

__init__(
    model_name: GeminiModelName,
    *,
    api_key: str | None = None,
    http_client: AsyncClient | None = None,
    url_template: str = "https://generativelanguage.googleapis.com/v1beta/models/{model}:"
)

Initialize a Gemini model.

Parameters:

model_name (GeminiModelName, required)
    The name of the model to use.

api_key (str | None, default: None)
    The API key to use for authentication. If not provided, the GEMINI_API_KEY environment variable is used if available.

http_client (AsyncClient | None, default: None)
    An existing httpx.AsyncClient to use for making HTTP requests.

url_template (str, default: 'https://generativelanguage.googleapis.com/v1beta/models/{model}:')
    The URL template to use for making requests. You shouldn't need to change this (see https://ai.google.dev/gemini-api/docs/quickstart?lang=rest#make-first-request). model is substituted with the model name, and function is added to the end of the URL.

Source code in pydantic_ai_slim/pydantic_ai/models/gemini.py
def __init__(
    self,
    model_name: GeminiModelName,
    *,
    api_key: str | None = None,
    http_client: AsyncHTTPClient | None = None,
    url_template: str = 'https://generativelanguage.googleapis.com/v1beta/models/{model}:',
):
    """Initialize a Gemini model.

    Args:
        model_name: The name of the model to use.
        api_key: The API key to use for authentication, if not provided, the `GEMINI_API_KEY` environment variable
            will be used if available.
        http_client: An existing `httpx.AsyncClient` to use for making HTTP requests.
        url_template: The URL template to use for making requests, you shouldn't need to change this,
            docs [here](https://ai.google.dev/gemini-api/docs/quickstart?lang=rest#make-first-request),
            `model` is substituted with the model name, and `function` is added to the end of the URL.
    """
    self.model_name = model_name
    if api_key is None:
        if env_api_key := os.getenv('GEMINI_API_KEY'):
            api_key = env_api_key
        else:
            raise exceptions.UserError('API key must be provided or set in the GEMINI_API_KEY environment variable')
    self.auth = ApiKeyAuth(api_key)
    self.http_client = http_client or cached_async_http_client()
    self.url = url_template.format(model=model_name)
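
The key lookup and URL construction in __init__ can be sketched in isolation. This is an illustrative approximation, not the library's code: resolve_api_key and build_url are hypothetical names, ValueError stands in for exceptions.UserError, and the env parameter exists only to make the sketch testable.

```python
import os
from typing import Mapping, Optional

URL_TEMPLATE = "https://generativelanguage.googleapis.com/v1beta/models/{model}:"


def resolve_api_key(
    api_key: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
) -> str:
    """Prefer an explicit key, then fall back to GEMINI_API_KEY."""
    env = os.environ if env is None else env
    if api_key is not None:
        return api_key
    env_key = env.get("GEMINI_API_KEY")
    if env_key:  # an empty value is treated the same as a missing one
        return env_key
    # ValueError is a stand-in for the library's UserError.
    raise ValueError(
        "API key must be provided or set in the GEMINI_API_KEY environment variable"
    )


def build_url(model_name: str, function: str, url_template: str = URL_TEMPLATE) -> str:
    """Substitute the model name, then append the API function name."""
    return url_template.format(model=model_name) + function
```

For example, build_url("gemini-1.5-flash", "generateContent") gives the template with the model name filled in, followed by generateContent after the trailing colon.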

AuthProtocol

Bases: Protocol

Abstract definition for Gemini authentication.

Source code in pydantic_ai_slim/pydantic_ai/models/gemini.py
class AuthProtocol(Protocol):
    """Abstract definition for Gemini authentication."""

    async def headers(self) -> dict[str, str]: ...
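
Since AuthProtocol is a typing.Protocol, any object with a matching async headers method satisfies it without inheriting from it. The sketch below assumes a local mirror of the protocol and a hypothetical BearerTokenAuth class; neither the class nor the header it returns is taken from pydantic_ai, and whether a given endpoint accepts such a header is outside the scope of this example.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, Protocol


class AuthProtocol(Protocol):
    """Local mirror of the protocol above, so the snippet is self-contained."""

    async def headers(self) -> Dict[str, str]: ...


@dataclass
class BearerTokenAuth:
    """Hypothetical auth class; it satisfies AuthProtocol structurally."""

    token: str

    async def headers(self) -> Dict[str, str]:
        return {"Authorization": f"Bearer {self.token}"}


async def collect_headers(auth: AuthProtocol) -> Dict[str, str]:
    """Accepts anything that matches the protocol."""
    return await auth.headers()
```

The method is async so that an implementation can refresh or fetch credentials before returning headers, although this sketch returns a fixed value.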

ApiKeyAuth dataclass

Authentication using an API key for the X-Goog-Api-Key header.

Source code in pydantic_ai_slim/pydantic_ai/models/gemini.py
@dataclass
class ApiKeyAuth:
    """Authentication using an API key for the `X-Goog-Api-Key` header."""

    api_key: str

    async def headers(self) -> dict[str, str]:
        # https://cloud.google.com/docs/authentication/api-keys-use#using-with-rest
        return {'X-Goog-Api-Key': self.api_key}
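
The headers returned by the auth object are merged into the request headers when a request is made. A minimal sketch of that merge, assuming a local copy of ApiKeyAuth and a hypothetical build_headers function with a placeholder user agent string:

```python
import asyncio
from dataclasses import dataclass
from typing import Dict


@dataclass
class ApiKeyAuth:
    """Local copy of the dataclass above, so the snippet is self-contained."""

    api_key: str

    async def headers(self) -> Dict[str, str]:
        return {"X-Goog-Api-Key": self.api_key}


async def build_headers(auth: ApiKeyAuth, user_agent: str = "example-agent/0.0") -> Dict[str, str]:
    """Hypothetical helper: fixed headers first, auth headers unpacked last."""
    return {
        "Content-Type": "application/json",
        "User-Agent": user_agent,  # placeholder value, not the library's user agent
        **await auth.headers(),
    }
```

Because the auth headers are unpacked last, they would override a fixed header with the same name.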

GeminiAgentModel dataclass

Bases: AgentModel

Implementation of AgentModel for Gemini models.

Source code in pydantic_ai_slim/pydantic_ai/models/gemini.py
@dataclass(init=False)
class GeminiAgentModel(AgentModel):
    """Implementation of `AgentModel` for Gemini models."""

    http_client: AsyncHTTPClient
    model_name: GeminiModelName
    auth: AuthProtocol
    tools: _GeminiTools | None
    tool_config: _GeminiToolConfig | None
    url: str

    def __init__(
        self,
        http_client: AsyncHTTPClient,
        model_name: GeminiModelName,
        auth: AuthProtocol,
        url: str,
        function_tools: list[ToolDefinition],
        allow_text_result: bool,
        result_tools: list[ToolDefinition],
    ):
        check_allow_model_requests()
        tools = [_function_from_abstract_tool(t) for t in function_tools]
        if result_tools:
            tools += [_function_from_abstract_tool(t) for t in result_tools]

        if allow_text_result:
            tool_config = None
        else:
            tool_config = _tool_config([t['name'] for t in tools])

        self.http_client = http_client
        self.model_name = model_name
        self.auth = auth
        self.tools = _GeminiTools(function_declarations=tools) if tools else None
        self.tool_config = tool_config
        self.url = url

    async def request(
        self, messages: list[ModelMessage], model_settings: ModelSettings | None
    ) -> tuple[ModelResponse, usage.Usage]:
        async with self._make_request(messages, False, model_settings) as http_response:
            response = _gemini_response_ta.validate_json(await http_response.aread())
        return self._process_response(response), _metadata_as_usage(response)

    @asynccontextmanager
    async def request_stream(
        self, messages: list[ModelMessage], model_settings: ModelSettings | None
    ) -> AsyncIterator[StreamedResponse]:
        async with self._make_request(messages, True, model_settings) as http_response:
            yield await self._process_streamed_response(http_response)

    @asynccontextmanager
    async def _make_request(
        self, messages: list[ModelMessage], streamed: bool, model_settings: ModelSettings | None
    ) -> AsyncIterator[HTTPResponse]:
        sys_prompt_parts, contents = self._message_to_gemini_content(messages)

        request_data = _GeminiRequest(contents=contents)
        if sys_prompt_parts:
            request_data['system_instruction'] = _GeminiTextContent(role='user', parts=sys_prompt_parts)
        if self.tools is not None:
            request_data['tools'] = self.tools
        if self.tool_config is not None:
            request_data['tool_config'] = self.tool_config

        generation_config: _GeminiGenerationConfig = {}
        if model_settings:
            if (max_tokens := model_settings.get('max_tokens')) is not None:
                generation_config['max_output_tokens'] = max_tokens
            if (temperature := model_settings.get('temperature')) is not None:
                generation_config['temperature'] = temperature
            if (top_p := model_settings.get('top_p')) is not None:
                generation_config['top_p'] = top_p
        if generation_config:
            request_data['generation_config'] = generation_config

        url = self.url + ('streamGenerateContent' if streamed else 'generateContent')

        headers = {
            'Content-Type': 'application/json',
            'User-Agent': get_user_agent(),
            **await self.auth.headers(),
        }

        request_json = _gemini_request_ta.dump_json(request_data, by_alias=True)

        async with self.http_client.stream(
            'POST',
            url,
            content=request_json,
            headers=headers,
            timeout=(model_settings or {}).get('timeout', USE_CLIENT_DEFAULT),
        ) as r:
            if r.status_code != 200:
                await r.aread()
                raise exceptions.UnexpectedModelBehavior(f'Unexpected response from gemini {r.status_code}', r.text)
            yield r

    @staticmethod
    def _process_response(response: _GeminiResponse) -> ModelResponse:
        if len(response['candidates']) != 1:
            raise UnexpectedModelBehavior('Expected exactly one candidate in Gemini response')
        parts = response['candidates'][0]['content']['parts']
        return _process_response_from_parts(parts)

    @staticmethod
    async def _process_streamed_response(http_response: HTTPResponse) -> StreamedResponse:
        """Process a streamed response, and prepare a streaming response to return."""
        aiter_bytes = http_response.aiter_bytes()
        start_response: _GeminiResponse | None = None
        content = bytearray()

        async for chunk in aiter_bytes:
            content.extend(chunk)
            responses = _gemini_streamed_response_ta.validate_json(
                content,
                experimental_allow_partial='trailing-strings',
            )
            if responses:
                last = responses[-1]
                if last['candidates'] and last['candidates'][0]['content']['parts']:
                    start_response = last
                    break

        if start_response is None:
            raise UnexpectedModelBehavior('Streamed response ended without content or tool calls')

        return GeminiStreamedResponse(_content=content, _stream=aiter_bytes)

    @classmethod
    def _message_to_gemini_content(
        cls, messages: list[ModelMessage]
    ) -> tuple[list[_GeminiTextPart], list[_GeminiContent]]:
        sys_prompt_parts: list[_GeminiTextPart] = []
        contents: list[_GeminiContent] = []
        for m in messages:
            if isinstance(m, ModelRequest):
                message_parts: list[_GeminiPartUnion] = []

                for part in m.parts:
                    if isinstance(part, SystemPromptPart):
                        sys_prompt_parts.append(_GeminiTextPart(text=part.content))
                    elif isinstance(part, UserPromptPart):
                        message_parts.append(_GeminiTextPart(text=part.content))
                    elif isinstance(part, ToolReturnPart):
                        message_parts.append(_response_part_from_response(part.tool_name, part.model_response_object()))
                    elif isinstance(part, RetryPromptPart):
                        if part.tool_name is None:
                            message_parts.append(_GeminiTextPart(text=part.model_response()))
                        else:
                            response = {'call_error': part.model_response()}
                            message_parts.append(_response_part_from_response(part.tool_name, response))
                    else:
                        assert_never(part)

                if message_parts:
                    contents.append(_GeminiContent(role='user', parts=message_parts))
            elif isinstance(m, ModelResponse):
                contents.append(_content_model_response(m))
            else:
                assert_never(m)

        return sys_prompt_parts, contents
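
The way _make_request turns model settings into a generation config can be sketched on plain dictionaries. The function names build_generation_config and attach_generation_config are hypothetical, and plain dicts stand in for the typed dictionaries used in the source.

```python
from typing import Any, Dict, Mapping, Optional

# (settings key, generation-config key) pairs, following `_make_request` above.
_SETTING_TO_CONFIG_KEY = (
    ("max_tokens", "max_output_tokens"),
    ("temperature", "temperature"),
    ("top_p", "top_p"),
)


def build_generation_config(model_settings: Optional[Mapping[str, Any]]) -> Dict[str, Any]:
    """Copy only the settings that are present and not None."""
    config: Dict[str, Any] = {}
    if model_settings:
        for settings_key, config_key in _SETTING_TO_CONFIG_KEY:
            value = model_settings.get(settings_key)
            if value is not None:  # keeps falsy values such as temperature=0.0
                config[config_key] = value
    return config


def attach_generation_config(
    request_data: Dict[str, Any], model_settings: Optional[Mapping[str, Any]]
) -> Dict[str, Any]:
    """Add `generation_config` only when at least one setting was given."""
    config = build_generation_config(model_settings)
    if config:
        return {**request_data, "generation_config": config}
    return request_data
```

Testing against None rather than truthiness matters here: a temperature of 0.0 is a meaningful setting and is passed through.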

GeminiStreamedResponse dataclass

Bases: StreamedResponse

Implementation of StreamedResponse for the Gemini model.

Source code in pydantic_ai_slim/pydantic_ai/models/gemini.py
@dataclass
class GeminiStreamedResponse(StreamedResponse):
    """Implementation of `StreamedResponse` for the Gemini model."""

    _content: bytearray
    _stream: AsyncIterator[bytes]
    _timestamp: datetime = field(default_factory=_utils.now_utc, init=False)

    async def _get_event_iterator(self) -> AsyncIterator[ModelResponseStreamEvent]:
        async for gemini_response in self._get_gemini_responses():
            candidate = gemini_response['candidates'][0]
            gemini_part: _GeminiPartUnion
            for gemini_part in candidate['content']['parts']:
                if 'text' in gemini_part:
                    # Using vendor_part_id=None means we can produce multiple text parts if their deltas are sprinkled
                    # amongst the tool call deltas
                    yield self._parts_manager.handle_text_delta(vendor_part_id=None, content=gemini_part['text'])

                elif 'function_call' in gemini_part:
                    # Here, we assume all function_call parts are complete and don't have deltas.
                    # We do this by assigning a unique randomly generated "vendor_part_id".
                    # We need to confirm whether this is actually true, but if it isn't, we can still handle it properly
                    # it would just be a bit more complicated. And we'd need to confirm the intended semantics.
                    maybe_event = self._parts_manager.handle_tool_call_delta(
                        vendor_part_id=uuid4(),
                        tool_name=gemini_part['function_call']['name'],
                        args=gemini_part['function_call']['args'],
                        tool_call_id=None,
                    )
                    if maybe_event is not None:
                        yield maybe_event
                else:
                    assert 'function_response' in gemini_part, f'Unexpected part: {gemini_part}'

    async def _get_gemini_responses(self) -> AsyncIterator[_GeminiResponse]:
        # This method exists to ensure we only yield completed items, so we don't need to worry about
        # partial gemini responses, which would make everything more complicated

        gemini_responses: list[_GeminiResponse] = []
        current_gemini_response_index = 0
        # Right now, there are some circumstances where we will have information that could be yielded sooner than it is
        # But changing that would make things a lot more complicated.
        async for chunk in self._stream:
            self._content.extend(chunk)

            gemini_responses = _gemini_streamed_response_ta.validate_json(
                self._content,
                experimental_allow_partial='trailing-strings',
            )

            # The idea: yield only up to the latest response, which might still be partial.
            # Note that if the latest response is complete, we could yield it immediately, but there's not a good
            # allow_partial API to determine if the last item in the list is complete.
            responses_to_yield = gemini_responses[:-1]
            for r in responses_to_yield[current_gemini_response_index:]:
                current_gemini_response_index += 1
                self._usage += _metadata_as_usage(r)
                yield r

        # Now yield the final response, which should be complete
        if gemini_responses:
            r = gemini_responses[-1]
            self._usage += _metadata_as_usage(r)
            yield r

    def timestamp(self) -> datetime:
        return self._timestamp
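
The buffering strategy in _get_gemini_responses (re-parse the whole buffer on every chunk, yield everything except the last item, then yield the last item once the stream ends) can be illustrated with a toy format. This sketch assumes comma-separated ASCII items in place of Gemini JSON responses and a simple split in place of Pydantic's partial validation; all names are hypothetical.

```python
import asyncio
from typing import AsyncIterator, Iterable, List


async def byte_chunks(chunks: Iterable[bytes]) -> AsyncIterator[bytes]:
    """Stand-in for an HTTP byte stream."""
    for chunk in chunks:
        yield chunk


def parse_items(buffer: bytearray) -> List[str]:
    """Toy parser: comma-separated ASCII items; the last one may be incomplete."""
    return buffer.decode("ascii").split(",") if buffer else []


async def completed_items(stream: AsyncIterator[bytes]) -> AsyncIterator[str]:
    buffer = bytearray()
    items: List[str] = []
    yielded = 0  # number of items already handed to the caller
    async for chunk in stream:
        buffer.extend(chunk)
        items = parse_items(buffer)
        # The last item might still be growing, so hold it back for now.
        for item in items[:-1][yielded:]:
            yielded += 1
            yield item
    # The stream is finished, so the last item is complete.
    if items:
        yield items[-1]


async def collect(chunks: Iterable[bytes]) -> List[str]:
    return [item async for item in completed_items(byte_chunks(chunks))]
```

With the chunks b"al", b"pha,be", b"ta,gam", b"ma", the items come out as alpha, beta and gamma, each yielded only once it can no longer change. The cost is that the final item is delayed until the stream closes, which is the trade-off the comments in the source describe.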