Developer Guide
Correlation Model
A fine-tuned model that dynamically tracks real-time correlation shifts across multiple instruments (2-30) to optimise portfolio risk management.
Step 1: Prepare Your Data
# Import necessary libraries
import pandas as pd
import requests
import json
import time
# Function to convert DataFrame to API-compatible dictionary
def correl_dict(df_list):
    """Build the API request payload from a list of OHLC DataFrames.

    Parameters:
        df_list (list[pd.DataFrame]): Frames that each contain
            'datetime', 'open', 'high', 'low' and 'close' columns.

    Returns:
        dict: {'portfolio': [series_dict, ...]} where each series dict
        maps column name -> list of that column's values.
    """
    ohlc_fields = ("datetime", "open", "high", "low", "close")

    def as_series(frame):
        # One time-series entry: column name -> plain Python list.
        return {field: frame[field].tolist() for field in ohlc_fields}

    return {"portfolio": [as_series(frame) for frame in df_list]}
# Sample data loading (replace with your data sources)
df = pd.read_csv("your_time_series_data.csv")
df1 = pd.read_csv("your_time_series_data_1.csv")
df_list = [df, df1]

# The API accepts between 2 and 30 instruments per request.
if not (2 <= len(df_list) <= 30):
    raise ValueError("Minimum of 2 instruments and a maximum of 30 instruments.")

for df in df_list:
    # Convert datetime to proper format if needed
    df['datetime'] = pd.to_datetime(df['datetime']).dt.strftime('%Y-%m-%d %H:%M:%S')

    # Each series must supply between 200 and 10,000 rows.
    data_length = len(df)
    if data_length < 200:
        raise ValueError(f"Data length ({data_length}) is less than minimum required (200)")
    if data_length > 10000:
        raise ValueError(f"Data length ({data_length}) exceeds maximum allowed (10,000)")
Step 2: Submit API Request
# API configuration
BASE_URL = "https://www.sumtyme.com/shared"

# Prepare request
api_endpoint = f"{BASE_URL}/v1/correlation"
headers = {"Content-Type": "application/json"}

# Create payload from data
payload = correl_dict(df_list)

# Send POST request to API
response = requests.post(api_endpoint, json=payload, headers=headers)

# Check for successful request
if response.status_code != 200:
    raise Exception(f"API request failed with status code {response.status_code}: {response.text}")

# Parse response: the service replies with a queued-task descriptor.
result = response.json()
task_id, status, tasks_ahead = (
    result.get(key) for key in ("task_id", "status", "tasks_ahead")
)

print(f"Request submitted. Task ID: {task_id}")
print(f"Current status: {status}")
print(f"Tasks ahead in queue: {tasks_ahead}")
Step 3: Retrieve Analysis Results
# Function to transform response dict to dataframe
def dict_to_dataframe(response_dict):
    """Convert the API results payload into a chronologically sorted DataFrame.

    Parameters:
        response_dict (dict): Full API response; its 'result' key maps
            timestamp strings to {'correlation': value} dicts.

    Returns:
        pd.DataFrame: Columns 'datetime' (pandas datetime) and
        'correlation', sorted ascending by datetime.

    Raises:
        KeyError: If 'result' or a per-timestamp 'correlation' key is missing.
    """
    # Extract the result dictionary
    result_dict = response_dict['result']

    # Collect timestamps and their correlation values from the nested dict.
    datetimes = []
    correlations = []
    for timestamp, data in result_dict.items():
        datetimes.append(timestamp)
        correlations.append(data['correlation'])

    # Bug fix: the original referenced an undefined name `trends` here;
    # the list actually collected above is `correlations`.
    df = pd.DataFrame({
        'datetime': datetimes,
        'correlation': correlations
    })

    # Convert datetime strings to pandas datetime objects and sort by time.
    df['datetime'] = pd.to_datetime(df['datetime'])
    df = df.sort_values('datetime')

    return df
# Function to poll for results
def get_analysis_results(task_id, max_attempts=10, delay=30):
    """
    Poll the results endpoint until analysis is complete or max attempts reached

    Parameters:
    task_id (str): Task ID from initial request
    max_attempts (int): Maximum number of polling attempts
    delay (int): Seconds to wait between polling attempts

    Returns:
    dict: Analysis results or error message
    """
    results_endpoint = f"https://www.sumtyme.com/results/{task_id}"

    attempt = 0
    while attempt < max_attempts:
        print(f"Checking results (attempt {attempt+1}/{max_attempts})...")
        response = requests.get(results_endpoint)

        # Any non-200 reply is treated as fatal for this polling session.
        if response.status_code != 200:
            print(f"Error checking results: {response.status_code}, {response.text}")
            return None

        result_data = response.json()

        # A payload still marked "processing" means wait and try again.
        if result_data.get("status") == "processing":
            print(f"Still processing. Tasks ahead: {result_data.get('tasks_ahead', 'unknown')}")
            time.sleep(delay)
            attempt += 1
            continue

        # Anything else is the finished analysis payload.
        return result_data

    print("Maximum polling attempts reached. Try checking results manually later.")
    return None
# Retrieve results for the task submitted in Step 2.
analysis_results = get_analysis_results(task_id)

if analysis_results:
    print("Analysis complete!")
    print(json.dumps(analysis_results['result'], indent=2))

    ## Uncomment to turn json to dataframe
    # dataframe = dict_to_dataframe(analysis_results)