Tuesday, August 03, 2021

For a project where I was plotting some locations using Matplotlib, I needed a way to get a PNG map from a bounding box lat-long pair. Here's a Python function I came up with.

```from os import listdir, mkdir
from os.path import exists
from PIL import Image
from random import uniform
from time import sleep
from urllib.request import Request, urlopen
import math

# Similar to https://wiki.openstreetmap.org/wiki/Slippy_map_tilenames#Python .
def deg2float(lat_deg, lon_deg, zoom):
    """Convert a latitude/longitude in degrees to fractional (x, y)
    slippy-map tile coordinates at the given zoom level.

    The integer parts are the tile indices; the fractional parts are the
    position within that tile.
    """
    # Fix: lat_rad was referenced below but never computed in the original.
    lat_rad = math.radians(lat_deg)
    n = 2.0 ** zoom
    xtile = (lon_deg + 180.0) / 360.0 * n
    ytile = (1.0 - math.asinh(math.tan(lat_rad)) / math.pi) / 2.0 * n
    return (xtile, ytile)

# NOTE(review): this fragment expects lat1, lon1, lat2, lon2 and zoom to be
# defined by earlier lines that appear to have been lost from the paste
# (it was probably the body of a download function originally).

# Normalize the bounding box: west-most/east-most longitude. Tile y grows
# southward, so the *larger* latitude is the top edge.
lon_start, lon_end = min(lon1, lon2), max(lon1, lon2)
lat_start, lat_end = max(lat1, lat2), min(lat1, lat2)

# Top left corner of bounding box, in fractional tile coordinates.
x1, y1 = deg2float(lat_start, lon_start, zoom)
x1i, y1i = math.floor(x1), math.floor(y1)

# Bottom right corner of bounding box.
x2, y2 = deg2float(lat_end, lon_end, zoom)
x2i, y2i = math.ceil(x2), math.ceil(y2)

# Refuse to hammer the public tile server with huge downloads.
x_cnt, y_cnt = abs(x1i - x2i), abs(y1i - y2i)
if x_cnt * y_cnt > 250:
    err = "Too many tiles. Probably too big an area at too high a zoom level."
    err += " See https://operations.osmfoundation.org/policies/tiles/ ."
    raise Exception(err)

if not exists("maptiles"):
    mkdir("maptiles")

# Download each missing tile, with a polite random delay between requests.
for x in range(x_cnt):
    for y in range(y_cnt):
        xt, yt = x + x1i, y + y1i
        path = "maptiles/{}_{}_{}.png".format(zoom, xt, yt)

        if not exists(path):
            sleep(uniform(0.5, 1.5))
            url = "https://tile.openstreetmap.org/{}/{}/{}.png".format(zoom, xt, yt)
            req = Request(url)
            ua = "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:90.0) Gecko/20100101 Firefox/90.0"
            # Fix: ua was built but never attached to the request; the OSM
            # tile usage policy requires a User-Agent header.
            req.add_header("User-Agent", ua)
            resp = urlopen(req)
            # Fix: body was written below but never read from the response.
            body = resp.read()
            with open(path, "wb") as f:
                f.write(body)

# Stitch the tiles into one image. All tiles share one size; read it from
# the top-left tile.
im = Image.open("maptiles/{}_{}_{}.png".format(zoom, x1i, y1i))
tile_w, tile_h = im.size
total_w = x_cnt * tile_w
total_h = y_cnt * tile_h

new_im = Image.new("RGB", (total_w, total_h))

for x in range(x_cnt):
    for y in range(y_cnt):
        xt, yt = x + x1i, y + y1i
        im = Image.open("maptiles/{}_{}_{}.png".format(zoom, xt, yt))
        new_im.paste(im, (x * tile_w, y * tile_h))

# Crop from whole tiles down to the exact requested bounding box.
cropped_w = round((x2 - x1) * tile_w)
cropped_h = round((y2 - y1) * tile_h)
cropped_im = Image.new("RGB", (cropped_w, cropped_h))
translate_x = round(-(x1 - x1i) * tile_w)
translate_y = round(-(y1 - y1i) * tile_h)
cropped_im.paste(new_im, (translate_x, translate_y))
cropped_im.save("map.png")

# Download a map of the SF Bay Area at zoom level 12. Approx 3000*3000px.
# NOTE(review): the example call this comment refers to appears to have
# been lost from the paste.

```

Tuesday, April 27, 2021

Geohash Python example

Here's a simple Geohash encode/decode implementation. The decoding function pretty closely follows the technique laid out in the Geohash Wikipedia entry.

```from fractions import Fraction

"""
First decode the special geohash variant of base32 encoding.
Each encoded digit (0-9-b..z) (not continuous abc) is a 5 bit val 0,1,2...,30,31.
In the resulting bitstream, every second bit is now for latitude and longtitude.
Initially the latitutde range is -90,+90.
When a latitude bit is 1, then it now starts at the mid of these.
Else if 0 it now ends at the mid of these.
Same for longtitude but with range -180,+180.
"""
def decode_geohash(s):
    """Decode geohash string s into its bounding box.

    Returns (lat_start, lat_end, lon_start, lon_end) as floats.
    """
    alphabet_32ghs = "0123456789bcdefghjkmnpqrstuvwxyz"
    dec_from_32ghs = {c: i for i, c in enumerate(alphabet_32ghs)}

    # Integer representation of the hash's bitstream, 5 bits per character.
    bits = 0
    bit_cnt = 0
    for c in s:
        bits = (bits << 5) | dec_from_32ghs[c]
        bit_cnt += 5

    # De-interleave: counting from the most significant bit, even stream
    # positions are longitude bits and odd positions are latitude bits.
    lat_bits, lon_bits = 0, 0
    lat_bit_cnt = bit_cnt // 2
    lon_bit_cnt = bit_cnt - lat_bit_cnt  # Longitude gets the extra bit.

    for i in range(bit_cnt):
        # Fix: the original used cur_bit_pos = bit_cnt - i, which read one
        # position past the MSB and never read bit 0 — silently dropping
        # the final bit of every hash (and shifting the lat/lon parity).
        cur_bit_pos = bit_cnt - 1 - i
        cur_bit = (bits >> cur_bit_pos) & 1
        if i % 2 == 0:
            lon_bits |= cur_bit << (cur_bit_pos // 2)
        else:
            lat_bits |= cur_bit << (cur_bit_pos // 2)

    # Binary-search the latitude range: 1 keeps the upper half, 0 the lower.
    lat_start, lat_end = Fraction(-90), Fraction(90)
    for cur_bit_pos in range(lat_bit_cnt - 1, -1, -1):
        mid = (lat_start + lat_end) / 2
        if lat_bits & (1 << cur_bit_pos):
            lat_start = mid
        else:
            lat_end = mid

    lon_start, lon_end = Fraction(-180), Fraction(180)
    for cur_bit_pos in range(lon_bit_cnt - 1, -1, -1):
        mid = (lon_start + lon_end) / 2
        if lon_bits & (1 << cur_bit_pos):
            lon_start = mid
        else:
            lon_end = mid

    return float(lat_start), float(lat_end), float(lon_start), float(lon_end)

# Inspired by https://www.factual.com/blog/how-geohashes-work/
def encode_geohash(lat, lon, bit_cnt):
    """Encode a lat/lon point as a geohash string of bit_cnt/5 characters."""
    if bit_cnt % 5 != 0:
        raise ValueError("bit_cnt must be divisible by 5")

    bits = 0
    lat_lo, lat_hi = Fraction(-90), Fraction(90)
    lon_lo, lon_hi = Fraction(-180), Fraction(180)
    for step in range(bit_cnt):
        if step % 2 == 0:
            # Even stream positions refine longitude: emit 1 and keep the
            # upper half when the point is at or above the midpoint.
            mid = (lon_lo + lon_hi) / 2
            bit = 0 if lon < mid else 1
            if bit:
                lon_lo = mid
            else:
                lon_hi = mid
        else:
            # Odd stream positions refine latitude the same way.
            mid = (lat_lo + lat_hi) / 2
            bit = 0 if lat < mid else 1
            if bit:
                lat_lo = mid
            else:
                lat_hi = mid
        bits = (bits << 1) | bit

    print("bits: {:>b}".format(bits))

    # Geohash-variant base32 encoding: 5 bits per output character,
    # most significant group first.
    alphabet_32ghs = "0123456789bcdefghjkmnpqrstuvwxyz"
    chars = []
    for group in range(bit_cnt // 5):
        chars.append(alphabet_32ghs[(bits >> (group * 5)) & 0b11111])
    return "".join(reversed(chars))

# Demo: decode two example hashes, then encode a point near San Francisco.
print(decode_geohash("ezs42"))
print(decode_geohash("9q8y"))
print(encode_geohash(37.7, -122.5, 20))
```

Sunday, April 25, 2021

Lossy Counting Algorithm Python example

This is an implementation of the Lossy Counting Algorithm described in Manku and Motwani's 2002 paper "Approximate Frequency Counts over Data Streams".

It is an algorithm for estimating which elements in a stream have a frequency count exceeding a threshold, while using only limited memory. For example, for video view counts on something like YouTube, it can find which videos each constitute more than 3% of the views.

Please let me know if you spot any bugs.

```from math import ceil
class LossyCount:
    """Lossy Counting (Manku & Motwani, 2002): approximate heavy hitters
    over a stream using bounded memory.

    entries maps element -> (freq, delta): freq counts occurrences since the
    element was inserted, delta bounds occurrences possibly missed before.
    """

    def __init__(self, max_error=0.005):  # max_error is the paper's epsilon.
        self.max_error = max_error
        self.bucket_width = ceil(1 / max_error)
        self.entries = dict()
        self.n = 0  # Total elements seen so far.

    def put(self, x):
        """Account for one occurrence of stream element x."""
        self.n += 1
        current_bucket = ceil(self.n / self.bucket_width)

        if x in self.entries:
            freq, delta = self.entries[x]
            self.entries[x] = (freq + 1, delta)
        else:
            # A new entry can have missed at most current_bucket-1 earlier
            # occurrences.
            self.entries[x] = (1, current_bucket - 1)

        # At each bucket boundary, evict entries whose best-case total count
        # does not exceed the current bucket index.
        if self.n % self.bucket_width == 0:
            doomed = [k for k, (f, d) in self.entries.items()
                      if f + d <= current_bucket]
            for k in doomed:
                del self.entries[k]

    def get(self, support_threshold=0.001):
        """Return elements whose frequency likely exceeds support_threshold
        (the paper's s) of the whole stream."""
        cutoff = (support_threshold - self.max_error) * self.n
        return [k for k, (f, _) in self.entries.items() if f >= cutoff]

# Generate test data: a stream of video "views" drawn from a skewed
# distribution with a long tail.
from math import log
from random import random, randint

view_cnt = 500000
videos_cnt = 100000
x = [random() for _ in range(view_cnt)]
# (1/v)*0.01 - log(v) is reasonably steep with a very long tail; a plain
# 1/v distribution turned out to be too steep.
y = [(1/v)*0.01 - log(v) for v in x]
m = max(y)
y = [v/m for v in y]  # Normalize to [0, 1].
# Realistic-looking random 7-digit video IDs. (Sequential IDs 0,1,2,... are
# easier to read but unrealistic: the most popular video would be ID 0.)
ids = [randint(1000000, 9000000) for _ in range(videos_cnt)]
idxs = [int(v*(videos_cnt-1)) for v in y]
views = [ids[v] for v in idxs]

# Optional visualization; only meaningful with sequential IDs:
# import matplotlib.pyplot as plt
# plt.hist(views, bins=200)
# plt.show()

threshold = 0.03  # We want videos that each exceed 3% of all views.

# Exact heavy hitters via a full Counter, as ground truth to compare with.
# (Fix: the original's loop variable shadowed the builtin 'id'.)
from collections import Counter
c = Counter(views)
exact = [vid for vid, cnt in c.items() if cnt >= view_cnt*threshold]
print(sorted(exact))

# Test the LossyCount class. Should give similar (but not exact) results.
lc = LossyCount()
for v in views:
    lc.put(v)
print(sorted(lc.get(threshold)))
```

Friday, February 12, 2021

Create a local Kubernetes cluster and deploy your code end to end

These are the commands accompanying the video tutorial I've made on how to create a local Kubernetes cluster and deploy a simple stateless app from code, end to end. You can find the video tutorial here: https://youtu.be/MZr9Ls38uPw .
```#
# Install k3s
#

# Bootstrap the first server node. servicelb is disabled because k3s's
# builtin load balancer interferes with MetalLB (installed further below;
# see also the Feb 07 post at the end of this file).
curl -sfL https://get.k3s.io | K3S_CLUSTER_INIT=1 INSTALL_K3S_EXEC="--disable=servicelb" sh -

# Print the cluster join token; used as K3S_TOKEN when joining other nodes.
cat /var/lib/rancher/k3s/server/node-token

# Run on other nodes to join the cluster
curl -sfL https://get.k3s.io | \
INSTALL_K3S_EXEC=server \
K3S_URL=https://192.168.50.203:6443 \
K3S_TOKEN=... \
sh -

# Watch until all nodes show up.
kubectl get nodes --watch

# Inspect the k3s service logs if something goes wrong.
journalctl --unit=k3s

kubectl get all --all-namespaces

#
# Install dashboard
#

# NOTE(review): the '\$' escapes throughout look like artifacts of the
# blog's templating; plain '$' is presumably intended when run in a shell.
GITHUB_URL=https://github.com/kubernetes/dashboard/releases
VERSION_KUBE_DASHBOARD=\$(curl -w '%{url_effective}' -I -L -s -S \${GITHUB_URL}/latest -o /dev/null | sed -e 's|.*/||')
echo \$VERSION_KUBE_DASHBOARD
kubectl create -f https://raw.githubusercontent.com/kubernetes/dashboard/\${VERSION_KUBE_DASHBOARD}/aio/deploy/recommended.yaml

# NOTE(review): the next two manifests appear truncated by the paste — the
# leading 'cat <<EOF | kubectl apply -f -' lines and the 'metadata:' name
# fields (e.g. the admin-user ServiceAccount name) are missing. Verify
# against the original tutorial before running.
apiVersion: v1
kind: ServiceAccount
namespace: kubernetes-dashboard
EOF

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
subjects:
- kind: ServiceAccount
namespace: kubernetes-dashboard
EOF

kubectl get all --all-namespaces

# Connect to node again, but with your port 8001 forwarded.
ssh -L 8001:127.0.0.1:8001 user1@ubuntutest1
kubectl proxy
# Open in browser:  http://localhost:8001/api/v1/namespaces/kubernetes-dashboard/services/https:kubernetes-dashboard:/proxy/
kubectl --namespace kubernetes-dashboard describe secret admin-user-token | grep ^token

#
# Install MetalLB
#

GITHUB_URL=https://github.com/metallb/metallb/releases
VERSION=\$(curl -w '%{url_effective}' -I -L -s -S \${GITHUB_URL}/latest -o /dev/null | sed -e 's|.*/||')
echo \$VERSION

kubectl apply -f https://raw.githubusercontent.com/metallb/metallb/\$VERSION/manifests/namespace.yaml
kubectl apply -f https://raw.githubusercontent.com/metallb/metallb/\$VERSION/manifests/metallb.yaml
kubectl create secret generic -n metallb-system memberlist --from-literal=secretkey="\$(openssl rand -base64 128)"

# NOTE(review): the ConfigMap below looks de-indented and truncated by the
# paste — 'metadata:' and the 'address-pools:'/'addresses:' keys appear to
# be missing. The intent is a layer2 pool covering 192.168.50.20-40.
cat>metallb-configmap.yml<<"EOF"
apiVersion: v1
kind: ConfigMap
namespace: metallb-system
name: config
data:
config: |
- name: default
protocol: layer2
- 192.168.50.20-192.168.50.40
EOF
kubectl apply -f metallb-configmap.yml

# Check logs for errors
kubectl logs -lapp=metallb --namespace metallb-system --all-containers=true --prefix -f

#
# Install docker registry
#

# Run a plain (insecure, HTTP) registry container on this node, port 5000.
apt-get install docker-compose
docker run -d -p 5000:5000 --restart=always --name registry registry
docker ps -a

# On each node:
# Tell k3s's containerd to treat ubuntutest1:5000 as a plain-HTTP mirror.
# NOTE(review): the YAML below lost its indentation in the paste.
mkdir -p /etc/rancher/k3s/
cat>/etc/rancher/k3s/registries.yaml<<"EOF"
mirrors:
"ubuntutest1:5000":
endpoint:
- "http://ubuntutest1:5000"
EOF
systemctl restart k3s

#
# Create containerized app
#

mkdir -p /home/user1/dev/myapp1
cd /home/user1/dev/myapp1
# Write a tiny Go web server that reports the time plus the node and pod
# it is served from (via the NODE_NAME / POD_NAME env vars set in the
# deployment manifest below).
# NOTE(review): the Go source lost its indentation in the paste (gofmt
# restores it), the Msg struct is unused, and go.mod below lacks the
# gorilla/mux requirement — likely also truncated. Verify against the
# original tutorial.
cat>main.go<<"EOF"
package main

import (
"github.com/gorilla/mux"
"html/template"
"log"
"net/http"
"os"
"time"
)

var helloTemplate, _ = template.New("").Parse(`<!DOCTYPE html>
<html>
<body>
<h1>Test 1</h1>
<p>Now: {{.now.Format "2006-01-02 15:04:05" }}</p>
<p>Served from node: {{ .node }}</p>
<p>Served from pod: {{ .pod }}</p>
</body>
</html>
`)

type Msg struct {
Ts  time.Time `json:"ts"`
}

func helloGet(w http.ResponseWriter, r *http.Request) {
v := map[string]interface{}{
"now":  time.Now(),
"node": os.Getenv("NODE_NAME"),
"pod": os.Getenv("POD_NAME"),
}
helloTemplate.Execute(w, v)
}

func main() {
log.Println("Starting")

router := mux.NewRouter()
router.HandleFunc("/", helloGet).Methods("GET")
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
log.Println(r.Method + " " + r.URL.String())
router.ServeHTTP(w, r)
})

log.Fatal(http.ListenAndServe(":8080", handler))
}
EOF

cat>go.mod<<"EOF"
module myapp1
EOF

touch go.sum

# Two-stage build: compile a static binary, ship it in a minimal image.
cat>Dockerfile<<"EOF"
FROM golang as builder
WORKDIR /app
COPY . ./
RUN CGO_ENABLED=0 GOOS=linux go build -v -o server

FROM alpine
COPY --from=builder /app/server /server
CMD ["/server"]
EOF

# Build, smoke-test locally, then tag and push to the local registry.
docker build -t "myapp1" .
docker run -it --rm -p 8080:8080 myapp1
docker tag myapp1 ubuntutest1:5000/myapp1:v1
docker push ubuntutest1:5000/myapp1:v1

# Confirm the image landed in the registry.
curl -X GET ubuntutest1:5000/v2/_catalog
curl -X GET ubuntutest1:5000/v2/myapp1/tags/list

#
# Create and deploy our app in k8s
#

mkdir -p /home/user1/dev/myapp1/k8s
cd /home/user1/dev/myapp1

# NOTE(review): the manifests below lost indentation and some lines in the
# paste — e.g. the 'metadata:' blocks, the POD_NAME fieldPath, and the
# 'topologySpreadConstraints:' key before '- maxSkew: 1' appear missing.
# Verify against the original tutorial before applying.
cat>k8s/deployment.yml<<"EOF"
apiVersion: apps/v1
kind: Deployment
name: myapp1-deployment
spec:
selector:
matchLabels:
app: myapp1
replicas: 3
template:
labels:
app: myapp1
spec:
containers:
- name: myapp1
image: ubuntutest1:5000/myapp1:v1
ports:
- containerPort: 8080
env:
- name: NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
- name: POD_NAME
valueFrom:
fieldRef:
- maxSkew: 1
topologyKey: "kubernetes.io/hostname"
whenUnsatisfiable: DoNotSchedule
labelSelector:
matchLabels:
app: myapp1
EOF
kubectl apply -f k8s/deployment.yml

cat>k8s/service.yml<<"EOF"
kind: Service
apiVersion: v1
name: myapp1-service
spec:
type: ClusterIP
selector:
app: myapp1
ports:
- name: http-myapp1
protocol: TCP
port: 8080
EOF
kubectl apply -f k8s/service.yml

# Expose app via HTTP proxy (aka. "ingress")
cat>k8s/ingress.yml<<"EOF"
apiVersion: networking.k8s.io/v1
kind: Ingress
name: myapp1-ingress
spec:
rules:
- http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: myapp1-service
port:
number: 8080
EOF
kubectl apply -f k8s/ingress.yml

# Tail the app pods' logs.
kubectl logs -lapp=myapp1 --all-containers=true --prefix -f

#
# Upgrading to a new image. Either use this command or update the same field in deployment.yml.
#

cd /home/user1/dev/myapp1
docker build -t "myapp1" .
docker tag myapp1 ubuntutest1:5000/myapp1:v2
docker push ubuntutest1:5000/myapp1:v2

# Roll the deployment onto the new image and wait for the rollout to finish.
kubectl set image deployments/myapp1-deployment myapp1=ubuntutest1:5000/myapp1:v2
kubectl rollout status deployments/myapp1-deployment

# Rolling back (calling repeatedly switches between two newest versions).
kubectl rollout undo deployments/myapp1-deployment

#
# Testing HA
#

# Show which node each pod landed on.
kubectl get nodes ; kubectl get pod -o=custom-columns=NAME:.metadata.name,STATUS:.status.phase,NODE:.spec.nodeName --all-namespaces

# Evacuate a node, then allow scheduling on it again.
kubectl drain ubuntutest1 --ignore-daemonsets --delete-emptydir-data
kubectl uncordon ubuntutest1

```

Sunday, February 07, 2021

k3s and MetalLB "destination unreachable" issue

I was trying out MetalLB with a bare-metal Kubernetes cluster (using the k3s distro), and was hitting issues with requests to the cluster IP seemingly randomly giving me "destination unreachable". I noticed with Wireshark that I was getting a ton of "gratuitous" ARP packets. Every few seconds.

I inspected MetalLB's logs with the following command:

```kubectl logs -lapp=metallb --namespace metallb-system --all-containers=true --prefix -f
```

It was logging the message "IP allocated by controller not allowed by config" every few seconds.

It turned out to be k3s's builtin internal load balancer that was interfering with MetalLB. Disabling it with the "--disable=servicelb" flag during k3s's installation fixed the issue.

```curl -sfL https://get.k3s.io | K3S_CLUSTER_INIT=1 INSTALL_K3S_EXEC="--disable=servicelb" sh -
```