1
1
import asyncio
2
2
import sys
3
3
import warnings
4
- from typing import Any , AsyncGenerator , Dict , Generator , Optional , Union , overload
4
+ from typing import Any , AsyncGenerator , Dict , Generator , Optional , Union , cast , overload
5
5
6
6
from graphql import (
7
7
DocumentNode ,
8
8
ExecutionResult ,
9
9
GraphQLSchema ,
10
+ IntrospectionQuery ,
10
11
build_ast_schema ,
11
12
get_introspection_query ,
12
13
parse ,
@@ -55,7 +56,7 @@ class Client:
55
56
def __init__ (
56
57
self ,
57
58
schema : Optional [Union [str , GraphQLSchema ]] = None ,
58
- introspection = None ,
59
+ introspection : Optional [ IntrospectionQuery ] = None ,
59
60
transport : Optional [Union [Transport , AsyncTransport ]] = None ,
60
61
fetch_schema_from_transport : bool = False ,
61
62
execute_timeout : Optional [Union [int , float ]] = 10 ,
@@ -106,7 +107,7 @@ def __init__(
106
107
self .schema : Optional [GraphQLSchema ] = schema
107
108
108
109
# Answer of the introspection query
109
- self .introspection = introspection
110
+ self .introspection : Optional [ IntrospectionQuery ] = introspection
110
111
111
112
# GraphQL transport chosen
112
113
self .transport : Optional [Union [Transport , AsyncTransport ]] = transport
@@ -131,6 +132,22 @@ def validate(self, document: DocumentNode):
131
132
if validation_errors :
132
133
raise validation_errors [0 ]
133
134
135
+ def _build_schema_from_introspection (self , execution_result : ExecutionResult ):
136
+ if execution_result .errors :
137
+ raise TransportQueryError (
138
+ (
139
+ f"Error while fetching schema: { execution_result .errors [0 ]!s} \n "
140
+ "If you don't need the schema, you can try with: "
141
+ '"fetch_schema_from_transport=False"'
142
+ ),
143
+ errors = execution_result .errors ,
144
+ data = execution_result .data ,
145
+ extensions = execution_result .extensions ,
146
+ )
147
+
148
+ self .introspection = cast (IntrospectionQuery , execution_result .data )
149
+ self .schema = build_client_schema (self .introspection )
150
+
134
151
@overload
135
152
def execute_sync (
136
153
self ,
@@ -803,20 +820,7 @@ def fetch_schema(self) -> None:
803
820
attribute to True"""
804
821
execution_result = self .transport .execute (parse (get_introspection_query ()))
805
822
806
- if execution_result .errors :
807
- raise TransportQueryError (
808
- (
809
- f"Error while fetching schema: { execution_result .errors [0 ]!s} \n "
810
- "If you don't need the schema, you can try with: "
811
- '"fetch_schema_from_transport=False"'
812
- ),
813
- errors = execution_result .errors ,
814
- data = execution_result .data ,
815
- extensions = execution_result .extensions ,
816
- )
817
-
818
- self .client .introspection = execution_result .data
819
- self .client .schema = build_client_schema (self .client .introspection )
823
+ self .client ._build_schema_from_introspection (execution_result )
820
824
821
825
@property
822
826
def transport (self ):
@@ -1189,20 +1193,7 @@ async def fetch_schema(self) -> None:
1189
1193
parse (get_introspection_query ())
1190
1194
)
1191
1195
1192
- if execution_result .errors :
1193
- raise TransportQueryError (
1194
- (
1195
- f"Error while fetching schema: { execution_result .errors [0 ]!s} \n "
1196
- "If you don't need the schema, you can try with: "
1197
- '"fetch_schema_from_transport=False"'
1198
- ),
1199
- errors = execution_result .errors ,
1200
- data = execution_result .data ,
1201
- extensions = execution_result .extensions ,
1202
- )
1203
-
1204
- self .client .introspection = execution_result .data
1205
- self .client .schema = build_client_schema (self .client .introspection )
1196
+ self .client ._build_schema_from_introspection (execution_result )
1206
1197
1207
1198
@property
1208
1199
def transport (self ):
0 commit comments