Examples
Practical examples and code snippets to get you started with OpenMCP
Quick Examples
Simple Screenshot
Take a screenshot of any website with just one line of code.
import openmcp
import asyncio
# One-liner screenshot
asyncio.run(openmcp.screenshot("https://example.com", "example.png"))
Basic Navigation
Navigate to a website and interact with elements.
import openmcp
import asyncio
async def basic_navigation():
async with openmcp.browser() as browser:
# Navigate to website
await browser.navigate("https://httpbin.org/forms/post")
# Fill out form
await browser.type("input[name='custname']", "John Doe")
await browser.type("input[name='custemail']", "john@example.com")
# Click submit
await browser.click("input[type='submit']")
# Take screenshot of result
await browser.screenshot("form_result.png")
asyncio.run(basic_navigation())
Finding Elements
Find and interact with specific elements on a page.
import openmcp
import asyncio
async def find_elements():
async with openmcp.browser() as browser:
await browser.navigate("https://example.com")
# Find all links
links = await browser.find_elements("a")
print(f"Found {len(links)} links")
# Find specific element by ID
header = await browser.find_elements("#header", by="id")
# Find by XPath
buttons = await browser.find_elements("//button[@type='submit']", by="xpath")
# Click first button if found
if buttons:
await browser.click("//button[@type='submit']", by="xpath")
asyncio.run(find_elements())
Web Scraping Examples
News Headlines Scraper
Scrape news headlines from a website.
import openmcp
import asyncio
async def scrape_news():
async with openmcp.browser() as browser:
await browser.navigate("https://news.ycombinator.com")
# Find all story titles
titles = await browser.find_elements(".titleline > a")
headlines = []
for i, title in enumerate(titles[:10]): # Get top 10
text = await browser.get_element_text(title)
href = await browser.get_element_attribute(title, "href")
headlines.append({
"title": text,
"url": href,
"rank": i + 1
})
# Print results
for headline in headlines:
print(f"{headline['rank']}. {headline['title']}")
print(f" URL: {headline['url']}\n")
# Take screenshot
await browser.screenshot("hackernews.png")
asyncio.run(scrape_news())
Price Monitor
Monitor product prices on e-commerce sites.
import openmcp
import asyncio
import re
async def monitor_price(product_url, price_selector):
async with openmcp.browser() as browser:
await browser.navigate(product_url)
# Wait for price element to load
await browser.wait_for_element(price_selector)
# Get price text
price_element = await browser.find_elements(price_selector)
if price_element:
price_text = await browser.get_element_text(price_element[0])
# Extract numeric price
price_match = re.search(r'[\d,]+\.?\d*', price_text)
if price_match:
price = float(price_match.group().replace(',', ''))
print(f"Current price: ${price}")
# Take screenshot for record
await browser.screenshot(f"price_check_{price}.png")
return price
return None
# Example usage
async def main():
# Monitor a product (example selectors)
price = await monitor_price(
"https://example-store.com/product/123",
".price-current"
)
if price and price < 100: # Alert if price drops below $100
print("🚨 Price alert! Product is now under $100")
asyncio.run(main())
Testing Examples
Login Flow Test
Test a complete login flow with error handling.
import openmcp
import asyncio
async def test_login(username, password):
async with openmcp.browser(headless=False) as browser:
try:
# Navigate to login page
await browser.navigate("https://example.com/login")
# Fill login form
await browser.type("input[name='username']", username)
await browser.type("input[name='password']", password)
# Click login button
await browser.click("button[type='submit']")
# Wait for either success or error
try:
# Check for successful login (dashboard page)
await browser.wait_for_url("https://example.com/dashboard", timeout=5)
print("✅ Login successful!")
await browser.screenshot("login_success.png")
return True
except TimeoutError:
# Check for error message
error_elements = await browser.find_elements(".error-message")
if error_elements:
error_text = await browser.get_element_text(error_elements[0])
print(f"❌ Login failed: {error_text}")
await browser.screenshot("login_error.png")
else:
print("❌ Login failed: Unknown error")
await browser.screenshot("login_unknown_error.png")
return False
except Exception as e:
print(f"❌ Test failed with exception: {e}")
await browser.screenshot("login_exception.png")
return False
# Run test
async def main():
success = await test_login("testuser", "testpass")
if success:
print("Login test passed!")
else:
print("Login test failed!")
asyncio.run(main())
Form Validation Test
Test form validation with various input scenarios.
import openmcp
import asyncio
async def test_form_validation():
test_cases = [
{"email": "", "expected_error": "Email is required"},
{"email": "invalid-email", "expected_error": "Invalid email format"},
{"email": "test@example.com", "expected_error": None},
]
async with openmcp.browser() as browser:
for i, test_case in enumerate(test_cases):
print(f"Running test case {i+1}: {test_case}")
# Navigate to form
await browser.navigate("https://example.com/signup")
# Clear and fill email field
await browser.clear("input[name='email']")
if test_case["email"]:
await browser.type("input[name='email']", test_case["email"])
# Submit form
await browser.click("button[type='submit']")
# Check for validation error
error_elements = await browser.find_elements(".email-error")
if test_case["expected_error"]:
# Should have error
if error_elements:
error_text = await browser.get_element_text(error_elements[0])
if test_case["expected_error"] in error_text:
print(f"✅ Test case {i+1} passed: Found expected error")
else:
print(f"❌ Test case {i+1} failed: Wrong error message")
print(f" Expected: {test_case['expected_error']}")
print(f" Got: {error_text}")
else:
print(f"❌ Test case {i+1} failed: No error message found")
else:
# Should not have error
if not error_elements:
print(f"✅ Test case {i+1} passed: No error as expected")
else:
error_text = await browser.get_element_text(error_elements[0])
print(f"❌ Test case {i+1} failed: Unexpected error: {error_text}")
# Take screenshot for each test case
await browser.screenshot(f"form_test_case_{i+1}.png")
asyncio.run(test_form_validation())
AI Integration Examples
Cursor Web Helper
A helper class for Cursor AI to perform web automation tasks.
import openmcp
import os
import asyncio
class CursorWebHelper:
"""Helper class for Cursor AI to interact with websites."""
def __init__(self, api_key=None):
self.api_key = api_key or os.getenv("OPENMCP_API_KEY")
self.session = None
async def start_browser(self, headless=True):
"""Start a browser session."""
self.mcp = openmcp.MCP("browseruse", api_key=self.api_key)
self.session = await self.mcp.create_session(headless=headless)
return self.session.session_id
async def navigate(self, url):
"""Navigate to a URL."""
if not self.session:
await self.start_browser()
return await self.session.navigate(url)
async def click(self, selector):
"""Click an element."""
return await self.session.click(selector)
async def type_text(self, selector, text):
"""Type text into an element."""
return await self.session.type(selector, text)
async def screenshot(self, filename=None):
"""Take a screenshot."""
screenshot_data = await self.session.screenshot()
if filename:
import base64
with open(filename, "wb") as f:
f.write(base64.b64decode(screenshot_data))
return screenshot_data
async def get_page_text(self):
"""Get all text content from the page."""
return await self.session.get_page_text()
async def close(self):
"""Close the browser session."""
if self.session:
await self.session.close()
# Example usage for Cursor AI
async def cursor_example():
helper = CursorWebHelper()
try:
await helper.start_browser()
await helper.navigate("https://example.com")
await helper.screenshot("page.png")
# Let Cursor AI generate automation code here
# For example: "Fill out the contact form"
await helper.type_text("input[name='name']", "AI Assistant")
await helper.type_text("input[name='email']", "ai@example.com")
await helper.click("button[type='submit']")
await helper.screenshot("after_submit.png")
finally:
await helper.close()
# For Cursor: You can ask it to generate automation code using this helper
if __name__ == "__main__":
asyncio.run(cursor_example())
Claude Desktop MCP Integration
Configure Claude Desktop to use OpenMCP for web automation.
# Add to Claude Desktop MCP configuration
{
"mcpServers": {
"openmcp": {
"command": "openmcp",
"args": ["serve", "--protocol", "mcp"],
"env": {
"OPENMCP_API_KEY": "your-api-key-here"
}
}
}
}
Then Claude can use these tools directly:
# Claude can now use these MCP tools:
# - create_browser_session
# - navigate
# - find_elements
# - click_element
# - type_text
# - take_screenshot
# - close_session
# Example Claude conversation:
# User: "Please take a screenshot of example.com"
# Claude: I'll help you take a screenshot of example.com
# 1. First, I'll create a browser session
create_browser_session({"headless": true})
# 2. Navigate to the website
navigate({
"session_id": "session_123",
"url": "https://example.com"
})
# 3. Take a screenshot
take_screenshot({"session_id": "session_123"})
# 4. Close the session
close_session({"session_id": "session_123"})
Advanced Examples
Multi-page Workflow
Navigate through multiple pages and collect data.
import openmcp
import asyncio
import json
async def multi_page_workflow():
"""Example: Scrape product catalog across multiple pages."""
products = []
async with openmcp.browser() as browser:
# Start from category page
await browser.navigate("https://example-store.com/category/electronics")
page = 1
while True:
print(f"Scraping page {page}...")
# Get products on current page
product_elements = await browser.find_elements(".product-item")
if not product_elements:
print("No more products found")
break
# Extract product data
for element in product_elements:
name = await browser.get_element_text(
element, ".product-name"
)
price = await browser.get_element_text(
element, ".product-price"
)
link = await browser.get_element_attribute(
element, "a", "href"
)
products.append({
"name": name,
"price": price,
"link": link,
"page": page
})
# Try to go to next page
next_button = await browser.find_elements(".pagination .next")
if not next_button:
print("No next page button found")
break
# Check if next button is disabled
is_disabled = await browser.get_element_attribute(
next_button[0], "class"
)
if "disabled" in is_disabled:
print("Reached last page")
break
# Click next page
await browser.click(".pagination .next")
await browser.wait_for_load()
page += 1
# Limit to prevent infinite loops
if page > 10:
print("Reached page limit")
break
# Save results
with open("products.json", "w") as f:
json.dump(products, f, indent=2)
print(f"Scraped {len(products)} products from {page} pages")
await browser.screenshot("final_page.png")
asyncio.run(multi_page_workflow())
Parallel Processing
Process multiple URLs concurrently for better performance.
import openmcp
import asyncio
import aiohttp
async def process_url(url, semaphore):
"""Process a single URL with rate limiting."""
async with semaphore: # Limit concurrent requests
try:
async with openmcp.browser() as browser:
await browser.navigate(url)
# Extract data
title = await browser.get_page_title()
screenshot = await browser.screenshot()
# Get page metrics
load_time = await browser.get_load_time()
return {
"url": url,
"title": title,
"load_time": load_time,
"screenshot_size": len(screenshot),
"status": "success"
}
except Exception as e:
return {
"url": url,
"error": str(e),
"status": "error"
}
async def parallel_processing():
"""Process multiple URLs in parallel."""
urls = [
"https://example.com",
"https://httpbin.org",
"https://jsonplaceholder.typicode.com",
"https://reqres.in",
"https://postman-echo.com"
]
# Limit to 3 concurrent browsers
semaphore = asyncio.Semaphore(3)
# Process all URLs concurrently
tasks = [process_url(url, semaphore) for url in urls]
results = await asyncio.gather(*tasks)
# Print results
for result in results:
if result["status"] == "success":
print(f"✅ {result['url']}")
print(f" Title: {result['title']}")
print(f" Load time: {result['load_time']}ms")
else:
print(f"❌ {result['url']}: {result['error']}")
return results
# Run parallel processing
asyncio.run(parallel_processing())
HTTP API Examples
cURL Examples
Use OpenMCP directly via HTTP API with cURL.
#!/bin/bash
# Set your API key
API_KEY="your-api-key-here"
BASE_URL="http://localhost:9000/api/v1"
# Create a browser session
SESSION_RESPONSE=$(curl -s -X POST \
-H "Authorization: Bearer $API_KEY" \
-H "Content-Type: application/json" \
-d '{"headless": true, "timeout": 30}' \
"$BASE_URL/browseruse/create_session")
SESSION_ID=$(echo $SESSION_RESPONSE | jq -r '.session_id')
echo "Created session: $SESSION_ID"
# Navigate to a website
curl -X POST \
-H "Authorization: Bearer $API_KEY" \
-H "Content-Type: application/json" \
-d "{\"session_id\": \"$SESSION_ID\", \"url\": \"https://example.com\"}" \
"$BASE_URL/browseruse/navigate"
# Take a screenshot
SCREENSHOT_RESPONSE=$(curl -s -X POST \
-H "Authorization: Bearer $API_KEY" \
-H "Content-Type: application/json" \
-d "{\"session_id\": \"$SESSION_ID\"}" \
"$BASE_URL/browseruse/take_screenshot")
# Save screenshot to file
echo $SCREENSHOT_RESPONSE | jq -r '.screenshot' | base64 -d > screenshot.png
echo "Screenshot saved to screenshot.png"
# Close the session
curl -X POST \
-H "Authorization: Bearer $API_KEY" \
-H "Content-Type: application/json" \
-d "{\"session_id\": \"$SESSION_ID\"}" \
"$BASE_URL/browseruse/close_session"
echo "Session closed"
JavaScript/Node.js Example
Use OpenMCP from JavaScript applications.
const axios = require('axios');
const fs = require('fs');
class OpenMCPClient {
constructor(apiKey, baseUrl = 'http://localhost:9000/api/v1') {
this.apiKey = apiKey;
this.baseUrl = baseUrl;
this.headers = {
'Authorization': `Bearer ${apiKey}`,
'Content-Type': 'application/json'
};
}
async createSession(options = {}) {
const response = await axios.post(
`${this.baseUrl}/browseruse/create_session`,
{ headless: true, timeout: 30, ...options },
{ headers: this.headers }
);
return response.data.session_id;
}
async navigate(sessionId, url) {
const response = await axios.post(
`${this.baseUrl}/browseruse/navigate`,
{ session_id: sessionId, url },
{ headers: this.headers }
);
return response.data;
}
async click(sessionId, selector) {
const response = await axios.post(
`${this.baseUrl}/browseruse/click_element`,
{ session_id: sessionId, selector },
{ headers: this.headers }
);
return response.data;
}
async type(sessionId, selector, text) {
const response = await axios.post(
`${this.baseUrl}/browseruse/type_text`,
{ session_id: sessionId, selector, text },
{ headers: this.headers }
);
return response.data;
}
async screenshot(sessionId, filename = null) {
const response = await axios.post(
`${this.baseUrl}/browseruse/take_screenshot`,
{ session_id: sessionId },
{ headers: this.headers }
);
const screenshotData = response.data.screenshot;
if (filename) {
const buffer = Buffer.from(screenshotData, 'base64');
fs.writeFileSync(filename, buffer);
}
return screenshotData;
}
async closeSession(sessionId) {
const response = await axios.post(
`${this.baseUrl}/browseruse/close_session`,
{ session_id: sessionId },
{ headers: this.headers }
);
return response.data;
}
}
// Example usage
async function main() {
const client = new OpenMCPClient('your-api-key-here');
try {
// Create session
const sessionId = await client.createSession();
console.log('Created session:', sessionId);
// Navigate and interact
await client.navigate(sessionId, 'https://example.com');
await client.screenshot(sessionId, 'example.png');
console.log('Screenshot saved to example.png');
// Close session
await client.closeSession(sessionId);
console.log('Session closed');
} catch (error) {
console.error('Error:', error.response?.data || error.message);
}
}
main();
Best Practices
async with openmcp.browser()
to ensure sessions are properly closed.