mirror of
https://github.com/marceloprates/prettymaps.git
synced 2025-01-16 20:48:13 +01:00
Refactored quite a bit of of the code, simplified the usage of some features, added option of saving / loading presets and updated the examples.ipynb notebook and the README with a short tutorial
This commit is contained in:
parent
3d58c60221
commit
c6c8f19084
BIN
assets/PermanentMarker-Regular.ttf
Normal file
BIN
assets/PermanentMarker-Regular.ttf
Normal file
Binary file not shown.
File diff suppressed because one or more lines are too long
@ -1 +1 @@
|
||||
from .draw import plot
|
||||
from .draw import plot, multiplot, Subplot, create_preset, delete_preset, preset, presets
|
||||
|
1152
prettymaps/draw.py
1152
prettymaps/draw.py
File diff suppressed because it is too large
Load Diff
@ -1,4 +1,4 @@
|
||||
'''
|
||||
"""
|
||||
Prettymaps - A minimal Python library to draw pretty maps from OpenStreetMap Data
|
||||
Copyright (C) 2021 Marcelo Prates
|
||||
|
||||
@ -14,373 +14,217 @@ GNU Affero General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Affero General Public License
|
||||
along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
'''
|
||||
"""
|
||||
|
||||
from ast import Dict
|
||||
from functools import reduce
|
||||
from tokenize import Number, String
|
||||
from typing import Optional, Union, Tuple
|
||||
|
||||
# from unittest.runner import _ResultClassType
|
||||
from xmlrpc.client import Boolean
|
||||
|
||||
import re
|
||||
import osmnx as ox
|
||||
from osmnx import utils_geo
|
||||
from osmnx._errors import EmptyOverpassResponse
|
||||
import numpy as np
|
||||
from shapely.geometry import Point, Polygon, MultiPolygon, LineString, MultiLineString
|
||||
from shapely.geometry import (
|
||||
box,
|
||||
Point,
|
||||
Polygon,
|
||||
MultiPolygon,
|
||||
LineString,
|
||||
MultiLineString,
|
||||
)
|
||||
from shapely.affinity import rotate
|
||||
from shapely.ops import unary_union
|
||||
from geopandas import GeoDataFrame, read_file
|
||||
import warnings
|
||||
from shapely.errors import ShapelyDeprecationWarning
|
||||
from copy import deepcopy
|
||||
|
||||
|
||||
def get_boundary(
|
||||
point: Tuple, radius: float, crs: String, circle: Boolean = True, dilate: float = 0
|
||||
) -> Polygon:
|
||||
"""
|
||||
Compute circular or square boundary given point, radius and crs.
|
||||
|
||||
Args:
|
||||
point (Tuple): GPS coordinates
|
||||
radius (Number): radius in meters
|
||||
crs (String): Coordinate Reference System
|
||||
circle (bool, optional): Whether to use a circular (True) or square (False) boundary. Defaults to True.
|
||||
dilate (int, optional): Dilate the boundary by this much, in meters. Defaults to 0.
|
||||
|
||||
Returns:
|
||||
Polygon: a shapely Polygon representing the boundary
|
||||
"""
|
||||
|
||||
if circle:
|
||||
return (
|
||||
ox.project_gdf(GeoDataFrame(geometry=[Point(point[::-1])], crs=crs))
|
||||
.geometry[0]
|
||||
.buffer(radius)
|
||||
)
|
||||
# Parse query (by coordinates, OSMId or name)
|
||||
def parse_query(query):
|
||||
if isinstance(query, GeoDataFrame):
|
||||
return "polygon"
|
||||
elif isinstance(query, tuple):
|
||||
return "coordinates"
|
||||
elif re.match("""[A-Z][0-9]+""", query):
|
||||
return "osmid"
|
||||
else:
|
||||
x, y = np.stack(
|
||||
ox.project_gdf(GeoDataFrame(geometry=[Point(point[::-1])], crs=crs))
|
||||
.geometry[0]
|
||||
.xy
|
||||
)
|
||||
r = radius
|
||||
return Polygon(
|
||||
[(x - r, y - r), (x + r, y - r), (x + r, y + r), (x - r, y + r)]
|
||||
).buffer(dilate)
|
||||
return "address"
|
||||
|
||||
|
||||
def get_perimeter(query, by_osmid: Boolean = False, **kwargs) -> GeoDataFrame:
|
||||
"""
|
||||
Fetch perimeter given query
|
||||
# Get circular or square boundary around point
|
||||
def get_boundary(query, radius, circle=False, rotation=0):
|
||||
|
||||
Args:
|
||||
query (String): Query for the perimeter to be fetched (for example, "France")
|
||||
by_osmid (bool, optional): Whether to fetch perimeter by OSM Id. Defaults to False.
|
||||
|
||||
Returns:
|
||||
GeoDataFrame: GeoDataFrame representation of the perimeter
|
||||
"""
|
||||
return ox.geocode_to_gdf(
|
||||
query,
|
||||
by_osmid=by_osmid,
|
||||
**kwargs,
|
||||
**{x: kwargs[x] for x in ["circle", "dilate"] if x in kwargs.keys()}
|
||||
# Get point from query
|
||||
point = query if parse_query(query) == "coordinates" else ox.geocode(query)
|
||||
# Create GeoDataFrame from point
|
||||
boundary = ox.project_gdf(
|
||||
GeoDataFrame(geometry=[Point(point[::-1])], crs="EPSG:4326")
|
||||
)
|
||||
|
||||
if circle: # Circular shape
|
||||
# use .buffer() to expand point into circle
|
||||
boundary.geometry = boundary.geometry.buffer(radius)
|
||||
else: # Square shape
|
||||
x, y = np.concatenate(boundary.geometry[0].xy)
|
||||
r = radius
|
||||
boundary = GeoDataFrame(
|
||||
geometry=[
|
||||
rotate(
|
||||
Polygon(
|
||||
[(x - r, y - r), (x + r, y - r),
|
||||
(x + r, y + r), (x - r, y + r)]
|
||||
),
|
||||
rotation,
|
||||
)
|
||||
],
|
||||
crs=boundary.crs,
|
||||
)
|
||||
|
||||
def get_coast(
|
||||
perimeter=None,
|
||||
point=None,
|
||||
radius=None,
|
||||
perimeter_tolerance=0,
|
||||
union=True,
|
||||
buffer=0,
|
||||
circle=True,
|
||||
dilate=0,
|
||||
file_location=None,
|
||||
# Unproject
|
||||
boundary = boundary.to_crs(4326)
|
||||
|
||||
return boundary
|
||||
|
||||
|
||||
# Get perimeter from query
|
||||
def get_perimeter(
|
||||
query, radius=None, by_osmid=False, circle=False, dilate=None, rotation=0, **kwargs
|
||||
):
|
||||
|
||||
if perimeter is not None:
|
||||
# Boundary defined by polygon (perimeter)
|
||||
bbox = perimeter.to_crs(3174)
|
||||
bbox = bbox.buffer(perimeter_tolerance + dilate + buffer)
|
||||
bbox = bbox.to_crs(4326)
|
||||
bbox = bbox.envelope
|
||||
# Load the polygons for the coastline from a file
|
||||
geometries = read_file(file_location, bbox=bbox)
|
||||
perimeter = unary_union(ox.project_gdf(perimeter).geometry)
|
||||
|
||||
elif (point is not None) and (radius is not None):
|
||||
# Boundary defined by circle with radius 'radius' around point
|
||||
north, south, west, east = utils_geo.bbox_from_point(
|
||||
point, dist=radius + dilate + buffer
|
||||
)
|
||||
bbox = (west, south, east, north)
|
||||
# Load the polygons for the coastline from a file
|
||||
geometries = read_file(file_location, bbox=bbox)
|
||||
if radius:
|
||||
# Perimeter is a circular or square shape
|
||||
perimeter = get_boundary(
|
||||
point, radius, geometries.crs, circle=circle, dilate=dilate
|
||||
)
|
||||
|
||||
# Project GDF
|
||||
if len(geometries) > 0:
|
||||
geometries = ox.project_gdf(geometries)
|
||||
|
||||
# Intersect with perimeter
|
||||
geometries = geometries.intersection(perimeter)
|
||||
|
||||
if union:
|
||||
geometries = unary_union(
|
||||
reduce(
|
||||
lambda x, y: x + y,
|
||||
[
|
||||
[x] if type(x) == Polygon else list(x)
|
||||
for x in geometries
|
||||
if type(x) in [Polygon, MultiPolygon]
|
||||
],
|
||||
[],
|
||||
)
|
||||
)
|
||||
query, radius, circle=circle, rotation=rotation)
|
||||
else:
|
||||
geometries = MultiPolygon(
|
||||
reduce(
|
||||
lambda x, y: x + y,
|
||||
[
|
||||
[x] if type(x) == Polygon else list(x)
|
||||
for x in geometries
|
||||
if type(x) in [Polygon, MultiPolygon]
|
||||
],
|
||||
[],
|
||||
)
|
||||
)
|
||||
|
||||
return geometries
|
||||
|
||||
|
||||
def get_geometries(
|
||||
perimeter: Optional[GeoDataFrame] = None,
|
||||
point: Optional[Tuple] = None,
|
||||
radius: Optional[float] = None,
|
||||
tags: Dict = {},
|
||||
perimeter_tolerance: float = 0,
|
||||
union: Boolean = True,
|
||||
buffer: float = 0,
|
||||
circle: Boolean = True,
|
||||
dilate: float = 0,
|
||||
point_size: float = 1,
|
||||
line_width: float = 1
|
||||
) -> Union[Polygon, MultiPolygon]:
|
||||
"""Get geometries
|
||||
|
||||
Args:
|
||||
perimeter (Optional[GeoDataFrame], optional): Perimeter from within geometries will be fetched. Defaults to None.
|
||||
point (Optional[Tuple], optional): GPS coordinates. Defaults to None.
|
||||
radius (Optional[Number], optional): Radius in meters. Defaults to None.
|
||||
tags (Dict, optional): OpenStreetMap tags for the geometries to be fetched. Defaults to {}.
|
||||
perimeter_tolerance (Number, optional): Tolerance in meters for fetching geometries that fall outside the perimeter. Defaults to 0.
|
||||
union (Boolean, optional): Whether to compute the union of all geometries. Defaults to True.
|
||||
circle (Boolean, optional): Whether to fetch geometries in a circular (True) or square (False) shape. Defaults to True.
|
||||
dilate (Number, optional): Dilate the boundary by this much in meters. Defaults to 0.
|
||||
|
||||
Returns:
|
||||
[type]: [description]
|
||||
"""
|
||||
|
||||
# Boundary defined by polygon (perimeter)
|
||||
if perimeter is not None:
|
||||
geometries = ox.geometries_from_polygon(
|
||||
unary_union(perimeter.to_crs(3174).buffer(buffer+perimeter_tolerance).to_crs(4326).geometry)
|
||||
if buffer >0 or perimeter_tolerance > 0
|
||||
else unary_union(perimeter.geometry),
|
||||
tags={tags: True} if type(tags) == str else tags,
|
||||
)
|
||||
perimeter = unary_union(ox.project_gdf(perimeter).geometry)
|
||||
# Boundary defined by circle with radius 'radius' around point
|
||||
elif (point is not None) and (radius is not None):
|
||||
geometries = ox.geometries_from_point(
|
||||
point,
|
||||
dist=radius + dilate + buffer,
|
||||
tags={tags: True} if type(tags) == str else tags,
|
||||
)
|
||||
perimeter = get_boundary(
|
||||
point, radius, geometries.crs, circle=circle, dilate=dilate
|
||||
)
|
||||
|
||||
# Project GDF
|
||||
if len(geometries) > 0:
|
||||
geometries = ox.project_gdf(geometries)
|
||||
|
||||
# Intersect with perimeter
|
||||
geometries = geometries.intersection(perimeter)
|
||||
|
||||
# Get points, lines, polys & multipolys
|
||||
points, lines, polys, multipolys = map(
|
||||
lambda t: [x for x in geometries if isinstance(x, t)],
|
||||
[Point, LineString, Polygon, MultiPolygon]
|
||||
)
|
||||
# Convert points, lines & polygons into multipolygons
|
||||
points = [x.buffer(point_size) for x in points]
|
||||
lines = [x.buffer(line_width) for x in lines]
|
||||
# Concatenate multipolys
|
||||
multipolys = reduce(lambda x,y: x+y, [list(x) for x in multipolys]) if len(multipolys) > 0 else []
|
||||
# Group everything
|
||||
geometries = MultiPolygon(points + lines + polys + multipolys)
|
||||
# Compute union if specified
|
||||
if union: geometries = unary_union(geometries);
|
||||
|
||||
return geometries
|
||||
|
||||
|
||||
def get_streets(
|
||||
perimeter: Optional[GeoDataFrame] = None,
|
||||
point: Optional[Tuple] = None,
|
||||
radius: Optional[float] = None,
|
||||
layer: String = "streets",
|
||||
width: float = 6,
|
||||
custom_filter: Optional[str] = None,
|
||||
buffer: float = 0,
|
||||
retain_all: Boolean = False,
|
||||
circle: Boolean = True,
|
||||
dilate: float = 0,
|
||||
truncate_by_edge: Boolean = True
|
||||
) -> MultiPolygon:
|
||||
"""
|
||||
Get streets
|
||||
|
||||
Args:
|
||||
perimeter (Optional[GeoDataFrame], optional): [description]. Defaults to None.
|
||||
point (Optional[Tuple], optional): [description]. Defaults to None.
|
||||
radius (Optional[Number], optional): [description]. Defaults to None.
|
||||
layer (String, optional): [description]. Defaults to "streets".
|
||||
width (Number, optional): [description]. Defaults to 6.
|
||||
custom_filter (Optional[String], optional): [description]. Defaults to None.
|
||||
buffer (Number, optional): [description]. Defaults to 0.
|
||||
retain_all (Boolean, optional): [description]. Defaults to False.
|
||||
circle (Boolean, optional): [description]. Defaults to True.
|
||||
dilate (Number, optional): [description]. Defaults to 0.
|
||||
truncate_by_edge (Boolean, optional): [description]. Defaults to True.
|
||||
|
||||
Returns:
|
||||
MultiPolygon: [description]
|
||||
"""
|
||||
|
||||
if layer == "streets":
|
||||
layer = "highway"
|
||||
|
||||
# Boundary defined by polygon (perimeter)
|
||||
if perimeter is not None:
|
||||
# Fetch streets data, project & convert to GDF
|
||||
try:
|
||||
streets = ox.graph_from_polygon(
|
||||
unary_union(perimeter.geometry).buffer(buffer)
|
||||
if buffer > 0
|
||||
else unary_union(perimeter.geometry),
|
||||
custom_filter=custom_filter,
|
||||
)
|
||||
streets = ox.project_graph(streets)
|
||||
streets = ox.graph_to_gdfs(streets, nodes=False)
|
||||
except EmptyOverpassResponse:
|
||||
return MultiLineString()
|
||||
# Boundary defined by polygon (perimeter)
|
||||
elif (point is not None) and (radius is not None):
|
||||
# Fetch streets data, save CRS & project
|
||||
try:
|
||||
streets = ox.graph_from_point(
|
||||
point,
|
||||
dist=radius + dilate + buffer,
|
||||
retain_all=retain_all,
|
||||
custom_filter=custom_filter,
|
||||
truncate_by_edge = truncate_by_edge,
|
||||
)
|
||||
crs = ox.graph_to_gdfs(streets, nodes=False).crs
|
||||
streets = ox.project_graph(streets)
|
||||
# Compute perimeter from point & CRS
|
||||
perimeter = get_boundary(point, radius, crs, circle=circle, dilate=dilate)
|
||||
# Convert to GDF
|
||||
streets = ox.graph_to_gdfs(streets, nodes=False)
|
||||
# Intersect with perimeter & filter empty elements
|
||||
streets.geometry = streets.geometry.intersection(perimeter)
|
||||
streets = streets[~streets.geometry.is_empty]
|
||||
except EmptyOverpassResponse:
|
||||
return MultiLineString()
|
||||
|
||||
if type(width) == dict:
|
||||
streets = unary_union(
|
||||
[
|
||||
# Dilate streets of each highway type == 'highway' using width 'w'
|
||||
MultiLineString(
|
||||
streets[
|
||||
[highway in value for value in streets[layer]]
|
||||
& (streets.geometry.type == "LineString")
|
||||
].geometry.tolist()
|
||||
+ list(
|
||||
reduce(
|
||||
lambda x, y: x + y,
|
||||
[
|
||||
list(lines)
|
||||
for lines in streets[
|
||||
[highway in value for value in streets[layer]]
|
||||
& (streets.geometry.type == "MultiLineString")
|
||||
].geometry
|
||||
],
|
||||
[],
|
||||
)
|
||||
)
|
||||
).buffer(w)
|
||||
for highway, w in width.items()
|
||||
]
|
||||
)
|
||||
else:
|
||||
# Dilate all streets by same amount 'width'
|
||||
streets= MultiLineString(
|
||||
streets[streets.geometry.type == "LineString"].geometry.tolist()
|
||||
+ list(
|
||||
reduce(
|
||||
lambda x, y: x + y,
|
||||
[
|
||||
list(lines)
|
||||
for lines in streets[streets.geometry.type == "MultiLineString"].geometry
|
||||
],
|
||||
[],
|
||||
)
|
||||
)
|
||||
).buffer(width)
|
||||
|
||||
return streets
|
||||
|
||||
|
||||
def get_layer(layer: String, **kwargs) -> Union[Polygon, MultiPolygon]:
|
||||
"""[summary]
|
||||
|
||||
Args:
|
||||
layer (String): [description]
|
||||
|
||||
Raises:
|
||||
Exception: [description]
|
||||
|
||||
Returns:
|
||||
Union[Polygon, MultiPolygon]: [description]
|
||||
"""
|
||||
# Fetch perimeter
|
||||
if layer == "perimeter":
|
||||
# If perimeter is already provided:
|
||||
if "perimeter" in kwargs:
|
||||
return unary_union(ox.project_gdf(kwargs["perimeter"]).geometry)
|
||||
# If point and radius are provided:
|
||||
elif "point" in kwargs and "radius" in kwargs:
|
||||
crs = "EPSG:4326"
|
||||
perimeter = get_boundary(
|
||||
kwargs["point"],
|
||||
kwargs["radius"],
|
||||
crs,
|
||||
**{x: kwargs[x] for x in ["circle", "dilate"] if x in kwargs.keys()}
|
||||
)
|
||||
return perimeter
|
||||
# Perimeter is a OSM or user-provided polygon
|
||||
if parse_query(query) == "polygon":
|
||||
# Perimeter was already provided
|
||||
perimeter = query
|
||||
else:
|
||||
raise Exception("Either 'perimeter' or 'point' & 'radius' must be provided")
|
||||
# Fetch streets or railway
|
||||
if layer in ["streets", "railway", "waterway"]:
|
||||
return get_streets(**kwargs, layer=layer)
|
||||
# Fetch Coastline
|
||||
# Fetch perimeter from OSM
|
||||
perimeter = ox.geocode_to_gdf(
|
||||
query,
|
||||
by_osmid=by_osmid,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
# Apply dilation
|
||||
perimeter = ox.project_gdf(perimeter)
|
||||
if dilate is not None:
|
||||
perimeter.geometry = perimeter.geometry.buffer(dilate)
|
||||
perimeter = perimeter.to_crs(4326)
|
||||
|
||||
return perimeter
|
||||
|
||||
|
||||
# Get a GeoDataFrame
|
||||
def get_gdf(
|
||||
layer,
|
||||
perimeter,
|
||||
perimeter_tolerance=0,
|
||||
tags=None,
|
||||
osmid=None,
|
||||
custom_filter=None,
|
||||
union=False,
|
||||
elevation=None,
|
||||
vert_exag=1,
|
||||
azdeg=90,
|
||||
altdeg=80,
|
||||
pad=1,
|
||||
min_height=30,
|
||||
max_height=None,
|
||||
n_curves=100,
|
||||
**kwargs
|
||||
):
|
||||
|
||||
# Supress shapely deprecation warning
|
||||
warnings.simplefilter("ignore", ShapelyDeprecationWarning)
|
||||
|
||||
# Apply tolerance to the perimeter
|
||||
perimeter_with_tolerance = (
|
||||
ox.project_gdf(perimeter).buffer(perimeter_tolerance).to_crs(4326)
|
||||
)
|
||||
perimeter_with_tolerance = unary_union(
|
||||
perimeter_with_tolerance.geometry).buffer(0)
|
||||
|
||||
# Fetch from perimeter's bounding box, to avoid missing some geometries
|
||||
bbox = box(*perimeter_with_tolerance.bounds)
|
||||
|
||||
if layer == "hillshade":
|
||||
gdf = get_hillshade(
|
||||
mask_elevation(get_elevation(elevation), perimeter),
|
||||
pad=pad,
|
||||
azdeg=azdeg,
|
||||
altdeg=altdeg,
|
||||
vert_exag=vert_exag,
|
||||
min_height=min_height,
|
||||
max_height=max_height,
|
||||
)
|
||||
elif layer == "level_curves":
|
||||
gdf = get_level_curves(
|
||||
mask_elevation(get_elevation(elevation), perimeter),
|
||||
pad=pad,
|
||||
n_curves=n_curves,
|
||||
min_height=min_height,
|
||||
max_height=max_height,
|
||||
)
|
||||
elif layer in ["streets", "railway", "waterway"]:
|
||||
graph = ox.graph_from_polygon(
|
||||
bbox,
|
||||
custom_filter=custom_filter,
|
||||
truncate_by_edge=True,
|
||||
)
|
||||
gdf = ox.graph_to_gdfs(graph, nodes=False)
|
||||
elif layer == "coastline":
|
||||
return get_coast(**kwargs)
|
||||
# Fetch geometries
|
||||
# Fetch geometries from OSM
|
||||
gdf = ox.geometries_from_polygon(
|
||||
bbox, tags={tags: True} if type(tags) == str else tags
|
||||
)
|
||||
else:
|
||||
return get_geometries(**kwargs)
|
||||
if osmid is None:
|
||||
# Fetch geometries from OSM
|
||||
gdf = ox.geometries_from_polygon(
|
||||
bbox, tags={tags: True} if type(tags) == str else tags
|
||||
)
|
||||
else:
|
||||
gdf = ox.geocode_to_gdf(osmid, by_osmid=True)
|
||||
|
||||
# Intersect with perimeter
|
||||
gdf.geometry = gdf.geometry.intersection(perimeter_with_tolerance)
|
||||
# gdf = gdf[~gdf.geometry.is_empty]
|
||||
gdf.drop(gdf[gdf.geometry.is_empty].index, inplace=True)
|
||||
|
||||
return gdf
|
||||
|
||||
|
||||
# Fetch GeoDataFrames given query and a dictionary of layers
|
||||
def get_gdfs(query, layers_dict, radius, dilate, rotation=0) -> Dict:
|
||||
|
||||
perimeter_kwargs = {}
|
||||
if "perimeter" in layers_dict:
|
||||
perimeter_kwargs = deepcopy(layers_dict["perimeter"])
|
||||
perimeter_kwargs.pop("dilate")
|
||||
|
||||
# Get perimeter
|
||||
perimeter = get_perimeter(
|
||||
query,
|
||||
radius=radius,
|
||||
rotation=rotation,
|
||||
dilate=dilate,
|
||||
**perimeter_kwargs
|
||||
)
|
||||
|
||||
# Get other layers as GeoDataFrames
|
||||
gdfs = {"perimeter": perimeter} | {
|
||||
layer: get_gdf(layer, perimeter, **kwargs)
|
||||
for layer, kwargs in layers_dict.items()
|
||||
if layer != "perimeter"
|
||||
}
|
||||
|
||||
return gdfs
|
||||
|
Loading…
x
Reference in New Issue
Block a user