import numpy as np
import math
from scipy.linalg import inv, pinv
from scipy.optimize import least_squares
import georinex as gr
import pandas as pd
import matplotlib.pyplot as plt
import logging
from datetime import datetime, timedelta
import xarray as xr
from tqdm import tqdm
import pymap3d as pm
import numba as nb
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("gnss_processing.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger('GNSS_Solver')
logger.setLevel(logging.INFO)
LIGHT_SPEED = 299792458.0
SECONDS_IN_WEEK = 604800.0
GM = 3.986004418e14
EARTH_ROT_RATE = 7.2921151467e-5
SYSTEM_FREQUENCIES = {
'G': {
'L1': 1575.42e6,
'L2': 1227.60e6,
'L5': 1176.45e6
},
'R': {
'G1': lambda k: 1602.0e6 + k * 0.5625e6,
'G2': lambda k: 1246.0e6 + k * 0.4375e6
},
'E': {
'E1': 1575.42e6,
'E5a': 1176.45e6,
'E5b': 1207.14e6,
'E6': 1278.75e6
},
'C': {
'B1': 1561.098e6,
'B2': 1207.14e6,
'B3': 1268.52e6
},
'J': {
'L1': 1575.42e6,
'L2': 1227.60e6,
'L5': 1176.45e6,
'L6': 1278.75e6
}
}
ELLIPSOID_PARAMS = {
'WGS84': (6378137.0, 1/298.257223563),
'PZ90': (6378136.0, 1/298.25784),
'GTRF': (6378136.3, 1/298.257222101),
'CGCS2000': (6378137.0, 1/298.257222101),
'JGD2011': (6378136.6, 1/298.25642)
}
class AdvancedGNSSPositionSolver:
def __init__(self, obs_file, nav_file):
self.obs_file = obs_file
self.nav_file = nav_file
self.load_rinex_files()
self.previous_position = np.array([0, 0, 0])
self.previous_velocity = np.array([0, 0, 0])
self.ambiguities = {}
self.previous_phases = {}
self.cycle_slip_flags = {}
self.params = {
'elevation_mask': 10.0,
'use_ionosphere': True,
'use_troposphere': True,
'use_relativity': True,
'use_earth_rotation': True,
'use_phase_center': True,
'phase_weight': 0.001,
'pseudorange_weight': 1.0,
'ambiguity_resolution': 'float',
'cycle_slip_method': 'mw_gf',
'cycle_slip_threshold': 0.5,
'weighting_strategy': 'elevation',
'max_iterations': 50,
'convergence_threshold': 0.001,
'tropo_model': 'saastamoinen',
'iono_model': 'klobuchar',
'system_ellipsoid': True,
'frame_conversion': True,
'output_ellipsoid': 'WGS84',
'use_glonass_freq_channel': True
}
logger.info("高级GNSS解算器初始化完成")
def set_parameter(self, param_name, value):
"""更新解算参数"""
if param_name in self.params:
old_value = self.params[param_name]
self.params[param_name] = value
logger.info(f"参数更新: {param_name} = {value} (原值: {old_value})")
else:
logger.warning(f"尝试更新无效参数: {param_name}")
def load_rinex_files(self):
"""加载RINEX文件,处理版本差异"""
logger.info(f"加载观测文件: {self.obs_file} (版本: 3.02)")
try:
self.obs_data = gr.load(self.obs_file, version='3.02')
logger.info(f"观测文件加载成功! 时间范围: {self.obs_data.time[0].values} 到 {self.obs_data.time[-1].values}")
logger.info(f"包含卫星系统: {np.unique([sv[0] for sv in self.obs_data.sv.values])}")
unique_sats = np.unique(self.obs_data.sv.values)
logger.info(f"观测文件中包含 {len(unique_sats)} 颗唯一卫星")
except Exception as e:
logger.error(f"观测文件加载失败: {str(e)}")
raise
logger.info(f"加载导航文件: {self.nav_file} (版本: 3.04)")
try:
self.nav_data = gr.load(self.nav_file, version='3.04')
logger.info(f"导航文件加载成功! 包含卫星: {self.nav_data.sv.values}")
unique_nav_sats = np.unique(self.nav_data.sv.values)
logger.info(f"导航文件中包含 {len(unique_nav_sats)} 颗唯一卫星")
except Exception as e:
logger.error(f"导航文件加载失败: {str(e)}")
raise
def get_satellite_system(self, prn):
"""获取卫星系统标识"""
return prn[0] if isinstance(prn, str) else 'G'
def get_glonass_freq_channel(self, prn):
"""获取GLONASS频率通道号"""
try:
nav = self.nav_data.sel(sv=prn)
return nav['FrequencyNumber'].isel(time=0).values
except:
return 0
def get_frequency(self, system, signal, prn=None):
"""获取精确频率值"""
freq_dict = SYSTEM_FREQUENCIES.get(system, {})
if system == 'R' and self.params['use_glonass_freq_channel'] and prn:
k = self.get_glonass_freq_channel(prn)
if signal == 'G1':
return freq_dict['G1'](k)
elif signal == 'G2':
return freq_dict['G2'](k)
return freq_dict.get(signal, None)
def get_ellipsoid_params(self, system):
"""获取系统椭球参数"""
if not self.params['system_ellipsoid']:
return ELLIPSOID_PARAMS[self.params['output_ellipsoid']]
mapping = {
'G': 'WGS84',
'R': 'PZ90',
'E': 'GTRF',
'C': 'CGCS2000',
'J': 'JGD2011'
}
ellipsoid = mapping.get(system, 'WGS84')
return ELLIPSOID_PARAMS[ellipsoid]
def ecef2lla(self, pos, system='G'):
"""ECEF坐标转大地坐标 (使用系统特定椭球)"""
a, f = self.get_ellipsoid_params(system)
return pm.ecef2geodetic(pos[0], pos[1], pos[2], a=a, f=f)
def lla2ecef(self, lat, lon, alt, system='G'):
"""大地坐标转ECEF坐标 (使用系统特定椭球)"""
a, f = self.get_ellipsoid_params(system)
return pm.geodetic2ecef(lat, lon, alt, a=a, f=f)
def compute_satellite_position(self, prn, transmit_time):
"""根据广播星历精确计算卫星位置和钟差"""
system = self.get_satellite_system(prn)
try:
nav = self.nav_data.sel(sv=prn)
time_diff = np.abs((nav.time - transmit_time).astype(float))
min_idx = np.argmin(time_diff)
nav_epoch = nav.isel(time=min_idx)
if system == 'G':
return self.compute_gps_position(nav_epoch, transmit_time)
elif system == 'R':
return self.compute_glonass_position(nav_epoch, transmit_time)
elif system == 'E':
return self.compute_galileo_position(nav_epoch, transmit_time)
elif system == 'C':
return self.compute_bds_position(nav_epoch, transmit_time)
elif system == 'J':
return self.compute_qzss_position(nav_epoch, transmit_time)
else:
logger.warning(f"未知卫星系统: {system}")
return None, None
except Exception as e:
logger.error(f"计算卫星 {prn} 位置失败: {str(e)}")
return None, None
def compute_gps_position(self, nav, transmit_time):
"""精确计算GPS卫星位置和钟差"""
toc = nav['Toc'].values
af0 = nav['SVclockBias'].values
af1 = nav['SVclockDrift'].values
af2 = nav['SVclockDriftRate'].values
toe = nav['Toe'].values
sqrtA = nav['sqrtA'].values
e = nav['Eccentricity'].values
M0 = nav['M0'].values
delta_n = nav['DeltaN'].values
Omega0 = nav['Omega0'].values
OmegaDot = nav['OmegaDot'].values
i0 = nav['Io'].values
IDOT = nav['IDOT'].values
Cuc = nav['Cuc'].values
Cus = nav['Cus'].values
Crc = nav['Crc'].values
Crs = nav['Crs'].values
Cic = nav['Cic'].values
Cis = nav['Cis'].values
omega = nav['omega'].values
dt = transmit_time - toc
clock_corr = af0 + af1*dt + af2*dt**2
t = transmit_time - clock_corr
tk = t - toe
if tk > SECONDS_IN_WEEK/2:
tk -= SECONDS_IN_WEEK
elif tk < -SECONDS_IN_WEEK/2:
tk += SECONDS_IN_WEEK
n0 = np.sqrt(GM) / (sqrtA**3)
n = n0 + delta_n
M = M0 + n * tk
E = M
for _ in range(10):
E_old = E
E = M + e * np.sin(E)
if abs(E - E_old) < 1e-12:
break
sin_v = np.sqrt(1 - e**2) * np.sin(E) / (1 - e * np.cos(E))
cos_v = (np.cos(E) - e) / (1 - e * np.cos(E))
v = np.arctan2(sin_v, cos_v)
phi = v + omega
du = Cuc * np.cos(2*phi) + Cus * np.sin(2*phi)
dr = Crc * np.cos(2*phi) + Crs * np.sin(2*phi)
di = Cic * np.cos(2*phi) + Cis * np.sin(2*phi)
u = phi + du
r = (sqrtA**2) * (1 - e * np.cos(E)) + dr
i = i0 + di + IDOT * tk
x_prime = r * np.cos(u)
y_prime = r * np.sin(u)
Omega = Omega0 + (OmegaDot - EARTH_ROT_RATE) * tk - EARTH_ROT_RATE * toe
Omega = Omega % (2 * np.pi)
x = x_prime * np.cos(Omega) - y_prime * np.cos(i) * np.sin(Omega)
y = x_prime * np.sin(Omega) + y_prime * np.cos(i) * np.cos(Omega)
z = y_prime * np.sin(i)
F = -4.442807633e-10
dtr = F * e * sqrtA * np.sin(E)
clock_corr += dtr
if self.params['use_earth_rotation']:
rotation_angle = EARTH_ROT_RATE * (t - toe)
x_rot = x * np.cos(rotation_angle) - y * np.sin(rotation_angle)
y_rot = x * np.sin(rotation_angle) + y * np.cos(rotation_angle)
return np.array([x_rot, y_rot, z]), clock_corr
return np.array([x, y, z]), clock_corr
def compute_glonass_position(self, nav, transmit_time):
"""精确计算GLONASS卫星位置和钟差"""
t_oc = nav['Toc'].values
tau_n = nav['TauN'].values
gamma_n = nav['GammaN'].values
x = nav['X'].values
y = nav['Y'].values
z = nav['Z'].values
vx = nav['VX'].values
vy = nav['VY'].values
vz = nav['VZ'].values
ax = nav['AX'].values
ay = nav['AY'].values
az = nav['AZ'].values
dt = transmit_time - t_oc
x_sat = x + vx * dt + 0.5 * ax * dt**2
y_sat = y + vy * dt + 0.5 * ay * dt**2
z_sat = z + vz * dt + 0.5 * az * dt**2
clock_corr = -tau_n + gamma_n * dt
return np.array([x_sat, y_sat, z_sat]), clock_corr
def compute_galileo_position(self, nav, transmit_time):
"""精确计算Galileo卫星位置和钟差 (类似GPS但参数不同)"""
return self.compute_gps_position(nav, transmit_time)
def compute_bds_position(self, nav, transmit_time):
"""精确计算BDS卫星位置和钟差"""
return self.compute_gps_position(nav, transmit_time)
def compute_qzss_position(self, nav, transmit_time):
"""精确计算QZSS卫星位置和钟差"""
return self.compute_gps_position(nav, transmit_time)
def calculate_elevation_azimuth(self, rx_pos, sv_pos, system='G'):
"""计算卫星高度角和方位角 (考虑椭球)"""
lat, lon, alt = self.ecef2lla(rx_pos, system)
dx = sv_pos - rx_pos
r = np.linalg.norm(dx)
e = np.array([-np.sin(lon), np.cos(lon), 0])
n = np.array([-np.sin(lat)*np.cos(lon), -np.sin(lat)*np.sin(lon), np.cos(lat)])
u = np.array([np.cos(lat)*np.cos(lon), np.cos(lat)*np.sin(lon), np.sin(lat)])
dx_e = np.dot(dx, e)
dx_n = np.dot(dx, n)
dx_u = np.dot(dx, u)
elev = np.arctan2(dx_u, np.sqrt(dx_e**2 + dx_n**2))
azim = np.arctan2(dx_e, dx_n)
return np.degrees(elev), np.degrees(azim)
def saastamoinen_tropo(self, elev, lat, lon, alt, doy):
"""完整的Saastamoinen对流层模型"""
if not self.params['use_troposphere']:
return 0.0
P0 = 1013.25 * (1 - 0.0000226 * alt)**5.225
T0 = 15.0 - 0.0065 * alt + 273.15
RH = 0.5
e = 6.108 * RH * np.exp((17.15 * T0 - 4684.0) / (T0 - 38.45))
m_elev = 1.001 / np.sqrt(0.002001 + np.sin(np.radians(elev)))**2
trop_dry = (0.002277 * P0) / (1 - 0.00266 * np.cos(2 * np.radians(lat))) - 0.00028 * alt
trop_wet = (0.002277 * (1255/T0 + 0.05) * e)
return (trop_dry + trop_wet) * m_elev
def hopfield_tropo(self, elev, lat, lon, alt, T, P, RH):
"""完整的Hopfield对流层模型"""
e = 6.108 * RH * np.exp(17.15 * (T - 273.15) / (T - 38.45))
h_dry = 40136 + 148.72 * (T - 273.15)
h_wet = 11000
K_dry = 77.64 * P / T
K_wet = -12.96 * e / T + 3.718e5 * e / T**2
trop_dry = 1e-6 * K_dry * h_dry
trop_wet = 1e-6 * K_wet * h_wet
m_elev = 1 / np.sin(np.radians(elev))
return (trop_dry + trop_wet) * m_elev
def klobuchar_iono(self, lat, lon, elev, az, time, alpha, beta):
"""完整的Klobuchar电离层模型"""
phi_u = np.radians(lat) / np.pi
lambda_u = np.radians(lon) / np.pi
phi_m = phi_u + 0.064 * np.cos((lambda_u - 1.617) * np.pi)
t = 43200 * lambda_u + time.hour * 3600 + time.minute * 60 + time.second
t = t % 86400
F = 1.0 + 16.0 * (0.53 - elev/180)**3
PER = beta[0] + beta[1]*phi_m + beta[2]*phi_m**2 + beta[3]*phi_m**3
if PER < 72000: PER = 72000
x = 2 * np.pi * (t - 50400) / PER
if abs(x) < np.pi/2:
I = (5e-9 + (alpha[0] + alpha[1]*phi_m + alpha[2]*phi_m**2 + alpha[3]*phi_m**3) *
(1 - x**2/2 + x**4/24)) * F
else:
I = 5e-9 * F
return I * LIGHT_SPEED
def dual_freq_iono_correction(self, f1, f2, P1, P2):
"""双频电离层校正 (精确模型)"""
gamma = (f1/f2)**2
return (gamma * P1 - P2) / (gamma - 1)
def detect_cycle_slip(self, prn, signal, time, phase_L1, phase_L2, pseudo_L1, pseudo_L2):
"""周跳检测 (MW组合 + GF组合)"""
key = (prn, signal)
system = self.get_satellite_system(prn)
f1 = self.get_frequency(system, 'L1', prn) if system == 'G' else self.get_frequency(system, 'E1', prn)
f2 = self.get_frequency(system, 'L2', prn) if system == 'G' else self.get_frequency(system, 'E5a', prn)
if f1 is None or f2 is None:
return False
lambda1 = LIGHT_SPEED / f1
lambda2 = LIGHT_SPEED / f2
N_w = (phase_L1 - phase_L2) - (f1 - f2)/(f1 + f2) * (pseudo_L1/lambda1 + pseudo_L2/lambda2)
GF = phase_L1 * lambda1 - phase_L2 * lambda2
if key not in self.previous_phases:
self.previous_phases[key] = (N_w, GF, time)
return False
prev_N_w, prev_GF, prev_time = self.previous_phases[key]
dt = (time - prev_time).total_seconds()
if dt <= 0:
return False
MW_threshold = self.params['cycle_slip_threshold']
GF_threshold = 0.05
MW_slip = abs(N_w - prev_N_w) > MW_threshold
GF_slip = abs(GF - prev_GF) > GF_threshold
self.previous_phases[key] = (N_w, GF, time)
if MW_slip or GF_slip:
logger.info(f"检测到周跳: {prn} {signal} (MW变化: {abs(N_w - prev_N_w):.3f}, GF变化: {abs(GF - prev_GF):.3f}m)")
return True
return False
def process_epoch(self, time):
"""处理单个历元数据"""
logger.info(f"\n{'='*50}")
logger.info(f"处理历元: {time}")
try:
epoch_data = self.obs_data.sel(time=time, method='nearest')
logger.info(f"找到 {len(epoch_data.sv)} 颗卫星的观测值")
measurements = []
num_valid_sats = 0
for sv in epoch_data.sv.values:
prn = sv.item()
system = self.get_satellite_system(prn)
sat_data = epoch_data.sel(sv=prn)
signals = {}
for obs_type in sat_data.data_vars:
if obs_type.startswith('C') and not np.isnan(sat_data[obs_type].values):
pseudo = sat_data[obs_type].values
phase_type = 'L' + obs_type[1:]
if phase_type in sat_data.data_vars:
phase = sat_data[phase_type].values
if system == 'G':
signal = 'L1' if '1' in obs_type else 'L2'
elif system == 'R':
signal = 'G1' if '1' in obs_type else 'G2'
elif system == 'E':
signal = 'E1' if '1' in obs_type else 'E5a'
elif system == 'C':
signal = 'B1' if '2' in obs_type else 'B2'
elif system == 'J':
signal = 'L1' if '1' in obs_type else 'L2'
else:
signal = obs_type
freq = self.get_frequency(system, signal, prn)
if freq is None:
continue
snr_type = 'S' + obs_type[1:]
snr = sat_data[snr_type].values if snr_type in sat_data.data_vars else 50
signals[signal] = {
'pseudo': pseudo,
'phase': phase,
'snr': snr,
'freq': freq,
'obs_type': obs_type
}
if not signals:
logger.debug(f"卫星 {prn} 无有效信号")
continue
pseudo_estimate = min([s['pseudo'] for s in signals.values()])
transmit_time = time - timedelta(seconds=pseudo_estimate/LIGHT_SPEED)
sv_pos, sv_clock = self.compute_satellite_position(prn, transmit_time)
if sv_pos is None:
continue
for signal, obs_data in signals.items():
try:
if 'L1' in signals and 'L2' in signals and signal == 'L1':
slip = self.detect_cycle_slip(
prn, signal, time,
signals['L1']['phase'], signals['L2']['phase'],
signals['L1']['pseudo'], signals['L2']['pseudo']
)
if slip:
for s in signals:
key = (prn, s)
if key in self.ambiguities:
del self.ambiguities[key]
measurements.append({
'prn': prn,
'signal': signal,
'pseudo': obs_data['pseudo'],
'phase': obs_data['phase'],
'snr': obs_data['snr'],
'freq': obs_data['freq'],
'sv_pos': sv_pos,
'sv_clock': sv_clock,
'system': system,
'obs_type': obs_data['obs_type']
})
num_valid_sats += 1
logger.debug(f"添加卫星: {prn}-{signal}, "
f"伪距={obs_data['pseudo']:.2f}m, 相位={obs_data['phase']:.2f}周")
except Exception as e:
logger.warning(f"处理卫星 {prn} 信号 {signal} 时出错: {str(e)}")
min_sats = 4
if num_valid_sats < min_sats:
logger.warning(f"有效卫星不足: 需要至少 {min_sats} 颗, 当前 {num_valid_sats} 颗")
return None
logger.info(f"使用 {num_valid_sats} 颗卫星进行定位")
if np.any(self.previous_position != 0):
x0 = np.append(self.previous_position, 0.0)
else:
x0 = self.initialize_position(measurements)
solution = self.solve_position(measurements, time, x0)
if solution is None:
return None
self.previous_position = solution['position']
lat, lon, alt = self.ecef2lla(solution['position'])
result = {
'time': time,
'position': solution['position'],
'clock_offset': solution['clock_offset'],
'lat': np.degrees(lat),
'lon': np.degrees(lon),
'alt': alt,
'num_sats': num_valid_sats,
'residuals': solution['residuals'],
'dop': solution['dop'],
'used_systems': set(m['system'] for m in measurements)
}
logger.info(f"定位成功: 位置=[{result['lat']:.6f}°, {result['lon']:.6f}°, {result['alt']:.2f}m], "
f"钟差={result['clock_offset']*1e9:.2f}ns, 卫星数={result['num_sats']}")
return result
except Exception as e:
logger.error(f"处理历元时出错: {str(e)}", exc_info=True)
return None
def initialize_position(self, measurements):
"""使用伪距单点定位初始化位置"""
positions = []
weights = []
for m in measurements:
est_pos = m['sv_pos'] - m['pseudo'] * (m['sv_pos'] / np.linalg.norm(m['sv_pos']))
positions.append(est_pos)
elev, _ = self.calculate_elevation_azimuth(np.zeros(3), m['sv_pos'], m['system'])
weights.append(np.sin(np.radians(elev)))
weights = np.array(weights) / np.sum(weights)
return np.average(positions, axis=0, weights=weights)
def solve_position(self, measurements, time, x0):
"""使用加权最小二乘解算位置"""
def residuals(x):
rx_pos = x[:3]
dt_rx = x[3] if len(x) > 3 else 0.0
res = []
for m in measurements:
sv_pos = m['sv_pos']
sv_clock = m['sv_clock']
pseudo = m['pseudo']
phase = m['phase']
freq = m['freq']
wavelength = LIGHT_SPEED / freq
system = m['system']
geo_dist = np.linalg.norm(sv_pos - rx_pos)
elev, azim = self.calculate_elevation_azimuth(rx_pos, sv_pos, system)
if elev < self.params['elevation_mask']:
continue
lat, lon, alt = self.ecef2lla(rx_pos, system)
tropo_delay = 0.0
if self.params['use_troposphere']:
if self.params['tropo_model'] == 'saastamoinen':
doy = time.timetuple().tm_yday
tropo_delay = self.saastamoinen_tropo(elev, lat, lon, alt, doy)
elif self.params['tropo_model'] == 'hopfield':
T = 288.15 - 0.0065 * alt
P = 1013.25 * (1 - 0.0000226 * alt)**5.225
RH = 0.5
tropo_delay = self.hopfield_tropo(elev, lat, lon, alt, T, P, RH)
iono_delay = 0.0
if self.params['use_ionosphere'] and self.params['iono_model'] == 'klobuchar':
try:
alpha = self.nav_data.IONAlpha.values
beta = self.nav_data.IONBeta.values
except:
alpha = [1.49e-8, -1.79e-8, -1.59e-7, 5.59e-7]
beta = [1.40e5, 0, -3.03e5, 5.47e5]
iono_delay = self.klobuchar_iono(
np.degrees(lat), np.degrees(lon), elev, azim, time, alpha, beta
)
rel_correction = 0.0
model_dist = (geo_dist
+ tropo_delay
- iono_delay
+ rel_correction
+ LIGHT_SPEED * (sv_clock - dt_rx))
res_pseudo = pseudo - model_dist
key = (m['prn'], m['signal'])
if key in self.ambiguities:
N = self.ambiguities[key]
else:
N = phase - model_dist / wavelength
self.ambiguities[key] = N
res_phase = phase - (model_dist / wavelength + N)
res.append(res_pseudo)
res.append(res_phase)
return np.array(res)
try:
result = least_squares(
residuals,
x0,
method='lm',
max_nfev=self.params['max_iterations'],
ftol=self.params['convergence_threshold'],
verbose=0
)
if not result.success:
logger.warning(f"最小二乘解算失败: {result.message}")
return None
dop = self.calculate_dop(measurements, result.x[:3])
return {
'position': result.x[:3],
'clock_offset': result.x[3] if len(result.x) > 3 else 0.0,
'residuals': result.fun,
'dop': dop
}
except Exception as e:
logger.error(f"最小二乘解算出错: {str(e)}")
return None
def calculate_dop(self, measurements, rx_pos):
"""计算精度因子(DOP)"""
A = []
for m in measurements:
sv_pos = m['sv_pos']
dx = sv_pos - rx_pos
r = np.linalg.norm(dx)
unit_vec = dx / r
A.append(np.append(unit_vec, 1))
if len(A) < 4:
return {}
A = np.array(A)
Q = np.linalg.inv(A.T @ A)
dop = {
'GDOP': np.sqrt(np.trace(Q)),
'PDOP': np.sqrt(Q[0,0] + Q[1,1] + Q[2,2]),
'HDOP': np.sqrt(Q[0,0] + Q[1,1]),
'VDOP': np.sqrt(Q[2,2]),
'TDOP': np.sqrt(Q[3,3])
}
return dop
def process_file(self, output_file=None):
"""处理整个RINEX文件"""
results = []
success_count = 0
failure_count = 0
times = self.obs_data.time.values
logger.info(f"开始处理 {len(times)} 个历元")
for i, time in enumerate(tqdm(times, desc="处理历元")):
result = self.process_epoch(time)
if result:
results.append(result)
success_count += 1
else:
failure_count += 1
logger.info(f"处理完成: 成功 {success_count} 个历元, 失败 {failure_count} 个历元")
if output_file and results:
df = pd.DataFrame([{
'time': r['time'],
'x': r['position'][0],
'y': r['position'][1],
'z': r['position'][2],
'lat': r['lat'],
'lon': r['lon'],
'alt': r['alt'],
'clock_offset_ns': r['clock_offset'] * 1e9,
'num_sats': r['num_sats'],
'gdop': r['dop'].get('GDOP', 0),
'systems': ','.join(r['used_systems'])
} for r in results])
df.to_csv(output_file, index=False)
logger.info(f"结果保存至: {output_file}")
self.plot_results(df)
return results
def plot_results(self, df):
"""绘制结果图表"""
if len(df) < 2:
return
plt.figure(figsize=(15, 12))
plt.subplot(3, 2, 1)
plt.scatter(df['lon'], df['lat'], c=df['alt'], cmap='viridis', alpha=0.7)
plt.colorbar(label='高度 (m)')
plt.xlabel('经度 (°)')
plt.ylabel('纬度 (°)')
plt.title('GNSS接收机轨迹')
plt.grid(True)
plt.subplot(3, 2, 2)
plt.plot(df['time'], df['alt'])
plt.xlabel('时间')
plt.ylabel('高度 (m)')
plt.title('高度变化')
plt.grid(True)
plt.xticks(rotation=45)
plt.subplot(3, 2, 3)
plt.plot(df['time'], df['num_sats'])
plt.xlabel('时间')
plt.ylabel('卫星数量')
plt.title('可见卫星数量变化')
plt.grid(True)
plt.xticks(rotation=45)
plt.subplot(3, 2, 4)
plt.plot(df['time'], df['clock_offset_ns'])
plt.xlabel('时间')
plt.ylabel('钟差 (ns)')
plt.title('接收机钟差变化')
plt.grid(True)
plt.xticks(rotation=45)
plt.subplot(3, 2, 5)
plt.plot(df['time'], df['gdop'])
plt.xlabel('时间')
plt.ylabel('GDOP')
plt.title('几何精度因子变化')
plt.grid(True)
plt.xticks(rotation=45)
plt.subplot(3, 2, 6)
system_counts = {}
for systems in df['systems']:
for sys in systems.split(','):
system_counts[sys] = system_counts.get(sys, 0) + 1
plt.pie(system_counts.values(), labels=system_counts.keys(), autopct='%1.1f%%')
plt.title('卫星系统使用分布')
plt.tight_layout()
plt.savefig('gnss_results.png')
logger.info("结果图表已保存为 gnss_results.png")
if __name__ == "__main__":
solver = AdvancedGNSSPositionSolver('your_obs.21o', 'your_nav.21n')
logger.setLevel(logging.INFO)
solver.set_parameter('elevation_mask', 5)
solver.set_parameter('system_ellipsoid', True)
solver.set_parameter('use_glonass_freq_channel', True)
results = solver.process_file('positions.csv')
if results:
num_sats = [r['num_sats'] for r in results]
logger.info(f"平均卫星数: {np.mean(num_sats):.1f}, 最小: {min(num_sats)}, 最大: {max(num_sats)}")
residuals = np.concatenate([r['residuals'] for r in results if 'residuals' in r])
if len(residuals) > 0:
logger.info(f"残差统计: 均值={np.mean(residuals):.2f}m, 标准差={np.std(residuals):.2f}m")
system_usage = {}
for r in results:
for sys in r['used_systems']:
system_usage[sys] = system_usage.get(sys, 0) + 1
logger.info(f"卫星系统使用情况: {system_usage}")