## Installation

After installing, this skill will be available to your AI coding assistant.

Verify installation:

```bash
npx agent-skills-cli list
```

## Skill Instructions
```yaml
---
name: collector-framework
description: >
  Universal workflow for building and running signal collectors in the
  Discovery Engine. Use when creating a new collector, running an existing
  collector, debugging collector failures, or understanding the collector
  architecture. Covers the 5-step workflow (Initialize, Fetch, Enrich,
  Convert, Persist) that all collectors follow.
allowed-tools:
  - Bash
  - Read
  - Grep
license: MIT
metadata:
  version: 1.0.0
  category: mcp-enhancement
  author: Press On Ventures
---
```
# Collector Framework
Universal workflow pattern for building and running signal collectors.
## When to Use This Skill
Use this skill when you want to:
- Run an existing collector via conversation (instead of CLI)
- Create a new collector (collector #11, #12, etc.)
- Debug collector failures (rate limits, API errors)
- Understand collector architecture (how signals are collected and stored)
Trigger phrases:
- "Run the SEC EDGAR collector"
- "Create a new collector for [source]"
- "Debug the GitHub collector"
- "Explain the collector workflow"
## Quick Start

Run an existing collector:

```
User: "Run the SEC EDGAR collector"
→ Guides you through execution with dry-run preview
```

Create a new collector:

```
User: "Create a collector for Product Hunt"
→ Uses template_reference.md to build skeleton
```
## Universal 5-Step Workflow

All collectors follow this pattern:

### Step 1: INITIALIZE

Set up API client, authentication, rate limiting, and retry strategy.

Code Pattern:

```python
class MyCollector(BaseCollector):
    def __init__(self, store=None, api_key=None, lookback_days=30):
        super().__init__(store=store, collector_name="my_collector")
        self.api_key = api_key or os.environ.get("MY_API_KEY")
        self.lookback_days = lookback_days
        self._client = None  # httpx.AsyncClient
```

`BaseCollector` provides:

- `rate_limiter` - Automatic rate limiting via `get_rate_limiter(api_name)`
- `retry_config` - Exponential backoff configuration
- `timeout_config` - Operation-specific timeouts
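`get_rate_limiter()` internals aren't shown in this skill; as a rough mental model, the object it returns could resemble a token bucket with an async `acquire()`. This is a hypothetical sketch under that assumption, not the actual implementation:

```python
import asyncio
import time

class TokenBucketRateLimiter:
    """Hypothetical sketch of the limiter get_rate_limiter() might return."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate          # tokens refilled per second
        self.capacity = capacity  # maximum burst size
        self.tokens = float(capacity)
        self.updated = time.monotonic()

    async def acquire(self) -> None:
        while True:
            now = time.monotonic()
            # Refill tokens for the elapsed interval, capped at capacity
            self.tokens = min(self.capacity,
                              self.tokens + (now - self.updated) * self.rate)
            self.updated = now
            if self.tokens >= 1:
                self.tokens -= 1
                return
            # Sleep just long enough for one token to accrue
            await asyncio.sleep((1 - self.tokens) / self.rate)

limiter = TokenBucketRateLimiter(rate=5.0, capacity=10)
asyncio.run(limiter.acquire())  # consumes one token immediately
```

A token bucket allows short bursts up to `capacity` while enforcing the average `rate`, which matches how most public APIs meter requests.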
### Step 2: FETCH RAW DATA

Make rate-limited HTTP requests with pagination and error handling.

Code Pattern:

```python
async def _fetch_raw_data(self) -> List[Dict]:
    # Rate-limited request
    await self.rate_limiter.acquire()

    # HTTP GET with retry
    response = await self._http_get(url, headers={...}, params={...})

    # Parse response (JSON, XML, or feed)
    data = response.json()
    items = data.get("items", [])

    # Handle pagination
    page = 1
    while len(items) < self.max_items and data.get("has_more"):
        page += 1
        items.extend(await self._fetch_page(page))
    return items
```

`BaseCollector` helpers:

- `_http_get()` - HTTP GET with retry + rate limit
- `_fetch_with_retry()` - Generic retry wrapper
- `with_retry()` - Decorator for exponential backoff
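As a mental model for the `with_retry()` helper, here is a minimal exponential-backoff decorator sketch; the parameter names and defaults are assumptions, not the real signature:

```python
import asyncio
import functools
import random

def with_retry(max_attempts: int = 3, base_delay: float = 0.5):
    """Hypothetical sketch: retry an async callable with exponential backoff."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        raise  # out of attempts: surface the error
                    # base_delay, 2x, 4x, ... plus jitter to avoid thundering herds
                    delay = base_delay * (2 ** (attempt - 1))
                    await asyncio.sleep(delay + random.uniform(0, 0.1))
        return wrapper
    return decorator

calls = []

@with_retry(max_attempts=3, base_delay=0.01)
async def flaky():
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("transient")
    return "ok"

result = asyncio.run(flaky())  # succeeds on the third attempt
```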
### Step 3: ENRICH & PARSE

Extract structured data, normalize fields, classify by industry/sector.

Code Pattern:

```python
def _enrich_item(self, raw_item: Dict) -> EnrichedData:
    # Extract fields
    company_name = raw_item.get("name")
    website = raw_item.get("url")

    # Normalize dates
    event_date = parse_iso_date(raw_item.get("created_at"))

    # Classify industry (SIC codes, topics, etc.)
    industry_group = classify_industry(raw_item.get("sic_code"))

    # Build canonical keys
    canonical_keys = build_canonical_key_candidates(
        domain_or_website=website,
        fallback_company_name=company_name
    )

    return EnrichedData(...)
```

Utilities:

- `build_canonical_key_candidates()` - Multi-candidate deduplication
- `create_provenance()` - Glass.AI audit trail
- `hash_response()` - Change detection hash
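To illustrate multi-candidate deduplication, here is a simplified sketch of what `build_canonical_key_candidates()` might produce — strongest key (domain) first, with a name slug as fallback. The key formats and normalization rules here are illustrative assumptions; the real helper in `utils/canonical_keys.py` may differ:

```python
import re
from typing import List, Optional
from urllib.parse import urlparse

def build_canonical_key_candidates(
    domain_or_website: Optional[str] = None,
    fallback_company_name: Optional[str] = None,
) -> List[str]:
    """Sketch: ordered dedup keys, most reliable first."""
    candidates = []
    if domain_or_website:
        # Accept bare domains or full URLs; drop scheme, path, and "www."
        host = urlparse(domain_or_website).netloc or domain_or_website
        host = host.lower().split("/")[0]
        if host.startswith("www."):
            host = host[4:]
        if host:
            candidates.append(f"domain:{host}")
    if fallback_company_name:
        # Normalize the company name to a lowercase slug
        slug = re.sub(r"[^a-z0-9]+", "-", fallback_company_name.lower()).strip("-")
        if slug:
            candidates.append(f"name:{slug}")
    return candidates

keys = build_canonical_key_candidates("https://www.acme.io/about", "Acme, Inc.")
# → ["domain:acme.io", "name:acme-inc"]
```

Emitting multiple candidates lets the dedup layer match a company even when one source only has a name and another only has a website.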
### Step 4: CONVERT TO SIGNALS

Transform enriched data to Signal objects with confidence scores.

Code Pattern:

```python
def to_signal(self, enriched: EnrichedData) -> Signal:
    # Calculate confidence
    confidence = self._calculate_confidence(enriched)

    # Create provenance
    provenance = create_provenance(
        source_url=enriched.url,
        response_data=enriched.raw_data,
        endpoint="/api/path",
        query_params={...}
    )

    return Signal(
        id=f"{self.SOURCE_TYPE}_{enriched.unique_id}",
        signal_type="funding_event",  # or incorporation, github_spike, etc.
        confidence=confidence,
        source_api=self.collector_name,
        source_url=enriched.url,
        source_response_hash=hash_response(enriched.raw_data),
        detected_at=enriched.event_date,
        retrieved_at=datetime.now(timezone.utc),
        verification_status=VerificationStatus.SINGLE_SOURCE,
        verified_by_sources=[self.collector_name],
        raw_data={
            **enriched.__dict__,
            "canonical_key": enriched.canonical_keys[0],
            "canonical_key_candidates": enriched.canonical_keys,
            **provenance,
        }
    )
```
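The `hash_response()` call above powers change detection; one plausible implementation sketch is a SHA-256 over canonically serialized JSON. This is an assumption about the approach, not necessarily the real helper:

```python
import hashlib
import json

def hash_response(response_data: dict) -> str:
    """Sketch: stable hash of an API payload for change detection.
    sort_keys makes the hash independent of dict key ordering."""
    canonical = json.dumps(response_data, sort_keys=True,
                           separators=(",", ":"), default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

a = hash_response({"name": "Acme", "round": "Seed"})
b = hash_response({"round": "Seed", "name": "Acme"})  # same content, new order
assert a == b  # key order doesn't change the hash
```

Because the hash only changes when the payload content changes, re-fetching an unchanged record can be detected cheaply without field-by-field comparison.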
Confidence Formula Pattern:

```python
def _calculate_confidence(self, data) -> float:
    base = 0.7  # Base confidence by signal type

    # Boosts
    if is_target_sector(data):
        base += 0.15
    if has_strong_signal(data):
        base += 0.1
    if data_complete(data):
        base += 0.05

    # Penalties
    if age_days > 60:
        base -= 0.05
    if missing_metadata(data):
        base -= 0.1

    return min(1.0, max(0.0, base))
```
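The boosts and penalties can be made concrete with a runnable toy version of the formula, where plain booleans stand in for the real classifier helpers:

```python
def calculate_confidence(is_target_sector: bool, has_strong_signal: bool,
                         data_complete: bool, age_days: int,
                         missing_metadata: bool) -> float:
    """Toy instance of the confidence formula above."""
    base = 0.7
    if is_target_sector:
        base += 0.15
    if has_strong_signal:
        base += 0.10
    if data_complete:
        base += 0.05
    if age_days > 60:
        base -= 0.05
    if missing_metadata:
        base -= 0.10
    return min(1.0, max(0.0, base))

# Fresh, complete signal in a target sector:
# 0.7 + 0.15 + 0.10 + 0.05 = 1.0 (capped at 1.0)
best = calculate_confidence(True, True, True, age_days=10, missing_metadata=False)

# Stale signal with missing metadata: 0.7 - 0.05 - 0.10 ≈ 0.55
worst = calculate_confidence(False, False, False, age_days=90, missing_metadata=True)
```

The clamp at the end keeps scores in `[0.0, 1.0]` no matter how many boosts or penalties stack up.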
### Step 5: PERSIST & DEDUPLICATE

Check suppression cache, deduplicate via canonical keys, save to database.

Code Pattern:

```python
async def _collect_signals(self) -> List[Signal]:
    # Steps 2-4: Fetch, enrich, convert
    signals = []
    for raw_item in await self._fetch_raw_data():
        enriched = self._enrich_item(raw_item)
        signal = enriched.to_signal()
        signals.append(signal)

    # Step 5: Deduplication (handled by BaseCollector.run())
    return signals  # BaseCollector._save_signals() will dedupe
```
`BaseCollector` handles:

1. Extract `canonical_key` from `raw_data`
2. Check run-level dedup (`_processed_canonical_keys` set)
3. Check database dedup (`store.is_duplicate()`)
4. Check suppression cache (`store.check_suppression()`)
5. Save to database (`store.save_signal()`)
6. Update stats (`signals_new`, `signals_suppressed`)
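The checklist above can be sketched as a simplified, in-memory version of the dedup-and-save loop; the store here is a toy stand-in for `SignalStore`, and the signal shape is reduced to the fields the loop touches:

```python
class SketchSaver:
    """Simplified sketch of BaseCollector's dedup-and-save loop."""

    def __init__(self, store):
        self.store = store
        self._processed_canonical_keys = set()  # run-level dedup
        self.signals_new = 0
        self.signals_suppressed = 0

    def save_signals(self, signals):
        for signal in signals:
            key = signal["raw_data"]["canonical_key"]
            # 1-2. Run-level dedup: skip keys already seen this run
            if key in self._processed_canonical_keys:
                continue
            self._processed_canonical_keys.add(key)
            # 3-4. Database dedup and suppression cache
            if self.store.is_duplicate(key) or self.store.check_suppression(key):
                self.signals_suppressed += 1
                continue
            # 5-6. Persist and update stats
            self.store.save_signal(signal)
            self.signals_new += 1

class FakeStore:
    """Toy in-memory stand-in for SignalStore."""
    def __init__(self):
        self.saved = []
        self.suppressed = {"domain:old-news.com"}
    def is_duplicate(self, key):
        return any(s["raw_data"]["canonical_key"] == key for s in self.saved)
    def check_suppression(self, key):
        return key in self.suppressed
    def save_signal(self, signal):
        self.saved.append(signal)

saver = SketchSaver(FakeStore())
saver.save_signals([
    {"raw_data": {"canonical_key": "domain:acme.io"}},
    {"raw_data": {"canonical_key": "domain:acme.io"}},       # run-level dup
    {"raw_data": {"canonical_key": "domain:old-news.com"}},  # suppressed
])
# → signals_new == 1, signals_suppressed == 1
```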
## Integration with MCP

The skill integrates with the internal MCP server:

```
# MCP prompt: run-collector
mcp__discovery-engine__run-collector(
    collector="sec_edgar",
    dry_run=true
)
```

Returns:

```
CollectorResult(
    collector="sec_edgar",
    status="SUCCESS",  # or PARTIAL_SUCCESS, ERROR, SKIPPED
    signals_found=18,
    signals_new=15,
    signals_suppressed=3,
    error_message=None
)
```
## Collector References
For collector-specific details (API endpoints, SIC codes, confidence formulas):
- SEC EDGAR - Form D filings, SIC classification
- GitHub - Trending repos, spike detection
- Companies House - UK incorporations, SIC 2007
## Creating a New Collector

Use Template Reference as a starting point:

1. Copy the template to `collectors/my_collector.py`
2. Fill in API details, authentication, endpoints
3. Implement the 5-step workflow
4. Add to `ALLOWED_COLLECTORS` in `discovery_engine/mcp_server.py`
5. Test with dry-run mode

Estimated time: 2-4 hours for an experienced developer using the template.
## Testing Collectors

### Dry-Run Mode

```bash
python run_pipeline.py collect --collectors my_collector --dry-run
```
Validates:
- API authentication
- Pagination logic
- Signal conversion
- Canonical key building
Does NOT:
- Write to database
- Call Notion API
- Update suppression cache
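Conceptually, dry-run mode gates only the persistence step. A minimal sketch of that split (hypothetical function names and store interface, not the actual pipeline wiring):

```python
def persist(signals, store, dry_run: bool = False):
    """Sketch: dry-run validates and reports but never writes."""
    if dry_run:
        # Upstream validation (auth, pagination, conversion) already ran
        return {"would_save": len(signals), "saved": 0}
    for signal in signals:
        store.save_signal(signal)
    return {"would_save": len(signals), "saved": len(signals)}

class MemoryStore:
    """Toy store standing in for the real database layer."""
    def __init__(self):
        self.rows = []
    def save_signal(self, signal):
        self.rows.append(signal)

store = MemoryStore()
report = persist([{"id": "s1"}, {"id": "s2"}], store, dry_run=True)
# Dry-run: the store receives no writes, but the report shows what would happen
```

Keeping the flag check in one place means every collector gets dry-run behavior for free rather than re-implementing it.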
### Full Run

```bash
python run_pipeline.py collect --collectors my_collector
```

Performs:

- All dry-run validations
- Database writes
- Suppression cache checks
- Change detection (if `ENABLE_ASSET_STORE=true`)
## Error Handling Patterns

### Network Errors (Retryable)

```python
# Automatic retry via with_retry()
try:
    response = await self._http_get(url)
except httpx.HTTPStatusError as e:
    if e.response.status_code in [500, 502, 503, 504, 429]:
        # with_retry() handles exponential backoff
        raise  # Will be retried
    elif e.response.status_code == 404:
        # Not found, skip gracefully
        return None
    else:
        # Non-retryable error
        raise
```
### Rate Limits

```python
# Automatic handling via rate_limiter
await self.rate_limiter.acquire()  # Blocks if rate limit hit

# Manual handling for specific APIs
if response.status_code == 429:
    reset_time = response.headers.get("X-RateLimit-Reset")
    # Guard against clock skew producing a negative sleep
    wait_seconds = max(0.0, int(reset_time) - time.time())
    await asyncio.sleep(wait_seconds)
```
### Graceful Degradation

```python
# Continue on individual failures
for item in items:
    try:
        signal = self._process_item(item)
        signals.append(signal)
    except Exception as e:
        self._errors.append(str(e))
        logger.warning(f"Skipping item: {e}")
        continue  # Don't fail entire batch
```
## Examples
- SEC EDGAR Session - Running SEC collector step-by-step
- Creating New Collector - Building collector #11
## Success Criteria
You'll know this skill is working when:
- You can run any collector via conversation
- You can create a new collector in <4 hours using template
- Collector failures are easy to debug (clear error messages)
- All collectors follow the same 5-step pattern
## Related Files

| File | Purpose |
|---|---|
| `collectors/base.py` | `BaseCollector` class with helpers |
| `storage/signal_store.py` | `SignalStore` for persistence |
| `verification/verification_gate_v2.py` | `Signal` dataclass definition |
| `utils/canonical_keys.py` | Deduplication utilities |
| `collectors/provenance.py` | Glass.AI provenance tracking |
| `collectors/retry_strategy.py` | Exponential backoff helpers |