Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.scrapegraphai.com/llms.txt

Use this file to discover all available pages before exploring further.

Building Content & News Monitoring Systems

Learn how to build content aggregation systems and news monitoring platforms using ScrapeGraphAI.

Common Use Cases

  • News Aggregation: Collect and organize news from multiple sources
  • Blog Monitoring: Track multiple blogs for new content
  • Social Media Aggregation: Aggregate content from social media platforms
  • Industry News Tracking: Monitor industry-specific news and updates

Integration Examples

News Aggregator

from pydantic import BaseModel, Field
from typing import List, Optional
from datetime import datetime
from scrapegraph_py import Client

# Schema for news article data.
# Fields typed Optional carry an explicit default of None: without a default,
# pydantic v2 still treats an Optional field as required, so a page that lacks
# e.g. an author or a summary would fail validation instead of yielding None.
class NewsArticle(BaseModel):
    # Required fields: every extracted article must provide these.
    title: str = Field(description="Article title")
    content: str = Field(description="Article content")
    publication_date: str = Field(description="Publication date")
    source_url: str = Field(description="Source URL")

    # Optional fields: left as None when the page does not expose them.
    summary: Optional[str] = Field(default=None, description="Article summary")
    author: Optional[str] = Field(default=None, description="Article author")
    category: Optional[str] = Field(default=None, description="Article category")
    tags: Optional[List[str]] = Field(default=None, description="Article tags")
    image_url: Optional[str] = Field(default=None, description="Featured image URL")

# Schema for news aggregation results.
# This model is what the example passes as `output_schema` to the extraction
# call, so one instance describes the result for a single source page.
# None of the fields declares a default, so all four are required.
class NewsAggregationResult(BaseModel):
    # Articles extracted from the page, each validated as a NewsArticle.
    articles: List[NewsArticle] = Field(description="List of aggregated articles")
    # Article count as reported in the extracted data (not computed locally).
    total_articles: int = Field(description="Total number of articles collected")
    # Source URLs as reported in the extracted data.
    sources: List[str] = Field(description="List of source URLs")
    # Kept as a plain string; no date format is enforced by the schema.
    timestamp: str = Field(description="Aggregation timestamp")

client = Client(api_key="your-api-key")

# Define news sources to aggregate
news_sources = [
    "https://news-site1.com",
    "https://news-site2.com",
    "https://news-site3.com",
]

# Aggregate news from multiple sources: one extraction request per URL.
# Results are appended in the same order as `news_sources`, which lets the
# reporting loop below pair every result with the URL it was requested from.
# NOTE(review): the code below assumes each response exposes the
# NewsAggregationResult fields as attributes - confirm against the SDK.
aggregated_results = []

for source in news_sources:
    response = client.extract(
        url=source,
        prompt="Extract all news articles from the homepage, including title, content, author, publication date, category, and tags. Also extract featured images if available.",
        output_schema=NewsAggregationResult
    )
    aggregated_results.append(response)

# Process and display results
total_articles = sum(result.total_articles for result in aggregated_results)
print(f"Aggregated {total_articles} articles from {len(news_sources)} sources\n")

for source, result in zip(news_sources, aggregated_results):
    # `sources` comes from the extracted data and may be empty; indexing it
    # blindly would raise IndexError, so fall back to the requested URL.
    source_label = result.sources[0] if result.sources else source
    print(f"Source: {source_label}")
    print(f"Articles: {result.total_articles}")

    for article in result.articles:
        print(f"\nTitle: {article.title}")
        print(f"Author: {article.author or 'Unknown'}")
        print(f"Date: {article.publication_date}")
        if article.category:
            print(f"Category: {article.category}")
        if article.tags:
            print(f"Tags: {', '.join(article.tags)}")

Blog Content Monitor

from pydantic import BaseModel, Field
from typing import List, Optional
from datetime import datetime, timedelta
from scrapegraph_py import Client
import time  # For job polling

# Schema for blog post data.
# Fields typed Optional carry an explicit default of None: without a default,
# pydantic v2 still treats an Optional field as required, so a post without
# e.g. tags or a comment count would fail validation instead of yielding None.
class BlogPost(BaseModel):
    # Required fields: every post must provide these.
    title: str = Field(description="Blog post title")
    content: str = Field(description="Blog post content")
    author: str = Field(description="Post author")
    publication_date: str = Field(description="Publication date")
    url: str = Field(description="Post URL")

    # Optional fields: left as None when the post does not expose them.
    excerpt: Optional[str] = Field(default=None, description="Post excerpt or summary")
    categories: Optional[List[str]] = Field(default=None, description="Post categories")
    tags: Optional[List[str]] = Field(default=None, description="Post tags")
    comments_count: Optional[int] = Field(default=None, description="Number of comments")
    reading_time: Optional[int] = Field(default=None, description="Estimated reading time in minutes")

# Schema for blog monitoring results.
# Groups the posts found for one blog together with summary metadata.
# None of the fields declares a default, so all four are required.
# NOTE(review): the crawl example in this section works on plain dicts and
# never references this model - confirm where it is meant to be used.
class BlogMonitorResult(BaseModel):
    # Posts found for the blog, each validated as a BlogPost.
    posts: List[BlogPost] = Field(description="List of blog posts")
    # Post count as supplied in the data (not computed from `posts`).
    total_posts: int = Field(description="Total number of posts found")
    blog_url: str = Field(description="Blog homepage URL")
    # Kept as a plain string; no date format is enforced by the schema.
    last_updated: str = Field(description="Last monitoring timestamp")

client = Client(api_key="your-api-key")

# Start the crawler job
job = client.crawl.start(
    url="https://example-blog.com",
    depth=2,  # Crawl up to 2 levels deep
    include_patterns=["/blog/*"],
    exclude_patterns=["/tag/*", "/author/*"]
)

# Wait for job completion and get results.
# `response` is initialised before polling so that a failed or cancelled job
# simply skips the processing step below instead of raising NameError.
response = None
job_id = job["id"]
while True:
    status = client.crawl.status(job_id)
    job_state = status.get("status")
    if job_state == "completed":
        response = status.get("data", {})
        break
    if job_state in ["failed", "cancelled", "error"]:
        print(f"Job failed: {status.get('error')}")
        break
    time.sleep(5)  # Wait 5 seconds before checking again

# Process the crawled content if successful
if response and response.get("pages"):
    print(f"Total Pages Found: {len(response['pages'])}")

    # Compute the cutoff once so every post is compared against the same instant.
    recent_cutoff = datetime.now() - timedelta(days=7)

    for post in response["pages"]:
        # A page without a date, or with one that is not YYYY-MM-DD, cannot be
        # classified as recent; skip it rather than abort the whole report.
        try:
            post_date = datetime.strptime(post["publication_date"], "%Y-%m-%d")
        except (KeyError, TypeError, ValueError):
            continue

        # Only report posts published within the last 7 days.
        if post_date <= recent_cutoff:
            continue

        print(f"Title: {post['title']}")
        print(f"Author: {post.get('author', 'Unknown')}")
        print(f"Published: {post['publication_date']}")
        print(f"Reading Time: {post.get('reading_time', 'N/A')} minutes")
        if post.get("categories"):
            print(f"Categories: {', '.join(post['categories'])}")
        if post.get("tags"):
            print(f"Tags: {', '.join(post['tags'])}")
        if post.get("excerpt"):
            print(f"\nExcerpt: {post['excerpt']}")
        print(f"URL: {post['url']}\n")

Best Practices

  1. Content Freshness: Implement appropriate monitoring intervals for different content types
  2. Deduplication: Maintain a system to avoid duplicate content
  3. Content Storage: Use efficient storage solutions for historical content
  4. Error Handling: Implement robust error handling for failed scraping attempts
  5. Rate Limiting: Respect source websites’ rate limits and implement appropriate delays
  6. Content Attribution: Always maintain and display proper attribution for aggregated content