import urllib.parse
import urllib.request
import ssl
import csv
import time
import json
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from datetime import datetime, timedelta
def get_month_range(start_date, end_date):
    """Yield one ``(month, last_day)`` string pair per calendar month in a range.

    Args:
        start_date: First day of the range (``datetime`` or ``date``).
        end_date: Last day of the range, inclusive (``datetime`` or ``date``).

    Yields:
        Tuples ``(month, last_day)`` where ``month`` is formatted ``YYYYMM``
        and ``last_day`` is formatted ``YYYYMMDD``. ``last_day`` is the last
        day of that month, clamped to ``end_date`` for the final month.
        Nothing is yielded when ``start_date`` is after ``end_date``.

    Note:
        Only the *month* of the start is reported, not its day, so a caller
        that needs a partial first month has to clamp the start day itself.
    """
    # Compare on calendar days only. With full datetimes, a start carrying a
    # later time-of-day than the end (e.g. 10:30 vs 00:00) made the loop
    # condition fail one iteration early and silently dropped the last month.
    first_day = start_date.date() if isinstance(start_date, datetime) else start_date
    last_day = end_date.date() if isinstance(end_date, datetime) else end_date

    current = first_day
    while current <= last_day:
        # Jump to the 1st of the following month, rolling the year in December.
        if current.month == 12:
            next_month = current.replace(year=current.year + 1, month=1, day=1)
        else:
            next_month = current.replace(month=current.month + 1, day=1)

        month_end = min(next_month - timedelta(days=1), last_day)
        yield (current.strftime("%Y%m"), month_end.strftime("%Y%m%d"))
        current = next_month
class _RateLimited(Exception):
    """Internal signal: the API rejected the call as too frequent."""


def _post_form(url, params, ssl_context, timeout=30):
    """POST ``params`` form-encoded to ``url`` and return the decoded JSON reply."""
    payload = urllib.parse.urlencode(params).encode('utf-8')
    request = urllib.request.Request(
        url,
        data=payload,
        method='POST',
        headers={'Content-Type': 'application/x-www-form-urlencoded'},
    )
    with urllib.request.urlopen(request, context=ssl_context, timeout=timeout) as response:
        return json.loads(response.read().decode('utf-8'))


def _is_timeout(exc):
    """Return True when ``exc`` is a socket timeout, raised directly or wrapped."""
    import socket

    timeout_types = (socket.timeout, TimeoutError)
    if isinstance(exc, timeout_types):
        return True
    # urllib wraps connect-phase failures in URLError and keeps the original
    # exception in ``reason``; read-phase timeouts are raised unwrapped.
    return isinstance(getattr(exc, 'reason', None), timeout_types)


def _records_from_body(body):
    """Convert one API response body into a list of flat per-day record dicts."""
    area = body['area']
    return [
        {
            '日期': item['time'],
            '地区': area,
            '最高气温': item['max_temperature'].replace('℃', ''),
            '最低气温': item['min_temperature'].replace('℃', ''),
            '天气': item['weather'],
            '风向': item['wind_direction'],
            '风力': item['wind_power'],
            '空气质量': item['aqiInfo'],
            'AQI': item['aqi'],
        }
        for item in body['list']
    ]


def fetch_weather_data(app_key, area_code, start_date, end_date):
    """Download daily weather records month by month, with rate-limit backoff.

    One POST request is issued per calendar month between ``start_date`` and
    ``end_date``. Rate-limit replies and network timeouts are retried (up to
    five attempts per month); any other failure abandons that month and moves
    on to the next one. Progress and errors are reported with ``print``.

    Args:
        app_key: API key, sent as the ``appKey`` form field.
        area_code: Area code, sent as the ``areaCode`` form field.
        start_date: First day to fetch, as a ``YYYYMMDD`` string.
        end_date: Last day to fetch (inclusive), as a ``YYYYMMDD`` string.

    Returns:
        A list of dicts, one per day, with the keys ``日期``, ``地区``,
        ``最高气温``, ``最低气温``, ``天气``, ``风向``, ``风力``, ``空气质量`` and
        ``AQI``. An empty list is returned when the dates are malformed or
        ``start_date`` is after ``end_date``.
    """
    base_url = "https://route.showapi.com/9-7"
    max_retries = 5

    try:
        start_dt = datetime.strptime(start_date, "%Y%m%d")
        end_dt = datetime.strptime(end_date, "%Y%m%d")
        if start_dt > end_dt:
            raise ValueError("开始日期不能晚于结束日期")
    except ValueError as e:
        print(f"❌ 参数错误:{str(e)}")
        return []

    # Certificate and hostname verification stay enabled: the app key travels
    # in the request body, so an unverified connection would expose it to
    # anyone able to intercept the traffic.
    ssl_context = ssl.create_default_context()

    # Materialise the months up front so the progress counter shows the real
    # total instead of a hard-coded number.
    months = list(get_month_range(start_dt, end_dt))
    total_months = len(months)
    requested_start = start_dt.strftime("%Y%m%d")

    all_data = []
    success_count = 0
    error_count = 0

    for idx, (month_start, month_end) in enumerate(months, 1):
        # The month generator only reports YYYYMM, so clamp the first request
        # to the requested start day (YYYYMMDD strings sort chronologically).
        range_start = max(month_start + "01", requested_start)
        params = {
            'appKey': app_key,
            'areaCode': area_code,
            'startDate': range_start,
            'endDate': month_end,
        }
        retry_delay = 10

        for attempt in range(max_retries):
            is_last_attempt = attempt == max_retries - 1
            print(f"\n正在处理 [{idx}/{total_months}] {month_start} 数据({range_start} - {month_end})...")
            try:
                json_data = _post_form(base_url, params, ssl_context)

                res_code = json_data['showapi_res_code']
                if res_code != 0:
                    error_msg = json_data.get('showapi_res_error') or '未知错误'
                    print(f"❌ API错误:{error_msg} (Code: {res_code})")
                    if "调用过于频繁" in str(error_msg):
                        raise _RateLimited(error_msg)
                    # Any other API error will not be cured by retrying.
                    error_count += 1
                    break

                body = json_data['showapi_res_body']
                if not body.get('list'):
                    print(f"⚠️ {month_start} 无有效数据")
                    break

                expected_days = (datetime.strptime(month_end, "%Y%m%d") -
                                 datetime.strptime(range_start, "%Y%m%d")).days + 1
                if len(body['list']) != expected_days:
                    print(f"⚠️ 数据不完整:预期{expected_days}天,实际获取{len(body['list'])}天")

                # Build the whole month first so a malformed item cannot leave
                # a half-written month in the result.
                month_records = _records_from_body(body)
            except _RateLimited:
                error_count += 1
                if is_last_attempt:
                    # No further attempt follows, so waiting would be wasted.
                    print(f"❌ {month_start} 已达最大重试次数({max_retries}),跳过该月")
                    break
                wait_time = retry_delay + (attempt ** 2) * 2
                print(f"⏳ 频率限制,等待 {wait_time} 秒后重试({attempt+1}/{max_retries})")
                time.sleep(wait_time)
                retry_delay *= 2
            except Exception as e:
                # Boundary handler: one bad month must not abort the whole run.
                error_count += 1
                if _is_timeout(e) and not is_last_attempt:
                    print("⏳ 网络超时,等待10秒后重试...")
                    time.sleep(10)
                else:
                    print(f"❌ 致命错误:{str(e)}")
                    break
            else:
                all_data.extend(month_records)
                success_count += 1
                print(f"✅ {month_start} 数据获取成功(记录数:{len(month_records)})")
                break

        # Pause between months to stay under the API's rate limit; there is
        # nothing left to throttle after the final month.
        if idx < total_months:
            base_wait = 30 if success_count < 3 else 5
            time.sleep(base_wait + success_count * 2)

    print(f"\n任务完成!成功获取 {success_count} 个月数据,{error_count} 次错误")
    return all_data
def save_to_csv(data, filename):
    """Write weather records to a CSV file encoded as UTF-8 with a BOM.

    Args:
        data: Sequence of record dicts keyed by the column names listed in
            ``fieldnames`` below. Keys missing from a record are written as
            empty cells; unknown keys make the write fail.
        filename: Path of the CSV file to create or overwrite.

    Returns:
        ``True`` when the file was written, ``False`` when there was nothing
        to save or the write failed (a message is printed in both cases).
    """
    if not data:
        print("⚠️ 没有可保存的数据")
        return False

    fieldnames = ['日期', '地区', '最高气温', '最低气温', '天气', '风向', '风力', '空气质量', 'AQI']
    try:
        # newline='' lets the csv module control line endings itself.
        with open(filename, 'w', newline='', encoding='utf-8-sig') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(data)
    except (OSError, ValueError, csv.Error) as e:
        # OSError: path or permission problems. ValueError: a record carries
        # a key that is not in fieldnames, or a value cannot be encoded.
        print(f"❌ 保存文件失败:{str(e)}")
        return False

    print(f"✅ 数据已保存至 {filename}(记录数:{len(data)})")
    return True
def visualize_data(csv_file, output_file='weather_visualization.png'):
    """Render a three-panel summary chart from a saved weather CSV.

    The panels are: daily high/low temperature over time, a bar chart of
    air-quality categories, and a pie chart of the ten most frequent weather
    types. Failures are reported with ``print`` rather than raised.

    Args:
        csv_file: Path of a CSV containing at least the columns ``日期``,
            ``最高气温``, ``最低气温``, ``空气质量`` and ``天气``.
        output_file: Path of the PNG to write. Defaults to
            ``weather_visualization.png`` in the working directory.
    """
    # The labels are Chinese; without a CJK-capable font matplotlib draws
    # empty boxes. NOTE(review): which of these fonts exist depends on the
    # machine - DejaVu Sans is kept last as the stock fallback.
    cjk_rc = {
        'font.sans-serif': ['SimHei', 'Microsoft YaHei', 'PingFang SC',
                            'Noto Sans CJK SC', 'WenQuanYi Micro Hei',
                            'DejaVu Sans'],
        # Many CJK fonts lack the Unicode minus glyph used on negative ticks.
        'axes.unicode_minus': False,
    }

    fig = None
    try:
        df = pd.read_csv(csv_file, parse_dates=['日期'])
        df.sort_values('日期', inplace=True)
        # A single non-numeric cell would turn the whole column into strings
        # and make the temperature axis categorical; coerce such cells to NaN.
        for column in ('最高气温', '最低气温'):
            df[column] = pd.to_numeric(df[column], errors='coerce')

        # Fonts are resolved at draw time, so saving must happen inside the
        # rc_context as well.
        with plt.rc_context(cjk_rc):
            fig, (ax_temp, ax_aqi, ax_weather) = plt.subplots(
                3, 1, figsize=(16, 9), dpi=100)

            # Temperature trend
            ax_temp.plot(df['日期'], df['最高气温'], label='最高气温', color='red')
            ax_temp.plot(df['日期'], df['最低气温'], label='最低气温', color='blue')
            ax_temp.set_ylabel('温度 (℃)')
            ax_temp.set_title('气温变化趋势')
            ax_temp.legend(loc='upper right')
            ax_temp.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m'))
            ax_temp.xaxis.set_major_locator(mdates.MonthLocator(interval=1))
            ax_temp.tick_params(axis='x', labelrotation=45)
            ax_temp.grid(True, linestyle='--', alpha=0.5)

            # Air-quality category distribution
            aqi_levels = df['空气质量'].value_counts().sort_index()
            aqi_levels.plot(kind='bar', color='orange', ax=ax_aqi, rot=45)
            ax_aqi.set_ylabel('天数')
            ax_aqi.set_title('空气质量分布')
            ax_aqi.grid(axis='y', linestyle='--', alpha=0.5)

            # Ten most common weather types
            weather_counts = df['天气'].value_counts().head(10)
            weather_counts.plot(kind='pie', autopct='%1.1f%%', startangle=90,
                                ax=ax_weather)
            ax_weather.set_ylabel('')
            ax_weather.set_title('主要天气类型分布')

            fig.tight_layout()
            fig.savefig(output_file, bbox_inches='tight')
        print(f"✅ 可视化图表已保存为 {output_file}")
    except Exception as e:
        # Boundary handler: charting is best-effort and must not crash the run.
        print(f"❌ 可视化失败:{str(e)}")
    finally:
        # Close the figure on failure too, so it is not left open in pyplot.
        if fig is not None:
            plt.close(fig)
if __name__ == "__main__":
    # Configuration (replace with real values before running).
    APP_KEY = "youkey"  # replace with a real appKey
    AREA_CODE = "130100"  # area code for Shijiazhuang
    START_DATE = "20230101"
    END_DATE = "20231231"
    OUTPUT_FILE = "weather_data.csv"

    print("🚀 开始获取天气数据...")
    # A monotonic clock keeps the elapsed time correct even if the system
    # clock is adjusted during the (long) download.
    start_time = time.monotonic()
    weather_data = fetch_weather_data(APP_KEY, AREA_CODE, START_DATE, END_DATE)
    print(f"\n⏳ 总耗时:{time.monotonic()-start_time:.0f}秒")

    save_to_csv(weather_data, OUTPUT_FILE)
    # Chart only when this run produced data; otherwise a CSV left over from
    # an earlier run would be visualised as if it were fresh.
    if weather_data:
        visualize_data(OUTPUT_FILE)