|
本帖最后由 菜园子 于 2025-8-13 20:50 编辑
我现在已经懒到连日期拆分都用大模型写了哈哈哈,使用大模型写代码真是方便,只要你有想象力,就没有做不出来的功能
以下代码应该可以直接运行,前提是要装好cartopy(运行时需要先把FNV3的相关文件放在指定文件夹中,下载地址为https://deepmind.google.com/science/weatherlab,需要梯子,你也可以自己修改指定文件夹)
代码中不懂的可以问大模型,现在的大模型真是很好的老师,我也是做大模型方向的,一起感受大模型的魅力吧~
- import pandas as pd
- import numpy as np
- import matplotlib.pyplot as plt
- import matplotlib.patches as patches
- import cartopy.crs as ccrs
- import cartopy.feature as cfeature
- from cartopy.io.shapereader import Reader
- from cartopy.mpl.gridliner import LONGITUDE_FORMATTER, LATITUDE_FORMATTER
- import matplotlib.ticker as mticker
- from scipy.interpolate import PchipInterpolator
- from shapely.geometry import Polygon, MultiPolygon
- from shapely.ops import unary_union
- from datetime import datetime, timedelta
- import os
- import shutil
- import re
- def extract_init_time(filename):
- """
- 从文件名中提取init_time(格式:YYYYMMDDHHZ)
- 示例文件名:FNV3_2025_08_13T06_00_paired.csv → 返回 2025081306Z
- """
- try:
- # 使用正则表达式匹配日期时间部分
- match = re.search(r'(\d{4})_(\d{2})_(\d{2})T(\d{2})_\d{2}', filename)
- if match:
- year, month, day, hour = match.groups()
- return f"{year}{month}{day}{hour}Z"
- else:
- raise ValueError("文件名格式不符合要求")
- except Exception as e:
- print(f"Error extracting init_time: {e}")
- raise
- def parse_lead_time(lead_time_str):
- """Parse lead_time string to get hours"""
- # Format: 'X days HH:MM:SS'
- parts = lead_time_str.strip().split()
- days = int(parts[0])
- time_part = parts[2] # HH:MM:SS
- hours, minutes, seconds = map(int, time_part.split(':'))
- total_hours = days * 24 + hours + minutes / 60 + seconds / 3600
- return total_hours
- def simplify_track_id(track_id):
- """Simplify track ID (e.g., AL052025 -> 05A, WP162025 -> 16W)"""
- if len(track_id) >= 8:
- basin = track_id[0] # First letter
- number = track_id[2:4] # First two numbers
- return f"{number}{basin}"
- return track_id
- def draw_wind_radii(ax, lat, lon, ne_radius, se_radius, sw_radius, nw_radius, color, zorder=15):
- """Draw wind radii for four quadrants"""
- # Convert 0km to 0.01km for robustness
- radii = [max(0.01, r) for r in [ne_radius, se_radius, sw_radius, nw_radius]]
-
- # Check if all radii are effectively 0
- if all(r <= 0.01 for r in radii):
- return
-
- # Convert km to degrees (approximate)
- km_to_deg = 1/111.32
- radii_deg = [r * km_to_deg for r in radii]
-
- # Define angles for each quadrant
- angles = {
- 'NE': np.linspace(0, np.pi/2, 50), # 0 to 90 degrees
- 'SE': np.linspace(np.pi/2, np.pi, 50), # 90 to 180 degrees
- 'SW': np.linspace(np.pi, 3*np.pi/2, 50), # 180 to 270 degrees
- 'NW': np.linspace(3*np.pi/2, 2*np.pi, 50) # 270 to 360 degrees
- }
-
- # Calculate points for each quadrant
- all_x, all_y = [], []
-
- for i, (quadrant, theta_range) in enumerate(angles.items()):
- radius = radii_deg[i]
- x_quad = lon + radius * np.cos(theta_range)
- y_quad = lat + radius * np.sin(theta_range)
- all_x.extend(x_quad)
- all_y.extend(y_quad)
-
- # Close the polygon
- if all_x and all_y:
- all_x.append(all_x[0])
- all_y.append(all_y[0])
- ax.plot(all_x, all_y, color=color, linewidth=1.0,
- transform=ccrs.PlateCarree(), zorder=zorder)
- def create_smooth_track(hours, lons, lats, points_per_segment=20):
- """Create smooth track using interpolation"""
- if len(hours) < 2:
- return lons, lats
-
- # Create interpolated points
- interp_hours = []
- for i in range(len(hours)-1):
- segment = np.linspace(hours[i], hours[i+1], num=points_per_segment)[:-1]
- interp_hours.extend(segment)
- interp_hours.append(hours[-1])
- interp_hours = np.array(interp_hours)
-
- # Use PCHIP interpolation for smooth curves
- smooth_lons = PchipInterpolator(hours, lons)(interp_hours)
- smooth_lats = PchipInterpolator(hours, lats)(interp_hours)
-
- return smooth_lons, smooth_lats
- def create_typhoon_map(df_typhoon, track_id, init_time_str):
-
- global textPairedContent
- textPairedContent = textPairedContent + simplify_track_id(track_id) + ','
-
- """Create typhoon forecast map for a single typhoon"""
- # Filter data for forecast times up to 120 hours
- df_filtered = df_typhoon[df_typhoon['lead_time_hours'] <= 120].copy()
-
- # Only plot specific time points: 0, 12, 24, 36, 48, 72, 120 hours
- target_hours = [0, 12, 24, 36, 48, 72, 120]
- df_plot = df_filtered[df_filtered['lead_time_hours'].isin(target_hours)].copy()
- df_plot = df_plot.sort_values('lead_time_hours')
-
- if df_plot.empty:
- print(f"No data to plot for typhoon {track_id}")
- return
-
- # Calculate map extent
- lats = df_plot['lat'].values
- lons = df_plot['lon'].values
-
- lat_margin = 8
- lon_margin = 8
-
- extendLatMin = max(-90, np.min(lats) - lat_margin)
- extendLatMax = min(90, np.max(lats) + lat_margin)
- extendLonMin = max(-180, np.min(lons) - lon_margin)
- extendLonMax = min(180, np.max(lons) + lon_margin)
-
- centerlon = (extendLonMin + extendLonMax) / 2
-
- # Create the map
- myproj = ccrs.PlateCarree()
- fig = plt.figure(figsize=(12.5, 11))
- ax = fig.add_axes([0.01, 0.01, 0.9, 0.9], projection=ccrs.PlateCarree(central_longitude=centerlon))
-
- extend = [extendLonMin, extendLonMax, extendLatMin, extendLatMax]
- ax.set_extent(extend, crs=ccrs.Geodetic())
- ax.set_adjustable('datalim')
-
- # Add gridlines
- gl = ax.gridlines(draw_labels=True, linewidth=0.3, color='k', alpha=0.65, linestyle=':', zorder=90)
- gl.xformatter = LONGITUDE_FORMATTER
- gl.yformatter = LATITUDE_FORMATTER
- gl.xlocator = mticker.FixedLocator(np.arange(-180, 180, 5))
- gl.ylocator = mticker.FixedLocator(np.arange(-90, 90, 5))
- gl.top_labels = False
- gl.right_labels = False
- gl.xlines = True
- gl.ylines = True
- gl.xlabel_style = {'size': 10}
- gl.ylabel_style = {'size': 10}
-
- # Add map features
- ax.add_feature(cfeature.COASTLINE.with_scale('50m'), edgecolor='#6b6b6b', facecolor='none', lw=0.42)
- ax.add_feature(cfeature.OCEAN.with_scale('50m'), facecolor='white')
- ax.add_feature(cfeature.LAND.with_scale('50m'), facecolor='#e3e3e3')
-
- # Add shapefiles (if they exist)
- shapefiles = [
- ]
-
- for shapefile in shapefiles:
- if os.path.exists(shapefile):
- try:
- ax.add_geometries(Reader(shapefile).geometries(),
- crs=myproj, facecolor='none', edgecolor='k', linewidth=0.80, zorder=10)
- except Exception as e:
- print(f"Warning: Could not load shapefile {shapefile}: {e}")
-
- # Draw error circles first (lower zorder)
- hours = df_plot['lead_time_hours'].values
- fore_lon = df_plot['lon'].values
- fore_lat = df_plot['lat'].values
-
- # Calculate error radius: 50km per 12 hours, convert to degrees
- fore_r = (hours / 12) * 50 / 111.32 # 50km per 12 hours, convert km to degrees
-
- print(f"Debug: Error radii for {track_id}: {fore_r}") # Debug info
-
- if len(hours) > 1:
- # Interpolate for smooth error envelope
- points_num = 12
- interp_hours = []
- for i in range(len(hours)-1):
- segment = np.linspace(hours[i], hours[i+1], num=points_num)[:-1]
- interp_hours.extend(segment)
- interp_hours.append(hours[-1])
- interp_hours = np.array(interp_hours)
-
- x = PchipInterpolator(hours, fore_lon)(interp_hours)
- y = PchipInterpolator(hours, fore_lat)(interp_hours)
- r = PchipInterpolator(hours, fore_r)(interp_hours)
-
- # Create error envelope
- thetas = np.linspace(0, 2 * np.pi, 360)
- polygon_x = x[:,None] + r[:,None] * np.cos(thetas) # Note: cos for longitude
- polygon_y = y[:,None] + r[:,None] * np.sin(thetas) # Note: sin for latitude
-
- ps = [Polygon(i) for i in np.dstack((polygon_x, polygon_y))]
- n = range(len(ps)-1)
- if n: # Only if there are at least 2 circles
- convex_hulls = [MultiPolygon([ps[i], ps[i+1]]).convex_hull for i in n]
- polygons = unary_union(convex_hulls)
-
- ax.add_geometries([polygons], ccrs.PlateCarree(),
- facecolor='yellow', alpha=0.5, edgecolor='black', linewidth=0.5, zorder=5)
- print(f"Added error envelope for {track_id}")
- else:
- # If only one point, draw a simple circle
- if len(hours) == 1 and fore_r[0] > 0:
- circle = plt.Circle((fore_lon[0], fore_lat[0]), fore_r[0],
- facecolor='yellow', alpha=0.5, edgecolor='black',
- linewidth=0.5, transform=ccrs.PlateCarree(), zorder=5)
- ax.add_patch(circle)
- print(f"Added single error circle for {track_id}")
- else:
- print(f"Not enough points for error envelope: {len(hours)} points")
-
- # ===== 修改部分:绘制平滑的台风路径曲线 =====
- track_lons = df_plot['lon'].values
- track_lats = df_plot['lat'].values
- track_hours = df_plot['lead_time_hours'].values
-
- # 创建平滑曲线
- if len(track_hours) >= 2:
- smooth_lons, smooth_lats = create_smooth_track(track_hours, track_lons, track_lats)
- # 绘制平滑的路径曲线
- ax.plot(smooth_lons, smooth_lats, color='hotpink', linewidth=1.1,
- transform=ccrs.PlateCarree(), zorder=20, label='Track')
- print(f"Drew smooth track with {len(smooth_lons)} interpolated points")
- else:
- # 如果只有一个点,绘制原来的直线
- ax.plot(track_lons, track_lats, color='hotpink', linewidth=1.1,
- transform=ccrs.PlateCarree(), zorder=20, label='Track')
- print(f"Drew straight track (insufficient points for smoothing)")
-
- global_max_34kt = max(
- df_plot['radius_34_knot_winds_ne_km'].max(),
- df_plot['radius_34_knot_winds_se_km'].max(),
- df_plot['radius_34_knot_winds_sw_km'].max(),
- df_plot['radius_34_knot_winds_nw_km'].max()
- )
-
- prev_lon, prev_lat = None, None # 记录上一点
- # Draw typhoon positions and wind radii
- for idx, row in df_plot.iterrows():
- lat, lon = row['lat'], row['lon']
-
- # Draw position marker
- if row['lead_time_hours'] == 0:
- # 第一个点用黑色'x'标记,加粗显示
- ax.plot(lon, lat, 'x', color='black', markersize=5.5,
- markeredgewidth=1, transform=ccrs.PlateCarree(), zorder=25)
- else:
- # 其他点用粉色'o'标记
- ax.plot(lon, lat, 'o', color='hotpink', markersize=3.5,
- transform=ccrs.PlateCarree(), zorder=25)
-
- # Draw wind radii (34, 50, 64 knots)
- wind_data = [
- # 34-knot winds (7级风圈) - pink
- (row['radius_34_knot_winds_ne_km'], row['radius_34_knot_winds_se_km'],
- row['radius_34_knot_winds_sw_km'], row['radius_34_knot_winds_nw_km'], 'hotpink'),
- # 50-knot winds (10级风圈) - dark red
- (row['radius_50_knot_winds_ne_km'], row['radius_50_knot_winds_se_km'],
- row['radius_50_knot_winds_sw_km'], row['radius_50_knot_winds_nw_km'], 'darkred'),
- # 64-knot winds (12级风圈) - red
- (row['radius_64_knot_winds_ne_km'], row['radius_64_knot_winds_se_km'],
- row['radius_64_knot_winds_sw_km'], row['radius_64_knot_winds_nw_km'], 'red')
- ]
-
- for ne, se, sw, nw, color in wind_data:
- if pd.notna(ne) and pd.notna(se) and pd.notna(sw) and pd.notna(nw):
- draw_wind_radii(ax, lat, lon, ne, se, sw, nw, color)
-
- # 获取风速值并转换为整数
- # 解析valid_time获取日期和小时
- try:
- # 解析valid_time字符串(格式:'2025-08-12 06:00:00')
- valid_time_str = row['valid_time']
-
- # 直接提取月份中的日和小时部分
- # 格式:'YYYY-MM-DD HH:MM:SS'
- parts = valid_time_str.split()
- date_part = parts[0] # '2025-08-12'
- time_part = parts[1] # '06:00:00'
-
- # 提取日和小时
- day = date_part.split('-')[2] # 从'2025-08-12'提取'12'
- hour = time_part.split(':')[0] # 从'06:00:00'提取'06'
-
- # 格式化日期标签:日/小时Z
- date_label = f"{day}/{hour}Z"
- except Exception as e:
- print(f"Error parsing valid_time: {valid_time_str}, using fallback. Error: {e}")
- # 如果解析失败,使用备用标签(小时数)
- date_label = f"{int(row['lead_time_hours'])}h"
-
- # 获取风速值并转换为整数
- try:
- wind_speed = int(row['maximum_sustained_wind_speed_knots'])
- except (ValueError, TypeError):
- wind_speed = "N/A"
-
- # 计算动态c_r = 最大34kt风圈半径(转换为地理坐标偏移量)
- c_r = global_max_34kt/ 111.32 # 约1°纬度=111.32km
- global_max_34kt = global_max_34kt + 10
- offset_deg = c_r * 1.5 # 维持原有的偏移系数
-
- # 计算连线方向
- if prev_lon is not None and prev_lat is not None:
- dx = lon - prev_lon
- dy = lat - prev_lat
-
- # 判定斜向
- if abs(dx) < 1e-6 or abs(dy) < 1e-6:
- # 水平或竖直
- angle_deg = 30
- else:
- if dx > 0 and dy < 0: # 左上到右下
- angle_deg = 30
- elif dx > 0 and dy > 0: # 左下到右上
- angle_deg = -30
- else:
- # 其它情况(比如从右到左),也可以用默认值
- angle_deg = 30
- else:
- # 第一个点
- angle_deg = 30
-
- # 偏移计算
- theta = np.radians(angle_deg)
- dlat = offset_deg * np.cos(theta)
- dlon = (offset_deg * np.sin(theta)) / np.cos(np.radians(lat))
-
- label_lon = lon + dlon
- label_lat = lat + dlat
-
- ax.text(label_lon, label_lat, f"{date_label} {wind_speed}KTS",
- transform=ccrs.PlateCarree(), fontsize=9, ha='left', va='bottom', zorder=30)
- ax.plot([lon, label_lon], [lat, label_lat], color='black', linewidth=0.6,
- transform=ccrs.PlateCarree(), zorder=29)
-
- prev_lon, prev_lat = lon, lat
-
- # Add title and legend
- simplified_id = simplify_track_id(track_id)
- plt.title(f'FNV3 Ensemble Mean Track Forecast for Tropical Cyclone [{simplified_id}]\nInitTime: {init_time_str}',
- fontsize=14, fontweight='bold')
-
- # Add legend for wind radii
- legend_elements = [
- plt.Line2D([0], [0], color='hotpink', linewidth=2, label='34kt winds'),
- plt.Line2D([0], [0], color='darkred', linewidth=2, label='50kt winds'),
- plt.Line2D([0], [0], color='red', linewidth=2, label='64kt winds'),
- patches.Rectangle((0, 0), 1, 1, facecolor='yellow', alpha=0.3, label='Error envelope')
- ]
- ax.legend(handles=legend_elements, loc='upper right', bbox_to_anchor=(0.991, 0.991))
- plt.figtext(0.775, 0.0276, "Copyright\xa9 SMCA", fontsize = 11,color="black",zorder=101)
-
- # Save the figure
- filename = f"{simplified_id}_paired_{init_time_str[:-1]}.png"
- filepath = f"/NWP/AI_TC_pic/{init_time_str[:-1]}/{filename}"
- plt.savefig(filepath, dpi=300, bbox_inches='tight')
-
-
- plt.close()
-
- import os
- import pandas as pd
- textPairedContent = ''
- def main():
- # 定义目录路径 将FNV3的文件放在该目录下即可 例如 FNV3_2025_08_13T00_00_paired.csv
- directory = "/NWP/AI_TC_ENS/FNV3_paired_circle/"
-
- try:
- # 获取目录下所有CSV文件
- csv_files = [f for f in os.listdir(directory) if f.endswith('.csv')]
-
- if not csv_files:
- print("No CSV files found in the directory.")
- return
-
- print(f"Found {len(csv_files)} CSV files to process.")
-
- # 处理每个CSV文件
- for csv_file in csv_files:
- file_path = os.path.join(directory, csv_file)
- print(f"\nProcessing file: {csv_file}")
-
-
- try:
- init_time = extract_init_time(csv_file) # 修改点:从文件名获取
- # 读取CSV文件,跳过前6行
- df = pd.read_csv(file_path, skiprows=6)
-
- # 解析lead_time获取小时数
- df['lead_time_hours'] = df['lead_time'].apply(parse_lead_time)
-
- # 获取唯一的台风track_id
- unique_typhoons = df['track_id'].unique()
-
- print(f"Found {len(unique_typhoons)} typhoons in this file: {unique_typhoons}")
-
- # 处理每个台风
- for track_id in unique_typhoons:
- df_typhoon = df[df['track_id'] == track_id].copy()
-
- # 获取init_time(同一台风的所有行应该相同)
-
-
- print(f"Processing typhoon {track_id} with {len(df_typhoon)} data points")
-
- # 为这个台风创建地图
- create_typhoon_map(df_typhoon, track_id, init_time)
-
- print(f"Finished processing file: {csv_file}")
-
-
- except Exception as e:
- print(f"Error processing file {csv_file}: {e}")
- import traceback
- traceback.print_exc()
- continue # 继续处理下一个文件
-
- print("\n全部台风路径文件制作完成")
-
- except Exception as e:
- print(f"Error accessing directory: {e}")
- import traceback
- traceback.print_exc()
- if __name__ == "__main__":
- main()
复制代码
爱丽丝女儿镇楼 |
本帖子中包含更多资源
您需要 登录 才可以下载或查看,没有账号?立即注册
×
|