Developer Guide
Volume Model
A fine-tuned model that generalises across asset classes and time intervals to forecast directional, volume-weighted market trends from time series data.
Step 1: Prepare Your Data
# Import necessary libraries
import pandas as pd
import requests
import json
import time
# Function to convert DataFrame to API-compatible dictionary
def volume_dict(df):
    return {
        "datetime": df['datetime'].tolist(),
        "open": df['open'].tolist(),
        "high": df['high'].tolist(),
        "low": df['low'].tolist(),
        "close": df['close'].tolist(),
        "volume": df['volume'].tolist(),
    }
# Sample data loading (replace with your data source)
df = pd.read_csv("your_time_series_data.csv")
# Ensure data has the required columns
required_columns = ['datetime', 'open', 'high', 'low', 'close', 'volume']
if not all(col in df.columns for col in required_columns):
    raise ValueError(f"Data must contain all required columns: {required_columns}")
# Convert datetime to proper format if needed
df['datetime'] = pd.to_datetime(df['datetime']).dt.strftime('%Y-%m-%d %H:%M:%S')
# Verify data length requirements (200-10,000 periods)
data_length = len(df)
if data_length < 200:
    raise ValueError(f"Data length ({data_length}) is less than minimum required (200)")
if data_length > 10000:
    raise ValueError(f"Data length ({data_length}) exceeds maximum allowed (10,000)")
# Uncomment to append a future prediction point with zeros for the unknown values
'''
future_date = "2025-01-02 10:00:00"  # Replace with your target future date
future_row = pd.DataFrame({
    'datetime': [future_date],
    'open': [0],
    'high': [0],
    'low': [0],
    'close': [0],
    'volume': [0]
})
df = pd.concat([df, future_row]).reset_index(drop=True)
'''
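For reference, volume_dict produces a flat JSON object with one list per column. The snippet below is a minimal sketch using two synthetic rows purely to show the payload shape; the values are made up, and a real submission still needs 200-10,000 periods as validated above.

# Illustrative only: two synthetic rows to show the payload shape.
# A real request must contain 200-10,000 periods.
sample = pd.DataFrame({
    'datetime': ['2025-01-02 09:00:00', '2025-01-02 10:00:00'],
    'open': [100.0, 101.5],
    'high': [102.0, 103.0],
    'low': [99.5, 101.0],
    'close': [101.5, 102.5],
    'volume': [15000, 18200],
})
print(json.dumps(volume_dict(sample), indent=2))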
Step 2: Submit API Request
# API configuration
BASE_URL = "https://www.sumtyme.com/shared"
# Prepare request
api_endpoint = f"{BASE_URL}/v1/volume"
headers = {
    "Content-Type": "application/json"
}
# Create payload from data
payload = volume_dict(df)
# Send POST request to API
response = requests.post(api_endpoint, json=payload, headers=headers)
# Check for successful request
if response.status_code != 200:
    raise Exception(f"API request failed with status code {response.status_code}: {response.text}")
# Parse response
result = response.json()
task_id = result.get("task_id")
status = result.get("status")
tasks_ahead = result.get("tasks_ahead")
print(f"Request submitted. Task ID: {task_id}")
print(f"Current status: {status}")
print(f"Tasks ahead in queue: {tasks_ahead}")
Step 3: Retrieve Analysis Results
# Function to transform response dict to dataframe
def dict_to_dataframe(response_dict):
    # Extract the result dictionary
    result_dict = response_dict['result']

    # Create lists for datetime and trend_identified values
    datetimes = []
    trends = []

    # Extract values from the nested dictionary
    for timestamp, data in result_dict.items():
        datetimes.append(timestamp)
        trends.append(data['trend_identified'])

    # Create the DataFrame
    df = pd.DataFrame({
        'datetime': datetimes,
        'trend_identified': trends
    })

    # Convert datetime strings to pandas datetime objects
    df['datetime'] = pd.to_datetime(df['datetime'])

    # Sort by datetime
    df = df.sort_values('datetime')

    return df
# Function to poll for results
def get_analysis_results(task_id, max_attempts=10, delay=30):
    """
    Poll the results endpoint until analysis is complete or max attempts reached.

    Parameters:
        task_id (str): Task ID from initial request
        max_attempts (int): Maximum number of polling attempts
        delay (int): Seconds to wait between polling attempts

    Returns:
        dict: Analysis results, or None if unavailable
    """
    results_endpoint = f"https://www.sumtyme.com/results/{task_id}"

    for attempt in range(max_attempts):
        print(f"Checking results (attempt {attempt + 1}/{max_attempts})...")

        response = requests.get(results_endpoint)

        if response.status_code != 200:
            print(f"Error checking results: {response.status_code}, {response.text}")
            return None

        result_data = response.json()

        # Check if processing is complete
        if "status" in result_data and result_data["status"] == "processing":
            print(f"Still processing. Tasks ahead: {result_data.get('tasks_ahead', 'unknown')}")
            time.sleep(delay)
            continue

        # If we get here, we have results
        return result_data

    print("Maximum polling attempts reached. Try checking results manually later.")
    return None
# Retrieve results
analysis_results = get_analysis_results(task_id)
if analysis_results:
    print("Analysis complete!")
    print(json.dumps(analysis_results['result'], indent=2))

    # Uncomment to convert the JSON result to a DataFrame
    # dataframe = dict_to_dataframe(analysis_results)
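If you prefer the trends as a tabular object rather than raw JSON, the sketch below converts the result with dict_to_dataframe and writes it to disk. It assumes the analysis succeeded and that the result follows the {timestamp: {"trend_identified": ...}} shape handled above; the output filename and the value-count inspection are illustrative choices, not part of the API.

# Minimal usage sketch (assumes analysis_results is populated):
if analysis_results:
    trends_df = dict_to_dataframe(analysis_results)
    print(trends_df.head())
    print(trends_df['trend_identified'].value_counts())
    trends_df.to_csv("volume_model_trends.csv", index=False)  # illustrative filename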