Administrator
Published on 2025-04-29 / 7 Visits
0

获取历史天气信息

import urllib.parse
import urllib.request
import ssl
import csv
import time
import json
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from datetime import datetime, timedelta

def get_month_range(start_date, end_date):
    """Yield one ``(month, last_day)`` pair for each calendar month in the range.

    ``month`` is the current position formatted as ``YYYYMM``. ``last_day`` is
    the last day covered inside that month formatted as ``YYYYMMDD``: the
    month's final day, capped at ``end_date``. Nothing is yielded when
    ``start_date`` is later than ``end_date``.
    """
    cursor = start_date
    while True:
        if cursor > end_date:
            return

        # divmod turns December (12) into "carry one year, month index 0",
        # so the first day of the following month needs no special case.
        year_carry, month_index = divmod(cursor.month, 12)
        following = cursor.replace(
            year=cursor.year + year_carry,
            month=month_index + 1,
            day=1,
        )

        # The day before the next month's first day closes this month.
        month_close = following - timedelta(days=1)
        if month_close > end_date:
            month_close = end_date

        yield cursor.strftime("%Y%m"), month_close.strftime("%Y%m%d")
        cursor = following

class _RateLimited(Exception):
    """Internal signal: the API rejected the call as too frequent."""


def _is_timeout(exc):
    """Return True if *exc* is a timeout, raised directly or wrapped in a URLError."""
    import socket  # local import: the file header does not import socket

    timeout_types = (TimeoutError, socket.timeout)
    return isinstance(exc, timeout_types) or isinstance(
        getattr(exc, "reason", None), timeout_types
    )


def _build_records(body):
    """Flatten one API response body into the row dicts written to the CSV."""
    area = body['area']
    return [
        {
            '日期': item['time'],
            '地区': area,
            '最高气温': item['max_temperature'].replace('℃', ''),
            '最低气温': item['min_temperature'].replace('℃', ''),
            '天气': item['weather'],
            '风向': item['wind_direction'],
            '风力': item['wind_power'],
            '空气质量': item['aqiInfo'],
            'AQI': item['aqi'],
        }
        for item in body['list']
    ]


def fetch_weather_data(app_key, area_code, start_date, end_date, *, verify_ssl=True):
    """Download daily weather records month by month, with rate-limit handling.

    Args:
        app_key: ShowAPI appKey sent with every request.
        area_code: Area code sent as ``areaCode``.
        start_date: First day to fetch, as a ``YYYYMMDD`` string.
        end_date: Last day to fetch, as a ``YYYYMMDD`` string.
        verify_ssl: Keep TLS certificate and hostname verification enabled
            (default). Pass ``False`` only to restore the old unverified
            behaviour, e.g. behind an intercepting proxy.

    Returns:
        A list of row dicts (one per day) with the keys used by the CSV
        header. An empty list is returned when the dates are malformed or
        ``start_date`` is later than ``end_date``.

    Each month is requested at most ``max_retries`` times. Rate-limit
    responses and network timeouts are retried; any other failure skips the
    month. Progress and errors are reported with ``print``.
    """
    base_url = "https://route.showapi.com/9-7"

    try:
        start_dt = datetime.strptime(start_date, "%Y%m%d")
        end_dt = datetime.strptime(end_date, "%Y%m%d")
        if start_dt > end_dt:
            raise ValueError("开始日期不能晚于结束日期")
    except ValueError as e:
        print(f"❌ 参数错误:{str(e)}")
        return []

    ssl_context = ssl.create_default_context()
    if not verify_ssl:
        # Explicit opt-out only: without verification the appKey can be
        # captured by anyone able to intercept the connection.
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE

    # Materialise the ranges so the progress display can show the real total.
    month_ranges = list(get_month_range(start_dt, end_dt))
    total_months = len(month_ranges)

    all_data = []
    success_count = 0
    error_count = 0
    max_retries = 5

    for idx, (month_key, month_end) in enumerate(month_ranges, 1):
        # The first month may begin mid-month; never request days that lie
        # before the caller's start date.
        month_first = datetime.strptime(month_key + "01", "%Y%m%d")
        range_start_dt = max(month_first, start_dt)
        range_start = range_start_dt.strftime("%Y%m%d")

        payload = urllib.parse.urlencode({
            'appKey': app_key,
            'areaCode': area_code,
            'startDate': range_start,
            'endDate': month_end,
        }).encode('utf-8')

        retry_delay = 10

        for attempt in range(max_retries):
            try:
                print(f"\n正在处理 [{idx}/{total_months}] {month_key} 数据({range_start} - {month_end})...")
                req = urllib.request.Request(
                    base_url,
                    data=payload,
                    method='POST',
                    headers={'Content-Type': 'application/x-www-form-urlencoded'}
                )

                with urllib.request.urlopen(req, context=ssl_context, timeout=30) as response:
                    json_data = json.loads(response.read().decode('utf-8'))

                if json_data['showapi_res_code'] != 0:
                    error_msg = json_data.get('showapi_res_error') or '未知错误'
                    print(f"❌ API错误:{error_msg} (Code: {json_data['showapi_res_code']})")
                    if "调用过于频繁" in error_msg:
                        raise _RateLimited(error_msg)
                    error_count += 1
                    break

                body = json_data['showapi_res_body']
                if not body.get('list'):
                    print(f"⚠️ {month_key} 无有效数据")
                    break

                expected_days = (datetime.strptime(month_end, "%Y%m%d") - range_start_dt).days + 1
                if len(body['list']) != expected_days:
                    print(f"⚠️ 数据不完整:预期{expected_days}天,实际获取{len(body['list'])}天")

                # Build the whole month first so a malformed item cannot
                # leave a partially appended month in the result.
                all_data.extend(_build_records(body))

                success_count += 1
                print(f"✅ {month_key} 数据获取成功(记录数:{len(body['list'])})")
                break

            except Exception as e:
                error_count += 1
                rate_limited = isinstance(e, _RateLimited) or "调用过于频繁" in str(e)
                if not rate_limited and not _is_timeout(e):
                    print(f"❌ 致命错误:{str(e)}")
                    break
                if attempt == max_retries - 1:
                    # No retries left: skip the pointless wait and let the
                    # for/else below report the failure.
                    continue
                if rate_limited:
                    wait_time = retry_delay + (attempt ** 2) * 2
                    print(f"⏳ 频率限制,等待 {wait_time} 秒后重试({attempt+1}/{max_retries})")
                    time.sleep(wait_time)
                    retry_delay *= 2
                else:
                    print("⏳ 网络超时,等待10秒后重试...")
                    time.sleep(10)
        else:
            # Reached only when every attempt ended in a retryable error.
            print(f"❌ {month_key} 重试 {max_retries} 次后仍失败,已跳过")

        # Pause between months to stay under the rate limit; nothing follows
        # the final month, so there is no reason to wait after it.
        if idx < total_months:
            base_wait = 30 if success_count < 3 else 5
            time.sleep(base_wait + success_count * 2)

    print(f"\n任务完成!成功获取 {success_count} 个月数据,{error_count} 次错误")
    return all_data

def save_to_csv(data, filename):
    """Write the weather rows to *filename* as a CSV file with a header line.

    The file is encoded as UTF-8 with a byte-order mark (``utf-8-sig``).
    When ``data`` is empty nothing is written and a warning is printed.
    Any failure while writing is reported with ``print`` rather than raised.
    """
    if not data:
        print("⚠️ 没有可保存的数据")
        return

    columns = ['日期', '地区', '最高气温', '最低气温', '天气', '风向', '风力', '空气质量', 'AQI']
    row_total = len(data)

    try:
        # newline='' lets the csv module control line endings itself.
        with open(filename, 'w', newline='', encoding='utf-8-sig') as out_file:
            table = csv.DictWriter(out_file, fieldnames=columns)
            table.writeheader()
            table.writerows(data)
        print(f"✅ 数据已保存至 {filename}(记录数:{row_total})")
    except Exception as err:
        print(f"❌ 保存文件失败:{err}")

def visualize_data(csv_file, output_file='weather_visualization.png'):
    """Render three summary charts from the weather CSV and save them as one image.

    Args:
        csv_file: Path of the CSV to read; its ``日期`` column is parsed as
            dates and used to sort the rows.
        output_file: Path of the image to write. Defaults to the previously
            hard-coded ``weather_visualization.png``.

    The figure stacks three panels: daily high/low temperature lines, a bar
    chart of day counts per air-quality label, and a pie chart of the ten
    most frequent weather types. Any failure is reported with ``print``
    rather than raised.
    """
    # The labels are Chinese, which the default matplotlib font cannot draw.
    # Fonts are tried in the listed order, so the last entry keeps the stock
    # font as a fallback. Many CJK fonts lack the Unicode minus glyph, so
    # negative tick labels are switched to the ASCII hyphen.
    rc_overrides = {
        'font.sans-serif': [
            'SimHei', 'Microsoft YaHei', 'PingFang SC', 'Noto Sans CJK SC',
            'WenQuanYi Micro Hei', 'Arial Unicode MS', 'DejaVu Sans',
        ],
        'axes.unicode_minus': False,
    }

    fig = None
    try:
        df = pd.read_csv(csv_file, parse_dates=['日期'])
        df = df.sort_values('日期')

        # rc_context scopes the font settings to this figure instead of
        # changing matplotlib's global configuration.
        with plt.rc_context(rc_overrides):
            fig, (ax_temp, ax_aqi, ax_weather) = plt.subplots(
                3, 1, figsize=(16, 9), dpi=100
            )

            # Temperature trend
            ax_temp.plot(df['日期'], df['最高气温'], label='最高气温', color='red')
            ax_temp.plot(df['日期'], df['最低气温'], label='最低气温', color='blue')
            ax_temp.set_ylabel('温度 (℃)')
            ax_temp.set_title('气温变化趋势')
            ax_temp.legend(loc='upper right')
            ax_temp.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m'))
            ax_temp.xaxis.set_major_locator(mdates.MonthLocator(interval=1))
            ax_temp.tick_params(axis='x', labelrotation=45)
            ax_temp.grid(True, linestyle='--', alpha=0.5)

            # Air-quality distribution
            aqi_levels = df['空气质量'].value_counts().sort_index()
            aqi_levels.plot(kind='bar', color='orange', ax=ax_aqi)
            ax_aqi.set_ylabel('天数')
            ax_aqi.set_title('空气质量分布')
            ax_aqi.tick_params(axis='x', labelrotation=45)
            ax_aqi.grid(axis='y', linestyle='--', alpha=0.5)

            # Weather-type distribution (ten most frequent types)
            weather_counts = df['天气'].value_counts().head(10)
            weather_counts.plot(kind='pie', autopct='%1.1f%%', startangle=90, ax=ax_weather)
            ax_weather.set_ylabel('')
            ax_weather.set_title('主要天气类型分布')

            fig.tight_layout()
            fig.savefig(output_file, bbox_inches='tight')

        print(f"✅ 可视化图表已保存为 {output_file}")

    except Exception as e:
        print(f"❌ 可视化失败:{str(e)}")
    finally:
        # Close the figure on failure too, so it does not stay registered
        # with pyplot and hold memory.
        if fig is not None:
            plt.close(fig)

if __name__ == "__main__":
    # Run configuration (replace the placeholder values with real ones).
    APP_KEY = "youkey"  # replace with a real appKey
    AREA_CODE = "130100"      # area code for Shijiazhuang
    START_DATE = "20230101"
    END_DATE = "20231231"
    OUTPUT_FILE = "weather_data.csv"

    print("🚀 开始获取天气数据...")
    start_time = time.time()

    weather_data = fetch_weather_data(APP_KEY, AREA_CODE, START_DATE, END_DATE)

    print(f"\n⏳ 总耗时:{time.time()-start_time:.0f}秒")
    save_to_csv(weather_data, OUTPUT_FILE)

    # Only chart what this run actually fetched. With no data, save_to_csv
    # writes nothing, so charting would read a stale CSV from an earlier run
    # or fail on a missing file.
    if weather_data:
        visualize_data(OUTPUT_FILE)