-
Notifications
You must be signed in to change notification settings - Fork 56
Content Handling Improvement in call_http method #494
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
It seems to me that this is a breaking change. Previously we serialized Here's an example that introduces an optional def call_http(self, method: str, uri: str, content: Optional[str] = None,
headers: Optional[Dict[str, str]] = None,
token_source: TokenSource = None,
is_raw_str: bool = False) -> TaskBase:
if content is not None:
if isinstance(content, str) and is_raw_str:
# don't serialize the str value - use it as the raw HTTP request payload
json_content = content
else:
json_content = json.dumps(content) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @nytian, what are your thoughts about my suggestion to avoid a breaking change?
Even if we decide that a breaking change is the right thing to do, it would be good to add my suggested parameter (we can just change the default value) so that in the 1% chance that a customer is relying on str
being serialized, they have a way to access the old behavior.
Thanks, @cgillum ! It's good to me as we don't want to break customer. My hesitation is around what we want to take here—whether we should leave the string as-is or serialize it into a JSON string. That's why I haven't addressed this feedback yet haha. But I will make the changes. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for this PR. Agreed that the changing behavior should be opt-in. Left some notes on the tests, types, and missing comments
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Almost done, just realized we may want to validate that is_raw_str
is only used in valid scenarios (where content is 'str').
I added a few suggestions in line with that, and asked for 2 more tests: one for the validation I'm suggesting, and another that None
serializes to null
when is_raw_str
is False
When is this going to be available in the main branch? I'm still facing this issue in 1.2.10; e.g. current version. |
@nytian - can you please check if this has released already? |
Thanks for checking @davidmrdavid It's already released with v1.2.10. @ThijsHakkenbergLobster can you send me the error details for me to check? Feel free to create a new issue page and I will assign it to me |
Implementation:@skillsInvocation.orchestration_trigger(context_name="context")
def invoke_skill_orchestrator(context: DurableOrchestrationContext) -> str:
code, function_name, payload = context.get_input()
metadata = {
"code": code,
"function_name": function_name,
}
context.set_custom_status("InvocationReceived")
Telemetry(
"invoke_skill_orchestrator called",
distinct_id="invoke_skill_orchestrator",
metadata=metadata,
)
copilot_function_registry = EntityId("FunctionNamesRegistryEntity", code)
copilot_functions_state = yield context.call_entity(
copilot_function_registry, "retrieve"
)
if not copilot_functions_state:
msg = f"No skills registered for {code}"
warning(msg)
return msg
if function_name not in copilot_functions_state:
msg = f"Function {function_name} not registered for {code}: {copilot_functions_state}"
warning(msg)
return msg
context.set_custom_status("CopilotValidated")
registry_entity_id = EntityId("SkillFunctionRegistryEntity", "Global")
skill_ids_function_names: List[Tuple[SkillIdStr, FunctionNameStr]] = (
yield context.call_entity(registry_entity_id, "get")
) # List of tuples (skill_id, function_name)
skill_id = None
for id, name in skill_ids_function_names:
if name == function_name:
skill_id = id
context.set_custom_status("SkillFound")
break
if skill_id is None:
msg = f"No skill_id found for function name {function_name}"
warning(msg)
return msg
skill_entity_id = EntityId("SkillEntity", skill_id)
skill = yield context.call_entity(skill_entity_id, "get")
skill = SkillPointer.model_validate(skill)
context.set_custom_status("SkillFound")
invocation_target_id = EntityId("InvocationTargetEntity", skill.invocation_target.unique_identifier)
invocation_target = yield context.call_entity(invocation_target_id, "get")
webhook = Webhook.model_validate(invocation_target)
context.set_custom_status("InvocationTargetLoaded")
headers, body, api_url, meta = webhook.prepare_request(payload)
timeout = 90.0
metadata["timeout"] = timeout
match webhook.method:
case "GET":
response = yield context.call_http(
method=webhook.method,
uri=api_url,
content=None,
headers=headers,
is_raw_str=False,
)
context.set_custom_status("InvocationTargetResponded")
res = DurableOrchestratorHTTPCallResponse.model_validate(response)
status_code = res.statusCode
content_string = res.content
try:
json_content = loads(content_string)
none_count = content_string.count("None")
except ValueError as ve:
warning(ve)
none_count = 0
json_content = None
case ("POST" | "PATCH"):
response = yield context.call_http(
method=webhook.method,
uri=api_url,
content=body,
headers=headers,
is_raw_str=True
)
warning(f"response: {response}")
try:
json_content = response
content_string = dumps(json_content)
none_count = content_string.count("None")
status_code = int(response["statusCode"])
warning(f"status code: {status_code}")
except ValueError:
content_string = response.text
none_count = 0
json_content = None
status_code = None
case _:
raise ValueError(webhook.method)
if status_code >= 400:
msg = f"Received exception when trying to call function"
warning(msg)
warning(REDACTED) # Redacted URL or sensitive information
raise ValueError(status_code, content_string)
elif status_code in range(200, 300):
if none_count >= 10:
warning("cleaning crappy api data...")
cleaned = clean_response(json_content)
metadata["cleaned_output_token_count"] = count_tokens(cleaned)
Telemetry(
event="invoke_skill_orchestrator completed",
distinct_id="invoke_skill_orchestrator",
metadata=metadata,
)
return str(cleaned)
else:
return content_string
else:
return f"Unexpected status code {status_code} with content {content_string}" Exception025-02-11T11:57:08Z [Warning] response: {'statusCode': 502, 'headers': {'Date': 'Tue, 11 Feb 2025 11:57:06 GMT', 'Cache-Control': 'no-cache', 'Pragma': 'no-cache', 'Strict-Transport-Security': 'max-age=31536000; includeSubDomains', 'x-ms-failure-cause': 'Trigger', 'x-ms-workflow-run-id': 'REDACTED', 'x-ms-correlation-id': 'REDACTED', 'x-ms-client-tracking-id': 'REDACTED', 'x-ms-trigger-history-name': 'REDACTED', 'x-ms-execution-location': 'eastus', 'x-ms-workflow-system-id': '/locations/eastus/scaleunits/prod-95/workflows/REDACTED', 'x-ms-workflow-id': 'REDACTED', 'x-ms-workflow-version': 'REDACTED', 'x-ms-workflow-name': 'REDACTED', 'x-ms-tracking-id': 'REDACTED', 'x-ms-ratelimit-burst-remaining-workflow-writes': 'REDACTED', 'x-ms-ratelimit-remaining-workflow-download-contentsize': 'REDACTED', 'x-ms-ratelimit-remaining-workflow-upload-contentsize': 'REDACTED', 'x-ms-ratelimit-time-remaining-directapirequests': 'REDACTED', 'x-ms-request-id': 'REDACTED'}, 'content': '{"error":{"code":"NoResponse","message":"The server did not receive a response from an upstream server. Request tracking id \'REDACTED\'."}}'}
2025-02-11T11:57:08Z [Warning] status code: 502
2025-02-11T11:57:08Z [Warning] Received exception when trying to call function
2025-02-11T11:57:08Z [Warning] REDACTED # Redacted URL
2025-02-11T11:57:08Z [Error] REDACTED: Function 'invoke_skill_orchestrator (Orchestrator)' failed with an error. Reason: DurableTask.Core.Exceptions.OrchestrationFailureException
at Microsoft.Azure.WebJobs.Extensions.DurableTask.OutOfProcOrchestrationShim.ScheduleDurableTaskEvents(OrchestrationInvocationResult result) in D:\a\_work\1\s\src\WebJobs.Extensions.DurableTask\Listener\OutOfProcOrchestrationShim.cs:line 88 Received payload: {
"headers": {
"Host": "REDACTED",
"Max-Forwards": "10",
"User-Agent": "Microsoft.Azure.WebJobs.Extensions.DurableTask/2.13.5",
"Request-Id": "|REDACTED|",
"traceparent": "00-REDACTED",
"Request-Context": "appId=cid-v1:REDACTED",
"X-ARR-LOG-ID": "REDACTED",
"CLIENT-IP": "REDACTED",
"DISGUISED-HOST": "REDACTED",
"X-SITE-DEPLOYMENT-ID": "REDACTED",
"WAS-DEFAULT-HOSTNAME": "REDACTED",
"X-Forwarded-Proto": "https",
"X-AppService-Proto": "https",
"X-ARR-SSL": "REDACTED",
"X-Forwarded-TlsVersion": "1.3",
"X-Forwarded-For": "REDACTED",
"X-Original-URL": "REDACTED",
"X-WAWS-Unencoded-URL": "REDACTED",
"Content-Length": "79",
"Content-Type": "text/plain"
},
"body": "{\"topK\": 5, \"model\": \"multilingual-e5-large\", \"inputs\": [{\"text\": \"bed bugs\"}]}"
} If you want to test it yourself, create an LogicApp accepting any JSON body and send the output. I've also tested it by dumps and sending that over and setting is_raw_str=False. |
The problem is that in class DurableHttpRequest (should be a dataclass imho but that's beside the point) on the method to_json the line add_attrib(json_dict, self, 'content') just adds the string version of content. The return type of to_json is correct: Dict[str, Union[str, int]]:. But in my case i expect neither a string nor an int but an object (e.g. typing.Any). So in other words, the payload provided will aways be serialized as string.. |
Fix #478
This PR refactored the logic in the call_http method of the DurableOrchestrationContext class to enhance both clarity and correctness.
is_raw_str
, with a default value of False to maintain compatibility with the current implementation. The string will only be preserved in its original form ifis_raw_str
is explicitly set to True. Otherwise, it will be serialized into a JSON string using json.dumps(), which is align with current implementation.