Documentation Index
Fetch the complete documentation index at: https://docs.scrapegraphai.com/llms.txt
Use this file to discover all available pages before exploring further.
Building Content & News Monitoring Systems
Learn how to build content aggregation systems and news monitoring platforms using ScrapeGraphAI.
Common Use Cases
- News Aggregation: Collect and organize news from multiple sources
- Blog Monitoring: Track multiple blogs for new content
- Social Media Aggregation: Aggregate content from social media platforms
- Industry News Tracking: Monitor industry-specific news and updates
Integration Examples
News Aggregator
from pydantic import BaseModel, Field
from typing import List, Optional
from datetime import datetime
from scrapegraph_py import Client
# Schema for news article data
class NewsArticle(BaseModel):
    # Schema for one extracted news article.
    #
    # Documented with comments instead of a class docstring on purpose:
    # Pydantic copies a model docstring into the generated JSON schema, and
    # this model is nested in the schema passed to the extraction call.
    #
    # Every Optional field has an explicit ``default=None``. Without it,
    # Pydantic v2 treats the field as "nullable but still required", so an
    # article missing e.g. an author would fail validation instead of
    # coming back with ``author=None``.

    # Always expected on an article.
    title: str = Field(description="Article title")
    content: str = Field(description="Article content")

    # May be absent on the page; None when not found.
    summary: Optional[str] = Field(default=None, description="Article summary")
    author: Optional[str] = Field(default=None, description="Article author")

    # Kept as a plain string; no date format is enforced by this schema.
    publication_date: str = Field(description="Publication date")
    source_url: str = Field(description="Source URL")

    # Optional classification and media metadata.
    category: Optional[str] = Field(default=None, description="Article category")
    tags: Optional[List[str]] = Field(default=None, description="Article tags")
    image_url: Optional[str] = Field(default=None, description="Featured image URL")
# Schema for news aggregation results
class NewsAggregationResult(BaseModel):
    # Container for everything extracted from a single aggregation request.
    # (Comments are used instead of a docstring so the JSON schema generated
    # from this model is left exactly as it was.)

    # The individual articles, each validated against NewsArticle.
    articles: List[NewsArticle] = Field(
        description="List of aggregated articles",
    )

    # Count reported alongside the articles.
    total_articles: int = Field(
        description="Total number of articles collected",
    )

    # URLs the articles were gathered from.
    sources: List[str] = Field(
        description="List of source URLs",
    )

    # Stored as a string; no timestamp format is enforced here.
    timestamp: str = Field(
        description="Aggregation timestamp",
    )
# Replace the placeholder with your real API key before running.
client = Client(api_key="your-api-key")

# Define news sources to aggregate
news_sources = [
    "https://news-site1.com",
    "https://news-site2.com",
    "https://news-site3.com",
]

# Aggregate news from multiple sources.
# One response is appended per source, so aggregated_results[i] always
# corresponds to news_sources[i].
# NOTE(review): the code below assumes each response exposes the
# NewsAggregationResult fields as attributes -- confirm against the SDK.
aggregated_results = []
for source in news_sources:
    response = client.extract(
        url=source,
        prompt="Extract all news articles from the homepage, including title, content, author, publication date, category, and tags. Also extract featured images if available.",
        output_schema=NewsAggregationResult,
    )
    aggregated_results.append(response)

# Process and display results
total_articles = sum(result.total_articles for result in aggregated_results)
print(f"Aggregated {total_articles} articles from {len(news_sources)} sources\n")

for source, result in zip(news_sources, aggregated_results):
    # The extracted `sources` list can be empty; indexing it blindly would
    # raise IndexError, so fall back to the URL that was actually requested.
    source_label = result.sources[0] if result.sources else source
    print(f"Source: {source_label}")
    print(f"Articles: {result.total_articles}")

    for article in result.articles:
        print(f"\nTitle: {article.title}")
        print(f"Author: {article.author or 'Unknown'}")
        print(f"Date: {article.publication_date}")

        # Optional metadata is printed only when present.
        if article.category:
            print(f"Category: {article.category}")
        if article.tags:
            print(f"Tags: {', '.join(article.tags)}")
Blog Content Monitor
from pydantic import BaseModel, Field
from typing import List, Optional
from datetime import datetime, timedelta
from scrapegraph_py import Client
import time # For job polling
# Schema for blog post data
class BlogPost(BaseModel):
    # Schema for one blog post.
    #
    # Documented with comments instead of a class docstring on purpose:
    # Pydantic copies a model docstring into the generated JSON schema.
    #
    # Every Optional field has an explicit ``default=None``. Without it,
    # Pydantic v2 treats the field as "nullable but still required", so a
    # post missing e.g. a comment count would fail validation instead of
    # coming back with ``comments_count=None``.

    # Always expected on a post.
    title: str = Field(description="Blog post title")
    content: str = Field(description="Blog post content")

    # May be absent; None when not found.
    excerpt: Optional[str] = Field(default=None, description="Post excerpt or summary")

    author: str = Field(description="Post author")
    # Kept as a plain string; no date format is enforced by this schema.
    publication_date: str = Field(description="Publication date")
    url: str = Field(description="Post URL")

    # Optional classification and engagement metadata.
    categories: Optional[List[str]] = Field(default=None, description="Post categories")
    tags: Optional[List[str]] = Field(default=None, description="Post tags")
    comments_count: Optional[int] = Field(default=None, description="Number of comments")
    reading_time: Optional[int] = Field(
        default=None,
        description="Estimated reading time in minutes",
    )
# Schema for blog monitoring results
class BlogMonitorResult(BaseModel):
    # Container for the outcome of one blog monitoring pass.
    # (Comments are used instead of a docstring so the JSON schema generated
    # from this model is left exactly as it was.)

    # The posts found, each validated against BlogPost.
    posts: List[BlogPost] = Field(
        description="List of blog posts",
    )

    # Count reported alongside the posts.
    total_posts: int = Field(
        description="Total number of posts found",
    )

    # Homepage of the blog that was monitored.
    blog_url: str = Field(
        description="Blog homepage URL",
    )

    # Stored as a string; no timestamp format is enforced here.
    last_updated: str = Field(
        description="Last monitoring timestamp",
    )
# Replace the placeholder with your real API key before running.
client = Client(api_key="your-api-key")

# Start the crawler job
job = client.crawl.start(
    url="https://example-blog.com",
    depth=2,  # Crawl up to 2 levels deep
    include_patterns=["/blog/*"],
    exclude_patterns=["/tag/*", "/author/*"],
)

# Wait for job completion and get results.
job_id = job["id"]

# Bind `response` up front: if the job ends in a failure state the loop
# breaks without assigning it, and the check below would raise NameError.
response = {}
while True:
    status = client.crawl.status(job_id)
    job_state = status.get("status")

    if job_state == "completed":
        response = status.get("data", {})
        break
    if job_state in ["failed", "cancelled", "error"]:
        print(f"Job failed: {status.get('error')}")
        break

    time.sleep(5)  # Wait 5 seconds before checking again

# Process the crawled content if successful
if response and response.get("pages"):
    print(f"Total Pages Found: {len(response['pages'])}")

    # Compute the "recent" cutoff once so every post is compared against
    # the same instant.
    recent_cutoff = datetime.now() - timedelta(days=7)

    for post in response["pages"]:
        # A page with a missing, null, or non-ISO date cannot be classified
        # as recent; skip it rather than aborting the whole report.
        try:
            post_date = datetime.strptime(post["publication_date"], "%Y-%m-%d")
        except (KeyError, TypeError, ValueError):
            continue

        # Only report posts published within the last 7 days.
        if post_date > recent_cutoff:
            print(f"Title: {post['title']}")
            print(f"Author: {post.get('author', 'Unknown')}")
            print(f"Published: {post['publication_date']}")
            print(f"Reading Time: {post.get('reading_time', 'N/A')} minutes")

            # Optional metadata is printed only when present.
            if post.get("categories"):
                print(f"Categories: {', '.join(post['categories'])}")
            if post.get("tags"):
                print(f"Tags: {', '.join(post['tags'])}")
            if post.get("excerpt"):
                print(f"\nExcerpt: {post['excerpt']}")
            print(f"URL: {post['url']}\n")
Best Practices
- Content Freshness: Implement appropriate monitoring intervals for different content types
- Deduplication: Maintain a system to avoid duplicate content
- Content Storage: Use efficient storage solutions for historical content
- Error Handling: Implement robust error handling for failed scraping attempts
- Rate Limiting: Respect source websites’ rate limits and implement appropriate delays
- Content Attribution: Always maintain and display proper attribution for aggregated content