Examples

Practical examples and code snippets to get you started with OpenMCP.

Quick Examples

Simple Screenshot

Take a screenshot of any website with just one line of code.

import openmcp
import asyncio

# One-liner screenshot
asyncio.run(openmcp.screenshot("https://example.com", "example.png"))

Basic Navigation

Navigate to a website and interact with elements.

import openmcp
import asyncio

async def basic_navigation():
    """Fill in and submit the httpbin demo form, then screenshot the result.

    Opens a browser via ``openmcp.browser()``, types a customer name and
    e-mail address into the form, submits it, and saves the resulting page
    to ``form_result.png``.
    """
    async with openmcp.browser() as browser:
        # Navigate to website
        await browser.navigate("https://httpbin.org/forms/post")

        # Fill out form
        await browser.type("input[name='custname']", "John Doe")
        await browser.type("input[name='custemail']", "john@example.com")

        # Click submit. The httpbin form's submit control is a <button>
        # element, not an <input type="submit">, so target the button.
        # NOTE(review): selector based on the current httpbin form markup —
        # confirm against the live page.
        await browser.click("form button")

        # Take screenshot of result
        await browser.screenshot("form_result.png")

asyncio.run(basic_navigation())

Finding Elements

Find and interact with specific elements on a page.

import openmcp
import asyncio

async def find_elements():
    """Show the locator strategies accepted by ``browser.find_elements``.

    Looks up elements on example.com by CSS selector (the default), by
    element id and by XPath, then clicks the submit button if one exists.
    """
    async with openmcp.browser() as browser:
        await browser.navigate("https://example.com")

        # Find all links (no ``by`` argument: the value is a CSS selector)
        links = await browser.find_elements("a")
        print(f"Found {len(links)} links")

        # Find specific element by ID. With by="id" the value is the bare
        # id; the leading "#" belongs to CSS selector syntax only.
        # NOTE(review): assumes Selenium-style locator semantics — confirm
        # against the openmcp API.
        header = await browser.find_elements("header", by="id")

        # Find by XPath
        buttons = await browser.find_elements("//button[@type='submit']", by="xpath")

        # Click first button if found
        if buttons:
            await browser.click("//button[@type='submit']", by="xpath")

asyncio.run(find_elements())

Web Scraping Examples

News Headlines Scraper

Scrape news headlines from a website.

import openmcp
import asyncio

async def scrape_news():
    """Print the top ten Hacker News headlines and screenshot the page.

    Each headline is collected as a dict with ``title``, ``url`` and a
    1-based ``rank`` before being printed.
    """
    async with openmcp.browser() as browser:
        await browser.navigate("https://news.ycombinator.com")

        # Every story title is an anchor directly inside ".titleline"
        story_links = await browser.find_elements(".titleline > a")

        # Keep only the first ten stories; rank starts at 1
        headlines = []
        for rank, link in enumerate(story_links[:10], start=1):
            entry = {
                "title": await browser.get_element_text(link),
                "url": await browser.get_element_attribute(link, "href"),
                "rank": rank,
            }
            headlines.append(entry)

        # Report what was collected
        for entry in headlines:
            print(f"{entry['rank']}. {entry['title']}")
            print(f"   URL: {entry['url']}\n")

        # Keep a visual record of the page
        await browser.screenshot("hackernews.png")

asyncio.run(scrape_news())

Price Monitor

Monitor product prices on e-commerce sites.

import openmcp
import asyncio
import re

def _extract_price(price_text):
    """Return the first number found in *price_text* as a float, or None.

    Thousands separators (",") are removed before conversion. The pattern
    requires the match to start with a digit, so stray punctuation such as
    the comma in "Now, only $5" is never mistaken for a price.
    """
    match = re.search(r'\d[\d,]*(?:\.\d+)?', price_text or "")
    if match is None:
        return None
    return float(match.group().replace(',', ''))


async def monitor_price(product_url, price_selector):
    """Read the current price of a product page.

    Args:
        product_url: URL of the product page to open.
        price_selector: Selector of the element that holds the price text.

    Returns:
        The price as a float, or None when the element is missing or its
        text contains no number.
    """
    async with openmcp.browser() as browser:
        await browser.navigate(product_url)

        # Wait for price element to load
        await browser.wait_for_element(price_selector)

        # Get price text
        price_element = await browser.find_elements(price_selector)
        if not price_element:
            return None
        price_text = await browser.get_element_text(price_element[0])

        # Extract numeric price
        price = _extract_price(price_text)
        if price is None:
            return None
        print(f"Current price: ${price}")

        # Take screenshot for record
        await browser.screenshot(f"price_check_{price}.png")

        return price

# Example usage
async def main():
    """Check one product's price and print an alert when it is under $100."""
    # Monitor a product (example selectors)
    price = await monitor_price(
        "https://example-store.com/product/123",
        ".price-current"
    )

    # Compare against None explicitly: a price of 0.0 is falsy, yet it is a
    # real price below the threshold and must still trigger the alert.
    if price is not None and price < 100:  # Alert if price drops below $100
        print("🚨 Price alert! Product is now under $100")

asyncio.run(main())

Testing Examples

Login Flow Test

Test a complete login flow with error handling.

import openmcp
import asyncio

async def test_login(username, password, headless=False):
    """Exercise the login flow and report whether it succeeded.

    Args:
        username: Value typed into the username field.
        password: Value typed into the password field.
        headless: Passed to ``openmcp.browser``. Defaults to False so the
            run can be watched; pass True for unattended runs.

    Returns:
        True when the dashboard URL is reached within the timeout, False
        when the wait times out or any other step raises.
    """
    async with openmcp.browser(headless=headless) as browser:
        try:
            # Navigate to login page
            await browser.navigate("https://example.com/login")

            # Fill login form
            await browser.type("input[name='username']", username)
            await browser.type("input[name='password']", password)

            # Click login button
            await browser.click("button[type='submit']")

            # Wait for either success or error. Only the wait itself is
            # guarded, so a timeout raised elsewhere is not misreported as
            # a failed login.
            try:
                # Check for successful login (dashboard page)
                await browser.wait_for_url("https://example.com/dashboard", timeout=5)
            except (TimeoutError, asyncio.TimeoutError):
                # asyncio.TimeoutError is a separate class before Python
                # 3.11, so both are listed.
                # NOTE(review): confirm which exception wait_for_url raises.
                error_elements = await browser.find_elements(".error-message")
                if error_elements:
                    error_text = await browser.get_element_text(error_elements[0])
                    print(f"❌ Login failed: {error_text}")
                    await browser.screenshot("login_error.png")
                else:
                    print("❌ Login failed: Unknown error")
                    await browser.screenshot("login_unknown_error.png")
                return False

            print("✅ Login successful!")
            await browser.screenshot("login_success.png")
            return True

        except Exception as e:
            print(f"❌ Test failed with exception: {e}")
            # The browser may be the thing that failed, so the diagnostic
            # screenshot is best-effort and must not mask the result.
            try:
                await browser.screenshot("login_exception.png")
            except Exception as screenshot_error:
                print(f"   Could not capture screenshot: {screenshot_error}")
            return False

# Run test
async def main():
    """Run the login test once with sample credentials and print the verdict."""
    passed = await test_login("testuser", "testpass")
    verdict = "Login test passed!" if passed else "Login test failed!"
    print(verdict)

asyncio.run(main())

Form Validation Test

Test form validation with various input scenarios.

import openmcp
import asyncio

async def test_form_validation():
    """Submit the signup form with several e-mail values and check the errors.

    Each case supplies an ``email`` to type and the ``expected_error`` text
    (or None when the input should be accepted). The outcome of every case
    is printed and a screenshot is saved per case.
    """
    test_cases = [
        {"email": "", "expected_error": "Email is required"},
        {"email": "invalid-email", "expected_error": "Invalid email format"},
        {"email": "test@example.com", "expected_error": None},
    ]
    email_field = "input[name='email']"

    async with openmcp.browser() as browser:
        for case_no, test_case in enumerate(test_cases, start=1):
            print(f"Running test case {case_no}: {test_case}")
            email = test_case["email"]
            expected = test_case["expected_error"]

            # Start every case from a freshly loaded form
            await browser.navigate("https://example.com/signup")

            # Empty the field; an empty e-mail case types nothing
            await browser.clear(email_field)
            if email:
                await browser.type(email_field, email)

            await browser.click("button[type='submit']")

            # The message text is only read when an error element exists
            error_elements = await browser.find_elements(".email-error")
            error_text = None
            if error_elements:
                error_text = await browser.get_element_text(error_elements[0])

            if expected:
                # An error is required for this case
                if not error_elements:
                    print(f"❌ Test case {case_no} failed: No error message found")
                elif expected in error_text:
                    print(f"✅ Test case {case_no} passed: Found expected error")
                else:
                    print(f"❌ Test case {case_no} failed: Wrong error message")
                    print(f"   Expected: {expected}")
                    print(f"   Got: {error_text}")
            elif error_elements:
                # Valid input must not produce an error
                print(f"❌ Test case {case_no} failed: Unexpected error: {error_text}")
            else:
                print(f"✅ Test case {case_no} passed: No error as expected")

            # One screenshot per case for later inspection
            await browser.screenshot(f"form_test_case_{case_no}.png")

asyncio.run(test_form_validation())

AI Integration Examples

Cursor Web Helper

A helper class for Cursor AI to perform web automation tasks.

import openmcp
import os
import asyncio

class CursorWebHelper:
    """Helper class for Cursor AI to interact with websites.

    Wraps one OpenMCP "browseruse" session. Every page operation starts a
    session on demand, so callers may use ``navigate``, ``click`` and the
    other methods without calling ``start_browser`` first.
    """

    def __init__(self, api_key=None):
        """Store the API key, falling back to ``OPENMCP_API_KEY``."""
        self.api_key = api_key or os.getenv("OPENMCP_API_KEY")
        # Both are created by start_browser(); defined here so the
        # attributes always exist.
        self.mcp = None
        self.session = None

    async def start_browser(self, headless=True):
        """Start a browser session and return its session id."""
        self.mcp = openmcp.MCP("browseruse", api_key=self.api_key)
        self.session = await self.mcp.create_session(headless=headless)
        return self.session.session_id

    async def _ensure_session(self):
        """Return the active session, starting one with defaults if needed."""
        if self.session is None:
            await self.start_browser()
        return self.session

    async def navigate(self, url):
        """Navigate to a URL."""
        session = await self._ensure_session()
        return await session.navigate(url)

    async def click(self, selector):
        """Click an element."""
        session = await self._ensure_session()
        return await session.click(selector)

    async def type_text(self, selector, text):
        """Type text into an element."""
        session = await self._ensure_session()
        return await session.type(selector, text)

    async def screenshot(self, filename=None):
        """Take a screenshot and return the data from the session.

        When *filename* is given, the data is base64-decoded and written to
        that file as well.
        """
        session = await self._ensure_session()
        screenshot_data = await session.screenshot()
        if filename:
            import base64
            with open(filename, "wb") as f:
                f.write(base64.b64decode(screenshot_data))
        return screenshot_data

    async def get_page_text(self):
        """Get all text content from the page."""
        session = await self._ensure_session()
        return await session.get_page_text()

    async def close(self):
        """Close the browser session; safe to call more than once."""
        if self.session is None:
            return
        try:
            await self.session.close()
        finally:
            # Drop the reference even if closing failed, so a later call
            # starts a fresh session instead of reusing a dead one.
            self.session = None

# Example usage for Cursor AI
async def cursor_example():
    """Fill in and submit a contact form, with screenshots before and after.

    The helper is always closed, even when a step fails.
    """
    web = CursorWebHelper()

    # Let Cursor AI generate automation code here
    # For example: "Fill out the contact form"
    contact_form = (
        ("input[name='name']", "AI Assistant"),
        ("input[name='email']", "ai@example.com"),
    )

    try:
        await web.start_browser()
        await web.navigate("https://example.com")
        await web.screenshot("page.png")

        for selector, value in contact_form:
            await web.type_text(selector, value)
        await web.click("button[type='submit']")

        await web.screenshot("after_submit.png")
    finally:
        await web.close()

# For Cursor: You can ask it to generate automation code using this helper
if __name__ == "__main__":
    asyncio.run(cursor_example())

Claude Desktop MCP Integration

Configure Claude Desktop to use OpenMCP for web automation.

# Add to Claude Desktop MCP configuration (omit this comment line when copying: JSON does not allow comments)
{
  "mcpServers": {
    "openmcp": {
      "command": "openmcp",
      "args": ["serve", "--protocol", "mcp"],
      "env": {
        "OPENMCP_API_KEY": "your-api-key-here"
      }
    }
  }
}

Then Claude can use these tools directly:

# Claude can now use these MCP tools:
# - create_browser_session
# - navigate
# - find_elements
# - click_element
# - type_text
# - take_screenshot
# - close_session

# Example Claude conversation:
# User: "Please take a screenshot of example.com"
# Claude: I'll help you take a screenshot of example.com

# 1. First, I'll create a browser session
create_browser_session({"headless": true})

# 2. Navigate to the website
navigate({
    "session_id": "session_123",
    "url": "https://example.com"
})

# 3. Take a screenshot
take_screenshot({"session_id": "session_123"})

# 4. Close the session
close_session({"session_id": "session_123"})

Advanced Examples

Multi-page Workflow

Navigate through multiple pages and collect data.

import openmcp
import asyncio
import json

async def multi_page_workflow(max_pages=10):
    """Example: Scrape product catalog across multiple pages.

    Walks the paginated electronics category, collecting name, price and
    link for every product, then writes the list to ``products.json`` and
    saves a screenshot of the last page visited.

    Args:
        max_pages: Upper bound on the number of pages scraped, as a guard
            against endless pagination.
    """
    products = []
    pages_scraped = 0

    async with openmcp.browser() as browser:
        # Start from category page
        await browser.navigate("https://example-store.com/category/electronics")

        while True:
            page = pages_scraped + 1
            print(f"Scraping page {page}...")

            # Get products on current page
            product_elements = await browser.find_elements(".product-item")

            if not product_elements:
                print("No more products found")
                break

            # Extract product data
            # NOTE(review): these calls pass a child selector as an extra
            # argument, unlike the single-element form used elsewhere in
            # these examples — confirm the signature.
            for element in product_elements:
                name = await browser.get_element_text(
                    element, ".product-name"
                )
                price = await browser.get_element_text(
                    element, ".product-price"
                )
                link = await browser.get_element_attribute(
                    element, "a", "href"
                )

                products.append({
                    "name": name,
                    "price": price,
                    "link": link,
                    "page": page
                })

            # Count the page only once it has actually been scraped, so the
            # summary below never reports a page that yielded nothing.
            pages_scraped = page

            # Try to go to next page
            next_button = await browser.find_elements(".pagination .next")
            if not next_button:
                print("No next page button found")
                break

            # Check if next button is disabled; a missing class attribute
            # is treated as "no classes" instead of crashing the `in` test.
            next_classes = await browser.get_element_attribute(
                next_button[0], "class"
            )
            if "disabled" in (next_classes or ""):
                print("Reached last page")
                break

            # Limit to prevent infinite loops; checked before clicking so
            # the browser stays on the last page that was scraped.
            if pages_scraped >= max_pages:
                print("Reached page limit")
                break

            # Click next page
            await browser.click(".pagination .next")
            await browser.wait_for_load()

        # Save results
        with open("products.json", "w", encoding="utf-8") as f:
            json.dump(products, f, indent=2)

        print(f"Scraped {len(products)} products from {pages_scraped} pages")
        await browser.screenshot("final_page.png")

asyncio.run(multi_page_workflow())

Parallel Processing

Process multiple URLs concurrently for better performance.

import openmcp
import asyncio
import aiohttp

async def process_url(url, semaphore):
    """Load one URL in its own browser and summarise the result.

    The semaphore bounds how many browsers run at once. Any exception is
    converted into an error record instead of propagating, so one bad URL
    cannot abort the whole batch.

    Returns:
        A dict with ``status`` "success" (plus title, load time and
        screenshot size) or ``status`` "error" (plus the error text).
    """
    async with semaphore:  # Limit concurrent requests
        try:
            async with openmcp.browser() as browser:
                await browser.navigate(url)

                # Collect the page data
                page_title = await browser.get_page_title()
                image = await browser.screenshot()
                elapsed = await browser.get_load_time()

                outcome = {
                    "url": url,
                    "title": page_title,
                    "load_time": elapsed,
                    "screenshot_size": len(image),
                    "status": "success"
                }
        except Exception as exc:
            outcome = {
                "url": url,
                "error": str(exc),
                "status": "error"
            }
        return outcome

async def parallel_processing():
    """Process a fixed list of URLs concurrently and print a summary.

    Returns:
        The list of per-URL result dicts, in the same order as the URLs.
    """
    urls = [
        "https://example.com",
        "https://httpbin.org",
        "https://jsonplaceholder.typicode.com",
        "https://reqres.in",
        "https://postman-echo.com"
    ]

    # At most 3 browsers are open at any moment
    limiter = asyncio.Semaphore(3)

    # Schedule every URL at once; the semaphore does the throttling
    results = await asyncio.gather(
        *(process_url(target, limiter) for target in urls)
    )

    # Report each outcome
    for outcome in results:
        if outcome["status"] != "success":
            print(f"❌ {outcome['url']}: {outcome['error']}")
            continue
        print(f"✅ {outcome['url']}")
        print(f"   Title: {outcome['title']}")
        print(f"   Load time: {outcome['load_time']}ms")

    return results

# Run parallel processing
asyncio.run(parallel_processing())

HTTP API Examples

cURL Examples

Use OpenMCP directly via HTTP API with cURL.

#!/bin/bash
# Drive one OpenMCP browser session over the HTTP API:
# create a session, open a page, save a screenshot, close the session.
# Requires curl, jq and base64.

# Stop on the first failing command, unset variable or broken pipeline.
set -euo pipefail

# Set your API key
API_KEY="your-api-key-here"
BASE_URL="http://localhost:9000/api/v1"
SESSION_ID=""

# POST the JSON body "$2" to the browseruse endpoint "$1" and print the
# response body. --fail makes HTTP error statuses exit non-zero so that
# `set -e` stops the script instead of continuing with an error payload.
api_post() {
  curl -sS --fail -X POST \
    -H "Authorization: Bearer $API_KEY" \
    -H "Content-Type: application/json" \
    -d "$2" \
    "$BASE_URL/browseruse/$1"
}

# Close the session if one was created. Registered as an EXIT trap so the
# session is released even when an earlier step fails.
close_session() {
  if [ -n "$SESSION_ID" ]; then
    if api_post close_session "{\"session_id\": \"$SESSION_ID\"}" > /dev/null; then
      echo "Session closed"
    else
      echo "Warning: failed to close session $SESSION_ID" >&2
    fi
  fi
}
trap close_session EXIT

# Create a browser session
SESSION_RESPONSE=$(api_post create_session '{"headless": true, "timeout": 30}')

# `// empty` turns a missing session_id into an empty string, not "null"
SESSION_ID=$(printf '%s' "$SESSION_RESPONSE" | jq -r '.session_id // empty')
if [ -z "$SESSION_ID" ]; then
  echo "Failed to create session: $SESSION_RESPONSE" >&2
  exit 1
fi
echo "Created session: $SESSION_ID"

# Navigate to a website
api_post navigate "{\"session_id\": \"$SESSION_ID\", \"url\": \"https://example.com\"}"
echo

# Take a screenshot
SCREENSHOT_RESPONSE=$(api_post take_screenshot "{\"session_id\": \"$SESSION_ID\"}")

# Save screenshot to file (quoted so the payload is not word-split)
printf '%s' "$SCREENSHOT_RESPONSE" | jq -r '.screenshot' | base64 -d > screenshot.png
echo "Screenshot saved to screenshot.png"

# The session is closed by the EXIT trap above.

JavaScript/Node.js Example

Use OpenMCP from JavaScript applications.

const axios = require('axios');
const fs = require('fs');

/**
 * Minimal client for the OpenMCP "browseruse" HTTP API.
 *
 * Every method issues one authenticated POST request and resolves with
 * data taken from the JSON response.
 */
class OpenMCPClient {
    /**
     * @param {string} apiKey - Key sent as a Bearer token.
     * @param {string} [baseUrl] - Root URL of the API.
     */
    constructor(apiKey, baseUrl = 'http://localhost:9000/api/v1') {
        this.apiKey = apiKey;
        this.baseUrl = baseUrl;
        this.headers = {
            'Authorization': `Bearer ${apiKey}`,
            'Content-Type': 'application/json'
        };
    }

    /**
     * POST `payload` to a browseruse endpoint and resolve with the
     * response body.
     */
    async _post(endpoint, payload) {
        const url = `${this.baseUrl}/browseruse/${endpoint}`;
        const { data } = await axios.post(url, payload, { headers: this.headers });
        return data;
    }

    /** Create a session (headless, 30 s timeout unless overridden); resolves with its id. */
    async createSession(options = {}) {
        const payload = { headless: true, timeout: 30, ...options };
        const created = await this._post('create_session', payload);
        return created.session_id;
    }

    /** Load `url` in the given session. */
    navigate(sessionId, url) {
        return this._post('navigate', { session_id: sessionId, url });
    }

    /** Click the element matched by `selector`. */
    click(sessionId, selector) {
        return this._post('click_element', { session_id: sessionId, selector });
    }

    /** Type `text` into the element matched by `selector`. */
    type(sessionId, selector, text) {
        return this._post('type_text', { session_id: sessionId, selector, text });
    }

    /**
     * Take a screenshot; resolves with the base64 string from the response.
     * When `filename` is given the decoded image is also written to disk.
     */
    async screenshot(sessionId, filename = null) {
        const result = await this._post('take_screenshot', { session_id: sessionId });
        const encoded = result.screenshot;

        if (filename) {
            fs.writeFileSync(filename, Buffer.from(encoded, 'base64'));
        }

        return encoded;
    }

    /** Close the given session. */
    closeSession(sessionId) {
        return this._post('close_session', { session_id: sessionId });
    }
}

// Example usage
/**
 * Example usage: open a session, screenshot example.com, and always
 * release the session afterwards.
 */
async function main() {
    const client = new OpenMCPClient('your-api-key-here');
    let sessionId = null;

    try {
        // Create session
        sessionId = await client.createSession();
        console.log('Created session:', sessionId);

        // Navigate and interact
        await client.navigate(sessionId, 'https://example.com');
        await client.screenshot(sessionId, 'example.png');

        console.log('Screenshot saved to example.png');

    } catch (error) {
        console.error('Error:', error.response?.data || error.message);
    } finally {
        // Close in `finally` so a failed navigate/screenshot does not leak
        // the session. Skipped when the session was never created.
        if (sessionId !== null) {
            try {
                await client.closeSession(sessionId);
                console.log('Session closed');
            } catch (closeError) {
                console.error(
                    'Error closing session:',
                    closeError.response?.data || closeError.message
                );
            }
        }
    }
}

main();

Best Practices

Always use context managers - Use async with openmcp.browser() to ensure sessions are properly closed.
Handle errors gracefully - Wrap operations in try/except blocks (try/catch in JavaScript) and provide meaningful error messages.
Set appropriate timeouts - Configure timeouts based on your use case to avoid hanging operations.
Use headless mode in production - Headless browsers are faster and use fewer resources.
Implement rate limiting - Use semaphores or delays to avoid overwhelming target websites.
Respect robots.txt - Check website policies before scraping and follow ethical guidelines.