| """CVE Dashboard - Real-time vulnerability monitoring with NVD API and LLM-powered audience customization.""" | |
| import os | |
| import json | |
| import time | |
| import logging | |
| from datetime import datetime, timedelta | |
| from typing import List, Dict, Optional, Tuple | |
| import gradio as gr | |
| import pandas as pd | |
| import plotly.express as px | |
| import plotly.graph_objects as go | |
| import requests | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
# Audience profiles for tailored CVE descriptions
AUDIENCE_PROFILES = {
    "Cybersecurity Professional": {
        "focus": "threat assessment, attack vectors, mitigation strategies, and security controls",
        "tone": "technical and precise",
        "priorities": ["exploitation methods", "defensive measures", "risk assessment", "compliance implications"]
    },
    "Data Scientist": {
        "focus": "data exposure risks, model vulnerabilities, and statistical analysis implications",
        "tone": "analytical and research-oriented",
        "priorities": ["data integrity", "model security", "pipeline vulnerabilities", "privacy concerns"]
    },
    "Data Engineer": {
        "focus": "infrastructure vulnerabilities, data pipeline security, and system architecture impacts",
        "tone": "technical with infrastructure emphasis",
        "priorities": ["database security", "ETL vulnerabilities", "infrastructure risks", "data flow security"]
    },
    "Full-Stack Developer": {
        "focus": "code vulnerabilities, dependency risks, and implementation fixes",
        "tone": "practical and code-oriented",
        "priorities": ["code examples", "library updates", "patch implementation", "secure coding practices"]
    },
    "Product Owner": {
        "focus": "business impact, user experience, and prioritization for backlog",
        "tone": "business-oriented with technical context",
        "priorities": ["user impact", "feature implications", "timeline considerations", "resource requirements"]
    },
    "Manager": {
        "focus": "business risk, resource allocation, and strategic implications",
        "tone": "executive summary style",
        "priorities": ["business impact", "cost implications", "team requirements", "timeline urgency"]
    }
}
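
# Each profile above is interpolated into the LLM prompt built in generate_tailored_summary():
# for example, AUDIENCE_PROFILES["Manager"]["focus"] supplies the **Focus:** line and the
# "priorities" list is joined into the **Key Priorities:** line of the prompt.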

class CVEDashboard:
    """Main CVE Dashboard application class."""

    def __init__(self):
        """Initialize the CVE Dashboard."""
        self.api_key = os.getenv('NVD_API_KEY')
        self.base_url = "https://services.nvd.nist.gov/rest/json/cves/2.0"
        self.headers = {'apiKey': self.api_key} if self.api_key else {}
        self.cache = {}
        self.last_request_time = 0
        self.rate_limit_delay = 0.7 if self.api_key else 6  # seconds between requests

        # HuggingFace token - try environment first
        self.hf_token = os.getenv('HF_TOKEN') or os.getenv('HUGGINGFACE_TOKEN')

    def _rate_limit(self):
        """Implement rate limiting for NVD API."""
        current_time = time.time()
        time_since_last = current_time - self.last_request_time
        if time_since_last < self.rate_limit_delay:
            time.sleep(self.rate_limit_delay - time_since_last)
        self.last_request_time = time.time()
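
    # Note (assumption about NVD limits): public guidance is roughly 50 requests per rolling
    # 30-second window with an API key and 5 without, which is why __init__ uses delays of
    # 0.7 s and 6 s respectively; adjust rate_limit_delay if NVD changes these limits.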

    def fetch_cves(self,
                   year: int,
                   keyword: Optional[str] = None,
                   severity: Optional[str] = None,
                   results_per_page: int = 2000) -> Tuple[List[Dict], str]:
        """
        Fetch CVEs from NVD API for a specific year, handling the 120-day range limit
        and ensuring the date range does not extend into the future.

        Args:
            year: The year to fetch CVEs for.
            keyword: Optional keyword to search
            severity: Optional severity filter (LOW, MEDIUM, HIGH, CRITICAL)
            results_per_page: Number of results per page (max 2000)

        Returns:
            Tuple of (list of CVEs, status message)
        """
        try:
            all_vulnerabilities = []
            now = datetime.now()
            year_start = datetime(year, 1, 1)

            # If the selected year is the current year, end the search today.
            # Otherwise, use the end of the selected year.
            if year == now.year:
                year_end = now
            else:
                year_end = datetime(year, 12, 31, 23, 59, 59)

            current_start = year_start
            while current_start < year_end:
                self._rate_limit()

                # Calculate the end of the chunk, respecting the 120-day limit
                chunk_end = min(current_start + timedelta(days=119), year_end)
                logger.info(f"Fetching CVEs from {current_start.date()} to {chunk_end.date()}")

                # Format dates with timezone information (Z for UTC)
                start_date_str = current_start.strftime('%Y-%m-%dT%H:%M:%S.000Z')
                end_date_str = chunk_end.strftime('%Y-%m-%dT%H:%M:%S.999Z')

                params = {
                    'pubStartDate': start_date_str,
                    'pubEndDate': end_date_str,
                    'resultsPerPage': min(results_per_page, 2000)
                }
                if keyword:
                    params['keywordSearch'] = keyword

                response = requests.get(
                    self.base_url,
                    headers=self.headers,
                    params=params,
                    timeout=30
                )

                # Handle different error scenarios
                if response.status_code == 404:
                    logger.warning(f"No data found for date range {current_start.date()} to {chunk_end.date()}")
                    # Move to the next chunk and continue
                    current_start = chunk_end + timedelta(days=1)
                    continue
                elif response.status_code != 200:
                    response.raise_for_status()

                data = response.json()
                vulnerabilities = data.get('vulnerabilities', [])
                all_vulnerabilities.extend(vulnerabilities)

                # Move to the next chunk
                current_start = chunk_end + timedelta(days=1)

            # Process and filter all aggregated CVEs
            processed_cves = []
            for vuln in all_vulnerabilities:
                cve = self._process_cve(vuln.get('cve', {}))
                if severity and cve['severity'] != severity:
                    continue
                processed_cves.append(cve)

            if not processed_cves:
                return [], f"No CVEs found for year {year}" + (f" matching '{keyword}'" if keyword else "") + (f" with {severity} severity" if severity else "")

            status = f"✅ Fetched {len(processed_cves)} CVEs from the year {year}"
            if keyword:
                status += f" matching '{keyword}'"
            if severity:
                status += f" with {severity} severity"
            return processed_cves, status

        except requests.exceptions.RequestException as e:
            error_details = ""
            if e.response is not None:
                try:
                    error_data = e.response.json()
                    error_details = f" - {error_data.get('message', e.response.text)}"
                except json.JSONDecodeError:
                    error_details = f" - Status: {e.response.status_code}, Response: {e.response.text[:200]}"
            return [], f"❌ API Error: {str(e)}{error_details}"
        except Exception as e:
            return [], f"❌ Error: {str(e)}"

    def _process_cve(self, cve_data: Dict) -> Dict:
        """Process raw CVE data into a structured format."""
        cve_id = cve_data.get('id', 'Unknown')

        # Extract description
        descriptions = cve_data.get('descriptions', [])
        description = next(
            (d['value'] for d in descriptions if d.get('lang') == 'en'),
            'No description available'
        )

        # Extract CVSS metrics and severity
        metrics = cve_data.get('metrics', {})
        cvss_data = {}
        severity = 'UNKNOWN'
        score = 0.0

        # Try CVSS 3.1 first, then 3.0, then 2.0
        for cvss_version in ['cvssMetricV31', 'cvssMetricV30', 'cvssMetricV2']:
            if cvss_version in metrics and metrics[cvss_version]:
                metric = metrics[cvss_version][0]
                cvss_data = metric.get('cvssData', {})
                score = cvss_data.get('baseScore', 0.0)
                # CVSS v2 entries report baseSeverity at the metric level rather than inside
                # cvssData, so fall back to the metric object if it is missing there.
                severity = cvss_data.get('baseSeverity', metric.get('baseSeverity', 'UNKNOWN'))
                break

        # Extract references
        references = cve_data.get('references', [])
        ref_urls = [ref.get('url', '') for ref in references[:5]]  # Limit to 5 refs

        # Extract dates
        published = cve_data.get('published', '')
        modified = cve_data.get('lastModified', '')

        return {
            'id': cve_id,
            'description': description,  # Keep full description for LLM processing
            'display_description': description[:500] + '...' if len(description) > 500 else description,
            'severity': severity,
            'score': score,
            'published': published[:10] if published else 'Unknown',
            'modified': modified[:10] if modified else 'Unknown',
            'references': ref_urls,
            'cvss_version': cvss_data.get('version', 'Unknown'),
            'vector_string': cvss_data.get('vectorString', 'N/A')
        }
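
    # Abridged, illustrative shape of one NVD 2.0 'vulnerabilities[i].cve' record consumed above
    # (only the keys this method reads; values are made up):
    #   {
    #     "id": "CVE-2024-XXXXX",
    #     "published": "2024-03-01T12:00:00.000",
    #     "lastModified": "2024-03-05T08:30:00.000",
    #     "descriptions": [{"lang": "en", "value": "..."}],
    #     "metrics": {"cvssMetricV31": [{"cvssData": {"baseScore": 7.5, "baseSeverity": "HIGH",
    #                                                 "vectorString": "CVSS:3.1/...", "version": "3.1"}}]},
    #     "references": [{"url": "https://example.com/advisory"}]
    #   }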

    def create_severity_chart(self, cves: List[Dict]) -> go.Figure:
        """Create a pie chart of CVE severities."""
        if not cves:
            fig = go.Figure()
            fig.add_annotation(text="No data available",
                               xref="paper", yref="paper",
                               x=0.5, y=0.5, showarrow=False)
            return fig

        severity_counts = pd.DataFrame(cves)['severity'].value_counts()
        colors = {
            'CRITICAL': '#d32f2f',
            'HIGH': '#f57c00',
            'MEDIUM': '#fbc02d',
            'LOW': '#388e3c',
            'UNKNOWN': '#9e9e9e'
        }

        fig = px.pie(
            values=severity_counts.values,
            names=severity_counts.index,
            title="CVE Distribution by Severity",
            color=severity_counts.index,
            color_discrete_map=colors
        )
        fig.update_traces(textposition='inside', textinfo='percent+label')
        fig.update_layout(height=400)
        return fig

    def create_timeline_chart(self, cves: List[Dict]) -> go.Figure:
        """Create a timeline chart of CVE publications."""
        if not cves:
            fig = go.Figure()
            fig.add_annotation(text="No data available",
                               xref="paper", yref="paper",
                               x=0.5, y=0.5, showarrow=False)
            return fig

        df = pd.DataFrame(cves)
        # 'coerce' tolerates rows whose publication date is missing ('Unknown')
        df['published'] = pd.to_datetime(df['published'], errors='coerce')

        # Group by date and severity
        timeline_data = df.groupby([df['published'].dt.date, 'severity']).size().reset_index(name='count')

        fig = px.bar(
            timeline_data,
            x='published',
            y='count',
            color='severity',
            title="CVE Publications Timeline",
            color_discrete_map={
                'CRITICAL': '#d32f2f',
                'HIGH': '#f57c00',
                'MEDIUM': '#fbc02d',
                'LOW': '#388e3c',
                'UNKNOWN': '#9e9e9e'
            }
        )
        fig.update_layout(
            xaxis_title="Publication Date",
            yaxis_title="Number of CVEs",
            height=400,
            hovermode='x unified'
        )
        return fig

    def create_score_distribution(self, cves: List[Dict]) -> go.Figure:
        """Create a histogram of CVSS scores."""
        if not cves:
            fig = go.Figure()
            fig.add_annotation(text="No data available",
                               xref="paper", yref="paper",
                               x=0.5, y=0.5, showarrow=False)
            return fig

        scores = [cve['score'] for cve in cves if cve['score'] > 0]

        fig = go.Figure(data=[go.Histogram(
            x=scores,
            nbinsx=20,
            marker_color='#1976d2'
        )])
        fig.update_layout(
            title="CVSS Score Distribution",
            xaxis_title="CVSS Score",
            yaxis_title="Count",
            height=400,
            showlegend=False
        )

        # Add severity range annotations
        fig.add_vrect(x0=0, x1=3.9, fillcolor="green", opacity=0.1, annotation_text="Low")
        fig.add_vrect(x0=4, x1=6.9, fillcolor="yellow", opacity=0.1, annotation_text="Medium")
        fig.add_vrect(x0=7, x1=8.9, fillcolor="orange", opacity=0.1, annotation_text="High")
        fig.add_vrect(x0=9, x1=10, fillcolor="red", opacity=0.1, annotation_text="Critical")
        return fig

    def format_cve_table(self, cves: List[Dict]) -> pd.DataFrame:
        """Format CVEs for display in a table."""
        if not cves:
            return pd.DataFrame()

        df = pd.DataFrame(cves)

        # Select and reorder columns
        columns = ['id', 'severity', 'score', 'published', 'display_description']
        df = df[columns]

        # Format the dataframe
        df = df.rename(columns={
            'id': 'CVE ID',
            'severity': 'Severity',
            'score': 'CVSS Score',
            'published': 'Published',
            'display_description': 'Description'
        })
        return df
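
# Standalone usage sketch (illustrative; the Gradio app below drives these calls itself):
#   dashboard = CVEDashboard()
#   cves, status = dashboard.fetch_cves(year=2024, keyword="apache", severity="HIGH")
#   table = dashboard.format_cve_table(cves)
#   severity_fig = dashboard.create_severity_chart(cves)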

def generate_tailored_summary(cve_description: str, audience: str, hf_token: Optional[str] = None, max_retries: int = 2) -> str:
    """
    Generate a tailored CVE summary using google/gemma-2-2b-it via the HuggingFace Inference API.

    Args:
        cve_description: The original CVE description
        audience: Target audience from AUDIENCE_PROFILES
        hf_token: HuggingFace API token (optional if set as env var)
        max_retries: Maximum number of retry attempts

    Returns:
        Tailored summary or error message
    """
    # Use provided token or fall back to environment variable
    token = hf_token or os.getenv('HF_TOKEN') or os.getenv('HUGGINGFACE_TOKEN')
    if not token:
        return "❌ HuggingFace API token is required. Please set the HF_TOKEN environment variable or enter your token."
    if not cve_description or not audience:
        return "❌ Please select a CVE and an audience first."
    if audience not in AUDIENCE_PROFILES:
        return f"❌ Unknown audience: {audience}"

    # Define the model(s) to use
    models = [
        "google/gemma-2-2b-it",
    ]

    headers = {"Authorization": f"Bearer {token}"}
    profile = AUDIENCE_PROFILES[audience]

    # Gemma uses a specific chat template format.
    # Combine the system and user prompts into a single user turn.
    full_prompt = f"""You are an expert cybersecurity analyst.
Rewrite this CVE description for a {audience}.

**Target Audience:** {audience}
**Focus:** {profile['focus']}
**Tone:** {profile['tone']}
**Key Priorities:** {', '.join(profile['priorities'])}

**CVE Description:**
{cve_description[:1200]}

Provide a concise, actionable summary (2-3 sentences) highlighting what matters most to this audience. Focus on practical implications and next steps."""

    # Use the OpenAI-compatible messages format
    messages = [
        {"role": "user", "content": full_prompt}
    ]

    # Use the standardized router endpoint
    api_url = "https://router.huggingface.co/v1/chat/completions"

    for model in models:
        payload = {
            "model": model,
            "messages": messages,
            "max_tokens": 250,
            "temperature": 0.7,
            "top_p": 0.95,
            "stop": ["<end_of_turn>", "<start_of_turn>"]  # Stop sequences for Gemma
        }

        for attempt in range(max_retries):
            try:
                logger.info(f"Generating summary with {model} (attempt {attempt + 1})")
                response = requests.post(api_url, headers=headers, json=payload, timeout=45)

                if response.status_code == 200:
                    try:
                        result = response.json()
                        # OpenAI-compatible response parsing
                        summary = ""
                        if "choices" in result and len(result["choices"]) > 0:
                            message = result["choices"][0].get("message", {})
                            summary = message.get("content", "").strip()
                        if summary and len(summary) > 20:
                            logger.info(f"Successfully generated summary with {model}")
                            return f"**{audience} Summary (via {model.split('/')[-1]}):**\n{summary}"
                        else:
                            # Handle cases where the model returns an empty summary
                            logger.warning(f"Model {model} returned an empty or short summary.")
                            continue  # Retry if possible
                    except json.JSONDecodeError as e:
                        logger.warning(f"JSON decode error with {model}: {e}")
                        continue
                elif response.status_code == 503:
                    logger.warning(f"Model {model} is loading, trying next model...")
                    break  # Try next model
                elif response.status_code == 429:
                    if attempt < max_retries - 1:
                        time.sleep(5)
                        continue
                    else:
                        break
                else:
                    try:
                        error_message = response.json().get("error", response.text)
                    except json.JSONDecodeError:
                        error_message = response.text
                    logger.warning(f"HTTP {response.status_code} with {model}: {error_message}")
                    # If the model is not found or there's a validation error, don't retry.
                    if response.status_code in [404, 422]:
                        return f"❌ Model '{model}' not found or request is invalid. Please check the model name."
                    break
            except requests.exceptions.Timeout:
                logger.warning(f"Timeout with {model} on attempt {attempt + 1}")
                if attempt >= max_retries - 1:
                    break  # Give up on this model after the last retry
            except requests.exceptions.RequestException as e:
                logger.error(f"Request failed with {model}: {e}")
                break

    return "⏳ AI models are currently busy. This can happen during peak usage. Please try again in a few minutes."

def create_interface():
    """Create the Gradio interface."""
    dashboard = CVEDashboard()

    with gr.Blocks(title="CVE Dashboard", theme=gr.themes.Soft()) as interface:
        # State to store fetched CVEs
        cve_state = gr.State([])

        gr.Markdown(
            """
            # 🛡️ CVE Dashboard with AI-Powered Audience Customization
            Real-time vulnerability monitoring using the NIST National Vulnerability Database (NVD) with LLM-powered audience-specific summaries
            """
        )

        with gr.Row():
            with gr.Column(scale=1):
                hf_token = gr.State(dashboard.hf_token)

                gr.Markdown("### 🔍 Search Parameters")

                current_year = datetime.now().year
                # Default to the previous year so the initial load returns a full year of data
                default_year = current_year - 1

                year_filter = gr.Dropdown(
                    choices=list(range(current_year, current_year - 10, -1)),
                    value=default_year,
                    label="Year"
                )
                keyword = gr.Textbox(
                    label="Keyword Search (Optional)",
                    placeholder="e.g., Apache, Linux, Microsoft"
                )
                severity_filter = gr.Dropdown(
                    choices=[None, "CRITICAL", "HIGH", "MEDIUM", "LOW"],
                    label="Severity Filter",
                    value=None
                )
                fetch_btn = gr.Button("🔍 Fetch CVEs", variant="primary")

            with gr.Column(scale=3):
                status_text = gr.Textbox(label="Status", interactive=False)

        with gr.Tabs():
            with gr.Tab("📊 Overview"):
                with gr.Row():
                    severity_chart = gr.Plot(label="Severity Distribution")
                    timeline_chart = gr.Plot(label="Timeline")
                score_chart = gr.Plot(label="CVSS Score Distribution")

            with gr.Tab("📋 CVE List"):
                cve_table = gr.DataFrame(
                    label="CVE Details",
                    wrap=True,
                    row_count=15
                )

            with gr.Tab("🤖 AI-Powered Summaries"):
                gr.Markdown("### Generate Audience-Specific CVE Summaries")

                with gr.Row():
                    with gr.Column():
                        cve_selector = gr.Dropdown(
                            label="Select CVE",
                            choices=[],
                            info="Choose a CVE from the fetched results"
                        )
                        audience_selector = gr.Dropdown(
                            label="Target Audience",
                            choices=list(AUDIENCE_PROFILES.keys()),
                            value="Cybersecurity Professional",
                            info="Select the professional perspective"
                        )
                        generate_btn = gr.Button("🧠 Generate AI Summary", variant="primary")

                        # Add status for generation
                        generation_status = gr.Textbox(
                            label="Generation Status",
                            value="Ready to generate summaries",
                            interactive=False
                        )

                    with gr.Column():
                        audience_info = gr.Markdown(
                            value="**Focus:** threat assessment, attack vectors, mitigation strategies, and security controls\n\n**Priorities:** exploitation methods, defensive measures, risk assessment, compliance implications"
                        )
                        original_description = gr.Textbox(
                            label="Original CVE Description",
                            lines=4,
                            interactive=False
                        )
                        tailored_summary = gr.Textbox(
                            label="AI-Generated Summary",
                            lines=6,
                            interactive=False,
                            placeholder="Select a CVE and audience, then click 'Generate AI Summary'"
                        )
| with gr.Tab("βΉοΈ About"): | |
| gr.Markdown( | |
| """ | |
| ### About this Dashboard | |
| This dashboard provides real-time monitoring of [Common Vulnerabilities and Exposures (CVEs)](https://en.wikipedia.org/wiki/Common_Vulnerabilities_and_Exposures) | |
| from the NIST National Vulnerability Database with AI-powered audience customization. | |
| **Features:** | |
| - Search CVEs by date range and keywords | |
| - Filter by severity levels | |
| - Visualize CVE distributions and trends | |
| - AI-powered audience-specific summaries using the google/gemma-2-2b-it model. | |
| **Severity Levels:** | |
| - **CRITICAL** (9.0-10.0): Complete system compromise possible | |
| - **HIGH** (7.0-8.9): Significant impact, immediate patching recommended | |
| - **MEDIUM** (4.0-6.9): Moderate impact, plan for updates | |
| - **LOW** (0.1-3.9): Minor impact, update in regular cycle | |
| **Supported Audiences:** | |
| - **Cybersecurity Professional:** Focus on threats, attack vectors, and mitigation | |
| - **Data Scientist:** Emphasis on data risks and model vulnerabilities | |
| - **Data Engineer:** Infrastructure security and pipeline risks | |
| - **Full-Stack Developer:** Code vulnerabilities and implementation fixes | |
| - **Product Owner:** Business impact and prioritization guidance | |
| - **Manager:** Executive summary with business implications | |
| **Data Source:** [NIST NVD API](https://nvd.nist.gov/developers/vulnerabilities) | |
| **AI Model:** [google/gemma-2-2b-it](https://huggingface.co/google/gemma-2-2b-it) | |
| **Disclaimer:** Generated content may be inaccurate or false. | |
| The free community tier of the Hugging Face Inference API powers this app's AI features. | |
| Since computing resources are shared, anticipate some delay on your initial request as the model loads. Later requests usually process more quickly. | |
| **Note:** If you encounter rate limiting or timeouts, please try again after a short wait. | |
| **Developed by** [M. Murat Ardag](https://mmuratardag.github.io/). | |
| """ | |
| ) | |

        # Event handlers
        def fetch_and_display(year, keyword_search, severity):
            """Fetch CVEs and update all displays."""
            cves, status = dashboard.fetch_cves(
                year=year,
                keyword=keyword_search if keyword_search else None,
                severity=severity if severity else None
            )

            if cves:
                df = dashboard.format_cve_table(cves)
                severity_fig = dashboard.create_severity_chart(cves)
                timeline_fig = dashboard.create_timeline_chart(cves)
                score_fig = dashboard.create_score_distribution(cves)

                # Update CVE selector choices
                cve_choices = [f"{cve['id']} ({cve['severity']}, {cve['score']})" for cve in cves]

                return (
                    cves,  # Update state
                    status,
                    df,
                    severity_fig,
                    timeline_fig,
                    score_fig,
                    gr.Dropdown(choices=cve_choices, value=cve_choices[0] if cve_choices else None)  # Update CVE selector
                )
            else:
                empty_fig = go.Figure()
                empty_fig.add_annotation(
                    text="No data available",
                    xref="paper", yref="paper",
                    x=0.5, y=0.5, showarrow=False
                )
                return (
                    [],  # Update state
                    status,
                    pd.DataFrame(),
                    empty_fig,
                    empty_fig,
                    empty_fig,
                    gr.Dropdown(choices=[], value=None)  # Clear CVE selector
                )

        def update_audience_info(audience):
            """Update audience information display."""
            if audience in AUDIENCE_PROFILES:
                profile = AUDIENCE_PROFILES[audience]
                info = f"**Focus:** {profile['focus']}\n\n**Priorities:** {', '.join(profile['priorities'])}"
                return info
            return "Select an audience to see details"

        def update_cve_description(selected_cve, cves):
            """Update the original CVE description when a CVE is selected."""
            if not selected_cve or not cves:
                return ""

            # Extract CVE ID from the selection (format: "CVE-2024-1234 (HIGH, 7.5)")
            cve_id = selected_cve.split(" (")[0]

            # Find the matching CVE
            for cve in cves:
                if cve['id'] == cve_id:
                    return cve['description']
            return "CVE description not found"

        def generate_summary_with_status(selected_cve, audience, token, cves):
            """Generate audience-specific summary with status updates."""
            if not selected_cve or not audience or not cves:
                # This function is a generator, so yield (not return) the message pair
                yield "Please select a CVE and audience first.", "❌ Missing selection"
                return

            # Extract CVE ID from the selection
            cve_id = selected_cve.split(" (")[0]

            # Find the matching CVE
            for cve in cves:
                if cve['id'] == cve_id:
                    # Update status to show generation in progress
                    yield "Generating AI summary... This may take 30-60 seconds.", "🔄 Generating..."
                    summary = generate_tailored_summary(cve['description'], audience, token)
                    if summary.startswith("❌"):
                        yield summary, "❌ Generation failed"
                    elif summary.startswith("⏳"):
                        yield summary, "⏳ Models busy"
                    else:
                        yield summary, "✅ Summary generated"
                    return
            yield "CVE not found", "❌ CVE not found"

        # Wire up the event handlers
        fetch_btn.click(
            fn=fetch_and_display,
            inputs=[year_filter, keyword, severity_filter],
            outputs=[cve_state, status_text, cve_table, severity_chart, timeline_chart, score_chart, cve_selector]
        )

        audience_selector.change(
            fn=update_audience_info,
            inputs=[audience_selector],
            outputs=[audience_info]
        )

        cve_selector.change(
            fn=update_cve_description,
            inputs=[cve_selector, cve_state],
            outputs=[original_description]
        )

        generate_btn.click(
            fn=generate_summary_with_status,
            inputs=[cve_selector, audience_selector, hf_token, cve_state],
            outputs=[tailored_summary, generation_status]
        )

        # Load initial data
        interface.load(
            fn=fetch_and_display,
            inputs=[year_filter, keyword, severity_filter],
            outputs=[cve_state, status_text, cve_table, severity_chart, timeline_chart, score_chart, cve_selector]
        )

    return interface
| if __name__ == "__main__": | |
| # Check for API keys | |
| if os.getenv('NVD_API_KEY'): | |
| print("β NVD API key loaded - Higher rate limits enabled") | |
| else: | |
| print("β No NVD API key found - Using lower rate limits") | |
| print(" Get a free API key at: https://nvd.nist.gov/developers/request-an-api-key") | |
| if os.getenv('HF_TOKEN') or os.getenv('HUGGINGFACE_TOKEN'): | |
| print("β HuggingFace token loaded - AI summaries enabled") | |
| else: | |
| print("β No HuggingFace token found - Users will need to enter their own") | |
| print(" Get a free token at: https://huggingface.co/settings/tokens") | |
| # Create and launch the interface | |
| app = create_interface() | |
| app.launch() |
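
# Local run sketch (assumptions: the dependencies imported above are installed and this module
# is saved as app.py - adjust the filename to match your checkout):
#   export NVD_API_KEY="..."   # optional; raises the NVD rate limit
#   export HF_TOKEN="..."      # needed for the AI summaries
#   python app.py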