Endpoint currently down. We’re working to restore service as quickly as possible.

Step 1: Prepare Your Data


# Import necessary libraries
import pandas as pd
import requests
import json
import time

# Function to convert DataFrame to API-compatible dictionary
def time_series_dict(df,interval,interval_unit):
    return {
        "datetime": df['datetime'].tolist(),
        "open": df['open'].tolist(),
        "high": df['high'].tolist(),
        "low": df['low'].tolist(),
        "close": df['close'].tolist(),
        "interval": interval,
        "interval_unit":interval_unit
    }

# Sample data loading (replace with your data source)
df = pd.read_csv("your_time_series_data.csv")
interval = 1
interval_unit = "minutes" # options: seconds, minutes, days 

# Ensure data has the required columns
required_columns = ['datetime', 'open', 'high', 'low', 'close']
if not all(col in df.columns for col in required_columns):
    raise ValueError(f"Data must contain all required columns: {required_columns}")

# Convert datetime to proper format if needed
df['datetime'] = pd.to_datetime(df['datetime']).dt.strftime('%Y-%m-%d %H:%M:%S')

# Verify data length requirements (6000-30,000 periods)
data_length = len(df)
if data_length < 2500:
    raise ValueError(f"Data length ({data_length}) is less than minimum required (2500)")
if data_length > 30000:
    raise ValueError(f"Data length ({data_length}) exceeds maximum allowed (30,000)")

# Uncomment to add future prediction point with zeros for unknown values
'''
future_date = "2025-01-02 10:00:00"  # Replace with your target future date
future_row = pd.DataFrame({
    'datetime': [future_date],
    'open': [0],
    'high': [0],
    'low': [0],
    'close': [0]
})
df = pd.concat([df, future_row]).reset_index(drop=True)
'''

Step 2: Submit API Request


# API configuration
BASE_URL = "https://www.sumtyme.com/shared"

# Prepare request
api_endpoint = f"{BASE_URL}/v1/reasoning-model-ts"
headers = {
    "Content-Type": "application/json"
}

# Create payload from data
payload = time_series_dict(df,interval,interval_unit)

# Send POST request to API
response = requests.post(api_endpoint, json=payload, headers=headers)

# Check for successful request
if response.status_code != 200:
    raise Exception(f"API request failed with status code {response.status_code}: {response.text}")

# Parse response
result = response.json()
task_id = result.get("task_id")
status = result.get("status")
tasks_ahead = result.get("tasks_ahead")

print(f"Request submitted. Task ID: {task_id}")
print(f"Current status: {status}")
print(f"Tasks ahead in queue: {tasks_ahead}")

Step 3: Retrieve Analysis Results


# Function to transform response dict to dataframe
def dict_to_dataframe(response_dict):
    # Extract the result dictionary
    result_dict = response_dict['result']
    
    # Create lists for datetime and trend_identified values
    datetimes = []
    trends = []
    
    # Extract values from the nested dictionary
    for timestamp, data in result_dict.items():
        datetimes.append(timestamp)
        trends.append(data['trend_identified'])
    
    # Create the DataFrame
    df = pd.DataFrame({
        'datetime': datetimes,
        'trend_identified': trends
    })
    
    # Convert datetime strings to pandas datetime objects
    df['datetime'] = pd.to_datetime(df['datetime'])
    
    # Sort by datetime
    df = df.sort_values('datetime')
    
    return df

# Function to poll for results
def get_analysis_results(task_id, max_attempts=10, delay=30):
    """
    Poll the results endpoint until analysis is complete or max attempts reached
    
    Parameters:
    task_id (str): Task ID from initial request
    max_attempts (int): Maximum number of polling attempts
    delay (int): Seconds to wait between polling attempts
    
    Returns:
    dict: Analysis results or error message
    """
    results_endpoint = f"https://www.sumtyme.com/results/{task_id}"
    
    for attempt in range(max_attempts):
        print(f"Checking results (attempt {attempt+1}/{max_attempts})...")
        
        response = requests.get(results_endpoint)
        
        if response.status_code != 200:
            print(f"Error checking results: {response.status_code}, {response.text}")
            return None
            
        result_data = response.json()
        
        # Check if processing is complete
        if "status" in result_data and result_data["status"] == "processing":
            print(f"Still processing. Tasks ahead: {result_data.get('tasks_ahead', 'unknown')}")
            time.sleep(delay)
            continue
            
        # If we get here, we have results
        return result_data
        
    print("Maximum polling attempts reached. Try checking results manually later.")
    return None

# Retrieve results
analysis_results = get_analysis_results(task_id)

if analysis_results:
    print("Analysis complete!")
    print(json.dumps(analysis_results['result'], indent=2))


## Uncomment to turn json to dataframe 
# dataframe = dict_to_dataframe(analysis_results)