When an execution fetches data from a database, the connection can fail because of transient problems such as temporary network errors. In such cases it is useful to retry the execution automatically after a delay, in case the issue has resolved itself in the meantime.
This Python example shows how to catch the exception and use an API call to start a new execution after a delay. You can also define a maximum number of retries.
Generate a Valohai API Token
- Go to Valohai.
- Click on “Hi, <username>!” in the top-right corner.
- Go to My Profile -> Authentication.
- Click on Manage Tokens and scroll to the bottom of the page to generate a new token.
- Make a note of the token that appears at the top of the page. It will be shown only once.
Note that you should never include the token in your version control. Instead of pasting it directly into your code, we recommend storing it as a secret environment variable. You can add environment variables in a couple of ways in Valohai.
- Add the environment variable when creating an execution from the UI (Create Execution -> Environment Variables). The environment variable is available only in the execution where it was created.
- Add the project environment variable (Project Settings -> “Environment Variables” tab -> Check ‘Secret’ checkbox). In this case, the env variable will be available for all executions of the project.
You can access the environment variables in your code as follows:
import os
auth_token = os.environ['MY_ENVIRONMENT_VARIABLE']
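If the variable is missing, the lookup above raises a bare `KeyError`, which can be hard to spot in the execution logs. A minimal sketch of a lookup that fails with a clearer message follows; the helper name `get_api_token` is hypothetical and not part of any Valohai library, and `MY_ENVIRONMENT_VARIABLE` is the placeholder name used above.

```python
import os


def get_api_token(var_name="MY_ENVIRONMENT_VARIABLE"):
    """Return the token stored in the given environment variable.

    Hypothetical helper for illustration, not a Valohai API.
    """
    # Treat an unset variable and an empty one the same way.
    token = os.environ.get(var_name, "").strip()
    if not token:
        raise RuntimeError(
            f"Environment variable {var_name} is not set or is empty."
        )
    return token
```

Calling `get_api_token("API_KEY")` would then read the token from a variable named `API_KEY`, assuming that is the name you chose when saving the secret.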
Code example for adding retries after a connection error
Below you can find an example of how to use an API call to restart the execution if it fails due to a connection error.
- You will need to add two integer-type parameters to your valohai.yaml:
  - delay: Time in seconds before sending a request to retry the execution.
  - retries: Number of times to retry the execution after the original one.
- The example takes advantage of the execution configuration file available at /valohai/config/execution.json in all Valohai executions.
- If the connection fails, this information will be passed to the status detail under the execution details tab.
- The restarted executions will be tagged with retry: n, where n is the current retry count. There will be a link to the restarted execution in the logs.
- The sys.exit(1) command takes care of setting the finished execution status to Error in case of a connection error.
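The tagging convention from the list above can be sketched in isolation before reading the full script. This is a minimal sketch, assuming tags arrive as a list of strings and that the retry tag has exactly the form "retry: n"; the function name `parse_retry_count` is hypothetical.

```python
def parse_retry_count(tags):
    """Return n from the first tag of the form "retry: n", or 0 if none is found."""
    for tag in tags or []:
        if tag.startswith("retry:"):
            value = tag.split(":", 1)[1].strip()
            # Ignore malformed tags such as "retry: soon".
            if value.isdigit():
                return int(value)
    return 0


# An original execution carries no retry tag.
print(parse_retry_count([]))
# Unrelated tags are skipped; this execution is the second retry.
print(parse_retry_count(["nightly", "retry: 2"]))
```

The complete script below reads the retry count from the same tag in the execution configuration file.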
import valohai
import time
import json
import requests
import os
import sys
import random
retries = valohai.parameters("retries").value
delay = valohai.parameters("delay").value
# Get the execution tags to check for retry count
with open("/valohai/config/execution.json") as f:
    data = json.load(f)
tags = data["valohai.execution-tags"]
# Get also the project id, commit id, and step name for later use.
project_id = data["valohai.project-id"]
commit = data["valohai.commit-identifier"]
step_name = data["valohai.execution-step"]
# Search for the retry tag, expected format "retry: n".
# Default to 0 so that unrelated tags cannot reset the count.
retry_count = 0
for tag in tags:
    if tag.startswith("retry: "):
        retry_count = int(tag.split(" ")[1])
        break
# Try the database connection.
# If the connection fails:
# * The status detail will be updated
# * A new execution will be started with an API call (if retries remaining)
try:
    # Your code goes here.

    # EXAMPLE ONLY: Don't include this in your code.
    # Draw a random integer from 0 to 3 to simulate e.g.
    # network or other database connection issues.
    # Only 0 counts as a successful connection.
    k = random.randint(0, 3)
    if k == 0:
        # All good with the connection.
        print("Executing your code...")
    else:
        # Connection error, deliberately raising an exception.
        raise Exception("Database connection interrupted.")
except Exception as err:
    print(f"Unexpected {err=}, {type(err)=}")

    # If retries remaining, set the status detail and start a new execution
    if retry_count < retries:
        if retry_count == 0:
            valohai.set_status_detail("Failed due to: " + str(err)
                + " Will retry max " + str(retries) + " times.")
        else:
            valohai.set_status_detail("Retry " + str(retry_count) + "/"
                + str(retries) + " failed due to: " + str(err))

        print("Connection error.")
        print("Waiting for " + str(delay) + " seconds before retrying...")
        time.sleep(delay)

        # We'll use an API call to start a new execution.
        # The API_KEY is stored in a project environment variable.
        # Remember to follow your organization's security standards when handling the key.
        auth_token = os.environ["API_KEY"]
        headers = {"Authorization": "Token %s" % auth_token}

        new_retry_tag = "retry: " + str(retry_count + 1)

        new_execution_json = {
            "project": project_id,
            "commit": commit,
            "step": step_name,
            "tags": [new_retry_tag],
        }

        resp = requests.post(
            "https://app.valohai.com/api/v0/executions/",
            headers=headers,
            json=new_execution_json,
        )
        resp.raise_for_status()

        display_url = resp.json()["urls"]["display"]
        print("Retry available at: " + display_url)

        # Exit the execution with 1 to set the status as errored.
        sys.exit(1)
    else:
        valohai.set_status_detail("Failed due to: " + str(err)
            + " Max number of retries (" + str(retries) + ") executed.")
        print("Connection error.")
        print("Max number of retries (" + str(retries) + ") executed.")
        # Exit the execution with 1 to set the status as errored.
        sys.exit(1)
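Because the example above reads files, sleeps, and calls the API at module level, it is hard to try out locally. One option is to move the retry decision and the request body into small pure functions that can be checked without a network connection. The sketch below assumes the same payload fields as the example; the function names `should_retry` and `build_retry_payload` are hypothetical, and the values passed in are placeholders.

```python
def should_retry(retry_count, max_retries):
    """Return True while fewer than max_retries retries have been started."""
    return retry_count < max_retries


def build_retry_payload(project_id, commit, step_name, retry_count):
    """Build the JSON body for the next execution, tagged with the next retry number."""
    return {
        "project": project_id,
        "commit": commit,
        "step": step_name,
        "tags": [f"retry: {retry_count + 1}"],
    }


# Placeholder identifiers for illustration only.
if should_retry(retry_count=0, max_retries=3):
    payload = build_retry_payload("project-id", "commit-id", "step-name", retry_count=0)
    print(payload["tags"])
```

With helpers like these, the except block only needs to set the status detail, wait, and pass the payload to requests.post as shown in the example.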