GPX internal python module update

GPX internal pythong module update for open source and geoartifacthelper
This commit is contained in:
Mark McKinnon 2020-01-31 15:29:38 -05:00
parent 678d661c21
commit edd20e506d
9 changed files with 4137 additions and 0 deletions

View File

@ -0,0 +1,253 @@
"""
Autopsy Forensic Browser
Copyright 2019-2020 Basis Technology Corp.
Contact: carrier <at> sleuthkit <dot> org
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import os
import jarray
import inspect
import time
import calendar
from datetime import datetime
from java.lang import System
from java.util.logging import Level
from java.io import File
from java.util import ArrayList
from org.sleuthkit.datamodel import SleuthkitCase
from org.sleuthkit.datamodel import AbstractFile
from org.sleuthkit.datamodel import ReadContentInputStream
from org.sleuthkit.datamodel import Blackboard
from org.sleuthkit.datamodel import BlackboardArtifact
from org.sleuthkit.datamodel import BlackboardAttribute
from org.sleuthkit.datamodel import TskCoreException
from org.sleuthkit.datamodel.blackboardutils import GeoArtifactsHelper
from org.sleuthkit.autopsy.datamodel import ContentUtils
from org.sleuthkit.autopsy.ingest import IngestModule
from org.sleuthkit.autopsy.ingest.IngestModule import IngestModuleException
from org.sleuthkit.autopsy.ingest import DataSourceIngestModule
from org.sleuthkit.autopsy.ingest import FileIngestModule
from org.sleuthkit.autopsy.ingest import IngestModuleFactoryAdapter
from org.sleuthkit.autopsy.ingest import IngestMessage
from org.sleuthkit.autopsy.ingest import IngestServices
from org.sleuthkit.autopsy.coreutils import Logger
from org.sleuthkit.autopsy.casemodule import Case
from org.sleuthkit.autopsy.casemodule.services import Services
from org.sleuthkit.autopsy.casemodule.services import FileManager
from org.sleuthkit.autopsy.ingest import ModuleDataEvent
# Based on gpxpy module: https://github.com/tkrajina/gpxpy
import gpxpy
import gpxpy.gpx
# Factory that defines the name and details of the module and allows Autopsy
# to create instances of the modules that will do the analysis.
class GPXParserDataSourceIngestModuleFactory(IngestModuleFactoryAdapter):
moduleName = "GPX Parser Module"
# True - Verbose debugging messages sent to log file.
# False - Verbose debugging turned off.
debuglevel = True
def getModuleDisplayName(self):
return self.moduleName
# TODO: Give it a description
def getModuleDescription(self):
return "Module that extracts GEO data from GPX files."
def getModuleVersionNumber(self):
return "1.1"
def isDataSourceIngestModuleFactory(self):
return True
def createDataSourceIngestModule(self, ingestOptions):
return GPXParserDataSourceIngestModule()
# Data Source-level ingest module. One gets created per data source.
class GPXParserDataSourceIngestModule(DataSourceIngestModule):
_logger = Logger.getLogger(GPXParserDataSourceIngestModuleFactory.moduleName)
def log(self, level, msg):
self._logger.logp(level, self.__class__.__name__, inspect.stack()[1][3], msg)
def __init__(self):
self.context = None
# Where any setup and configuration is done.
def startUp(self, context):
self.context = context
# Where the analysis is done.
def process(self, dataSource, progressBar):
# We don't know how much work there is yet.
progressBar.switchToIndeterminate()
# This will work in 4.0.1 and beyond.
# Use blackboard class to index blackboard artifacts for keyword search.
blackboard = Case.getCurrentCase().getServices().getBlackboard()
# Get the sleuthkitcase
skcase = Case.getCurrentCase().getSleuthkitCase()
# In the name and then count and read them.
fileManager = Case.getCurrentCase().getServices().getFileManager()
files = fileManager.findFiles(dataSource, "%.gpx")
# TODO: Would like to change this to find files based on mimetype rather than extension.
#files = findFiles(dataSource, "text/xml")
#if (file.isMimeType('text/xml') == False):
numFiles = len(files)
if GPXParserDataSourceIngestModuleFactory.debuglevel: self.log(Level.INFO, "found " + str(numFiles) + " files")
progressBar.switchToDeterminate(numFiles)
fileCount = 0;
# Get module name for adding attributes
modulename = GPXParserDataSourceIngestModuleFactory.moduleName
for file in files:
# Get the GeoArtifactsHelper
geoartifacthelper = GeoArtifactsHelper(skcase, modulename, file)
# Check if the user pressed cancel while we were busy.
if self.context.isJobCancelled():
return IngestModule.ProcessResult.OK
#self.log(Level.INFO, "GPX: Processing file: " + file.getName())
fileCount += 1
# Check if module folder is present. If not, create it.
dirname = os.path.join(Case.getCurrentCase().getTempDirectory(), "GPX_Parser_Module")
try:
os.stat(dirname)
except:
os.mkdir(dirname)
filename = os.path.join(dirname, "tmp.gpx")
# Check to see if temporary file exists. If it does, remove it.
if os.path.exists(filename):
try:
os.remove(filename)
if GPXParserDataSourceIngestModuleFactory.debuglevel: self.log(Level.INFO, "GPX:\t" + "FILE DELETED " + filename )
except:
if GPXParserDataSourceIngestModuleFactory.debuglevel: self.log(Level.INFO, "GPX:\t" + "FILE NOT DELETED " + filename)
# This writes the file to the local file system.
localFile = File(filename)
ContentUtils.writeToFile(file, localFile)
# Send to gpxpy for parsing.
gpxfile = open(filename)
try:
gpx = gpxpy.parse(gpxfile)
if GPXParserDataSourceIngestModuleFactory.debuglevel: self.log(Level.INFO, "GPX:\t" + "FILE PARSED")
except:
if GPXParserDataSourceIngestModuleFactory.debuglevel: self.log(Level.SEVERE, "GPX:\t" + file.getName() + " - FILE NOT PARSED")
continue
if gpx:
if GPXParserDataSourceIngestModuleFactory.debuglevel: self.log(Level.INFO, "GPX: TRACKS")
for track in gpx.tracks:
for segment in track.segments:
for point in segment.points:
# Make an Array to add attributes to allows to use bulk add attributes
otherattributes = ArrayList()
# Evelation may be None so why it is in a try block
try:
otherattributes.add(BlackboardAttribute(BlackboardAttribute.ATTRIBUTE_TYPE.TSK_GEO_ALTITUDE.getTypeID(), modulename, point.elevation))
except:
pass
otherattributes.add(BlackboardAttribute(BlackboardAttribute.ATTRIBUTE_TYPE.TSK_FLAG.getTypeID(), modulename, "Tracks"))
datetime = 0
try:
if (point.time != None):
datetime = long(time.mktime(point.time.timetuple()))
except:
pass
try:
# Add the trackpoint using the helper class
geoartifact = geoartifacthelper.addGPSTrackpoint(point.latitude, point.longitude, datetime, "Trackpoint", "GPXParser", otherattributes)
except Blackboard.BlackboardException as e:
if GPXParserDataSourceIngestModuleFactory.debuglevel: self.log(Level.SEVERE, "GPX: Error using geo artifact helper with blackboard " )
except TskCoreException as e:
if GPXParserDataSourceIngestModuleFactory.debuglevel: self.log(Level.SEVERE, "GPX: Error using geo artifact helper tskcoreexception" )
if GPXParserDataSourceIngestModuleFactory.debuglevel: self.log(Level.INFO, "GPX: WAYPOINTS")
for waypoint in gpx.waypoints:
attributes = ArrayList()
art = file.newArtifact(BlackboardArtifact.ARTIFACT_TYPE.TSK_GPS_BOOKMARK)
attributes.add(BlackboardAttribute(BlackboardAttribute.ATTRIBUTE_TYPE.TSK_GEO_LATITUDE.getTypeID(), modulename, waypoint.latitude))
attributes.add(BlackboardAttribute(BlackboardAttribute.ATTRIBUTE_TYPE.TSK_GEO_LONGITUDE.getTypeID(), modulename, waypoint.longitude))
attributes.add(BlackboardAttribute(BlackboardAttribute.ATTRIBUTE_TYPE.TSK_FLAG.getTypeID(), modulename, "Waypoint"))
attributes.add(BlackboardAttribute(BlackboardAttribute.ATTRIBUTE_TYPE.TSK_NAME.getTypeID(), modulename, waypoint.name))
attributes.add(BlackboardAttribute(BlackboardAttribute.ATTRIBUTE_TYPE.TSK_PROG_NAME.getTypeID(), modulename, "GPXParser"))
art.addAttributes(attributes)
try:
# Post the artifact to blackboard
skcase.getBlackboard().postArtifact(art, modulename)
except Blackboard.BlackboardException as e:
if GPXParserDataSourceIngestModuleFactory.debuglevel: self.log(Level.SEVERE, "GPX: Error using geo artifact helper with blackboard for waypoints" )
if GPXParserDataSourceIngestModuleFactory.debuglevel: self.log(Level.INFO, "GPX: ROUTES")
for route in gpx.routes:
for point in route.points:
otherattributes = ArrayList()
otherattributes.add(BlackboardAttribute(BlackboardAttribute.ATTRIBUTE_TYPE.TSK_FLAG.getTypeID(), modulename, "Route"))
# Evelation may be None so why it is in a try block
try:
otherattributes.add(BlackboardAttribute(BlackboardAttribute.ATTRIBUTE_TYPE.TSK_GEO_ALTITUDE.getTypeID(), modulename, point.elevation))
except:
pass
try:
# Add the artifact using the geoArtifactHelper.
geoartifact = geoartifacthelper.addGPSTrackpoint(point.latitude, point.longitude, 0, "Trackpoint", "GPXParser", otherattributes)
except Blackboard.BlackboardException as e:
if GPXParserDataSourceIngestModuleFactory.debuglevel: self.log(Level.SEVERE, "GPX: Error using geo artifact helper with blackboard for Routes")
except TskCoreException as e:
if GPXParserDataSourceIngestModuleFactory.debuglevel: self.log(Level.SEVERE, "GPX: Error using geo artifact helper tskcoreexception for routes" )
# Update the progress bar.
progressBar.progress(fileCount)
if os.path.exists(filename):
try:
os.remove(filename)
if GPXParserDataSourceIngestModuleFactory.debuglevel: self.log(Level.INFO, "GPX:\t" + "FILE DELETED")
except:
self.log(Level.SEVERE, "GPX:\t" + "FILE NOT DELETED")
# Post a message to the ingest messages inbox.
message = IngestMessage.createMessage(IngestMessage.MessageType.DATA, "GPX Parser Data Source Ingest Module", "Found %d files" % fileCount)
IngestServices.getInstance().postMessage(message)
return IngestModule.ProcessResult.OK;

View File

@ -0,0 +1,13 @@
Metadata-Version: 1.1
Name: gpxpy
Version: 0.8.8
Summary: GPX file parser and GPS track manipulation library
Home-page: http://www.trackprofiler.com/gpxpy/index.html
Author: Tomo Krajina
Author-email: tkrajina@gmail.com
License: Apache License, Version 2.0
Description: UNKNOWN
Platform: UNKNOWN
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 2
Classifier: Programming Language :: Python :: 3

View File

@ -0,0 +1,34 @@
# -*- coding: utf-8 -*-
# Copyright 2011 Tomo Krajina
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
def parse(xml_or_file, parser=None):
"""
Parse xml (string) or file object. This is just an wrapper for
GPXParser.parse() function.
parser may be 'lxml', 'minidom' or None (then it will be automatically
detected, lxml if possible).
xml_or_file must be the xml to parse or a file-object with the XML.
"""
from . import gpx as mod_gpx
from . import parser as mod_parser
parser = mod_parser.GPXParser(xml_or_file, parser=parser)
return parser.parse()

View File

@ -0,0 +1,384 @@
# -*- coding: utf-8 -*-
# Copyright 2011 Tomo Krajina
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pdb
import logging as mod_logging
import math as mod_math
from . import utils as mod_utils
# Generic geo related function and class(es)
# One degree in meters:
ONE_DEGREE = 1000. * 10000.8 / 90.
EARTH_RADIUS = 6371 * 1000
def to_rad(x):
return x / 180. * mod_math.pi
def haversine_distance(latitude_1, longitude_1, latitude_2, longitude_2):
"""
Haversine distance between two points, expressed in meters.
Implemented from http://www.movable-type.co.uk/scripts/latlong.html
"""
d_lat = to_rad(latitude_1 - latitude_2)
d_lon = to_rad(longitude_1 - longitude_2)
lat1 = to_rad(latitude_1)
lat2 = to_rad(latitude_2)
a = mod_math.sin(d_lat/2) * mod_math.sin(d_lat/2) + \
mod_math.sin(d_lon/2) * mod_math.sin(d_lon/2) * mod_math.cos(lat1) * mod_math.cos(lat2)
c = 2 * mod_math.atan2(mod_math.sqrt(a), mod_math.sqrt(1-a))
d = EARTH_RADIUS * c
return d
def length(locations=None, _3d=None):
locations = locations or []
if not locations:
return 0
length = 0
for i in range(len(locations)):
if i > 0:
previous_location = locations[i - 1]
location = locations[i]
if _3d:
d = location.distance_3d(previous_location)
else:
d = location.distance_2d(previous_location)
if d != 0 and not d:
pass
else:
length += d
return length
def length_2d(locations=None):
""" 2-dimensional length (meters) of locations (only latitude and longitude, no elevation). """
locations = locations or []
return length(locations, False)
def length_3d(locations=None):
""" 3-dimensional length (meters) of locations (it uses latitude, longitude, and elevation). """
locations = locations or []
return length(locations, True)
def calculate_max_speed(speeds_and_distances):
"""
Compute average distance and standard deviation for distance. Extremes
in distances are usually extremes in speeds, so we will ignore them,
here.
speeds_and_distances must be a list containing pairs of (speed, distance)
for every point in a track segment.
"""
assert speeds_and_distances
if len(speeds_and_distances) > 0:
assert len(speeds_and_distances[0]) == 2
# ...
assert len(speeds_and_distances[-1]) == 2
size = float(len(speeds_and_distances))
if size < 20:
mod_logging.debug('Segment too small to compute speed, size=%s', size)
return None
distances = list(map(lambda x: x[1], speeds_and_distances))
average_distance = sum(distances) / float(size)
standard_distance_deviation = mod_math.sqrt(sum(map(lambda distance: (distance-average_distance)**2, distances))/size)
# Ignore items where the distance is too big:
filtered_speeds_and_distances = filter(lambda speed_and_distance: abs(speed_and_distance[1] - average_distance) <= standard_distance_deviation * 1.5, speeds_and_distances)
# sort by speed:
speeds = list(map(lambda speed_and_distance: speed_and_distance[0], filtered_speeds_and_distances))
if not isinstance(speeds, list): # python3
speeds = list(speeds)
if not speeds:
return None
speeds.sort()
# Even here there may be some extremes => ignore the last 5%:
index = int(len(speeds) * 0.95)
if index >= len(speeds):
index = -1
return speeds[index]
def calculate_uphill_downhill(elevations):
if not elevations:
return 0, 0
size = len(elevations)
def __filter(n):
current_ele = elevations[n]
if current_ele is None:
return False
if 0 < n < size - 1:
previous_ele = elevations[n-1]
next_ele = elevations[n+1]
if previous_ele is not None and current_ele is not None and next_ele is not None:
return previous_ele*.3 + current_ele*.4 + next_ele*.3
return current_ele
smoothed_elevations = list(map(__filter, range(size)))
uphill, downhill = 0., 0.
for n, elevation in enumerate(smoothed_elevations):
if n > 0 and elevation is not None and smoothed_elevations is not None:
d = elevation - smoothed_elevations[n-1]
if d > 0:
uphill += d
else:
downhill -= d
return uphill, downhill
def distance(latitude_1, longitude_1, elevation_1, latitude_2, longitude_2, elevation_2,
haversine=None):
"""
Distance between two points. If elevation is None compute a 2d distance
if haversine==True -- haversine will be used for every computations,
otherwise...
Haversine distance will be used for distant points where elevation makes a
small difference, so it is ignored. That's because haversine is 5-6 times
slower than the dummy distance algorithm (which is OK for most GPS tracks).
"""
# If points too distant -- compute haversine distance:
if haversine or (abs(latitude_1 - latitude_2) > .2 or abs(longitude_1 - longitude_2) > .2):
return haversine_distance(latitude_1, longitude_1, latitude_2, longitude_2)
coef = mod_math.cos(latitude_1 / 180. * mod_math.pi)
x = latitude_1 - latitude_2
y = (longitude_1 - longitude_2) * coef
distance_2d = mod_math.sqrt(x * x + y * y) * ONE_DEGREE
if elevation_1 is None or elevation_2 is None or elevation_1 == elevation_2:
return distance_2d
return mod_math.sqrt(distance_2d ** 2 + (elevation_1 - elevation_2) ** 2)
def elevation_angle(location1, location2, radians=False):
""" Uphill/downhill angle between two locations. """
if location1.elevation is None or location2.elevation is None:
return None
b = float(location2.elevation - location1.elevation)
a = location2.distance_2d(location1)
if a == 0:
return 0
angle = mod_math.atan(b / a)
if radians:
return angle
return 180 * angle / mod_math.pi
def distance_from_line(point, line_point_1, line_point_2):
""" Distance of point from a line given with two points. """
assert point, point
assert line_point_1, line_point_1
assert line_point_2, line_point_2
a = line_point_1.distance_2d(line_point_2)
if a == 0:
return line_point_1.distance_2d(point)
b = line_point_1.distance_2d(point)
c = line_point_2.distance_2d(point)
s = (a + b + c) / 2.
return 2. * mod_math.sqrt(abs(s * (s - a) * (s - b) * (s - c))) / a
def get_line_equation_coefficients(location1, location2):
"""
Get line equation coefficients for:
latitude * a + longitude * b + c = 0
This is a normal cartesian line (not spherical!)
"""
if location1.longitude == location2.longitude:
# Vertical line:
return float(0), float(1), float(-location1.longitude)
else:
a = float(location1.latitude - location2.latitude) / (location1.longitude - location2.longitude)
b = location1.latitude - location1.longitude * a
return float(1), float(-a), float(-b)
def simplify_polyline(points, max_distance):
"""Does Ramer-Douglas-Peucker algorithm for simplification of polyline """
if len(points) < 3:
return points
begin, end = points[0], points[-1]
# Use a "normal" line just to detect the most distant point (not its real distance)
# this is because this is faster to compute than calling distance_from_line() for
# every point.
#
# This is an approximation and may have some errors near the poles and if
# the points are too distant, but it should be good enough for most use
# cases...
a, b, c = get_line_equation_coefficients(begin, end)
tmp_max_distance = -1000000
tmp_max_distance_position = None
for point_no in range(len(points[1:-1])):
point = points[point_no]
d = abs(a * point.latitude + b * point.longitude + c)
if d > tmp_max_distance:
tmp_max_distance = d
tmp_max_distance_position = point_no
# Now that we have the most distance point, compute its real distance:
real_max_distance = distance_from_line(points[tmp_max_distance_position], begin, end)
if real_max_distance < max_distance:
return [begin, end]
return (simplify_polyline(points[:tmp_max_distance_position + 2], max_distance) +
simplify_polyline(points[tmp_max_distance_position + 1:], max_distance)[1:])
class Location:
""" Generic geographical location """
latitude = None
longitude = None
elevation = None
def __init__(self, latitude, longitude, elevation=None):
self.latitude = latitude
self.longitude = longitude
self.elevation = elevation
def has_elevation(self):
return self.elevation or self.elevation == 0
def remove_elevation(self):
self.elevation = None
def distance_2d(self, location):
if not location:
return None
return distance(self.latitude, self.longitude, None, location.latitude, location.longitude, None)
def distance_3d(self, location):
if not location:
return None
return distance(self.latitude, self.longitude, self.elevation, location.latitude, location.longitude, location.elevation)
def elevation_angle(self, location, radians=False):
return elevation_angle(self, location, radians)
def move(self, location_delta):
self.latitude, self.longitude = location_delta.move(self)
def __add__(self, location_delta):
latitude, longitude = location_delta.move(self)
return Location(latitude, longitude)
def __str__(self):
return '[loc:%s,%s@%s]' % (self.latitude, self.longitude, self.elevation)
def __repr__(self):
if self.elevation is None:
return 'Location(%s, %s)' % (self.latitude, self.longitude)
else:
return 'Location(%s, %s, %s)' % (self.latitude, self.longitude, self.elevation)
def __hash__(self):
return mod_utils.hash_object(self, ('latitude', 'longitude', 'elevation'))
class LocationDelta:
"""
Intended to use similar to timestamp.timedelta, but for Locations.
"""
NORTH = 0
EAST = 90
SOUTH = 180
WEST = 270
def __init__(self, distance=None, angle=None, latitude_diff=None, longitude_diff=None):
"""
Version 1:
Distance (in meters).
angle_from_north *clockwise*.
...must be given
Version 2:
latitude_diff and longitude_diff
...must be given
"""
if (distance is not None) and (angle is not None):
if (latitude_diff is not None) or (longitude_diff is not None):
raise Exception('No lat/lon diff if using distance and angle!')
self.distance = distance
self.angle_from_north = angle
self.move_function = self.move_by_angle_and_distance
elif (latitude_diff is not None) and (longitude_diff is not None):
if (distance is not None) or (angle is not None):
raise Exception('No distance/angle if using lat/lon diff!')
this.latitude_diff = latitude_diff
this.longitude_diff = longitude_diff
self.move_function = self.move_by_lat_lon_diff
def move(self, location):
"""
Move location by this timedelta.
"""
return self.move_function(location)
def move_by_angle_and_distance(self, location):
coef = mod_math.cos(location.latitude / 180. * mod_math.pi)
vertical_distance_diff = mod_math.sin((90 - self.angle_from_north) / 180. * mod_math.pi) / ONE_DEGREE
horizontal_distance_diff = mod_math.cos((90 - self.angle_from_north) / 180. * mod_math.pi) / ONE_DEGREE
lat_diff = self.distance * vertical_distance_diff
lon_diff = self.distance * horizontal_distance_diff / coef
return location.latitude + lat_diff, location.longitude + lon_diff
def move_by_lat_lon_diff(self, location):
return location.latitude + self.latitude_diff, location.longitude + self.longitude_diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,406 @@
# -*- coding: utf-8 -*-
# Copyright 2014 Tomo Krajina
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect as mod_inspect
import datetime as mod_datetime
from . import utils as mod_utils
class GPXFieldTypeConverter:
def __init__(self, from_string, to_string):
self.from_string = from_string
self.to_string = to_string
def parse_time(string):
from . import gpx as mod_gpx
if not string:
return None
if 'T' in string:
string = string.replace('T', ' ')
if 'Z' in string:
string = string.replace('Z', '')
for date_format in mod_gpx.DATE_FORMATS:
try:
return mod_datetime.datetime.strptime(string, date_format)
except ValueError as e:
pass
raise GPXException('Invalid time: %s' % string)
# ----------------------------------------------------------------------------------------------------
# Type converters used to convert from/to the string in the XML:
# ----------------------------------------------------------------------------------------------------
class FloatConverter:
def __init__(self):
self.from_string = lambda string : None if string is None else float(string.strip())
self.to_string = lambda flt : str(flt)
class IntConverter:
def __init__(self):
self.from_string = lambda string : None if string is None else int(string.strip())
self.to_string = lambda flt : str(flt)
class TimeConverter:
def from_string(self, string):
from . import gpx as mod_gpx
if not string:
return None
if 'T' in string:
string = string.replace('T', ' ')
if 'Z' in string:
string = string.replace('Z', '')
for date_format in mod_gpx.DATE_FORMATS:
try:
return mod_datetime.datetime.strptime(string, date_format)
except ValueError as e:
pass
return None
def to_string(self, time):
from . import gpx as mod_gpx
return time.strftime(mod_gpx.DATE_FORMAT) if time else None
INT_TYPE = IntConverter()
FLOAT_TYPE = FloatConverter()
TIME_TYPE = TimeConverter()
# ----------------------------------------------------------------------------------------------------
# Field converters:
# ----------------------------------------------------------------------------------------------------
class AbstractGPXField:
def __init__(self, attribute_field=None, is_list=None):
self.attribute_field = attribute_field
self.is_list = is_list
self.attribute = False
def from_xml(self, parser, node, version):
raise Exception('Not implemented')
def to_xml(self, value, version):
raise Exception('Not implemented')
class GPXField(AbstractGPXField):
"""
Used for to (de)serialize fields with simple field<->xml_tag mapping.
"""
def __init__(self, name, tag=None, attribute=None, type=None, possible=None, mandatory=None):
AbstractGPXField.__init__(self)
self.name = name
if tag and attribute:
raise GPXException('Only tag *or* attribute may be given!')
if attribute:
self.tag = None
self.attribute = name if attribute is True else attribute
elif tag:
self.tag = name if tag is True else tag
self.attribute = None
else:
self.tag = name
self.attribute = None
self.type_converter = type
self.possible = possible
self.mandatory = mandatory
def from_xml(self, parser, node, version):
if self.attribute:
result = parser.get_node_attribute(node, self.attribute)
else:
__node = parser.get_first_child(node, self.tag)
result = parser.get_node_data(__node)
if result is None:
if self.mandatory:
from . import gpx as mod_gpx
raise mod_gpx.GPXException('%s is mandatory in %s' % (self.name, self.tag))
return None
if self.type_converter:
try:
result = self.type_converter.from_string(result)
except Exception as e:
from . import gpx as mod_gpx
raise mod_gpx.GPXException('Invalid value for <%s>... %s (%s)' % (self.tag, result, e))
if self.possible:
if not (result in self.possible):
from . import gpx as mod_gpx
raise mod_gpx.GPXException('Invalid value "%s", possible: %s' % (result, self.possible))
return result
def to_xml(self, value, version):
if not value:
return ''
if self.attribute:
return '%s="%s"' % (self.attribute, mod_utils.make_str(value))
else:
if self.type_converter:
value = self.type_converter.to_string(value)
if isinstance(self.tag, list) or isinstance(self.tag, tuple):
raise Exception('Not yet implemented')
return mod_utils.to_xml(self.tag, content=value, escape=True)
class GPXComplexField(AbstractGPXField):
def __init__(self, name, classs, tag=None, is_list=None):
AbstractGPXField.__init__(self, is_list=is_list)
self.name = name
self.tag = tag or name
self.classs = classs
def from_xml(self, parser, node, version):
if self.is_list:
result = []
for child_node in parser.get_children(node):
if parser.get_node_name(child_node) == self.tag:
result.append(gpx_fields_from_xml(self.classs, parser, child_node, version))
return result
else:
field_node = parser.get_first_child(node, self.tag)
if field_node is None:
return None
return gpx_fields_from_xml(self.classs, parser, field_node, version)
def to_xml(self, value, version):
if self.is_list:
result = ''
for obj in value:
result += gpx_fields_to_xml(obj, self.tag, version)
return result
else:
return gpx_fields_to_xml(value, self.tag, version)
class GPXEmailField(AbstractGPXField):
"""
Converts GPX1.1 email tag group from/to string.
"""
def __init__(self, name, tag=None):
self.attribute = False
self.is_list = False
self.name = name
self.tag = tag or name
def from_xml(self, parser, node, version):
email_node = parser.get_first_child(node, self.tag)
if email_node is None:
return None
email_id = parser.get_node_attribute(email_node, 'id')
email_domain = parser.get_node_attribute(email_node, 'domain')
return '%s@%s' % (email_id, email_domain)
def to_xml(self, value, version):
if not value:
return ''
if '@' in value:
pos = value.find('@')
email_id = value[:pos]
email_domain = value[pos+1:]
else:
email_id = value
email_domain = 'unknown'
return '\n<%s id="%s" domain="%s" />' % (self.tag, email_id, email_domain)
class GPXExtensionsField(AbstractGPXField):
"""
GPX1.1 extensions <extensions>...</extensions> key-value type.
"""
def __init__(self, name, tag=None):
self.attribute = False
self.name = name
self.is_list = False
self.tag = tag or 'extensions'
def from_xml(self, parser, node, version):
result = {}
if node is None:
return result
extensions_node = parser.get_first_child(node, self.tag)
if extensions_node is None:
return result
children = parser.get_children(extensions_node)
if children is None:
return result
for child in children:
result[parser.get_node_name(child)] = parser.get_node_data(child)
return result
def to_xml(self, value, version):
if value is None or not value:
return ''
result = '\n<' + self.tag + '>'
for ext_key, ext_value in value.items():
result += mod_utils.to_xml(ext_key, content=ext_value)
result += '</' + self.tag + '>'
return result
# ----------------------------------------------------------------------------------------------------
# Utility methods:
# ----------------------------------------------------------------------------------------------------
def gpx_fields_to_xml(instance, tag, version, custom_attributes=None):
fields = instance.gpx_10_fields
if version == '1.1':
fields = instance.gpx_11_fields
tag_open = bool(tag)
body = ''
if tag:
body = '\n<' + tag
if custom_attributes:
for key, value in custom_attributes.items():
body += ' %s="%s"' % (key, mod_utils.make_str(value))
for gpx_field in fields:
if isinstance(gpx_field, str):
if tag_open:
body += '>'
tag_open = False
if gpx_field[0] == '/':
body += '<%s>' % gpx_field
else:
body += '\n<%s' % gpx_field
tag_open = True
else:
value = getattr(instance, gpx_field.name)
if gpx_field.attribute:
body += ' ' + gpx_field.to_xml(value, version)
elif value:
if tag_open:
body += '>'
tag_open = False
xml_value = gpx_field.to_xml(value, version)
if xml_value:
body += xml_value
if tag:
if tag_open:
body += '>'
body += '</' + tag + '>'
return body
def gpx_fields_from_xml(class_or_instance, parser, node, version):
if mod_inspect.isclass(class_or_instance):
result = class_or_instance()
else:
result = class_or_instance
fields = result.gpx_10_fields
if version == '1.1':
fields = result.gpx_11_fields
node_path = [ node ]
for gpx_field in fields:
current_node = node_path[-1]
if isinstance (gpx_field, str):
if gpx_field.startswith('/'):
node_path.pop()
else:
if current_node is None:
node_path.append(None)
else:
node_path.append(parser.get_first_child(current_node, gpx_field))
else:
if current_node is not None:
value = gpx_field.from_xml(parser, current_node, version)
setattr(result, gpx_field.name, value)
elif gpx_field.attribute:
value = gpx_field.from_xml(parser, node, version)
setattr(result, gpx_field.name, value)
return result
def gpx_check_slots_and_default_values(classs):
"""
Will fill the default values for this class. Instances will inherit those
values so we don't need to fill default values for every instance.
This method will also fill the attribute gpx_field_names with a list of
gpx field names. This can be used
"""
fields = classs.gpx_10_fields + classs.gpx_11_fields
gpx_field_names = []
instance = classs()
try:
attributes = list(filter(lambda x : x[0] != '_', dir(instance)))
attributes = list(filter(lambda x : not callable(getattr(instance, x)), attributes))
attributes = list(filter(lambda x : not x.startswith('gpx_'), attributes))
except Exception as e:
raise Exception('Error reading attributes for %s: %s' % (classs.__name__, e))
attributes.sort()
slots = list(classs.__slots__)
slots.sort()
if attributes != slots:
raise Exception('Attributes for %s is\n%s but should be\n%s' % (classs.__name__, attributes, slots))
for field in fields:
if not isinstance(field, str):
if field.is_list:
value = []
else:
value = None
try:
actual_value = getattr(instance, field.name)
except:
raise Exception('%s has no attribute %s' % (classs.__name__, field.name))
if value != actual_value:
raise Exception('Invalid default value %s.%s is %s but should be %s'
% (classs.__name__, field.name, actual_value, value))
#print('%s.%s -> %s' % (classs, field.name, value))
if not field.name in gpx_field_names:
gpx_field_names.append(field.name)
gpx_field_names = tuple(gpx_field_names)
if not hasattr(classs, '__slots__') or not classs.__slots__ or classs.__slots__ != gpx_field_names:
try: slots = classs.__slots__
except Exception as e: slots = '[Unknown:%s]' % e
raise Exception('%s __slots__ invalid, found %s, but should be %s' % (classs, slots, gpx_field_names))

View File

@ -0,0 +1,67 @@
# -*- coding: utf-8 -*-
import xml.dom.minidom as mod_minidom
import gpxpy as mod_gpxpy
def split_gpxs(xml):
"""
Split single tracks from this one, without parsing with gpxpy
"""
dom = mod_minidom.parseString(xml)
gpx_node = _find_gpx_node(dom)
gpx_track_nodes = []
if gpx_node:
for child_node in gpx_node.childNodes:
if child_node.nodeName == 'trk':
gpx_track_nodes.append(child_node)
gpx_node.removeChild(child_node)
for gpx_track_node in gpx_track_nodes:
gpx_node.appendChild(gpx_track_node)
yield dom.toxml()
gpx_node.removeChild(gpx_track_node)
def join_gpxs(xmls):
"""
Utility to join GPX files without parsing them with gpxpy
"""
result = None
wpt_elements = []
rte_elements = []
trk_elements = []
for xml in xmls:
dom = mod_minidom.parseString(xml)
if not result:
result = dom
gpx_node = _find_gpx_node(dom)
if gpx_node:
for child_node in gpx_node.childNodes:
if child_node.nodeName == 'wpt':
wpt_elements.append(child_node)
gpx_node.removeChild(child_node)
elif child_node.nodeName == 'rte':
rte_elements.append(child_node)
gpx_node.removeChild(child_node)
elif child_node.nodeName == 'trk':
trk_elements.append(child_node)
gpx_node.removeChild(child_node)
gpx_node = _find_gpx_node(result)
if gpx_node:
for wpt_element in wpt_elements:
gpx_node.appendChild(wpt_element)
for rte_element in rte_elements:
gpx_node.appendChild(rte_element)
for trk_element in trk_elements:
gpx_node.appendChild(trk_element)
return result.toxml()
def _find_gpx_node(dom):
for gpx_candidate_node in dom.childNodes:
if gpx_candidate_node.nodeName == 'gpx':
return gpx_candidate_node
return None

View File

@ -0,0 +1,221 @@
# -*- coding: utf-8 -*-
# Copyright 2011 Tomo Krajina
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import pdb
import re as mod_re
import logging as mod_logging
import datetime as mod_datetime
import xml.dom.minidom as mod_minidom
try:
import lxml.etree as mod_etree
except:
mod_etree = None
pass # LXML not available
from . import gpx as mod_gpx
from . import utils as mod_utils
from . import gpxfield as mod_gpxfield
class XMLParser:
"""
Used when lxml is not available. Uses standard minidom.
"""
def __init__(self, xml):
self.xml = xml
self.dom = mod_minidom.parseString(xml)
def get_first_child(self, node=None, name=None):
# TODO: Remove find_first_node from utils!
if not node:
node = self.dom
children = node.childNodes
if not children:
return None
if not name:
return children[0]
for tmp_node in children:
if tmp_node.nodeName == name:
return tmp_node
return None
def get_node_name(self, node):
if not node:
return None
return node.nodeName
def get_children(self, node=None):
if not node:
node = self.dom
return list(filter(lambda node : node.nodeType == node.ELEMENT_NODE, node.childNodes))
def get_node_data(self, node):
if node is None:
return None
child_nodes = node.childNodes
if not child_nodes or len(child_nodes) == 0:
return None
return child_nodes[0].nodeValue
def get_node_attribute(self, node, attribute):
if (not hasattr(node, 'attributes')) or (not node.attributes):
return None
if attribute in node.attributes.keys():
return node.attributes[attribute].nodeValue
return None
class LXMLParser:
"""
Used when lxml is available.
"""
def __init__(self, xml):
if not mod_etree:
raise Exception('Cannot use LXMLParser without lxml installed')
if mod_utils.PYTHON_VERSION[0] == '3':
# In python 3 all strings are unicode and for some reason lxml
# don't like unicode strings with XMLs declared as UTF-8:
self.xml = xml.encode('utf-8')
else:
self.xml = xml
self.dom = mod_etree.XML(self.xml)
# get the namespace
self.ns = self.dom.nsmap.get(None)
def get_first_child(self, node=None, name=None):
if node is None:
if name:
if self.get_node_name(self.dom) == name:
return self.dom
return self.dom
children = node.getchildren()
if not children:
return None
if name:
for node in children:
if self.get_node_name(node) == name:
return node
return None
return children[0]
def get_node_name(self, node):
if callable(node.tag):
tag = str(node.tag())
else:
tag = str(node.tag)
if '}' in tag:
return tag.split('}')[1]
return tag
def get_children(self, node=None):
if node is None:
node = self.dom
return node.getchildren()
def get_node_data(self, node):
if node is None:
return None
return node.text
def get_node_attribute(self, node, attribute):
if node is None:
return None
return node.attrib.get(attribute)
class GPXParser:
def __init__(self, xml_or_file=None, parser=None):
"""
Parser may be lxml of minidom. If you set to None then lxml will be used if installed
otherwise minidom.
"""
self.init(xml_or_file)
self.gpx = mod_gpx.GPX()
self.xml_parser_type = parser
self.xml_parser = None
def init(self, xml_or_file):
text = xml_or_file.read() if hasattr(xml_or_file, 'read') else xml_or_file
self.xml = mod_utils.make_str(text)
self.gpx = mod_gpx.GPX()
def parse(self):
"""
Parses the XML file and returns a GPX object.
It will throw GPXXMLSyntaxException if the XML file is invalid or
GPXException if the XML file is valid but something is wrong with the
GPX data.
"""
try:
if self.xml_parser_type is None:
if mod_etree:
self.xml_parser = LXMLParser(self.xml)
else:
self.xml_parser = XMLParser(self.xml)
elif self.xml_parser_type == 'lxml':
self.xml_parser = LXMLParser(self.xml)
elif self.xml_parser_type == 'minidom':
self.xml_parser = XMLParser(self.xml)
else:
raise mod_gpx.GPXException('Invalid parser type: %s' % self.xml_parser_type)
self.__parse_dom()
return self.gpx
except Exception as e:
# The exception here can be a lxml or minidom exception.
mod_logging.debug('Error in:\n%s\n-----------\n' % self.xml)
mod_logging.exception(e)
# The library should work in the same way regardless of the
# underlying XML parser that's why the exception thrown
# here is GPXXMLSyntaxException (instead of simply throwing the
# original minidom or lxml exception e).
#
# But, if the user need the original exception (lxml or minidom)
# it is available with GPXXMLSyntaxException.original_exception:
raise mod_gpx.GPXXMLSyntaxException('Error parsing XML: %s' % str(e), e)
def __parse_dom(self):
node = self.xml_parser.get_first_child(name='gpx')
if node is None:
raise mod_gpx.GPXException('Document must have a `gpx` root node.')
version = self.xml_parser.get_node_attribute(node, 'version')
mod_gpxfield.gpx_fields_from_xml(self.gpx, self.xml_parser, node, version)

View File

@ -0,0 +1,116 @@
# -*- coding: utf-8 -*-
# Copyright 2011 Tomo Krajina
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys as mod_sys
import math as mod_math
import xml.sax.saxutils as mod_saxutils
PYTHON_VERSION = mod_sys.version.split(' ')[0]
def to_xml(tag, attributes=None, content=None, default=None, escape=False):
attributes = attributes or {}
result = '\n<%s' % tag
if content is None and default:
content = default
if attributes:
for attribute in attributes.keys():
result += make_str(' %s="%s"' % (attribute, attributes[attribute]))
if content is None:
result += '/>'
else:
if escape:
result += make_str('>%s</%s>' % (mod_saxutils.escape(content), tag))
else:
result += make_str('>%s</%s>' % (content, tag))
result = make_str(result)
return result
def is_numeric(object):
try:
float(object)
return True
except TypeError:
return False
except ValueError:
return False
def to_number(s, default=0, nan_value=None):
try:
result = float(s)
if mod_math.isnan(result):
return nan_value
return result
except TypeError:
pass
except ValueError:
pass
return default
def total_seconds(timedelta):
""" Some versions of python dont have timedelta.total_seconds() method. """
if timedelta is None:
return None
return (timedelta.days * 86400) + timedelta.seconds
# Hash utilities:
def __hash(obj):
result = 0
if obj is None:
return result
elif isinstance(obj, dict):
raise RuntimeError('__hash_single_object for dict not yet implemented')
elif isinstance(obj, list) or isinstance(obj, tuple):
return hash_list_or_tuple(obj)
return hash(obj)
def hash_list_or_tuple(iteration):
result = 17
for obj in iteration:
result = result * 31 + __hash(obj)
return result
def hash_object(obj, attributes):
result = 19
for attribute in attributes:
result = result * 31 + __hash(getattr(obj, attribute))
return result
def make_str(s):
""" Convert a str or unicode object into a str type. """
if PYTHON_VERSION[0] == '2':
if isinstance(s, unicode):
return s.encode("utf-8")
return str(s)