Tuesday, April 27, 2021

Geohash Python example

Here's a simple Geohash encode/decode implementation. The decoding function pretty closely follows the technique laid out in the Geohash Wikipedia entry.

from fractions import Fraction

"""
First decode the special geohash variant of base32 encoding.
Each encoded digit (0-9-b..z) (not continuous abc) is a 5 bit val 0,1,2...,30,31.
In the resulting bitstream, every second bit is now for latitude and longtitude.
Initially the latitutde range is -90,+90.
When a latitude bit is 1, then it now starts at the mid of these.
Else if 0 it now ends at the mid of these.
Same for longtitude but with range -180,+180.
"""
def decode_geohash(s):
    alphabet_32ghs = "0123456789bcdefghjkmnpqrstuvwxyz"
    dec_from_32ghs = dict()
    for i, c in enumerate(alphabet_32ghs):
        dec_from_32ghs[c] = i

    bits = 0  # Integer representation of hash.
    bit_cnt = 0
    for c in s:
        bits = (bits << 5) | dec_from_32ghs[c]
        bit_cnt += 5

    # Bits alternate between longitude and latitude; the first (most significant) bit is longitude.
    lat_bits, lon_bits = 0, 0
    lat_bit_cnt = bit_cnt // 2
    lon_bit_cnt = lat_bit_cnt
    if bit_cnt % 2 == 1:
        lon_bit_cnt += 1

    for i in range(bit_cnt):
        cur_bit_pos = bit_cnt - 1 - i
        cur_bit = (bits >> cur_bit_pos) & 1
        if i % 2 == 0:
            lon_bits |= cur_bit << (cur_bit_pos // 2)
        else:
            lat_bits |= cur_bit << (cur_bit_pos // 2)

    lat_start, lat_end = Fraction(-90), Fraction(90)
    for cur_bit_pos in range(lat_bit_cnt-1, -1, -1):
        mid = (lat_start + lat_end) / 2
        if lat_bits & (1 << cur_bit_pos):
            lat_start = mid
        else:
            lat_end = mid

    lon_start, lon_end = Fraction(-180), Fraction(180)
    for cur_bit_pos in range(lon_bit_cnt-1, -1, -1):
        mid = (lon_start + lon_end) / 2
        if lon_bits & (1 << cur_bit_pos):
            lon_start = mid
        else:
            lon_end = mid

    return float(lat_start), float(lat_end), float(lon_start), float(lon_end)


# Inspired by https://www.factual.com/blog/how-geohashes-work/
def encode_geohash(lat, lon, bit_cnt):
    if bit_cnt % 5 != 0:
        raise ValueError("bit_cnt must be divisible by 5")

    bits = 0
    lat_start, lat_end = Fraction(-90), Fraction(90)
    lon_start, lon_end = Fraction(-180), Fraction(180)
    for i in range(bit_cnt):
        if i % 2 == 0:
            mid = (lon_start + lon_end) / 2
            if lon < mid:
                bits = (bits << 1) | 0
                lon_end = mid
            else:
                bits = (bits << 1) | 1
                lon_start = mid
        else:
            mid = (lat_start + lat_end) / 2
            if lat < mid:
                bits = (bits << 1) | 0
                lat_end = mid
            else:
                bits = (bits << 1) | 1
                lat_start = mid

    print("bits: {:>b}".format(bits))

    # Do the special geohash base32 encoding.
    s = ""
    alphabet_32ghs = "0123456789bcdefghjkmnpqrstuvwxyz"
    for i in range(bit_cnt // 5):
        idx = (bits >> i*5) & (1 | 2 | 4 | 8 | 16)
        s += alphabet_32ghs[idx]
    return s[::-1]


print(decode_geohash("ezs42"))
print(decode_geohash("9q8y"))
print(encode_geohash(37.7, -122.5, 20))
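As a small standalone illustration (independent of the functions above), here is the bit interleaving for the Wikipedia example hash "ezs42": each character maps to 5 bits, and the concatenated bits then alternate between longitude and latitude, starting with longitude.

```python
# Decode each base32 character of "ezs42" into 5 bits and concatenate.
alphabet_32ghs = "0123456789bcdefghjkmnpqrstuvwxyz"
bits = "".join(format(alphabet_32ghs.index(c), "05b") for c in "ezs42")
print(bits)      # 0110111111110000010000010

# De-interleave: even positions are longitude bits, odd positions are latitude bits.
lon_bits = bits[0::2]
lat_bits = bits[1::2]
print(lon_bits)  # 0111110000000
print(lat_bits)  # 101111001001
```

These match the bitstreams shown in the worked example on the Geohash Wikipedia entry.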

Sunday, April 25, 2021

Lossy Counting Algorithm Python example

This is an implementation of the Lossy Counting Algorithm described in Manku and Motwani's 2002 paper "Approximate Frequency Counts over Data Streams".

It is an algorithm for finding the elements in a stream whose frequency exceeds a given threshold, while using only limited memory. For example, for video view counts on something like YouTube, it could find the videos that each constitute more than 3% of all views.
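To make the guarantees concrete, here is a quick sketch of the arithmetic from the paper, using a hypothetical stream of a million views, the 3% threshold, and a max error of 0.5%: every element whose true count is at least s*n is reported, nothing with true count below (s - eps)*n is reported, and each reported count undercounts the true count by at most eps*n.

```python
from math import ceil

n = 1_000_000  # hypothetical total number of views in the stream
s = 0.03       # support threshold: videos with more than 3% of views
eps = 0.005    # max allowed error (epsilon in the paper)

print(ceil(1 / eps))         # 200: bucket width, i.e. prune candidates every 200 items
print(round(s * n))          # 30000: every video with at least this many true views is reported
print(round((s - eps) * n))  # 25000: no video with fewer true views than this is reported
```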

Please let me know if you spot any bugs.

from math import ceil
class LossyCount:
    def __init__(self, max_error=0.005):  # max_error is the parameter they call epsilon in the paper.
        self.max_error = max_error
        self.bucket_width = ceil(1/max_error)
        self.entries = dict()
        self.n = 0

    def put(self, x):
        self.n += 1
        current_bucket = ceil(self.n / self.bucket_width)

        freq, delta = 1, current_bucket-1
        if x in self.entries:
            freq, delta = self.entries[x]
            freq += 1
        self.entries[x] = (freq, delta)

        # If at bucket boundary then prune low frequency entries.
        if self.n % self.bucket_width == 0:
            prune = []
            for key in self.entries:
                freq, delta = self.entries[key]
                if freq + delta <= current_bucket:
                    prune.append(key)
            for key in prune:
                del self.entries[key]

    def get(self, support_threshold=0.001):  # support_threshold is the parameter they call s in the paper.
        res = []
        for key in self.entries:
            freq, delta = self.entries[key]
            if freq >= (support_threshold - self.max_error)*self.n:
                res.append(key)
        return res




# Generate test data.
from math import log
from random import random, randint
view_cnt = 500000
videos_cnt = 100000
x = [random() for _ in range(view_cnt)]
# y = [1/v for v in x]  # This distribution is too steep...
y = [(1/v)*0.01 -log(v) for v in x]  # A distribution that is reasonably steep and has a very long tail.
m = max(y)
y = [v/m for v in y]
# ids = [i for i in range(videos_cnt)]  # Easy to read IDs, but unrealistic. Most popular video will have ID 0, second most popular ID 1, etc.
ids = [randint(1000000, 9000000) for _ in range(videos_cnt)]  # More realistic video IDs.
idxs = [int(v*(videos_cnt-1)) for v in y]
views = [ids[v] for v in idxs]

# import matplotlib.pyplot as plt
# plt.hist(views, bins=200)  # Only works when the IDs are 1,2,3,4...
# plt.show()

threshold = 0.03  # We are interested in videos that each constitute more than 3% of the views.

# Generate exact results using a counter, to compare with.
from collections import Counter
c = Counter(views)
r = []
for k in c:
    r.append((c[k], k))
r2 = []
for cnt, id in r:
    if cnt >= view_cnt*threshold:
        r2.append(id)
print(sorted(r2))

# Test the LossyCount class. Should give similar (but not exact) results to the above.
lc = LossyCount()
for v in views:
    lc.put(v)
print(sorted(lc.get(threshold)))

Friday, February 12, 2021

Create a local Kubernetes cluster and deploy your code end to end

These are the commands accompanying the video tutorial I've made on how to create a local Kubernetes cluster and deploy a simple stateless app from code, end to end. You can find the video tutorial here: https://youtu.be/MZr9Ls38uPw

#
# Install k3s
#

curl -sfL https://get.k3s.io | K3S_CLUSTER_INIT=1 INSTALL_K3S_EXEC="--disable=servicelb" sh -

cat /var/lib/rancher/k3s/server/node-token

# Run on other nodes to join the cluster
curl -sfL https://get.k3s.io | \
  INSTALL_K3S_EXEC=server \
  K3S_URL=https://192.168.50.203:6443 \
  K3S_TOKEN=... \
  sh -


kubectl get nodes --watch

journalctl --unit=k3s

kubectl get all --all-namespaces


#
# Install dashboard
#

GITHUB_URL=https://github.com/kubernetes/dashboard/releases
VERSION_KUBE_DASHBOARD=$(curl -w '%{url_effective}' -I -L -s -S ${GITHUB_URL}/latest -o /dev/null | sed -e 's|.*/||')
echo $VERSION_KUBE_DASHBOARD
kubectl create -f https://raw.githubusercontent.com/kubernetes/dashboard/${VERSION_KUBE_DASHBOARD}/aio/deploy/recommended.yaml

cat>dashboard.admin-user.yml<<"EOF"
apiVersion: v1
kind: ServiceAccount
metadata:
  name: admin-user
  namespace: kubernetes-dashboard
EOF

cat>dashboard.admin-user-role.yml<<"EOF"
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: admin-user
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: cluster-admin
subjects:
- kind: ServiceAccount
  name: admin-user
  namespace: kubernetes-dashboard
EOF

kubectl create -f dashboard.admin-user.yml -f dashboard.admin-user-role.yml


kubectl get all --all-namespaces

# Connect to node again, but with your port 8001 forwarded.
ssh -L 8001:127.0.0.1:8001 user1@ubuntutest1
kubectl proxy
# Open in browser:  http://localhost:8001/api/v1/namespaces/kubernetes-dashboard/services/https:kubernetes-dashboard:/proxy/
kubectl --namespace kubernetes-dashboard describe secret admin-user-token | grep ^token


#
# Install MetalLB
#

GITHUB_URL=https://github.com/metallb/metallb/releases
VERSION=$(curl -w '%{url_effective}' -I -L -s -S ${GITHUB_URL}/latest -o /dev/null | sed -e 's|.*/||')
echo $VERSION

kubectl apply -f https://raw.githubusercontent.com/metallb/metallb/$VERSION/manifests/namespace.yaml
kubectl apply -f https://raw.githubusercontent.com/metallb/metallb/$VERSION/manifests/metallb.yaml
kubectl create secret generic -n metallb-system memberlist --from-literal=secretkey="$(openssl rand -base64 128)"

cat>metallb-configmap.yml<<"EOF"
apiVersion: v1
kind: ConfigMap
metadata:
  namespace: metallb-system
  name: config
data:
  config: |
    address-pools:
    - name: default
      protocol: layer2
      addresses:
      - 192.168.50.20-192.168.50.40
EOF
kubectl apply -f metallb-configmap.yml

# Check logs for errors
kubectl logs -lapp=metallb --namespace metallb-system --all-containers=true --prefix -f



#
# Install docker registry
#

apt-get install docker-compose
docker run -d -p 5000:5000 --restart=always --name registry registry
docker ps -a

# On each node:
mkdir -p /etc/rancher/k3s/
cat>/etc/rancher/k3s/registries.yaml<<"EOF"
mirrors:
  "ubuntutest1:5000":
    endpoint:
      - "http://ubuntutest1:5000"
EOF
systemctl restart k3s



#
# Create containerized app
#

mkdir -p /home/user1/dev/myapp1
cd /home/user1/dev/myapp1
cat>main.go<<"EOF"
package main

import (
  "github.com/gorilla/mux"
  "html/template"
  "log"
  "net/http"
  "os"
  "time"
)

var helloTemplate, _ = template.New("").Parse(`<!DOCTYPE html>
<html>
<head><title>testapp</title></head>
<body>
<h1>Test 1</h1>
<p>Now: {{.now.Format "2006-01-02 15:04:05" }}</p>
<p>Served from node: {{ .node }}</p>
<p>Served from pod: {{ .pod }}</p>
</body>
</html>
`)

type Msg struct {
  Ts  time.Time `json:"ts"`
}

func helloGet(w http.ResponseWriter, r *http.Request) {
  v := map[string]interface{}{
    "now":  time.Now(),
    "node": os.Getenv("NODE_NAME"),
    "pod": os.Getenv("POD_NAME"),
  }
  helloTemplate.Execute(w, v)
}

func main() {
  log.Println("Starting")

  router := mux.NewRouter()
  router.HandleFunc("/", helloGet).Methods("GET")
  handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
    log.Println(r.Method + " " + r.URL.String())
    router.ServeHTTP(w, r)
  })

  log.Fatal(http.ListenAndServe(":8080", handler))
}
EOF


cat>go.mod<<"EOF"
module myapp1
EOF

touch go.sum

cat>Dockerfile<<"EOF"
FROM golang as builder
WORKDIR /app
COPY . ./
RUN go mod download
RUN CGO_ENABLED=0 GOOS=linux go build -v -o server

FROM alpine
RUN apk add --no-cache ca-certificates
COPY --from=builder /app/server /server
CMD ["/server"]
EOF


docker build -t "myapp1" .
docker run -it --rm -p 8080:8080 myapp1
docker tag myapp1 ubuntutest1:5000/myapp1:v1
docker push ubuntutest1:5000/myapp1:v1

curl -X GET ubuntutest1:5000/v2/_catalog
curl -X GET ubuntutest1:5000/v2/myapp1/tags/list


#
# Create and deploy our app in k8s
#

mkdir -p /home/user1/dev/myapp1/k8s
cd /home/user1/dev/myapp1

cat>k8s/deployment.yml<<"EOF"
apiVersion: apps/v1
kind: Deployment
metadata:
  name: myapp1-deployment
spec:
  selector:
    matchLabels:
      app: myapp1
  replicas: 3
  template:
    metadata:
      labels:
        app: myapp1
    spec:
      containers:
      - name: myapp1
        image: ubuntutest1:5000/myapp1:v1
        ports:
          - containerPort: 8080
        env:
          - name: NODE_NAME
            valueFrom:
              fieldRef:
                fieldPath: spec.nodeName
          - name: POD_NAME
            valueFrom:
              fieldRef:
                fieldPath: metadata.name
      topologySpreadConstraints:
      - maxSkew: 1
        topologyKey: "kubernetes.io/hostname"
        whenUnsatisfiable: DoNotSchedule
        labelSelector:
          matchLabels:
            app: myapp1
EOF
kubectl apply -f k8s/deployment.yml


cat>k8s/service.yml<<"EOF"
kind: Service
apiVersion: v1
metadata:
  name: myapp1-service
spec:
  type: ClusterIP
  selector:
    app: myapp1
  ports:
  - name: http-myapp1
    protocol: TCP
    port: 8080
EOF
kubectl apply -f k8s/service.yml


# Expose app via HTTP proxy (a.k.a. an "ingress")
cat>k8s/ingress.yml<<"EOF"
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: myapp1-ingress
spec:
  rules:
  - http:
      paths:
        - path: /
          pathType: Prefix
          backend:
            service:
              name: myapp1-service
              port:
                number: 8080
EOF
kubectl apply -f k8s/ingress.yml

kubectl logs -lapp=myapp1 --all-containers=true --prefix -f


#
# Upgrading to a new image. Either use this command or update the same field in deployment.yml.
#


cd /home/user1/dev/myapp1
docker build -t "myapp1" .
docker tag myapp1 ubuntutest1:5000/myapp1:v2
docker push ubuntutest1:5000/myapp1:v2

kubectl set image deployments/myapp1-deployment myapp1=ubuntutest1:5000/myapp1:v2
kubectl rollout status deployments/myapp1-deployment

# Rolling back (calling this repeatedly switches between the two newest versions).
kubectl rollout undo deployments/myapp1-deployment



#
# Testing HA
#

kubectl get nodes ; kubectl get pod -o=custom-columns=NAME:.metadata.name,STATUS:.status.phase,NODE:.spec.nodeName --all-namespaces

kubectl drain ubuntutest1 --ignore-daemonsets --delete-emptydir-data
kubectl uncordon ubuntutest1

Sunday, February 07, 2021

k3s and MetalLB "destination unreachable" issue

I was trying out MetalLB with a bare-metal Kubernetes cluster (using the k3s distro), and was hitting issues with requests to the cluster IP seemingly randomly failing with "destination unreachable". With Wireshark I noticed I was getting a ton of "gratuitous" ARP packets, every few seconds.

I inspected MetalLB's logs with the following command:

kubectl logs -lapp=metallb --namespace metallb-system --all-containers=true --prefix -f

It was logging the message "IP allocated by controller not allowed by config" every few seconds.

It turned out that k3s's built-in internal load balancer (servicelb) was interfering with MetalLB. Disabling it with the "--disable=servicelb" flag during k3s's installation fixed the issue.

curl -sfL https://get.k3s.io | K3S_CLUSTER_INIT=1 INSTALL_K3S_EXEC="--disable=servicelb" sh -