Development Guide

Prerequisites

  • Python 3.11 or higher
  • BrightData account and API token
  • Google Cloud Platform project with BigQuery enabled
  • Access to required BigQuery tables and GCS bucket

Local Development

1. Install Dependencies

pip install -r requirements.txt
# or using uv
uv sync --dev
uv pip install -e .

2. Set Environment Variables

export GCP_PROJECT_ID="cred-1556636033881"
export BRIGHTDATA="your-brightdata-token"
export WEBHOOK_URL="http://localhost:8081"  # For API service
export ENVIRONMENT="dev"
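
Before running anything locally, it can help to confirm these variables are actually set. The snippet below is a small sketch (not part of the pipeline) that checks exactly the four variables listed above:

import os

# The four environment variables listed above; adjust if your setup differs.
REQUIRED_VARS = ["GCP_PROJECT_ID", "BRIGHTDATA", "WEBHOOK_URL", "ENVIRONMENT"]

missing = [name for name in REQUIRED_VARS if not os.environ.get(name)]
if missing:
    raise SystemExit(f"Missing environment variables: {', '.join(missing)}")
print("All required environment variables are set.")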

3. Run Services Locally

Run Coordinator Job

cd /path/to/repo
python -m src.priority_pipeline.jobs.coordinator

Run API Service

uvicorn src.priority_pipeline.api_main:app --host 0.0.0.0 --port 8080

Run Webhook Service

uvicorn src.priority_pipeline.webhook_main:app --host 0.0.0.0 --port 8081

4. Test Endpoints

Health Check

# API Service
curl http://localhost:8080/health

# Webhook Service
curl http://localhost:8081/health

Test Batch Trigger (API Service)

curl -X POST http://localhost:8080/batch-trigger/ \
  -H "Content-Type: application/json" \
  -d '{
    "message": {
      "data": "eyJwcm9maWxlcyI6IFt7ImxpbmtlZGluX3VzZXJuYW1lIjogInRlc3QtdXNlciIsICJwZXJzb25faWQiOiAidGVzdC1pZCJ9XX0=",
      "attributes": {"batch_number": "1", "total_batches": "1"}
    }
  }'
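
The data field is base64-encoded JSON; the example above decodes to {"profiles": [{"linkedin_username": "test-user", "person_id": "test-id"}]}. To build the same field for your own test profiles, a small helper like the sketch below works (it only assumes the payload shape shown in the example):

import base64
import json

def encode_batch(profiles):
    """Base64-encode a list of profile dicts for the message "data" field."""
    payload = json.dumps({"profiles": profiles}).encode("utf-8")
    return base64.b64encode(payload).decode("ascii")

# Reproduces the "data" value used in the curl example above.
print(encode_batch([{"linkedin_username": "test-user", "person_id": "test-id"}]))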

Development Workflow

Feature Development Process

  1. Create Feature Branch

       git checkout main
       git pull origin main
       git checkout -b feature/your-feature-name

  2. Develop and Test Locally

       • Make your changes
       • Test locally using the services above
       • Run any unit tests

  3. Open Pull Request

       git push origin feature/your-feature-name

       • Open a PR to the main branch
       • This triggers the test workflow
       • No automatic deployment happens on a PR

  4. Merge and Deploy

       • Merge the PR to main
       • This triggers automatic deployment to production
       • Or use manual workflow dispatch for staging

Testing Tips

Query Priority Profiles

Check how many profiles will be scraped:

SELECT COUNT(DISTINCT pi.identifierValue) as profile_count
FROM `credentity.PersonFields` pf
JOIN `credmodel_google.PersonIdentifier` pi 
  ON pf.personId = pi.personId
WHERE pf.isPriority = TRUE
  AND pi.identifierType = 'LINKEDIN'
  AND pi.identifierValue IS NOT NULL
  AND NOT EXISTS (
    SELECT 1 
    FROM `linkedin.LinkedinApiCall` lac
    WHERE lac.requestResource = pi.identifierValue
      AND lac.requestType = 'BRIGHTDATA_API_PERSON'
      AND lac.requestDate > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 25 DAY)
  );
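
The same count can be run from Python instead of the BigQuery console using the google-cloud-bigquery client. This is a sketch; it assumes the client library is installed and GCP_PROJECT_ID is exported as above:

import os

from google.cloud import bigquery  # pip install google-cloud-bigquery

# Same count query as above, minus the trailing semicolon.
COUNT_QUERY = """
SELECT COUNT(DISTINCT pi.identifierValue) AS profile_count
FROM `credentity.PersonFields` pf
JOIN `credmodel_google.PersonIdentifier` pi
  ON pf.personId = pi.personId
WHERE pf.isPriority = TRUE
  AND pi.identifierType = 'LINKEDIN'
  AND pi.identifierValue IS NOT NULL
  AND NOT EXISTS (
    SELECT 1
    FROM `linkedin.LinkedinApiCall` lac
    WHERE lac.requestResource = pi.identifierValue
      AND lac.requestType = 'BRIGHTDATA_API_PERSON'
      AND lac.requestDate > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 25 DAY)
  )
"""

client = bigquery.Client(project=os.environ["GCP_PROJECT_ID"])
row = next(iter(client.query(COUNT_QUERY).result()))
print(f"Profiles to be scraped: {row.profile_count}")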

Check Recent Scrapes

SELECT 
  requestResource,
  requestDate,
  requestStatus
FROM `linkedin.LinkedinApiCall`
WHERE requestType = 'BRIGHTDATA_API_PERSON'
  AND requestDate >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)
ORDER BY requestDate DESC
LIMIT 10;

Verify GCS Backup

gsutil ls gs://brightdata-monthly-priority-people/ | tail -10
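
The same check from Python, which also shows when each backup object was written (a sketch using google-cloud-storage; assumes your credentials can read the bucket):

from google.cloud import storage  # pip install google-cloud-storage

client = storage.Client()
blobs = list(client.list_blobs("brightdata-monthly-priority-people"))

# Print the ten most recently updated backup objects.
for blob in sorted(blobs, key=lambda b: b.updated, reverse=True)[:10]:
    print(blob.updated.isoformat(), blob.name)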

Common Issues

1. Coordinator Job Not Finding Profiles

Symptoms: Coordinator completes but publishes 0 batches

Solutions:

  • Check that isPriority=TRUE records exist in PersonFields
  • Verify LinkedIn usernames exist in PersonIdentifier
  • Check the refresh_window_days setting (25 days default)
  • Review the SQL query in config.py

2. BrightData Not Delivering Data

Symptoms: Profiles triggered but no data in BigQuery

Solutions:

  • Verify WEBHOOK_URL is set correctly in the API service
  • Check that the webhook service is deployed and accessible
  • Review the BrightData dashboard for snapshot status
  • Check the webhook service logs for errors

3. Webhook Service Failures

Symptoms: 500 errors in webhook service

Solutions:

  • Verify the BigQuery table exists: linkedin.LinkedinApiCall
  • Check that the GCS bucket exists: brightdata-monthly-priority-people
  • Verify the service account has write permissions
  • Review the webhook service logs

4. Pub/Sub Messages Not Processed

Symptoms: API service not receiving batches

Solutions:

  • Check that the Pub/Sub subscription exists and is active
  • Verify the API service is deployed and running
  • Check the Pub/Sub dead letter queue (see the sketch below)
  • Review the API service logs
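
To inspect the dead letter queue, you can pull a few messages without acknowledging them. This is a sketch; the subscription name is a placeholder for whatever the real dead letter subscription is called:

import os

from google.cloud import pubsub_v1  # pip install google-cloud-pubsub

# Placeholder name -- substitute the actual dead letter subscription.
SUBSCRIPTION_ID = "priority-pipeline-dead-letter"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(os.environ["GCP_PROJECT_ID"], SUBSCRIPTION_ID)

# Pull up to five messages for inspection; not acking leaves them on the queue.
response = subscriber.pull(
    request={"subscription": subscription_path, "max_messages": 5},
    timeout=30,
)
for received in response.received_messages:
    print(received.message.data.decode("utf-8"), dict(received.message.attributes))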

Configuration

Pipeline Settings

Edit src/priority_pipeline/config.py:

# Batch configuration
batch_size = 20  # Profiles per batch

# Refresh window
refresh_window_days = 25  # Days to skip recently scraped profiles

# Rate limiting
batches_per_small_group = 100  # Batches before 5s delay
batches_per_large_group = 1000  # Batches before 5min delay
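
Read literally, the rate-limit comments mean a short pause after every 100 batches and a longer one after every 1,000. The sketch below is one plausible reading of that pacing, for illustration only; the helper and where it would be called are assumptions, not the pipeline's actual code:

import time

batches_per_small_group = 100   # batches before a 5s delay
batches_per_large_group = 1000  # batches before a 5min delay

def pace(batch_number: int) -> None:
    """Pause between batches according to the grouping settings above."""
    if batch_number % batches_per_large_group == 0:
        time.sleep(300)  # roughly 5 minutes after every large group
    elif batch_number % batches_per_small_group == 0:
        time.sleep(5)    # 5 seconds after every small group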

SQL Query Customization

Modify the priority profile query in config.py:

PRIORITY_PROFILES_QUERY = """
SELECT DISTINCT
  pi.identifierValue AS linkedin_username,
  pf.personId
FROM `credentity.PersonFields` pf
JOIN `credmodel_google.PersonIdentifier` pi 
  ON pf.personId = pi.personId
WHERE pf.isPriority = TRUE
  -- Add your custom filters here
"""

Best Practices

  1. Test with small batches first - Use batch_size = 5 for initial testing
  2. Monitor BigQuery costs - Priority queries can be expensive
  3. Check GCS backup regularly - Ensure backups are being created
  4. Review BrightData usage - Monitor API quota and costs
  5. Validate webhook delivery - Check that all triggered profiles get data