Tag Archives: Python

PyHSS Update – MongoDB Backend & SQN Resync

After a few quiet months I’m excited to say I’ve pushed through some improvements recently to PyHSS and it’s growing into a more usable HSS platform.

MongoDB Backend

This has a few obvious advantages – More salable, etc, but also opens up the ability to customize more of the subscriber parameters, like GBR bearers, etc, that simple flat text files just wouldn’t support, as well as the obvious issues with threading and writing to and from text files at scale.

Knock knock.

Race condition.

Who’s there?

— Threading Joke.

For now I’m using the Open5GS MongoDB schema, so the Open5Gs web UI can be used for administering the system and adding subscribers.

The CSV / text file backend is still there and still works, the MongoDB backend is only used if you enable it in the YAML file.

The documentation for setting this up is in the readme.

SQN Resync

If you’re working across multiple different HSS’ or perhaps messing with some crypto stuff on your USIM, there’s a chance you’ll get the SQN (The Sequence Number) on the USIM out of sync with what’s on the HSS.

This manifests itself as an Update Location Request being sent from the UE in response to an Authentication Information Answer and coming back with a Re-Syncronization-Info AVP in the Authentication Info AVP. I’ll talk more about how this works in another post, but in short PyHSS now looks at this value and uses it combined with the original RAND value sent in the Authentication Information Answer, to find the correct SQN value and update whichever database backend you’re using accordingly, and then send another Authentication Information Answer with authentication vectors with the correct SQN.

SQN Resync is something that’s really cryptographically difficult to implement / confusing, hence this taking so long.

What’s next? – IMS / Multimedia Auth

The next feature that’s coming soon is the Multimedia Authentication Request / Answer to allow CSCFs to query for IMS Registration and manage the Cx and Dx interfaces.

Code for this is already in place but failing some tests, not sure if that’s to do with the MAA response or something on my CSCFs,

Keep an eye on the GitLab repo!

Open5Gs- Python HSS Interface

Note: NextEPC the Open Source project rebranded as Open5Gs in 2019 due to a naming issue. The remaining software called NextEPC is a branch of an old version of Open5Gs. This post was written before the rebranding.

I’ve been working for some time on Private LTE networks, the packet core I’m using is NextEPC, it’s well written, flexible and well supported.

I joined the Open5Gs group and I’ve contributed a few bits and pieces to the project, including a Python wrapper for adding / managing subscribers in the built in Home Subscriber Server (HSS).

You can get it from the support/ directory in Open5Gs.

NextEPC Python Library

Basic Python library to interface with MongoDB subscriber DB in NextEPC HSS / PCRF. Requires Python 3+, mongo, pymongo and bson. (All available through PIP)

If you are planning to run this on a different machine other than localhost (the machine hosting the MongoDB service) you will need to enable remote access to MongoDB by binding it’s IP to 0.0.0.0:

This is done by editing /etc/mongodb.conf and changing the bind IP to: bind_ip = 0.0.0.0

Restart MongoDB for changes to take effect.

$ /etc/init.d/mongodb restart

Basic Example:

import NextEPC
NextEPC_1 = NextEPC("10.0.1.118", 27017)

pdn = [{'apn': 'internet', 'pcc_rule': [], 'ambr': {'downlink': 1234, 'uplink': 1234}, 'qos': {'qci': 9, 'arp': {'priority_level': 8, 'pre_emption_vulnerability': 1, 'pre_emption_capability': 1}}, 'type': 2}]
sub_data = {'imsi': '891012222222300', \
             'pdn': pdn, \
             'ambr': {'downlink': 1024000, 'uplink': 1024001}, \
             'subscribed_rau_tau_timer': 12, \
             'network_access_mode': 2, \
             'subscriber_status': 0, \
             'access_restriction_data': 32, \
             'security': {'k': '465B5CE8 B199B49F AA5F0A2E E238A6BC', 'amf': '8000', 'op': None, 'opc': 'E8ED289D EBA952E4 283B54E8 8E6183CA'}, '__v': 0}

print(NextEPC_1.AddSubscriber(sub_data))                        #Add Subscriber using dict of sub_data

print(NextEPC_1.GetSubscriber('891012222222300'))               #Get added Subscriber's details

print(NextEPC_1.DeleteSubscriber('891012222222300'))            #Delete Subscriber

Subscriber_List = NextEPC_1.GetSubscribers()
for subscribers in Subscriber_List:
  print(subscribers['imsi'])

RTPengine Python API Calls via ng Control Protocol

RTPengine has an API / control protocol, which is what Kamailio / OpenSER uses to interact with RTPengine, called the ng Control Protocol.

Connection is based on Bencode encoded data and communicates via a UDP socket.

I wrote a simple Python script to pull active calls from RTPengine, code below:

#Quick Python library for interfacing with Sipwise's fantastic rtpengine - https://github.com/sipwise/rtpengine
#Bencode library from https://pypi.org/project/bencode.py/ (Had to download files from webpage (PIP was out of date))

import bencode
import socket
import sys
import random
import string

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_address = ('188.0.169.13', 2224)     #Your server address

cookie = "0_2393_6"
data = bencode.encode({'command': 'list'})

message = str(cookie) + " " + str(data)
print(message)


sent = sock.sendto(message, server_address)

print('waiting to receive')
data, server = sock.recvfrom(4096)
print('received "%s"' % data)
data = data.split(" ", 1)       #Only split on first space
print("Cookie is: " + str(data[0]))
print("Data is: " + str(bencode.decode(data[1])))
print("There are " + str(len(bencode.decode(data[1])['calls'])) + " calls up on RTPengine at " + str(server_address[0]))
for calls in bencode.decode(data[1])['calls']:
    print(calls)
    cookie = "1_2393_6"
    data = bencode.encode({'command': 'query', 'call-id': str(calls)})
    message = str(cookie).encode('utf-8') + " ".encode('utf-8') + str(data).encode('utf-8')
    sent = sock.sendto(message, server_address)
    print('\n\nwaiting to receive')
    data, server = sock.recvfrom(8192)

    data = data.split(" ", 1)       #Only split on first space
    bencoded_data = bencode.decode(data[1])

    for keys in bencoded_data:
        print(keys)
        print("\t" + str(bencoded_data[keys]))

sock.close()

PyHSS – Python 3GPP LTE Home Subscriber Server

I recently started working on an issue that I’d seen was to do with the HSS response to the MME on an Update Location Answer.

I took some Wireshark traces of a connection from the MME to the HSS, and compared that to a trace from a different HSS. (Amarisoft EPC/HSS)

The Update Location Answer sent by the Amarisoft HSS to the MME over the S6a (Diameter) interface includes an AVP for “Multiple APN Configuration” which has the the dedicated bearer for IMS, while the HSS in the software I was working on didn’t.

After a bit of bashing trying to modify the S6a responses, I decided I’d just implement my own Home Subscriber Server.

The Diameter interface is pretty straight forward to understand, using a similar structure to RADIUS, and with the exception of the Crypto for the EUTRAN Authentication Vectors, it was all pretty straight forward.

If you’d like to know more you can download PyHSS from my GitLab page, and view my Diameter Primer post and my post on Diameter packet structure.

PyRTP – Simple RTP Library for Python

I recently had a scenario where I had to encode and decode RTP packets off the wire.

I wrote a Python Library to handle it which I’ve published for anyone to use.

Encoding data is quite simple, it takes a dictionary of values to fill the headers and payload and returns hex data to be sent down the wire:

payload = 'd5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5' 

packet_vars = {'version' : 2, 'padding' : 0, 'extension' : 0, 'csi_count' : 0, 'marker' : 0, 'payload_type' : 8, 'sequence_number' : 306, 'timestamp' : 306, 'ssrc' : 185755418, payload' : payload} 

PyRTP.GenerateRTPpacket(packet_vars)             #Generates hex to send down the wire 

And decoding is the same but reverse, feed it hex data and it returns a dict of values:

packet_bytes = '8008d4340000303c0b12671ad5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5'

rtp_params = PyRTP.DecodeRTPpacket(packet_bytes) #Returns dict of values from packet

Hopefully it’ll save someone else some time in the future.

For more info on RTP see:

RTP – More than you Wanted to Know for a deep dive into the packet structure

Reverse MD5 on SIP Auth

MD5 isn’t a particularly well regarded hashing function these days, but it’s still pretty ubiquitous.

SIP authentication, for the most part, still uses MD5 in the form of Message Digest Authentication,

If we were to take the password password and hash it using an online tool to generate MD5 Hashes we’d get “482c811da5d5b4bc6d497ffa98491e38”


If we hash password again with MD5 we’d get the same output – “482c811da5d5b4bc6d497ffa98491e38”,


The catch with this is if you put “5f4dcc3b5aa765d61d8327deb882cf99” into a search engine, Google immediately tells you it’s plain text value. That’s because the MD5 of password is always 5f4dcc3b5aa765d61d8327deb882cf99, hashing the same input phase “password” always results in the same output MD5 hash aka “response”.

By using Message Digest Authentication we introduce a “nonce” value and mix it (“salt”) with the SIP realm, username, password and request URI, to ensure that the response is different every time.

Let’s look at this example REGISTER flow:

We can see a REGISTER message has been sent by Bob to the SIP Server.

REGISTER sips:ss2.biloxi.example.com SIP/2.0    
Via: SIP/2.0/TLS client.biloxi.example.com:5061;branch=z9hG4bKnashds7
Max-Forwards: 70
From: Bob <sips:[email protected]>;tag=a73kszlfl
To: Bob <sips:[email protected]>
Call-ID: [email protected]
CSeq: 1 REGISTER
Contact: <sips:[email protected]>
Content-Length: 0

The SIP Server has sent back a 401 Unauthorised message, but includes the WWW-Authenticate header field, from this, we can grab a Realm value, and a Nonce, which we’ll use to generate our response that we’ll send back.

 SIP/2.0 401 Unauthorized    
Via: SIP/2.0/TLS client.biloxi.example.com:5061;branch=z9hG4bKnashds7 ;received=192.0.2.201
From: Bob <sips:[email protected]>;tag=a73kszlfl
To: Bob <sips:[email protected]>;tag=1410948204
Call-ID: [email protected]
CSeq: 1 REGISTER
WWW-Authenticate: Digest realm="atlanta.example.com", qop="auth",nonce="ea9c8e88df84f1cec4341ae6cbe5a359", opaque="", stale=FALSE, algorithm=MD5
Content-Length: 0

The formula for generating the response looks rather complex but really isn’t that bad.

HA1=MD5(username:realm:password)
HA2=MD5(method:digestURI)
response=MD5(HA1:nonce:HA2)

Let’s say in this case Bob’s password is “bobspassword”, let’s generate a response back to the server.

We know the username which is bob, the realm which is atlanta.example.com, digest URI is sips:biloxi.example.com, method is REGISTER and the password which is bobspassword. This seems like a lot to go through but all of these values, with the exception of the password, we just get from the 401 headers above.

So let’s generate the first part called HA1 using the formula HA1=MD5(username:realm:password), so let’s substitute this with our real values:
HA1 = MD5(bob:atlanta.example.com:bobspassword)
So if we drop bob:atlanta.example.com:bobspassword into our MD5 hasher and we get our HA1 hash and it it looks like 2da91700e1ef4f38df91500c8729d35f, so HA1 = 2da91700e1ef4f38df91500c8729d35f

Now onto the second part, we know the Method is REGISTER, and our digestURI is sips:biloxi.example.com
HA2=MD5(method:digestURI)
HA2=MD5(REGISTER:sips:biloxi.example.com)
Again, drop REGISTER:sips:biloxi.example.com into our MD5 hasher, and grab the output – 8f2d44a2696b3b3ed781d2f44375b3df
This means HA2 = 8f2d44a2696b3b3ed781d2f44375b3df

Finally we join HA1, the nonce and HA2 in one string and hash it:
Response = MD5(2da91700e1ef4f38df91500c8729d35f:ea9c8e88df84f1cec4341ae6cbe5a359:8f2d44a2696b3b3ed781d2f44375b3df)

Which gives us our final response of “bc2f51f99c2add3e9dfce04d43df0c6a”, so let’s see what happens when Bob sends this to the SIP Server.

REGISTER sips:ss2.biloxi.example.com SIP/2.0 
Via: SIP/2.0/TLS client.biloxi.example.com:5061;branch=z9hG4bKnashd92
Max-Forwards: 70
From: Bob <sips:[email protected]>;tag=ja743ks76zlflH
To: Bob <sips:[email protected]>
Call-ID: [email protected]
CSeq: 2 REGISTER
Contact: <sips:[email protected]>
Authorization: Digest username="bob", realm="atlanta.example.com", nonce="ea9c8e88df84f1cec4341ae6cbe5a359", opaque="", uri="sips:ss2.biloxi.example.com", response="bc2f51f99c2add3e9dfce04d43df0c6a"
Content-Length: 0
SIP/2.0 200 OK
Via: SIP/2.0/TLS client.biloxi.example.com:5061;branch=z9hG4bKnashd92;received=192.0.2.201
From: Bob <sips:[email protected]>;tag=ja743ks76zlflH
To: Bob <sips:[email protected]>;tag=37GkEhwl6
Call-ID: [email protected]
CSeq: 2 REGISTER
Contact: <sips:[email protected]>;expires=3600
Content-Length: 0

There you have it, a 200 OK response and Bob is registered on biloxi.example.com.

Update 2021: Jason Murley has contributed a much more robust version of the code below, which is way better than what I’d made!

You can find his code here.

I’ve written a little tool in Python to generate the correct response based on the nonce and values associated with it:

import hashlib

nonce = 'ea9c8e88df84f1cec4341ae6cbe5a359'
realm = 'sips:biloxi.example.com'
password = 'bobspassword'
username    =   str("bob")
requesturi  =   str(s"ips:biloxi.example.com")
print("username: " + username)
print("nonce: " + nonce)
print("realm: " + realm)
print("password: " + password)
print("\n")

HA1str = username + ":" + realm + ":" + password
HA1enc = (hashlib.md5(HA1str.encode()).hexdigest())
print ("HA1 String: " + HA1str)
print ("HA1 Encrypted: " + HA1enc)
HA2str = "REGISTER:" + requesturi
HA2enc = (hashlib.md5(HA2str.encode()).hexdigest())

print ("HA2 String: " + HA2str)
print ("HA2 Encrypted: " + HA2enc)

responsestr = HA1enc + ":" + nonce + ":" + HA2enc
print("Response String: " + responsestr)
responseenc = str((hashlib.md5(responsestr.encode()).hexdigest()))
print("Response Encrypted" + responseenc)